minhan6559's picture
Upload 126 files
223ef32 verified
raw
history blame
8.44 kB
#!/usr/bin/env python3
"""
Count tactic occurrences in response analysis JSON files.
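Reads all *_response_analysis.json files under the final_response/ directory
(model/tactic/timestamp layout) and records, for each file, whether the tactic
named by its tactic folder appears in the analysis (binary detection per file,
not an occurrence count). The per-file results are written to a JSON summary.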
Usage:
python count_tactics.py [--output OUTPUT_PATH]
"""
import argparse
import json
from pathlib import Path
from datetime import datetime
from typing import Dict, Any
def find_project_root(start: Path) -> Path:
    """Walk upward from *start* and return the first directory that looks like the project root.

    A directory qualifies when it contains any of the entries
    ``final_response``, ``src`` or ``.git``. *start* itself is checked
    first, then each of its ancestors from nearest to farthest. When no
    candidate qualifies, the parent of *start* is returned.
    """
    marker_names = ('final_response', 'src', '.git')
    for candidate in (start, *start.parents):
        if any((candidate / marker).exists() for marker in marker_names):
            return candidate
    return start.parent
# The 8 tactic names that match the Mordor dataset folder names.
# "persistance" is a legacy misspelling of "persistence"; it is kept so that
# result folders created under the old spelling are still accepted.
ALLOWED_TACTICS = {
    "collection",
    "credential_access",
    "defense_evasion",
    "discovery",
    "execution",
    "lateral_movement",
    "persistence",
    "privilege_escalation",
    "persistance",
}
def detect_tactic_in_json(path: Path, target_tactic: str) -> int:
    """Report whether *target_tactic* is listed anywhere in the JSON file at *path*.

    The parsed document is searched recursively through dicts and lists.
    A match is any dict entry whose key is ``"tactic"``, whose value is a
    list, and whose list contains *target_tactic*.

    Returns:
        1 if at least one match exists, otherwise 0. If the file cannot be
        read, parsed or searched, a warning is printed and 0 is returned.
    """

    def holds_target(node) -> bool:
        """Return True when *node* or anything nested in it lists the target tactic."""
        if isinstance(node, list):
            for element in node:
                if holds_target(element):
                    return True
            return False
        if not isinstance(node, dict):
            # Scalars (str, int, None, ...) can never carry a tactic list.
            return False
        for key, value in node.items():
            if key == "tactic" and isinstance(value, list) and target_tactic in value:
                return True
            # Keep descending even under a non-matching "tactic" key.
            if holds_target(value):
                return True
        return False

    try:
        parsed = json.loads(path.read_text(encoding="utf-8"))
        found = holds_target(parsed)
    except Exception as err:
        print(f"[WARNING] Error reading {path}: {err}")
        return 0
    return 1 if found else 0
def extract_total_events_analyzed(path: Path) -> int:
    """Return the event total recorded in the JSON file at *path*.

    The first key found wins, checked in this order:

    1. top-level ``total_events_analyzed``
    2. ``correlation_analysis.total_events_analyzed``
    3. ``metadata.total_events_analyzed``
    4. ``metadata.total_abnormal_events``

    The stored value is returned as-is. 0 is returned when none of the keys
    is present, when the document is not a JSON object, or when the file
    cannot be read or parsed.
    """
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except Exception:
        return 0

    if not isinstance(payload, dict):
        return 0

    # (container, keys to try in order); containers that are missing or are
    # not JSON objects are skipped.
    search_plan = (
        (payload, ("total_events_analyzed",)),
        (payload.get("correlation_analysis"), ("total_events_analyzed",)),
        (payload.get("metadata"), ("total_events_analyzed", "total_abnormal_events")),
    )
    for container, candidate_keys in search_plan:
        if not isinstance(container, dict):
            continue
        for key in candidate_keys:
            if key in container:
                return container[key]
    return 0
def find_response_analysis_files(base_path: Path) -> list:
    """Collect response analysis JSON files from a model/tactic/timestamp tree.

    Expected layout under *base_path*::

        <model_name>/<tactic_name>/<timestamp>/*_response_analysis.json

    Non-directory entries at any of the three folder levels are ignored.

    Args:
        base_path: Directory whose immediate children are the model folders.

    Returns:
        A list of dicts with the keys ``json_path`` (Path to the file),
        ``tactic_label`` (name of the second-level folder) and ``model_name``
        (name of the first-level folder). Entries are ordered by model,
        tactic, timestamp folder and then file path, so repeated runs over
        the same tree give the same order.
    """
    results = []
    # First level: one folder per model.
    for model_folder in sorted(base_path.iterdir()):
        if not model_folder.is_dir():
            continue
        model_name = model_folder.name
        # Second level: one folder per tactic.
        for tactic_folder in sorted(model_folder.iterdir()):
            if not tactic_folder.is_dir():
                continue
            tactic_label = tactic_folder.name
            # Third level: one folder per timestamped run.
            for timestamp_folder in sorted(tactic_folder.iterdir()):
                if not timestamp_folder.is_dir():
                    continue
                # glob() yields matches in arbitrary filesystem order; sort
                # them so the result is deterministic like the folder levels.
                for json_file in sorted(timestamp_folder.glob('*_response_analysis.json')):
                    results.append({
                        'json_path': json_file,
                        'tactic_label': tactic_label,
                        'model_name': model_name,
                    })
    return results
def main():
    """Run the tactic detection pass and write the JSON summary.

    Locates ``final_response`` under the project root, checks every response
    analysis file for the tactic named by its tactic folder, prints a
    per-file status and writes the collected results to the ``--output``
    path.

    Returns:
        0 on success; 1 when the ``final_response`` directory is missing or
        contains no response analysis files.
    """
    cli = argparse.ArgumentParser(
        description="Count tactic occurrences in response analysis files"
    )
    cli.add_argument(
        "--output",
        default="full_pipeline_evaluation/results/tactic_counts_summary.json",
        help="Output file for summary results",
    )
    args = cli.parse_args()

    # Resolve final_response relative to the project root, starting the
    # search from the directory that holds this script.
    script_dir = Path(__file__).resolve().parent
    final_response_dir = find_project_root(script_dir) / "final_response"
    if not final_response_dir.exists():
        print(f"[ERROR] final_response directory not found at: {final_response_dir}")
        print("Run execute_pipeline.py first to generate analysis results")
        return 1

    rule = "=" * 80
    print(rule)
    print("COUNTING TACTIC OCCURRENCES")
    print(rule)
    print(f"Scanning: {final_response_dir}")
    print(f"Allowed tactics: {', '.join(sorted(ALLOWED_TACTICS))}")
    print()

    discovered = find_response_analysis_files(final_response_dir)
    if not discovered:
        print("[ERROR] No response analysis JSON files found")
        print("Expected structure: final_response/model_name/tactic_name/timestamp/*_response_analysis.json")
        return 1
    print(f"Found {len(discovered)} response analysis files\n")

    results = []
    for entry in discovered:
        json_path = entry['json_path']
        model_name = entry['model_name']
        # The tactic folder name doubles as the tactic to look for.
        target_tactic = entry['tactic_label']

        if target_tactic not in ALLOWED_TACTICS:
            print(f"[WARNING] Unknown tactic '{target_tactic}' in folder name, skipping...")
            continue

        # Binary detection: 1 if the tactic is present in the file, else 0.
        detected_flag = detect_tactic_in_json(json_path, target_tactic)
        event_total = extract_total_events_analyzed(json_path)
        results.append({
            "file": str(json_path.relative_to(final_response_dir)),
            "model": model_name,
            "tactic": target_tactic,
            "tactic_detected": detected_flag,
            "total_abnormal_events_detected": event_total,
        })

        if detected_flag == 1:
            status = "DETECTED"
        else:
            status = "NOT DETECTED"
        print(f" {model_name}/{target_tactic}/{json_path.parent.name}/{json_path.name}")
        print(f" Status: {status}, Events analyzed: {event_total}")

    # Persist the per-file results before printing the closing statistics.
    output_path = Path(args.output)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    summary = {
        "timestamp": datetime.now().isoformat(),
        "total_files_processed": len(results),
        "results": results,
    }
    output_path.write_text(json.dumps(summary, indent=2), encoding="utf-8")

    total_files = len(results)
    total_detected = len([row for row in results if row['tactic_detected'] == 1])
    if total_files > 0:
        detection_rate = total_detected / total_files * 100
    else:
        # Every file was skipped; avoid dividing by zero.
        detection_rate = 0

    print("\n" + rule)
    print("TACTIC COUNTING COMPLETE")
    print(rule)
    print(f"Processed: {total_files} files")
    print(f"Tactics detected: {total_detected}/{total_files} ({detection_rate:.1f}%)")
    print(f"Output: {output_path}")
    print(rule + "\n")
    return 0
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status. SystemExit
    # is used instead of the exit() helper, which is injected by the site
    # module for interactive use and is unavailable under `python -S`.
    raise SystemExit(main())