""" Nó para processamento de consultas SQL """ import time import logging import pandas as pd from typing import Dict, Any, TypedDict from agents.tools import is_greeting, detect_query_type, prepare_sql_context from agents.sql_agent import SQLAgentManager from utils.object_manager import get_object_manager class QueryState(TypedDict): """Estado para processamento de consultas""" user_input: str selected_model: str response: str execution_time: float error: str intermediate_steps: list llama_instruction: str sql_result: dict async def process_user_query_node(state: Dict[str, Any]) -> Dict[str, Any]: """ Nó principal para processar consulta do usuário Args: state: Estado atual com entrada do usuário Returns: Estado atualizado com resposta processada """ start_time = time.time() user_input = state["user_input"] selected_model = state["selected_model"] logging.info(f"[QUERY] Processando: {user_input[:50]}...") try: # Verifica se é saudação if is_greeting(user_input): greeting_response = "Olá! Estou aqui para ajudar com suas consultas. Pergunte algo relacionado aos dados carregados no agente!" state.update({ "response": greeting_response, "execution_time": time.time() - start_time, "error": None }) return state # Recupera objetos necessários obj_manager = get_object_manager() # Recupera cache manager cache_id = state.get("cache_id") cache_manager = obj_manager.get_cache_manager(cache_id) if cache_id else None # CACHE TEMPORARIAMENTE DESATIVADO # Verifica cache se disponível if False: # cache_manager: cached_response = cache_manager.get_cached_response(user_input) if cached_response: logging.info(f"[CACHE] Retornando resposta do cache") state.update({ "response": cached_response, "execution_time": time.time() - start_time, "error": None }) return state # Converte amostra do banco para DataFrame db_sample_dict = state.get("db_sample_dict", {}) if not db_sample_dict: raise ValueError("Amostra do banco não disponível") # Reconstrói DataFrame da amostra db_sample = pd.DataFrame(db_sample_dict.get("data", [])) if db_sample.empty: raise ValueError("Dados de amostra vazios") # Detecta tipo de query e prepara contexto query_type = detect_query_type(user_input) state["query_type"] = query_type if query_type in ['sql_query', 'sql_query_graphic']: # Obtém sugestão de query e observações do Processing Agent (se disponível) suggested_query = state.get("suggested_query", "") query_observations = state.get("query_observations", "") # Prepara contexto para envio direto ao agentSQL sql_context = prepare_sql_context(user_input, db_sample, suggested_query, query_observations) state["sql_context"] = sql_context logging.info(f"[DEBUG] Tipo de query detectado: {query_type}") if suggested_query: logging.info(f"[DEBUG] Query sugerida pelo Processing Agent incluída no contexto") logging.info(f"[DEBUG] Contexto preparado para agentSQL") else: # Para tipos futuros (prediction) error_msg = f"Tipo de query '{query_type}' ainda não implementado." state.update({ "error": error_msg, "response": error_msg, "execution_time": time.time() - start_time }) return state # Recupera agente SQL agent_id = state.get("agent_id") if not agent_id: raise ValueError("ID do agente SQL não encontrado") sql_agent = obj_manager.get_sql_agent(agent_id) if not sql_agent: raise ValueError("Agente SQL não encontrado") # Verifica se precisa recriar o agente SQL para PostgreSQL com configurações atuais connection_type = state.get("connection_type", "csv") if connection_type == "postgresql": single_table_mode = state.get("single_table_mode", False) selected_table = state.get("selected_table") selected_model = state.get("selected_model", "gpt-4o-mini") # Verifica se as configurações mudaram current_single_mode = getattr(sql_agent, 'single_table_mode', False) current_table = getattr(sql_agent, 'selected_table', None) current_model = getattr(sql_agent, 'model_name', 'gpt-4o-mini') if (single_table_mode != current_single_mode or selected_table != current_table or selected_model != current_model): logging.info(f"[QUERY] Recriando agente SQL - Modo: {'único' if single_table_mode else 'multi'}, Tabela: {selected_table}") # Recria o agente com as novas configurações sql_agent.recreate_agent( single_table_mode=single_table_mode, selected_table=selected_table, new_model=selected_model ) # Atualiza no ObjectManager obj_manager.store_sql_agent(sql_agent, state.get("db_id")) # Executa query no agente SQL com contexto direto sql_result = await sql_agent.execute_query(state["sql_context"]) # Log da resposta do agente SQL logging.info(f"[AGENT SQL] ===== RESPOSTA DO AGENTE SQL =====") logging.info(f"[AGENT SQL] Sucesso: {sql_result['success']}") logging.info(f"[AGENT SQL] Resposta completa:") logging.info(f"{sql_result.get('output', 'Sem resposta')}") if sql_result.get("sql_query"): logging.info(f"[AGENT SQL] Query SQL capturada: {sql_result['sql_query']}") logging.info(f"[AGENT SQL] ===== FIM DA RESPOSTA =====") if not sql_result["success"]: state.update({ "error": sql_result["output"], "response": sql_result["output"], "sql_result": sql_result }) else: # Captura query SQL do resultado do agente sql_query_captured = sql_result.get("sql_query") state.update({ "response": sql_result["output"], "intermediate_steps": sql_result["intermediate_steps"], "sql_result": sql_result, "sql_query_extracted": sql_query_captured, # ← Query SQL capturada "error": None }) # Log apenas se não foi capturada (caso de erro) if not sql_query_captured: logging.warning("[QUERY] ⚠️ Nenhuma query SQL foi capturada pelo handler") # Armazena no cache se disponível if cache_manager and sql_result["success"]: cache_manager.cache_response(user_input, state["response"]) state["execution_time"] = time.time() - start_time logging.info(f"[QUERY] Concluído em {state['execution_time']:.2f}s") except Exception as e: error_msg = f"Erro ao processar query: {e}" logging.error(f"[QUERY] {error_msg}") state.update({ "error": error_msg, "response": error_msg, "execution_time": time.time() - start_time }) return state async def validate_query_input_node(state: Dict[str, Any]) -> Dict[str, Any]: """ Nó para validar entrada da consulta Args: state: Estado com entrada do usuário Returns: Estado atualizado com validação """ user_input = state.get("user_input", "").strip() if not user_input: state.update({ "error": "Entrada vazia", "response": "Por favor, digite uma pergunta.", "execution_time": 0.0 }) return state if len(user_input) > 1000: state.update({ "error": "Entrada muito longa", "response": "Pergunta muito longa. Por favor, seja mais conciso.", "execution_time": 0.0 }) return state # Validação passou state["error"] = None logging.info(f"[VALIDATION] Entrada validada: {len(user_input)} caracteres") return state async def prepare_query_context_node(state: Dict[str, Any]) -> Dict[str, Any]: """ Nó para preparar contexto da consulta Args: state: Estado atual Returns: Estado com contexto preparado """ try: # Verifica se todos os componentes necessários estão disponíveis required_ids = ["agent_id", "engine_id", "cache_id"] missing_ids = [id_name for id_name in required_ids if not state.get(id_name)] if missing_ids: raise ValueError(f"IDs necessários não encontrados: {missing_ids}") obj_manager = get_object_manager() # Verifica se objetos existem for id_name in required_ids: obj_id = state[id_name] if id_name == "agent_id": obj = obj_manager.get_sql_agent(obj_id) elif id_name == "engine_id": obj = obj_manager.get_engine(obj_id) elif id_name == "cache_id": obj = obj_manager.get_cache_manager(obj_id) if obj is None: raise ValueError(f"Objeto não encontrado para {id_name}: {obj_id}") # Contexto preparado com sucesso state["context_ready"] = True logging.info("[CONTEXT] Contexto da consulta preparado") except Exception as e: error_msg = f"Erro ao preparar contexto: {e}" logging.error(f"[CONTEXT] {error_msg}") state.update({ "error": error_msg, "context_ready": False }) return state