|
|
|
|
|
"""
|
|
|
Count tactic occurrences in response analysis JSON files.
|
|
|
|
|
|
Reads all *_response_analysis.json files from final_response/ directory
|
|
|
and records, for each file, whether the tactic named by its tactic folder
is present in the analysis (binary detection per file).
|
|
|
|
|
|
Usage:
|
|
|
python count_tactics.py [--output OUTPUT_PATH]
|
|
|
"""
|
|
|
import argparse
|
|
|
import json
|
|
|
from pathlib import Path
|
|
|
from datetime import datetime
|
|
|
from typing import Dict, Any
|
|
|
|
|
|
|
|
|
def find_project_root(start: Path) -> Path:
    """Locate the project root by walking upward from *start*.

    A directory qualifies as the root when it contains an entry named
    ``final_response``, ``src`` or ``.git``. *start* itself is checked
    first, then each of its parents from nearest to farthest.

    Args:
        start: Directory from which the upward search begins.

    Returns:
        The first qualifying directory, or ``start.parent`` when no
        directory on the way up contains any of the markers.
    """
    markers = ('final_response', 'src', '.git')
    for candidate in (start, *start.parents):
        if any((candidate / marker).exists() for marker in markers):
            return candidate
    # Nothing matched anywhere up the tree: fall back to the parent of start.
    return start.parent
|
|
|
|
|
|
|
|
|
|
|
|
# Tactic labels that main() accepts as tactic folder names; files under any
# other folder name are skipped with a warning.
# NOTE(review): "persistance" is spelled this way on purpose here because the
# value is compared against on-disk folder names -- confirm the folder naming
# before correcting the spelling.
ALLOWED_TACTICS = {
    "collection",
    "credential_access",
    "defense_evasion",
    "discovery",
    "execution",
    "lateral_movement",
    "persistance",
}
|
|
|
|
|
|
def detect_tactic_in_json(path: Path, target_tactic: str) -> int:
    """Report whether *target_tactic* occurs anywhere in a JSON file.

    The parsed document is searched recursively. A hit is any dictionary
    entry whose key is ``"tactic"``, whose value is a list, and whose list
    contains *target_tactic*. A ``"tactic"`` value that is not a list never
    counts as a hit by itself, but is still descended into.

    Args:
        path: JSON file to inspect (read as UTF-8).
        target_tactic: Tactic label to look for.

    Returns:
        1 if the tactic is found at least once, otherwise 0. Any error
        while reading or parsing the file is reported with a warning on
        stdout and also yields 0.
    """

    def contains_target(node) -> bool:
        """Return True if *node* or anything nested in it lists the tactic."""
        if isinstance(node, dict):
            return any(
                (key == "tactic" and isinstance(value, list) and target_tactic in value)
                or contains_target(value)
                for key, value in node.items()
            )
        if isinstance(node, list):
            return any(contains_target(element) for element in node)
        # Scalars (str, int, float, bool, None) cannot hold a tactic list.
        return False

    try:
        parsed = json.loads(path.read_text(encoding="utf-8"))
        return int(contains_target(parsed))
    except Exception as e:
        # Best effort: an unreadable or malformed file counts as "not detected".
        print(f"[WARNING] Error reading {path}: {e}")
        return 0
|
|
|
|
|
|
|
|
|
def extract_total_events_analyzed(path: Path) -> int:
    """Read the analyzed-event total from a response analysis JSON file.

    The value is taken from the first of these locations that exists:

    1. top-level ``total_events_analyzed``
    2. ``correlation_analysis.total_events_analyzed``
    3. ``metadata.total_events_analyzed``
    4. ``metadata.total_abnormal_events``

    Args:
        path: JSON file to inspect (read as UTF-8).

    Returns:
        The stored value exactly as found in the file (it is not converted),
        or 0 when the file cannot be read or parsed, the document root is
        not an object, or none of the locations above is present.
    """
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except Exception:
        return 0

    if not isinstance(payload, dict):
        return 0

    if "total_events_analyzed" in payload:
        return payload["total_events_analyzed"]

    # (section, field) pairs in order of precedence.
    nested_locations = (
        ("correlation_analysis", "total_events_analyzed"),
        ("metadata", "total_events_analyzed"),
        ("metadata", "total_abnormal_events"),
    )
    for section_name, field_name in nested_locations:
        section = payload.get(section_name)
        if isinstance(section, dict) and field_name in section:
            return section[field_name]

    return 0
|
|
|
|
|
|
|
|
|
def _sorted_subdirs(parent: Path) -> list:
    """Return the immediate subdirectories of *parent*, sorted by path."""
    return [entry for entry in sorted(parent.iterdir()) if entry.is_dir()]


def find_response_analysis_files(base_path: Path) -> list:
    """Find all response analysis JSON files in the model/tactic folder structure.

    The expected layout is
    ``base_path/<model>/<tactic>/<timestamp>/*_response_analysis.json``.
    Entries that are not directories are ignored at the model, tactic and
    timestamp levels.

    Args:
        base_path: Directory whose immediate children are the model folders.

    Returns:
        A list of dicts, one per matching file, with the keys
        ``'json_path'`` (the file's Path), ``'tactic_label'`` (name of the
        tactic folder) and ``'model_name'`` (name of the model folder).
        Entries are ordered by model, tactic, timestamp folder and then file
        path, so the result does not depend on filesystem listing order.
    """
    results = []

    for model_folder in _sorted_subdirs(base_path):
        for tactic_folder in _sorted_subdirs(model_folder):
            for timestamp_folder in _sorted_subdirs(tactic_folder):
                # glob() yields files in arbitrary order; sort so files are
                # ordered as deterministically as the folders above them.
                for json_file in sorted(timestamp_folder.glob('*_response_analysis.json')):
                    results.append({
                        'json_path': json_file,
                        'tactic_label': tactic_folder.name,
                        'model_name': model_folder.name,
                    })

    return results
|
|
|
|
|
|
|
|
|
def main():
    """Scan final_response/ and write a per-file tactic detection summary.

    For every response analysis file found under the project's
    ``final_response`` directory, the tactic folder name is used as the
    target tactic. Files whose tactic folder is not in ALLOWED_TACTICS are
    skipped with a warning. For the rest, the script records whether the
    target tactic is detected in the file and the event total stored in
    it, prints a line per file, writes the summary JSON to ``--output``,
    and prints overall totals.

    Returns:
        0 on success; 1 when the final_response directory does not exist
        or contains no response analysis files.
    """
    cli = argparse.ArgumentParser(
        description="Count tactic occurrences in response analysis files"
    )
    cli.add_argument(
        "--output",
        default="full_pipeline_evaluation/results/tactic_counts_summary.json",
        help="Output file for summary results"
    )
    args = cli.parse_args()

    # The project root is searched for starting at this script's directory.
    script_dir = Path(__file__).resolve().parent
    final_response_dir = find_project_root(script_dir) / "final_response"

    if not final_response_dir.exists():
        print(f"[ERROR] final_response directory not found at: {final_response_dir}")
        print("Run execute_pipeline.py first to generate analysis results")
        return 1

    banner = "="*80
    print(banner)
    print("COUNTING TACTIC OCCURRENCES")
    print(banner)
    print(f"Scanning: {final_response_dir}")
    print(f"Allowed tactics: {', '.join(sorted(ALLOWED_TACTICS))}")
    print()

    file_info_list = find_response_analysis_files(final_response_dir)
    if not file_info_list:
        print("[ERROR] No response analysis JSON files found")
        print("Expected structure: final_response/model_name/tactic_name/timestamp/*_response_analysis.json")
        return 1

    print(f"Found {len(file_info_list)} response analysis files\n")

    results = []
    for file_info in file_info_list:
        json_path = file_info['json_path']
        tactic_label = file_info['tactic_label']
        model_name = file_info['model_name']

        # The tactic folder name doubles as the tactic to look for.
        target_tactic = tactic_label
        if target_tactic not in ALLOWED_TACTICS:
            print(f"[WARNING] Unknown tactic '{target_tactic}' in folder name, skipping...")
            continue

        tactic_detected = detect_tactic_in_json(json_path, target_tactic)
        total_events = extract_total_events_analyzed(json_path)

        entry = {
            "file": str(json_path.relative_to(final_response_dir)),
            "model": model_name,
            "tactic": target_tactic,
            "tactic_detected": tactic_detected,
            "total_abnormal_events_detected": total_events
        }
        results.append(entry)

        if tactic_detected == 1:
            status = "DETECTED"
        else:
            status = "NOT DETECTED"
        print(f" {model_name}/{tactic_label}/{json_path.parent.name}/{json_path.name}")
        print(f" Status: {status}, Events analyzed: {total_events}")

    # Persist the summary, creating the output directory if needed.
    output_path = Path(args.output)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    summary = {
        "timestamp": datetime.now().isoformat(),
        "total_files_processed": len(results),
        "results": results
    }
    output_path.write_text(json.dumps(summary, indent=2), encoding="utf-8")

    total_files = len(results)
    total_detected = sum(1 for entry in results if entry['tactic_detected'] == 1)
    if total_files > 0:
        detection_rate = total_detected / total_files * 100
    else:
        # Every file was skipped; avoid dividing by zero.
        detection_rate = 0

    print("\n" + banner)
    print("TACTIC COUNTING COMPLETE")
    print(banner)
    print(f"Processed: {total_files} files")
    print(f"Tactics detected: {total_detected}/{total_files} ({detection_rate:.1f}%)")
    print(f"Output: {output_path}")
    print(banner + "\n")

    return 0
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status. SystemExit
    # is raised directly because the bare exit() helper is installed by the
    # site module for interactive use and is not guaranteed to exist
    # (e.g. when Python runs with -S).
    raise SystemExit(main())