rts-commander / app.py
Luigi's picture
perf: Non-blocking LLM architecture to prevent game lag
7e8483f
"""
RTS Game Web Server - FastAPI + WebSocket
Optimized for HuggingFace Spaces with Docker
Features:
- Real-time multiplayer RTS gameplay
- AI tactical analysis via Qwen2.5 LLM
- Multi-language support (EN/FR/ZH-TW)
- Red Alert-style mechanics
"""
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
import asyncio
import json
import random
import time
from typing import Dict, List, Optional, Set, Any
from dataclasses import dataclass, asdict
from enum import Enum
import uuid
# Import localization and AI systems
from localization import LOCALIZATION
from ai_analysis import get_ai_analyzer, get_model_download_status
from nl_translator_async import get_nl_translator
# Game Constants
TILE_SIZE = 40
MAP_WIDTH = 96
MAP_HEIGHT = 72
VIEW_WIDTH = 48
VIEW_HEIGHT = 27
# Initialize FastAPI app
app = FastAPI(title="RTS Game", version="1.0.0")
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
from backend.constants import (
UnitType,
BuildingType,
UNIT_COSTS,
BUILDING_COSTS,
POWER_PRODUCTION,
POWER_CONSUMPTION,
LOW_POWER_THRESHOLD,
LOW_POWER_PRODUCTION_FACTOR,
HARVESTER_CAPACITY,
HARVEST_AMOUNT_PER_ORE,
HARVEST_AMOUNT_PER_GEM,
HQ_BUILD_RADIUS_TILES,
ALLOW_MULTIPLE_SAME_BUILDING,
)
class TerrainType(str, Enum):
GRASS = "grass"
ORE = "ore"
GEM = "gem"
WATER = "water"
# Production Requirements - Critical for gameplay!
PRODUCTION_REQUIREMENTS = {
UnitType.INFANTRY: BuildingType.BARRACKS,
UnitType.TANK: BuildingType.WAR_FACTORY,
UnitType.ARTILLERY: BuildingType.WAR_FACTORY,
UnitType.HELICOPTER: BuildingType.WAR_FACTORY,
UnitType.HARVESTER: BuildingType.HQ, # Harvester needs HQ, NOT Refinery!
}
## Costs, power system, and harvesting constants are imported above
# Data Classes
@dataclass
class Position:
x: float
y: float
def distance_to(self, other: 'Position') -> float:
return ((self.x - other.x) ** 2 + (self.y - other.y) ** 2) ** 0.5
def to_dict(self):
return {"x": self.x, "y": self.y}
@dataclass
class Unit:
id: str
type: UnitType
player_id: int
position: Position
health: int
max_health: int
speed: float
damage: int
range: float
target: Optional[Position] = None
target_unit_id: Optional[str] = None
target_building_id: Optional[str] = None
cargo: int = 0
gathering: bool = False
returning: bool = False
ore_target: Optional[Position] = None
last_attacker_id: Optional[str] = None
manual_control: bool = False # True when player gives manual orders
manual_order: bool = False # True when player gives manual move/attack order
collision_radius: float = 15.0 # Collision detection radius
attack_cooldown: int = 0 # Frames until next attack
attack_animation: int = 0 # Frames for attack animation (for visual feedback)
def to_dict(self):
return {
"id": self.id,
"type": self.type.value,
"player_id": self.player_id,
"position": self.position.to_dict(),
"health": self.health,
"max_health": self.max_health,
"speed": self.speed,
"damage": self.damage,
"range": self.range,
"target": self.target.to_dict() if self.target else None,
"target_unit_id": self.target_unit_id,
"target_building_id": self.target_building_id,
"cargo": self.cargo,
"gathering": self.gathering,
"returning": self.returning,
"manual_control": self.manual_control,
"manual_order": self.manual_order,
"collision_radius": self.collision_radius,
"attack_cooldown": self.attack_cooldown,
"attack_animation": self.attack_animation
}
@dataclass
class Building:
id: str
type: BuildingType
player_id: int
position: Position
health: int
max_health: int
production_queue: List[str]
production_progress: float
target_unit_id: Optional[str] = None # For defense turrets
attack_cooldown: int = 0 # For defense turrets
attack_animation: int = 0 # For defense turrets
def to_dict(self):
return {
"id": self.id,
"type": self.type.value,
"player_id": self.player_id,
"position": self.position.to_dict(),
"health": self.health,
"max_health": self.max_health,
"production_queue": self.production_queue,
"production_progress": self.production_progress,
"target_unit_id": self.target_unit_id,
"attack_cooldown": self.attack_cooldown,
"attack_animation": self.attack_animation
}
@dataclass
class Player:
id: int
name: str
color: str
credits: int
power: int
power_consumption: int
is_ai: bool
language: str = "en" # Language preference (en, fr, zh-TW)
superweapon_charge: int = 0 # 0-1800 ticks (30 seconds at 60 ticks/sec)
superweapon_ready: bool = False
nuke_preparing: bool = False # True when 'N' key pressed, waiting for target
def to_dict(self):
return asdict(self)
# Game State Manager
class GameState:
def __init__(self):
self.units: Dict[str, Unit] = {}
self.buildings: Dict[str, Building] = {}
self.players: Dict[int, Player] = {}
self.terrain: List[List[TerrainType]] = []
self.fog_of_war: List[List[bool]] = []
self.game_started = False
self.game_over = False
self.winner: Optional[str] = None # "player" or "enemy"
self.tick = 0
self.init_map()
self.init_players()
def init_map(self):
"""Initialize terrain with grass, ore, and water"""
self.terrain = [[TerrainType.GRASS for _ in range(MAP_WIDTH)] for _ in range(MAP_HEIGHT)]
self.fog_of_war = [[True for _ in range(MAP_WIDTH)] for _ in range(MAP_HEIGHT)]
# Add ore patches
for _ in range(15):
ox, oy = random.randint(5, MAP_WIDTH-6), random.randint(5, MAP_HEIGHT-6)
for dx in range(-2, 3):
for dy in range(-2, 3):
if 0 <= ox+dx < MAP_WIDTH and 0 <= oy+dy < MAP_HEIGHT:
if random.random() > 0.3:
self.terrain[oy+dy][ox+dx] = TerrainType.ORE
# Add gem patches (rare)
for _ in range(5):
gx, gy = random.randint(5, MAP_WIDTH-6), random.randint(5, MAP_HEIGHT-6)
for dx in range(-1, 2):
for dy in range(-1, 2):
if 0 <= gx+dx < MAP_WIDTH and 0 <= gy+dy < MAP_HEIGHT:
if random.random() > 0.5:
self.terrain[gy+dy][gx+dx] = TerrainType.GEM
# Add water bodies
for _ in range(8):
wx, wy = random.randint(5, MAP_WIDTH-6), random.randint(5, MAP_HEIGHT-6)
for dx in range(-3, 4):
for dy in range(-3, 4):
if 0 <= wx+dx < MAP_WIDTH and 0 <= wy+dy < MAP_HEIGHT:
if (dx*dx + dy*dy) < 9:
self.terrain[wy+dy][wx+dx] = TerrainType.WATER
def init_players(self):
"""Initialize player 0 (human) and player 1 (AI)"""
# Start with power=50 (from HQ), consumption=0
self.players[0] = Player(0, "Player", "#4A90E2", 5000, 50, 0, False)
self.players[1] = Player(1, "AI", "#E74C3C", 5000, 50, 0, True)
# Create starting HQ for each player
hq0_id = str(uuid.uuid4())
self.buildings[hq0_id] = Building(
id=hq0_id,
type=BuildingType.HQ,
player_id=0,
position=Position(5 * TILE_SIZE, 5 * TILE_SIZE),
health=500,
max_health=500,
production_queue=[],
production_progress=0
)
hq1_id = str(uuid.uuid4())
self.buildings[hq1_id] = Building(
id=hq1_id,
type=BuildingType.HQ,
player_id=1,
position=Position((MAP_WIDTH-8) * TILE_SIZE, (MAP_HEIGHT-8) * TILE_SIZE),
health=500,
max_health=500,
production_queue=[],
production_progress=0
)
# Starting units
for i in range(3):
self.create_unit(UnitType.INFANTRY, 0, Position((7+i)*TILE_SIZE, 7*TILE_SIZE))
self.create_unit(UnitType.INFANTRY, 1, Position((MAP_WIDTH-10-i)*TILE_SIZE, (MAP_HEIGHT-10)*TILE_SIZE))
def create_unit(self, unit_type: UnitType, player_id: int, position: Position) -> Unit:
"""Create a new unit"""
unit_stats = {
UnitType.INFANTRY: {"health": 100, "speed": 2.0, "damage": 10, "range": 80},
UnitType.TANK: {"health": 200, "speed": 1.5, "damage": 30, "range": 120},
UnitType.HARVESTER: {"health": 150, "speed": 1.0, "damage": 0, "range": 0},
UnitType.HELICOPTER: {"health": 120, "speed": 3.0, "damage": 25, "range": 150},
UnitType.ARTILLERY: {"health": 100, "speed": 1.0, "damage": 50, "range": 200},
}
stats = unit_stats[unit_type]
unit_id = str(uuid.uuid4())
unit = Unit(
id=unit_id,
type=unit_type,
player_id=player_id,
position=position,
health=stats["health"],
max_health=stats["health"],
speed=stats["speed"],
damage=stats["damage"],
range=stats["range"],
target=None,
target_unit_id=None
)
self.units[unit_id] = unit
return unit
def create_building(self, building_type: BuildingType, player_id: int, position: Position) -> Building:
"""Create a new building"""
building_stats = {
BuildingType.HQ: {"health": 500},
BuildingType.BARRACKS: {"health": 300},
BuildingType.WAR_FACTORY: {"health": 400},
BuildingType.REFINERY: {"health": 250},
BuildingType.POWER_PLANT: {"health": 200},
BuildingType.DEFENSE_TURRET: {"health": 350},
}
stats = building_stats[building_type]
building_id = str(uuid.uuid4())
building = Building(
id=building_id,
type=building_type,
player_id=player_id,
position=position,
health=stats["health"],
max_health=stats["health"],
production_queue=[],
production_progress=0
)
self.buildings[building_id] = building
return building
def calculate_power(self, player_id: int) -> tuple[int, int, str]:
"""
Calculate power production and consumption for a player.
Returns:
tuple: (power_production, power_consumption, status)
status: 'green' (enough power), 'yellow' (low power), 'red' (no power)
"""
production = 0
consumption = 0
for building in self.buildings.values():
if building.player_id == player_id:
# Add power production
production += POWER_PRODUCTION.get(building.type, 0)
# Add power consumption
consumption += POWER_CONSUMPTION.get(building.type, 0)
# Determine status
if consumption == 0:
status = 'green'
elif production >= consumption:
status = 'green'
elif production >= consumption * LOW_POWER_THRESHOLD:
status = 'yellow'
else:
status = 'red'
# Update player power values
if player_id in self.players:
self.players[player_id].power = production
self.players[player_id].power_consumption = consumption
return production, consumption, status
def to_dict(self):
"""Convert game state to dictionary for JSON serialization"""
return {
"tick": self.tick,
"game_started": self.game_started,
"game_over": self.game_over,
"winner": self.winner,
"players": {pid: p.to_dict() for pid, p in self.players.items()},
"units": {uid: u.to_dict() for uid, u in self.units.items()},
"buildings": {bid: b.to_dict() for bid, b in self.buildings.items()},
"terrain": [[t.value for t in row] for row in self.terrain],
"fog_of_war": self.fog_of_war
}
# WebSocket Connection Manager
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
self.game_state = GameState()
self.game_loop_task: Optional[asyncio.Task] = None
self.ai_analyzer = get_ai_analyzer()
self.last_ai_analysis: Dict[str, Any] = {}
self.ai_analysis_interval = 60.0 # Analyze every 60 seconds (reduced frequency to avoid blocking)
self.last_ai_analysis_time = 0.0
# RED ALERT: Enemy AI state
self.ai_last_action_tick = 0
self.ai_action_interval = 120 # Take action every 6 seconds (120 ticks at 20Hz)
self.ai_build_plan = [
'power_plant',
'refinery',
'barracks',
'power_plant', # Second power plant
'war_factory',
]
self.ai_build_index = 0
self.ai_unit_cycle = ['infantry', 'infantry', 'tank', 'infantry', 'helicopter']
self.ai_unit_index = 0
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
# Start game loop if not already running
if self.game_loop_task is None or self.game_loop_task.done():
self.game_loop_task = asyncio.create_task(self.game_loop())
def disconnect(self, websocket: WebSocket):
if websocket in self.active_connections:
self.active_connections.remove(websocket)
async def broadcast(self, message: dict):
"""Send message to all connected clients"""
disconnected = []
for connection in self.active_connections:
try:
await connection.send_json(message)
except:
disconnected.append(connection)
# Clean up disconnected clients
for conn in disconnected:
self.disconnect(conn)
async def game_loop(self):
"""Main game loop - runs at 20 ticks per second"""
print("🔄 Game loop STARTED")
while True: # Run forever, check connections inside loop
try:
# Update game state
self.update_game_state()
# AI Analysis (periodic) - only if model is available
current_time = time.time()
if (self.ai_analyzer.model_available and
current_time - self.last_ai_analysis_time >= self.ai_analysis_interval):
await self.run_ai_analysis()
self.last_ai_analysis_time = current_time
# Broadcast state to all clients
state_dict = self.game_state.to_dict()
state_dict['ai_analysis'] = self.last_ai_analysis # Include AI insights
# Include model download status so UI can show progress
if not self.ai_analyzer.model_available:
state_dict['model_download'] = get_model_download_status()
# Broadcast only if there are active connections
if self.active_connections:
await self.broadcast({
"type": "state_update",
"state": state_dict
})
# 50ms delay = 20 ticks/sec
await asyncio.sleep(0.05)
except Exception as e:
print(f"Game loop error: {e}")
await asyncio.sleep(0.1)
async def run_ai_analysis(self):
"""Run AI tactical analysis in background"""
# Skip if model not available
if not self.ai_analyzer.model_available:
# Provide heuristic analysis so panel is never empty
player_lang = self.game_state.players.get(0, Player(0, "Player", "#000", 0, 0, 0, False)).language
self.last_ai_analysis = self.ai_analyzer._heuristic_analysis(self.game_state.to_dict(), player_lang)
return
try:
# Get player language preference
player_lang = self.game_state.players.get(0, Player(0, "Player", "#000", 0, 0, 0, False)).language
# Run analysis in thread pool to avoid blocking
loop = asyncio.get_event_loop()
analysis = await loop.run_in_executor(
None,
self.ai_analyzer.summarize_combat_situation,
self.game_state.to_dict(),
player_lang
)
self.last_ai_analysis = analysis
# Don't print every time to avoid console spam
# print(f"🤖 AI Analysis: {analysis.get('summary', '')}")
except Exception as e:
print(f"⚠️ AI analysis error: {e}")
player_lang = self.game_state.players.get(0, Player(0, "Player", "#000", 0, 0, 0, False)).language
self.last_ai_analysis = self.ai_analyzer._heuristic_analysis(self.game_state.to_dict(), player_lang)
def update_game_state(self):
"""Update game simulation - Red Alert style!"""
self.game_state.tick += 1
# DEBUG: Log every 5 seconds to confirm game loop is running
if self.game_state.tick % 100 == 0:
print(f"⏱️ Game tick: {self.game_state.tick} (loop running)")
# Cleanup old LLM requests every 30 seconds (600 ticks at 20Hz)
if self.game_state.tick % 600 == 0:
from model_manager import get_shared_model
model = get_shared_model()
model.cleanup_old_requests(max_age=300.0) # 5 minutes
# Also cleanup translator
translator = get_nl_translator()
translator.cleanup_old_requests(max_age=60.0) # 1 minute
# Update superweapon charge (30 seconds = 1800 ticks at 60 ticks/sec)
for player in self.game_state.players.values():
if not player.superweapon_ready and player.superweapon_charge < 1800:
player.superweapon_charge += 1
if player.superweapon_charge >= 1800:
player.superweapon_ready = True
# RED ALERT: Calculate power for both players
power_prod_p0, power_cons_p0, power_status_p0 = self.game_state.calculate_power(0)
power_prod_p1, power_cons_p1, power_status_p1 = self.game_state.calculate_power(1)
# Store power status for later use (warning every 5 seconds = 100 ticks at 20Hz)
if not hasattr(self, 'last_low_power_warning'):
self.last_low_power_warning = 0
if power_status_p0 == 'red' and self.game_state.tick - self.last_low_power_warning > 100:
# Send low power warning to player (translated)
player_language = self.game_state.players[0].language if 0 in self.game_state.players else "en"
message = LOCALIZATION.translate(player_language, "notification.low_power")
asyncio.create_task(self.broadcast({
"type": "notification",
"message": message,
"level": "warning"
}))
self.last_low_power_warning = self.game_state.tick
# RED ALERT: Enemy AI strategic decisions
if self.game_state.tick - self.ai_last_action_tick >= self.ai_action_interval:
self.update_enemy_ai()
self.ai_last_action_tick = self.game_state.tick
# Check victory conditions (no HQ = defeat)
if not self.game_state.game_over:
player_hq_exists = any(b.type == BuildingType.HQ and b.player_id == 0
for b in self.game_state.buildings.values())
enemy_hq_exists = any(b.type == BuildingType.HQ and b.player_id == 1
for b in self.game_state.buildings.values())
if not player_hq_exists and enemy_hq_exists:
# Player lost
self.game_state.game_over = True
self.game_state.winner = "enemy"
player_language = self.game_state.players[0].language if 0 in self.game_state.players else "en"
winner_name = LOCALIZATION.translate(player_language, "game.winner.enemy")
message = LOCALIZATION.translate(player_language, "game.win.banner", winner=winner_name)
asyncio.create_task(self.broadcast({
"type": "game_over",
"winner": "enemy",
"message": message
}))
elif not enemy_hq_exists and player_hq_exists:
# Player won!
self.game_state.game_over = True
self.game_state.winner = "player"
player_language = self.game_state.players[0].language if 0 in self.game_state.players else "en"
winner_name = LOCALIZATION.translate(player_language, "game.winner.player")
message = LOCALIZATION.translate(player_language, "game.win.banner", winner=winner_name)
asyncio.create_task(self.broadcast({
"type": "game_over",
"winner": "player",
"message": message
}))
elif not player_hq_exists and not enemy_hq_exists:
# Draw (both destroyed simultaneously)
self.game_state.game_over = True
self.game_state.winner = "draw"
asyncio.create_task(self.broadcast({
"type": "game_over",
"winner": "draw",
"message": "Draw! Both HQs destroyed"
}))
# Update units
for unit in list(self.game_state.units.values()):
# RED ALERT: Harvester AI (only if not manually controlled)
if unit.type == UnitType.HARVESTER and not unit.manual_control:
self.update_harvester(unit)
# Don't continue - let it move with the target set by AI
# RED ALERT: Auto-defense - if attacked, fight back! (but respect manual orders)
if unit.last_attacker_id and unit.last_attacker_id in self.game_state.units:
if not unit.target_unit_id and not unit.manual_order: # Not already attacking and no manual order
unit.target_unit_id = unit.last_attacker_id
# Don't clear movement target if player gave manual move order
# RED ALERT: Auto-acquire nearby enemies when idle (but respect manual orders)
if not unit.target_unit_id and not unit.target and unit.damage > 0 and not unit.manual_order:
nearest_enemy = self.find_nearest_enemy(unit)
if nearest_enemy and unit.position.distance_to(nearest_enemy.position) < unit.range * 3:
unit.target_unit_id = nearest_enemy.id
# Handle combat
if unit.target_unit_id:
if unit.target_unit_id in self.game_state.units:
target = self.game_state.units[unit.target_unit_id]
distance = unit.position.distance_to(target.position)
if distance <= unit.range:
# In range - attack!
unit.target = None # Stop moving
# Cooldown system - attack every N frames
if unit.attack_cooldown <= 0:
# Calculate damage based on unit type
# Infantry: 5-10 damage per hit, fast attacks (20 frames)
# Tank: 30-40 damage per hit, slow attacks (40 frames)
# Artillery: 50-60 damage per hit, very slow (60 frames)
# Helicopter: 15-20 damage per hit, medium speed (30 frames)
damage_multipliers = {
UnitType.INFANTRY: (5, 20), # (damage, cooldown)
UnitType.TANK: (35, 40),
UnitType.ARTILLERY: (55, 60),
UnitType.HELICOPTER: (18, 30),
UnitType.HARVESTER: (0, 0),
}
damage, cooldown = damage_multipliers.get(unit.type, (5, 20))
# Apply damage
target.health -= damage
unit.attack_cooldown = cooldown
unit.attack_animation = 10 # 10 frames of attack animation
target.last_attacker_id = unit.id # RED ALERT: Track attacker for auto-defense
if target.health <= 0:
# Target destroyed
del self.game_state.units[unit.target_unit_id]
unit.target_unit_id = None
unit.last_attacker_id = None
else:
# Move closer
unit.target = Position(target.position.x, target.position.y)
else:
# Target no longer exists
unit.target_unit_id = None
# Handle building attacks
if unit.target_building_id:
if unit.target_building_id in self.game_state.buildings:
target = self.game_state.buildings[unit.target_building_id]
distance = unit.position.distance_to(target.position)
if distance <= unit.range:
# In range - attack!
unit.target = None # Stop moving
# Cooldown system - attack every N frames
if unit.attack_cooldown <= 0:
damage_multipliers = {
UnitType.INFANTRY: (5, 20), # (damage, cooldown)
UnitType.TANK: (35, 40),
UnitType.ARTILLERY: (55, 60),
UnitType.HELICOPTER: (18, 30),
UnitType.HARVESTER: (0, 0),
}
damage, cooldown = damage_multipliers.get(unit.type, (5, 20))
# Apply damage to building
target.health -= damage
unit.attack_cooldown = cooldown
unit.attack_animation = 10 # 10 frames of attack animation
if target.health <= 0:
# Building destroyed
del self.game_state.buildings[unit.target_building_id]
unit.target_building_id = None
else:
# Move closer
unit.target = Position(target.position.x, target.position.y)
else:
# Target no longer exists
unit.target_building_id = None
# Decrease attack cooldown and animation
if unit.attack_cooldown > 0:
unit.attack_cooldown -= 1
if unit.attack_animation > 0:
unit.attack_animation -= 1
# Movement
# DEBUG: Log unit target status for player units
if unit.player_id == 0 and self.game_state.tick % 40 == 0: # Every 2 seconds
if unit.target:
print(f" 🎯 Unit {unit.id[:8]} ({unit.type}) HAS target: ({unit.target.x:.1f}, {unit.target.y:.1f}), pos: ({unit.position.x:.1f}, {unit.position.y:.1f})")
else:
print(f" ❌ Unit {unit.id[:8]} ({unit.type}) NO target, pos: ({unit.position.x:.1f}, {unit.position.y:.1f})")
if unit.target:
# Move towards target
dx = unit.target.x - unit.position.x
dy = unit.target.y - unit.position.y
dist = (dx*dx + dy*dy) ** 0.5
if dist > 5:
old_x, old_y = unit.position.x, unit.position.y
unit.position.x += (dx / dist) * unit.speed
unit.position.y += (dy / dist) * unit.speed
# Apply dispersion after movement
self.apply_unit_dispersion(unit)
# Debug log for movement (only log occasionally to avoid spam)
if self.game_state.tick % 20 == 0: # Every second (20 ticks/sec)
print(f" 🚶 Unit {unit.id[:8]} ({unit.type}) moving: ({old_x:.1f},{old_y:.1f}) -> ({unit.position.x:.1f},{unit.position.y:.1f}), dist={dist:.1f}")
else:
print(f" ✅ Unit {unit.id[:8]} ({unit.type}) reached destination ({unit.position.x:.1f},{unit.position.y:.1f})")
unit.target = None
unit.manual_order = False # Clear manual order flag when destination reached
# If Harvester reached manual destination, resume AI
if unit.type == UnitType.HARVESTER and unit.manual_control:
unit.manual_control = False
# RED ALERT: AI unit behavior (enemy side)
if self.game_state.players[unit.player_id].is_ai:
self.update_ai_unit(unit)
# Update buildings production
for building in self.game_state.buildings.values():
# Defense turret auto-attack logic
if building.type == BuildingType.DEFENSE_TURRET:
turret_range = 300.0 # Defense turret range
# Find nearest enemy unit
if not building.target_unit_id or building.target_unit_id not in self.game_state.units:
min_dist = float('inf')
nearest_enemy = None
for enemy_unit in self.game_state.units.values():
if enemy_unit.player_id != building.player_id:
dist = building.position.distance_to(enemy_unit.position)
if dist < turret_range and dist < min_dist:
min_dist = dist
nearest_enemy = enemy_unit
if nearest_enemy:
building.target_unit_id = nearest_enemy.id
# Attack target if in range
if building.target_unit_id and building.target_unit_id in self.game_state.units:
target = self.game_state.units[building.target_unit_id]
distance = building.position.distance_to(target.position)
if distance <= turret_range:
# Attack!
if building.attack_cooldown <= 0:
damage = 20 # Turret damage
target.health -= damage
building.attack_cooldown = 30 # 30 frames cooldown
building.attack_animation = 10
if target.health <= 0:
# Target destroyed
del self.game_state.units[building.target_unit_id]
building.target_unit_id = None
else:
# Out of range, lose target
building.target_unit_id = None
# Decrease cooldowns
if building.attack_cooldown > 0:
building.attack_cooldown -= 1
if building.attack_animation > 0:
building.attack_animation -= 1
# DEBUG: Log production queue status
if building.player_id == 0 and building.type in [BuildingType.BARRACKS, BuildingType.WAR_FACTORY]:
if self.game_state.tick % 60 == 0: # Every 3 seconds
if building.production_queue:
print(f" 📋 Building {building.id[:8]} ({building.type}) queue: {building.production_queue}, progress: {building.production_progress:.2%}")
else:
print(f" 💤 Building {building.id[:8]} ({building.type}) IDLE (no queue)")
if building.production_queue:
# RED ALERT: Check power status for this building's player
_, _, power_status = self.game_state.calculate_power(building.player_id)
# Adjust production speed based on power
production_speed = 0.01
if power_status == 'red':
production_speed *= LOW_POWER_PRODUCTION_FACTOR # 50% speed when low power
building.production_progress += production_speed
# Debug log every 2 seconds (40 ticks at 20Hz)
if self.game_state.tick % 40 == 0:
print(f" 🏭 Building {building.id[:8]} ({building.type}) producing: {building.production_queue[0]} - progress: {building.production_progress:.2%}")
if building.production_progress >= 1.0:
# Complete production
print(f" ✅ Production complete! Creating {building.production_queue[0]}")
unit_type = UnitType(building.production_queue.pop(0))
spawn_pos = Position(
building.position.x + TILE_SIZE * 2,
building.position.y + TILE_SIZE * 2
)
# Find free position near spawn point
new_unit = self.game_state.create_unit(unit_type, building.player_id, spawn_pos)
if new_unit:
free_pos = self.find_free_position_nearby(spawn_pos, new_unit.id)
new_unit.position = free_pos
building.production_progress = 0
# Endgame checks: elimination-first (after all destructions this tick)
if not self.game_state.game_over:
p_units = sum(1 for u in self.game_state.units.values() if u.player_id == 0)
e_units = sum(1 for u in self.game_state.units.values() if u.player_id == 1)
p_buildings = sum(1 for b in self.game_state.buildings.values() if b.player_id == 0)
e_buildings = sum(1 for b in self.game_state.buildings.values() if b.player_id == 1)
player_language = self.game_state.players[0].language if 0 in self.game_state.players else "en"
if p_units == 0 and p_buildings == 0 and (e_units > 0 or e_buildings > 0):
# Player eliminated
self.game_state.game_over = True
self.game_state.winner = "enemy"
winner_name = LOCALIZATION.translate(player_language, "game.winner.enemy")
message = LOCALIZATION.translate(player_language, "game.win.banner", winner=winner_name)
asyncio.create_task(self.broadcast({
"type": "game_over",
"winner": "enemy",
"message": message
}))
elif e_units == 0 and e_buildings == 0 and (p_units > 0 or p_buildings > 0):
# Enemy eliminated
self.game_state.game_over = True
self.game_state.winner = "player"
winner_name = LOCALIZATION.translate(player_language, "game.winner.player")
message = LOCALIZATION.translate(player_language, "game.win.banner", winner=winner_name)
asyncio.create_task(self.broadcast({
"type": "game_over",
"winner": "player",
"message": message
}))
elif p_units == 0 and p_buildings == 0 and e_units == 0 and e_buildings == 0:
# Total annihilation -> draw
self.game_state.game_over = True
self.game_state.winner = "draw"
# Localized draw banner if available
try:
draw_msg = LOCALIZATION.translate(player_language, "game.draw.banner")
except Exception:
draw_msg = "Draw!"
asyncio.create_task(self.broadcast({
"type": "game_over",
"winner": "draw",
"message": draw_msg
}))
def find_nearest_enemy(self, unit: Unit) -> Optional[Unit]:
"""RED ALERT: Find nearest enemy unit"""
min_dist = float('inf')
nearest_enemy = None
for other_unit in self.game_state.units.values():
if other_unit.player_id != unit.player_id:
dist = unit.position.distance_to(other_unit.position)
if dist < min_dist:
min_dist = dist
nearest_enemy = other_unit
return nearest_enemy
def is_position_occupied(self, position: Position, current_unit_id: str, radius: float = 15.0) -> bool:
"""Check if a position is occupied by another unit"""
for unit_id, unit in self.game_state.units.items():
if unit_id == current_unit_id:
continue
distance = position.distance_to(unit.position)
if distance < (radius + unit.collision_radius):
return True
return False
def find_free_position_nearby(self, position: Position, unit_id: str, max_attempts: int = 16) -> Position:
"""Find a free position around the given position using spiral search"""
# Check if current position is free
if not self.is_position_occupied(position, unit_id):
return position
# Spiral search outward with circular pattern
import math
for ring in range(1, max_attempts + 1):
# Number of positions to test in this ring (8 directions)
num_positions = 8
radius = 25.0 * ring # Increase radius with each ring
for i in range(num_positions):
angle = (i * 360.0 / num_positions) * (math.pi / 180.0) # Convert to radians
offset_x = radius * math.cos(angle)
offset_y = radius * math.sin(angle)
new_pos = Position(
position.x + offset_x,
position.y + offset_y
)
# Keep within map bounds
new_pos.x = max(TILE_SIZE, min(MAP_WIDTH * TILE_SIZE - TILE_SIZE, new_pos.x))
new_pos.y = max(TILE_SIZE, min(MAP_HEIGHT * TILE_SIZE - TILE_SIZE, new_pos.y))
if not self.is_position_occupied(new_pos, unit_id):
return new_pos
# If no free position found, return original (fallback)
return position
def apply_unit_dispersion(self, unit: Unit):
"""Apply automatic dispersion to prevent units from overlapping"""
if self.is_position_occupied(unit.position, unit.id):
new_position = self.find_free_position_nearby(unit.position, unit.id)
unit.position = new_position
def update_harvester(self, unit: Unit):
"""RED ALERT: Harvester AI - auto-collect resources!"""
# If returning to base with cargo
if unit.returning:
# Find nearest Refinery or HQ
depot = self.find_nearest_depot(unit.player_id, unit.position)
if depot:
distance = unit.position.distance_to(depot.position)
if distance < TILE_SIZE * 2:
# Deposit cargo
self.game_state.players[unit.player_id].credits += unit.cargo
unit.cargo = 0
unit.returning = False
unit.gathering = False
unit.ore_target = None
unit.target = None # Clear target after deposit
unit.manual_control = False # Resume AI after deposit
else:
# Move to depot
unit.target = Position(depot.position.x, depot.position.y)
else:
# No depot - stop returning
unit.returning = False
return
# If at ore patch, harvest it
if unit.ore_target:
distance = ((unit.position.x - unit.ore_target.x) ** 2 +
(unit.position.y - unit.ore_target.y) ** 2) ** 0.5
if distance < TILE_SIZE / 2:
# Harvest ore
tile_x = int(unit.ore_target.x / TILE_SIZE)
tile_y = int(unit.ore_target.y / TILE_SIZE)
if (0 <= tile_x < MAP_WIDTH and 0 <= tile_y < MAP_HEIGHT):
terrain = self.game_state.terrain[tile_y][tile_x]
if terrain == TerrainType.ORE:
unit.cargo = min(HARVESTER_CAPACITY, unit.cargo + HARVEST_AMOUNT_PER_ORE)
self.game_state.terrain[tile_y][tile_x] = TerrainType.GRASS
elif terrain == TerrainType.GEM:
unit.cargo = min(HARVESTER_CAPACITY, unit.cargo + HARVEST_AMOUNT_PER_GEM)
self.game_state.terrain[tile_y][tile_x] = TerrainType.GRASS
unit.ore_target = None
unit.gathering = False
# If cargo full or nearly full, return
if unit.cargo >= HARVESTER_CAPACITY * 0.9:
unit.returning = True
unit.target = None
else:
# Move to ore
unit.target = unit.ore_target
return
# FIXED: Always search for ore when idle (not gathering and no ore target)
# This ensures Harvester automatically finds ore after spawning or depositing
if not unit.gathering and not unit.ore_target:
nearest_ore = self.find_nearest_ore(unit.position)
if nearest_ore:
unit.ore_target = nearest_ore
unit.gathering = True
unit.target = nearest_ore
# If no ore found, clear any residual target to stay idle
elif unit.target:
unit.target = None
def find_nearest_depot(self, player_id: int, position: Position) -> Optional[Building]:
"""Find nearest Refinery or HQ for harvester"""
nearest = None
min_dist = float('inf')
for building in self.game_state.buildings.values():
if building.player_id == player_id:
if building.type in [BuildingType.REFINERY, BuildingType.HQ]:
dist = position.distance_to(building.position)
if dist < min_dist:
min_dist = dist
nearest = building
return nearest
def find_nearest_ore(self, position: Position) -> Optional[Position]:
"""Find nearest ore or gem tile"""
nearest = None
min_dist = float('inf')
for y in range(MAP_HEIGHT):
for x in range(MAP_WIDTH):
if self.game_state.terrain[y][x] in [TerrainType.ORE, TerrainType.GEM]:
ore_pos = Position(x * TILE_SIZE + TILE_SIZE/2, y * TILE_SIZE + TILE_SIZE/2)
dist = position.distance_to(ore_pos)
if dist < min_dist:
min_dist = dist
nearest = ore_pos
return nearest
def update_ai_unit(self, unit: Unit):
"""RED ALERT: Enemy AI behavior - aggressive!"""
if unit.damage == 0: # Don't attack with harvesters
return
# Always try to attack nearest enemy
if not unit.target_unit_id:
nearest_enemy = self.find_nearest_enemy(unit)
if nearest_enemy:
distance = unit.position.distance_to(nearest_enemy.position)
# Attack if within aggro range
if distance < 500: # Aggro range
unit.target_unit_id = nearest_enemy.id
def update_enemy_ai(self):
"""RED ALERT: Enemy AI strategic decision making"""
player_ai = self.game_state.players[1]
# 1. Check if AI should build next building
if self.ai_build_index < len(self.ai_build_plan):
next_building_type = self.ai_build_plan[self.ai_build_index]
building_type = BuildingType(next_building_type)
cost = BUILDING_COSTS.get(building_type, 0)
# Check if AI can afford it
if player_ai.credits >= cost:
# Check if prerequisites are met (simplified)
can_build = True
# Check power plant requirement
if building_type in [BuildingType.BARRACKS, BuildingType.REFINERY, BuildingType.WAR_FACTORY]:
has_power_plant = any(
b.type == BuildingType.POWER_PLANT and b.player_id == 1
for b in self.game_state.buildings.values()
)
if not has_power_plant:
can_build = False
# Check barracks requirement for war factory
if building_type == BuildingType.WAR_FACTORY:
has_barracks = any(
b.type == BuildingType.BARRACKS and b.player_id == 1
for b in self.game_state.buildings.values()
)
if not has_barracks:
can_build = False
if can_build:
# Find build location near AI HQ
ai_hq = next(
(b for b in self.game_state.buildings.values()
if b.player_id == 1 and b.type == BuildingType.HQ),
None
)
if ai_hq:
# Random offset from HQ
offset_x = random.randint(-200, 200)
offset_y = random.randint(-200, 200)
build_pos = Position(
max(TILE_SIZE * 3, min(MAP_WIDTH * TILE_SIZE - TILE_SIZE * 3,
ai_hq.position.x + offset_x)),
max(TILE_SIZE * 3, min(MAP_HEIGHT * TILE_SIZE - TILE_SIZE * 3,
ai_hq.position.y + offset_y))
)
# Deduct credits
player_ai.credits -= cost
# Create building immediately (simplified - no construction queue for AI)
self.game_state.create_building(building_type, 1, build_pos)
print(f"🤖 AI built {building_type.value} at {build_pos.x:.0f},{build_pos.y:.0f}")
# Move to next building in plan
self.ai_build_index += 1
# 2. Produce units if we have production buildings
ai_barracks = [
b for b in self.game_state.buildings.values()
if b.player_id == 1 and b.type == BuildingType.BARRACKS
]
ai_war_factory = [
b for b in self.game_state.buildings.values()
if b.player_id == 1 and b.type == BuildingType.WAR_FACTORY
]
# Produce infantry from barracks
if ai_barracks and len(ai_barracks[0].production_queue) == 0:
if player_ai.credits >= UNIT_COSTS[UnitType.INFANTRY]:
player_ai.credits -= UNIT_COSTS[UnitType.INFANTRY]
ai_barracks[0].production_queue.append(UnitType.INFANTRY.value)
print(f"🤖 AI queued Infantry")
# Produce vehicles from war factory (cycle through unit types)
if ai_war_factory and len(ai_war_factory[0].production_queue) == 0:
next_unit_type = self.ai_unit_cycle[self.ai_unit_index]
unit_type = UnitType(next_unit_type)
cost = UNIT_COSTS.get(unit_type, 0)
if player_ai.credits >= cost:
player_ai.credits -= cost
ai_war_factory[0].production_queue.append(unit_type.value)
self.ai_unit_index = (self.ai_unit_index + 1) % len(self.ai_unit_cycle)
print(f"🤖 AI queued {unit_type.value}")
# 3. Make AI harvesters collect resources
for unit in self.game_state.units.values():
if unit.player_id == 1 and unit.type == UnitType.HARVESTER:
if not unit.manual_control:
# Harvester AI is handled by update_harvester() in main loop
pass
async def launch_nuke(self, player_id: int, target: Position):
"""Launch nuclear strike at target location"""
# Damage radius: 200 pixels = 5 tiles
NUKE_DAMAGE_RADIUS = 200.0
NUKE_MAX_DAMAGE = 200 # Maximum damage at center
# Damage all units within radius
units_to_remove = []
for unit_id, unit in self.game_state.units.items():
distance = unit.position.distance_to(target)
if distance <= NUKE_DAMAGE_RADIUS:
# Damage decreases with distance (full damage at center, 50% at edge)
damage_factor = 1.0 - (distance / NUKE_DAMAGE_RADIUS) * 0.5
damage = int(NUKE_MAX_DAMAGE * damage_factor)
unit.health -= damage
if unit.health <= 0:
units_to_remove.append(unit_id)
# Remove destroyed units
for unit_id in units_to_remove:
del self.game_state.units[unit_id]
# Damage buildings within radius
buildings_to_remove = []
for building_id, building in self.game_state.buildings.items():
distance = building.position.distance_to(target)
if distance <= NUKE_DAMAGE_RADIUS:
# Damage decreases with distance
damage_factor = 1.0 - (distance / NUKE_DAMAGE_RADIUS) * 0.5
damage = int(NUKE_MAX_DAMAGE * damage_factor)
building.health -= damage
if building.health <= 0:
buildings_to_remove.append(building_id)
# Remove destroyed buildings
for building_id in buildings_to_remove:
del self.game_state.buildings[building_id]
print(f"💥 NUKE launched by player {player_id} at ({target.x:.0f}, {target.y:.0f})")
print(f" Destroyed {len(units_to_remove)} units and {len(buildings_to_remove)} buildings")
async def handle_command(self, command: dict):
"""Handle game commands from clients"""
cmd_type = command.get("type")
if cmd_type == "move_unit":
unit_ids = command.get("unit_ids", [])
target = command.get("target")
print(f"🎮 MOVE_UNIT command received: unit_ids={unit_ids}, target={target}")
if target and "x" in target and "y" in target:
base_target = Position(target["x"], target["y"])
print(f" Moving to position ({base_target.x:.1f}, {base_target.y:.1f})")
# If multiple units, spread them in a formation
if len(unit_ids) > 1:
print(f" Setting formation for {len(unit_ids)} units")
# Formation pattern: circular spread around target
radius = 30.0 # Distance between units in formation
for idx, uid in enumerate(unit_ids):
if uid in self.game_state.units:
unit = self.game_state.units[uid]
print(f" Unit {uid} ({unit.type}) - setting target")
# Calculate offset position in circular formation
angle = (idx * 360.0 / len(unit_ids)) * (3.14159 / 180.0)
offset_x = radius * (1 + idx // 8) * 0.707106781 * ((idx % 2) * 2 - 1)
offset_y = radius * (1 + idx // 8) * 0.707106781 * (((idx + 1) % 2) * 2 - 1)
unit.target = Position(
base_target.x + offset_x,
base_target.y + offset_y
)
# FIX: Clear combat target and set manual order flag
unit.target_unit_id = None
unit.manual_order = True
# If it's a Harvester, enable manual control to override AI
if unit.type == UnitType.HARVESTER:
unit.manual_control = True
# Clear AI state
unit.gathering = False
unit.returning = False
unit.ore_target = None
else:
# Single unit - move to exact target
print(f" Setting single unit target")
for uid in unit_ids:
if uid in self.game_state.units:
unit = self.game_state.units[uid]
print(f" Unit {uid} ({unit.type}) at ({unit.position.x:.1f},{unit.position.y:.1f}) -> ({base_target.x:.1f},{base_target.y:.1f})")
unit.target = base_target
# FIX: Clear combat target and set manual order flag
unit.target_unit_id = None
unit.manual_order = True
# If it's a Harvester, enable manual control to override AI
if unit.type == UnitType.HARVESTER:
unit.manual_control = True
# Clear AI state
unit.gathering = False
unit.returning = False
unit.ore_target = None
elif cmd_type == "attack_unit":
attacker_ids = command.get("attacker_ids", [])
target_id = command.get("target_id")
for uid in attacker_ids:
if uid in self.game_state.units and target_id in self.game_state.units:
attacker = self.game_state.units[uid]
attacker.target_unit_id = target_id
attacker.target_building_id = None # Clear building target
attacker.manual_order = True # Set manual order flag
elif cmd_type == "attack_building":
attacker_ids = command.get("attacker_ids", [])
target_id = command.get("target_id")
for uid in attacker_ids:
if uid in self.game_state.units and target_id in self.game_state.buildings:
attacker = self.game_state.units[uid]
attacker.target_building_id = target_id
attacker.target_unit_id = None # Clear unit target
attacker.manual_order = True # Set manual order flag
elif cmd_type == "build_unit":
unit_type_str = command.get("unit_type")
player_id = command.get("player_id", 0)
preferred_building_id = command.get("building_id") # optional: choose production building
print(f"🏭 BUILD_UNIT command received: unit_type={unit_type_str}, player={player_id}, building={preferred_building_id}")
if not unit_type_str:
print(" ❌ No unit_type provided")
return
try:
unit_type = UnitType(unit_type_str)
except ValueError:
return
# RED ALERT: Check cost!
cost = UNIT_COSTS.get(unit_type, 0)
player_language = self.game_state.players[player_id].language if player_id in self.game_state.players else "en"
current_credits = self.game_state.players[player_id].credits if player_id in self.game_state.players else 0
if current_credits < cost:
# Not enough credits! (translated)
message = LOCALIZATION.translate(
player_language,
"notification.insufficient_credits",
cost=cost,
current=current_credits
)
await self.broadcast({
"type": "notification",
"message": message,
"level": "error"
})
return
# Find required building type
required_building = PRODUCTION_REQUIREMENTS.get(unit_type)
if not required_building:
return
# If provided, use preferred building if valid
suitable_building = None
if preferred_building_id and preferred_building_id in self.game_state.buildings:
b = self.game_state.buildings[preferred_building_id]
if b.player_id == player_id and b.type == required_building:
suitable_building = b
# Otherwise choose least busy eligible building
if not suitable_building:
eligible = [
b for b in self.game_state.buildings.values()
if b.player_id == player_id and b.type == required_building
]
if eligible:
suitable_building = min(eligible, key=lambda b: len(b.production_queue))
if suitable_building:
# RED ALERT: Deduct credits!
self.game_state.players[player_id].credits -= cost
# Add to production queue
suitable_building.production_queue.append(unit_type_str)
print(f" ✅ Added {unit_type_str} to {suitable_building.type} queue (building {suitable_building.id})")
print(f" Queue is now: {suitable_building.production_queue}")
print(f" Credits: {self.game_state.players[player_id].credits}")
# Translated notification
unit_name = LOCALIZATION.translate(player_language, f"unit.{unit_type_str}")
message = LOCALIZATION.translate(player_language, "notification.unit_training", unit=unit_name)
await self.broadcast({
"type": "notification",
"message": message,
"level": "success"
})
else:
# Translated requirement message
unit_name = LOCALIZATION.translate(player_language, f"unit.{unit_type_str}")
building_name = LOCALIZATION.translate(player_language, f"building.{required_building.value}")
message = LOCALIZATION.translate(
player_language,
"notification.unit_requires",
unit=unit_name,
requirement=building_name
)
await self.broadcast({
"type": "notification",
"message": message,
"level": "error"
})
elif cmd_type == "build_building":
building_type_str = command.get("building_type")
position = command.get("position")
player_id = command.get("player_id", 0)
if not building_type_str or not position:
return
try:
building_type = BuildingType(building_type_str)
except ValueError:
return
# RED ALERT: Check cost!
cost = BUILDING_COSTS.get(building_type, 0)
player_language = self.game_state.players[player_id].language if player_id in self.game_state.players else "en"
current_credits = self.game_state.players[player_id].credits if player_id in self.game_state.players else 0
if current_credits < cost:
# Not enough credits! (translated)
message = LOCALIZATION.translate(
player_language,
"notification.insufficient_credits",
cost=cost,
current=current_credits
)
await self.broadcast({
"type": "notification",
"message": message,
"level": "error"
})
return
# Rule: limit multiple same-type buildings if disabled
if not ALLOW_MULTIPLE_SAME_BUILDING and building_type != BuildingType.HQ:
for b in self.game_state.buildings.values():
if b.player_id == player_id and b.type == building_type:
message = LOCALIZATION.translate(player_language, "notification.building_limit_one", building=LOCALIZATION.translate(player_language, f"building.{building_type_str}"))
await self.broadcast({"type":"notification","message":message,"level":"error"})
return
# Enforce HQ build radius
# Find player's HQ
hq = None
for b in self.game_state.buildings.values():
if b.player_id == player_id and b.type == BuildingType.HQ:
hq = b
break
if hq and position and "x" in position and "y" in position:
max_dist = HQ_BUILD_RADIUS_TILES * TILE_SIZE
dx = position["x"] - hq.position.x
dy = position["y"] - hq.position.y
if (dx*dx + dy*dy) ** 0.5 > max_dist:
message = LOCALIZATION.translate(player_language, "notification.building_too_far_from_hq")
await self.broadcast({"type":"notification","message":message,"level":"error"})
return
# RED ALERT: Deduct credits!
self.game_state.players[player_id].credits -= cost
if position and "x" in position and "y" in position:
self.game_state.create_building(
building_type,
player_id,
Position(position["x"], position["y"])
)
# Translated notification
building_name = LOCALIZATION.translate(player_language, f"building.{building_type_str}")
message = LOCALIZATION.translate(player_language, "notification.building_placed", building=building_name)
await self.broadcast({
"type": "notification",
"message": message,
"level": "success"
})
elif cmd_type == "stop_units":
unit_ids = command.get("unit_ids", [])
for uid in unit_ids:
if uid in self.game_state.units:
self.game_state.units[uid].target = None
elif cmd_type == "prepare_nuke":
player_id = command.get("player_id", 0)
if player_id in self.game_state.players:
player = self.game_state.players[player_id]
if player.superweapon_ready:
player.nuke_preparing = True
await self.broadcast({
"type": "nuke_preparing",
"player_id": player_id
})
elif cmd_type == "cancel_nuke":
player_id = command.get("player_id", 0)
if player_id in self.game_state.players:
self.game_state.players[player_id].nuke_preparing = False
elif cmd_type == "launch_nuke":
player_id = command.get("player_id", 0)
target = command.get("target")
if player_id in self.game_state.players and target:
player = self.game_state.players[player_id]
if player.superweapon_ready and player.nuke_preparing:
target_pos = Position(target["x"], target["y"])
# Launch nuke effect
await self.launch_nuke(player_id, target_pos)
# Reset superweapon state
player.superweapon_ready = False
player.superweapon_charge = 0
player.nuke_preparing = False
# Broadcast nuke launch
await self.broadcast({
"type": "nuke_launched",
"player_id": player_id,
"target": {"x": target_pos.x, "y": target_pos.y}
})
elif cmd_type == "change_language":
player_id = command.get("player_id", 0)
language = command.get("language", "en")
if player_id in self.game_state.players:
# Validate language
supported = list(LOCALIZATION.get_supported_languages())
if language in supported:
self.game_state.players[player_id].language = language
# Trigger immediate AI analysis in new language
self.last_ai_analysis_time = 0
await self.broadcast({
"type": "notification",
"message": f"Language changed to {LOCALIZATION.get_display_name(language)}",
"level": "info"
})
elif cmd_type == "request_ai_analysis":
# Force immediate AI analysis
await self.run_ai_analysis()
await self.broadcast({
"type": "ai_analysis_update",
"analysis": self.last_ai_analysis
})
# Global connection manager
manager = ConnectionManager()
# Routes
@app.get("/")
async def get_home():
"""Serve the main game interface"""
return HTMLResponse(content=open("static/index.html").read())
@app.get("/health")
async def health_check():
"""Health check endpoint for HuggingFace Spaces"""
return {
"status": "healthy",
"players": len(manager.game_state.players),
"units": len(manager.game_state.units),
"buildings": len(manager.game_state.buildings),
"active_connections": len(manager.active_connections),
"ai_available": manager.ai_analyzer.model_available,
"supported_languages": list(LOCALIZATION.get_supported_languages())
}
@app.get("/api/languages")
async def get_languages():
"""Get supported languages"""
languages = []
for lang_code in LOCALIZATION.get_supported_languages():
languages.append({
"code": lang_code,
"name": LOCALIZATION.get_display_name(lang_code)
})
return {"languages": languages}
@app.get("/api/translations/{language}")
async def get_translations(language: str):
"""Get all translations for a language"""
from localization import TRANSLATIONS
if language not in TRANSLATIONS:
language = "en"
return {"translations": TRANSLATIONS[language], "language": language}
@app.post("/api/player/{player_id}/language")
async def set_player_language(player_id: int, language: str):
"""Set player's preferred language"""
if player_id in manager.game_state.players:
manager.game_state.players[player_id].language = language
return {"success": True, "language": language}
return {"success": False, "error": "Player not found"}
@app.get("/api/ai/status")
async def get_ai_status():
"""Get AI analyzer status"""
return {
"available": manager.ai_analyzer.model_available,
"model_path": manager.ai_analyzer.model_path if manager.ai_analyzer.model_available else None,
"last_analysis": manager.last_ai_analysis
}
@app.post("/api/nl/translate")
async def translate_nl_command(data: dict):
"""
Translate natural language command to MCP JSON
Body: {"command": str, "language": str (optional)}
"""
translator = get_nl_translator()
command = data.get("command", "")
language = data.get("language", None)
if not command:
return {"success": False, "error": "No command provided"}
result = translator.translate_command(command, language)
return result
@app.get("/api/nl/examples")
async def get_nl_examples(language: str = "en"):
"""Get example natural language commands"""
translator = get_nl_translator()
return {
"language": language,
"examples": translator.get_example_commands(language)
}
@app.get("/api/nl/status")
async def get_nl_status():
"""Get NL translator status"""
translator = get_nl_translator()
return {
"available": translator.model_loaded,
"model_path": translator.model_path,
"last_error": translator.last_error
}
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""WebSocket endpoint for real-time game communication"""
await manager.connect(websocket)
try:
# Send initial state
await websocket.send_json({
"type": "init",
"state": manager.game_state.to_dict()
})
# Handle incoming messages
while True:
data = await websocket.receive_json()
await manager.handle_command(data)
except WebSocketDisconnect:
manager.disconnect(websocket)
except Exception as e:
print(f"WebSocket error: {e}")
manager.disconnect(websocket)
# Mount static files (will be created next)
try:
app.mount("/static", StaticFiles(directory="static"), name="static")
except:
pass
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)