# ────────────────────────────── memo/plan/execution.py ──────────────────────────────
"""
Execution Engine

Handles memory retrieval execution based on planned strategies.
"""

from typing import List, Dict, Any, Tuple, Optional

from utils.logger import get_logger
from utils.rag.embeddings import EmbeddingClient
from memo.plan.intent import QueryIntent
from memo.plan.strategy import MemoryStrategy

logger = get_logger("EXECUTION_ENGINE", __name__)

class ExecutionEngine:
    """Handles memory retrieval execution based on planned strategies"""
    
    def __init__(self, memory_system, embedder: EmbeddingClient):
        self.memory_system = memory_system
        self.embedder = embedder
    
    async def execute_memory_plan(self, user_id: str, question: str, execution_plan: Dict[str, Any],
                                 nvidia_rotator=None, project_id: Optional[str] = None) -> Tuple[str, str, Dict[str, Any]]:
        """
        Execute the planned memory retrieval strategy.
        
        Returns:
            Tuple of (recent_context, semantic_context, metadata)
        """
        try:
            params = execution_plan["retrieval_params"]
            strategy = execution_plan["strategy"]
            intent = execution_plan["intent"]
            
            # Execute based on strategy
            if strategy == MemoryStrategy.FOCUSED_QA:
                return await self._execute_focused_qa_retrieval(
                    user_id, question, params, nvidia_rotator, project_id
                )
            elif strategy == MemoryStrategy.RECENT_FOCUS:
                return await self._execute_recent_focus_retrieval(
                    user_id, question, params, nvidia_rotator, project_id
                )
            elif strategy == MemoryStrategy.BROAD_CONTEXT:
                return await self._execute_broad_context_retrieval(
                    user_id, question, params, nvidia_rotator, project_id
                )
            elif strategy == MemoryStrategy.SEMANTIC_DEEP:
                return await self._execute_semantic_deep_retrieval(
                    user_id, question, params, nvidia_rotator, project_id
                )
            else:  # MIXED_APPROACH
                return await self._execute_mixed_approach_retrieval(
                    user_id, question, params, nvidia_rotator, project_id
                )
                
        except Exception as e:
            logger.error(f"[EXECUTION_ENGINE] Plan execution failed: {e}")
            return "", "", {"error": str(e)}
    
    async def _execute_focused_qa_retrieval(self, user_id: str, question: str, params: Dict[str, Any],
                                          nvidia_rotator, project_id: Optional[str]) -> Tuple[str, str, Dict[str, Any]]:
        """Execute focused Q&A retrieval for enhancement requests"""
        try:
            recent_context = ""
            semantic_context = ""
            metadata = {"strategy": "focused_qa", "qa_focus": True}
            
            if self.memory_system.is_enhanced_available():
                # Get Q&A focused memories
                qa_memories = self.memory_system.enhanced_memory.get_memories(
                    user_id, memory_type="conversation", limit=params["recent_limit"]
                )
                
                if qa_memories:
                    # Use AI to select most relevant Q&A pairs for enhancement
                    if params["use_ai_selection"] and nvidia_rotator:
                        recent_context = await self._ai_select_qa_memories(
                            question, qa_memories, nvidia_rotator, "recent", user_id
                        )
                    else:
                        recent_context = await self._semantic_select_qa_memories(
                            question, qa_memories, params["similarity_threshold"]
                        )
                
                # Get additional semantic Q&A context
                all_memories = self.memory_system.enhanced_memory.get_memories(
                    user_id, limit=params["semantic_limit"]
                )
                
                if all_memories:
                    if params["use_ai_selection"] and nvidia_rotator:
                        semantic_context = await self._ai_select_qa_memories(
                            question, all_memories, nvidia_rotator, "semantic", user_id
                        )
                    else:
                        semantic_context = await self._semantic_select_qa_memories(
                            question, all_memories, params["similarity_threshold"]
                        )
            else:
                # Legacy fallback
                recent_memories = self.memory_system.recent(user_id, params["recent_limit"])
                rest_memories = self.memory_system.rest(user_id, params["recent_limit"])
                
                if recent_memories:
                    recent_context = await self._semantic_select_qa_memories(
                        question, [{"content": m} for m in recent_memories], params["similarity_threshold"]
                    )
                
                if rest_memories:
                    semantic_context = await self._semantic_select_qa_memories(
                        question, [{"content": m} for m in rest_memories], params["similarity_threshold"]
                    )
            
            metadata["enhancement_focus"] = True
            metadata["qa_memories_found"] = len(recent_context) > 0 or len(semantic_context) > 0
            
            return recent_context, semantic_context, metadata
            
        except Exception as e:
            logger.error(f"[EXECUTION_ENGINE] Focused Q&A retrieval failed: {e}")
            return "", "", {"error": str(e)}
    
    async def _execute_recent_focus_retrieval(self, user_id: str, question: str, params: Dict[str, Any],
                                            nvidia_rotator, project_id: Optional[str]) -> Tuple[str, str, Dict[str, Any]]:
        """Execute recent focus retrieval for clarification requests"""
        try:
            recent_context = ""
            semantic_context = ""
            metadata = {"strategy": "recent_focus"}
            
            if self.memory_system.is_enhanced_available():
                recent_memories = self.memory_system.enhanced_memory.get_memories(
                    user_id, memory_type="conversation", limit=params["recent_limit"]
                )
                
                if recent_memories:
                    recent_context = "\n\n".join([m["content"] for m in recent_memories])
                
                # Get some semantic context
                all_memories = self.memory_system.enhanced_memory.get_memories(
                    user_id, limit=params["semantic_limit"]
                )
                
                if all_memories:
                    semantic_context = await self._semantic_select_qa_memories(
                        question, all_memories, params["similarity_threshold"]
                    )
            else:
                # Legacy fallback
                recent_memories = self.memory_system.recent(user_id, params["recent_limit"])
                rest_memories = self.memory_system.rest(user_id, params["recent_limit"])
                
                recent_context = "\n\n".join(recent_memories)
                
                if rest_memories:
                    semantic_context = await self._semantic_select_qa_memories(
                        question, [{"content": m} for m in rest_memories], params["similarity_threshold"]
                    )
            
            return recent_context, semantic_context, metadata
            
        except Exception as e:
            logger.error(f"[EXECUTION_ENGINE] Recent focus retrieval failed: {e}")
            return "", "", {"error": str(e)}
    
    async def _execute_broad_context_retrieval(self, user_id: str, question: str, params: Dict[str, Any],
                                             nvidia_rotator, project_id: Optional[str]) -> Tuple[str, str, Dict[str, Any]]:
        """Execute broad context retrieval for comparison requests"""
        try:
            recent_context = ""
            semantic_context = ""
            metadata = {"strategy": "broad_context"}
            
            if self.memory_system.is_enhanced_available():
                # Get recent context
                recent_memories = self.memory_system.enhanced_memory.get_memories(
                    user_id, memory_type="conversation", limit=params["recent_limit"]
                )
                
                if recent_memories:
                    recent_context = "\n\n".join([m["content"] for m in recent_memories])
                
                # Get broad semantic context
                all_memories = self.memory_system.enhanced_memory.get_memories(
                    user_id, limit=params["semantic_limit"]
                )
                
                if all_memories:
                    semantic_context = await self._semantic_select_qa_memories(
                        question, all_memories, params["similarity_threshold"]
                    )
            else:
                # Legacy fallback
                recent_memories = self.memory_system.recent(user_id, params["recent_limit"])
                rest_memories = self.memory_system.rest(user_id, params["recent_limit"])
                
                recent_context = "\n\n".join(recent_memories)
                semantic_context = "\n\n".join(rest_memories)
            
            return recent_context, semantic_context, metadata
            
        except Exception as e:
            logger.error(f"[EXECUTION_ENGINE] Broad context retrieval failed: {e}")
            return "", "", {"error": str(e)}
    
    async def _execute_semantic_deep_retrieval(self, user_id: str, question: str, params: Dict[str, Any],
                                             nvidia_rotator, project_id: Optional[str]) -> Tuple[str, str, Dict[str, Any]]:
        """Execute semantic deep retrieval for new topics"""
        try:
            recent_context = ""
            semantic_context = ""
            metadata = {"strategy": "semantic_deep"}
            
            if self.memory_system.is_enhanced_available():
                # Get all memories for deep semantic search
                all_memories = self.memory_system.enhanced_memory.get_memories(
                    user_id, limit=params["semantic_limit"]
                )
                
                if all_memories:
                    if params["use_ai_selection"] and nvidia_rotator:
                        semantic_context = await self._ai_select_qa_memories(
                            question, all_memories, nvidia_rotator, "semantic", user_id
                        )
                    else:
                        semantic_context = await self._semantic_select_qa_memories(
                            question, all_memories, params["similarity_threshold"]
                        )
                
                # Get some recent context
                recent_memories = self.memory_system.enhanced_memory.get_memories(
                    user_id, memory_type="conversation", limit=params["recent_limit"]
                )
                
                if recent_memories:
                    recent_context = "\n\n".join([m["content"] for m in recent_memories])
            else:
                # Legacy fallback
                all_memories = self.memory_system.all(user_id)
                recent_memories = self.memory_system.recent(user_id, params["recent_limit"])
                
                if all_memories:
                    semantic_context = await self._semantic_select_qa_memories(
                        question, [{"content": m} for m in all_memories], params["similarity_threshold"]
                    )
                
                recent_context = "\n\n".join(recent_memories)
            
            return recent_context, semantic_context, metadata
            
        except Exception as e:
            logger.error(f"[EXECUTION_ENGINE] Semantic deep retrieval failed: {e}")
            return "", "", {"error": str(e)}
    
    async def _execute_mixed_approach_retrieval(self, user_id: str, question: str, params: Dict[str, Any],
                                              nvidia_rotator, project_id: Optional[str]) -> Tuple[str, str, Dict[str, Any]]:
        """Execute mixed approach retrieval for continuation requests"""
        try:
            recent_context = ""
            semantic_context = ""
            metadata = {"strategy": "mixed_approach"}
            
            if self.memory_system.is_enhanced_available():
                # Get recent context
                recent_memories = self.memory_system.enhanced_memory.get_memories(
                    user_id, memory_type="conversation", limit=params["recent_limit"]
                )
                
                if recent_memories:
                    if params["use_ai_selection"] and nvidia_rotator:
                        recent_context = await self._ai_select_qa_memories(
                            question, recent_memories, nvidia_rotator, "recent", user_id
                        )
                    else:
                        recent_context = await self._semantic_select_qa_memories(
                            question, recent_memories, params["similarity_threshold"]
                        )
                
                # Get semantic context
                all_memories = self.memory_system.enhanced_memory.get_memories(
                    user_id, limit=params["semantic_limit"]
                )
                
                if all_memories:
                    if params["use_ai_selection"] and nvidia_rotator:
                        semantic_context = await self._ai_select_qa_memories(
                            question, all_memories, nvidia_rotator, "semantic", user_id
                        )
                    else:
                        semantic_context = await self._semantic_select_qa_memories(
                            question, all_memories, params["similarity_threshold"]
                        )
            else:
                # Legacy fallback
                recent_memories = self.memory_system.recent(user_id, params["recent_limit"])
                rest_memories = self.memory_system.rest(user_id, params["recent_limit"])
                
                if recent_memories:
                    recent_context = await self._semantic_select_qa_memories(
                        question, [{"content": m} for m in recent_memories], params["similarity_threshold"]
                    )
                
                if rest_memories:
                    semantic_context = await self._semantic_select_qa_memories(
                        question, [{"content": m} for m in rest_memories], params["similarity_threshold"]
                    )
            
            return recent_context, semantic_context, metadata
            
        except Exception as e:
            logger.error(f"[EXECUTION_ENGINE] Mixed approach retrieval failed: {e}")
            return "", "", {"error": str(e)}
    
    async def _ai_select_qa_memories(self, question: str, memories: List[Dict[str, Any]], 
                                   nvidia_rotator, context_type: str, user_id: str = "") -> str:
        """Use AI to select the most relevant Q&A memories"""
        try:
            from utils.analytics import get_analytics_tracker

            if not memories:
                return ""

            # Track memory agent usage
            tracker = get_analytics_tracker()
            if tracker and user_id:
                await tracker.track_agent_usage(
                    user_id=user_id,
                    agent_name="memory",
                    action="select",
                    context="memory_selection",
                    metadata={"question": question[:100], "memories_count": len(memories)}
                )
            
            sys_prompt = f"""You are an expert at selecting the most relevant Q&A memories for {context_type} context.

Given a user's question and a list of Q&A memories, select the most relevant ones that would help provide a comprehensive and detailed answer.

Focus on:
1. Direct relevance to the question
2. Q&A pairs that provide supporting information
3. Memories that add context and depth
4. Past discussions that relate to the current question

Return ONLY the selected Q&A memories, concatenated together. If none are relevant, return nothing."""
            
            # Format memories for AI
            formatted_memories = []
            for i, memory in enumerate(memories):
                content = memory.get("content", "")
                if content:
                    formatted_memories.append(f"Memory {i+1}: {content}")
            
            user_prompt = f"""Question: {question}

Available Q&A Memories:
{chr(10).join(formatted_memories)}

Select the most relevant Q&A memories:"""
            
            # Use Qwen for better memory selection reasoning
            from utils.api.router import qwen_chat_completion
            response = await qwen_chat_completion(sys_prompt, user_prompt, nvidia_rotator, user_id, "memory_selection")
            
            return response.strip()
            
        except Exception as e:
            logger.warning(f"[EXECUTION_ENGINE] AI Q&A selection failed: {e}")
            return ""
    
    async def _semantic_select_qa_memories(self, question: str, memories: List[Dict[str, Any]], 
                                         threshold: float) -> str:
        """Use semantic similarity to select Q&A memories"""
        try:
            if not memories:
                return ""
            
            # Extract content from memories
            memory_contents = [memory.get("content", "") for memory in memories if memory.get("content")]
            
            if not memory_contents:
                return ""
            
            # Use semantic similarity
            from memo.context import semantic_context
            selected = await semantic_context(question, memory_contents, self.embedder, len(memory_contents))
            
            return selected
            
        except Exception as e:
            logger.warning(f"[EXECUTION_ENGINE] Semantic Q&A selection failed: {e}")
            return ""


# ────────────────────────────── Global Instance ──────────────────────────────

_execution_engine: Optional[ExecutionEngine] = None

def get_execution_engine(memory_system=None, embedder: Optional[EmbeddingClient] = None) -> ExecutionEngine:
    """Get the global execution engine instance"""
    global _execution_engine
    
    if _execution_engine is None:
        if not memory_system:
            from memo.core import get_memory_system
            memory_system = get_memory_system()
        if not embedder:
            embedder = EmbeddingClient()
        
        _execution_engine = ExecutionEngine(memory_system, embedder)
        logger.info("[EXECUTION_ENGINE] Global execution engine initialized")
    
    return _execution_engine