|
|
"""
|
|
|
Main entry point for the Cybersecurity Log Analysis Agent
|
|
|
"""
|
|
|
|
|
|
import argparse
|
|
|
import sys
|
|
|
from pathlib import Path
|
|
|
|
|
|
import os
|
|
|
import sys
|
|
|
import json
|
|
|
from dotenv import load_dotenv
|
|
|
from typing import List, Dict, Any, Union, Optional
|
|
|
from pathlib import Path
|
|
|
|
|
|
|
|
|
# Put the directory three levels above this file at the front of sys.path,
# presumably so the absolute ``src.…`` import that follows resolves when this
# file is run directly as a script — confirm against the repository layout.
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))
|
|
|
|
|
|
from src.agents.log_analysis_agent.agent import LogAnalysisAgent
|
|
|
|
|
|
|
|
|
# Load variables from a local .env file (if any) into the process environment.
load_dotenv()

# Re-export the Google API key only when it is actually defined. Assigning
# None to os.environ raises "TypeError: str expected, not NoneType", which
# previously crashed the module at import time (even for --help) whenever the
# key was missing.
_google_api_key = os.getenv("GOOGLE_API_KEY")
if _google_api_key is not None:
    os.environ["GOOGLE_API_KEY"] = _google_api_key
else:
    print(
        "Warning: GOOGLE_API_KEY is not set; "
        "define it in the environment or in a .env file.",
        file=sys.stderr,
    )
|
|
|
|
|
|
|
|
|
def main():
    """Main entry point for the cybersecurity log analysis agent.

    Command-line arguments:
        log_file: Path to a single log file, or the literal ``all`` to run a
            batch analysis over the ``mordor_dataset`` directory.
        --skip-existing: Forwarded to ``analyze_batch`` as ``skip_existing``
            (only used in batch mode).
        --output-dir: Forwarded to the agent as ``output_dir`` and used to
            build the results path printed at the end (default: ``analysis``).

    Exits with status 1 (message on stderr) when batch mode is requested and
    the dataset directory does not exist.
    """
    parser = argparse.ArgumentParser(
        description="Agentic Cyber Log Analysis with ReAct"
    )
    parser.add_argument(
        "log_file", help="Path to log file or 'all' to process entire dataset"
    )
    parser.add_argument(
        "--skip-existing",
        action="store_true",
        help="Skip files that have already been analyzed",
    )
    parser.add_argument(
        "--output-dir",
        default="analysis",
        help="Output directory name (default: analysis)",
    )
    args = parser.parse_args()

    agent = LogAnalysisAgent(
        model_name="google_genai:gemini-2.0-flash",
        temperature=0.1,
        output_dir=args.output_dir,
        max_iterations=4,
    )

    # Both modes report the same location, so compute it once.
    # NOTE(review): this assumes the agent writes its results beneath this
    # file's directory; the agent's own path handling is not visible here.
    package_dir = Path(__file__).parent
    output_path = package_dir / args.output_dir

    # Single-file mode: anything other than the literal "all" is a file path.
    if args.log_file != "all":
        print(f"Analyzing single file: {args.log_file}")
        agent.analyze(args.log_file)
        print("\n✓ Analysis complete!")
        print(f"Results saved to: {output_path}/")
        return

    # Batch mode: the dataset is looked up two levels above this file's
    # directory.
    dataset_dir = package_dir.parent.parent / "mordor_dataset"
    if not dataset_dir.exists():
        # Diagnostics go to stderr so they are not mixed into normal output.
        print(
            f"Error: Dataset directory not found at {dataset_dir}",
            file=sys.stderr,
        )
        print(
            "Please ensure 'mordor_dataset' exists at project root level",
            file=sys.stderr,
        )
        sys.exit(1)

    agent.analyze_batch(
        dataset_dir=str(dataset_dir), skip_existing=args.skip_existing
    )
    print("\n✓ Batch analysis complete!")
    print(f"Results saved to: {output_path}/")
|
|
|
|
|
|
|
|
|
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|
|
|
|