File size: 13,950 Bytes
aa9003d
 
 
 
 
 
 
 
 
 
 
 
6d2a17c
aa9003d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8f17704
aa9003d
 
8f17704
aa9003d
 
 
8f17704
 
 
aa9003d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8f17704
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
aa9003d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3855268
aa9003d
 
 
 
 
 
 
 
 
3855268
 
 
 
 
 
 
 
 
 
 
aa9003d
 
 
 
 
 
 
 
3855268
 
 
 
 
 
 
 
 
 
 
 
 
aa9003d
 
3855268
aa9003d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# ────────────────────────────── memo/core.py ──────────────────────────────
"""
Core Memory System

Main memory system that provides both legacy and enhanced functionality.
"""

import os
import asyncio
from typing import List, Dict, Any, Optional, Tuple

from utils.logger import get_logger
from utils.rag.embeddings import EmbeddingClient
from memo.legacy import MemoryLRU
from memo.persistent import PersistentMemory

logger = get_logger("CORE_MEMORY", __name__)

class MemorySystem:
    """
    Facade over the legacy in-process memory and the optional persistent memory.

    A ``MemoryLRU`` store is always created. The constructor additionally tries
    to build an ``EmbeddingClient`` and a ``PersistentMemory``; if either step
    raises, the instance keeps working in legacy-only mode and
    ``enhanced_available`` stays False.
    """

    def __init__(self, mongo_uri: Optional[str] = None, db_name: str = "studybuddy"):
        """
        Set up the legacy store and, when possible, the enhanced store.

        Args:
            mongo_uri: MongoDB connection string. When falsy, the ``MONGO_URI``
                environment variable is used, then ``mongodb://localhost:27017``.
            db_name: Database name handed to ``PersistentMemory``.
        """
        self.mongo_uri = mongo_uri or os.getenv("MONGO_URI", "mongodb://localhost:27017")
        self.db_name = db_name

        # Initialize legacy memory system (always available)
        self.legacy_memory = MemoryLRU()

        # Initialize enhanced memory system if MongoDB is available
        self.enhanced_available = False
        self.enhanced_memory = None
        self.embedder = None

        # Strong references to fire-and-forget tasks started by add(); the event
        # loop itself only keeps weak references to tasks.
        self._background_tasks: set = set()

        try:
            self.embedder = EmbeddingClient()
            self.enhanced_memory = PersistentMemory(self.mongo_uri, self.db_name, self.embedder)
            self.enhanced_available = True
            logger.info("[CORE_MEMORY] Enhanced memory system initialized")
        except Exception as e:
            # Best effort: any construction failure downgrades to legacy-only mode.
            logger.warning(f"[CORE_MEMORY] Enhanced memory system unavailable: {e}")
            self.enhanced_available = False

        logger.info(f"[CORE_MEMORY] Initialized with enhanced_available={self.enhanced_available}")

    # ────────────────────────────── Core Memory Operations ──────────────────────────────

    def add(self, user_id: str, qa_summary: str):
        """
        Add a Q&A summary to memory (backward compatibility).

        The summary always goes to the legacy store. When the enhanced store is
        available and the summary contains both a ``q:`` line and an ``a:`` line,
        the pair is also written to the enhanced store: as a background task if
        an event loop is running, otherwise immediately.

        Errors are logged and never raised to the caller.

        Args:
            user_id: Owner of the memory.
            qa_summary: Text with lines prefixed ``Q:`` / ``A:`` (case-insensitive).
        """
        try:
            # Add to legacy memory
            self.legacy_memory.add(user_id, qa_summary)

            # Also add to enhanced memory if available
            if self.enhanced_available:
                question, answer = self._split_qa_summary(qa_summary)
                if question and answer:
                    self._schedule_enhanced_add(user_id, question, answer)

            logger.debug(f"[CORE_MEMORY] Added memory for user {user_id}")
        except Exception as e:
            logger.error(f"[CORE_MEMORY] Failed to add memory: {e}")

    def recent(self, user_id: str, n: int = 3) -> List[str]:
        """Return the ``n`` most recent legacy memories (backward compatibility)."""
        return self.legacy_memory.recent(user_id, n)

    def rest(self, user_id: str, skip_n: int = 3) -> List[str]:
        """Return legacy memories after skipping the ``skip_n`` most recent ones (backward compatibility)."""
        return self.legacy_memory.rest(user_id, skip_n)

    def all(self, user_id: str) -> List[str]:
        """Return every legacy memory stored for the user (backward compatibility)."""
        return self.legacy_memory.all(user_id)

    def clear(self, user_id: str) -> None:
        """
        Clear all memories for a user (backward compatibility).

        The legacy store is cleared first. A failure while clearing the enhanced
        store is logged as a warning and not raised.
        """
        self.legacy_memory.clear(user_id)

        # Also clear enhanced memory if available
        if self.enhanced_available:
            try:
                self.enhanced_memory.clear_user_memories(user_id)
                logger.info(f"[CORE_MEMORY] Cleared enhanced memory for user {user_id}")
            except Exception as e:
                logger.warning(f"[CORE_MEMORY] Failed to clear enhanced memory: {e}")

    def is_enhanced_available(self) -> bool:
        """Check if enhanced memory features are available."""
        return self.enhanced_available

    # ────────────────────────────── Enhanced Features ──────────────────────────────

    async def add_conversation_memory(self, user_id: str, question: str, answer: str,
                                    project_id: Optional[str] = None,
                                    context: Optional[Dict[str, Any]] = None) -> str:
        """
        Store a question/answer pair in the enhanced store with extra context.

        Args:
            user_id: Owner of the memory.
            question: The user's question.
            answer: The answer that was given.
            project_id: Optional project the conversation belongs to.
            context: Optional metadata stored with the memory; ``None`` becomes ``{}``.

        Returns:
            The id returned by the enhanced store, or ``""`` when the enhanced
            store is unavailable or the write fails.
        """
        if not self.enhanced_available:
            logger.warning("[CORE_MEMORY] Enhanced features not available")
            return ""

        try:
            memory_id = self.enhanced_memory.add_memory(
                user_id=user_id,
                content=f"Q: {question}\nA: {answer}",
                memory_type="conversation",
                project_id=project_id,
                importance="medium",
                tags=["conversation", "qa"],
                metadata=context or {}
            )
            return memory_id
        except Exception as e:
            logger.error(f"[CORE_MEMORY] Failed to add conversation memory: {e}")
            return ""

    async def get_conversation_context(self, user_id: str, question: str,
                                     project_id: Optional[str] = None) -> Tuple[str, str]:
        """
        Get conversation context for chat continuity.

        Args:
            user_id: Whose memories to read.
            question: The current question, used to pick relevant memories.
            project_id: Accepted for interface compatibility; not used by this method.

        Returns:
            ``(recent_context, semantic_context)``. Both are ``""`` on failure.
        """
        try:
            if self.enhanced_available:
                # Use enhanced context retrieval with better integration
                recent_context, semantic_context = await self._get_enhanced_context(user_id, question)
                return recent_context, semantic_context
            else:
                # Use legacy context with enhanced semantic selection
                from memo.context import get_legacy_context
                return await get_legacy_context(user_id, question, self, self.embedder, 3)
        except Exception as e:
            logger.error(f"[CORE_MEMORY] Failed to get conversation context: {e}")
            return "", ""

    async def search_memories(self, user_id: str, query: str,
                            project_id: Optional[str] = None,
                            limit: int = 10) -> List[Tuple[str, float]]:
        """
        Search the enhanced store for memories related to ``query``.

        Returns:
            ``(content, score)`` pairs built from the enhanced store's results,
            or an empty list when the enhanced store is unavailable or the
            search fails.
        """
        if not self.enhanced_available:
            return []

        try:
            results = self.enhanced_memory.search_memories(
                user_id=user_id,
                query=query,
                project_id=project_id,
                limit=limit
            )
            return [(m["content"], score) for m, score in results]
        except Exception as e:
            logger.error(f"[CORE_MEMORY] Failed to search memories: {e}")
            return []

    def get_memory_stats(self, user_id: str) -> Dict[str, Any]:
        """
        Get memory statistics for a user.

        Returns the enhanced store's statistics when it is available; otherwise
        a small dict describing the legacy store (``total_memories``,
        ``system_type``, ``enhanced_available``).
        """
        if self.enhanced_available:
            return self.enhanced_memory.get_memory_stats(user_id)
        else:
            # Legacy memory stats
            all_memories = self.legacy_memory.all(user_id)
            return {
                "total_memories": len(all_memories),
                "system_type": "legacy",
                "enhanced_available": False
            }

    async def get_smart_context(self, user_id: str, question: str,
                              nvidia_rotator=None, project_id: Optional[str] = None) -> Tuple[str, str]:
        """
        Get context, optionally letting the NVIDIA helper re-pick the recent part.

        In enhanced mode the result of ``_get_enhanced_context`` is used. If a
        ``nvidia_rotator`` is given and the recent context is non-empty, the five
        most recent legacy memories are passed to
        ``memo.nvidia.related_recent_context``; a non-empty result replaces the
        recent context. A failure in that step is logged and ignored.

        Args:
            user_id: Whose memories to read.
            question: The current question.
            nvidia_rotator: Optional object forwarded to ``related_recent_context``.
            project_id: Accepted for interface compatibility; not used by this method.

        Returns:
            ``(recent_context, semantic_context)``. Both are ``""`` on failure.
        """
        try:
            if self.enhanced_available:
                # Use enhanced context with NVIDIA integration if available
                recent_context, semantic_context = await self._get_enhanced_context(user_id, question)

                # If NVIDIA rotator is available, enhance recent context selection
                if nvidia_rotator and recent_context:
                    try:
                        from memo.nvidia import related_recent_context
                        recent_memories = self.legacy_memory.recent(user_id, 5)
                        if recent_memories:
                            nvidia_recent = await related_recent_context(question, recent_memories, nvidia_rotator)
                            if nvidia_recent:
                                recent_context = nvidia_recent
                    except Exception as e:
                        logger.warning(f"[CORE_MEMORY] NVIDIA context enhancement failed: {e}")

                return recent_context, semantic_context
            else:
                # Legacy mode: same call as get_conversation_context; the rotator is not used here.
                from memo.context import get_legacy_context
                return await get_legacy_context(user_id, question, self, self.embedder, 3)
        except Exception as e:
            logger.error(f"[CORE_MEMORY] Failed to get smart context: {e}")
            return "", ""

    # ────────────────────────────── Private Helper Methods ──────────────────────────────

    @staticmethod
    def _split_qa_summary(qa_summary: str) -> Tuple[str, str]:
        """
        Pull the question and answer out of a ``Q:`` / ``A:`` formatted summary.

        Only the text on the prefixed line itself is taken. If a prefix occurs
        more than once, the last occurrence wins. Missing parts come back as ``""``.
        """
        question = ""
        answer = ""
        for raw_line in qa_summary.split('\n'):
            line = raw_line.strip()
            lowered = line.lower()
            if lowered.startswith('q:'):
                question = line[2:].strip()
            elif lowered.startswith('a:'):
                answer = line[2:].strip()
        return question, answer

    def _schedule_enhanced_add(self, user_id: str, question: str, answer: str) -> None:
        """
        Write a Q&A pair to the enhanced store without requiring an event loop.

        With a running loop the write is scheduled as a task and a reference is
        kept until it finishes. Without one, ``asyncio.create_task`` would raise
        ``RuntimeError``, so the write is performed directly instead.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        if loop is None:
            self._write_enhanced_memory(user_id, question, answer)
            return

        task = loop.create_task(self._add_enhanced_memory(user_id, question, answer))
        # Hold the task until done so it cannot be garbage-collected mid-flight.
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    def _write_enhanced_memory(self, user_id: str, question: str, answer: str) -> None:
        """Write one conversation memory to the enhanced store; failures are logged, not raised."""
        try:
            # NOTE(review): add_memory is called without await throughout this
            # class, so it is presumably synchronous - confirm against PersistentMemory.
            self.enhanced_memory.add_memory(
                user_id=user_id,
                content=f"Q: {question}\nA: {answer}",
                memory_type="conversation",
                importance="medium",
                tags=["conversation", "qa"]
            )
        except Exception as e:
            logger.warning(f"[CORE_MEMORY] Failed to add enhanced memory: {e}")

    async def _add_enhanced_memory(self, user_id: str, question: str, answer: str):
        """Add memory to enhanced system (coroutine wrapper used for background tasks)."""
        self._write_enhanced_memory(user_id, question, answer)

    async def _select_context(self, question: str, memories: List[Dict[str, Any]],
                              top_k: int, failure_label: str) -> str:
        """
        Turn a list of memory records into one context string.

        With an embedder, ``memo.context.semantic_context`` picks from the
        memories' ``summary`` fields. Without an embedder, or if that selection
        raises, all summaries are joined with blank lines. An empty or missing
        list yields ``""``.

        Args:
            question: The current question.
            memories: Records that each carry a ``summary`` key.
            top_k: Count forwarded to ``semantic_context``.
            failure_label: Log prefix used when semantic selection fails.
        """
        if not memories:
            return ""

        if self.embedder:
            try:
                # Imported under an alias so it can never be confused with a
                # result variable of the same name.
                from memo.context import semantic_context as rank_by_similarity
                summaries = [m["summary"] for m in memories]
                return await rank_by_similarity(question, summaries, self.embedder, top_k)
            except Exception as e:
                logger.warning(f"{failure_label}: {e}")

        return "\n\n".join([m["summary"] for m in memories])

    async def _get_enhanced_context(self, user_id: str, question: str) -> Tuple[str, str]:
        """
        Get context from enhanced memory system with semantic selection.

        Returns:
            ``(recent_context, semantic_context)``: the first built from up to
            five ``conversation`` memories, the second from the non-conversation
            entries among up to ten memories of any type. Both elements are
            always strings from the selection step or ``""``; on any
            unexpected error the result is ``("", "")``.
        """
        try:
            # Get recent conversation memories
            recent_memories = self.enhanced_memory.get_memories(
                user_id=user_id,
                memory_type="conversation",
                limit=5
            )
            recent_context = await self._select_context(
                question, recent_memories, 3,
                "[CORE_MEMORY] Semantic recent context failed, using all"
            )

            # Get semantic context from other memory types
            semantic_memories = self.enhanced_memory.get_memories(
                user_id=user_id,
                limit=10
            )
            other_memories = [
                m for m in (semantic_memories or [])
                if m.get("memory_type") != "conversation"
            ]
            related_context = await self._select_context(
                question, other_memories, 5,
                "[CORE_MEMORY] Semantic context failed, using all"
            )

            return recent_context, related_context

        except Exception as e:
            logger.error(f"[CORE_MEMORY] Failed to get enhanced context: {e}")
            return "", ""

# ────────────────────────────── Global Instance ──────────────────────────────

# Process-wide MemorySystem singleton: created lazily by get_memory_system()
# and set back to None by reset_memory_system().
_memory_system: Optional[MemorySystem] = None

def get_memory_system(mongo_uri: str = None, db_name: str = None) -> MemorySystem:
    """
    Return the shared MemorySystem, building it on the first call.

    The arguments only matter on the call that creates the instance; once the
    instance exists it is returned unchanged. An argument left as ``None`` is
    resolved from the environment: ``MONGO_URI`` (default
    ``mongodb://localhost:27017``) and ``MONGO_DB`` (default ``studybuddy``).
    """
    global _memory_system

    # Guard clause: reuse the instance that was already built.
    if _memory_system is not None:
        return _memory_system

    resolved_uri = os.getenv("MONGO_URI", "mongodb://localhost:27017") if mongo_uri is None else mongo_uri
    resolved_db = os.getenv("MONGO_DB", "studybuddy") if db_name is None else db_name

    _memory_system = MemorySystem(resolved_uri, resolved_db)
    logger.info("[CORE_MEMORY] Global memory system initialized")
    return _memory_system

def reset_memory_system():
    """Drop the shared MemorySystem so the next get_memory_system() call rebuilds it (for testing)."""
    globals()["_memory_system"] = None