|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import shlex |
|
|
import subprocess |
|
|
|
|
|
from config import TIMEOUT_DEFAULT |
|
|
from utils.audit import audit_event |
|
|
from utils.guard import get_allowlist, needs_confirmation_for_exec |
|
|
|
|
|
|
|
|
def _parse_timeout(parts: list[str], default_timeout: int) -> tuple[list[str], int]: |
|
|
"""Extrai --timeout N se existir.""" |
|
|
timeout = default_timeout |
|
|
if "--timeout" in parts: |
|
|
try: |
|
|
i = parts.index("--timeout") |
|
|
timeout = int(parts[i + 1]) |
|
|
parts = parts[:i] + parts[i + 2 :] |
|
|
except Exception: |
|
|
|
|
|
pass |
|
|
return parts, timeout |
|
|
|
|
|
|
|
|
async def handle_exec(args: str, block: str) -> str: |
|
|
""" |
|
|
/exec <comando> [--timeout N] |
|
|
|
|
|
Regras: |
|
|
- Allowlist (ENV EXEC_ALLOWLIST). Se 1º token não estiver na allowlist, exige CONFIRMO: |
|
|
/exec CONFIRMO <comando> |
|
|
- --timeout N (segundos) sobrepõe TIMEOUT_DEFAULT. |
|
|
""" |
|
|
if not args.strip(): |
|
|
return "Uso: /exec <comando> [--timeout N]" |
|
|
|
|
|
|
|
|
parts = shlex.split(args, posix=False) |
|
|
parts, timeout = _parse_timeout(parts, TIMEOUT_DEFAULT) |
|
|
|
|
|
|
|
|
cmd = " ".join(parts).strip() |
|
|
if not cmd: |
|
|
return "Uso: /exec <comando> [--timeout N]" |
|
|
|
|
|
allow = get_allowlist() |
|
|
|
|
|
|
|
|
if needs_confirmation_for_exec(cmd, allow): |
|
|
if cmd.lower().startswith("confirmo "): |
|
|
cmd = cmd[len("confirmo ") :].lstrip() |
|
|
else: |
|
|
alw = ", ".join(allow) or "(vazia)" |
|
|
return ( |
|
|
"⚠️ Comando fora da allowlist.\n" |
|
|
f"Allowlist: {alw}\n" |
|
|
"Se deseja executar mesmo assim, use:\n" |
|
|
f"`/exec CONFIRMO {cmd}`" |
|
|
) |
|
|
|
|
|
|
|
|
try: |
|
|
proc = subprocess.run( |
|
|
cmd, |
|
|
shell=True, |
|
|
capture_output=True, |
|
|
text=True, |
|
|
timeout=timeout, |
|
|
) |
|
|
audit_event("exec", {"cmd": cmd, "rc": proc.returncode}) |
|
|
|
|
|
out = (proc.stdout or "").strip() |
|
|
err = (proc.stderr or "").strip() |
|
|
|
|
|
lines = [f"✅ RC={proc.returncode}"] |
|
|
if out: |
|
|
if len(out) > 4000: |
|
|
out = out[:4000] + "\n...[truncado]..." |
|
|
lines.append("**STDOUT:**\n```\n" + out + "\n```") |
|
|
if err: |
|
|
if len(err) > 3000: |
|
|
err = err[:3000] + "\n...[truncado]..." |
|
|
lines.append("**STDERR:**\n```\n" + err + "\n```") |
|
|
if not out and not err: |
|
|
lines.append("_(sem saída)_") |
|
|
|
|
|
return "\n".join(lines) |
|
|
|
|
|
except subprocess.TimeoutExpired: |
|
|
audit_event("exec_timeout", {"cmd": cmd, "timeout": timeout}) |
|
|
return f"⏳ Timeout ({timeout}s) executando: `{cmd}`" |
|
|
except Exception as e: |
|
|
audit_event("exec_error", {"cmd": cmd, "error": str(e)}) |
|
|
return f"❌ Erro em /exec: {e}" |
|
|
|