Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Producer-Consumer Demo - LeRobot Arena | |
| This example demonstrates: | |
| - Producer and multiple consumers working together | |
| - Real-time joint updates | |
| - Emergency stop functionality | |
| - State synchronization | |
| - Connection management | |
| """ | |
| import asyncio | |
| import logging | |
| import random | |
| from lerobot_arena_client import RoboticsConsumer, RoboticsProducer | |
| # Setup logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| class DemoConsumer: | |
| """Demo consumer that logs all received messages.""" | |
| def __init__(self, name: str, room_id: str): | |
| self.name = name | |
| self.room_id = room_id | |
| self.consumer = RoboticsConsumer("http://localhost:8000") | |
| self.update_count = 0 | |
| self.state_count = 0 | |
| async def setup(self): | |
| """Setup consumer with callbacks.""" | |
| def on_joint_update(joints): | |
| self.update_count += 1 | |
| logger.info( | |
| f"[{self.name}] Joint update #{self.update_count}: {len(joints)} joints" | |
| ) | |
| def on_state_sync(state): | |
| self.state_count += 1 | |
| logger.info( | |
| f"[{self.name}] State sync #{self.state_count}: {len(state)} joints" | |
| ) | |
| def on_error(error_msg): | |
| logger.error(f"[{self.name}] ERROR: {error_msg}") | |
| def on_connected(): | |
| logger.info(f"[{self.name}] Connected!") | |
| def on_disconnected(): | |
| logger.info(f"[{self.name}] Disconnected!") | |
| self.consumer.on_joint_update(on_joint_update) | |
| self.consumer.on_state_sync(on_state_sync) | |
| self.consumer.on_error(on_error) | |
| self.consumer.on_connected(on_connected) | |
| self.consumer.on_disconnected(on_disconnected) | |
| async def connect(self): | |
| """Connect to room.""" | |
| success = await self.consumer.connect(self.room_id, f"demo-{self.name}") | |
| if success: | |
| logger.info(f"[{self.name}] Successfully connected to room {self.room_id}") | |
| else: | |
| logger.error(f"[{self.name}] Failed to connect to room {self.room_id}") | |
| return success | |
| async def disconnect(self): | |
| """Disconnect from room.""" | |
| if self.consumer.is_connected(): | |
| await self.consumer.disconnect() | |
| logger.info( | |
| f"[{self.name}] Final stats: {self.update_count} updates, {self.state_count} states" | |
| ) | |
| async def simulate_robot_movement(producer: RoboticsProducer): | |
| """Simulate realistic robot movement.""" | |
| # Define some realistic joint ranges for a robotic arm | |
| joints = { | |
| "base": {"current": 0.0, "target": 0.0, "min": -180, "max": 180}, | |
| "shoulder": {"current": 0.0, "target": 0.0, "min": -90, "max": 90}, | |
| "elbow": {"current": 0.0, "target": 0.0, "min": -135, "max": 135}, | |
| "wrist": {"current": 0.0, "target": 0.0, "min": -180, "max": 180}, | |
| } | |
| logger.info("[Producer] Starting robot movement simulation...") | |
| for step in range(20): # 20 movement steps | |
| # Occasionally set new random targets | |
| if step % 5 == 0: | |
| for joint_name, joint_data in joints.items(): | |
| joint_data["target"] = random.uniform( | |
| joint_data["min"], joint_data["max"] | |
| ) | |
| logger.info(f"[Producer] Step {step + 1}: New targets set") | |
| # Move each joint towards its target | |
| joint_updates = [] | |
| for joint_name, joint_data in joints.items(): | |
| current = joint_data["current"] | |
| target = joint_data["target"] | |
| # Simple movement: move 10% towards target each step | |
| diff = target - current | |
| move = diff * 0.1 | |
| new_value = current + move | |
| joint_data["current"] = new_value | |
| joint_updates.append({"name": joint_name, "value": new_value}) | |
| # Send the joint updates | |
| await producer.send_joint_update(joint_updates) | |
| # Add some delay for realistic movement | |
| await asyncio.sleep(0.5) | |
| logger.info("[Producer] Movement simulation completed") | |
| async def main(): | |
| """Main demo function.""" | |
| logger.info("=== LeRobot Arena Producer-Consumer Demo ===") | |
| # Create producer | |
| producer = RoboticsProducer("http://localhost:8000") | |
| # Setup producer callbacks | |
| def on_producer_error(error_msg): | |
| logger.error(f"[Producer] ERROR: {error_msg}") | |
| def on_producer_connected(): | |
| logger.info("[Producer] Connected!") | |
| def on_producer_disconnected(): | |
| logger.info("[Producer] Disconnected!") | |
| producer.on_error(on_producer_error) | |
| producer.on_connected(on_producer_connected) | |
| producer.on_disconnected(on_producer_disconnected) | |
| try: | |
| # Create room and connect producer | |
| room_id = await producer.create_room() | |
| logger.info(f"Created room: {room_id}") | |
| success = await producer.connect(room_id, "robot-controller") | |
| if not success: | |
| logger.error("Failed to connect producer!") | |
| return | |
| # Create multiple consumers | |
| consumers = [] | |
| consumer_names = ["visualizer", "logger", "safety-monitor"] | |
| for name in consumer_names: | |
| consumer = DemoConsumer(name, room_id) | |
| await consumer.setup() | |
| consumers.append(consumer) | |
| # Connect all consumers | |
| logger.info("Connecting consumers...") | |
| for consumer in consumers: | |
| await consumer.connect() | |
| await asyncio.sleep(0.1) # Small delay between connections | |
| # Send initial state | |
| logger.info("[Producer] Sending initial state...") | |
| initial_state = {"base": 0.0, "shoulder": 0.0, "elbow": 0.0, "wrist": 0.0} | |
| await producer.send_state_sync(initial_state) | |
| await asyncio.sleep(1) | |
| # Start robot movement simulation | |
| movement_task = asyncio.create_task(simulate_robot_movement(producer)) | |
| # Let it run for a bit | |
| await asyncio.sleep(5) | |
| # Demonstrate emergency stop | |
| logger.info("🚨 [Producer] Sending emergency stop!") | |
| await producer.send_emergency_stop( | |
| "Demo emergency stop - testing safety systems" | |
| ) | |
| # Wait for movement to complete | |
| await movement_task | |
| # Final state check | |
| logger.info("=== Final Demo Summary ===") | |
| for consumer in consumers: | |
| logger.info( | |
| f"[{consumer.name}] Received {consumer.update_count} updates, {consumer.state_count} states" | |
| ) | |
| logger.info("Demo completed successfully!") | |
| except Exception as e: | |
| logger.error(f"Demo error: {e}") | |
| finally: | |
| # Cleanup | |
| logger.info("Cleaning up...") | |
| # Disconnect all consumers | |
| for consumer in consumers: | |
| await consumer.disconnect() | |
| # Disconnect producer | |
| if producer.is_connected(): | |
| await producer.disconnect() | |
| logger.info("Demo cleanup completed") | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |