Spaces:
Running
Running
| from __future__ import annotations | |
| from contextlib import asynccontextmanager, contextmanager | |
| from typing import TYPE_CHECKING | |
| from loguru import logger | |
| from langflow.services.schema import ServiceType | |
| if TYPE_CHECKING: | |
| from collections.abc import AsyncGenerator, Generator | |
| from sqlmodel import Session | |
| from sqlmodel.ext.asyncio.session import AsyncSession | |
| from langflow.services.cache.service import AsyncBaseCacheService, CacheService | |
| from langflow.services.chat.service import ChatService | |
| from langflow.services.database.service import DatabaseService | |
| from langflow.services.session.service import SessionService | |
| from langflow.services.settings.service import SettingsService | |
| from langflow.services.socket.service import SocketIOService | |
| from langflow.services.state.service import StateService | |
| from langflow.services.storage.service import StorageService | |
| from langflow.services.store.service import StoreService | |
| from langflow.services.task.service import TaskService | |
| from langflow.services.telemetry.service import TelemetryService | |
| from langflow.services.tracing.service import TracingService | |
| from langflow.services.variable.service import VariableService | |
| def get_service(service_type: ServiceType, default=None): | |
| """Retrieves the service instance for the given service type. | |
| Args: | |
| service_type (ServiceType): The type of service to retrieve. | |
| default (ServiceFactory, optional): The default ServiceFactory to use if the service is not found. | |
| Defaults to None. | |
| Returns: | |
| Any: The service instance. | |
| """ | |
| from langflow.services.manager import service_manager | |
| if not service_manager.factories: | |
| # ! This is a workaround to ensure that the service manager is initialized | |
| # ! Not optimal, but it works for now | |
| service_manager.register_factories() | |
| return service_manager.get(service_type, default) | |
| def get_telemetry_service() -> TelemetryService: | |
| """Retrieves the TelemetryService instance from the service manager. | |
| Returns: | |
| TelemetryService: The TelemetryService instance. | |
| """ | |
| from langflow.services.telemetry.factory import TelemetryServiceFactory | |
| return get_service(ServiceType.TELEMETRY_SERVICE, TelemetryServiceFactory()) | |
| def get_tracing_service() -> TracingService: | |
| """Retrieves the TracingService instance from the service manager. | |
| Returns: | |
| TracingService: The TracingService instance. | |
| """ | |
| from langflow.services.tracing.factory import TracingServiceFactory | |
| return get_service(ServiceType.TRACING_SERVICE, TracingServiceFactory()) | |
| def get_state_service() -> StateService: | |
| """Retrieves the StateService instance from the service manager. | |
| Returns: | |
| The StateService instance. | |
| """ | |
| from langflow.services.state.factory import StateServiceFactory | |
| return get_service(ServiceType.STATE_SERVICE, StateServiceFactory()) | |
| def get_socket_service() -> SocketIOService: | |
| """Get the SocketIOService instance from the service manager. | |
| Returns: | |
| SocketIOService: The SocketIOService instance. | |
| """ | |
| return get_service(ServiceType.SOCKETIO_SERVICE) # type: ignore[attr-defined] | |
| def get_storage_service() -> StorageService: | |
| """Retrieves the storage service instance. | |
| Returns: | |
| The storage service instance. | |
| """ | |
| from langflow.services.storage.factory import StorageServiceFactory | |
| return get_service(ServiceType.STORAGE_SERVICE, default=StorageServiceFactory()) | |
| def get_variable_service() -> VariableService: | |
| """Retrieves the VariableService instance from the service manager. | |
| Returns: | |
| The VariableService instance. | |
| """ | |
| from langflow.services.variable.factory import VariableServiceFactory | |
| return get_service(ServiceType.VARIABLE_SERVICE, VariableServiceFactory()) | |
| def get_settings_service() -> SettingsService: | |
| """Retrieves the SettingsService instance. | |
| If the service is not yet initialized, it will be initialized before returning. | |
| Returns: | |
| The SettingsService instance. | |
| Raises: | |
| ValueError: If the service cannot be retrieved or initialized. | |
| """ | |
| from langflow.services.settings.factory import SettingsServiceFactory | |
| return get_service(ServiceType.SETTINGS_SERVICE, SettingsServiceFactory()) | |
| def get_db_service() -> DatabaseService: | |
| """Retrieves the DatabaseService instance from the service manager. | |
| Returns: | |
| The DatabaseService instance. | |
| """ | |
| from langflow.services.database.factory import DatabaseServiceFactory | |
| return get_service(ServiceType.DATABASE_SERVICE, DatabaseServiceFactory()) | |
| async def get_session() -> AsyncGenerator[AsyncSession, None]: | |
| """Retrieves an async session from the database service. | |
| Yields: | |
| AsyncSession: An async session object. | |
| """ | |
| async with get_db_service().with_async_session() as session: | |
| yield session | |
| def session_scope() -> Generator[Session, None, None]: | |
| """Context manager for managing a session scope. | |
| This context manager is used to manage a session scope for database operations. | |
| It ensures that the session is properly committed if no exceptions occur, | |
| and rolled back if an exception is raised. | |
| Yields: | |
| Session: The session object. | |
| Raises: | |
| Exception: If an error occurs during the session scope. | |
| """ | |
| db_service = get_db_service() | |
| with db_service.with_session() as session: | |
| try: | |
| yield session | |
| session.commit() | |
| except Exception: | |
| logger.exception("An error occurred during the session scope.") | |
| session.rollback() | |
| raise | |
| async def async_session_scope() -> AsyncGenerator[AsyncSession, None]: | |
| """Context manager for managing an async session scope. | |
| This context manager is used to manage an async session scope for database operations. | |
| It ensures that the session is properly committed if no exceptions occur, | |
| and rolled back if an exception is raised. | |
| Yields: | |
| AsyncSession: The async session object. | |
| Raises: | |
| Exception: If an error occurs during the session scope. | |
| """ | |
| db_service = get_db_service() | |
| async with db_service.with_async_session() as session: | |
| try: | |
| yield session | |
| await session.commit() | |
| except Exception: | |
| logger.exception("An error occurred during the session scope.") | |
| await session.rollback() | |
| raise | |
| def get_cache_service() -> CacheService | AsyncBaseCacheService: | |
| """Retrieves the cache service from the service manager. | |
| Returns: | |
| The cache service instance. | |
| """ | |
| from langflow.services.cache.factory import CacheServiceFactory | |
| return get_service(ServiceType.CACHE_SERVICE, CacheServiceFactory()) | |
| def get_shared_component_cache_service() -> CacheService: | |
| """Retrieves the cache service from the service manager. | |
| Returns: | |
| The cache service instance. | |
| """ | |
| from langflow.services.shared_component_cache.factory import SharedComponentCacheServiceFactory | |
| return get_service(ServiceType.SHARED_COMPONENT_CACHE_SERVICE, SharedComponentCacheServiceFactory()) | |
| def get_session_service() -> SessionService: | |
| """Retrieves the session service from the service manager. | |
| Returns: | |
| The session service instance. | |
| """ | |
| from langflow.services.session.factory import SessionServiceFactory | |
| return get_service(ServiceType.SESSION_SERVICE, SessionServiceFactory()) | |
| def get_task_service() -> TaskService: | |
| """Retrieves the TaskService instance from the service manager. | |
| Returns: | |
| The TaskService instance. | |
| """ | |
| from langflow.services.task.factory import TaskServiceFactory | |
| return get_service(ServiceType.TASK_SERVICE, TaskServiceFactory()) | |
| def get_chat_service() -> ChatService: | |
| """Get the chat service instance. | |
| Returns: | |
| ChatService: The chat service instance. | |
| """ | |
| return get_service(ServiceType.CHAT_SERVICE) | |
| def get_store_service() -> StoreService: | |
| """Retrieves the StoreService instance from the service manager. | |
| Returns: | |
| StoreService: The StoreService instance. | |
| """ | |
| return get_service(ServiceType.STORE_SERVICE) | |