|
|
""" |
|
|
Utilitários de validação para o sistema AgentGraph |
|
|
""" |
|
|
import re |
|
|
import logging |
|
|
from typing import Dict, Any, Tuple, Optional |
|
|
|
|
|
|
|
|
def validate_postgresql_config(config: Dict[str, Any]) -> Tuple[bool, Optional[str]]: |
|
|
""" |
|
|
Valida configuração postgresql completa |
|
|
|
|
|
Args: |
|
|
config: Dicionário com configuração postgresql |
|
|
|
|
|
Returns: |
|
|
Tupla (válido, mensagem_erro) |
|
|
""" |
|
|
try: |
|
|
|
|
|
required_fields = ["host", "port", "database", "username", "password"] |
|
|
|
|
|
for field in required_fields: |
|
|
if field not in config or not config[field]: |
|
|
return False, f"Campo obrigatório ausente ou vazio: {field}" |
|
|
|
|
|
|
|
|
host = str(config["host"]).strip() |
|
|
if not host: |
|
|
return False, "Host não pode estar vazio" |
|
|
|
|
|
|
|
|
if not _is_valid_host(host): |
|
|
return False, "Formato de host inválido" |
|
|
|
|
|
|
|
|
try: |
|
|
port = int(config["port"]) |
|
|
if port < 1 or port > 65535: |
|
|
return False, "Porta deve estar entre 1 e 65535" |
|
|
except (ValueError, TypeError): |
|
|
return False, "Porta deve ser um número válido" |
|
|
|
|
|
|
|
|
database = str(config["database"]).strip() |
|
|
if not database: |
|
|
return False, "Nome do banco não pode estar vazio" |
|
|
|
|
|
if not _is_valid_database_name(database): |
|
|
return False, "Nome do banco contém caracteres inválidos" |
|
|
|
|
|
|
|
|
username = str(config["username"]).strip() |
|
|
if not username: |
|
|
return False, "Nome de usuário não pode estar vazio" |
|
|
|
|
|
if not _is_valid_username(username): |
|
|
return False, "Nome de usuário contém caracteres inválidos" |
|
|
|
|
|
|
|
|
password = str(config["password"]) |
|
|
if not password: |
|
|
return False, "Senha não pode estar vazia" |
|
|
|
|
|
return True, None |
|
|
|
|
|
except Exception as e: |
|
|
return False, f"Erro na validação: {e}" |
|
|
|
|
|
|
|
|
def _is_valid_host(host: str) -> bool: |
|
|
""" |
|
|
Valida formato de host (IP ou hostname) |
|
|
|
|
|
Args: |
|
|
host: Host a validar |
|
|
|
|
|
Returns: |
|
|
True se válido |
|
|
""" |
|
|
|
|
|
ipv4_pattern = r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$' |
|
|
|
|
|
|
|
|
hostname_pattern = r'^(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)*[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$' |
|
|
|
|
|
|
|
|
if host.lower() == 'localhost': |
|
|
return True |
|
|
|
|
|
|
|
|
if re.match(ipv4_pattern, host): |
|
|
return True |
|
|
|
|
|
|
|
|
if re.match(hostname_pattern, host): |
|
|
return True |
|
|
|
|
|
return False |
|
|
|
|
|
|
|
|
def _is_valid_database_name(database: str) -> bool: |
|
|
""" |
|
|
Valida nome de banco postgresql |
|
|
|
|
|
Args: |
|
|
database: Nome do banco |
|
|
|
|
|
Returns: |
|
|
True se válido |
|
|
""" |
|
|
|
|
|
|
|
|
pattern = r'^[a-zA-Z_][a-zA-Z0-9_-]*$' |
|
|
|
|
|
|
|
|
if len(database) > 63: |
|
|
return False |
|
|
|
|
|
return bool(re.match(pattern, database)) |
|
|
|
|
|
|
|
|
def _is_valid_username(username: str) -> bool: |
|
|
""" |
|
|
Valida nome de usuário postgresql |
|
|
|
|
|
Args: |
|
|
username: Nome de usuário |
|
|
|
|
|
Returns: |
|
|
True se válido |
|
|
""" |
|
|
|
|
|
pattern = r'^[a-zA-Z_][a-zA-Z0-9_-]*$' |
|
|
|
|
|
|
|
|
if len(username) > 63: |
|
|
return False |
|
|
|
|
|
return bool(re.match(pattern, username)) |
|
|
|
|
|
|
|
|
def validate_csv_file_path(file_path: str) -> Tuple[bool, Optional[str]]: |
|
|
""" |
|
|
Valida caminho de arquivo csv |
|
|
|
|
|
Args: |
|
|
file_path: Caminho do arquivo |
|
|
|
|
|
Returns: |
|
|
Tupla (válido, mensagem_erro) |
|
|
""" |
|
|
try: |
|
|
import os |
|
|
|
|
|
if not file_path: |
|
|
return False, "Caminho do arquivo não pode estar vazio" |
|
|
|
|
|
if not os.path.exists(file_path): |
|
|
return False, f"Arquivo não encontrado: {file_path}" |
|
|
|
|
|
if not file_path.lower().endswith('.csv'): |
|
|
return False, "Arquivo deve ter extensão .csv" |
|
|
|
|
|
|
|
|
if not os.path.isfile(file_path): |
|
|
return False, "Caminho deve apontar para um arquivo" |
|
|
|
|
|
|
|
|
file_size = os.path.getsize(file_path) |
|
|
if file_size == 0: |
|
|
return False, "Arquivo csv está vazio" |
|
|
|
|
|
|
|
|
if file_size > 5 * 1024 * 1024 * 1024: |
|
|
return False, "Arquivo muito grande (máximo 5GB)" |
|
|
|
|
|
return True, None |
|
|
|
|
|
except Exception as e: |
|
|
return False, f"Erro na validação do arquivo: {e}" |
|
|
|
|
|
|
|
|
def validate_connection_state(state: Dict[str, Any]) -> Tuple[bool, Optional[str]]: |
|
|
""" |
|
|
Valida estado de conexão completo |
|
|
|
|
|
Args: |
|
|
state: Estado da conexão |
|
|
|
|
|
Returns: |
|
|
Tupla (válido, mensagem_erro) |
|
|
""" |
|
|
try: |
|
|
connection_type = state.get("connection_type", "csv") |
|
|
|
|
|
if connection_type.lower() not in ["csv", "postgresql"]: |
|
|
return False, f"Tipo de conexão inválido: {connection_type}" |
|
|
|
|
|
if connection_type.lower() == "postgresql": |
|
|
postgresql_config = state.get("postgresql_config") |
|
|
if not postgresql_config: |
|
|
return False, "Configuração postgresql ausente" |
|
|
|
|
|
return validate_postgresql_config(postgresql_config) |
|
|
|
|
|
elif connection_type.lower() == "csv": |
|
|
file_path = state.get("file_path") |
|
|
if file_path: |
|
|
return validate_csv_file_path(file_path) |
|
|
else: |
|
|
|
|
|
import os |
|
|
from utils.config import SQL_DB_PATH |
|
|
|
|
|
if not os.path.exists(SQL_DB_PATH): |
|
|
return False, "Nenhum arquivo csv fornecido e nenhum banco existente" |
|
|
|
|
|
return True, None |
|
|
|
|
|
return True, None |
|
|
|
|
|
except Exception as e: |
|
|
return False, f"Erro na validação do estado: {e}" |
|
|
|
|
|
|
|
|
def sanitize_postgresql_config(config: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Sanitiza configuração postgresql removendo espaços e normalizando |
|
|
|
|
|
Args: |
|
|
config: Configuração original |
|
|
|
|
|
Returns: |
|
|
Configuração sanitizada |
|
|
""" |
|
|
try: |
|
|
sanitized = {} |
|
|
|
|
|
|
|
|
sanitized["host"] = str(config.get("host", "")).strip() |
|
|
|
|
|
|
|
|
try: |
|
|
sanitized["port"] = int(config.get("port", 5432)) |
|
|
except (ValueError, TypeError): |
|
|
sanitized["port"] = 5432 |
|
|
|
|
|
|
|
|
sanitized["database"] = str(config.get("database", "")).strip() |
|
|
|
|
|
|
|
|
sanitized["username"] = str(config.get("username", "")).strip() |
|
|
|
|
|
|
|
|
sanitized["password"] = str(config.get("password", "")) |
|
|
|
|
|
return sanitized |
|
|
|
|
|
except Exception as e: |
|
|
logging.error(f"Erro ao sanitizar configuração postgresql: {e}") |
|
|
return config |
|
|
|
|
|
|
|
|
def get_connection_error_message(error: Exception) -> str: |
|
|
""" |
|
|
Converte erro de conexão em mensagem amigável |
|
|
|
|
|
Args: |
|
|
error: Exceção capturada |
|
|
|
|
|
Returns: |
|
|
Mensagem de erro amigável |
|
|
""" |
|
|
error_str = str(error).lower() |
|
|
|
|
|
if "password authentication failed" in error_str: |
|
|
return "❌ Falha na autenticação: Usuário ou senha incorretos" |
|
|
|
|
|
elif "could not connect to server" in error_str: |
|
|
return "❌ Não foi possível conectar ao servidor: Verifique host e porta" |
|
|
|
|
|
elif "database" in error_str and "does not exist" in error_str: |
|
|
return "❌ Banco de dados não existe: Verifique o nome do banco" |
|
|
|
|
|
elif "connection refused" in error_str: |
|
|
return "❌ Conexão recusada: Servidor postgresql pode estar desligado" |
|
|
|
|
|
elif "timeout" in error_str: |
|
|
return "❌ Timeout na conexão: Servidor demorou muito para responder" |
|
|
|
|
|
elif "permission denied" in error_str: |
|
|
return "❌ Permissão negada: Usuário não tem acesso ao banco" |
|
|
|
|
|
elif "too many connections" in error_str: |
|
|
return "❌ Muitas conexões: Servidor postgresql está sobrecarregado" |
|
|
|
|
|
else: |
|
|
return f"❌ Erro de conexão: {str(error)}" |
|
|
|