"""
Response Agent - Maps Event IDs to MITRE ATT&CK Techniques and Generates Recommendations
This agent analyzes log analysis results and retrieval intelligence to create explicit
Event ID → MITRE technique mappings with actionable recommendations.
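
Example usage (illustrative sketch; assumes an LLM backend such as Google
GenAI is configured, and that ``log_analysis`` and ``retrieval`` are the
dicts produced by the upstream log-analysis and retrieval agents):

    agent = ResponseAgent(output_dir="final_response")
    analysis, report_md = agent.analyze_and_map(
        log_analysis, retrieval, "logs/host01.json", tactic="discovery"
    )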
"""
import json
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from langchain.chat_models import init_chat_model
# Import prompts from the separate file
from src.agents.response_agent.prompts import CORRELATION_ANALYSIS_PROMPT
class ResponseAgent:
"""
Response Agent that creates explicit Event ID to MITRE technique mappings
and generates actionable recommendations based on correlation analysis.
"""
def __init__(
self,
model_name: str = "google_genai:gemini-2.0-flash",
temperature: float = 0.1,
output_dir: str = "final_response",
llm_client=None,
):
"""
Initialize the Response Agent.
Args:
model_name: LLM model to use
temperature: Temperature for generation
output_dir: Directory to save final response JSON
llm_client: Optional pre-initialized LLM client (overrides model_name/temperature)
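
        Example (illustrative; the default model requires Google GenAI
        credentials, and ``my_chat_model`` stands for any pre-built
        LangChain chat model):

            agent = ResponseAgent()                      # default Gemini model
            agent = ResponseAgent(llm_client=my_chat_model)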
"""
if llm_client:
self.llm = llm_client
# Extract model name from llm_client if possible
if hasattr(llm_client, "model_name"):
self.model_name = llm_client.model_name
else:
# Fallback: try to extract from the model string
self.model_name = (
str(llm_client).split("'")[1]
if "'" in str(llm_client)
else "unknown_model"
)
print(f"[INFO] Response Agent: Using provided LLM client")
else:
self.llm = init_chat_model(model_name, temperature=temperature)
self.model_name = model_name
print(f"[INFO] Response Agent: Using default LLM model: {model_name}")
# Create model-specific output directory (strip provider prefixes like
# "google_genai:" or "models/" so we only keep clean names such as
# "gemini-2.0-flash" or "gemini-2.0-flash-lite")
self.model_dir_name = self._sanitize_model_name(self.model_name)
self.output_dir = Path(output_dir) / self.model_dir_name
self.output_dir.mkdir(parents=True, exist_ok=True)
def _sanitize_model_name(self, model_name: str) -> str:
"""
Produce a clean model directory name without provider prefixes.
Examples:
- "google_genai:gemini-2.0-flash" -> "gemini-2.0-flash"
- "google_genai:gemini-2.0-flash-lite" -> "gemini-2.0-flash-lite"
- "models/gemini-2.0-flash-lite" -> "gemini-2.0-flash-lite"
- "groq:gpt-oss-120b" -> "gpt-oss-120b"
"""
raw = (model_name or "").strip()
# Prefer the segment after ":" if present (provider:model)
if ":" in raw:
raw = raw.split(":", 1)[1]
# Then prefer the last path segment after "/" if present (e.g., models/name)
if "/" in raw or "\\" in raw:
raw = raw.replace("\\", "/").split("/")[-1]
# Final sanitation: allow only safe characters
sanitized = "".join(c for c in raw if c.isalnum() or c in "._-")
# Fallback in case the resulting name is empty
return sanitized or "model"
def analyze_and_map(
self,
log_analysis_result: Dict[str, Any],
retrieval_result: Dict[str, Any],
log_file: str,
        tactic: Optional[str] = None,
    ) -> Tuple[Dict[str, Any], str]:
"""
Analyze log analysis and retrieval results to create Event ID mappings.
Args:
log_analysis_result: Results from log analysis agent
retrieval_result: Results from retrieval supervisor
log_file: Path to original log file
tactic: Optional tactic name for organizing output
        Returns:
            Tuple of (structured mapping analysis with recommendations,
            Markdown threat report)
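
        Example (illustrative):

            analysis, report_md = agent.analyze_and_map(
                log_analysis, retrieval, "logs/host01.json"
            )
            print(analysis["metadata"]["overall_assessment"])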
"""
# Extract data for analysis
abnormal_events = log_analysis_result.get("abnormal_events", [])
overall_assessment = log_analysis_result.get("overall_assessment", "UNKNOWN")
# Extract MITRE techniques from retrieval results with improved parsing
mitre_techniques = self._extract_mitre_techniques(retrieval_result)
# Pre-filter techniques based on semantic similarity
relevant_techniques = self._filter_relevant_techniques(
abnormal_events, mitre_techniques
)
# Create analysis prompt
analysis_prompt = self._create_analysis_prompt(
abnormal_events, relevant_techniques, overall_assessment
)
# Get LLM analysis
response = self.llm.invoke(analysis_prompt)
mapping_analysis = self._parse_response(response.content, log_analysis_result)
# Add metadata
mapping_analysis["metadata"] = {
"analysis_timestamp": datetime.now().isoformat(),
"overall_assessment": overall_assessment,
"total_abnormal_events": len(abnormal_events),
"total_techniques_retrieved": len(mitre_techniques),
}
        # Persist the analysis as JSON plus a Markdown report
        _output_folder, markdown_report = self._save_response(
            mapping_analysis, log_file, tactic
        )
return mapping_analysis, markdown_report
def _extract_mitre_techniques(
self, retrieval_result: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""Extract MITRE techniques from structured retrieval supervisor results."""
        # Preferred path: use structured results directly
if "retrieved_techniques" in retrieval_result:
techniques = retrieval_result["retrieved_techniques"]
print(
f"[INFO] Using structured retrieval results: {len(techniques)} techniques"
)
# Ensure all techniques have required fields
validated_techniques = []
for tech in techniques:
# Ensure tactic is a list format
tactic = tech.get("tactic", "")
if isinstance(tactic, str):
# Convert string to list if it's a single tactic
tactic = [tactic] if tactic else []
elif not isinstance(tactic, list):
tactic = []
validated_tech = {
"technique_id": tech.get("technique_id", ""),
"technique_name": tech.get("technique_name", ""),
"tactic": tactic,
"description": tech.get("description", ""),
"relevance_score": tech.get("relevance_score", 0.5),
}
validated_techniques.append(validated_tech)
return validated_techniques
# FALLBACK: Legacy parsing for backward compatibility
print("[WARNING] No structured results found, using legacy message parsing")
return self._extract_mitre_techniques_legacy(retrieval_result)
def _extract_mitre_techniques_legacy(
self, retrieval_result: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""Legacy method to extract MITRE techniques from raw message history."""
techniques = []
messages = retrieval_result.get("messages", [])
# PRIORITY STRATEGY: Extract from database agent tool messages
# These contain the original tactic information before it's lost in formatting
for msg in messages:
# Look for tool messages from search_techniques calls
if (
hasattr(msg, "name")
and msg.name
and "search_techniques" in str(msg.name)
):
if hasattr(msg, "content") and msg.content:
try:
# Parse the tool response
tool_data = (
json.loads(msg.content)
if isinstance(msg.content, str)
else msg.content
)
if "techniques" in tool_data:
for tech in tool_data["techniques"]:
# Convert tactics to list format
tactics = tech.get("tactics", [])
if isinstance(tactics, str):
tactics = [tactics] if tactics else []
elif not isinstance(tactics, list):
tactics = []
converted = {
"technique_id": tech.get("attack_id", ""),
"technique_name": tech.get("name", ""),
"tactic": tactics, # Now as list
"platforms": ", ".join(tech.get("platforms", [])),
"description": tech.get("description", ""),
"relevance_score": tech.get("relevance_score", 0),
}
techniques.append(converted)
except (json.JSONDecodeError, TypeError, AttributeError):
continue
# If we successfully extracted techniques with tactics, use them
if techniques:
print(
f"[INFO] Extracted {len(techniques)} techniques with tactics from database agent"
)
# Remove duplicates
unique_techniques = []
seen_ids = set()
for tech in techniques:
tech_id = tech.get("technique_id")
if tech_id and tech_id not in seen_ids:
seen_ids.add(tech_id)
unique_techniques.append(tech)
return unique_techniques
# FALLBACK: Use original extraction strategies
print(
"[WARNING] Could not extract techniques from tool messages, using fallback extraction"
)
# Strategy 1: Look for the final supervisor message with structured data
for msg in reversed(messages):
if hasattr(msg, "content") and msg.content:
content = msg.content
# Look for different possible JSON structures
json_candidates = self._extract_json_from_content(content)
for json_data in json_candidates:
# Try multiple extraction patterns
extracted = self._try_extraction_patterns(json_data)
if extracted:
techniques.extend(extracted)
break
if techniques:
break
        # Strategy 2: Look for database agent tool messages with technique data
if not techniques:
for msg in messages:
if hasattr(msg, "name") and "database" in str(msg.name).lower():
if hasattr(msg, "content"):
tool_techniques = self._extract_from_tool_content(msg.content)
if tool_techniques:
techniques.extend(tool_techniques)
# Strategy 3: Parse any structured content that looks like MITRE data
if not techniques:
for msg in messages:
if hasattr(msg, "content") and msg.content:
general_techniques = self._extract_general_technique_mentions(
msg.content
)
if general_techniques:
techniques.extend(general_techniques)
break
# Remove duplicates based on technique_id
unique_techniques = []
seen_ids = set()
for tech in techniques:
tech_id = (
tech.get("technique_id") or tech.get("attack_id") or tech.get("id")
)
if tech_id and tech_id not in seen_ids:
seen_ids.add(tech_id)
unique_techniques.append(tech)
return unique_techniques
def _extract_json_from_content(self, content: str) -> List[Dict[str, Any]]:
"""Extract all possible JSON objects from content."""
json_candidates = []
# Look for JSON blocks
if "```json" in content:
json_blocks = content.split("```json")
for block in json_blocks[1:]:
json_str = block.split("```")[0].strip()
try:
json_data = json.loads(json_str)
json_candidates.append(json_data)
except json.JSONDecodeError:
continue
# Look for any JSON-like structures
start_idx = 0
while True:
start_idx = content.find("{", start_idx)
if start_idx == -1:
break
# Find matching closing brace
brace_count = 0
end_idx = start_idx
for i in range(start_idx, len(content)):
if content[i] == "{":
brace_count += 1
elif content[i] == "}":
brace_count -= 1
if brace_count == 0:
end_idx = i + 1
break
            if brace_count == 0:
                json_str = content[start_idx:end_idx]
                try:
                    json_data = json.loads(json_str)
                    json_candidates.append(json_data)
                    # Advance past this balanced object so its nested objects
                    # are not re-extracted as duplicates (the extraction
                    # patterns already recurse into nested structures)
                    start_idx = end_idx
                    continue
                except json.JSONDecodeError:
                    pass
            start_idx += 1
return json_candidates
def _try_extraction_patterns(
self, json_data: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""Try different patterns to extract MITRE techniques from JSON data."""
techniques = []
# Pattern 1: Original expected format
if "cybersecurity_intelligence" in json_data:
threat_indicators = json_data["cybersecurity_intelligence"].get(
"threat_indicators", []
)
for indicator in threat_indicators:
mitre_techniques = indicator.get("mitre_attack_techniques", [])
techniques.extend(mitre_techniques)
# Pattern 2: Direct techniques list
if "techniques" in json_data:
techniques.extend(json_data["techniques"])
# Pattern 3: MITRE techniques at root level
if "mitre_techniques" in json_data:
techniques.extend(json_data["mitre_techniques"])
# Pattern 4: mitre_attack_techniques array
if "mitre_attack_techniques" in json_data:
techniques.extend(json_data["mitre_attack_techniques"])
# Pattern 5: Database agent response format
if "search_type" in json_data and "techniques" in json_data:
for tech in json_data["techniques"]:
# Convert database agent format to expected format
# Convert tactics to list format
tactics = tech.get("tactics", [])
if isinstance(tactics, str):
tactics = [tactics] if tactics else []
elif not isinstance(tactics, list):
tactics = []
converted = {
"technique_id": tech.get("attack_id", ""),
"technique_name": tech.get("name", ""),
"tactic": tactics, # Now as list
"description": tech.get("description", ""),
}
techniques.append(converted)
# Pattern 6: Look for any structure with attack_id/technique_id
def find_techniques_recursive(obj, path=""):
found = []
if isinstance(obj, dict):
# Check if this looks like a technique
if "technique_id" in obj and "technique_name" in obj:
# Ensure tactic is a list format
tactic = obj.get("tactic", "")
if isinstance(tactic, str):
tactic = [tactic] if tactic else []
elif not isinstance(tactic, list):
tactic = []
technique = {
"technique_id": obj.get("technique_id", ""),
"technique_name": obj.get("technique_name", ""),
"tactic": tactic, # Now as list
"description": obj.get("description", ""),
}
found.append(technique)
elif "attack_id" in obj:
# Convert tactics to list format
tactics = obj.get("tactics", [])
if isinstance(tactics, str):
tactics = [tactics] if tactics else []
elif not isinstance(tactics, list):
tactics = []
converted = {
"technique_id": obj.get("attack_id", ""),
"technique_name": obj.get("name", ""),
"tactic": tactics, # Now as list
"description": obj.get("description", ""),
}
found.append(converted)
# Recurse into nested objects
for key, value in obj.items():
found.extend(find_techniques_recursive(value, f"{path}.{key}"))
elif isinstance(obj, list):
for i, item in enumerate(obj):
found.extend(find_techniques_recursive(item, f"{path}[{i}]"))
return found
techniques.extend(find_techniques_recursive(json_data))
return techniques
def _filter_relevant_techniques(
self, abnormal_events: List[Dict], techniques: List[Dict]
) -> List[Dict]:
"""Filter techniques based on semantic relevance to events."""
if not techniques or not abnormal_events:
return techniques
relevant_techniques = []
# Extract keywords from events for matching
event_keywords = set()
for event in abnormal_events:
desc = event.get("event_description", "").lower()
indicators = [str(ind).lower() for ind in event.get("indicators", [])]
category = event.get("attack_category", "").lower()
threat = event.get("potential_threat", "").lower()
# Add key terms
event_keywords.update(desc.split())
for ind in indicators:
event_keywords.update(ind.split())
if category:
event_keywords.update(category.split())
if threat:
event_keywords.update(threat.split())
# Score techniques based on keyword overlap
for technique in techniques:
tech_name = technique.get("technique_name", "").lower()
tech_desc = technique.get("description", "").lower()
tech_tactic = technique.get("tactic", [])
# Convert tactics to string for keyword matching
if isinstance(tech_tactic, list):
tech_tactic_str = " ".join(tech_tactic).lower()
else:
tech_tactic_str = str(tech_tactic).lower()
# Calculate relevance score
tech_words = set(
tech_name.split() + tech_desc.split() + tech_tactic_str.split()
)
overlap = len(event_keywords.intersection(tech_words))
# Add technique if there's reasonable overlap or if it's a high-value technique
if overlap > 0 or any(
keyword in tech_name or keyword in tech_desc
for keyword in [
"dns",
"registry",
"token",
"privilege",
"port",
"network",
"process",
]
):
technique["relevance_score"] = overlap
relevant_techniques.append(technique)
# Sort by relevance score (descending) and return relevant techniques
relevant_techniques.sort(
key=lambda x: x.get("relevance_score", 0), reverse=True
)
# Dynamic filtering: return techniques with meaningful relevance or minimum threshold
if relevant_techniques:
            # Keep techniques with a positive keyword-overlap score
filtered = [
t for t in relevant_techniques if t.get("relevance_score", 0) > 0
]
# If we filtered too aggressively, keep at least the most relevant ones
if not filtered and relevant_techniques:
filtered = relevant_techniques[: min(5, len(relevant_techniques))]
# But don't overwhelm the LLM - if we have too many, keep the most relevant
if len(filtered) > 15: # Reasonable upper limit
filtered = filtered[:15]
return filtered
return relevant_techniques # Return all if no filtering worked
def _extract_from_tool_content(self, content: str) -> List[Dict[str, Any]]:
"""Extract techniques from tool message content."""
techniques = []
# Try to parse as JSON first
try:
if isinstance(content, str):
json_data = json.loads(content)
techniques.extend(self._try_extraction_patterns(json_data))
except json.JSONDecodeError:
pass
return techniques
def _extract_general_technique_mentions(self, content: str) -> List[Dict[str, Any]]:
"""Extract technique mentions from general text content."""
techniques = []
        # Pattern for MITRE technique IDs, e.g. "T1134" or "T1071.004"
        technique_pattern = r"T\d{4}(?:\.\d{3})?"
technique_matches = re.findall(technique_pattern, content)
# Look for technique names in context
for match in technique_matches:
# Try to extract technique name from surrounding context
pattern = rf"{re.escape(match)}[^.]*?([A-Z][a-zA-Z\s]+)"
context_match = re.search(pattern, content)
technique_name = ""
if context_match:
technique_name = context_match.group(1).strip()
technique = {
"technique_id": match,
"technique_name": technique_name,
"tactic": [], # Empty list for unknown tactics
"description": f"Technique {match} mentioned in retrieval results",
}
techniques.append(technique)
return techniques
def _calculate_bayesian_confidence(
self, llm_confidence: float, event_severity: str, total_matched_techniques: int
) -> float:
"""
Bayesian-inspired confidence calculation.
Based on correlation agent's methodology with weighted factors:
- Correlation (50%): LLM-assigned confidence score
- Evidence (25%): Number and quality of matched techniques
- Severity (25%): Event severity level
Args:
llm_confidence: Original confidence score from LLM (0.0-1.0)
event_severity: Severity level (LOW, MEDIUM, HIGH, CRITICAL)
total_matched_techniques: Total number of matched techniques
Returns:
Adjusted confidence score (0.0-0.95)
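
        Worked example: llm_confidence=0.7, event_severity="HIGH",
        total_matched_techniques=3 yields
        0.50*0.7 + 0.25*min(1.0, 0.5 + 3*0.15) + 0.25*0.85 = 0.80.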
"""
        # Heuristic weight distribution (the LLM correlation confidence is
        # treated as the primary signal)
WEIGHTS = {
"correlation": 0.50, # Primary indicator - LLM confidence
"evidence": 0.25, # Evidence strength
"severity": 0.25, # Contextual severity
}
# Severity scores based on CVSS principles
severity_scores = {"CRITICAL": 1.0, "HIGH": 0.85, "MEDIUM": 0.6, "LOW": 0.35}
severity_component = severity_scores.get(event_severity.upper(), 0.6)
# Evidence component with diminishing returns
# More matched techniques increase confidence but with diminishing returns
quantity_factor = min(1.0, 0.5 + (total_matched_techniques * 0.15))
evidence_component = quantity_factor
# Weighted combination
bayesian_confidence = (
WEIGHTS["correlation"] * llm_confidence
+ WEIGHTS["evidence"] * evidence_component
+ WEIGHTS["severity"] * severity_component
)
# Cap at 0.95 to avoid overconfidence bias
bayesian_confidence = min(bayesian_confidence, 0.95)
# Uncertainty penalty for single weak matches
if total_matched_techniques == 1 and llm_confidence < 0.6:
bayesian_confidence *= 0.8
return round(bayesian_confidence, 3)
def _create_analysis_prompt(
self,
abnormal_events: List[Dict],
mitre_techniques: List[Dict],
overall_assessment: str,
) -> str:
"""Create the analysis prompt for the LLM using the template from prompts.py."""
return CORRELATION_ANALYSIS_PROMPT.format(
abnormal_events=json.dumps(abnormal_events, indent=2),
num_techniques=len(mitre_techniques),
mitre_techniques=json.dumps(mitre_techniques, indent=2),
overall_assessment=overall_assessment,
)
def _parse_response(
        self, response_content: str, log_analysis_result: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Parse the LLM response, extract JSON, and apply Bayesian confidence adjustment."""
try:
# Try to extract JSON from the response
if "```json" in response_content:
json_str = response_content.split("```json")[1].split("```")[0].strip()
elif "```" in response_content:
json_str = response_content.split("```")[1].split("```")[0].strip()
else:
# Look for JSON-like structure
start_idx = response_content.find("{")
end_idx = response_content.rfind("}") + 1
if start_idx != -1 and end_idx > start_idx:
json_str = response_content[start_idx:end_idx]
else:
json_str = response_content.strip()
result = json.loads(json_str)
# Apply Bayesian confidence adjustment to each mapping
correlation_analysis = result.get("correlation_analysis", {})
direct_mappings = correlation_analysis.get("direct_mappings", [])
if direct_mappings and log_analysis_result:
# Extract overall severity from log analysis
overall_assessment = log_analysis_result.get(
"overall_assessment", "UNKNOWN"
)
# Map overall assessment to severity level
assessment_to_severity = {
"NORMAL": "LOW",
"SUSPICIOUS": "MEDIUM",
"ABNORMAL": "HIGH",
"CRITICAL": "CRITICAL",
}
log_severity = assessment_to_severity.get(overall_assessment, "MEDIUM")
total_matched = len(direct_mappings)
# Apply Bayesian adjustment to each mapping
for mapping in direct_mappings:
llm_confidence = mapping.get("confidence_score", 0.5)
# Calculate Bayesian-adjusted confidence
bayesian_confidence = self._calculate_bayesian_confidence(
llm_confidence=llm_confidence,
event_severity=log_severity,
total_matched_techniques=total_matched,
)
# Store adjusted confidence (overwrite original)
mapping["confidence_score"] = bayesian_confidence
                    # Keep the original LLM score for debugging and traceability
mapping["_original_llm_confidence"] = llm_confidence
return result
except json.JSONDecodeError as e:
print(f"[WARNING] Failed to parse LLM response as JSON: {e}")
# Return a fallback structure
return {
"correlation_analysis": {
"analysis_summary": "Failed to parse response - manual review required",
"mapping_confidence": "LOW",
"total_events_analyzed": 0,
"total_techniques_retrieved": 0,
"retrieval_success": False,
"direct_mappings": [],
"unmapped_events": [],
"overall_recommendations": [
"Review raw response for manual analysis"
],
},
"raw_response": response_content,
}
def _save_response(
        self, mapping_analysis: Dict[str, Any], log_file: str, tactic: Optional[str] = None
) -> Tuple[str, str]:
"""Save the response analysis to both JSON and Markdown files."""
# Generate folder and filenames based on log file
log_filename = Path(log_file).stem
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Create tactic-specific subdirectory if tactic is provided
if tactic:
base_output_dir = self.output_dir / tactic
base_output_dir.mkdir(exist_ok=True)
else:
base_output_dir = self.output_dir
# Create subfolder with log name and timestamp
output_folder = base_output_dir / f"{log_filename}_{timestamp}"
output_folder.mkdir(exist_ok=True)
# File paths - use shorter, more readable names
json_filename = "response_analysis.json"
md_filename = "threat_report.md"
json_path = output_folder / json_filename
md_path = output_folder / md_filename
try:
# Save JSON file
with open(json_path, "w", encoding="utf-8") as f:
json.dump(mapping_analysis, f, indent=2, ensure_ascii=False)
# Generate and save Markdown report
markdown_report = self._generate_markdown_report(
mapping_analysis, log_filename
)
with open(md_path, "w", encoding="utf-8") as f:
f.write(markdown_report)
return str(output_folder), markdown_report.strip()
except Exception as e:
print(f"[ERROR] Failed to save response analysis: {e}")
return "", "" # Return empty strings for both paths and report
def _generate_markdown_report(
self, mapping_analysis: Dict[str, Any], log_filename: str
) -> str:
"""Generate a nicely formatted Markdown threat intelligence report."""
correlation = mapping_analysis.get("correlation_analysis", {})
metadata = mapping_analysis.get("metadata", {})
# Start building the Markdown content
md = []
# Header
md.append("# Cybersecurity Threat Intelligence Report\n")
md.append("---\n")
# Metadata section
md.append("## Report Metadata\n")
md.append(f"- **Log File:** `{log_filename}`\n")
md.append(
f"- **Analysis Date:** {metadata.get('analysis_timestamp', 'Unknown')[:19].replace('T', ' ')}\n"
)
        # Overall assessment badge (plain-text label)
        assessment_badge = metadata.get("overall_assessment", "Unknown")
md.append(f"- **Overall Assessment:** {assessment_badge}\n")
md.append(
f"- **Events Analyzed:** {correlation.get('total_events_analyzed', 0)}\n"
)
md.append(
f"- **MITRE Techniques Retrieved:** {correlation.get('total_techniques_retrieved', 0)}\n"
)
        # Mapping confidence badge (plain-text label)
        confidence_badge = correlation.get("mapping_confidence", "Unknown")
md.append(f"- **Mapping Confidence:** {confidence_badge}\n")
md.append("\n---\n")
# Executive Summary
md.append("## Executive Summary\n")
md.append(f"{correlation.get('analysis_summary', 'No summary available')}\n")
md.append("\n---\n")
# Event-to-Technique Mappings
mappings = correlation.get("direct_mappings", [])
if mappings:
md.append("## Threat Analysis - Event to MITRE ATT&CK Mappings\n")
for i, mapping in enumerate(mappings, 1):
event_id = mapping.get("event_id", "Unknown")
event_desc = mapping.get("event_description", "No description")
technique = mapping.get("mitre_technique", "Unknown")
technique_name = mapping.get("technique_name", "Unknown")
tactic = mapping.get("tactic", [])
# Convert tactic list to string for display
if isinstance(tactic, list):
tactic_str = ", ".join(tactic) if tactic else "Unknown"
else:
tactic_str = str(tactic) if tactic else "Unknown"
confidence = mapping.get("confidence_score", 0)
rationale = mapping.get("mapping_rationale", "No rationale provided")
# Confidence badge
if confidence >= 0.8:
confidence_badge = f"HIGH ({confidence:.2f})"
elif confidence >= 0.6:
confidence_badge = f"MEDIUM ({confidence:.2f})"
else:
confidence_badge = f"LOW ({confidence:.2f})"
md.append(f"### {i}. Event ID: {event_id}\n")
md.append(f"**Event Description:** {event_desc}\n\n")
md.append(
f"#### MITRE Technique: [{technique}](https://attack.mitre.org/techniques/{technique.replace('.', '/')}/)\n"
)
md.append(f"- **Technique Name:** {technique_name}\n")
md.append(f"- **Tactic:** {tactic_str}\n")
md.append(f"- **Confidence:** {confidence_badge}\n")
md.append("\n")
md.append(f"**Analysis:**\n")
md.append(f"> {rationale}\n")
md.append("\n")
# Recommendations
recommendations = mapping.get("recommendations", [])
if recommendations:
md.append("**Immediate Actions:**\n")
for j, rec in enumerate(recommendations, 1):
md.append(f"{j}. {rec}\n")
md.append("\n")
md.append("---\n")
# Unmapped Events
unmapped = correlation.get("unmapped_events", [])
if unmapped:
md.append("## Unmapped Events\n")
md.append(
"The following events could not be confidently mapped to MITRE techniques:\n\n"
)
for event_id in unmapped:
md.append(f"- Event ID: `{event_id}`\n")
md.append(
"\n> **Note:** These events may require manual analysis or additional context.\n"
)
md.append("\n---\n")
# Priority Matrix
if mappings:
high_priority = [m for m in mappings if m.get("confidence_score", 0) >= 0.7]
medium_priority = [
m for m in mappings if 0.5 <= m.get("confidence_score", 0) < 0.7
]
low_priority = [m for m in mappings if m.get("confidence_score", 0) < 0.5]
md.append("## Priority Matrix\n")
if high_priority:
md.append("### HIGH PRIORITY (Investigate Immediately)\n")
md.append(
"| Event ID | MITRE Technique | Technique Name | Confidence |\n"
)
md.append(
"|----------|-----------------|----------------|------------|\n"
)
for mapping in high_priority:
event_id = mapping.get("event_id", "Unknown")
technique = mapping.get("mitre_technique", "Unknown")
name = mapping.get("technique_name", "Unknown")
conf = mapping.get("confidence_score", 0)
md.append(f"| {event_id} | {technique} | {name} | {conf:.2f} |\n")
md.append("\n")
if medium_priority:
md.append("### MEDIUM PRIORITY (Monitor and Investigate)\n")
md.append(
"| Event ID | MITRE Technique | Technique Name | Confidence |\n"
)
md.append(
"|----------|-----------------|----------------|------------|\n"
)
for mapping in medium_priority:
event_id = mapping.get("event_id", "Unknown")
technique = mapping.get("mitre_technique", "Unknown")
name = mapping.get("technique_name", "Unknown")
conf = mapping.get("confidence_score", 0)
md.append(f"| {event_id} | {technique} | {name} | {conf:.2f} |\n")
md.append("\n")
if low_priority:
md.append("### LOW PRIORITY (Review as Needed)\n")
md.append(
"| Event ID | MITRE Technique | Technique Name | Confidence |\n"
)
md.append(
"|----------|-----------------|----------------|------------|\n"
)
for mapping in low_priority:
event_id = mapping.get("event_id", "Unknown")
technique = mapping.get("mitre_technique", "Unknown")
name = mapping.get("technique_name", "Unknown")
conf = mapping.get("confidence_score", 0)
md.append(f"| {event_id} | {technique} | {name} | {conf:.2f} |\n")
md.append("\n")
md.append("---\n")
# Strategic Recommendations
overall_recs = correlation.get("overall_recommendations", [])
if overall_recs:
md.append("## Strategic Recommendations\n")
for i, rec in enumerate(overall_recs, 1):
md.append(f"{i}. {rec}\n")
md.append("\n---\n")
# Footer
md.append("## Additional Information\n")
md.append(
"- **Report Format:** This report provides event-to-technique correlation analysis\n"
)
md.append(
"- **Technical Details:** See the accompanying JSON file for complete technical data\n"
)
md.append(
"- **MITRE ATT&CK:** Click technique IDs above to view full details on the MITRE ATT&CK website\n"
)
md.append("\n")
md.append("---\n")
md.append("*Report generated by Cybersecurity Multi-Agent Pipeline*\n")
return "".join(md)
def get_stats(self) -> Dict[str, Any]:
"""Get statistics about the response agent."""
return {
"agent_type": "Response Agent",
"model": (
self.llm.model_name if hasattr(self.llm, "model_name") else "Unknown"
),
"output_directory": str(self.output_dir),
"version": "1.2",
}
# Test function for the Response Agent
def test_response_agent():
"""Test the Response Agent with sample data."""
# Sample log analysis result
sample_log_analysis = {
"overall_assessment": "SUSPICIOUS",
"abnormal_events": [
{
"event_id": "5156",
"event_description": "DNS connection to external IP 64.4.48.201",
"severity": "HIGH",
"indicators": ["dns.exe", "64.4.48.201"],
},
{
"event_id": "10",
"event_description": "Token right adjustment for MORDORDC$",
"severity": "HIGH",
"indicators": ["svchost.exe", "token adjustment"],
},
],
}
# Sample retrieval result (simplified)
sample_retrieval = {
"messages": [
type(
"MockMessage",
(),
{
"content": """{"cybersecurity_intelligence": {
"threat_indicators": [
{
"mitre_attack_techniques": [
{
"technique_id": "T1071.004",
"technique_name": "DNS",
"tactic": "Command and Control"
},
{
"technique_id": "T1134",
"technique_name": "Access Token Manipulation",
"tactic": "Privilege Escalation"
}
]
}
]
}}"""
},
)()
]
}
    # Initialize and test the agent (requires a configured LLM backend,
    # e.g. Google GenAI credentials for the default model)
    agent = ResponseAgent()
    result, report = agent.analyze_and_map(
        sample_log_analysis, sample_retrieval, "test_sample.json"
    )
    print("\nTest completed!")
    print(f"Analysis result keys: {list(result.keys())}")
    print(f"Markdown report length: {len(report)} characters")
if __name__ == "__main__":
test_response_agent()