Spaces:
Sleeping
Sleeping
File size: 8,437 Bytes
223ef32 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 |
#!/usr/bin/env python3
"""
Detect the expected tactic in response analysis JSON files.
Reads all *_response_analysis.json files from final_response/ directory
and records, for each file, whether the tactic named by its folder appears in the analysis.
Usage:
python count_tactics.py [--output OUTPUT_PATH]
"""
import argparse
import json
from pathlib import Path
from datetime import datetime
from typing import Dict, Any
def find_project_root(start: Path) -> Path:
    """Locate the project root by walking upward from ``start``.

    ``start`` and then each of its ancestors (nearest first) is checked for a
    ``final_response``, ``src`` or ``.git`` entry; the first directory that
    has any of them is returned. If none qualifies, ``start.parent`` is
    returned as a fallback.
    """
    marker_names = ('final_response', 'src', '.git')
    for candidate in (start, *start.parents):
        if any((candidate / name).exists() for name in marker_names):
            return candidate
    return start.parent
# The 8 allowed tactics that match Mordor dataset folder names.
# NOTE(review): this set previously held only 7 entries and spelled
# "persistence" as "persistance". "privilege_escalation" is presumed to be the
# missing eighth tactic -- confirm against the dataset folders actually used.
ALLOWED_TACTICS = {
    "collection", "credential_access", "defense_evasion", "discovery",
    "execution", "lateral_movement", "persistence", "privilege_escalation",
    # Legacy misspelling, still accepted so that existing folders named
    # "persistance" are not skipped as unknown tactics.
    "persistance",
}
def detect_tactic_in_json(path: Path, target_tactic: str) -> int:
    """
    Report whether ``target_tactic`` is listed anywhere in a JSON file.

    The whole parsed document is searched. A hit is any dict entry whose key
    is "tactic", whose value is a list, and whose list contains
    ``target_tactic``. A "tactic" entry that is not a list is not a hit by
    itself, but its value is still searched like any other nested value.

    Returns 1 on a hit and 0 otherwise. If the file cannot be read or parsed,
    a warning is printed and 0 is returned.
    """
    def contains_target(root) -> bool:
        """Walk dicts and lists with an explicit stack looking for a hit."""
        pending = [root]
        while pending:
            node = pending.pop()
            if isinstance(node, dict):
                for key, value in node.items():
                    is_tactic_list = key == "tactic" and isinstance(value, list)
                    if is_tactic_list and target_tactic in value:
                        return True
                    # Keep descending: nested objects may carry their own lists.
                    pending.append(value)
            elif isinstance(node, list):
                pending.extend(node)
        return False

    try:
        parsed = json.loads(path.read_text(encoding="utf-8"))
        if contains_target(parsed):
            return 1
        return 0
    except Exception as e:
        print(f"[WARNING] Error reading {path}: {e}")
        return 0
def extract_total_events_analyzed(path: Path) -> int:
    """Read the analyzed-event total recorded in a JSON file.

    When the document is a dict, these locations are tried in order and the
    first one present is returned as stored:

    1. top-level "total_events_analyzed"
    2. "correlation_analysis" -> "total_events_analyzed"
    3. "metadata" -> "total_events_analyzed"
    4. "metadata" -> "total_abnormal_events"

    Returns 0 when none is present, when the document is not a dict, or when
    the file cannot be read or parsed (errors are swallowed silently).
    """
    nested_lookups = (
        ("correlation_analysis", "total_events_analyzed"),
        ("metadata", "total_events_analyzed"),
        ("metadata", "total_abnormal_events"),
    )
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
        if not isinstance(payload, dict):
            return 0
        if "total_events_analyzed" in payload:
            return payload["total_events_analyzed"]
        for section_name, field in nested_lookups:
            section = payload.get(section_name)
            # A section only counts when it is itself a dict holding the field.
            if isinstance(section, dict) and field in section:
                return section[field]
        return 0
    except Exception:
        return 0
def find_response_analysis_files(base_path: Path) -> list:
    """Find all response analysis JSON files in model/tactic folder structure.

    Expects ``base_path/<model>/<tactic>/<timestamp>/*_response_analysis.json``.
    Only directories are descended into at each of the three levels; files
    sitting at the wrong depth are ignored.

    Args:
        base_path: Directory whose immediate children are the model folders.

    Returns:
        A list of dicts with keys ``'json_path'`` (Path to the file),
        ``'tactic_label'`` (name of the second-level folder) and
        ``'model_name'`` (name of the first-level folder). Entries are ordered
        by model, tactic, timestamp folder and then file path, so the result
        is the same on every run regardless of filesystem listing order.
    """
    def subdirectories(parent: Path) -> list:
        """Return the child directories of ``parent`` in sorted order."""
        return sorted(child for child in parent.iterdir() if child.is_dir())

    results = []
    for model_folder in subdirectories(base_path):
        for tactic_folder in subdirectories(model_folder):
            for timestamp_folder in subdirectories(tactic_folder):
                # glob() yields matches in filesystem order; sort them so the
                # files inside one run folder are ordered like the folders.
                for json_file in sorted(timestamp_folder.glob('*_response_analysis.json')):
                    results.append({
                        'json_path': json_file,
                        'tactic_label': tactic_folder.name,
                        'model_name': model_folder.name,
                    })
    return results
def main():
    """Run the tactic detection pass and write a JSON summary.

    Locates ``final_response`` under the detected project root, collects the
    response analysis files beneath it, and for every file whose tactic
    folder name is in ALLOWED_TACTICS records whether that tactic is detected
    in the file together with its event total. The per-file results are
    written to the ``--output`` path and a short report is printed.

    Returns 1 when the ``final_response`` directory is missing or holds no
    response analysis files, otherwise 0.
    """
    parser = argparse.ArgumentParser(
        description="Count tactic occurrences in response analysis files"
    )
    parser.add_argument(
        "--output",
        default="full_pipeline_evaluation/results/tactic_counts_summary.json",
        help="Output file for summary results"
    )
    args = parser.parse_args()

    # Resolve final_response relative to the project root, not the CWD.
    script_dir = Path(__file__).resolve().parent
    final_response_dir = find_project_root(script_dir) / "final_response"
    if not final_response_dir.exists():
        print(f"[ERROR] final_response directory not found at: {final_response_dir}")
        print("Run execute_pipeline.py first to generate analysis results")
        return 1

    rule = "=" * 80
    print(rule)
    print("COUNTING TACTIC OCCURRENCES")
    print(rule)
    print(f"Scanning: {final_response_dir}")
    print(f"Allowed tactics: {', '.join(sorted(ALLOWED_TACTICS))}")
    print()

    discovered = find_response_analysis_files(final_response_dir)
    if not discovered:
        print("[ERROR] No response analysis JSON files found")
        print("Expected structure: final_response/model_name/tactic_name/timestamp/*_response_analysis.json")
        return 1
    print(f"Found {len(discovered)} response analysis files\n")

    results = []
    for entry in discovered:
        json_path = entry['json_path']
        model_name = entry['model_name']
        # The tactic folder name doubles as the tactic to look for.
        target_tactic = entry['tactic_label']

        if target_tactic not in ALLOWED_TACTICS:
            print(f"[WARNING] Unknown tactic '{target_tactic}' in folder name, skipping...")
            continue

        # Binary outcome per file: 1 when the tactic is present, else 0.
        detected_flag = detect_tactic_in_json(json_path, target_tactic)
        event_total = extract_total_events_analyzed(json_path)
        results.append({
            "file": str(json_path.relative_to(final_response_dir)),
            "model": model_name,
            "tactic": target_tactic,
            "tactic_detected": detected_flag,
            "total_abnormal_events_detected": event_total
        })

        if detected_flag == 1:
            status = "DETECTED"
        else:
            status = "NOT DETECTED"
        print(f" {model_name}/{target_tactic}/{json_path.parent.name}/{json_path.name}")
        print(f" Status: {status}, Events analyzed: {event_total}")

    # Persist the per-file results.
    output_path = Path(args.output)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    summary = {
        "timestamp": datetime.now().isoformat(),
        "total_files_processed": len(results),
        "results": results
    }
    output_path.write_text(json.dumps(summary, indent=2), encoding="utf-8")

    # Aggregate figures for the closing report.
    total_files = len(results)
    total_detected = sum(row['tactic_detected'] == 1 for row in results)
    if total_files > 0:
        detection_rate = total_detected / total_files * 100
    else:
        detection_rate = 0

    print("\n" + rule)
    print("TACTIC COUNTING COMPLETE")
    print(rule)
    print(f"Processed: {total_files} files")
    print(f"Tactics detected: {total_detected}/{total_files} ({detection_rate:.1f}%)")
    print(f"Output: {output_path}")
    print(rule + "\n")
    return 0
if __name__ == "__main__":
    # Hand main()'s status code to the shell. SystemExit is raised directly
    # because the `exit` builtin is injected by the `site` module for
    # interactive use and is not guaranteed to exist in every run mode.
    raise SystemExit(main())