File size: 3,047 Bytes
fea1bd1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# C:\Users\marco\agente_gemini\HASHIRU_6_1\tools\exec.py
from __future__ import annotations
import shlex
import subprocess
from config import TIMEOUT_DEFAULT
from utils.audit import audit_event
from utils.guard import get_allowlist, needs_confirmation_for_exec
def _parse_timeout(parts: list[str], default_timeout: int) -> tuple[list[str], int]:
"""Extrai --timeout N se existir."""
timeout = default_timeout
if "--timeout" in parts:
try:
i = parts.index("--timeout")
timeout = int(parts[i + 1])
parts = parts[:i] + parts[i + 2 :]
except Exception:
# parâmetro malformado -> ignora e mantém default
pass
return parts, timeout
async def handle_exec(args: str, block: str) -> str:
"""
/exec <comando> [--timeout N]
Regras:
- Allowlist (ENV EXEC_ALLOWLIST). Se 1º token não estiver na allowlist, exige CONFIRMO:
/exec CONFIRMO <comando>
- --timeout N (segundos) sobrepõe TIMEOUT_DEFAULT.
"""
if not args.strip():
return "Uso: /exec <comando> [--timeout N]"
# Parse básico (Windows-friendly)
parts = shlex.split(args, posix=False)
parts, timeout = _parse_timeout(parts, TIMEOUT_DEFAULT)
# Monta comando final
cmd = " ".join(parts).strip()
if not cmd:
return "Uso: /exec <comando> [--timeout N]"
allow = get_allowlist()
# Checagem de confirmação para fora da allowlist
if needs_confirmation_for_exec(cmd, allow):
if cmd.lower().startswith("confirmo "):
cmd = cmd[len("confirmo ") :].lstrip()
else:
alw = ", ".join(allow) or "(vazia)"
return (
"⚠️ Comando fora da allowlist.\n"
f"Allowlist: {alw}\n"
"Se deseja executar mesmo assim, use:\n"
f"`/exec CONFIRMO {cmd}`"
)
# Execução do comando
try:
proc = subprocess.run(
cmd,
shell=True,
capture_output=True,
text=True,
timeout=timeout,
)
audit_event("exec", {"cmd": cmd, "rc": proc.returncode})
out = (proc.stdout or "").strip()
err = (proc.stderr or "").strip()
lines = [f"✅ RC={proc.returncode}"]
if out:
if len(out) > 4000:
out = out[:4000] + "\n...[truncado]..."
lines.append("**STDOUT:**\n```\n" + out + "\n```")
if err:
if len(err) > 3000:
err = err[:3000] + "\n...[truncado]..."
lines.append("**STDERR:**\n```\n" + err + "\n```")
if not out and not err:
lines.append("_(sem saída)_")
return "\n".join(lines)
except subprocess.TimeoutExpired:
audit_event("exec_timeout", {"cmd": cmd, "timeout": timeout})
return f"⏳ Timeout ({timeout}s) executando: `{cmd}`"
except Exception as e:
audit_event("exec_error", {"cmd": cmd, "error": str(e)})
return f"❌ Erro em /exec: {e}"
|