#!/usr/bin/env python3
"""
Asynchronous Question Processor
Clean question handler that removes hardcoded overrides for honest accuracy measurement.
"""

import asyncio
import json
import logging
import sys
import time
import traceback
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional, Any

# Add the project root to the Python path
sys.path.insert(0, str(Path(__file__).parent))

from gaia_web_loader import GAIAQuestionLoaderWeb
from question_classifier import QuestionClassifier

class AsyncQuestionProcessor:
    """Asynchronous processor for individual GAIA questions with clean execution."""

    def __init__(self,
                 session_dir: Path,
                 timeout_seconds: int = 900,
                 model: str = "qwen3-235b"):
        """
        Initialize the async question processor.

        Args:
            session_dir: Directory for this test session
            timeout_seconds: Timeout per question, in seconds
            model: Model to use for question solving
        """
        self.session_dir = session_dir
        self.timeout_seconds = timeout_seconds
        self.model = model

        # Create individual logs directory
        self.logs_dir = session_dir / "individual_logs"
        self.logs_dir.mkdir(exist_ok=True)

        # Setup logging
        self.setup_logging()

        # Initialize components
        self.loader = GAIAQuestionLoaderWeb()
        self.classifier = QuestionClassifier()

        # Load validation metadata for accuracy checking
        self.validation_metadata = self.load_validation_metadata()

    def setup_logging(self):
        """Setup logging for the question processor."""
        log_file = self.session_dir / "question_processor.log"

        self.logger = logging.getLogger("AsyncQuestionProcessor")
        self.logger.setLevel(logging.INFO)

        # Attach the file handler only once; creating several processors would
        # otherwise stack duplicate handlers on the shared named logger
        if not self.logger.handlers:
            file_handler = logging.FileHandler(log_file)
            file_handler.setLevel(logging.INFO)

            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            file_handler.setFormatter(formatter)
            self.logger.addHandler(file_handler)

    def load_validation_metadata(self) -> Dict[str, Any]:
        """Load validation metadata for answer checking."""
        metadata_file = Path("gaia_validation_metadata.jsonl")
        metadata = {}

        if not metadata_file.exists():
            self.logger.warning(f"Validation metadata file not found: {metadata_file}")
            return metadata

        try:
            with open(metadata_file, 'r') as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        data = json.loads(line)
                        task_id = data.get('task_id')
                        if task_id:
                            metadata[task_id] = data
                    except json.JSONDecodeError:
                        continue
            self.logger.info(f"Loaded validation metadata for {len(metadata)} questions")
        except Exception as e:
            self.logger.error(f"Failed to load validation metadata: {e}")

        return metadata
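
    # Note: each line of gaia_validation_metadata.jsonl is assumed to be a
    # JSON object shaped like {"task_id": "...", "Final answer": "..."}; the
    # 'task_id' key indexes the entry and the 'Final answer' key is what
    # validate_answer() below compares against.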

    async def classify_question(self, question: Dict) -> Dict:
        """Classify the question using the classification system."""
        try:
            classification = await asyncio.to_thread(
                self.classifier.classify_question, question
            )
            return classification
        except Exception as e:
            self.logger.error(f"Classification failed: {e}")
            return {
                "primary_agent": "general",
                "secondary_agent": None,
                "complexity": 3,
                "confidence": 0.0,
                "tools_needed": [],
                "error": str(e)
            }
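
    # On classification failure, the fallback above routes the question to the
    # "general" agent with zero confidence so the pipeline can still proceed.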

    async def execute_question_solver(self, question_id: str) -> Dict:
        """
        Execute the main question solver without hardcoded overrides.
        This is the clean version that provides honest accuracy measurement.
        """
        start_time = time.time()

        # Create individual log file for this question
        individual_log = self.logs_dir / f"question_{question_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"

        try:
            # Build command for question solver
            cmd = [
                sys.executable,
                "tests/test_specific_question.py",
                question_id,
                self.model
            ]

            self.logger.info(f"Executing solver for {question_id}: {' '.join(cmd)}")

            # Execute with timeout
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.STDOUT,
                cwd=Path.cwd()
            )

            try:
                stdout, _ = await asyncio.wait_for(
                    process.communicate(),
                    timeout=self.timeout_seconds
                )

                # Write output to individual log
                with open(individual_log, 'w') as f:
                    f.write(f"Command: {' '.join(cmd)}\n")
                    f.write(f"Start time: {datetime.fromtimestamp(start_time).isoformat()}\n")
                    f.write(f"Question ID: {question_id}\n")
                    f.write("=" * 80 + "\n")
                    f.write(stdout.decode('utf-8', errors='replace'))

                execution_time = time.time() - start_time

                # Parse the output for answer extraction
                output_text = stdout.decode('utf-8', errors='replace')
                answer = self.extract_answer_from_output(output_text)

                return {
                    "status": "completed",
                    "execution_time": execution_time,
                    "return_code": process.returncode,
                    "answer": answer,
                    "log_file": str(individual_log),
                    "timestamp": datetime.now().isoformat()
                }

            except asyncio.TimeoutError:
                # Kill the process on timeout
                process.kill()
                await process.wait()

                execution_time = time.time() - start_time

                # Write timeout info to log
                with open(individual_log, 'w') as f:
                    f.write(f"Command: {' '.join(cmd)}\n")
                    f.write(f"Start time: {datetime.fromtimestamp(start_time).isoformat()}\n")
                    f.write(f"Question ID: {question_id}\n")
                    f.write(f"STATUS: TIMEOUT after {self.timeout_seconds} seconds\n")
                    f.write("=" * 80 + "\n")

                return {
                    "status": "timeout",
                    "execution_time": execution_time,
                    "timeout_seconds": self.timeout_seconds,
                    "log_file": str(individual_log),
                    "timestamp": datetime.now().isoformat()
                }

        except Exception as e:
            execution_time = time.time() - start_time

            # Write error info to log
            with open(individual_log, 'w') as f:
                f.write(f"Command: {' '.join(cmd)}\n")
                f.write(f"Start time: {datetime.fromtimestamp(start_time).isoformat()}\n")
                f.write(f"Question ID: {question_id}\n")
                f.write(f"STATUS: ERROR - {str(e)}\n")
                f.write("=" * 80 + "\n")
                f.write(traceback.format_exc())

            return {
                "status": "error",
                "execution_time": execution_time,
                "error": str(e),
                "log_file": str(individual_log),
                "timestamp": datetime.now().isoformat()
            }
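
    # execute_question_solver returns one of three result shapes, keyed by
    # "status": "completed" (with "answer" and "return_code"), "timeout", or
    # "error"; all three include "execution_time", "log_file" and "timestamp".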

    def extract_answer_from_output(self, output_text: str) -> Optional[str]:
        """Extract the final answer from solver output."""
        # Look for common answer patterns
        patterns = [
            "Final Answer:",
            "FINAL ANSWER:",
            "Answer:",
            "ANSWER:",
        ]

        lines = output_text.split('\n')

        # Search for answer patterns
        for i, line in enumerate(lines):
            line_stripped = line.strip()
            for pattern in patterns:
                if pattern in line_stripped:
                    # Try to extract answer from same line
                    answer_part = line_stripped.split(pattern, 1)
                    if len(answer_part) > 1:
                        answer = answer_part[1].strip()
                        if answer:
                            return answer
                    # Try next line if current line doesn't have answer
                    if i + 1 < len(lines):
                        next_line = lines[i + 1].strip()
                        if next_line:
                            return next_line

        # Fallback: look for the last non-empty line that might be an answer
        for line in reversed(lines):
            line_stripped = line.strip()
            if line_stripped and not line_stripped.startswith(('=', '-', 'Time:', 'Duration:')):
                # Avoid log formatting lines
                if len(line_stripped) < 200:  # Reasonable answer length
                    return line_stripped

        return None
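
    # Extraction examples (illustrative only): "FINAL ANSWER: 42" yields "42";
    # a bare "Final Answer:" marker followed by "42" on the next line also
    # yields "42"; with no marker at all, the last short non-decorative output
    # line is returned as a best-effort fallback.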

    def validate_answer(self, question_id: str, generated_answer: Optional[str]) -> Dict:
        """Validate the generated answer against the expected answer."""
        if question_id not in self.validation_metadata:
            return {
                "validation_status": "no_metadata",
                "message": "No validation metadata available"
            }

        metadata = self.validation_metadata[question_id]
        expected_answer = metadata.get('Final answer')
        if expected_answer is None:
            # Guard against metadata entries without a 'Final answer' field,
            # which would otherwise be compared as the literal string "none"
            return {
                "validation_status": "no_metadata",
                "message": "Metadata entry has no expected answer"
            }

        if not generated_answer:
            return {
                "validation_status": "no_answer",
                "expected_answer": expected_answer,
                "message": "No answer generated"
            }

        # Simple string comparison (case-insensitive)
        generated_clean = str(generated_answer).strip().lower()
        expected_clean = str(expected_answer).strip().lower()

        if generated_clean == expected_clean:
            status = "correct"
        elif generated_clean in expected_clean or expected_clean in generated_clean:
            status = "partial"
        else:
            status = "incorrect"

        return {
            "validation_status": status,
            "generated_answer": generated_answer,
            "expected_answer": expected_answer,
            "match_details": {
                "exact_match": generated_clean == expected_clean,
                "partial_match": generated_clean in expected_clean or expected_clean in generated_clean
            }
        }
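
    # Illustrative outcomes of the comparison above (hypothetical values):
    #   generated "Paris",         expected "paris" -> "correct"   (case-insensitive)
    #   generated "Paris, France", expected "Paris" -> "partial"   (substring)
    #   generated "London",        expected "Paris" -> "incorrect"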

    async def process_question(self, question: Dict) -> Dict:
        """
        Process a single question through the complete pipeline.
        This is the clean version without hardcoded overrides for honest accuracy.
        """
        question_id = question.get('task_id', 'unknown')
        start_time = time.time()

        self.logger.info(f"Processing question {question_id}")

        try:
            # Step 1: Classify the question
            classification = await self.classify_question(question)

            # Step 2: Execute the solver (clean version)
            solver_result = await self.execute_question_solver(question_id)

            # Step 3: Validate the answer
            validation = self.validate_answer(
                question_id,
                solver_result.get('answer')
            )

            total_time = time.time() - start_time

            # Truncate long question text for the result summary
            question_text = question.get('Question', '')
            if len(question_text) > 200:
                question_text = question_text[:200] + "..."

            # Compile complete result
            result = {
                "question_id": question_id,
                "question_text": question_text,
                "classification": classification,
                "solver_result": solver_result,
                "validation": validation,
                "total_processing_time": total_time,
                "timestamp": datetime.now().isoformat()
            }

            self.logger.info(
                f"Completed question {question_id} in {total_time:.2f}s - "
                f"Status: {validation.get('validation_status', 'unknown')}"
            )
            return result

        except Exception as e:
            total_time = time.time() - start_time
            self.logger.error(f"Failed to process question {question_id}: {e}")
            return {
                "question_id": question_id,
                "status": "error",
                "error": str(e),
                "total_processing_time": total_time,
                "timestamp": datetime.now().isoformat(),
                "traceback": traceback.format_exc()
            }
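

# Minimal usage sketch. Assumes GAIAQuestionLoaderWeb exposes the loaded
# questions as a `questions` list of dicts with 'task_id' and 'Question'
# keys (hypothetical attribute name; adjust to the loader's actual API).
async def _demo() -> None:
    session_dir = Path("sessions") / datetime.now().strftime("%Y%m%d_%H%M%S")
    session_dir.mkdir(parents=True, exist_ok=True)

    processor = AsyncQuestionProcessor(session_dir, timeout_seconds=300)
    question = processor.loader.questions[0]  # hypothetical attribute
    result = await processor.process_question(question)
    print(json.dumps(result, indent=2))


if __name__ == "__main__":
    asyncio.run(_demo())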