#!/usr/bin/env python3
"""
Classification Analyzer
Performance analysis by question classification to identify improvement areas.
"""

import json
import logging
import statistics
from collections import defaultdict, Counter
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple, Any


class ClassificationAnalyzer:
    """Analyzer for performance metrics by question classification."""

    def __init__(self):
        """Initialize the classification analyzer."""
        self.logger = logging.getLogger("ClassificationAnalyzer")

    async def analyze_by_classification(self, results: Dict[str, Dict], session_dir: Path) -> Dict:
        """
        Analyze test results by question classification.

        Args:
            results: Test results keyed by question_id
            session_dir: Directory to save analysis results

        Returns:
            Classification analysis report
        """
        self.logger.info("Starting classification-based analysis...")

        # Organize results by classification
        classification_data = self.organize_by_classification(results)

        # Calculate performance metrics
        performance_metrics = self.calculate_performance_metrics(classification_data)

        # Analyze tool effectiveness
        tool_effectiveness = self.analyze_tool_effectiveness(classification_data)

        # Identify improvement areas
        improvement_areas = self.identify_improvement_areas(performance_metrics, tool_effectiveness)

        # Create comprehensive report
        analysis_report = {
            "analysis_timestamp": datetime.now().isoformat(),
            "total_questions": len(results),
            "classification_breakdown": self.get_classification_breakdown(classification_data),
            "performance_metrics": performance_metrics,
            "tool_effectiveness": tool_effectiveness,
            "improvement_areas": improvement_areas,
            "detailed_data": classification_data
        }

        # Save analysis report
        report_file = session_dir / "classification_analysis.json"
        with open(report_file, 'w') as f:
            json.dump(analysis_report, f, indent=2)

        self.logger.info(f"Classification analysis saved to: {report_file}")
        return analysis_report

    def organize_by_classification(self, results: Dict[str, Dict]) -> Dict[str, List[Dict]]:
        """Organize results by question classification."""
        classification_data = defaultdict(list)

        for question_id, result in results.items():
            # Get classification info
            classification = result.get('classification', {})
            primary_agent = classification.get('primary_agent', 'unknown')

            # Add to classification group
            classification_data[primary_agent].append({
                'question_id': question_id,
                'result': result,
                'classification': classification
            })

        return dict(classification_data)

    def calculate_performance_metrics(self, classification_data: Dict[str, List[Dict]]) -> Dict[str, Dict]:
        """Calculate performance metrics for each classification."""
        metrics = {}

        for classification, questions in classification_data.items():
            # Accuracy metrics
            validation_statuses = []
            execution_times = []
            complexity_scores = []
            confidence_scores = []

            correct_count = 0
            partial_count = 0
            incorrect_count = 0
            timeout_count = 0
            error_count = 0

            for question_data in questions:
                result = question_data['result']
                classification_info = question_data['classification']

                # Validation status
                validation = result.get('validation', {})
                status = validation.get('validation_status', 'unknown')
                validation_statuses.append(status)

                if status == 'correct':
                    correct_count += 1
                elif status == 'partial':
                    partial_count += 1
                elif status == 'incorrect':
                    incorrect_count += 1

                # Execution metrics
                solver_result = result.get('solver_result', {})
                if solver_result.get('status') == 'timeout':
                    timeout_count += 1
                elif solver_result.get('status') == 'error':
                    error_count += 1

                # Timing
                exec_time = result.get('total_processing_time', 0)
                if exec_time > 0:
                    execution_times.append(exec_time)

                # Classification metrics
                complexity = classification_info.get('complexity', 0)
                if complexity > 0:
                    complexity_scores.append(complexity)

                confidence = classification_info.get('confidence', 0)
                if confidence > 0:
                    confidence_scores.append(confidence)

            total_questions = len(questions)

            # Calculate metrics
            accuracy = correct_count / total_questions if total_questions > 0 else 0
            partial_rate = partial_count / total_questions if total_questions > 0 else 0
            error_rate = (error_count + timeout_count) / total_questions if total_questions > 0 else 0

            metrics[classification] = {
                "total_questions": total_questions,
                "accuracy": accuracy,
                "partial_accuracy": partial_rate,
                "error_rate": error_rate,
                "counts": {
                    "correct": correct_count,
                    "partial": partial_count,
                    "incorrect": incorrect_count,
                    "timeout": timeout_count,
                    "error": error_count
                },
                "execution_time": {
                    "mean": statistics.mean(execution_times) if execution_times else 0,
                    "median": statistics.median(execution_times) if execution_times else 0,
                    "max": max(execution_times) if execution_times else 0,
                    "min": min(execution_times) if execution_times else 0
                },
                "complexity": {
                    "mean": statistics.mean(complexity_scores) if complexity_scores else 0,
                    "distribution": Counter(complexity_scores)
                },
                "classification_confidence": {
                    "mean": statistics.mean(confidence_scores) if confidence_scores else 0,
                    "min": min(confidence_scores) if confidence_scores else 0
                }
            }

        return metrics

    def analyze_tool_effectiveness(self, classification_data: Dict[str, List[Dict]]) -> Dict[str, Dict]:
        """Analyze tool effectiveness across classifications."""
        tool_usage = defaultdict(lambda: {
            'total_uses': 0,
            'successes': 0,
            'by_classification': defaultdict(lambda: {'uses': 0, 'successes': 0})
        })

        for classification, questions in classification_data.items():
            for question_data in questions:
                result = question_data['result']
                classification_info = question_data['classification']

                # Get tools needed
                tools_needed = classification_info.get('tools_needed', [])
                success = result.get('validation', {}).get('validation_status') == 'correct'

                for tool in tools_needed:
                    tool_usage[tool]['total_uses'] += 1
                    tool_usage[tool]['by_classification'][classification]['uses'] += 1

                    if success:
                        tool_usage[tool]['successes'] += 1
                        tool_usage[tool]['by_classification'][classification]['successes'] += 1

        # Calculate effectiveness rates
        tool_effectiveness = {}
        for tool, usage_data in tool_usage.items():
            total_uses = usage_data['total_uses']
            successes = usage_data['successes']
            effectiveness_rate = successes / total_uses if total_uses > 0 else 0

            # Per-classification effectiveness
            classification_effectiveness = {}
            for classification, class_data in usage_data['by_classification'].items():
                class_uses = class_data['uses']
                class_successes = class_data['successes']
                class_rate = class_successes / class_uses if class_uses > 0 else 0

                classification_effectiveness[classification] = {
                    'uses': class_uses,
                    'successes': class_successes,
                    'effectiveness_rate': class_rate
                }

            tool_effectiveness[tool] = {
                'total_uses': total_uses,
                'total_successes': successes,
                'overall_effectiveness': effectiveness_rate,
                'by_classification': classification_effectiveness
            }

        return tool_effectiveness

    def identify_improvement_areas(self, performance_metrics: Dict, tool_effectiveness: Dict) -> Dict[str, List[Any]]:
        """Identify specific improvement areas based on analysis."""
        improvements = {
            "low_accuracy_classifications": [],
            "high_error_rate_classifications": [],
            "slow_processing_classifications": [],
            "ineffective_tools": [],
            "misclassified_questions": [],
            "recommendations": []
        }

        # Identify low accuracy classifications
        for classification, metrics in performance_metrics.items():
            accuracy = metrics['accuracy']
            error_rate = metrics['error_rate']
            avg_time = metrics['execution_time']['mean']

            if accuracy < 0.5:  # Less than 50% accuracy
                improvements["low_accuracy_classifications"].append({
                    "classification": classification,
                    "accuracy": accuracy,
                    "details": f"Only {accuracy:.1%} accuracy with {metrics['total_questions']} questions"
                })

            if error_rate > 0.3:  # More than 30% errors/timeouts
                improvements["high_error_rate_classifications"].append({
                    "classification": classification,
                    "error_rate": error_rate,
                    "details": f"{error_rate:.1%} error/timeout rate"
                })

            if avg_time > 600:  # More than 10 minutes average
                improvements["slow_processing_classifications"].append({
                    "classification": classification,
                    "avg_time": avg_time,
                    "details": f"Average {avg_time:.0f} seconds processing time"
                })

        # Identify ineffective tools
        for tool, effectiveness in tool_effectiveness.items():
            overall_rate = effectiveness['overall_effectiveness']
            total_uses = effectiveness['total_uses']

            if overall_rate < 0.4 and total_uses >= 3:  # Less than 40% effectiveness with meaningful usage
                improvements["ineffective_tools"].append({
                    "tool": tool,
                    "effectiveness": overall_rate,
                    "uses": total_uses,
                    "details": f"Only {overall_rate:.1%} success rate across {total_uses} uses"
                })

        # Generate recommendations
        recommendations = []

        if improvements["low_accuracy_classifications"]:
            worst_classification = min(improvements["low_accuracy_classifications"],
                                       key=lambda x: x['accuracy'])
            recommendations.append(
                f"PRIORITY: Improve {worst_classification['classification']} agent "
                f"(currently {worst_classification['accuracy']:.1%} accuracy)"
            )

        if improvements["ineffective_tools"]:
            worst_tool = min(improvements["ineffective_tools"],
                             key=lambda x: x['effectiveness'])
            recommendations.append(
                f"TOOL FIX: Revise {worst_tool['tool']} tool "
                f"(currently {worst_tool['effectiveness']:.1%} effectiveness)"
            )

        if improvements["high_error_rate_classifications"]:
            recommendations.append(
                "STABILITY: Address timeout and error handling for classifications with high error rates"
            )

        overall_accuracy = self.calculate_overall_accuracy(performance_metrics)
        if overall_accuracy < 0.7:
            recommendations.append(
                f"SYSTEM: Overall accuracy is {overall_accuracy:.1%} - target 70% for production readiness"
            )

        improvements["recommendations"] = recommendations
        return improvements

    def calculate_overall_accuracy(self, performance_metrics: Dict) -> float:
        """Calculate overall system accuracy across all classifications."""
        total_correct = 0
        total_questions = 0

        for metrics in performance_metrics.values():
            total_correct += metrics['counts']['correct']
            total_questions += metrics['total_questions']

        return total_correct / total_questions if total_questions > 0 else 0

    def get_classification_breakdown(self, classification_data: Dict[str, List[Dict]]) -> Dict[str, int]:
        """Get simple breakdown of question counts by classification."""
        return {
            classification: len(questions)
            for classification, questions in classification_data.items()
        }
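

# --- Usage sketch (illustrative, not part of the analyzer itself) -----------
# A minimal example of driving the analyzer end to end, assuming test results
# have already been produced by the surrounding test harness. The sample entry
# below only mirrors the keys this module reads ('classification', 'validation',
# 'solver_result', 'total_processing_time'); the agent name, tool name, and
# solver status values are hypothetical placeholders.
if __name__ == "__main__":
    import asyncio

    logging.basicConfig(level=logging.INFO)

    sample_results = {
        "q_001": {
            "classification": {
                "primary_agent": "research",      # hypothetical agent name
                "complexity": 3,
                "confidence": 0.85,
                "tools_needed": ["web_search"],   # hypothetical tool name
            },
            "validation": {"validation_status": "correct"},
            "solver_result": {"status": "completed"},  # assumed non-error status
            "total_processing_time": 42.0,
        }
    }

    session_dir = Path("./analysis_session")  # assumed output directory
    session_dir.mkdir(parents=True, exist_ok=True)

    analyzer = ClassificationAnalyzer()
    report = asyncio.run(analyzer.analyze_by_classification(sample_results, session_dir))
    print(json.dumps(report["performance_metrics"], indent=2))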