from __future__ import annotations import json import os import threading import uuid from datetime import datetime from typing import Annotated, Dict, List, Literal, Optional import gradio as gr MEMORY_FILE = os.path.join(os.path.dirname(__file__), "memories.json") _MEMORY_LOCK = threading.RLock() _MAX_MEMORIES = 10_000 def _now_iso() -> str: return datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") def _load_memories() -> List[Dict[str, str]]: if not os.path.exists(MEMORY_FILE): return [] try: with open(MEMORY_FILE, "r", encoding="utf-8") as file: data = json.load(file) if isinstance(data, list): cleaned: List[Dict[str, str]] = [] for item in data: if isinstance(item, dict) and "id" in item and "text" in item: cleaned.append(item) return cleaned return [] except Exception: try: backup = MEMORY_FILE + ".corrupt" if not os.path.exists(backup): os.replace(MEMORY_FILE, backup) except Exception: pass return [] def _save_memories(memories: List[Dict[str, str]]) -> None: tmp_path = MEMORY_FILE + ".tmp" with open(tmp_path, "w", encoding="utf-8") as file: json.dump(memories, file, ensure_ascii=False, indent=2) os.replace(tmp_path, MEMORY_FILE) def _mem_save(text: str, tags: str) -> str: text_clean = (text or "").strip() if not text_clean: return "Error: memory text is empty." with _MEMORY_LOCK: memories = _load_memories() if memories and memories[-1].get("text") == text_clean: return "Skipped: identical to last stored memory." mem_id = str(uuid.uuid4()) entry = { "id": mem_id, "text": text_clean, "timestamp": _now_iso(), "tags": tags.strip(), } memories.append(entry) if len(memories) > _MAX_MEMORIES: overflow = len(memories) - _MAX_MEMORIES memories = memories[overflow:] _save_memories(memories) return f"Memory saved: {mem_id}" def _mem_list(limit: int, include_tags: bool) -> str: limit = max(1, min(200, limit)) with _MEMORY_LOCK: memories = _load_memories() if not memories: return "No memories stored yet." chosen = memories[-limit:][::-1] lines: List[str] = [] for memory in chosen: base = f"{memory['id'][:8]} [{memory.get('timestamp','?')}] {memory.get('text','')}" if include_tags and memory.get("tags"): base += f" | tags: {memory['tags']}" lines.append(base) omitted = len(memories) - len(chosen) if omitted > 0: lines.append(f"… ({omitted} older memorie{'s' if omitted!=1 else ''} omitted; total={len(memories)})") return "\n".join(lines) def _parse_search_query(query: str) -> Dict[str, List[str]]: import re result = {"tag_terms": [], "text_terms": [], "operator": "and"} if not query or not query.strip(): return result query = re.sub(r"\s+", " ", query.strip()) if re.search(r"\bOR\b", query, re.IGNORECASE): result["operator"] = "or" parts = re.split(r"\s+OR\s+", query, flags=re.IGNORECASE) else: parts = re.split(r"\s+(?:AND\s+)?", query, flags=re.IGNORECASE) parts = [p for p in parts if p.strip() and p.strip().upper() != "AND"] for part in parts: part = part.strip() if not part: continue tag_match = re.match(r"^tag:(.+)$", part, re.IGNORECASE) if tag_match: tag_name = tag_match.group(1).strip() if tag_name: result["tag_terms"].append(tag_name.lower()) else: result["text_terms"].append(part.lower()) return result def _match_memory_with_query(memory: Dict[str, str], parsed_query: Dict[str, List[str]]) -> bool: tag_terms = parsed_query["tag_terms"] text_terms = parsed_query["text_terms"] operator = parsed_query["operator"] if not tag_terms and not text_terms: return False memory_text = memory.get("text", "").lower() memory_tags = memory.get("tags", "").lower() memory_tag_list = [tag.strip() for tag in memory_tags.split(",") if tag.strip()] tag_matches = [any(tag_term in tag for tag in memory_tag_list) for tag_term in tag_terms] combined_text = memory_text + " " + memory_tags text_matches = [text_term in combined_text for text_term in text_terms] all_matches = tag_matches + text_matches if not all_matches: return False if operator == "or": return any(all_matches) return all(all_matches) def _mem_search(query: str, limit: int) -> str: q = (query or "").strip() if not q: return "Error: empty query." parsed_query = _parse_search_query(q) if not parsed_query["tag_terms"] and not parsed_query["text_terms"]: return "Error: no valid search terms found." limit = max(1, min(200, limit)) with _MEMORY_LOCK: memories = _load_memories() matches: List[Dict[str, str]] = [] total_matches = 0 for memory in reversed(memories): if _match_memory_with_query(memory, parsed_query): total_matches += 1 if len(matches) < limit: matches.append(memory) if not matches: return f"No matches for: {query}" lines = [ f"{memory['id'][:8]} [{memory.get('timestamp','?')}] {memory.get('text','')}" + (f" | tags: {memory['tags']}" if memory.get('tags') else "") for memory in matches ] omitted = total_matches - len(matches) if omitted > 0: lines.append(f"… ({omitted} additional match{'es' if omitted!=1 else ''} omitted; total_matches={total_matches})") return "\n".join(lines) def _mem_delete(memory_id: str) -> str: key = (memory_id or "").strip().lower() if len(key) < 4: return "Error: supply at least 4 characters of the id." with _MEMORY_LOCK: memories = _load_memories() matched = [memory for memory in memories if memory["id"].lower().startswith(key)] if not matched: return "Memory not found." if len(matched) > 1 and key != matched[0]["id"].lower(): sample = ", ".join(memory["id"][:8] for memory in matched[:5]) more = "…" if len(matched) > 5 else "" return f"Ambiguous prefix (matches {len(matched)} ids: {sample}{more}). Provide more characters." target_id = matched[0]["id"] memories = [memory for memory in memories if memory["id"] != target_id] _save_memories(memories) return f"Deleted memory: {target_id}" def Memory_Manager( action: Annotated[Literal["save", "list", "search", "delete"], "Action to perform: save | list | search | delete"], text: Annotated[Optional[str], "Text content (Save only)"] = None, tags: Annotated[Optional[str], "Comma-separated tags (Save only)"] = None, query: Annotated[Optional[str], "Enhanced search with tag:name syntax, AND/OR operators (Search only)"] = None, limit: Annotated[int, "Max results (List/Search only)"] = 20, memory_id: Annotated[Optional[str], "Full UUID or unique prefix (Delete only)"] = None, include_tags: Annotated[bool, "Include tags (List/Search only)"] = True, ) -> str: act = (action or "").lower().strip() text = text or "" tags = tags or "" query = query or "" memory_id = memory_id or "" if act == "save": if not text.strip(): return "Error: 'text' is required when action=save." return _mem_save(text=text, tags=tags) if act == "list": return _mem_list(limit=limit, include_tags=include_tags) if act == "search": if not query.strip(): return "Error: 'query' is required when action=search." return _mem_search(query=query, limit=limit) if act == "delete": if not memory_id.strip(): return "Error: 'memory_id' is required when action=delete." return _mem_delete(memory_id=memory_id) return "Error: invalid action (use save|list|search|delete)." def build_interface() -> gr.Interface: return gr.Interface( fn=Memory_Manager, inputs=[ gr.Dropdown(label="Action", choices=["save", "list", "search", "delete"], value="list"), gr.Textbox(label="Text", lines=3, placeholder="Memory text (save)"), gr.Textbox(label="Tags", placeholder="tag1, tag2", max_lines=1), gr.Textbox(label="Query", placeholder="tag:work AND tag:project OR meeting", max_lines=1), gr.Slider(1, 200, value=20, step=1, label="Limit"), gr.Textbox(label="Memory ID / Prefix", placeholder="UUID or prefix (delete)", max_lines=1), gr.Checkbox(value=True, label="Include Tags"), ], outputs=gr.Textbox(label="Result", lines=14), title="Memory Manager", description=( "
Lightweight local JSON memory store (no external DB). Choose an Action, fill only the relevant fields, and run.
" ), api_description=( "Manage short text memories with optional tags. Actions: save(text,tags), list(limit,include_tags), " "search(query,limit,include_tags), delete(memory_id). Enhanced search supports tag:name queries and AND/OR operators. " "Examples: 'tag:work', 'tag:work AND tag:project', 'meeting tag:work', 'tag:urgent OR important'. " "Action parameter is always required. Use Memory_Manager whenever you are given information worth remembering about the user, " "and search for memories when relevant." ), flagging_mode="never", ) __all__ = ["Memory_Manager", "build_interface", "_load_memories", "_save_memories"]