""" AI Tactical Analysis System Uses Qwen2.5-0.5B via llama-cpp-python for battlefield analysis Shares model with NL interface through model_manager """ import os import re import json import time import multiprocessing as mp import queue from typing import Optional, Dict, Any, List from pathlib import Path # Import shared model manager try: from model_manager import get_shared_model USE_SHARED_MODEL = True except ImportError: USE_SHARED_MODEL = False # Global model download status (polled by server for UI) _MODEL_DOWNLOAD_STATUS: Dict[str, Any] = { 'status': 'idle', # idle | starting | downloading | retrying | done | error 'percent': 0, 'note': '', 'path': '' } def _update_model_download_status(update: Dict[str, Any]) -> None: try: _MODEL_DOWNLOAD_STATUS.update(update) except Exception: pass def get_model_download_status() -> Dict[str, Any]: return dict(_MODEL_DOWNLOAD_STATUS) def _llama_worker(result_queue, model_path, prompt, messages, max_tokens, temperature): """ Worker process for LLM inference. Runs in separate process to isolate native library crashes. """ try: from typing import cast from llama_cpp import Llama, ChatCompletionRequestMessage except Exception as exc: result_queue.put({'status': 'error', 'message': f"llama-cpp import failed: {exc}"}) return # Try loading the model with best-suited chat template for Qwen2.5 n_threads = max(1, min(4, os.cpu_count() or 2)) last_exc = None llama = None for chat_fmt in ('qwen2', 'qwen', None): try: kwargs: Dict[str, Any] = dict( model_path=model_path, n_ctx=4096, n_threads=n_threads, verbose=False, ) if chat_fmt is not None: kwargs['chat_format'] = chat_fmt # type: ignore[index] llama = Llama(**kwargs) # type: ignore[arg-type] break except Exception as exc: last_exc = exc llama = None continue if llama is None: result_queue.put({'status': 'error', 'message': f"Failed to load model: {last_exc}"}) return try: # Build message payload payload: List[ChatCompletionRequestMessage] = [] if messages: for msg in messages: if not isinstance(msg, dict): continue role = msg.get('role') content = msg.get('content') if not isinstance(role, str) or not isinstance(content, str): continue payload.append(cast(ChatCompletionRequestMessage, { 'role': role, 'content': content })) if not payload: base_prompt = prompt or '' if base_prompt: payload = [cast(ChatCompletionRequestMessage, { 'role': 'user', 'content': base_prompt })] else: payload = [cast(ChatCompletionRequestMessage, { 'role': 'user', 'content': '' })] # Try chat completion try: resp = llama.create_chat_completion( messages=payload, max_tokens=max_tokens, temperature=temperature, ) except Exception: resp = None # Extract text from response text = None if isinstance(resp, dict): choices = resp.get('choices') or [] if choices: parts = [] for choice in choices: if isinstance(choice, dict): part = ( choice.get('text') or (choice.get('message') or {}).get('content') or '' ) parts.append(str(part)) text = '\n'.join(parts).strip() if not text and 'text' in resp: text = str(resp.get('text')) elif resp is not None: text = str(resp) # Fallback to direct generation if chat failed if not text: try: raw_resp = llama( prompt or '', max_tokens=max_tokens, temperature=temperature, stop=["", "<|endoftext|>"] ) except Exception: raw_resp = None if isinstance(raw_resp, dict): choices = raw_resp.get('choices') or [] if choices: parts = [] for choice in choices: if isinstance(choice, dict): part = ( choice.get('text') or (choice.get('message') or {}).get('content') or '' ) parts.append(str(part)) text = 


def _llama_worker(result_queue, model_path, prompt, messages, max_tokens, temperature):
    """
    Worker process for LLM inference.
    Runs in a separate process to isolate native library crashes.
    """
    try:
        from typing import cast
        from llama_cpp import Llama, ChatCompletionRequestMessage
    except Exception as exc:
        result_queue.put({'status': 'error', 'message': f"llama-cpp import failed: {exc}"})
        return

    # Try loading the model with the best-suited chat template for Qwen2.5
    n_threads = max(1, min(4, os.cpu_count() or 2))
    last_exc = None
    llama = None
    for chat_fmt in ('qwen2', 'qwen', None):
        try:
            kwargs: Dict[str, Any] = dict(
                model_path=model_path,
                n_ctx=4096,
                n_threads=n_threads,
                verbose=False,
            )
            if chat_fmt is not None:
                kwargs['chat_format'] = chat_fmt  # type: ignore[index]
            llama = Llama(**kwargs)  # type: ignore[arg-type]
            break
        except Exception as exc:
            last_exc = exc
            llama = None
            continue

    if llama is None:
        result_queue.put({'status': 'error', 'message': f"Failed to load model: {last_exc}"})
        return

    try:
        # Build message payload
        payload: List[ChatCompletionRequestMessage] = []
        if messages:
            for msg in messages:
                if not isinstance(msg, dict):
                    continue
                role = msg.get('role')
                content = msg.get('content')
                if not isinstance(role, str) or not isinstance(content, str):
                    continue
                payload.append(cast(ChatCompletionRequestMessage, {
                    'role': role,
                    'content': content
                }))
        if not payload:
            payload = [cast(ChatCompletionRequestMessage, {
                'role': 'user',
                'content': prompt or ''
            })]

        # Try chat completion first
        try:
            resp = llama.create_chat_completion(
                messages=payload,
                max_tokens=max_tokens,
                temperature=temperature,
            )
        except Exception:
            resp = None

        def extract_text(response) -> Optional[str]:
            """Pull generated text out of a llama-cpp response."""
            if isinstance(response, dict):
                choices = response.get('choices') or []
                if choices:
                    parts = []
                    for choice in choices:
                        if isinstance(choice, dict):
                            part = (
                                choice.get('text')
                                or (choice.get('message') or {}).get('content')
                                or ''
                            )
                            parts.append(str(part))
                    joined = '\n'.join(parts).strip()
                    if joined:
                        return joined
                if 'text' in response:
                    return str(response.get('text'))
                return None
            if response is not None:
                return str(response)
            return None

        text = extract_text(resp)

        # Fall back to direct generation if chat completion failed
        if not text:
            try:
                raw_resp = llama(
                    prompt or '',
                    max_tokens=max_tokens,
                    temperature=temperature,
                    stop=["</s>", "<|endoftext|>"]
                )
            except Exception:
                raw_resp = None
            text = extract_text(raw_resp)

        if not text:
            text = ''

        # Clean up response text: strip prompt-template tokens and code fences
        cleaned = text.replace('<>', ' ').replace('[/INST]', ' ').replace('[INST]', ' ')
        cleaned = re.sub(r'<s>', ' ', cleaned)
        cleaned = re.sub(r'</s>', ' ', cleaned)
        cleaned = re.sub(r'```\w*', '', cleaned)
        cleaned = cleaned.replace('```', '')
        # Remove thinking tags (Qwen models)
        cleaned = re.sub(r'<think>.*?</think>', '', cleaned, flags=re.DOTALL)
        cleaned = re.sub(r'<think>.*', '', cleaned, flags=re.DOTALL)
        cleaned = cleaned.strip()

        # Try to extract JSON objects via brace matching
        def extract_json_objects(s: str):
            objs = []
            stack = []
            start = None
            for idx, ch in enumerate(s):
                if ch == '{':
                    if not stack:
                        start = idx
                    stack.append('{')
                elif ch == '}':
                    if stack:
                        stack.pop()
                        if not stack and start is not None:
                            objs.append(s[start:idx + 1])
                            start = None
            return objs

        parsed_json = None
        for candidate in extract_json_objects(cleaned):
            try:
                parsed_json = json.loads(candidate)
                break
            except Exception:
                continue

        if parsed_json is not None:
            result_queue.put({'status': 'ok', 'data': parsed_json})
        else:
            result_queue.put({'status': 'ok', 'data': {'raw': cleaned}})
    except Exception as exc:
        result_queue.put({'status': 'error', 'message': f"Generation failed: {exc}"})
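

# --- Illustrative smoke test (hypothetical; mirrors the process-isolation ---
# fallback in AIAnalyzer.generate_response below). A native crash inside
# llama.cpp kills only the forked child, never the caller. The default model
# path is an assumption; 'fork' is POSIX-only.
def _example_worker_smoke_test(model_path: str = "./qwen2.5-coder-1.5b-instruct-q4_0.gguf"):
    """Run one short prompt through _llama_worker and return the queued result."""
    ctx = mp.get_context('fork')
    result_queue = ctx.Queue()
    proc = ctx.Process(
        target=_llama_worker,
        args=(result_queue, model_path, "Say OK.", None, 16, 0.0)
    )
    proc.start()
    try:
        return result_queue.get(timeout=60.0)  # raises queue.Empty on timeout
    finally:
        proc.join(timeout=5.0)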


class AIAnalyzer:
    """
    AI Tactical Analysis System

    Provides battlefield analysis using the Qwen2.5-Coder-1.5B-Instruct model.
    Uses the shared model manager to avoid duplicate loading with the NL interface.
    """

    def __init__(self, model_path: Optional[str] = None):
        """Initialize AI analyzer with model path"""
        if model_path is None:
            # Try default locations (existing files)
            possible_paths = [
                Path("./qwen2.5-coder-1.5b-instruct-q4_0.gguf"),
                Path("../qwen2.5-coder-1.5b-instruct-q4_0.gguf"),
                Path.home() / "rts" / "qwen2.5-coder-1.5b-instruct-q4_0.gguf",
                Path.home() / ".cache" / "rts" / "qwen2.5-coder-1.5b-instruct-q4_0.gguf",
                Path("/data/qwen2.5-coder-1.5b-instruct-q4_0.gguf"),
                Path("/tmp/rts/qwen2.5-coder-1.5b-instruct-q4_0.gguf"),
            ]
            for path in possible_paths:
                try:
                    if path.exists():
                        model_path = str(path)
                        break
                except Exception:
                    continue

        self.model_path = model_path
        self.model_available = model_path is not None and Path(model_path).exists()

        # Use shared model manager if available
        self.use_shared = USE_SHARED_MODEL
        self.shared_model = None
        if self.use_shared:
            try:
                self.shared_model = get_shared_model()
                # Ensure model is loaded
                if self.model_available and model_path:
                    success, error = self.shared_model.load_model(Path(model_path).name)
                    if success:
                        print(f"✓ AI Analysis using SHARED model: {Path(model_path).name}")
                    else:
                        print(f"⚠️ Failed to load shared model: {error}")
                        self.use_shared = False
            except Exception as e:
                print(f"⚠️ Shared model unavailable: {e}")
                self.use_shared = False

        if not self.model_available:
            print("⚠️ AI Model not found. Attempting automatic download...")
            # Try to download the model automatically
            try:
                import urllib.request

                model_url = (
                    "https://huggingface.co/Qwen/Qwen2.5-Coder-1.5B-Instruct-GGUF/"
                    "resolve/main/qwen2.5-coder-1.5b-instruct-q4_0.gguf"
                )
                # Fallback URL (resolve endpoint with explicit download param)
                alt_url = model_url + "?download=true"

                # Choose a writable destination directory
                filename = "qwen2.5-coder-1.5b-instruct-q4_0.gguf"
                candidate_dirs = [
                    Path(os.getenv("RTS_MODEL_DIR", "")),
                    Path.cwd(),
                    Path(__file__).resolve().parent,          # /web
                    Path(__file__).resolve().parent.parent,   # repo root
                    Path.home() / "rts",
                    Path.home() / ".cache" / "rts",
                    Path("/data"),
                    Path("/tmp") / "rts",
                ]
                default_path: Path = Path.cwd() / filename
                for d in candidate_dirs:
                    try:
                        if not str(d):
                            continue
                        d.mkdir(parents=True, exist_ok=True)
                        test_file = d / ".write_test"
                        with open(test_file, 'w') as tf:
                            tf.write('ok')
                        test_file.unlink(missing_ok=True)  # type: ignore[arg-type]
                        default_path = d / filename
                        break
                    except Exception:
                        continue

                _update_model_download_status({
                    'status': 'starting', 'percent': 0,
                    'note': 'starting', 'path': str(default_path)
                })
                print("📦 Downloading model (~1 GB)...")
                print(f"   From: {model_url}")
                print(f"   To: {default_path}")
                print("   This may take a few minutes...")

                # Simple progress callback
                def progress_callback(block_num, block_size, total_size):
                    if total_size > 0 and block_num % 100 == 0:
                        downloaded = block_num * block_size
                        percent = min(100, (downloaded / total_size) * 100)
                        mb_downloaded = downloaded / (1024 * 1024)
                        mb_total = total_size / (1024 * 1024)
                        _update_model_download_status({
                            'status': 'downloading',
                            'percent': round(percent, 1),
                            'note': f"{mb_downloaded:.1f}/{mb_total:.1f} MB",
                            'path': str(default_path)
                        })
                        print(f"   Progress: {percent:.1f}% ({mb_downloaded:.1f}/{mb_total:.1f} MB)", end='\r')

                # Ensure destination directory exists (should already be validated)
                try:
                    default_path.parent.mkdir(parents=True, exist_ok=True)
                except Exception:
                    pass

                success = False
                for attempt in range(3):
                    try:
                        # Try urllib first
                        urllib.request.urlretrieve(model_url, default_path, reporthook=progress_callback)
                        success = True
                        break
                    except Exception:
                        # Fallback: attempt streaming with requests if available
                        try:
                            try:
                                import requests  # type: ignore
                            except Exception:
                                requests = None  # type: ignore
                            if requests is not None:  # type: ignore
                                with requests.get(model_url, stream=True, timeout=60) as r:  # type: ignore
                                    r.raise_for_status()
                                    total = int(r.headers.get('Content-Length', 0))
                                    downloaded = 0
                                    with open(default_path, 'wb') as f:
                                        for chunk in r.iter_content(chunk_size=1024 * 1024):  # 1 MB
                                            if not chunk:
                                                continue
                                            f.write(chunk)
                                            downloaded += len(chunk)
                                            if total > 0:
                                                percent = min(100, downloaded * 100 / total)
                                                _update_model_download_status({
                                                    'status': 'downloading',
                                                    'percent': round(percent, 1),
                                                    'note': f"{downloaded/1048576:.1f}/{total/1048576:.1f} MB",
                                                    'path': str(default_path)
                                                })
                                                print(f"   Progress: {percent:.1f}% ({downloaded/1048576:.1f}/{total/1048576:.1f} MB)", end='\r')
                                success = True
                                break
                        except Exception:
                            # Ignore and try the alternative URL below
                            pass
                        # Last chance this attempt: alternative URL via urllib
                        try:
                            urllib.request.urlretrieve(alt_url, default_path, reporthook=progress_callback)
                            success = True
                            break
                        except Exception as e:
                            wait = 2 ** attempt
                            _update_model_download_status({
                                'status': 'retrying', 'percent': 0,
                                'note': f"attempt {attempt+1} failed: {e}",
                                'path': str(default_path)
                            })
                            print(f"   Download attempt {attempt+1}/3 failed: {e}. Retrying in {wait}s...")
                            time.sleep(wait)

                print()  # New line after progress output

                # Verify download
                if success and default_path.exists():
                    size_mb = default_path.stat().st_size / (1024 * 1024)
                    print(f"✅ Model downloaded successfully! ({size_mb:.1f} MB)")
                    self.model_path = str(default_path)
                    self.model_available = True
                    _update_model_download_status({
                        'status': 'done', 'percent': 100,
                        'note': f"{size_mb:.1f} MB", 'path': str(default_path)
                    })
                else:
                    print("❌ Download failed. Tactical analysis disabled.")
                    print("   Manual download: https://huggingface.co/Qwen/Qwen2.5-Coder-1.5B-Instruct-GGUF")
                    _update_model_download_status({
                        'status': 'error', 'percent': 0,
                        'note': 'download failed', 'path': str(default_path)
                    })
            except Exception as e:
                print(f"❌ Auto-download failed: {e}")
                print("   Tactical analysis disabled.")
                print("   Manual download: https://huggingface.co/Qwen/Qwen2.5-Coder-1.5B-Instruct-GGUF")
                _update_model_download_status({
                    'status': 'error', 'percent': 0, 'note': str(e), 'path': ''
                })
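
    # Usage note: the first writable entry in candidate_dirs wins, so the
    # model location can be pinned by setting RTS_MODEL_DIR before the
    # analyzer is constructed (the path below is made up):
    #
    #   os.environ["RTS_MODEL_DIR"] = "/data/models"
    #   analyzer = AIAnalyzer()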

    def generate_response(
        self,
        prompt: Optional[str] = None,
        messages: Optional[List[Dict]] = None,
        max_tokens: int = 300,
        temperature: float = 0.7,
        timeout: float = 30.0
    ) -> Dict[str, Any]:
        """
        Generate LLM response (uses shared model if available, falls back to a separate process).

        Args:
            prompt: Direct prompt string
            messages: Chat-style messages [{"role": "user", "content": "..."}]
            max_tokens: Maximum tokens to generate
            temperature: Sampling temperature
            timeout: Timeout in seconds

        Returns:
            Dict with 'status' and 'data' or 'message'
        """
        if not self.model_available:
            return {'status': 'error', 'message': 'Model not available'}

        # Try shared model first
        if self.use_shared and self.shared_model and self.shared_model.model_loaded:
            try:
                # Convert prompt to messages if needed
                msg_list = messages if messages else [{"role": "user", "content": prompt or ""}]

                success, response_text, error = self.shared_model.generate(
                    messages=msg_list,
                    max_tokens=max_tokens,
                    temperature=temperature,
                    timeout=timeout
                )

                if success and response_text:
                    # Try to parse JSON from the response
                    try:
                        cleaned = response_text.strip()
                        # Extract the first flat JSON object, if any
                        match = re.search(r'\{[^{}]*\}', cleaned, re.DOTALL)
                        if match:
                            parsed = json.loads(match.group(0))
                            return {'status': 'ok', 'data': parsed}
                        return {'status': 'ok', 'data': {'raw': cleaned}}
                    except Exception:
                        return {'status': 'ok', 'data': {'raw': response_text}}
                else:
                    # Fall through to the multiprocess method
                    print(f"⚠️ Shared model failed: {error}, falling back to process isolation")
            except Exception as e:
                print(f"⚠️ Shared model error: {e}, falling back to process isolation")

        # Fallback: use a separate process (original method)
        ctx = mp.get_context('fork')
        result_queue = ctx.Queue()
        worker_process = ctx.Process(
            target=_llama_worker,
            args=(result_queue, self.model_path, prompt, messages, max_tokens, temperature)
        )
        worker_process.start()

        try:
            result = result_queue.get(timeout=timeout)
            worker_process.join(timeout=5.0)
            return result
        except queue.Empty:
            worker_process.terminate()
            worker_process.join(timeout=5.0)
            if worker_process.is_alive():
                worker_process.kill()
                worker_process.join()
            return {'status': 'error', 'message': 'Generation timeout'}
        except Exception as exc:
            worker_process.terminate()
            worker_process.join(timeout=5.0)
            return {'status': 'error', 'message': str(exc)}
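
    # Design note: the fork-based fallback trades throughput for safety. If
    # llama.cpp segfaults, only the child process dies and the caller gets a
    # clean {'status': 'error', ...} dict instead of a crashed server.
    # Illustrative call (prompt is made up):
    #
    #   result = get_ai_analyzer().generate_response(
    #       prompt="Summarize: 5 tanks vs 3 rifle squads", max_tokens=64)
    #   if result['status'] == 'ok':
    #       print(result['data'])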

    def _heuristic_analysis(self, game_state: Dict, language_code: str) -> Dict[str, Any]:
        """Lightweight, deterministic analysis when the LLM is unavailable."""
        from localization import LOCALIZATION

        lang = language_code or "en"
        lang_name = LOCALIZATION.get_ai_language_name(lang)

        player_units = sum(1 for u in game_state.get('units', {}).values() if u.get('player_id') == 0)
        enemy_units = sum(1 for u in game_state.get('units', {}).values() if u.get('player_id') == 1)
        player_buildings = sum(1 for b in game_state.get('buildings', {}).values() if b.get('player_id') == 0)
        enemy_buildings = sum(1 for b in game_state.get('buildings', {}).values() if b.get('player_id') == 1)

        player = game_state.get('players', {}).get(0, {})
        credits = int(player.get('credits', 0) or 0)
        power = int(player.get('power', 0) or 0)
        power_cons = int(player.get('power_consumption', 0) or 0)

        advantage = 'even'
        score = (player_units - enemy_units) + 0.5 * (player_buildings - enemy_buildings)
        if score > 1:
            advantage = 'ahead'
        elif score < -1:
            advantage = 'behind'

        # Localized templates (concise)
        summaries = {
            'en': {
                'ahead': f"{lang_name}: You hold the initiative. Maintain pressure and expand.",
                'even': f"{lang_name}: Battlefield is balanced. Scout and take map control.",
                'behind': f"{lang_name}: You're under pressure. Stabilize and defend key assets.",
            },
            'fr': {
                'ahead': f"{lang_name} : Vous avez l'initiative. Maintenez la pression et étendez-vous.",
                'even': f"{lang_name} : Situation équilibrée. Éclairez et prenez le contrôle de la carte.",
                'behind': f"{lang_name} : Sous pression. Stabilisez et défendez les actifs clés.",
            },
            'zh-TW': {
                'ahead': f"{lang_name}:佔據主動。保持壓力並擴張。",
                'even': f"{lang_name}:局勢均衡。偵察並掌控地圖。",
                'behind': f"{lang_name}:處於劣勢。穩住陣腳並防守關鍵建築。",
            }
        }
        summary = summaries.get(lang, summaries['en'])[advantage]

        tips: List[str] = []
        # Power management tips
        if power_cons > 0 and power < power_cons:
            tips.append({
                'en': 'Build a Power Plant to restore production speed',
                'fr': 'Construisez une centrale pour rétablir la production',
                'zh-TW': '建造發電廠以恢復生產速度'
            }.get(lang, 'Build a Power Plant to restore production speed'))
        # Economy tips
        if credits < 300:
            tips.append({
                'en': 'Protect Harvester and secure more ore',
                'fr': 'Protégez le collecteur et sécurisez plus de minerai',
                'zh-TW': '保護採礦車並確保更多礦石'
            }.get(lang, 'Protect Harvester and secure more ore'))
        # Army composition tips
        if player_buildings > 0:
            if player_units < enemy_units:
                tips.append({
                    'en': 'Train Infantry and add Tanks for frontline',
                    'fr': 'Entraînez de l’infanterie et ajoutez des chars en première ligne',
                    'zh-TW': '訓練步兵並加入坦克作為前線'
                }.get(lang, 'Train Infantry and add Tanks for frontline'))
            else:
                tips.append({
                    'en': 'Scout enemy base and pressure weak flanks',
                    'fr': 'Éclairez la base ennemie et mettez la pression sur les flancs faibles',
                    'zh-TW': '偵察敵方基地並壓制薄弱側翼'
                }.get(lang, 'Scout enemy base and pressure weak flanks'))
        # Defense tip if at a buildings disadvantage
        if player_buildings < enemy_buildings:
            tips.append({
                'en': 'Fortify around HQ and key production buildings',
                'fr': 'Fortifiez autour du QG et des bâtiments de production',
                'zh-TW': '在總部與生產建築周圍加強防禦'
            }.get(lang, 'Fortify around HQ and key production buildings'))

        # Coach line
        coach = {
            'en': 'Keep your economy safe and strike when you see an opening.',
            'fr': 'Protégez votre économie et frappez dès qu’une ouverture se présente.',
            'zh-TW': '保護經濟,抓住機會果斷出擊。'
        }.get(lang, 'Keep your economy safe and strike when you see an opening.')

        return {
            'summary': summary,
            'tips': tips[:4] or ['Build more units'],
            'coach': coach,
            'source': 'heuristic'
        }
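
    # Illustrative heuristic result (values made up; the exact summary prefix
    # depends on LOCALIZATION) for a player slightly behind on units and low
    # on credits:
    #
    #   {'summary': '...: Battlefield is balanced. Scout and take map control.',
    #    'tips': ['Protect Harvester and secure more ore',
    #             'Train Infantry and add Tanks for frontline'],
    #    'coach': 'Keep your economy safe and strike when you see an opening.',
    #    'source': 'heuristic'}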

    def summarize_combat_situation(
        self,
        game_state: Dict,
        language_code: str = "en"
    ) -> Dict[str, Any]:
        """
        Generate tactical analysis of the current battle.

        Args:
            game_state: Current game state dictionary
            language_code: Language for response (en, fr, zh-TW)

        Returns:
            Dict with keys: summary, tips, coach
        """
        # If the LLM is not available, return the heuristic result
        if not self.model_available:
            return self._heuristic_analysis(game_state, language_code)

        # Import here to avoid a circular dependency
        from localization import LOCALIZATION

        language_name = LOCALIZATION.get_ai_language_name(language_code)

        # Build tactical summary prompt
        player_units = sum(1 for u in game_state.get('units', {}).values() if u.get('player_id') == 0)
        enemy_units = sum(1 for u in game_state.get('units', {}).values() if u.get('player_id') == 1)
        player_buildings = sum(1 for b in game_state.get('buildings', {}).values() if b.get('player_id') == 0)
        enemy_buildings = sum(1 for b in game_state.get('buildings', {}).values() if b.get('player_id') == 1)
        player_credits = game_state.get('players', {}).get(0, {}).get('credits', 0)

        example_summary = LOCALIZATION.get_ai_example_summary(language_code)

        prompt = (
            "You are an expert RTS (Red Alert style) commentator & coach. "
            "Return ONLY one <json> ... </json> block.\n"
            "JSON keys: summary (string concise tactical overview), "
            "tips (array of 1-4 short imperative build/composition suggestions), "
            "coach (1 motivational/adaptive sentence).\n"
            f"No additional keys. No text outside tags. Language: {language_name}.\n"
            "\n"
            f"Battle state: Player {player_units} units vs Enemy {enemy_units} units. "
            f"Player {player_buildings} buildings vs Enemy {enemy_buildings} buildings. "
            f"Credits: {player_credits}.\n"
            "\n"
            "Example JSON:\n"
            f'<json>{{"summary": "{example_summary}", '
            f'"tips": ["Build more tanks", "Defend north base", "Scout enemy position"], '
            f'"coach": "You are doing well; keep pressure on the enemy."}}</json>\n'
            "\n"
            f"Generate tactical analysis in {language_name}:"
        )

        result = self.generate_response(
            prompt=prompt,
            max_tokens=300,
            temperature=0.7,
            timeout=25.0
        )

        if result.get('status') != 'ok':
            # Fall back to the heuristic on error
            return self._heuristic_analysis(game_state, language_code)

        data = result.get('data', {})

        # Try to extract fields from structured JSON first
        summary = str(data.get('summary') or '').strip()
        tips_raw = data.get('tips') or []
        coach = str(data.get('coach') or '').strip()

        # If no structured data, try to salvage fields from the raw text
        if not summary and 'raw' in data:
            raw_text = str(data.get('raw', '')).strip()

            # Use the first sentence (or a truncated slice) as the summary
            sentences = raw_text.split('.')
            if sentences:
                summary = sentences[0].strip() + '.'
            else:
                summary = raw_text[:150]  # Max 150 chars

            # Try to extract tips from the remaining text by looking for
            # imperative patterns like "Build X", "Defend Y", etc.
            tip_patterns = [
                r'Build [^.]+',
                r'Defend [^.]+',
                r'Attack [^.]+',
                r'Scout [^.]+',
                r'Expand [^.]+',
                r'Protect [^.]+',
                r'Train [^.]+',
                r'Produce [^.]+',
            ]
            found_tips = []
            for pattern in tip_patterns:
                matches = re.findall(pattern, raw_text, re.IGNORECASE)
                found_tips.extend(matches[:2])  # Max 2 per pattern
            if found_tips:
                tips_raw = found_tips[:4]  # Max 4 tips
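
            # Illustration (made-up raw text): "Push now. Build more tanks.
            # Defend the ore field." yields summary "Push now." and tips
            # ["Build more tanks", "Defend the ore field"].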
            # Use the remaining text as the coach message
            if len(sentences) > 1:
                coach = '. '.join(sentences[1:3]).strip()  # 2nd and 3rd sentences

        # Validate that tips is a list of strings
        tips = []
        if isinstance(tips_raw, list):
            for tip in tips_raw:
                if isinstance(tip, str):
                    tips.append(tip.strip())

        # Fallbacks
        if not summary or not tips or not coach:
            fallback = self._heuristic_analysis(game_state, language_code)
            summary = summary or fallback['summary']
            tips = tips or fallback['tips']
            coach = coach or fallback['coach']

        return {
            'summary': summary,
            'tips': tips[:4],  # Max 4 tips
            'coach': coach,
            'source': 'llm'
        }


# Singleton instance (lazy initialization)
_ai_analyzer_instance: Optional[AIAnalyzer] = None


def get_ai_analyzer() -> AIAnalyzer:
    """Get singleton AI analyzer instance"""
    global _ai_analyzer_instance
    if _ai_analyzer_instance is None:
        _ai_analyzer_instance = AIAnalyzer()
    return _ai_analyzer_instance
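

# --- Manual smoke test (illustrative; the game_state below is made up) ------
# Requires the repo's localization module to be importable. Running this file
# directly exercises the heuristic/LLM pipeline without the game server.
if __name__ == "__main__":
    demo_state = {
        'units': {
            'u1': {'player_id': 0}, 'u2': {'player_id': 0},
            'u3': {'player_id': 1},
        },
        'buildings': {
            'b1': {'player_id': 0}, 'b2': {'player_id': 1},
        },
        'players': {0: {'credits': 250, 'power': 50, 'power_consumption': 80}},
    }
    report = get_ai_analyzer().summarize_combat_situation(demo_state, "en")
    print(json.dumps(report, ensure_ascii=False, indent=2))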