import os import tempfile import shutil import asyncio from pathlib import Path from typing import List, Optional, Dict, Any import uuid import re from fastapi import ( UploadFile, File, Form, HTTPException, FastAPI, ) from fastapi.responses import JSONResponse from pydantic import BaseModel import json import resource import platform from loguru import logger import subprocess app = FastAPI(title="Code Execution and Input Analysis API", docs_url="/") # Configuration MAX_EXECUTION_TIME = 5 # seconds MAX_MEMORY = 256 * 1024 * 1024 # 256MB MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB MAX_TOTAL_SIZE = 2 * 1024 * 1024 # 2MB total size limit for uploads and repos class ExecutionResult(BaseModel): success: bool stdout: str stderr: str execution_time: float exit_code: int error: Optional[str] = None class InputPattern(BaseModel): type: str # "input", "scanf", "cin", etc. line_number: int variable_name: Optional[str] = None prompt_message: Optional[str] = None data_type: Optional[str] = None # "int", "str", "float", etc. raw_code: str class InputAnalysisResult(BaseModel): language: str total_inputs: int input_patterns: List[InputPattern] suggestions: List[str] # UI suggestions for user class CodeExecutor: def __init__(self): self.compilers = { "c": "clang" if platform.system() == "Darwin" else "gcc", "cpp": "clang++" if platform.system() == "Darwin" else "g++", "java": "javac", "python": "python3", } # Check and log ulimits on startup self._check_ulimits() def _check_ulimits(self): """Check current ulimits and log them""" try: # Check various ulimits limits = { "RLIMIT_NOFILE": resource.RLIMIT_NOFILE, # file descriptors "RLIMIT_NPROC": resource.RLIMIT_NPROC, # processes "RLIMIT_STACK": resource.RLIMIT_STACK, # stack size } logger.info("Current ulimits:") for name, limit_type in limits.items(): try: soft, hard = resource.getrlimit(limit_type) logger.info(f"{name}: soft={soft}, hard={hard}") except: pass # Try to increase limits if they're too low try: # Increase file descriptors resource.setrlimit(resource.RLIMIT_NOFILE, (65536, 65536)) except: try: # Try a lower value if the high one fails resource.setrlimit(resource.RLIMIT_NOFILE, (4096, 4096)) except: pass except Exception as e: logger.warning(f"Could not check/set ulimits: {e}") async def _create_java_wrapper(self, workspace: str, jvm_opts: List[str]) -> str: """Create a wrapper script for Java execution with ulimit settings""" wrapper_content = f"""#!/bin/bash # Set ulimits to prevent memory allocation issues ulimit -n 65536 2>/dev/null || ulimit -n 4096 ulimit -u 32768 2>/dev/null || ulimit -u 2048 ulimit -s 8192 2>/dev/null || ulimit -s 4096 ulimit -v unlimited 2>/dev/null || true ulimit -m unlimited 2>/dev/null || true # Log current ulimits for debugging echo "Current ulimits:" >&2 ulimit -a >&2 # Execute Java with the provided options exec java {' '.join(jvm_opts)} "$@" """ wrapper_path = os.path.join(workspace, "java_wrapper.sh") with open(wrapper_path, 'w') as f: f.write(wrapper_content) # Make executable os.chmod(wrapper_path, 0o755) return wrapper_path def set_resource_limits(self): """Set resource limits for subprocess (Unix only)""" if platform.system() == "Linux": resource.setrlimit( resource.RLIMIT_CPU, (MAX_EXECUTION_TIME, MAX_EXECUTION_TIME) ) resource.setrlimit(resource.RLIMIT_AS, (MAX_MEMORY, MAX_MEMORY)) def check_available_memory(self) -> int: """Create a wrapper script for Java execution with ulimit settings""" wrapper_content = f"""#!/bin/bash # Set ulimits to prevent memory allocation issues ulimit -n 65536 2>/dev/null || ulimit -n 4096 ulimit -u 32768 2>/dev/null || ulimit -u 2048 ulimit -s 8192 2>/dev/null || ulimit -s 4096 ulimit -v unlimited 2>/dev/null || true ulimit -m unlimited 2>/dev/null || true # Log current ulimits for debugging echo "Current ulimits:" >&2 ulimit -a >&2 # Execute Java with the provided options exec java {' '.join(jvm_opts)} "$@" """ wrapper_path = os.path.join(workspace, "java_wrapper.sh") with open(wrapper_path, 'w') as f: f.write(wrapper_content) # Make executable os.chmod(wrapper_path, 0o755) return wrapper_path """Check available memory in MB""" try: with open('/proc/meminfo', 'r') as f: meminfo = f.read() available_match = re.search(r'MemAvailable:\s+(\d+)\s+kB', meminfo) if available_match: available_kb = int(available_match.group(1)) available_mb = available_kb // 1024 logger.info(f"Available memory: {available_mb}MB") return available_mb # Fallback to MemFree + Cached free_match = re.search(r'MemFree:\s+(\d+)\s+kB', meminfo) cached_match = re.search(r'Cached:\s+(\d+)\s+kB', meminfo) if free_match and cached_match: free_kb = int(free_match.group(1)) cached_kb = int(cached_match.group(1)) available_mb = (free_kb + cached_kb) // 1024 logger.info(f"Estimated available memory: {available_mb}MB") return available_mb except Exception as e: logger.warning(f"Could not check memory: {e}") return 0 # Unknown def analyze_input_patterns( self, language: str, file_contents: Dict[str, str] ) -> InputAnalysisResult: """Analyze code files to detect input patterns""" patterns = [] if language == "python": patterns = self._analyze_python_inputs(file_contents) elif language == "java": patterns = self._analyze_java_inputs(file_contents) elif language in ["c", "cpp"]: patterns = self._analyze_c_cpp_inputs(file_contents) suggestions = self._generate_input_suggestions(patterns, language) return InputAnalysisResult( language=language, total_inputs=len(patterns), input_patterns=patterns, suggestions=suggestions, ) def _analyze_python_inputs( self, file_contents: Dict[str, str] ) -> List[InputPattern]: """Analyze Python files for input() patterns""" patterns = [] for filename, content in file_contents.items(): lines = content.split("\n") for i, line in enumerate(lines, 1): # Pattern 1: variable = input("prompt") match = re.search( r'(\w+)\s*=\s*input\s*\(\s*["\']([^"\']*)["\']?\s*\)', line ) if match: var_name, prompt = match.groups() patterns.append( InputPattern( type="input", line_number=i, variable_name=var_name, prompt_message=prompt or f"Enter value for {var_name}", data_type="str", raw_code=line.strip(), ) ) continue # Pattern 2: variable = int(input("prompt")) match = re.search( r'(\w+)\s*=\s*(int|float|str)\s*\(\s*input\s*\(\s*["\']([^"\']*)["\']?\s*\)\s*\)', line, ) if match: var_name, data_type, prompt = match.groups() patterns.append( InputPattern( type="input", line_number=i, variable_name=var_name, prompt_message=prompt or f"Enter {data_type} value for {var_name}", data_type=data_type, raw_code=line.strip(), ) ) continue # Pattern 3: Simple input() without assignment if "input(" in line and "=" not in line: patterns.append( InputPattern( type="input", line_number=i, variable_name=None, prompt_message="Enter input", data_type="str", raw_code=line.strip(), ) ) return patterns def _analyze_java_inputs(self, file_contents: Dict[str, str]) -> List[InputPattern]: """Analyze Java files for Scanner input patterns""" patterns = [] for filename, content in file_contents.items(): lines = content.split("\n") for i, line in enumerate(lines, 1): # Pattern 1: scanner.nextInt(), scanner.nextLine(), etc. match = re.search(r"(\w+)\s*=\s*(\w+)\.next(\w+)\s*\(\s*\)", line) if match: var_name, scanner_name, method = match.groups() data_type = self._java_method_to_type(method) patterns.append( InputPattern( type="scanner", line_number=i, variable_name=var_name, prompt_message=f"Enter {data_type} value for {var_name}", data_type=data_type, raw_code=line.strip(), ) ) continue # Pattern 2: Direct scanner calls without assignment match = re.search(r"(\w+)\.next(\w+)\s*\(\s*\)", line) if match and "=" not in line: scanner_name, method = match.groups() data_type = self._java_method_to_type(method) patterns.append( InputPattern( type="scanner", line_number=i, variable_name=None, prompt_message=f"Enter {data_type} input", data_type=data_type, raw_code=line.strip(), ) ) return patterns def _analyze_c_cpp_inputs( self, file_contents: Dict[str, str] ) -> List[InputPattern]: """Analyze C/C++ files for input patterns""" patterns = [] for filename, content in file_contents.items(): lines = content.split("\n") for i, line in enumerate(lines, 1): # Pattern 1: scanf("%d", &variable) match = re.search( r'scanf\s*\(\s*["\']([^"\']*)["\'],\s*&(\w+)\s*\)', line ) if match: format_spec, var_name = match.groups() data_type = self._c_format_to_type(format_spec) patterns.append( InputPattern( type="scanf", line_number=i, variable_name=var_name, prompt_message=f"Enter {data_type} value for {var_name}", data_type=data_type, raw_code=line.strip(), ) ) continue # Pattern 2: cin >> variable (C++) match = re.search(r"cin\s*>>\s*(\w+)", line) if match: var_name = match.group(1) patterns.append( InputPattern( type="cin", line_number=i, variable_name=var_name, prompt_message=f"Enter value for {var_name}", data_type="unknown", raw_code=line.strip(), ) ) continue # Pattern 3: getline(cin, variable) for strings match = re.search(r"getline\s*\(\s*cin\s*,\s*(\w+)\s*\)", line) if match: var_name = match.group(1) patterns.append( InputPattern( type="getline", line_number=i, variable_name=var_name, prompt_message=f"Enter string for {var_name}", data_type="string", raw_code=line.strip(), ) ) return patterns def _java_method_to_type(self, method: str) -> str: """Convert Java Scanner method to data type""" type_mapping = { "Int": "int", "Double": "double", "Float": "float", "Long": "long", "Line": "string", "": "string", } return type_mapping.get(method, "string") def _c_format_to_type(self, format_spec: str) -> str: """Convert C format specifier to data type""" if "%d" in format_spec or "%i" in format_spec: return "int" elif "%f" in format_spec: return "float" elif "%lf" in format_spec: return "double" elif "%c" in format_spec: return "char" elif "%s" in format_spec: return "string" return "unknown" def _generate_input_suggestions( self, patterns: List[InputPattern], language: str ) -> List[str]: """Generate UI suggestions based on detected patterns""" suggestions = [] if not patterns: suggestions.append( "No input patterns detected. Code will run without user input." ) return suggestions suggestions.append( f"Detected {len(patterns)} input requirement(s) in {language} code:" ) for i, pattern in enumerate(patterns, 1): if pattern.variable_name: suggestions.append( f"{i}. Line {pattern.line_number}: {pattern.prompt_message} " f"(Variable: {pattern.variable_name}, Type: {pattern.data_type})" ) else: suggestions.append( f"{i}. Line {pattern.line_number}: {pattern.prompt_message} " f"(Type: {pattern.data_type})" ) suggestions.append( "Please provide input values in the order they appear in the code." ) return suggestions async def execute_code( self, language: str, main_files: List[str], workspace: str, input_data: Optional[List[str]] = None, ) -> ExecutionResult: """Execute code based on language with optional input data""" try: if language == "python": return await self._execute_python(main_files, workspace, input_data) elif language == "java": return await self._execute_java(main_files, workspace, input_data) elif language in ["c", "cpp"]: return await self._execute_c_cpp( main_files, workspace, language, input_data ) else: raise ValueError(f"Unsupported language: {language}") except Exception as e: return ExecutionResult( success=False, stdout="", stderr=str(e), execution_time=0, exit_code=-1, error=str(e), ) async def _execute_with_input( self, command: List[str], workspace: str, input_data: Optional[List[str]] = None ) -> tuple: """Execute process with input data""" process = await asyncio.create_subprocess_exec( *command, cwd=workspace, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE, preexec_fn=( self.set_resource_limits if platform.system() == "Linux" else None ), ) # Prepare input string stdin_input = None if input_data: stdin_input = "\n".join(input_data) + "\n" stdin_input = stdin_input.encode("utf-8") try: stdout, stderr = await asyncio.wait_for( process.communicate(input=stdin_input), timeout=MAX_EXECUTION_TIME ) return stdout, stderr, process.returncode except asyncio.TimeoutError: process.kill() await process.wait() raise asyncio.TimeoutError() async def _execute_python( self, main_files: List[str], workspace: str, input_data: Optional[List[str]] = None, ) -> ExecutionResult: """Execute Python code with input support""" results = [] for main_file in main_files: file_path = os.path.join(workspace, main_file) if not os.path.exists(file_path): results.append( ExecutionResult( success=False, stdout="", stderr=f"File not found: {main_file}", execution_time=0, exit_code=-1, ) ) continue try: start_time = asyncio.get_event_loop().time() stdout, stderr, returncode = await self._execute_with_input( ["python3", main_file], workspace, input_data ) execution_time = asyncio.get_event_loop().time() - start_time results.append( ExecutionResult( success=returncode == 0, stdout=stdout.decode("utf-8", errors="replace"), stderr=stderr.decode("utf-8", errors="replace"), execution_time=execution_time, exit_code=returncode, ) ) except asyncio.TimeoutError: results.append( ExecutionResult( success=False, stdout="", stderr="Execution timeout exceeded", execution_time=MAX_EXECUTION_TIME, exit_code=-1, ) ) except Exception as e: results.append( ExecutionResult( success=False, stdout="", stderr=str(e), execution_time=0, exit_code=-1, error=str(e), ) ) return self._combine_results(results, main_files) async def _execute_java( self, main_files: List[str], workspace: str, input_data: Optional[List[str]] = None, ) -> ExecutionResult: """Compile and execute Java code with optimized memory settings""" # Check if we have .java files to compile java_files = list(Path(workspace).glob("*.java")) needs_compilation = len(java_files) > 0 # If we have .java files, compile them if needs_compilation: logger.info(f"Found {len(java_files)} Java source files, compiling...") # Compile with minimal memory compile_cmd = ["javac", "-J-Xmx32m"] + [str(f) for f in java_files] compile_process = await asyncio.create_subprocess_exec( *compile_cmd, cwd=workspace, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await compile_process.communicate() if compile_process.returncode != 0: return ExecutionResult( success=False, stdout="", stderr=f"Compilation failed:\n{stderr.decode('utf-8', errors='replace')}", execution_time=0, exit_code=compile_process.returncode, ) logger.info("Java compilation successful") else: # Check if we have .class files class_files_missing = [] for main_file in main_files: if main_file.endswith(".class"): class_file_path = os.path.join(workspace, main_file) else: class_file_path = os.path.join(workspace, f"{main_file}.class") if not os.path.exists(class_file_path): class_files_missing.append(main_file) if class_files_missing: return ExecutionResult( success=False, stdout="", stderr=f"No Java source files found and missing .class files for: {', '.join(class_files_missing)}", execution_time=0, exit_code=-1, ) logger.info("Using existing .class files, skipping compilation") # Execute Java with multiple fallback strategies results = [] for main_file in main_files: # Determine class name if main_file.endswith(".class"): class_name = main_file.replace(".class", "") elif main_file.endswith(".java"): class_name = main_file.replace(".java", "") else: class_name = main_file # Verify the .class file exists class_file_path = os.path.join(workspace, f"{class_name}.class") if not os.path.exists(class_file_path): results.append( ExecutionResult( success=False, stdout="", stderr=f"Class file not found: {class_name}.class", execution_time=0, exit_code=-1, ) ) continue # Try different JVM configurations jvm_configs = [ # Config 1: Ultra minimal with no code cache reservation [ "-Xint", # Interpreter only mode (no JIT) "-Xmx32m", "-Xms2m", "-XX:MaxMetaspaceSize=16m", "-Djava.awt.headless=true", ], # Config 2: Minimal with small code cache [ "-XX:+UseSerialGC", "-Xmx32m", "-Xms4m", "-XX:ReservedCodeCacheSize=8m", "-XX:InitialCodeCacheSize=2m", "-XX:MaxMetaspaceSize=24m", "-XX:-TieredCompilation", "-Djava.awt.headless=true", ], # Config 3: Slightly more memory [ "-Xmx48m", "-Xms8m", "-XX:ReservedCodeCacheSize=16m", "-XX:MaxMetaspaceSize=32m", "-XX:-TieredCompilation", "-Djava.awt.headless=true", ], # Config 4: Use container support [ "-XX:+UseContainerSupport", "-XX:MaxRAMPercentage=25.0", "-XX:-TieredCompilation", "-Djava.awt.headless=true", ], ] result = None for config_idx, jvm_opts in enumerate(jvm_configs): try: logger.info(f"Trying JVM config {config_idx + 1} for {class_name}") start_time = asyncio.get_event_loop().time() # For first two configs, try with wrapper script if config_idx < 2: wrapper_path = await self._create_java_wrapper(workspace, jvm_opts) java_cmd = [wrapper_path, class_name] else: java_cmd = ["java"] + jvm_opts + [class_name] stdout, stderr, returncode = await self._execute_with_input( java_cmd, workspace, input_data ) execution_time = asyncio.get_event_loop().time() - start_time # Check if it's a memory-related error stderr_text = stderr.decode("utf-8", errors="replace") if "Could not reserve" in stderr_text or "insufficient memory" in stderr_text: logger.warning(f"Config {config_idx + 1} failed with memory error, trying next...") continue result = ExecutionResult( success=returncode == 0, stdout=stdout.decode("utf-8", errors="replace"), stderr=stderr_text, execution_time=execution_time, exit_code=returncode, ) logger.info(f"Java execution completed with config {config_idx + 1}") break except asyncio.TimeoutError: result = ExecutionResult( success=False, stdout="", stderr="Execution timeout exceeded", execution_time=MAX_EXECUTION_TIME, exit_code=-1, ) break except Exception as e: logger.error(f"Error with config {config_idx + 1}: {str(e)}") continue # If all configs failed, return error if result is None: result = ExecutionResult( success=False, stdout="", stderr="Failed to execute Java code with all memory configurations. The container environment may have insufficient memory for JVM.", execution_time=0, exit_code=-1, error="All JVM configurations failed" ) results.append(result) return self._combine_results(results, main_files) async def _execute_c_cpp( self, main_files: List[str], workspace: str, language: str, input_data: Optional[List[str]] = None, ) -> ExecutionResult: """Compile and execute C/C++ code with input support""" compiler = self.compilers[language] results = [] for main_file in main_files: file_path = os.path.join(workspace, main_file) if not os.path.exists(file_path): results.append( ExecutionResult( success=False, stdout="", stderr=f"File not found: {main_file}", execution_time=0, exit_code=-1, ) ) continue # Output binary name output_name = main_file.replace(".c", "").replace(".cpp", "") # Compile compile_process = await asyncio.create_subprocess_exec( compiler, main_file, "-o", output_name, cwd=workspace, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await compile_process.communicate() if compile_process.returncode != 0: results.append( ExecutionResult( success=False, stdout="", stderr=f"Compilation failed:\n{stderr.decode('utf-8', errors='replace')}", execution_time=0, exit_code=compile_process.returncode, ) ) continue # Execute try: start_time = asyncio.get_event_loop().time() stdout, stderr, returncode = await self._execute_with_input( [f"./{output_name}"], workspace, input_data ) execution_time = asyncio.get_event_loop().time() - start_time results.append( ExecutionResult( success=returncode == 0, stdout=stdout.decode("utf-8", errors="replace"), stderr=stderr.decode("utf-8", errors="replace"), execution_time=execution_time, exit_code=returncode, ) ) except asyncio.TimeoutError: results.append( ExecutionResult( success=False, stdout="", stderr="Execution timeout exceeded", execution_time=MAX_EXECUTION_TIME, exit_code=-1, ) ) except Exception as e: results.append( ExecutionResult( success=False, stdout="", stderr=str(e), execution_time=0, exit_code=-1, error=str(e), ) ) return self._combine_results(results, main_files) def _combine_results( self, results: List[ExecutionResult], main_files: List[str] ) -> ExecutionResult: """Combine multiple execution results""" if len(results) == 1: return results[0] else: combined_stdout = "\n".join( [f"=== {main_files[i]} ===\n{r.stdout}" for i, r in enumerate(results)] ) combined_stderr = "\n".join( [ f"=== {main_files[i]} ===\n{r.stderr}" for i, r in enumerate(results) if r.stderr ] ) total_time = sum(r.execution_time for r in results) all_success = all(r.success for r in results) return ExecutionResult( success=all_success, stdout=combined_stdout, stderr=combined_stderr, execution_time=total_time, exit_code=0 if all_success else -1, ) def get_directory_size(directory_path: str) -> int: """Calculate total size of directory in bytes""" total_size = 0 try: for dirpath, dirnames, filenames in os.walk(directory_path): for filename in filenames: filepath = os.path.join(dirpath, filename) if os.path.exists(filepath): total_size += os.path.getsize(filepath) except Exception as e: print(f"Error calculating directory size: {e}") return float("inf") return total_size def validate_upload_size(files: List[UploadFile]) -> tuple[bool, int]: """Validate total size of uploaded files""" total_size = 0 for file in files: if hasattr(file, "size") and file.size: total_size += file.size else: return True, 0 return total_size <= MAX_TOTAL_SIZE, total_size # Create executor instance executor = CodeExecutor() async def clone_repo(repo_url: str, workspace: str): """Clone a git repository into the workspace.""" try: process = await asyncio.create_subprocess_exec( "git", "clone", repo_url, ".", cwd=workspace, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() if process.returncode != 0: raise HTTPException( status_code=400, detail=f"Failed to clone repository: {stderr.decode()}" ) repo_size = get_directory_size(workspace) if repo_size > MAX_TOTAL_SIZE: raise HTTPException( status_code=400, detail=f"Repository size ({repo_size / 1024 / 1024:.2f}MB) exceeds limit ({MAX_TOTAL_SIZE / 1024 / 1024:.2f}MB)", ) except HTTPException: raise except Exception as e: raise HTTPException( status_code=500, detail=f"Error cloning repository: {str(e)}" ) def detect_language_from_files(main_files: List[str]) -> str: """Detect programming language from file extensions""" if not main_files: raise HTTPException(status_code=400, detail="No main files provided") first_file = main_files[0] if "." not in first_file: java_extensions = [".java", ".class"] for file in main_files: if any(file.endswith(ext) for ext in java_extensions): return "java" raise HTTPException( status_code=400, detail=f"Cannot detect language: file '{first_file}' has no extension", ) extension = first_file.split(".")[-1].lower() extension_to_language = { "py": "python", "java": "java", "class": "java", "c": "c", "cpp": "cpp", "cc": "cpp", "cxx": "cpp", "c++": "cpp", } if extension not in extension_to_language: supported_extensions = ", ".join(extension_to_language.keys()) raise HTTPException( status_code=400, detail=f"Unsupported file extension '.{extension}'. Supported extensions: {supported_extensions}", ) detected_language = extension_to_language[extension] for file in main_files: if "." in file: file_ext = file.split(".")[-1].lower() file_language = extension_to_language.get(file_ext) if file_language != detected_language: raise HTTPException( status_code=400, detail=f"Mixed languages detected: '{first_file}' ({detected_language}) and '{file}' ({file_language})", ) return detected_language @app.get("/health") async def health_check(): """Health check endpoint""" return {"status": "healthy", "timestamp": asyncio.get_event_loop().time()} @app.post("/analyze-inputs") async def analyze_inputs(code_content: str = Form(...), language: str = Form(...)): """ Analyze code content to detect input patterns with caching Simple API that takes code content and language, returns input patterns Args: code_content: The source code content to analyze language: Programming language (python, java, c, cpp) Returns: InputAnalysisResult with detected input patterns """ try: # Validate language supported_languages = ["python", "java", "c", "cpp"] if language.lower() not in supported_languages: raise HTTPException( status_code=400, detail=f"Unsupported language: {language}. Supported: {supported_languages}", ) # Create a simple file contents dict for analysis file_contents_dict = {"main": code_content} analysis_result = executor.analyze_input_patterns( language.lower(), file_contents_dict ) result_dict = analysis_result.model_dump() logger.info(f"Input analysis completed for {language} code") return JSONResponse(content=result_dict) except HTTPException: raise except Exception as e: logger.error(f"Error analyzing inputs: {str(e)}") raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}") @app.post("/judge") async def judge_code( main_files: str = Form(...), files: List[UploadFile] = File(None), repo_url: Optional[str] = Form(None), input_data: Optional[str] = Form(None), # JSON array of input strings ): """ Judge code submission with optional input data - main_files: JSON array of main files to execute - files: Multiple files maintaining folder structure - repo_url: Git repository URL (alternative to files) - input_data: JSON array of input strings for programs that require user input """ # Parse main_files try: main_files_list = json.loads(main_files) except json.JSONDecodeError: raise HTTPException(status_code=400, detail="Invalid main_files format") # Parse input_data if provided input_list = None logger.info(f"Received input_data: {input_data}") if input_data: try: input_list = json.loads(input_data) if not isinstance(input_list, list): raise ValueError("Input data must be an array") except (json.JSONDecodeError, ValueError): raise HTTPException( status_code=400, detail="Invalid input_data format - must be JSON array" ) # Auto-detect language from file extensions language = detect_language_from_files(main_files_list) # Validate input: either files or repo_url must be provided if not files and not repo_url: raise HTTPException( status_code=400, detail="Either files or repo_url must be provided" ) if files and repo_url: raise HTTPException( status_code=400, detail="Provide either files or repo_url, not both" ) # Create temporary workspace workspace = None try: # Create unique temporary directory workspace = tempfile.mkdtemp(prefix=f"judge_{uuid.uuid4().hex}_") if repo_url: # Clone repository await clone_repo(repo_url, workspace) else: # Validate total upload size total_upload_size = 0 file_contents = [] # Pre-read all files to check total size for file in files: content = await file.read() if len(content) > MAX_FILE_SIZE: raise HTTPException( status_code=400, detail=f"File {file.filename} exceeds individual size limit", ) total_upload_size += len(content) file_contents.append((file.filename, content)) # Check total size limit if total_upload_size > MAX_TOTAL_SIZE: raise HTTPException( status_code=400, detail=f"Total upload size ({total_upload_size / 1024 / 1024:.2f}MB) exceeds limit ({MAX_TOTAL_SIZE / 1024 / 1024:.2f}MB)", ) # Save uploaded files maintaining structure for filename, content in file_contents: # Create file path file_path = os.path.join(workspace, filename) # Create directories if needed os.makedirs(os.path.dirname(file_path), exist_ok=True) # Write file with open(file_path, "wb") as f: f.write(content) # Execute code with input data result = await executor.execute_code( language, main_files_list, workspace, input_list ) logger.info(f"Execution result: {result}") return JSONResponse(content=result.model_dump()) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) finally: # Clean up temporary files if workspace and os.path.exists(workspace): try: shutil.rmtree(workspace) except Exception as e: print(f"Error cleaning up workspace: {e}") @app.get("/languages") async def get_supported_languages(): """Get supported programming languages with auto-detection info""" return { "languages": [ {"id": "python", "name": "Python 3", "extensions": [".py"]}, {"id": "java", "name": "Java", "extensions": [".java", ".class"]}, {"id": "c", "name": "C", "extensions": [".c"]}, {"id": "cpp", "name": "C++", "extensions": [".cpp", ".cc", ".cxx", ".c++"]}, ], "note": "Language is automatically detected from file extensions. For Java, both source (.java) and compiled (.class) files are supported.", "input_support": "All languages support automatic input detection and handling for interactive programs.", } # Example usage endpoints for testing @app.get("/examples/input-patterns") async def get_input_pattern_examples(): """Get examples of supported input patterns for each language""" return { "python": [ 'name = input("Enter your name: ")', 'age = int(input("Enter your age: "))', 'score = float(input("Enter score: "))', "input() # Simple input without assignment", ], "java": [ "String name = scanner.nextLine();", "int age = scanner.nextInt();", "double score = scanner.nextDouble();", "scanner.next(); # Direct call", ], "c": ['scanf("%s", name);', 'scanf("%d", &age);', 'scanf("%f", &score);'], "cpp": ["cin >> name;", "cin >> age;", "getline(cin, fullName);"], } @app.post("/test-input-analysis") async def test_input_analysis(): """Test endpoint with sample code for input analysis""" # Sample Python code with inputs sample_code = { "main.py": """ name = input("Enter your name: ") age = int(input("Enter your age: ")) score = float(input("Enter your score: ")) print(f"Hello {name}") print(f"You are {age} years old") print(f"Your score is {score}") # Simple input without assignment input("Press enter to continue...") """ } # Test the analysis analysis_result = executor.analyze_input_patterns("python", sample_code) return { "sample_code": sample_code, "analysis": analysis_result.model_dump(), "suggested_inputs": [ "John Doe", # for name "25", # for age "95.5", # for score "", # for press enter ], } if __name__ == "__main__": import uvicorn uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)