#!/usr/bin/env python3
"""
Compare performance metrics across different models.

Reads tactic_counts_summary.json and generates a comparison report showing
detection rates, coverage, accuracy, and effectiveness for each model.

Usage:
    python compare_models.py [--input INPUT_PATH] [--output OUTPUT_PATH]
"""

import argparse
import json
import statistics
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List
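
# The tactic counts file is expected to look roughly like the sketch below.
# This is an illustrative guess based only on the fields this script reads
# ("results", "model", "tactic", "tactic_detected",
# "total_abnormal_events_detected"); the file produced by count_tactics.py may
# carry additional keys, which are simply ignored here. The values shown are
# hypothetical.
#
# {
#   "results": [
#     {
#       "model": "model-a",
#       "tactic": "persistence",
#       "tactic_detected": 1,
#       "total_abnormal_events_detected": 4
#     }
#   ]
# }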


class ModelComparator:
    """Compares performance metrics across different models"""

    def __init__(self, tactic_counts_file: Path):
        self.tactic_counts_file = tactic_counts_file
        self.tactic_data: List[Dict[str, Any]] = []
        self.load_tactic_counts()

    def load_tactic_counts(self):
        """Load tactic counts summary data"""
        if not self.tactic_counts_file.exists():
            raise FileNotFoundError(f"Tactic counts file not found: {self.tactic_counts_file}")

        data = json.loads(self.tactic_counts_file.read_text(encoding='utf-8'))
        self.tactic_data = data.get('results', [])
        print(f"[INFO] Loaded {len(self.tactic_data)} tactic analysis results")

    def group_by_model(self) -> Dict[str, List[Dict]]:
        """Group tactic data by model"""
        models = {}
        for item in self.tactic_data:
            model = item['model']
            if model not in models:
                models[model] = []
            models[model].append(item)
        return models

    def calculate_model_metrics(self, model_data: List[Dict]) -> Dict[str, Any]:
        """Calculate comprehensive metrics for a single model"""
        if not model_data:
            return self._empty_metrics()

        # Aggregate by tactic for this model
        tactic_aggregates = {}
        for item in model_data:
            tactic = item['tactic']
            if tactic not in tactic_aggregates:
                tactic_aggregates[tactic] = {
                    'total_files': 0,
                    'files_detected': 0,
                    'total_events': 0
                }
            tactic_aggregates[tactic]['total_files'] += 1
            tactic_aggregates[tactic]['files_detected'] += item['tactic_detected']
            tactic_aggregates[tactic]['total_events'] += item['total_abnormal_events_detected']

        # Calculate detection rate
        total_files = sum(agg['total_files'] for agg in tactic_aggregates.values())
        total_detected = sum(agg['files_detected'] for agg in tactic_aggregates.values())
        total_events = sum(agg['total_events'] for agg in tactic_aggregates.values())
        detection_rate = (total_detected / total_files * 100) if total_files > 0 else 0.0

        # Calculate coverage: share of tactics with at least one detection
        total_tactics = len(tactic_aggregates)
        tactics_with_detection = sum(1 for agg in tactic_aggregates.values() if agg['files_detected'] > 0)
        coverage_percent = (tactics_with_detection / total_tactics * 100) if total_tactics > 0 else 0.0

        # Calculate accuracy: mean per-tactic detection ratio
        accuracy_scores = []
        for tactic, agg in tactic_aggregates.items():
            if agg['total_files'] > 0:
                accuracy = agg['files_detected'] / agg['total_files']
                accuracy_scores.append(accuracy)
        avg_accuracy = statistics.mean(accuracy_scores) if accuracy_scores else 0.0

        # Calculate effectiveness: weighted blend on a 0-100 scale
        # (40% detection rate, 30% coverage, 30% average accuracy).
        # Example: detection 75%, coverage 80%, accuracy 0.75
        # -> 75*0.4 + 80*0.3 + 75*0.3 = 76.5 (grade GOOD).
        effectiveness_score = (
            detection_rate * 0.4 +
            coverage_percent * 0.3 +
            avg_accuracy * 100 * 0.3
        )

        # Grade the model
        if effectiveness_score >= 80:
            grade = 'EXCELLENT'
        elif effectiveness_score >= 60:
            grade = 'GOOD'
        elif effectiveness_score >= 40:
            grade = 'FAIR'
        elif effectiveness_score >= 20:
            grade = 'POOR'
        else:
            grade = 'CRITICAL'

        # Per-tactic breakdown
        per_tactic_detection = []
        for tactic, agg in sorted(tactic_aggregates.items()):
            files = agg['total_files']
            detected = agg['files_detected']
            events = agg['total_events']
            tactic_detection_rate = (detected / files * 100) if files > 0 else 0.0
            per_tactic_detection.append({
                'tactic': tactic,
                'total_files': files,
                'files_detected': detected,
                'files_missed': files - detected,
                'total_abnormal_events_detected': events,
                'detection_rate_percent': tactic_detection_rate,
                'status': 'GOOD' if tactic_detection_rate >= 50 else ('POOR' if tactic_detection_rate > 0 else 'NONE')
            })

        return {
            'model_name': model_data[0]['model'] if model_data else 'unknown',
            'total_files_analyzed': total_files,
            'total_files_detected': total_detected,
            'total_files_missed': total_files - total_detected,
            'total_abnormal_events_detected': total_events,
            'total_tactics_tested': total_tactics,
            'detection_rate_percent': detection_rate,
            'coverage_percent': coverage_percent,
            'average_accuracy_score': avg_accuracy,
            'effectiveness_score': effectiveness_score,
            'grade': grade,
            'per_tactic_detection': per_tactic_detection,
            'tactics_with_detection': tactics_with_detection,
            'tactics_with_zero_detection': total_tactics - tactics_with_detection
        }

    def _empty_metrics(self) -> Dict[str, Any]:
        """Return empty metrics structure"""
        return {
            'model_name': 'unknown',
            'total_files_analyzed': 0,
            'total_files_detected': 0,
            'total_files_missed': 0,
            'total_abnormal_events_detected': 0,
            'total_tactics_tested': 0,
            'detection_rate_percent': 0.0,
            'coverage_percent': 0.0,
            'average_accuracy_score': 0.0,
            'effectiveness_score': 0.0,
            'grade': 'CRITICAL',
            'per_tactic_detection': [],
            'tactics_with_detection': 0,
            'tactics_with_zero_detection': 0
        }

    def generate_comparison(self) -> Dict[str, Any]:
        """Generate comprehensive model comparison report"""
        print("\n" + "="*80)
        print("GENERATING MODEL COMPARISON")
        print("="*80 + "\n")

        # Group data by model
        models_data = self.group_by_model()
        if not models_data:
            print("[WARNING] No model data found")
            return {'error': 'No model data found'}

        print(f"Found {len(models_data)} models: {', '.join(models_data.keys())}")

        # Calculate metrics for each model
        model_metrics = {}
        for model_name, model_data in models_data.items():
            print(f"\nCalculating metrics for {model_name} ({len(model_data)} files)...")
            model_metrics[model_name] = self.calculate_model_metrics(model_data)

        # Generate comparison summary
        comparison_summary = self._generate_comparison_summary(model_metrics)

        # Generate ranking
        ranking = self._generate_ranking(model_metrics)

        # Generate detailed comparison
        detailed_comparison = self._generate_detailed_comparison(model_metrics)

        report = {
            'timestamp': datetime.now().isoformat(),
            'total_models_compared': len(model_metrics),
            'models_analyzed': list(model_metrics.keys()),
            'comparison_summary': comparison_summary,
            'model_ranking': ranking,
            'detailed_model_metrics': model_metrics,
            'detailed_comparison': detailed_comparison
        }

        return report

    def _generate_comparison_summary(self, model_metrics: Dict[str, Dict]) -> Dict[str, Any]:
        """Generate high-level comparison summary"""
        if not model_metrics:
            return {}

        # Find best and worst performers
        best_detection = max(model_metrics.items(), key=lambda x: x[1]['detection_rate_percent'])
        worst_detection = min(model_metrics.items(), key=lambda x: x[1]['detection_rate_percent'])
        best_coverage = max(model_metrics.items(), key=lambda x: x[1]['coverage_percent'])
        worst_coverage = min(model_metrics.items(), key=lambda x: x[1]['coverage_percent'])
        best_effectiveness = max(model_metrics.items(), key=lambda x: x[1]['effectiveness_score'])
        worst_effectiveness = min(model_metrics.items(), key=lambda x: x[1]['effectiveness_score'])

        # Calculate averages
        avg_detection = statistics.mean([m['detection_rate_percent'] for m in model_metrics.values()])
        avg_coverage = statistics.mean([m['coverage_percent'] for m in model_metrics.values()])
        avg_effectiveness = statistics.mean([m['effectiveness_score'] for m in model_metrics.values()])

        return {
            'average_detection_rate_percent': avg_detection,
            'average_coverage_percent': avg_coverage,
            'average_effectiveness_score': avg_effectiveness,
            'best_detection': {
                'model': best_detection[0],
                'score': best_detection[1]['detection_rate_percent']
            },
            'worst_detection': {
                'model': worst_detection[0],
                'score': worst_detection[1]['detection_rate_percent']
            },
            'best_coverage': {
                'model': best_coverage[0],
                'score': best_coverage[1]['coverage_percent']
            },
            'worst_coverage': {
                'model': worst_coverage[0],
                'score': worst_coverage[1]['coverage_percent']
            },
            'best_overall': {
                'model': best_effectiveness[0],
                'score': best_effectiveness[1]['effectiveness_score'],
                'grade': best_effectiveness[1]['grade']
            },
            'worst_overall': {
                'model': worst_effectiveness[0],
                'score': worst_effectiveness[1]['effectiveness_score'],
                'grade': worst_effectiveness[1]['grade']
            }
        }

    def _generate_ranking(self, model_metrics: Dict[str, Dict]) -> List[Dict[str, Any]]:
        """Generate ranked list of models by effectiveness"""
        ranked_models = sorted(
            model_metrics.items(),
            key=lambda x: x[1]['effectiveness_score'],
            reverse=True
        )

        ranking = []
        for rank, (model_name, metrics) in enumerate(ranked_models, 1):
            ranking.append({
                'rank': rank,
                'model_name': model_name,
                'effectiveness_score': metrics['effectiveness_score'],
                'grade': metrics['grade'],
                'detection_rate_percent': metrics['detection_rate_percent'],
                'coverage_percent': metrics['coverage_percent'],
                'average_accuracy_score': metrics['average_accuracy_score'],
                'total_files_analyzed': metrics['total_files_analyzed']
            })
        return ranking

    def _generate_detailed_comparison(self, model_metrics: Dict[str, Dict]) -> Dict[str, Any]:
        """Generate detailed side-by-side comparison"""
        if not model_metrics:
            return {}

        # Get all tactics across all models
        all_tactics = set()
        for metrics in model_metrics.values():
            for tactic_data in metrics['per_tactic_detection']:
                all_tactics.add(tactic_data['tactic'])
        all_tactics = sorted(all_tactics)

        # Create tactic-by-tactic comparison
        tactic_comparison = {}
        for tactic in all_tactics:
            tactic_comparison[tactic] = {}
            for model_name, metrics in model_metrics.items():
                # Find this tactic in the model's data
                tactic_data = next(
                    (t for t in metrics['per_tactic_detection'] if t['tactic'] == tactic),
                    None
                )
                if tactic_data:
                    tactic_comparison[tactic][model_name] = {
                        'detection_rate_percent': tactic_data['detection_rate_percent'],
                        'files_detected': tactic_data['files_detected'],
                        'total_files': tactic_data['total_files'],
                        'status': tactic_data['status']
                    }
                else:
                    tactic_comparison[tactic][model_name] = {
                        'detection_rate_percent': 0.0,
                        'files_detected': 0,
                        'total_files': 0,
                        'status': 'NOT_TESTED'
                    }

        return {
            'tactic_by_tactic_comparison': tactic_comparison,
            'all_tactics_tested': all_tactics
        }
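

# Example of programmatic use (an illustrative sketch, not invoked anywhere in
# this script; the path below is hypothetical):
#
#     comparator = ModelComparator(Path("results/tactic_counts_summary.json"))
#     report = comparator.generate_comparison()
#     top_model = report["model_ranking"][0]["model_name"]
#
# Note that when the input contains no results, generate_comparison() returns
# only {'error': ...} and the keys used above are absent. The CLI entry point
# below wraps this same flow and writes the report to disk as JSON.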


def main():
    parser = argparse.ArgumentParser(
        description="Compare performance metrics across different models"
    )
    parser.add_argument(
        "--input",
        default="full_pipeline_evaluation/results/tactic_counts_summary.json",
        help="Path to tactic_counts_summary.json"
    )
    parser.add_argument(
        "--output",
        default="full_pipeline_evaluation/results/model_comparison.json",
        help="Output file for model comparison report"
    )
    args = parser.parse_args()

    input_path = Path(args.input)
    output_path = Path(args.output)

    if not input_path.exists():
        print(f"[ERROR] Input file not found: {input_path}")
        print("Run count_tactics.py first to generate tactic counts")
        return 1

    # Run comparison
    comparator = ModelComparator(input_path)
    report = comparator.generate_comparison()

    # Save report
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(report, indent=2), encoding='utf-8')

    # Display summary
    print("\n" + "="*80)
    print("MODEL COMPARISON COMPLETE")
    print("="*80)

    if 'error' in report:
        print(f"Error: {report['error']}")
        return 1

    print(f"Models compared: {report['total_models_compared']}")
    print(f"Models: {', '.join(report['models_analyzed'])}")

    if report['model_ranking']:
        print(f"\nTop performer: {report['model_ranking'][0]['model_name']} "
              f"(Score: {report['model_ranking'][0]['effectiveness_score']:.1f}, "
              f"Grade: {report['model_ranking'][0]['grade']})")

    summary = report['comparison_summary']
    if summary:
        print(f"\nAverage effectiveness: {summary['average_effectiveness_score']:.1f}")
        print(f"Best detection: {summary['best_detection']['model']} ({summary['best_detection']['score']:.1f}%)")
        print(f"Best coverage: {summary['best_coverage']['model']} ({summary['best_coverage']['score']:.1f}%)")

    print(f"\nReport saved to: {output_path}")
    print("="*80 + "\n")
    return 0


if __name__ == "__main__":
    exit(main())