Spaces:
Sleeping
Sleeping
| import argparse | |
| import queue | |
| import time | |
| import threading | |
| import logging | |
| import os | |
| from PIL import Image | |
| from daily import EventHandler, CallClient, Daily | |
| from datetime import datetime | |
| from dotenv import load_dotenv | |
| from auth import get_meeting_token, get_room_name | |
| from pipeline import Pipeline | |
| from device import device, torch_dtype | |
| load_dotenv() | |
| class DailyVision(EventHandler): | |
| def __init__( | |
| self, | |
| room_url, | |
| room_name, | |
| expiration, | |
| idle, | |
| bot_name="Daily Bot" | |
| ): | |
| self.__client = CallClient(event_handler=self) | |
| self.__pipeline = Pipeline | |
| self.__camera = None | |
| self.__time = time.time() | |
| self.__queue = queue.Queue() | |
| self.__app_quit = False | |
| self.__image_buffer = None | |
| self.__bot_name = bot_name | |
| self.__room_url = room_url | |
| self.__room_name = room_name | |
| self.__expiration = expiration | |
| self.__idle = idle | |
| # Create the pipeline (this might take a moment) | |
| self.__pipeline = Pipeline(device, torch_dtype) | |
| #print(self.__pipeline.InputParams.schema()) | |
| # Configure logger | |
| FORMAT = f"%(asctime)s {self.__room_url} %(message)s" | |
| logging.basicConfig(format=FORMAT) | |
| self.logger = logging.getLogger("bot-instance") | |
| self.logger.setLevel(logging.DEBUG) | |
| self.logger.info(f"Expiration timer set to: {self.__expiration}") | |
| # Setup camera | |
| self.setup_camera() | |
| def run(self, meeting_url, token): | |
| # Join | |
| self.logger.info(f"Connecting to room {meeting_url} as {self.__bot_name}") | |
| self.__client.set_user_name(self.__bot_name) | |
| self.__client.join(meeting_url, token, completion=self.on_joined) | |
| #self.__participant_id = self.client.participants()["local"]["id"] | |
| # Start thread | |
| self.__thread = threading.Thread(target = self.process_frames) | |
| self.__thread.start() | |
| # Keep-alive on thread | |
| self.__thread.join() | |
| def leave(self): | |
| self.logger.info(f"Leaving...") | |
| self.__app_quit = True | |
| self.__thread.join() | |
| self.__client.leave() | |
| def on_joined(self, join_data, client_error): | |
| self.logger.info(f"call_joined: {join_data}, {client_error}") | |
| def on_participant_joined(self, participant): | |
| self.logger.info(f"Participant {participant['id']} joined, analyzing frames...") | |
| self.__client.set_video_renderer(participant["id"], self.on_video_frame, color_format="RGB") | |
| # Say hello | |
| self.wave() | |
| def setup_camera(self): | |
| if not self.__camera: | |
| self.__camera = Daily.create_camera_device("camera", | |
| width = 640, | |
| height = 480, | |
| color_format="RGB") | |
| self.__client.update_inputs({ | |
| "camera": { | |
| "isEnabled": True, | |
| "settings": { | |
| "deviceId": "camera" | |
| } | |
| } | |
| }) | |
| def process_frames(self): | |
| params = Pipeline.InputParams() | |
| while not self.__app_quit: | |
| # Is anyone watching? | |
| if not self.__idle and len(self.__client.participants()) < 2: | |
| self.logger.info(f"No partcipants in channel. Exiting...") | |
| self.__app_quit = True | |
| break | |
| # Check expiry timer | |
| if time.time() > self.__expiration: | |
| self.logger.info(f"Expiration timer exceeded. Exiting...") | |
| self.__app_quit = True | |
| break | |
| try: | |
| #video_frame = self.__queue.get(timeout=5) | |
| video_frame = self.__image_buffer | |
| if not video_frame == None: | |
| image = Image.frombytes("RGB", (video_frame.width, video_frame.height), video_frame.buffer) | |
| result_image = self.__pipeline.predict(params, image) | |
| self.__camera.write_frame(result_image.tobytes()) | |
| except queue.Empty: | |
| pass | |
| def on_video_frame(self, participant_id, video_frame): | |
| # Process ~15 frames per second (considering incoming frames at 30fps). | |
| if time.time() - self.__time > 2: #0.05: | |
| self.__time = time.time() | |
| self.__image_buffer = video_frame | |
| #self.__queue.put(video_frame) | |
| def wave(self, emoji="👋"): | |
| self.__client.send_app_message( | |
| { | |
| "event": "sync-emoji-reaction", | |
| "reaction": { | |
| "emoji": emoji, | |
| "room": "main-room", | |
| "sessionId": "bot", | |
| "id": time.time(), | |
| }, | |
| } | |
| ) | |
| def main(): | |
| parser = argparse.ArgumentParser(description="Daily Bot") | |
| # Required args | |
| parser.add_argument("-u", "--url", required=True, type=str, help="URL of the Daily room") | |
| parser.add_argument("-k", "--api_key", required=True, type=str, help="Daily API key") | |
| # Optional args | |
| parser.add_argument("-t", "--private", type=bool, help="Is this room private?", default=True) | |
| parser.add_argument("-n", "--bot-name", type=str, help="Name of the bot", default="Daily Bot") | |
| parser.add_argument("-e", "--expiration", type=int, help="Duration of bot", default=os.getenv("BOT_MAX_DURATION", 300)) | |
| parser.add_argument("-i", "--idle", type=bool, help="Wait for participants to join", default=os.getenv("BOT_WILL_IDLE", False)) | |
| args = parser.parse_args() | |
| Daily.init() | |
| expiration = time.time() + args.expiration | |
| room_name = get_room_name(args.url) | |
| # Retrieve a meeting token, if not provided | |
| #@TODO do room lookup to check privacy | |
| if args.private: | |
| token = get_meeting_token(room_name, args.api_key, expiration) | |
| app = DailyVision(args.url, room_name, expiration, args.idle, args.bot_name) | |
| try : | |
| app.run(args.url, token) | |
| except KeyboardInterrupt: | |
| print("Ctrl-C detected. Exiting!") | |
| finally: | |
| app.leave() | |
| # Let leave finish | |
| time.sleep(2) | |
| if __name__ == '__main__': | |
| main() |