Spaces:
Running
Running
| import queue | |
| import asyncio | |
| class IteratorPipe: | |
| """ | |
| Iterator Pipe creates an iterator that can be fed in data from another block of code or thread of execution | |
| """ | |
| def __init__(self, sentinel=object()): | |
| self._q = queue.Queue() | |
| self._sentinel = sentinel | |
| self._sentinel_pushed = False | |
| self._closed = False | |
| def __iter__(self): | |
| return self | |
| def __next__(self): | |
| if self._closed: | |
| raise StopIteration | |
| data = self._q.get(block=True) | |
| if data is self._sentinel: | |
| self._closed = True | |
| raise StopIteration | |
| return data | |
| def put(self, data) -> bool: | |
| """ | |
| Pushes next item to Iterator and returns True | |
| If iterator has been closed via close(), doesn't push anything and returns False | |
| """ | |
| if self._sentinel_pushed: | |
| return False | |
| self._q.put(data) | |
| return True | |
| def close(self): | |
| """ | |
| Close is idempotent. Calling close multiple times is safe | |
| Iterator will raise StopIteration only after all elements pushed before close have been iterated | |
| """ | |
| # make close idempotent | |
| if not self._sentinel_pushed: | |
| self._sentinel_pushed = True | |
| self._q.put(self._sentinel) | |
| class AsyncIteratorPipe: | |
| def __init__(self, sentinel=object()): | |
| self._q = asyncio.Queue() | |
| self._sentinel = sentinel | |
| self._sentinel_pushed = False | |
| self._closed = False | |
| def __aiter__(self): | |
| return self | |
| async def __anext__(self): | |
| if self._closed: | |
| raise StopAsyncIteration | |
| data = await self._q.get() | |
| if data is self._sentinel: | |
| self._closed = True | |
| raise StopAsyncIteration | |
| return data | |
| async def put(self, data) -> bool: | |
| """ | |
| Pushes next item to Iterator and returns True | |
| If iterator has been closed via close(), doesn't push anything and returns False | |
| """ | |
| if self._sentinel_pushed: | |
| return False | |
| await self._q.put(data) | |
| return True | |
| async def close(self): | |
| """ | |
| Close is idempotent. Calling close multiple times is safe | |
| Iterator will raise StopIteration only after all elements pushed before close have been iterated | |
| """ | |
| # make close idempotent | |
| if not self._sentinel_pushed: | |
| self._sentinel_pushed = True | |
| await self._q.put(self._sentinel) | |