| """ | |
| Shared LLM Model Manager | |
| Single Qwen2.5-Coder-1.5B instance shared by NL translator and AI analysis | |
| Prevents duplicate model loading and memory waste | |
| """ | |
import threading
import queue
import time
from typing import Optional, Dict, Any, List
from pathlib import Path

try:
    from llama_cpp import Llama
except ImportError:
    Llama = None
class SharedModelManager:
    """Thread-safe singleton manager for the shared LLM model"""

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # Only initialize once (the singleton may be constructed repeatedly)
        if hasattr(self, '_initialized'):
            return
        self._initialized = True

        self.model = None  # type: Optional[Llama]
        self.model_path = None  # type: Optional[str]
        self.model_loaded = False
        self.last_error = None  # type: Optional[str]

        # Request queue for sequential access to the model
        self._request_queue = queue.Queue()  # type: queue.Queue
        self._result_queues = {}  # type: Dict[int, queue.Queue]
        self._queue_lock = threading.Lock()
        self._worker_thread = None  # type: Optional[threading.Thread]
        self._stop_worker = False
    def load_model(self, model_path: str = "qwen2.5-coder-1.5b-instruct-q4_0.gguf") -> tuple[bool, Optional[str]]:
        """Load the shared model (thread-safe)"""
        with self._lock:
            if self.model_loaded and self.model_path == model_path:
                return True, None

            if Llama is None:
                self.last_error = "llama-cpp-python not installed"
                return False, self.last_error

            try:
                # Unload the previous model if a different one was requested
                if self.model is not None and self.model_path != model_path:
                    del self.model
                    self.model = None
                    self.model_loaded = False

                # Load new model
                full_path = Path(__file__).parent / model_path
                if not full_path.exists():
                    self.last_error = f"Model file not found: {model_path}"
                    return False, self.last_error

                self.model = Llama(
                    model_path=str(full_path),
                    n_ctx=4096,
                    n_threads=4,
                    verbose=False,
                    chat_format='qwen2'
                )
                self.model_path = model_path
                self.model_loaded = True
                self.last_error = None

                # Start the worker thread if it is not running
                if self._worker_thread is None or not self._worker_thread.is_alive():
                    self._stop_worker = False
                    self._worker_thread = threading.Thread(target=self._process_requests, daemon=True)
                    self._worker_thread.start()

                return True, None
            except Exception as e:
                self.last_error = f"Failed to load model: {str(e)}"
                self.model_loaded = False
                return False, self.last_error
    def _process_requests(self):
        """Worker thread to process model requests sequentially"""
        while not self._stop_worker:
            try:
                # Get request with a timeout so the stop flag is checked periodically
                try:
                    request = self._request_queue.get(timeout=0.5)
                except queue.Empty:
                    continue

                request_id = request['id']
                messages = request['messages']
                max_tokens = request.get('max_tokens', 512)
                temperature = request.get('temperature', 0.7)

                # Get result queue for this request
                with self._queue_lock:
                    result_queue = self._result_queues.get(request_id)
                if result_queue is None:
                    continue

                try:
                    # Check model is loaded
                    if not self.model_loaded or self.model is None:
                        result_queue.put({
                            'status': 'error',
                            'message': 'Model not loaded'
                        })
                        continue

                    # Process request
                    response = self.model.create_chat_completion(
                        messages=messages,
                        max_tokens=max_tokens,
                        temperature=temperature,
                        stream=False
                    )

                    # Extract text from response
                    if response and 'choices' in response and len(response['choices']) > 0:
                        text = response['choices'][0].get('message', {}).get('content', '')
                        result_queue.put({
                            'status': 'success',
                            'text': text
                        })
                    else:
                        result_queue.put({
                            'status': 'error',
                            'message': 'Empty response from model'
                        })
                except Exception as e:
                    result_queue.put({
                        'status': 'error',
                        'message': f"Model inference error: {str(e)}"
                    })
            except Exception as e:
                print(f"Worker thread error: {e}")
                time.sleep(0.1)
    def generate(self, messages: List[Dict[str, str]], max_tokens: int = 512,
                 temperature: float = 0.7, timeout: float = 30.0) -> tuple[bool, Optional[str], Optional[str]]:
        """
        Generate a response from the model (thread-safe, queued)

        Args:
            messages: List of {role, content} dicts
            max_tokens: Maximum tokens to generate
            temperature: Sampling temperature
            timeout: Maximum wait time in seconds

        Returns:
            (success, response_text, error_message)
        """
        if not self.model_loaded:
            return False, None, "Model not loaded. Call load_model() first."

        # Create request
        request_id = id(threading.current_thread()) + int(time.time() * 1000000)
        result_queue: queue.Queue = queue.Queue()

        # Register result queue
        with self._queue_lock:
            self._result_queues[request_id] = result_queue

        try:
            # Submit request
            self._request_queue.put({
                'id': request_id,
                'messages': messages,
                'max_tokens': max_tokens,
                'temperature': temperature
            })

            # Wait for result
            try:
                result = result_queue.get(timeout=timeout)
            except queue.Empty:
                return False, None, f"Request timeout after {timeout}s"

            if result['status'] == 'success':
                return True, result['text'], None
            else:
                return False, None, result.get('message', 'Unknown error')
        finally:
            # Cleanup result queue
            with self._queue_lock:
                self._result_queues.pop(request_id, None)
    def shutdown(self):
        """Cleanup resources"""
        self._stop_worker = True
        if self._worker_thread is not None:
            self._worker_thread.join(timeout=2.0)

        with self._lock:
            if self.model is not None:
                del self.model
                self.model = None
            self.model_loaded = False
# Global singleton instance
_shared_model_manager = SharedModelManager()


def get_shared_model() -> SharedModelManager:
    """Get the shared model manager singleton"""
    return _shared_model_manager
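

# --- Usage sketch (illustrative, not part of the manager) ---
# A minimal example of how a caller such as the NL translator or the AI analysis
# component might use this module. It assumes the default GGUF file from
# load_model() sits next to this script; adjust the path otherwise. The prompt
# contents below are hypothetical and only exercise the API defined above.
if __name__ == "__main__":
    manager = get_shared_model()
    ok, err = manager.load_model()  # default: qwen2.5-coder-1.5b-instruct-q4_0.gguf
    if not ok:
        print(f"Load failed: {err}")
    else:
        success, text, error = manager.generate(
            messages=[
                {"role": "system", "content": "You are a helpful coding assistant."},
                {"role": "user", "content": "Write a one-line Python hello world."},
            ],
            max_tokens=64,
            temperature=0.2,
            timeout=60.0,
        )
        print(text if success else f"Generation failed: {error}")
        manager.shutdown()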