Spaces:
Sleeping
Sleeping
| """ | |
| AI Tactical Analysis System | |
| Uses Qwen2.5-Coder-1.5B via shared model manager | |
| ONLY uses the single shared LLM instance - NO separate process fallback | |
| """ | |
| import os | |
| import re | |
| import json | |
| import time | |
| from typing import Optional, Dict, Any, List | |
| from pathlib import Path | |
| # Import shared model manager (REQUIRED - no fallback) | |
| from model_manager import get_shared_model | |
| USE_SHARED_MODEL = True # Always true now | |
| # Global model download status (polled by server for UI) | |
| _MODEL_DOWNLOAD_STATUS: Dict[str, Any] = { | |
| 'status': 'idle', # idle | starting | downloading | retrying | done | error | |
| 'percent': 0, | |
| 'note': '', | |
| 'path': '' | |
| } | |
| def _update_model_download_status(update: Dict[str, Any]) -> None: | |
| try: | |
| _MODEL_DOWNLOAD_STATUS.update(update) | |
| except Exception: | |
| pass | |
| def get_model_download_status() -> Dict[str, Any]: | |
| return dict(_MODEL_DOWNLOAD_STATUS) | |
| # ============================================================================= | |
| # SINGLE LLM ARCHITECTURE | |
| # ============================================================================= | |
| # This module ONLY uses the shared model from model_manager.py | |
| # OLD CODE REMOVED: _llama_worker() that loaded duplicate LLM in separate process | |
| # That caused "falling back to process isolation" and severe lag | |
| # Now: One model, loaded once, shared by all AI tasks ✅ | |
| # ============================================================================= | |
| class AIAnalyzer: | |
| """ | |
| AI Tactical Analysis System | |
| Provides battlefield analysis using Qwen2.5-0.5B model. | |
| Uses shared model manager to avoid duplicate loading with NL interface. | |
| """ | |
| def __init__(self, model_path: Optional[str] = None): | |
| """Initialize AI analyzer with model path""" | |
| if model_path is None: | |
| # Try default locations (existing files) | |
| possible_paths = [ | |
| Path("./qwen2.5-coder-1.5b-instruct-q4_0.gguf"), | |
| Path("../qwen2.5-coder-1.5b-instruct-q4_0.gguf"), | |
| Path.home() / "rts" / "qwen2.5-coder-1.5b-instruct-q4_0.gguf", | |
| Path.home() / ".cache" / "rts" / "qwen2.5-coder-1.5b-instruct-q4_0.gguf", | |
| Path("/data/qwen2.5-coder-1.5b-instruct-q4_0.gguf"), | |
| Path("/tmp/rts/qwen2.5-coder-1.5b-instruct-q4_0.gguf"), | |
| ] | |
| for path in possible_paths: | |
| try: | |
| if path.exists(): | |
| model_path = str(path) | |
| break | |
| except Exception: | |
| continue | |
| self.model_path = model_path | |
| self.model_available = model_path is not None and Path(model_path).exists() | |
| # Use shared model manager if available | |
| self.use_shared = USE_SHARED_MODEL | |
| self.shared_model = None | |
| self._current_analysis_request_id = None # Track current active analysis | |
| if self.use_shared: | |
| try: | |
| self.shared_model = get_shared_model() | |
| # Ensure model is loaded | |
| if self.model_available and model_path: | |
| success, error = self.shared_model.load_model(Path(model_path).name) | |
| if success: | |
| print(f"✓ AI Analysis using SHARED model: {Path(model_path).name}") | |
| else: | |
| print(f"⚠️ Failed to load shared model: {error}") | |
| self.use_shared = False | |
| except Exception as e: | |
| print(f"⚠️ Shared model unavailable: {e}") | |
| self.use_shared = False | |
| if not self.model_available: | |
| print(f"⚠️ AI Model not found. Attempting automatic download...") | |
| # Try to download the model automatically | |
| try: | |
| import sys | |
| import urllib.request | |
| model_url = "https://huggingface.co/Qwen/Qwen2.5-Coder-1.5B-Instruct-GGUF/resolve/main/qwen2.5-coder-1.5b-instruct-q4_0.gguf" | |
| # Fallback URL (blob with download param) | |
| alt_url = "https://huggingface.co/Qwen/Qwen2.5-Coder-1.5B-Instruct-GGUF/blob/main/qwen2.5-coder-1.5b-instruct-q4_0.gguf?download=1" | |
| # Choose a writable destination directory | |
| filename = "qwen2.5-coder-1.5b-instruct-q4_0.gguf" | |
| candidate_dirs = [ | |
| Path(os.getenv("RTS_MODEL_DIR", "")), | |
| Path.cwd(), | |
| Path(__file__).resolve().parent, # /web | |
| Path(__file__).resolve().parent.parent, # repo root | |
| Path.home() / "rts", | |
| Path.home() / ".cache" / "rts", | |
| Path("/data"), | |
| Path("/tmp") / "rts", | |
| ] | |
| default_path: Path = Path.cwd() / filename | |
| for d in candidate_dirs: | |
| try: | |
| if not str(d): | |
| continue | |
| d.mkdir(parents=True, exist_ok=True) | |
| test_file = d / (".write_test") | |
| with open(test_file, 'w') as tf: | |
| tf.write('ok') | |
| test_file.unlink(missing_ok=True) # type: ignore[arg-type] | |
| default_path = d / filename | |
| break | |
| except Exception: | |
| continue | |
| _update_model_download_status({ | |
| 'status': 'starting', | |
| 'percent': 0, | |
| 'note': 'starting', | |
| 'path': str(default_path) | |
| }) | |
| print(f"📦 Downloading model (~350 MB)...") | |
| print(f" From: {model_url}") | |
| print(f" To: {default_path}") | |
| print(f" This may take a few minutes...") | |
| # Simple progress callback | |
| def progress_callback(block_num, block_size, total_size): | |
| if total_size > 0 and block_num % 100 == 0: | |
| downloaded = block_num * block_size | |
| percent = min(100, (downloaded / total_size) * 100) | |
| mb_downloaded = downloaded / (1024 * 1024) | |
| mb_total = total_size / (1024 * 1024) | |
| _update_model_download_status({ | |
| 'status': 'downloading', | |
| 'percent': round(percent, 1), | |
| 'note': f"{mb_downloaded:.1f}/{mb_total:.1f} MB", | |
| 'path': str(default_path) | |
| }) | |
| print(f" Progress: {percent:.1f}% ({mb_downloaded:.1f}/{mb_total:.1f} MB)", end='\r') | |
| # Ensure destination directory exists (should already be validated) | |
| try: | |
| default_path.parent.mkdir(parents=True, exist_ok=True) | |
| except Exception: | |
| pass | |
| success = False | |
| for attempt in range(3): | |
| try: | |
| # Try urllib first | |
| urllib.request.urlretrieve(model_url, default_path, reporthook=progress_callback) | |
| success = True | |
| break | |
| except Exception: | |
| # Fallback to requests streaming | |
| # Attempt streaming with requests if available | |
| used_requests = False | |
| try: | |
| try: | |
| import requests # type: ignore | |
| except Exception: | |
| requests = None # type: ignore | |
| if requests is not None: # type: ignore | |
| with requests.get(model_url, stream=True, timeout=60) as r: # type: ignore | |
| r.raise_for_status() | |
| total = int(r.headers.get('Content-Length', 0)) | |
| downloaded = 0 | |
| with open(default_path, 'wb') as f: | |
| for chunk in r.iter_content(chunk_size=1024 * 1024): # 1MB | |
| if not chunk: | |
| continue | |
| f.write(chunk) | |
| downloaded += len(chunk) | |
| if total > 0: | |
| percent = min(100, downloaded * 100 / total) | |
| _update_model_download_status({ | |
| 'status': 'downloading', | |
| 'percent': round(percent, 1), | |
| 'note': f"{downloaded/1048576:.1f}/{total/1048576:.1f} MB", | |
| 'path': str(default_path) | |
| }) | |
| print(f" Progress: {percent:.1f}% ({downloaded/1048576:.1f}/{total/1048576:.1f} MB)", end='\r') | |
| success = True | |
| used_requests = True | |
| break | |
| except Exception: | |
| # ignore and try alternative below | |
| pass | |
| # Last chance this attempt: alternative URL via urllib | |
| try: | |
| urllib.request.urlretrieve(alt_url, default_path, reporthook=progress_callback) | |
| success = True | |
| break | |
| except Exception as e: | |
| wait = 2 ** attempt | |
| _update_model_download_status({ | |
| 'status': 'retrying', | |
| 'percent': 0, | |
| 'note': f"attempt {attempt+1} failed: {e}", | |
| 'path': str(default_path) | |
| }) | |
| print(f" Download attempt {attempt+1}/3 failed: {e}. Retrying in {wait}s...") | |
| time.sleep(wait) | |
| print() # New line after progress | |
| # Verify download | |
| if success and default_path.exists(): | |
| size_mb = default_path.stat().st_size / (1024 * 1024) | |
| print(f"✅ Model downloaded successfully! ({size_mb:.1f} MB)") | |
| self.model_path = str(default_path) | |
| self.model_available = True | |
| _update_model_download_status({ | |
| 'status': 'done', | |
| 'percent': 100, | |
| 'note': f"{size_mb:.1f} MB", | |
| 'path': str(default_path) | |
| }) | |
| else: | |
| print(f"❌ Download failed. Tactical analysis disabled.") | |
| print(f" Manual download: https://huggingface.co/Qwen/Qwen2.5-Coder-1.5B-Instruct-GGUF") | |
| _update_model_download_status({ | |
| 'status': 'error', | |
| 'percent': 0, | |
| 'note': 'download failed', | |
| 'path': str(default_path) | |
| }) | |
| except Exception as e: | |
| print(f"❌ Auto-download failed: {e}") | |
| print(f" Tactical analysis disabled.") | |
| print(f" Manual download: https://huggingface.co/Qwen/Qwen2.5-Coder-1.5B-Instruct-GGUF") | |
| _update_model_download_status({ | |
| 'status': 'error', | |
| 'percent': 0, | |
| 'note': str(e), | |
| 'path': '' | |
| }) | |
| def generate_response( | |
| self, | |
| prompt: str, | |
| max_tokens: int = 256, | |
| temperature: float = 0.7 | |
| ) -> Dict[str, Any]: | |
| """ | |
| Generate a response from the model. | |
| NO TIMEOUT - waits for inference to complete (showcases LLM ability). | |
| Only cancelled if superseded by new analysis request. | |
| Args: | |
| prompt: Input prompt | |
| max_tokens: Maximum tokens to generate | |
| temperature: Sampling temperature | |
| Returns: | |
| Dict with status and data/message | |
| """ | |
| if not self.model_available: | |
| return {'status': 'error', 'message': 'Model not loaded'} | |
| # ONLY use shared model - NO fallback to separate process | |
| if not (self.use_shared and self.shared_model and self.shared_model.model_loaded): | |
| return {'status': 'error', 'message': 'Shared model not available'} | |
| try: | |
| # Cancel previous analysis if any (one active analysis at a time) | |
| if self._current_analysis_request_id is not None: | |
| self.shared_model.cancel_request(self._current_analysis_request_id) | |
| print(f"🔄 Cancelled previous AI analysis request {self._current_analysis_request_id} (new analysis requested)") | |
| messages = [ | |
| {"role": "user", "content": prompt} | |
| ] | |
| # Submit request and wait for completion (no timeout) | |
| success, response_text, error_message = self.shared_model.generate( | |
| messages=messages, | |
| max_tokens=max_tokens, | |
| temperature=temperature | |
| ) | |
| # Clear current request | |
| self._current_analysis_request_id = None | |
| if success and response_text: | |
| # Try to parse as JSON | |
| try: | |
| cleaned = response_text.strip() | |
| # Look for JSON in response | |
| match = re.search(r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', cleaned, re.DOTALL) | |
| if match: | |
| parsed = json.loads(match.group(0)) | |
| return {'status': 'ok', 'data': parsed, 'raw': response_text} | |
| else: | |
| return {'status': 'ok', 'data': {'raw': response_text}, 'raw': response_text} | |
| except: | |
| return {'status': 'ok', 'data': {'raw': response_text}, 'raw': response_text} | |
| else: | |
| print(f"⚠️ Shared model error: {error_message} (will use heuristic analysis)") | |
| return {'status': 'error', 'message': error_message or 'Generation failed'} | |
| except Exception as e: | |
| print(f"⚠️ Shared model exception: {e} (will use heuristic analysis)") | |
| return {'status': 'error', 'message': f'Error: {str(e)}'} | |
| def _heuristic_analysis(self, game_state: Dict, language_code: str) -> Dict[str, Any]: | |
| """Lightweight, deterministic analysis when LLM is unavailable.""" | |
| from localization import LOCALIZATION | |
| lang = language_code or "en" | |
| lang_name = LOCALIZATION.get_ai_language_name(lang) | |
| player_units = sum(1 for u in game_state.get('units', {}).values() if u.get('player_id') == 0) | |
| enemy_units = sum(1 for u in game_state.get('units', {}).values() if u.get('player_id') == 1) | |
| player_buildings = sum(1 for b in game_state.get('buildings', {}).values() if b.get('player_id') == 0) | |
| enemy_buildings = sum(1 for b in game_state.get('buildings', {}).values() if b.get('player_id') == 1) | |
| player = game_state.get('players', {}).get(0, {}) | |
| credits = int(player.get('credits', 0) or 0) | |
| power = int(player.get('power', 0) or 0) | |
| power_cons = int(player.get('power_consumption', 0) or 0) | |
| advantage = 'even' | |
| score = (player_units - enemy_units) + 0.5 * (player_buildings - enemy_buildings) | |
| if score > 1: | |
| advantage = 'ahead' | |
| elif score < -1: | |
| advantage = 'behind' | |
| # Localized templates (concise) | |
| summaries = { | |
| 'en': { | |
| 'ahead': f"{lang_name}: You hold the initiative. Maintain pressure and expand.", | |
| 'even': f"{lang_name}: Battlefield is balanced. Scout and take map control.", | |
| 'behind': f"{lang_name}: You're under pressure. Stabilize and defend key assets.", | |
| }, | |
| 'fr': { | |
| 'ahead': f"{lang_name} : Vous avez l'initiative. Maintenez la pression et étendez-vous.", | |
| 'even': f"{lang_name} : Situation équilibrée. Éclairez et prenez le contrôle de la carte.", | |
| 'behind': f"{lang_name} : Sous pression. Stabilisez et défendez les actifs clés.", | |
| }, | |
| 'zh-TW': { | |
| 'ahead': f"{lang_name}:佔據主動。保持壓力並擴張。", | |
| 'even': f"{lang_name}:局勢均衡。偵察並掌控地圖。", | |
| 'behind': f"{lang_name}:處於劣勢。穩住陣腳並防守關鍵建築。", | |
| } | |
| } | |
| summary = summaries.get(lang, summaries['en'])[advantage] | |
| tips: List[str] = [] | |
| # Power management tips | |
| if power_cons > 0 and power < power_cons: | |
| tips.append({ | |
| 'en': 'Build a Power Plant to restore production speed', | |
| 'fr': 'Construisez une centrale pour rétablir la production', | |
| 'zh-TW': '建造發電廠以恢復生產速度' | |
| }.get(lang, 'Build a Power Plant to restore production speed')) | |
| # Economy tips | |
| if credits < 300: | |
| tips.append({ | |
| 'en': 'Protect Harvester and secure more ore', | |
| 'fr': 'Protégez le collecteur et sécurisez plus de minerai', | |
| 'zh-TW': '保護採礦車並確保更多礦石' | |
| }.get(lang, 'Protect Harvester and secure more ore')) | |
| # Army composition tips | |
| if player_buildings > 0: | |
| if player_units < enemy_units: | |
| tips.append({ | |
| 'en': 'Train Infantry and add Tanks for frontline', | |
| 'fr': 'Entraînez de l’infanterie et ajoutez des chars en première ligne', | |
| 'zh-TW': '訓練步兵並加入坦克作為前線' | |
| }.get(lang, 'Train Infantry and add Tanks for frontline')) | |
| else: | |
| tips.append({ | |
| 'en': 'Scout enemy base and pressure weak flanks', | |
| 'fr': 'Éclairez la base ennemie et mettez la pression sur les flancs faibles', | |
| 'zh-TW': '偵察敵方基地並壓制薄弱側翼' | |
| }.get(lang, 'Scout enemy base and pressure weak flanks')) | |
| # Defense tip if buildings disadvantage | |
| if player_buildings < enemy_buildings: | |
| tips.append({ | |
| 'en': 'Fortify around HQ and key production buildings', | |
| 'fr': 'Fortifiez autour du QG et des bâtiments de production', | |
| 'zh-TW': '在總部與生產建築周圍加強防禦' | |
| }.get(lang, 'Fortify around HQ and key production buildings')) | |
| # Coach line | |
| coach = { | |
| 'en': 'Keep your economy safe and strike when you see an opening.', | |
| 'fr': 'Protégez votre économie et frappez dès qu’une ouverture se présente.', | |
| 'zh-TW': '保護經濟,抓住機會果斷出擊。' | |
| }.get(lang, 'Keep your economy safe and strike when you see an opening.') | |
| return { 'summary': summary, 'tips': tips[:4] or ['Build more units'], 'coach': coach, 'source': 'heuristic' } | |
| def summarize_combat_situation( | |
| self, | |
| game_state: Dict, | |
| language_code: str = "en" | |
| ) -> Dict[str, Any]: | |
| """ | |
| Generate tactical analysis of current battle. | |
| Args: | |
| game_state: Current game state dictionary | |
| language_code: Language for response (en, fr, zh-TW) | |
| Returns: | |
| Dict with keys: summary, tips, coach | |
| """ | |
| # If LLM is not available, return heuristic result | |
| if not self.model_available: | |
| return self._heuristic_analysis(game_state, language_code) | |
| # Import here to avoid circular dependency | |
| from localization import LOCALIZATION | |
| language_name = LOCALIZATION.get_ai_language_name(language_code) | |
| # Build tactical summary prompt | |
| player_units = sum(1 for u in game_state.get('units', {}).values() | |
| if u.get('player_id') == 0) | |
| enemy_units = sum(1 for u in game_state.get('units', {}).values() | |
| if u.get('player_id') == 1) | |
| player_buildings = sum(1 for b in game_state.get('buildings', {}).values() | |
| if b.get('player_id') == 0) | |
| enemy_buildings = sum(1 for b in game_state.get('buildings', {}).values() | |
| if b.get('player_id') == 1) | |
| player_credits = game_state.get('players', {}).get(0, {}).get('credits', 0) | |
| example_summary = LOCALIZATION.get_ai_example_summary(language_code) | |
| prompt = ( | |
| f"You are an expert RTS (Red Alert style) commentator & coach. Return ONLY one <json>...</json> block.\n" | |
| f"JSON keys: summary (string concise tactical overview), tips (array of 1-4 short imperative build/composition suggestions), coach (1 motivational/adaptive sentence).\n" | |
| f"No additional keys. No text outside tags. Language: {language_name}.\n" | |
| f"\n" | |
| f"Battle state: Player {player_units} units vs Enemy {enemy_units} units. " | |
| f"Player {player_buildings} buildings vs Enemy {enemy_buildings} buildings. " | |
| f"Credits: {player_credits}.\n" | |
| f"\n" | |
| f"Example JSON:\n" | |
| f'{{"summary": "{example_summary}", ' | |
| f'"tips": ["Build more tanks", "Defend north base", "Scout enemy position"], ' | |
| f'"coach": "You are doing well; keep pressure on the enemy."}}\n' | |
| f"\n" | |
| f"Generate tactical analysis in {language_name}:" | |
| ) | |
| result = self.generate_response( | |
| prompt=prompt, | |
| max_tokens=150, # Reduced from 200 for faster generation | |
| temperature=0.7 | |
| ) | |
| if result.get('status') != 'ok': | |
| # Fallback to heuristic on error | |
| return self._heuristic_analysis(game_state, language_code) | |
| data = result.get('data', {}) | |
| # Try to extract fields from structured JSON first | |
| summary = str(data.get('summary') or '').strip() | |
| tips_raw = data.get('tips') or [] | |
| coach = str(data.get('coach') or '').strip() | |
| # If no structured data, try to parse raw text | |
| if not summary and 'raw' in data: | |
| raw_text = str(data.get('raw', '')).strip() | |
| # Use the first sentence or the whole text as summary | |
| sentences = raw_text.split('.') | |
| if sentences: | |
| summary = sentences[0].strip() + '.' | |
| else: | |
| summary = raw_text[:150] # Max 150 chars | |
| # Try to extract tips from remaining text | |
| # Look for patterns like "Build X", "Defend Y", etc. | |
| import re | |
| tip_patterns = [ | |
| r'Build [^.]+', | |
| r'Defend [^.]+', | |
| r'Attack [^.]+', | |
| r'Scout [^.]+', | |
| r'Expand [^.]+', | |
| r'Protect [^.]+', | |
| r'Train [^.]+', | |
| r'Produce [^.]+', | |
| ] | |
| found_tips = [] | |
| for pattern in tip_patterns: | |
| matches = re.findall(pattern, raw_text, re.IGNORECASE) | |
| found_tips.extend(matches[:2]) # Max 2 per pattern | |
| if found_tips: | |
| tips_raw = found_tips[:4] # Max 4 tips | |
| # Use remaining text as coach message | |
| if len(sentences) > 1: | |
| coach = '. '.join(sentences[1:3]).strip() # 2nd and 3rd sentences | |
| # Validate tips is array | |
| tips = [] | |
| if isinstance(tips_raw, list): | |
| for tip in tips_raw: | |
| if isinstance(tip, str): | |
| tips.append(tip.strip()) | |
| # Fallbacks | |
| if not summary or not tips or not coach: | |
| fallback = self._heuristic_analysis(game_state, language_code) | |
| summary = summary or fallback['summary'] | |
| tips = tips or fallback['tips'] | |
| coach = coach or fallback['coach'] | |
| return { | |
| 'summary': summary, | |
| 'tips': tips[:4], # Max 4 tips | |
| 'coach': coach, | |
| 'source': 'llm' | |
| } | |
| # Singleton instance (lazy initialization) | |
| _ai_analyzer_instance: Optional[AIAnalyzer] = None | |
| def get_ai_analyzer() -> AIAnalyzer: | |
| """Get singleton AI analyzer instance""" | |
| global _ai_analyzer_instance | |
| if _ai_analyzer_instance is None: | |
| _ai_analyzer_instance = AIAnalyzer() | |
| return _ai_analyzer_instance | |