Tools / Modules /Memory_Manager.py
Nymbo's picture
implementing autodoc and simplifying context
bb22cbf verified
raw
history blame
9.76 kB
from __future__ import annotations
import json
import os
import threading
import uuid
from datetime import datetime
from typing import Annotated, Dict, List, Literal, Optional
import gradio as gr
from ._docstrings import autodoc
_MODULE_DIR = os.path.dirname(os.path.abspath(__file__))
MEMORY_FILE = os.path.join(os.path.dirname(_MODULE_DIR), "memories.json")
_MEMORY_LOCK = threading.RLock()
_MAX_MEMORIES = 10_000
def _now_iso() -> str:
return datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
def _load_memories() -> List[Dict[str, str]]:
if not os.path.exists(MEMORY_FILE):
return []
try:
with open(MEMORY_FILE, "r", encoding="utf-8") as file:
data = json.load(file)
if isinstance(data, list):
cleaned: List[Dict[str, str]] = []
for item in data:
if isinstance(item, dict) and "id" in item and "text" in item:
cleaned.append(item)
return cleaned
return []
except Exception:
try:
backup = MEMORY_FILE + ".corrupt"
if not os.path.exists(backup):
os.replace(MEMORY_FILE, backup)
except Exception:
pass
return []
def _save_memories(memories: List[Dict[str, str]]) -> None:
tmp_path = MEMORY_FILE + ".tmp"
with open(tmp_path, "w", encoding="utf-8") as file:
json.dump(memories, file, ensure_ascii=False, indent=2)
os.replace(tmp_path, MEMORY_FILE)
def _mem_save(text: str, tags: str) -> str:
text_clean = (text or "").strip()
if not text_clean:
return "Error: memory text is empty."
with _MEMORY_LOCK:
memories = _load_memories()
if memories and memories[-1].get("text") == text_clean:
return "Skipped: identical to last stored memory."
mem_id = str(uuid.uuid4())
entry = {
"id": mem_id,
"text": text_clean,
"timestamp": _now_iso(),
"tags": tags.strip(),
}
memories.append(entry)
if len(memories) > _MAX_MEMORIES:
overflow = len(memories) - _MAX_MEMORIES
memories = memories[overflow:]
_save_memories(memories)
return f"Memory saved: {mem_id}"
def _mem_list(limit: int, include_tags: bool) -> str:
limit = max(1, min(200, limit))
with _MEMORY_LOCK:
memories = _load_memories()
if not memories:
return "No memories stored yet."
chosen = memories[-limit:][::-1]
lines: List[str] = []
for memory in chosen:
base = f"{memory['id'][:8]} [{memory.get('timestamp','?')}] {memory.get('text','')}"
if include_tags and memory.get("tags"):
base += f" | tags: {memory['tags']}"
lines.append(base)
omitted = len(memories) - len(chosen)
if omitted > 0:
lines.append(f"… ({omitted} older memorie{'s' if omitted!=1 else ''} omitted; total={len(memories)})")
return "\n".join(lines)
def _parse_search_query(query: str) -> Dict[str, List[str]]:
import re
result = {"tag_terms": [], "text_terms": [], "operator": "and"}
if not query or not query.strip():
return result
query = re.sub(r"\s+", " ", query.strip())
if re.search(r"\bOR\b", query, re.IGNORECASE):
result["operator"] = "or"
parts = re.split(r"\s+OR\s+", query, flags=re.IGNORECASE)
else:
parts = re.split(r"\s+(?:AND\s+)?", query, flags=re.IGNORECASE)
parts = [p for p in parts if p.strip() and p.strip().upper() != "AND"]
for part in parts:
part = part.strip()
if not part:
continue
tag_match = re.match(r"^tag:(.+)$", part, re.IGNORECASE)
if tag_match:
tag_name = tag_match.group(1).strip()
if tag_name:
result["tag_terms"].append(tag_name.lower())
else:
result["text_terms"].append(part.lower())
return result
def _match_memory_with_query(memory: Dict[str, str], parsed_query: Dict[str, List[str]]) -> bool:
tag_terms = parsed_query["tag_terms"]
text_terms = parsed_query["text_terms"]
operator = parsed_query["operator"]
if not tag_terms and not text_terms:
return False
memory_text = memory.get("text", "").lower()
memory_tags = memory.get("tags", "").lower()
memory_tag_list = [tag.strip() for tag in memory_tags.split(",") if tag.strip()]
tag_matches = [any(tag_term in tag for tag in memory_tag_list) for tag_term in tag_terms]
combined_text = memory_text + " " + memory_tags
text_matches = [text_term in combined_text for text_term in text_terms]
all_matches = tag_matches + text_matches
if not all_matches:
return False
if operator == "or":
return any(all_matches)
return all(all_matches)
def _mem_search(query: str, limit: int) -> str:
q = (query or "").strip()
if not q:
return "Error: empty query."
parsed_query = _parse_search_query(q)
if not parsed_query["tag_terms"] and not parsed_query["text_terms"]:
return "Error: no valid search terms found."
limit = max(1, min(200, limit))
with _MEMORY_LOCK:
memories = _load_memories()
matches: List[Dict[str, str]] = []
total_matches = 0
for memory in reversed(memories):
if _match_memory_with_query(memory, parsed_query):
total_matches += 1
if len(matches) < limit:
matches.append(memory)
if not matches:
return f"No matches for: {query}"
lines = [
f"{memory['id'][:8]} [{memory.get('timestamp','?')}] {memory.get('text','')}" + (f" | tags: {memory['tags']}" if memory.get('tags') else "")
for memory in matches
]
omitted = total_matches - len(matches)
if omitted > 0:
lines.append(f"… ({omitted} additional match{'es' if omitted!=1 else ''} omitted; total_matches={total_matches})")
return "\n".join(lines)
def _mem_delete(memory_id: str) -> str:
key = (memory_id or "").strip().lower()
if len(key) < 4:
return "Error: supply at least 4 characters of the id."
with _MEMORY_LOCK:
memories = _load_memories()
matched = [memory for memory in memories if memory["id"].lower().startswith(key)]
if not matched:
return "Memory not found."
if len(matched) > 1 and key != matched[0]["id"].lower():
sample = ", ".join(memory["id"][:8] for memory in matched[:5])
more = "…" if len(matched) > 5 else ""
return f"Ambiguous prefix (matches {len(matched)} ids: {sample}{more}). Provide more characters."
target_id = matched[0]["id"]
memories = [memory for memory in memories if memory["id"] != target_id]
_save_memories(memories)
return f"Deleted memory: {target_id}"
# Single source of truth for the LLM-facing tool description
TOOL_SUMMARY = (
"Manage short text memories (save, list, search, delete) in a local JSON store with tags and simple query language; "
"returns a result string (confirmation, listing, matches, or error)."
)
@autodoc(
summary=TOOL_SUMMARY,
)
def Memory_Manager(
action: Annotated[Literal["save", "list", "search", "delete"], "Action to perform: save | list | search | delete"],
text: Annotated[Optional[str], "Text content (Save only)"] = None,
tags: Annotated[Optional[str], "Comma-separated tags (Save only)"] = None,
query: Annotated[Optional[str], "Enhanced search with tag:name syntax, AND/OR operators (Search only)"] = None,
limit: Annotated[int, "Max results (List/Search only)"] = 20,
memory_id: Annotated[Optional[str], "Full UUID or unique prefix (Delete only)"] = None,
include_tags: Annotated[bool, "Include tags (List/Search only)"] = True,
) -> str:
act = (action or "").lower().strip()
text = text or ""
tags = tags or ""
query = query or ""
memory_id = memory_id or ""
if act == "save":
if not text.strip():
return "Error: 'text' is required when action=save."
return _mem_save(text=text, tags=tags)
if act == "list":
return _mem_list(limit=limit, include_tags=include_tags)
if act == "search":
if not query.strip():
return "Error: 'query' is required when action=search."
return _mem_search(query=query, limit=limit)
if act == "delete":
if not memory_id.strip():
return "Error: 'memory_id' is required when action=delete."
return _mem_delete(memory_id=memory_id)
return "Error: invalid action (use save|list|search|delete)."
def build_interface() -> gr.Interface:
return gr.Interface(
fn=Memory_Manager,
inputs=[
gr.Dropdown(label="Action", choices=["save", "list", "search", "delete"], value="list"),
gr.Textbox(label="Text", lines=3, placeholder="Memory text (save)"),
gr.Textbox(label="Tags", placeholder="tag1, tag2", max_lines=1),
gr.Textbox(label="Query", placeholder="tag:work AND tag:project OR meeting", max_lines=1),
gr.Slider(1, 200, value=20, step=1, label="Limit"),
gr.Textbox(label="Memory ID / Prefix", placeholder="UUID or prefix (delete)", max_lines=1),
gr.Checkbox(value=True, label="Include Tags"),
],
outputs=gr.Textbox(label="Result", lines=14),
title="Memory Manager",
description=(
"<div style=\"text-align:center\">Lightweight local JSON memory store (no external DB). Choose an Action, fill only the relevant fields, and run.</div>"
),
api_description=TOOL_SUMMARY,
flagging_mode="never",
)
__all__ = ["Memory_Manager", "build_interface", "_load_memories", "_save_memories"]