from langchain_core.tools import tool
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
import re
import json

from .base_tool import Tool


class TimelineBuilderTool(Tool):
    """Build focused timelines around suspicious events by parsing raw logs directly"""

    def name(self) -> str:
        return "timeline_builder"

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        try:
            pivot_entity = input_data.get("pivot_entity", "")
            pivot_type = input_data.get("pivot_type", "")
            time_window_minutes = int(input_data.get("time_window_minutes", 5))
            raw_logs = input_data.get("raw_logs", "")

            if not pivot_entity or not pivot_type:
                return {"error": "Both pivot_entity and pivot_type are required"}
            if not raw_logs:
                return {"error": "No raw log data provided"}

            # Parse events from raw logs
            events = self._parse_raw_logs(raw_logs)
            if not events:
                return {
                    "tool": "timeline_builder",
                    "pivot_entity": pivot_entity,
                    "pivot_type": pivot_type,
                    "result": {
                        "found": False,
                        "message": "No events could be parsed from logs",
                        "timeline": []
                    }
                }

            # Find pivot events
            pivot_events = self._find_pivot_events(events, pivot_entity, pivot_type)
            if not pivot_events:
                return {
                    "tool": "timeline_builder",
                    "pivot_entity": pivot_entity,
                    "pivot_type": pivot_type,
                    "result": {
                        "found": False,
                        "message": f"No events found for {pivot_type}: {pivot_entity}",
                        "total_events_parsed": len(events),
                        "timeline": []
                    }
                }

            # Build a timeline around the first pivot event (limits scope)
            pivot_event = pivot_events[0]
            timeline = self._build_timeline_around_event(
                events, pivot_event, time_window_minutes
            )

            # Generate summary
            summary = self._generate_timeline_summary(
                pivot_event, timeline, len(pivot_events)
            )

            return {
                "tool": "timeline_builder",
                "pivot_entity": pivot_entity,
                "pivot_type": pivot_type,
                "time_window_minutes": time_window_minutes,
                "result": {
                    "found": True,
                    "total_pivot_events": len(pivot_events),
                    "showing_timeline_for": "first pivot event",
                    "pivot_event_summary": self._summarize_event(pivot_event),
                    "timeline": timeline,
                    "summary": summary
                }
            }
        except Exception as e:
            return {"error": f"{type(e).__name__}: {str(e)}"}

    def _parse_raw_logs(self, raw_logs: str) -> List[Dict]:
        """Parse events from a raw log string (JSON lines or plain text)"""
        events = []
        # Try each line as JSON first, falling back to plain text parsing
        for line in raw_logs.split('\n'):
            line = line.strip()
            if not line:
                continue
            try:
                # Try parsing as JSON
                event = json.loads(line)
                parsed_event = self._extract_event_data(event)
                if parsed_event:
                    events.append(parsed_event)
            except json.JSONDecodeError:
                # Fall back to parsing as a plain text log line
                parsed_event = self._parse_text_log_line(line)
                if parsed_event:
                    events.append(parsed_event)
        return events

    def _extract_event_data(self, event: Dict) -> Dict:
        """Extract relevant fields from a JSON event.

        Dotted field names (e.g., "event.code") match only when the log shipper
        emits them as flattened top-level keys; nested objects are not traversed.
        """
        extracted = {
            "raw": event,
            "timestamp": None,
            "event_id": None,
            "user": None,
            "computer": None,
            "process_name": None,
            "command_line": None,
            "src_ip": None,
            "dst_ip": None,
            "file_path": None,
            "registry_path": None
        }

        # Extract timestamp (try multiple field names)
        for ts_field in ["@timestamp", "timestamp", "TimeCreated", "EventTime"]:
            if ts_field in event:
                extracted["timestamp"] = self._parse_timestamp(event[ts_field])
                break

        # Extract Event ID
        for id_field in ["EventID", "event_id", "event.code", "winlog.event_id"]:
            if id_field in event:
                extracted["event_id"] = str(event[id_field])
                break

        # Extract user
        for user_field in ["User", "user", "user.name", "winlog.user.name", "SubjectUserName"]:
            if user_field in event:
                extracted["user"] = str(event[user_field])
                break

        # Extract computer/host
        for comp_field in ["Computer", "computer", "host.name", "winlog.computer_name"]:
            if comp_field in event:
                extracted["computer"] = str(event[comp_field])
                break

        # Extract process info
        for proc_field in ["Image", "process.name", "NewProcessName", "ProcessName"]:
            if proc_field in event:
                extracted["process_name"] = str(event[proc_field])
                break
        for cmd_field in ["CommandLine", "command_line", "process.command_line"]:
            if cmd_field in event:
                extracted["command_line"] = str(event[cmd_field])
                break

        # Extract network info
        for src_field in ["SourceIp", "src_ip", "source.ip"]:
            if src_field in event:
                extracted["src_ip"] = str(event[src_field])
                break
        for dst_field in ["DestinationIp", "dst_ip", "destination.ip"]:
            if dst_field in event:
                extracted["dst_ip"] = str(event[dst_field])
                break

        # Extract file info
        for file_field in ["TargetFilename", "file.path", "ObjectName"]:
            if file_field in event:
                extracted["file_path"] = str(event[file_field])
                break

        # Extract registry info
        for reg_field in ["TargetObject", "registry.path"]:
            if reg_field in event:
                extracted["registry_path"] = str(event[reg_field])
                break

        return extracted

    def _parse_text_log_line(self, line: str) -> Dict:
        """Parse a plain text log line using regex heuristics"""
        extracted = {
            "raw": {"text": line},
            "timestamp": None,
            "event_id": None,
            "user": None,
            "computer": None,
            "process_name": None,
            "command_line": None,
            "src_ip": None,
            "dst_ip": None,
            "file_path": None,
            "registry_path": None
        }

        # Try to extract a timestamp (YYYY-MM-DD HH:MM:SS, T or space separator)
        ts_match = re.search(r'\d{4}-\d{2}-\d{2}[T\s]\d{2}:\d{2}:\d{2}', line)
        if ts_match:
            extracted["timestamp"] = self._parse_timestamp(ts_match.group(0))

        # Try to extract an Event ID
        event_id_match = re.search(r'EventID[:\s=]+(\d+)', line, re.IGNORECASE)
        if event_id_match:
            extracted["event_id"] = event_id_match.group(1)

        # Try to extract IP addresses (heuristic: the first match is treated
        # as the source, the second as the destination)
        ip_matches = re.findall(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', line)
        if ip_matches:
            extracted["src_ip"] = ip_matches[0]
            if len(ip_matches) > 1:
                extracted["dst_ip"] = ip_matches[1]

        return extracted

    def _parse_timestamp(self, ts_value) -> Optional[datetime]:
        """Parse a timestamp from various formats; returns None if unrecognized"""
        if isinstance(ts_value, datetime):
            return ts_value
        if isinstance(ts_value, (int, float)):
            # Assumes a numeric value is epoch seconds
            return datetime.fromtimestamp(ts_value)

        ts_str = str(ts_value)
        # Try common formats
        formats = [
            "%Y-%m-%dT%H:%M:%S.%fZ",
            "%Y-%m-%dT%H:%M:%SZ",
            "%Y-%m-%d %H:%M:%S.%f",
            "%Y-%m-%d %H:%M:%S",
            "%Y-%m-%dT%H:%M:%S",
        ]
        for fmt in formats:
            try:
                return datetime.strptime(ts_str, fmt)
            except ValueError:
                continue
        return None

    def _find_pivot_events(self, events: List[Dict], pivot_entity: str, pivot_type: str) -> List[Dict]:
        """Find events that match the pivot criteria (case-insensitive substring match)"""
        pivot_events = []
        pivot_entity_lower = pivot_entity.lower()
        for event in events:
            if self._event_matches_pivot(event, pivot_entity_lower, pivot_type.lower()):
                pivot_events.append(event)
        return pivot_events

    def _event_matches_pivot(self, event: Dict, pivot_entity: str, pivot_type: str) -> bool:
        """Check whether an event matches the pivot criteria"""
        if pivot_type == "user":
            user = (event.get('user') or '').lower()
            return pivot_entity in user
        elif pivot_type == "process":
            process_name = (event.get('process_name') or '').lower()
            command_line = (event.get('command_line') or '').lower()
            return pivot_entity in process_name or pivot_entity in command_line
        elif pivot_type == "ip":
            src_ip = (event.get('src_ip') or '').lower()
            dst_ip = (event.get('dst_ip') or '').lower()
            return pivot_entity in src_ip or pivot_entity in dst_ip
        elif pivot_type == "file":
            file_path = (event.get('file_path') or '').lower()
            return pivot_entity in file_path
        elif pivot_type == "computer":
            computer = (event.get('computer') or '').lower()
            return pivot_entity in computer
        elif pivot_type == "event_id":
            event_id = str(event.get('event_id') or '').lower()
            return pivot_entity in event_id
        elif pivot_type == "registry":
            registry_path = (event.get('registry_path') or '').lower()
            return pivot_entity in registry_path
        return False

    def _build_timeline_around_event(self, events: List[Dict], pivot_event: Dict, time_window_minutes: int) -> List[Dict]:
        """Build a timeline of events around a pivot event"""
        pivot_timestamp = pivot_event.get('timestamp')

        # If there is no timestamp, fall back to showing events around the
        # pivot in parse order (up to 10 on each side)
        if not pivot_timestamp:
            pivot_idx = events.index(pivot_event)
            start_idx = max(0, pivot_idx - 10)
            end_idx = min(len(events), pivot_idx + 11)
            timeline = []
            for i, event in enumerate(events[start_idx:end_idx]):
                timeline.append({
                    "sequence_position": i,
                    "is_pivot": event == pivot_event,
                    "event_summary": self._summarize_event(event)
                })
            return timeline

        # Build the timeline from timestamps within the window
        start_time = pivot_timestamp - timedelta(minutes=time_window_minutes)
        end_time = pivot_timestamp + timedelta(minutes=time_window_minutes)
        timeline = []
        for event in events:
            event_timestamp = event.get('timestamp')
            if event_timestamp and start_time <= event_timestamp <= end_time:
                offset_seconds = (event_timestamp - pivot_timestamp).total_seconds()
                timeline.append({
                    "timestamp": event_timestamp.isoformat(),
                    "offset_seconds": offset_seconds,
                    "offset_human": self._format_time_offset(offset_seconds),
                    "is_pivot": event == pivot_event,
                    "event_summary": self._summarize_event(event)
                })

        # Sort by timestamp (ISO strings sort chronologically)
        timeline.sort(key=lambda x: x.get('timestamp', ''))
        return timeline

    def _format_time_offset(self, offset_seconds: float) -> str:
        """Format a time offset in human-readable form"""
        if offset_seconds == 0:
            return "PIVOT EVENT"
        elif offset_seconds < 0:
            return f"{abs(offset_seconds):.1f}s before"
        else:
            return f"{offset_seconds:.1f}s after"

    def _summarize_event(self, event: Dict) -> str:
        """Create a human-readable one-line summary of an event"""
        parts = []
        if event.get('event_id'):
            parts.append(f"EventID {event['event_id']}")
        if event.get('user'):
            parts.append(f"User: {event['user']}")
        if event.get('process_name'):
            parts.append(f"Process: {event['process_name']}")
        if event.get('command_line'):
            cmd = event['command_line']
            if len(cmd) > 60:
                cmd = cmd[:57] + "..."
            parts.append(f"CMD: {cmd}")
        if event.get('src_ip') or event.get('dst_ip'):
            if event.get('src_ip') and event.get('dst_ip'):
                parts.append(f"Network: {event['src_ip']} → {event['dst_ip']}")
            elif event.get('src_ip'):
                parts.append(f"SrcIP: {event['src_ip']}")
            elif event.get('dst_ip'):
                parts.append(f"DstIP: {event['dst_ip']}")
        if event.get('file_path'):
            parts.append(f"File: {event['file_path']}")
        if event.get('registry_path'):
            parts.append(f"Registry: {event['registry_path']}")
        return " | ".join(parts) if parts else "No details available"

    def _generate_timeline_summary(self, pivot_event: Dict, timeline: List[Dict], total_pivot_count: int) -> str:
        """Generate a human-readable summary of the timeline"""
        summary_parts = []
        summary_parts.append(f"Found {total_pivot_count} matching event(s).")
        summary_parts.append(f"Timeline shows {len(timeline)} events around the first match.")

        # Count events before/after the pivot (sequence-based entries have no
        # offset_seconds and count as neither)
        before = sum(1 for e in timeline if e.get('offset_seconds', 0) < 0)
        after = sum(1 for e in timeline if e.get('offset_seconds', 0) > 0)
        if before or after:
            summary_parts.append(f"{before} events before, {after} events after pivot.")

        # Flag repeated, identical-looking events
        summaries = [e['event_summary'] for e in timeline if 'EventID' in e.get('event_summary', '')]
        if len(set(summaries)) < len(summaries):
            summary_parts.append("Multiple similar events detected in sequence.")

        return " ".join(summary_parts)


# Create singleton instance
_timeline_builder_tool = TimelineBuilderTool()


def timeline_builder(pivot_entity: str, pivot_type: str, time_window_minutes: int = 5, raw_logs: Optional[str] = None) -> dict:
    """Build a focused timeline around suspicious events to understand attack sequences.

    Use this when you suspect coordinated activity or want to understand what happened
    before and after a suspicious event. Analyzes the sequence of events to identify patterns.

    Args:
        pivot_entity: The entity to build the timeline around (e.g., "powershell.exe", "admin", "192.168.1.100")
        pivot_type: Type of entity - "user", "process", "ip", "file", "computer", "event_id", or "registry"
        time_window_minutes: Minutes before and after pivot events to include (default: 5)
        raw_logs: The raw log data (automatically provided by the agent)

    Returns:
        Timeline analysis showing events before and after the pivot, helping identify attack sequences.
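
    Example (illustrative call; the log line is a made-up sample):
        timeline_builder(
            pivot_entity="powershell.exe",
            pivot_type="process",
            raw_logs='{"@timestamp": "2024-01-01T12:00:00Z", "Image": "powershell.exe"}',
        )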
| """ | |
| return _timeline_builder_tool.run({ | |
| "pivot_entity": pivot_entity, | |
| "pivot_type": pivot_type, | |
| "time_window_minutes": time_window_minutes, | |
| "raw_logs": raw_logs | |
| }) |
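

# Minimal smoke-test sketch. Assumptions: the module is run with `python -m`
# from its package so the relative import resolves, and the log lines below
# are fabricated samples, not real telemetry.
if __name__ == "__main__":
    sample_logs = "\n".join([
        '{"@timestamp": "2024-01-01T12:00:00Z", "EventID": "1", "Image": "powershell.exe", "User": "admin"}',
        '{"@timestamp": "2024-01-01T12:00:30Z", "EventID": "3", "SourceIp": "10.0.0.5", "DestinationIp": "10.0.0.9"}',
        '2024-01-01 12:01:10 EventID: 4624 logon for admin from 10.0.0.5',
    ])
    result = timeline_builder("powershell.exe", "process", time_window_minutes=5, raw_logs=sample_logs)
    print(json.dumps(result, indent=2))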