|
|
from langchain_core.tools import tool
|
|
|
from typing import Dict, Any
|
|
|
import base64
|
|
|
import binascii
|
|
|
import re
|
|
|
from .base_tool import Tool
|
|
|
|
|
|
class DecoderTool(Tool):
|
|
|
"""Decode Base64 and Hex encoded strings commonly used to hide malicious commands"""
|
|
|
|
|
|
def name(self) -> str:
|
|
|
return "decoder"
|
|
|
|
|
|
def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
|
try:
|
|
|
encoded_string = input_data.get("encoded_string", "")
|
|
|
encoding_type = input_data.get("encoding_type", "auto")
|
|
|
|
|
|
if not encoded_string:
|
|
|
return {"error": "No encoded string provided"}
|
|
|
|
|
|
|
|
|
if encoding_type == "auto":
|
|
|
encoding_type = self._detect_encoding(encoded_string)
|
|
|
|
|
|
|
|
|
decoded_text, success = self._decode_string(encoded_string, encoding_type)
|
|
|
|
|
|
if not success:
|
|
|
return {
|
|
|
"tool": "decoder",
|
|
|
"encoded_string": encoded_string[:100] + "..." if len(encoded_string) > 100 else encoded_string,
|
|
|
"encoding_detected": encoding_type,
|
|
|
"decoded_text": None,
|
|
|
"success": False,
|
|
|
"error": "Failed to decode - invalid encoding or corrupted data"
|
|
|
}
|
|
|
|
|
|
|
|
|
threat_analysis = self._analyze_decoded_content(decoded_text)
|
|
|
|
|
|
return {
|
|
|
"tool": "decoder",
|
|
|
"encoded_string": encoded_string[:100] + "..." if len(encoded_string) > 100 else encoded_string,
|
|
|
"encoding_detected": encoding_type,
|
|
|
"decoded_text": decoded_text,
|
|
|
"success": True,
|
|
|
"threat_analysis": threat_analysis
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
return {"error": f"{type(e).__name__}: {str(e)}"}
|
|
|
|
|
|
def _detect_encoding(self, string: str) -> str:
|
|
|
"""Auto-detect if string is base64 or hex"""
|
|
|
|
|
|
clean_string = string.strip()
|
|
|
|
|
|
|
|
|
if re.match(r'^[0-9A-Fa-f]+$', clean_string) and len(clean_string) % 2 == 0:
|
|
|
|
|
|
|
|
|
if len(clean_string) > 10:
|
|
|
return "hex"
|
|
|
|
|
|
|
|
|
|
|
|
if re.match(r'^[A-Za-z0-9+/]+=*$', clean_string):
|
|
|
return "base64"
|
|
|
|
|
|
|
|
|
return "base64"
|
|
|
|
|
|
def _decode_string(self, encoded_string: str, encoding_type: str) -> tuple:
|
|
|
"""Decode string and return (decoded_text, success)"""
|
|
|
try:
|
|
|
if encoding_type == "base64":
|
|
|
return self._decode_base64(encoded_string)
|
|
|
elif encoding_type == "hex":
|
|
|
return self._decode_hex(encoded_string)
|
|
|
else:
|
|
|
return None, False
|
|
|
except Exception as e:
|
|
|
return None, False
|
|
|
|
|
|
def _decode_base64(self, encoded_string: str) -> tuple:
|
|
|
"""Decode base64 string, trying multiple character encodings"""
|
|
|
try:
|
|
|
|
|
|
clean_string = encoded_string.strip()
|
|
|
|
|
|
|
|
|
decoded_bytes = base64.b64decode(clean_string)
|
|
|
|
|
|
|
|
|
encodings = ['utf-16le', 'utf-16be', 'utf-8', 'ascii', 'latin-1']
|
|
|
|
|
|
for encoding in encodings:
|
|
|
try:
|
|
|
decoded_text = decoded_bytes.decode(encoding)
|
|
|
|
|
|
decoded_text = decoded_text.replace('\x00', '')
|
|
|
|
|
|
if decoded_text.strip():
|
|
|
return decoded_text, True
|
|
|
except (UnicodeDecodeError, AttributeError):
|
|
|
continue
|
|
|
|
|
|
|
|
|
return decoded_bytes.hex(), True
|
|
|
|
|
|
except Exception as e:
|
|
|
return None, False
|
|
|
|
|
|
def _decode_hex(self, encoded_string: str) -> tuple:
|
|
|
"""Decode hex string"""
|
|
|
try:
|
|
|
clean_string = encoded_string.strip()
|
|
|
decoded_bytes = bytes.fromhex(clean_string)
|
|
|
|
|
|
|
|
|
encodings = ['utf-8', 'utf-16le', 'ascii', 'latin-1']
|
|
|
|
|
|
for encoding in encodings:
|
|
|
try:
|
|
|
decoded_text = decoded_bytes.decode(encoding)
|
|
|
decoded_text = decoded_text.replace('\x00', '')
|
|
|
if decoded_text.strip():
|
|
|
return decoded_text, True
|
|
|
except (UnicodeDecodeError, AttributeError):
|
|
|
continue
|
|
|
|
|
|
return None, False
|
|
|
|
|
|
except Exception as e:
|
|
|
return None, False
|
|
|
|
|
|
def _analyze_decoded_content(self, decoded_text: str) -> Dict[str, Any]:
|
|
|
"""Analyze decoded content for malicious patterns"""
|
|
|
if not decoded_text:
|
|
|
return {
|
|
|
"is_suspicious": False,
|
|
|
"threat_level": "UNKNOWN",
|
|
|
"indicators": [],
|
|
|
"attack_techniques": []
|
|
|
}
|
|
|
|
|
|
decoded_lower = decoded_text.lower()
|
|
|
indicators = []
|
|
|
attack_techniques = []
|
|
|
|
|
|
|
|
|
powershell_patterns = {
|
|
|
"iex": "Invoke-Expression - executes arbitrary code",
|
|
|
"invoke-expression": "Executes arbitrary PowerShell code",
|
|
|
"invoke-command": "Remote command execution",
|
|
|
"invoke-webrequest": "Downloads content from internet",
|
|
|
"downloadstring": "Downloads and executes remote code",
|
|
|
"downloadfile": "Downloads file from internet",
|
|
|
"webclient": "Network client for downloading content",
|
|
|
"net.webclient": "Network client object",
|
|
|
"bitstransfer": "Background file transfer (potential data exfiltration)",
|
|
|
"start-bitstransfer": "BITS transfer for file download"
|
|
|
}
|
|
|
|
|
|
|
|
|
evasion_patterns = {
|
|
|
"-nop": "NoProfile flag - avoids loading profile scripts",
|
|
|
"-noprofile": "Skips PowerShell profile loading",
|
|
|
"-w hidden": "Hidden window - runs invisibly",
|
|
|
"-windowstyle hidden": "Hides PowerShell window",
|
|
|
"-ep bypass": "Execution policy bypass",
|
|
|
"-executionpolicy bypass": "Disables script execution restrictions",
|
|
|
"-enc": "Encoded command (nested encoding)",
|
|
|
"-encodedcommand": "Base64 encoded command",
|
|
|
"frombase64string": "Additional decoding layer"
|
|
|
}
|
|
|
|
|
|
|
|
|
credential_patterns = {
|
|
|
"mimikatz": "Credential dumping tool",
|
|
|
"invoke-mimikatz": "PowerShell wrapper for Mimikatz",
|
|
|
"get-credential": "Prompts for credentials",
|
|
|
"convertto-securestring": "Password manipulation",
|
|
|
"sekurlsa": "Mimikatz module for credential extraction",
|
|
|
"lsadump": "LSA secrets dumping",
|
|
|
"password": "Potential credential theft",
|
|
|
"sam": "Security Account Manager access"
|
|
|
}
|
|
|
|
|
|
|
|
|
persistence_patterns = {
|
|
|
"schtasks": "Scheduled task creation",
|
|
|
"new-scheduledtask": "Creates scheduled task for persistence",
|
|
|
"register-scheduledtask": "Registers scheduled task",
|
|
|
"startup": "Startup folder modification",
|
|
|
"registry": "Registry modification",
|
|
|
"wmi": "WMI-based persistence",
|
|
|
"new-service": "Service creation"
|
|
|
}
|
|
|
|
|
|
|
|
|
lateral_patterns = {
|
|
|
"psexec": "Remote execution tool",
|
|
|
"winrm": "Windows Remote Management",
|
|
|
"invoke-command -computername": "Remote command execution",
|
|
|
"enter-pssession": "Interactive remote session",
|
|
|
"wmic": "WMI command-line tool"
|
|
|
}
|
|
|
|
|
|
|
|
|
c2_patterns = {
|
|
|
"http://": "HTTP connection (potential C2)",
|
|
|
"https://": "HTTPS connection (potential C2)",
|
|
|
"://": "URL connection",
|
|
|
"tcp": "TCP network connection",
|
|
|
"socket": "Network socket creation",
|
|
|
"getstream": "Network stream (potential C2 channel)"
|
|
|
}
|
|
|
|
|
|
|
|
|
exfil_patterns = {
|
|
|
"compress-archive": "File compression before exfiltration",
|
|
|
"out-file": "Writing to file (staging for exfiltration)",
|
|
|
"set-content": "File creation/modification",
|
|
|
"send-mailmessage": "Email-based exfiltration",
|
|
|
"ftp": "FTP transfer",
|
|
|
"post": "HTTP POST (potential data upload)"
|
|
|
}
|
|
|
|
|
|
|
|
|
all_patterns = [
|
|
|
(powershell_patterns, "execution"),
|
|
|
(evasion_patterns, "defense_evasion"),
|
|
|
(credential_patterns, "credential_access"),
|
|
|
(persistence_patterns, "persistence"),
|
|
|
(lateral_patterns, "lateral_movement"),
|
|
|
(c2_patterns, "command_and_control"),
|
|
|
(exfil_patterns, "exfiltration")
|
|
|
]
|
|
|
|
|
|
for pattern_dict, technique in all_patterns:
|
|
|
for pattern, description in pattern_dict.items():
|
|
|
if pattern in decoded_lower:
|
|
|
indicators.append(description)
|
|
|
if technique not in attack_techniques:
|
|
|
attack_techniques.append(technique)
|
|
|
|
|
|
|
|
|
threat_level = self._calculate_threat_level(len(indicators), attack_techniques)
|
|
|
|
|
|
|
|
|
threat_summary = self._generate_threat_summary(decoded_text, indicators, attack_techniques)
|
|
|
|
|
|
return {
|
|
|
"is_suspicious": len(indicators) > 0,
|
|
|
"threat_level": threat_level,
|
|
|
"indicators": indicators[:10],
|
|
|
"indicator_count": len(indicators),
|
|
|
"attack_techniques": attack_techniques,
|
|
|
"threat_summary": threat_summary
|
|
|
}
|
|
|
|
|
|
def _calculate_threat_level(self, indicator_count: int, attack_techniques: list) -> str:
|
|
|
"""Calculate threat level based on indicators and techniques"""
|
|
|
if indicator_count == 0:
|
|
|
return "LOW"
|
|
|
|
|
|
|
|
|
high_risk = ["credential_access", "command_and_control", "exfiltration"]
|
|
|
has_high_risk = any(tech in attack_techniques for tech in high_risk)
|
|
|
|
|
|
if has_high_risk or indicator_count >= 5:
|
|
|
return "CRITICAL"
|
|
|
elif indicator_count >= 3:
|
|
|
return "HIGH"
|
|
|
elif indicator_count >= 1:
|
|
|
return "MEDIUM"
|
|
|
else:
|
|
|
return "LOW"
|
|
|
|
|
|
def _generate_threat_summary(self, decoded_text: str, indicators: list, attack_techniques: list) -> str:
|
|
|
"""Generate human-readable threat summary"""
|
|
|
if not indicators:
|
|
|
return "No suspicious patterns detected in decoded content"
|
|
|
|
|
|
summary_parts = []
|
|
|
|
|
|
|
|
|
if len(indicators) == 1:
|
|
|
summary_parts.append(f"Found 1 suspicious indicator: {indicators[0]}")
|
|
|
else:
|
|
|
summary_parts.append(f"Found {len(indicators)} suspicious indicators including: {indicators[0]}")
|
|
|
|
|
|
|
|
|
if attack_techniques:
|
|
|
technique_names = {
|
|
|
"execution": "arbitrary code execution",
|
|
|
"defense_evasion": "defense evasion",
|
|
|
"credential_access": "credential theft",
|
|
|
"persistence": "persistence mechanisms",
|
|
|
"lateral_movement": "lateral movement",
|
|
|
"command_and_control": "C2 communication",
|
|
|
"exfiltration": "data exfiltration"
|
|
|
}
|
|
|
|
|
|
readable_techniques = [technique_names.get(t, t) for t in attack_techniques[:3]]
|
|
|
|
|
|
if len(readable_techniques) == 1:
|
|
|
summary_parts.append(f"Indicates {readable_techniques[0]}.")
|
|
|
else:
|
|
|
summary_parts.append(f"Indicates {', '.join(readable_techniques[:-1])} and {readable_techniques[-1]}.")
|
|
|
|
|
|
|
|
|
preview = decoded_text[:100].strip()
|
|
|
if len(decoded_text) > 100:
|
|
|
preview += "..."
|
|
|
summary_parts.append(f"Command preview: {preview}")
|
|
|
|
|
|
return " ".join(summary_parts)
|
|
|
|
|
|
|
|
|
|
|
|
_decoder_tool = DecoderTool()
|
|
|
|
|
|
@tool
|
|
|
def decoder(encoded_string: str, encoding_type: str = "auto") -> dict:
|
|
|
"""Decodes Base64 or hex-encoded strings commonly used to hide malicious commands.
|
|
|
|
|
|
Use this tool when you see:
|
|
|
- PowerShell with -enc, -e, or -encodedcommand flags
|
|
|
- Long strings of random-looking characters (A-Z, a-z, 0-9, +, /, =)
|
|
|
- Commands that look obfuscated or unreadable
|
|
|
- Hex strings (0-9, A-F only) in unusual contexts
|
|
|
|
|
|
The tool automatically detects encoding type, decodes the string, and analyzes it for
|
|
|
malicious patterns including code execution, credential theft, C2 communication, and more.
|
|
|
|
|
|
Args:
|
|
|
encoded_string: The encoded string to decode (can be base64 or hex)
|
|
|
encoding_type: Type of encoding - "auto", "base64", or "hex" (default: "auto")
|
|
|
|
|
|
Returns:
|
|
|
Decoded content with detailed threat analysis including indicators, attack techniques,
|
|
|
and threat level assessment.
|
|
|
|
|
|
Examples:
|
|
|
- decoder("cG93ZXJzaGVsbC5leGU=") → decodes PowerShell commands
|
|
|
- decoder("496e766f6b652d576562526571756573742068747470733a2f2f6576696c2e636f6d", "hex")
|
|
|
"""
|
|
|
return _decoder_tool.run({
|
|
|
"encoded_string": encoded_string,
|
|
|
"encoding_type": encoding_type
|
|
|
}) |