File size: 10,309 Bytes
88b683e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7094511
88b683e
7094511
88b683e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7094511
88b683e
 
 
7094511
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
88b683e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
"""
Nó para processamento de consultas SQL
"""
import time
import logging
import pandas as pd
from typing import Dict, Any, TypedDict

from agents.tools import is_greeting, detect_query_type, prepare_sql_context
from agents.sql_agent import SQLAgentManager
from utils.object_manager import get_object_manager

class QueryState(TypedDict):
    """Estado para processamento de consultas"""
    user_input: str
    selected_model: str
    response: str
    execution_time: float
    error: str
    intermediate_steps: list
    llama_instruction: str
    sql_result: dict

async def process_user_query_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Nó principal para processar consulta do usuário
    
    Args:
        state: Estado atual com entrada do usuário
        
    Returns:
        Estado atualizado com resposta processada
    """
    start_time = time.time()
    user_input = state["user_input"]
    selected_model = state["selected_model"]
    
    logging.info(f"[QUERY] Processando: {user_input[:50]}...")
    
    try:
        # Verifica se é saudação
        if is_greeting(user_input):
            greeting_response = "Olá! Estou aqui para ajudar com suas consultas. Pergunte algo relacionado aos dados carregados no agente!"
            state.update({
                "response": greeting_response,
                "execution_time": time.time() - start_time,
                "error": None
            })
            return state
        
        # Recupera objetos necessários
        obj_manager = get_object_manager()
        
        # Recupera cache manager
        cache_id = state.get("cache_id")
        cache_manager = obj_manager.get_cache_manager(cache_id) if cache_id else None
        
        # CACHE TEMPORARIAMENTE DESATIVADO
        # Verifica cache se disponível
        if False:  # cache_manager:
            cached_response = cache_manager.get_cached_response(user_input)
            if cached_response:
                logging.info(f"[CACHE] Retornando resposta do cache")
                state.update({
                    "response": cached_response,
                    "execution_time": time.time() - start_time,
                    "error": None
                })
                return state
        
        # Converte amostra do banco para DataFrame
        db_sample_dict = state.get("db_sample_dict", {})
        if not db_sample_dict:
            raise ValueError("Amostra do banco não disponível")
        
        # Reconstrói DataFrame da amostra
        db_sample = pd.DataFrame(db_sample_dict.get("data", []))
        if db_sample.empty:
            raise ValueError("Dados de amostra vazios")
        
        # Detecta tipo de query e prepara contexto
        query_type = detect_query_type(user_input)
        state["query_type"] = query_type

        if query_type in ['sql_query', 'sql_query_graphic']:
            # Obtém sugestão de query e observações do Processing Agent (se disponível)
            suggested_query = state.get("suggested_query", "")
            query_observations = state.get("query_observations", "")

            # Prepara contexto para envio direto ao agentSQL
            sql_context = prepare_sql_context(user_input, db_sample, suggested_query, query_observations)
            state["sql_context"] = sql_context

            logging.info(f"[DEBUG] Tipo de query detectado: {query_type}")
            if suggested_query:
                logging.info(f"[DEBUG] Query sugerida pelo Processing Agent incluída no contexto")
            logging.info(f"[DEBUG] Contexto preparado para agentSQL")
        else:
            # Para tipos futuros (prediction)
            error_msg = f"Tipo de query '{query_type}' ainda não implementado."
            state.update({
                "error": error_msg,
                "response": error_msg,
                "execution_time": time.time() - start_time
            })
            return state
        
        # Recupera agente SQL
        agent_id = state.get("agent_id")
        if not agent_id:
            raise ValueError("ID do agente SQL não encontrado")

        sql_agent = obj_manager.get_sql_agent(agent_id)
        if not sql_agent:
            raise ValueError("Agente SQL não encontrado")

        # Verifica se precisa recriar o agente SQL para PostgreSQL com configurações atuais
        connection_type = state.get("connection_type", "csv")
        if connection_type == "postgresql":
            single_table_mode = state.get("single_table_mode", False)
            selected_table = state.get("selected_table")
            selected_model = state.get("selected_model", "gpt-4o-mini")

            # Verifica se as configurações mudaram
            current_single_mode = getattr(sql_agent, 'single_table_mode', False)
            current_table = getattr(sql_agent, 'selected_table', None)
            current_model = getattr(sql_agent, 'model_name', 'gpt-4o-mini')

            if (single_table_mode != current_single_mode or
                selected_table != current_table or
                selected_model != current_model):

                logging.info(f"[QUERY] Recriando agente SQL - Modo: {'único' if single_table_mode else 'multi'}, Tabela: {selected_table}")

                # Recria o agente com as novas configurações
                sql_agent.recreate_agent(
                    single_table_mode=single_table_mode,
                    selected_table=selected_table,
                    new_model=selected_model
                )

                # Atualiza no ObjectManager
                obj_manager.store_sql_agent(sql_agent, state.get("db_id"))
        
        # Executa query no agente SQL com contexto direto
        sql_result = await sql_agent.execute_query(state["sql_context"])

        # Log da resposta do agente SQL
        logging.info(f"[AGENT SQL] ===== RESPOSTA DO AGENTE SQL =====")
        logging.info(f"[AGENT SQL] Sucesso: {sql_result['success']}")
        logging.info(f"[AGENT SQL] Resposta completa:")
        logging.info(f"{sql_result.get('output', 'Sem resposta')}")
        if sql_result.get("sql_query"):
            logging.info(f"[AGENT SQL] Query SQL capturada: {sql_result['sql_query']}")
        logging.info(f"[AGENT SQL] ===== FIM DA RESPOSTA =====")

        if not sql_result["success"]:
            state.update({
                "error": sql_result["output"],
                "response": sql_result["output"],
                "sql_result": sql_result
            })
        else:
            # Captura query SQL do resultado do agente
            sql_query_captured = sql_result.get("sql_query")

            state.update({
                "response": sql_result["output"],
                "intermediate_steps": sql_result["intermediate_steps"],
                "sql_result": sql_result,
                "sql_query_extracted": sql_query_captured,  # ← Query SQL capturada
                "error": None
            })

            # Log apenas se não foi capturada (caso de erro)
            if not sql_query_captured:
                logging.warning("[QUERY] ⚠️ Nenhuma query SQL foi capturada pelo handler")
        
        # Armazena no cache se disponível
        if cache_manager and sql_result["success"]:
            cache_manager.cache_response(user_input, state["response"])
        
        state["execution_time"] = time.time() - start_time
        logging.info(f"[QUERY] Concluído em {state['execution_time']:.2f}s")
        
    except Exception as e:
        error_msg = f"Erro ao processar query: {e}"
        logging.error(f"[QUERY] {error_msg}")
        state.update({
            "error": error_msg,
            "response": error_msg,
            "execution_time": time.time() - start_time
        })
    
    return state

async def validate_query_input_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Nó para validar entrada da consulta
    
    Args:
        state: Estado com entrada do usuário
        
    Returns:
        Estado atualizado com validação
    """
    user_input = state.get("user_input", "").strip()
    
    if not user_input:
        state.update({
            "error": "Entrada vazia",
            "response": "Por favor, digite uma pergunta.",
            "execution_time": 0.0
        })
        return state
    
    if len(user_input) > 1000:
        state.update({
            "error": "Entrada muito longa",
            "response": "Pergunta muito longa. Por favor, seja mais conciso.",
            "execution_time": 0.0
        })
        return state
    
    # Validação passou
    state["error"] = None
    logging.info(f"[VALIDATION] Entrada validada: {len(user_input)} caracteres")
    
    return state

async def prepare_query_context_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Nó para preparar contexto da consulta
    
    Args:
        state: Estado atual
        
    Returns:
        Estado com contexto preparado
    """
    try:
        # Verifica se todos os componentes necessários estão disponíveis
        required_ids = ["agent_id", "engine_id", "cache_id"]
        missing_ids = [id_name for id_name in required_ids if not state.get(id_name)]
        
        if missing_ids:
            raise ValueError(f"IDs necessários não encontrados: {missing_ids}")
        
        obj_manager = get_object_manager()
        
        # Verifica se objetos existem
        for id_name in required_ids:
            obj_id = state[id_name]
            if id_name == "agent_id":
                obj = obj_manager.get_sql_agent(obj_id)
            elif id_name == "engine_id":
                obj = obj_manager.get_engine(obj_id)
            elif id_name == "cache_id":
                obj = obj_manager.get_cache_manager(obj_id)
            
            if obj is None:
                raise ValueError(f"Objeto não encontrado para {id_name}: {obj_id}")
        
        # Contexto preparado com sucesso
        state["context_ready"] = True
        logging.info("[CONTEXT] Contexto da consulta preparado")
        
    except Exception as e:
        error_msg = f"Erro ao preparar contexto: {e}"
        logging.error(f"[CONTEXT] {error_msg}")
        state.update({
            "error": error_msg,
            "context_ready": False
        })
    
    return state