|
|
|
|
|
"""
|
|
|
Streamlit Web App for Cybersecurity Agent Pipeline
|
|
|
|
|
|
A simple web interface for uploading log files and running the cybersecurity analysis pipeline
|
|
|
with different LLM models.
|
|
|
"""
|
|
|
|
|
|
import os
|
|
|
import sys
|
|
|
import tempfile
|
|
|
import shutil
|
|
|
import streamlit as st
|
|
|
from pathlib import Path
|
|
|
from typing import Dict, Any, Optional
|
|
|
|
|
|
from src.full_pipeline.simple_pipeline import analyze_log_file
|
|
|
|
|
|
from dotenv import load_dotenv
|
|
|
from huggingface_hub import login as huggingface_login
|
|
|
|
|
|
# Runs at import time, before main() reads HF_TOKEN via os.getenv.
# NOTE(review): presumably this populates the environment from a local .env
# file (python-dotenv) -- confirm against the deployment setup.
load_dotenv()
|
|
|
|
|
|
|
|
|
def get_model_providers() -> Dict[str, Dict[str, str]]:
    """Return the selectable models, grouped by provider.

    Returns:
        A mapping of provider display name to a mapping of model display
        name -> model identifier. Every identifier has the form
        ``"<provider prefix>:<model display name>"``.
    """
    # (provider display name, identifier prefix, model display names)
    catalog = (
        (
            "Google GenAI",
            "google_genai",
            ("gemini-2.0-flash", "gemini-2.0-flash-lite", "gemini-2.5-flash-lite"),
        ),
        (
            "Groq",
            "groq",
            ("openai/gpt-oss-120b", "moonshotai/kimi-k2-instruct-0905"),
        ),
        ("OpenAI", "openai", ("gpt-4o", "gpt-4.1")),
    )
    return {
        provider: {model: f"{prefix}:{model}" for model in models}
        for provider, prefix, models in catalog
    }
|
|
|
|
|
|
|
|
|
def get_api_key_help() -> Dict[str, str]:
    """Return, per provider display name, the URL where an API key is issued."""
    key_pages = [
        ("Google GenAI", "https://aistudio.google.com/app/apikey"),
        ("Groq", "https://console.groq.com/keys"),
        ("OpenAI", "https://platform.openai.com/api-keys"),
    ]
    return dict(key_pages)
|
|
|
|
|
|
|
|
|
def setup_temp_directories(temp_dir: str) -> Dict[str, str]:
    """Create the pipeline's working sub-directories under ``temp_dir``.

    Args:
        temp_dir: Directory under which the sub-directories are created.

    Returns:
        A mapping with the keys ``"log_files"``, ``"analysis"`` and
        ``"final_response"``, each pointing at the sub-directory of the same
        name inside ``temp_dir``. Directories that already exist are reused.
    """
    layout = {
        name: os.path.join(temp_dir, name)
        for name in ("log_files", "analysis", "final_response")
    }
    for directory in layout.values():
        os.makedirs(directory, exist_ok=True)
    return layout
|
|
|
|
|
|
|
|
|
def save_uploaded_file(uploaded_file, temp_dir: str) -> str:
    """Write an uploaded file into ``<temp_dir>/log_files`` and return its path.

    Args:
        uploaded_file: Object exposing ``name`` (the client-supplied file
            name) and ``getbuffer()`` (the file content as a bytes-like
            object).
        temp_dir: Working directory of the current run.

    Returns:
        The path of the file that was written.
    """
    log_files_dir = os.path.join(temp_dir, "log_files")
    # Do not rely on the caller having created the directory beforehand.
    os.makedirs(log_files_dir, exist_ok=True)

    # The name comes from the client, so never let it carry path components:
    # keep only the final segment (treating "\\" as a separator too) so the
    # file cannot be written outside ``log_files_dir``.
    safe_name = os.path.basename(str(uploaded_file.name).replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        safe_name = "uploaded_log.json"

    file_path = os.path.join(log_files_dir, safe_name)
    with open(file_path, "wb") as f:
        f.write(uploaded_file.getbuffer())

    return file_path
|
|
|
|
|
|
|
|
|
def run_analysis(
    log_file_path: str,
    model_name: str,
    query: str,
    temp_dirs: Dict[str, str],
    api_key: str,
    provider: str,
) -> Dict[str, Any]:
    """Run the cybersecurity analysis pipeline on one log file.

    The API key is exposed to the pipeline through the provider's
    environment variable for the duration of the call only; the variable's
    previous state is restored afterwards so the key does not linger in the
    process-wide environment.

    Args:
        log_file_path: Path of the log file to analyze.
        model_name: Model identifier passed to ``analyze_log_file``.
        query: Free-text context passed to ``analyze_log_file``.
        temp_dirs: Must contain the keys ``"analysis"`` and
            ``"final_response"`` (output directories for the pipeline).
        api_key: API key for ``provider``.
        provider: Provider display name (``"Google GenAI"``, ``"Groq"`` or
            ``"OpenAI"``). Any other value sets no environment variable.

    Returns:
        ``{"success": True, "result": <pipeline result>}`` on success, or
        ``{"success": False, "error": <message>, "exception": <exception>}``
        when the pipeline raised. Exceptions are never propagated.
    """
    env_var = {
        "Google GenAI": "GOOGLE_API_KEY",
        "Groq": "GROQ_API_KEY",
        "OpenAI": "OPENAI_API_KEY",
    }.get(provider)

    # Remember what was there so the finally-clause can put it back.
    previous_value = os.environ.get(env_var) if env_var else None
    if env_var:
        os.environ[env_var] = api_key

    try:
        result = analyze_log_file(
            log_file=log_file_path,
            query=query,
            tactic=None,
            model_name=model_name,
            temperature=0.1,
            log_agent_output_dir=temp_dirs["analysis"],
            response_agent_output_dir=temp_dirs["final_response"],
        )
        return {"success": True, "result": result}
    except Exception as e:
        # UI boundary: report the failure instead of crashing the app. The
        # exception object is included so callers can show a traceback.
        return {"success": False, "error": str(e), "exception": e}
    finally:
        if env_var:
            if previous_value is None:
                os.environ.pop(env_var, None)
            else:
                os.environ[env_var] = previous_value
|
|
|
|
|
|
|
|
|
def main():
    """Render the Streamlit page and run the analysis when requested.

    Layout: a sidebar for provider/model selection, API key and an optional
    query; a main area with the log-file uploader and an upload status
    panel; a "Run Analysis" button that is only enabled once both a file and
    an API key are present.

    On click, the upload is written to a fresh temporary directory,
    ``run_analysis`` is called, and either the result summary (assessment,
    abnormal-event count, execution time, markdown report) or the failure is
    shown. The temporary directory is removed afterwards in all cases.
    """
    hf_token = os.getenv("HF_TOKEN")
    if hf_token:
        huggingface_login(token=hf_token)

    st.set_page_config(
        page_title="Cybersecurity Agent Pipeline", page_icon="🛡️", layout="wide"
    )

    st.title("Cybersecurity Agent Pipeline")
    st.markdown(
        "Upload a log file and analyze it using advanced LLM-based cybersecurity agents."
    )

    # --- Sidebar: model choice, API key, optional query ---
    with st.sidebar:
        st.header("Configuration")

        providers = get_model_providers()
        selected_provider = st.selectbox(
            "Select Model Provider", list(providers.keys())
        )

        available_models = providers[selected_provider]
        selected_model_display = st.selectbox(
            "Select Model", list(available_models.keys())
        )
        selected_model = available_models[selected_model_display]

        st.subheader("API Key")
        api_key_help = get_api_key_help()

        with st.expander("How to get API key", expanded=False):
            st.markdown(f"**{selected_provider}**:")
            st.markdown(f"[Get API Key]({api_key_help[selected_provider]})")

        api_key = st.text_input(
            f"Enter {selected_provider} API Key",
            type="password",
            help=f"Your {selected_provider} API key",
        )

        st.subheader("Additional Context")
        user_query = st.text_area(
            "Optional Query",
            placeholder="e.g., 'Focus on credential access attacks'",
            help="Provide additional context or specific focus areas for the analysis",
        )

    # --- Main area: upload + status ---
    upload_col, status_col = st.columns([2, 1])

    with upload_col:
        st.header("Upload Log File")
        uploaded_file = st.file_uploader(
            "Choose a JSON log file",
            type=["json"],
            help="Upload a JSON log file from the Mordor dataset or similar security logs",
        )

    with status_col:
        st.header("Analysis Status")
        if uploaded_file is not None:
            st.success(f"File uploaded: {uploaded_file.name}")
            st.info(f"Size: {uploaded_file.size:,} bytes")
        else:
            st.warning("Please upload a log file")

    # --- Run ---
    if st.button(
        "Run Analysis", type="primary", disabled=not (uploaded_file and api_key)
    ):
        # The button is disabled without these, but re-check defensively.
        if not uploaded_file:
            st.error("Please upload a log file first.")
            return

        if not api_key:
            st.error("Please enter your API key.")
            return

        temp_dir = tempfile.mkdtemp(prefix="cyber_agent_")

        try:
            temp_dirs = setup_temp_directories(temp_dir)
            log_file_path = save_uploaded_file(uploaded_file, temp_dir)

            progress_bar = st.progress(0)
            status_text = st.empty()

            status_text.text("Initializing analysis...")
            progress_bar.progress(10)

            status_text.text("Running cybersecurity analysis...")
            progress_bar.progress(50)

            analysis_result = run_analysis(
                log_file_path=log_file_path,
                model_name=selected_model,
                query=user_query,
                temp_dirs=temp_dirs,
                api_key=api_key,
                provider=selected_provider,
            )

            progress_bar.progress(90)
            status_text.text("Finalizing results...")

            if analysis_result["success"]:
                progress_bar.progress(100)
                status_text.text("Analysis completed successfully!")

                st.header("Analysis Results")

                result = analysis_result["result"]
                # Tolerate a missing or None section instead of crashing.
                log_analysis = result.get("log_analysis_result") or {}

                assessment_col, events_col, time_col = st.columns(3)

                with assessment_col:
                    assessment = log_analysis.get("overall_assessment", "Unknown")
                    st.metric("Overall Assessment", assessment)

                with events_col:
                    abnormal_events = log_analysis.get("abnormal_events", []) or []
                    st.metric("Abnormal Events", len(abnormal_events))

                with time_col:
                    execution_time = result.get("execution_time", "N/A")
                    st.metric(
                        "Execution Time",
                        (
                            f"{execution_time:.2f}s"
                            if isinstance(execution_time, (int, float))
                            else execution_time
                        ),
                    )

                markdown_report = result.get("markdown_report", "")
                if markdown_report:
                    st.header("Detailed Report")
                    st.markdown(markdown_report)
                else:
                    st.warning("No detailed report generated.")

            else:
                st.error(f"Analysis failed: {analysis_result['error']}")
                # st.exception needs an exception instance; the "error" entry
                # is only a message string, so use the exception object when
                # the result carries one and skip the traceback otherwise.
                failure = analysis_result.get("exception")
                if isinstance(failure, BaseException):
                    st.exception(failure)

        finally:
            # Best-effort cleanup: a failure here must not hide the results.
            try:
                shutil.rmtree(temp_dir)
            except Exception as e:
                st.warning(f"Could not clean up temporary directory: {e}")

    # --- Footer ---
    st.markdown("---")
    st.markdown(
        "**Cybersecurity Agent Pipeline** - Powered by LangGraph and LangChain | "
        "Built for educational purposes demonstrating LLM-based multi-agent systems"
    )
|
|
|
|
|
|
|
|
|
# Start the app only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|
|
|