"""
LogAnalysisAgent - Main orchestrator for cybersecurity log analysis
"""
import os
import json
import time
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Optional
from langchain_core.messages import HumanMessage
from langgraph.prebuilt import create_react_agent
from langchain_core.tools import tool
from langgraph.graph import StateGraph, END
from langchain.chat_models import init_chat_model
from langsmith import traceable, Client, get_current_run_tree
from src.agents.log_analysis_agent.state_models import AnalysisState
from src.agents.log_analysis_agent.utils import (
    get_tools,
    format_execution_time,
    truncate_to_tokens,
)
from src.agents.log_analysis_agent.prompts import (
ANALYSIS_PROMPT,
CRITIC_FEEDBACK_TEMPLATE,
SELF_CRITIC_PROMPT,
)
ls_client = Client(api_key=os.getenv("LANGSMITH_API_KEY"))
class LogAnalysisAgent:
"""
Main orchestrator for cybersecurity log analysis.
Coordinates the entire workflow: load → preprocess → analyze → save → display
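
    Example (illustrative; the file path below is a hypothetical placeholder):
        agent = LogAnalysisAgent()
        result = agent.analyze("data/host01_security_events.json")
        print(result.get("overall_assessment"))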
"""
def __init__(
self,
model_name: str = "google_genai:gemini-2.0-flash",
temperature: float = 0.1,
output_dir: str = "analysis",
max_iterations: int = 2,
llm_client=None,
):
"""
Initialize the Log Analysis Agent
Args:
model_name: Name of the model to use (e.g. "google_genai:gemini-2.0-flash")
temperature: Temperature for the model
output_dir: Directory name for saving outputs (relative to package directory)
max_iterations: Maximum number of iterations for the ReAct agent
llm_client: Optional pre-initialized LLM client (overrides model_name/temperature)
"""
if llm_client:
self.llm = llm_client
print(f"[INFO] Log Analysis Agent: Using provided LLM client")
else:
self.llm = init_chat_model(model_name, temperature=temperature)
print(f"[INFO] Log Analysis Agent: Using default LLM model: {model_name}")
self.base_tools = get_tools()
self.output_root = Path(output_dir)
self.output_root.mkdir(exist_ok=True)
# Initialize helper components
self.log_processor = LogProcessor(model_name=model_name)
self.react_analyzer = ReactAnalyzer(
self.llm, self.base_tools, max_iterations=max_iterations
)
self.result_manager = ResultManager(self.output_root)
# Create workflow graph
self.workflow = self._create_workflow()
def _create_workflow(self) -> StateGraph:
"""Create and configure the analysis workflow graph"""
workflow = StateGraph(AnalysisState)
# Add nodes using instance methods
workflow.add_node("load_logs", self.log_processor.load_logs)
workflow.add_node("preprocess_logs", self.log_processor.preprocess_logs)
workflow.add_node("react_agent_analysis", self.react_analyzer.analyze)
workflow.add_node("save_results", self.result_manager.save_results)
workflow.add_node("display_results", self.result_manager.display_results)
# Define workflow edges
workflow.set_entry_point("load_logs")
workflow.add_edge("load_logs", "preprocess_logs")
workflow.add_edge("preprocess_logs", "react_agent_analysis")
workflow.add_edge("react_agent_analysis", "save_results")
workflow.add_edge("save_results", "display_results")
workflow.add_edge("display_results", END)
return workflow.compile(name="log_analysis_agent")
def _log_workflow_metrics(
self,
workflow_step: str,
execution_time: float,
success: bool,
details: dict = None,
):
"""Log workflow step performance metrics to LangSmith."""
try:
current_run = get_current_run_tree()
if current_run:
ls_client.create_feedback(
run_id=current_run.id,
key="log_analysis_workflow_performance",
score=1.0 if success else 0.0,
value={
"workflow_step": workflow_step,
"execution_time": execution_time,
"success": success,
"details": details or {},
"agent_type": "log_analysis_workflow",
},
)
except Exception as e:
print(f"Failed to log workflow metrics: {e}")
def _log_security_analysis_results(self, analysis_result: dict):
"""Log security analysis findings to LangSmith."""
try:
current_run = get_current_run_tree()
if current_run:
assessment = analysis_result.get("overall_assessment", "UNKNOWN")
abnormal_events = analysis_result.get("abnormal_events", [])
total_events = analysis_result.get("total_events_analyzed", 0)
# Calculate threat score
threat_score = 0.0
if assessment == "CRITICAL":
threat_score = 1.0
elif assessment == "HIGH":
threat_score = 0.8
elif assessment == "MEDIUM":
threat_score = 0.5
elif assessment == "LOW":
threat_score = 0.2
ls_client.create_feedback(
run_id=current_run.id,
key="security_analysis_results",
score=threat_score,
value={
"overall_assessment": assessment,
"abnormal_events_count": len(abnormal_events),
"total_events_analyzed": total_events,
"execution_time": analysis_result.get(
"execution_time_formatted", "Unknown"
),
"iteration_count": analysis_result.get("iteration_count", 1),
"abnormal_events": abnormal_events[
:5
], # Limit to first 5 for logging
},
)
except Exception as e:
print(f"Failed to log security analysis results: {e}")
def _log_batch_analysis_metrics(
self,
total_files: int,
successful: int,
start_time: datetime,
end_time: datetime,
):
"""Log batch analysis performance metrics."""
try:
current_run = get_current_run_tree()
if current_run:
duration = (end_time - start_time).total_seconds()
success_rate = successful / total_files if total_files > 0 else 0
ls_client.create_feedback(
run_id=current_run.id,
key="batch_analysis_performance",
score=success_rate,
value={
"total_files": total_files,
"successful_files": successful,
"failed_files": total_files - successful,
"success_rate": success_rate,
"duration_seconds": duration,
"files_per_minute": (
(total_files / duration) * 60 if duration > 0 else 0
),
},
)
except Exception as e:
print(f"Failed to log batch analysis metrics: {e}")
@traceable(name="log_analysis_agent_full_workflow")
def analyze(self, log_file: str) -> Dict:
"""
Analyze a single log file
Args:
log_file: Path to the log file to analyze
Returns:
Dictionary containing the analysis result
"""
state = self._initialize_state(log_file)
result = self.workflow.invoke(state, config={"recursion_limit": 100})
analysis_result = result.get("analysis_result", {})
if analysis_result:
self._log_security_analysis_results(analysis_result)
return analysis_result
@traceable(name="log_analysis_agent_batch_workflow")
def analyze_batch(
self, dataset_dir: str, skip_existing: bool = False
) -> List[Dict]:
"""
Analyze all log files in a dataset directory
Args:
dataset_dir: Path to directory containing log files
skip_existing: Whether to skip already analyzed files
Returns:
List of result dictionaries for each file
"""
print("=" * 60)
print("BATCH MODE: Analyzing all files in dataset")
print("=" * 60 + "\n")
files = self._find_dataset_files(dataset_dir)
if not files:
print("No JSON files found in dataset directory")
return []
print(f"Found {len(files)} files to analyze")
if skip_existing:
print("Skip mode enabled: Already analyzed files will be skipped")
print()
results = []
batch_start = datetime.now()
for idx, file_path in enumerate(files, 1):
filename = os.path.basename(file_path)
print(f"\n[{idx}/{len(files)}] Processing: {filename}")
print("-" * 60)
result = self._analyze_single_file(file_path, skip_existing)
results.append(result)
if result["success"]:
print(f"Status: {result['message']}")
else:
print(f"Status: FAILED - {result['message']}")
batch_end = datetime.now()
successful = sum(1 for r in results if r["success"])
self._log_batch_analysis_metrics(len(files), successful, batch_start, batch_end)
self.result_manager.display_batch_summary(results, batch_start, batch_end)
return results
def _initialize_state(self, log_file: str) -> Dict:
"""Initialize the analysis state with default values"""
return {
"log_file": log_file,
"raw_logs": "",
"prepared_logs": "",
"analysis_result": {},
"messages": [],
"agent_reasoning": "",
"agent_observations": [],
"iteration_count": 0,
"critic_feedback": "",
"iteration_history": [],
"start_time": 0.0,
"end_time": 0.0,
}
def _analyze_single_file(self, log_file: str, skip_existing: bool = False) -> Dict:
"""Analyze a single log file with error handling"""
try:
if skip_existing:
existing = self.result_manager.get_existing_output(log_file)
if existing:
return {
"success": True,
"log_file": log_file,
"message": "Skipped (already analyzed)",
"result": None,
}
            state = self._initialize_state(log_file)
            result = self.workflow.invoke(state, config={"recursion_limit": 100})
            return {
                "success": True,
                "log_file": log_file,
                "message": "Analysis completed",
                "result": result.get("analysis_result"),
            }
except Exception as e:
return {
"success": False,
"log_file": log_file,
"message": f"Error: {str(e)}",
"result": None,
}
def _find_dataset_files(self, dataset_dir: str) -> List[str]:
"""Find all JSON files in the dataset directory"""
import glob
if not os.path.exists(dataset_dir):
print(f"Error: Dataset directory not found: {dataset_dir}")
return []
json_files = glob.glob(os.path.join(dataset_dir, "*.json"))
return sorted(json_files)
class LogProcessor:
"""
Handles log loading and preprocessing operations
"""
def __init__(self, max_size: int = 30000, model_name: str = ""):
"""
Initialize the log processor
Args:
max_size: Maximum character size before applying sampling
model_name: Model name to adjust limits accordingly
"""
if "gpt-oss" in model_name.lower():
self.max_size = 5000 # Conservative limit for GPT-OSS models
print(
f"[INFO] Using reduced sampling size ({self.max_size}) for GPT-OSS model"
)
else:
self.max_size = max_size
self.model_name = model_name
def _get_max_input_tokens(self, model_name: str) -> int:
"""
Determine maximum input tokens based on model capabilities
Args:
model_name: Name of the model to determine token limits for
Returns:
Maximum input tokens for the model
"""
        model_lower = model_name.lower()
        # Gemini models: allow up to 200k input tokens
        if "gemini" in model_lower:
            return 200_000
        # Default for other models: 45k tokens
        return 45_000
@traceable(name="log_processor_load_logs")
def load_logs(self, state: AnalysisState) -> AnalysisState:
"""Load logs from file and initialize state"""
filename = os.path.basename(state["log_file"])
print(f"Loading logs from: {filename}")
        # Record the workflow start time; reuse it to time the file load
        state["start_time"] = time.time()
        start_time = state["start_time"]
try:
with open(state["log_file"], "r", encoding="utf-8") as f:
raw = f.read()
success = True
except Exception as e:
print(f"Error reading file: {e}")
raw = f"Error loading file: {e}"
success = False
execution_time = time.time() - start_time
self._log_loading_metrics(filename, len(raw), execution_time, success)
state["raw_logs"] = raw
state["max_input_token"] = self._get_max_input_tokens(self.model_name)
state["messages"] = []
state["agent_reasoning"] = ""
state["agent_observations"] = []
state["iteration_count"] = 0
state["critic_feedback"] = ""
state["iteration_history"] = []
state["end_time"] = 0.0
return state
@traceable(name="log_processor_preprocess_logs")
def preprocess_logs(self, state: AnalysisState) -> AnalysisState:
"""Preprocess logs for analysis - token-based truncation based on model capabilities"""
raw = state["raw_logs"]
line_count = raw.count("\n")
max_tokens = state["max_input_token"]
print(
f"Loaded {line_count} lines, {len(raw)} characters (max tokens: {max_tokens:,})"
)
start_time = time.time()
# Truncate by tokens to keep context windows manageable
truncated = truncate_to_tokens(raw, max_tokens)
token_truncation_applied = len(truncated) < len(raw)
# Prepare final text with minimal header
state["prepared_logs"] = f"TOTAL LINES: {line_count}\n\n{truncated}"
execution_time = time.time() - start_time
self._log_preprocessing_metrics(
line_count,
len(raw),
len(truncated),
token_truncation_applied,
execution_time,
)
return state
def _log_loading_metrics(
self, filename: str, file_size: int, execution_time: float, success: bool
):
"""Log file loading performance metrics."""
try:
current_run = get_current_run_tree()
if current_run:
ls_client.create_feedback(
run_id=current_run.id,
key="log_loading_performance",
score=1.0 if success else 0.0,
value={
"filename": filename,
"file_size_chars": file_size,
"execution_time": execution_time,
"success": success,
},
)
except Exception as e:
print(f"Failed to log loading metrics: {e}")
def _log_preprocessing_metrics(
self,
line_count: int,
original_size: int,
processed_size: int,
sampling_applied: bool,
execution_time: float,
):
"""Log preprocessing performance metrics."""
try:
current_run = get_current_run_tree()
if current_run:
ls_client.create_feedback(
run_id=current_run.id,
key="log_preprocessing_performance",
score=1.0,
value={
"line_count": line_count,
"original_size_chars": original_size,
"processed_size_chars": processed_size,
"sampling_applied": sampling_applied,
"size_reduction": (
(original_size - processed_size) / original_size
if original_size > 0
else 0
),
"execution_time": execution_time,
},
)
except Exception as e:
print(f"Failed to log preprocessing metrics: {e}")
def _apply_sampling(self, raw: str) -> str:
"""Apply sampling strategy with line-aware boundaries"""
lines = raw.split("\n")
total_lines = len(lines)
if total_lines <= 50: # Small files, return as-is
return raw
# Take proportional samples but respect line boundaries
first_lines = lines[: int(total_lines * 0.25)] # First 25%
middle_start = int(total_lines * 0.4)
middle_end = int(total_lines * 0.6)
middle_lines = lines[middle_start:middle_end] # Middle 20%
last_lines = lines[-int(total_lines * 0.25) :] # Last 25%
return f"""=== BEGINNING ({len(first_lines)} lines) ===
{chr(10).join(first_lines)}
=== MIDDLE (lines {middle_start}-{middle_end}) ===
{chr(10).join(middle_lines)}
=== END ({len(last_lines)} lines) ===
{chr(10).join(last_lines)}"""
class ReactAnalyzer:
"""
Handles ReAct agent analysis with iterative refinement
Combines react_engine + criticism_engine logic
"""
def __init__(self, llm, base_tools, max_iterations: int = 2):
"""
Initialize the ReAct analyzer
Args:
llm: Language model instance
base_tools: List of base tools for the agent
max_iterations: Maximum refinement iterations
"""
self.llm = llm
self.base_tools = base_tools
self.max_iterations = max_iterations
@traceable(name="react_analyzer_analysis")
def analyze(self, state: AnalysisState) -> AnalysisState:
"""Perform ReAct agent analysis with iterative refinement"""
print("Starting ReAct agent analysis with iterative refinement...")
start_time = time.time()
# Create state-aware tools
tools = self._create_state_aware_tools(state)
# Create ReAct agent
agent_executor = create_react_agent(
self.llm, tools, name="react_agent_analysis"
)
# System context
system_context = """You are Agent A, an autonomous cybersecurity analyst.
IMPORTANT CONTEXT - RAW LOGS AVAILABLE:
The complete raw logs are available for certain tools automatically.
When you call event_id_extractor_with_logs or timeline_builder_with_logs,
you only need to provide the required parameters - the tools will automatically
access the raw logs to perform their analysis.
"""
try:
# Iterative refinement loop
for iteration in range(self.max_iterations):
state["iteration_count"] = iteration
print(f"\n{'='*60}")
print(f"ITERATION {iteration + 1}/{self.max_iterations}")
print(f"{'='*60}")
# Prepare prompt with optional feedback
messages = self._prepare_messages(state, iteration, system_context)
# Run ReAct agent
print(f"Running agent analysis...")
result = agent_executor.invoke(
{"messages": messages}, config={"recursion_limit": 100}
)
state["messages"] = result["messages"]
# Extract and process final analysis
final_analysis = self._extract_final_analysis(state["messages"])
# Calculate execution time
state["end_time"] = time.time()
execution_time = format_execution_time(
state["end_time"] - state["start_time"]
)
# Extract reasoning
state["agent_reasoning"] = final_analysis.get("reasoning", "")
# Format result
state["analysis_result"] = self._format_analysis_result(
final_analysis,
execution_time,
iteration + 1,
state["agent_reasoning"],
)
# Run self-critic review
print("Running self-critic review...")
original_analysis = state["analysis_result"].copy()
critic_result = self._critic_review(state)
# Store iteration in history
state["iteration_history"].append(
{
"iteration": iteration + 1,
"original_analysis": original_analysis,
"critic_evaluation": {
"quality_acceptable": critic_result["quality_acceptable"],
"issues": critic_result["issues"],
"feedback": critic_result["feedback"],
},
"corrected_analysis": critic_result["corrected_analysis"],
}
)
# Use corrected analysis
corrected = critic_result["corrected_analysis"]
corrected["execution_time_seconds"] = original_analysis.get(
"execution_time_seconds", 0
)
corrected["execution_time_formatted"] = original_analysis.get(
"execution_time_formatted", "Unknown"
)
corrected["iteration_count"] = iteration + 1
state["analysis_result"] = corrected
# Check if refinement is needed
if critic_result["quality_acceptable"]:
print(
f"✓ Quality acceptable - stopping at iteration {iteration + 1}"
)
break
elif iteration < self.max_iterations - 1:
print(
f"✗ Quality needs improvement - proceeding to iteration {iteration + 2}"
)
state["critic_feedback"] = critic_result["feedback"]
else:
print(f"✗ Max iterations reached - using current analysis")
print(
f"\nAnalysis complete after {state['iteration_count'] + 1} iteration(s)"
)
print(f"Total messages: {len(state['messages'])}")
except Exception as e:
print(f"Error in analysis: {e}")
import traceback
traceback.print_exc()
state["end_time"] = time.time()
execution_time = format_execution_time(
state["end_time"] - state["start_time"]
)
state["analysis_result"] = {
"overall_assessment": "ERROR",
"total_events_analyzed": 0,
"execution_time_seconds": execution_time["total_seconds"],
"execution_time_formatted": execution_time["formatted_time"],
"analysis_summary": f"Analysis failed: {e}",
"agent_reasoning": "",
"abnormal_event_ids": [],
"abnormal_events": [],
"iteration_count": state.get("iteration_count", 0),
}
return state
def _create_state_aware_tools(self, state: AnalysisState):
"""Create state-aware versions of tools that need raw logs"""
# Create state-aware event_id_extractor
@tool
def event_id_extractor_with_logs(suspected_event_id: str) -> dict:
"""Validates and corrects Windows Event IDs identified in log analysis."""
from .tools.event_id_extractor_tool import _event_id_extractor_tool
return _event_id_extractor_tool.run(
{
"suspected_event_id": suspected_event_id,
"raw_logs": state["raw_logs"],
}
)
# Create state-aware timeline_builder
@tool
def timeline_builder_with_logs(
pivot_entity: str, pivot_type: str, time_window_minutes: int = 5
) -> dict:
"""Build a focused timeline around suspicious events to understand attack sequences.
Use this when you suspect coordinated activity or want to understand what happened
before and after a suspicious event. Analyzes the sequence of events to identify patterns.
Args:
pivot_entity: The entity to build timeline around (e.g., "powershell.exe", "admin", "192.168.1.100")
pivot_type: Type of entity - "user", "process", "ip", "file", "computer", "event_id", or "registry"
time_window_minutes: Minutes before and after pivot events to include (default: 5)
Returns:
Timeline analysis showing events before and after the pivot, helping identify attack sequences.
"""
from .tools.timeline_builder_tool import _timeline_builder_tool
return _timeline_builder_tool.run(
{
"pivot_entity": pivot_entity,
"pivot_type": pivot_type,
"time_window_minutes": time_window_minutes,
"raw_logs": state["raw_logs"],
}
)
# Replace base tools with state-aware versions
tools = [
t
for t in self.base_tools
if t.name not in ["event_id_extractor", "timeline_builder"]
]
tools.append(event_id_extractor_with_logs)
tools.append(timeline_builder_with_logs)
return tools
def _prepare_messages(
self, state: AnalysisState, iteration: int, system_context: str
):
"""Prepare messages for the ReAct agent"""
if iteration == 0:
# First iteration - no feedback
critic_feedback_section = ""
full_prompt = system_context + ANALYSIS_PROMPT.format(
logs=state["prepared_logs"],
critic_feedback_section=critic_feedback_section,
)
messages = [HumanMessage(content=full_prompt)]
else:
# Subsequent iterations - include feedback and preserve messages
critic_feedback_section = CRITIC_FEEDBACK_TEMPLATE.format(
iteration=iteration + 1, feedback=state["critic_feedback"]
)
# ONLY COPY LANGCHAIN MESSAGE OBJECTS, NOT DICTS
messages = [msg for msg in state["messages"] if not isinstance(msg, dict)]
messages.append(HumanMessage(content=critic_feedback_section))
return messages
def _extract_final_analysis(self, messages):
"""Extract the final analysis from agent messages"""
final_message = None
for msg in reversed(messages):
if (
hasattr(msg, "__class__")
and msg.__class__.__name__ == "AIMessage"
and hasattr(msg, "content")
and msg.content
and (not hasattr(msg, "tool_calls") or not msg.tool_calls)
):
final_message = msg.content
break
if not final_message:
raise Exception("No final analysis message found")
return self._parse_agent_output(final_message)
def _parse_agent_output(self, content: str) -> dict:
"""Parse agent's final output"""
try:
if "```json" in content:
json_str = content.split("```json")[1].split("```")[0].strip()
elif "```" in content:
json_str = content.split("```")[1].split("```")[0].strip()
else:
json_str = content.strip()
return json.loads(json_str)
except Exception as e:
print(f"Failed to parse agent output: {e}")
return {
"overall_assessment": "UNKNOWN",
"total_events_analyzed": 0,
"analysis_summary": content[:500],
"reasoning": "",
"abnormal_event_ids": [],
"abnormal_events": [],
}
def _format_analysis_result(
self, final_analysis, execution_time, iteration_count, agent_reasoning
):
"""Format the analysis result into the expected structure"""
abnormal_events = []
for event in final_analysis.get("abnormal_events", []):
event_with_tools = {
"event_id": event.get("event_id", ""),
"event_description": event.get("event_description", ""),
"why_abnormal": event.get("why_abnormal", ""),
"severity": event.get("severity", ""),
"indicators": event.get("indicators", []),
"potential_threat": event.get("potential_threat", ""),
"attack_category": event.get("attack_category", ""),
"tool_enrichment": event.get("tool_enrichment", {}),
}
abnormal_events.append(event_with_tools)
return {
"overall_assessment": final_analysis.get("overall_assessment", "UNKNOWN"),
"total_events_analyzed": final_analysis.get("total_events_analyzed", 0),
"execution_time_seconds": execution_time["total_seconds"],
"execution_time_formatted": execution_time["formatted_time"],
"analysis_summary": final_analysis.get("analysis_summary", ""),
"agent_reasoning": agent_reasoning,
"abnormal_event_ids": final_analysis.get("abnormal_event_ids", []),
"abnormal_events": abnormal_events,
"iteration_count": iteration_count,
}
# ========== CRITIC ENGINE METHODS ==========
def _critic_review(self, state: dict) -> dict:
"""Run self-critic review with quality evaluation"""
critic_input = SELF_CRITIC_PROMPT.format(
final_json=json.dumps(state["analysis_result"], indent=2),
messages="\n".join(
[str(m.content) for m in state["messages"] if hasattr(m, "content")]
),
logs=state["prepared_logs"],
)
resp = self.llm.invoke(critic_input)
full_response = resp.content
try:
# Parse critic response
quality_acceptable, issues, feedback, corrected_json = (
self._parse_critic_response(full_response)
)
return {
"quality_acceptable": quality_acceptable,
"issues": issues,
"feedback": feedback,
"corrected_analysis": corrected_json,
"full_response": full_response,
}
except Exception as e:
print(f"[Critic] Failed to parse review: {e}")
# If critic fails, accept current analysis
return {
"quality_acceptable": True,
"issues": [],
"feedback": "",
"corrected_analysis": state["analysis_result"],
"full_response": full_response,
}
def _parse_critic_response(self, content: str) -> tuple:
"""Parse critic response and evaluate quality"""
# Extract sections
issues_section = ""
feedback_section = ""
if "## ISSUES FOUND" in content:
parts = content.split("## ISSUES FOUND")
if len(parts) > 1:
issues_part = parts[1].split("##")[0].strip()
issues_section = issues_part
if "## FEEDBACK FOR AGENT" in content:
parts = content.split("## FEEDBACK FOR AGENT")
if len(parts) > 1:
feedback_part = parts[1].split("##")[0].strip()
feedback_section = feedback_part
# Extract corrected JSON
if "```json" in content:
json_str = content.split("```json")[1].split("```")[0].strip()
elif "```" in content:
json_str = content.split("```")[1].split("```")[0].strip()
else:
json_str = "{}"
corrected_json = json.loads(json_str)
# Evaluate quality based on issues
issues = self._extract_issues(issues_section)
quality_acceptable = self._evaluate_quality(issues, issues_section)
return quality_acceptable, issues, feedback_section, corrected_json
def _extract_issues(self, issues_text: str) -> list:
"""Extract structured issues from text"""
issues = []
        # Critic reported no issues and explicitly marked the analysis acceptable
if (
"none" in issues_text.lower()
and "analysis is acceptable" in issues_text.lower()
):
return issues
# Extract issue types
issue_types = {
"MISSING_EVENT_IDS": "missing_event_ids",
"SEVERITY_MISMATCH": "severity_mismatch",
"IGNORED_TOOLS": "ignored_tool_results",
"INCOMPLETE_EVENTS": "incomplete_abnormal_events",
"EVENT_ID_FORMAT": "event_id_format",
"SCHEMA_ISSUES": "schema_issues",
"UNDECODED_COMMANDS": "undecoded_commands",
}
for keyword, issue_type in issue_types.items():
if keyword in issues_text:
issues.append({"type": issue_type, "text": issues_text})
return issues
def _evaluate_quality(self, issues: list, issues_text: str) -> bool:
"""Evaluate if quality is acceptable"""
# If no issues found
if not issues:
return True
# Critical issue types that trigger iteration
critical_types = {
"missing_event_ids",
"severity_mismatch",
"ignored_tool_results",
"incomplete_abnormal_events",
"undecoded_commands",
}
# Count critical issues
critical_count = sum(1 for issue in issues if issue["type"] in critical_types)
# Quality threshold: max 1 critical issue is acceptable
if critical_count >= 2:
return False
# Additional check: if issues_text indicates major problems
if any(
word in issues_text.lower() for word in ["critical", "major", "serious"]
):
return False
return True
class ResultManager:
"""
Handles saving results to disk and displaying to console
"""
def __init__(self, output_root: Path):
"""
Initialize the result manager
Args:
output_root: Root directory for saving outputs
"""
self.output_root = output_root
@traceable(name="result_manager_save_results")
def save_results(self, state: AnalysisState) -> AnalysisState:
"""Save analysis results and messages to files"""
input_name = os.path.splitext(os.path.basename(state["log_file"]))[0]
analysis_dir = self.output_root / input_name
analysis_dir.mkdir(exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
start_time = time.time()
success = True
try:
# Save main analysis result
out_file = analysis_dir / f"analysis_{ts}.json"
with open(out_file, "w", encoding="utf-8") as f:
json.dump(state["analysis_result"], f, indent=2)
# Save iteration history
history_file = analysis_dir / f"iterations_{ts}.json"
with open(history_file, "w", encoding="utf-8") as f:
json.dump(state.get("iteration_history", []), f, indent=2)
# Save messages history
messages_file = analysis_dir / f"messages_{ts}.json"
serializable_messages = self._serialize_messages(state.get("messages", []))
with open(messages_file, "w", encoding="utf-8") as f:
json.dump(serializable_messages, f, indent=2)
except Exception as e:
print(f"Error saving results: {e}")
success = False
execution_time = time.time() - start_time
self._log_save_metrics(input_name, execution_time, success)
return state
def _log_save_metrics(self, input_name: str, execution_time: float, success: bool):
"""Log file saving performance metrics."""
try:
current_run = get_current_run_tree()
if current_run:
ls_client.create_feedback(
run_id=current_run.id,
key="result_save_performance",
score=1.0 if success else 0.0,
value={
"input_name": input_name,
"execution_time": execution_time,
"success": success,
},
)
except Exception as e:
print(f"Failed to log save metrics: {e}")
@traceable(name="result_manager_display_results")
def display_results(self, state: AnalysisState) -> AnalysisState:
"""Display formatted analysis results"""
result = state["analysis_result"]
assessment = result.get("overall_assessment", "UNKNOWN")
execution_time = result.get("execution_time_formatted", "Unknown")
abnormal_events = result.get("abnormal_events", [])
iteration_count = result.get("iteration_count", 1)
print("\n" + "=" * 60)
print("ANALYSIS COMPLETE")
print("=" * 60)
print(f"ASSESSMENT: {assessment}")
print(f"ITERATIONS: {iteration_count}")
print(f"EXECUTION TIME: {execution_time}")
print(f"EVENTS ANALYZED: {result.get('total_events_analyzed', 'Unknown')}")
# Tools Used
tools_used = self._extract_tools_used(state.get("messages", []))
if tools_used:
print(f"TOOLS USED: {len(tools_used)} tools")
print(f" Types: {', '.join(sorted(tools_used))}")
else:
print("TOOLS USED: None")
# Abnormal Events
if abnormal_events:
print(f"\nABNORMAL EVENTS: {len(abnormal_events)}")
for event in abnormal_events:
severity = event.get("severity", "UNKNOWN")
event_id = event.get("event_id", "N/A")
print(f" EventID {event_id} [{severity}]")
else:
print("\nNO ABNORMAL EVENTS")
print("=" * 60)
return state
def display_batch_summary(
self, results: List[Dict], start_time: datetime, end_time: datetime
):
"""Print summary of batch processing results"""
total = len(results)
successful = sum(1 for r in results if r["success"])
skipped = sum(1 for r in results if "Skipped" in r["message"])
failed = total - successful
duration = (end_time - start_time).total_seconds()
print("\n" + "=" * 60)
print("BATCH ANALYSIS SUMMARY")
print("=" * 60)
print(f"Total files: {total}")
print(f"Successful: {successful}")
print(f"Skipped: {skipped}")
print(f"Failed: {failed}")
print(f"Total time: {duration:.2f} seconds ({duration/60:.2f} minutes)")
if failed > 0:
print(f"\nFailed files:")
for r in results:
if not r["success"]:
filename = os.path.basename(r["log_file"])
print(f" - {filename}: {r['message']}")
print("=" * 60 + "\n")
def get_existing_output(self, log_file: str) -> Optional[str]:
"""Get the output file path for a given log file if it exists"""
input_name = os.path.splitext(os.path.basename(log_file))[0]
analysis_dir = self.output_root / input_name
if analysis_dir.exists():
existing_files = list(analysis_dir.glob("analysis_*.json"))
if existing_files:
return str(existing_files[0])
return None
def _serialize_messages(self, messages) -> List[dict]:
"""Serialize messages for JSON storage"""
serializable_messages = []
for msg in messages:
if isinstance(msg, dict):
serializable_messages.append(msg)
else:
msg_dict = {
"type": msg.__class__.__name__,
"content": msg.content if hasattr(msg, "content") else str(msg),
}
if hasattr(msg, "tool_calls") and msg.tool_calls:
msg_dict["tool_calls"] = [
{"name": tc.get("name", ""), "args": tc.get("args", {})}
for tc in msg.tool_calls
]
serializable_messages.append(msg_dict)
return serializable_messages
def _extract_tools_used(self, messages) -> set:
"""Extract set of tool names used during analysis"""
tools_used = set()
for msg in messages:
if hasattr(msg, "tool_calls") and msg.tool_calls:
for tc in msg.tool_calls:
tool_name = tc.get("name", "")
if tool_name:
tools_used.add(tool_name)
return tools_used
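

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative, not part of the module's original API
# surface): run the agent on a single file or a dataset directory. The paths
# below are hypothetical placeholders; LANGSMITH_API_KEY and the model
# provider's credentials are assumed to be set in the environment.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    agent = LogAnalysisAgent(
        model_name="google_genai:gemini-2.0-flash",
        output_dir="analysis",
        max_iterations=2,
    )

    # Single-file analysis: returns the final analysis_result dictionary.
    single_result = agent.analyze("data/sample_windows_logs.json")
    print(single_result.get("overall_assessment", "UNKNOWN"))

    # Batch analysis: processes every *.json file in the directory and
    # returns one status dictionary per file.
    batch_results = agent.analyze_batch("data/dataset_dir", skip_existing=True)
    print(f"Analyzed {len(batch_results)} files")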