|
|
import json
import re

from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

from langchain_core.tools import tool

from .base_tool import Tool
|
|
|
|
|
|
class TimelineBuilderTool(Tool):
    """Build focused timelines around suspicious events by parsing raw logs directly.

    The tool parses raw log text (JSON lines or plain text), normalizes each
    event into a common field set, finds events matching a pivot entity
    (user, process, IP, ...), and returns the events that occurred within a
    time window around the first match.
    """

    # Normalized string field -> candidate raw-event keys, in priority order.
    _STRING_FIELD_ALIASES = {
        "event_id": ("EventID", "event_id", "event.code", "winlog.event_id"),
        "user": ("User", "user", "user.name", "winlog.user.name", "SubjectUserName"),
        "computer": ("Computer", "computer", "host.name", "winlog.computer_name"),
        "process_name": ("Image", "process.name", "NewProcessName", "ProcessName"),
        "command_line": ("CommandLine", "command_line", "process.command_line"),
        "src_ip": ("SourceIp", "src_ip", "source.ip"),
        "dst_ip": ("DestinationIp", "dst_ip", "destination.ip"),
        "file_path": ("TargetFilename", "file.path", "ObjectName"),
        "registry_path": ("TargetObject", "registry.path"),
    }

    # Candidate raw-event keys carrying the event timestamp, in priority order.
    _TIMESTAMP_ALIASES = ("@timestamp", "timestamp", "TimeCreated", "EventTime")

    # pivot_type -> normalized event fields searched for the pivot entity.
    _PIVOT_FIELDS = {
        "user": ("user",),
        "process": ("process_name", "command_line"),
        "ip": ("src_ip", "dst_ip"),
        "file": ("file_path",),
        "computer": ("computer",),
        "event_id": ("event_id",),
        "registry": ("registry_path",),
    }

    def name(self) -> str:
        """Return the registered tool name."""
        return "timeline_builder"

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Build a timeline around the first event matching the pivot criteria.

        Args:
            input_data: Dict with keys:
                pivot_entity (str): value to search for (required).
                pivot_type (str): one of "user", "process", "ip", "file",
                    "computer", "event_id", "registry" (required).
                time_window_minutes (int-like): minutes before/after the
                    pivot to include (default 5).
                raw_logs (str): raw log text, one event per line (required).

        Returns:
            A structured result dict, or ``{"error": ...}`` on invalid input
            or any unexpected failure.
        """
        try:
            pivot_entity = input_data.get("pivot_entity", "")
            pivot_type = input_data.get("pivot_type", "")
            time_window_minutes = int(input_data.get("time_window_minutes", 5))
            raw_logs = input_data.get("raw_logs", "")

            if not pivot_entity or not pivot_type:
                return {"error": "Both pivot_entity and pivot_type are required"}

            if not raw_logs:
                return {"error": "No raw log data provided"}

            events = self._parse_raw_logs(raw_logs)

            if not events:
                return {
                    "tool": "timeline_builder",
                    "pivot_entity": pivot_entity,
                    "pivot_type": pivot_type,
                    "result": {
                        "found": False,
                        "message": "No events could be parsed from logs",
                        "timeline": []
                    }
                }

            pivot_events = self._find_pivot_events(events, pivot_entity, pivot_type)

            if not pivot_events:
                return {
                    "tool": "timeline_builder",
                    "pivot_entity": pivot_entity,
                    "pivot_type": pivot_type,
                    "result": {
                        "found": False,
                        "message": f"No events found for {pivot_type}: {pivot_entity}",
                        "total_events_parsed": len(events),
                        "timeline": []
                    }
                }

            # Only the first pivot match gets a timeline; the total count is
            # reported so the caller knows more matches exist.
            pivot_event = pivot_events[0]
            timeline = self._build_timeline_around_event(
                events, pivot_event, time_window_minutes
            )
            summary = self._generate_timeline_summary(
                pivot_event, timeline, len(pivot_events)
            )

            return {
                "tool": "timeline_builder",
                "pivot_entity": pivot_entity,
                "pivot_type": pivot_type,
                "time_window_minutes": time_window_minutes,
                "result": {
                    "found": True,
                    "total_pivot_events": len(pivot_events),
                    "showing_timeline_for": "first pivot event",
                    "pivot_event_summary": self._summarize_event(pivot_event),
                    "timeline": timeline,
                    "summary": summary
                }
            }

        except Exception as e:
            # Surface unexpected failures as a structured error instead of
            # crashing the agent loop.
            return {"error": f"{type(e).__name__}: {str(e)}"}

    def _parse_raw_logs(self, raw_logs: str) -> List[Dict]:
        """Parse events from a raw log string, one event per non-empty line.

        Lines that decode to a JSON object are handled as structured events;
        everything else (including JSON scalars/arrays, which previously
        crashed the whole run) falls back to best-effort text parsing.
        """
        events = []

        for line in raw_logs.split('\n'):
            line = line.strip()
            if not line:
                continue

            try:
                decoded = json.loads(line)
            except json.JSONDecodeError:
                decoded = None

            if isinstance(decoded, dict):
                parsed_event = self._extract_event_data(decoded)
            else:
                parsed_event = self._parse_text_log_line(line)

            if parsed_event:
                events.append(parsed_event)

        return events

    def _extract_event_data(self, event: Dict) -> Dict:
        """Normalize a JSON event into the common field set.

        For each normalized field the first matching alias key present in the
        event wins; absent fields stay None. The original event is kept under
        "raw".
        """
        extracted = {key: None for key in self._STRING_FIELD_ALIASES}
        extracted["timestamp"] = None
        extracted["raw"] = event

        for ts_field in self._TIMESTAMP_ALIASES:
            if ts_field in event:
                extracted["timestamp"] = self._parse_timestamp(event[ts_field])
                break

        for key, aliases in self._STRING_FIELD_ALIASES.items():
            for alias in aliases:
                if alias in event:
                    extracted[key] = str(event[alias])
                    break

        return extracted

    def _parse_text_log_line(self, line: str) -> Dict:
        """Best-effort parse of a plain text log line.

        Only a timestamp, an EventID, and up to two IPv4 addresses are
        recognized; everything else stays None.
        """
        extracted = {key: None for key in self._STRING_FIELD_ALIASES}
        extracted["timestamp"] = None
        extracted["raw"] = {"text": line}

        # ISO-ish timestamp anywhere in the line, e.g. "2024-01-01 12:00:00".
        ts_match = re.search(r'\d{4}-\d{2}-\d{2}[T\s]\d{2}:\d{2}:\d{2}', line)
        if ts_match:
            extracted["timestamp"] = self._parse_timestamp(ts_match.group(0))

        event_id_match = re.search(r'EventID[:\s=]+(\d+)', line, re.IGNORECASE)
        if event_id_match:
            extracted["event_id"] = event_id_match.group(1)

        # First IPv4-looking token is treated as source, second as destination.
        ip_matches = re.findall(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', line)
        if ip_matches:
            extracted["src_ip"] = ip_matches[0]
            if len(ip_matches) > 1:
                extracted["dst_ip"] = ip_matches[1]

        return extracted

    def _parse_timestamp(self, ts_value) -> Optional[datetime]:
        """Parse a timestamp from various types/formats; return None if unparseable.

        Offset-aware ISO-8601 strings (e.g. "...+02:00" or trailing "Z") are
        normalized to naive UTC so all parsed values compare cleanly with the
        naive values produced by the strptime fallbacks.
        """
        if isinstance(ts_value, datetime):
            return ts_value

        if isinstance(ts_value, (int, float)):
            # Treated as a Unix epoch (local time, matching original behavior).
            return datetime.fromtimestamp(ts_value)

        ts_str = str(ts_value)

        # ISO 8601 first: handles fractional seconds and "+HH:MM" offsets that
        # the fixed strptime formats below cannot.
        iso_candidate = ts_str[:-1] + "+00:00" if ts_str.endswith("Z") else ts_str
        try:
            parsed = datetime.fromisoformat(iso_candidate)
        except ValueError:
            parsed = None
        if parsed is not None:
            if parsed.tzinfo is not None:
                parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
            return parsed

        formats = [
            "%Y-%m-%dT%H:%M:%S.%fZ",
            "%Y-%m-%dT%H:%M:%SZ",
            "%Y-%m-%d %H:%M:%S.%f",
            "%Y-%m-%d %H:%M:%S",
            "%Y-%m-%dT%H:%M:%S",
        ]
        for fmt in formats:
            try:
                return datetime.strptime(ts_str, fmt)
            except ValueError:
                continue

        return None

    def _find_pivot_events(self, events: List[Dict], pivot_entity: str, pivot_type: str) -> List[Dict]:
        """Return all events matching the pivot criteria (case-insensitive)."""
        entity = pivot_entity.lower()
        ptype = pivot_type.lower()
        return [event for event in events
                if self._event_matches_pivot(event, entity, ptype)]

    def _event_matches_pivot(self, event: Dict, pivot_entity: str, pivot_type: str) -> bool:
        """Check if an event matches the pivot criteria.

        Matching is a case-insensitive substring test against the normalized
        fields relevant to the pivot type; unknown pivot types never match.
        """
        fields = self._PIVOT_FIELDS.get(pivot_type)
        if not fields:
            return False
        return any(
            pivot_entity in str(event.get(field) or '').lower()
            for field in fields
        )

    def _build_timeline_around_event(self, events: List[Dict], pivot_event: Dict, time_window_minutes: int) -> List[Dict]:
        """Build a timeline of events around a pivot event.

        Events with timestamps inside +/- time_window_minutes of the pivot are
        returned in chronological order. If the pivot has no parseable
        timestamp, a +/-10-event window over the parsed sequence is used
        instead. The pivot is located by identity (``is``) so duplicate
        events cannot all be flagged as the pivot.
        """
        pivot_timestamp = pivot_event.get('timestamp')

        if not pivot_timestamp:
            # No timestamp available: fall back to sequence position.
            pivot_idx = next(
                i for i, e in enumerate(events) if e is pivot_event
            )
            start_idx = max(0, pivot_idx - 10)
            end_idx = min(len(events), pivot_idx + 11)

            return [
                {
                    "sequence_position": i,
                    "is_pivot": event is pivot_event,
                    "event_summary": self._summarize_event(event)
                }
                for i, event in enumerate(events[start_idx:end_idx])
            ]

        start_time = pivot_timestamp - timedelta(minutes=time_window_minutes)
        end_time = pivot_timestamp + timedelta(minutes=time_window_minutes)

        timeline = []
        for event in events:
            event_timestamp = event.get('timestamp')
            if event_timestamp and start_time <= event_timestamp <= end_time:
                offset_seconds = (event_timestamp - pivot_timestamp).total_seconds()
                timeline.append({
                    "timestamp": event_timestamp.isoformat(),
                    "offset_seconds": offset_seconds,
                    "offset_human": self._format_time_offset(offset_seconds),
                    "is_pivot": event is pivot_event,
                    "event_summary": self._summarize_event(event)
                })

        # ISO-8601 strings sort chronologically as plain strings.
        timeline.sort(key=lambda x: x.get('timestamp', ''))

        return timeline

    def _format_time_offset(self, offset_seconds: float) -> str:
        """Format a time offset relative to the pivot in human readable form."""
        if offset_seconds == 0:
            return "PIVOT EVENT"
        elif offset_seconds < 0:
            return f"{abs(offset_seconds):.1f}s before"
        else:
            return f"{offset_seconds:.1f}s after"

    def _summarize_event(self, event: Dict) -> str:
        """Create a one-line human-readable summary of a normalized event."""
        parts = []

        if event.get('event_id'):
            parts.append(f"EventID {event['event_id']}")

        if event.get('user'):
            parts.append(f"User: {event['user']}")

        if event.get('process_name'):
            parts.append(f"Process: {event['process_name']}")

        if event.get('command_line'):
            # Truncate long command lines to keep summaries readable.
            cmd = event['command_line']
            if len(cmd) > 60:
                cmd = cmd[:57] + "..."
            parts.append(f"CMD: {cmd}")

        if event.get('src_ip') or event.get('dst_ip'):
            if event.get('src_ip') and event.get('dst_ip'):
                parts.append(f"Network: {event['src_ip']} → {event['dst_ip']}")
            elif event.get('src_ip'):
                parts.append(f"SrcIP: {event['src_ip']}")
            elif event.get('dst_ip'):
                parts.append(f"DstIP: {event['dst_ip']}")

        if event.get('file_path'):
            parts.append(f"File: {event['file_path']}")

        if event.get('registry_path'):
            parts.append(f"Registry: {event['registry_path']}")

        return " | ".join(parts) if parts else "No details available"

    def _generate_timeline_summary(self, pivot_event: Dict, timeline: List[Dict], total_pivot_count: int) -> str:
        """Generate a human-readable summary of the timeline."""
        summary_parts = []

        summary_parts.append(f"Found {total_pivot_count} matching event(s).")
        summary_parts.append(f"Timeline shows {len(timeline)} events around the first match.")

        # Sequence-based timelines carry no offset_seconds; default to 0 so
        # they are not miscounted as "after" the pivot.
        before = sum(1 for e in timeline if e.get('offset_seconds', 0) < 0)
        after = sum(1 for e in timeline if e.get('offset_seconds', 0) > 0)

        if before or after:
            summary_parts.append(f"{before} events before, {after} events after pivot.")

        # Repeated identical summaries hint at a burst of similar activity.
        event_ids = [e['event_summary'] for e in timeline if 'EventID' in e.get('event_summary', '')]
        if len(set(event_ids)) < len(event_ids):
            summary_parts.append("Multiple similar events detected in sequence.")

        return " ".join(summary_parts)
|
|
|
|
|
|
|
|
|
|
|
|
# Module-level singleton backing the LangChain tool wrapper below.
_timeline_builder_tool = TimelineBuilderTool()


@tool
def timeline_builder(pivot_entity: str, pivot_type: str, time_window_minutes: int = 5, raw_logs: str = None) -> dict:
    """Build a focused timeline around suspicious events to understand attack sequences.

    Use this when you suspect coordinated activity or want to understand what happened
    before and after a suspicious event. Analyzes the sequence of events to identify patterns.

    Args:
        pivot_entity: The entity to build timeline around (e.g., "powershell.exe", "admin", "192.168.1.100")
        pivot_type: Type of entity - "user", "process", "ip", "file", "computer", "event_id", or "registry"
        time_window_minutes: Minutes before and after pivot events to include (default: 5)
        raw_logs: The raw log data (automatically provided by agent)

    Returns:
        Timeline analysis showing events before and after the pivot, helping identify attack sequences.
    """
    # Assemble the request payload and delegate to the shared tool instance.
    payload = {
        "pivot_entity": pivot_entity,
        "pivot_type": pivot_type,
        "time_window_minutes": time_window_minutes,
        "raw_logs": raw_logs,
    }
    return _timeline_builder_tool.run(payload)