DataGraph / utils /object_manager.py
rwayz's picture
ss
7094511
"""
Gerenciador de objetos não-serializáveis para LangGraph
"""
import uuid
from typing import Dict, Any, Optional
import logging
class ObjectManager:
"""
Gerencia objetos não-serializáveis que não podem ser incluídos no estado do LangGraph
"""
def __init__(self):
self._objects: Dict[str, Any] = {}
self._sql_agents: Dict[str, Any] = {}
self._processing_agents: Dict[str, Any] = {}
self._engines: Dict[str, Any] = {}
self._databases: Dict[str, Any] = {}
self._cache_managers: Dict[str, Any] = {}
# Mapeamento para relacionar agentes com seus bancos
self._agent_db_mapping: Dict[str, str] = {}
# Metadados de conexões (CSV/PostgreSQL)
self._connection_metadata: Dict[str, Dict[str, Any]] = {}
def store_sql_agent(self, agent: Any, db_id: str = None) -> str:
"""Armazena agente SQL e retorna ID"""
agent_id = str(uuid.uuid4())
self._sql_agents[agent_id] = agent
# Mapeia agente com seu banco se fornecido
if db_id:
self._agent_db_mapping[agent_id] = db_id
logging.info(f"Agente SQL armazenado com ID: {agent_id}")
return agent_id
def get_sql_agent(self, agent_id: str) -> Optional[Any]:
"""Recupera agente SQL pelo ID"""
return self._sql_agents.get(agent_id)
def store_processing_agent(self, agent: Any) -> str:
"""Armazena Processing Agent e retorna ID"""
agent_id = str(uuid.uuid4())
self._processing_agents[agent_id] = agent
logging.info(f"Processing Agent armazenado com ID: {agent_id}")
return agent_id
def get_processing_agent(self, agent_id: str) -> Optional[Any]:
"""Recupera Processing Agent pelo ID"""
return self._processing_agents.get(agent_id)
def store_engine(self, engine: Any) -> str:
"""Armazena engine e retorna ID"""
engine_id = str(uuid.uuid4())
self._engines[engine_id] = engine
logging.info(f"Engine armazenada com ID: {engine_id}")
return engine_id
def get_engine(self, engine_id: str) -> Optional[Any]:
"""Recupera engine pelo ID"""
return self._engines.get(engine_id)
def store_database(self, database: Any) -> str:
"""Armazena banco de dados e retorna ID"""
db_id = str(uuid.uuid4())
self._databases[db_id] = database
logging.info(f"Banco de dados armazenado com ID: {db_id}")
return db_id
def get_database(self, db_id: str) -> Optional[Any]:
"""Recupera banco de dados pelo ID"""
return self._databases.get(db_id)
def get_db_id_for_agent(self, agent_id: str) -> Optional[str]:
"""Recupera ID do banco associado ao agente"""
return self._agent_db_mapping.get(agent_id)
def store_cache_manager(self, cache_manager: Any) -> str:
"""Armazena cache manager e retorna ID"""
cache_id = str(uuid.uuid4())
self._cache_managers[cache_id] = cache_manager
logging.info(f"Cache manager armazenado com ID: {cache_id}")
return cache_id
def get_cache_manager(self, cache_id: str) -> Optional[Any]:
"""Recupera cache manager pelo ID"""
return self._cache_managers.get(cache_id)
def store_object(self, obj: Any, category: str = "general") -> str:
"""Armazena objeto genérico e retorna ID"""
obj_id = str(uuid.uuid4())
self._objects[obj_id] = {"object": obj, "category": category}
logging.info(f"Objeto {category} armazenado com ID: {obj_id}")
return obj_id
def get_object(self, obj_id: str) -> Optional[Any]:
"""Recupera objeto pelo ID"""
obj_data = self._objects.get(obj_id)
return obj_data["object"] if obj_data else None
def update_sql_agent(self, agent_id: str, new_agent: Any) -> bool:
"""Atualiza agente SQL existente"""
if agent_id in self._sql_agents:
self._sql_agents[agent_id] = new_agent
logging.info(f"Agente SQL atualizado: {agent_id}")
return True
return False
def update_engine(self, engine_id: str, new_engine: Any) -> bool:
"""Atualiza engine existente"""
if engine_id in self._engines:
self._engines[engine_id] = new_engine
logging.info(f"Engine atualizada: {engine_id}")
return True
return False
def update_cache_manager(self, cache_id: str, new_cache_manager: Any) -> bool:
"""Atualiza cache manager existente"""
if cache_id in self._cache_managers:
self._cache_managers[cache_id] = new_cache_manager
logging.info(f"Cache manager atualizado: {cache_id}")
return True
return False
def clear_all(self):
"""Limpa todos os objetos armazenados"""
self._objects.clear()
self._sql_agents.clear()
self._engines.clear()
self._databases.clear()
self._cache_managers.clear()
self._agent_db_mapping.clear()
self._connection_metadata.clear()
logging.info("Todos os objetos foram limpos do gerenciador")
def store_connection_metadata(self, connection_id: str, metadata: Dict[str, Any]) -> str:
"""
Armazena metadados de conexão
Args:
connection_id: ID da conexão
metadata: Metadados da conexão
Returns:
ID dos metadados armazenados
"""
self._connection_metadata[connection_id] = metadata
logging.info(f"Metadados de conexão armazenados com ID: {connection_id}")
return connection_id
def get_connection_metadata(self, connection_id: str) -> Optional[Dict[str, Any]]:
"""
Recupera metadados de conexão pelo ID
Args:
connection_id: ID da conexão
Returns:
Metadados da conexão ou None se não encontrado
"""
return self._connection_metadata.get(connection_id)
def get_all_connection_metadata(self) -> Dict[str, Dict[str, Any]]:
"""
Retorna todos os metadados de conexão
Returns:
Dicionário com todos os metadados
"""
return self._connection_metadata.copy()
def get_stats(self) -> Dict[str, int]:
"""Retorna estatísticas dos objetos armazenados"""
return {
"sql_agents": len(self._sql_agents),
"engines": len(self._engines),
"databases": len(self._databases),
"cache_managers": len(self._cache_managers),
"general_objects": len(self._objects),
"agent_db_mappings": len(self._agent_db_mapping),
"connection_metadata": len(self._connection_metadata)
}
# Instância global do gerenciador
_object_manager: Optional[ObjectManager] = None
def get_object_manager() -> ObjectManager:
"""Retorna instância singleton do gerenciador de objetos"""
global _object_manager
if _object_manager is None:
_object_manager = ObjectManager()
return _object_manager
def reset_object_manager():
"""Reseta o gerenciador de objetos"""
global _object_manager
if _object_manager:
_object_manager.clear_all()
_object_manager = ObjectManager()