Spaces:
Running
Running
| from __future__ import annotations | |
| from collections.abc import Callable, Coroutine | |
| from typing import TYPE_CHECKING, Any | |
| from loguru import logger | |
| from langflow.services.base import Service | |
| from langflow.services.task.backends.anyio import AnyIOBackend | |
| from langflow.services.task.utils import get_celery_worker_status | |
| if TYPE_CHECKING: | |
| from langflow.services.settings.service import SettingsService | |
| from langflow.services.task.backends.base import TaskBackend | |
| def check_celery_availability(): | |
| try: | |
| from langflow.worker import celery_app | |
| status = get_celery_worker_status(celery_app) | |
| logger.debug(f"Celery status: {status}") | |
| except Exception: # noqa: BLE001 | |
| logger.opt(exception=True).debug("Celery not available") | |
| status = {"availability": None} | |
| return status | |
| class TaskService(Service): | |
| name = "task_service" | |
| def __init__(self, settings_service: SettingsService): | |
| self.settings_service = settings_service | |
| try: | |
| if self.settings_service.settings.celery_enabled: | |
| status = check_celery_availability() | |
| use_celery = status.get("availability") is not None | |
| else: | |
| use_celery = False | |
| except ImportError: | |
| use_celery = False | |
| self.use_celery = use_celery | |
| self.backend = self.get_backend() | |
| def backend_name(self) -> str: | |
| return self.backend.name | |
| def get_backend(self) -> TaskBackend: | |
| if self.use_celery: | |
| from langflow.services.task.backends.celery import CeleryBackend | |
| logger.debug("Using Celery backend") | |
| return CeleryBackend() | |
| logger.debug("Using AnyIO backend") | |
| return AnyIOBackend() | |
| # In your TaskService class | |
| async def launch_and_await_task( | |
| self, | |
| task_func: Callable[..., Any], | |
| *args: Any, | |
| **kwargs: Any, | |
| ) -> Any: | |
| if not self.use_celery: | |
| return None, await task_func(*args, **kwargs) | |
| if not hasattr(task_func, "apply"): | |
| msg = f"Task function {task_func} does not have an apply method" | |
| raise ValueError(msg) | |
| task = task_func.apply(args=args, kwargs=kwargs) | |
| result = task.get() | |
| # if result is coroutine | |
| if isinstance(result, Coroutine): | |
| result = await result | |
| return task.id, result | |
| async def launch_task(self, task_func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: | |
| logger.debug(f"Launching task {task_func} with args {args} and kwargs {kwargs}") | |
| logger.debug(f"Using backend {self.backend}") | |
| task = self.backend.launch_task(task_func, *args, **kwargs) | |
| return await task if isinstance(task, Coroutine) else task | |
| def get_task(self, task_id: str) -> Any: | |
| return self.backend.get_task(task_id) | |