import os

from dotenv import load_dotenv
from langchain.chat_models import init_chat_model
from langchain_text_splitters import TokenTextSplitter

from src.agents.log_analysis_agent.tools.shodan_tool import shodan_lookup
from src.agents.log_analysis_agent.tools.virustotal_tool import (
    virustotal_lookup,
    virustotal_metadata_search,
)
from src.agents.log_analysis_agent.tools.fieldreducer_tool import fieldreducer
from src.agents.log_analysis_agent.tools.event_id_extractor_tool import (
    event_id_extractor,
)
from src.agents.log_analysis_agent.tools.timeline_builder_tool import timeline_builder
from src.agents.log_analysis_agent.tools.decoder_tool import decoder


def get_llm():
    """Initialize and return the LLM instance (without tools)"""
    load_dotenv()
    return init_chat_model(
        "google_genai:gemini-2.0-flash",
        temperature=0.1,
    )


def get_tools():
    """Return list of available tools for the agent"""
    return [
        fieldreducer,
        event_id_extractor,
        timeline_builder,
        decoder,
    ]


def get_llm_with_tools():
    """Initialize and return LLM with tools bound"""
    llm = get_llm()
    tools = get_tools()
    return llm.bind_tools(tools)


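# Illustrative usage (sketch, not part of the module itself): assumes a valid
# Google API key (e.g. GOOGLE_API_KEY) is available via the .env file that
# get_llm() loads.
#
#     llm = get_llm_with_tools()
#     response = llm.invoke("Summarize the suspicious events in this log excerpt: ...")
#     print(response.tool_calls or response.content)

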
def format_execution_time(total_seconds: float) -> dict:
    """Format execution time into a readable format"""
    minutes = int(total_seconds // 60)
    seconds = total_seconds % 60

    return {
        "total_seconds": round(total_seconds, 2),
        "formatted_time": (
            f"{minutes}m {seconds:.2f}s" if minutes > 0 else f"{seconds:.2f}s"
        ),
    }


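# Example (illustrative): format_execution_time(125.5)
# returns {"total_seconds": 125.5, "formatted_time": "2m 5.50s"}

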
def truncate_to_tokens(text: str, max_tokens: int) -> str:
    """
    Truncate text to a maximum number of tokens using LangChain's TokenTextSplitter.

    Args:
        text: The text to truncate
        max_tokens: Maximum number of tokens

    Returns:
        Truncated text within the token limit
    """
    if not text:
        return ""

    cleaned_text = text.replace("\n", " ")

    splitter = TokenTextSplitter(
        encoding_name="cl100k_base", chunk_size=max_tokens, chunk_overlap=0
    )

    chunks = splitter.split_text(cleaned_text)
    return chunks[0] if chunks else ""
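

if __name__ == "__main__":
    # Minimal smoke test (illustrative sketch, not part of the original module).
    # It only exercises the helpers that need no API keys; TokenTextSplitter
    # requires the `tiktoken` package for the cl100k_base encoding.
    import time

    start = time.time()
    sample_log = "4624 An account was successfully logged on.\n" * 50
    preview = truncate_to_tokens(sample_log, max_tokens=32)
    print(f"Truncated to ~32 tokens: {preview!r}")
    print(format_execution_time(time.time() - start))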