|
|
from __future__ import annotations |
|
|
|
|
|
import os |
|
|
import platform |
|
|
import shlex |
|
|
import subprocess |
|
|
from typing import Annotated |
|
|
|
|
|
import gradio as gr |
|
|
|
|
|
from app import _log_call_end, _log_call_start, _truncate_for_log |
|
|
from ._docstrings import autodoc |
|
|
from .File_System import _resolve_path, ROOT_DIR, _display_path |
|
|
import shutil |
|
|
|
|
|
|
|
|
def _detect_shell(prefer_powershell: bool = True) -> tuple[list[str], str]: |
|
|
""" |
|
|
Pick an appropriate shell for the host OS. |
|
|
- Windows: use PowerShell by default, fall back to cmd.exe. |
|
|
- POSIX: use /bin/bash if available, else /bin/sh. |
|
|
Returns (shell_cmd_prefix, shell_name) where shell_cmd_prefix is the command list to launch the shell. |
|
|
""" |
|
|
system = platform.system().lower() |
|
|
if system == "windows": |
|
|
if prefer_powershell: |
|
|
pwsh = shutil.which("pwsh") |
|
|
candidates = [pwsh, shutil.which("powershell"), shutil.which("powershell.exe")] |
|
|
for cand in candidates: |
|
|
if cand: |
|
|
return [cand, "-NoLogo", "-NoProfile", "-Command"], "powershell" |
|
|
|
|
|
comspec = os.environ.get("ComSpec", r"C:\\Windows\\System32\\cmd.exe") |
|
|
return [comspec, "/C"], "cmd" |
|
|
|
|
|
bash = shutil.which("bash") |
|
|
if bash: |
|
|
return [bash, "-lc"], "bash" |
|
|
sh = os.environ.get("SHELL", "/bin/sh") |
|
|
return [sh, "-lc"], "sh" |
|
|
|
|
|
|
|
|
|
|
|
_DETECTED_SHELL_PREFIX, _DETECTED_SHELL_NAME = _detect_shell() |
|
|
|
|
|
|
|
|
|
|
|
TOOL_SUMMARY = ( |
|
|
"Execute a shell command within a safe working directory under the tool root ('/'). " |
|
|
"Paths must be relative to '/' (which maps to Nymbo-Tools/Filesystem by default); " |
|
|
"absolute paths are disabled unless UNSAFE_ALLOW_ABS_PATHS=1. " |
|
|
f"Detected shell: {_DETECTED_SHELL_NAME}." |
|
|
) |
|
|
|
|
|
|
|
|
def _run_command(command: str, cwd: str, timeout: int) -> tuple[str, str, int]: |
|
|
shell_prefix, shell_name = _detect_shell() |
|
|
full_cmd = shell_prefix + [command] |
|
|
try: |
|
|
proc = subprocess.run( |
|
|
full_cmd, |
|
|
cwd=cwd, |
|
|
stdout=subprocess.PIPE, |
|
|
stderr=subprocess.PIPE, |
|
|
text=True, |
|
|
encoding="utf-8", |
|
|
errors="replace", |
|
|
timeout=timeout if timeout and timeout > 0 else None, |
|
|
) |
|
|
return proc.stdout, proc.stderr, proc.returncode |
|
|
except subprocess.TimeoutExpired as exc: |
|
|
return exc.stdout or "", (exc.stderr or "") + "\n[timeout]", 124 |
|
|
except Exception as exc: |
|
|
return "", f"Execution failed: {exc}", 1 |
|
|
|
|
|
|
|
|
@autodoc(summary=TOOL_SUMMARY) |
|
|
def Shell_Exec( |
|
|
command: Annotated[str, "Shell command to execute. Accepts multi-part pipelines as a single string."], |
|
|
workdir: Annotated[str, "Working directory (relative to root unless UNSAFE_ALLOW_ABS_PATHS=1)."] = ".", |
|
|
timeout: Annotated[int, "Timeout in seconds (0 = no timeout, be careful on public hosting)."] = 60, |
|
|
) -> str: |
|
|
_log_call_start("Shell_Exec", command=command, workdir=workdir, timeout=timeout) |
|
|
if not command or not command.strip(): |
|
|
result = "No command provided." |
|
|
_log_call_end("Shell_Exec", _truncate_for_log(result)) |
|
|
return result |
|
|
|
|
|
abs_cwd, err = _resolve_path(workdir) |
|
|
if err: |
|
|
_log_call_end("Shell_Exec", _truncate_for_log(err)) |
|
|
return err |
|
|
if not os.path.exists(abs_cwd): |
|
|
result = f"Working directory not found: {abs_cwd}" |
|
|
_log_call_end("Shell_Exec", _truncate_for_log(result)) |
|
|
return result |
|
|
|
|
|
|
|
|
_, shell_name = _detect_shell() |
|
|
stdout, stderr, code = _run_command(command, cwd=abs_cwd, timeout=timeout) |
|
|
display_cwd = _display_path(abs_cwd) |
|
|
header = ( |
|
|
f"Command: {command}\n" |
|
|
f"CWD: {display_cwd}\n" |
|
|
f"Root: /\n" |
|
|
f"Shell: {shell_name}\n" |
|
|
f"Exit code: {code}\n" |
|
|
f"--- STDOUT ---\n" |
|
|
) |
|
|
output = header + (stdout or "<empty>") + "\n--- STDERR ---\n" + (stderr or "<empty>") |
|
|
_log_call_end("Shell_Exec", _truncate_for_log(f"exit={code} stdout={len(stdout)} stderr={len(stderr)}")) |
|
|
return output |
|
|
|
|
|
|
|
|
def build_interface() -> gr.Interface: |
|
|
return gr.Interface( |
|
|
fn=Shell_Exec, |
|
|
inputs=[ |
|
|
gr.Textbox(label="Command", placeholder="echo hello || dir", lines=2), |
|
|
gr.Textbox(label="Workdir", value=".", max_lines=1), |
|
|
gr.Slider(minimum=0, maximum=600, step=5, value=60, label="Timeout (seconds)"), |
|
|
], |
|
|
outputs=gr.Textbox(label="Output", lines=20), |
|
|
title="Shell Exec", |
|
|
description=( |
|
|
"Run a shell command under the same safe root as File System. " |
|
|
"Paths are relative to '/' (maps to Nymbo-Tools/Filesystem by default); absolute paths are disabled " |
|
|
"unless UNSAFE_ALLOW_ABS_PATHS=1. Tip: set workdir to '.' to use the root. " |
|
|
f"Detected shell: {_DETECTED_SHELL_NAME}. " |
|
|
"<br><br><b>Examples</b>:" |
|
|
"<br>- Command: <code>dir</code> (Windows) or <code>ls -la</code> (Linux)" |
|
|
"<br>- Command: <code>echo hello | tee out.txt</code>" |
|
|
"<br>- Workdir: <code>notes</code>, Command: <code>ls</code> to list /notes" |
|
|
), |
|
|
api_description=TOOL_SUMMARY, |
|
|
flagging_mode="never", |
|
|
submit_btn="Run", |
|
|
) |
|
|
|
|
|
|
|
|
__all__ = ["Shell_Exec", "build_interface"] |
|
|
|