|
|
""" |
|
|
Nó para processamento de consultas SQL |
|
|
""" |
|
|
import time |
|
|
import logging |
|
|
import pandas as pd |
|
|
from typing import Dict, Any, TypedDict |
|
|
|
|
|
from agents.tools import is_greeting, detect_query_type, prepare_sql_context |
|
|
from agents.sql_agent import SQLAgentManager |
|
|
from utils.object_manager import get_object_manager |
|
|
|
|
|
class QueryState(TypedDict): |
|
|
"""Estado para processamento de consultas""" |
|
|
user_input: str |
|
|
selected_model: str |
|
|
response: str |
|
|
execution_time: float |
|
|
error: str |
|
|
intermediate_steps: list |
|
|
llama_instruction: str |
|
|
sql_result: dict |
|
|
|
|
|
async def process_user_query_node(state: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Nó principal para processar consulta do usuário |
|
|
|
|
|
Args: |
|
|
state: Estado atual com entrada do usuário |
|
|
|
|
|
Returns: |
|
|
Estado atualizado com resposta processada |
|
|
""" |
|
|
start_time = time.time() |
|
|
user_input = state["user_input"] |
|
|
selected_model = state["selected_model"] |
|
|
|
|
|
logging.info(f"[QUERY] Processando: {user_input[:50]}...") |
|
|
|
|
|
try: |
|
|
|
|
|
if is_greeting(user_input): |
|
|
greeting_response = "Olá! Estou aqui para ajudar com suas consultas. Pergunte algo relacionado aos dados carregados no agente!" |
|
|
state.update({ |
|
|
"response": greeting_response, |
|
|
"execution_time": time.time() - start_time, |
|
|
"error": None |
|
|
}) |
|
|
return state |
|
|
|
|
|
|
|
|
obj_manager = get_object_manager() |
|
|
|
|
|
|
|
|
cache_id = state.get("cache_id") |
|
|
cache_manager = obj_manager.get_cache_manager(cache_id) if cache_id else None |
|
|
|
|
|
|
|
|
|
|
|
if False: |
|
|
cached_response = cache_manager.get_cached_response(user_input) |
|
|
if cached_response: |
|
|
logging.info(f"[CACHE] Retornando resposta do cache") |
|
|
state.update({ |
|
|
"response": cached_response, |
|
|
"execution_time": time.time() - start_time, |
|
|
"error": None |
|
|
}) |
|
|
return state |
|
|
|
|
|
|
|
|
db_sample_dict = state.get("db_sample_dict", {}) |
|
|
if not db_sample_dict: |
|
|
raise ValueError("Amostra do banco não disponível") |
|
|
|
|
|
|
|
|
db_sample = pd.DataFrame(db_sample_dict.get("data", [])) |
|
|
if db_sample.empty: |
|
|
raise ValueError("Dados de amostra vazios") |
|
|
|
|
|
|
|
|
query_type = detect_query_type(user_input) |
|
|
state["query_type"] = query_type |
|
|
|
|
|
if query_type in ['sql_query', 'sql_query_graphic']: |
|
|
|
|
|
suggested_query = state.get("suggested_query", "") |
|
|
query_observations = state.get("query_observations", "") |
|
|
|
|
|
|
|
|
sql_context = prepare_sql_context(user_input, db_sample, suggested_query, query_observations) |
|
|
state["sql_context"] = sql_context |
|
|
|
|
|
logging.info(f"[DEBUG] Tipo de query detectado: {query_type}") |
|
|
if suggested_query: |
|
|
logging.info(f"[DEBUG] Query sugerida pelo Processing Agent incluída no contexto") |
|
|
logging.info(f"[DEBUG] Contexto preparado para agentSQL") |
|
|
else: |
|
|
|
|
|
error_msg = f"Tipo de query '{query_type}' ainda não implementado." |
|
|
state.update({ |
|
|
"error": error_msg, |
|
|
"response": error_msg, |
|
|
"execution_time": time.time() - start_time |
|
|
}) |
|
|
return state |
|
|
|
|
|
|
|
|
agent_id = state.get("agent_id") |
|
|
if not agent_id: |
|
|
raise ValueError("ID do agente SQL não encontrado") |
|
|
|
|
|
sql_agent = obj_manager.get_sql_agent(agent_id) |
|
|
if not sql_agent: |
|
|
raise ValueError("Agente SQL não encontrado") |
|
|
|
|
|
|
|
|
connection_type = state.get("connection_type", "csv") |
|
|
if connection_type == "postgresql": |
|
|
single_table_mode = state.get("single_table_mode", False) |
|
|
selected_table = state.get("selected_table") |
|
|
selected_model = state.get("selected_model", "gpt-4o-mini") |
|
|
|
|
|
|
|
|
current_single_mode = getattr(sql_agent, 'single_table_mode', False) |
|
|
current_table = getattr(sql_agent, 'selected_table', None) |
|
|
current_model = getattr(sql_agent, 'model_name', 'gpt-4o-mini') |
|
|
|
|
|
if (single_table_mode != current_single_mode or |
|
|
selected_table != current_table or |
|
|
selected_model != current_model): |
|
|
|
|
|
logging.info(f"[QUERY] Recriando agente SQL - Modo: {'único' if single_table_mode else 'multi'}, Tabela: {selected_table}") |
|
|
|
|
|
|
|
|
sql_agent.recreate_agent( |
|
|
single_table_mode=single_table_mode, |
|
|
selected_table=selected_table, |
|
|
new_model=selected_model |
|
|
) |
|
|
|
|
|
|
|
|
obj_manager.store_sql_agent(sql_agent, state.get("db_id")) |
|
|
|
|
|
|
|
|
sql_result = await sql_agent.execute_query(state["sql_context"]) |
|
|
|
|
|
|
|
|
logging.info(f"[AGENT SQL] ===== RESPOSTA DO AGENTE SQL =====") |
|
|
logging.info(f"[AGENT SQL] Sucesso: {sql_result['success']}") |
|
|
logging.info(f"[AGENT SQL] Resposta completa:") |
|
|
logging.info(f"{sql_result.get('output', 'Sem resposta')}") |
|
|
if sql_result.get("sql_query"): |
|
|
logging.info(f"[AGENT SQL] Query SQL capturada: {sql_result['sql_query']}") |
|
|
logging.info(f"[AGENT SQL] ===== FIM DA RESPOSTA =====") |
|
|
|
|
|
if not sql_result["success"]: |
|
|
state.update({ |
|
|
"error": sql_result["output"], |
|
|
"response": sql_result["output"], |
|
|
"sql_result": sql_result |
|
|
}) |
|
|
else: |
|
|
|
|
|
sql_query_captured = sql_result.get("sql_query") |
|
|
|
|
|
state.update({ |
|
|
"response": sql_result["output"], |
|
|
"intermediate_steps": sql_result["intermediate_steps"], |
|
|
"sql_result": sql_result, |
|
|
"sql_query_extracted": sql_query_captured, |
|
|
"error": None |
|
|
}) |
|
|
|
|
|
|
|
|
if not sql_query_captured: |
|
|
logging.warning("[QUERY] ⚠️ Nenhuma query SQL foi capturada pelo handler") |
|
|
|
|
|
|
|
|
if cache_manager and sql_result["success"]: |
|
|
cache_manager.cache_response(user_input, state["response"]) |
|
|
|
|
|
state["execution_time"] = time.time() - start_time |
|
|
logging.info(f"[QUERY] Concluído em {state['execution_time']:.2f}s") |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Erro ao processar query: {e}" |
|
|
logging.error(f"[QUERY] {error_msg}") |
|
|
state.update({ |
|
|
"error": error_msg, |
|
|
"response": error_msg, |
|
|
"execution_time": time.time() - start_time |
|
|
}) |
|
|
|
|
|
return state |
|
|
|
|
|
async def validate_query_input_node(state: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Nó para validar entrada da consulta |
|
|
|
|
|
Args: |
|
|
state: Estado com entrada do usuário |
|
|
|
|
|
Returns: |
|
|
Estado atualizado com validação |
|
|
""" |
|
|
user_input = state.get("user_input", "").strip() |
|
|
|
|
|
if not user_input: |
|
|
state.update({ |
|
|
"error": "Entrada vazia", |
|
|
"response": "Por favor, digite uma pergunta.", |
|
|
"execution_time": 0.0 |
|
|
}) |
|
|
return state |
|
|
|
|
|
if len(user_input) > 1000: |
|
|
state.update({ |
|
|
"error": "Entrada muito longa", |
|
|
"response": "Pergunta muito longa. Por favor, seja mais conciso.", |
|
|
"execution_time": 0.0 |
|
|
}) |
|
|
return state |
|
|
|
|
|
|
|
|
state["error"] = None |
|
|
logging.info(f"[VALIDATION] Entrada validada: {len(user_input)} caracteres") |
|
|
|
|
|
return state |
|
|
|
|
|
async def prepare_query_context_node(state: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Nó para preparar contexto da consulta |
|
|
|
|
|
Args: |
|
|
state: Estado atual |
|
|
|
|
|
Returns: |
|
|
Estado com contexto preparado |
|
|
""" |
|
|
try: |
|
|
|
|
|
required_ids = ["agent_id", "engine_id", "cache_id"] |
|
|
missing_ids = [id_name for id_name in required_ids if not state.get(id_name)] |
|
|
|
|
|
if missing_ids: |
|
|
raise ValueError(f"IDs necessários não encontrados: {missing_ids}") |
|
|
|
|
|
obj_manager = get_object_manager() |
|
|
|
|
|
|
|
|
for id_name in required_ids: |
|
|
obj_id = state[id_name] |
|
|
if id_name == "agent_id": |
|
|
obj = obj_manager.get_sql_agent(obj_id) |
|
|
elif id_name == "engine_id": |
|
|
obj = obj_manager.get_engine(obj_id) |
|
|
elif id_name == "cache_id": |
|
|
obj = obj_manager.get_cache_manager(obj_id) |
|
|
|
|
|
if obj is None: |
|
|
raise ValueError(f"Objeto não encontrado para {id_name}: {obj_id}") |
|
|
|
|
|
|
|
|
state["context_ready"] = True |
|
|
logging.info("[CONTEXT] Contexto da consulta preparado") |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Erro ao preparar contexto: {e}" |
|
|
logging.error(f"[CONTEXT] {error_msg}") |
|
|
state.update({ |
|
|
"error": error_msg, |
|
|
"context_ready": False |
|
|
}) |
|
|
|
|
|
return state |
|
|
|