# Source: EdSummariser / memo/core.py — author LiamKhoaLe, commit 6d2a17c ("refactor utils"), 14 kB
# ────────────────────────────── memo/core.py ──────────────────────────────
"""
Core Memory System
Main memory system that provides both legacy and enhanced functionality.
"""
import os
import asyncio
from typing import List, Dict, Any, Optional, Tuple
from utils.logger import get_logger
from utils.rag.embeddings import EmbeddingClient
from memo.legacy import MemoryLRU
from memo.persistent import PersistentMemory
# Module-level logger used by MemorySystem and the global-instance helpers in this module.
logger = get_logger("CORE_MEMORY", __name__)
class MemorySystem:
    """
    Main memory system that provides both legacy and enhanced functionality.

    The legacy in-process store (``MemoryLRU``) is always used. If an
    ``EmbeddingClient`` and a ``PersistentMemory`` (built from ``mongo_uri`` /
    ``db_name``) can be constructed, the enhanced store is used as well;
    otherwise the system silently runs in legacy-only mode.
    """

    def __init__(self, mongo_uri: Optional[str] = None, db_name: str = "studybuddy"):
        """
        Args:
            mongo_uri: MongoDB connection string. Falls back to the ``MONGO_URI``
                environment variable, then to ``mongodb://localhost:27017``.
            db_name: Database name handed to ``PersistentMemory``.
        """
        self.mongo_uri = mongo_uri or os.getenv("MONGO_URI", "mongodb://localhost:27017")
        self.db_name = db_name
        # Initialize legacy memory system (always available)
        self.legacy_memory = MemoryLRU()
        # Strong references to tasks scheduled by add(). The event loop only keeps
        # weak references to tasks, so an unreferenced task may be garbage-collected
        # before it finishes.
        self._background_tasks = set()
        # Initialize enhanced memory system if MongoDB is available
        self.enhanced_available = False
        self.enhanced_memory = None
        self.embedder = None
        try:
            self.embedder = EmbeddingClient()
            self.enhanced_memory = PersistentMemory(self.mongo_uri, self.db_name, self.embedder)
            self.enhanced_available = True
            logger.info("[CORE_MEMORY] Enhanced memory system initialized")
        except Exception as e:
            # Best-effort: degrade to legacy-only mode. self.embedder stays set if
            # only PersistentMemory failed, and is still used by the legacy context path.
            logger.warning(f"[CORE_MEMORY] Enhanced memory system unavailable: {e}")
            self.enhanced_available = False
        logger.info(f"[CORE_MEMORY] Initialized with enhanced_available={self.enhanced_available}")

    # ────────────────────────────── Core Memory Operations ──────────────────────────────

    def add(self, user_id: str, qa_summary: str):
        """Add a Q&A summary to memory (backward compatibility).

        The summary is always stored in the legacy store. When the enhanced store
        is available and the summary has both a ``q:`` and an ``a:`` line
        (case-insensitive), the parsed pair is also written to the enhanced store.
        Errors are logged, never raised.
        """
        try:
            self.legacy_memory.add(user_id, qa_summary)
            if self.enhanced_available:
                question, answer = self._parse_qa_summary(qa_summary)
                if question and answer:
                    self._schedule_enhanced_add(user_id, question, answer)
            logger.debug(f"[CORE_MEMORY] Added memory for user {user_id}")
        except Exception as e:
            logger.error(f"[CORE_MEMORY] Failed to add memory: {e}")

    def recent(self, user_id: str, n: int = 3) -> List[str]:
        """Get recent memories (backward compatibility)"""
        return self.legacy_memory.recent(user_id, n)

    def rest(self, user_id: str, skip_n: int = 3) -> List[str]:
        """Get remaining memories excluding recent ones (backward compatibility)"""
        return self.legacy_memory.rest(user_id, skip_n)

    def all(self, user_id: str) -> List[str]:
        """Get all memories for a user (backward compatibility)"""
        return self.legacy_memory.all(user_id)

    def clear(self, user_id: str) -> None:
        """Clear all memories for a user in the legacy store and, if available, the enhanced store."""
        self.legacy_memory.clear(user_id)
        # Also clear enhanced memory if available (failures are logged, not raised)
        if self.enhanced_available:
            try:
                self.enhanced_memory.clear_user_memories(user_id)
                logger.info(f"[CORE_MEMORY] Cleared enhanced memory for user {user_id}")
            except Exception as e:
                logger.warning(f"[CORE_MEMORY] Failed to clear enhanced memory: {e}")

    def is_enhanced_available(self) -> bool:
        """Check if enhanced memory features are available"""
        return self.enhanced_available

    # ────────────────────────────── Enhanced Features ──────────────────────────────

    async def add_conversation_memory(self, user_id: str, question: str, answer: str,
                                      project_id: Optional[str] = None,
                                      context: Optional[Dict[str, Any]] = None) -> str:
        """Store a Q&A pair as a "conversation" memory in the enhanced store.

        Returns:
            The id returned by ``PersistentMemory.add_memory``, or ``""`` when the
            enhanced store is unavailable or the write fails.
        """
        if not self.enhanced_available:
            logger.warning("[CORE_MEMORY] Enhanced features not available")
            return ""
        try:
            return self.enhanced_memory.add_memory(
                user_id=user_id,
                content=f"Q: {question}\nA: {answer}",
                memory_type="conversation",
                project_id=project_id,
                importance="medium",
                tags=["conversation", "qa"],
                metadata=context or {}
            )
        except Exception as e:
            logger.error(f"[CORE_MEMORY] Failed to add conversation memory: {e}")
            return ""

    async def get_conversation_context(self, user_id: str, question: str,
                                       project_id: Optional[str] = None) -> Tuple[str, str]:
        """Return ``(recent_context, semantic_context)`` for chat continuity.

        Uses the enhanced store when available, otherwise
        ``memo.context.get_legacy_context``. Returns ``("", "")`` on failure.
        """
        try:
            if self.enhanced_available:
                # NOTE(review): project_id is accepted but not forwarded, so context is
                # not project-scoped. Confirm whether PersistentMemory.get_memories
                # supports a project filter.
                return await self._get_enhanced_context(user_id, question)
            # Use legacy context with enhanced semantic selection
            from memo.context import get_legacy_context
            return await get_legacy_context(user_id, question, self, self.embedder, 3)
        except Exception as e:
            logger.error(f"[CORE_MEMORY] Failed to get conversation context: {e}")
            return "", ""

    async def search_memories(self, user_id: str, query: str,
                              project_id: Optional[str] = None,
                              limit: int = 10) -> List[Tuple[str, float]]:
        """Search the enhanced store and return ``(content, score)`` pairs.

        Returns an empty list when the enhanced store is unavailable or the search fails.
        """
        if not self.enhanced_available:
            return []
        try:
            results = self.enhanced_memory.search_memories(
                user_id=user_id,
                query=query,
                project_id=project_id,
                limit=limit
            )
            return [(m["content"], score) for m, score in results]
        except Exception as e:
            logger.error(f"[CORE_MEMORY] Failed to search memories: {e}")
            return []

    def get_memory_stats(self, user_id: str) -> Dict[str, Any]:
        """Get memory statistics for a user.

        Delegates to the enhanced store when available. If that call fails, or the
        enhanced store is unavailable, returns basic counts from the legacy store.
        """
        if self.enhanced_available:
            try:
                return self.enhanced_memory.get_memory_stats(user_id)
            except Exception as e:
                # Every other enhanced call is guarded; degrade instead of raising here too.
                logger.warning(f"[CORE_MEMORY] Failed to get enhanced memory stats, using legacy: {e}")
        # Legacy memory stats
        all_memories = self.legacy_memory.all(user_id)
        return {
            "total_memories": len(all_memories),
            "system_type": "legacy",
            "enhanced_available": self.enhanced_available
        }

    async def get_smart_context(self, user_id: str, question: str,
                                nvidia_rotator=None, project_id: Optional[str] = None) -> Tuple[str, str]:
        """Return ``(recent_context, semantic_context)``, optionally refined via NVIDIA.

        In enhanced mode, when ``nvidia_rotator`` is given and the recent context is
        non-empty, the recent context is replaced by
        ``memo.nvidia.related_recent_context`` over the last 5 legacy memories, if
        that call returns something truthy. Returns ``("", "")`` on failure.
        """
        try:
            if not self.enhanced_available:
                # NOTE(review): nvidia_rotator is not passed to the legacy path.
                from memo.context import get_legacy_context
                return await get_legacy_context(user_id, question, self, self.embedder, 3)
            # NOTE(review): project_id is accepted but not forwarded (see get_conversation_context).
            recent_context, semantic_context = await self._get_enhanced_context(user_id, question)
            if nvidia_rotator and recent_context:
                try:
                    from memo.nvidia import related_recent_context
                    recent_memories = self.legacy_memory.recent(user_id, 5)
                    if recent_memories:
                        nvidia_recent = await related_recent_context(question, recent_memories, nvidia_rotator)
                        if nvidia_recent:
                            recent_context = nvidia_recent
                except Exception as e:
                    logger.warning(f"[CORE_MEMORY] NVIDIA context enhancement failed: {e}")
            return recent_context, semantic_context
        except Exception as e:
            logger.error(f"[CORE_MEMORY] Failed to get smart context: {e}")
            return "", ""

    # ────────────────────────────── Private Helper Methods ──────────────────────────────

    @staticmethod
    def _parse_qa_summary(qa_summary: str) -> Tuple[str, str]:
        """Return the text after the last ``q:`` line and the last ``a:`` line (case-insensitive).

        Either element is ``""`` when no such line exists.
        """
        question = ""
        answer = ""
        for line in qa_summary.split('\n'):
            stripped = line.strip()
            lowered = stripped.lower()
            if lowered.startswith('q:'):
                question = stripped[2:].strip()
            elif lowered.startswith('a:'):
                answer = stripped[2:].strip()
        return question, answer

    def _schedule_enhanced_add(self, user_id: str, question: str, answer: str) -> None:
        """Write a Q&A pair to the enhanced store from synchronous code.

        Inside a running event loop the write is scheduled as a task, and a strong
        reference is held until it completes. Without a running loop,
        ``asyncio.create_task`` would raise ``RuntimeError``, so the write runs inline.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            self._store_enhanced_memory(user_id, question, answer)
            return
        task = loop.create_task(self._add_enhanced_memory(user_id, question, answer))
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    async def _add_enhanced_memory(self, user_id: str, question: str, answer: str):
        """Add memory to enhanced system (coroutine wrapper around ``_store_enhanced_memory``)."""
        self._store_enhanced_memory(user_id, question, answer)

    def _store_enhanced_memory(self, user_id: str, question: str, answer: str) -> None:
        """Persist a Q&A pair as a "conversation" memory. Failures are logged, not raised."""
        try:
            self.enhanced_memory.add_memory(
                user_id=user_id,
                content=f"Q: {question}\nA: {answer}",
                memory_type="conversation",
                importance="medium",
                tags=["conversation", "qa"]
            )
        except Exception as e:
            logger.warning(f"[CORE_MEMORY] Failed to add enhanced memory: {e}")

    async def _select_context(self, question: str, summaries: List[str],
                              top_k: int, label: str) -> str:
        """Pick the summaries most relevant to ``question``.

        Returns ``""`` for an empty list. With an embedder, delegates to
        ``memo.context.semantic_context``. Without an embedder, or if that call
        fails, all summaries are joined with blank lines.
        """
        if not summaries:
            return ""
        if self.embedder:
            try:
                # Alias the import: binding it as ``semantic_context`` previously shadowed
                # the local result variable, and the function object leaked out as context.
                from memo.context import semantic_context as select_semantic
                return await select_semantic(question, summaries, self.embedder, top_k)
            except Exception as e:
                logger.warning(f"[CORE_MEMORY] {label} failed, using all: {e}")
        return "\n\n".join(summaries)

    async def _get_enhanced_context(self, user_id: str, question: str) -> Tuple[str, str]:
        """Return ``(recent_context, semantic_context)`` from the enhanced store.

        ``recent_context`` is built from up to 5 "conversation" memories (top 3
        selected). ``semantic_context`` is built from the non-conversation entries
        among up to 10 memories of any type (top 5 selected). Returns ``("", "")``
        on failure.
        """
        try:
            # Get recent conversation memories
            recent_memories = self.enhanced_memory.get_memories(
                user_id=user_id,
                memory_type="conversation",
                limit=5
            )
            recent_summaries = [m["summary"] for m in recent_memories or []]
            recent_context = await self._select_context(
                question, recent_summaries, 3, "Semantic recent context"
            )
            # Get semantic context from other memory types
            semantic_memories = self.enhanced_memory.get_memories(
                user_id=user_id,
                limit=10
            )
            other_summaries = [
                m["summary"] for m in semantic_memories or []
                if m.get("memory_type") != "conversation"
            ]
            semantic_ctx = await self._select_context(
                question, other_summaries, 5, "Semantic context"
            )
            return recent_context, semantic_ctx
        except Exception as e:
            logger.error(f"[CORE_MEMORY] Failed to get enhanced context: {e}")
            return "", ""
# ────────────────────────────── Global Instance ──────────────────────────────
_memory_system: Optional[MemorySystem] = None


def get_memory_system(mongo_uri: str = None, db_name: str = None) -> MemorySystem:
    """Return the process-wide MemorySystem, building it on the first call.

    The arguments are only consulted when the instance is first created; later
    calls return the cached instance unchanged. A ``None`` argument falls back to
    the ``MONGO_URI`` / ``MONGO_DB`` environment variables (defaults
    ``mongodb://localhost:27017`` / ``studybuddy``).
    """
    global _memory_system
    if _memory_system is not None:
        return _memory_system
    uri = os.getenv("MONGO_URI", "mongodb://localhost:27017") if mongo_uri is None else mongo_uri
    name = os.getenv("MONGO_DB", "studybuddy") if db_name is None else db_name
    _memory_system = MemorySystem(uri, name)
    logger.info("[CORE_MEMORY] Global memory system initialized")
    return _memory_system
def reset_memory_system():
    """Discard the cached global MemorySystem (for testing).

    The next ``get_memory_system()`` call will construct a fresh instance.
    """
    global _memory_system
    _memory_system = None