""" RTS Game Web Server - FastAPI + WebSocket Optimized for HuggingFace Spaces with Docker Features: - Real-time multiplayer RTS gameplay - AI tactical analysis via Qwen2.5 LLM - Multi-language support (EN/FR/ZH-TW) - Red Alert-style mechanics """ from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.responses import HTMLResponse, FileResponse from fastapi.staticfiles import StaticFiles from fastapi.middleware.cors import CORSMiddleware import asyncio import json import random import time from typing import Dict, List, Optional, Set, Any from dataclasses import dataclass, asdict from enum import Enum import uuid # Import localization and AI systems from localization import LOCALIZATION from ai_analysis import get_ai_analyzer, get_model_download_status from nl_translator_async import get_nl_translator # Game Constants TILE_SIZE = 40 MAP_WIDTH = 96 MAP_HEIGHT = 72 VIEW_WIDTH = 48 VIEW_HEIGHT = 27 # Initialize FastAPI app app = FastAPI(title="RTS Game", version="1.0.0") # CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) from backend.constants import ( UnitType, BuildingType, UNIT_COSTS, BUILDING_COSTS, POWER_PRODUCTION, POWER_CONSUMPTION, LOW_POWER_THRESHOLD, LOW_POWER_PRODUCTION_FACTOR, HARVESTER_CAPACITY, HARVEST_AMOUNT_PER_ORE, HARVEST_AMOUNT_PER_GEM, HQ_BUILD_RADIUS_TILES, ALLOW_MULTIPLE_SAME_BUILDING, ) class TerrainType(str, Enum): GRASS = "grass" ORE = "ore" GEM = "gem" WATER = "water" # Production Requirements - Critical for gameplay! PRODUCTION_REQUIREMENTS = { UnitType.INFANTRY: BuildingType.BARRACKS, UnitType.TANK: BuildingType.WAR_FACTORY, UnitType.ARTILLERY: BuildingType.WAR_FACTORY, UnitType.HELICOPTER: BuildingType.WAR_FACTORY, UnitType.HARVESTER: BuildingType.HQ, # Harvester needs HQ, NOT Refinery! } ## Costs, power system, and harvesting constants are imported above # Data Classes @dataclass class Position: x: float y: float def distance_to(self, other: 'Position') -> float: return ((self.x - other.x) ** 2 + (self.y - other.y) ** 2) ** 0.5 def to_dict(self): return {"x": self.x, "y": self.y} @dataclass class Unit: id: str type: UnitType player_id: int position: Position health: int max_health: int speed: float damage: int range: float target: Optional[Position] = None target_unit_id: Optional[str] = None target_building_id: Optional[str] = None cargo: int = 0 gathering: bool = False returning: bool = False ore_target: Optional[Position] = None last_attacker_id: Optional[str] = None manual_control: bool = False # True when player gives manual orders manual_order: bool = False # True when player gives manual move/attack order collision_radius: float = 15.0 # Collision detection radius attack_cooldown: int = 0 # Frames until next attack attack_animation: int = 0 # Frames for attack animation (for visual feedback) def to_dict(self): return { "id": self.id, "type": self.type.value, "player_id": self.player_id, "position": self.position.to_dict(), "health": self.health, "max_health": self.max_health, "speed": self.speed, "damage": self.damage, "range": self.range, "target": self.target.to_dict() if self.target else None, "target_unit_id": self.target_unit_id, "target_building_id": self.target_building_id, "cargo": self.cargo, "gathering": self.gathering, "returning": self.returning, "manual_control": self.manual_control, "manual_order": self.manual_order, "collision_radius": self.collision_radius, "attack_cooldown": self.attack_cooldown, "attack_animation": self.attack_animation } @dataclass class Building: id: str type: BuildingType player_id: int position: Position health: int max_health: int production_queue: List[str] production_progress: float target_unit_id: Optional[str] = None # For defense turrets attack_cooldown: int = 0 # For defense turrets attack_animation: int = 0 # For defense turrets def to_dict(self): return { "id": self.id, "type": self.type.value, "player_id": self.player_id, "position": self.position.to_dict(), "health": self.health, "max_health": self.max_health, "production_queue": self.production_queue, "production_progress": self.production_progress, "target_unit_id": self.target_unit_id, "attack_cooldown": self.attack_cooldown, "attack_animation": self.attack_animation } @dataclass class Player: id: int name: str color: str credits: int power: int power_consumption: int is_ai: bool language: str = "en" # Language preference (en, fr, zh-TW) superweapon_charge: int = 0 # 0-1800 ticks (30 seconds at 60 ticks/sec) superweapon_ready: bool = False nuke_preparing: bool = False # True when 'N' key pressed, waiting for target def to_dict(self): return asdict(self) # Game State Manager class GameState: def __init__(self): self.units: Dict[str, Unit] = {} self.buildings: Dict[str, Building] = {} self.players: Dict[int, Player] = {} self.terrain: List[List[TerrainType]] = [] self.fog_of_war: List[List[bool]] = [] self.game_started = False self.game_over = False self.winner: Optional[str] = None # "player" or "enemy" self.tick = 0 self.init_map() self.init_players() def init_map(self): """Initialize terrain with grass, ore, and water""" self.terrain = [[TerrainType.GRASS for _ in range(MAP_WIDTH)] for _ in range(MAP_HEIGHT)] self.fog_of_war = [[True for _ in range(MAP_WIDTH)] for _ in range(MAP_HEIGHT)] # Add ore patches for _ in range(15): ox, oy = random.randint(5, MAP_WIDTH-6), random.randint(5, MAP_HEIGHT-6) for dx in range(-2, 3): for dy in range(-2, 3): if 0 <= ox+dx < MAP_WIDTH and 0 <= oy+dy < MAP_HEIGHT: if random.random() > 0.3: self.terrain[oy+dy][ox+dx] = TerrainType.ORE # Add gem patches (rare) for _ in range(5): gx, gy = random.randint(5, MAP_WIDTH-6), random.randint(5, MAP_HEIGHT-6) for dx in range(-1, 2): for dy in range(-1, 2): if 0 <= gx+dx < MAP_WIDTH and 0 <= gy+dy < MAP_HEIGHT: if random.random() > 0.5: self.terrain[gy+dy][gx+dx] = TerrainType.GEM # Add water bodies for _ in range(8): wx, wy = random.randint(5, MAP_WIDTH-6), random.randint(5, MAP_HEIGHT-6) for dx in range(-3, 4): for dy in range(-3, 4): if 0 <= wx+dx < MAP_WIDTH and 0 <= wy+dy < MAP_HEIGHT: if (dx*dx + dy*dy) < 9: self.terrain[wy+dy][wx+dx] = TerrainType.WATER def init_players(self): """Initialize player 0 (human) and player 1 (AI)""" # Start with power=50 (from HQ), consumption=0 self.players[0] = Player(0, "Player", "#4A90E2", 5000, 50, 0, False) self.players[1] = Player(1, "AI", "#E74C3C", 5000, 50, 0, True) # Create starting HQ for each player hq0_id = str(uuid.uuid4()) self.buildings[hq0_id] = Building( id=hq0_id, type=BuildingType.HQ, player_id=0, position=Position(5 * TILE_SIZE, 5 * TILE_SIZE), health=500, max_health=500, production_queue=[], production_progress=0 ) hq1_id = str(uuid.uuid4()) self.buildings[hq1_id] = Building( id=hq1_id, type=BuildingType.HQ, player_id=1, position=Position((MAP_WIDTH-8) * TILE_SIZE, (MAP_HEIGHT-8) * TILE_SIZE), health=500, max_health=500, production_queue=[], production_progress=0 ) # Starting units for i in range(3): self.create_unit(UnitType.INFANTRY, 0, Position((7+i)*TILE_SIZE, 7*TILE_SIZE)) self.create_unit(UnitType.INFANTRY, 1, Position((MAP_WIDTH-10-i)*TILE_SIZE, (MAP_HEIGHT-10)*TILE_SIZE)) def create_unit(self, unit_type: UnitType, player_id: int, position: Position) -> Unit: """Create a new unit""" unit_stats = { UnitType.INFANTRY: {"health": 100, "speed": 2.0, "damage": 10, "range": 80}, UnitType.TANK: {"health": 200, "speed": 1.5, "damage": 30, "range": 120}, UnitType.HARVESTER: {"health": 150, "speed": 1.0, "damage": 0, "range": 0}, UnitType.HELICOPTER: {"health": 120, "speed": 3.0, "damage": 25, "range": 150}, UnitType.ARTILLERY: {"health": 100, "speed": 1.0, "damage": 50, "range": 200}, } stats = unit_stats[unit_type] unit_id = str(uuid.uuid4()) unit = Unit( id=unit_id, type=unit_type, player_id=player_id, position=position, health=stats["health"], max_health=stats["health"], speed=stats["speed"], damage=stats["damage"], range=stats["range"], target=None, target_unit_id=None ) self.units[unit_id] = unit return unit def create_building(self, building_type: BuildingType, player_id: int, position: Position) -> Building: """Create a new building""" building_stats = { BuildingType.HQ: {"health": 500}, BuildingType.BARRACKS: {"health": 300}, BuildingType.WAR_FACTORY: {"health": 400}, BuildingType.REFINERY: {"health": 250}, BuildingType.POWER_PLANT: {"health": 200}, BuildingType.DEFENSE_TURRET: {"health": 350}, } stats = building_stats[building_type] building_id = str(uuid.uuid4()) building = Building( id=building_id, type=building_type, player_id=player_id, position=position, health=stats["health"], max_health=stats["health"], production_queue=[], production_progress=0 ) self.buildings[building_id] = building return building def calculate_power(self, player_id: int) -> tuple[int, int, str]: """ Calculate power production and consumption for a player. Returns: tuple: (power_production, power_consumption, status) status: 'green' (enough power), 'yellow' (low power), 'red' (no power) """ production = 0 consumption = 0 for building in self.buildings.values(): if building.player_id == player_id: # Add power production production += POWER_PRODUCTION.get(building.type, 0) # Add power consumption consumption += POWER_CONSUMPTION.get(building.type, 0) # Determine status if consumption == 0: status = 'green' elif production >= consumption: status = 'green' elif production >= consumption * LOW_POWER_THRESHOLD: status = 'yellow' else: status = 'red' # Update player power values if player_id in self.players: self.players[player_id].power = production self.players[player_id].power_consumption = consumption return production, consumption, status def to_dict(self): """Convert game state to dictionary for JSON serialization""" return { "tick": self.tick, "game_started": self.game_started, "game_over": self.game_over, "winner": self.winner, "players": {pid: p.to_dict() for pid, p in self.players.items()}, "units": {uid: u.to_dict() for uid, u in self.units.items()}, "buildings": {bid: b.to_dict() for bid, b in self.buildings.items()}, "terrain": [[t.value for t in row] for row in self.terrain], "fog_of_war": self.fog_of_war } # WebSocket Connection Manager class ConnectionManager: def __init__(self): self.active_connections: List[WebSocket] = [] self.game_state = GameState() self.game_loop_task: Optional[asyncio.Task] = None self.ai_analyzer = get_ai_analyzer() self.last_ai_analysis: Dict[str, Any] = {} self.ai_analysis_interval = 60.0 # Analyze every 60 seconds (reduced frequency to avoid blocking) self.last_ai_analysis_time = 0.0 # RED ALERT: Enemy AI state self.ai_last_action_tick = 0 self.ai_action_interval = 120 # Take action every 6 seconds (120 ticks at 20Hz) self.ai_build_plan = [ 'power_plant', 'refinery', 'barracks', 'power_plant', # Second power plant 'war_factory', ] self.ai_build_index = 0 self.ai_unit_cycle = ['infantry', 'infantry', 'tank', 'infantry', 'helicopter'] self.ai_unit_index = 0 async def connect(self, websocket: WebSocket): await websocket.accept() self.active_connections.append(websocket) # Start game loop if not already running if self.game_loop_task is None or self.game_loop_task.done(): self.game_loop_task = asyncio.create_task(self.game_loop()) def disconnect(self, websocket: WebSocket): if websocket in self.active_connections: self.active_connections.remove(websocket) async def broadcast(self, message: dict): """Send message to all connected clients""" disconnected = [] for connection in self.active_connections: try: await connection.send_json(message) except: disconnected.append(connection) # Clean up disconnected clients for conn in disconnected: self.disconnect(conn) async def game_loop(self): """Main game loop - runs at 20 ticks per second""" print("🔄 Game loop STARTED") while True: # Run forever, check connections inside loop try: # Update game state self.update_game_state() # AI Analysis (periodic) - only if model is available current_time = time.time() if (self.ai_analyzer.model_available and current_time - self.last_ai_analysis_time >= self.ai_analysis_interval): await self.run_ai_analysis() self.last_ai_analysis_time = current_time # Broadcast state to all clients state_dict = self.game_state.to_dict() state_dict['ai_analysis'] = self.last_ai_analysis # Include AI insights # Include model download status so UI can show progress if not self.ai_analyzer.model_available: state_dict['model_download'] = get_model_download_status() # Broadcast only if there are active connections if self.active_connections: await self.broadcast({ "type": "state_update", "state": state_dict }) # 50ms delay = 20 ticks/sec await asyncio.sleep(0.05) except Exception as e: print(f"Game loop error: {e}") await asyncio.sleep(0.1) async def run_ai_analysis(self): """Run AI tactical analysis in background""" # Skip if model not available if not self.ai_analyzer.model_available: # Provide heuristic analysis so panel is never empty player_lang = self.game_state.players.get(0, Player(0, "Player", "#000", 0, 0, 0, False)).language self.last_ai_analysis = self.ai_analyzer._heuristic_analysis(self.game_state.to_dict(), player_lang) return try: # Get player language preference player_lang = self.game_state.players.get(0, Player(0, "Player", "#000", 0, 0, 0, False)).language # Run analysis in thread pool to avoid blocking loop = asyncio.get_event_loop() analysis = await loop.run_in_executor( None, self.ai_analyzer.summarize_combat_situation, self.game_state.to_dict(), player_lang ) self.last_ai_analysis = analysis # Don't print every time to avoid console spam # print(f"🤖 AI Analysis: {analysis.get('summary', '')}") except Exception as e: print(f"⚠️ AI analysis error: {e}") player_lang = self.game_state.players.get(0, Player(0, "Player", "#000", 0, 0, 0, False)).language self.last_ai_analysis = self.ai_analyzer._heuristic_analysis(self.game_state.to_dict(), player_lang) def update_game_state(self): """Update game simulation - Red Alert style!""" self.game_state.tick += 1 # DEBUG: Log every 5 seconds to confirm game loop is running if self.game_state.tick % 100 == 0: print(f"⏱️ Game tick: {self.game_state.tick} (loop running)") # Cleanup old LLM requests every 30 seconds (600 ticks at 20Hz) if self.game_state.tick % 600 == 0: from model_manager import get_shared_model model = get_shared_model() model.cleanup_old_requests(max_age=300.0) # 5 minutes # Also cleanup translator translator = get_nl_translator() translator.cleanup_old_requests(max_age=60.0) # 1 minute # Update superweapon charge (30 seconds = 1800 ticks at 60 ticks/sec) for player in self.game_state.players.values(): if not player.superweapon_ready and player.superweapon_charge < 1800: player.superweapon_charge += 1 if player.superweapon_charge >= 1800: player.superweapon_ready = True # RED ALERT: Calculate power for both players power_prod_p0, power_cons_p0, power_status_p0 = self.game_state.calculate_power(0) power_prod_p1, power_cons_p1, power_status_p1 = self.game_state.calculate_power(1) # Store power status for later use (warning every 5 seconds = 100 ticks at 20Hz) if not hasattr(self, 'last_low_power_warning'): self.last_low_power_warning = 0 if power_status_p0 == 'red' and self.game_state.tick - self.last_low_power_warning > 100: # Send low power warning to player (translated) player_language = self.game_state.players[0].language if 0 in self.game_state.players else "en" message = LOCALIZATION.translate(player_language, "notification.low_power") asyncio.create_task(self.broadcast({ "type": "notification", "message": message, "level": "warning" })) self.last_low_power_warning = self.game_state.tick # RED ALERT: Enemy AI strategic decisions if self.game_state.tick - self.ai_last_action_tick >= self.ai_action_interval: self.update_enemy_ai() self.ai_last_action_tick = self.game_state.tick # Check victory conditions (no HQ = defeat) if not self.game_state.game_over: player_hq_exists = any(b.type == BuildingType.HQ and b.player_id == 0 for b in self.game_state.buildings.values()) enemy_hq_exists = any(b.type == BuildingType.HQ and b.player_id == 1 for b in self.game_state.buildings.values()) if not player_hq_exists and enemy_hq_exists: # Player lost self.game_state.game_over = True self.game_state.winner = "enemy" player_language = self.game_state.players[0].language if 0 in self.game_state.players else "en" winner_name = LOCALIZATION.translate(player_language, "game.winner.enemy") message = LOCALIZATION.translate(player_language, "game.win.banner", winner=winner_name) asyncio.create_task(self.broadcast({ "type": "game_over", "winner": "enemy", "message": message })) elif not enemy_hq_exists and player_hq_exists: # Player won! self.game_state.game_over = True self.game_state.winner = "player" player_language = self.game_state.players[0].language if 0 in self.game_state.players else "en" winner_name = LOCALIZATION.translate(player_language, "game.winner.player") message = LOCALIZATION.translate(player_language, "game.win.banner", winner=winner_name) asyncio.create_task(self.broadcast({ "type": "game_over", "winner": "player", "message": message })) elif not player_hq_exists and not enemy_hq_exists: # Draw (both destroyed simultaneously) self.game_state.game_over = True self.game_state.winner = "draw" asyncio.create_task(self.broadcast({ "type": "game_over", "winner": "draw", "message": "Draw! Both HQs destroyed" })) # Update units for unit in list(self.game_state.units.values()): # RED ALERT: Harvester AI (only if not manually controlled) if unit.type == UnitType.HARVESTER and not unit.manual_control: self.update_harvester(unit) # Don't continue - let it move with the target set by AI # RED ALERT: Auto-defense - if attacked, fight back! (but respect manual orders) if unit.last_attacker_id and unit.last_attacker_id in self.game_state.units: if not unit.target_unit_id and not unit.manual_order: # Not already attacking and no manual order unit.target_unit_id = unit.last_attacker_id # Don't clear movement target if player gave manual move order # RED ALERT: Auto-acquire nearby enemies when idle (but respect manual orders) if not unit.target_unit_id and not unit.target and unit.damage > 0 and not unit.manual_order: nearest_enemy = self.find_nearest_enemy(unit) if nearest_enemy and unit.position.distance_to(nearest_enemy.position) < unit.range * 3: unit.target_unit_id = nearest_enemy.id # Handle combat if unit.target_unit_id: if unit.target_unit_id in self.game_state.units: target = self.game_state.units[unit.target_unit_id] distance = unit.position.distance_to(target.position) if distance <= unit.range: # In range - attack! unit.target = None # Stop moving # Cooldown system - attack every N frames if unit.attack_cooldown <= 0: # Calculate damage based on unit type # Infantry: 5-10 damage per hit, fast attacks (20 frames) # Tank: 30-40 damage per hit, slow attacks (40 frames) # Artillery: 50-60 damage per hit, very slow (60 frames) # Helicopter: 15-20 damage per hit, medium speed (30 frames) damage_multipliers = { UnitType.INFANTRY: (5, 20), # (damage, cooldown) UnitType.TANK: (35, 40), UnitType.ARTILLERY: (55, 60), UnitType.HELICOPTER: (18, 30), UnitType.HARVESTER: (0, 0), } damage, cooldown = damage_multipliers.get(unit.type, (5, 20)) # Apply damage target.health -= damage unit.attack_cooldown = cooldown unit.attack_animation = 10 # 10 frames of attack animation target.last_attacker_id = unit.id # RED ALERT: Track attacker for auto-defense if target.health <= 0: # Target destroyed del self.game_state.units[unit.target_unit_id] unit.target_unit_id = None unit.last_attacker_id = None else: # Move closer unit.target = Position(target.position.x, target.position.y) else: # Target no longer exists unit.target_unit_id = None # Handle building attacks if unit.target_building_id: if unit.target_building_id in self.game_state.buildings: target = self.game_state.buildings[unit.target_building_id] distance = unit.position.distance_to(target.position) if distance <= unit.range: # In range - attack! unit.target = None # Stop moving # Cooldown system - attack every N frames if unit.attack_cooldown <= 0: damage_multipliers = { UnitType.INFANTRY: (5, 20), # (damage, cooldown) UnitType.TANK: (35, 40), UnitType.ARTILLERY: (55, 60), UnitType.HELICOPTER: (18, 30), UnitType.HARVESTER: (0, 0), } damage, cooldown = damage_multipliers.get(unit.type, (5, 20)) # Apply damage to building target.health -= damage unit.attack_cooldown = cooldown unit.attack_animation = 10 # 10 frames of attack animation if target.health <= 0: # Building destroyed del self.game_state.buildings[unit.target_building_id] unit.target_building_id = None else: # Move closer unit.target = Position(target.position.x, target.position.y) else: # Target no longer exists unit.target_building_id = None # Decrease attack cooldown and animation if unit.attack_cooldown > 0: unit.attack_cooldown -= 1 if unit.attack_animation > 0: unit.attack_animation -= 1 # Movement # DEBUG: Log unit target status for player units if unit.player_id == 0 and self.game_state.tick % 40 == 0: # Every 2 seconds if unit.target: print(f" 🎯 Unit {unit.id[:8]} ({unit.type}) HAS target: ({unit.target.x:.1f}, {unit.target.y:.1f}), pos: ({unit.position.x:.1f}, {unit.position.y:.1f})") else: print(f" ❌ Unit {unit.id[:8]} ({unit.type}) NO target, pos: ({unit.position.x:.1f}, {unit.position.y:.1f})") if unit.target: # Move towards target dx = unit.target.x - unit.position.x dy = unit.target.y - unit.position.y dist = (dx*dx + dy*dy) ** 0.5 if dist > 5: old_x, old_y = unit.position.x, unit.position.y unit.position.x += (dx / dist) * unit.speed unit.position.y += (dy / dist) * unit.speed # Apply dispersion after movement self.apply_unit_dispersion(unit) # Debug log for movement (only log occasionally to avoid spam) if self.game_state.tick % 20 == 0: # Every second (20 ticks/sec) print(f" 🚶 Unit {unit.id[:8]} ({unit.type}) moving: ({old_x:.1f},{old_y:.1f}) -> ({unit.position.x:.1f},{unit.position.y:.1f}), dist={dist:.1f}") else: print(f" ✅ Unit {unit.id[:8]} ({unit.type}) reached destination ({unit.position.x:.1f},{unit.position.y:.1f})") unit.target = None unit.manual_order = False # Clear manual order flag when destination reached # If Harvester reached manual destination, resume AI if unit.type == UnitType.HARVESTER and unit.manual_control: unit.manual_control = False # RED ALERT: AI unit behavior (enemy side) if self.game_state.players[unit.player_id].is_ai: self.update_ai_unit(unit) # Update buildings production for building in self.game_state.buildings.values(): # Defense turret auto-attack logic if building.type == BuildingType.DEFENSE_TURRET: turret_range = 300.0 # Defense turret range # Find nearest enemy unit if not building.target_unit_id or building.target_unit_id not in self.game_state.units: min_dist = float('inf') nearest_enemy = None for enemy_unit in self.game_state.units.values(): if enemy_unit.player_id != building.player_id: dist = building.position.distance_to(enemy_unit.position) if dist < turret_range and dist < min_dist: min_dist = dist nearest_enemy = enemy_unit if nearest_enemy: building.target_unit_id = nearest_enemy.id # Attack target if in range if building.target_unit_id and building.target_unit_id in self.game_state.units: target = self.game_state.units[building.target_unit_id] distance = building.position.distance_to(target.position) if distance <= turret_range: # Attack! if building.attack_cooldown <= 0: damage = 20 # Turret damage target.health -= damage building.attack_cooldown = 30 # 30 frames cooldown building.attack_animation = 10 if target.health <= 0: # Target destroyed del self.game_state.units[building.target_unit_id] building.target_unit_id = None else: # Out of range, lose target building.target_unit_id = None # Decrease cooldowns if building.attack_cooldown > 0: building.attack_cooldown -= 1 if building.attack_animation > 0: building.attack_animation -= 1 # DEBUG: Log production queue status if building.player_id == 0 and building.type in [BuildingType.BARRACKS, BuildingType.WAR_FACTORY]: if self.game_state.tick % 60 == 0: # Every 3 seconds if building.production_queue: print(f" 📋 Building {building.id[:8]} ({building.type}) queue: {building.production_queue}, progress: {building.production_progress:.2%}") else: print(f" 💤 Building {building.id[:8]} ({building.type}) IDLE (no queue)") if building.production_queue: # RED ALERT: Check power status for this building's player _, _, power_status = self.game_state.calculate_power(building.player_id) # Adjust production speed based on power production_speed = 0.01 if power_status == 'red': production_speed *= LOW_POWER_PRODUCTION_FACTOR # 50% speed when low power building.production_progress += production_speed # Debug log every 2 seconds (40 ticks at 20Hz) if self.game_state.tick % 40 == 0: print(f" 🏭 Building {building.id[:8]} ({building.type}) producing: {building.production_queue[0]} - progress: {building.production_progress:.2%}") if building.production_progress >= 1.0: # Complete production print(f" ✅ Production complete! Creating {building.production_queue[0]}") unit_type = UnitType(building.production_queue.pop(0)) spawn_pos = Position( building.position.x + TILE_SIZE * 2, building.position.y + TILE_SIZE * 2 ) # Find free position near spawn point new_unit = self.game_state.create_unit(unit_type, building.player_id, spawn_pos) if new_unit: free_pos = self.find_free_position_nearby(spawn_pos, new_unit.id) new_unit.position = free_pos building.production_progress = 0 # Endgame checks: elimination-first (after all destructions this tick) if not self.game_state.game_over: p_units = sum(1 for u in self.game_state.units.values() if u.player_id == 0) e_units = sum(1 for u in self.game_state.units.values() if u.player_id == 1) p_buildings = sum(1 for b in self.game_state.buildings.values() if b.player_id == 0) e_buildings = sum(1 for b in self.game_state.buildings.values() if b.player_id == 1) player_language = self.game_state.players[0].language if 0 in self.game_state.players else "en" if p_units == 0 and p_buildings == 0 and (e_units > 0 or e_buildings > 0): # Player eliminated self.game_state.game_over = True self.game_state.winner = "enemy" winner_name = LOCALIZATION.translate(player_language, "game.winner.enemy") message = LOCALIZATION.translate(player_language, "game.win.banner", winner=winner_name) asyncio.create_task(self.broadcast({ "type": "game_over", "winner": "enemy", "message": message })) elif e_units == 0 and e_buildings == 0 and (p_units > 0 or p_buildings > 0): # Enemy eliminated self.game_state.game_over = True self.game_state.winner = "player" winner_name = LOCALIZATION.translate(player_language, "game.winner.player") message = LOCALIZATION.translate(player_language, "game.win.banner", winner=winner_name) asyncio.create_task(self.broadcast({ "type": "game_over", "winner": "player", "message": message })) elif p_units == 0 and p_buildings == 0 and e_units == 0 and e_buildings == 0: # Total annihilation -> draw self.game_state.game_over = True self.game_state.winner = "draw" # Localized draw banner if available try: draw_msg = LOCALIZATION.translate(player_language, "game.draw.banner") except Exception: draw_msg = "Draw!" asyncio.create_task(self.broadcast({ "type": "game_over", "winner": "draw", "message": draw_msg })) def find_nearest_enemy(self, unit: Unit) -> Optional[Unit]: """RED ALERT: Find nearest enemy unit""" min_dist = float('inf') nearest_enemy = None for other_unit in self.game_state.units.values(): if other_unit.player_id != unit.player_id: dist = unit.position.distance_to(other_unit.position) if dist < min_dist: min_dist = dist nearest_enemy = other_unit return nearest_enemy def is_position_occupied(self, position: Position, current_unit_id: str, radius: float = 15.0) -> bool: """Check if a position is occupied by another unit""" for unit_id, unit in self.game_state.units.items(): if unit_id == current_unit_id: continue distance = position.distance_to(unit.position) if distance < (radius + unit.collision_radius): return True return False def find_free_position_nearby(self, position: Position, unit_id: str, max_attempts: int = 16) -> Position: """Find a free position around the given position using spiral search""" # Check if current position is free if not self.is_position_occupied(position, unit_id): return position # Spiral search outward with circular pattern import math for ring in range(1, max_attempts + 1): # Number of positions to test in this ring (8 directions) num_positions = 8 radius = 25.0 * ring # Increase radius with each ring for i in range(num_positions): angle = (i * 360.0 / num_positions) * (math.pi / 180.0) # Convert to radians offset_x = radius * math.cos(angle) offset_y = radius * math.sin(angle) new_pos = Position( position.x + offset_x, position.y + offset_y ) # Keep within map bounds new_pos.x = max(TILE_SIZE, min(MAP_WIDTH * TILE_SIZE - TILE_SIZE, new_pos.x)) new_pos.y = max(TILE_SIZE, min(MAP_HEIGHT * TILE_SIZE - TILE_SIZE, new_pos.y)) if not self.is_position_occupied(new_pos, unit_id): return new_pos # If no free position found, return original (fallback) return position def apply_unit_dispersion(self, unit: Unit): """Apply automatic dispersion to prevent units from overlapping""" if self.is_position_occupied(unit.position, unit.id): new_position = self.find_free_position_nearby(unit.position, unit.id) unit.position = new_position def update_harvester(self, unit: Unit): """RED ALERT: Harvester AI - auto-collect resources!""" # If returning to base with cargo if unit.returning: # Find nearest Refinery or HQ depot = self.find_nearest_depot(unit.player_id, unit.position) if depot: distance = unit.position.distance_to(depot.position) if distance < TILE_SIZE * 2: # Deposit cargo self.game_state.players[unit.player_id].credits += unit.cargo unit.cargo = 0 unit.returning = False unit.gathering = False unit.ore_target = None unit.target = None # Clear target after deposit unit.manual_control = False # Resume AI after deposit else: # Move to depot unit.target = Position(depot.position.x, depot.position.y) else: # No depot - stop returning unit.returning = False return # If at ore patch, harvest it if unit.ore_target: distance = ((unit.position.x - unit.ore_target.x) ** 2 + (unit.position.y - unit.ore_target.y) ** 2) ** 0.5 if distance < TILE_SIZE / 2: # Harvest ore tile_x = int(unit.ore_target.x / TILE_SIZE) tile_y = int(unit.ore_target.y / TILE_SIZE) if (0 <= tile_x < MAP_WIDTH and 0 <= tile_y < MAP_HEIGHT): terrain = self.game_state.terrain[tile_y][tile_x] if terrain == TerrainType.ORE: unit.cargo = min(HARVESTER_CAPACITY, unit.cargo + HARVEST_AMOUNT_PER_ORE) self.game_state.terrain[tile_y][tile_x] = TerrainType.GRASS elif terrain == TerrainType.GEM: unit.cargo = min(HARVESTER_CAPACITY, unit.cargo + HARVEST_AMOUNT_PER_GEM) self.game_state.terrain[tile_y][tile_x] = TerrainType.GRASS unit.ore_target = None unit.gathering = False # If cargo full or nearly full, return if unit.cargo >= HARVESTER_CAPACITY * 0.9: unit.returning = True unit.target = None else: # Move to ore unit.target = unit.ore_target return # FIXED: Always search for ore when idle (not gathering and no ore target) # This ensures Harvester automatically finds ore after spawning or depositing if not unit.gathering and not unit.ore_target: nearest_ore = self.find_nearest_ore(unit.position) if nearest_ore: unit.ore_target = nearest_ore unit.gathering = True unit.target = nearest_ore # If no ore found, clear any residual target to stay idle elif unit.target: unit.target = None def find_nearest_depot(self, player_id: int, position: Position) -> Optional[Building]: """Find nearest Refinery or HQ for harvester""" nearest = None min_dist = float('inf') for building in self.game_state.buildings.values(): if building.player_id == player_id: if building.type in [BuildingType.REFINERY, BuildingType.HQ]: dist = position.distance_to(building.position) if dist < min_dist: min_dist = dist nearest = building return nearest def find_nearest_ore(self, position: Position) -> Optional[Position]: """Find nearest ore or gem tile""" nearest = None min_dist = float('inf') for y in range(MAP_HEIGHT): for x in range(MAP_WIDTH): if self.game_state.terrain[y][x] in [TerrainType.ORE, TerrainType.GEM]: ore_pos = Position(x * TILE_SIZE + TILE_SIZE/2, y * TILE_SIZE + TILE_SIZE/2) dist = position.distance_to(ore_pos) if dist < min_dist: min_dist = dist nearest = ore_pos return nearest def update_ai_unit(self, unit: Unit): """RED ALERT: Enemy AI behavior - aggressive!""" if unit.damage == 0: # Don't attack with harvesters return # Always try to attack nearest enemy if not unit.target_unit_id: nearest_enemy = self.find_nearest_enemy(unit) if nearest_enemy: distance = unit.position.distance_to(nearest_enemy.position) # Attack if within aggro range if distance < 500: # Aggro range unit.target_unit_id = nearest_enemy.id def update_enemy_ai(self): """RED ALERT: Enemy AI strategic decision making""" player_ai = self.game_state.players[1] # 1. Check if AI should build next building if self.ai_build_index < len(self.ai_build_plan): next_building_type = self.ai_build_plan[self.ai_build_index] building_type = BuildingType(next_building_type) cost = BUILDING_COSTS.get(building_type, 0) # Check if AI can afford it if player_ai.credits >= cost: # Check if prerequisites are met (simplified) can_build = True # Check power plant requirement if building_type in [BuildingType.BARRACKS, BuildingType.REFINERY, BuildingType.WAR_FACTORY]: has_power_plant = any( b.type == BuildingType.POWER_PLANT and b.player_id == 1 for b in self.game_state.buildings.values() ) if not has_power_plant: can_build = False # Check barracks requirement for war factory if building_type == BuildingType.WAR_FACTORY: has_barracks = any( b.type == BuildingType.BARRACKS and b.player_id == 1 for b in self.game_state.buildings.values() ) if not has_barracks: can_build = False if can_build: # Find build location near AI HQ ai_hq = next( (b for b in self.game_state.buildings.values() if b.player_id == 1 and b.type == BuildingType.HQ), None ) if ai_hq: # Random offset from HQ offset_x = random.randint(-200, 200) offset_y = random.randint(-200, 200) build_pos = Position( max(TILE_SIZE * 3, min(MAP_WIDTH * TILE_SIZE - TILE_SIZE * 3, ai_hq.position.x + offset_x)), max(TILE_SIZE * 3, min(MAP_HEIGHT * TILE_SIZE - TILE_SIZE * 3, ai_hq.position.y + offset_y)) ) # Deduct credits player_ai.credits -= cost # Create building immediately (simplified - no construction queue for AI) self.game_state.create_building(building_type, 1, build_pos) print(f"🤖 AI built {building_type.value} at {build_pos.x:.0f},{build_pos.y:.0f}") # Move to next building in plan self.ai_build_index += 1 # 2. Produce units if we have production buildings ai_barracks = [ b for b in self.game_state.buildings.values() if b.player_id == 1 and b.type == BuildingType.BARRACKS ] ai_war_factory = [ b for b in self.game_state.buildings.values() if b.player_id == 1 and b.type == BuildingType.WAR_FACTORY ] # Produce infantry from barracks if ai_barracks and len(ai_barracks[0].production_queue) == 0: if player_ai.credits >= UNIT_COSTS[UnitType.INFANTRY]: player_ai.credits -= UNIT_COSTS[UnitType.INFANTRY] ai_barracks[0].production_queue.append(UnitType.INFANTRY.value) print(f"🤖 AI queued Infantry") # Produce vehicles from war factory (cycle through unit types) if ai_war_factory and len(ai_war_factory[0].production_queue) == 0: next_unit_type = self.ai_unit_cycle[self.ai_unit_index] unit_type = UnitType(next_unit_type) cost = UNIT_COSTS.get(unit_type, 0) if player_ai.credits >= cost: player_ai.credits -= cost ai_war_factory[0].production_queue.append(unit_type.value) self.ai_unit_index = (self.ai_unit_index + 1) % len(self.ai_unit_cycle) print(f"🤖 AI queued {unit_type.value}") # 3. Make AI harvesters collect resources for unit in self.game_state.units.values(): if unit.player_id == 1 and unit.type == UnitType.HARVESTER: if not unit.manual_control: # Harvester AI is handled by update_harvester() in main loop pass async def launch_nuke(self, player_id: int, target: Position): """Launch nuclear strike at target location""" # Damage radius: 200 pixels = 5 tiles NUKE_DAMAGE_RADIUS = 200.0 NUKE_MAX_DAMAGE = 200 # Maximum damage at center # Damage all units within radius units_to_remove = [] for unit_id, unit in self.game_state.units.items(): distance = unit.position.distance_to(target) if distance <= NUKE_DAMAGE_RADIUS: # Damage decreases with distance (full damage at center, 50% at edge) damage_factor = 1.0 - (distance / NUKE_DAMAGE_RADIUS) * 0.5 damage = int(NUKE_MAX_DAMAGE * damage_factor) unit.health -= damage if unit.health <= 0: units_to_remove.append(unit_id) # Remove destroyed units for unit_id in units_to_remove: del self.game_state.units[unit_id] # Damage buildings within radius buildings_to_remove = [] for building_id, building in self.game_state.buildings.items(): distance = building.position.distance_to(target) if distance <= NUKE_DAMAGE_RADIUS: # Damage decreases with distance damage_factor = 1.0 - (distance / NUKE_DAMAGE_RADIUS) * 0.5 damage = int(NUKE_MAX_DAMAGE * damage_factor) building.health -= damage if building.health <= 0: buildings_to_remove.append(building_id) # Remove destroyed buildings for building_id in buildings_to_remove: del self.game_state.buildings[building_id] print(f"💥 NUKE launched by player {player_id} at ({target.x:.0f}, {target.y:.0f})") print(f" Destroyed {len(units_to_remove)} units and {len(buildings_to_remove)} buildings") async def handle_command(self, command: dict): """Handle game commands from clients""" cmd_type = command.get("type") if cmd_type == "move_unit": unit_ids = command.get("unit_ids", []) target = command.get("target") print(f"🎮 MOVE_UNIT command received: unit_ids={unit_ids}, target={target}") if target and "x" in target and "y" in target: base_target = Position(target["x"], target["y"]) print(f" Moving to position ({base_target.x:.1f}, {base_target.y:.1f})") # If multiple units, spread them in a formation if len(unit_ids) > 1: print(f" Setting formation for {len(unit_ids)} units") # Formation pattern: circular spread around target radius = 30.0 # Distance between units in formation for idx, uid in enumerate(unit_ids): if uid in self.game_state.units: unit = self.game_state.units[uid] print(f" Unit {uid} ({unit.type}) - setting target") # Calculate offset position in circular formation angle = (idx * 360.0 / len(unit_ids)) * (3.14159 / 180.0) offset_x = radius * (1 + idx // 8) * 0.707106781 * ((idx % 2) * 2 - 1) offset_y = radius * (1 + idx // 8) * 0.707106781 * (((idx + 1) % 2) * 2 - 1) unit.target = Position( base_target.x + offset_x, base_target.y + offset_y ) # FIX: Clear combat target and set manual order flag unit.target_unit_id = None unit.manual_order = True # If it's a Harvester, enable manual control to override AI if unit.type == UnitType.HARVESTER: unit.manual_control = True # Clear AI state unit.gathering = False unit.returning = False unit.ore_target = None else: # Single unit - move to exact target print(f" Setting single unit target") for uid in unit_ids: if uid in self.game_state.units: unit = self.game_state.units[uid] print(f" Unit {uid} ({unit.type}) at ({unit.position.x:.1f},{unit.position.y:.1f}) -> ({base_target.x:.1f},{base_target.y:.1f})") unit.target = base_target # FIX: Clear combat target and set manual order flag unit.target_unit_id = None unit.manual_order = True # If it's a Harvester, enable manual control to override AI if unit.type == UnitType.HARVESTER: unit.manual_control = True # Clear AI state unit.gathering = False unit.returning = False unit.ore_target = None elif cmd_type == "attack_unit": attacker_ids = command.get("attacker_ids", []) target_id = command.get("target_id") for uid in attacker_ids: if uid in self.game_state.units and target_id in self.game_state.units: attacker = self.game_state.units[uid] attacker.target_unit_id = target_id attacker.target_building_id = None # Clear building target attacker.manual_order = True # Set manual order flag elif cmd_type == "attack_building": attacker_ids = command.get("attacker_ids", []) target_id = command.get("target_id") for uid in attacker_ids: if uid in self.game_state.units and target_id in self.game_state.buildings: attacker = self.game_state.units[uid] attacker.target_building_id = target_id attacker.target_unit_id = None # Clear unit target attacker.manual_order = True # Set manual order flag elif cmd_type == "build_unit": unit_type_str = command.get("unit_type") player_id = command.get("player_id", 0) preferred_building_id = command.get("building_id") # optional: choose production building print(f"🏭 BUILD_UNIT command received: unit_type={unit_type_str}, player={player_id}, building={preferred_building_id}") if not unit_type_str: print(" ❌ No unit_type provided") return try: unit_type = UnitType(unit_type_str) except ValueError: return # RED ALERT: Check cost! cost = UNIT_COSTS.get(unit_type, 0) player_language = self.game_state.players[player_id].language if player_id in self.game_state.players else "en" current_credits = self.game_state.players[player_id].credits if player_id in self.game_state.players else 0 if current_credits < cost: # Not enough credits! (translated) message = LOCALIZATION.translate( player_language, "notification.insufficient_credits", cost=cost, current=current_credits ) await self.broadcast({ "type": "notification", "message": message, "level": "error" }) return # Find required building type required_building = PRODUCTION_REQUIREMENTS.get(unit_type) if not required_building: return # If provided, use preferred building if valid suitable_building = None if preferred_building_id and preferred_building_id in self.game_state.buildings: b = self.game_state.buildings[preferred_building_id] if b.player_id == player_id and b.type == required_building: suitable_building = b # Otherwise choose least busy eligible building if not suitable_building: eligible = [ b for b in self.game_state.buildings.values() if b.player_id == player_id and b.type == required_building ] if eligible: suitable_building = min(eligible, key=lambda b: len(b.production_queue)) if suitable_building: # RED ALERT: Deduct credits! self.game_state.players[player_id].credits -= cost # Add to production queue suitable_building.production_queue.append(unit_type_str) print(f" ✅ Added {unit_type_str} to {suitable_building.type} queue (building {suitable_building.id})") print(f" Queue is now: {suitable_building.production_queue}") print(f" Credits: {self.game_state.players[player_id].credits}") # Translated notification unit_name = LOCALIZATION.translate(player_language, f"unit.{unit_type_str}") message = LOCALIZATION.translate(player_language, "notification.unit_training", unit=unit_name) await self.broadcast({ "type": "notification", "message": message, "level": "success" }) else: # Translated requirement message unit_name = LOCALIZATION.translate(player_language, f"unit.{unit_type_str}") building_name = LOCALIZATION.translate(player_language, f"building.{required_building.value}") message = LOCALIZATION.translate( player_language, "notification.unit_requires", unit=unit_name, requirement=building_name ) await self.broadcast({ "type": "notification", "message": message, "level": "error" }) elif cmd_type == "build_building": building_type_str = command.get("building_type") position = command.get("position") player_id = command.get("player_id", 0) if not building_type_str or not position: return try: building_type = BuildingType(building_type_str) except ValueError: return # RED ALERT: Check cost! cost = BUILDING_COSTS.get(building_type, 0) player_language = self.game_state.players[player_id].language if player_id in self.game_state.players else "en" current_credits = self.game_state.players[player_id].credits if player_id in self.game_state.players else 0 if current_credits < cost: # Not enough credits! (translated) message = LOCALIZATION.translate( player_language, "notification.insufficient_credits", cost=cost, current=current_credits ) await self.broadcast({ "type": "notification", "message": message, "level": "error" }) return # Rule: limit multiple same-type buildings if disabled if not ALLOW_MULTIPLE_SAME_BUILDING and building_type != BuildingType.HQ: for b in self.game_state.buildings.values(): if b.player_id == player_id and b.type == building_type: message = LOCALIZATION.translate(player_language, "notification.building_limit_one", building=LOCALIZATION.translate(player_language, f"building.{building_type_str}")) await self.broadcast({"type":"notification","message":message,"level":"error"}) return # Enforce HQ build radius # Find player's HQ hq = None for b in self.game_state.buildings.values(): if b.player_id == player_id and b.type == BuildingType.HQ: hq = b break if hq and position and "x" in position and "y" in position: max_dist = HQ_BUILD_RADIUS_TILES * TILE_SIZE dx = position["x"] - hq.position.x dy = position["y"] - hq.position.y if (dx*dx + dy*dy) ** 0.5 > max_dist: message = LOCALIZATION.translate(player_language, "notification.building_too_far_from_hq") await self.broadcast({"type":"notification","message":message,"level":"error"}) return # RED ALERT: Deduct credits! self.game_state.players[player_id].credits -= cost if position and "x" in position and "y" in position: self.game_state.create_building( building_type, player_id, Position(position["x"], position["y"]) ) # Translated notification building_name = LOCALIZATION.translate(player_language, f"building.{building_type_str}") message = LOCALIZATION.translate(player_language, "notification.building_placed", building=building_name) await self.broadcast({ "type": "notification", "message": message, "level": "success" }) elif cmd_type == "stop_units": unit_ids = command.get("unit_ids", []) for uid in unit_ids: if uid in self.game_state.units: self.game_state.units[uid].target = None elif cmd_type == "prepare_nuke": player_id = command.get("player_id", 0) if player_id in self.game_state.players: player = self.game_state.players[player_id] if player.superweapon_ready: player.nuke_preparing = True await self.broadcast({ "type": "nuke_preparing", "player_id": player_id }) elif cmd_type == "cancel_nuke": player_id = command.get("player_id", 0) if player_id in self.game_state.players: self.game_state.players[player_id].nuke_preparing = False elif cmd_type == "launch_nuke": player_id = command.get("player_id", 0) target = command.get("target") if player_id in self.game_state.players and target: player = self.game_state.players[player_id] if player.superweapon_ready and player.nuke_preparing: target_pos = Position(target["x"], target["y"]) # Launch nuke effect await self.launch_nuke(player_id, target_pos) # Reset superweapon state player.superweapon_ready = False player.superweapon_charge = 0 player.nuke_preparing = False # Broadcast nuke launch await self.broadcast({ "type": "nuke_launched", "player_id": player_id, "target": {"x": target_pos.x, "y": target_pos.y} }) elif cmd_type == "change_language": player_id = command.get("player_id", 0) language = command.get("language", "en") if player_id in self.game_state.players: # Validate language supported = list(LOCALIZATION.get_supported_languages()) if language in supported: self.game_state.players[player_id].language = language # Trigger immediate AI analysis in new language self.last_ai_analysis_time = 0 await self.broadcast({ "type": "notification", "message": f"Language changed to {LOCALIZATION.get_display_name(language)}", "level": "info" }) elif cmd_type == "request_ai_analysis": # Force immediate AI analysis await self.run_ai_analysis() await self.broadcast({ "type": "ai_analysis_update", "analysis": self.last_ai_analysis }) # Global connection manager manager = ConnectionManager() # Routes @app.get("/") async def get_home(): """Serve the main game interface""" return HTMLResponse(content=open("static/index.html").read()) @app.get("/health") async def health_check(): """Health check endpoint for HuggingFace Spaces""" return { "status": "healthy", "players": len(manager.game_state.players), "units": len(manager.game_state.units), "buildings": len(manager.game_state.buildings), "active_connections": len(manager.active_connections), "ai_available": manager.ai_analyzer.model_available, "supported_languages": list(LOCALIZATION.get_supported_languages()) } @app.get("/api/languages") async def get_languages(): """Get supported languages""" languages = [] for lang_code in LOCALIZATION.get_supported_languages(): languages.append({ "code": lang_code, "name": LOCALIZATION.get_display_name(lang_code) }) return {"languages": languages} @app.get("/api/translations/{language}") async def get_translations(language: str): """Get all translations for a language""" from localization import TRANSLATIONS if language not in TRANSLATIONS: language = "en" return {"translations": TRANSLATIONS[language], "language": language} @app.post("/api/player/{player_id}/language") async def set_player_language(player_id: int, language: str): """Set player's preferred language""" if player_id in manager.game_state.players: manager.game_state.players[player_id].language = language return {"success": True, "language": language} return {"success": False, "error": "Player not found"} @app.get("/api/ai/status") async def get_ai_status(): """Get AI analyzer status""" return { "available": manager.ai_analyzer.model_available, "model_path": manager.ai_analyzer.model_path if manager.ai_analyzer.model_available else None, "last_analysis": manager.last_ai_analysis } @app.post("/api/nl/translate") async def translate_nl_command(data: dict): """ Translate natural language command to MCP JSON Body: {"command": str, "language": str (optional)} """ translator = get_nl_translator() command = data.get("command", "") language = data.get("language", None) if not command: return {"success": False, "error": "No command provided"} result = translator.translate_command(command, language) return result @app.get("/api/nl/examples") async def get_nl_examples(language: str = "en"): """Get example natural language commands""" translator = get_nl_translator() return { "language": language, "examples": translator.get_example_commands(language) } @app.get("/api/nl/status") async def get_nl_status(): """Get NL translator status""" translator = get_nl_translator() return { "available": translator.model_loaded, "model_path": translator.model_path, "last_error": translator.last_error } @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): """WebSocket endpoint for real-time game communication""" await manager.connect(websocket) try: # Send initial state await websocket.send_json({ "type": "init", "state": manager.game_state.to_dict() }) # Handle incoming messages while True: data = await websocket.receive_json() await manager.handle_command(data) except WebSocketDisconnect: manager.disconnect(websocket) except Exception as e: print(f"WebSocket error: {e}") manager.disconnect(websocket) # Mount static files (will be created next) try: app.mount("/static", StaticFiles(directory="static"), name="static") except: pass if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)