""" Shared LLM Model Manager Single Qwen2.5-Coder-1.5B instance shared by NL translator and AI analysis Prevents duplicate model loading and memory waste OPTIMIZED FOR NON-BLOCKING OPERATION: - Async request submission (returns immediately) - Result polling (check if ready) - Request cancellation if game loop needs to continue """ import threading import queue import time from typing import Optional, Dict, Any, List, Tuple from pathlib import Path from enum import Enum try: from llama_cpp import Llama except ImportError: Llama = None class RequestStatus(Enum): """Status of an async request""" PENDING = "pending" # In queue, not yet processed PROCESSING = "processing" # Currently being processed COMPLETED = "completed" # Done, result available FAILED = "failed" # Error occurred CANCELLED = "cancelled" # Request was cancelled class AsyncRequest: """Represents an async LLM request""" def __init__(self, request_id: str, messages: List[Dict[str, str]], max_tokens: int, temperature: float): self.request_id = request_id self.messages = messages self.max_tokens = max_tokens self.temperature = temperature self.status = RequestStatus.PENDING self.result_text: Optional[str] = None self.error_message: Optional[str] = None self.submitted_at = time.time() self.completed_at: Optional[float] = None class SharedModelManager: """Thread-safe singleton manager for shared LLM model""" _instance = None _lock = threading.Lock() def __new__(cls): if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def __init__(self): # Only initialize once if hasattr(self, '_initialized'): return self._initialized = True self.model = None # type: Optional[Llama] self.model_path = None # type: Optional[str] self.model_loaded = False self.last_error = None # type: Optional[str] # Async request management self._request_queue = queue.Queue() # type: queue.Queue[AsyncRequest] self._requests = {} # type: Dict[str, AsyncRequest] self._requests_lock = threading.Lock() self._worker_thread = None # type: Optional[threading.Thread] self._stop_worker = False self._current_request_id: Optional[str] = None # Track what's being processed def load_model(self, model_path: str = "qwen2.5-coder-1.5b-instruct-q4_0.gguf") -> tuple[bool, Optional[str]]: """Load the shared model (thread-safe)""" with self._lock: if self.model_loaded and self.model_path == model_path: return True, None if Llama is None: self.last_error = "llama-cpp-python not installed" return False, self.last_error try: # Unload previous model if different if self.model is not None and self.model_path != model_path: del self.model self.model = None self.model_loaded = False # Load new model # Try /tmp/rts first (HuggingFace Space download location) tmp_path = Path("/tmp/rts") / model_path local_path = Path(__file__).parent / model_path if tmp_path.exists(): full_path = tmp_path elif local_path.exists(): full_path = local_path else: self.last_error = f"Model file not found: {model_path} (checked /tmp/rts/ and {Path(__file__).parent})" return False, self.last_error self.model = Llama( model_path=str(full_path), n_ctx=2048, # Reduced from 4096 for faster processing n_threads=1, # Prompt processing: 1 thread n_threads_batch=1, # Token generation: 1 thread (CRITICAL!) n_batch=256, # Increased from 128 for better throughput verbose=False, chat_format='qwen' ) self.model_path = model_path self.model_loaded = True self.last_error = None # Start worker thread if not running if self._worker_thread is None or not self._worker_thread.is_alive(): self._stop_worker = False self._worker_thread = threading.Thread(target=self._process_requests, daemon=True) self._worker_thread.start() return True, None except Exception as e: self.last_error = f"Failed to load model: {str(e)}" self.model_loaded = False return False, self.last_error def _process_requests(self): """Worker thread to process model requests sequentially (async-friendly)""" # Lower thread priority so game gets CPU preference import os try: os.nice(10) # Lower priority (0=normal, 19=lowest) print("๐Ÿ“‰ LLM worker thread priority lowered (nice +10)") except Exception as e: print(f"โš ๏ธ Could not lower thread priority: {e}") while not self._stop_worker: try: # Get request with timeout to check stop flag try: request = self._request_queue.get(timeout=0.5) except queue.Empty: continue if not isinstance(request, AsyncRequest): continue # Mark as processing with self._requests_lock: self._current_request_id = request.request_id request.status = RequestStatus.PROCESSING try: # Check model is loaded if not self.model_loaded or self.model is None: request.status = RequestStatus.FAILED request.error_message = 'Model not loaded' request.completed_at = time.time() continue # Process request (this is the blocking part) start_time = time.time() response = self.model.create_chat_completion( messages=request.messages, max_tokens=request.max_tokens, temperature=request.temperature, stream=False ) elapsed = time.time() - start_time # Extract text from response if response and 'choices' in response and len(response['choices']) > 0: text = response['choices'][0].get('message', {}).get('content', '') request.status = RequestStatus.COMPLETED request.result_text = text request.completed_at = time.time() print(f"โœ… LLM request completed in {elapsed:.2f}s") else: request.status = RequestStatus.FAILED request.error_message = 'Empty response from model' request.completed_at = time.time() except Exception as e: request.status = RequestStatus.FAILED request.error_message = f"Model inference error: {str(e)}" request.completed_at = time.time() print(f"โŒ LLM request failed: {e}") finally: with self._requests_lock: self._current_request_id = None except Exception as e: print(f"โŒ Worker thread error: {e}") time.sleep(0.1) def submit_async(self, messages: List[Dict[str, str]], max_tokens: int = 256, temperature: float = 0.7) -> str: """ Submit request asynchronously (non-blocking) Args: messages: List of {role, content} dicts max_tokens: Maximum tokens to generate temperature: Sampling temperature Returns: request_id: Use this to poll for results with get_result() """ if not self.model_loaded: raise RuntimeError("Model not loaded. Call load_model() first.") # Create unique request ID request_id = f"req_{int(time.time() * 1000000)}_{id(threading.current_thread())}" # Create request object request = AsyncRequest( request_id=request_id, messages=messages, max_tokens=max_tokens, temperature=temperature ) # Register and submit with self._requests_lock: self._requests[request_id] = request self._request_queue.put(request) print(f"๐Ÿ“ค LLM request submitted: {request_id}") return request_id def get_result(self, request_id: str, remove: bool = True) -> Tuple[RequestStatus, Optional[str], Optional[str]]: """ Check result of async request (non-blocking) Args: request_id: ID returned by submit_async() remove: If True, remove request after getting result Returns: (status, result_text, error_message) """ with self._requests_lock: request = self._requests.get(request_id) if request is None: return RequestStatus.FAILED, None, "Request not found (may have been cleaned up)" # Return current status status = request.status result_text = request.result_text error_message = request.error_message # Cleanup if requested and completed if remove and status in [RequestStatus.COMPLETED, RequestStatus.FAILED, RequestStatus.CANCELLED]: with self._requests_lock: self._requests.pop(request_id, None) return status, result_text, error_message def cancel_request(self, request_id: str) -> bool: """ Cancel a pending request (cannot cancel if already processing) Returns: True if cancelled, False if already processing/completed """ with self._requests_lock: request = self._requests.get(request_id) if request is None: return False # Can only cancel pending requests if request.status == RequestStatus.PENDING: request.status = RequestStatus.CANCELLED request.completed_at = time.time() return True return False def generate(self, messages: List[Dict[str, str]], max_tokens: int = 256, temperature: float = 0.7, max_wait: float = 300.0) -> tuple[bool, Optional[str], Optional[str]]: """ Generate response from model (blocking, for backward compatibility) NO TIMEOUT - waits for inference to complete naturally. Only cancelled if superseded by new request of same type. max_wait is a safety limit only. Args: messages: List of {role, content} dicts max_tokens: Maximum tokens to generate temperature: Sampling temperature max_wait: Safety limit in seconds (default 5min) Returns: (success, response_text, error_message) """ try: # Submit async request_id = self.submit_async(messages, max_tokens, temperature) # Poll for result (no timeout, wait for completion) start_time = time.time() while time.time() - start_time < max_wait: # Safety limit only status, result_text, error_message = self.get_result(request_id, remove=False) if status == RequestStatus.COMPLETED: # Cleanup and return self.get_result(request_id, remove=True) return True, result_text, None elif status == RequestStatus.FAILED: # Cleanup and return self.get_result(request_id, remove=True) return False, None, error_message elif status == RequestStatus.CANCELLED: self.get_result(request_id, remove=True) return False, None, "Request was cancelled by newer request" # Still pending/processing, wait a bit time.sleep(0.1) # Safety limit reached (model may be stuck) return False, None, f"Request exceeded safety limit ({max_wait}s) - model may be stuck" except Exception as e: return False, None, f"Error: {str(e)}" def cleanup_old_requests(self, max_age: float = 300.0): """ Remove completed/failed requests older than max_age seconds Args: max_age: Maximum age in seconds (default 5 minutes) """ now = time.time() with self._requests_lock: to_remove = [] for request_id, request in self._requests.items(): if request.completed_at is not None: age = now - request.completed_at if age > max_age: to_remove.append(request_id) for request_id in to_remove: self._requests.pop(request_id, None) if to_remove: print(f"๐Ÿงน Cleaned up {len(to_remove)} old LLM requests") def get_queue_status(self) -> Dict[str, Any]: """Get current queue status for monitoring""" with self._requests_lock: pending = sum(1 for r in self._requests.values() if r.status == RequestStatus.PENDING) processing = sum(1 for r in self._requests.values() if r.status == RequestStatus.PROCESSING) completed = sum(1 for r in self._requests.values() if r.status == RequestStatus.COMPLETED) failed = sum(1 for r in self._requests.values() if r.status == RequestStatus.FAILED) return { 'queue_size': self._request_queue.qsize(), 'total_requests': len(self._requests), 'pending': pending, 'processing': processing, 'completed': completed, 'failed': failed, 'current_request': self._current_request_id } def shutdown(self): """Cleanup resources""" self._stop_worker = True if self._worker_thread is not None: self._worker_thread.join(timeout=2.0) with self._lock: if self.model is not None: del self.model self.model = None self.model_loaded = False # Global singleton instance _shared_model_manager = SharedModelManager() def get_shared_model() -> SharedModelManager: """Get the shared model manager singleton""" return _shared_model_manager