"""File System tool: browse and manage files under a sandboxed root directory."""

from __future__ import annotations

import json
import os
import re
import shutil
import stat
from datetime import datetime
from typing import Annotated, Optional

import gradio as gr

from app import _log_call_end, _log_call_start, _truncate_for_log
from ._docstrings import autodoc

# Short description of the tool; passed to @autodoc on the File_System entry point.
TOOL_SUMMARY = (
    "Browse and manage files within a safe root. "
    "Actions: list, read, write, append, mkdir, move, copy, delete, info, help. "
    "Fill other fields as needed. "
    "Use paths like `.` because all paths are relative to the root (`/`). "
    "Use 'help' to see action-specific required fields and examples."
)

# Full usage guide returned verbatim by the 'help' action.
HELP_TEXT = (
    "File System — actions and usage\n\n"
    "Root: paths resolve under Nymbo-Tools/Filesystem by default (or NYMBO_TOOLS_ROOT if set). "
    "Absolute paths are disabled unless UNSAFE_ALLOW_ABS_PATHS=1.\n\n"
    "Actions and fields:\n"
    "- list: path='.' (default), recursive=false, show_hidden=false, max_entries=200\n"
    "- read: path, offset=0, max_chars=4000 (shows next_cursor when truncated)\n"
    "- write: path, content (UTF-8), create_dirs=true\n"
    "- append: path, content (UTF-8), create_dirs=true\n"
    "- mkdir: path (directory), exist_ok=true\n"
    "- move: path (src), dest_path (dst), overwrite=false\n"
    "- copy: path (src), dest_path (dst), overwrite=false\n"
    "- delete: path, recursive=true (required for directories)\n"
    "- info: path\n"
    "- help: show this guide\n\n"
    "Errors are returned as JSON with fields: {status:'error', code, message, path?, hint?, data?}.\n\n"
    "Examples:\n"
    "- list current: action=list, path='.'\n"
    "- make folder: action=mkdir, path='notes'\n"
    "- write file: action=write, path='notes/todo.txt', content='hello'\n"
    "- read file: action=read, path='notes/todo.txt', max_chars=200\n"
    "- move file: action=move, path='notes/todo.txt', dest_path='notes/todo-old.txt', overwrite=true\n"
    "- delete dir: action=delete, path='notes', recursive=true\n"
)
def _default_root() -> str:
    """Return the absolute directory used as the tool's filesystem root.

    Resolution order:
      1. ``NYMBO_TOOLS_ROOT`` when set and non-blank (``~`` is expanded).
      2. A ``Filesystem`` directory located two levels above this module file.
      3. The current working directory when the module location is unavailable.
    """
    configured = os.getenv("NYMBO_TOOLS_ROOT")
    if configured and configured.strip():
        return os.path.abspath(os.path.expanduser(configured.strip()))
    try:
        here = os.path.abspath(__file__)
        tools_dir = os.path.dirname(os.path.dirname(here))
        return os.path.abspath(os.path.join(tools_dir, "Filesystem"))
    except Exception:
        # Final fallback, e.g. when __file__ is not defined.
        return os.path.abspath(os.getcwd())


def _env_flag(name: str, default: bool = False) -> bool:
    """Read a boolean switch from the environment without ever raising.

    Accepts 1/0, true/false, yes/no, on/off (case-insensitive) and any other
    integer literal (non-zero means True). Unset, blank or unrecognised values
    yield *default*, so a typo cannot crash the module at import time.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    value = raw.strip().lower()
    if value in {"1", "true", "yes", "on"}:
        return True
    if value in {"0", "false", "no", "off"}:
        return False
    try:
        return bool(int(value))
    except ValueError:
        return default


ROOT_DIR = _default_root()

# Ensure the default root directory exists to make listing/writing more convenient.
try:
    os.makedirs(ROOT_DIR, exist_ok=True)
except (OSError, ValueError):
    # Best-effort only: a missing/unwritable root is reported by the
    # individual operations when they are actually attempted.
    pass

# Opt-in escape hatch: when true, absolute paths are accepted as-is.
ALLOW_ABS = _env_flag("UNSAFE_ALLOW_ABS_PATHS")


def _safe_err(exc: Exception | str) -> str:
    """Return ``str(exc)`` with the real root location hidden.

    Backslashes are normalised to forward slashes, every occurrence of the
    root directory is replaced by ``/`` and runs of slashes are collapsed, so
    OS error messages do not reveal where the root lives on disk.
    """
    text = str(exc).replace("\\", "/")
    root_fwd = ROOT_DIR.replace("\\", "/")
    variants = {root_fwd, re.sub(r"/+", "/", root_fwd)}
    # Longest first so a collapsed variant never pre-empts the full spelling.
    for variant in sorted(variants, key=len, reverse=True):
        if variant:
            text = text.replace(variant, "/")
    return re.sub(r"/+", "/", text)


def _err(
    code: str,
    message: str,
    *,
    path: Optional[str] = None,
    hint: Optional[str] = None,
    data: Optional[dict] = None,
) -> str:
    """Return a structured error as a JSON string.

    Always contains ``status='error'``, ``code``, ``message`` and ``root='/'``;
    ``path``, ``hint`` and ``data`` are added only when non-empty.
    """
    payload = {
        "status": "error",
        "code": code,
        "message": message,
        "root": "/",
    }
    if path is not None and path != "":
        payload["path"] = path
    if hint:
        payload["hint"] = hint
    if data:
        payload["data"] = data
    return json.dumps(payload, ensure_ascii=False)
def _is_within_root(root: str, candidate: str) -> bool:
    """Return True when *candidate* is *root* itself or lies beneath it.

    Both paths are canonicalised with ``os.path.realpath`` so neither ``..``
    segments nor symlinks can be used to step outside the root, and whole path
    components are compared (``/root-evil`` is not inside ``/root``).
    """
    root_real = os.path.normcase(os.path.realpath(root))
    cand_real = os.path.normcase(os.path.realpath(candidate))
    try:
        return os.path.commonpath([root_real, cand_real]) == root_real
    except ValueError:
        # commonpath refuses paths on different drives: certainly outside.
        return False


def _resolve_path(path: str) -> tuple[str, str]:
    """Resolve a user-provided path to an absolute, normalized path.

    The result is constrained to ROOT_DIR unless UNSAFE_ALLOW_ABS_PATHS is
    enabled (ALLOW_ABS). An empty or missing path means the root itself.

    Returns:
        ``(abs_path, error_message)``. ``error_message`` is an empty string on
        success; on failure ``abs_path`` is empty and ``error_message`` is the
        JSON error produced by ``_err``.
    """
    try:
        user_input = (path or ".").strip()
        raw = os.path.expanduser(user_input)

        if os.path.isabs(raw):
            if not ALLOW_ABS:
                # Absolute paths are not allowed in safe mode.
                return "", _err(
                    "absolute_path_disabled",
                    "Absolute paths are disabled in safe mode.",
                    path=raw.replace("\\", "/"),
                    hint="Use a path relative to / (e.g., notes/todo.txt).",
                )
            return os.path.abspath(raw), ""

        abs_path = os.path.abspath(os.path.join(ROOT_DIR, raw))
        # Constrain to the root when not in unsafe mode.
        if not ALLOW_ABS and not _is_within_root(ROOT_DIR, abs_path):
            return "", _err(
                "path_outside_root",
                "Path not allowed outside root.",
                path=user_input.replace("\\", "/"),
                hint="Use a path under / (the tool's root).",
            )
        return abs_path, ""
    except Exception as exc:
        # Boundary handler: callers expect an error payload, never a raise.
        return "", _err(
            "resolve_path_failed",
            "Failed to resolve path.",
            path=str(path or ""),
            data={"error": _safe_err(exc)},
        )


def _fmt_size(num_bytes: int) -> str:
    """Format a byte count with one decimal and a binary unit (B up to PB).

    The magnitude decides the unit, so negative values scale the same way as
    positive ones.
    """
    size = float(num_bytes)
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if abs(size) < 1024.0:
            return f"{size:.1f} {unit}"
        size /= 1024.0
    return f"{size:.1f} PB"
def _display_path(abs_path: str) -> str:
    """Return a user-facing path relative to ROOT_DIR using forward slashes.

    ROOT_DIR itself maps to ``/`` and a file beneath it to e.g.
    ``/sub/dir/file.txt``. A path that is not under ROOT_DIR is returned
    unchanged apart from slash normalisation.
    """
    try:
        norm_root = os.path.normpath(ROOT_DIR)
        norm_abs = os.path.normpath(abs_path)
        common = os.path.commonpath([norm_root, norm_abs])
        if os.path.normcase(common) == os.path.normcase(norm_root):
            rel = os.path.relpath(norm_abs, norm_root)
            return "/" if rel == "." else "/" + rel.replace("\\", "/")
    except ValueError:
        # commonpath/relpath reject incomparable paths (e.g. other drives).
        pass
    # Fallback to the original absolute path.
    return abs_path.replace("\\", "/")


def _list_dir(abs_path: str, *, show_hidden: bool, recursive: bool, max_entries: int) -> str:
    """Render a text listing of *abs_path*.

    Args:
        abs_path: Directory to list (already resolved by the caller).
        show_hidden: Include names starting with ``.``; hidden directories are
            also not descended into when this is false.
        recursive: Walk sub-directories as well.
        max_entries: Maximum number of entries to print (values below 1 are
            treated as 1).

    Returns:
        A header (path, root, entry count) followed by one section per folder.
        When more entries exist than the limit allows, a truncation notice is
        appended; the header is always present.
    """
    limit = max(1, int(max_entries))
    lines: list[str] = []
    total = 0
    truncated = False

    for folder, dirs, files in os.walk(abs_path):
        if not show_hidden:
            # In-place assignment also stops os.walk from descending into them.
            dirs[:] = [d for d in dirs if not d.startswith(".")]
            files = [f for f in files if not f.startswith(".")]
        dirs.sort()
        files.sort()

        entries = [(name, True) for name in dirs] + [(name, False) for name in files]
        if entries and total >= limit:
            # Limit reached in an earlier folder and there is more to show.
            truncated = True
            break

        lines.append(f"\n📂 {_display_path(folder)}")
        for name, is_dir in entries:
            if total >= limit:
                truncated = True
                break
            full = os.path.join(folder, name)
            try:
                mtime = datetime.fromtimestamp(os.path.getmtime(full)).isoformat(sep=" ", timespec="seconds")
                size = "" if is_dir else _fmt_size(os.path.getsize(full))
            except (OSError, OverflowError, ValueError):
                # e.g. broken symlink or a timestamp the platform cannot convert.
                size, mtime = "?", "?"
            if is_dir:
                lines.append(f" • [DIR] {name} (modified {mtime})")
            else:
                lines.append(f" • {name} ({size}, modified {mtime})")
            total += 1

        if truncated or not recursive:
            break

    if truncated:
        lines.append(f"\n… Truncated at {limit} entries.")
    header = f"Listing of {_display_path(abs_path)}\nRoot: /\nEntries: {total}"
    return (header + "\n" + "\n".join(lines)).strip()
def _slice_window(total: int, offset: int, max_chars: int) -> tuple[int, int]:
    """Return the ``(start, end)`` character indices to read.

    *offset* is clamped to ``[0, total]``; a non-positive *max_chars* means
    "to the end". Inputs are coerced to ``int`` so float values (as UI number
    widgets may deliver) do not break slicing.
    """
    start = max(0, min(int(offset), total))
    limit = int(max_chars)
    end = min(total, start + limit) if limit > 0 else total
    return start, end


def _read_file(abs_path: str, *, offset: int, max_chars: int) -> str:
    """Return a window of a UTF-8 text file, preceded by a short header.

    Undecodable bytes are replaced rather than raising. The header reports the
    offset, the number of characters returned and the total, plus a
    ``Next cursor`` line when more content remains. Failures are returned as
    JSON error strings (``file_not_found``, ``is_directory``, ``read_failed``).
    """
    shown = _display_path(abs_path)
    if not os.path.exists(abs_path):
        return _err("file_not_found", f"File not found: {shown}", path=shown)
    if os.path.isdir(abs_path):
        return _err(
            "is_directory",
            f"Path is a directory, not a file: {shown}",
            path=shown,
            hint="Provide a file path.",
        )
    try:
        with open(abs_path, "r", encoding="utf-8", errors="replace") as handle:
            data = handle.read()
    except OSError as exc:
        return _err("read_failed", "Failed to read file.", path=shown, data={"error": _safe_err(exc)})

    total = len(data)
    start, end = _slice_window(total, offset, max_chars)
    chunk = data[start:end]
    next_cursor = end if end < total else None

    header = f"Reading {shown}\nOffset {start}, returned {len(chunk)} of {total}."
    if next_cursor is not None:
        header += f"\nNext cursor: {next_cursor}"
    return header + "\n\n---\n\n" + chunk


def _ensure_parent(abs_path: str, create_dirs: bool) -> None:
    """Make sure the parent directory of *abs_path* exists.

    Raises:
        FileNotFoundError: the parent is missing and *create_dirs* is false.
    """
    parent = os.path.dirname(abs_path)
    if not parent or os.path.exists(parent):
        return
    if not create_dirs:
        raise FileNotFoundError(f"Parent directory does not exist: {_display_path(parent)}")
    os.makedirs(parent, exist_ok=True)


def _write_file(abs_path: str, content: str, *, append: bool, create_dirs: bool) -> str:
    """Write or append *content* (UTF-8) to *abs_path*.

    Returns a confirmation line with the number of characters written, or a
    ``write_failed`` JSON error string.
    """
    text = content or ""
    try:
        _ensure_parent(abs_path, create_dirs)
        with open(abs_path, "a" if append else "w", encoding="utf-8") as handle:
            handle.write(text)
    except (OSError, TypeError, ValueError) as exc:
        return _err("write_failed", "Failed to write file.", path=_display_path(abs_path), data={"error": _safe_err(exc)})
    verb = "Appended to" if append else "Wrote"
    return f"{verb} file: {_display_path(abs_path)} (chars={len(text)})"


def _mkdir(abs_path: str, exist_ok: bool) -> str:
    """Create *abs_path* including missing parents.

    Returns a confirmation line, or a ``mkdir_failed`` JSON error string.
    """
    try:
        os.makedirs(abs_path, exist_ok=exist_ok)
    except (OSError, ValueError) as exc:
        return _err("mkdir_failed", "Failed to create directory.", path=_display_path(abs_path), data={"error": _safe_err(exc)})
    return f"Created directory: {_display_path(abs_path)}"
# Past-tense verbs for success messages ("copy".capitalize() + "d" gave "Copyd").
_PAST_TENSE = {"move": "Moved", "copy": "Copied"}


def _path_contains(parent: str, child: str) -> bool:
    """Return True when *child* is *parent* itself or lies inside it.

    Paths are canonicalised with ``os.path.realpath`` and compared by whole
    components, so ``/src-copy`` is not considered inside ``/src``.
    """
    parent_real = os.path.normcase(os.path.realpath(parent))
    child_real = os.path.normcase(os.path.realpath(child))
    try:
        return os.path.commonpath([parent_real, child_real]) == parent_real
    except ValueError:
        # Different drives: cannot be nested.
        return False


def _move_copy(action: str, src: str, dst: str, *, overwrite: bool) -> str:
    """Move or copy *src* to *dst* (``action`` is ``'move'`` or ``'copy'``).

    When *dst* is an existing directory the source is placed inside it under
    its own name. An existing destination is replaced only when *overwrite* is
    true. The operation is refused when the destination is the source itself
    or lies inside it, *before* anything is deleted.

    Returns a confirmation line or a JSON error string.
    """
    try:
        if not os.path.exists(src):
            return _err("source_not_found", f"Source not found: {_display_path(src)}", path=_display_path(src))

        # Allow moving/copying into an existing directory.
        dst_path = os.path.join(dst, os.path.basename(src)) if os.path.isdir(dst) else dst

        if _path_contains(src, dst_path):
            # Overwriting here would delete the source before the transfer.
            return _err(
                "invalid_destination",
                "Destination is the source itself or lies inside it.",
                path=_display_path(dst_path),
                hint="Choose a destination outside the source path.",
            )

        if os.path.lexists(dst_path):
            if not overwrite:
                return _err(
                    "destination_exists",
                    f"Destination already exists: {_display_path(dst_path)}",
                    path=_display_path(dst_path),
                    hint="Set overwrite=True to replace the destination.",
                )
            if os.path.isdir(dst_path) and not os.path.islink(dst_path):
                shutil.rmtree(dst_path)
            else:
                os.remove(dst_path)

        if action == "move":
            shutil.move(src, dst_path)
        elif os.path.isdir(src):
            shutil.copytree(src, dst_path)
        else:
            shutil.copy2(src, dst_path)

        verb = _PAST_TENSE.get(action, f"{action.capitalize()}d")
        return f"{verb}: {_display_path(src)} -> {_display_path(dst_path)}"
    except OSError as exc:  # shutil.Error is an OSError subclass
        return _err(
            f"{action}_failed",
            f"Failed to {action}.",
            path=_display_path(src),
            data={"error": _safe_err(exc), "destination": _display_path(dst)},
        )


def _delete(abs_path: str, *, recursive: bool) -> str:
    """Delete a file, symlink or directory.

    Directories require ``recursive=True``. The root directory itself is never
    deleted. A symlink is unlinked rather than followed.

    Returns a confirmation line or a JSON error string.
    """
    shown = _display_path(abs_path)
    try:
        if not os.path.lexists(abs_path):
            return _err("path_not_found", f"Path not found: {shown}", path=shown)

        if os.path.normcase(os.path.realpath(abs_path)) == os.path.normcase(os.path.realpath(ROOT_DIR)):
            return _err(
                "cannot_delete_root",
                "Refusing to delete the root directory.",
                path=shown,
                hint="Delete individual entries under / instead.",
            )

        if os.path.isdir(abs_path) and not os.path.islink(abs_path):
            if not recursive:
                # Refuse to delete a dir unless recursive=True.
                return _err(
                    "requires_recursive",
                    "Refusing to delete a directory without recursive=True",
                    path=shown,
                    hint="Pass recursive=True to delete a directory.",
                )
            shutil.rmtree(abs_path)
        else:
            os.remove(abs_path)
        return f"Deleted: {shown}"
    except OSError as exc:
        return _err("delete_failed", "Failed to delete path.", path=shown, data={"error": _safe_err(exc)})


def _info(abs_path: str) -> str:
    """Return ``os.stat`` metadata for *abs_path* as pretty-printed JSON.

    Fields: path, type (directory/file), size, modified, created, mode, root.
    ``created`` uses ``st_birthtime`` where the platform provides it and falls
    back to ``st_ctime`` otherwise. On failure a ``stat_failed`` JSON error
    string is returned.
    """
    try:
        st = os.stat(abs_path)
    except (OSError, ValueError) as exc:
        return _err("stat_failed", "Failed to stat path.", path=_display_path(abs_path), data={"error": _safe_err(exc)})

    def _stamp(ts: float) -> str:
        return datetime.fromtimestamp(ts).isoformat(sep=" ", timespec="seconds")

    info = {
        "path": _display_path(abs_path),
        "type": "directory" if stat.S_ISDIR(st.st_mode) else "file",
        "size": st.st_size,
        "modified": _stamp(st.st_mtime),
        "created": _stamp(getattr(st, "st_birthtime", st.st_ctime)),
        "mode": oct(st.st_mode),
        "root": "/",
    }
    # ensure_ascii=False keeps non-ASCII paths readable, matching _err.
    return json.dumps(info, indent=2, ensure_ascii=False)
# Every action accepted by File_System, in the order shown to users.
_FILE_SYSTEM_ACTIONS = ("list", "read", "write", "append", "mkdir", "move", "copy", "delete", "info", "help")


# NOTE(review): no docstring is added to File_System because @autodoc
# presumably derives it from TOOL_SUMMARY and the Annotated hints — confirm.
@autodoc(summary=TOOL_SUMMARY)
def File_System(
    action: Annotated[str, "Operation to perform: 'list', 'read', 'write', 'append', 'mkdir', 'move', 'copy', 'delete', 'info', 'help'."],
    path: Annotated[str, "Target path, relative to root unless UNSAFE_ALLOW_ABS_PATHS=1."] = ".",
    content: Annotated[Optional[str], "Content for write/append actions (UTF-8)."] = None,
    dest_path: Annotated[Optional[str], "Destination for move/copy (relative to root unless unsafe absolute allowed)."] = None,
    recursive: Annotated[bool, "For list (recurse into subfolders) and delete (required for directories)."] = False,
    show_hidden: Annotated[bool, "Include hidden files (dotfiles)."] = False,
    max_entries: Annotated[int, "Max entries to list (for list)."] = 200,
    offset: Annotated[int, "Start offset for reading files (for read)."] = 0,
    max_chars: Annotated[int, "Max characters to return when reading (0 = full file)."] = 4000,
    create_dirs: Annotated[bool, "Create parent directories for write/append if missing."] = True,
    overwrite: Annotated[bool, "Allow overwrite for move/copy destinations."] = False,
) -> str:
    # Entry point: validates the action, resolves the path inside the root,
    # dispatches to the matching helper and always returns a string (either a
    # human-readable result or a JSON error payload). Every exit is logged.
    _log_call_start(
        "File_System",
        action=action,
        path=path,
        dest_path=dest_path,
        recursive=recursive,
        show_hidden=show_hidden,
        max_entries=max_entries,
        offset=offset,
        max_chars=max_chars,
        create_dirs=create_dirs,
        overwrite=overwrite,
    )

    def _finish(outcome: str) -> str:
        # Single exit path so the end-of-call log can never be skipped.
        _log_call_end("File_System", _truncate_for_log(outcome))
        return outcome

    action = (action or "").strip().lower()
    if action not in _FILE_SYSTEM_ACTIONS:
        return _finish(
            _err(
                "invalid_action",
                "Invalid action.",
                hint="Choose from: list, read, write, append, mkdir, move, copy, delete, info, help.",
            )
        )

    # Help needs no path, so it must not fail because of an invalid one.
    if action == "help":
        return _finish(HELP_TEXT)

    abs_path, err = _resolve_path(path)
    if err:
        return _finish(err)

    shown = _display_path(abs_path)
    try:
        if action == "list":
            if not os.path.exists(abs_path):
                result = _err("path_not_found", f"Path not found: {shown}", path=shown)
            else:
                # int(): UI number widgets may deliver floats.
                result = _list_dir(
                    abs_path,
                    show_hidden=show_hidden,
                    recursive=recursive,
                    max_entries=int(max_entries),
                )
        elif action == "read":
            result = _read_file(abs_path, offset=int(offset), max_chars=int(max_chars))
        elif action in {"write", "append"}:
            # Prevent attempts to write to root or any directory.
            if shown == "/" or os.path.isdir(abs_path):
                result = _err(
                    "invalid_write_path",
                    "Invalid path for write/append.",
                    path=shown,
                    hint="Provide a file path under / (e.g., /notes/todo.txt).",
                )
            else:
                result = _write_file(abs_path, content or "", append=(action == "append"), create_dirs=create_dirs)
        elif action == "mkdir":
            result = _mkdir(abs_path, exist_ok=True)
        elif action in {"move", "delete"} and shown == "/":
            # The root must survive: every other path is resolved against it.
            result = _err(
                "root_protected",
                f"Refusing to {action} the root directory.",
                path=shown,
                hint="Operate on entries under / instead.",
            )
        elif action in {"move", "copy"}:
            if not dest_path:
                result = _err("missing_dest_path", "dest_path is required for move/copy (ignored for other actions).")
            else:
                abs_dst, err2 = _resolve_path(dest_path)
                result = err2 if err2 else _move_copy(action, abs_path, abs_dst, overwrite=overwrite)
        elif action == "delete":
            result = _delete(abs_path, recursive=recursive)
        else:  # info
            result = _info(abs_path)
    except Exception as exc:
        # Top-level boundary: the tool contract is "always return a string".
        result = _err("exception", "Unhandled error during operation.", path=shown, data={"error": _safe_err(exc)})

    return _finish(result)
or src/file.txt", max_lines=1, value="."), gr.Textbox(label="Content (for write/append)", lines=6, placeholder="Text to write..."), gr.Textbox(label="Destination (for move/copy)", max_lines=1), gr.Checkbox(label="Recursive (list/delete)", value=False), gr.Checkbox(label="Show hidden (list)", value=False), gr.Slider(minimum=10, maximum=5000, step=10, value=200, label="Max entries (list)"), gr.Slider(minimum=0, maximum=1_000_000, step=100, value=0, label="Offset (read)"), gr.Slider(minimum=0, maximum=100_000, step=500, value=4000, label="Max chars (read, 0=all)"), gr.Checkbox(label="Create parent dirs (write)", value=True), gr.Checkbox(label="Overwrite destination (move/copy)", value=False), ], outputs=gr.Textbox(label="Result", lines=20), title="File System", description=( "