TOOL_SUMMARY = (
    "Browse and search the Obsidian vault in read-only mode. "
    "Actions: list, read, info, search, help. "
    "All paths resolve within the vault root."
)

# User-facing guide returned verbatim by the "help" action.
HELP_TEXT = (
    "Obsidian Vault — actions and usage\n\n"
    "Root: Nymbo-Tools/Obsidian (override with OBSIDIAN_VAULT_ROOT). "
    "Absolute paths are disabled unless UNSAFE_ALLOW_ABS_PATHS=1.\n\n"
    "Actions and fields:\n"
    "- list: path='.' (default), recursive=false, show_hidden=false, max_entries=20\n"
    "- read: path, offset=0, max_chars=4000 (shows next_cursor when truncated)\n"
    "- info: path\n"
    "- search: path (note or folder), query text in the Search field, "
    "recursive=false, show_hidden=false, max_entries=20, case_sensitive=false, offset=0\n"
    "- help: show this guide\n\n"
    "Errors are returned as JSON with fields: {status:'error', code, message, path?, hint?, data?}.\n\n"
    "Examples:\n"
    "- list current: action=list, path='.'\n"
    "- read note: action=read, path='Projects/note.md', max_chars=500\n"
    "- show metadata: action=info, path='Inbox'\n"
    "- search notes: action=search, path='Projects', query='deadline', recursive=true, max_entries=100\n"
    "- case-sensitive search: action=search, query='TODO', case_sensitive=true\n"
    "- page search results: action=search, query='TODO', offset=20\n"
)


def _default_root() -> str:
    """Return the absolute path of the vault root directory.

    Resolution order:
      1. ``OBSIDIAN_VAULT_ROOT`` (with ``~`` expanded) when set and non-blank.
      2. An ``Obsidian`` folder inside the parent of the directory that
         contains this file.
      3. The current working directory when this file's location cannot be
         determined.
    """
    env_root = (os.getenv("OBSIDIAN_VAULT_ROOT") or "").strip()
    if env_root:
        return os.path.abspath(os.path.expanduser(env_root))
    try:
        here = os.path.abspath(__file__)
        project_dir = os.path.dirname(os.path.dirname(here))
        return os.path.abspath(os.path.join(project_dir, "Obsidian"))
    except (NameError, OSError):
        return os.path.abspath(os.getcwd())


def _env_flag(name: str, default: bool = False) -> bool:
    """Read a boolean switch from the environment without ever raising.

    Accepts ``1/true/yes/on`` and ``0/false/no/off`` (case-insensitive) as
    well as any integer literal (non-zero means true). Unset, blank, or
    unrecognised values yield ``default``.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    value = raw.strip().lower()
    if not value:
        return default
    if value in {"1", "true", "yes", "on"}:
        return True
    if value in {"0", "false", "no", "off"}:
        return False
    try:
        return bool(int(value))
    except ValueError:
        return default


ROOT_DIR = _default_root()

# Best effort only: every action reports its own error if the root is missing.
try:
    os.makedirs(ROOT_DIR, exist_ok=True)
except OSError:
    pass

# A malformed value must not crash the import; it falls back to safe mode.
ALLOW_ABS = _env_flag("UNSAFE_ALLOW_ABS_PATHS")


def _safe_err(exc: Exception | str) -> str:
    """Return ``str(exc)`` with the absolute vault root replaced by '/'.

    Backslashes are converted to forward slashes and runs of slashes are
    collapsed, so the server's directory layout is not echoed to callers.
    """
    text = str(exc).replace("\\", "/")
    root_fwd = ROOT_DIR.replace("\\", "/")
    for variant in (root_fwd, re.sub(r"/+", "/", root_fwd)):
        if variant:
            text = text.replace(variant, "/")
    return re.sub(r"/+", "/", text)


def _err(
    code: str,
    message: str,
    *,
    path: str | None = None,
    hint: str | None = None,
    data: dict | None = None,
) -> str:
    """Build the JSON error payload shared by every action.

    ``path``, ``hint`` and ``data`` are included only when truthy; ``root``
    is always reported as '/'.
    """
    payload = {
        "status": "error",
        "code": code,
        "message": message,
        "root": "/",
    }
    if path:
        payload["path"] = path
    if hint:
        payload["hint"] = hint
    if data:
        payload["data"] = data
    return json.dumps(payload, ensure_ascii=False)


def _display_path(abs_path: str) -> str:
    """Render ``abs_path`` relative to the vault root with forward slashes.

    The root itself is shown as '/'. Paths outside the root (or paths that
    cannot be compared with it) are returned as given, with backslashes
    converted to forward slashes.
    """
    try:
        norm_root = os.path.normpath(ROOT_DIR)
        norm_abs = os.path.normpath(abs_path)
        common = os.path.commonpath([norm_root, norm_abs])
        if os.path.normcase(common) == os.path.normcase(norm_root):
            rel = os.path.relpath(norm_abs, norm_root)
            if rel == ".":
                return "/"
            return "/" + rel.replace("\\", "/")
    except Exception:
        # e.g. different drives on Windows: fall through to the raw path.
        pass
    return abs_path.replace("\\", "/")


def _is_within_root(abs_path: str) -> bool:
    """Tell whether ``abs_path`` stays inside the vault after symlinks resolve."""
    root_real = os.path.normcase(os.path.realpath(ROOT_DIR))
    target_real = os.path.normcase(os.path.realpath(abs_path))
    try:
        return os.path.commonpath([root_real, target_real]) == root_real
    except ValueError:
        # Raised for paths on different drives; such a path cannot be inside.
        return False


def _resolve_path(path: str) -> tuple[str, str]:
    """Turn user input into an absolute path inside the vault.

    Args:
        path: Path as typed by the caller; blank or ``None`` means the root.

    Returns:
        ``(abs_path, "")`` on success, or ``("", error_json)`` when the path
        is absolute in safe mode, escapes the vault root, or cannot be
        resolved at all.
    """
    try:
        user_input = (path or ".").strip()
        shown_input = user_input.replace("\\", "/")
        raw = os.path.expanduser(user_input)

        if os.path.isabs(raw):
            if not ALLOW_ABS:
                # Report the text as typed: `raw` may hold an expanded home dir.
                return "", _err(
                    "absolute_path_disabled",
                    "Absolute paths are disabled in safe mode.",
                    path=shown_input,
                    hint="Use a path relative to / (e.g., Notes/index.md).",
                )
            abs_path = os.path.abspath(raw)
        else:
            abs_path = os.path.abspath(os.path.join(ROOT_DIR, raw))

        # abspath() only normalises '..'; the containment test follows
        # symlinks so a link inside the vault cannot lead outside of it.
        if not ALLOW_ABS and not _is_within_root(abs_path):
            return "", _err(
                "path_outside_root",
                "Path not allowed outside root.",
                path=shown_input,
                hint="Use a path under / (the vault root).",
            )
        return abs_path, ""
    except Exception as exc:
        return "", _err(
            "resolve_path_failed",
            "Failed to resolve path.",
            path=str(path or ""),
            data={"error": _safe_err(exc)},
        )
def _fmt_size(num_bytes: int) -> str:
    """Format a byte count with one decimal and a binary unit (B .. PB)."""
    size = float(num_bytes)
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if size < 1024.0:
            return f"{size:.1f} {unit}"
        size /= 1024.0
    return f"{size:.1f} PB"


def _describe_entry(full_path: str, name: str, is_dir: bool) -> str:
    """Return one bullet line for a listing entry; '?' marks unreadable metadata."""
    try:
        mtime = datetime.fromtimestamp(os.path.getmtime(full_path)).isoformat(sep=" ", timespec="seconds")
        size = "" if is_dir else _fmt_size(os.path.getsize(full_path))
    except (OSError, OverflowError, ValueError):
        mtime, size = "?", "?"
    if is_dir:
        return f" • [DIR] {name} (modified {mtime})"
    return f" • {name} ({size}, modified {mtime})"


def _list_dir(abs_path: str, *, show_hidden: bool, recursive: bool, max_entries: int) -> str:
    """List a vault folder as plain text.

    Args:
        abs_path: Absolute path of the folder to list.
        show_hidden: Include entries whose name starts with '.'.
        recursive: Descend into sub-folders (hidden ones only with show_hidden).
        max_entries: Maximum number of entries to print; values below 1 are
            treated as 1 and ``None`` as 20.

    Returns:
        A header (listing path, root, entry count) followed by one section per
        folder, directories first, each group sorted by name. The header is
        present even when the listing is cut short, and the truncation note is
        added only when at least one entry was left out. A JSON error is
        returned when ``abs_path`` is a file.
    """
    listing_display = _display_path(abs_path)
    if os.path.isfile(abs_path):
        return _err(
            "not_a_directory",
            f"Path is a file, not a directory: {listing_display}",
            path=listing_display,
            hint="Use action=read or action=info for files.",
        )

    limit = max(1, int(max_entries) if max_entries is not None else 20)
    lines: list[str] = []
    total = 0
    truncated = False

    for root, dirs, files in os.walk(abs_path):
        if not show_hidden:
            # In-place assignment so os.walk also skips hidden folders.
            dirs[:] = [d for d in dirs if not d.startswith(".")]
            files = [f for f in files if not f.startswith(".")]
        dirs.sort()
        files.sort()
        entries = [(name, True) for name in dirs] + [(name, False) for name in files]

        if entries and total >= limit:
            # The limit was reached in an earlier folder and more remains.
            truncated = True
            break

        lines.append(f"\n📂 {_display_path(root)}")
        for name, is_dir in entries:
            if total >= limit:
                truncated = True
                break
            lines.append(_describe_entry(os.path.join(root, name), name, is_dir))
            total += 1

        if truncated or not recursive:
            break

    if truncated:
        lines.append(f"\n… Truncated at {limit} entries.")
    header = f"Listing of {listing_display}\nRoot: /\nEntries: {total}"
    return (header + "\n" + "\n".join(lines)).strip()
def _search_text(
    abs_path: str,
    query: str,
    *,
    recursive: bool,
    show_hidden: bool,
    max_results: int,
    case_sensitive: bool,
    start_index: int,
) -> str:
    """Search a note or folder for lines containing ``query``.

    Args:
        abs_path: Absolute path of the file or folder to search.
        query: Literal substring to look for (not a regular expression).
        recursive: Descend into sub-folders when ``abs_path`` is a folder.
        show_hidden: Include files and folders whose name starts with '.'.
        max_results: Maximum matches to return; below 1 becomes 1, ``None`` 20.
        case_sensitive: Compare case-sensitively; otherwise both sides are
            lower-cased.
        start_index: Number of leading matches to skip (paging cursor).

    Returns:
        A text report: header lines (scope, flags, counts, next cursor), a
        blank line, then numbered ``path:line: snippet`` rows. Files that
        cannot be read are listed under "Warnings". A JSON error is returned
        when the path does not exist or the query is empty.
    """
    scope = _display_path(abs_path)
    if not os.path.exists(abs_path):
        return _err("path_not_found", f"Path not found: {scope}", path=scope)

    query = query or ""
    needle = query if case_sensitive else query.lower()
    if not needle:
        return _err(
            "missing_search_query",
            "Search query is required for the search action.",
            hint="Provide text in the Search field to look for.",
        )

    max_results = max(1, int(max_results) if max_results is not None else 20)
    start_index = max(0, int(start_index) if start_index is not None else 0)

    def _candidate_files():
        """Yield the files to scan, in a reproducible (sorted) order."""
        if os.path.isfile(abs_path):
            yield abs_path
            return
        for root, dirs, files in os.walk(abs_path):
            if not show_hidden:
                dirs[:] = [d for d in dirs if not d.startswith(".")]
                files = [f for f in files if not f.startswith(".")]
            # Sorted traversal keeps offset-based paging stable between calls.
            dirs.sort()
            for name in sorted(files):
                yield os.path.join(root, name)
            if not recursive:
                return

    matches: list[tuple[str, int, str]] = []
    warnings: list[str] = []
    files_scanned = 0
    total_matches = 0
    truncated = False

    for file_path in _candidate_files():
        files_scanned += 1
        try:
            with open(file_path, "r", encoding="utf-8", errors="replace") as handle:
                for line_no, line in enumerate(handle, start=1):
                    haystack = line if case_sensitive else line.lower()
                    if needle not in haystack:
                        continue
                    total_matches += 1
                    if total_matches <= start_index:
                        continue  # still inside the skipped prefix
                    if len(matches) >= max_results:
                        # One match beyond the page proves there is more.
                        truncated = True
                        break
                    snippet = line.strip()
                    if len(snippet) > 200:
                        snippet = snippet[:197] + "…"
                    matches.append((_display_path(file_path), line_no, snippet))
        except OSError as exc:
            warnings.append(f"{_display_path(file_path)} ({_safe_err(exc)})")
        if truncated:
            break

    header_lines = [
        f"Search results for {query!r}",
        f"Scope: {scope}",
        f"Recursive: {'yes' if recursive else 'no'}, Hidden: {'yes' if show_hidden else 'no'}, "
        f"Case-sensitive: {'yes' if case_sensitive else 'no'}",
        f"Start offset: {start_index}",
        f"Matches returned: {len(matches)}" + (" (truncated)" if truncated else ""),
        f"Files scanned: {files_scanned}",
    ]
    if truncated:
        next_cursor = start_index + len(matches)
        header_lines.append(f"Matches encountered before truncation: {total_matches}")
        header_lines.append(f"Truncated: yes — re-run with offset={next_cursor} to continue.")
        header_lines.append(f"Next cursor: {next_cursor}")
    else:
        header_lines.append(f"Total matches found: {total_matches}")
        header_lines.append("Truncated: no — end of results.")
        header_lines.append("Next cursor: None")

    if matches:
        body_lines = [
            f"{idx}. {path}:{line_no}: {text}"
            for idx, (path, line_no, text) in enumerate(matches, start=1)
        ]
    elif total_matches:
        # Matches exist, but every one of them lies before the requested offset.
        body_lines = [
            f"No matches found at or after offset {start_index}. Total matches available: {total_matches}.",
            f"Try a smaller offset (≤{total_matches - 1}).",
        ]
    else:
        body_lines = ["No matches found."]

    if warnings:
        shown = warnings[:5]
        body_lines.extend(["", "Warnings:"])
        body_lines.extend(shown)
        if len(warnings) > len(shown):
            body_lines.append(f"… {len(warnings) - len(shown)} additional files could not be read.")

    return "\n".join(header_lines) + "\n\n" + "\n".join(body_lines)
def _read_file(abs_path: str, *, offset: int, max_chars: int) -> str:
    """Return a window of a text file, preceded by a short header.

    Args:
        abs_path: Absolute path of the file to read.
        offset: Index of the first character to return, counted in decoded
            characters; clamped to ``[0, len(file)]``. ``None`` means 0.
        max_chars: Maximum characters to return; ``<= 0`` means the rest of
            the file. ``None`` means 4000.

    Returns:
        ``"Reading <path>"`` and the offset/size line, a ``Next cursor`` line
        when text remains, a ``---`` separator and the chunk itself. A JSON
        error is returned when the path is missing, is a directory, or cannot
        be read.
    """
    shown = _display_path(abs_path)
    if not os.path.exists(abs_path):
        return _err("file_not_found", f"File not found: {shown}", path=shown)
    if os.path.isdir(abs_path):
        return _err(
            "is_directory",
            f"Path is a directory, not a file: {shown}",
            path=shown,
            hint="Provide a file path.",
        )

    # Callers may hand over floats (UI sliders) or None; slice bounds must be ints.
    offset = int(offset) if offset is not None else 0
    max_chars = int(max_chars) if max_chars is not None else 4000

    try:
        with open(abs_path, "r", encoding="utf-8", errors="replace") as handle:
            data = handle.read()
    except OSError as exc:
        return _err("read_failed", "Failed to read file.", path=shown, data={"error": _safe_err(exc)})

    total = len(data)
    start = max(0, min(offset, total))
    end = total if max_chars <= 0 else min(total, start + max_chars)
    chunk = data[start:end]

    header = f"Reading {shown}\nOffset {start}, returned {len(chunk)} of {total}."
    if end < total:
        header += f"\nNext cursor: {end}"
    return header + "\n\n---\n\n" + chunk
def _info(abs_path: str) -> str:
    """Return file or folder metadata as pretty-printed JSON.

    Keys: ``path`` (vault-relative), ``type`` ('directory' or 'file'),
    ``size`` in bytes, ``modified``, ``created``, ``mode`` (octal string) and
    ``root``. A JSON error with code ``stat_failed`` is returned when the path
    cannot be stat'ed.
    """
    shown = _display_path(abs_path)
    try:
        st = os.stat(abs_path)
    except (OSError, ValueError) as exc:
        return _err("stat_failed", "Failed to stat path.", path=shown, data={"error": _safe_err(exc)})

    def _stamp(seconds: float) -> str:
        return datetime.fromtimestamp(seconds).isoformat(sep=" ", timespec="seconds")

    # st_ctime is the metadata-change time on Unix, not the creation time.
    # Prefer the real birth time where the platform exposes it; platforms
    # without st_birthtime keep reporting st_ctime under the same key.
    created = getattr(st, "st_birthtime", st.st_ctime)

    info = {
        "path": shown,
        "type": "directory" if stat.S_ISDIR(st.st_mode) else "file",
        "size": st.st_size,
        "modified": _stamp(st.st_mtime),
        "created": _stamp(created),
        "mode": oct(st.st_mode),
        "root": "/",
    }
    # ensure_ascii=False keeps non-ASCII note names readable, matching _err().
    return json.dumps(info, indent=2, ensure_ascii=False)


def _coerce_int(value: object, default: int) -> int:
    """Convert a UI/API number (int, float, numeric string) to int, else ``default``."""
    try:
        return int(value)  # type: ignore[arg-type]
    except (TypeError, ValueError, OverflowError):
        return default


def _run_vault_action(
    action: str,
    path: str,
    query: Optional[str],
    recursive: bool,
    show_hidden: bool,
    max_entries: int,
    offset: int,
    max_chars: int,
    case_sensitive: bool,
) -> str:
    """Validate the request and dispatch it to the matching helper.

    Always returns a string: the action's text/JSON output or a JSON error.
    """
    action = (action or "").strip().lower()
    if action not in {"list", "read", "info", "search", "help"}:
        return _err(
            "invalid_action",
            "Invalid action.",
            hint="Choose from: list, read, info, search, help.",
        )
    if action == "help":
        return HELP_TEXT

    abs_path, err = _resolve_path(path)
    if err:
        return err

    # Sliders and remote callers may deliver floats or numeric strings.
    max_entries = _coerce_int(max_entries, 20)
    offset = _coerce_int(offset, 0)
    max_chars = _coerce_int(max_chars, 4000)

    try:
        if action == "list":
            if not os.path.exists(abs_path):
                shown = _display_path(abs_path)
                return _err("path_not_found", f"Path not found: {shown}", path=shown)
            return _list_dir(abs_path, show_hidden=show_hidden, recursive=recursive, max_entries=max_entries)
        if action == "read":
            return _read_file(abs_path, offset=offset, max_chars=max_chars)
        if action == "search":
            query_text = query or ""
            if query_text.strip() == "":
                return _err(
                    "missing_search_query",
                    "Search query is required for the search action.",
                    hint="Provide text in the Search field to look for.",
                )
            return _search_text(
                abs_path,
                query_text,
                recursive=recursive,
                show_hidden=show_hidden,
                max_results=max_entries,
                case_sensitive=case_sensitive,
                start_index=offset,
            )
        return _info(abs_path)  # action == "info"
    except Exception as exc:
        # Tool boundary: surface any unexpected failure as a structured error.
        return _err("exception", "Unhandled error during operation.", data={"error": _safe_err(exc)})


# NOTE(review): no docstring is added here on purpose. @autodoc receives the
# summary and presumably generates the docstring from the Annotated
# parameters; confirm before adding one by hand.
@autodoc(summary=TOOL_SUMMARY)
def Obsidian_Vault(
    action: Annotated[str, "Operation to perform: 'list', 'read', 'info', 'search', 'help'."],
    path: Annotated[str, "Target path, relative to the vault root."] = ".",
    query: Annotated[Optional[str], "Text to search for when action=search."] = None,
    recursive: Annotated[bool, "Recurse into subfolders when listing/searching."] = False,
    show_hidden: Annotated[bool, "Include hidden files when listing/searching."] = False,
    max_entries: Annotated[int, "Max entries to list or matches to return (for list/search)."] = 20,
    offset: Annotated[int, "Start offset when reading files."] = 0,
    max_chars: Annotated[int, "Max characters to return when reading (0 = full file)."] = 4000,
    case_sensitive: Annotated[bool, "Match case when searching text."] = False,
) -> str:
    # Log the request exactly as received, run it, then log the (truncated)
    # result once, whichever branch produced it.
    _log_call_start(
        "Obsidian_Vault",
        action=action,
        path=path,
        query=query,
        recursive=recursive,
        show_hidden=show_hidden,
        max_entries=max_entries,
        offset=offset,
        max_chars=max_chars,
        case_sensitive=case_sensitive,
    )
    result = _run_vault_action(
        action,
        path,
        query,
        recursive,
        show_hidden,
        max_entries,
        offset,
        max_chars,
        case_sensitive,
    )
    _log_call_end("Obsidian_Vault", _truncate_for_log(result))
    return result
"help": result = HELP_TEXT _log_call_end("Obsidian_Vault", _truncate_for_log(result)) return result abs_path, err = _resolve_path(path) if err: _log_call_end("Obsidian_Vault", _truncate_for_log(err)) return err try: if action == "list": if not os.path.exists(abs_path): result = _err("path_not_found", f"Path not found: {_display_path(abs_path)}", path=_display_path(abs_path)) else: result = _list_dir(abs_path, show_hidden=show_hidden, recursive=recursive, max_entries=max_entries) elif action == "read": result = _read_file(abs_path, offset=offset, max_chars=max_chars) elif action == "search": query_text = query or "" if query_text.strip() == "": result = _err( "missing_search_query", "Search query is required for the search action.", hint="Provide text in the Search field to look for.", ) else: result = _search_text( abs_path, query_text, recursive=recursive, show_hidden=show_hidden, max_results=max_entries, case_sensitive=case_sensitive, start_index=offset, ) else: # info result = _info(abs_path) except Exception as exc: result = _err("exception", "Unhandled error during operation.", data={"error": _safe_err(exc)}) _log_call_end("Obsidian_Vault", _truncate_for_log(result)) return result def build_interface() -> gr.Interface: return gr.Interface( fn=Obsidian_Vault, inputs=[ gr.Radio( label="Action", choices=["list", "read", "info", "search", "help"], value="help", ), gr.Textbox(label="Path", placeholder=". 
or Notes/todo.md", max_lines=1, value="."), gr.Textbox(label="Search text (search)", lines=3, placeholder="Text to search for..."), gr.Checkbox(label="Recursive (list/search)", value=False), gr.Checkbox(label="Show hidden (list/search)", value=False), gr.Slider(minimum=10, maximum=5000, step=10, value=20, label="Max entries / matches"), gr.Slider(minimum=0, maximum=1_000_000, step=100, value=0, label="Offset (read/search start)"), gr.Slider(minimum=0, maximum=100_000, step=500, value=4000, label="Max chars (read, 0=all)"), gr.Checkbox(label="Case sensitive search", value=False), ], outputs=gr.Textbox(label="Result", lines=20), title="Obsidian Vault", description=( "