# Tools / Modules /Obsidian_Vault.py
# Nymbo's picture
# Create Obsidian_Vault.py
# e63577a verified
# raw
# history blame
# 19.5 kB
from __future__ import annotations
import json
import os
import re
import stat
from datetime import datetime
from typing import Annotated, Optional
import gradio as gr
from app import _log_call_end, _log_call_start, _truncate_for_log
from ._docstrings import autodoc
# Short description of the tool. It is handed to the @autodoc decorator on
# Obsidian_Vault and to gr.Interface(api_description=...) in build_interface.
TOOL_SUMMARY = (
"Browse and search the Obsidian vault in read-only mode. "
"Actions: list, read, info, search, help. "
"All paths resolve within the vault root."
)
# Usage guide returned verbatim when Obsidian_Vault is called with action="help".
# The adjacent literals are concatenated into one string; keep the trailing
# "\n" markers, they are the only line breaks in the rendered text.
HELP_TEXT = (
"Obsidian Vault — actions and usage\n\n"
"Root: Nymbo-Tools/Obsidian (override with OBSIDIAN_VAULT_ROOT). "
"Absolute paths are disabled unless UNSAFE_ALLOW_ABS_PATHS=1.\n\n"
"Actions and fields:\n"
"- list: path='.' (default), recursive=false, show_hidden=false, max_entries=20\n"
"- read: path, offset=0, max_chars=4000 (shows next_cursor when truncated)\n"
"- info: path\n"
"- search: path (note or folder), query text in the Search field, recursive=false, show_hidden=false, max_entries=20, case_sensitive=false, offset=0\n"
"- help: show this guide\n\n"
"Errors are returned as JSON with fields: {status:'error', code, message, path?, hint?, data?}.\n\n"
"Examples:\n"
"- list current: action=list, path='.'\n"
"- read note: action=read, path='Projects/note.md', max_chars=500\n"
"- show metadata: action=info, path='Inbox'\n"
"- search notes: action=search, path='Projects', query='deadline', recursive=true, max_entries=100\n"
"- case-sensitive search: action=search, query='TODO', case_sensitive=true\n"
"- page search results: action=search, query='TODO', offset=20\n"
)
def _default_root() -> str:
    """Choose the absolute directory used as the vault root.

    Preference order:
    1. ``OBSIDIAN_VAULT_ROOT`` when it is set to a non-blank value
       (surrounding whitespace removed, ``~`` expanded).
    2. An ``Obsidian`` folder inside the directory one level above the
       folder that contains this module.
    3. The current working directory, if locating this module fails.
    """
    configured = (os.getenv("OBSIDIAN_VAULT_ROOT") or "").strip()
    if configured:
        return os.path.abspath(os.path.expanduser(configured))

    try:
        module_file = os.path.abspath(__file__)
        parent_of_package = os.path.dirname(os.path.dirname(module_file))
        return os.path.abspath(os.path.join(parent_of_package, "Obsidian"))
    except Exception:
        # e.g. __file__ is undefined in some embedded/interactive contexts.
        return os.path.abspath(os.getcwd())
# Vault root, resolved once at import time; relative paths are joined onto it.
ROOT_DIR = _default_root()
try:
    os.makedirs(ROOT_DIR, exist_ok=True)
except Exception:
    # Deliberately best-effort: failing to create the root (read-only file
    # system, bad permissions, ...) must not prevent the module from importing.
    pass

# Opt-in escape hatch that permits absolute paths and paths outside ROOT_DIR.
# Parsed defensively: the previous bare int() raised ValueError at import time
# for values such as "" or "true". Anything that is not an integer now means
# "off", which is the safe default for a flag named UNSAFE_*.
try:
    ALLOW_ABS = bool(int(os.getenv("UNSAFE_ALLOW_ABS_PATHS", "0")))
except ValueError:
    ALLOW_ABS = False
def _safe_err(exc: Exception | str) -> str:
    """Stringify *exc* without revealing where the vault lives on disk.

    Backslashes become forward slashes, every spelling of ``ROOT_DIR`` is
    replaced by ``/``, and runs of slashes are collapsed to a single one.
    """
    text = str(exc).replace("\\", "/")

    forward_root = ROOT_DIR.replace("\\", "/")
    collapsed_root = re.sub(r"/+", "/", forward_root)
    for needle in {ROOT_DIR, forward_root, collapsed_root}:
        # Skip an empty needle: str.replace("", "/") would inject a slash
        # between every character.
        if needle:
            text = text.replace(needle, "/")

    return re.sub(r"/+", "/", text)
def _err(code: str, message: str, *, path: str | None = None, hint: str | None = None, data: dict | None = None) -> str:
    """Serialize an error in the module's JSON envelope.

    The result always carries ``status``, ``code``, ``message`` and ``root``
    (fixed to ``"/"``). ``path``, ``hint`` and ``data`` are appended, in that
    order, only when they are truthy. Non-ASCII characters are emitted as-is.
    """
    payload: dict[str, object] = {"status": "error", "code": code, "message": message, "root": "/"}
    optional_fields = (("path", path), ("hint", hint), ("data", data))
    payload.update((key, value) for key, value in optional_fields if value)
    return json.dumps(payload, ensure_ascii=False)
def _display_path(abs_path: str) -> str:
    """Convert an absolute path into the form shown to callers.

    Paths at or below ``ROOT_DIR`` are rendered relative to it with a leading
    ``/`` (the root itself is ``/``). Anything else, or any failure while
    comparing the paths, yields *abs_path* with backslashes turned into
    forward slashes.
    """
    as_given = abs_path.replace("\\", "/")
    try:
        root = os.path.normpath(ROOT_DIR)
        target = os.path.normpath(abs_path)
        shared = os.path.commonpath([root, target])
        if os.path.normcase(shared) != os.path.normcase(root):
            return as_given
        relative = os.path.relpath(target, root)
    except Exception:
        # e.g. commonpath rejects paths on different drives.
        return as_given

    if relative == ".":
        return "/"
    return "/" + relative.replace("\\", "/")
def _is_within_root(root: str, target: str) -> bool:
    """Return True when *target* is *root* itself or is located beneath it."""
    root_cmp = os.path.normcase(os.path.normpath(root))
    target_cmp = os.path.normcase(os.path.normpath(target))
    try:
        return os.path.commonpath([root_cmp, target_cmp]) == root_cmp
    except ValueError:
        # commonpath refuses paths that cannot share a prefix (for example
        # different drives); such a target is by definition outside the root.
        return False


def _resolve_path(path: str) -> tuple[str, str]:
    """Translate a user-supplied path into an absolute path inside the vault.

    Args:
        path: Path as typed by the caller. Blank/None means the vault root.
            ``~`` is expanded before the absolute-path test.

    Returns:
        ``(abs_path, "")`` on success, or ``("", error_json)`` where
        ``error_json`` comes from ``_err`` with one of the codes
        ``absolute_path_disabled``, ``path_outside_root`` or
        ``resolve_path_failed``.

    Unless ``ALLOW_ABS`` is set, absolute inputs are refused and the resolved
    path must stay inside ``ROOT_DIR`` both lexically and after symlinks are
    followed, so a link stored in the vault cannot be used to read files
    elsewhere on the host.
    """
    try:
        user_input = (path or ".").strip()
        shown_input = user_input.replace("\\", "/")
        raw = os.path.expanduser(user_input)

        if os.path.isabs(raw):
            if not ALLOW_ABS:
                # Echo what the caller typed, not the expanded form: "~" would
                # otherwise reveal the server's home directory in the error.
                return "", _err(
                    "absolute_path_disabled",
                    "Absolute paths are disabled in safe mode.",
                    path=shown_input,
                    hint="Use a path relative to / (e.g., Notes/index.md).",
                )
            abs_path = os.path.abspath(raw)
        else:
            abs_path = os.path.abspath(os.path.join(ROOT_DIR, raw))

        if not ALLOW_ABS:
            # First the cheap lexical test (catches "../"), then the same test
            # on fully resolved paths (catches symlinks that point outside).
            inside = _is_within_root(ROOT_DIR, abs_path) and _is_within_root(
                os.path.realpath(ROOT_DIR), os.path.realpath(abs_path)
            )
            if not inside:
                return "", _err(
                    "path_outside_root",
                    "Path not allowed outside root.",
                    path=shown_input,
                    hint="Use a path under / (the vault root).",
                )
        return abs_path, ""
    except Exception as exc:
        return "", _err(
            "resolve_path_failed",
            "Failed to resolve path.",
            path=(path or ""),
            data={"error": _safe_err(exc)},
        )
def _fmt_size(num_bytes: int) -> str:
    """Format a byte count with one decimal place and a 1024-based unit.

    Units run from ``B`` through ``TB``; anything that is still 1024 or more
    after five divisions is reported in ``PB``.
    """
    labels = ("B", "KB", "MB", "GB", "TB", "PB")
    last = len(labels) - 1
    value = float(num_bytes)
    position = 0
    # "not value < 1024.0" (rather than ">=") keeps NaN walking to the last unit.
    while position < last and not value < 1024.0:
        value /= 1024.0
        position += 1
    return f"{value:.1f} {labels[position]}"
def _format_list_entry(entry_path: str, name: str, is_dir: bool) -> str:
    """Render one bullet of a listing; unreadable metadata is shown as '?'."""
    try:
        size = None if is_dir else _fmt_size(os.path.getsize(entry_path))
        mtime = datetime.fromtimestamp(os.path.getmtime(entry_path)).isoformat(sep=" ", timespec="seconds")
    except Exception:
        size, mtime = "?", "?"
    if is_dir:
        return f" • [DIR] {name} (modified {mtime})"
    return f" • {name} ({size}, modified {mtime})"


def _list_dir(abs_path: str, *, show_hidden: bool, recursive: bool, max_entries: int) -> str:
    """List a folder (optionally its whole subtree) as plain text.

    Args:
        abs_path: Absolute path of the folder to list.
        show_hidden: Include names starting with ``.``; hidden folders are
            also not descended into when this is false.
        recursive: Walk sub-folders as well as the top level.
        max_entries: Maximum number of entries to print. Coerced to an int
            of at least 1 (``None`` means 20).

    Returns:
        A report that always starts with ``Listing of … / Root: / Entries: N``,
        followed by one ``📂`` section per visited folder with folders first,
        then files, both sorted by name. A final ``… Truncated at N entries.``
        line is added only when at least one entry was actually left out.
        A missing path or a non-folder path yields an ``_err`` JSON string.
    """
    listing_display = _display_path(abs_path)
    if not os.path.exists(abs_path):
        return _err("path_not_found", f"Path not found: {listing_display}", path=listing_display)
    if not os.path.isdir(abs_path):
        # os.walk silently yields nothing for a file, which used to look like
        # an empty folder.
        return _err(
            "not_a_directory",
            f"Path is a file, not a directory: {listing_display}",
            path=listing_display,
            hint="Provide a folder path, or use action=read for files.",
        )

    limit = max(1, int(max_entries) if max_entries is not None else 20)
    lines: list[str] = []
    total = 0
    truncated = False

    for root, dirs, files in os.walk(abs_path):
        if not show_hidden:
            # In-place assignment so os.walk does not descend into hidden folders.
            dirs[:] = [d for d in dirs if not d.startswith(".")]
            files = [f for f in files if not f.startswith(".")]
        dirs.sort()
        files.sort()

        if total >= limit and (dirs or files):
            # The budget is spent and this folder has more to show.
            truncated = True
            break

        lines.append(f"\n📂 {_display_path(root)}")
        entries = [(d, True) for d in dirs] + [(f, False) for f in files]
        for name, is_dir in entries:
            if total >= limit:
                truncated = True
                break
            lines.append(_format_list_entry(os.path.join(root, name), name, is_dir))
            total += 1

        if truncated or not recursive:
            break

    if truncated:
        lines.append(f"\n… Truncated at {limit} entries.")
    header = f"Listing of {listing_display}\nRoot: /\nEntries: {total}"
    return (header + "\n" + "\n".join(lines)).strip()
def _search_text(
    abs_path: str,
    query: str,
    *,
    recursive: bool,
    show_hidden: bool,
    max_results: int,
    case_sensitive: bool,
    start_index: int,
) -> str:
    """Find lines containing *query* in one file or in the files of a folder.

    Args:
        abs_path: Absolute path of the file or folder to search.
        query: Literal text to look for (substring match, not a regex).
        recursive: Descend into sub-folders when *abs_path* is a folder.
        show_hidden: Include files/folders whose names start with ``.``.
        max_results: Page size; coerced to an int of at least 1 (None → 20).
        case_sensitive: When false, both the query and each line are lower-cased.
        start_index: Number of matches to skip before collecting results;
            coerced to an int of at least 0 (None → 0).

    Returns:
        A text report: a header block (scope, flags, counts, paging cursor),
        a blank line, then numbered ``path:line: snippet`` rows or a
        "no matches" message, plus up to five warnings for unreadable files.
        A missing path or empty query yields an ``_err`` JSON string.

    Folders and files are visited in sorted name order so that the same
    offset always refers to the same match between calls.
    """
    if not os.path.exists(abs_path):
        return _err("path_not_found", f"Path not found: {_display_path(abs_path)}", path=_display_path(abs_path))

    query = query or ""
    normalized_query = query if case_sensitive else query.lower()
    if normalized_query == "":
        return _err(
            "missing_search_query",
            "Search query is required for the search action.",
            hint="Provide text in the Search field to look for.",
        )

    max_results = max(1, int(max_results) if max_results is not None else 20)
    start_index = max(0, int(start_index) if start_index is not None else 0)

    matches: list[tuple[str, int, str]] = []
    errors: list[str] = []
    files_scanned = 0
    truncated = False
    total_matches = 0

    def _is_visible(name: str) -> bool:
        return show_hidden or not name.startswith(".")

    def _handle_match(file_path: str, line_no: int, line_text: str) -> bool:
        """Record one hit; return True once the page is full and scanning should stop."""
        nonlocal truncated, total_matches
        total_matches += 1
        if total_matches <= start_index:
            return False  # still inside the skipped prefix
        if len(matches) < max_results:
            snippet = line_text.strip()
            if len(snippet) > 200:
                snippet = snippet[:197] + "…"
            matches.append((_display_path(file_path), line_no, snippet))
            return False
        # One hit beyond the page proves there is more to fetch.
        truncated = True
        return True

    def _search_file(file_path: str) -> bool:
        """Scan one file line by line; return True when the search should stop."""
        nonlocal files_scanned
        files_scanned += 1
        try:
            with open(file_path, "r", encoding="utf-8", errors="replace") as handle:
                for line_no, line in enumerate(handle, start=1):
                    haystack = line if case_sensitive else line.lower()
                    if normalized_query in haystack and _handle_match(file_path, line_no, line):
                        return True
        except Exception as exc:
            errors.append(f"{_display_path(file_path)} ({_safe_err(exc)})")
        return truncated

    if os.path.isfile(abs_path):
        _search_file(abs_path)
    else:
        for root, dirs, files in os.walk(abs_path):
            # In-place, sorted assignment: prunes hidden folders and fixes the
            # order in which os.walk descends.
            dirs[:] = sorted(d for d in dirs if _is_visible(d))
            for name in sorted(f for f in files if _is_visible(f)):
                if _search_file(os.path.join(root, name)):
                    break
            if truncated or not recursive:
                break

    header_lines = [
        f"Search results for {query!r}",
        f"Scope: {_display_path(abs_path)}",
        f"Recursive: {'yes' if recursive else 'no'}, Hidden: {'yes' if show_hidden else 'no'}, Case-sensitive: {'yes' if case_sensitive else 'no'}",
        f"Start offset: {start_index}",
        f"Matches returned: {len(matches)}" + (" (truncated)" if truncated else ""),
        f"Files scanned: {files_scanned}",
    ]

    next_cursor = start_index + len(matches) if truncated else None
    if truncated:
        header_lines.append(f"Matches encountered before truncation: {total_matches}")
        header_lines.append(f"Truncated: yes — re-run with offset={next_cursor} to continue.")
        header_lines.append(f"Next cursor: {next_cursor}")
    else:
        header_lines.append(f"Total matches found: {total_matches}")
        header_lines.append("Truncated: no — end of results.")
        header_lines.append("Next cursor: None")

    if matches:
        body_lines = [f"{idx}. {path}:{line_no}: {text}" for idx, (path, line_no, text) in enumerate(matches, start=1)]
    elif total_matches > 0:
        # Nothing was collected although hits exist, so the offset skipped
        # all of them (start_index >= total_matches).
        body_lines = [
            f"No matches found at or after offset {start_index}. Total matches available: {total_matches}.",
            f"Try a smaller offset (≤ {total_matches - 1}).",
        ]
    else:
        body_lines = ["No matches found."]

    if errors:
        shown = errors[:5]
        body_lines.extend(["", "Warnings:"])
        body_lines.extend(shown)
        if len(errors) > len(shown):
            body_lines.append(f"… {len(errors) - len(shown)} additional files could not be read.")

    return "\n".join(header_lines) + "\n\n" + "\n".join(body_lines)
def _read_file(abs_path: str, *, offset: int, max_chars: int) -> str:
    """Return a character window of a text file, preceded by a short header.

    Args:
        abs_path: Absolute path of the file to read.
        offset: Index of the first character to return; clamped to
            ``[0, len(file)]``. ``None`` means 0.
        max_chars: Maximum number of characters to return; zero or a
            negative value means "through the end of the file". ``None``
            means 4000.

    Returns:
        ``Reading <path>`` / ``Offset S, returned N of T.`` and, when more
        text remains, ``Next cursor: E``; then a ``---`` separator and the
        text. A missing path, a directory, or a read failure yields an
        ``_err`` JSON string.

    The file is decoded as UTF-8 with undecodable bytes replaced.
    """
    shown = _display_path(abs_path)
    if not os.path.exists(abs_path):
        return _err("file_not_found", f"File not found: {shown}", path=shown)
    if os.path.isdir(abs_path):
        return _err(
            "is_directory",
            f"Path is a directory, not a file: {shown}",
            path=shown,
            hint="Provide a file path.",
        )

    # Coerce the numeric inputs the same way _search_text does: a None or a
    # float would otherwise raise TypeError in min() or in the slice below.
    offset = int(offset) if offset is not None else 0
    max_chars = int(max_chars) if max_chars is not None else 4000

    try:
        with open(abs_path, "r", encoding="utf-8", errors="replace") as f:
            data = f.read()
    except Exception as exc:
        return _err("read_failed", "Failed to read file.", path=shown, data={"error": _safe_err(exc)})

    total = len(data)
    start = max(0, min(offset, total))
    end = total if max_chars <= 0 else min(total, start + max_chars)
    chunk = data[start:end]
    next_cursor = end if end < total else None

    header = (
        f"Reading {shown}\n"
        f"Offset {start}, returned {len(chunk)} of {total}."
        + (f"\nNext cursor: {next_cursor}" if next_cursor is not None else "")
    )
    return header + "\n\n---\n\n" + chunk
def _info(abs_path: str) -> str:
    """Describe a file or folder as pretty-printed JSON.

    Args:
        abs_path: Absolute path to inspect (symlinks are followed by
            ``os.stat``).

    Returns:
        JSON with ``path`` (display form), ``type`` (``directory`` or
        ``file``), ``size`` in bytes, ``modified``, ``created``, ``mode``
        (octal string) and ``root``. If the path cannot be stat'ed, an
        ``_err`` JSON string with code ``stat_failed``.
    """
    try:
        st = os.stat(abs_path)
    except Exception as exc:
        return _err("stat_failed", "Failed to stat path.", path=_display_path(abs_path), data={"error": _safe_err(exc)})

    def _stamp(seconds: float) -> str:
        return datetime.fromtimestamp(seconds).isoformat(sep=" ", timespec="seconds")

    # st_ctime is the metadata-change time on Unix, not the creation time.
    # Use the real birth time where the platform exposes it and fall back to
    # st_ctime otherwise so the "created" key is always present.
    birth = getattr(st, "st_birthtime", None)
    created = birth if birth is not None else st.st_ctime

    info = {
        "path": _display_path(abs_path),
        "type": "directory" if stat.S_ISDIR(st.st_mode) else "file",
        "size": st.st_size,
        "modified": _stamp(st.st_mtime),
        "created": _stamp(created),
        "mode": oct(st.st_mode),
        "root": "/",
    }
    # ensure_ascii=False matches _err, keeping non-ASCII note names readable.
    return json.dumps(info, indent=2, ensure_ascii=False)
@autodoc(summary=TOOL_SUMMARY)
def Obsidian_Vault(
    action: Annotated[str, "Operation to perform: 'list', 'read', 'info', 'search', 'help'."],
    path: Annotated[str, "Target path, relative to the vault root." ] = ".",
    query: Annotated[Optional[str], "Text to search for when action=search."] = None,
    recursive: Annotated[bool, "Recurse into subfolders when listing/searching."] = False,
    show_hidden: Annotated[bool, "Include hidden files when listing/searching."] = False,
    max_entries: Annotated[int, "Max entries to list or matches to return (for list/search)."] = 20,
    offset: Annotated[int, "Start offset when reading files."] = 0,
    max_chars: Annotated[int, "Max characters to return when reading (0 = full file)."] = 4000,
    case_sensitive: Annotated[bool, "Match case when searching text."] = False,
) -> str:
    # Entry point of the tool: validates the action, resolves the path inside
    # the vault, dispatches to the matching helper and returns its text/JSON.
    # Every call is bracketed by _log_call_start / _log_call_end.
    # (Comments instead of a docstring: the @autodoc decorator receives the
    # summary itself — presumably it builds __doc__; verify before adding one.)
    _log_call_start(
        "Obsidian_Vault",
        action=action,
        path=path,
        query=query,
        recursive=recursive,
        show_hidden=show_hidden,
        max_entries=max_entries,
        offset=offset,
        max_chars=max_chars,
        case_sensitive=case_sensitive,
    )

    def _finish(outcome: str) -> str:
        # Single exit path: log the (truncated) outcome, then hand it back.
        _log_call_end("Obsidian_Vault", _truncate_for_log(outcome))
        return outcome

    action = (action or "").strip().lower()

    if action == "help":
        return _finish(HELP_TEXT)
    if action not in ("list", "read", "info", "search"):
        return _finish(
            _err(
                "invalid_action",
                "Invalid action.",
                hint="Choose from: list, read, info, search, help.",
            )
        )

    abs_path, err = _resolve_path(path)
    if err:
        return _finish(err)

    try:
        if action == "list":
            if os.path.exists(abs_path):
                result = _list_dir(abs_path, show_hidden=show_hidden, recursive=recursive, max_entries=max_entries)
            else:
                result = _err("path_not_found", f"Path not found: {_display_path(abs_path)}", path=_display_path(abs_path))
        elif action == "read":
            result = _read_file(abs_path, offset=offset, max_chars=max_chars)
        elif action == "search":
            query_text = query or ""
            if query_text.strip():
                # For search, max_entries is the page size and offset is the
                # number of matches to skip.
                result = _search_text(
                    abs_path,
                    query_text,
                    recursive=recursive,
                    show_hidden=show_hidden,
                    max_results=max_entries,
                    case_sensitive=case_sensitive,
                    start_index=offset,
                )
            else:
                result = _err(
                    "missing_search_query",
                    "Search query is required for the search action.",
                    hint="Provide text in the Search field to look for.",
                )
        else:  # info
            result = _info(abs_path)
    except Exception as exc:
        # Boundary handler: any unexpected failure becomes an error payload.
        result = _err("exception", "Unhandled error during operation.", data={"error": _safe_err(exc)})

    return _finish(result)
def build_interface() -> gr.Interface:
    """Build the Gradio form that drives ``Obsidian_Vault``.

    The input widgets are created in the same order as the function's
    parameters (action, path, query, recursive, show_hidden, max_entries,
    offset, max_chars, case_sensitive); the result is shown in a text box.
    """
    # Kept flush-left on purpose: the literal is passed to Gradio unchanged.
    prose_css = """
article.prose, .prose, .gr-prose {
overflow: visible !important;
max-height: none !important;
-ms-overflow-style: none !important;
scrollbar-width: none !important;
}
article.prose::-webkit-scrollbar,
.prose::-webkit-scrollbar,
.gr-prose::-webkit-scrollbar {
display: none !important;
}
"""
    blurb = (
        '<div style="text-align:center; overflow:hidden;">'
        "Explore and search notes in the vault without modifying them."
        "</div>"
    )

    controls = [
        gr.Radio(
            label="Action",
            choices=["list", "read", "info", "search", "help"],
            value="help",
        ),
        gr.Textbox(label="Path", placeholder=". or Notes/todo.md", max_lines=1, value="."),
        gr.Textbox(label="Search text (search)", lines=3, placeholder="Text to search for..."),
        gr.Checkbox(label="Recursive (list/search)", value=False),
        gr.Checkbox(label="Show hidden (list/search)", value=False),
        gr.Slider(minimum=10, maximum=5000, step=10, value=20, label="Max entries / matches"),
        gr.Slider(minimum=0, maximum=1_000_000, step=100, value=0, label="Offset (read/search start)"),
        gr.Slider(minimum=0, maximum=100_000, step=500, value=4000, label="Max chars (read, 0=all)"),
        gr.Checkbox(label="Case sensitive search", value=False),
    ]
    result_box = gr.Textbox(label="Result", lines=20)

    return gr.Interface(
        fn=Obsidian_Vault,
        inputs=controls,
        outputs=result_box,
        title="Obsidian Vault",
        description=blurb,
        api_description=TOOL_SUMMARY,
        flagging_mode="never",
        submit_btn="Run",
        css=prose_css,
    )
# Names exported by a star-import of this module: the tool function and the
# factory that builds its Gradio interface.
__all__ = ["Obsidian_Vault", "build_interface"]