| from typing import Dict, Union | |
| from uuid import UUID | |
| import asyncio | |
| from PIL import Image | |
| from typing import Dict, Union | |
| from PIL import Image | |
| InputParams = dict | |
| UserId = UUID | |
| EventDataContent = Dict[str, InputParams] | |
| class UserDataEvent: | |
| def __init__(self): | |
| self.data_event = asyncio.Event() | |
| self.data_content: EventDataContent = {} | |
| def update_data(self, new_data: EventDataContent): | |
| self.data_content = new_data | |
| self.data_event.set() | |
| async def wait_for_data(self) -> EventDataContent: | |
| await self.data_event.wait() | |
| self.data_event.clear() | |
| return self.data_content | |
| UserDataEventMap = Dict[UserId, UserDataEvent] | |
| user_data_events: UserDataEventMap = {} | |