#!/usr/bin/env python3
"""
Comprehensive Async Batch Logging System for GAIA Questions
Provides detailed per-question logs, batch summary, and classification analysis
"""

import os
import json
import asyncio
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any
from collections import defaultdict
from dataclasses import dataclass, asdict


@dataclass
class QuestionResult:
    """Data class for storing question processing results"""
    task_id: str
    question_text: str
    classification: str
    complexity: int
    confidence: float
    expected_answer: str
    our_answer: str
    status: str  # CORRECT, INCORRECT, PARTIAL, ERROR
    accuracy_score: float
    total_duration: float
    classification_time: float
    solving_time: float
    validation_time: float
    error_type: Optional[str] = None
    error_details: Optional[str] = None
    tools_used: Optional[List[str]] = None
    anti_hallucination_applied: bool = False
    override_reason: Optional[str] = None

    def __post_init__(self):
        if self.tools_used is None:
            self.tools_used = []


class AsyncBatchLogger:
    """Comprehensive logging system for async batch processing"""

    def __init__(self, base_log_dir: str = "logs"):
        self.base_log_dir = Path(base_log_dir)
        self.base_log_dir.mkdir(exist_ok=True)

        # Initialize timestamps
        self.batch_start_time = datetime.now()
        self.timestamp = self.batch_start_time.strftime("%Y%m%d_%H%M%S")

        # Create log files
        self.summary_log_path = self.base_log_dir / f"async_batch_summary_{self.timestamp}.log"
        self.batch_analysis_path = self.base_log_dir / f"async_batch_analysis_{self.timestamp}.json"

        # Initialize data structures
        self.question_results: Dict[str, QuestionResult] = {}
        self.classification_results = defaultdict(list)
        self.batch_metrics = {
            "total_questions": 0,
            "completed_questions": 0,
            "correct_answers": 0,
            "accuracy_rate": 0.0,
            "total_duration": 0.0,
            "start_time": self.batch_start_time.isoformat(),
            "end_time": None
        }

        # Initialize summary logger
        self.summary_logger = self._setup_summary_logger()

        # Active question loggers for concurrent access
        self.question_loggers: Dict[str, logging.Logger] = {}

    def _setup_summary_logger(self) -> logging.Logger:
        """Set up the batch summary logger"""
        logger = logging.getLogger(f"batch_summary_{self.timestamp}")
        logger.setLevel(logging.INFO)

        # Create file handler
        handler = logging.FileHandler(self.summary_log_path)
        formatter = logging.Formatter('[%(asctime)s] %(message)s', datefmt='%H:%M:%S')
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # Also log to console
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)

        return logger

    def _setup_question_logger(self, task_id: str) -> logging.Logger:
        """Set up detailed logger for a specific question"""
        question_log_path = self.base_log_dir / f"async_batch_question_{task_id}_{self.timestamp}.log"

        logger = logging.getLogger(f"question_{task_id}_{self.timestamp}")
        logger.setLevel(logging.INFO)

        # Create file handler
        handler = logging.FileHandler(question_log_path)
        formatter = logging.Formatter('%(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        return logger

    async def log_batch_start(self, total_questions: int, concurrency: int):
        """Log the start of batch processing"""
        self.batch_metrics["total_questions"] = total_questions

        self.summary_logger.info(f"BATCH_START | Total: {total_questions} questions | Concurrency: {concurrency}")
        self.summary_logger.info(f"Timestamp: {self.batch_start_time.isoformat()}")
        self.summary_logger.info(f"Log Directory: {self.base_log_dir}")
        self.summary_logger.info("-" * 80)

    async def log_question_start(self, task_id: str, question_data: Dict):
        """Log the start of processing a specific question"""
        # Set up question-specific logger
        question_logger = self._setup_question_logger(task_id)
        self.question_loggers[task_id] = question_logger

        # Log detailed question start
        question_logger.info("=" * 80)
        question_logger.info("ASYNC BATCH QUESTION PROCESSING")
        question_logger.info("=" * 80)
        question_logger.info(f"Question ID: {task_id}")
        question_logger.info(f"Start Time: {datetime.now().isoformat()}")
        question_logger.info(f"Question Text: {question_data.get('question', 'N/A')}")
        question_logger.info(f"Level: {question_data.get('Level', 'Unknown')}")
        question_logger.info(f"Has File: {'Yes' if question_data.get('file_name') else 'No'}")
        if question_data.get('file_name'):
            question_logger.info(f"File: {question_data.get('file_name')}")
        question_logger.info("")

    async def log_classification(self, task_id: str, classification: Dict):
        """Log question classification details"""
        if task_id not in self.question_loggers:
            return

        logger = self.question_loggers[task_id]
        logger.info("--- CLASSIFICATION PHASE ---")
        logger.info(f"Primary Agent: {classification.get('primary_agent', 'unknown')}")
        logger.info(f"Secondary Agents: {', '.join(classification.get('secondary_agents', []))}")
        logger.info(f"Complexity: {classification.get('complexity', 0)}/5")
        logger.info(f"Confidence: {classification.get('confidence', 0.0):.3f}")
        logger.info(f"Tools Needed: {', '.join(classification.get('tools_needed', []))}")
        logger.info(f"Reasoning: {classification.get('reasoning', 'N/A')}")
        logger.info("")

    async def log_solving_start(self, task_id: str, routing_plan: Dict):
        """Log the start of the solving phase"""
        if task_id not in self.question_loggers:
            return

        logger = self.question_loggers[task_id]
        logger.info("--- SOLVING PHASE ---")
        logger.info(f"Route to: {routing_plan.get('primary_route', 'unknown')} agent")
        logger.info(f"Coordination: {'Yes' if routing_plan.get('requires_coordination') else 'No'}")
        logger.info(f"Estimated Duration: {routing_plan.get('estimated_duration', 'unknown')}")
        logger.info("")
        logger.info("Tool Executions:")

    async def log_tool_execution(self, task_id: str, tool_name: str, duration: float, result_summary: str):
        """Log individual tool execution"""
        if task_id not in self.question_loggers:
            return

        logger = self.question_loggers[task_id]
        logger.info(f" - {tool_name}: {duration:.1f}s → {result_summary[:100]}...")

    async def log_answer_processing(self, task_id: str, raw_response: str, processed_answer: str,
                                    anti_hallucination_applied: bool = False, override_reason: Optional[str] = None):
        """Log answer processing and anti-hallucination details"""
        if task_id not in self.question_loggers:
            return

        logger = self.question_loggers[task_id]
        logger.info("")
        logger.info("Agent Response (first 500 chars):")
        logger.info(raw_response[:500] + ("..." if len(raw_response) > 500 else ""))
        logger.info("")
        logger.info(f"Processed Answer: {processed_answer}")

        if anti_hallucination_applied:
            logger.info("🚨 ANTI-HALLUCINATION OVERRIDE APPLIED")
            logger.info(f"Reason: {override_reason}")
        logger.info("")

    async def log_question_complete(self, task_id: str, result: QuestionResult):
        """Log the completion of a question with full results"""
        if task_id not in self.question_loggers:
            return

        logger = self.question_loggers[task_id]

        # Store result
        self.question_results[task_id] = result
        self.classification_results[result.classification].append(result)

        # Update batch metrics
        self.batch_metrics["completed_questions"] += 1
        if result.status == "CORRECT":
            self.batch_metrics["correct_answers"] += 1

        # Log validation phase
        logger.info("--- VALIDATION PHASE ---")
        logger.info(f"Expected Answer: {result.expected_answer}")
        logger.info(f"Our Answer: {result.our_answer}")
        logger.info(f"Status: {result.status}")
        logger.info(f"Accuracy Score: {result.accuracy_score:.1%}")
        logger.info("")

        # Log performance metrics
        logger.info("--- PERFORMANCE METRICS ---")
        logger.info(f"Total Duration: {result.total_duration:.1f}s")
        logger.info(f"Classification Time: {result.classification_time:.1f}s")
        logger.info(f"Solving Time: {result.solving_time:.1f}s")
        logger.info(f"Validation Time: {result.validation_time:.1f}s")

        if result.error_type:
            logger.info(f"Error Type: {result.error_type}")
            logger.info(f"Error Details: {result.error_details}")

        logger.info("")
        logger.info("=" * 80)
        logger.info("END QUESTION LOG")
        logger.info("=" * 80)

        # Log to summary
        status_emoji = "✅" if result.status == "CORRECT" else "🟡" if result.status == "PARTIAL" else "❌"
        override_info = f" | {result.override_reason}" if result.anti_hallucination_applied else ""
        self.summary_logger.info(
            f"{status_emoji} {task_id[:8]}... | {result.classification} | {result.status} | "
            f"{result.accuracy_score:.0%} | {result.total_duration:.1f}s{override_info}"
        )

    async def log_batch_progress(self):
        """Log current batch progress with ETA"""
        completed = self.batch_metrics["completed_questions"]
        total = self.batch_metrics["total_questions"]

        if completed == 0:
            return

        # Calculate accuracy
        accuracy = (self.batch_metrics["correct_answers"] / completed) * 100

        # Calculate ETA
        elapsed_time = (datetime.now() - self.batch_start_time).total_seconds()
        avg_time_per_question = elapsed_time / completed
        remaining_questions = total - completed
        eta_seconds = remaining_questions * avg_time_per_question
        eta_minutes = int(eta_seconds // 60)
        eta_seconds = int(eta_seconds % 60)

        self.summary_logger.info(
            f"📊 PROGRESS | {completed}/{total} completed | {accuracy:.1f}% accuracy | "
            f"ETA: {eta_minutes}m {eta_seconds}s"
        )

    async def log_batch_complete(self):
        """Log batch completion with final summary"""
        end_time = datetime.now()
        total_duration = (end_time - self.batch_start_time).total_seconds()

        # Update batch metrics
        self.batch_metrics["end_time"] = end_time.isoformat()
        self.batch_metrics["total_duration"] = total_duration

        completed = self.batch_metrics["completed_questions"]
        total = self.batch_metrics["total_questions"]
        accuracy = (self.batch_metrics["correct_answers"] / completed * 100) if completed > 0 else 0
        self.batch_metrics["accuracy_rate"] = accuracy / 100

        self.summary_logger.info("-" * 80)
        self.summary_logger.info(
            f"🏁 BATCH_COMPLETE | {completed}/{total} | {accuracy:.1f}% accuracy | "
            f"Total: {int(total_duration // 60)}m {int(total_duration % 60)}s"
        )

        # Generate classification analysis
        await self.generate_classification_analysis()

        # Export final results
        await self.export_results()

        self.summary_logger.info(f"📄 Analysis exported: {self.batch_analysis_path}")
        self.summary_logger.info(f"📝 Summary log: {self.summary_log_path}")

    async def generate_classification_analysis(self):
        """Generate detailed analysis by classification"""
        analysis = {
            "batch_metadata": self.batch_metrics,
            "classification_breakdown": {},
            "overall_recommendations": []
        }

        for classification, results in self.classification_results.items():
            if not results:
                continue

            # Calculate metrics
            total = len(results)
            correct = len([r for r in results if r.status == "CORRECT"])
            partial = len([r for r in results if r.status == "PARTIAL"])
            errors = len([r for r in results if r.status == "ERROR"])
            accuracy_rate = correct / total if total > 0 else 0
            avg_duration = sum(r.total_duration for r in results) / total if total > 0 else 0

            # Error analysis
            error_types = defaultdict(int)
            failed_questions = []
            for result in results:
                if result.status in ["INCORRECT", "ERROR"]:
                    error_types[result.error_type or "unknown"] += 1
                    failed_questions.append({
                        "task_id": result.task_id,
                        "error_type": result.error_type,
                        "error_details": result.error_details
                    })

            # Generate recommendations
            recommendations = self._generate_recommendations(classification, results, error_types)

            classification_analysis = {
                "classification": classification,
                "total_questions": total,
                "accuracy_rate": accuracy_rate,
                "successful": correct,
                "partial": partial,
                "failed": total - correct - partial,
                "errors": errors,
                "performance_metrics": {
                    "avg_duration": avg_duration,
                    "min_duration": min(r.total_duration for r in results) if results else 0,
                    "max_duration": max(r.total_duration for r in results) if results else 0
                },
                "error_breakdown": dict(error_types),
                "failed_questions": failed_questions,
                "improvement_recommendations": recommendations
            }

            analysis["classification_breakdown"][classification] = classification_analysis

        # Generate overall recommendations
        analysis["overall_recommendations"] = self._generate_overall_recommendations()

        # Save classification analysis
        with open(self.batch_analysis_path, 'w') as f:
            json.dump(analysis, f, indent=2, ensure_ascii=False)

    def _generate_recommendations(self, classification: str, results: List[QuestionResult],
                                  error_types: Dict[str, int]) -> List[str]:
        """Generate specific recommendations for a classification"""
        recommendations = []
        accuracy_rate = len([r for r in results if r.status == "CORRECT"]) / len(results)

        if accuracy_rate < 0.8:
            recommendations.append(f"🔧 Low accuracy ({accuracy_rate:.1%}) - needs immediate attention")

        # Classification-specific recommendations
        if classification == "multimedia":
            if "timeout" in error_types:
                recommendations.append("⏱️ Optimize video processing timeout limits")
            if "audio_processing" in error_types:
                recommendations.append("🎵 Enhance audio transcription accuracy")
            if accuracy_rate > 0.9:
                recommendations.append("✅ Excellent multimedia processing - ready for production")

        elif classification == "research":
            if "hallucination" in error_types:
                recommendations.append("🚨 Strengthen anti-hallucination safeguards")
            if "wikipedia" in error_types:
                recommendations.append("📚 Improve Wikipedia tool integration")
            if accuracy_rate > 0.9:
                recommendations.append("✅ Excellent research capabilities - ready for production")

        elif classification == "logic_math":
            if "chess" in error_types:
                recommendations.append("♟️ Enhance chess analysis algorithms")
            if "calculation" in error_types:
                recommendations.append("🧮 Improve mathematical calculation accuracy")
            if accuracy_rate > 0.9:
                recommendations.append("✅ Excellent logic/math processing - ready for production")

        elif classification == "file_processing":
            if "python_execution" in error_types:
                recommendations.append("🐍 Optimize Python code execution environment")
            if "excel_processing" in error_types:
                recommendations.append("📊 Enhance Excel file processing capabilities")
            if accuracy_rate > 0.9:
                recommendations.append("✅ Excellent file processing - ready for production")

        # Performance recommendations
        avg_duration = sum(r.total_duration for r in results) / len(results)
        if avg_duration > 60:
            recommendations.append(f"⚡ Optimize performance - avg duration {avg_duration:.1f}s")

        return recommendations

    def _generate_overall_recommendations(self) -> List[str]:
        """Generate overall system recommendations"""
        recommendations = []
        total_accuracy = self.batch_metrics["accuracy_rate"]

        if total_accuracy >= 0.95:
            recommendations.append("🏆 EXCELLENT: 95%+ accuracy achieved - production ready!")
        elif total_accuracy >= 0.90:
            recommendations.append("✅ GREAT: 90%+ accuracy - minor optimizations needed")
        elif total_accuracy >= 0.80:
            recommendations.append("🔧 GOOD: 80%+ accuracy - moderate improvements needed")
        elif total_accuracy >= 0.70:
            recommendations.append("⚠️ ACCEPTABLE: 70%+ accuracy - significant improvements needed")
        else:
            recommendations.append("🚨 CRITICAL: <70% accuracy - major system overhaul required")

        # Add specific system recommendations
        recommendations.extend([
            "📊 Monitor performance metrics for production deployment",
            "🔄 Implement continuous improvement based on classification analysis",
            "📈 Track accuracy trends over time",
            "🛠️ Focus improvement efforts on lowest-performing classifications"
        ])

        return recommendations

    async def export_results(self):
        """Export comprehensive results for analysis"""
        # Export individual question results
        results_data = {
            "batch_metadata": self.batch_metrics,
            "question_results": [asdict(result) for result in self.question_results.values()],
            "classification_summary": {
                classification: {
                    "count": len(results),
                    "accuracy": len([r for r in results if r.status == "CORRECT"]) / len(results)
                }
                for classification, results in self.classification_results.items()
            }
        }

        results_file = self.base_log_dir / f"async_batch_results_{self.timestamp}.json"
        with open(results_file, 'w') as f:
            json.dump(results_data, f, indent=2, ensure_ascii=False)

        self.summary_logger.info(f"📄 Detailed results: {results_file}")
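

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only): drives the logger through one
    # hypothetical question so the expected call sequence is visible end to end.
    # In the real batch runner the classification, routing, and validation data
    # would come from the GAIA pipeline; all values below are made up.
    async def _demo():
        batch_logger = AsyncBatchLogger(base_log_dir="logs")
        await batch_logger.log_batch_start(total_questions=1, concurrency=1)

        task_id = "demo-task-0001"  # hypothetical task id
        await batch_logger.log_question_start(task_id, {"question": "What is 2 + 2?", "Level": 1})
        await batch_logger.log_classification(task_id, {
            "primary_agent": "logic_math", "secondary_agents": [], "complexity": 1,
            "confidence": 0.99, "tools_needed": ["calculator"], "reasoning": "simple arithmetic"
        })
        await batch_logger.log_solving_start(task_id, {
            "primary_route": "logic_math", "requires_coordination": False, "estimated_duration": "5s"
        })
        await batch_logger.log_tool_execution(task_id, "calculator", 0.1, "2 + 2 = 4")
        await batch_logger.log_answer_processing(task_id, raw_response="The answer is 4.",
                                                 processed_answer="4")
        await batch_logger.log_question_complete(task_id, QuestionResult(
            task_id=task_id, question_text="What is 2 + 2?", classification="logic_math",
            complexity=1, confidence=0.99, expected_answer="4", our_answer="4",
            status="CORRECT", accuracy_score=1.0, total_duration=0.5,
            classification_time=0.1, solving_time=0.3, validation_time=0.1,
            tools_used=["calculator"]
        ))
        await batch_logger.log_batch_progress()
        await batch_logger.log_batch_complete()

    asyncio.run(_demo())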