Spaces:
Paused
Paused
"""
D1 Database integration for OpenManus.

Provides an interface to Cloudflare D1 database operations.
"""
import json
from typing import Any, Dict, List, Optional, Union

from app.logger import logger

from .client import CloudflareClient, CloudflareError
class D1Database:
    """Client for a single Cloudflare D1 database.

    Statements are sent either through the worker endpoints
    (``api/database/query`` and ``api/database/batch``) or directly to the
    Cloudflare API under ``accounts/<account_id>/d1/database/<database_id>``.

    The row-returning helpers expect responses shaped like
    ``{"success": bool, "result": [{"results": [row, ...]}, ...]}`` and only
    look at the first entry of ``"result"``.
    """

    def __init__(self, client: "CloudflareClient", database_id: str):
        """Bind this database handle to an API client.

        Args:
            client: Client used for every request; its ``account_id`` is read
                to build the direct-API endpoint prefix.
            database_id: Identifier of the D1 database to operate on.
        """
        self.client = client
        self.database_id = database_id
        self.base_endpoint = f"accounts/{client.account_id}/d1/database/{database_id}"

    # Response parsing helpers

    @staticmethod
    def _rows(response: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return the rows of the first statement in ``response`` (``[]`` if none)."""
        if response.get("success") and response.get("result"):
            # A ``"results": null`` payload is normalised to an empty list so
            # callers can always iterate over the return value.
            return response["result"][0].get("results", []) or []
        return []

    @staticmethod
    def _first_row(
        response: Dict[str, Any], json_field: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """Return the first row of ``response``, decoding ``json_field`` in place.

        The column named by ``json_field`` is only decoded when it holds a
        non-empty JSON document as text; values that are empty or already
        decoded are left untouched.
        """
        rows = D1Database._rows(response)
        if not rows:
            return None
        row = rows[0]
        if json_field is not None:
            raw = row.get(json_field)
            if raw and isinstance(raw, (str, bytes, bytearray)):
                row[json_field] = json.loads(raw)
        return row

    # Low-level query execution

    async def execute_query(
        self, sql: str, params: Optional[List[Any]] = None, use_worker: bool = True
    ) -> Dict[str, Any]:
        """Execute a single SQL statement.

        Args:
            sql: Statement text, using ``?`` placeholders for parameters.
            params: Positional values for the placeholders. Omitted from the
                request body when ``None`` or empty.
            use_worker: When true, post to the worker endpoint
                ``api/database/query``; otherwise post to the Cloudflare API
                ``<base_endpoint>/query``.

        Returns:
            The response returned by the client, unmodified.

        Raises:
            CloudflareError: Logged and re-raised when the client reports a
                failure.
        """
        query_data: Dict[str, Any] = {"sql": sql}
        if params:
            query_data["params"] = params

        try:
            if use_worker:
                # Use worker endpoint for better performance
                return await self.client.post(
                    "api/database/query", data=query_data, use_worker=True
                )
            # Use Cloudflare API directly
            return await self.client.post(
                f"{self.base_endpoint}/query", data=query_data
            )
        except CloudflareError as e:
            logger.error(f"D1 query execution failed: {e}")
            raise

    async def batch_execute(
        self, queries: List[Dict[str, Any]], use_worker: bool = True
    ) -> Dict[str, Any]:
        """Execute several statements in one request.

        Args:
            queries: Statement descriptors, each a dict with a ``"sql"`` key
                (this is the shape ``initialize_schema`` passes in).
            use_worker: When true, post to the worker endpoint
                ``api/database/batch``; otherwise post to the Cloudflare API
                ``<base_endpoint>/query``.

        Returns:
            The response returned by the client, unmodified.

        Raises:
            CloudflareError: Logged and re-raised when the client reports a
                failure.
        """
        batch_data = {"queries": queries}

        try:
            if use_worker:
                return await self.client.post(
                    "api/database/batch", data=batch_data, use_worker=True
                )
            # NOTE(review): the direct path sends the {"queries": [...]} body
            # to the same /query endpoint used for single statements --
            # confirm the Cloudflare API accepts this payload shape.
            return await self.client.post(
                f"{self.base_endpoint}/query", data=batch_data
            )
        except CloudflareError as e:
            logger.error(f"D1 batch execution failed: {e}")
            raise

    # User management methods

    async def create_user(
        self,
        user_id: str,
        username: str,
        email: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Create a user, or update the existing row with the same ``id``.

        On an ``id`` conflict the username, email and metadata are overwritten
        and ``updated_at`` is refreshed. ``metadata`` is stored as JSON text;
        ``None`` is stored as ``{}``.

        Returns:
            The raw response from ``execute_query``.
        """
        sql = """
            INSERT INTO users (id, username, email, metadata)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(id) DO UPDATE SET
                username = excluded.username,
                email = excluded.email,
                metadata = excluded.metadata,
                updated_at = strftime('%s', 'now')
        """
        params = [user_id, username, email, json.dumps(metadata or {})]
        return await self.execute_query(sql, params)

    async def get_user(self, user_id: str) -> Optional[Dict[str, Any]]:
        """Return the user row with the given ID, or ``None`` if not found.

        The ``metadata`` column is decoded from JSON text when present.
        """
        result = await self.execute_query(
            "SELECT * FROM users WHERE id = ?", [user_id]
        )
        return self._first_row(result, "metadata")

    async def get_user_by_username(self, username: str) -> Optional[Dict[str, Any]]:
        """Return the user row with the given username, or ``None`` if not found.

        The ``metadata`` column is decoded from JSON text when present.
        """
        result = await self.execute_query(
            "SELECT * FROM users WHERE username = ?", [username]
        )
        return self._first_row(result, "metadata")

    # Session management methods

    async def create_session(
        self,
        session_id: str,
        user_id: str,
        session_data: Dict[str, Any],
        expires_at: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Insert a new session row.

        Args:
            session_id: Primary key of the session.
            user_id: Owner of the session.
            session_data: Payload stored as JSON text.
            expires_at: Expiry value compared against ``strftime('%s', 'now')``
                by ``get_session``; ``None`` means the session never expires.

        Returns:
            The raw response from ``execute_query``.
        """
        sql = """
            INSERT INTO sessions (id, user_id, session_data, expires_at)
            VALUES (?, ?, ?, ?)
        """
        params = [session_id, user_id, json.dumps(session_data), expires_at]
        return await self.execute_query(sql, params)

    async def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Return the unexpired session with the given ID, or ``None``.

        Rows whose ``expires_at`` is set and not later than the current time
        are filtered out by the query. ``session_data`` is decoded from JSON
        text when present.
        """
        sql = """
            SELECT * FROM sessions
            WHERE id = ? AND (expires_at IS NULL OR expires_at > strftime('%s', 'now'))
        """
        result = await self.execute_query(sql, [session_id])
        return self._first_row(result, "session_data")

    async def delete_session(self, session_id: str) -> Dict[str, Any]:
        """Delete the session with the given ID and return the raw response."""
        return await self.execute_query(
            "DELETE FROM sessions WHERE id = ?", [session_id]
        )

    # Conversation methods

    async def create_conversation(
        self,
        conversation_id: str,
        user_id: str,
        title: Optional[str] = None,
        messages: Optional[List[Dict[str, Any]]] = None,
    ) -> Dict[str, Any]:
        """Insert a new conversation row.

        ``messages`` is stored as JSON text; ``None`` is stored as ``[]``.

        Returns:
            The raw response from ``execute_query``.
        """
        sql = """
            INSERT INTO conversations (id, user_id, title, messages)
            VALUES (?, ?, ?, ?)
        """
        params = [conversation_id, user_id, title, json.dumps(messages or [])]
        return await self.execute_query(sql, params)

    async def get_conversation(self, conversation_id: str) -> Optional[Dict[str, Any]]:
        """Return the conversation with the given ID, or ``None`` if not found.

        The ``messages`` column is decoded from JSON text when present.
        """
        result = await self.execute_query(
            "SELECT * FROM conversations WHERE id = ?", [conversation_id]
        )
        return self._first_row(result, "messages")

    async def update_conversation_messages(
        self, conversation_id: str, messages: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Replace a conversation's messages and refresh its ``updated_at``.

        Returns:
            The raw response from ``execute_query``.
        """
        sql = """
            UPDATE conversations
            SET messages = ?, updated_at = strftime('%s', 'now')
            WHERE id = ?
        """
        params = [json.dumps(messages), conversation_id]
        return await self.execute_query(sql, params)

    async def get_user_conversations(
        self, user_id: str, limit: int = 50
    ) -> List[Dict[str, Any]]:
        """Return up to ``limit`` conversations of a user, most recently updated first.

        Only summary columns are selected; the ``messages`` column is not
        included.
        """
        sql = """
            SELECT id, user_id, title, created_at, updated_at
            FROM conversations
            WHERE user_id = ?
            ORDER BY updated_at DESC
            LIMIT ?
        """
        result = await self.execute_query(sql, [user_id, limit])
        return self._rows(result)

    # Agent execution methods

    async def create_agent_execution(
        self,
        execution_id: str,
        user_id: str,
        session_id: Optional[str] = None,
        task_description: Optional[str] = None,
        status: str = "pending",
    ) -> Dict[str, Any]:
        """Insert a new agent execution record and return the raw response."""
        sql = """
            INSERT INTO agent_executions (id, user_id, session_id, task_description, status)
            VALUES (?, ?, ?, ?, ?)
        """
        params = [execution_id, user_id, session_id, task_description, status]
        return await self.execute_query(sql, params)

    async def update_agent_execution(
        self,
        execution_id: str,
        status: Optional[str] = None,
        result: Optional[str] = None,
        execution_time: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Update the provided fields of an agent execution record.

        Only arguments that are not ``None`` are written, so falsy values such
        as ``result=""`` or ``execution_time=0`` are stored rather than
        ignored. When ``status`` is ``"completed"`` or ``"failed"``,
        ``completed_at`` is also set to the current time.

        Returns:
            The raw response from ``execute_query``, or
            ``{"success": True, "message": "No updates provided"}`` without
            touching the database when there is nothing to update.
        """
        assignments: List[str] = []
        params: List[Any] = []
        candidates = (
            ("status", status),
            ("result", result),
            ("execution_time", execution_time),
        )
        for column, value in candidates:
            # ``is not None`` (not truthiness) so empty strings and zero count
            # as real values.
            if value is not None:
                assignments.append(f"{column} = ?")
                params.append(value)

        if status in ("completed", "failed"):
            assignments.append("completed_at = strftime('%s', 'now')")

        if not assignments:
            return {"success": True, "message": "No updates provided"}

        # Only the fixed column fragments above are interpolated; every value
        # is bound through a placeholder.
        sql = f"""
            UPDATE agent_executions
            SET {', '.join(assignments)}
            WHERE id = ?
        """
        params.append(execution_id)
        return await self.execute_query(sql, params)

    async def get_agent_execution(self, execution_id: str) -> Optional[Dict[str, Any]]:
        """Return the agent execution with the given ID, or ``None`` if not found."""
        result = await self.execute_query(
            "SELECT * FROM agent_executions WHERE id = ?", [execution_id]
        )
        return self._first_row(result)

    async def get_user_executions(
        self, user_id: str, limit: int = 50
    ) -> List[Dict[str, Any]]:
        """Return up to ``limit`` agent executions of a user, newest first."""
        sql = """
            SELECT * FROM agent_executions
            WHERE user_id = ?
            ORDER BY created_at DESC
            LIMIT ?
        """
        result = await self.execute_query(sql, [user_id, limit])
        return self._rows(result)

    # File record methods

    async def create_file_record(
        self,
        file_id: str,
        user_id: str,
        filename: str,
        file_key: str,
        file_size: int,
        content_type: str,
        bucket: str = "storage",
    ) -> Dict[str, Any]:
        """Insert a new file record and return the raw response."""
        sql = """
            INSERT INTO files (id, user_id, filename, file_key, file_size, content_type, bucket)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """
        params = [file_id, user_id, filename, file_key, file_size, content_type, bucket]
        return await self.execute_query(sql, params)

    async def get_file_record(self, file_id: str) -> Optional[Dict[str, Any]]:
        """Return the file record with the given ID, or ``None`` if not found."""
        result = await self.execute_query(
            "SELECT * FROM files WHERE id = ?", [file_id]
        )
        return self._first_row(result)

    async def get_user_files(
        self, user_id: str, limit: int = 100
    ) -> List[Dict[str, Any]]:
        """Return up to ``limit`` file records of a user, newest first."""
        sql = """
            SELECT * FROM files
            WHERE user_id = ?
            ORDER BY created_at DESC
            LIMIT ?
        """
        result = await self.execute_query(sql, [user_id, limit])
        return self._rows(result)

    async def delete_file_record(self, file_id: str) -> Dict[str, Any]:
        """Delete the file record with the given ID and return the raw response."""
        return await self.execute_query(
            "DELETE FROM files WHERE id = ?", [file_id]
        )

    # Schema initialization

    async def initialize_schema(self) -> Dict[str, Any]:
        """Create all tables and indexes if they do not exist yet.

        Every statement uses ``IF NOT EXISTS``, and the whole set is submitted
        through a single ``batch_execute`` call: tables first, then the
        ``user_id`` indexes.

        Returns:
            The raw response from ``batch_execute``.
        """
        schema_queries = [
            {
                "sql": """CREATE TABLE IF NOT EXISTS users (
                    id TEXT PRIMARY KEY,
                    username TEXT UNIQUE NOT NULL,
                    email TEXT UNIQUE,
                    created_at INTEGER DEFAULT (strftime('%s', 'now')),
                    updated_at INTEGER DEFAULT (strftime('%s', 'now')),
                    metadata TEXT
                )"""
            },
            {
                "sql": """CREATE TABLE IF NOT EXISTS sessions (
                    id TEXT PRIMARY KEY,
                    user_id TEXT NOT NULL,
                    session_data TEXT,
                    created_at INTEGER DEFAULT (strftime('%s', 'now')),
                    expires_at INTEGER,
                    FOREIGN KEY (user_id) REFERENCES users(id)
                )"""
            },
            {
                "sql": """CREATE TABLE IF NOT EXISTS conversations (
                    id TEXT PRIMARY KEY,
                    user_id TEXT NOT NULL,
                    title TEXT,
                    messages TEXT,
                    created_at INTEGER DEFAULT (strftime('%s', 'now')),
                    updated_at INTEGER DEFAULT (strftime('%s', 'now')),
                    FOREIGN KEY (user_id) REFERENCES users(id)
                )"""
            },
            {
                "sql": """CREATE TABLE IF NOT EXISTS files (
                    id TEXT PRIMARY KEY,
                    user_id TEXT NOT NULL,
                    filename TEXT NOT NULL,
                    file_key TEXT NOT NULL,
                    file_size INTEGER,
                    content_type TEXT,
                    bucket TEXT DEFAULT 'storage',
                    created_at INTEGER DEFAULT (strftime('%s', 'now')),
                    FOREIGN KEY (user_id) REFERENCES users(id)
                )"""
            },
            {
                "sql": """CREATE TABLE IF NOT EXISTS agent_executions (
                    id TEXT PRIMARY KEY,
                    user_id TEXT NOT NULL,
                    session_id TEXT,
                    task_description TEXT,
                    status TEXT DEFAULT 'pending',
                    result TEXT,
                    execution_time INTEGER,
                    created_at INTEGER DEFAULT (strftime('%s', 'now')),
                    completed_at INTEGER,
                    FOREIGN KEY (user_id) REFERENCES users(id)
                )"""
            },
        ]

        # Add indexes
        index_queries = [
            {
                "sql": "CREATE INDEX IF NOT EXISTS idx_sessions_user_id ON sessions(user_id)"
            },
            {
                "sql": "CREATE INDEX IF NOT EXISTS idx_conversations_user_id ON conversations(user_id)"
            },
            {"sql": "CREATE INDEX IF NOT EXISTS idx_files_user_id ON files(user_id)"},
            {
                "sql": "CREATE INDEX IF NOT EXISTS idx_agent_executions_user_id ON agent_executions(user_id)"
            },
        ]

        all_queries = schema_queries + index_queries
        return await self.batch_execute(all_queries)