Spaces:
Sleeping
Sleeping
| # ────────────────────────────── memo/persistent.py ────────────────────────────── | |
| """ | |
| Persistent Memory System | |
| MongoDB-based persistent memory storage with semantic search capabilities. | |
| """ | |
| import os | |
| import uuid | |
| from typing import List, Dict, Any, Optional, Tuple | |
| from datetime import datetime, timezone | |
| from utils.logger import get_logger | |
| from utils.rag.embeddings import EmbeddingClient | |
| logger = get_logger("PERSISTENT_MEMORY", __name__) | |
| class PersistentMemory: | |
| """MongoDB-based persistent memory system with semantic search""" | |
| def __init__(self, mongo_uri: str, db_name: str, embedder: EmbeddingClient): | |
| self.mongo_uri = mongo_uri | |
| self.db_name = db_name | |
| self.embedder = embedder | |
| # MongoDB connection | |
| try: | |
| from pymongo import MongoClient | |
| self.client = MongoClient(mongo_uri) | |
| self.db = self.client[db_name] | |
| self.memories = self.db["memories"] | |
| # Create indexes for efficient querying | |
| self.memories.create_index([("user_id", 1), ("memory_type", 1)]) | |
| self.memories.create_index([("user_id", 1), ("created_at", -1)]) | |
| self.memories.create_index([("user_id", 1), ("project_id", 1)]) | |
| logger.info(f"[PERSISTENT_MEMORY] Connected to MongoDB: {db_name}") | |
| except Exception as e: | |
| logger.error(f"[PERSISTENT_MEMORY] Failed to connect to MongoDB: {e}") | |
| raise | |
| def add_memory(self, user_id: str, content: str, memory_type: str, | |
| project_id: str = None, importance: str = "medium", | |
| tags: List[str] = None, metadata: Dict[str, Any] = None) -> str: | |
| """Add a memory entry to the persistent system""" | |
| try: | |
| # Generate embedding for semantic search | |
| embedding = self.embedder.embed([content])[0] if content else None | |
| # Create summary | |
| summary = content[:200] + "..." if len(content) > 200 else content | |
| memory_entry = { | |
| "id": str(uuid.uuid4()), | |
| "user_id": user_id, | |
| "project_id": project_id, | |
| "memory_type": memory_type, | |
| "content": content, | |
| "summary": summary, | |
| "importance": importance, | |
| "tags": tags or [], | |
| "created_at": datetime.now(timezone.utc), | |
| "updated_at": datetime.now(timezone.utc), | |
| "last_accessed": datetime.now(timezone.utc), | |
| "access_count": 0, | |
| "embedding": embedding, | |
| "metadata": metadata or {} | |
| } | |
| # Store in MongoDB | |
| self.memories.insert_one(memory_entry) | |
| logger.info(f"[PERSISTENT_MEMORY] Added {memory_type} memory for user {user_id}") | |
| return memory_entry["id"] | |
| except Exception as e: | |
| logger.error(f"[PERSISTENT_MEMORY] Failed to add memory: {e}") | |
| raise | |
| def get_memories(self, user_id: str, memory_type: str = None, | |
| project_id: str = None, limit: int = 50) -> List[Dict[str, Any]]: | |
| """Get memories for a user with optional filtering""" | |
| try: | |
| query = {"user_id": user_id} | |
| if memory_type: | |
| query["memory_type"] = memory_type | |
| if project_id: | |
| query["project_id"] = project_id | |
| cursor = self.memories.find(query).sort("created_at", -1).limit(limit) | |
| return list(cursor) | |
| except Exception as e: | |
| logger.error(f"[PERSISTENT_MEMORY] Failed to get memories: {e}") | |
| return [] | |
| def search_memories(self, user_id: str, query: str, memory_types: List[str] = None, | |
| project_id: str = None, limit: int = 10) -> List[Tuple[Dict[str, Any], float]]: | |
| """Search memories using semantic similarity""" | |
| try: | |
| # Generate query embedding | |
| query_embedding = self.embedder.embed([query])[0] | |
| # Build MongoDB query | |
| mongo_query = { | |
| "user_id": user_id, | |
| "embedding": {"$exists": True} | |
| } | |
| if memory_types: | |
| mongo_query["memory_type"] = {"$in": memory_types} | |
| if project_id: | |
| mongo_query["project_id"] = project_id | |
| # Get all matching memories | |
| cursor = self.memories.find(mongo_query) | |
| # Calculate similarities | |
| results = [] | |
| for doc in cursor: | |
| try: | |
| if doc.get("embedding"): | |
| # Calculate cosine similarity | |
| similarity = self._cosine_similarity(query_embedding, doc["embedding"]) | |
| results.append((doc, similarity)) | |
| except Exception as e: | |
| logger.warning(f"[PERSISTENT_MEMORY] Failed to process memory for search: {e}") | |
| continue | |
| # Sort by similarity and return top results | |
| results.sort(key=lambda x: x[1], reverse=True) | |
| return results[:limit] | |
| except Exception as e: | |
| logger.error(f"[PERSISTENT_MEMORY] Failed to search memories: {e}") | |
| return [] | |
| def _cosine_similarity(self, a: List[float], b: List[float]) -> float: | |
| """Calculate cosine similarity between two vectors""" | |
| try: | |
| import numpy as np | |
| from memo.context import cosine_similarity | |
| a_np = np.array(a) | |
| b_np = np.array(b) | |
| return cosine_similarity(a_np, b_np) | |
| except Exception: | |
| return 0.0 | |
| def clear_user_memories(self, user_id: str) -> int: | |
| """Clear all memories for a user""" | |
| try: | |
| result = self.memories.delete_many({"user_id": user_id}) | |
| logger.info(f"[PERSISTENT_MEMORY] Cleared {result.deleted_count} memories for user {user_id}") | |
| return result.deleted_count | |
| except Exception as e: | |
| logger.error(f"[PERSISTENT_MEMORY] Failed to clear user memories: {e}") | |
| return 0 | |
| def clear_project_memories(self, user_id: str, project_id: str) -> int: | |
| """Clear all memories for a specific user and project""" | |
| try: | |
| result = self.memories.delete_many({"user_id": user_id, "project_id": project_id}) | |
| logger.info(f"[PERSISTENT_MEMORY] Cleared {result.deleted_count} memories for user {user_id}, project {project_id}") | |
| return result.deleted_count | |
| except Exception as e: | |
| logger.error(f"[PERSISTENT_MEMORY] Failed to clear project memories: {e}") | |
| return 0 | |
| def get_memory_stats(self, user_id: str) -> Dict[str, Any]: | |
| """Get memory statistics for a user""" | |
| try: | |
| stats = { | |
| "total_memories": self.memories.count_documents({"user_id": user_id}), | |
| "by_type": {}, | |
| "recent_activity": 0 | |
| } | |
| # Count by memory type | |
| pipeline = [ | |
| {"$match": {"user_id": user_id}}, | |
| {"$group": {"_id": "$memory_type", "count": {"$sum": 1}}} | |
| ] | |
| for result in self.memories.aggregate(pipeline): | |
| stats["by_type"][result["_id"]] = result["count"] | |
| # Recent activity (last 7 days) | |
| from datetime import timedelta | |
| week_ago = datetime.now(timezone.utc) - timedelta(days=7) | |
| stats["recent_activity"] = self.memories.count_documents({ | |
| "user_id": user_id, | |
| "created_at": {"$gte": week_ago} | |
| }) | |
| return stats | |
| except Exception as e: | |
| logger.error(f"[PERSISTENT_MEMORY] Failed to get memory stats: {e}") | |
| return {} | |
| def update_memory(self, memory_id: str, content: str = None, importance: str = None, | |
| tags: List[str] = None, metadata: Dict[str, Any] = None) -> bool: | |
| """Update an existing memory entry""" | |
| try: | |
| update_data = {"updated_at": datetime.now(timezone.utc)} | |
| if content is not None: | |
| update_data["content"] = content | |
| update_data["summary"] = content[:200] + "..." if len(content) > 200 else content | |
| # Update embedding if content changed | |
| update_data["embedding"] = self.embedder.embed([content])[0] | |
| if importance is not None: | |
| update_data["importance"] = importance | |
| if tags is not None: | |
| update_data["tags"] = tags | |
| if metadata is not None: | |
| update_data["metadata"] = metadata | |
| result = self.memories.update_one( | |
| {"id": memory_id}, | |
| {"$set": update_data} | |
| ) | |
| if result.modified_count > 0: | |
| logger.info(f"[PERSISTENT_MEMORY] Updated memory {memory_id}") | |
| return True | |
| else: | |
| logger.warning(f"[PERSISTENT_MEMORY] Memory {memory_id} not found for update") | |
| return False | |
| except Exception as e: | |
| logger.error(f"[PERSISTENT_MEMORY] Failed to update memory: {e}") | |
| return False | |
| def delete_memory(self, memory_id: str) -> bool: | |
| """Delete a specific memory entry""" | |
| try: | |
| result = self.memories.delete_one({"id": memory_id}) | |
| if result.deleted_count > 0: | |
| logger.info(f"[PERSISTENT_MEMORY] Deleted memory {memory_id}") | |
| return True | |
| else: | |
| logger.warning(f"[PERSISTENT_MEMORY] Memory {memory_id} not found for deletion") | |
| return False | |
| except Exception as e: | |
| logger.error(f"[PERSISTENT_MEMORY] Failed to delete memory: {e}") | |
| return False | |
| def get_memory_by_id(self, memory_id: str) -> Optional[Dict[str, Any]]: | |
| """Get a specific memory by its ID""" | |
| try: | |
| memory = self.memories.find_one({"id": memory_id}) | |
| if memory: | |
| # Increment access count | |
| self.increment_access(memory_id) | |
| return memory | |
| except Exception as e: | |
| logger.error(f"[PERSISTENT_MEMORY] Failed to get memory by ID: {e}") | |
| return None | |
| def increment_access(self, memory_id: str) -> bool: | |
| """Increment access count and update last accessed time""" | |
| try: | |
| result = self.memories.update_one( | |
| {"id": memory_id}, | |
| { | |
| "$inc": {"access_count": 1}, | |
| "$set": {"last_accessed": datetime.now(timezone.utc)} | |
| } | |
| ) | |
| return result.modified_count > 0 | |
| except Exception as e: | |
| logger.error(f"[PERSISTENT_MEMORY] Failed to increment access: {e}") | |
| return False |