File size: 8,437 Bytes
223ef32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
#!/usr/bin/env python3
"""

Count tactic occurrences in response analysis JSON files.



Reads all *_response_analysis.json files from final_response/ directory

and counts how many times each tactic appears in the analysis.



Usage: 

    python count_tactics.py [--output OUTPUT_PATH]

"""
import argparse
import json
from pathlib import Path
from datetime import datetime
from typing import Dict, Any


def find_project_root(start: Path) -> Path:
    """Return the nearest directory, from *start* upward, that looks like the project root.

    A directory qualifies when it contains a ``final_response``, ``src`` or
    ``.git`` entry. *start* itself is checked first, then each ancestor in
    turn. If no directory qualifies, ``start.parent`` is returned.
    """
    markers = ('final_response', 'src', '.git')
    for candidate in (start, *start.parents):
        # any() short-circuits in marker order, like the chained ``or`` would.
        if any((candidate / marker).exists() for marker in markers):
            return candidate
    return start.parent


# The 8 allowed tactics. Tactic folder names are validated against this set,
# so every entry must match a Mordor dataset folder name exactly.
# "privilege_escalation" was missing although the file consistently refers to
# 8 tactics; it is the remaining Mordor tactic folder.
# NOTE(review): "persistance" is left with its existing spelling — presumably it
# mirrors the on-disk folder names and the values written into the analysis
# JSON; confirm against the data before renaming it to "persistence".
ALLOWED_TACTICS = {
    "collection", "credential_access", "defense_evasion", "discovery",
    "execution", "lateral_movement", "persistance", "privilege_escalation",
}

def detect_tactic_in_json(path: Path, target_tactic: str) -> int:
    """
    Report whether *target_tactic* is listed anywhere in a JSON file.

    The parsed document is walked recursively. A hit is any dict entry whose
    key is "tactic", whose value is a list, and whose list contains
    *target_tactic*. A "tactic" value that is not a list is never matched
    directly, although it is still descended into.

    Returns 1 if at least one hit exists, otherwise 0. If the file cannot be
    read or parsed, a warning is printed and 0 is returned.
    """
    def holds_target(node):
        """Return True when *node* or anything nested inside it is a hit."""
        if isinstance(node, list):
            for element in node:
                if holds_target(element):
                    return True
            return False

        if not isinstance(node, dict):
            # Scalars can neither match nor contain anything.
            return False

        for key, value in node.items():
            is_tactic_list = key == "tactic" and isinstance(value, list)
            if is_tactic_list and target_tactic in value:
                return True
            # Keep descending: a hit may sit deeper inside this value.
            if holds_target(value):
                return True
        return False

    try:
        document = json.loads(path.read_text(encoding="utf-8"))
        found = holds_target(document)
    except Exception as e:
        print(f"[WARNING] Error reading {path}: {e}")
        return 0
    return 1 if found else 0


def extract_total_events_analyzed(path: Path) -> int:
    """Extract the analyzed-event total from a response analysis JSON file.

    The first of these locations that exists wins:

    1. top-level ``total_events_analyzed``
    2. ``correlation_analysis.total_events_analyzed``
    3. ``metadata.total_events_analyzed``
    4. ``metadata.total_abnormal_events``

    The stored value is returned as-is, without coercion. Returns 0 when the
    document is not a JSON object or none of the keys is present. If the file
    cannot be read or parsed, a warning is printed and 0 is returned, so that
    a broken file is not mistaken for a file with zero events.
    """
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except Exception as e:
        # Best-effort like detect_tactic_in_json, but no longer silent.
        print(f"[WARNING] Error reading {path}: {e}")
        return 0

    if not isinstance(data, dict):
        return 0

    # (section, key) pairs in priority order; a section of None is the top level.
    lookup_order = (
        (None, "total_events_analyzed"),
        ("correlation_analysis", "total_events_analyzed"),
        ("metadata", "total_events_analyzed"),
        ("metadata", "total_abnormal_events"),
    )
    for section, key in lookup_order:
        container = data if section is None else data.get(section)
        if isinstance(container, dict) and key in container:
            return container[key]
    return 0


def find_response_analysis_files(base_path: Path) -> list:
    """Find all response analysis JSON files in the model/tactic folder structure.

    Expects ``base_path/<model>/<tactic>/<timestamp>/*_response_analysis.json``.
    Non-directory entries at the model, tactic and timestamp levels are ignored.

    Returns a list of dicts with the keys ``json_path`` (Path to the file),
    ``tactic_label`` (name of the tactic folder) and ``model_name`` (name of
    the model folder). Every level, including the files inside a timestamp
    folder, is visited in sorted order so the result is deterministic.
    """
    def subdirectories(folder: Path):
        """Yield the sub-directories of *folder* in sorted order."""
        return (entry for entry in sorted(folder.iterdir()) if entry.is_dir())

    results = []
    for model_folder in subdirectories(base_path):
        for tactic_folder in subdirectories(model_folder):
            for timestamp_folder in subdirectories(tactic_folder):
                # glob() yields in filesystem order; sort it to match the
                # sorted directory iteration above.
                for json_file in sorted(timestamp_folder.glob('*_response_analysis.json')):
                    results.append({
                        'json_path': json_file,
                        'tactic_label': tactic_folder.name,
                        'model_name': model_folder.name,
                    })
    return results


def main():
    """Scan final_response/ and write a per-file tactic detection summary.

    For every response analysis file whose tactic folder name is in
    ALLOWED_TACTICS, records whether that tactic is detected in the file and
    the event total extracted from it. The summary is written as JSON to the
    path given by ``--output``, and progress is printed along the way.

    Returns 0 on success, or 1 when the final_response directory is missing
    or contains no response analysis files.
    """
    cli = argparse.ArgumentParser(
        description="Count tactic occurrences in response analysis files"
    )
    cli.add_argument(
        "--output",
        default="full_pipeline_evaluation/results/tactic_counts_summary.json",
        help="Output file for summary results"
    )
    args = cli.parse_args()

    # The input directory is located relative to this script, not the CWD.
    script_dir = Path(__file__).resolve().parent
    final_response_dir = find_project_root(script_dir) / "final_response"

    if not final_response_dir.exists():
        print(f"[ERROR] final_response directory not found at: {final_response_dir}")
        print("Run execute_pipeline.py first to generate analysis results")
        return 1

    rule = "=" * 80
    print(rule)
    print("COUNTING TACTIC OCCURRENCES")
    print(rule)
    print(f"Scanning: {final_response_dir}")
    print(f"Allowed tactics: {', '.join(sorted(ALLOWED_TACTICS))}")
    print()

    discovered = find_response_analysis_files(final_response_dir)
    if not discovered:
        print("[ERROR] No response analysis JSON files found")
        print("Expected structure: final_response/model_name/tactic_name/timestamp/*_response_analysis.json")
        return 1

    print(f"Found {len(discovered)} response analysis files\n")

    results = []
    for entry in discovered:
        json_path = entry['json_path']
        model_name = entry['model_name']
        # The tactic folder name doubles as the tactic searched for in the file.
        target_tactic = entry['tactic_label']

        if target_tactic not in ALLOWED_TACTICS:
            print(f"[WARNING] Unknown tactic '{target_tactic}' in folder name, skipping...")
            continue

        # Binary flag: 1 when the tactic is present in the file, else 0.
        tactic_detected = detect_tactic_in_json(json_path, target_tactic)
        total_events = extract_total_events_analyzed(json_path)

        results.append({
            "file": str(json_path.relative_to(final_response_dir)),
            "model": model_name,
            "tactic": target_tactic,
            "tactic_detected": tactic_detected,
            "total_abnormal_events_detected": total_events
        })

        status = "NOT DETECTED" if tactic_detected != 1 else "DETECTED"
        print(f"  {model_name}/{target_tactic}/{json_path.parent.name}/{json_path.name}")
        print(f"    Status: {status}, Events analyzed: {total_events}")

    total_files = len(results)

    # Persist the summary, creating the output folder if needed.
    output_path = Path(args.output)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    summary = {
        "timestamp": datetime.now().isoformat(),
        "total_files_processed": total_files,
        "results": results
    }
    output_path.write_text(json.dumps(summary, indent=2), encoding="utf-8")

    # Headline statistics for the console report.
    total_detected = sum(1 for record in results if record['tactic_detected'] == 1)
    if total_files > 0:
        detection_rate = total_detected / total_files * 100
    else:
        detection_rate = 0

    print("\n" + rule)
    print("TACTIC COUNTING COMPLETE")
    print(rule)
    print(f"Processed: {total_files} files")
    print(f"Tactics detected: {total_detected}/{total_files} ({detection_rate:.1f}%)")
    print(f"Output: {output_path}")
    print(rule + "\n")

    return 0


if __name__ == "__main__":
    # The builtin `exit` is added by the `site` module for interactive use and
    # can be missing (e.g. under `python -S`); SystemExit is always available
    # and passes main()'s return value through as the process exit status.
    raise SystemExit(main())