File size: 6,433 Bytes
f80e242
 
7196ae9
f80e242
7196ae9
 
f80e242
 
7196ae9
a72fec7
f80e242
 
 
7196ae9
 
 
f80e242
 
 
 
 
7196ae9
 
f80e242
 
 
 
 
7196ae9
 
 
f80e242
 
 
7196ae9
f80e242
 
7196ae9
f80e242
 
 
 
 
7196ae9
f80e242
 
7196ae9
f80e242
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7196ae9
f80e242
7196ae9
 
 
f80e242
7196ae9
 
f80e242
 
7196ae9
 
f80e242
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7196ae9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# ────────────────────────────── memo/planning.py ──────────────────────────────
"""
Memory Planning Coordinator

Main coordinator for the memory planning system that orchestrates
intent detection, strategy planning, and execution.
"""

from typing import List, Dict, Any, Tuple, Optional
import os

from utils.logger import get_logger
from utils.rag.embeddings import EmbeddingClient
from memo.plan.intent import QueryIntent, get_intent_detector
from memo.plan.strategy import MemoryStrategy, get_strategy_planner
from memo.plan.execution import get_execution_engine

logger = get_logger("MEMORY_PLANNER", __name__)

class MemoryPlanner:
    """
    Main coordinator for memory planning system.
    Orchestrates intent detection, strategy planning, and execution.
    """
    
    def __init__(self, memory_system, embedder: EmbeddingClient):
        self.memory_system = memory_system
        self.embedder = embedder
        self.intent_detector = get_intent_detector()
        self.strategy_planner = get_strategy_planner()
        self.execution_engine = get_execution_engine(memory_system, embedder)
    
    async def plan_memory_strategy(self, user_id: str, question: str, 
                                 nvidia_rotator=None, project_id: Optional[str] = None) -> Dict[str, Any]:
        """Plan the optimal memory retrieval strategy based on user intent and context"""
        try:
            # Detect user intent
            intent = await self.intent_detector.detect_intent(question, nvidia_rotator)
            
            # Get conversation context for better planning
            conversation_context = await self._get_conversation_context(user_id, question)
            
            # Determine memory strategy based on intent and context
            strategy = self.strategy_planner.determine_strategy(intent, question, conversation_context)
            
            # Plan specific retrieval parameters
            retrieval_params = self.strategy_planner.plan_retrieval_parameters(
                user_id, question, intent, strategy, conversation_context, nvidia_rotator
            )
            
            # Create execution plan
            execution_plan = {
                "intent": intent,
                "strategy": strategy,
                "retrieval_params": retrieval_params,
                "conversation_context": conversation_context,
                "enhancement_focus": intent == QueryIntent.ENHANCEMENT,
                "qa_focus": intent in [QueryIntent.ENHANCEMENT, QueryIntent.CLARIFICATION, QueryIntent.REFERENCE]
            }
            
            logger.info(f"[MEMORY_PLANNER] Planned strategy: {strategy.value} for intent: {intent.value}")
            return execution_plan
            
        except Exception as e:
            logger.error(f"[MEMORY_PLANNER] Memory planning failed: {e}")
            return self.strategy_planner.get_fallback_plan()
    
    async def execute_memory_plan(self, user_id: str, question: str, execution_plan: Dict[str, Any],
                                 nvidia_rotator=None, project_id: Optional[str] = None) -> Tuple[str, str, Dict[str, Any]]:
        """Execute the planned memory retrieval strategy"""
        try:
            return await self.execution_engine.execute_memory_plan(
                user_id, question, execution_plan, nvidia_rotator, project_id
            )
        except Exception as e:
            logger.error(f"[MEMORY_PLANNER] Plan execution failed: {e}")
            return "", "", {"error": str(e)}
    
    async def _get_conversation_context(self, user_id: str, question: str) -> Dict[str, Any]:
        """Get conversation context for better planning"""
        try:
            context = {
                "has_recent_memories": False,
                "memory_count": 0,
                "conversation_depth": 0,
                "last_question": "",
                "is_continuation": False
            }
            
            if self.memory_system.is_enhanced_available():
                # Get enhanced memory stats
                stats = self.memory_system.get_memory_stats(user_id)
                context["memory_count"] = stats.get("total_memories", 0)
                
                # Get recent memories
                recent_memories = self.memory_system.enhanced_memory.get_memories(
                    user_id, memory_type="conversation", limit=5
                )
                context["has_recent_memories"] = len(recent_memories) > 0
                
                if recent_memories:
                    context["last_question"] = recent_memories[0].get("content", "")
            else:
                # Legacy memory stats
                recent_memories = self.memory_system.recent(user_id, 3)
                context["has_recent_memories"] = len(recent_memories) > 0
                context["memory_count"] = len(self.memory_system.all(user_id))
                
                if recent_memories:
                    context["last_question"] = recent_memories[0]
            
            return context
            
        except Exception as e:
            logger.warning(f"[MEMORY_PLANNER] Context retrieval failed: {e}")
            return {
                "has_recent_memories": False,
                "memory_count": 0,
                "conversation_depth": 0,
                "last_question": "",
                "is_continuation": False
            }


# ────────────────────────────── Global Instance ──────────────────────────────

_memory_planner: Optional[MemoryPlanner] = None

def get_memory_planner(memory_system=None, embedder: EmbeddingClient = None) -> MemoryPlanner:
    """Get the global memory planner instance"""
    global _memory_planner
    
    if _memory_planner is None:
        if not memory_system:
            from memo.core import get_memory_system
            memory_system = get_memory_system()
        if not embedder:
            from utils.rag.embeddings import EmbeddingClient
            embedder = EmbeddingClient()
        
        _memory_planner = MemoryPlanner(memory_system, embedder)
        logger.info("[MEMORY_PLANNER] Global memory planner initialized")
    
    return _memory_planner