# Upload metadata (from file listing): uploader minhan6559 — "Upload 102 files", commit 9e3d618 (verified)
from langchain_core.tools import tool
from typing import Dict, Any, List
from .base_tool import Tool
class FieldReducerTool(Tool):
    """Select the highest-priority security-relevant fields from a candidate list.

    Given field names extracted from a log record, keeps at most
    ``max_fields`` of them, ranked by a fixed security-impact order.
    Unknown fields sort after all known ones; ties keep input order
    (``sorted`` is stable).
    """

    # Fixed ranking: lower index == higher analysis priority.
    _PRIORITY_ORDER = (
        "event_id", "command_line", "dst_ip", "src_ip", "hash",
        "registry_path", "user", "image", "parent_image",
        "dst_port", "src_port", "protocol",
    )
    # Precomputed once at class-creation time so the sort key is an O(1)
    # dict lookup instead of an O(n) list.index + try/except per element.
    _PRIORITY_RANK = {name: rank for rank, name in enumerate(_PRIORITY_ORDER)}

    def name(self) -> str:
        """Return the tool's registry name."""
        return "fieldreducer"

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Rank ``candidates`` by security impact and keep the top ``max_fields``.

        Args:
            input_data: Dict with keys:
                - "candidates": list of field-name strings (falsy -> []).
                - "max_fields": int cap on returned fields (falsy -> 3).
                - "priority": ranking method label (falsy -> "impact");
                  echoed in the result but not used to alter the ranking.

        Returns:
            On success, a dict with "tool", "selected_names",
            "total_candidates", "method", and "max_fields".
            On failure, a dict with a single "error" message.
        """
        try:
            candidates: List[str] = input_data.get("candidates", []) or []
            # `or 3` keeps the historical fallback: any falsy value
            # (None, 0, "") maps to the default of 3.
            max_fields: int = int(input_data.get("max_fields", 3) or 3)
            method: str = input_data.get("priority", "impact") or "impact"
            if not isinstance(candidates, list):
                return {"error": "candidates must be a list of field names"}
            unknown_rank = len(self._PRIORITY_ORDER)

            def score_key(field_name: str) -> int:
                # Non-string entries can't be ranked; treat them like
                # unknown fields (same as the old ValueError fallback).
                if not isinstance(field_name, str):
                    return unknown_rank
                return self._PRIORITY_RANK.get(field_name, unknown_rank)

            selected = sorted(candidates, key=score_key)[:max_fields]
            return {
                "tool": "fieldreducer",
                "selected_names": selected,
                "total_candidates": len(candidates),
                "method": method,  # echoed only; ranking is always impact-based
                "max_fields": max_fields
            }
        except Exception as e:
            return {"error": f"{type(e).__name__}: {str(e)}"}
# Module-level singleton reused by the `fieldreducer` LangChain tool wrapper.
_fieldreducer_tool = FieldReducerTool()
@tool
def fieldreducer(field_names: List[str], max_fields: int = 10) -> dict:
    """Identifies the most security-critical fields from complex log data to focus analysis.
    Use this tool when logs contain many fields (10+) and you need to prioritize which data points
    are most likely to reveal security threats. This helps avoid analysis paralysis with verbose logs.
    Args:
        field_names: List of field names from the log data (e.g., ["dst_ip", "src_ip", "event_id", "user"])
        max_fields: Maximum number of priority fields to return (default: 10)
    Returns:
        Prioritized list of fields most relevant for cybersecurity analysis.
    """
    # Build the request payload expected by FieldReducerTool.run and delegate.
    request = {
        "candidates": field_names,
        "max_fields": max_fields,
    }
    return _fieldreducer_tool.run(request)