File size: 14,963 Bytes
9e3d618 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 |
from langchain_core.tools import tool
from typing import Dict, Any
import base64
import binascii
import re
from .base_tool import Tool
class DecoderTool(Tool):
    """Decode Base64 and Hex encoded strings commonly used to hide malicious commands.

    The tool takes an encoded string, optionally auto-detects whether it is
    Base64 or hex, decodes it to text and scans the decoded text for
    indicator substrings grouped by attack technique.
    """

    # Number of input characters echoed back in the result before truncation.
    _PREVIEW_LENGTH = 100

    # Indicator tokens grouped by attack technique. Tokens are matched against
    # the lowercased decoded text; group and token order determine the order
    # in which indicators and techniques are reported.
    _PATTERN_GROUPS = (
        ("execution", {
            "iex": "Invoke-Expression - executes arbitrary code",
            "invoke-expression": "Executes arbitrary PowerShell code",
            "invoke-command": "Remote command execution",
            "invoke-webrequest": "Downloads content from internet",
            "downloadstring": "Downloads and executes remote code",
            "downloadfile": "Downloads file from internet",
            "webclient": "Network client for downloading content",
            "net.webclient": "Network client object",
            "bitstransfer": "Background file transfer (potential data exfiltration)",
            "start-bitstransfer": "BITS transfer for file download",
        }),
        ("defense_evasion", {
            "-nop": "NoProfile flag - avoids loading profile scripts",
            "-noprofile": "Skips PowerShell profile loading",
            "-w hidden": "Hidden window - runs invisibly",
            "-windowstyle hidden": "Hides PowerShell window",
            "-ep bypass": "Execution policy bypass",
            "-executionpolicy bypass": "Disables script execution restrictions",
            "-enc": "Encoded command (nested encoding)",
            "-encodedcommand": "Base64 encoded command",
            "frombase64string": "Additional decoding layer",
        }),
        ("credential_access", {
            "mimikatz": "Credential dumping tool",
            "invoke-mimikatz": "PowerShell wrapper for Mimikatz",
            "get-credential": "Prompts for credentials",
            "convertto-securestring": "Password manipulation",
            "sekurlsa": "Mimikatz module for credential extraction",
            "lsadump": "LSA secrets dumping",
            "password": "Potential credential theft",
            "sam": "Security Account Manager access",
        }),
        ("persistence", {
            "schtasks": "Scheduled task creation",
            "new-scheduledtask": "Creates scheduled task for persistence",
            "register-scheduledtask": "Registers scheduled task",
            "startup": "Startup folder modification",
            "registry": "Registry modification",
            "wmi": "WMI-based persistence",
            "new-service": "Service creation",
        }),
        ("lateral_movement", {
            "psexec": "Remote execution tool",
            "winrm": "Windows Remote Management",
            "invoke-command -computername": "Remote command execution",
            "enter-pssession": "Interactive remote session",
            "wmic": "WMI command-line tool",
        }),
        ("command_and_control", {
            "http://": "HTTP connection (potential C2)",
            "https://": "HTTPS connection (potential C2)",
            "://": "URL connection",
            "tcp": "TCP network connection",
            "socket": "Network socket creation",
            "getstream": "Network stream (potential C2 channel)",
        }),
        ("exfiltration", {
            "compress-archive": "File compression before exfiltration",
            "out-file": "Writing to file (staging for exfiltration)",
            "set-content": "File creation/modification",
            "send-mailmessage": "Email-based exfiltration",
            "ftp": "FTP transfer",
            "post": "HTTP POST (potential data upload)",
        }),
    )

    # Short tokens that are also fragments of ordinary words ("same",
    # "postgres", ...). They are matched as whole words only; otherwise
    # harmless text would be flagged, and "sam"/"post" belong to high-risk
    # groups that escalate straight to CRITICAL. Tokens such as "tcp" or
    # "wmi" deliberately stay substring matches so that "tcpclient" and
    # "get-wmiobject" are still caught.
    _WHOLE_WORD_PATTERNS = {
        token: re.compile(r"\b" + token + r"\b")
        for token in ("iex", "sam", "post")
    }

    # Techniques whose presence alone makes the result CRITICAL.
    _HIGH_RISK_TECHNIQUES = ("credential_access", "command_and_control", "exfiltration")

    # Human-readable names used in the threat summary.
    _TECHNIQUE_NAMES = {
        "execution": "arbitrary code execution",
        "defense_evasion": "defense evasion",
        "credential_access": "credential theft",
        "persistence": "persistence mechanisms",
        "lateral_movement": "lateral movement",
        "command_and_control": "C2 communication",
        "exfiltration": "data exfiltration",
    }

    def name(self) -> str:
        """Return the tool identifier, ``"decoder"``."""
        return "decoder"

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Decode ``input_data["encoded_string"]`` and analyze the result.

        Args:
            input_data: Mapping with the key ``"encoded_string"`` and the
                optional key ``"encoding_type"`` (``"auto"``, ``"base64"``
                or ``"hex"``, case-insensitive; missing/empty means
                ``"auto"``).

        Returns:
            A result dict. On success it carries ``decoded_text`` and
            ``threat_analysis``; on failure ``success`` is ``False`` and
            ``error`` describes the problem. This method does not raise:
            unexpected exceptions are reported through ``error``.
        """
        try:
            encoded_string = input_data.get("encoded_string", "")
            if not encoded_string:
                return self._error_result("No encoded string provided")

            # Accept "Base64", " HEX ", None, ... instead of failing the
            # decode on a spelling difference.
            requested = input_data.get("encoding_type") or "auto"
            encoding_type = str(requested).strip().lower()
            if encoding_type == "auto":
                encoding_type = self._detect_encoding(encoded_string)

            decoded_text, success = self._decode_string(encoded_string, encoding_type)

            result = {
                "tool": "decoder",
                "encoded_string": self._preview(encoded_string),
                "encoding_detected": encoding_type,
                "decoded_text": decoded_text if success else None,
                "success": success,
            }
            if not success:
                result["error"] = "Failed to decode - invalid encoding or corrupted data"
                return result

            result["threat_analysis"] = self._analyze_decoded_content(decoded_text)
            return result
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            return self._error_result(f"{type(e).__name__}: {str(e)}")

    @staticmethod
    def _error_result(message: str) -> Dict[str, Any]:
        """Build an error result that carries the same ``tool``/``success`` keys as other results."""
        return {"tool": "decoder", "success": False, "error": message}

    def _preview(self, text: str) -> str:
        """Return ``text`` truncated to the preview length, with ``...`` appended when cut."""
        if len(text) > self._PREVIEW_LENGTH:
            return text[:self._PREVIEW_LENGTH] + "..."
        return text

    @staticmethod
    def _normalize_hex(string: str) -> str:
        """Remove all whitespace and an optional leading ``0x`` from a hex candidate."""
        compact = "".join(string.split())
        if compact[:2].lower() == "0x":
            compact = compact[2:]
        return compact

    def _detect_encoding(self, string: str) -> str:
        """Auto-detect whether ``string`` is hex or Base64.

        A string is treated as hex when, after whitespace and an optional
        ``0x`` prefix are removed, it consists solely of hex digits, has an
        even length and is longer than 10 characters. Everything else is
        reported as Base64, the more common encoding in PowerShell attacks.
        """
        hex_digits = self._normalize_hex(string)
        looks_like_hex = (
            len(hex_digits) > 10  # Reasonable length for an encoded command
            and len(hex_digits) % 2 == 0
            and re.fullmatch(r"[0-9A-Fa-f]+", hex_digits) is not None
        )
        return "hex" if looks_like_hex else "base64"

    def _decode_string(self, encoded_string: str, encoding_type: str) -> tuple:
        """Decode ``encoded_string`` and return ``(decoded_text, success)``.

        Unknown encoding types and any decoding error yield ``(None, False)``.
        """
        decoders = {"base64": self._decode_base64, "hex": self._decode_hex}
        decode = decoders.get(encoding_type)
        if decode is None:
            return None, False
        try:
            return decode(encoded_string)
        except Exception:
            # Contract of this method: never raise, signal failure via the flag.
            return None, False

    @staticmethod
    def _candidate_encodings(raw: bytes) -> list:
        """Return the character encodings to try for ``raw``, most likely first.

        UTF-16 is only preferred when the bytes actually look like UTF-16:
        a byte-order mark, or zero bytes dominating every other position
        (the shape of mostly-ASCII text such as ``powershell -enc``
        payloads). Trying UTF-16 unconditionally first would turn any
        even-length ASCII payload into CJK garbage. Limitation: UTF-16 text
        whose bytes are all below 0x80 and non-zero is indistinguishable
        from ASCII and is decoded as UTF-8.
        """
        candidates = []
        if raw.startswith((b"\xff\xfe", b"\xfe\xff")):
            candidates.append("utf-16")  # codec consumes the BOM
        elif raw.startswith(b"\xef\xbb\xbf"):
            candidates.append("utf-8-sig")
        elif len(raw) >= 2 and len(raw) % 2 == 0:
            units = len(raw) // 2
            zero_high_le = raw[1::2].count(0)
            zero_high_be = raw[0::2].count(0)
            if zero_high_le > zero_high_be and zero_high_le * 2 >= units:
                candidates.append("utf-16le")
            elif zero_high_be > zero_high_le and zero_high_be * 2 >= units:
                candidates.append("utf-16be")
        # latin-1 accepts every byte sequence, so it must stay last.
        candidates.extend(["utf-8", "utf-16le", "latin-1"])
        return candidates

    def _bytes_to_text(self, raw: bytes):
        """Decode ``raw`` to text, or return ``None`` if no non-blank text results."""
        for encoding in self._candidate_encodings(raw):
            try:
                text = raw.decode(encoding)
            except UnicodeDecodeError:
                continue
            # Drop NULs left over when UTF-16 data went through a byte-wise codec.
            text = text.replace("\x00", "")
            if text.strip():
                return text
        return None

    def _decode_base64(self, encoded_string: str) -> tuple:
        """Decode a Base64 string and return ``(decoded_text, success)``.

        Whitespace (including line breaks) is removed and missing ``=``
        padding is restored before decoding. Input containing characters
        outside the Base64 alphabet is rejected instead of being silently
        filtered. If the decoded bytes contain no non-blank text, their hex
        representation is returned.
        """
        compact = "".join(encoded_string.split())
        compact += "=" * (-len(compact) % 4)
        try:
            raw = base64.b64decode(compact, validate=True)
        except (binascii.Error, ValueError):
            return None, False
        if not raw:
            return None, False

        text = self._bytes_to_text(raw)
        if text is not None:
            return text, True
        # No readable text: return raw bytes as hex representation.
        return raw.hex(), True

    def _decode_hex(self, encoded_string: str) -> tuple:
        """Decode a hex string and return ``(decoded_text, success)``.

        Whitespace between digits and a leading ``0x`` are tolerated.
        Returns ``(None, False)`` for invalid hex or when the bytes contain
        no non-blank text.
        """
        try:
            raw = bytes.fromhex(self._normalize_hex(encoded_string))
        except ValueError:
            return None, False

        text = self._bytes_to_text(raw)
        if text is None:
            return None, False
        return text, True

    def _pattern_matches(self, pattern: str, lowered_text: str) -> bool:
        """Return True if ``pattern`` occurs in the already-lowercased text."""
        whole_word = self._WHOLE_WORD_PATTERNS.get(pattern)
        if whole_word is not None:
            return whole_word.search(lowered_text) is not None
        return pattern in lowered_text

    def _analyze_decoded_content(self, decoded_text: str) -> Dict[str, Any]:
        """Analyze decoded content for malicious patterns.

        Returns:
            Dict with ``is_suspicious``, ``threat_level``, ``indicators``
            (at most 10), ``indicator_count`` (all matches),
            ``attack_techniques`` and ``threat_summary``. Empty input yields
            threat level ``"UNKNOWN"`` with the same set of keys.
        """
        if not decoded_text:
            return {
                "is_suspicious": False,
                "threat_level": "UNKNOWN",
                "indicators": [],
                "indicator_count": 0,
                "attack_techniques": [],
                "threat_summary": "No decoded content to analyze",
            }

        decoded_lower = decoded_text.lower()
        indicators = []
        attack_techniques = []
        for technique, patterns in self._PATTERN_GROUPS:
            for pattern, description in patterns.items():
                if not self._pattern_matches(pattern, decoded_lower):
                    continue
                indicators.append(description)
                if technique not in attack_techniques:
                    attack_techniques.append(technique)

        return {
            "is_suspicious": len(indicators) > 0,
            "threat_level": self._calculate_threat_level(len(indicators), attack_techniques),
            "indicators": indicators[:10],  # Limit to top 10 indicators
            "indicator_count": len(indicators),
            "attack_techniques": attack_techniques,
            "threat_summary": self._generate_threat_summary(
                decoded_text, indicators, attack_techniques
            ),
        }

    def _calculate_threat_level(self, indicator_count: int, attack_techniques: list) -> str:
        """Calculate threat level based on indicators and techniques.

        ``LOW`` without indicators; ``CRITICAL`` with a high-risk technique
        or at least 5 indicators; ``HIGH`` with at least 3 indicators;
        otherwise ``MEDIUM``.
        """
        if indicator_count <= 0:
            return "LOW"
        has_high_risk = any(
            tech in attack_techniques for tech in self._HIGH_RISK_TECHNIQUES
        )
        if has_high_risk or indicator_count >= 5:
            return "CRITICAL"
        if indicator_count >= 3:
            return "HIGH"
        return "MEDIUM"

    def _generate_threat_summary(self, decoded_text: str, indicators: list, attack_techniques: list) -> str:
        """Generate human-readable threat summary.

        The summary names the first indicator, up to three techniques and a
        preview of the first 100 characters of the decoded text.
        """
        if not indicators:
            return "No suspicious patterns detected in decoded content"

        summary_parts = []

        # Describe what was found
        if len(indicators) == 1:
            summary_parts.append(f"Found 1 suspicious indicator: {indicators[0]}")
        else:
            summary_parts.append(
                f"Found {len(indicators)} suspicious indicators including: {indicators[0]}"
            )

        # Describe attack techniques
        if attack_techniques:
            readable_techniques = [
                self._TECHNIQUE_NAMES.get(t, t) for t in attack_techniques[:3]
            ]
            if len(readable_techniques) == 1:
                summary_parts.append(f"Indicates {readable_techniques[0]}.")
            else:
                summary_parts.append(
                    f"Indicates {', '.join(readable_techniques[:-1])} and {readable_techniques[-1]}."
                )

        # Add command preview
        preview = decoded_text[:100].strip()
        if len(decoded_text) > 100:
            preview += "..."
        summary_parts.append(f"Command preview: {preview}")

        return " ".join(summary_parts)
# Module-level instance shared by every invocation of the `decoder` tool.
_decoder_tool = DecoderTool()


# NOTE(review): the docstring below is presumably used by the `@tool`
# decorator as the tool description shown to the model - confirm before
# rewording it. Its wording is intentionally left unchanged here.
@tool
def decoder(encoded_string: str, encoding_type: str = "auto") -> dict:
    """Decodes Base64 or hex-encoded strings commonly used to hide malicious commands.

    Use this tool when you see:
    - PowerShell with -enc, -e, or -encodedcommand flags
    - Long strings of random-looking characters (A-Z, a-z, 0-9, +, /, =)
    - Commands that look obfuscated or unreadable
    - Hex strings (0-9, A-F only) in unusual contexts

    The tool automatically detects encoding type, decodes the string, and analyzes it for
    malicious patterns including code execution, credential theft, C2 communication, and more.

    Args:
        encoded_string: The encoded string to decode (can be base64 or hex)
        encoding_type: Type of encoding - "auto", "base64", or "hex" (default: "auto")

    Returns:
        Decoded content with detailed threat analysis including indicators, attack techniques,
        and threat level assessment.

    Examples:
        - decoder("cG93ZXJzaGVsbC5leGU=") → decodes PowerShell commands
        - decoder("496e766f6b652d576562526571756573742068747470733a2f2f6576696c2e636f6d", "hex")
    """
    # Delegate to the shared DecoderTool instance using its dict-based input.
    return _decoder_tool.run({
        "encoded_string": encoded_string,
        "encoding_type": encoding_type
    })