Spaces:
Running
Running
| from __future__ import annotations | |
| import importlib | |
| import inspect | |
| from typing import TYPE_CHECKING | |
| from loguru import logger | |
| from langflow.utils.concurrency import KeyedMemoryLockManager | |
| if TYPE_CHECKING: | |
| from langflow.services.base import Service | |
| from langflow.services.factory import ServiceFactory | |
| from langflow.services.schema import ServiceType | |
| class NoFactoryRegisteredError(Exception): | |
| pass | |
| class ServiceManager: | |
| """Manages the creation of different services.""" | |
| def __init__(self) -> None: | |
| self.services: dict[str, Service] = {} | |
| self.factories: dict[str, ServiceFactory] = {} | |
| self.register_factories() | |
| self.keyed_lock = KeyedMemoryLockManager() | |
| def register_factories(self) -> None: | |
| for factory in self.get_factories(): | |
| try: | |
| self.register_factory(factory) | |
| except Exception: # noqa: BLE001 | |
| logger.exception(f"Error initializing {factory}") | |
| def register_factory( | |
| self, | |
| service_factory: ServiceFactory, | |
| ) -> None: | |
| """Registers a new factory with dependencies.""" | |
| service_name = service_factory.service_class.name | |
| self.factories[service_name] = service_factory | |
| def get(self, service_name: ServiceType, default: ServiceFactory | None = None) -> Service: | |
| """Get (or create) a service by its name.""" | |
| with self.keyed_lock.lock(service_name): | |
| if service_name not in self.services: | |
| self._create_service(service_name, default) | |
| return self.services[service_name] | |
| def _create_service(self, service_name: ServiceType, default: ServiceFactory | None = None) -> None: | |
| """Create a new service given its name, handling dependencies.""" | |
| logger.debug(f"Create service {service_name}") | |
| self._validate_service_creation(service_name, default) | |
| # Create dependencies first | |
| factory = self.factories.get(service_name) | |
| if factory is None and default is not None: | |
| self.register_factory(default) | |
| factory = default | |
| if factory is None: | |
| msg = f"No factory registered for {service_name}" | |
| raise NoFactoryRegisteredError(msg) | |
| for dependency in factory.dependencies: | |
| if dependency not in self.services: | |
| self._create_service(dependency) | |
| # Collect the dependent services | |
| dependent_services = {dep.value: self.services[dep] for dep in factory.dependencies} | |
| # Create the actual service | |
| self.services[service_name] = self.factories[service_name].create(**dependent_services) | |
| self.services[service_name].set_ready() | |
| def _validate_service_creation(self, service_name: ServiceType, default: ServiceFactory | None = None) -> None: | |
| """Validate whether the service can be created.""" | |
| if service_name not in self.factories and default is None: | |
| msg = f"No factory registered for the service class '{service_name.name}'" | |
| raise NoFactoryRegisteredError(msg) | |
| def update(self, service_name: ServiceType) -> None: | |
| """Update a service by its name.""" | |
| if service_name in self.services: | |
| logger.debug(f"Update service {service_name}") | |
| self.services.pop(service_name, None) | |
| self.get(service_name) | |
| async def teardown(self) -> None: | |
| """Teardown all the services.""" | |
| for service in self.services.values(): | |
| if service is None: | |
| continue | |
| logger.debug(f"Teardown service {service.name}") | |
| try: | |
| await service.teardown() | |
| except Exception as exc: # noqa: BLE001 | |
| logger.exception(exc) | |
| self.services = {} | |
| self.factories = {} | |
| def get_factories(): | |
| from langflow.services.factory import ServiceFactory | |
| from langflow.services.schema import ServiceType | |
| service_names = [ServiceType(service_type).value.replace("_service", "") for service_type in ServiceType] | |
| base_module = "langflow.services" | |
| factories = [] | |
| for name in service_names: | |
| try: | |
| module_name = f"{base_module}.{name}.factory" | |
| module = importlib.import_module(module_name) | |
| # Find all classes in the module that are subclasses of ServiceFactory | |
| for _, obj in inspect.getmembers(module, inspect.isclass): | |
| if issubclass(obj, ServiceFactory) and obj is not ServiceFactory: | |
| factories.append(obj()) | |
| break | |
| except Exception as exc: | |
| logger.exception(exc) | |
| msg = f"Could not initialize services. Please check your settings. Error in {name}." | |
| raise RuntimeError(msg) from exc | |
| return factories | |
| service_manager = ServiceManager() | |
| def initialize_settings_service() -> None: | |
| """Initialize the settings manager.""" | |
| from langflow.services.settings import factory as settings_factory | |
| service_manager.register_factory(settings_factory.SettingsServiceFactory()) | |
| def initialize_session_service() -> None: | |
| """Initialize the session manager.""" | |
| from langflow.services.cache import factory as cache_factory | |
| from langflow.services.session import factory as session_service_factory | |
| initialize_settings_service() | |
| service_manager.register_factory(cache_factory.CacheServiceFactory()) | |
| service_manager.register_factory(session_service_factory.SessionServiceFactory()) | |