import asyncio
import json
import logging
import os
import sys
from collections import deque
from pathlib import Path
from threading import Lock, Semaphore
from typing import TypedDict

import orjson
from loguru import _defaults, logger
from loguru._error_interceptor import ErrorInterceptor
from loguru._file_sink import FileSink
from loguru._simple_sinks import AsyncSink
from platformdirs import user_cache_dir
from rich.logging import RichHandler
from typing_extensions import NotRequired

from langflow.settings import DEV

VALID_LOG_LEVELS = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
# Human-readable
DEFAULT_LOG_FORMAT = (
    "<green>{time:YYYY-MM-DD HH:mm:ss}</green> - <level>{level: <8}</level> - {module} - <level>{message}</level>"
)


class SizedLogBuffer:
    def __init__(
        self,
        max_readers: int = 20,  # max number of concurrent readers for the buffer
    ):
        """A buffer for storing log messages for the log retrieval API.

        The buffer size can be overridden by the env variable LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE
        because the logger is initialized before the settings service is loaded.
        """
        self.buffer: deque = deque()

        self._max_readers = max_readers
        self._wlock = Lock()
        self._rsemaphore = Semaphore(max_readers)
        self._max = 0

    def get_write_lock(self) -> Lock:
        return self._wlock

    def write(self, message: str) -> None:
        record = json.loads(message)
        log_entry = record["text"]
        epoch = int(record["record"]["time"]["timestamp"] * 1000)
        with self._wlock:
            # Drop the oldest entries so the buffer never exceeds its maximum size.
            if len(self.buffer) >= self.max:
                for _ in range(len(self.buffer) - self.max + 1):
                    self.buffer.popleft()
            self.buffer.append((epoch, log_entry))

    def __len__(self) -> int:
        return len(self.buffer)
    def get_after_timestamp(self, timestamp: int, lines: int = 5) -> dict[int, str]:
        rc = {}

        self._rsemaphore.acquire()
        try:
            with self._wlock:
                for ts, msg in self.buffer:
                    if lines == 0:
                        break
                    if ts >= timestamp and lines > 0:
                        rc[ts] = msg
                        lines -= 1
        finally:
            self._rsemaphore.release()

        return rc

    def get_before_timestamp(self, timestamp: int, lines: int = 5) -> dict[int, str]:
        self._rsemaphore.acquire()
        try:
            # Snapshot under the lock, then work on the copy; the lock must be
            # released before get_last_n() runs, since Lock is not reentrant.
            with self._wlock:
                as_list = list(self.buffer)
            # Find the first entry at or after the timestamp; entries before it
            # are candidates for the result.
            max_index = -1
            for i, (ts, _) in enumerate(as_list):
                if ts >= timestamp:
                    max_index = i
                    break
            if max_index == -1:
                return self.get_last_n(lines)
            rc = {}
            start_from = max(max_index - lines, 0)
            for i, (ts, msg) in enumerate(as_list):
                if start_from <= i < max_index:
                    rc[ts] = msg
            return rc
        finally:
            self._rsemaphore.release()

    def get_last_n(self, last_idx: int) -> dict[int, str]:
        self._rsemaphore.acquire()
        try:
            with self._wlock:
                as_list = list(self.buffer)
            return dict(as_list[-last_idx:])
        finally:
            self._rsemaphore.release()

    @property
    def max(self) -> int:
        # Get it dynamically to allow for env variable changes
        if self._max == 0:
            env_buffer_size = os.getenv("LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE", "0")
            if env_buffer_size.isdigit():
                self._max = int(env_buffer_size)
        return self._max

    @max.setter
    def max(self, value: int) -> None:
        self._max = value

    def enabled(self) -> bool:
        return self.max > 0

    def max_size(self) -> int:
        return self.max


# log buffer for capturing log messages
log_buffer = SizedLogBuffer()
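
# Illustrative sketch (not part of the module): how a retrieval endpoint might
# page through the buffer. The buffer only fills when
# LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE is set to a positive integer, and
# timestamps are epoch milliseconds, as stored by SizedLogBuffer.write().
# `now_ms` here is any epoch-ms value supplied by the caller.
#
#   newer = log_buffer.get_after_timestamp(timestamp=0, lines=10)        # oldest 10
#   older = log_buffer.get_before_timestamp(timestamp=now_ms, lines=10)  # 10 before now_ms
#   tail = log_buffer.get_last_n(10)                                     # newest 10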


def serialize_log(record):
    subset = {
        "timestamp": record["time"].timestamp(),
        "message": record["message"],
        "level": record["level"].name,
        "module": record["module"],
    }
    return orjson.dumps(subset)


def patching(record) -> None:
    record["extra"]["serialized"] = serialize_log(record)
    if DEV is False:
        record.pop("exception", None)
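
# For illustration: serialize_log() yields compact JSON bytes such as
# b'{"timestamp":1700000000.0,"message":"hello","level":"INFO","module":"main"}'
# (field values here are made up).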


class LogConfig(TypedDict):
    log_level: NotRequired[str]
    log_file: NotRequired[Path]
    disable: NotRequired[bool]
    log_env: NotRequired[str]
    log_format: NotRequired[str]
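
# Illustrative sketch: LogConfig mirrors a subset of configure()'s keyword
# parameters, so a config dict can be unpacked straight into it, e.g.:
#
#   config: LogConfig = {"log_level": "DEBUG", "log_file": Path("langflow.log")}
#   configure(**config)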


class AsyncFileSink(AsyncSink):
    def __init__(self, file):
        self._sink = FileSink(
            path=file,
            rotation="10 MB",  # Log rotation based on file size
        )
        super().__init__(self.write_async, None, ErrorInterceptor(_defaults.LOGURU_CATCH, -1))

    async def complete(self):
        await asyncio.to_thread(self._sink.stop)
        for task in self._tasks:
            await self._complete_task(task)

    async def write_async(self, message):
        # Run the blocking file write in a worker thread so the event loop
        # is not stalled by disk I/O.
        await asyncio.to_thread(self._sink.write, message)


def is_valid_log_format(format_string) -> bool:
    """Validate a logging format string by attempting to format a dummy LogRecord.

    Args:
        format_string (str): The format string to validate.

    Returns:
        bool: True if the format string is valid, False otherwise.
    """
    record = logging.LogRecord(
        name="dummy", level=logging.INFO, pathname="dummy_path", lineno=0, msg="dummy message", args=None, exc_info=None
    )
    try:
        # Formatter construction itself can raise ValueError for a malformed
        # string, so keep it inside the try alongside the formatting attempt.
        formatter = logging.Formatter(format_string)
        formatter.format(record)
    except (KeyError, ValueError, TypeError):
        logger.error("Invalid log format string passed; falling back to default")
        return False
    return True
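
# Illustrative usage. Note that validation uses stdlib logging.Formatter
# ("%"-style) semantics rather than loguru's "{...}" style:
#
#   is_valid_log_format("%(asctime)s - %(message)s")  # True
#   is_valid_log_format("%(nonexistent)s")            # False (KeyError on format)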


def configure(
    *,
    log_level: str | None = None,
    log_file: Path | None = None,
    disable: bool | None = False,
    log_env: str | None = None,
    log_format: str | None = None,
    async_file: bool = False,
) -> None:
    if disable and log_level is None and log_file is None:
        logger.disable("langflow")
    if os.getenv("LANGFLOW_LOG_LEVEL", "").upper() in VALID_LOG_LEVELS and log_level is None:
        log_level = os.getenv("LANGFLOW_LOG_LEVEL")
    if log_level is None:
        log_level = "ERROR"
    if log_file is None:
        env_log_file = os.getenv("LANGFLOW_LOG_FILE", "")
        log_file = Path(env_log_file) if env_log_file else None
    if log_env is None:
        log_env = os.getenv("LANGFLOW_LOG_ENV", "")

    logger.remove()  # Remove default handlers
    logger.patch(patching)

    if log_env.lower() in ("container", "container_json"):
        logger.add(sys.stdout, format="{message}", serialize=True)
    elif log_env.lower() == "container_csv":
        logger.add(sys.stdout, format="{time:YYYY-MM-DD HH:mm:ss.SSS} {level} {file} {line} {function} {message}")
    else:
        if os.getenv("LANGFLOW_LOG_FORMAT") and log_format is None:
            log_format = os.getenv("LANGFLOW_LOG_FORMAT")
        if log_format is None or not is_valid_log_format(log_format):
            log_format = DEFAULT_LOG_FORMAT

        # Configure loguru to use RichHandler
        logger.configure(
            handlers=[
                {
                    "sink": RichHandler(rich_tracebacks=True, markup=True),
                    "format": log_format,
                    "level": log_level.upper(),
                }
            ]
        )

        if not log_file:
            cache_dir = Path(user_cache_dir("langflow"))
            logger.debug(f"Cache directory: {cache_dir}")
            log_file = cache_dir / "langflow.log"
        logger.debug(f"Log file: {log_file}")

        try:
            log_file.parent.mkdir(parents=True, exist_ok=True)
            logger.add(
                sink=AsyncFileSink(log_file) if async_file else log_file,
                level=log_level.upper(),
                format=log_format,
                serialize=True,
            )
        except Exception:  # noqa: BLE001
            logger.exception("Error setting up log file")

    if log_buffer.enabled():
        logger.add(sink=log_buffer.write, format="{time} {level} {message}", serialize=True)

    logger.debug(f"Logger set up with log level: {log_level}")
    setup_uvicorn_logger()
    setup_gunicorn_logger()
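
# Illustrative sketch (not in the source): typical ways to invoke configure().
#
#   configure()                                    # ERROR level, Rich console + file sink
#   configure(log_level="DEBUG")                   # verbose local development
#   configure(log_env="container")                 # JSON lines on stdout, no file sink
#   configure(log_file=Path("lf.log"), async_file=True)  # non-blocking file writes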


def setup_uvicorn_logger() -> None:
    loggers = (logging.getLogger(name) for name in logging.root.manager.loggerDict if name.startswith("uvicorn."))
    for uvicorn_logger in loggers:
        uvicorn_logger.handlers = []
    logging.getLogger("uvicorn").handlers = [InterceptHandler()]


def setup_gunicorn_logger() -> None:
    logging.getLogger("gunicorn.error").handlers = [InterceptHandler()]
    logging.getLogger("gunicorn.access").handlers = [InterceptHandler()]


class InterceptHandler(logging.Handler):
    """Default handler from examples in loguru documentation.

    See https://loguru.readthedocs.io/en/stable/overview.html#entirely-compatible-with-standard-logging.
    """

    def emit(self, record) -> None:
        # Get corresponding Loguru level if it exists
        try:
            level = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno

        # Find the caller from which the logged message originated
        frame, depth = logging.currentframe(), 2
        while frame.f_code.co_filename == logging.__file__ and frame.f_back:
            frame = frame.f_back
            depth += 1

        logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())
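

# Illustrative usage (not in the source): any stdlib logger can be routed
# through loguru by swapping in InterceptHandler; "sqlalchemy.engine" below is
# just an example logger name.
#
#   logging.getLogger("sqlalchemy.engine").handlers = [InterceptHandler()]
#   logging.getLogger("sqlalchemy.engine").info("now emitted via loguru")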