Spaces:
Paused
Paused
| """Various base classes.""" | |
| from types import coroutine | |
| from collections.abc import Coroutine | |
| from asyncio import get_running_loop | |
| class AsyncBase: | |
| def __init__(self, file, loop, executor): | |
| self._file = file | |
| self._executor = executor | |
| self._ref_loop = loop | |
| def _loop(self): | |
| return self._ref_loop or get_running_loop() | |
| def __aiter__(self): | |
| """We are our own iterator.""" | |
| return self | |
| def __repr__(self): | |
| return super().__repr__() + " wrapping " + repr(self._file) | |
| async def __anext__(self): | |
| """Simulate normal file iteration.""" | |
| line = await self.readline() | |
| if line: | |
| return line | |
| else: | |
| raise StopAsyncIteration | |
| class AsyncIndirectBase(AsyncBase): | |
| def __init__(self, name, loop, executor, indirect): | |
| self._indirect = indirect | |
| self._name = name | |
| super().__init__(None, loop, executor) | |
| def _file(self): | |
| return self._indirect() | |
| def _file(self, v): | |
| pass # discard writes | |
| class _ContextManager(Coroutine): | |
| __slots__ = ("_coro", "_obj") | |
| def __init__(self, coro): | |
| self._coro = coro | |
| self._obj = None | |
| def send(self, value): | |
| return self._coro.send(value) | |
| def throw(self, typ, val=None, tb=None): | |
| if val is None: | |
| return self._coro.throw(typ) | |
| elif tb is None: | |
| return self._coro.throw(typ, val) | |
| else: | |
| return self._coro.throw(typ, val, tb) | |
| def close(self): | |
| return self._coro.close() | |
| def gi_frame(self): | |
| return self._coro.gi_frame | |
| def gi_running(self): | |
| return self._coro.gi_running | |
| def gi_code(self): | |
| return self._coro.gi_code | |
| def __next__(self): | |
| return self.send(None) | |
| def __iter__(self): | |
| resp = yield from self._coro | |
| return resp | |
| def __await__(self): | |
| resp = yield from self._coro | |
| return resp | |
| async def __anext__(self): | |
| resp = await self._coro | |
| return resp | |
| async def __aenter__(self): | |
| self._obj = await self._coro | |
| return self._obj | |
| async def __aexit__(self, exc_type, exc, tb): | |
| self._obj.close() | |
| self._obj = None | |
| class AiofilesContextManager(_ContextManager): | |
| """An adjusted async context manager for aiofiles.""" | |
| async def __aexit__(self, exc_type, exc_val, exc_tb): | |
| await get_running_loop().run_in_executor( | |
| None, self._obj._file.__exit__, exc_type, exc_val, exc_tb | |
| ) | |
| self._obj = None | |