| import asyncio | |
| from utils.logger import Logger | |
| logger = Logger.get_logger(__name__) | |
| class RequestCancellation: | |
| """ | |
| RequestCancellation middleware handles request canceling | |
| * In case of API routes where very frequent/expensive requests are made. | |
| """ | |
| def __init__(self, app): | |
| self.app = app | |
| async def __call__(self, scope, receive, send): | |
| if scope["type"] != "http": | |
| await self.app(scope, receive, send) | |
| return | |
| queue = asyncio.Queue() | |
| async def message_poller(sentinel, handler_task): | |
| nonlocal queue | |
| while True: | |
| message = await receive() | |
| if message["type"] == "http.disconnect": | |
| handler_task.cancel() | |
| return sentinel | |
| await queue.put(message) | |
| sentinel = object() | |
| handler_task = asyncio.create_task(self.app(scope, queue.get, send)) | |
| asyncio.create_task(message_poller(sentinel, handler_task)) | |
| try: | |
| return await handler_task | |
| except asyncio.CancelledError: | |
| logger.info('Task Cancellatation Requested.') | |