# rts-commander/model_manager.py
"""
Shared LLM Model Manager
Single Qwen2.5-Coder-1.5B instance shared by NL translator and AI analysis
Prevents duplicate model loading and memory waste
"""
import itertools
import queue
import threading
import time
from pathlib import Path
from typing import Dict, List, Optional
try:
from llama_cpp import Llama
except ImportError:
Llama = None
class SharedModelManager:
"""Thread-safe singleton manager for shared LLM model"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
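        # Double-checked locking: skip the lock on the fast path where the instance
        # already exists, then re-check under the lock before creating it.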
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
# Only initialize once
if hasattr(self, '_initialized'):
return
self._initialized = True
        self.model: Optional["Llama"] = None
        self.model_path: Optional[str] = None
        self.model_loaded = False
        self.last_error: Optional[str] = None
        # Request queue for sequential access: all callers enqueue here and a single
        # worker thread runs inference, so the llama.cpp context is never shared.
        self._request_queue: queue.Queue = queue.Queue()
        self._result_queues: Dict[int, queue.Queue] = {}
        self._queue_lock = threading.Lock()
        self._request_counter = itertools.count()  # collision-free request IDs
        self._worker_thread: Optional[threading.Thread] = None
        self._stop_worker = False
def load_model(self, model_path: str = "qwen2.5-coder-1.5b-instruct-q4_0.gguf") -> tuple[bool, Optional[str]]:
"""Load the shared model (thread-safe)"""
with self._lock:
if self.model_loaded and self.model_path == model_path:
return True, None
if Llama is None:
self.last_error = "llama-cpp-python not installed"
return False, self.last_error
try:
# Unload previous model if different
if self.model is not None and self.model_path != model_path:
del self.model
self.model = None
self.model_loaded = False
# Load new model
full_path = Path(__file__).parent / model_path
if not full_path.exists():
self.last_error = f"Model file not found: {model_path}"
return False, self.last_error
self.model = Llama(
model_path=str(full_path),
n_ctx=4096,
n_threads=4,
verbose=False,
chat_format='qwen2'
)
self.model_path = model_path
self.model_loaded = True
self.last_error = None
# Start worker thread if not running
if self._worker_thread is None or not self._worker_thread.is_alive():
self._stop_worker = False
self._worker_thread = threading.Thread(target=self._process_requests, daemon=True)
self._worker_thread.start()
return True, None
except Exception as e:
self.last_error = f"Failed to load model: {str(e)}"
self.model_loaded = False
return False, self.last_error
def _process_requests(self):
"""Worker thread to process model requests sequentially"""
while not self._stop_worker:
try:
# Get request with timeout to check stop flag
try:
request = self._request_queue.get(timeout=0.5)
except queue.Empty:
continue
request_id = request['id']
messages = request['messages']
max_tokens = request.get('max_tokens', 512)
temperature = request.get('temperature', 0.7)
# Get result queue for this request
with self._queue_lock:
result_queue = self._result_queues.get(request_id)
if result_queue is None:
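                    # The caller already timed out and removed its result queue; drop the request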
continue
try:
# Check model is loaded
if not self.model_loaded or self.model is None:
result_queue.put({
'status': 'error',
'message': 'Model not loaded'
})
continue
# Process request
response = self.model.create_chat_completion(
messages=messages,
max_tokens=max_tokens,
temperature=temperature,
stream=False
)
# Extract text from response
if response and 'choices' in response and len(response['choices']) > 0:
text = response['choices'][0].get('message', {}).get('content', '')
result_queue.put({
'status': 'success',
'text': text
})
else:
result_queue.put({
'status': 'error',
'message': 'Empty response from model'
})
except Exception as e:
result_queue.put({
'status': 'error',
'message': f"Model inference error: {str(e)}"
})
except Exception as e:
print(f"Worker thread error: {e}")
time.sleep(0.1)
def generate(self, messages: List[Dict[str, str]], max_tokens: int = 512,
temperature: float = 0.7, timeout: float = 30.0) -> tuple[bool, Optional[str], Optional[str]]:
"""
Generate response from model (thread-safe, queued)
Args:
messages: List of {role, content} dicts
max_tokens: Maximum tokens to generate
temperature: Sampling temperature
timeout: Maximum wait time in seconds
Returns:
(success, response_text, error_message)
"""
if not self.model_loaded:
return False, None, "Model not loaded. Call load_model() first."
        # Allocate a collision-free request ID and a private result queue for this call
        request_id = next(self._request_counter)
        result_queue: queue.Queue = queue.Queue()
# Register result queue
with self._queue_lock:
self._result_queues[request_id] = result_queue
try:
# Submit request
self._request_queue.put({
'id': request_id,
'messages': messages,
'max_tokens': max_tokens,
'temperature': temperature
})
# Wait for result
try:
result = result_queue.get(timeout=timeout)
except queue.Empty:
return False, None, f"Request timeout after {timeout}s"
if result['status'] == 'success':
return True, result['text'], None
else:
return False, None, result.get('message', 'Unknown error')
finally:
# Cleanup result queue
with self._queue_lock:
self._result_queues.pop(request_id, None)
def shutdown(self):
"""Cleanup resources"""
self._stop_worker = True
if self._worker_thread is not None:
self._worker_thread.join(timeout=2.0)
with self._lock:
if self.model is not None:
del self.model
self.model = None
self.model_loaded = False
# Global singleton instance
_shared_model_manager = SharedModelManager()
def get_shared_model() -> SharedModelManager:
"""Get the shared model manager singleton"""
return _shared_model_manager
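

if __name__ == "__main__":
    # Minimal usage sketch. Assumes llama-cpp-python is installed and the default
    # GGUF file named in load_model() sits next to this module; adjust as needed.
    manager = get_shared_model()
    ok, err = manager.load_model()
    if not ok:
        print(f"Model load failed: {err}")
    else:
        success, text, error = manager.generate(
            messages=[{"role": "user", "content": "Reply with a one-line greeting."}],
            max_tokens=64,
            temperature=0.2,
        )
        print(text if success else f"Generation failed: {error}")
        manager.shutdown()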