Spaces:
Running
Running
| import asyncio | |
| import json | |
| import os | |
| import re | |
| import warnings | |
| from contextlib import asynccontextmanager | |
| from http import HTTPStatus | |
| from pathlib import Path | |
| from urllib.parse import urlencode | |
| from fastapi import FastAPI, HTTPException, Request, Response, status | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from loguru import logger | |
| from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor | |
| from pydantic import PydanticDeprecatedSince20 | |
| from pydantic_core import PydanticSerializationError | |
| from rich import print as rprint | |
| from starlette.middleware.base import BaseHTTPMiddleware | |
| from langflow.api import health_check_router, log_router, router | |
| from langflow.initial_setup.setup import ( | |
| create_or_update_starter_projects, | |
| initialize_super_user_if_needed, | |
| load_flows_from_directory, | |
| ) | |
| from langflow.interface.types import get_and_cache_all_types_dict | |
| from langflow.interface.utils import setup_llm_caching | |
| from langflow.logging.logger import configure | |
| from langflow.middleware import ContentSizeLimitMiddleware | |
| from langflow.services.deps import get_settings_service, get_telemetry_service | |
| from langflow.services.utils import initialize_services, teardown_services | |
# Silence the Pydantic v2 deprecation warnings that Langchain triggers.
warnings.filterwarnings(action="ignore", category=PydanticDeprecatedSince20)

# Upper bound used when validating port numbers (65535).
MAX_PORT = 0xFFFF
class RequestCancelledMiddleware(BaseHTTPMiddleware):
    """Stop processing a request as soon as the client disconnects.

    The downstream handler is raced against a poller that watches for a
    client disconnect; whichever finishes first decides the response.
    """

    def __init__(self, app) -> None:
        super().__init__(app)

    async def dispatch(self, request: Request, call_next):
        """Run the handler, or answer 499 if the client goes away first."""
        disconnected = object()

        async def wait_for_disconnect():
            # Poll the connection every 100 ms until the client is gone.
            while not await request.is_disconnected():
                await asyncio.sleep(0.1)
            return disconnected

        response_task = asyncio.create_task(call_next(request))
        disconnect_task = asyncio.create_task(wait_for_disconnect())

        finished, unfinished = await asyncio.wait(
            {response_task, disconnect_task},
            return_when=asyncio.FIRST_COMPLETED,
        )

        # Whichever task lost the race is no longer needed.
        for leftover in unfinished:
            leftover.cancel()

        if disconnect_task not in finished:
            return await response_task
        return Response("Request was cancelled", status_code=499)
class JavaScriptMIMETypeMiddleware(BaseHTTPMiddleware):
    """Force a JavaScript content type on successful ``.js`` responses.

    Also converts Pydantic serialization failures raised downstream into an
    HTTP 500 whose detail carries a JSON list with a hint and the error text.
    """

    async def dispatch(self, request: Request, call_next):
        """Call the next handler and patch the ``Content-Type`` if needed."""
        try:
            response = await call_next(request)
        except PydanticSerializationError as exc:
            # Any other exception type simply propagates unchanged.
            message = (
                "Something went wrong while serializing the response. "
                "Please share this error on our GitHub repository."
            )
            error_messages = json.dumps([message, str(exc)])
            raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=error_messages) from exc

        url_path = request.url.path
        # Paths containing "files/" are excluded from the override.
        is_js_asset = url_path.endswith(".js") and "files/" not in url_path
        if is_js_asset and response.status_code == HTTPStatus.OK:
            response.headers["Content-Type"] = "text/javascript"
        return response
def get_lifespan(*, fix_migration=False, version=None):
    """Build the lifespan context manager used by the FastAPI application.

    Args:
        fix_migration: Forwarded to ``initialize_services``.
        version: Optional version string shown in the startup banner.

    Returns:
        An async context manager factory suitable for ``FastAPI(lifespan=...)``.
    """
    telemetry_service = get_telemetry_service()

    # FastAPI expects an async context manager here; passing a bare async
    # generator function only works through a deprecated compatibility path.
    @asynccontextmanager
    async def lifespan(_app: FastAPI):
        configure(async_file=True)

        # Startup message
        if version:
            rprint(f"[bold green]Starting Langflow v{version}...[/bold green]")
        else:
            rprint("[bold green]Starting Langflow...[/bold green]")

        try:
            await initialize_services(fix_migration=fix_migration)
            setup_llm_caching()
            await initialize_super_user_if_needed()
            all_types_dict = await get_and_cache_all_types_dict(get_settings_service())
            # Run the starter-project sync in a worker thread so it does not
            # block the event loop.
            await asyncio.to_thread(create_or_update_starter_projects, all_types_dict)
            telemetry_service.start()
            await load_flows_from_directory()
            yield
        except Exception as exc:
            # Errors mentioning the migration fix command are re-raised
            # without being logged here; everything else is logged first.
            if "langflow migration --fix" not in str(exc):
                logger.exception(exc)
            raise
        finally:
            # Clean shutdown
            logger.info("Cleaning up resources...")
            await teardown_services()
            await logger.complete()
            # Final message
            rprint("[bold red]Langflow shutdown complete[/bold red]")

    return lifespan
def create_app():
    """Create the FastAPI app and include the router.

    Registers the middleware stack, optionally starts a Prometheus metrics
    server, mounts the API routers and installs a catch-all exception handler.

    Returns:
        The configured ``FastAPI`` application.

    Raises:
        ValueError: If ``LANGFLOW_PROMETHEUS_PORT`` is set but is not an
            integer in the range 1..``MAX_PORT``.
    """
    from langflow.utils.version import get_version_info

    __version__ = get_version_info()["version"]

    configure()
    lifespan = get_lifespan(version=__version__)
    app = FastAPI(lifespan=lifespan, title="Langflow", version=__version__)
    app.add_middleware(
        ContentSizeLimitMiddleware,
    )

    setup_sentry(app)
    origins = ["*"]

    app.add_middleware(
        CORSMiddleware,
        allow_origins=origins,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
    app.add_middleware(JavaScriptMIMETypeMiddleware)

    @app.middleware("http")
    async def check_boundary(request: Request, call_next):
        """Reject malformed multipart bodies on the file upload endpoint."""
        if "/api/v1/files/upload" in request.url.path:
            content_type = request.headers.get("Content-Type")

            if not content_type or "multipart/form-data" not in content_type or "boundary=" not in content_type:
                return JSONResponse(
                    status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                    content={"detail": "Content-Type header must be 'multipart/form-data' with a boundary parameter."},
                )

            boundary = content_type.split("boundary=")[-1].strip()

            if not re.match(r"^[\w\-]{1,70}$", boundary):
                return JSONResponse(
                    status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                    content={"detail": "Invalid boundary format"},
                )

            body = await request.body()

            # The body must open with the boundary delimiter and close with
            # the terminating delimiter followed by CRLF.
            boundary_start = f"--{boundary}".encode()
            boundary_end = f"--{boundary}--\r\n".encode()

            if not body.startswith(boundary_start) or not body.endswith(boundary_end):
                return JSONResponse(
                    status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                    content={"detail": "Invalid multipart formatting"},
                )

        return await call_next(request)

    @app.middleware("http")
    async def flatten_query_string_lists(request: Request, call_next):
        """Expand comma-separated query values into repeated parameters."""
        flattened: list[tuple[str, str]] = []
        for key, value in request.query_params.multi_items():
            flattened.extend((key, entry) for entry in value.split(","))

        # Rewrite the raw query string so downstream parsing sees one
        # key/value pair per entry.
        request.scope["query_string"] = urlencode(flattened, doseq=True).encode("utf-8")

        return await call_next(request)

    settings = get_settings_service().settings
    if prome_port_str := os.environ.get("LANGFLOW_PROMETHEUS_PORT"):
        # set here for create_app() entry point
        msg = f"Invalid port number {prome_port_str}"
        try:
            prome_port = int(prome_port_str)
        except ValueError as exc:
            raise ValueError(msg) from exc
        # Both bounds must hold; joining them with `or` accepted every integer
        # and made the error branch unreachable.
        if not 0 < prome_port <= MAX_PORT:
            raise ValueError(msg)
        rprint(f"[bold green]Starting Prometheus server on port {prome_port}...[/bold green]")
        settings.prometheus_enabled = True
        settings.prometheus_port = prome_port

    if settings.prometheus_enabled:
        from prometheus_client import start_http_server

        start_http_server(settings.prometheus_port)

    app.include_router(router)
    app.include_router(health_check_router)
    app.include_router(log_router)

    @app.exception_handler(Exception)
    async def exception_handler(_request: Request, exc: Exception):
        """Log the error and return it as a JSON ``{"message": ...}`` body."""
        if isinstance(exc, HTTPException):
            logger.error(f"HTTPException: {exc}", exc_info=exc)
            return JSONResponse(
                status_code=exc.status_code,
                content={"message": str(exc.detail)},
            )
        logger.error(f"unhandled error: {exc}", exc_info=exc)
        return JSONResponse(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            content={"message": str(exc)},
        )

    FastAPIInstrumentor.instrument_app(app)

    return app
def setup_sentry(app: FastAPI) -> None:
    """Initialise Sentry and attach its ASGI middleware when a DSN is set.

    Does nothing if the settings carry no ``sentry_dsn``.

    Args:
        app: Application that receives the Sentry middleware.
    """
    settings = get_settings_service().settings
    dsn = settings.sentry_dsn
    if not dsn:
        return

    # Imported lazily so the Sentry SDK is only loaded when it is used.
    import sentry_sdk
    from sentry_sdk.integrations.asgi import SentryAsgiMiddleware

    sentry_sdk.init(
        dsn=dsn,
        traces_sample_rate=settings.sentry_traces_sample_rate,
        profiles_sample_rate=settings.sentry_profiles_sample_rate,
    )
    app.add_middleware(SentryAsgiMiddleware)
def setup_static_files(app: FastAPI, static_files_dir: Path) -> None:
    """Setup the static files directory.

    Mounts ``static_files_dir`` at ``/`` and registers a 404 handler that
    answers with the directory's ``index.html``.

    Args:
        app (FastAPI): FastAPI app.
        static_files_dir (Path): Path to the static files directory.
    """
    app.mount(
        "/",
        StaticFiles(directory=static_files_dir, html=True),
        name="static",
    )

    # Without this registration the handler below is never invoked.
    # NOTE(review): presumably the fallback exists so the frontend's
    # client-side routing can handle unknown paths - confirm.
    @app.exception_handler(404)
    async def custom_404_handler(_request, _exc):
        """Serve ``index.html`` in place of a 404 response."""
        path = static_files_dir / "index.html"

        if not path.exists():
            msg = f"File at path {path} does not exist."
            raise RuntimeError(msg)
        return FileResponse(path)
def get_static_files_dir():
    """Return the ``frontend`` directory located next to this module's file."""
    module_dir = Path(__file__).parent
    return module_dir.joinpath("frontend")
def setup_app(static_files_dir: Path | None = None, *, backend_only: bool = False) -> FastAPI:
    """Create the application and, unless backend-only, serve the frontend.

    Args:
        static_files_dir: Directory holding the frontend build; falls back to
            ``get_static_files_dir()`` when not given.
        backend_only: Skip the static-files check and mount when true.

    Returns:
        The configured FastAPI application.

    Raises:
        RuntimeError: If the frontend is required but its directory is missing.
    """
    logger.info(f"Setting up app with static files directory {static_files_dir}")

    static_files_dir = static_files_dir or get_static_files_dir()
    serve_frontend = not backend_only

    if serve_frontend and not (static_files_dir and static_files_dir.exists()):
        raise RuntimeError(f"Static files directory {static_files_dir} does not exist.")

    app = create_app()
    if serve_frontend and static_files_dir is not None:
        setup_static_files(app, static_files_dir)
    return app
if __name__ == "__main__":
    # Development entry point: run the app factory under uvicorn with reload.
    import uvicorn

    from langflow.__main__ import get_number_of_workers

    configure()
    server_options = {
        "host": "127.0.0.1",
        "port": 7860,
        "workers": get_number_of_workers(),
        "log_level": "error",
        "reload": True,
        "loop": "asyncio",
    }
    uvicorn.run("langflow.main:create_app", **server_options)