Spaces:
Sleeping
Sleeping
| # ────────────────────────────── memo/core.py ────────────────────────────── | |
| """ | |
| Core Memory System | |
| Main memory system that provides both legacy and enhanced functionality. | |
| """ | |
| import os | |
| import asyncio | |
| from typing import List, Dict, Any, Optional, Tuple | |
| from utils.logger import get_logger | |
| from utils.rag.embeddings import EmbeddingClient | |
| from memo.legacy import MemoryLRU | |
| from memo.persistent import PersistentMemory | |
| logger = get_logger("CORE_MEMORY", __name__) | |
| class MemorySystem: | |
| """ | |
| Main memory system that provides both legacy and enhanced functionality. | |
| Automatically uses enhanced features when MongoDB is available. | |
| """ | |
| def __init__(self, mongo_uri: str = None, db_name: str = "studybuddy"): | |
| self.mongo_uri = mongo_uri or os.getenv("MONGO_URI", "mongodb://localhost:27017") | |
| self.db_name = db_name | |
| # Initialize legacy memory system (always available) | |
| self.legacy_memory = MemoryLRU() | |
| # Initialize enhanced memory system if MongoDB is available | |
| self.enhanced_available = False | |
| self.enhanced_memory = None | |
| self.embedder = None | |
| try: | |
| self.embedder = EmbeddingClient() | |
| self.enhanced_memory = PersistentMemory(self.mongo_uri, self.db_name, self.embedder) | |
| self.enhanced_available = True | |
| logger.info("[CORE_MEMORY] Enhanced memory system initialized") | |
| except Exception as e: | |
| logger.warning(f"[CORE_MEMORY] Enhanced memory system unavailable: {e}") | |
| self.enhanced_available = False | |
| logger.info(f"[CORE_MEMORY] Initialized with enhanced_available={self.enhanced_available}") | |
| # ────────────────────────────── Core Memory Operations ────────────────────────────── | |
| def add(self, user_id: str, qa_summary: str): | |
| """Add a Q&A summary to memory (backward compatibility)""" | |
| try: | |
| # Add to legacy memory | |
| self.legacy_memory.add(user_id, qa_summary) | |
| # Also add to enhanced memory if available | |
| if self.enhanced_available: | |
| # Extract question and answer from summary | |
| lines = qa_summary.split('\n') | |
| question = "" | |
| answer = "" | |
| for line in lines: | |
| if line.strip().lower().startswith('q:'): | |
| question = line.strip()[2:].strip() | |
| elif line.strip().lower().startswith('a:'): | |
| answer = line.strip()[2:].strip() | |
| if question and answer: | |
| asyncio.create_task(self._add_enhanced_memory(user_id, question, answer)) | |
| logger.debug(f"[CORE_MEMORY] Added memory for user {user_id}") | |
| except Exception as e: | |
| logger.error(f"[CORE_MEMORY] Failed to add memory: {e}") | |
| def recent(self, user_id: str, n: int = 3) -> List[str]: | |
| """Get recent memories (backward compatibility)""" | |
| return self.legacy_memory.recent(user_id, n) | |
| def rest(self, user_id: str, skip_n: int = 3) -> List[str]: | |
| """Get remaining memories excluding recent ones (backward compatibility)""" | |
| return self.legacy_memory.rest(user_id, skip_n) | |
| def all(self, user_id: str) -> List[str]: | |
| """Get all memories for a user (backward compatibility)""" | |
| return self.legacy_memory.all(user_id) | |
| def clear(self, user_id: str) -> None: | |
| """Clear all memories for a user (backward compatibility)""" | |
| self.legacy_memory.clear(user_id) | |
| # Also clear enhanced memory if available | |
| if self.enhanced_available: | |
| try: | |
| self.enhanced_memory.clear_user_memories(user_id) | |
| logger.info(f"[CORE_MEMORY] Cleared enhanced memory for user {user_id}") | |
| except Exception as e: | |
| logger.warning(f"[CORE_MEMORY] Failed to clear enhanced memory: {e}") | |
| def is_enhanced_available(self) -> bool: | |
| """Check if enhanced memory features are available""" | |
| return self.enhanced_available | |
| # ────────────────────────────── Enhanced Features ────────────────────────────── | |
| async def add_conversation_memory(self, user_id: str, question: str, answer: str, | |
| project_id: Optional[str] = None, | |
| context: Dict[str, Any] = None) -> str: | |
| """Add conversation memory with enhanced context""" | |
| if not self.enhanced_available: | |
| logger.warning("[CORE_MEMORY] Enhanced features not available") | |
| return "" | |
| try: | |
| memory_id = self.enhanced_memory.add_memory( | |
| user_id=user_id, | |
| content=f"Q: {question}\nA: {answer}", | |
| memory_type="conversation", | |
| project_id=project_id, | |
| importance="medium", | |
| tags=["conversation", "qa"], | |
| metadata=context or {} | |
| ) | |
| return memory_id | |
| except Exception as e: | |
| logger.error(f"[CORE_MEMORY] Failed to add conversation memory: {e}") | |
| return "" | |
| async def get_conversation_context(self, user_id: str, question: str, | |
| project_id: Optional[str] = None) -> Tuple[str, str]: | |
| """Get conversation context for chat continuity with enhanced memory ability""" | |
| try: | |
| if self.enhanced_available: | |
| # Use enhanced context retrieval with better integration | |
| recent_context, semantic_context = await self._get_enhanced_context(user_id, question) | |
| return recent_context, semantic_context | |
| else: | |
| # Use legacy context with enhanced semantic selection | |
| from memo.context import get_legacy_context | |
| return await get_legacy_context(user_id, question, self, self.embedder, 3) | |
| except Exception as e: | |
| logger.error(f"[CORE_MEMORY] Failed to get conversation context: {e}") | |
| return "", "" | |
| async def search_memories(self, user_id: str, query: str, | |
| project_id: Optional[str] = None, | |
| limit: int = 10) -> List[Tuple[str, float]]: | |
| """Search memories using semantic similarity""" | |
| if not self.enhanced_available: | |
| return [] | |
| try: | |
| results = self.enhanced_memory.search_memories( | |
| user_id=user_id, | |
| query=query, | |
| project_id=project_id, | |
| limit=limit | |
| ) | |
| return [(m["content"], score) for m, score in results] | |
| except Exception as e: | |
| logger.error(f"[CORE_MEMORY] Failed to search memories: {e}") | |
| return [] | |
| def get_memory_stats(self, user_id: str) -> Dict[str, Any]: | |
| """Get memory statistics for a user""" | |
| if self.enhanced_available: | |
| return self.enhanced_memory.get_memory_stats(user_id) | |
| else: | |
| # Legacy memory stats | |
| all_memories = self.legacy_memory.all(user_id) | |
| return { | |
| "total_memories": len(all_memories), | |
| "system_type": "legacy", | |
| "enhanced_available": False | |
| } | |
| async def get_smart_context(self, user_id: str, question: str, | |
| nvidia_rotator=None, project_id: Optional[str] = None) -> Tuple[str, str]: | |
| """Get smart context using both NVIDIA and semantic similarity for optimal memory ability""" | |
| try: | |
| if self.enhanced_available: | |
| # Use enhanced context with NVIDIA integration if available | |
| recent_context, semantic_context = await self._get_enhanced_context(user_id, question) | |
| # If NVIDIA rotator is available, enhance recent context selection | |
| if nvidia_rotator and recent_context: | |
| try: | |
| from memo.nvidia import related_recent_context | |
| recent_memories = self.legacy_memory.recent(user_id, 5) | |
| if recent_memories: | |
| nvidia_recent = await related_recent_context(question, recent_memories, nvidia_rotator) | |
| if nvidia_recent: | |
| recent_context = nvidia_recent | |
| except Exception as e: | |
| logger.warning(f"[CORE_MEMORY] NVIDIA context enhancement failed: {e}") | |
| return recent_context, semantic_context | |
| else: | |
| # Use legacy context with NVIDIA enhancement if available | |
| from memo.context import get_legacy_context | |
| return await get_legacy_context(user_id, question, self, self.embedder, 3) | |
| except Exception as e: | |
| logger.error(f"[CORE_MEMORY] Failed to get smart context: {e}") | |
| return "", "" | |
| # ────────────────────────────── Private Helper Methods ────────────────────────────── | |
| async def _add_enhanced_memory(self, user_id: str, question: str, answer: str): | |
| """Add memory to enhanced system""" | |
| try: | |
| self.enhanced_memory.add_memory( | |
| user_id=user_id, | |
| content=f"Q: {question}\nA: {answer}", | |
| memory_type="conversation", | |
| importance="medium", | |
| tags=["conversation", "qa"] | |
| ) | |
| except Exception as e: | |
| logger.warning(f"[CORE_MEMORY] Failed to add enhanced memory: {e}") | |
| async def _get_enhanced_context(self, user_id: str, question: str) -> Tuple[str, str]: | |
| """Get context from enhanced memory system with semantic selection""" | |
| try: | |
| # Get recent conversation memories | |
| recent_memories = self.enhanced_memory.get_memories( | |
| user_id=user_id, | |
| memory_type="conversation", | |
| limit=5 | |
| ) | |
| recent_context = "" | |
| if recent_memories and self.embedder: | |
| # Use semantic similarity to select most relevant recent memories | |
| try: | |
| from memo.context import semantic_context | |
| recent_summaries = [m["summary"] for m in recent_memories] | |
| recent_context = await semantic_context(question, recent_summaries, self.embedder, 3) | |
| except Exception as e: | |
| logger.warning(f"[CORE_MEMORY] Semantic recent context failed, using all: {e}") | |
| recent_context = "\n\n".join([m["summary"] for m in recent_memories]) | |
| elif recent_memories: | |
| recent_context = "\n\n".join([m["summary"] for m in recent_memories]) | |
| # Get semantic context from other memory types | |
| semantic_memories = self.enhanced_memory.get_memories( | |
| user_id=user_id, | |
| limit=10 | |
| ) | |
| semantic_context = "" | |
| if semantic_memories and self.embedder: | |
| try: | |
| from memo.context import semantic_context | |
| other_memories = [m for m in semantic_memories if m.get("memory_type") != "conversation"] | |
| if other_memories: | |
| other_summaries = [m["summary"] for m in other_memories] | |
| semantic_context = await semantic_context(question, other_summaries, self.embedder, 5) | |
| except Exception as e: | |
| logger.warning(f"[CORE_MEMORY] Semantic context failed, using all: {e}") | |
| other_memories = [m for m in semantic_memories if m.get("memory_type") != "conversation"] | |
| if other_memories: | |
| semantic_context = "\n\n".join([m["summary"] for m in other_memories]) | |
| elif semantic_memories: | |
| other_memories = [m for m in semantic_memories if m.get("memory_type") != "conversation"] | |
| if other_memories: | |
| semantic_context = "\n\n".join([m["summary"] for m in other_memories]) | |
| return recent_context, semantic_context | |
| except Exception as e: | |
| logger.error(f"[CORE_MEMORY] Failed to get enhanced context: {e}") | |
| return "", "" | |
| # ────────────────────────────── Global Instance ────────────────────────────── | |
| _memory_system: Optional[MemorySystem] = None | |
| def get_memory_system(mongo_uri: str = None, db_name: str = None) -> MemorySystem: | |
| """Get the global memory system instance""" | |
| global _memory_system | |
| if _memory_system is None: | |
| if mongo_uri is None: | |
| mongo_uri = os.getenv("MONGO_URI", "mongodb://localhost:27017") | |
| if db_name is None: | |
| db_name = os.getenv("MONGO_DB", "studybuddy") | |
| _memory_system = MemorySystem(mongo_uri, db_name) | |
| logger.info("[CORE_MEMORY] Global memory system initialized") | |
| return _memory_system | |
| def reset_memory_system(): | |
| """Reset the global memory system (for testing)""" | |
| global _memory_system | |
| _memory_system = None | |