|
|
from __future__ import annotations |
|
|
|
|
|
import json |
|
|
import os |
|
|
import re |
|
|
import stat |
|
|
from datetime import datetime |
|
|
from typing import Annotated, Optional |
|
|
|
|
|
import gradio as gr |
|
|
|
|
|
from app import _log_call_end, _log_call_start, _truncate_for_log |
|
|
from ._docstrings import autodoc |
|
|
|
|
|
|
|
|
# Short capability summary for the tool. It is handed to @autodoc on
# Obsidian_Vault and to gr.Interface(api_description=...) in build_interface.
TOOL_SUMMARY = (
    "Browse and search the Obsidian vault in read-only mode. "
    "Actions: list, read, info, search, help. "
    "All paths resolve within the vault root."
)
|
|
|
|
|
# Usage guide returned verbatim by the 'help' action of Obsidian_Vault.
# The text is user-facing output, so the literals below must not be reworded
# casually: callers may rely on the documented field names and defaults.
HELP_TEXT = (
    "Obsidian Vault — actions and usage\n\n"
    "Root: Nymbo-Tools/Obsidian (override with OBSIDIAN_VAULT_ROOT). "
    "Absolute paths are disabled unless UNSAFE_ALLOW_ABS_PATHS=1.\n\n"
    "Actions and fields:\n"
    "- list: path='.' (default), recursive=false, show_hidden=false, max_entries=20\n"
    "- read: path, offset=0, max_chars=4000 (shows next_cursor when truncated)\n"
    "- info: path\n"
    "- search: path (note or folder), query text in the Search field, recursive=false, show_hidden=false, max_entries=20, case_sensitive=false, offset=0\n"
    "- help: show this guide\n\n"
    "Errors are returned as JSON with fields: {status:'error', code, message, path?, hint?, data?}.\n\n"
    "Examples:\n"
    "- list current: action=list, path='.'\n"
    "- read note: action=read, path='Projects/note.md', max_chars=500\n"
    "- show metadata: action=info, path='Inbox'\n"
    "- search notes: action=search, path='Projects', query='deadline', recursive=true, max_entries=100\n"
    "- case-sensitive search: action=search, query='TODO', case_sensitive=true\n"
    "- page search results: action=search, query='TODO', offset=20\n"
)
|
|
|
|
|
|
|
|
def _default_root() -> str:
    """Work out the absolute directory used as the vault root.

    Resolution order:

    1. ``OBSIDIAN_VAULT_ROOT`` when it is set to something other than
       whitespace (trimmed, ``~`` expanded, made absolute).
    2. An ``Obsidian`` folder located in the parent of the directory that
       contains this module.
    3. The current working directory, if step 2 raises for any reason.
    """
    configured = (os.getenv("OBSIDIAN_VAULT_ROOT") or "").strip()
    if configured:
        return os.path.abspath(os.path.expanduser(configured))

    try:
        module_file = os.path.abspath(__file__)
        # dirname twice: first strips the file name, second steps up one folder.
        container = os.path.dirname(os.path.dirname(module_file))
        return os.path.abspath(os.path.join(container, "Obsidian"))
    except Exception:
        return os.path.abspath(os.getcwd())
|
|
|
|
|
|
|
|
# Vault root, resolved once at import time. The helpers in this module present
# it to callers as "/" and resolve relative paths against it.
ROOT_DIR = _default_root()

# Best effort: create the vault folder so a fresh checkout can be listed right
# away. A failure here is tolerated on purpose; the individual actions check
# for a missing path themselves and report it as a structured error.
try:
    os.makedirs(ROOT_DIR, exist_ok=True)
except (OSError, ValueError):
    pass

# Opt-in escape hatch that lets callers address paths outside the vault.
# Any integer other than 0 enables it. An empty or non-numeric value (e.g.
# "true") used to raise ValueError and abort the import; it now falls back to
# safe mode, which is the conservative reading of an unparseable flag.
_allow_abs_raw = (os.getenv("UNSAFE_ALLOW_ABS_PATHS") or "0").strip() or "0"
try:
    ALLOW_ABS = bool(int(_allow_abs_raw))
except ValueError:
    ALLOW_ABS = False
|
|
|
|
|
|
|
|
def _safe_err(exc: Exception | str) -> str:
    """Stringify an error while hiding the vault's location on disk.

    Backslashes become forward slashes, every occurrence of the vault root
    (in its raw, forward-slash and slash-collapsed spellings) is replaced by
    "/", and runs of slashes are finally squeezed to a single "/".
    """
    text = str(exc).replace("\\", "/")

    vault_fwd = ROOT_DIR.replace("\\", "/")
    spellings = {ROOT_DIR, vault_fwd, re.sub(r"/+", "/", vault_fwd)}

    # filter(None, ...) skips an empty spelling, which would otherwise make
    # str.replace insert "/" between every character.
    for needle in filter(None, spellings):
        text = text.replace(needle, "/")

    return re.sub(r"/+", "/", text)
|
|
|
|
|
|
|
|
def _err(code: str, message: str, *, path: str | None = None, hint: str | None = None, data: dict | None = None) -> str:
    """Build the JSON error string returned to callers.

    The object always carries ``status`` ("error"), ``code``, ``message`` and
    ``root`` ("/"). ``path``, ``hint`` and ``data`` are appended, in that
    order, only when they are truthy. Non-ASCII characters are emitted as-is.
    """
    extras = (("path", path), ("hint", hint), ("data", data))

    body = {"status": "error", "code": code, "message": message, "root": "/"}
    body.update((key, value) for key, value in extras if value)

    return json.dumps(body, ensure_ascii=False)
|
|
|
|
|
|
|
|
def _display_path(abs_path: str) -> str:
    """Convert an absolute path into the form shown to users.

    Paths at or below ``ROOT_DIR`` are rendered relative to it with a leading
    "/" (the root itself is just "/"). Anything else, or any failure while
    comparing the two paths, yields the input with backslashes turned into
    forward slashes.
    """
    try:
        vault = os.path.normpath(ROOT_DIR)
        target = os.path.normpath(abs_path)
        shared = os.path.commonpath([vault, target])
        inside_vault = os.path.normcase(shared) == os.path.normcase(vault)
        relative = os.path.relpath(target, vault) if inside_vault else None
    except Exception:
        relative = None

    if relative is None:
        return abs_path.replace("\\", "/")
    return "/" if relative == "." else "/" + relative.replace("\\", "/")
|
|
|
|
|
|
|
|
def _resolve_path(path: str) -> tuple[str, str]:
    """Resolve a user-supplied path to an absolute filesystem path.

    Args:
        path: Path as typed by the caller. Empty/None means "." (the vault
            root). Relative paths are joined onto ``ROOT_DIR``.

    Returns:
        ``(abs_path, "")`` on success, or ``("", error_json)`` where
        ``error_json`` is an ``_err`` payload with one of the codes
        ``absolute_path_disabled``, ``path_outside_root`` or
        ``resolve_path_failed``.

    Unless ``ALLOW_ABS`` is set, absolute inputs are rejected and the resolved
    path must lie at or below ``ROOT_DIR`` (so ``..`` cannot climb out).
    Symlinks are not resolved by this check.
    """
    try:
        user_input = (path or ".").strip()
        # What we echo back in errors: always the caller's own text, never the
        # "~"-expanded form, which would reveal the host's home directory.
        shown = user_input.replace("\\", "/")
        raw = os.path.expanduser(user_input)

        if os.path.isabs(raw):
            if not ALLOW_ABS:
                return "", _err(
                    "absolute_path_disabled",
                    "Absolute paths are disabled in safe mode.",
                    path=shown,
                    hint="Use a path relative to / (e.g., Notes/index.md).",
                )
            abs_path = os.path.abspath(raw)
        else:
            abs_path = os.path.abspath(os.path.join(ROOT_DIR, raw))

        if not ALLOW_ABS:
            root_norm = os.path.normpath(ROOT_DIR)
            abs_norm = os.path.normpath(abs_path)
            root_cmp = os.path.normcase(root_norm)
            try:
                common = os.path.commonpath([root_norm, abs_norm])
                inside = os.path.normcase(os.path.normpath(common)) == root_cmp
            except Exception:
                # commonpath refuses some pairs (e.g. different drives). Fall
                # back to a prefix test that respects path-component
                # boundaries, so "/vault2" is not mistaken for "/vault".
                abs_cmp = os.path.normcase(abs_norm)
                prefix = root_cmp if root_cmp.endswith(os.sep) else root_cmp + os.sep
                inside = abs_cmp == root_cmp or abs_cmp.startswith(prefix)

            if not inside:
                return "", _err(
                    "path_outside_root",
                    "Path not allowed outside root.",
                    path=shown,
                    hint="Use a path under / (the vault root).",
                )

        return abs_path, ""
    except Exception as exc:
        return "", _err(
            "resolve_path_failed",
            "Failed to resolve path.",
            path=(path or ""),
            data={"error": _safe_err(exc)},
        )
|
|
|
|
|
|
|
|
def _fmt_size(num_bytes: int) -> str:
    """Format a byte count with one decimal and a 1024-based unit.

    Units run B, KB, MB, GB, TB; anything that is still 1024 or more after
    TB is expressed in PB, however large the number gets.
    """
    suffixes = ("B", "KB", "MB", "GB", "TB", "PB")
    last = len(suffixes) - 1

    scaled = float(num_bytes)
    position = 0
    # "not scaled < 1024.0" (rather than ">=") keeps the comparison identical
    # for every float value, including ones that do not order normally.
    while position < last and not scaled < 1024.0:
        scaled /= 1024.0
        position += 1

    return f"{scaled:.1f} {suffixes[position]}"
|
|
|
|
|
|
|
|
def _list_dir(abs_path: str, *, show_hidden: bool, recursive: bool, max_entries: int) -> str:
    """Render a plain-text listing of a folder inside the vault.

    Args:
        abs_path: Absolute path of the folder to list.
        show_hidden: When false, names starting with "." are skipped (hidden
            folders are also not descended into).
        recursive: When false, only ``abs_path`` itself is listed.
        max_entries: Maximum number of entries (folders + files) to print.
            Values below 1 are raised to 1; ``None`` means 20.

    Returns:
        A header (``Listing of``, ``Root``, ``Entries``) followed by one
        section per visited folder. Folders are listed before files, each
        group sorted by name. If entries had to be left out, the text ends
        with a "Truncated at N entries." line.

    Compared with the earlier behaviour, the header is now present on
    truncated listings too, and the truncation note only appears when at
    least one entry was actually omitted (previously it also appeared when the
    folder held exactly ``max_entries`` items).
    """
    limit = max(1, int(max_entries) if max_entries is not None else 20)
    listing_display = _display_path(abs_path)
    lines: list[str] = []
    total = 0
    truncated = False

    def _stamp(entry_path: str) -> str:
        return datetime.fromtimestamp(os.path.getmtime(entry_path)).isoformat(sep=' ', timespec='seconds')

    for root, dirs, files in os.walk(abs_path):
        if not show_hidden:
            # In-place edit so os.walk does not descend into hidden folders.
            dirs[:] = [d for d in dirs if not d.startswith('.')]
            files = [f for f in files if not f.startswith('.')]
        dirs.sort()
        files.sort()

        # Limit already used up and this folder has more to show: stop before
        # printing a section header that would have no entries under it.
        if total >= limit and (dirs or files):
            truncated = True
            break

        try:
            rel_root = os.path.relpath(root, ROOT_DIR)
        except Exception:
            rel_root = root
        rel_root_disp = "/" if rel_root == "." else "/" + rel_root.replace("\\", "/")
        lines.append(f"\n📂 {rel_root_disp}")

        entries = [(name, True) for name in dirs] + [(name, False) for name in files]
        for name, is_dir in entries:
            if total >= limit:
                truncated = True
                break
            entry_path = os.path.join(root, name)
            if is_dir:
                try:
                    mtime = _stamp(entry_path)
                except Exception:
                    mtime = "?"
                lines.append(f" • [DIR] {name} (modified {mtime})")
            else:
                # Size and time share one try: if either lookup fails, both
                # are shown as unknown.
                try:
                    size = _fmt_size(os.path.getsize(entry_path))
                    mtime = _stamp(entry_path)
                except Exception:
                    size, mtime = "?", "?"
                lines.append(f" • {name} ({size}, modified {mtime})")
            total += 1

        if truncated or not recursive:
            break

    if truncated:
        lines.append(f"\n… Truncated at {limit} entries.")

    header = f"Listing of {listing_display}\nRoot: /\nEntries: {total}"
    return (header + "\n" + "\n".join(lines)).strip()
|
|
|
|
|
|
|
|
def _search_text(
    abs_path: str,
    query: str,
    *,
    recursive: bool,
    show_hidden: bool,
    max_results: int,
    case_sensitive: bool,
    start_index: int,
) -> str:
    """Find lines containing ``query`` in a file or folder and report them.

    Args:
        abs_path: Absolute path of a single file or of a folder to scan.
        query: Literal text to look for (substring match, line by line).
        recursive: Descend into sub-folders when ``abs_path`` is a folder.
        show_hidden: Include files/folders whose name starts with ".".
        max_results: Maximum matches to return (at least 1; ``None`` -> 20).
        case_sensitive: When false, both sides are lower-cased before matching.
        start_index: Number of leading matches to skip (paging offset,
            clamped to 0 or more; ``None`` -> 0).

    Returns:
        A text report: header lines (scope, flags, counts, next cursor), a
        blank line, then numbered ``path:line: snippet`` rows or a "no
        matches" note, plus up to five warnings for unreadable files.
        An ``_err`` JSON string is returned instead when the path does not
        exist or the query is empty.

    Folders and files are visited in sorted name order. ``os.walk`` itself
    gives no ordering guarantee, and without a fixed order an ``offset`` taken
    from one call would not reliably point at the same match in the next.
    """
    if not os.path.exists(abs_path):
        return _err("path_not_found", f"Path not found: {_display_path(abs_path)}", path=_display_path(abs_path))

    query = query or ""
    normalized_query = query if case_sensitive else query.lower()
    if normalized_query == "":
        return _err(
            "missing_search_query",
            "Search query is required for the search action.",
            hint="Provide text in the Search field to look for.",
        )

    max_results = max(1, int(max_results) if max_results is not None else 20)
    start_index = max(0, int(start_index) if start_index is not None else 0)
    matches: list[tuple[str, int, str]] = []
    errors: list[str] = []
    files_scanned = 0
    truncated = False
    total_matches = 0

    def _visible(name: str) -> bool:
        return show_hidden or not name.startswith('.')

    def _handle_match(file_path: str, line_no: int, line_text: str) -> bool:
        """Record one hit; return True once the page is full and scanning should stop."""
        nonlocal truncated, total_matches
        total_matches += 1
        if total_matches <= start_index:
            # Still inside the part the caller asked to skip.
            return False
        if len(matches) < max_results:
            snippet = line_text.strip()
            if len(snippet) > 200:
                snippet = snippet[:197] + "…"
            matches.append((_display_path(file_path), line_no, snippet))
            return False
        # One hit beyond the page: proves there is more, so flag and stop.
        truncated = True
        return True

    def _search_file(file_path: str) -> bool:
        """Scan one file; return True when the result page overflowed."""
        nonlocal files_scanned
        files_scanned += 1
        try:
            with open(file_path, 'r', encoding='utf-8', errors='replace') as handle:
                for line_no, line in enumerate(handle, start=1):
                    haystack = line if case_sensitive else line.lower()
                    if normalized_query in haystack:
                        if _handle_match(file_path, line_no, line):
                            return True
        except Exception as exc:
            # Unreadable file: remember it as a warning and keep going.
            errors.append(f"{_display_path(file_path)} ({_safe_err(exc)})")
        return False

    if os.path.isfile(abs_path):
        _search_file(abs_path)
    else:
        for root, dirs, files in os.walk(abs_path):
            # Assigning through dirs[:] both prunes hidden folders and fixes
            # the order in which os.walk descends.
            dirs[:] = sorted(d for d in dirs if _visible(d))
            for name in sorted(f for f in files if _visible(f)):
                if _search_file(os.path.join(root, name)):
                    break
            if truncated or not recursive:
                break

    header_lines = [
        f"Search results for {query!r}",
        f"Scope: {_display_path(abs_path)}",
        f"Recursive: {'yes' if recursive else 'no'}, Hidden: {'yes' if show_hidden else 'no'}, Case-sensitive: {'yes' if case_sensitive else 'no'}",
        f"Start offset: {start_index}",
        f"Matches returned: {len(matches)}" + (" (truncated)" if truncated else ""),
        f"Files scanned: {files_scanned}",
    ]

    next_cursor = start_index + len(matches) if truncated else None

    if truncated:
        header_lines.append(f"Matches encountered before truncation: {total_matches}")
        header_lines.append(f"Truncated: yes — re-run with offset={next_cursor} to continue.")
        header_lines.append(f"Next cursor: {next_cursor}")
    else:
        header_lines.append(f"Total matches found: {total_matches}")
        header_lines.append("Truncated: no — end of results.")
        header_lines.append("Next cursor: None")

    if matches:
        body_lines = [f"{idx}. {path}:{line_no}: {text}" for idx, (path, line_no, text) in enumerate(matches, start=1)]
    elif total_matches:
        # Hits exist but every one of them fell inside the skipped prefix,
        # i.e. the requested offset is past the end of the results.
        body_lines = [
            f"No matches found at or after offset {start_index}. Total matches available: {total_matches}.",
            f"Try a smaller offset (≤ {max(total_matches - 1, 0)}).",
        ]
    else:
        body_lines = ["No matches found."]

    if errors:
        shown = errors[:5]
        body_lines.extend(["", "Warnings:"])
        body_lines.extend(shown)
        if len(errors) > len(shown):
            body_lines.append(f"… {len(errors) - len(shown)} additional files could not be read.")

    return "\n".join(header_lines) + "\n\n" + "\n".join(body_lines)
|
|
|
|
|
|
|
|
def _read_file(abs_path: str, *, offset: int, max_chars: int) -> str:
    """Return a character window of a text file, with a small header.

    Args:
        abs_path: Absolute path of the file to read.
        offset: Index of the first character to return. Clamped into
            ``[0, len(file)]``; ``None`` means 0.
        max_chars: Maximum characters to return; 0 or a negative value means
            "through the end of the file". ``None`` means 4000.

    Returns:
        ``Reading <path>`` / ``Offset …, returned … of ….`` header lines, an
        optional ``Next cursor: N`` line when more text remains, a ``---``
        separator, then the requested text. An ``_err`` JSON string is
        returned when the path is missing, is a directory, or cannot be read.

    The file is decoded as UTF-8 with undecodable bytes replaced, so offsets
    count decoded characters, not bytes.
    """
    if not os.path.exists(abs_path):
        return _err("file_not_found", f"File not found: {_display_path(abs_path)}", path=_display_path(abs_path))
    if os.path.isdir(abs_path):
        return _err(
            "is_directory",
            f"Path is a directory, not a file: {_display_path(abs_path)}",
            path=_display_path(abs_path),
            hint="Provide a file path.",
        )

    # Normalise the numeric inputs the same way _search_text does. Without
    # this, a float (e.g. 0.0 from a JSON client) reaches the slice below and
    # raises TypeError, and None fails in min()/comparison.
    offset = int(offset) if offset is not None else 0
    max_chars = int(max_chars) if max_chars is not None else 4000

    try:
        with open(abs_path, 'r', encoding='utf-8', errors='replace') as f:
            data = f.read()
    except Exception as exc:
        return _err("read_failed", "Failed to read file.", path=_display_path(abs_path), data={"error": _safe_err(exc)})

    total = len(data)
    start = max(0, min(offset, total))
    end = total if max_chars <= 0 else min(total, start + max_chars)
    chunk = data[start:end]
    # Only advertise a cursor when something is left to read.
    next_cursor = end if end < total else None

    header = (
        f"Reading {_display_path(abs_path)}\n"
        f"Offset {start}, returned {len(chunk)} of {total}."
        + (f"\nNext cursor: {next_cursor}" if next_cursor is not None else "")
    )
    return header + "\n\n---\n\n" + chunk
|
|
|
|
|
|
|
|
def _info(abs_path: str) -> str:
    """Describe a file or folder as a pretty-printed JSON object.

    Args:
        abs_path: Absolute path to inspect.

    Returns:
        JSON with ``path`` (vault-relative display form), ``type``
        ("directory" or "file"), ``size`` in bytes, ``modified`` and
        ``created`` timestamps (local time, second precision), ``mode`` as an
        octal string and ``root`` ("/"). If ``os.stat`` fails, an ``_err``
        JSON string with code ``stat_failed`` is returned instead.
    """
    try:
        st = os.stat(abs_path)
    except Exception as exc:
        return _err("stat_failed", "Failed to stat path.", path=_display_path(abs_path), data={"error": _safe_err(exc)})

    # st_ctime is the creation time only on Windows; on POSIX it is the time
    # of the last metadata change. Use the real birth time where the platform
    # exposes it and keep st_ctime as the fallback so the field is always set.
    created_ts = getattr(st, "st_birthtime", st.st_ctime)

    info = {
        "path": _display_path(abs_path),
        "type": "directory" if stat.S_ISDIR(st.st_mode) else "file",
        "size": st.st_size,
        "modified": datetime.fromtimestamp(st.st_mtime).isoformat(sep=' ', timespec='seconds'),
        "created": datetime.fromtimestamp(created_ts).isoformat(sep=' ', timespec='seconds'),
        "mode": oct(st.st_mode),
        "root": "/",
    }
    # ensure_ascii=False matches _err, so non-ASCII note names stay readable
    # instead of being emitted as \uXXXX escapes.
    return json.dumps(info, indent=2, ensure_ascii=False)
|
|
|
|
|
|
|
|
# NOTE(review): no literal docstring is added here on purpose. @autodoc is
# given TOOL_SUMMARY and presumably builds the docstring from it and from the
# Annotated hints below; confirm how it treats an existing docstring first.
@autodoc(summary=TOOL_SUMMARY)
def Obsidian_Vault(
    action: Annotated[str, "Operation to perform: 'list', 'read', 'info', 'search', 'help'."],
    path: Annotated[str, "Target path, relative to the vault root." ] = ".",
    query: Annotated[Optional[str], "Text to search for when action=search."] = None,
    recursive: Annotated[bool, "Recurse into subfolders when listing/searching."] = False,
    show_hidden: Annotated[bool, "Include hidden files when listing/searching."] = False,
    max_entries: Annotated[int, "Max entries to list or matches to return (for list/search)."] = 20,
    offset: Annotated[int, "Start offset when reading files."] = 0,
    max_chars: Annotated[int, "Max characters to return when reading (0 = full file)."] = 4000,
    case_sensitive: Annotated[bool, "Match case when searching text."] = False,
) -> str:
    # Entry point: log the call, validate the action, resolve the path, run
    # the matching helper, log the (truncated) result and return it. Every
    # outcome, including errors, is returned as a string rather than raised.
    _log_call_start(
        "Obsidian_Vault",
        action=action,
        path=path,
        query=query,
        recursive=recursive,
        show_hidden=show_hidden,
        max_entries=max_entries,
        offset=offset,
        max_chars=max_chars,
        case_sensitive=case_sensitive,
    )

    def _finish(outcome: str) -> str:
        # Single exit: every return path logs exactly once before returning.
        _log_call_end("Obsidian_Vault", _truncate_for_log(outcome))
        return outcome

    verb = (action or "").strip().lower()

    if verb not in {"list", "read", "info", "search", "help"}:
        return _finish(
            _err(
                "invalid_action",
                "Invalid action.",
                hint="Choose from: list, read, info, search, help.",
            )
        )

    # 'help' needs no path, so it is answered before path resolution.
    if verb == "help":
        return _finish(HELP_TEXT)

    abs_path, err = _resolve_path(path)
    if err:
        return _finish(err)

    try:
        if verb == "read":
            result = _read_file(abs_path, offset=offset, max_chars=max_chars)
        elif verb == "info":
            result = _info(abs_path)
        elif verb == "list":
            if os.path.exists(abs_path):
                result = _list_dir(abs_path, show_hidden=show_hidden, recursive=recursive, max_entries=max_entries)
            else:
                shown = _display_path(abs_path)
                result = _err("path_not_found", f"Path not found: {shown}", path=shown)
        else:
            # Only 'search' is left. `offset` doubles as the index of the
            # first match to return.
            needle = query or ""
            if needle.strip() != "":
                result = _search_text(
                    abs_path,
                    needle,
                    recursive=recursive,
                    show_hidden=show_hidden,
                    max_results=max_entries,
                    case_sensitive=case_sensitive,
                    start_index=offset,
                )
            else:
                result = _err(
                    "missing_search_query",
                    "Search query is required for the search action.",
                    hint="Provide text in the Search field to look for.",
                )
    except Exception as exc:
        # Boundary handler: anything a helper let slip becomes an error payload.
        result = _err("exception", "Unhandled error during operation.", data={"error": _safe_err(exc)})

    return _finish(result)
|
|
|
|
|
|
|
|
def build_interface() -> gr.Interface:
    """Create the Gradio form that drives ``Obsidian_Vault``.

    The input widgets are listed in the same order as the parameters of
    ``Obsidian_Vault`` (action, path, query, recursive, show_hidden,
    max_entries, offset, max_chars, case_sensitive). The form starts on the
    "help" action, has flagging turned off and labels its submit button "Run".
    """
    controls = [
        gr.Radio(
            label="Action",
            choices=["list", "read", "info", "search", "help"],
            value="help",
        ),
        gr.Textbox(label="Path", placeholder=". or Notes/todo.md", max_lines=1, value="."),
        gr.Textbox(label="Search text (search)", lines=3, placeholder="Text to search for..."),
        gr.Checkbox(label="Recursive (list/search)", value=False),
        gr.Checkbox(label="Show hidden (list/search)", value=False),
        gr.Slider(minimum=10, maximum=5000, step=10, value=20, label="Max entries / matches"),
        gr.Slider(minimum=0, maximum=1_000_000, step=100, value=0, label="Offset (read/search start)"),
        gr.Slider(minimum=0, maximum=100_000, step=500, value=4000, label="Max chars (read, 0=all)"),
        gr.Checkbox(label="Case sensitive search", value=False),
    ]
    result_box = gr.Textbox(label="Result", lines=20)

    banner = (
        "<div style=\"text-align:center; overflow:hidden;\">Explore and search notes in the vault without modifying them." "</div>"
    )

    return gr.Interface(
        fn=Obsidian_Vault,
        inputs=controls,
        outputs=result_box,
        title="Obsidian Vault",
        description=banner,
        api_description=TOOL_SUMMARY,
        flagging_mode="never",
        submit_btn="Run",
        # Stylesheet for the description block: no height cap, no scrollbars.
        css=(
            """
            article.prose, .prose, .gr-prose {
                overflow: visible !important;
                max-height: none !important;
                -ms-overflow-style: none !important;
                scrollbar-width: none !important;
            }
            article.prose::-webkit-scrollbar,
            .prose::-webkit-scrollbar,
            .gr-prose::-webkit-scrollbar {
                display: none !important;
            }
            """
        ),
    )
|
|
|
|
|
|
|
|
# Names exported by `from <module> import *`: the tool function and the
# factory for its Gradio interface. Underscore helpers stay private.
__all__ = ["Obsidian_Vault", "build_interface"]
|
|
|