|
|
""" |
|
|
Nó para processamento de contexto inicial usando Processing Agent |
|
|
""" |
|
|
import logging |
|
|
import pandas as pd |
|
|
from typing import Dict, Any |
|
|
|
|
|
from agents.processing_agent import ProcessingAgentManager |
|
|
from agents.tools import prepare_processing_context |
|
|
from utils.object_manager import get_object_manager |
|
|
|
|
|
|
|
|
def _resolve_engine_ids(state: Dict[str, Any], obj_manager) -> tuple:
    """Resolve the engine/database ids to use for this run.

    Prefers the ids stored in ``state``; when either is missing, falls back to
    the most recently stored engine/database in the ObjectManager.

    Args:
        state: Current agent state.
        obj_manager: ObjectManager holding live engines/databases.

    Returns:
        ``(engine_id, db_id)``; both ``None`` when nothing is available.
    """
    engine_id = state.get("engine_id")
    db_id = state.get("db_id")

    logging.info(f"[PROCESSING NODE] ===== DEBUG ESTADO =====")
    logging.info(f"[PROCESSING NODE] engine_id do estado: {engine_id}")
    logging.info(f"[PROCESSING NODE] db_id do estado: {db_id}")
    logging.info(f"[PROCESSING NODE] connection_type do estado: {state.get('connection_type')}")
    logging.info(f"[PROCESSING NODE] Chaves disponíveis no estado: {list(state.keys())}")
    logging.info(f"[PROCESSING NODE] ===== FIM DEBUG =====")

    if engine_id and db_id:
        return engine_id, db_id

    logging.error("[PROCESSING NODE] IDs de engine ou database não encontrados no estado")
    logging.error(f"[PROCESSING NODE] engine_id: {engine_id}, db_id: {db_id}")
    logging.info("[PROCESSING NODE] Tentando fallback para IDs disponíveis...")

    # NOTE(review): reaches into ObjectManager private attributes; a public
    # accessor would be cleaner — confirm with ObjectManager's owner.
    engines = obj_manager._engines
    databases = obj_manager._databases

    if engines and databases:
        # Last-inserted key = most recently registered object.
        engine_id = list(engines.keys())[-1]
        db_id = list(databases.keys())[-1]
        logging.info(f"[PROCESSING NODE] Fallback: usando engine_id={engine_id}, db_id={db_id}")
        return engine_id, db_id

    logging.error("[PROCESSING NODE] Nenhum engine ou database disponível no ObjectManager")
    return None, None


def _collect_columns_data(engine, engine_dialect: str, single_table_mode: bool,
                          selected_table) -> Dict[str, list]:
    """Collect column metadata for the relevant table(s).

    PostgreSQL: either the single selected table, or up to 10 tables from the
    ``public`` schema.  Any other dialect (CSV-backed SQLite) uses the fixed
    table name ``"tabela"``.

    Args:
        engine: SQLAlchemy engine to inspect.
        engine_dialect: Lower-cased dialect name of ``engine``.
        single_table_mode: Whether only ``selected_table`` should be inspected.
        selected_table: Table name for single-table mode (may be None).

    Returns:
        Mapping of table name to the column-info list produced by
        ``_extract_table_columns_info``.
    """
    import sqlalchemy as sa

    columns_data: Dict[str, list] = {}

    if engine_dialect == "postgresql":
        if single_table_mode and selected_table:
            logging.info(f"[PROCESSING NODE] PostgreSQL - Modo tabela única: {selected_table}")
            columns_data[selected_table] = _extract_table_columns_info(engine, selected_table)
        else:
            logging.info(f"[PROCESSING NODE] PostgreSQL - Modo multi-tabela")
            with engine.connect() as conn:
                tables_result = conn.execute(sa.text("""
                    SELECT table_name
                    FROM information_schema.tables
                    WHERE table_schema = 'public'
                    ORDER BY table_name
                """))
                available_tables = [row[0] for row in tables_result.fetchall()]

            logging.info(f"[PROCESSING NODE] Tabelas encontradas: {available_tables}")

            # Cap at 10 tables to keep the LLM context a manageable size.
            for table_name in available_tables[:10]:
                columns_data[table_name] = _extract_table_columns_info(engine, table_name)
    else:
        logging.info(f"[PROCESSING NODE] SQLite - processando tabela padrão")
        columns_data["tabela"] = _extract_table_columns_info(engine, "tabela")

    return columns_data


def _get_or_create_processing_agent(state: Dict[str, Any], obj_manager, processing_model: str):
    """Return a Processing Agent configured for ``processing_model``.

    Reuses the agent referenced by ``state["processing_agent_id"]`` when it is
    still alive, recreating its LLM if the requested model changed.

    BUG FIX: the original code raised AttributeError when the stored id no
    longer resolved to an agent (it logged ``processing_agent.model_name`` on
    a ``None`` agent and never recreated it); a fresh agent is now created in
    that case and its id written back to ``state``.
    """
    processing_agent = None
    processing_agent_id = state.get("processing_agent_id")
    if processing_agent_id:
        processing_agent = obj_manager.get_processing_agent(processing_agent_id)

    if processing_agent is not None:
        if processing_agent.model_name != processing_model:
            logging.info(f"[PROCESSING NODE] Recriando Processing Agent com modelo {processing_model}")
            processing_agent.recreate_llm(processing_model)
        else:
            logging.info(f"[PROCESSING NODE] Reutilizando Processing Agent existente com modelo {processing_agent.model_name}")
        return processing_agent

    logging.info(f"[PROCESSING NODE] Criando novo Processing Agent com modelo {processing_model}")
    processing_agent = ProcessingAgentManager(processing_model)
    processing_agent_id = obj_manager.store_processing_agent(processing_agent)
    state["processing_agent_id"] = processing_agent_id
    logging.info(f"[PROCESSING NODE] Novo Processing Agent criado e armazenado com ID: {processing_agent_id}")
    return processing_agent


async def process_initial_context_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Process the initial context with the Processing Agent (optional node).

    When ``state["processing_enabled"]`` is falsy, the node is a no-op.
    Otherwise it inspects the database schema, builds a processing context,
    runs the Processing Agent on it, and records the result in the state.

    Args:
        state: Current agent state.

    Returns:
        The same state object, updated with ``suggested_query``,
        ``query_observations``, ``processing_result`` and
        ``processing_success`` (plus ``processing_error`` on failure).
    """
    processing_enabled = state.get("processing_enabled", False)
    logging.info(f"[PROCESSING NODE] Processing enabled: {processing_enabled}")

    if not processing_enabled:
        logging.info("[PROCESSING NODE] Processing Agent desabilitado - pulando nó")
        return state

    logging.info("[PROCESSING NODE] ===== INICIANDO NÓ DE PROCESSAMENTO =====")

    try:
        user_input = state.get("user_input", "")
        processing_model = state.get("processing_model", "gpt-4o-mini")

        logging.info(f"[PROCESSING NODE] Entrada do usuário: {user_input[:100]}...")
        logging.info(f"[PROCESSING NODE] Modelo selecionado: {processing_model}")

        if not user_input:
            logging.warning("[PROCESSING NODE] Entrada do usuário não disponível")
            return state

        obj_manager = get_object_manager()

        # --- Database inspection: any failure here aborts the node. ---
        try:
            engine_id, db_id = _resolve_engine_ids(state, obj_manager)
            if not engine_id or not db_id:
                return state

            engine = obj_manager.get_engine(engine_id)
            database = obj_manager.get_database(db_id)

            logging.info(f"[PROCESSING NODE] Engine obtido: {engine is not None}")
            logging.info(f"[PROCESSING NODE] Database obtido: {database is not None}")

            if not engine or not database:
                logging.error("[PROCESSING NODE] Engine ou database não encontrados no ObjectManager")
                logging.error(f"[PROCESSING NODE] engine: {engine}, database: {database}")
                logging.error(f"[PROCESSING NODE] Engines disponíveis: {list(obj_manager._engines.keys())}")
                logging.error(f"[PROCESSING NODE] Databases disponíveis: {list(obj_manager._databases.keys())}")
                return state

            logging.info(f"[PROCESSING NODE] Usando engine {engine_id} e database {db_id} do estado atual")

            engine_dialect = str(engine.dialect.name).lower()
            connection_type = state.get("connection_type", "csv")
            single_table_mode = state.get("single_table_mode", False)
            selected_table = state.get("selected_table")

            logging.info(f"[PROCESSING NODE] ===== DETECÇÃO DE CONEXÃO =====")
            logging.info(f"[PROCESSING NODE] Engine dialect detectado: {engine_dialect}")
            logging.info(f"[PROCESSING NODE] Connection type do estado: {connection_type}")
            logging.info(f"[PROCESSING NODE] Single table mode: {single_table_mode}")
            logging.info(f"[PROCESSING NODE] Selected table: {selected_table}")
            logging.info(f"[PROCESSING NODE] Engine URL: {str(engine.url)}")
            logging.info(f"[PROCESSING NODE] ===== FIM DETECÇÃO =====")

            # Sanity check: the configured connection type should match the
            # actual engine dialect; a mismatch means the wrong engine is in use.
            if connection_type.lower() == "postgresql" and engine_dialect != "postgresql":
                logging.error(f"[PROCESSING NODE] INCONSISTÊNCIA: connection_type={connection_type} mas engine_dialect={engine_dialect}")
                logging.error(f"[PROCESSING NODE] Isso indica que está usando o engine errado!")
            elif connection_type.lower() == "csv" and engine_dialect != "sqlite":
                logging.error(f"[PROCESSING NODE] INCONSISTÊNCIA: connection_type={connection_type} mas engine_dialect={engine_dialect}")
                logging.error(f"[PROCESSING NODE] Isso indica que está usando o engine errado!")

            columns_data = _collect_columns_data(engine, engine_dialect,
                                                 single_table_mode, selected_table)
            logging.info(f"[PROCESSING NODE] ✅ Dados das colunas extraídos para {len(columns_data)} tabela(s)")

        except Exception as e:
            logging.error(f"[PROCESSING NODE] ❌ Erro ao acessar banco de dados: {e}")
            logging.error(f"[PROCESSING NODE] Detalhes do erro: {str(e)}")
            logging.error(f"[PROCESSING NODE] Tipo do erro: {type(e)}")
            import traceback
            logging.error(f"[PROCESSING NODE] Traceback: {traceback.format_exc()}")
            return state

        processing_agent = _get_or_create_processing_agent(state, obj_manager, processing_model)

        # PostgreSQL: hand the list of inspected tables to the context builder.
        available_tables = None
        if engine_dialect == "postgresql":
            available_tables = list(columns_data.keys())
            logging.info(f"[PROCESSING NODE] Tabelas disponíveis para contexto: {available_tables}")

        processing_context = prepare_processing_context(
            user_input,
            columns_data,
            connection_type,
            single_table_mode,
            selected_table,
            available_tables
        )

        logging.info(f"[PROCESSING NODE] ===== CONTEXTO PARA PRIMEIRA LLM =====")
        logging.info(f"{processing_context}")
        logging.info(f"[PROCESSING NODE] ===== FIM DO CONTEXTO =====")

        logging.info(f"[PROCESSING NODE] 🚀 Iniciando execução do Processing Agent...")
        logging.info(f"[PROCESSING NODE] Processing Agent: {processing_agent}")
        logging.info(f"[PROCESSING NODE] Modelo: {processing_agent.model_name if processing_agent else 'N/A'}")

        try:
            processing_result = await processing_agent.process_context(processing_context)
            logging.info(f"[PROCESSING NODE] ✅ Processing Agent executado com sucesso")
        except Exception as e:
            logging.error(f"[PROCESSING NODE] ❌ Erro na execução do Processing Agent: {e}")
            import traceback
            logging.error(f"[PROCESSING NODE] Traceback: {traceback.format_exc()}")
            return state

        logging.info(f"[PROCESSING NODE] ===== RESPOSTA DA PRIMEIRA LLM =====")
        logging.info(f"{processing_result.get('output', 'Sem resposta')}")
        logging.info(f"[PROCESSING NODE] ===== FIM DA RESPOSTA =====")

        if processing_result["success"]:
            suggested_query = processing_result.get("suggested_query", "")
            query_observations = processing_result.get("query_observations", "")

            state.update({
                "suggested_query": suggested_query,
                "query_observations": query_observations,
                "processing_result": processing_result,
                "processing_success": True
            })

            if suggested_query:
                logging.info(f"[PROCESSING NODE] ✅ Query SQL extraída com sucesso")
                logging.info(f"[PROCESSING NODE] ✅ Observações extraídas: {len(query_observations)} caracteres")
                logging.info(f"[PROCESSING NODE] 🎯 Query será incluída no contexto do SQL Agent")
            else:
                logging.warning(f"[PROCESSING NODE] ❌ Nenhuma query foi extraída - agente SQL funcionará normalmente")
        else:
            error_msg = processing_result.get("output", "Erro desconhecido")
            logging.error(f"[PROCESSING] Erro no processamento: {error_msg}")

            state.update({
                "suggested_query": "",
                "query_observations": "",
                "processing_result": processing_result,
                "processing_success": False,
                "processing_error": error_msg
            })

    except Exception as e:
        error_msg = f"Erro no nó de processamento: {e}"
        logging.error(f"[PROCESSING] {error_msg}")

        state.update({
            "suggested_query": "",
            "query_observations": "",
            "processing_success": False,
            "processing_error": error_msg
        })

    return state
|
|
|
|
|
|
|
|
def should_use_processing(state: Dict[str, Any]) -> str:
    """
    Decide whether the Processing Agent node should run.

    Args:
        state: Current agent state; ``"processing_enabled"`` toggles the route.

    Returns:
        Name of the next node: ``"process_initial_context"`` when processing
        is enabled, otherwise ``"prepare_context"``.
    """
    enabled = state.get("processing_enabled", False)
    return "process_initial_context" if enabled else "prepare_context"
|
|
|
|
|
|
|
|
async def validate_processing_input_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Validate the state before the Processing Agent runs.

    Ensures a processing model is set (defaulting to ``gpt-4o-mini``) and that
    the user input is non-empty; processing is disabled when validation fails.

    Args:
        state: Current agent state.

    Returns:
        The (possibly adjusted) state object.
    """
    try:
        logging.info("[PROCESSING VALIDATION] ===== VALIDANDO ENTRADA PARA PROCESSING AGENT =====")

        enabled = state.get("processing_enabled", False)
        logging.info(f"[PROCESSING VALIDATION] Processing habilitado: {enabled}")

        # Nothing to validate when the feature is off.
        if not enabled:
            logging.info("[PROCESSING VALIDATION] Processing desabilitado - pulando validação")
            return state

        model = state.get("processing_model", "")
        logging.info(f"[PROCESSING VALIDATION] Modelo especificado: '{model}'")

        # Fall back to the default model when none was chosen.
        if not model:
            logging.warning("[PROCESSING VALIDATION] Modelo de processamento não especificado, usando padrão")
            state["processing_model"] = "gpt-4o-mini"
            logging.info(f"[PROCESSING VALIDATION] Modelo padrão definido: gpt-4o-mini")

        # An empty question makes processing pointless — turn it off.
        question = state.get("user_input", "")
        if not question or not question.strip():
            logging.error("[PROCESSING VALIDATION] Entrada do usuário vazia - desabilitando processing")
            state["processing_enabled"] = False
            return state

        logging.info(f"[PROCESSING VALIDATION] Validação concluída com sucesso")
        logging.info(f"[PROCESSING VALIDATION] Modelo final: {state['processing_model']}")
        logging.info(f"[PROCESSING VALIDATION] Entrada: {question[:100]}...")

    except Exception as e:
        # Any unexpected failure disables processing rather than crashing the graph.
        logging.error(f"[PROCESSING VALIDATION] Erro na validação: {e}")
        state["processing_enabled"] = False

    return state
|
|
|
|
|
|
|
|
def _extract_table_columns_info(engine, table_name: str) -> list:
    """
    Extract column information for one table.

    Samples up to 5 rows to derive dtypes, example values and min/max stats;
    when the table is empty or unreadable, falls back to schema metadata
    (information_schema for PostgreSQL, PRAGMA for SQLite).

    SECURITY FIX: ``table_name`` was interpolated raw into SQL (injection
    risk).  Identifiers cannot be bound as parameters, so the name is now
    double-quote escaped for the data/PRAGMA queries, and the
    information_schema filter uses a bound parameter.

    Args:
        engine: SQLAlchemy engine.
        table_name: Name of the table to inspect.

    Returns:
        List of dicts with keys "column", "type", "examples", "stats";
        empty list when the table cannot be inspected at all.
    """
    import sqlalchemy as sa
    import pandas as pd

    # Escape embedded quotes and wrap in double quotes: valid identifier
    # quoting for both PostgreSQL and SQLite.
    quoted_table = '"' + table_name.replace('"', '""') + '"'
    dialect = str(engine.dialect.name).lower()

    def _schema_only(conn, placeholder: str) -> list:
        """Build column info from schema metadata when no row data is usable."""
        if dialect == "postgresql":
            result = conn.execute(
                sa.text("""
                    SELECT column_name, data_type
                    FROM information_schema.columns
                    WHERE table_name = :table_name
                    ORDER BY ordinal_position
                """),
                {"table_name": table_name},
            )
            name_type_pairs = [(row[0], row[1]) for row in result.fetchall()]
        else:
            # PRAGMA rows: (cid, name, type, notnull, dflt_value, pk).
            result = conn.execute(sa.text(f"PRAGMA table_info({quoted_table})"))
            name_type_pairs = [(row[1], row[2]) for row in result.fetchall()]

        return [
            {"column": name, "type": col_type, "examples": placeholder, "stats": ""}
            for name, col_type in name_type_pairs
        ]

    def _describe_column(series, name) -> dict:
        """Summarize one sampled column: dtype, up to 3 example values, min/max."""
        values = series.dropna()
        info = {
            "column": name,
            "type": str(values.dtype) if len(values) > 0 else "object",
            "examples": "",
            "stats": "",
        }
        if len(values) > 0:
            info["examples"] = ", ".join(str(v) for v in values.unique()[:3])
            if values.dtype in ['int64', 'float64']:
                try:
                    info["stats"] = f" | Min: {values.min()}, Max: {values.max()}"
                except Exception:
                    # Stats are best-effort; keep the column entry regardless.
                    pass
        return info

    try:
        logging.info(f"[PROCESSING NODE] Extraindo informações da tabela: {table_name}")

        with engine.connect() as conn:
            try:
                result = conn.execute(sa.text(f"SELECT * FROM {quoted_table} LIMIT 5"))
                columns = result.keys()
                rows = result.fetchall()

                if rows:
                    sample_df = pd.DataFrame(rows, columns=columns)
                    columns_info = [_describe_column(sample_df[col], col)
                                    for col in sample_df.columns]
                    logging.info(f"[PROCESSING NODE] ✅ Tabela {table_name}: {len(columns_info)} colunas com dados")
                    return columns_info

                logging.info(f"[PROCESSING NODE] ⚠️ Tabela {table_name} sem dados - obtendo apenas estrutura")
                columns_info = _schema_only(conn, "(sem dados)")
                logging.info(f"[PROCESSING NODE] ✅ Tabela {table_name}: {len(columns_info)} colunas (estrutura apenas)")
                return columns_info

            except Exception as e:
                logging.warning(f"[PROCESSING NODE] Erro ao acessar dados da tabela {table_name}: {e}")
                # Data query failed — try schema metadata as a last resort.
                try:
                    columns_info = _schema_only(conn, "(erro ao acessar dados)")
                    logging.info(f"[PROCESSING NODE] ⚠️ Tabela {table_name}: {len(columns_info)} colunas (fallback)")
                    return columns_info
                except Exception as e2:
                    logging.error(f"[PROCESSING NODE] ❌ Erro total ao processar tabela {table_name}: {e2}")
                    return []

    except Exception as e:
        logging.error(f"[PROCESSING NODE] ❌ Erro ao extrair informações da tabela {table_name}: {e}")
        return []
|
|
|