Spaces:
Sleeping
Sleeping
| import asyncio | |
| import pytest | |
| from lerobot_arena_client import ( | |
| RoboticsProducer, | |
| create_consumer_client, | |
| create_producer_client, | |
| ) | |
| class TestIntegration: | |
| """End-to-end integration tests.""" | |
| async def test_full_producer_consumer_workflow(self): | |
| """Test complete producer-consumer workflow.""" | |
| # Create producer and room | |
| producer = await create_producer_client("http://localhost:8000") | |
| room_id = producer.room_id | |
| try: | |
| # Create consumer and connect to same room | |
| consumer = await create_consumer_client(room_id, "http://localhost:8000") | |
| try: | |
| # Set up consumer to collect messages | |
| received_states = [] | |
| received_updates = [] | |
| received_errors = [] | |
| def on_state_sync(state): | |
| received_states.append(state) | |
| def on_joint_update(joints): | |
| received_updates.append(joints) | |
| def on_error(error): | |
| received_errors.append(error) | |
| consumer.on_state_sync(on_state_sync) | |
| consumer.on_joint_update(on_joint_update) | |
| consumer.on_error(on_error) | |
| # Wait for connections to stabilize | |
| await asyncio.sleep(0.2) | |
| # Producer sends initial state | |
| initial_state = {"shoulder": 0.0, "elbow": 0.0, "wrist": 0.0} | |
| await producer.send_state_sync(initial_state) | |
| await asyncio.sleep(0.1) | |
| # Producer sends series of joint updates | |
| joint_sequences = [ | |
| [{"name": "shoulder", "value": 45.0}], | |
| [{"name": "elbow", "value": -30.0}], | |
| [{"name": "wrist", "value": 15.0}], | |
| [ | |
| {"name": "shoulder", "value": 90.0}, | |
| {"name": "elbow", "value": -60.0}, | |
| ], | |
| ] | |
| for joints in joint_sequences: | |
| await producer.send_joint_update(joints) | |
| await asyncio.sleep(0.1) | |
| # Wait for all messages to be received | |
| await asyncio.sleep(0.3) | |
| # Verify consumer received messages | |
| assert len(received_updates) >= 4 # At least the joint updates | |
| # Verify final state | |
| final_state = await consumer.get_state_sync() | |
| expected_final_state = {"shoulder": 90.0, "elbow": -60.0, "wrist": 15.0} | |
| assert final_state == expected_final_state | |
| finally: | |
| await consumer.disconnect() | |
| finally: | |
| await producer.disconnect() | |
| await producer.delete_room(room_id) | |
| async def test_multiple_consumers_same_room(self): | |
| """Test multiple consumers receiving same messages.""" | |
| producer = await create_producer_client("http://localhost:8000") | |
| room_id = producer.room_id | |
| try: | |
| # Create multiple consumers | |
| consumer1 = await create_consumer_client(room_id, "http://localhost:8000") | |
| consumer2 = await create_consumer_client(room_id, "http://localhost:8000") | |
| try: | |
| # Set up message collection for both consumers | |
| consumer1_updates = [] | |
| consumer2_updates = [] | |
| consumer1.on_joint_update( | |
| lambda joints: consumer1_updates.append(joints) | |
| ) | |
| consumer2.on_joint_update( | |
| lambda joints: consumer2_updates.append(joints) | |
| ) | |
| # Wait for connections | |
| await asyncio.sleep(0.2) | |
| # Producer sends updates | |
| test_joints = [ | |
| {"name": "joint1", "value": 10.0}, | |
| {"name": "joint2", "value": 20.0}, | |
| ] | |
| await producer.send_joint_update(test_joints) | |
| # Wait for message propagation | |
| await asyncio.sleep(0.2) | |
| # Both consumers should receive the same update | |
| assert len(consumer1_updates) >= 1 | |
| assert len(consumer2_updates) >= 1 | |
| # Verify both received same data | |
| if consumer1_updates and consumer2_updates: | |
| assert consumer1_updates[-1] == consumer2_updates[-1] | |
| finally: | |
| await consumer1.disconnect() | |
| await consumer2.disconnect() | |
| finally: | |
| await producer.disconnect() | |
| await producer.delete_room(room_id) | |
| async def test_emergency_stop_propagation(self): | |
| """Test emergency stop propagation to all consumers.""" | |
| producer = await create_producer_client("http://localhost:8000") | |
| room_id = producer.room_id | |
| try: | |
| # Create consumers | |
| consumer1 = await create_consumer_client(room_id, "http://localhost:8000") | |
| consumer2 = await create_consumer_client(room_id, "http://localhost:8000") | |
| try: | |
| # Set up error collection | |
| consumer1_errors = [] | |
| consumer2_errors = [] | |
| consumer1.on_error(lambda error: consumer1_errors.append(error)) | |
| consumer2.on_error(lambda error: consumer2_errors.append(error)) | |
| # Wait for connections | |
| await asyncio.sleep(0.2) | |
| # Producer sends emergency stop | |
| await producer.send_emergency_stop("Integration test emergency stop") | |
| # Wait for message propagation | |
| await asyncio.sleep(0.2) | |
| # Both consumers should receive emergency stop | |
| assert len(consumer1_errors) >= 1 | |
| assert len(consumer2_errors) >= 1 | |
| # Verify error messages contain emergency stop info | |
| if consumer1_errors: | |
| assert "emergency stop" in consumer1_errors[-1].lower() | |
| if consumer2_errors: | |
| assert "emergency stop" in consumer2_errors[-1].lower() | |
| finally: | |
| await consumer1.disconnect() | |
| await consumer2.disconnect() | |
| finally: | |
| await producer.disconnect() | |
| await producer.delete_room(room_id) | |
| async def test_producer_reconnection_workflow(self): | |
| """Test producer reconnecting and resuming operation.""" | |
| # Create room first | |
| temp_producer = RoboticsProducer("http://localhost:8000") | |
| room_id = await temp_producer.create_room() | |
| try: | |
| # Create consumer first | |
| consumer = await create_consumer_client(room_id, "http://localhost:8000") | |
| try: | |
| received_updates = [] | |
| consumer.on_joint_update(lambda joints: received_updates.append(joints)) | |
| # Create producer and connect | |
| producer = RoboticsProducer("http://localhost:8000") | |
| await producer.connect(room_id) | |
| # Send initial update | |
| await producer.send_state_sync({"joint1": 10.0}) | |
| await asyncio.sleep(0.1) | |
| # Disconnect producer | |
| await producer.disconnect() | |
| # Reconnect producer | |
| await producer.connect(room_id) | |
| # Send another update | |
| await producer.send_state_sync({"joint1": 20.0}) | |
| await asyncio.sleep(0.2) | |
| # Consumer should have received both updates | |
| assert len(received_updates) >= 2 | |
| await producer.disconnect() | |
| finally: | |
| await consumer.disconnect() | |
| finally: | |
| await temp_producer.delete_room(room_id) | |
| async def test_consumer_late_join(self): | |
| """Test consumer joining room after producer has sent updates.""" | |
| producer = await create_producer_client("http://localhost:8000") | |
| room_id = producer.room_id | |
| try: | |
| # Producer sends some updates before consumer joins | |
| await producer.send_state_sync({"joint1": 10.0, "joint2": 20.0}) | |
| await asyncio.sleep(0.1) | |
| await producer.send_joint_update([{"name": "joint3", "value": 30.0}]) | |
| await asyncio.sleep(0.1) | |
| # Now consumer joins | |
| consumer = await create_consumer_client(room_id, "http://localhost:8000") | |
| try: | |
| # Consumer should be able to get current state | |
| current_state = await consumer.get_state_sync() | |
| # Should contain all previously sent updates | |
| expected_state = {"joint1": 10.0, "joint2": 20.0, "joint3": 30.0} | |
| assert current_state == expected_state | |
| finally: | |
| await consumer.disconnect() | |
| finally: | |
| await producer.disconnect() | |
| await producer.delete_room(room_id) | |
| async def test_room_cleanup_on_producer_disconnect(self): | |
| """Test room state when producer disconnects.""" | |
| producer = await create_producer_client("http://localhost:8000") | |
| room_id = producer.room_id | |
| try: | |
| consumer = await create_consumer_client(room_id, "http://localhost:8000") | |
| try: | |
| # Send some state | |
| await producer.send_state_sync({"joint1": 42.0}) | |
| await asyncio.sleep(0.1) | |
| # Verify state exists | |
| state_before = await consumer.get_state_sync() | |
| assert state_before == {"joint1": 42.0} | |
| # Producer disconnects | |
| await producer.disconnect() | |
| await asyncio.sleep(0.1) | |
| # State should still be accessible to consumer | |
| state_after = await consumer.get_state_sync() | |
| assert state_after == {"joint1": 42.0} | |
| finally: | |
| await consumer.disconnect() | |
| finally: | |
| # Clean up room manually since producer disconnected | |
| temp_producer = RoboticsProducer("http://localhost:8000") | |
| await temp_producer.delete_room(room_id) | |
| async def test_high_frequency_updates(self): | |
| """Test handling high frequency updates.""" | |
| producer = await create_producer_client("http://localhost:8000") | |
| room_id = producer.room_id | |
| try: | |
| consumer = await create_consumer_client(room_id, "http://localhost:8000") | |
| try: | |
| received_updates = [] | |
| consumer.on_joint_update(lambda joints: received_updates.append(joints)) | |
| # Wait for connection | |
| await asyncio.sleep(0.1) | |
| # Send rapid updates | |
| for i in range(20): | |
| await producer.send_state_sync({"joint1": float(i), "timestamp": i}) | |
| await asyncio.sleep(0.01) # 10ms intervals | |
| # Wait for all messages | |
| await asyncio.sleep(0.5) | |
| # Should have received multiple updates | |
| # (exact number may vary due to change detection) | |
| assert len(received_updates) >= 5 | |
| # Final state should reflect last update | |
| final_state = await consumer.get_state_sync() | |
| assert final_state["joint1"] == 19.0 | |
| assert final_state["timestamp"] == 19 | |
| finally: | |
| await consumer.disconnect() | |
| finally: | |
| await producer.disconnect() | |
| await producer.delete_room(room_id) | |