DataGraph / nodes /cache_node.py
rwayz's picture
Deploy
88b683e
"""
Nó para gerenciamento de cache e histórico
"""
import logging
from typing import Dict, Any
from utils.object_manager import get_object_manager
async def update_history_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Nó para atualizar histórico e logs
Args:
state: Estado atual do agente
Returns:
Estado atualizado
"""
try:
obj_manager = get_object_manager()
cache_id = state.get("cache_id")
if not cache_id:
logging.warning("[HISTORY] ID do cache não encontrado")
return state
cache_manager = obj_manager.get_cache_manager(cache_id)
if not cache_manager:
logging.warning("[HISTORY] Cache manager não encontrado")
return state
# Adiciona ao histórico de logs
history_entry = {
"Modelo AgentSQL": state.get("selected_model", ""),
"Pergunta": state.get("user_input", ""),
"Resposta": state.get("response", ""),
"Tempo de Resposta (s)": round(state.get("execution_time", 0.0), 2),
"Modo Avançado": state.get("advanced_mode", False),
"Refinado": state.get("refined", False),
"Erro": state.get("error"),
"Tipo de Query": state.get("query_type", "sql_query")
}
cache_manager.add_to_history(history_entry)
# Atualiza histórico recente
cache_manager.update_recent_history(
state.get("user_input", ""),
state.get("response", "")
)
state["history_updated"] = True
logging.info("[HISTORY] Histórico atualizado")
except Exception as e:
error_msg = f"Erro ao atualizar histórico: {e}"
logging.error(f"[HISTORY] {error_msg}")
state["history_error"] = error_msg
return state
async def cache_response_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Nó para armazenar resposta no cache
Args:
state: Estado com resposta a ser cacheada
Returns:
Estado atualizado
"""
try:
obj_manager = get_object_manager()
cache_id = state.get("cache_id")
if not cache_id:
logging.warning("[CACHE] ID do cache não encontrado")
return state
cache_manager = obj_manager.get_cache_manager(cache_id)
if not cache_manager:
logging.warning("[CACHE] Cache manager não encontrado")
return state
user_input = state.get("user_input", "")
response = state.get("response", "")
if user_input and response and not state.get("error"):
cache_manager.cache_response(user_input, response)
state["cached"] = True
logging.info(f"[CACHE] Resposta cacheada para: {user_input[:50]}...")
else:
state["cached"] = False
logging.info("[CACHE] Resposta não cacheada (erro ou dados insuficientes)")
except Exception as e:
error_msg = f"Erro ao cachear resposta: {e}"
logging.error(f"[CACHE] {error_msg}")
state["cache_error"] = error_msg
return state
async def get_cache_stats_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Nó para obter estatísticas do cache
Args:
state: Estado atual
Returns:
Estado com estatísticas do cache
"""
try:
obj_manager = get_object_manager()
cache_id = state.get("cache_id")
if not cache_id:
state["cache_stats"] = {}
return state
cache_manager = obj_manager.get_cache_manager(cache_id)
if not cache_manager:
state["cache_stats"] = {}
return state
# Coleta estatísticas
cache_stats = {
"cached_queries": len(cache_manager.query_cache),
"history_entries": len(cache_manager.history_log),
"recent_history_size": len(cache_manager.recent_history),
"cache_hit_rate": 0.0 # Seria calculado com mais dados históricos
}
# Calcula taxa de acerto aproximada
if cache_stats["history_entries"] > 0:
# Estimativa simples baseada em queries repetidas
unique_queries = len(set(entry.get("Pergunta", "") for entry in cache_manager.history_log))
if unique_queries > 0:
cache_stats["cache_hit_rate"] = max(0, 1 - (unique_queries / cache_stats["history_entries"]))
state["cache_stats"] = cache_stats
logging.info(f"[CACHE] Estatísticas coletadas: {cache_stats}")
except Exception as e:
error_msg = f"Erro ao obter estatísticas do cache: {e}"
logging.error(f"[CACHE] {error_msg}")
state["cache_stats"] = {}
return state
async def clear_cache_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Nó para limpar cache
Args:
state: Estado atual
Returns:
Estado atualizado
"""
try:
obj_manager = get_object_manager()
cache_id = state.get("cache_id")
if not cache_id:
state["cache_cleared"] = False
return state
cache_manager = obj_manager.get_cache_manager(cache_id)
if not cache_manager:
state["cache_cleared"] = False
return state
# Limpa cache
cache_manager.clear_cache()
state["cache_cleared"] = True
logging.info("[CACHE] Cache limpo")
except Exception as e:
error_msg = f"Erro ao limpar cache: {e}"
logging.error(f"[CACHE] {error_msg}")
state["cache_cleared"] = False
state["cache_error"] = error_msg
return state
async def check_cache_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Nó para verificar se existe resposta em cache
Args:
state: Estado com consulta do usuário
Returns:
Estado com resultado da verificação de cache
"""
try:
obj_manager = get_object_manager()
cache_id = state.get("cache_id")
user_input = state.get("user_input", "")
if not cache_id or not user_input:
state["cache_hit"] = False
return state
cache_manager = obj_manager.get_cache_manager(cache_id)
if not cache_manager:
state["cache_hit"] = False
return state
# Verifica cache
cached_response = cache_manager.get_cached_response(user_input)
if cached_response:
state["cache_hit"] = True
state["response"] = cached_response
state["execution_time"] = 0.0
state["error"] = None
logging.info(f"[CACHE] Hit para: {user_input[:50]}...")
else:
state["cache_hit"] = False
logging.info(f"[CACHE] Miss para: {user_input[:50]}...")
except Exception as e:
error_msg = f"Erro ao verificar cache: {e}"
logging.error(f"[CACHE] {error_msg}")
state["cache_hit"] = False
state["cache_error"] = error_msg
return state