Spaces:
Paused
Paused
| """ | |
| Durable Objects integration for OpenManus | |
| Provides interface to Cloudflare Durable Objects operations | |
| """ | |
| import json | |
| import time | |
| from typing import Any, Dict, List, Optional | |
| from app.logger import logger | |
| from .client import CloudflareClient, CloudflareError | |
| class DurableObjects: | |
| """Cloudflare Durable Objects client""" | |
| def __init__(self, client: CloudflareClient): | |
| self.client = client | |
| async def create_agent_session( | |
| self, session_id: str, user_id: str, metadata: Optional[Dict[str, Any]] = None | |
| ) -> Dict[str, Any]: | |
| """Create a new agent session""" | |
| session_data = { | |
| "sessionId": session_id, | |
| "userId": user_id, | |
| "metadata": metadata or {}, | |
| } | |
| try: | |
| response = await self.client.post( | |
| f"do/agent/{session_id}/start", data=session_data, use_worker=True | |
| ) | |
| return { | |
| "success": True, | |
| "session_id": session_id, | |
| "user_id": user_id, | |
| **response, | |
| } | |
| except CloudflareError as e: | |
| logger.error(f"Failed to create agent session: {e}") | |
| raise | |
| async def get_agent_session_status(self, session_id: str) -> Dict[str, Any]: | |
| """Get agent session status""" | |
| try: | |
| response = await self.client.get( | |
| f"do/agent/{session_id}/status?sessionId={session_id}", use_worker=True | |
| ) | |
| return response | |
| except CloudflareError as e: | |
| logger.error(f"Failed to get agent session status: {e}") | |
| raise | |
| async def update_agent_session( | |
| self, session_id: str, updates: Dict[str, Any] | |
| ) -> Dict[str, Any]: | |
| """Update agent session""" | |
| update_data = {"sessionId": session_id, "updates": updates} | |
| try: | |
| response = await self.client.post( | |
| f"do/agent/{session_id}/update", data=update_data, use_worker=True | |
| ) | |
| return {"success": True, "session_id": session_id, **response} | |
| except CloudflareError as e: | |
| logger.error(f"Failed to update agent session: {e}") | |
| raise | |
| async def stop_agent_session(self, session_id: str) -> Dict[str, Any]: | |
| """Stop agent session""" | |
| try: | |
| response = await self.client.post( | |
| f"do/agent/{session_id}/stop", | |
| data={"sessionId": session_id}, | |
| use_worker=True, | |
| ) | |
| return {"success": True, "session_id": session_id, **response} | |
| except CloudflareError as e: | |
| logger.error(f"Failed to stop agent session: {e}") | |
| raise | |
| async def add_agent_message( | |
| self, session_id: str, message: Dict[str, Any] | |
| ) -> Dict[str, Any]: | |
| """Add a message to agent session""" | |
| message_data = { | |
| "sessionId": session_id, | |
| "message": {"timestamp": int(time.time()), **message}, | |
| } | |
| try: | |
| response = await self.client.post( | |
| f"do/agent/{session_id}/messages", data=message_data, use_worker=True | |
| ) | |
| return {"success": True, "session_id": session_id, **response} | |
| except CloudflareError as e: | |
| logger.error(f"Failed to add agent message: {e}") | |
| raise | |
| async def get_agent_messages( | |
| self, session_id: str, limit: int = 50, offset: int = 0 | |
| ) -> Dict[str, Any]: | |
| """Get agent session messages""" | |
| try: | |
| response = await self.client.get( | |
| f"do/agent/{session_id}/messages?sessionId={session_id}&limit={limit}&offset={offset}", | |
| use_worker=True, | |
| ) | |
| return response | |
| except CloudflareError as e: | |
| logger.error(f"Failed to get agent messages: {e}") | |
| raise | |
| # Chat Room methods | |
| async def join_chat_room( | |
| self, | |
| room_id: str, | |
| user_id: str, | |
| username: str, | |
| room_config: Optional[Dict[str, Any]] = None, | |
| ) -> Dict[str, Any]: | |
| """Join a chat room""" | |
| join_data = { | |
| "userId": user_id, | |
| "username": username, | |
| "roomConfig": room_config or {}, | |
| } | |
| try: | |
| response = await self.client.post( | |
| f"do/chat/{room_id}/join", data=join_data, use_worker=True | |
| ) | |
| return {"success": True, "room_id": room_id, "user_id": user_id, **response} | |
| except CloudflareError as e: | |
| logger.error(f"Failed to join chat room: {e}") | |
| raise | |
| async def leave_chat_room(self, room_id: str, user_id: str) -> Dict[str, Any]: | |
| """Leave a chat room""" | |
| leave_data = {"userId": user_id} | |
| try: | |
| response = await self.client.post( | |
| f"do/chat/{room_id}/leave", data=leave_data, use_worker=True | |
| ) | |
| return {"success": True, "room_id": room_id, "user_id": user_id, **response} | |
| except CloudflareError as e: | |
| logger.error(f"Failed to leave chat room: {e}") | |
| raise | |
| async def get_chat_room_info(self, room_id: str) -> Dict[str, Any]: | |
| """Get chat room information""" | |
| try: | |
| response = await self.client.get(f"do/chat/{room_id}/info", use_worker=True) | |
| return response | |
| except CloudflareError as e: | |
| logger.error(f"Failed to get chat room info: {e}") | |
| raise | |
| async def send_chat_message( | |
| self, | |
| room_id: str, | |
| user_id: str, | |
| username: str, | |
| content: str, | |
| message_type: str = "text", | |
| ) -> Dict[str, Any]: | |
| """Send a message to chat room""" | |
| message_data = { | |
| "userId": user_id, | |
| "username": username, | |
| "content": content, | |
| "messageType": message_type, | |
| } | |
| try: | |
| response = await self.client.post( | |
| f"do/chat/{room_id}/messages", data=message_data, use_worker=True | |
| ) | |
| return {"success": True, "room_id": room_id, **response} | |
| except CloudflareError as e: | |
| logger.error(f"Failed to send chat message: {e}") | |
| raise | |
| async def get_chat_messages( | |
| self, room_id: str, limit: int = 50, offset: int = 0 | |
| ) -> Dict[str, Any]: | |
| """Get chat room messages""" | |
| try: | |
| response = await self.client.get( | |
| f"do/chat/{room_id}/messages?limit={limit}&offset={offset}", | |
| use_worker=True, | |
| ) | |
| return response | |
| except CloudflareError as e: | |
| logger.error(f"Failed to get chat messages: {e}") | |
| raise | |
| async def get_chat_participants(self, room_id: str) -> Dict[str, Any]: | |
| """Get chat room participants""" | |
| try: | |
| response = await self.client.get( | |
| f"do/chat/{room_id}/participants", use_worker=True | |
| ) | |
| return response | |
| except CloudflareError as e: | |
| logger.error(f"Failed to get chat participants: {e}") | |
| raise | |
| # WebSocket connection helpers | |
| def get_agent_websocket_url(self, session_id: str, user_id: str) -> str: | |
| """Get WebSocket URL for agent session""" | |
| if not self.client.worker_url: | |
| raise CloudflareError("Worker URL not configured") | |
| base_url = self.client.worker_url.replace("https://", "wss://").replace( | |
| "http://", "ws://" | |
| ) | |
| return ( | |
| f"{base_url}/do/agent/{session_id}?sessionId={session_id}&userId={user_id}" | |
| ) | |
| def get_chat_websocket_url(self, room_id: str, user_id: str, username: str) -> str: | |
| """Get WebSocket URL for chat room""" | |
| if not self.client.worker_url: | |
| raise CloudflareError("Worker URL not configured") | |
| base_url = self.client.worker_url.replace("https://", "wss://").replace( | |
| "http://", "ws://" | |
| ) | |
| return f"{base_url}/do/chat/{room_id}?userId={user_id}&username={username}" | |
| class DurableObjectsWebSocket: | |
| """Helper class for WebSocket connections to Durable Objects""" | |
| def __init__(self, url: str): | |
| self.url = url | |
| self.websocket = None | |
| self.connected = False | |
| self.message_handlers = {} | |
| async def connect(self): | |
| """Connect to WebSocket""" | |
| try: | |
| import websockets | |
| self.websocket = await websockets.connect(self.url) | |
| self.connected = True | |
| logger.info(f"Connected to Durable Object WebSocket: {self.url}") | |
| # Start message handling loop | |
| import asyncio | |
| asyncio.create_task(self._message_loop()) | |
| except Exception as e: | |
| logger.error(f"Failed to connect to WebSocket: {e}") | |
| raise CloudflareError(f"WebSocket connection failed: {e}") | |
| async def disconnect(self): | |
| """Disconnect from WebSocket""" | |
| if self.websocket and self.connected: | |
| await self.websocket.close() | |
| self.connected = False | |
| logger.info("Disconnected from Durable Object WebSocket") | |
| async def send_message(self, message_type: str, payload: Dict[str, Any]): | |
| """Send message via WebSocket""" | |
| if not self.connected or not self.websocket: | |
| raise CloudflareError("WebSocket not connected") | |
| message = { | |
| "type": message_type, | |
| "payload": payload, | |
| "timestamp": int(time.time()), | |
| } | |
| try: | |
| await self.websocket.send(json.dumps(message)) | |
| except Exception as e: | |
| logger.error(f"Failed to send WebSocket message: {e}") | |
| raise CloudflareError(f"Failed to send message: {e}") | |
| def add_message_handler(self, message_type: str, handler): | |
| """Add a message handler for specific message types""" | |
| if message_type not in self.message_handlers: | |
| self.message_handlers[message_type] = [] | |
| self.message_handlers[message_type].append(handler) | |
| async def _message_loop(self): | |
| """Handle incoming WebSocket messages""" | |
| try: | |
| async for message in self.websocket: | |
| try: | |
| data = json.loads(message) | |
| message_type = data.get("type") | |
| if message_type in self.message_handlers: | |
| for handler in self.message_handlers[message_type]: | |
| try: | |
| if callable(handler): | |
| if asyncio.iscoroutinefunction(handler): | |
| await handler(data) | |
| else: | |
| handler(data) | |
| except Exception as e: | |
| logger.error(f"Message handler error: {e}") | |
| except json.JSONDecodeError as e: | |
| logger.error(f"Failed to parse WebSocket message: {e}") | |
| except Exception as e: | |
| logger.error(f"WebSocket message processing error: {e}") | |
| except Exception as e: | |
| logger.error(f"WebSocket message loop error: {e}") | |
| self.connected = False | |
| # Context manager support | |
| async def __aenter__(self): | |
| await self.connect() | |
| return self | |
| async def __aexit__(self, exc_type, exc_val, exc_tb): | |
| await self.disconnect() | |