import os
from dotenv import load_dotenv
from langchain.chat_models import init_chat_model
from langchain_text_splitters import TokenTextSplitter
# Import the @tool decorated functions (not the classes)
from src.agents.log_analysis_agent.tools.shodan_tool import shodan_lookup
from src.agents.log_analysis_agent.tools.virustotal_tool import (
    virustotal_lookup,
    virustotal_metadata_search,
)
from src.agents.log_analysis_agent.tools.fieldreducer_tool import fieldreducer
from src.agents.log_analysis_agent.tools.event_id_extractor_tool import (
    event_id_extractor,
)
from src.agents.log_analysis_agent.tools.timeline_builder_tool import timeline_builder
from src.agents.log_analysis_agent.tools.decoder_tool import decoder


# ----- LLM Setup -----
def get_llm():
    """Initialize and return the LLM instance (without tools)."""
    load_dotenv()
    return init_chat_model(
        "google_genai:gemini-2.0-flash",
        temperature=0.1,
    )


# ----- Tool Setup -----
def get_tools():
    """Return the list of tools available to the agent."""
    return [
        # External enrichment lookups are currently disabled; uncomment to enable.
        # shodan_lookup,
        # virustotal_lookup,
        # virustotal_metadata_search,
        fieldreducer,
        event_id_extractor,
        timeline_builder,
        decoder,
    ]


def get_llm_with_tools():
    """Initialize and return the LLM with tools bound."""
    llm = get_llm()
    tools = get_tools()
    return llm.bind_tools(tools)


# ----- Helper Functions -----
def format_execution_time(total_seconds: float) -> dict:
    """Format an execution time in seconds into a readable summary."""
    minutes = int(total_seconds // 60)
    seconds = total_seconds % 60
    return {
        "total_seconds": round(total_seconds, 2),
        "formatted_time": (
            f"{minutes}m {seconds:.2f}s" if minutes > 0 else f"{seconds:.2f}s"
        ),
    }


def truncate_to_tokens(text: str, max_tokens: int) -> str:
    """
    Truncate text to a maximum number of tokens using LangChain's TokenTextSplitter.

    Args:
        text: The text to truncate.
        max_tokens: Maximum number of tokens to keep.

    Returns:
        The truncated text, within the token limit.
    """
    if not text:
        return ""

    # Normalize whitespace by replacing newlines with spaces
    cleaned_text = text.replace("\n", " ")

    # Split by tokens (cl100k_base encoding) and keep only the first chunk
    splitter = TokenTextSplitter(
        encoding_name="cl100k_base", chunk_size=max_tokens, chunk_overlap=0
    )
    chunks = splitter.split_text(cleaned_text)
    return chunks[0] if chunks else ""
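

# ----- Example Usage (illustrative sketch, not part of the original module) -----
# A minimal sketch of how these helpers might be wired together, assuming a valid
# GOOGLE_API_KEY is available via the environment/.env and that the tool modules
# imported above are installed. The log excerpt and token limit below are
# hypothetical placeholders, not values from the original code.
if __name__ == "__main__":
    import time

    start = time.time()

    llm_with_tools = get_llm_with_tools()

    raw_log = "4624 An account was successfully logged on ..."  # placeholder log excerpt
    prompt_text = truncate_to_tokens(raw_log, max_tokens=1000)

    # bind_tools() returns a runnable; invoking it with a string sends a single user message
    response = llm_with_tools.invoke(prompt_text)

    timing = format_execution_time(time.time() - start)
    print(response.content)
    print(f"Elapsed: {timing['formatted_time']}")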