|
|
""" |
|
|
Nó para gerenciamento de cache e histórico |
|
|
""" |
|
|
import logging |
|
|
from typing import Dict, Any |
|
|
|
|
|
from utils.object_manager import get_object_manager |
|
|
|
|
|
async def update_history_node(state: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Nó para atualizar histórico e logs |
|
|
|
|
|
Args: |
|
|
state: Estado atual do agente |
|
|
|
|
|
Returns: |
|
|
Estado atualizado |
|
|
""" |
|
|
try: |
|
|
obj_manager = get_object_manager() |
|
|
cache_id = state.get("cache_id") |
|
|
|
|
|
if not cache_id: |
|
|
logging.warning("[HISTORY] ID do cache não encontrado") |
|
|
return state |
|
|
|
|
|
cache_manager = obj_manager.get_cache_manager(cache_id) |
|
|
if not cache_manager: |
|
|
logging.warning("[HISTORY] Cache manager não encontrado") |
|
|
return state |
|
|
|
|
|
|
|
|
history_entry = { |
|
|
"Modelo AgentSQL": state.get("selected_model", ""), |
|
|
"Pergunta": state.get("user_input", ""), |
|
|
"Resposta": state.get("response", ""), |
|
|
"Tempo de Resposta (s)": round(state.get("execution_time", 0.0), 2), |
|
|
"Modo Avançado": state.get("advanced_mode", False), |
|
|
"Refinado": state.get("refined", False), |
|
|
"Erro": state.get("error"), |
|
|
"Tipo de Query": state.get("query_type", "sql_query") |
|
|
} |
|
|
cache_manager.add_to_history(history_entry) |
|
|
|
|
|
|
|
|
cache_manager.update_recent_history( |
|
|
state.get("user_input", ""), |
|
|
state.get("response", "") |
|
|
) |
|
|
|
|
|
state["history_updated"] = True |
|
|
logging.info("[HISTORY] Histórico atualizado") |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Erro ao atualizar histórico: {e}" |
|
|
logging.error(f"[HISTORY] {error_msg}") |
|
|
state["history_error"] = error_msg |
|
|
|
|
|
return state |
|
|
|
|
|
async def cache_response_node(state: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Nó para armazenar resposta no cache |
|
|
|
|
|
Args: |
|
|
state: Estado com resposta a ser cacheada |
|
|
|
|
|
Returns: |
|
|
Estado atualizado |
|
|
""" |
|
|
try: |
|
|
obj_manager = get_object_manager() |
|
|
cache_id = state.get("cache_id") |
|
|
|
|
|
if not cache_id: |
|
|
logging.warning("[CACHE] ID do cache não encontrado") |
|
|
return state |
|
|
|
|
|
cache_manager = obj_manager.get_cache_manager(cache_id) |
|
|
if not cache_manager: |
|
|
logging.warning("[CACHE] Cache manager não encontrado") |
|
|
return state |
|
|
|
|
|
user_input = state.get("user_input", "") |
|
|
response = state.get("response", "") |
|
|
|
|
|
if user_input and response and not state.get("error"): |
|
|
cache_manager.cache_response(user_input, response) |
|
|
state["cached"] = True |
|
|
logging.info(f"[CACHE] Resposta cacheada para: {user_input[:50]}...") |
|
|
else: |
|
|
state["cached"] = False |
|
|
logging.info("[CACHE] Resposta não cacheada (erro ou dados insuficientes)") |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Erro ao cachear resposta: {e}" |
|
|
logging.error(f"[CACHE] {error_msg}") |
|
|
state["cache_error"] = error_msg |
|
|
|
|
|
return state |
|
|
|
|
|
async def get_cache_stats_node(state: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Nó para obter estatísticas do cache |
|
|
|
|
|
Args: |
|
|
state: Estado atual |
|
|
|
|
|
Returns: |
|
|
Estado com estatísticas do cache |
|
|
""" |
|
|
try: |
|
|
obj_manager = get_object_manager() |
|
|
cache_id = state.get("cache_id") |
|
|
|
|
|
if not cache_id: |
|
|
state["cache_stats"] = {} |
|
|
return state |
|
|
|
|
|
cache_manager = obj_manager.get_cache_manager(cache_id) |
|
|
if not cache_manager: |
|
|
state["cache_stats"] = {} |
|
|
return state |
|
|
|
|
|
|
|
|
cache_stats = { |
|
|
"cached_queries": len(cache_manager.query_cache), |
|
|
"history_entries": len(cache_manager.history_log), |
|
|
"recent_history_size": len(cache_manager.recent_history), |
|
|
"cache_hit_rate": 0.0 |
|
|
} |
|
|
|
|
|
|
|
|
if cache_stats["history_entries"] > 0: |
|
|
|
|
|
unique_queries = len(set(entry.get("Pergunta", "") for entry in cache_manager.history_log)) |
|
|
if unique_queries > 0: |
|
|
cache_stats["cache_hit_rate"] = max(0, 1 - (unique_queries / cache_stats["history_entries"])) |
|
|
|
|
|
state["cache_stats"] = cache_stats |
|
|
logging.info(f"[CACHE] Estatísticas coletadas: {cache_stats}") |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Erro ao obter estatísticas do cache: {e}" |
|
|
logging.error(f"[CACHE] {error_msg}") |
|
|
state["cache_stats"] = {} |
|
|
|
|
|
return state |
|
|
|
|
|
async def clear_cache_node(state: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Nó para limpar cache |
|
|
|
|
|
Args: |
|
|
state: Estado atual |
|
|
|
|
|
Returns: |
|
|
Estado atualizado |
|
|
""" |
|
|
try: |
|
|
obj_manager = get_object_manager() |
|
|
cache_id = state.get("cache_id") |
|
|
|
|
|
if not cache_id: |
|
|
state["cache_cleared"] = False |
|
|
return state |
|
|
|
|
|
cache_manager = obj_manager.get_cache_manager(cache_id) |
|
|
if not cache_manager: |
|
|
state["cache_cleared"] = False |
|
|
return state |
|
|
|
|
|
|
|
|
cache_manager.clear_cache() |
|
|
state["cache_cleared"] = True |
|
|
|
|
|
logging.info("[CACHE] Cache limpo") |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Erro ao limpar cache: {e}" |
|
|
logging.error(f"[CACHE] {error_msg}") |
|
|
state["cache_cleared"] = False |
|
|
state["cache_error"] = error_msg |
|
|
|
|
|
return state |
|
|
|
|
|
async def check_cache_node(state: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Nó para verificar se existe resposta em cache |
|
|
|
|
|
Args: |
|
|
state: Estado com consulta do usuário |
|
|
|
|
|
Returns: |
|
|
Estado com resultado da verificação de cache |
|
|
""" |
|
|
try: |
|
|
obj_manager = get_object_manager() |
|
|
cache_id = state.get("cache_id") |
|
|
user_input = state.get("user_input", "") |
|
|
|
|
|
if not cache_id or not user_input: |
|
|
state["cache_hit"] = False |
|
|
return state |
|
|
|
|
|
cache_manager = obj_manager.get_cache_manager(cache_id) |
|
|
if not cache_manager: |
|
|
state["cache_hit"] = False |
|
|
return state |
|
|
|
|
|
|
|
|
cached_response = cache_manager.get_cached_response(user_input) |
|
|
|
|
|
if cached_response: |
|
|
state["cache_hit"] = True |
|
|
state["response"] = cached_response |
|
|
state["execution_time"] = 0.0 |
|
|
state["error"] = None |
|
|
logging.info(f"[CACHE] Hit para: {user_input[:50]}...") |
|
|
else: |
|
|
state["cache_hit"] = False |
|
|
logging.info(f"[CACHE] Miss para: {user_input[:50]}...") |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Erro ao verificar cache: {e}" |
|
|
logging.error(f"[CACHE] {error_msg}") |
|
|
state["cache_hit"] = False |
|
|
state["cache_error"] = error_msg |
|
|
|
|
|
return state |
|
|
|