Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Basic Video Producer Example | |
| Demonstrates how to use the LeRobot Arena Python video client for streaming. | |
| This example creates animated video content and streams it to the arena server. | |
| """ | |
| import asyncio | |
| import logging | |
| import time | |
| import numpy as np | |
| # Import the video client | |
| from lerobot_arena_client.video import ( | |
| VideoProducer, | |
| create_producer_client, | |
| ) | |
| async def animated_frame_source() -> np.ndarray | None: | |
| """Create animated frames with colorful patterns""" | |
| # Create a colorful animated frame | |
| height, width = 480, 640 | |
| frame_count = int(time.time() * 30) % 1000 # 30 fps simulation | |
| # Generate animated RGB channels using vectorized operations | |
| time_factor = frame_count * 0.1 | |
| # Create colorful animated patterns | |
| y_coords, x_coords = np.meshgrid(np.arange(width), np.arange(height), indexing="xy") | |
| r = (128 + 127 * np.sin(time_factor + x_coords * 0.01)).astype(np.uint8) | |
| g = (128 + 127 * np.sin(time_factor + y_coords * 0.01)).astype(np.uint8) | |
| b = (128 + 127 * np.sin(time_factor) * np.ones((height, width))).astype(np.uint8) | |
| # Stack into RGB frame | |
| frame = np.stack([r, g, b], axis=2) | |
| # Add a moving circle for visual feedback | |
| center_x = int(320 + 200 * np.sin(frame_count * 0.05)) | |
| center_y = int(240 + 100 * np.cos(frame_count * 0.05)) | |
| # Create circle mask | |
| circle_mask = (x_coords - center_x) ** 2 + (y_coords - center_y) ** 2 < 50**2 | |
| frame[circle_mask] = [255, 255, 0] # Yellow circle | |
| # Add frame counter text overlay | |
| import cv2 | |
| cv2.putText( | |
| frame, | |
| f"Frame {frame_count}", | |
| (20, 50), | |
| cv2.FONT_HERSHEY_SIMPLEX, | |
| 1.5, | |
| (0, 0, 0), | |
| 3, | |
| ) # Black outline | |
| cv2.putText( | |
| frame, | |
| f"Frame {frame_count}", | |
| (20, 50), | |
| cv2.FONT_HERSHEY_SIMPLEX, | |
| 1.5, | |
| (255, 255, 255), | |
| 2, | |
| ) # White text | |
| return frame | |
| async def main(): | |
| """Main function demonstrating video producer functionality""" | |
| # Configure logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", | |
| ) | |
| logger = logging.getLogger(__name__) | |
| logger.info("π Starting LeRobot Arena Video Producer Example") | |
| try: | |
| # Create video producer with configuration | |
| producer = VideoProducer(base_url="http://localhost:8000") | |
| # Set up event handlers | |
| producer.on_connected(lambda: logger.info("β Connected to video server")) | |
| producer.on_disconnected( | |
| lambda: logger.info("π Disconnected from video server") | |
| ) | |
| producer.on_error(lambda error: logger.error(f"β Error: {error}")) | |
| producer.on_status_update( | |
| lambda status, data: logger.info(f"π Status: {status}") | |
| ) | |
| producer.on_stream_stats( | |
| lambda stats: logger.debug(f"π Stats: {stats.average_fps:.1f}fps") | |
| ) | |
| # Create a room and connect | |
| room_id = await producer.create_room() | |
| logger.info(f"π Created room: {room_id}") | |
| connected = await producer.connect(room_id) | |
| if not connected: | |
| logger.error("β Failed to connect to room") | |
| return | |
| logger.info(f"β Connected as producer to room: {room_id}") | |
| # Start custom video stream with animated content | |
| logger.info("π¬ Starting animated video stream...") | |
| await producer.start_custom_stream(animated_frame_source) | |
| logger.info("πΊ Video streaming started!") | |
| logger.info(f"π Consumers can connect to room: {room_id}") | |
| logger.info( | |
| f"π± Use JS consumer: http://localhost:5173/consumer?room={room_id}" | |
| ) | |
| # Stream for demo duration | |
| duration = 30 # Stream for 30 seconds | |
| logger.info(f"β±οΈ Streaming for {duration} seconds...") | |
| for i in range(duration): | |
| await asyncio.sleep(1) | |
| if i % 5 == 0: | |
| logger.info(f"π‘ Streaming... {duration - i} seconds remaining") | |
| logger.info("π Stopping video stream...") | |
| await producer.stop_streaming() | |
| except Exception as e: | |
| logger.error(f"β Unexpected error: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| finally: | |
| # Clean up | |
| logger.info("π§Ή Cleaning up...") | |
| if "producer" in locals(): | |
| await producer.disconnect() | |
| logger.info("β Video producer example completed") | |
| async def camera_example(): | |
| """Example using actual camera (if available)""" | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| logger.info("π· Starting Camera Video Producer Example") | |
| try: | |
| # Create producer using factory function | |
| producer = await create_producer_client(base_url="http://localhost:8000") | |
| room_id = producer.current_room_id | |
| logger.info(f"π Connected to room: {room_id}") | |
| # Get available cameras | |
| cameras = await producer.get_camera_devices() | |
| if cameras: | |
| logger.info("πΉ Available cameras:") | |
| for camera in cameras: | |
| logger.info( | |
| f" Device {camera['device_id']}: {camera['name']} " | |
| f"({camera['resolution']['width']}x{camera['resolution']['height']})" | |
| ) | |
| # Start camera stream | |
| logger.info("π· Starting camera stream...") | |
| await producer.start_camera(device_id=0) | |
| logger.info("πΊ Camera streaming started!") | |
| logger.info(f"π Consumers can connect to room: {room_id}") | |
| # Stream for demo duration | |
| await asyncio.sleep(30) | |
| else: | |
| logger.warning("β οΈ No cameras found") | |
| except Exception as e: | |
| logger.error(f"β Camera error: {e}") | |
| logger.info("π‘ Make sure your camera is available and not used by other apps") | |
| finally: | |
| if "producer" in locals(): | |
| await producer.disconnect() | |
| async def screen_share_example(): | |
| """Example using screen sharing (animated pattern for demo)""" | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| logger.info("π₯οΈ Starting Screen Share Example") | |
| try: | |
| producer = await create_producer_client() | |
| room_id = producer.current_room_id | |
| logger.info("π₯οΈ Starting screen share...") | |
| await producer.start_screen_share() | |
| logger.info(f"πΊ Screen sharing started! Room: {room_id}") | |
| # Share for demo duration | |
| await asyncio.sleep(20) | |
| except Exception as e: | |
| logger.error(f"β Screen share error: {e}") | |
| finally: | |
| if "producer" in locals(): | |
| await producer.disconnect() | |
| if __name__ == "__main__": | |
| import sys | |
| if len(sys.argv) > 1: | |
| mode = sys.argv[1] | |
| if mode == "camera": | |
| asyncio.run(camera_example()) | |
| elif mode == "screen": | |
| asyncio.run(screen_share_example()) | |
| elif mode == "animated": | |
| asyncio.run(main()) | |
| else: | |
| print("Usage:") | |
| print(" python video_producer_example.py animated # Animated content") | |
| print(" python video_producer_example.py camera # Camera stream") | |
| print(" python video_producer_example.py screen # Screen share") | |
| else: | |
| # Default: run animated example | |
| asyncio.run(main()) | |