# Source provenance (non-code upload header): minhan6559 — "Upload 102 files", commit 9e3d618 (verified)
from langchain_core.tools import tool
from typing import Dict, Any, List
from datetime import datetime
import re
import json
from .base_tool import Tool
class TimelineBuilderTool(Tool):
    """Build focused timelines around suspicious events by parsing raw logs directly"""

    # Normalized event keys mapped to candidate source fields, in priority order.
    # First matching field wins; covers Sysmon/winlog/ECS-style naming variants.
    _FIELD_MAP = [
        ("event_id", ["EventID", "event_id", "event.code", "winlog.event_id"]),
        ("user", ["User", "user", "user.name", "winlog.user.name", "SubjectUserName"]),
        ("computer", ["Computer", "computer", "host.name", "winlog.computer_name"]),
        ("process_name", ["Image", "process.name", "NewProcessName", "ProcessName"]),
        ("command_line", ["CommandLine", "command_line", "process.command_line"]),
        ("src_ip", ["SourceIp", "src_ip", "source.ip"]),
        ("dst_ip", ["DestinationIp", "dst_ip", "destination.ip"]),
        ("file_path", ["TargetFilename", "file.path", "ObjectName"]),
        ("registry_path", ["TargetObject", "registry.path"]),
    ]

    def name(self) -> str:
        """Return the tool's registry name."""
        return "timeline_builder"

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Parse raw logs, find events matching a pivot entity, and build a
        timeline around the first match.

        Expected ``input_data`` keys:
            pivot_entity: Substring to match (e.g. "powershell.exe").
            pivot_type: One of "user", "process", "ip", "file", "computer",
                "event_id", "registry".
            time_window_minutes: Minutes before/after the pivot (default 5).
            raw_logs: Raw log text — JSON lines and/or plain text.

        Returns:
            A result dict, or ``{"error": ...}`` on bad input or any failure.
        """
        try:
            pivot_entity = input_data.get("pivot_entity", "")
            pivot_type = input_data.get("pivot_type", "")
            time_window_minutes = int(input_data.get("time_window_minutes", 5))
            raw_logs = input_data.get("raw_logs", "")
            if not pivot_entity or not pivot_type:
                return {"error": "Both pivot_entity and pivot_type are required"}
            if not raw_logs:
                return {"error": "No raw log data provided"}
            # Parse events from raw logs
            events = self._parse_raw_logs(raw_logs)
            if not events:
                return {
                    "tool": "timeline_builder",
                    "pivot_entity": pivot_entity,
                    "pivot_type": pivot_type,
                    "result": {
                        "found": False,
                        "message": "No events could be parsed from logs",
                        "timeline": []
                    }
                }
            # Find pivot events
            pivot_events = self._find_pivot_events(events, pivot_entity, pivot_type)
            if not pivot_events:
                return {
                    "tool": "timeline_builder",
                    "pivot_entity": pivot_entity,
                    "pivot_type": pivot_type,
                    "result": {
                        "found": False,
                        "message": f"No events found for {pivot_type}: {pivot_entity}",
                        "total_events_parsed": len(events),
                        "timeline": []
                    }
                }
            # Build timeline around first pivot event (limit scope)
            pivot_event = pivot_events[0]
            timeline = self._build_timeline_around_event(
                events, pivot_event, time_window_minutes
            )
            # Generate summary
            summary = self._generate_timeline_summary(
                pivot_event, timeline, len(pivot_events)
            )
            return {
                "tool": "timeline_builder",
                "pivot_entity": pivot_entity,
                "pivot_type": pivot_type,
                "time_window_minutes": time_window_minutes,
                "result": {
                    "found": True,
                    "total_pivot_events": len(pivot_events),
                    "showing_timeline_for": "first pivot event",
                    "pivot_event_summary": self._summarize_event(pivot_event),
                    "timeline": timeline,
                    "summary": summary
                }
            }
        except Exception as e:
            # Boundary catch: tools must report failures as data, never raise.
            return {"error": f"{type(e).__name__}: {str(e)}"}

    def _parse_raw_logs(self, raw_logs: str) -> List[Dict]:
        """Parse events from a raw log string.

        Each non-empty line is tried as JSON first; lines that are not valid
        JSON fall back to the plain-text parser. Unparseable lines are skipped.
        """
        events = []
        for line in raw_logs.split('\n'):
            line = line.strip()
            if not line:
                continue
            try:
                event = json.loads(line)
                parsed_event = self._extract_event_data(event)
            except json.JSONDecodeError:
                parsed_event = self._parse_text_log_line(line)
            if parsed_event:
                events.append(parsed_event)
        return events

    def _new_event(self, raw) -> Dict:
        """Return a normalized event skeleton with all fields unset."""
        return {
            "raw": raw,
            "timestamp": None,
            "event_id": None,
            "user": None,
            "computer": None,
            "process_name": None,
            "command_line": None,
            "src_ip": None,
            "dst_ip": None,
            "file_path": None,
            "registry_path": None
        }

    def _extract_event_data(self, event: Dict) -> Dict:
        """Normalize a JSON event into the common field schema via _FIELD_MAP."""
        extracted = self._new_event(event)
        # Timestamp is handled separately because its value needs parsing.
        for ts_field in ("@timestamp", "timestamp", "TimeCreated", "EventTime"):
            if ts_field in event:
                extracted["timestamp"] = self._parse_timestamp(event[ts_field])
                break
        for key, candidates in self._FIELD_MAP:
            for field in candidates:
                if field in event:
                    extracted[key] = str(event[field])
                    break
        return extracted

    def _parse_text_log_line(self, line: str) -> Dict:
        """Best-effort parse of a plain text log line (timestamp, EventID, IPs)."""
        extracted = self._new_event({"text": line})
        # Try to extract timestamp ("YYYY-MM-DD HH:MM:SS" or ISO 'T' separator)
        ts_match = re.search(r'\d{4}-\d{2}-\d{2}[T\s]\d{2}:\d{2}:\d{2}', line)
        if ts_match:
            extracted["timestamp"] = self._parse_timestamp(ts_match.group(0))
        # Try to extract Event ID
        event_id_match = re.search(r'EventID[:\s=]+(\d+)', line, re.IGNORECASE)
        if event_id_match:
            extracted["event_id"] = event_id_match.group(1)
        # Try to extract IP addresses: first is treated as source, second as
        # destination — a heuristic; plain-text lines carry no direction info.
        ip_matches = re.findall(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', line)
        if ip_matches:
            extracted["src_ip"] = ip_matches[0]
        if len(ip_matches) > 1:
            extracted["dst_ip"] = ip_matches[1]
        return extracted

    def _parse_timestamp(self, ts_value) -> datetime:
        """Parse a timestamp from various formats.

        Accepts datetime objects, epoch numbers, and common string formats.
        Returns None when unparseable. Results are kept naive so timeline
        comparisons never mix aware and naive datetimes.
        """
        if isinstance(ts_value, datetime):
            return ts_value
        if isinstance(ts_value, (int, float)):
            return datetime.fromtimestamp(ts_value)
        ts_str = str(ts_value)
        # Try common formats
        formats = [
            "%Y-%m-%dT%H:%M:%S.%fZ",
            "%Y-%m-%dT%H:%M:%SZ",
            "%Y-%m-%d %H:%M:%S.%f",
            "%Y-%m-%d %H:%M:%S",
            "%Y-%m-%dT%H:%M:%S",
        ]
        for fmt in formats:
            try:
                return datetime.strptime(ts_str, fmt)
            except ValueError:
                continue
        # Fallback: ISO-8601 variants with UTC offsets (e.g. "+02:00"); strip
        # tzinfo so the result stays comparable with the naive formats above.
        try:
            return datetime.fromisoformat(ts_str.replace("Z", "+00:00")).replace(tzinfo=None)
        except ValueError:
            return None

    def _find_pivot_events(self, events: List[Dict], pivot_entity: str, pivot_type: str) -> List[Dict]:
        """Return all events matching the pivot criteria (case-insensitive)."""
        pivot_entity_lower = pivot_entity.lower()
        pivot_type_lower = pivot_type.lower()
        return [
            event for event in events
            if self._event_matches_pivot(event, pivot_entity_lower, pivot_type_lower)
        ]

    def _event_matches_pivot(self, event: Dict, pivot_entity: str, pivot_type: str) -> bool:
        """Check if an event matches the pivot criteria.

        ``pivot_entity`` and ``pivot_type`` must already be lower-cased;
        matching is substring containment on the relevant normalized field(s).
        """
        if pivot_type == "user":
            user = (event.get('user') or '').lower()
            return pivot_entity in user
        elif pivot_type == "process":
            # Match either the image name or anywhere in the command line.
            process_name = (event.get('process_name') or '').lower()
            command_line = (event.get('command_line') or '').lower()
            return pivot_entity in process_name or pivot_entity in command_line
        elif pivot_type == "ip":
            # An IP pivot matches traffic in either direction.
            src_ip = (event.get('src_ip') or '').lower()
            dst_ip = (event.get('dst_ip') or '').lower()
            return pivot_entity in src_ip or pivot_entity in dst_ip
        elif pivot_type == "file":
            file_path = (event.get('file_path') or '').lower()
            return pivot_entity in file_path
        elif pivot_type == "computer":
            computer = (event.get('computer') or '').lower()
            return pivot_entity in computer
        elif pivot_type == "event_id":
            event_id = str(event.get('event_id') or '').lower()
            return pivot_entity in event_id
        elif pivot_type == "registry":
            registry_path = (event.get('registry_path') or '').lower()
            return pivot_entity in registry_path
        # Unknown pivot_type: nothing matches.
        return False

    def _build_timeline_around_event(self, events: List[Dict], pivot_event: Dict, time_window_minutes: int) -> List[Dict]:
        """Build a timeline of events around a pivot event.

        With a pivot timestamp, returns all timestamped events inside the
        +/- window, sorted chronologically. Without one, falls back to the
        10 events on either side of the pivot's position in parse order.
        """
        pivot_timestamp = pivot_event.get('timestamp')
        if not pivot_timestamp:
            # Identity search (not .index/==): duplicate log lines parse to
            # equal dicts and value equality would pick the wrong position.
            pivot_idx = next(i for i, e in enumerate(events) if e is pivot_event)
            start_idx = max(0, pivot_idx - 10)
            end_idx = min(len(events), pivot_idx + 11)
            return [
                {
                    "sequence_position": i,
                    "is_pivot": event is pivot_event,
                    "event_summary": self._summarize_event(event)
                }
                for i, event in enumerate(events[start_idx:end_idx])
            ]
        # Build timeline based on timestamps
        from datetime import timedelta
        start_time = pivot_timestamp - timedelta(minutes=time_window_minutes)
        end_time = pivot_timestamp + timedelta(minutes=time_window_minutes)
        timeline = []
        for event in events:
            event_timestamp = event.get('timestamp')
            if event_timestamp and start_time <= event_timestamp <= end_time:
                offset_seconds = (event_timestamp - pivot_timestamp).total_seconds()
                timeline.append({
                    "timestamp": event_timestamp.isoformat(),
                    "offset_seconds": offset_seconds,
                    "offset_human": self._format_time_offset(offset_seconds),
                    # Identity check: duplicates of the pivot line must not be
                    # flagged as the pivot themselves.
                    "is_pivot": event is pivot_event,
                    "event_summary": self._summarize_event(event)
                })
        # Sort by timestamp (ISO-8601 strings sort chronologically).
        timeline.sort(key=lambda x: x.get('timestamp', ''))
        return timeline

    def _format_time_offset(self, offset_seconds: float) -> str:
        """Format a signed offset (seconds from the pivot) in human-readable form."""
        if offset_seconds == 0:
            return "PIVOT EVENT"
        elif offset_seconds < 0:
            return f"{abs(offset_seconds):.1f}s before"
        else:
            return f"{offset_seconds:.1f}s after"

    def _summarize_event(self, event: Dict) -> str:
        """Create a one-line, pipe-separated human-readable summary of an event."""
        parts = []
        if event.get('event_id'):
            parts.append(f"EventID {event['event_id']}")
        if event.get('user'):
            parts.append(f"User: {event['user']}")
        if event.get('process_name'):
            parts.append(f"Process: {event['process_name']}")
        if event.get('command_line'):
            # Truncate long command lines to keep summaries scannable.
            cmd = event['command_line']
            if len(cmd) > 60:
                cmd = cmd[:57] + "..."
            parts.append(f"CMD: {cmd}")
        if event.get('src_ip') or event.get('dst_ip'):
            if event.get('src_ip') and event.get('dst_ip'):
                # Fix: original concatenated both IPs with no separator.
                parts.append(f"Network: {event['src_ip']} -> {event['dst_ip']}")
            elif event.get('src_ip'):
                parts.append(f"SrcIP: {event['src_ip']}")
            elif event.get('dst_ip'):
                parts.append(f"DstIP: {event['dst_ip']}")
        if event.get('file_path'):
            parts.append(f"File: {event['file_path']}")
        if event.get('registry_path'):
            parts.append(f"Registry: {event['registry_path']}")
        return " | ".join(parts) if parts else "No details available"

    def _generate_timeline_summary(self, pivot_event: Dict, timeline: List[Dict], total_pivot_count: int) -> str:
        """Generate a human-readable summary of the timeline."""
        summary_parts = []
        summary_parts.append(f"Found {total_pivot_count} matching event(s).")
        summary_parts.append(f"Timeline shows {len(timeline)} events around the first match.")
        # Count events before/after pivot. Default 0 (not 1): sequence-based
        # entries have no offset_seconds and must not be counted as "after".
        before = sum(1 for e in timeline if e.get('offset_seconds', 0) < 0)
        after = sum(1 for e in timeline if e.get('offset_seconds', 0) > 0)
        if before or after:
            summary_parts.append(f"{before} events before, {after} events after pivot.")
        # Identify interesting patterns: repeated identical summaries suggest
        # bursts of the same activity.
        event_ids = [e['event_summary'] for e in timeline if 'EventID' in e.get('event_summary', '')]
        if len(set(event_ids)) < len(event_ids):
            summary_parts.append("Multiple similar events detected in sequence.")
        return " ".join(summary_parts)
# Create singleton instance
# Module-level singleton shared by the `timeline_builder` @tool wrapper below.
_timeline_builder_tool = TimelineBuilderTool()
@tool
def timeline_builder(pivot_entity: str, pivot_type: str, time_window_minutes: int = 5, raw_logs: str = None) -> dict:
    """Build a focused timeline around suspicious events to understand attack sequences.
    Use this when you suspect coordinated activity or want to understand what happened
    before and after a suspicious event. Analyzes the sequence of events to identify patterns.
    Args:
        pivot_entity: The entity to build timeline around (e.g., "powershell.exe", "admin", "192.168.1.100")
        pivot_type: Type of entity - "user", "process", "ip", "file", "computer", "event_id", or "registry"
        time_window_minutes: Minutes before and after pivot events to include (default: 5)
        raw_logs: The raw log data (automatically provided by agent)
    Returns:
        Timeline analysis showing events before and after the pivot, helping identify attack sequences.
    """
    # Thin adapter: pack the tool-call arguments into the dict interface the
    # singleton's run() expects. (Docstring above is runtime-significant — it
    # becomes the tool description LangChain exposes to the model.)
    payload = {
        "pivot_entity": pivot_entity,
        "pivot_type": pivot_type,
        "time_window_minutes": time_window_minutes,
        "raw_logs": raw_logs,
    }
    return _timeline_builder_tool.run(payload)