EdSummariser / memo /planning.py
LiamKhoaLe's picture
Upd OS import
a72fec7
# ────────────────────────────── memo/planning.py ──────────────────────────────
"""
Memory Planning Coordinator
Main coordinator for the memory planning system that orchestrates
intent detection, strategy planning, and execution.
"""
from typing import List, Dict, Any, Tuple, Optional
import os
from utils.logger import get_logger
from utils.rag.embeddings import EmbeddingClient
from memo.plan.intent import QueryIntent, get_intent_detector
from memo.plan.strategy import MemoryStrategy, get_strategy_planner
from memo.plan.execution import get_execution_engine
logger = get_logger("MEMORY_PLANNER", __name__)
class MemoryPlanner:
"""
Main coordinator for memory planning system.
Orchestrates intent detection, strategy planning, and execution.
"""
def __init__(self, memory_system, embedder: EmbeddingClient):
self.memory_system = memory_system
self.embedder = embedder
self.intent_detector = get_intent_detector()
self.strategy_planner = get_strategy_planner()
self.execution_engine = get_execution_engine(memory_system, embedder)
async def plan_memory_strategy(self, user_id: str, question: str,
nvidia_rotator=None, project_id: Optional[str] = None) -> Dict[str, Any]:
"""Plan the optimal memory retrieval strategy based on user intent and context"""
try:
# Detect user intent
intent = await self.intent_detector.detect_intent(question, nvidia_rotator)
# Get conversation context for better planning
conversation_context = await self._get_conversation_context(user_id, question)
# Determine memory strategy based on intent and context
strategy = self.strategy_planner.determine_strategy(intent, question, conversation_context)
# Plan specific retrieval parameters
retrieval_params = self.strategy_planner.plan_retrieval_parameters(
user_id, question, intent, strategy, conversation_context, nvidia_rotator
)
# Create execution plan
execution_plan = {
"intent": intent,
"strategy": strategy,
"retrieval_params": retrieval_params,
"conversation_context": conversation_context,
"enhancement_focus": intent == QueryIntent.ENHANCEMENT,
"qa_focus": intent in [QueryIntent.ENHANCEMENT, QueryIntent.CLARIFICATION, QueryIntent.REFERENCE]
}
logger.info(f"[MEMORY_PLANNER] Planned strategy: {strategy.value} for intent: {intent.value}")
return execution_plan
except Exception as e:
logger.error(f"[MEMORY_PLANNER] Memory planning failed: {e}")
return self.strategy_planner.get_fallback_plan()
async def execute_memory_plan(self, user_id: str, question: str, execution_plan: Dict[str, Any],
nvidia_rotator=None, project_id: Optional[str] = None) -> Tuple[str, str, Dict[str, Any]]:
"""Execute the planned memory retrieval strategy"""
try:
return await self.execution_engine.execute_memory_plan(
user_id, question, execution_plan, nvidia_rotator, project_id
)
except Exception as e:
logger.error(f"[MEMORY_PLANNER] Plan execution failed: {e}")
return "", "", {"error": str(e)}
async def _get_conversation_context(self, user_id: str, question: str) -> Dict[str, Any]:
"""Get conversation context for better planning"""
try:
context = {
"has_recent_memories": False,
"memory_count": 0,
"conversation_depth": 0,
"last_question": "",
"is_continuation": False
}
if self.memory_system.is_enhanced_available():
# Get enhanced memory stats
stats = self.memory_system.get_memory_stats(user_id)
context["memory_count"] = stats.get("total_memories", 0)
# Get recent memories
recent_memories = self.memory_system.enhanced_memory.get_memories(
user_id, memory_type="conversation", limit=5
)
context["has_recent_memories"] = len(recent_memories) > 0
if recent_memories:
context["last_question"] = recent_memories[0].get("content", "")
else:
# Legacy memory stats
recent_memories = self.memory_system.recent(user_id, 3)
context["has_recent_memories"] = len(recent_memories) > 0
context["memory_count"] = len(self.memory_system.all(user_id))
if recent_memories:
context["last_question"] = recent_memories[0]
return context
except Exception as e:
logger.warning(f"[MEMORY_PLANNER] Context retrieval failed: {e}")
return {
"has_recent_memories": False,
"memory_count": 0,
"conversation_depth": 0,
"last_question": "",
"is_continuation": False
}
# ────────────────────────────── Global Instance ──────────────────────────────
_memory_planner: Optional[MemoryPlanner] = None
def get_memory_planner(memory_system=None, embedder: EmbeddingClient = None) -> MemoryPlanner:
"""Get the global memory planner instance"""
global _memory_planner
if _memory_planner is None:
if not memory_system:
from memo.core import get_memory_system
memory_system = get_memory_system()
if not embedder:
from utils.rag.embeddings import EmbeddingClient
embedder = EmbeddingClient()
_memory_planner = MemoryPlanner(memory_system, embedder)
logger.info("[MEMORY_PLANNER] Global memory planner initialized")
return _memory_planner