# minhan6559's picture
# Upload 102 files
# 9e3d618 verified
from langchain_core.tools import tool
from typing import Dict, Any
import base64
import binascii
import re
from .base_tool import Tool
# Keyword tables used by DecoderTool._analyze_decoded_content, grouped by the
# technique label that is reported under "attack_techniques". Keys are
# lowercase because they are matched as plain substrings of the lowercased
# decoded text; group order and key order determine the order of indicators.
# NOTE(review): substring matching means short keys such as "sam", "post",
# "tcp" or "wmi" also hit inside unrelated words ("same", "postfix"), and
# overlapping keys ("https://" and "://", "wmic" and "wmi") each add an
# indicator. Kept as-is to preserve existing scoring.
_DECODER_PATTERN_GROUPS = (
    (
        "execution",
        {
            "iex": "Invoke-Expression - executes arbitrary code",
            "invoke-expression": "Executes arbitrary PowerShell code",
            "invoke-command": "Remote command execution",
            "invoke-webrequest": "Downloads content from internet",
            "downloadstring": "Downloads and executes remote code",
            "downloadfile": "Downloads file from internet",
            "webclient": "Network client for downloading content",
            "net.webclient": "Network client object",
            "bitstransfer": "Background file transfer (potential data exfiltration)",
            "start-bitstransfer": "BITS transfer for file download",
        },
    ),
    (
        "defense_evasion",
        {
            "-nop": "NoProfile flag - avoids loading profile scripts",
            "-noprofile": "Skips PowerShell profile loading",
            "-w hidden": "Hidden window - runs invisibly",
            "-windowstyle hidden": "Hides PowerShell window",
            "-ep bypass": "Execution policy bypass",
            "-executionpolicy bypass": "Disables script execution restrictions",
            "-enc": "Encoded command (nested encoding)",
            "-encodedcommand": "Base64 encoded command",
            "frombase64string": "Additional decoding layer",
        },
    ),
    (
        "credential_access",
        {
            "mimikatz": "Credential dumping tool",
            "invoke-mimikatz": "PowerShell wrapper for Mimikatz",
            "get-credential": "Prompts for credentials",
            "convertto-securestring": "Password manipulation",
            "sekurlsa": "Mimikatz module for credential extraction",
            "lsadump": "LSA secrets dumping",
            "password": "Potential credential theft",
            "sam": "Security Account Manager access",
        },
    ),
    (
        "persistence",
        {
            "schtasks": "Scheduled task creation",
            "new-scheduledtask": "Creates scheduled task for persistence",
            "register-scheduledtask": "Registers scheduled task",
            "startup": "Startup folder modification",
            "registry": "Registry modification",
            "wmi": "WMI-based persistence",
            "new-service": "Service creation",
        },
    ),
    (
        "lateral_movement",
        {
            "psexec": "Remote execution tool",
            "winrm": "Windows Remote Management",
            "invoke-command -computername": "Remote command execution",
            "enter-pssession": "Interactive remote session",
            "wmic": "WMI command-line tool",
        },
    ),
    (
        "command_and_control",
        {
            "http://": "HTTP connection (potential C2)",
            "https://": "HTTPS connection (potential C2)",
            "://": "URL connection",
            "tcp": "TCP network connection",
            "socket": "Network socket creation",
            "getstream": "Network stream (potential C2 channel)",
        },
    ),
    (
        "exfiltration",
        {
            "compress-archive": "File compression before exfiltration",
            "out-file": "Writing to file (staging for exfiltration)",
            "set-content": "File creation/modification",
            "send-mailmessage": "Email-based exfiltration",
            "ftp": "FTP transfer",
            "post": "HTTP POST (potential data upload)",
        },
    ),
)

# Technique labels that force a CRITICAL rating on their own.
_DECODER_HIGH_RISK = ("credential_access", "command_and_control", "exfiltration")

# Human-readable names used in the threat summary sentence.
_DECODER_TECHNIQUE_NAMES = {
    "execution": "arbitrary code execution",
    "defense_evasion": "defense evasion",
    "credential_access": "credential theft",
    "persistence": "persistence mechanisms",
    "lateral_movement": "lateral movement",
    "command_and_control": "C2 communication",
    "exfiltration": "data exfiltration",
}

# Maximum number of characters of input / decoded text echoed in previews.
_DECODER_PREVIEW_LIMIT = 100

# C0 control characters (minus tab/newline/carriage return) plus DEL. Their
# presence in a decoded candidate suggests the wrong codec was used.
_DECODER_CONTROL_CHARS = (
    frozenset(chr(code) for code in range(32)) - frozenset("\t\n\r")
) | frozenset("\x7f")


def _decoder_looks_like_text(text: str) -> bool:
    """Return True when *text* contains none of the control characters above."""
    return _DECODER_CONTROL_CHARS.isdisjoint(text)


class DecoderTool(Tool):
    """Decode Base64 and Hex encoded strings commonly used to hide malicious commands.

    ``run`` decodes the input, then scans the decoded text for keyword
    indicators and derives a threat level and a short summary from them.
    """

    def name(self) -> str:
        """Return the tool identifier, ``"decoder"``."""
        return "decoder"

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Decode ``input_data["encoded_string"]`` and analyze the result.

        Args:
            input_data: Mapping with ``"encoded_string"`` (required) and
                ``"encoding_type"`` (``"auto"``, ``"base64"`` or ``"hex"``;
                defaults to ``"auto"``, matched case-insensitively).

        Returns:
            On success, a dict with ``tool``, ``encoded_string`` (preview),
            ``encoding_detected``, ``decoded_text``, ``success=True`` and
            ``threat_analysis``. On decode failure, the same leading keys with
            ``decoded_text=None``, ``success=False`` and an ``error`` message.
            For missing input or an unexpected exception, ``{"error": ...}``.
        """
        try:
            encoded_string = input_data.get("encoded_string", "")
            encoding_type = input_data.get("encoding_type", "auto")

            if not encoded_string:
                return {"error": "No encoded string provided"}

            # Accept "Base64", " HEX " and similar spellings instead of
            # reporting them as corrupted data.
            if isinstance(encoding_type, str):
                encoding_type = encoding_type.strip().lower()

            if encoding_type == "auto":
                encoding_type = self._detect_encoding(encoded_string)

            decoded_text, success = self._decode_string(encoded_string, encoding_type)

            result = {
                "tool": "decoder",
                "encoded_string": self._preview(encoded_string),
                "encoding_detected": encoding_type,
                "decoded_text": decoded_text if success else None,
                "success": bool(success),
            }
            if success:
                result["threat_analysis"] = self._analyze_decoded_content(decoded_text)
            else:
                result["error"] = "Failed to decode - invalid encoding or corrupted data"
            return result
        except Exception as exc:  # tool boundary: always answer with a dict
            return {"error": f"{type(exc).__name__}: {str(exc)}"}

    @staticmethod
    def _preview(text: str) -> str:
        """Return *text* cut to the preview limit, with ``"..."`` if it was cut."""
        if len(text) > _DECODER_PREVIEW_LIMIT:
            return text[:_DECODER_PREVIEW_LIMIT] + "..."
        return text

    def _detect_encoding(self, string: str) -> str:
        """Guess whether *string* is hex or Base64.

        Returns ``"hex"`` for an even-length run of more than 10 hex digits
        (whitespace ignored) and ``"base64"`` for everything else.
        """
        # Whitespace is ignored by both decoders, so ignore it here as well;
        # otherwise "49 6e 76 ..." would be routed to Base64.
        compact = "".join(string.split())
        looks_hex = (
            len(compact) > 10  # shorter hex-only strings are treated as Base64
            and len(compact) % 2 == 0
            and re.fullmatch(r"[0-9A-Fa-f]+", compact) is not None
        )
        # Base64 is the default either way, so no separate Base64 check.
        return "hex" if looks_hex else "base64"

    def _decode_string(self, encoded_string: str, encoding_type: str) -> tuple:
        """Dispatch to the decoder for *encoding_type*.

        Returns:
            ``(decoded_text, success)``; ``(None, False)`` for an unsupported
            encoding type or a failed decode.
        """
        if encoding_type == "base64":
            return self._decode_base64(encoded_string)
        if encoding_type == "hex":
            return self._decode_hex(encoded_string)
        return None, False

    def _decode_base64(self, encoded_string: str) -> tuple:
        """Decode a Base64 string and convert the bytes to text.

        Missing ``=`` padding is repaired, and a string written purely in the
        URL-safe alphabet (``-``/``_``) is mapped to the standard one.

        Returns:
            ``(text, True)`` on success. When the bytes decode but yield no
            non-blank text, ``(hex_representation, True)``. ``(None, False)``
            when the input is not valid Base64.
        """
        try:
            compact = "".join(encoded_string.split())
            # b64decode silently drops "-" and "_" by default, which corrupts
            # URL-safe payloads; translate them only when the standard
            # alphabet's "+" and "/" are absent.
            if ("-" in compact or "_" in compact) and not (
                "+" in compact or "/" in compact
            ):
                compact = compact.translate(str.maketrans("-_", "+/"))
            try:
                decoded_bytes = base64.b64decode(compact)
            except ValueError:  # binascii.Error is a ValueError subclass
                # Retry once with the padding that was stripped off.
                decoded_bytes = base64.b64decode(compact + "=" * (-len(compact) % 4))
        except (ValueError, TypeError, AttributeError):
            return None, False

        text = self._bytes_to_text(decoded_bytes)
        if text:
            return text, True
        # Nothing readable: hand back the raw bytes as a hex string.
        return decoded_bytes.hex(), True

    def _decode_hex(self, encoded_string: str) -> tuple:
        """Decode a hex string and convert the bytes to text.

        Returns:
            ``(text, True)`` on success, ``(None, False)`` when the input is
            not valid hex or yields no non-blank text.
        """
        try:
            decoded_bytes = bytes.fromhex(encoded_string.strip())
        except (ValueError, TypeError, AttributeError):
            return None, False

        text = self._bytes_to_text(decoded_bytes)
        if text:
            return text, True
        return None, False

    @staticmethod
    def _bytes_to_text(raw: bytes) -> str:
        """Convert decoded bytes to text, choosing the most plausible codec.

        UTF-16 is tried first only when more than half of the odd (LE) or even
        (BE) byte positions are NUL; otherwise UTF-8 goes first. Trying
        UTF-16LE unconditionally turns any even-length ASCII payload into
        unrelated CJK characters, because almost every byte pair is a valid
        UTF-16 code unit. NUL characters are removed from every candidate.

        Returns:
            The first candidate free of control characters, else the first
            non-blank candidate, else ``""``.
        """
        order = ("utf-8", "utf-16le", "utf-16be", "latin-1")
        pairs, leftover = divmod(len(raw), 2)
        if pairs and not leftover:
            high_nuls = raw[1::2].count(0)  # NUL high bytes -> little endian
            low_nuls = raw[0::2].count(0)  # NUL leading bytes -> big endian
            if high_nuls * 2 > pairs and high_nuls >= low_nuls:
                order = ("utf-16le", "utf-16be", "utf-8", "latin-1")
            elif low_nuls * 2 > pairs:
                order = ("utf-16be", "utf-16le", "utf-8", "latin-1")

        candidates = []
        for codec in order:
            try:
                text = raw.decode(codec)
            except UnicodeDecodeError:
                continue
            text = text.replace("\x00", "")
            if text.strip():
                candidates.append(text)

        # Prefer a decoding without stray control characters: non-Latin
        # UTF-16 text can also be byte-valid UTF-8 but then shows up with
        # control characters in every other position.
        for text in candidates:
            if _decoder_looks_like_text(text):
                return text
        return candidates[0] if candidates else ""

    def _analyze_decoded_content(self, decoded_text: str) -> Dict[str, Any]:
        """Scan *decoded_text* for known keywords and rate the result.

        Returns:
            Dict with ``is_suspicious``, ``threat_level``, ``indicators`` (at
            most 10), ``indicator_count`` (total matches), ``attack_techniques``
            and ``threat_summary``. Empty input yields ``threat_level``
            ``"UNKNOWN"`` with empty lists.
        """
        if not decoded_text:
            return {
                "is_suspicious": False,
                "threat_level": "UNKNOWN",
                "indicators": [],
                "indicator_count": 0,
                "attack_techniques": [],
                "threat_summary": "No decoded content to analyze",
            }

        haystack = decoded_text.lower()
        indicators = []
        attack_techniques = []
        for technique, patterns in _DECODER_PATTERN_GROUPS:
            hits = [
                description
                for keyword, description in patterns.items()
                if keyword in haystack
            ]
            if hits:
                indicators.extend(hits)
                attack_techniques.append(technique)

        return {
            "is_suspicious": len(indicators) > 0,
            "threat_level": self._calculate_threat_level(
                len(indicators), attack_techniques
            ),
            "indicators": indicators[:10],  # only the first 10 are reported
            "indicator_count": len(indicators),
            "attack_techniques": attack_techniques,
            "threat_summary": self._generate_threat_summary(
                decoded_text, indicators, attack_techniques
            ),
        }

    def _calculate_threat_level(self, indicator_count: int, attack_techniques: list) -> str:
        """Map indicator count and techniques to a threat level string.

        Returns ``"LOW"`` for no indicators; ``"CRITICAL"`` for 5 or more
        indicators or any high-risk technique; ``"HIGH"`` for 3-4 indicators;
        ``"MEDIUM"`` otherwise.
        """
        if indicator_count <= 0:
            return "LOW"
        if indicator_count >= 5 or any(
            technique in attack_techniques for technique in _DECODER_HIGH_RISK
        ):
            return "CRITICAL"
        if indicator_count >= 3:
            return "HIGH"
        return "MEDIUM"

    def _generate_threat_summary(self, decoded_text: str, indicators: list, attack_techniques: list) -> str:
        """Build a one-line, human-readable summary of the findings.

        The summary names the first indicator, up to three techniques, and a
        preview of the decoded text.
        """
        if not indicators:
            return "No suspicious patterns detected in decoded content"

        summary_parts = []

        if len(indicators) == 1:
            summary_parts.append(f"Found 1 suspicious indicator: {indicators[0]}")
        else:
            summary_parts.append(
                f"Found {len(indicators)} suspicious indicators including: {indicators[0]}"
            )

        if attack_techniques:
            # Unknown labels fall back to the raw label text.
            readable = [
                _DECODER_TECHNIQUE_NAMES.get(label, label)
                for label in attack_techniques[:3]
            ]
            if len(readable) == 1:
                summary_parts.append(f"Indicates {readable[0]}.")
            else:
                summary_parts.append(
                    f"Indicates {', '.join(readable[:-1])} and {readable[-1]}."
                )

        preview = decoded_text[:_DECODER_PREVIEW_LIMIT].strip()
        if len(decoded_text) > _DECODER_PREVIEW_LIMIT:
            preview += "..."
        summary_parts.append(f"Command preview: {preview}")

        return " ".join(summary_parts)
# Single shared DecoderTool instance used by the `decoder` tool function below
_decoder_tool = DecoderTool()


# NOTE(review): the docstring is kept verbatim and flat because the @tool
# decorator presumably exposes it as the tool description - confirm before
# rewording it.
@tool
def decoder(encoded_string: str, encoding_type: str = "auto") -> dict:
    """Decodes Base64 or hex-encoded strings commonly used to hide malicious commands.
    Use this tool when you see:
    - PowerShell with -enc, -e, or -encodedcommand flags
    - Long strings of random-looking characters (A-Z, a-z, 0-9, +, /, =)
    - Commands that look obfuscated or unreadable
    - Hex strings (0-9, A-F only) in unusual contexts
    The tool automatically detects encoding type, decodes the string, and analyzes it for
    malicious patterns including code execution, credential theft, C2 communication, and more.
    Args:
    encoded_string: The encoded string to decode (can be base64 or hex)
    encoding_type: Type of encoding - "auto", "base64", or "hex" (default: "auto")
    Returns:
    Decoded content with detailed threat analysis including indicators, attack techniques,
    and threat level assessment.
    Examples:
    - decoder("cG93ZXJzaGVsbC5leGU=") → decodes PowerShell commands
    - decoder("496e766f6b652d576562526571756573742068747470733a2f2f6576696c2e636f6d", "hex")
    """
    # Package the arguments in the dict shape DecoderTool.run reads.
    request = dict(encoded_string=encoded_string, encoding_type=encoding_type)
    return _decoder_tool.run(request)