File size: 10,309 Bytes
88b683e 7094511 88b683e 7094511 88b683e 7094511 88b683e 7094511 88b683e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 |
"""
Nó para processamento de consultas SQL
"""
import time
import logging
import pandas as pd
from typing import Dict, Any, TypedDict
from agents.tools import is_greeting, detect_query_type, prepare_sql_context
from agents.sql_agent import SQLAgentManager
from utils.object_manager import get_object_manager
class QueryState(TypedDict):
"""Estado para processamento de consultas"""
user_input: str
selected_model: str
response: str
execution_time: float
error: str
intermediate_steps: list
llama_instruction: str
sql_result: dict
async def process_user_query_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Nó principal para processar consulta do usuário
Args:
state: Estado atual com entrada do usuário
Returns:
Estado atualizado com resposta processada
"""
start_time = time.time()
user_input = state["user_input"]
selected_model = state["selected_model"]
logging.info(f"[QUERY] Processando: {user_input[:50]}...")
try:
# Verifica se é saudação
if is_greeting(user_input):
greeting_response = "Olá! Estou aqui para ajudar com suas consultas. Pergunte algo relacionado aos dados carregados no agente!"
state.update({
"response": greeting_response,
"execution_time": time.time() - start_time,
"error": None
})
return state
# Recupera objetos necessários
obj_manager = get_object_manager()
# Recupera cache manager
cache_id = state.get("cache_id")
cache_manager = obj_manager.get_cache_manager(cache_id) if cache_id else None
# CACHE TEMPORARIAMENTE DESATIVADO
# Verifica cache se disponível
if False: # cache_manager:
cached_response = cache_manager.get_cached_response(user_input)
if cached_response:
logging.info(f"[CACHE] Retornando resposta do cache")
state.update({
"response": cached_response,
"execution_time": time.time() - start_time,
"error": None
})
return state
# Converte amostra do banco para DataFrame
db_sample_dict = state.get("db_sample_dict", {})
if not db_sample_dict:
raise ValueError("Amostra do banco não disponível")
# Reconstrói DataFrame da amostra
db_sample = pd.DataFrame(db_sample_dict.get("data", []))
if db_sample.empty:
raise ValueError("Dados de amostra vazios")
# Detecta tipo de query e prepara contexto
query_type = detect_query_type(user_input)
state["query_type"] = query_type
if query_type in ['sql_query', 'sql_query_graphic']:
# Obtém sugestão de query e observações do Processing Agent (se disponível)
suggested_query = state.get("suggested_query", "")
query_observations = state.get("query_observations", "")
# Prepara contexto para envio direto ao agentSQL
sql_context = prepare_sql_context(user_input, db_sample, suggested_query, query_observations)
state["sql_context"] = sql_context
logging.info(f"[DEBUG] Tipo de query detectado: {query_type}")
if suggested_query:
logging.info(f"[DEBUG] Query sugerida pelo Processing Agent incluída no contexto")
logging.info(f"[DEBUG] Contexto preparado para agentSQL")
else:
# Para tipos futuros (prediction)
error_msg = f"Tipo de query '{query_type}' ainda não implementado."
state.update({
"error": error_msg,
"response": error_msg,
"execution_time": time.time() - start_time
})
return state
# Recupera agente SQL
agent_id = state.get("agent_id")
if not agent_id:
raise ValueError("ID do agente SQL não encontrado")
sql_agent = obj_manager.get_sql_agent(agent_id)
if not sql_agent:
raise ValueError("Agente SQL não encontrado")
# Verifica se precisa recriar o agente SQL para PostgreSQL com configurações atuais
connection_type = state.get("connection_type", "csv")
if connection_type == "postgresql":
single_table_mode = state.get("single_table_mode", False)
selected_table = state.get("selected_table")
selected_model = state.get("selected_model", "gpt-4o-mini")
# Verifica se as configurações mudaram
current_single_mode = getattr(sql_agent, 'single_table_mode', False)
current_table = getattr(sql_agent, 'selected_table', None)
current_model = getattr(sql_agent, 'model_name', 'gpt-4o-mini')
if (single_table_mode != current_single_mode or
selected_table != current_table or
selected_model != current_model):
logging.info(f"[QUERY] Recriando agente SQL - Modo: {'único' if single_table_mode else 'multi'}, Tabela: {selected_table}")
# Recria o agente com as novas configurações
sql_agent.recreate_agent(
single_table_mode=single_table_mode,
selected_table=selected_table,
new_model=selected_model
)
# Atualiza no ObjectManager
obj_manager.store_sql_agent(sql_agent, state.get("db_id"))
# Executa query no agente SQL com contexto direto
sql_result = await sql_agent.execute_query(state["sql_context"])
# Log da resposta do agente SQL
logging.info(f"[AGENT SQL] ===== RESPOSTA DO AGENTE SQL =====")
logging.info(f"[AGENT SQL] Sucesso: {sql_result['success']}")
logging.info(f"[AGENT SQL] Resposta completa:")
logging.info(f"{sql_result.get('output', 'Sem resposta')}")
if sql_result.get("sql_query"):
logging.info(f"[AGENT SQL] Query SQL capturada: {sql_result['sql_query']}")
logging.info(f"[AGENT SQL] ===== FIM DA RESPOSTA =====")
if not sql_result["success"]:
state.update({
"error": sql_result["output"],
"response": sql_result["output"],
"sql_result": sql_result
})
else:
# Captura query SQL do resultado do agente
sql_query_captured = sql_result.get("sql_query")
state.update({
"response": sql_result["output"],
"intermediate_steps": sql_result["intermediate_steps"],
"sql_result": sql_result,
"sql_query_extracted": sql_query_captured, # ← Query SQL capturada
"error": None
})
# Log apenas se não foi capturada (caso de erro)
if not sql_query_captured:
logging.warning("[QUERY] ⚠️ Nenhuma query SQL foi capturada pelo handler")
# Armazena no cache se disponível
if cache_manager and sql_result["success"]:
cache_manager.cache_response(user_input, state["response"])
state["execution_time"] = time.time() - start_time
logging.info(f"[QUERY] Concluído em {state['execution_time']:.2f}s")
except Exception as e:
error_msg = f"Erro ao processar query: {e}"
logging.error(f"[QUERY] {error_msg}")
state.update({
"error": error_msg,
"response": error_msg,
"execution_time": time.time() - start_time
})
return state
async def validate_query_input_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Nó para validar entrada da consulta
Args:
state: Estado com entrada do usuário
Returns:
Estado atualizado com validação
"""
user_input = state.get("user_input", "").strip()
if not user_input:
state.update({
"error": "Entrada vazia",
"response": "Por favor, digite uma pergunta.",
"execution_time": 0.0
})
return state
if len(user_input) > 1000:
state.update({
"error": "Entrada muito longa",
"response": "Pergunta muito longa. Por favor, seja mais conciso.",
"execution_time": 0.0
})
return state
# Validação passou
state["error"] = None
logging.info(f"[VALIDATION] Entrada validada: {len(user_input)} caracteres")
return state
async def prepare_query_context_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Nó para preparar contexto da consulta
Args:
state: Estado atual
Returns:
Estado com contexto preparado
"""
try:
# Verifica se todos os componentes necessários estão disponíveis
required_ids = ["agent_id", "engine_id", "cache_id"]
missing_ids = [id_name for id_name in required_ids if not state.get(id_name)]
if missing_ids:
raise ValueError(f"IDs necessários não encontrados: {missing_ids}")
obj_manager = get_object_manager()
# Verifica se objetos existem
for id_name in required_ids:
obj_id = state[id_name]
if id_name == "agent_id":
obj = obj_manager.get_sql_agent(obj_id)
elif id_name == "engine_id":
obj = obj_manager.get_engine(obj_id)
elif id_name == "cache_id":
obj = obj_manager.get_cache_manager(obj_id)
if obj is None:
raise ValueError(f"Objeto não encontrado para {id_name}: {obj_id}")
# Contexto preparado com sucesso
state["context_ready"] = True
logging.info("[CONTEXT] Contexto da consulta preparado")
except Exception as e:
error_msg = f"Erro ao preparar contexto: {e}"
logging.error(f"[CONTEXT] {error_msg}")
state.update({
"error": error_msg,
"context_ready": False
})
return state
|