Spaces:
Sleeping
Sleeping
| """ | |
| RTS Game Web Server - FastAPI + WebSocket | |
| Optimized for HuggingFace Spaces with Docker | |
| Features: | |
| - Real-time multiplayer RTS gameplay | |
| - AI tactical analysis via Qwen2.5 LLM | |
| - Multi-language support (EN/FR/ZH-TW) | |
| - Red Alert-style mechanics | |
| """ | |
| from fastapi import FastAPI, WebSocket, WebSocketDisconnect | |
| from fastapi.responses import HTMLResponse, FileResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import asyncio | |
| import json | |
| import random | |
| import time | |
| from typing import Dict, List, Optional, Set, Any | |
| from dataclasses import dataclass, asdict | |
| from enum import Enum | |
| import uuid | |
| # Import localization and AI systems | |
| from localization import LOCALIZATION | |
| from ai_analysis import get_ai_analyzer, get_model_download_status | |
| from nl_translator_async import get_nl_translator | |
# Game Constants
TILE_SIZE = 40    # world units per map tile (tile coordinates are multiplied by this)
MAP_WIDTH = 96    # map width, in tiles
MAP_HEIGHT = 72   # map height, in tiles
VIEW_WIDTH = 48   # presumably the viewport width in tiles - not referenced in the visible code
VIEW_HEIGHT = 27  # presumably the viewport height in tiles - not referenced in the visible code

# Initialize FastAPI app
app = FastAPI(title="RTS Game", version="1.0.0")

# CORS middleware: every origin, method and header is accepted.
# NOTE(review): wildcard origins combined with allow_credentials=True is
# discouraged by the FastAPI CORS documentation - confirm credentials are needed.
_CORS_SETTINGS = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_SETTINGS)
| from backend.constants import ( | |
| UnitType, | |
| BuildingType, | |
| UNIT_COSTS, | |
| BUILDING_COSTS, | |
| POWER_PRODUCTION, | |
| POWER_CONSUMPTION, | |
| LOW_POWER_THRESHOLD, | |
| LOW_POWER_PRODUCTION_FACTOR, | |
| HARVESTER_CAPACITY, | |
| HARVEST_AMOUNT_PER_ORE, | |
| HARVEST_AMOUNT_PER_GEM, | |
| HQ_BUILD_RADIUS_TILES, | |
| ALLOW_MULTIPLE_SAME_BUILDING, | |
| ) | |
class TerrainType(str, Enum):
    """Kinds of map tile; the string values are what gets serialized to clients."""

    GRASS = "grass"  # default tile the map is filled with
    ORE = "ore"      # resource patch
    GEM = "gem"      # rarer resource patch
    WATER = "water"  # water body
# Production Requirements - Critical for gameplay!
# Maps each unit type to the building type it is produced from.
PRODUCTION_REQUIREMENTS = {
    UnitType.INFANTRY: BuildingType.BARRACKS,
    # All vehicles and aircraft come out of the war factory.
    **dict.fromkeys(
        (UnitType.TANK, UnitType.ARTILLERY, UnitType.HELICOPTER),
        BuildingType.WAR_FACTORY,
    ),
    UnitType.HARVESTER: BuildingType.HQ,  # Harvester needs HQ, NOT Refinery!
}
| ## Costs, power system, and harvesting constants are imported above | |
| # Data Classes | |
@dataclass
class Position:
    """A mutable 2-D point in world coordinates.

    Declared as a dataclass so that it can be built positionally
    (``Position(x, y)``), which is how the rest of the file constructs it.
    Instances are mutated in place by unit movement, so it is not frozen.
    """

    x: float
    y: float

    def distance_to(self, other: 'Position') -> float:
        """Return the Euclidean distance between this point and ``other``."""
        return ((self.x - other.x) ** 2 + (self.y - other.y) ** 2) ** 0.5

    def to_dict(self):
        """Return the point as a JSON-friendly ``{"x": ..., "y": ...}`` dict."""
        return {"x": self.x, "y": self.y}
@dataclass
class Unit:
    """A mobile entity (combat unit or harvester) owned by one player.

    Declared as a dataclass because ``GameState.create_unit`` builds instances
    with keyword arguments; without the decorator the class has no
    ``__init__`` accepting them.
    """

    id: str
    type: UnitType
    player_id: int
    position: Position
    health: int
    max_health: int
    speed: float
    damage: int
    range: float
    target: Optional[Position] = None  # Movement destination, if any
    target_unit_id: Optional[str] = None  # Enemy unit being attacked
    target_building_id: Optional[str] = None  # Enemy building being attacked
    cargo: int = 0
    gathering: bool = False
    returning: bool = False
    ore_target: Optional[Position] = None
    last_attacker_id: Optional[str] = None  # Set when another unit hits this one
    manual_control: bool = False  # True when player gives manual orders
    manual_order: bool = False  # True when player gives manual move/attack order
    collision_radius: float = 15.0  # Collision detection radius
    attack_cooldown: int = 0  # Frames until next attack
    attack_animation: int = 0  # Frames for attack animation (for visual feedback)

    def to_dict(self):
        """Return the client-facing representation of the unit.

        ``ore_target`` and ``last_attacker_id`` are server-side only and are
        not included in the output.
        """
        return {
            "id": self.id,
            "type": self.type.value,
            "player_id": self.player_id,
            "position": self.position.to_dict(),
            "health": self.health,
            "max_health": self.max_health,
            "speed": self.speed,
            "damage": self.damage,
            "range": self.range,
            "target": self.target.to_dict() if self.target else None,
            "target_unit_id": self.target_unit_id,
            "target_building_id": self.target_building_id,
            "cargo": self.cargo,
            "gathering": self.gathering,
            "returning": self.returning,
            "manual_control": self.manual_control,
            "manual_order": self.manual_order,
            "collision_radius": self.collision_radius,
            "attack_cooldown": self.attack_cooldown,
            "attack_animation": self.attack_animation,
        }
@dataclass
class Building:
    """A static structure owned by one player.

    Declared as a dataclass because buildings are constructed with keyword
    arguments elsewhere in the file; without the decorator the class has no
    ``__init__`` accepting them.
    """

    id: str
    type: BuildingType
    player_id: int
    position: Position
    health: int
    max_health: int
    production_queue: List[str]  # Unit type values waiting to be produced (front = current)
    production_progress: float  # Progress of the current item; complete at 1.0
    target_unit_id: Optional[str] = None  # For defense turrets
    attack_cooldown: int = 0  # For defense turrets
    attack_animation: int = 0  # For defense turrets

    def to_dict(self):
        """Return the client-facing representation of the building."""
        return {
            "id": self.id,
            "type": self.type.value,
            "player_id": self.player_id,
            "position": self.position.to_dict(),
            "health": self.health,
            "max_health": self.max_health,
            "production_queue": self.production_queue,
            "production_progress": self.production_progress,
            "target_unit_id": self.target_unit_id,
            "attack_cooldown": self.attack_cooldown,
            "attack_animation": self.attack_animation,
        }
@dataclass
class Player:
    """Per-player economy, power and superweapon state.

    Declared as a dataclass: instances are built positionally
    (``Player(0, "Player", "#4A90E2", 5000, 50, 0, False)``) and ``to_dict``
    relies on ``dataclasses.asdict``, which only accepts dataclass instances.
    """

    id: int
    name: str
    color: str
    credits: int
    power: int
    power_consumption: int
    is_ai: bool
    language: str = "en"  # Language preference (en, fr, zh-TW)
    # 0-1800, incremented once per game tick; the game loop runs at
    # 20 ticks/sec, so a full charge takes 90 seconds.
    superweapon_charge: int = 0
    superweapon_ready: bool = False
    nuke_preparing: bool = False  # True when 'N' key pressed, waiting for target

    def to_dict(self):
        """Return every field as a plain dict for JSON serialization."""
        return asdict(self)
| # Game State Manager | |
class GameState:
    """In-memory state of one match: terrain, fog, players, units and buildings."""

    # Base stats applied to every freshly created unit, keyed by unit type.
    _UNIT_STATS = {
        UnitType.INFANTRY: {"health": 100, "speed": 2.0, "damage": 10, "range": 80},
        UnitType.TANK: {"health": 200, "speed": 1.5, "damage": 30, "range": 120},
        UnitType.HARVESTER: {"health": 150, "speed": 1.0, "damage": 0, "range": 0},
        UnitType.HELICOPTER: {"health": 120, "speed": 3.0, "damage": 25, "range": 150},
        UnitType.ARTILLERY: {"health": 100, "speed": 1.0, "damage": 50, "range": 200},
    }

    # Starting (and maximum) health of every building type.
    _BUILDING_HEALTH = {
        BuildingType.HQ: 500,
        BuildingType.BARRACKS: 300,
        BuildingType.WAR_FACTORY: 400,
        BuildingType.REFINERY: 250,
        BuildingType.POWER_PLANT: 200,
        BuildingType.DEFENSE_TURRET: 350,
    }

    def __init__(self):
        self.units: Dict[str, Unit] = {}
        self.buildings: Dict[str, Building] = {}
        self.players: Dict[int, Player] = {}
        self.terrain: List[List[TerrainType]] = []
        self.fog_of_war: List[List[bool]] = []
        self.game_started = False
        self.game_over = False
        self.winner: Optional[str] = None  # "player" or "enemy"
        self.tick = 0
        self.init_map()
        self.init_players()

    def init_map(self):
        """Fill the map with grass, then stamp ore, gem and water patches on it.

        Later stamps overwrite earlier ones, so water wins over gems and gems
        win over ore where patches overlap.
        """
        self.terrain = [[TerrainType.GRASS] * MAP_WIDTH for _ in range(MAP_HEIGHT)]
        self.fog_of_war = [[True] * MAP_WIDTH for _ in range(MAP_HEIGHT)]

        # Ore: 15 patches of up to 5x5 tiles, each tile kept when random() > 0.3
        self._stamp_patches(15, 2, TerrainType.ORE,
                            lambda dx, dy: random.random() > 0.3)
        # Gems (rare): 5 patches of up to 3x3 tiles, each tile kept when random() > 0.5
        self._stamp_patches(5, 1, TerrainType.GEM,
                            lambda dx, dy: random.random() > 0.5)
        # Water: 8 round bodies, every tile closer than 3 tiles to the centre
        self._stamp_patches(8, 3, TerrainType.WATER,
                            lambda dx, dy: (dx * dx + dy * dy) < 9)

    def _stamp_patches(self, count, reach, terrain_type, keep_tile):
        """Stamp ``count`` square patches with half-width ``reach`` onto the map.

        ``keep_tile(dx, dy)`` decides per in-bounds tile whether it is painted;
        it is only consulted after the bounds check.
        """
        for _ in range(count):
            centre_x, centre_y = (random.randint(5, MAP_WIDTH - 6),
                                  random.randint(5, MAP_HEIGHT - 6))
            for dx in range(-reach, reach + 1):
                for dy in range(-reach, reach + 1):
                    tile_x, tile_y = centre_x + dx, centre_y + dy
                    inside = 0 <= tile_x < MAP_WIDTH and 0 <= tile_y < MAP_HEIGHT
                    if inside and keep_tile(dx, dy):
                        self.terrain[tile_y][tile_x] = terrain_type

    def init_players(self):
        """Create player 0 (human) and player 1 (AI) with an HQ and three infantry each."""
        # Start with power=50 (from HQ), consumption=0
        self.players[0] = Player(0, "Player", "#4A90E2", 5000, 50, 0, False)
        self.players[1] = Player(1, "AI", "#E74C3C", 5000, 50, 0, True)

        # One HQ per player, in opposite corners of the map
        self.create_building(BuildingType.HQ, 0,
                             Position(5 * TILE_SIZE, 5 * TILE_SIZE))
        self.create_building(BuildingType.HQ, 1,
                             Position((MAP_WIDTH - 8) * TILE_SIZE, (MAP_HEIGHT - 8) * TILE_SIZE))

        # Starting units, lined up next to each HQ
        for offset in range(3):
            self.create_unit(UnitType.INFANTRY, 0,
                             Position((7 + offset) * TILE_SIZE, 7 * TILE_SIZE))
            self.create_unit(UnitType.INFANTRY, 1,
                             Position((MAP_WIDTH - 10 - offset) * TILE_SIZE,
                                      (MAP_HEIGHT - 10) * TILE_SIZE))

    def create_unit(self, unit_type: UnitType, player_id: int, position: Position) -> Unit:
        """Create a unit with its type's base stats, register it and return it."""
        base = self._UNIT_STATS[unit_type]
        new_id = str(uuid.uuid4())
        created = Unit(
            id=new_id,
            type=unit_type,
            player_id=player_id,
            position=position,
            max_health=base["health"],
            **base,
        )
        self.units[new_id] = created
        return created

    def create_building(self, building_type: BuildingType, player_id: int, position: Position) -> Building:
        """Create a building at full health with an empty queue, register it and return it."""
        full_health = self._BUILDING_HEALTH[building_type]
        new_id = str(uuid.uuid4())
        created = Building(
            id=new_id,
            type=building_type,
            player_id=player_id,
            position=position,
            health=full_health,
            max_health=full_health,
            production_queue=[],
            production_progress=0,
        )
        self.buildings[new_id] = created
        return created

    def calculate_power(self, player_id: int) -> tuple[int, int, str]:
        """
        Calculate power production and consumption for a player.

        Also stores both totals on the player record when the player exists.

        Returns:
            tuple: (power_production, power_consumption, status)
            status: 'green' (enough power), 'yellow' (low power), 'red' (no power)
        """
        owned = [b for b in self.buildings.values() if b.player_id == player_id]
        production = sum(POWER_PRODUCTION.get(b.type, 0) for b in owned)
        consumption = sum(POWER_CONSUMPTION.get(b.type, 0) for b in owned)

        # Nothing drawing power, or supply covers demand -> green
        if consumption == 0 or production >= consumption:
            status = 'green'
        elif production >= consumption * LOW_POWER_THRESHOLD:
            status = 'yellow'
        else:
            status = 'red'

        owner = self.players.get(player_id)
        if owner is not None:
            owner.power = production
            owner.power_consumption = consumption
        return production, consumption, status

    def to_dict(self):
        """Convert game state to dictionary for JSON serialization"""
        snapshot = {
            "tick": self.tick,
            "game_started": self.game_started,
            "game_over": self.game_over,
            "winner": self.winner,
        }
        snapshot["players"] = {pid: p.to_dict() for pid, p in self.players.items()}
        snapshot["units"] = {uid: u.to_dict() for uid, u in self.units.items()}
        snapshot["buildings"] = {bid: b.to_dict() for bid, b in self.buildings.items()}
        snapshot["terrain"] = [[tile.value for tile in row] for row in self.terrain]
        snapshot["fog_of_war"] = self.fog_of_war
        return snapshot
| # WebSocket Connection Manager | |
| class ConnectionManager: | |
def __init__(self):
    """Create the shared game state and the bookkeeping for clients and both AIs."""
    # Connected clients and the simulation they all share
    self.active_connections: List[WebSocket] = []
    self.game_state = GameState()
    self.game_loop_task: Optional[asyncio.Task] = None

    # LLM tactical analysis, refreshed at most once per interval
    self.ai_analyzer = get_ai_analyzer()
    self.last_ai_analysis: Dict[str, Any] = {}
    self.ai_analysis_interval = 60.0  # Analyze every 60 seconds (reduced frequency to avoid blocking)
    self.last_ai_analysis_time = 0.0

    # RED ALERT: Enemy AI state
    self.ai_last_action_tick = 0
    self.ai_action_interval = 120  # Take action every 6 seconds (120 ticks at 20Hz)
    # Build order; 'power_plant' appears twice on purpose (second power plant)
    self.ai_build_plan = ['power_plant', 'refinery', 'barracks', 'power_plant', 'war_factory']
    self.ai_build_index = 0
    # Unit types the enemy AI cycles through
    self.ai_unit_cycle = ['infantry', 'infantry', 'tank', 'infantry', 'helicopter']
    self.ai_unit_index = 0
async def connect(self, websocket: WebSocket):
    """Accept a client, register it, and make sure the game loop is running."""
    await websocket.accept()
    self.active_connections.append(websocket)

    # Start game loop if not already running (never started, or finished)
    loop_task = self.game_loop_task
    loop_alive = loop_task is not None and not loop_task.done()
    if not loop_alive:
        self.game_loop_task = asyncio.create_task(self.game_loop())
def disconnect(self, websocket: WebSocket):
    """Forget a client; unknown or already-removed sockets are ignored."""
    try:
        self.active_connections.remove(websocket)
    except ValueError:
        # Not registered (any more) - nothing to do
        pass
| async def broadcast(self, message: dict): | |
| """Send message to all connected clients""" | |
| disconnected = [] | |
| for connection in self.active_connections: | |
| try: | |
| await connection.send_json(message) | |
| except: | |
| disconnected.append(connection) | |
| # Clean up disconnected clients | |
| for conn in disconnected: | |
| self.disconnect(conn) | |
async def game_loop(self):
    """Main game loop - runs at 20 ticks per second.

    Each iteration advances the simulation, starts a periodic AI analysis
    when one is due, and broadcasts the full state to connected clients.
    Errors inside an iteration are logged and the loop keeps running.
    """
    print("🔄 Game loop STARTED")
    while True:  # Run forever, check connections inside loop
        try:
            # Update game state
            self.update_game_state()

            # AI Analysis (periodic) - only if model is available.
            # It runs as a background task: awaiting it inline would stop
            # the simulation from ticking for as long as the model takes.
            current_time = time.time()
            analysis_task = getattr(self, "_ai_analysis_task", None)
            analysis_idle = analysis_task is None or analysis_task.done()
            if (self.ai_analyzer.model_available and analysis_idle and
                    current_time - self.last_ai_analysis_time >= self.ai_analysis_interval):
                self.last_ai_analysis_time = current_time
                # Keep a reference so the task is not garbage-collected mid-run
                self._ai_analysis_task = asyncio.create_task(self.run_ai_analysis())

            # Serialize and broadcast only if there are active connections;
            # building the state dict is wasted work when nobody is listening.
            if self.active_connections:
                state_dict = self.game_state.to_dict()
                state_dict['ai_analysis'] = self.last_ai_analysis  # Include AI insights
                # Include model download status so UI can show progress
                if not self.ai_analyzer.model_available:
                    state_dict['model_download'] = get_model_download_status()
                await self.broadcast({
                    "type": "state_update",
                    "state": state_dict
                })

            # 50ms delay = 20 ticks/sec
            await asyncio.sleep(0.05)
        except Exception as e:
            print(f"Game loop error: {e}")
            await asyncio.sleep(0.1)
async def run_ai_analysis(self):
    """Refresh ``last_ai_analysis`` with an LLM summary, or a heuristic one.

    The heuristic analysis is used when the model is unavailable or when the
    LLM call fails, so the analysis panel is never left empty.
    """
    def current_language():
        # Player 0's language preference; a throw-away default player
        # supplies the fallback when player 0 is missing.
        placeholder = Player(0, "Player", "#000", 0, 0, 0, False)
        return self.game_state.players.get(0, placeholder).language

    def use_heuristic():
        self.last_ai_analysis = self.ai_analyzer._heuristic_analysis(
            self.game_state.to_dict(), current_language()
        )

    # Skip the LLM entirely if the model is not available
    if not self.ai_analyzer.model_available:
        use_heuristic()
        return

    try:
        language = current_language()
        # Run analysis in thread pool to avoid blocking the event loop
        event_loop = asyncio.get_event_loop()
        self.last_ai_analysis = await event_loop.run_in_executor(
            None,
            self.ai_analyzer.summarize_combat_situation,
            self.game_state.to_dict(),
            language,
        )
        # Deliberately not printed on every run to avoid console spam
    except Exception as e:
        print(f"⚠️ AI analysis error: {e}")
        use_heuristic()
def update_game_state(self):
    """Advance the simulation by one tick - Red Alert style!

    Order within a tick: housekeeping, superweapon charge, power and the
    low-power warning, enemy AI decisions, HQ victory check, units,
    buildings, and finally the elimination check.
    """
    self.game_state.tick += 1
    tick = self.game_state.tick

    # DEBUG: Log every 5 seconds (100 ticks at 20Hz) to confirm game loop is running
    if tick % 100 == 0:
        print(f"⏱️ Game tick: {self.game_state.tick} (loop running)")

    # Cleanup old LLM requests every 30 seconds (600 ticks at 20Hz)
    if tick % 600 == 0:
        from model_manager import get_shared_model
        model = get_shared_model()
        model.cleanup_old_requests(max_age=300.0)  # 5 minutes
        # Also cleanup translator
        translator = get_nl_translator()
        translator.cleanup_old_requests(max_age=60.0)  # 1 minute

    # Update superweapon charge: +1 per tick up to 1800, i.e. 90 seconds
    # at the 20 ticks/sec this loop runs at.
    for player in self.game_state.players.values():
        if not player.superweapon_ready and player.superweapon_charge < 1800:
            player.superweapon_charge += 1
            if player.superweapon_charge >= 1800:
                player.superweapon_ready = True

    # RED ALERT: Calculate power for both players (this also refreshes the
    # power totals stored on each player record)
    _, _, power_status_p0 = self.game_state.calculate_power(0)
    self.game_state.calculate_power(1)

    # Low power warning, at most every 5 seconds (100 ticks at 20Hz)
    if not hasattr(self, 'last_low_power_warning'):
        self.last_low_power_warning = 0
    if power_status_p0 == 'red' and tick - self.last_low_power_warning > 100:
        # Send low power warning to player (translated)
        message = LOCALIZATION.translate(self._tick_player_language(), "notification.low_power")
        self._tick_broadcast_soon({
            "type": "notification",
            "message": message,
            "level": "warning"
        })
        self.last_low_power_warning = self.game_state.tick

    # RED ALERT: Enemy AI strategic decisions
    if self.game_state.tick - self.ai_last_action_tick >= self.ai_action_interval:
        self.update_enemy_ai()
        self.ai_last_action_tick = self.game_state.tick

    self._tick_check_hq_victory()
    self._tick_units()
    self._tick_buildings()
    # Endgame checks: elimination-first (after all destructions this tick)
    self._tick_check_elimination()

def _tick_player_language(self):
    """Return player 0's language, or "en" when player 0 does not exist."""
    players = self.game_state.players
    return players[0].language if 0 in players else "en"

def _tick_broadcast_soon(self, message: dict):
    """Schedule a broadcast from synchronous code and keep the task referenced.

    The event loop only holds weak references to tasks, so an unreferenced
    fire-and-forget task may be garbage-collected before it finishes.
    """
    pending = getattr(self, "_background_tasks", None)
    if pending is None:
        pending = self._background_tasks = set()
    task = asyncio.create_task(self.broadcast(message))
    pending.add(task)
    task.add_done_callback(pending.discard)

def _tick_end_game(self, winner: str, message=None):
    """Mark the game as over and announce it; builds the win banner if no message is given."""
    self.game_state.game_over = True
    self.game_state.winner = winner
    if message is None:
        language = self._tick_player_language()
        winner_name = LOCALIZATION.translate(language, f"game.winner.{winner}")
        message = LOCALIZATION.translate(language, "game.win.banner", winner=winner_name)
    self._tick_broadcast_soon({
        "type": "game_over",
        "winner": winner,
        "message": message
    })

def _tick_check_hq_victory(self):
    """Check victory conditions (no HQ = defeat)."""
    if self.game_state.game_over:
        return
    buildings = self.game_state.buildings
    player_hq_exists = any(b.type == BuildingType.HQ and b.player_id == 0
                           for b in buildings.values())
    enemy_hq_exists = any(b.type == BuildingType.HQ and b.player_id == 1
                          for b in buildings.values())
    if player_hq_exists and enemy_hq_exists:
        return
    if enemy_hq_exists:
        # Player lost
        self._tick_end_game("enemy")
    elif player_hq_exists:
        # Player won!
        self._tick_end_game("player")
    else:
        # Draw (both destroyed simultaneously)
        self._tick_end_game("draw", "Draw! Both HQs destroyed")

def _tick_check_elimination(self):
    """End the game when one side (or both) has no units and no buildings left."""
    if self.game_state.game_over:
        return
    units = self.game_state.units.values()
    buildings = self.game_state.buildings.values()
    player_gone = not any(u.player_id == 0 for u in units) and \
        not any(b.player_id == 0 for b in buildings)
    enemy_gone = not any(u.player_id == 1 for u in units) and \
        not any(b.player_id == 1 for b in buildings)
    if player_gone and not enemy_gone:
        # Player eliminated
        self._tick_end_game("enemy")
    elif enemy_gone and not player_gone:
        # Enemy eliminated
        self._tick_end_game("player")
    elif player_gone and enemy_gone:
        # Total annihilation -> draw; localized draw banner if available
        try:
            draw_msg = LOCALIZATION.translate(self._tick_player_language(), "game.draw.banner")
        except Exception:
            draw_msg = "Draw!"
        self._tick_end_game("draw", draw_msg)

def _tick_attack_profile(self, unit_type):
    """Return ``(damage per hit, cooldown in frames)`` for a unit type.

    Note that these per-hit values, not the unit's ``damage`` stat, are what
    combat actually applies. Unknown types fall back to ``(5, 20)``.
    """
    profiles = {
        UnitType.INFANTRY: (5, 20),  # weak but fast
        UnitType.TANK: (35, 40),
        UnitType.ARTILLERY: (55, 60),  # strongest, slowest
        UnitType.HELICOPTER: (18, 30),
        UnitType.HARVESTER: (0, 0),  # unarmed
    }
    return profiles.get(unit_type, (5, 20))

def _tick_strike(self, unit, target) -> bool:
    """Close in on ``target`` or hit it; return True only when a hit landed."""
    if unit.position.distance_to(target.position) <= unit.range:
        # In range - attack!
        unit.target = None  # Stop moving
        # Cooldown system - attack every N frames
        if unit.attack_cooldown <= 0:
            damage, cooldown = self._tick_attack_profile(unit.type)
            target.health -= damage
            unit.attack_cooldown = cooldown
            unit.attack_animation = 10  # 10 frames of attack animation
            return True
    else:
        # Move closer
        unit.target = Position(target.position.x, target.position.y)
    return False

def _tick_units(self):
    """Run harvesting AI, targeting, combat, movement and enemy AI for every unit."""
    units = self.game_state.units
    buildings = self.game_state.buildings
    tick = self.game_state.tick

    # Iterate over a snapshot because combat removes units from the dict
    for unit in list(units.values()):
        # A unit destroyed earlier in this tick must not act any more
        if unit.id not in units:
            continue

        # RED ALERT: Harvester AI (only if not manually controlled)
        if unit.type == UnitType.HARVESTER and not unit.manual_control:
            self.update_harvester(unit)
            # No continue - it still moves towards the target set by the AI

        armed = unit.damage > 0

        # RED ALERT: Auto-defense - if attacked, fight back! (but respect manual
        # orders). Unarmed units are excluded, matching the auto-acquire rule
        # below; otherwise a harvester would abandon its work to chase an
        # attacker it cannot hurt.
        if armed and unit.last_attacker_id and unit.last_attacker_id in units:
            if not unit.target_unit_id and not unit.manual_order:
                unit.target_unit_id = unit.last_attacker_id

        # RED ALERT: Auto-acquire nearby enemies when idle (but respect manual orders)
        if not unit.target_unit_id and not unit.target and armed and not unit.manual_order:
            nearest_enemy = self.find_nearest_enemy(unit)
            if nearest_enemy and unit.position.distance_to(nearest_enemy.position) < unit.range * 3:
                unit.target_unit_id = nearest_enemy.id

        # Handle combat against units
        if unit.target_unit_id:
            victim = units.get(unit.target_unit_id)
            if victim is None:
                # Target no longer exists
                unit.target_unit_id = None
            elif self._tick_strike(unit, victim):
                victim.last_attacker_id = unit.id  # RED ALERT: Track attacker for auto-defense
                if victim.health <= 0:
                    # Target destroyed
                    del units[unit.target_unit_id]
                    unit.target_unit_id = None
                    unit.last_attacker_id = None

        # Handle building attacks
        if unit.target_building_id:
            structure = buildings.get(unit.target_building_id)
            if structure is None:
                # Target no longer exists
                unit.target_building_id = None
            elif self._tick_strike(unit, structure) and structure.health <= 0:
                # Building destroyed
                del buildings[unit.target_building_id]
                unit.target_building_id = None

        # Decrease attack cooldown and animation
        if unit.attack_cooldown > 0:
            unit.attack_cooldown -= 1
        if unit.attack_animation > 0:
            unit.attack_animation -= 1

        # Movement
        # DEBUG: Log unit target status for player units
        if unit.player_id == 0 and tick % 40 == 0:  # Every 2 seconds
            if unit.target:
                print(f" 🎯 Unit {unit.id[:8]} ({unit.type}) HAS target: ({unit.target.x:.1f}, {unit.target.y:.1f}), pos: ({unit.position.x:.1f}, {unit.position.y:.1f})")
            else:
                print(f" ❌ Unit {unit.id[:8]} ({unit.type}) NO target, pos: ({unit.position.x:.1f}, {unit.position.y:.1f})")

        if unit.target:
            # Move towards target
            dx = unit.target.x - unit.position.x
            dy = unit.target.y - unit.position.y
            dist = (dx*dx + dy*dy) ** 0.5
            if dist > 5:
                old_x, old_y = unit.position.x, unit.position.y
                unit.position.x += (dx / dist) * unit.speed
                unit.position.y += (dy / dist) * unit.speed
                # Apply dispersion after movement
                self.apply_unit_dispersion(unit)
                # Debug log for movement (only log occasionally to avoid spam)
                if tick % 20 == 0:  # Every second (20 ticks/sec)
                    print(f" 🚶 Unit {unit.id[:8]} ({unit.type}) moving: ({old_x:.1f},{old_y:.1f}) -> ({unit.position.x:.1f},{unit.position.y:.1f}), dist={dist:.1f}")
            else:
                print(f" ✅ Unit {unit.id[:8]} ({unit.type}) reached destination ({unit.position.x:.1f},{unit.position.y:.1f})")
                unit.target = None
                unit.manual_order = False  # Clear manual order flag when destination reached
                # If Harvester reached manual destination, resume AI
                if unit.type == UnitType.HARVESTER and unit.manual_control:
                    unit.manual_control = False

        # RED ALERT: AI unit behavior (enemy side)
        if self.game_state.players[unit.player_id].is_ai:
            self.update_ai_unit(unit)

def _tick_turret(self, building):
    """Defense turret auto-attack: acquire the nearest enemy in range and fire."""
    units = self.game_state.units
    turret_range = 300.0  # Defense turret range

    # Find nearest enemy unit when there is no live target
    if not building.target_unit_id or building.target_unit_id not in units:
        min_dist = float('inf')
        nearest_enemy = None
        for enemy_unit in units.values():
            if enemy_unit.player_id != building.player_id:
                dist = building.position.distance_to(enemy_unit.position)
                if dist < turret_range and dist < min_dist:
                    min_dist = dist
                    nearest_enemy = enemy_unit
        if nearest_enemy:
            building.target_unit_id = nearest_enemy.id

    # Attack target if in range
    if building.target_unit_id and building.target_unit_id in units:
        target = units[building.target_unit_id]
        if building.position.distance_to(target.position) <= turret_range:
            # Attack!
            if building.attack_cooldown <= 0:
                target.health -= 20  # Turret damage
                building.attack_cooldown = 30  # 30 frames cooldown
                building.attack_animation = 10
                if target.health <= 0:
                    # Target destroyed
                    del units[building.target_unit_id]
                    building.target_unit_id = None
        else:
            # Out of range, lose target
            building.target_unit_id = None

    # Decrease cooldowns
    if building.attack_cooldown > 0:
        building.attack_cooldown -= 1
    if building.attack_animation > 0:
        building.attack_animation -= 1

def _tick_buildings(self):
    """Run turret combat and unit production for every building."""
    tick = self.game_state.tick
    # Power status per player, computed at most once per tick: the set of
    # buildings does not change while this loop runs.
    power_status_by_player = {}

    for building in self.game_state.buildings.values():
        if building.type == BuildingType.DEFENSE_TURRET:
            self._tick_turret(building)

        # DEBUG: Log production queue status
        if building.player_id == 0 and building.type in [BuildingType.BARRACKS, BuildingType.WAR_FACTORY]:
            if tick % 60 == 0:  # Every 3 seconds
                if building.production_queue:
                    print(f" 📋 Building {building.id[:8]} ({building.type}) queue: {building.production_queue}, progress: {building.production_progress:.2%}")
                else:
                    print(f" 💤 Building {building.id[:8]} ({building.type}) IDLE (no queue)")

        if not building.production_queue:
            continue

        # RED ALERT: Check power status for this building's player
        power_status = power_status_by_player.get(building.player_id)
        if power_status is None:
            _, _, power_status = self.game_state.calculate_power(building.player_id)
            power_status_by_player[building.player_id] = power_status

        # Adjust production speed based on power (only 'red' slows production)
        production_speed = 0.01
        if power_status == 'red':
            production_speed *= LOW_POWER_PRODUCTION_FACTOR
        building.production_progress += production_speed

        # Debug log every 2 seconds (40 ticks at 20Hz)
        if tick % 40 == 0:
            print(f" 🏭 Building {building.id[:8]} ({building.type}) producing: {building.production_queue[0]} - progress: {building.production_progress:.2%}")

        if building.production_progress >= 1.0:
            # Complete production
            print(f" ✅ Production complete! Creating {building.production_queue[0]}")
            unit_type = UnitType(building.production_queue.pop(0))
            spawn_pos = Position(
                building.position.x + TILE_SIZE * 2,
                building.position.y + TILE_SIZE * 2
            )
            # Create the unit, then move it to a free position near the spawn point
            new_unit = self.game_state.create_unit(unit_type, building.player_id, spawn_pos)
            if new_unit:
                free_pos = self.find_free_position_nearby(spawn_pos, new_unit.id)
                new_unit.position = free_pos
            building.production_progress = 0
def find_nearest_enemy(self, unit: Unit) -> Optional[Unit]:
    """RED ALERT: Return the closest unit owned by another player, or None."""
    closest: Optional[Unit] = None
    closest_gap = float('inf')
    for candidate in self.game_state.units.values():
        if candidate.player_id == unit.player_id:
            continue  # same side - not an enemy
        gap = unit.position.distance_to(candidate.position)
        # Strict comparison: on a tie the earlier unit in the dict wins
        if gap < closest_gap:
            closest, closest_gap = candidate, gap
    return closest
def is_position_occupied(self, position: Position, current_unit_id: str, radius: float = 15.0) -> bool:
    """Tell whether any other unit overlaps ``position``.

    A unit overlaps when its distance to ``position`` is smaller than
    ``radius`` plus that unit's own ``collision_radius``. The unit
    identified by ``current_unit_id`` is excluded from the check.
    """
    return any(
        position.distance_to(other.position) < (radius + other.collision_radius)
        for other_id, other in self.game_state.units.items()
        if other_id != current_unit_id
    )
def find_free_position_nearby(self, position: Position, unit_id: str, max_attempts: int = 16) -> Position:
    """Search outward from ``position`` for a spot no other unit occupies.

    If ``position`` itself is free it is returned unchanged. Otherwise up to
    ``max_attempts`` concentric rings are probed, each 25 pixels further out
    than the previous one, at eight evenly spaced directions per ring.
    Candidates are clamped to stay one tile inside the map border. When no
    candidate is free the original ``position`` is returned as a fallback.
    """
    import math

    if not self.is_position_occupied(position, unit_id):
        return position

    directions = 8
    # Clamp limits: keep one tile of margin from every map edge
    x_limit = MAP_WIDTH * TILE_SIZE - TILE_SIZE
    y_limit = MAP_HEIGHT * TILE_SIZE - TILE_SIZE

    for ring_radius in (25.0 * ring for ring in range(1, max_attempts + 1)):
        for step in range(directions):
            # Degrees -> radians for this direction
            theta = (step * 360.0 / directions) * (math.pi / 180.0)
            candidate = Position(
                position.x + ring_radius * math.cos(theta),
                position.y + ring_radius * math.sin(theta)
            )
            candidate.x = max(TILE_SIZE, min(x_limit, candidate.x))
            candidate.y = max(TILE_SIZE, min(y_limit, candidate.y))
            if not self.is_position_occupied(candidate, unit_id):
                return candidate

    # Every probed spot was taken
    return position
def apply_unit_dispersion(self, unit: Unit):
    """Move ``unit`` to a nearby free spot when it overlaps another unit."""
    if not self.is_position_occupied(unit.position, unit.id):
        return  # nothing overlapping - leave the unit where it is
    unit.position = self.find_free_position_nearby(unit.position, unit.id)
def update_harvester(self, unit: Unit):
    """RED ALERT: Advance one harvester through its collect/return cycle.

    Exactly one of three phases runs per call:

    1. ``unit.returning`` - head for the nearest depot and, once within two
       tiles, credit the cargo to the owning player and reset the
       harvester's state (including ``manual_control``).
    2. ``unit.ore_target`` set - walk to the target tile; once within half a
       tile, harvest it (ore or gem tiles become grass) and start returning
       when cargo reaches 90% of capacity.
    3. Idle - look for the nearest ore/gem tile and target it, or clear any
       leftover movement target when none exists.
    """
    # Phase 1: carrying cargo home
    if unit.returning:
        drop_off = self.find_nearest_depot(unit.player_id, unit.position)
        if not drop_off:
            # No depot - stop returning
            unit.returning = False
        elif unit.position.distance_to(drop_off.position) < TILE_SIZE * 2:
            # Close enough: deposit and hand control back to the harvester AI
            owner = self.game_state.players[unit.player_id]
            owner.credits += unit.cargo
            unit.cargo = 0
            unit.returning = False
            unit.gathering = False
            unit.ore_target = None
            unit.target = None
            unit.manual_control = False
        else:
            unit.target = Position(drop_off.position.x, drop_off.position.y)
        return

    # Phase 2: heading for / standing on an ore patch
    if unit.ore_target:
        dx = unit.position.x - unit.ore_target.x
        dy = unit.position.y - unit.ore_target.y
        on_patch = (dx ** 2 + dy ** 2) ** 0.5 < TILE_SIZE / 2
        if not on_patch:
            # Keep moving toward the patch
            unit.target = unit.ore_target
            return

        col = int(unit.ore_target.x / TILE_SIZE)
        row = int(unit.ore_target.y / TILE_SIZE)
        if 0 <= col < MAP_WIDTH and 0 <= row < MAP_HEIGHT:
            tile = self.game_state.terrain[row][col]
            if tile == TerrainType.ORE:
                gain = HARVEST_AMOUNT_PER_ORE
            elif tile == TerrainType.GEM:
                gain = HARVEST_AMOUNT_PER_GEM
            else:
                gain = None
            if gain is not None:
                unit.cargo = min(HARVESTER_CAPACITY, unit.cargo + gain)
                # A harvested tile is consumed
                self.game_state.terrain[row][col] = TerrainType.GRASS

        unit.ore_target = None
        unit.gathering = False
        # Full or nearly full: go unload
        if unit.cargo >= HARVESTER_CAPACITY * 0.9:
            unit.returning = True
            unit.target = None
        return

    # Phase 3: idle - always look for ore so a freshly spawned or just
    # unloaded harvester resumes work on its own
    if unit.gathering or unit.ore_target:
        return
    ore_spot = self.find_nearest_ore(unit.position)
    if ore_spot:
        unit.ore_target = ore_spot
        unit.gathering = True
        unit.target = ore_spot
    elif unit.target:
        # Nothing left to harvest: drop any residual target and stay idle
        unit.target = None
def find_nearest_depot(self, player_id: int, position: Position) -> Optional[Building]:
    """Return the player's Refinery or HQ closest to ``position``.

    Only buildings whose ``player_id`` matches are considered. Returns
    ``None`` when the player owns neither building type.
    """
    depot_types = [BuildingType.REFINERY, BuildingType.HQ]
    best = None
    best_dist = float('inf')
    for candidate in self.game_state.buildings.values():
        if candidate.player_id != player_id:
            continue
        if candidate.type not in depot_types:
            continue
        candidate_dist = position.distance_to(candidate.position)
        if candidate_dist < best_dist:
            best, best_dist = candidate, candidate_dist
    return best
def find_nearest_ore(self, position: Position) -> Optional[Position]:
    """Return the centre of the ore or gem tile closest to ``position``.

    Scans the whole ``MAP_HEIGHT`` x ``MAP_WIDTH`` terrain grid. Returns
    ``None`` when the map holds no ore or gem tile.
    """
    harvestable = [TerrainType.ORE, TerrainType.GEM]
    closest = None
    closest_dist = float('inf')
    for row in range(MAP_HEIGHT):
        for col in range(MAP_WIDTH):
            if self.game_state.terrain[row][col] not in harvestable:
                continue
            # Aim at the middle of the tile, not its top-left corner
            tile_centre = Position(col * TILE_SIZE + TILE_SIZE/2, row * TILE_SIZE + TILE_SIZE/2)
            tile_dist = position.distance_to(tile_centre)
            if tile_dist < closest_dist:
                closest, closest_dist = tile_centre, tile_dist
    return closest
def update_ai_unit(self, unit: Unit):
    """RED ALERT: Enemy AI behavior - aggressive!

    Units with zero ``damage`` (harvesters) never attack. A unit that has no
    ``target_unit_id`` yet locks onto the nearest enemy unit when that enemy
    is less than 500 pixels away.
    """
    if unit.damage == 0 or unit.target_unit_id:
        return  # unarmed, or already engaged
    prey = self.find_nearest_enemy(unit)
    if prey:
        # 500 px aggro range
        if unit.position.distance_to(prey.position) < 500:
            unit.target_unit_id = prey.id
def update_enemy_ai(self):
    """RED ALERT: Enemy AI strategic decision making.

    Runs two steps for player 1:

    1. Build order - if the next entry of ``ai_build_plan`` is affordable and
       its prerequisites exist (a power plant for barracks / refinery / war
       factory, plus a barracks for the war factory), place it at a random
       offset of up to 200 px from the AI HQ, clamped three tiles inside the
       map border, and advance ``ai_build_index``.
    2. Production - queue infantry in the first barracks and the next entry
       of ``ai_unit_cycle`` in the first war factory whenever that building's
       queue is empty and the credits suffice.

    AI harvesters need no handling here; per the original note they are
    driven by ``update_harvester()`` in the main loop.
    """
    player_ai = self.game_state.players[1]

    def owned(kind):
        """Return the AI's (player 1) buildings of type ``kind``."""
        return [
            b for b in self.game_state.buildings.values()
            if b.player_id == 1 and b.type == kind
        ]

    # 1. Check if AI should build next building
    if self.ai_build_index < len(self.ai_build_plan):
        building_type = BuildingType(self.ai_build_plan[self.ai_build_index])
        cost = BUILDING_COSTS.get(building_type, 0)

        if player_ai.credits >= cost:
            # Prerequisites (simplified)
            can_build = True
            needs_power = building_type in (
                BuildingType.BARRACKS,
                BuildingType.REFINERY,
                BuildingType.WAR_FACTORY,
            )
            if needs_power and not owned(BuildingType.POWER_PLANT):
                can_build = False
            if building_type == BuildingType.WAR_FACTORY and not owned(BuildingType.BARRACKS):
                can_build = False

            # Buildings are placed relative to the AI HQ; without one, wait
            ai_hq = next(iter(owned(BuildingType.HQ)), None) if can_build else None
            if ai_hq:
                offset_x = random.randint(-200, 200)
                offset_y = random.randint(-200, 200)
                margin = TILE_SIZE * 3
                build_pos = Position(
                    max(margin, min(MAP_WIDTH * TILE_SIZE - margin,
                                    ai_hq.position.x + offset_x)),
                    max(margin, min(MAP_HEIGHT * TILE_SIZE - margin,
                                    ai_hq.position.y + offset_y))
                )

                player_ai.credits -= cost
                # Created immediately (simplified - no construction queue for AI)
                self.game_state.create_building(building_type, 1, build_pos)
                print(f"🤖 AI built {building_type.value} at {build_pos.x:.0f},{build_pos.y:.0f}")

                # Move to next building in plan
                self.ai_build_index += 1

    # 2. Produce units if we have production buildings
    ai_barracks = owned(BuildingType.BARRACKS)
    ai_war_factory = owned(BuildingType.WAR_FACTORY)

    # Infantry from the first barracks
    if ai_barracks and not ai_barracks[0].production_queue:
        infantry_cost = UNIT_COSTS[UnitType.INFANTRY]
        if player_ai.credits >= infantry_cost:
            player_ai.credits -= infantry_cost
            ai_barracks[0].production_queue.append(UnitType.INFANTRY.value)
            print("🤖 AI queued Infantry")

    # Vehicles from the first war factory (cycle through unit types)
    if ai_war_factory and not ai_war_factory[0].production_queue:
        unit_type = UnitType(self.ai_unit_cycle[self.ai_unit_index])
        cost = UNIT_COSTS.get(unit_type, 0)
        if player_ai.credits >= cost:
            player_ai.credits -= cost
            ai_war_factory[0].production_queue.append(unit_type.value)
            self.ai_unit_index = (self.ai_unit_index + 1) % len(self.ai_unit_cycle)
            print(f"🤖 AI queued {unit_type.value}")
async def launch_nuke(self, player_id: int, target: Position):
    """Launch nuclear strike at target location.

    Every unit and every building within 200 pixels of ``target`` loses
    health: 200 points at the centre, falling linearly to 50% of that at the
    edge of the radius. Anything whose health drops to zero or below is
    removed from the game state. Ownership is not checked, so the launching
    player's own units and buildings are hit as well.
    """
    # Damage radius: 200 pixels = 5 tiles
    NUKE_DAMAGE_RADIUS = 200.0
    NUKE_MAX_DAMAGE = 200  # Maximum damage at center

    def blast(entities):
        """Damage entities in range, delete the destroyed ones, return their ids."""
        destroyed = []
        for entity_id, entity in entities.items():
            distance = entity.position.distance_to(target)
            if distance <= NUKE_DAMAGE_RADIUS:
                # Full damage at center, 50% at edge
                falloff = 1.0 - (distance / NUKE_DAMAGE_RADIUS) * 0.5
                entity.health -= int(NUKE_MAX_DAMAGE * falloff)
                if entity.health <= 0:
                    destroyed.append(entity_id)
        # Deletion is deferred so the dict is not mutated while iterating
        for entity_id in destroyed:
            del entities[entity_id]
        return destroyed

    dead_units = blast(self.game_state.units)
    dead_buildings = blast(self.game_state.buildings)

    print(f"💥 NUKE launched by player {player_id} at ({target.x:.0f}, {target.y:.0f})")
    print(f" Destroyed {len(dead_units)} units and {len(dead_buildings)} buildings")
async def handle_command(self, command: dict):
    """Handle game commands from clients.

    Dispatches on ``command["type"]`` to one private ``_cmd_*`` coroutine per
    command. Supported types: ``move_unit``, ``attack_unit``,
    ``attack_building``, ``build_unit``, ``build_building``, ``stop_units``,
    ``prepare_nuke``, ``cancel_nuke``, ``launch_nuke``, ``change_language``
    and ``request_ai_analysis``. Unknown or missing types are ignored.
    """
    handlers = {
        "move_unit": self._cmd_move_unit,
        "attack_unit": self._cmd_attack_unit,
        "attack_building": self._cmd_attack_building,
        "build_unit": self._cmd_build_unit,
        "build_building": self._cmd_build_building,
        "stop_units": self._cmd_stop_units,
        "prepare_nuke": self._cmd_prepare_nuke,
        "cancel_nuke": self._cmd_cancel_nuke,
        "launch_nuke": self._cmd_launch_nuke,
        "change_language": self._cmd_change_language,
        "request_ai_analysis": self._cmd_request_ai_analysis,
    }
    cmd_type = command.get("type")
    # Non-string types (e.g. a JSON list) are unhashable; treat them as unknown
    handler = handlers.get(cmd_type) if isinstance(cmd_type, str) else None
    if handler is not None:
        await handler(command)


async def _cmd_notify(self, message, level):
    """Broadcast a ``notification`` message with the given severity level."""
    await self.broadcast({
        "type": "notification",
        "message": message,
        "level": level
    })


def _cmd_has_coords(self, point):
    """Return True when ``point`` is a dict carrying both ``x`` and ``y``."""
    return isinstance(point, dict) and "x" in point and "y" in point


def _cmd_mark_manual_move(self, unit):
    """Flag ``unit`` as following a player move order."""
    # Clear combat target and set manual order flag
    unit.target_unit_id = None
    unit.manual_order = True
    # A Harvester gets manual control so its AI does not override the order
    if unit.type == UnitType.HARVESTER:
        unit.manual_control = True
        unit.gathering = False
        unit.returning = False
        unit.ore_target = None


async def _cmd_move_unit(self, command):
    """Order units to a position; several units spread on a circle around it."""
    import math

    unit_ids = command.get("unit_ids", [])
    target = command.get("target")
    print(f"🎮 MOVE_UNIT command received: unit_ids={unit_ids}, target={target}")
    if not (target and "x" in target and "y" in target):
        return

    base_target = Position(target["x"], target["y"])
    print(f" Moving to position ({base_target.x:.1f}, {base_target.y:.1f})")

    count = len(unit_ids)
    if count > 1:
        print(f" Setting formation for {count} units")
    else:
        print(" Setting single unit target")

    spacing = 30.0  # Distance between units in formation
    for idx, uid in enumerate(unit_ids):
        unit = self.game_state.units.get(uid)
        if unit is None:
            continue
        if count > 1:
            print(f" Unit {uid} ({unit.type}) - setting target")
            # Circular formation: each unit gets its own angle, and the
            # radius grows by one spacing step for every 8 units. (The
            # angle used to be computed but ignored, which stacked all
            # even-indexed units on one point and all odd ones on another.)
            angle = (idx * 360.0 / count) * (math.pi / 180.0)
            ring_radius = spacing * (1 + idx // 8)
            unit.target = Position(
                base_target.x + ring_radius * math.cos(angle),
                base_target.y + ring_radius * math.sin(angle)
            )
        else:
            print(f" Unit {uid} ({unit.type}) at ({unit.position.x:.1f},{unit.position.y:.1f}) -> ({base_target.x:.1f},{base_target.y:.1f})")
            unit.target = base_target
        self._cmd_mark_manual_move(unit)


async def _cmd_attack_unit(self, command):
    """Point the given attackers at an existing enemy unit."""
    target_id = command.get("target_id")
    for uid in command.get("attacker_ids", []):
        if uid in self.game_state.units and target_id in self.game_state.units:
            attacker = self.game_state.units[uid]
            attacker.target_unit_id = target_id
            attacker.target_building_id = None  # Clear building target
            attacker.manual_order = True


async def _cmd_attack_building(self, command):
    """Point the given attackers at an existing building."""
    target_id = command.get("target_id")
    for uid in command.get("attacker_ids", []):
        if uid in self.game_state.units and target_id in self.game_state.buildings:
            attacker = self.game_state.units[uid]
            attacker.target_building_id = target_id
            attacker.target_unit_id = None  # Clear unit target
            attacker.manual_order = True


async def _cmd_build_unit(self, command):
    """Charge the player and queue a unit in a suitable production building."""
    unit_type_str = command.get("unit_type")
    player_id = command.get("player_id", 0)
    preferred_building_id = command.get("building_id")  # optional: choose production building
    print(f"🏭 BUILD_UNIT command received: unit_type={unit_type_str}, player={player_id}, building={preferred_building_id}")

    if not unit_type_str:
        print(" ❌ No unit_type provided")
        return
    try:
        unit_type = UnitType(unit_type_str)
    except ValueError:
        return

    # RED ALERT: Check cost!
    cost = UNIT_COSTS.get(unit_type, 0)
    player = self.game_state.players.get(player_id)
    player_language = player.language if player is not None else "en"
    current_credits = player.credits if player is not None else 0
    if current_credits < cost:
        await self._cmd_notify(
            LOCALIZATION.translate(
                player_language,
                "notification.insufficient_credits",
                cost=cost,
                current=current_credits
            ),
            "error"
        )
        return

    required_building = PRODUCTION_REQUIREMENTS.get(unit_type)
    if not required_building:
        return

    # Use the preferred building when it is valid for this player and unit
    suitable_building = None
    if preferred_building_id and preferred_building_id in self.game_state.buildings:
        preferred = self.game_state.buildings[preferred_building_id]
        if preferred.player_id == player_id and preferred.type == required_building:
            suitable_building = preferred

    # Otherwise choose the least busy eligible building
    if not suitable_building:
        eligible = [
            b for b in self.game_state.buildings.values()
            if b.player_id == player_id and b.type == required_building
        ]
        if eligible:
            suitable_building = min(eligible, key=lambda b: len(b.production_queue))

    unit_name = LOCALIZATION.translate(player_language, f"unit.{unit_type_str}")
    if not suitable_building:
        building_name = LOCALIZATION.translate(player_language, f"building.{required_building.value}")
        await self._cmd_notify(
            LOCALIZATION.translate(
                player_language,
                "notification.unit_requires",
                unit=unit_name,
                requirement=building_name
            ),
            "error"
        )
        return

    if player is None:
        # Unknown player id: there is no account to charge, so queue nothing
        return

    # RED ALERT: Deduct credits!
    player.credits -= cost
    suitable_building.production_queue.append(unit_type_str)
    print(f" ✅ Added {unit_type_str} to {suitable_building.type} queue (building {suitable_building.id})")
    print(f" Queue is now: {suitable_building.production_queue}")
    print(f" Credits: {player.credits}")

    await self._cmd_notify(
        LOCALIZATION.translate(player_language, "notification.unit_training", unit=unit_name),
        "success"
    )


async def _cmd_build_building(self, command):
    """Validate, charge for and place a building for the player."""
    building_type_str = command.get("building_type")
    position = command.get("position")
    player_id = command.get("player_id", 0)
    if not building_type_str or not position:
        return
    try:
        building_type = BuildingType(building_type_str)
    except ValueError:
        return

    # RED ALERT: Check cost!
    cost = BUILDING_COSTS.get(building_type, 0)
    player = self.game_state.players.get(player_id)
    player_language = player.language if player is not None else "en"
    current_credits = player.credits if player is not None else 0
    if current_credits < cost:
        await self._cmd_notify(
            LOCALIZATION.translate(
                player_language,
                "notification.insufficient_credits",
                cost=cost,
                current=current_credits
            ),
            "error"
        )
        return

    # Rule: limit multiple same-type buildings if disabled
    if not ALLOW_MULTIPLE_SAME_BUILDING and building_type != BuildingType.HQ:
        already_owned = any(
            b.player_id == player_id and b.type == building_type
            for b in self.game_state.buildings.values()
        )
        if already_owned:
            building_name = LOCALIZATION.translate(player_language, f"building.{building_type_str}")
            await self._cmd_notify(
                LOCALIZATION.translate(player_language, "notification.building_limit_one", building=building_name),
                "error"
            )
            return

    has_coords = self._cmd_has_coords(position)

    # Enforce HQ build radius
    hq = next(
        (b for b in self.game_state.buildings.values()
         if b.player_id == player_id and b.type == BuildingType.HQ),
        None
    )
    if hq and has_coords:
        max_dist = HQ_BUILD_RADIUS_TILES * TILE_SIZE
        dx = position["x"] - hq.position.x
        dy = position["y"] - hq.position.y
        if (dx*dx + dy*dy) ** 0.5 > max_dist:
            await self._cmd_notify(
                LOCALIZATION.translate(player_language, "notification.building_too_far_from_hq"),
                "error"
            )
            return

    # Validate BEFORE charging: a position without x/y (or an unknown
    # player) can never produce a building, so no credits may be taken.
    if not has_coords or player is None:
        return

    # RED ALERT: Deduct credits!
    player.credits -= cost
    self.game_state.create_building(
        building_type,
        player_id,
        Position(position["x"], position["y"])
    )

    building_name = LOCALIZATION.translate(player_language, f"building.{building_type_str}")
    await self._cmd_notify(
        LOCALIZATION.translate(player_language, "notification.building_placed", building=building_name),
        "success"
    )


async def _cmd_stop_units(self, command):
    """Clear the movement target of the given units."""
    for uid in command.get("unit_ids", []):
        if uid in self.game_state.units:
            self.game_state.units[uid].target = None


async def _cmd_prepare_nuke(self, command):
    """Enter nuke-targeting mode when the player's superweapon is ready."""
    player_id = command.get("player_id", 0)
    player = self.game_state.players.get(player_id)
    if player is not None and player.superweapon_ready:
        player.nuke_preparing = True
        await self.broadcast({
            "type": "nuke_preparing",
            "player_id": player_id
        })


async def _cmd_cancel_nuke(self, command):
    """Leave nuke-targeting mode."""
    player_id = command.get("player_id", 0)
    if player_id in self.game_state.players:
        self.game_state.players[player_id].nuke_preparing = False


async def _cmd_launch_nuke(self, command):
    """Fire a prepared nuke at the requested target and reset the weapon."""
    player_id = command.get("player_id", 0)
    target = command.get("target")
    player = self.game_state.players.get(player_id)
    if player is None or not target:
        return
    if not (player.superweapon_ready and player.nuke_preparing):
        return
    # A target without x/y used to raise KeyError, which the WebSocket
    # endpoint turns into a dropped connection; ignore it instead.
    if not self._cmd_has_coords(target):
        return

    target_pos = Position(target["x"], target["y"])
    await self.launch_nuke(player_id, target_pos)

    # Reset superweapon state
    player.superweapon_ready = False
    player.superweapon_charge = 0
    player.nuke_preparing = False

    await self.broadcast({
        "type": "nuke_launched",
        "player_id": player_id,
        "target": {"x": target_pos.x, "y": target_pos.y}
    })


async def _cmd_change_language(self, command):
    """Switch a player's language when it is one of the supported ones."""
    player_id = command.get("player_id", 0)
    language = command.get("language", "en")
    if player_id not in self.game_state.players:
        return
    supported = list(LOCALIZATION.get_supported_languages())
    if language not in supported:
        return
    self.game_state.players[player_id].language = language
    # Resetting the timestamp triggers an immediate AI analysis in the new language
    self.last_ai_analysis_time = 0
    await self._cmd_notify(
        f"Language changed to {LOCALIZATION.get_display_name(language)}",
        "info"
    )


async def _cmd_request_ai_analysis(self, command):
    """Run the AI analysis now and broadcast its result."""
    await self.run_ai_analysis()
    await self.broadcast({
        "type": "ai_analysis_update",
        "analysis": self.last_ai_analysis
    })
# Global connection manager: the single module-level ConnectionManager
# instance used by the HTTP route handlers and the WebSocket endpoint below.
manager = ConnectionManager()
| # Routes | |
async def get_home():
    """Serve the main game interface.

    Reads ``static/index.html`` (relative to the working directory) and
    returns it as an HTML response.
    """
    # The context manager closes the file handle deterministically, and the
    # explicit encoding keeps the page readable regardless of the host locale.
    with open("static/index.html", encoding="utf-8") as page:
        return HTMLResponse(content=page.read())
async def health_check():
    """Health check endpoint for HuggingFace Spaces.

    Reports a fixed ``"healthy"`` status together with the current player,
    unit, building and connection counts, whether the AI model is available,
    and the list of supported language codes.
    """
    state = manager.game_state
    return dict(
        status="healthy",
        players=len(state.players),
        units=len(state.units),
        buildings=len(state.buildings),
        active_connections=len(manager.active_connections),
        ai_available=manager.ai_analyzer.model_available,
        supported_languages=list(LOCALIZATION.get_supported_languages()),
    )
async def get_languages():
    """Get supported languages as a list of ``{"code", "name"}`` entries."""
    return {
        "languages": [
            {"code": code, "name": LOCALIZATION.get_display_name(code)}
            for code in LOCALIZATION.get_supported_languages()
        ]
    }
async def get_translations(language: str):
    """Get all translations for a language.

    Falls back to English when ``language`` has no entry in ``TRANSLATIONS``;
    the returned ``"language"`` field names the table actually served.
    """
    from localization import TRANSLATIONS

    resolved = language if language in TRANSLATIONS else "en"
    return {"translations": TRANSLATIONS[resolved], "language": resolved}
async def set_player_language(player_id: int, language: str):
    """Set player's preferred language.

    Returns ``{"success": True, "language": ...}`` on success, or
    ``{"success": False, "error": ...}`` when the player does not exist or
    the language is not one of the supported languages.
    """
    if player_id not in manager.game_state.players:
        return {"success": False, "error": "Player not found"}
    # Same validation as the "change_language" WebSocket command, so an
    # unsupported code can no longer be stored on the player.
    if language not in list(LOCALIZATION.get_supported_languages()):
        return {"success": False, "error": "Unsupported language"}
    manager.game_state.players[player_id].language = language
    return {"success": True, "language": language}
async def get_ai_status():
    """Get AI analyzer status.

    ``model_path`` is only reported while the model is available; otherwise
    it is ``None``.
    """
    analyzer = manager.ai_analyzer
    status = {"available": analyzer.model_available}
    status["model_path"] = analyzer.model_path if analyzer.model_available else None
    status["last_analysis"] = manager.last_ai_analysis
    return status
async def translate_nl_command(data: dict):
    """
    Translate natural language command to MCP JSON

    Body: {"command": str, "language": str (optional)}

    An empty or missing command yields a ``success: False`` error payload;
    otherwise the translator's result is returned unchanged.
    """
    translator = get_nl_translator()
    text = data.get("command", "")
    lang = data.get("language")
    if text:
        return translator.translate_command(text, lang)
    return {"success": False, "error": "No command provided"}
async def get_nl_examples(language: str = "en"):
    """Get example natural language commands for ``language``."""
    examples = get_nl_translator().get_example_commands(language)
    return {"language": language, "examples": examples}
async def get_nl_status():
    """Get NL translator status: model loaded flag, model path, last error."""
    nl = get_nl_translator()
    available, path, error = nl.model_loaded, nl.model_path, nl.last_error
    return {"available": available, "model_path": path, "last_error": error}
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket endpoint for real-time game communication.

    Registers the socket with the connection manager, sends the full game
    state once as an ``init`` message, then feeds every received JSON message
    to ``manager.handle_command``. Any exception ends the session and
    unregisters the socket; everything except a normal disconnect is printed
    first.
    """
    await manager.connect(websocket)
    try:
        initial = {"type": "init", "state": manager.game_state.to_dict()}
        await websocket.send_json(initial)
        # Command loop - runs until the socket closes or a handler fails
        while True:
            message = await websocket.receive_json()
            await manager.handle_command(message)
    except Exception as exc:
        if not isinstance(exc, WebSocketDisconnect):
            print(f"WebSocket error: {exc}")
        manager.disconnect(websocket)
# Mount static files. Mounting stays best-effort so the API can start without
# the assets, but the failure is reported instead of silently swallowed.
try:
    app.mount("/static", StaticFiles(directory="static"), name="static")
except (RuntimeError, OSError) as exc:
    # NOTE(review): Starlette's StaticFiles is expected to raise RuntimeError
    # when the directory does not exist - confirm against the pinned version.
    print(f"Static files not mounted: {exc}")
# Script entry point: serve the app with uvicorn on all interfaces, port 7860.
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the file is run as a script
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)