Spaces:
Running
Running
| import re | |
| import threading | |
| from contextlib import contextmanager | |
| from pathlib import Path | |
| from filelock import FileLock | |
| from platformdirs import user_cache_dir | |
| class KeyedMemoryLockManager: | |
| """A manager for acquiring and releasing memory locks based on a key.""" | |
| def __init__(self) -> None: | |
| self.locks: dict[str, threading.Lock] = {} | |
| self.global_lock = threading.Lock() | |
| def _get_lock(self, key: str): | |
| with self.global_lock: | |
| if key not in self.locks: | |
| self.locks[key] = threading.Lock() | |
| return self.locks[key] | |
| def lock(self, key: str): | |
| lock = self._get_lock(key) | |
| lock.acquire() | |
| try: | |
| yield | |
| finally: | |
| lock.release() | |
| class KeyedWorkerLockManager: | |
| """A manager for acquiring locks between workers based on a key.""" | |
| def __init__(self) -> None: | |
| self.locks_dir = Path(user_cache_dir("langflow"), ensure_exists=True) / "worker_locks" | |
| def _validate_key(self, key: str) -> bool: | |
| """Validate that the string only contains alphanumeric characters and underscores. | |
| Parameters: | |
| s (str): The string to validate. | |
| Returns: | |
| bool: True if the string is valid, False otherwise. | |
| """ | |
| pattern = re.compile(r"^\w+$") | |
| return bool(pattern.match(key)) | |
| def lock(self, key: str): | |
| if not self._validate_key(key): | |
| msg = f"Invalid key: {key}" | |
| raise ValueError(msg) | |
| lock = FileLock(self.locks_dir / key) | |
| with lock: | |
| yield | |