File size: 22,435 Bytes
88b683e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7094511
88b683e
 
 
7094511
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
88b683e
 
7094511
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
88b683e
 
7094511
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
88b683e
7094511
88b683e
 
7094511
88b683e
7094511
 
 
88b683e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7094511
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
88b683e
 
 
 
7094511
88b683e
7094511
 
 
 
 
 
 
 
 
 
 
 
88b683e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7094511
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
"""
Nó para processamento de contexto inicial usando Processing Agent
"""
import logging
import pandas as pd
from typing import Dict, Any

from agents.processing_agent import ProcessingAgentManager
from agents.tools import prepare_processing_context
from utils.object_manager import get_object_manager


async def process_initial_context_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Nó para processar contexto inicial com Processing Agent (opcional)
    
    Args:
        state: Estado atual do agente
        
    Returns:
        Estado atualizado com contexto processado
    """
    # Verifica se o processing está habilitado
    processing_enabled = state.get("processing_enabled", False)
    logging.info(f"[PROCESSING NODE] Processing enabled: {processing_enabled}")

    if not processing_enabled:
        logging.info("[PROCESSING NODE] Processing Agent desabilitado - pulando nó")
        return state

    logging.info("[PROCESSING NODE] ===== INICIANDO NÓ DE PROCESSAMENTO =====")
    
    try:
        user_input = state.get("user_input", "")
        processing_model = state.get("processing_model", "gpt-4o-mini")

        logging.info(f"[PROCESSING NODE] Entrada do usuário: {user_input[:100]}...")
        logging.info(f"[PROCESSING NODE] Modelo selecionado: {processing_model}")

        if not user_input:
            logging.warning("[PROCESSING NODE] Entrada do usuário não disponível")
            return state
        
        # Acessa o banco de dados correto baseado no estado atual
        obj_manager = get_object_manager()

        try:
            # Usa os IDs específicos do estado atual (não o primeiro disponível)
            engine_id = state.get("engine_id")
            db_id = state.get("db_id")

            logging.info(f"[PROCESSING NODE] ===== DEBUG ESTADO =====")
            logging.info(f"[PROCESSING NODE] engine_id do estado: {engine_id}")
            logging.info(f"[PROCESSING NODE] db_id do estado: {db_id}")
            logging.info(f"[PROCESSING NODE] connection_type do estado: {state.get('connection_type')}")
            logging.info(f"[PROCESSING NODE] Chaves disponíveis no estado: {list(state.keys())}")
            logging.info(f"[PROCESSING NODE] ===== FIM DEBUG =====")

            if not engine_id or not db_id:
                logging.error("[PROCESSING NODE] IDs de engine ou database não encontrados no estado")
                logging.error(f"[PROCESSING NODE] engine_id: {engine_id}, db_id: {db_id}")

                # Fallback: tenta usar os IDs disponíveis no ObjectManager
                logging.info("[PROCESSING NODE] Tentando fallback para IDs disponíveis...")
                engines = obj_manager._engines
                databases = obj_manager._databases

                if engines and databases:
                    engine_id = list(engines.keys())[-1]  # Pega o último (mais recente)
                    db_id = list(databases.keys())[-1]    # Pega o último (mais recente)
                    logging.info(f"[PROCESSING NODE] Fallback: usando engine_id={engine_id}, db_id={db_id}")
                else:
                    logging.error("[PROCESSING NODE] Nenhum engine ou database disponível no ObjectManager")
                    return state

            # Obtém engine e database específicos do estado atual
            engine = obj_manager.get_engine(engine_id)
            database = obj_manager.get_database(db_id)

            logging.info(f"[PROCESSING NODE] Engine obtido: {engine is not None}")
            logging.info(f"[PROCESSING NODE] Database obtido: {database is not None}")

            if not engine or not database:
                logging.error("[PROCESSING NODE] Engine ou database não encontrados no ObjectManager")
                logging.error(f"[PROCESSING NODE] engine: {engine}, database: {database}")
                logging.error(f"[PROCESSING NODE] Engines disponíveis: {list(obj_manager._engines.keys())}")
                logging.error(f"[PROCESSING NODE] Databases disponíveis: {list(obj_manager._databases.keys())}")
                return state

            logging.info(f"[PROCESSING NODE] Usando engine {engine_id} e database {db_id} do estado atual")

            # Detecta o tipo de engine baseado no dialect
            engine_dialect = str(engine.dialect.name).lower()
            connection_type = state.get("connection_type", "csv")
            single_table_mode = state.get("single_table_mode", False)
            selected_table = state.get("selected_table")

            logging.info(f"[PROCESSING NODE] ===== DETECÇÃO DE CONEXÃO =====")
            logging.info(f"[PROCESSING NODE] Engine dialect detectado: {engine_dialect}")
            logging.info(f"[PROCESSING NODE] Connection type do estado: {connection_type}")
            logging.info(f"[PROCESSING NODE] Single table mode: {single_table_mode}")
            logging.info(f"[PROCESSING NODE] Selected table: {selected_table}")
            logging.info(f"[PROCESSING NODE] Engine URL: {str(engine.url)}")
            logging.info(f"[PROCESSING NODE] ===== FIM DETECÇÃO =====")

            # Validação: engine dialect deve corresponder ao connection_type
            if connection_type.lower() == "postgresql" and engine_dialect != "postgresql":
                logging.error(f"[PROCESSING NODE] INCONSISTÊNCIA: connection_type={connection_type} mas engine_dialect={engine_dialect}")
                logging.error(f"[PROCESSING NODE] Isso indica que está usando o engine errado!")
            elif connection_type.lower() == "csv" and engine_dialect != "sqlite":
                logging.error(f"[PROCESSING NODE] INCONSISTÊNCIA: connection_type={connection_type} mas engine_dialect={engine_dialect}")
                logging.error(f"[PROCESSING NODE] Isso indica que está usando o engine errado!")

            # NOVA IMPLEMENTAÇÃO: Cria dados das colunas baseado no tipo de conexão
            columns_data = {}
            import sqlalchemy as sa

            if engine_dialect == "postgresql":
                # Para PostgreSQL, processa baseado no modo
                if single_table_mode and selected_table:
                    # Modo tabela única - processa APENAS a tabela selecionada
                    logging.info(f"[PROCESSING NODE] PostgreSQL - Modo tabela única: {selected_table}")
                    columns_data[selected_table] = _extract_table_columns_info(engine, selected_table)

                else:
                    # Modo multi-tabela - processa TODAS as tabelas disponíveis
                    logging.info(f"[PROCESSING NODE] PostgreSQL - Modo multi-tabela")

                    # Obtém lista de todas as tabelas
                    with engine.connect() as conn:
                        tables_result = conn.execute(sa.text("""
                            SELECT table_name
                            FROM information_schema.tables
                            WHERE table_schema = 'public'
                            ORDER BY table_name
                        """))
                        available_tables = [row[0] for row in tables_result.fetchall()]

                    logging.info(f"[PROCESSING NODE] Tabelas encontradas: {available_tables}")

                    # Processa cada tabela (máximo 5 para performance)
                    for table_name in available_tables[:10]:
                        columns_data[table_name] = _extract_table_columns_info(engine, table_name)

            else:
                # Para SQLite (CSV convertido), processa tabela padrão
                logging.info(f"[PROCESSING NODE] SQLite - processando tabela padrão")
                columns_data["tabela"] = _extract_table_columns_info(engine, "tabela")

            logging.info(f"[PROCESSING NODE] ✅ Dados das colunas extraídos para {len(columns_data)} tabela(s)")

        except Exception as e:
            logging.error(f"[PROCESSING NODE] ❌ Erro ao acessar banco de dados: {e}")
            logging.error(f"[PROCESSING NODE] Detalhes do erro: {str(e)}")
            logging.error(f"[PROCESSING NODE] Tipo do erro: {type(e)}")
            import traceback
            logging.error(f"[PROCESSING NODE] Traceback: {traceback.format_exc()}")
            return state
        
        # Recupera ou cria Processing Agent
        processing_agent_id = state.get("processing_agent_id")
        
        if processing_agent_id:
            processing_agent = obj_manager.get_processing_agent(processing_agent_id)
            # Verifica se precisa recriar com modelo diferente
            if processing_agent and processing_agent.model_name != processing_model:
                logging.info(f"[PROCESSING NODE] Recriando Processing Agent com modelo {processing_model}")
                processing_agent.recreate_llm(processing_model)
            else:
                logging.info(f"[PROCESSING NODE] Reutilizando Processing Agent existente com modelo {processing_agent.model_name}")
        else:
            # Cria novo Processing Agent
            logging.info(f"[PROCESSING NODE] Criando novo Processing Agent com modelo {processing_model}")
            processing_agent = ProcessingAgentManager(processing_model)
            processing_agent_id = obj_manager.store_processing_agent(processing_agent)
            state["processing_agent_id"] = processing_agent_id
            logging.info(f"[PROCESSING NODE] Novo Processing Agent criado e armazenado com ID: {processing_agent_id}")
        
        # Prepara contexto para o Processing Agent com dados já processados
        connection_type = state.get("connection_type", "csv")
        single_table_mode = state.get("single_table_mode", False)
        selected_table = state.get("selected_table")

        # Obtém lista de tabelas disponíveis se for PostgreSQL
        available_tables = None
        if engine_dialect == "postgresql":
            available_tables = list(columns_data.keys())
            logging.info(f"[PROCESSING NODE] Tabelas disponíveis para contexto: {available_tables}")

        # NOVA CHAMADA: Passa dados já processados em vez de fazer consultas redundantes
        processing_context = prepare_processing_context(
            user_input,
            columns_data,  # Dados já processados das colunas
            connection_type,
            single_table_mode,
            selected_table,
            available_tables
        )

        logging.info(f"[PROCESSING NODE] ===== CONTEXTO PARA PRIMEIRA LLM =====")
        logging.info(f"{processing_context}")
        logging.info(f"[PROCESSING NODE] ===== FIM DO CONTEXTO =====")

        # Executa processamento
        logging.info(f"[PROCESSING NODE] 🚀 Iniciando execução do Processing Agent...")
        logging.info(f"[PROCESSING NODE] Processing Agent: {processing_agent}")
        logging.info(f"[PROCESSING NODE] Modelo: {processing_agent.model_name if processing_agent else 'N/A'}")

        try:
            processing_result = await processing_agent.process_context(processing_context)
            logging.info(f"[PROCESSING NODE] ✅ Processing Agent executado com sucesso")
        except Exception as e:
            logging.error(f"[PROCESSING NODE] ❌ Erro na execução do Processing Agent: {e}")
            import traceback
            logging.error(f"[PROCESSING NODE] Traceback: {traceback.format_exc()}")
            return state

        # Log da resposta da primeira LLM
        logging.info(f"[PROCESSING NODE] ===== RESPOSTA DA PRIMEIRA LLM =====")
        logging.info(f"{processing_result.get('output', 'Sem resposta')}")
        logging.info(f"[PROCESSING NODE] ===== FIM DA RESPOSTA =====")

        if processing_result["success"]:
            # Extrai query sugerida e observações
            suggested_query = processing_result.get("suggested_query", "")
            query_observations = processing_result.get("query_observations", "")

            # Atualiza estado com resultados do processamento
            state.update({
                "suggested_query": suggested_query,
                "query_observations": query_observations,
                "processing_result": processing_result,
                "processing_success": True
            })
            
            # Log simples do resultado
            if suggested_query:
                logging.info(f"[PROCESSING NODE] ✅ Query SQL extraída com sucesso")
                logging.info(f"[PROCESSING NODE] ✅ Observações extraídas: {len(query_observations)} caracteres")
                logging.info(f"[PROCESSING NODE] 🎯 Query será incluída no contexto do SQL Agent")
            else:
                logging.warning(f"[PROCESSING NODE] ❌ Nenhuma query foi extraída - agente SQL funcionará normalmente")
            
        else:
            # Em caso de erro, continua sem processamento
            error_msg = processing_result.get("output", "Erro desconhecido")
            logging.error(f"[PROCESSING] Erro no processamento: {error_msg}")

            state.update({
                "suggested_query": "",
                "query_observations": "",
                "processing_result": processing_result,
                "processing_success": False,
                "processing_error": error_msg
            })
        
    except Exception as e:
        error_msg = f"Erro no nó de processamento: {e}"
        logging.error(f"[PROCESSING] {error_msg}")
        
        # Em caso de erro, continua sem processamento
        state.update({
            "suggested_query": "",
            "query_observations": "",
            "processing_success": False,
            "processing_error": error_msg
        })
    
    return state


def should_use_processing(state: Dict[str, Any]) -> str:
    """
    Determina se deve usar o Processing Agent
    
    Args:
        state: Estado atual
        
    Returns:
        Nome do próximo nó
    """
    if state.get("processing_enabled", False):
        return "process_initial_context"
    else:
        return "prepare_context"


async def validate_processing_input_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Valida entrada para o Processing Agent

    Args:
        state: Estado atual

    Returns:
        Estado validado
    """
    try:
        logging.info("[PROCESSING VALIDATION] ===== VALIDANDO ENTRADA PARA PROCESSING AGENT =====")

        # Verifica se processing está habilitado
        processing_enabled = state.get("processing_enabled", False)
        logging.info(f"[PROCESSING VALIDATION] Processing habilitado: {processing_enabled}")

        if not processing_enabled:
            logging.info("[PROCESSING VALIDATION] Processing desabilitado - pulando validação")
            return state

        # Valida modelo de processamento
        processing_model = state.get("processing_model", "")
        logging.info(f"[PROCESSING VALIDATION] Modelo especificado: '{processing_model}'")

        if not processing_model:
            logging.warning("[PROCESSING VALIDATION] Modelo de processamento não especificado, usando padrão")
            state["processing_model"] = "gpt-4o-mini"
            logging.info(f"[PROCESSING VALIDATION] Modelo padrão definido: gpt-4o-mini")

        # Valida entrada do usuário
        user_input = state.get("user_input", "")
        if not user_input or not user_input.strip():
            logging.error("[PROCESSING VALIDATION] Entrada do usuário vazia - desabilitando processing")
            state["processing_enabled"] = False
            return state

        logging.info(f"[PROCESSING VALIDATION] Validação concluída com sucesso")
        logging.info(f"[PROCESSING VALIDATION] Modelo final: {state['processing_model']}")
        logging.info(f"[PROCESSING VALIDATION] Entrada: {user_input[:100]}...")

    except Exception as e:
        logging.error(f"[PROCESSING VALIDATION] Erro na validação: {e}")
        state["processing_enabled"] = False

    return state


def _extract_table_columns_info(engine, table_name: str) -> list:
    """
    Extrai informações das colunas de uma tabela específica

    Args:
        engine: Engine SQLAlchemy
        table_name: Nome da tabela

    Returns:
        Lista de dicionários com informações das colunas
    """
    import sqlalchemy as sa
    import pandas as pd

    try:
        logging.info(f"[PROCESSING NODE] Extraindo informações da tabela: {table_name}")

        with engine.connect() as conn:
            # Primeiro, tenta obter dados da tabela (máximo 5 linhas)
            try:
                result = conn.execute(sa.text(f"SELECT * FROM {table_name} LIMIT 5"))
                columns = result.keys()
                rows = result.fetchall()

                if rows:
                    # Tabela com dados - processa normalmente
                    table_df = pd.DataFrame(rows, columns=columns)
                    columns_info = []

                    for col in table_df.columns:
                        col_data = table_df[col].dropna()

                        col_info = {
                            "column": col,
                            "type": str(col_data.dtype) if len(col_data) > 0 else "object",
                            "examples": "",
                            "stats": ""
                        }

                        if len(col_data) > 0:
                            # Adiciona exemplos de valores
                            unique_values = col_data.unique()[:3]
                            col_info["examples"] = ", ".join([str(v) for v in unique_values])

                            # Adiciona estatísticas para colunas numéricas
                            if col_data.dtype in ['int64', 'float64']:
                                try:
                                    min_val = col_data.min()
                                    max_val = col_data.max()
                                    col_info["stats"] = f" | Min: {min_val}, Max: {max_val}"
                                except:
                                    pass

                        columns_info.append(col_info)

                    logging.info(f"[PROCESSING NODE] ✅ Tabela {table_name}: {len(columns_info)} colunas com dados")
                    return columns_info

                else:
                    # Tabela sem dados - obtém apenas estrutura das colunas
                    logging.info(f"[PROCESSING NODE] ⚠️ Tabela {table_name} sem dados - obtendo apenas estrutura")

                    # Para PostgreSQL, obtém informações das colunas do schema
                    if str(engine.dialect.name).lower() == "postgresql":
                        schema_result = conn.execute(sa.text(f"""
                            SELECT column_name, data_type
                            FROM information_schema.columns
                            WHERE table_name = '{table_name}'
                            ORDER BY ordinal_position
                        """))

                        columns_info = []
                        for row in schema_result.fetchall():
                            col_info = {
                                "column": row[0],
                                "type": row[1],
                                "examples": "(sem dados)",
                                "stats": ""
                            }
                            columns_info.append(col_info)
                    else:
                        # Para SQLite, usa PRAGMA
                        pragma_result = conn.execute(sa.text(f"PRAGMA table_info({table_name})"))
                        columns_info = []
                        for row in pragma_result.fetchall():
                            col_info = {
                                "column": row[1],  # column name
                                "type": row[2],    # column type
                                "examples": "(sem dados)",
                                "stats": ""
                            }
                            columns_info.append(col_info)

                    logging.info(f"[PROCESSING NODE] ✅ Tabela {table_name}: {len(columns_info)} colunas (estrutura apenas)")
                    return columns_info

            except Exception as e:
                # Se falhar ao acessar a tabela, tenta obter pelo menos a estrutura
                logging.warning(f"[PROCESSING NODE] Erro ao acessar dados da tabela {table_name}: {e}")

                try:
                    # Fallback: obtém estrutura das colunas
                    if str(engine.dialect.name).lower() == "postgresql":
                        schema_result = conn.execute(sa.text(f"""
                            SELECT column_name, data_type
                            FROM information_schema.columns
                            WHERE table_name = '{table_name}'
                            ORDER BY ordinal_position
                        """))

                        columns_info = []
                        for row in schema_result.fetchall():
                            col_info = {
                                "column": row[0],
                                "type": row[1],
                                "examples": "(erro ao acessar dados)",
                                "stats": ""
                            }
                            columns_info.append(col_info)
                    else:
                        # Para SQLite
                        pragma_result = conn.execute(sa.text(f"PRAGMA table_info({table_name})"))
                        columns_info = []
                        for row in pragma_result.fetchall():
                            col_info = {
                                "column": row[1],
                                "type": row[2],
                                "examples": "(erro ao acessar dados)",
                                "stats": ""
                            }
                            columns_info.append(col_info)

                    logging.info(f"[PROCESSING NODE] ⚠️ Tabela {table_name}: {len(columns_info)} colunas (fallback)")
                    return columns_info

                except Exception as e2:
                    logging.error(f"[PROCESSING NODE] ❌ Erro total ao processar tabela {table_name}: {e2}")
                    return []

    except Exception as e:
        logging.error(f"[PROCESSING NODE] ❌ Erro ao extrair informações da tabela {table_name}: {e}")
        return []