# ────────────────────────────── memo/plan/execution.py ──────────────────────────────
"""
Execution Engine

Handles memory retrieval execution based on planned strategies.
"""

from typing import List, Dict, Any, Tuple, Optional

from utils.logger import get_logger
from utils.rag.embeddings import EmbeddingClient
from memo.plan.intent import QueryIntent
from memo.plan.strategy import MemoryStrategy

logger = get_logger("EXECUTION_ENGINE", __name__)
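
# The engine maps each MemoryStrategy produced by the planner to a retrieval
# routine: FOCUSED_QA, RECENT_FOCUS, BROAD_CONTEXT, SEMANTIC_DEEP, and
# MIXED_APPROACH. Every routine returns the same
# (recent_context, semantic_context, metadata) triple so callers can consume
# results uniformly.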

class ExecutionEngine:
    """Handles memory retrieval execution based on planned strategies."""

    def __init__(self, memory_system, embedder: EmbeddingClient):
        self.memory_system = memory_system
        self.embedder = embedder
    async def execute_memory_plan(self, user_id: str, question: str, execution_plan: Dict[str, Any],
                                  nvidia_rotator=None, project_id: Optional[str] = None) -> Tuple[str, str, Dict[str, Any]]:
        """
        Execute the planned memory retrieval strategy.

        Returns:
            Tuple of (recent_context, semantic_context, metadata)
        """
        try:
            params = execution_plan["retrieval_params"]
            strategy = execution_plan["strategy"]
            intent = execution_plan["intent"]

            # Execute based on strategy
            if strategy == MemoryStrategy.FOCUSED_QA:
                return await self._execute_focused_qa_retrieval(
                    user_id, question, params, nvidia_rotator, project_id
                )
            elif strategy == MemoryStrategy.RECENT_FOCUS:
                return await self._execute_recent_focus_retrieval(
                    user_id, question, params, nvidia_rotator, project_id
                )
            elif strategy == MemoryStrategy.BROAD_CONTEXT:
                return await self._execute_broad_context_retrieval(
                    user_id, question, params, nvidia_rotator, project_id
                )
            elif strategy == MemoryStrategy.SEMANTIC_DEEP:
                return await self._execute_semantic_deep_retrieval(
                    user_id, question, params, nvidia_rotator, project_id
                )
            else:  # MemoryStrategy.MIXED_APPROACH
                return await self._execute_mixed_approach_retrieval(
                    user_id, question, params, nvidia_rotator, project_id
                )
        except Exception as e:
            logger.error(f"[EXECUTION_ENGINE] Plan execution failed: {e}")
            return "", "", {"error": str(e)}
    async def _execute_focused_qa_retrieval(self, user_id: str, question: str, params: Dict[str, Any],
                                            nvidia_rotator, project_id: Optional[str]) -> Tuple[str, str, Dict[str, Any]]:
        """Execute focused Q&A retrieval for enhancement requests."""
        try:
            recent_context = ""
            semantic_context = ""
            metadata = {"strategy": "focused_qa", "qa_focus": True}

            if self.memory_system.is_enhanced_available():
                # Get Q&A focused memories
                qa_memories = self.memory_system.enhanced_memory.get_memories(
                    user_id, memory_type="conversation", limit=params["recent_limit"]
                )
                if qa_memories:
                    # Use AI to select the most relevant Q&A pairs for enhancement
                    if params["use_ai_selection"] and nvidia_rotator:
                        recent_context = await self._ai_select_qa_memories(
                            question, qa_memories, nvidia_rotator, "recent", user_id
                        )
                    else:
                        recent_context = await self._semantic_select_qa_memories(
                            question, qa_memories, params["similarity_threshold"]
                        )

                # Get additional semantic Q&A context
                all_memories = self.memory_system.enhanced_memory.get_memories(
                    user_id, limit=params["semantic_limit"]
                )
                if all_memories:
                    if params["use_ai_selection"] and nvidia_rotator:
                        semantic_context = await self._ai_select_qa_memories(
                            question, all_memories, nvidia_rotator, "semantic", user_id
                        )
                    else:
                        semantic_context = await self._semantic_select_qa_memories(
                            question, all_memories, params["similarity_threshold"]
                        )
            else:
                # Legacy fallback
                recent_memories = self.memory_system.recent(user_id, params["recent_limit"])
                rest_memories = self.memory_system.rest(user_id, params["recent_limit"])
                if recent_memories:
                    recent_context = await self._semantic_select_qa_memories(
                        question, [{"content": m} for m in recent_memories], params["similarity_threshold"]
                    )
                if rest_memories:
                    semantic_context = await self._semantic_select_qa_memories(
                        question, [{"content": m} for m in rest_memories], params["similarity_threshold"]
                    )

            metadata["enhancement_focus"] = True
            metadata["qa_memories_found"] = bool(recent_context or semantic_context)
            return recent_context, semantic_context, metadata
        except Exception as e:
            logger.error(f"[EXECUTION_ENGINE] Focused Q&A retrieval failed: {e}")
            return "", "", {"error": str(e)}
    async def _execute_recent_focus_retrieval(self, user_id: str, question: str, params: Dict[str, Any],
                                              nvidia_rotator, project_id: Optional[str]) -> Tuple[str, str, Dict[str, Any]]:
        """Execute recent focus retrieval for clarification requests."""
        try:
            recent_context = ""
            semantic_context = ""
            metadata = {"strategy": "recent_focus"}

            if self.memory_system.is_enhanced_available():
                recent_memories = self.memory_system.enhanced_memory.get_memories(
                    user_id, memory_type="conversation", limit=params["recent_limit"]
                )
                if recent_memories:
                    recent_context = "\n\n".join([m["content"] for m in recent_memories])

                # Get some semantic context
                all_memories = self.memory_system.enhanced_memory.get_memories(
                    user_id, limit=params["semantic_limit"]
                )
                if all_memories:
                    semantic_context = await self._semantic_select_qa_memories(
                        question, all_memories, params["similarity_threshold"]
                    )
            else:
                # Legacy fallback
                recent_memories = self.memory_system.recent(user_id, params["recent_limit"])
                rest_memories = self.memory_system.rest(user_id, params["recent_limit"])
                recent_context = "\n\n".join(recent_memories)
                if rest_memories:
                    semantic_context = await self._semantic_select_qa_memories(
                        question, [{"content": m} for m in rest_memories], params["similarity_threshold"]
                    )

            return recent_context, semantic_context, metadata
        except Exception as e:
            logger.error(f"[EXECUTION_ENGINE] Recent focus retrieval failed: {e}")
            return "", "", {"error": str(e)}
    async def _execute_broad_context_retrieval(self, user_id: str, question: str, params: Dict[str, Any],
                                               nvidia_rotator, project_id: Optional[str]) -> Tuple[str, str, Dict[str, Any]]:
        """Execute broad context retrieval for comparison requests."""
        try:
            recent_context = ""
            semantic_context = ""
            metadata = {"strategy": "broad_context"}

            if self.memory_system.is_enhanced_available():
                # Get recent context
                recent_memories = self.memory_system.enhanced_memory.get_memories(
                    user_id, memory_type="conversation", limit=params["recent_limit"]
                )
                if recent_memories:
                    recent_context = "\n\n".join([m["content"] for m in recent_memories])

                # Get broad semantic context
                all_memories = self.memory_system.enhanced_memory.get_memories(
                    user_id, limit=params["semantic_limit"]
                )
                if all_memories:
                    semantic_context = await self._semantic_select_qa_memories(
                        question, all_memories, params["similarity_threshold"]
                    )
            else:
                # Legacy fallback
                recent_memories = self.memory_system.recent(user_id, params["recent_limit"])
                rest_memories = self.memory_system.rest(user_id, params["recent_limit"])
                recent_context = "\n\n".join(recent_memories)
                semantic_context = "\n\n".join(rest_memories)

            return recent_context, semantic_context, metadata
        except Exception as e:
            logger.error(f"[EXECUTION_ENGINE] Broad context retrieval failed: {e}")
            return "", "", {"error": str(e)}
    async def _execute_semantic_deep_retrieval(self, user_id: str, question: str, params: Dict[str, Any],
                                               nvidia_rotator, project_id: Optional[str]) -> Tuple[str, str, Dict[str, Any]]:
        """Execute semantic deep retrieval for new topics."""
        try:
            recent_context = ""
            semantic_context = ""
            metadata = {"strategy": "semantic_deep"}

            if self.memory_system.is_enhanced_available():
                # Get all memories for deep semantic search
                all_memories = self.memory_system.enhanced_memory.get_memories(
                    user_id, limit=params["semantic_limit"]
                )
                if all_memories:
                    if params["use_ai_selection"] and nvidia_rotator:
                        semantic_context = await self._ai_select_qa_memories(
                            question, all_memories, nvidia_rotator, "semantic", user_id
                        )
                    else:
                        semantic_context = await self._semantic_select_qa_memories(
                            question, all_memories, params["similarity_threshold"]
                        )

                # Get some recent context
                recent_memories = self.memory_system.enhanced_memory.get_memories(
                    user_id, memory_type="conversation", limit=params["recent_limit"]
                )
                if recent_memories:
                    recent_context = "\n\n".join([m["content"] for m in recent_memories])
            else:
                # Legacy fallback
                all_memories = self.memory_system.all(user_id)
                recent_memories = self.memory_system.recent(user_id, params["recent_limit"])
                if all_memories:
                    semantic_context = await self._semantic_select_qa_memories(
                        question, [{"content": m} for m in all_memories], params["similarity_threshold"]
                    )
                recent_context = "\n\n".join(recent_memories)

            return recent_context, semantic_context, metadata
        except Exception as e:
            logger.error(f"[EXECUTION_ENGINE] Semantic deep retrieval failed: {e}")
            return "", "", {"error": str(e)}
    async def _execute_mixed_approach_retrieval(self, user_id: str, question: str, params: Dict[str, Any],
                                                nvidia_rotator, project_id: Optional[str]) -> Tuple[str, str, Dict[str, Any]]:
        """Execute mixed approach retrieval for continuation requests."""
        try:
            recent_context = ""
            semantic_context = ""
            metadata = {"strategy": "mixed_approach"}

            if self.memory_system.is_enhanced_available():
                # Get recent context
                recent_memories = self.memory_system.enhanced_memory.get_memories(
                    user_id, memory_type="conversation", limit=params["recent_limit"]
                )
                if recent_memories:
                    if params["use_ai_selection"] and nvidia_rotator:
                        recent_context = await self._ai_select_qa_memories(
                            question, recent_memories, nvidia_rotator, "recent", user_id
                        )
                    else:
                        recent_context = await self._semantic_select_qa_memories(
                            question, recent_memories, params["similarity_threshold"]
                        )

                # Get semantic context
                all_memories = self.memory_system.enhanced_memory.get_memories(
                    user_id, limit=params["semantic_limit"]
                )
                if all_memories:
                    if params["use_ai_selection"] and nvidia_rotator:
                        semantic_context = await self._ai_select_qa_memories(
                            question, all_memories, nvidia_rotator, "semantic", user_id
                        )
                    else:
                        semantic_context = await self._semantic_select_qa_memories(
                            question, all_memories, params["similarity_threshold"]
                        )
            else:
                # Legacy fallback
                recent_memories = self.memory_system.recent(user_id, params["recent_limit"])
                rest_memories = self.memory_system.rest(user_id, params["recent_limit"])
                if recent_memories:
                    recent_context = await self._semantic_select_qa_memories(
                        question, [{"content": m} for m in recent_memories], params["similarity_threshold"]
                    )
                if rest_memories:
                    semantic_context = await self._semantic_select_qa_memories(
                        question, [{"content": m} for m in rest_memories], params["similarity_threshold"]
                    )

            return recent_context, semantic_context, metadata
        except Exception as e:
            logger.error(f"[EXECUTION_ENGINE] Mixed approach retrieval failed: {e}")
            return "", "", {"error": str(e)}
    async def _ai_select_qa_memories(self, question: str, memories: List[Dict[str, Any]],
                                     nvidia_rotator, context_type: str, user_id: str = "") -> str:
        """Use AI to select the most relevant Q&A memories."""
        try:
            from utils.analytics import get_analytics_tracker

            # Track memory agent usage
            tracker = get_analytics_tracker()
            if tracker:
                await tracker.track_agent_usage(
                    user_id=user_id,
                    agent_name="memory",
                    action="select",
                    context="memory_selection",
                    metadata={"question": question[:100], "memories_count": len(memories)}
                )

            if not memories:
                return ""

            sys_prompt = f"""You are an expert at selecting the most relevant Q&A memories for {context_type} context.
Given a user's question and a list of Q&A memories, select the most relevant ones that would help provide a comprehensive and detailed answer.
Focus on:
1. Direct relevance to the question
2. Q&A pairs that provide supporting information
3. Memories that add context and depth
4. Past discussions that relate to the current question
Return ONLY the selected Q&A memories, concatenated together. If none are relevant, return nothing."""

            # Format memories for the selector prompt
            formatted_memories = []
            for i, memory in enumerate(memories):
                content = memory.get("content", "")
                if content:
                    formatted_memories.append(f"Memory {i+1}: {content}")

            user_prompt = f"""Question: {question}
Available Q&A Memories:
{chr(10).join(formatted_memories)}
Select the most relevant Q&A memories:"""
            # Use Qwen for better memory selection reasoning
            from utils.api.router import qwen_chat_completion
            response = await qwen_chat_completion(sys_prompt, user_prompt, nvidia_rotator, user_id, "memory_selection")
            return response.strip()
        except Exception as e:
            logger.warning(f"[EXECUTION_ENGINE] AI Q&A selection failed: {e}")
            return ""
    async def _semantic_select_qa_memories(self, question: str, memories: List[Dict[str, Any]],
                                           threshold: float) -> str:
        """Use semantic similarity to select Q&A memories."""
        try:
            if not memories:
                return ""

            # Extract content from memories
            memory_contents = [memory.get("content", "") for memory in memories if memory.get("content")]
            if not memory_contents:
                return ""

            # Rank by embedding similarity. NOTE: `threshold` is accepted for
            # API symmetry with the AI selector but is currently not forwarded;
            # semantic_context is called with the full candidate count.
            from memo.context import semantic_context
            selected = await semantic_context(question, memory_contents, self.embedder, len(memory_contents))
            return selected
        except Exception as e:
            logger.warning(f"[EXECUTION_ENGINE] Semantic Q&A selection failed: {e}")
            return ""

# ────────────────────────────── Global Instance ──────────────────────────────
_execution_engine: Optional[ExecutionEngine] = None


def get_execution_engine(memory_system=None, embedder: Optional[EmbeddingClient] = None) -> ExecutionEngine:
    """Get the global execution engine instance."""
    global _execution_engine
    if _execution_engine is None:
        if not memory_system:
            from memo.core import get_memory_system
            memory_system = get_memory_system()
        if not embedder:
            embedder = EmbeddingClient()
        _execution_engine = ExecutionEngine(memory_system, embedder)
        logger.info("[EXECUTION_ENGINE] Global execution engine initialized")
    return _execution_engine
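
# ────────────────────────────── Example (illustrative) ──────────────────────────────
# Minimal usage sketch, not part of the module's API: it assumes the memory
# backend and embedding service are reachable, and that a planner produced an
# execution_plan with the keys read above. The parameter values below are
# made-up placeholders.
#
#   import asyncio
#   from memo.plan.execution import get_execution_engine
#   from memo.plan.strategy import MemoryStrategy
#
#   async def demo():
#       engine = get_execution_engine()
#       plan = {
#           "strategy": MemoryStrategy.RECENT_FOCUS,
#           "intent": None,  # QueryIntent from the planner; not read by the engine
#           "retrieval_params": {
#               "recent_limit": 5,             # assumed value
#               "semantic_limit": 20,          # assumed value
#               "use_ai_selection": False,     # skip the NVIDIA-backed selector
#               "similarity_threshold": 0.35,  # assumed value
#           },
#       }
#       recent, semantic, meta = await engine.execute_memory_plan(
#           "user-123", "What did we decide about the schema?", plan
#       )
#       print(meta.get("strategy"), len(recent), len(semantic))
#
#   asyncio.run(demo())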