# rts-commander / ai_analysis.py
"""
AI Tactical Analysis System
Uses Qwen2.5-Coder-1.5B via shared model manager
ONLY uses the single shared LLM instance - NO separate process fallback
"""
import os
import re
import json
import time
from typing import Optional, Dict, Any, List
from pathlib import Path
# Import shared model manager (REQUIRED - no fallback)
from model_manager import get_shared_model
USE_SHARED_MODEL = True # Always true now
# Global model download status (polled by server for UI)
_MODEL_DOWNLOAD_STATUS: Dict[str, Any] = {
'status': 'idle', # idle | starting | downloading | retrying | done | error
'percent': 0,
'note': '',
'path': ''
}
def _update_model_download_status(update: Dict[str, Any]) -> None:
try:
_MODEL_DOWNLOAD_STATUS.update(update)
except Exception:
pass
def get_model_download_status() -> Dict[str, Any]:
return dict(_MODEL_DOWNLOAD_STATUS)
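# Usage sketch for the server-side poll (hypothetical caller; 'progress_bar' is
# illustrative and not part of this module):
#
#     status = get_model_download_status()
#     if status['status'] == 'downloading':
#         progress_bar.update(status['percent'], status['note'])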
# =============================================================================
# SINGLE LLM ARCHITECTURE
# =============================================================================
# This module ONLY uses the shared model from model_manager.py
# OLD CODE REMOVED: _llama_worker() that loaded duplicate LLM in separate process
# That caused "falling back to process isolation" and severe lag
# Now: One model, loaded once, shared by all AI tasks ✅
# =============================================================================
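# Access-pattern sketch (assumes only the model_manager calls already used in
# this file; return shapes mirror the call sites below):
#
#     from model_manager import get_shared_model
#     model = get_shared_model()   # same instance for analysis and NL interface
#     ok, err = model.load_model("qwen2.5-coder-1.5b-instruct-q4_0.gguf")
#     success, text, err = model.generate(messages=[...], max_tokens=150, temperature=0.7)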
class AIAnalyzer:
"""
AI Tactical Analysis System
    Provides battlefield analysis using the Qwen2.5-Coder-1.5B model.
Uses shared model manager to avoid duplicate loading with NL interface.
"""
def __init__(self, model_path: Optional[str] = None):
"""Initialize AI analyzer with model path"""
if model_path is None:
# Try default locations (existing files)
possible_paths = [
Path("./qwen2.5-coder-1.5b-instruct-q4_0.gguf"),
Path("../qwen2.5-coder-1.5b-instruct-q4_0.gguf"),
Path.home() / "rts" / "qwen2.5-coder-1.5b-instruct-q4_0.gguf",
Path.home() / ".cache" / "rts" / "qwen2.5-coder-1.5b-instruct-q4_0.gguf",
Path("/data/qwen2.5-coder-1.5b-instruct-q4_0.gguf"),
Path("/tmp/rts/qwen2.5-coder-1.5b-instruct-q4_0.gguf"),
]
for path in possible_paths:
try:
if path.exists():
model_path = str(path)
break
except Exception:
continue
self.model_path = model_path
self.model_available = model_path is not None and Path(model_path).exists()
# Use shared model manager if available
self.use_shared = USE_SHARED_MODEL
self.shared_model = None
self._current_analysis_request_id = None # Track current active analysis
if self.use_shared:
try:
self.shared_model = get_shared_model()
# Ensure model is loaded
if self.model_available and model_path:
success, error = self.shared_model.load_model(Path(model_path).name)
if success:
print(f"✓ AI Analysis using SHARED model: {Path(model_path).name}")
else:
print(f"⚠️ Failed to load shared model: {error}")
self.use_shared = False
except Exception as e:
print(f"⚠️ Shared model unavailable: {e}")
self.use_shared = False
if not self.model_available:
print(f"⚠️ AI Model not found. Attempting automatic download...")
# Try to download the model automatically
try:
import sys
import urllib.request
model_url = "https://huggingface.co/Qwen/Qwen2.5-Coder-1.5B-Instruct-GGUF/resolve/main/qwen2.5-coder-1.5b-instruct-q4_0.gguf"
                # Fallback URL (resolve endpoint with explicit download param;
                # the blob endpoint serves an HTML page, not the GGUF file)
                alt_url = "https://huggingface.co/Qwen/Qwen2.5-Coder-1.5B-Instruct-GGUF/resolve/main/qwen2.5-coder-1.5b-instruct-q4_0.gguf?download=true"
# Choose a writable destination directory
filename = "qwen2.5-coder-1.5b-instruct-q4_0.gguf"
                env_model_dir = os.getenv("RTS_MODEL_DIR", "")
                candidate_dirs = [
                    # Include RTS_MODEL_DIR only when set: Path("") resolves to
                    # ".", so an unset variable would otherwise be probed as cwd
                    *([Path(env_model_dir)] if env_model_dir else []),
Path.cwd(),
Path(__file__).resolve().parent, # /web
Path(__file__).resolve().parent.parent, # repo root
Path.home() / "rts",
Path.home() / ".cache" / "rts",
Path("/data"),
Path("/tmp") / "rts",
]
default_path: Path = Path.cwd() / filename
for d in candidate_dirs:
try:
d.mkdir(parents=True, exist_ok=True)
test_file = d / (".write_test")
with open(test_file, 'w') as tf:
tf.write('ok')
test_file.unlink(missing_ok=True) # type: ignore[arg-type]
default_path = d / filename
break
except Exception:
continue
_update_model_download_status({
'status': 'starting',
'percent': 0,
'note': 'starting',
'path': str(default_path)
})
print(f"📦 Downloading model (~350 MB)...")
print(f" From: {model_url}")
print(f" To: {default_path}")
print(f" This may take a few minutes...")
# Simple progress callback
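            # urllib.request.urlretrieve reads in 8 KiB blocks, so reporting every
            # 100 blocks refreshes the status dict roughly every 800 KiB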
def progress_callback(block_num, block_size, total_size):
if total_size > 0 and block_num % 100 == 0:
downloaded = block_num * block_size
percent = min(100, (downloaded / total_size) * 100)
mb_downloaded = downloaded / (1024 * 1024)
mb_total = total_size / (1024 * 1024)
_update_model_download_status({
'status': 'downloading',
'percent': round(percent, 1),
'note': f"{mb_downloaded:.1f}/{mb_total:.1f} MB",
'path': str(default_path)
})
print(f" Progress: {percent:.1f}% ({mb_downloaded:.1f}/{mb_total:.1f} MB)", end='\r')
# Ensure destination directory exists (should already be validated)
try:
default_path.parent.mkdir(parents=True, exist_ok=True)
except Exception:
pass
success = False
for attempt in range(3):
try:
# Try urllib first
urllib.request.urlretrieve(model_url, default_path, reporthook=progress_callback)
success = True
break
except Exception:
                        # Fall back to streaming the download via requests, if available
try:
try:
import requests # type: ignore
except Exception:
requests = None # type: ignore
if requests is not None: # type: ignore
with requests.get(model_url, stream=True, timeout=60) as r: # type: ignore
r.raise_for_status()
total = int(r.headers.get('Content-Length', 0))
downloaded = 0
with open(default_path, 'wb') as f:
for chunk in r.iter_content(chunk_size=1024 * 1024): # 1MB
if not chunk:
continue
f.write(chunk)
downloaded += len(chunk)
if total > 0:
percent = min(100, downloaded * 100 / total)
_update_model_download_status({
'status': 'downloading',
'percent': round(percent, 1),
'note': f"{downloaded/1048576:.1f}/{total/1048576:.1f} MB",
'path': str(default_path)
})
print(f" Progress: {percent:.1f}% ({downloaded/1048576:.1f}/{total/1048576:.1f} MB)", end='\r')
success = True
break
except Exception:
# ignore and try alternative below
pass
# Last chance this attempt: alternative URL via urllib
try:
urllib.request.urlretrieve(alt_url, default_path, reporthook=progress_callback)
success = True
break
except Exception as e:
wait = 2 ** attempt
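                            # Exponential backoff: waits 1 s, 2 s, then 4 s across the three attempts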
_update_model_download_status({
'status': 'retrying',
'percent': 0,
'note': f"attempt {attempt+1} failed: {e}",
'path': str(default_path)
})
print(f" Download attempt {attempt+1}/3 failed: {e}. Retrying in {wait}s...")
time.sleep(wait)
print() # New line after progress
# Verify download
if success and default_path.exists():
size_mb = default_path.stat().st_size / (1024 * 1024)
print(f"✅ Model downloaded successfully! ({size_mb:.1f} MB)")
                    self.model_path = str(default_path)
                    self.model_available = True
                    # Load the new weights into the shared model right away; the
                    # earlier load was skipped because no model file existed yet
                    if self.use_shared and self.shared_model:
                        loaded, load_error = self.shared_model.load_model(default_path.name)
                        if not loaded:
                            print(f"⚠️ Failed to load downloaded model: {load_error}")
_update_model_download_status({
'status': 'done',
'percent': 100,
'note': f"{size_mb:.1f} MB",
'path': str(default_path)
})
else:
print(f"❌ Download failed. Tactical analysis disabled.")
print(f" Manual download: https://huggingface.co/Qwen/Qwen2.5-Coder-1.5B-Instruct-GGUF")
_update_model_download_status({
'status': 'error',
'percent': 0,
'note': 'download failed',
'path': str(default_path)
})
except Exception as e:
print(f"❌ Auto-download failed: {e}")
print(f" Tactical analysis disabled.")
print(f" Manual download: https://huggingface.co/Qwen/Qwen2.5-Coder-1.5B-Instruct-GGUF")
_update_model_download_status({
'status': 'error',
'percent': 0,
'note': str(e),
'path': ''
})
def generate_response(
self,
prompt: str,
max_tokens: int = 256,
temperature: float = 0.7
) -> Dict[str, Any]:
"""
Generate a response from the model.
NO TIMEOUT - waits for inference to complete (showcases LLM ability).
Only cancelled if superseded by new analysis request.
Args:
prompt: Input prompt
max_tokens: Maximum tokens to generate
temperature: Sampling temperature
Returns:
Dict with status and data/message
"""
if not self.model_available:
return {'status': 'error', 'message': 'Model not loaded'}
# ONLY use shared model - NO fallback to separate process
if not (self.use_shared and self.shared_model and self.shared_model.model_loaded):
return {'status': 'error', 'message': 'Shared model not available'}
try:
# Cancel previous analysis if any (one active analysis at a time)
if self._current_analysis_request_id is not None:
self.shared_model.cancel_request(self._current_analysis_request_id)
print(f"🔄 Cancelled previous AI analysis request {self._current_analysis_request_id} (new analysis requested)")
messages = [
{"role": "user", "content": prompt}
]
# Submit request and wait for completion (no timeout)
success, response_text, error_message = self.shared_model.generate(
messages=messages,
max_tokens=max_tokens,
temperature=temperature
)
# Clear current request
self._current_analysis_request_id = None
if success and response_text:
# Try to parse as JSON
try:
cleaned = response_text.strip()
# Look for JSON in response
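                    # e.g. extracts '{"summary": "...", "tips": ["..."]}' from output
                    # that wraps the object in prose or <json> tags; nested braces
                    # are handled one level deep by the pattern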
match = re.search(r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', cleaned, re.DOTALL)
if match:
parsed = json.loads(match.group(0))
return {'status': 'ok', 'data': parsed, 'raw': response_text}
else:
return {'status': 'ok', 'data': {'raw': response_text}, 'raw': response_text}
                except Exception:
return {'status': 'ok', 'data': {'raw': response_text}, 'raw': response_text}
else:
print(f"⚠️ Shared model error: {error_message} (will use heuristic analysis)")
return {'status': 'error', 'message': error_message or 'Generation failed'}
except Exception as e:
print(f"⚠️ Shared model exception: {e} (will use heuristic analysis)")
return {'status': 'error', 'message': f'Error: {str(e)}'}
def _heuristic_analysis(self, game_state: Dict, language_code: str) -> Dict[str, Any]:
"""Lightweight, deterministic analysis when LLM is unavailable."""
from localization import LOCALIZATION
lang = language_code or "en"
lang_name = LOCALIZATION.get_ai_language_name(lang)
player_units = sum(1 for u in game_state.get('units', {}).values() if u.get('player_id') == 0)
enemy_units = sum(1 for u in game_state.get('units', {}).values() if u.get('player_id') == 1)
player_buildings = sum(1 for b in game_state.get('buildings', {}).values() if b.get('player_id') == 0)
enemy_buildings = sum(1 for b in game_state.get('buildings', {}).values() if b.get('player_id') == 1)
player = game_state.get('players', {}).get(0, {})
credits = int(player.get('credits', 0) or 0)
power = int(player.get('power', 0) or 0)
power_cons = int(player.get('power_consumption', 0) or 0)
advantage = 'even'
score = (player_units - enemy_units) + 0.5 * (player_buildings - enemy_buildings)
if score > 1:
advantage = 'ahead'
elif score < -1:
advantage = 'behind'
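        # Worked example: 6 player units vs 3 enemy and 4 vs 5 buildings gives
        # score = (6 - 3) + 0.5 * (4 - 5) = 2.5, which reads as 'ahead'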
# Localized templates (concise)
summaries = {
'en': {
'ahead': f"{lang_name}: You hold the initiative. Maintain pressure and expand.",
'even': f"{lang_name}: Battlefield is balanced. Scout and take map control.",
'behind': f"{lang_name}: You're under pressure. Stabilize and defend key assets.",
},
'fr': {
'ahead': f"{lang_name} : Vous avez l'initiative. Maintenez la pression et étendez-vous.",
'even': f"{lang_name} : Situation équilibrée. Éclairez et prenez le contrôle de la carte.",
'behind': f"{lang_name} : Sous pression. Stabilisez et défendez les actifs clés.",
},
'zh-TW': {
'ahead': f"{lang_name}:佔據主動。保持壓力並擴張。",
'even': f"{lang_name}:局勢均衡。偵察並掌控地圖。",
'behind': f"{lang_name}:處於劣勢。穩住陣腳並防守關鍵建築。",
}
}
summary = summaries.get(lang, summaries['en'])[advantage]
tips: List[str] = []
# Power management tips
if power_cons > 0 and power < power_cons:
tips.append({
'en': 'Build a Power Plant to restore production speed',
'fr': 'Construisez une centrale pour rétablir la production',
'zh-TW': '建造發電廠以恢復生產速度'
}.get(lang, 'Build a Power Plant to restore production speed'))
# Economy tips
if credits < 300:
tips.append({
'en': 'Protect Harvester and secure more ore',
'fr': 'Protégez le collecteur et sécurisez plus de minerai',
'zh-TW': '保護採礦車並確保更多礦石'
}.get(lang, 'Protect Harvester and secure more ore'))
# Army composition tips
if player_buildings > 0:
if player_units < enemy_units:
tips.append({
'en': 'Train Infantry and add Tanks for frontline',
'fr': 'Entraînez de l’infanterie et ajoutez des chars en première ligne',
'zh-TW': '訓練步兵並加入坦克作為前線'
}.get(lang, 'Train Infantry and add Tanks for frontline'))
else:
tips.append({
'en': 'Scout enemy base and pressure weak flanks',
'fr': 'Éclairez la base ennemie et mettez la pression sur les flancs faibles',
'zh-TW': '偵察敵方基地並壓制薄弱側翼'
}.get(lang, 'Scout enemy base and pressure weak flanks'))
# Defense tip if buildings disadvantage
if player_buildings < enemy_buildings:
tips.append({
'en': 'Fortify around HQ and key production buildings',
'fr': 'Fortifiez autour du QG et des bâtiments de production',
'zh-TW': '在總部與生產建築周圍加強防禦'
}.get(lang, 'Fortify around HQ and key production buildings'))
# Coach line
coach = {
'en': 'Keep your economy safe and strike when you see an opening.',
'fr': 'Protégez votre économie et frappez dès qu’une ouverture se présente.',
'zh-TW': '保護經濟,抓住機會果斷出擊。'
}.get(lang, 'Keep your economy safe and strike when you see an opening.')
        return {
            'summary': summary,
            'tips': tips[:4] or ['Build more units'],
            'coach': coach,
            'source': 'heuristic',
        }
def summarize_combat_situation(
self,
game_state: Dict,
language_code: str = "en"
) -> Dict[str, Any]:
"""
Generate tactical analysis of current battle.
Args:
game_state: Current game state dictionary
language_code: Language for response (en, fr, zh-TW)
Returns:
Dict with keys: summary, tips, coach
"""
# If LLM is not available, return heuristic result
if not self.model_available:
return self._heuristic_analysis(game_state, language_code)
# Import here to avoid circular dependency
from localization import LOCALIZATION
language_name = LOCALIZATION.get_ai_language_name(language_code)
# Build tactical summary prompt
player_units = sum(1 for u in game_state.get('units', {}).values()
if u.get('player_id') == 0)
enemy_units = sum(1 for u in game_state.get('units', {}).values()
if u.get('player_id') == 1)
player_buildings = sum(1 for b in game_state.get('buildings', {}).values()
if b.get('player_id') == 0)
enemy_buildings = sum(1 for b in game_state.get('buildings', {}).values()
if b.get('player_id') == 1)
player_credits = game_state.get('players', {}).get(0, {}).get('credits', 0)
example_summary = LOCALIZATION.get_ai_example_summary(language_code)
prompt = (
f"You are an expert RTS (Red Alert style) commentator & coach. Return ONLY one <json>...</json> block.\n"
f"JSON keys: summary (string concise tactical overview), tips (array of 1-4 short imperative build/composition suggestions), coach (1 motivational/adaptive sentence).\n"
f"No additional keys. No text outside tags. Language: {language_name}.\n"
f"\n"
f"Battle state: Player {player_units} units vs Enemy {enemy_units} units. "
f"Player {player_buildings} buildings vs Enemy {enemy_buildings} buildings. "
f"Credits: {player_credits}.\n"
f"\n"
f"Example JSON:\n"
f'{{"summary": "{example_summary}", '
f'"tips": ["Build more tanks", "Defend north base", "Scout enemy position"], '
f'"coach": "You are doing well; keep pressure on the enemy."}}\n'
f"\n"
f"Generate tactical analysis in {language_name}:"
)
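        # A well-formed reply carries exactly one JSON object, e.g. (illustrative):
        #   <json>{"summary": "Enemy armor massing east", "tips": ["Build more tanks"],
        #          "coach": "Hold the line and counterattack."}</json>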
result = self.generate_response(
prompt=prompt,
max_tokens=150, # Reduced from 200 for faster generation
temperature=0.7
)
if result.get('status') != 'ok':
# Fallback to heuristic on error
return self._heuristic_analysis(game_state, language_code)
data = result.get('data', {})
# Try to extract fields from structured JSON first
summary = str(data.get('summary') or '').strip()
tips_raw = data.get('tips') or []
coach = str(data.get('coach') or '').strip()
# If no structured data, try to parse raw text
if not summary and 'raw' in data:
raw_text = str(data.get('raw', '')).strip()
            # Use the first non-empty sentence as summary, else truncate the raw text
            # (a bare raw_text.split('.') is never empty, so filter blanks first)
            sentences = [s for s in raw_text.split('.') if s.strip()]
            if sentences:
                summary = sentences[0].strip() + '.'
            else:
                summary = raw_text[:150]  # Max 150 chars
# Try to extract tips from remaining text
# Look for patterns like "Build X", "Defend Y", etc.
tip_patterns = [
r'Build [^.]+',
r'Defend [^.]+',
r'Attack [^.]+',
r'Scout [^.]+',
r'Expand [^.]+',
r'Protect [^.]+',
r'Train [^.]+',
r'Produce [^.]+',
]
found_tips = []
for pattern in tip_patterns:
matches = re.findall(pattern, raw_text, re.IGNORECASE)
found_tips.extend(matches[:2]) # Max 2 per pattern
if found_tips:
tips_raw = found_tips[:4] # Max 4 tips
# Use remaining text as coach message
if len(sentences) > 1:
coach = '. '.join(sentences[1:3]).strip() # 2nd and 3rd sentences
# Validate tips is array
tips = []
if isinstance(tips_raw, list):
for tip in tips_raw:
if isinstance(tip, str):
tips.append(tip.strip())
# Fallbacks
if not summary or not tips or not coach:
fallback = self._heuristic_analysis(game_state, language_code)
summary = summary or fallback['summary']
tips = tips or fallback['tips']
coach = coach or fallback['coach']
return {
'summary': summary,
'tips': tips[:4], # Max 4 tips
'coach': coach,
'source': 'llm'
}
# Singleton instance (lazy initialization)
_ai_analyzer_instance: Optional[AIAnalyzer] = None
def get_ai_analyzer() -> AIAnalyzer:
"""Get singleton AI analyzer instance"""
global _ai_analyzer_instance
if _ai_analyzer_instance is None:
_ai_analyzer_instance = AIAnalyzer()
return _ai_analyzer_instance
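

# Minimal usage sketch. demo_state is a hand-rolled illustration; real states
# come from the game server. Note: constructing the analyzer triggers the
# auto-download above if no GGUF file is found, and the heuristic fallback
# needs the project's localization module importable.
if __name__ == "__main__":
    analyzer = get_ai_analyzer()
    demo_state = {
        'units': {'u1': {'player_id': 0}, 'u2': {'player_id': 1}},
        'buildings': {'b1': {'player_id': 0}, 'b2': {'player_id': 1}},
        'players': {0: {'credits': 500, 'power': 100, 'power_consumption': 60}},
    }
    print(analyzer.summarize_combat_situation(demo_state, language_code="en"))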