"""
AI Tactical Analysis System

Uses Qwen2.5-Coder-1.5B via the shared model manager.
ONLY uses the single shared LLM instance - NO separate process fallback.
"""
import os
import re
import json
import time
from typing import Optional, Dict, Any, List, Tuple
from pathlib import Path

# Import shared model manager (REQUIRED - no fallback)
from model_manager import get_shared_model

USE_SHARED_MODEL = True  # Always true now

# File name and download locations of the GGUF model used for analysis.
_MODEL_FILENAME = "qwen2.5-coder-1.5b-instruct-q4_0.gguf"
_MODEL_URL = "https://huggingface.co/Qwen/Qwen2.5-Coder-1.5B-Instruct-GGUF/resolve/main/qwen2.5-coder-1.5b-instruct-q4_0.gguf"
# Fallback URL (blob with download param)
_MODEL_ALT_URL = "https://huggingface.co/Qwen/Qwen2.5-Coder-1.5B-Instruct-GGUF/blob/main/qwen2.5-coder-1.5b-instruct-q4_0.gguf?download=1"
_MODEL_HELP_URL = "https://huggingface.co/Qwen/Qwen2.5-Coder-1.5B-Instruct-GGUF"

# Every GGUF file starts with these four bytes; used to reject HTML error
# pages or other junk that a download endpoint may return with HTTP 200.
_GGUF_MAGIC = b"GGUF"

_DOWNLOAD_ATTEMPTS = 3

# First (possibly nested one level) {...} object found in the model output.
_JSON_OBJECT_RE = re.compile(r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', re.DOTALL)

# Imperative phrases harvested as tips when the model answers in free text.
_TIP_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r'Build [^.]+',
        r'Defend [^.]+',
        r'Attack [^.]+',
        r'Scout [^.]+',
        r'Expand [^.]+',
        r'Protect [^.]+',
        r'Train [^.]+',
        r'Produce [^.]+',
    )
)

# Global model download status (polled by server for UI)
_MODEL_DOWNLOAD_STATUS: Dict[str, Any] = {
    'status': 'idle',  # idle | starting | downloading | retrying | done | error
    'percent': 0,
    'note': '',
    'path': ''
}


def _update_model_download_status(update: Dict[str, Any]) -> None:
    """Merge *update* into the global download status (best effort)."""
    try:
        _MODEL_DOWNLOAD_STATUS.update(update)
    except (TypeError, ValueError):
        # Status reporting must never break the download itself.
        pass


def get_model_download_status() -> Dict[str, Any]:
    """Return a shallow copy of the current model download status."""
    return dict(_MODEL_DOWNLOAD_STATUS)


def _report_download_progress(downloaded: int, total: int, shown_path: str) -> None:
    """Publish and print download progress; *total* must be greater than 0."""
    percent = min(100, (downloaded / total) * 100)
    mb_downloaded = downloaded / (1024 * 1024)
    mb_total = total / (1024 * 1024)
    _update_model_download_status({
        'status': 'downloading',
        'percent': round(percent, 1),
        'note': f"{mb_downloaded:.1f}/{mb_total:.1f} MB",
        'path': shown_path
    })
    print(f" Progress: {percent:.1f}% ({mb_downloaded:.1f}/{mb_total:.1f} MB)", end='\r')


def _fetch_with_urllib(url: str, dest: Path, shown_path: str) -> None:
    """Download *url* to *dest* with urllib, reporting every 100th block."""
    import urllib.request

    def reporthook(block_num, block_size, total_size):
        if total_size > 0 and block_num % 100 == 0:
            _report_download_progress(block_num * block_size, total_size, shown_path)

    urllib.request.urlretrieve(url, str(dest), reporthook=reporthook)


def _fetch_with_requests(url: str, dest: Path, shown_path: str) -> None:
    """Stream *url* to *dest* with ``requests``.

    Raises ImportError when ``requests`` is not installed; the caller treats
    that like any other failed download strategy.
    """
    import requests  # type: ignore

    with requests.get(url, stream=True, timeout=60) as response:
        response.raise_for_status()
        total = int(response.headers.get('Content-Length', 0))
        downloaded = 0
        with open(dest, 'wb') as out:
            for chunk in response.iter_content(chunk_size=1024 * 1024):  # 1MB
                if not chunk:
                    continue
                out.write(chunk)
                downloaded += len(chunk)
                if total > 0:
                    _report_download_progress(downloaded, total, shown_path)


def _is_gguf_file(path: Path) -> bool:
    """Return True when *path* is readable and starts with the GGUF magic."""
    try:
        with open(path, 'rb') as handle:
            return handle.read(len(_GGUF_MAGIC)) == _GGUF_MAGIC
    except OSError:
        return False


# =============================================================================
# SINGLE LLM ARCHITECTURE
# =============================================================================
# This module ONLY uses the shared model from model_manager.py
# OLD CODE REMOVED: _llama_worker() that loaded duplicate LLM in separate process
# That caused "falling back to process isolation" and severe lag
# Now: One model, loaded once, shared by all AI tasks ✅
# =============================================================================

class AIAnalyzer:
    """
    AI Tactical Analysis System

    Provides battlefield analysis using the Qwen2.5-Coder-1.5B model.
    Uses the shared model manager to avoid duplicate loading with the NL
    interface, and falls back to a deterministic heuristic analysis whenever
    the model is missing or generation fails.
    """

    def __init__(self, model_path: Optional[str] = None):
        """Initialize the analyzer.

        Args:
            model_path: Path to the GGUF model file. When None, a set of
                default locations is searched; when no model file exists an
                automatic download is attempted.
        """
        if model_path is None:
            model_path = self._find_existing_model()

        self.model_path = model_path
        self.model_available = model_path is not None and Path(model_path).exists()

        # Use shared model manager if available
        self.use_shared = USE_SHARED_MODEL
        self.shared_model = None
        # Track current active analysis.
        # NOTE(review): nothing in this module ever assigns a non-None value
        # here, so the cancel branch in generate_response() looks unreachable
        # unless another module sets it - confirm against model_manager.
        self._current_analysis_request_id = None

        if self.use_shared:
            self._load_shared_model()

        if not self.model_available:
            print("⚠️ AI Model not found. Attempting automatic download...")
            # A freshly downloaded model must also be handed to the shared
            # manager, otherwise generate_response() keeps reporting
            # "Shared model not available" until the process is restarted.
            if self._auto_download_model() and self.use_shared:
                self._load_shared_model()

    @staticmethod
    def _find_existing_model() -> Optional[str]:
        """Return the first default location holding the model file, if any."""
        possible_paths = [
            Path("./" + _MODEL_FILENAME),
            Path("../" + _MODEL_FILENAME),
            Path.home() / "rts" / _MODEL_FILENAME,
            Path.home() / ".cache" / "rts" / _MODEL_FILENAME,
            Path("/data") / _MODEL_FILENAME,
            Path("/tmp/rts") / _MODEL_FILENAME,
        ]
        for path in possible_paths:
            try:
                if path.exists():
                    return str(path)
            except OSError:
                continue
        return None

    def _load_shared_model(self) -> None:
        """Fetch the shared model manager and load the model file into it.

        Disables shared-model use (``self.use_shared = False``) when the
        manager cannot be obtained or reports a load failure.
        """
        try:
            if self.shared_model is None:
                self.shared_model = get_shared_model()
            # Ensure model is loaded
            if self.model_available and self.model_path:
                model_name = Path(self.model_path).name
                success, error = self.shared_model.load_model(model_name)
                if success:
                    print(f"✓ AI Analysis using SHARED model: {model_name}")
                else:
                    print(f"⚠️ Failed to load shared model: {error}")
                    self.use_shared = False
        except Exception as e:
            print(f"⚠️ Shared model unavailable: {e}")
            self.use_shared = False

    @staticmethod
    def _choose_download_path() -> Path:
        """Pick the first writable candidate directory for the model file.

        Falls back to the current working directory when no candidate passes
        the write probe.
        """
        candidate_dirs: List[Path] = []
        env_dir = os.getenv("RTS_MODEL_DIR", "")
        if env_dir:
            # Path("") silently becomes ".", so only honour the variable
            # when it is actually set.
            candidate_dirs.append(Path(env_dir))
        candidate_dirs.extend([
            Path.cwd(),
            Path(__file__).resolve().parent,  # /web
            Path(__file__).resolve().parent.parent,  # repo root
            Path.home() / "rts",
            Path.home() / ".cache" / "rts",
            Path("/data"),
            Path("/tmp") / "rts",
        ])
        for directory in candidate_dirs:
            try:
                directory.mkdir(parents=True, exist_ok=True)
                probe = directory / ".write_test"
                with open(probe, 'w') as tf:
                    tf.write('ok')
                probe.unlink(missing_ok=True)
                return directory / _MODEL_FILENAME
            except OSError:
                continue
        return Path.cwd() / _MODEL_FILENAME

    def _auto_download_model(self) -> bool:
        """Download the model file, updating the global download status.

        The file is written to a ``.part`` sibling and moved into place only
        after it passed the GGUF magic check, so an interrupted or bogus
        download is never mistaken for a usable model on the next start.

        Returns:
            True when the model is now available on disk, False otherwise.
        """
        try:
            target = self._choose_download_path()
            shown_path = str(target)
            part_path = target.with_name(target.name + ".part")

            _update_model_download_status({
                'status': 'starting',
                'percent': 0,
                'note': 'starting',
                'path': shown_path
            })
            print("📦 Downloading model (~350 MB)...")
            print(f" From: {_MODEL_URL}")
            print(f" To: {target}")
            print(" This may take a few minutes...")

            # Ensure destination directory exists (should already be validated)
            try:
                target.parent.mkdir(parents=True, exist_ok=True)
            except OSError:
                pass

            # Tried in order on every attempt: urllib, requests streaming
            # (if installed), then the alternative URL via urllib.
            strategies = (
                lambda: _fetch_with_urllib(_MODEL_URL, part_path, shown_path),
                lambda: _fetch_with_requests(_MODEL_URL, part_path, shown_path),
                lambda: _fetch_with_urllib(_MODEL_ALT_URL, part_path, shown_path),
            )

            success = False
            for attempt in range(_DOWNLOAD_ATTEMPTS):
                last_error: Optional[Exception] = None
                for fetch in strategies:
                    try:
                        fetch()
                        if not _is_gguf_file(part_path):
                            raise ValueError("downloaded file is not a GGUF model")
                        success = True
                        break
                    except Exception as e:
                        # Deliberately broad: any network, I/O or import
                        # failure just means "try the next strategy".
                        last_error = e
                if success:
                    break

                _update_model_download_status({
                    'status': 'retrying',
                    'percent': 0,
                    'note': f"attempt {attempt+1} failed: {last_error}",
                    'path': shown_path
                })
                failure = f" Download attempt {attempt+1}/{_DOWNLOAD_ATTEMPTS} failed: {last_error}."
                if attempt == _DOWNLOAD_ATTEMPTS - 1:
                    # Nothing left to retry: do not sleep or promise a retry.
                    print(failure)
                else:
                    wait = 2 ** attempt
                    print(f"{failure} Retrying in {wait}s...")
                    time.sleep(wait)

            print()  # New line after progress

            if success:
                os.replace(part_path, target)

            # Verify download
            if success and target.exists():
                size_mb = target.stat().st_size / (1024 * 1024)
                print(f"✅ Model downloaded successfully! ({size_mb:.1f} MB)")
                self.model_path = str(target)
                self.model_available = True
                _update_model_download_status({
                    'status': 'done',
                    'percent': 100,
                    'note': f"{size_mb:.1f} MB",
                    'path': shown_path
                })
                return True

            # Remove leftovers of the failed download.
            try:
                part_path.unlink(missing_ok=True)
            except OSError:
                pass
            print("❌ Download failed. Tactical analysis disabled.")
            print(f" Manual download: {_MODEL_HELP_URL}")
            _update_model_download_status({
                'status': 'error',
                'percent': 0,
                'note': 'download failed',
                'path': shown_path
            })
            return False
        except Exception as e:
            print(f"❌ Auto-download failed: {e}")
            print(" Tactical analysis disabled.")
            print(f" Manual download: {_MODEL_HELP_URL}")
            _update_model_download_status({
                'status': 'error',
                'percent': 0,
                'note': str(e),
                'path': ''
            })
            return False

    @staticmethod
    def _parse_model_output(response_text: str) -> Dict[str, Any]:
        """Wrap raw model text in an ``ok`` result, parsing embedded JSON.

        When no JSON object is found, or it does not parse, the text is
        returned under ``data['raw']`` instead.
        """
        unparsed = {'status': 'ok', 'data': {'raw': response_text}, 'raw': response_text}
        # Look for JSON in response
        match = _JSON_OBJECT_RE.search(response_text.strip())
        if match is None:
            return unparsed
        try:
            parsed = json.loads(match.group(0))
        except ValueError:  # json.JSONDecodeError is a ValueError
            return unparsed
        return {'status': 'ok', 'data': parsed, 'raw': response_text}

    def generate_response(
        self,
        prompt: str,
        max_tokens: int = 256,
        temperature: float = 0.7
    ) -> Dict[str, Any]:
        """
        Generate a response from the model.

        NO TIMEOUT - waits for inference to complete (showcases LLM ability).
        Only cancelled if superseded by new analysis request.

        Args:
            prompt: Input prompt
            max_tokens: Maximum tokens to generate
            temperature: Sampling temperature

        Returns:
            Dict with ``status`` ('ok' or 'error'). On success it carries
            ``data`` (parsed JSON object, or ``{'raw': text}``) and ``raw``;
            on error it carries ``message``.
        """
        if not self.model_available:
            return {'status': 'error', 'message': 'Model not loaded'}

        # ONLY use shared model - NO fallback to separate process
        if not (self.use_shared and self.shared_model and self.shared_model.model_loaded):
            return {'status': 'error', 'message': 'Shared model not available'}

        try:
            # Cancel previous analysis if any (one active analysis at a time)
            if self._current_analysis_request_id is not None:
                self.shared_model.cancel_request(self._current_analysis_request_id)
                print(f"🔄 Cancelled previous AI analysis request {self._current_analysis_request_id} (new analysis requested)")

            messages = [
                {"role": "user", "content": prompt}
            ]

            # Submit request and wait for completion (no timeout)
            success, response_text, error_message = self.shared_model.generate(
                messages=messages,
                max_tokens=max_tokens,
                temperature=temperature
            )

            # Clear current request
            self._current_analysis_request_id = None

            if success and response_text:
                return self._parse_model_output(response_text)

            print(f"⚠️ Shared model error: {error_message} (will use heuristic analysis)")
            return {'status': 'error', 'message': error_message or 'Generation failed'}
        except Exception as e:
            print(f"⚠️ Shared model exception: {e} (will use heuristic analysis)")
            return {'status': 'error', 'message': f'Error: {str(e)}'}

    @staticmethod
    def _force_counts(game_state: Dict) -> Tuple[int, int, int, int]:
        """Count units and buildings owned by player 0 and player 1.

        Returns:
            (player_units, enemy_units, player_buildings, enemy_buildings)
        """
        def owned(kind: str, player_id: int) -> int:
            return sum(
                1 for entity in game_state.get(kind, {}).values()
                if entity.get('player_id') == player_id
            )

        return owned('units', 0), owned('units', 1), owned('buildings', 0), owned('buildings', 1)

    @staticmethod
    def _localized(options: Dict[str, str], lang: str) -> str:
        """Return the text for *lang*, defaulting to the English entry."""
        return options.get(lang, options['en'])

    def _heuristic_analysis(self, game_state: Dict, language_code: str) -> Dict[str, Any]:
        """Lightweight, deterministic analysis when LLM is unavailable.

        Args:
            game_state: Current game state dictionary
            language_code: Language for response (en, fr, zh-TW); anything
                else falls back to English text.

        Returns:
            Dict with keys: summary, tips, coach, source ('heuristic')
        """
        from localization import LOCALIZATION
        lang = language_code or "en"
        lang_name = LOCALIZATION.get_ai_language_name(lang)

        player_units, enemy_units, player_buildings, enemy_buildings = self._force_counts(game_state)
        player = game_state.get('players', {}).get(0, {})
        credits = int(player.get('credits', 0) or 0)
        power = int(player.get('power', 0) or 0)
        power_cons = int(player.get('power_consumption', 0) or 0)

        # Buildings weigh half as much as units in the balance score.
        score = (player_units - enemy_units) + 0.5 * (player_buildings - enemy_buildings)
        if score > 1:
            advantage = 'ahead'
        elif score < -1:
            advantage = 'behind'
        else:
            advantage = 'even'

        # Localized templates (concise)
        summaries = {
            'en': {
                'ahead': f"{lang_name}: You hold the initiative. Maintain pressure and expand.",
                'even': f"{lang_name}: Battlefield is balanced. Scout and take map control.",
                'behind': f"{lang_name}: You're under pressure. Stabilize and defend key assets.",
            },
            'fr': {
                'ahead': f"{lang_name} : Vous avez l'initiative. Maintenez la pression et étendez-vous.",
                'even': f"{lang_name} : Situation équilibrée. Éclairez et prenez le contrôle de la carte.",
                'behind': f"{lang_name} : Sous pression. Stabilisez et défendez les actifs clés.",
            },
            'zh-TW': {
                'ahead': f"{lang_name}:佔據主動。保持壓力並擴張。",
                'even': f"{lang_name}:局勢均衡。偵察並掌控地圖。",
                'behind': f"{lang_name}:處於劣勢。穩住陣腳並防守關鍵建築。",
            }
        }
        summary = summaries.get(lang, summaries['en'])[advantage]

        tips: List[str] = []
        # Power management tips
        if power_cons > 0 and power < power_cons:
            tips.append(self._localized({
                'en': 'Build a Power Plant to restore production speed',
                'fr': 'Construisez une centrale pour rétablir la production',
                'zh-TW': '建造發電廠以恢復生產速度'
            }, lang))

        # Economy tips
        if credits < 300:
            tips.append(self._localized({
                'en': 'Protect Harvester and secure more ore',
                'fr': 'Protégez le collecteur et sécurisez plus de minerai',
                'zh-TW': '保護採礦車並確保更多礦石'
            }, lang))

        # Army composition tips
        if player_buildings > 0:
            if player_units < enemy_units:
                tips.append(self._localized({
                    'en': 'Train Infantry and add Tanks for frontline',
                    'fr': 'Entraînez de l’infanterie et ajoutez des chars en première ligne',
                    'zh-TW': '訓練步兵並加入坦克作為前線'
                }, lang))
            else:
                tips.append(self._localized({
                    'en': 'Scout enemy base and pressure weak flanks',
                    'fr': 'Éclairez la base ennemie et mettez la pression sur les flancs faibles',
                    'zh-TW': '偵察敵方基地並壓制薄弱側翼'
                }, lang))

        # Defense tip if buildings disadvantage
        if player_buildings < enemy_buildings:
            tips.append(self._localized({
                'en': 'Fortify around HQ and key production buildings',
                'fr': 'Fortifiez autour du QG et des bâtiments de production',
                'zh-TW': '在總部與生產建築周圍加強防禦'
            }, lang))

        # Coach line
        coach = self._localized({
            'en': 'Keep your economy safe and strike when you see an opening.',
            'fr': 'Protégez votre économie et frappez dès qu’une ouverture se présente.',
            'zh-TW': '保護經濟,抓住機會果斷出擊。'
        }, lang)

        return {
            'summary': summary,
            'tips': tips[:4] or ['Build more units'],
            'coach': coach,
            'source': 'heuristic'
        }

    def summarize_combat_situation(
        self,
        game_state: Dict,
        language_code: str = "en"
    ) -> Dict[str, Any]:
        """
        Generate tactical analysis of current battle.

        Args:
            game_state: Current game state dictionary
            language_code: Language for response (en, fr, zh-TW)

        Returns:
            Dict with keys: summary, tips, coach, source. ``source`` is
            'llm' when at least one field came from the model and
            'heuristic' when the whole result is the heuristic fallback.
        """
        # If LLM is not available, return heuristic result
        if not self.model_available:
            return self._heuristic_analysis(game_state, language_code)

        # Import here to avoid circular dependency
        from localization import LOCALIZATION

        language_name = LOCALIZATION.get_ai_language_name(language_code)

        # Build tactical summary prompt
        player_units, enemy_units, player_buildings, enemy_buildings = self._force_counts(game_state)
        player_credits = game_state.get('players', {}).get(0, {}).get('credits', 0)

        example_summary = LOCALIZATION.get_ai_example_summary(language_code)

        # NOTE(review): "Return ONLY one ... block" / "No text outside tags"
        # reads as if tag names were lost from this prompt - confirm the
        # intended wording before changing it.
        prompt = (
            f"You are an expert RTS (Red Alert style) commentator & coach. Return ONLY one ... block.\n"
            f"JSON keys: summary (string concise tactical overview), tips (array of 1-4 short imperative build/composition suggestions), coach (1 motivational/adaptive sentence).\n"
            f"No additional keys. No text outside tags. Language: {language_name}.\n"
            f"\n"
            f"Battle state: Player {player_units} units vs Enemy {enemy_units} units. "
            f"Player {player_buildings} buildings vs Enemy {enemy_buildings} buildings. "
            f"Credits: {player_credits}.\n"
            f"\n"
            f"Example JSON:\n"
            f'{{"summary": "{example_summary}", '
            f'"tips": ["Build more tanks", "Defend north base", "Scout enemy position"], '
            f'"coach": "You are doing well; keep pressure on the enemy."}}\n'
            f"\n"
            f"Generate tactical analysis in {language_name}:"
        )

        result = self.generate_response(
            prompt=prompt,
            max_tokens=150,  # Reduced from 200 for faster generation
            temperature=0.7
        )

        if result.get('status') != 'ok':
            # Fallback to heuristic on error
            return self._heuristic_analysis(game_state, language_code)

        data = result.get('data', {})

        # Try to extract fields from structured JSON first
        summary = str(data.get('summary') or '').strip()
        tips_raw = data.get('tips') or []
        coach = str(data.get('coach') or '').strip()

        # If no structured data, try to parse raw text
        if not summary and 'raw' in data:
            raw_text = str(data.get('raw', '')).strip()
            sentences = raw_text.split('.')

            # Use the first sentence as summary; when the text starts with a
            # period (empty first sentence) use the leading text instead of
            # producing a bare "." summary.
            first_sentence = sentences[0].strip()
            if first_sentence:
                summary = first_sentence + '.'
            else:
                summary = raw_text[:150]  # Max 150 chars

            # Try to extract tips from remaining text
            # Look for patterns like "Build X", "Defend Y", etc.
            found_tips: List[str] = []
            for pattern in _TIP_PATTERNS:
                found_tips.extend(pattern.findall(raw_text)[:2])  # Max 2 per pattern

            if found_tips:
                tips_raw = found_tips[:4]  # Max 4 tips

            # Use remaining text as coach message
            if len(sentences) > 1:
                coach = '. '.join(sentences[1:3]).strip()  # 2nd and 3rd sentences

        # Validate tips is array of non-empty strings
        tips: List[str] = []
        if isinstance(tips_raw, list):
            tips = [tip.strip() for tip in tips_raw if isinstance(tip, str) and tip.strip()]

        # Fallbacks
        if not (summary and tips and coach):
            fallback = self._heuristic_analysis(game_state, language_code)
            if not (summary or tips or coach):
                # Nothing usable came from the model: report it honestly as
                # a heuristic result instead of labelling it 'llm'.
                return fallback
            summary = summary or fallback['summary']
            tips = tips or fallback['tips']
            coach = coach or fallback['coach']

        return {
            'summary': summary,
            'tips': tips[:4],  # Max 4 tips
            'coach': coach,
            'source': 'llm'
        }


# Singleton instance (lazy initialization)
_ai_analyzer_instance: Optional[AIAnalyzer] = None


def get_ai_analyzer() -> AIAnalyzer:
    """Get singleton AI analyzer instance"""
    global _ai_analyzer_instance
    if _ai_analyzer_instance is None:
        _ai_analyzer_instance = AIAnalyzer()
    return _ai_analyzer_instance