Tools / Modules /File_System.py
Nymbo's picture
Create File_System.py
78d32d0 verified
raw
history blame
21.1 kB
from __future__ import annotations
import json
import os
import shutil
import stat
from datetime import datetime
from typing import Annotated, Optional
import re
import gradio as gr
from app import _log_call_end, _log_call_start, _truncate_for_log
from ._docstrings import autodoc
TOOL_SUMMARY = (
"Browse and manage files within a safe root. "
"Actions: list, read, write, append, mkdir, move, copy, delete, info, help. "
"Fill other fields as needed. "
"Use paths like `.` because all paths are relative to the root (`/`). "
"Use 'help' to see action-specific required fields and examples."
)
HELP_TEXT = (
"File System — actions and usage\n\n"
"Root: paths resolve under Nymbo-Tools/Filesystem by default (or NYMBO_TOOLS_ROOT if set). "
"Absolute paths are disabled unless UNSAFE_ALLOW_ABS_PATHS=1.\n\n"
"Actions and fields:\n"
"- list: path='.' (default), recursive=false, show_hidden=false, max_entries=200\n"
"- read: path, offset=0, max_chars=4000 (shows next_cursor when truncated)\n"
"- write: path, content (UTF-8), create_dirs=true\n"
"- append: path, content (UTF-8), create_dirs=true\n"
"- mkdir: path (directory), exist_ok=true\n"
"- move: path (src), dest_path (dst), overwrite=false\n"
"- copy: path (src), dest_path (dst), overwrite=false\n"
"- delete: path, recursive=true (required for directories)\n"
"- info: path\n"
"- help: show this guide\n\n"
"Errors are returned as JSON with fields: {status:'error', code, message, path?, hint?, data?}.\n\n"
"Examples:\n"
"- list current: action=list, path='.'\n"
"- make folder: action=mkdir, path='notes'\n"
"- write file: action=write, path='notes/todo.txt', content='hello'\n"
"- read file: action=read, path='notes/todo.txt', max_chars=200\n"
"- move file: action=move, path='notes/todo.txt', dest_path='notes/todo-old.txt', overwrite=true\n"
"- delete dir: action=delete, path='notes', recursive=true\n"
)
def _default_root() -> str:
# Prefer explicit root via env var
root = os.getenv("NYMBO_TOOLS_ROOT")
if root and root.strip():
return os.path.abspath(os.path.expanduser(root.strip()))
# Default to "Nymbo-Tools/Filesystem" alongside this module package
try:
here = os.path.abspath(__file__)
tools_dir = os.path.dirname(os.path.dirname(here)) # .../Nymbo-Tools
default_root = os.path.abspath(os.path.join(tools_dir, "Filesystem"))
return default_root
except Exception:
# Final fallback
return os.path.abspath(os.getcwd())
ROOT_DIR = _default_root()
# Ensure the default root directory exists to make listing/writing more convenient
try:
os.makedirs(ROOT_DIR, exist_ok=True)
except Exception:
pass
ALLOW_ABS = bool(int(os.getenv("UNSAFE_ALLOW_ABS_PATHS", "0")))
def _safe_err(exc: Exception | str) -> str:
"""Return an error string with any absolute root replaced by '/' and slashes normalized.
This handles variants like backslashes and duplicate slashes in OS messages.
"""
s = str(exc)
# Normalize to forward slashes for comparison
s_norm = s.replace("\\", "/")
root_fwd = ROOT_DIR.replace("\\", "/")
# Collapse duplicate slashes in root representation
root_variants = {ROOT_DIR, root_fwd, re.sub(r"/+", "/", root_fwd)}
for variant in root_variants:
if variant:
s_norm = s_norm.replace(variant, "/")
# Collapse duplicate slashes in final output
s_norm = re.sub(r"/+", "/", s_norm)
return s_norm
def _err(code: str, message: str, *, path: Optional[str] = None, hint: Optional[str] = None, data: Optional[dict] = None) -> str:
"""Return a structured error JSON string.
Fields: status='error', code, message, path?, hint?, data?, root='/'
"""
payload = {
"status": "error",
"code": code,
"message": message,
"root": "/",
}
if path is not None and path != "":
payload["path"] = path
if hint:
payload["hint"] = hint
if data:
payload["data"] = data
return json.dumps(payload, ensure_ascii=False)
def _resolve_path(path: str) -> tuple[str, str]:
"""
Resolve a user-provided path to an absolute, normalized path constrained to ROOT_DIR
(unless UNSAFE_ALLOW_ABS_PATHS=1). Returns (abs_path, error_message). error_message is empty when ok.
"""
try:
user_input = (path or ".").strip()
raw = os.path.expanduser(user_input)
if os.path.isabs(raw):
if not ALLOW_ABS:
# Absolute paths are not allowed in safe mode
return "", _err(
"absolute_path_disabled",
"Absolute paths are disabled in safe mode.",
path=raw.replace("\\", "/"),
hint="Use a path relative to / (e.g., notes/todo.txt)."
)
abs_path = os.path.abspath(raw)
else:
abs_path = os.path.abspath(os.path.join(ROOT_DIR, raw))
# Constrain to ROOT when not unsafe mode
if not ALLOW_ABS:
try:
common = os.path.commonpath([os.path.normpath(ROOT_DIR), os.path.normpath(abs_path)])
except Exception:
# Fallback to simple check
root_cmp = os.path.normcase(os.path.normpath(ROOT_DIR))
abs_cmp = os.path.normcase(os.path.normpath(abs_path))
if not abs_cmp.startswith(root_cmp):
return "", _err(
"path_outside_root",
"Path not allowed outside root.",
path=user_input.replace("\\", "/"),
hint="Use a path under / (the tool's root)."
)
else:
root_cmp = os.path.normcase(os.path.normpath(ROOT_DIR))
common_cmp = os.path.normcase(os.path.normpath(common))
if common_cmp != root_cmp:
return "", _err(
"path_outside_root",
"Path not allowed outside root.",
path=user_input.replace("\\", "/"),
hint="Use a path under / (the tool's root)."
)
return abs_path, ""
except Exception as exc:
return "", _err(
"resolve_path_failed",
"Failed to resolve path.",
path=(path or ""),
data={"error": _safe_err(exc)}
)
def _fmt_size(num_bytes: int) -> str:
units = ["B", "KB", "MB", "GB", "TB"]
size = float(num_bytes)
for unit in units:
if size < 1024.0:
return f"{size:.1f} {unit}"
size /= 1024.0
return f"{size:.1f} PB"
def _display_path(abs_path: str) -> str:
"""Return a user-friendly path relative to ROOT_DIR using forward slashes.
Example: ROOT_DIR -> '/', a file under it -> '/sub/dir/file.txt'."""
try:
norm_root = os.path.normpath(ROOT_DIR)
norm_abs = os.path.normpath(abs_path)
common = os.path.commonpath([norm_root, norm_abs])
if os.path.normcase(common) == os.path.normcase(norm_root):
rel = os.path.relpath(norm_abs, norm_root)
if rel == ".":
return "/"
return "/" + rel.replace("\\", "/")
except Exception:
pass
# Fallback to original absolute path
return abs_path.replace("\\", "/")
def _list_dir(abs_path: str, *, show_hidden: bool, recursive: bool, max_entries: int) -> str:
lines: list[str] = []
total = 0
root_display = "/"
listing_display = _display_path(abs_path)
for root, dirs, files in os.walk(abs_path):
# filter hidden
if not show_hidden:
dirs[:] = [d for d in dirs if not d.startswith('.')]
files = [f for f in files if not f.startswith('.')]
try:
rel_root = os.path.relpath(root, ROOT_DIR)
except Exception:
rel_root = root
rel_root_disp = "/" if rel_root == "." else "/" + rel_root.replace("\\", "/")
lines.append(f"\n📂 {rel_root_disp}")
# sort
dirs.sort()
files.sort()
for d in dirs:
p = os.path.join(root, d)
try:
mtime = datetime.fromtimestamp(os.path.getmtime(p)).isoformat(sep=' ', timespec='seconds')
except Exception:
mtime = "?"
lines.append(f" • [DIR] {d} (modified {mtime})")
total += 1
if total >= max_entries:
lines.append(f"\n… Truncated at {max_entries} entries.")
return "\n".join(lines).strip()
for f in files:
p = os.path.join(root, f)
try:
size = _fmt_size(os.path.getsize(p))
mtime = datetime.fromtimestamp(os.path.getmtime(p)).isoformat(sep=' ', timespec='seconds')
except Exception:
size, mtime = "?", "?"
lines.append(f" • {f} ({size}, modified {mtime})")
total += 1
if total >= max_entries:
lines.append(f"\n… Truncated at {max_entries} entries.")
return "\n".join(lines).strip()
if not recursive:
break
header = f"Listing of {listing_display}\nRoot: {root_display}\nEntries: {total}"
return (header + "\n" + "\n".join(lines)).strip()
def _read_file(abs_path: str, *, offset: int, max_chars: int) -> str:
if not os.path.exists(abs_path):
return _err("file_not_found", f"File not found: {_display_path(abs_path)}", path=_display_path(abs_path))
if os.path.isdir(abs_path):
return _err("is_directory", f"Path is a directory, not a file: {_display_path(abs_path)}", path=_display_path(abs_path), hint="Provide a file path.")
try:
with open(abs_path, 'r', encoding='utf-8', errors='replace') as f:
data = f.read()
except Exception as exc:
return _err("read_failed", "Failed to read file.", path=_display_path(abs_path), data={"error": _safe_err(exc)})
total = len(data)
start = max(0, min(offset, total))
if max_chars > 0:
end = min(total, start + max_chars)
else:
end = total
chunk = data[start:end]
next_cursor = end if end < total else None
meta = {
"offset": start,
"returned": len(chunk),
"total": total,
"next_cursor": next_cursor,
"path": _display_path(abs_path),
}
header = (
f"Reading {_display_path(abs_path)}\n"
f"Offset {start}, returned {len(chunk)} of {total}."
+ (f"\nNext cursor: {next_cursor}" if next_cursor is not None else "")
)
sep = "\n\n---\n\n"
return header + sep + chunk
def _ensure_parent(abs_path: str, create_dirs: bool) -> None:
parent = os.path.dirname(abs_path)
if parent and not os.path.exists(parent):
if create_dirs:
os.makedirs(parent, exist_ok=True)
else:
raise FileNotFoundError(f"Parent directory does not exist: {_display_path(parent)}")
def _write_file(abs_path: str, content: str, *, append: bool, create_dirs: bool) -> str:
try:
_ensure_parent(abs_path, create_dirs)
mode = 'a' if append else 'w'
with open(abs_path, mode, encoding='utf-8') as f:
f.write(content or "")
return f"{'Appended to' if append else 'Wrote'} file: {_display_path(abs_path)} (chars={len(content or '')})"
except Exception as exc:
return _err("write_failed", "Failed to write file.", path=_display_path(abs_path), data={"error": _safe_err(exc)})
def _mkdir(abs_path: str, exist_ok: bool) -> str:
try:
os.makedirs(abs_path, exist_ok=exist_ok)
return f"Created directory: {_display_path(abs_path)}"
except Exception as exc:
return _err("mkdir_failed", "Failed to create directory.", path=_display_path(abs_path), data={"error": _safe_err(exc)})
def _move_copy(action: str, src: str, dst: str, *, overwrite: bool) -> str:
try:
if not os.path.exists(src):
return _err("source_not_found", f"Source not found: {_display_path(src)}", path=_display_path(src))
if os.path.isdir(dst):
# allow moving into an existing directory
dst_path = os.path.join(dst, os.path.basename(src))
else:
dst_path = dst
if os.path.exists(dst_path):
if overwrite:
if os.path.isdir(dst_path):
shutil.rmtree(dst_path)
else:
os.remove(dst_path)
else:
return _err(
"destination_exists",
f"Destination already exists: {_display_path(dst_path)}",
path=_display_path(dst_path),
hint="Set overwrite=True to replace the destination."
)
if action == 'move':
shutil.move(src, dst_path)
else:
if os.path.isdir(src):
shutil.copytree(src, dst_path)
else:
shutil.copy2(src, dst_path)
return f"{action.capitalize()}d: {_display_path(src)} -> {_display_path(dst_path)}"
except Exception as exc:
return _err(f"{action}_failed", f"Failed to {action}.", path=_display_path(src), data={"error": _safe_err(exc), "destination": _display_path(dst)})
def _delete(abs_path: str, *, recursive: bool) -> str:
try:
if not os.path.exists(abs_path):
return _err("path_not_found", f"Path not found: {_display_path(abs_path)}", path=_display_path(abs_path))
if os.path.isdir(abs_path):
if not recursive:
# Refuse to delete a dir unless recursive=True
return _err("requires_recursive", "Refusing to delete a directory without recursive=True", path=_display_path(abs_path), hint="Pass recursive=True to delete a directory.")
shutil.rmtree(abs_path)
else:
os.remove(abs_path)
return f"Deleted: {_display_path(abs_path)}"
except Exception as exc:
return _err("delete_failed", "Failed to delete path.", path=_display_path(abs_path), data={"error": _safe_err(exc)})
def _info(abs_path: str) -> str:
try:
st = os.stat(abs_path)
except Exception as exc:
return _err("stat_failed", "Failed to stat path.", path=_display_path(abs_path), data={"error": _safe_err(exc)})
info = {
"path": _display_path(abs_path),
"type": "directory" if stat.S_ISDIR(st.st_mode) else "file",
"size": st.st_size,
"modified": datetime.fromtimestamp(st.st_mtime).isoformat(sep=' ', timespec='seconds'),
"created": datetime.fromtimestamp(st.st_ctime).isoformat(sep=' ', timespec='seconds'),
"mode": oct(st.st_mode),
"root": "/",
}
return json.dumps(info, indent=2)
@autodoc(summary=TOOL_SUMMARY)
def File_System(
action: Annotated[str, "Operation to perform: 'list', 'read', 'write', 'append', 'mkdir', 'move', 'copy', 'delete', 'info'."],
path: Annotated[str, "Target path, relative to root unless UNSAFE_ALLOW_ABS_PATHS=1."] = ".",
content: Annotated[Optional[str], "Content for write/append actions (UTF-8)."] = None,
dest_path: Annotated[Optional[str], "Destination for move/copy (relative to root unless unsafe absolute allowed)."] = None,
recursive: Annotated[bool, "For list (recurse into subfolders) and delete (required for directories)."] = False,
show_hidden: Annotated[bool, "Include hidden files (dotfiles)."] = False,
max_entries: Annotated[int, "Max entries to list (for list)."] = 200,
offset: Annotated[int, "Start offset for reading files (for read)."] = 0,
max_chars: Annotated[int, "Max characters to return when reading (0 = full file)."] = 4000,
create_dirs: Annotated[bool, "Create parent directories for write/append if missing."] = True,
overwrite: Annotated[bool, "Allow overwrite for move/copy destinations."] = False,
) -> str:
_log_call_start(
"File_System",
action=action,
path=path,
dest_path=dest_path,
recursive=recursive,
show_hidden=show_hidden,
max_entries=max_entries,
offset=offset,
max_chars=max_chars,
create_dirs=create_dirs,
overwrite=overwrite,
)
action = (action or "").strip().lower()
if action not in {"list", "read", "write", "append", "mkdir", "move", "copy", "delete", "info", "help"}:
result = _err(
"invalid_action",
"Invalid action.",
hint="Choose from: list, read, write, append, mkdir, move, copy, delete, info, help."
)
_log_call_end("File_System", _truncate_for_log(result))
return result
abs_path, err = _resolve_path(path)
if err:
_log_call_end("File_System", _truncate_for_log(err))
return err
try:
if action == "help":
result = HELP_TEXT
elif action == "list":
if not os.path.exists(abs_path):
result = _err("path_not_found", f"Path not found: {_display_path(abs_path)}", path=_display_path(abs_path))
else:
result = _list_dir(abs_path, show_hidden=show_hidden, recursive=recursive, max_entries=max_entries)
elif action == "read":
result = _read_file(abs_path, offset=offset, max_chars=max_chars)
elif action in {"write", "append"}:
# Prevent attempts to write to root or any directory
if _display_path(abs_path) == "/" or os.path.isdir(abs_path):
result = _err(
"invalid_write_path",
"Invalid path for write/append.",
path=_display_path(abs_path),
hint="Provide a file path under / (e.g., /notes/todo.txt)."
)
else:
result = _write_file(abs_path, content or "", append=(action == "append"), create_dirs=create_dirs)
elif action == "mkdir":
result = _mkdir(abs_path, exist_ok=True)
elif action in {"move", "copy"}:
if not dest_path:
result = _err("missing_dest_path", "dest_path is required for move/copy (ignored for other actions).")
else:
abs_dst, err2 = _resolve_path(dest_path)
if err2:
result = err2
else:
result = _move_copy(action, abs_path, abs_dst, overwrite=overwrite)
elif action == "delete":
result = _delete(abs_path, recursive=recursive)
else: # info
result = _info(abs_path)
except Exception as exc:
result = _err("exception", "Unhandled error during operation.", data={"error": _safe_err(exc)})
_log_call_end("File_System", _truncate_for_log(result))
return result
def build_interface() -> gr.Interface:
return gr.Interface(
fn=File_System,
inputs=[
gr.Radio(
label="Action",
choices=["list", "read", "write", "append", "mkdir", "move", "copy", "delete", "info", "help"],
value="help",
),
gr.Textbox(label="Path", placeholder=". or src/file.txt", max_lines=1, value="."),
gr.Textbox(label="Content (for write/append)", lines=6, placeholder="Text to write..."),
gr.Textbox(label="Destination (for move/copy)", max_lines=1),
gr.Checkbox(label="Recursive (list/delete)", value=False),
gr.Checkbox(label="Show hidden (list)", value=False),
gr.Slider(minimum=10, maximum=5000, step=10, value=200, label="Max entries (list)"),
gr.Slider(minimum=0, maximum=1_000_000, step=100, value=0, label="Offset (read)"),
gr.Slider(minimum=0, maximum=100_000, step=500, value=4000, label="Max chars (read, 0=all)"),
gr.Checkbox(label="Create parent dirs (write)", value=True),
gr.Checkbox(label="Overwrite destination (move/copy)", value=False),
],
outputs=gr.Textbox(label="Result", lines=20),
title="File System",
description=(
"<div id=\"fs-desc\" style=\"text-align:center; overflow:hidden;\">Browse and interact with a filesystem. "
"Actions are required, fill other fields as needed."
"</div>"
),
api_description=TOOL_SUMMARY,
flagging_mode="never",
submit_btn="Run",
css=(
"""
/* Hide scrollbars/arrows that can appear on the description block in some browsers */
article.prose, .prose, .gr-prose {
overflow: visible !important;
max-height: none !important;
-ms-overflow-style: none !important; /* IE/Edge */
scrollbar-width: none !important; /* Firefox */
}
article.prose::-webkit-scrollbar,
.prose::-webkit-scrollbar,
.gr-prose::-webkit-scrollbar {
display: none !important; /* Chrome/Safari */
}
"""
),
)
__all__ = ["File_System", "build_interface"]