""" Shared LLM Model Manager Single Qwen2.5-Coder-1.5B instance shared by NL translator and AI analysis Prevents duplicate model loading and memory waste """ import threading import queue import time from typing import Optional, Dict, Any, List from pathlib import Path try: from llama_cpp import Llama except ImportError: Llama = None class SharedModelManager: """Thread-safe singleton manager for shared LLM model""" _instance = None _lock = threading.Lock() def __new__(cls): if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def __init__(self): # Only initialize once if hasattr(self, '_initialized'): return self._initialized = True self.model = None # type: Optional[Llama] self.model_path = None # type: Optional[str] self.model_loaded = False self.last_error = None # type: Optional[str] # Request queue for sequential access self._request_queue = queue.Queue() # type: queue.Queue self._result_queues = {} # type: Dict[int, queue.Queue] self._queue_lock = threading.Lock() self._worker_thread = None # type: Optional[threading.Thread] self._stop_worker = False def load_model(self, model_path: str = "qwen2.5-coder-1.5b-instruct-q4_0.gguf") -> tuple[bool, Optional[str]]: """Load the shared model (thread-safe)""" with self._lock: if self.model_loaded and self.model_path == model_path: return True, None if Llama is None: self.last_error = "llama-cpp-python not installed" return False, self.last_error try: # Unload previous model if different if self.model is not None and self.model_path != model_path: del self.model self.model = None self.model_loaded = False # Load new model full_path = Path(__file__).parent / model_path if not full_path.exists(): self.last_error = f"Model file not found: {model_path}" return False, self.last_error self.model = Llama( model_path=str(full_path), n_ctx=4096, n_threads=4, verbose=False, chat_format='qwen2' ) self.model_path = model_path self.model_loaded = True self.last_error = None # Start worker thread if not running if self._worker_thread is None or not self._worker_thread.is_alive(): self._stop_worker = False self._worker_thread = threading.Thread(target=self._process_requests, daemon=True) self._worker_thread.start() return True, None except Exception as e: self.last_error = f"Failed to load model: {str(e)}" self.model_loaded = False return False, self.last_error def _process_requests(self): """Worker thread to process model requests sequentially""" while not self._stop_worker: try: # Get request with timeout to check stop flag try: request = self._request_queue.get(timeout=0.5) except queue.Empty: continue request_id = request['id'] messages = request['messages'] max_tokens = request.get('max_tokens', 512) temperature = request.get('temperature', 0.7) # Get result queue for this request with self._queue_lock: result_queue = self._result_queues.get(request_id) if result_queue is None: continue try: # Check model is loaded if not self.model_loaded or self.model is None: result_queue.put({ 'status': 'error', 'message': 'Model not loaded' }) continue # Process request response = self.model.create_chat_completion( messages=messages, max_tokens=max_tokens, temperature=temperature, stream=False ) # Extract text from response if response and 'choices' in response and len(response['choices']) > 0: text = response['choices'][0].get('message', {}).get('content', '') result_queue.put({ 'status': 'success', 'text': text }) else: result_queue.put({ 'status': 'error', 'message': 'Empty response from 
model' }) except Exception as e: result_queue.put({ 'status': 'error', 'message': f"Model inference error: {str(e)}" }) except Exception as e: print(f"Worker thread error: {e}") time.sleep(0.1) def generate(self, messages: List[Dict[str, str]], max_tokens: int = 512, temperature: float = 0.7, timeout: float = 30.0) -> tuple[bool, Optional[str], Optional[str]]: """ Generate response from model (thread-safe, queued) Args: messages: List of {role, content} dicts max_tokens: Maximum tokens to generate temperature: Sampling temperature timeout: Maximum wait time in seconds Returns: (success, response_text, error_message) """ if not self.model_loaded: return False, None, "Model not loaded. Call load_model() first." # Create request request_id = id(threading.current_thread()) + int(time.time() * 1000000) result_queue: queue.Queue = queue.Queue() # Register result queue with self._queue_lock: self._result_queues[request_id] = result_queue try: # Submit request self._request_queue.put({ 'id': request_id, 'messages': messages, 'max_tokens': max_tokens, 'temperature': temperature }) # Wait for result try: result = result_queue.get(timeout=timeout) except queue.Empty: return False, None, f"Request timeout after {timeout}s" if result['status'] == 'success': return True, result['text'], None else: return False, None, result.get('message', 'Unknown error') finally: # Cleanup result queue with self._queue_lock: self._result_queues.pop(request_id, None) def shutdown(self): """Cleanup resources""" self._stop_worker = True if self._worker_thread is not None: self._worker_thread.join(timeout=2.0) with self._lock: if self.model is not None: del self.model self.model = None self.model_loaded = False # Global singleton instance _shared_model_manager = SharedModelManager() def get_shared_model() -> SharedModelManager: """Get the shared model manager singleton""" return _shared_model_manager
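

# --- Usage sketch (illustrative addition, not part of the original module) ---
# A minimal example of the intended call pattern: acquire the singleton,
# load the model once, submit a chat request through the queued generate()
# API, then shut down. The prompt text and sampling settings below are
# arbitrary; load_model() assumes the default GGUF file sits next to this
# module and returns (False, error) instead of raising if it is missing.
if __name__ == "__main__":
    manager = get_shared_model()

    ok, err = manager.load_model()
    if not ok:
        print(f"Could not load model: {err}")
    else:
        success, text, error = manager.generate(
            messages=[
                {"role": "system", "content": "You are a helpful coding assistant."},
                {"role": "user", "content": "Write a one-line Python function that reverses a string."},
            ],
            max_tokens=128,
            temperature=0.2,
            timeout=30.0,
        )
        print(text if success else f"Generation failed: {error}")

    manager.shutdown()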