File size: 2,583 Bytes
223ef32 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
"""
Main entry point for the Cybersecurity Log Analysis Agent
"""
import argparse
import sys
from pathlib import Path
import os
import sys
import json
from dotenv import load_dotenv
from typing import List, Dict, Any, Union, Optional
from pathlib import Path
# Add the project root to Python path so we can import from src
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))
from src.agents.log_analysis_agent.agent import LogAnalysisAgent
load_dotenv()
os.environ["GOOGLE_API_KEY"] = os.getenv("GOOGLE_API_KEY")
def main():
"""Main entry point for the cybersecurity log analysis agent"""
parser = argparse.ArgumentParser(
description="Agentic Cyber Log Analysis with ReAct"
)
parser.add_argument(
"log_file", help="Path to log file or 'all' to process entire dataset"
)
parser.add_argument(
"--skip-existing",
action="store_true",
help="Skip files that have already been analyzed",
)
parser.add_argument(
"--output-dir",
default="analysis",
help="Output directory name (default: analysis)",
)
args = parser.parse_args()
# Initialize the agent
agent = LogAnalysisAgent(
model_name="google_genai:gemini-2.0-flash",
temperature=0.1,
output_dir=args.output_dir,
max_iterations=4,
)
# Single file mode
if args.log_file != "all":
print(f"Analyzing single file: {args.log_file}")
result = agent.analyze(args.log_file)
# Print output location
package_dir = Path(__file__).parent
output_path = package_dir / args.output_dir
print(f"\n✓ Analysis complete!")
print(f"Results saved to: {output_path}/")
return
# Batch mode - find dataset directory
package_dir = Path(__file__).parent
project_root = package_dir.parent.parent # Go up to project root
dataset_dir = project_root / "mordor_dataset"
if not dataset_dir.exists():
print(f"Error: Dataset directory not found at {dataset_dir}")
print("Please ensure 'mordor_dataset' exists at project root level")
sys.exit(1)
results = agent.analyze_batch(
dataset_dir=str(dataset_dir), skip_existing=args.skip_existing
)
# Print output location
output_path = package_dir / args.output_dir
print(f"\n✓ Batch analysis complete!")
print(f"Results saved to: {output_path}/")
if __name__ == "__main__":
main()
|