Spaces:
Paused
Paused
| # +-----------------------------------------------+ | |
| # | | | |
| # | Give Feedback / Get Help | | |
| # | https://github.com/BerriAI/litellm/issues/new | | |
| # | | | |
| # +-----------------------------------------------+ | |
| # | |
| # Thank you users! We ❤️ you! - Krrish & Ishaan | |
| """ | |
| Module containing "timeout" decorator for sync and async callables. | |
| """ | |
| import asyncio | |
| from concurrent import futures | |
| from inspect import iscoroutinefunction | |
| from functools import wraps | |
| from threading import Thread | |
| from litellm.exceptions import Timeout | |
| def timeout(timeout_duration: float = 0.0, exception_to_raise=Timeout): | |
| """ | |
| Wraps a function to raise the specified exception if execution time | |
| is greater than the specified timeout. | |
| Works with both synchronous and asynchronous callables, but with synchronous ones will introduce | |
| some overhead due to the backend use of threads and asyncio. | |
| :param float timeout_duration: Timeout duration in seconds. If none callable won't time out. | |
| :param OpenAIError exception_to_raise: Exception to raise when the callable times out. | |
| Defaults to TimeoutError. | |
| :return: The decorated function. | |
| :rtype: callable | |
| """ | |
| def decorator(func): | |
| def wrapper(*args, **kwargs): | |
| async def async_func(): | |
| return func(*args, **kwargs) | |
| thread = _LoopWrapper() | |
| thread.start() | |
| future = asyncio.run_coroutine_threadsafe(async_func(), thread.loop) | |
| local_timeout_duration = timeout_duration | |
| if "force_timeout" in kwargs and kwargs["force_timeout"] is not None: | |
| local_timeout_duration = kwargs["force_timeout"] | |
| elif "request_timeout" in kwargs and kwargs["request_timeout"] is not None: | |
| local_timeout_duration = kwargs["request_timeout"] | |
| try: | |
| result = future.result(timeout=local_timeout_duration) | |
| except futures.TimeoutError: | |
| thread.stop_loop() | |
| model = args[0] if len(args) > 0 else kwargs["model"] | |
| raise exception_to_raise( | |
| f"A timeout error occurred. The function call took longer than {local_timeout_duration} second(s).", | |
| model=model, # [TODO]: replace with logic for parsing out llm provider from model name | |
| llm_provider="openai", | |
| ) | |
| thread.stop_loop() | |
| return result | |
| async def async_wrapper(*args, **kwargs): | |
| local_timeout_duration = timeout_duration | |
| if "force_timeout" in kwargs: | |
| local_timeout_duration = kwargs["force_timeout"] | |
| elif "request_timeout" in kwargs and kwargs["request_timeout"] is not None: | |
| local_timeout_duration = kwargs["request_timeout"] | |
| try: | |
| value = await asyncio.wait_for( | |
| func(*args, **kwargs), timeout=timeout_duration | |
| ) | |
| return value | |
| except asyncio.TimeoutError: | |
| model = args[0] if len(args) > 0 else kwargs["model"] | |
| raise exception_to_raise( | |
| f"A timeout error occurred. The function call took longer than {local_timeout_duration} second(s).", | |
| model=model, # [TODO]: replace with logic for parsing out llm provider from model name | |
| llm_provider="openai", | |
| ) | |
| if iscoroutinefunction(func): | |
| return async_wrapper | |
| return wrapper | |
| return decorator | |
| class _LoopWrapper(Thread): | |
| def __init__(self): | |
| super().__init__(daemon=True) | |
| self.loop = asyncio.new_event_loop() | |
| def run(self) -> None: | |
| try: | |
| self.loop.run_forever() | |
| self.loop.call_soon_threadsafe(self.loop.close) | |
| except Exception as e: | |
| # Log exception here | |
| pass | |
| finally: | |
| self.loop.close() | |
| asyncio.set_event_loop(None) | |
| def stop_loop(self): | |
| for task in asyncio.all_tasks(self.loop): | |
| task.cancel() | |
| self.loop.call_soon_threadsafe(self.loop.stop) | |