Spaces:
Running
Running
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
| import logging | |
| import os | |
| from queue import Queue, Empty | |
| import signal | |
| import sys | |
| import threading | |
| import traceback | |
logger = logging.getLogger(__name__)


class DeadlockDetect:
    """Watchdog that kills the process when no progress is reported in time.

    Used as a context manager. While the context is active (and `use` is
    true), a background thread waits for stage names sent through `update`.
    If nothing arrives within `timeout` seconds, the thread logs the last
    stage it saw, prints the stack of every live thread to stderr and then
    kills the current process.

    Args:
        use: When false, every method is a no-op and no thread is started.
        timeout: Maximum number of seconds allowed between two `update`
            calls (or between entering the context and the first `update`).
    """

    def __init__(self, use: bool = False, timeout: float = 120.):
        self.use = use
        self.timeout = timeout
        self._queue: Queue = Queue()
        # Detector thread, set only while the context is active.
        self._thread: "threading.Thread | None" = None

    def update(self, stage: str):
        """Report that `stage` was reached, which restarts the timeout."""
        if self.use:
            self._queue.put(stage)

    def __enter__(self):
        """Start the detector thread (if enabled) and return this object."""
        if self.use:
            self._thread = threading.Thread(target=self._detector_thread)
            self._thread.start()
        # Returning self makes `with DeadlockDetect(...) as detector:` usable.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Ask the detector thread to stop and wait for it to finish."""
        thread = self._thread
        if thread is not None:
            self._thread = None
            # `None` is the sentinel telling the detector to exit cleanly.
            self._queue.put(None)
            thread.join()

    def _detector_thread(self):
        """Thread body: wait for stage updates, kill the process on timeout."""
        logger.debug("Deadlock detector started")
        last_stage = "init"
        while True:
            try:
                stage = self._queue.get(timeout=self.timeout)
            except Empty:
                # No update within `timeout` seconds: treat as a deadlock.
                break
            if stage is None:
                logger.debug("Exiting deadlock detector thread")
                return
            last_stage = stage
        logger.error("Deadlock detector timed out, last stage was %s", last_stage)
        try:
            self._dump_thread_stacks()
            sys.stdout.flush()
            sys.stderr.flush()
        finally:
            # Always kill the process, even if the diagnostic dump failed;
            # otherwise the hung process would stay alive forever.
            # SIGKILL is not defined on Windows, so fall back to SIGTERM there.
            os.kill(os.getpid(), getattr(signal, "SIGKILL", signal.SIGTERM))

    @staticmethod
    def _dump_thread_stacks():
        """Print every live thread and its current Python stack to stderr."""
        # Take a single snapshot; a thread may exit between the snapshot and
        # the enumeration below, so a missing frame must not raise.
        frames = sys._current_frames()
        for th in threading.enumerate():
            print(th, file=sys.stderr)
            frame = frames.get(th.ident)
            if frame is None:
                print("  <no Python frame available>", file=sys.stderr)
            else:
                traceback.print_stack(frame, file=sys.stderr)
            print(file=sys.stderr)