Spaces:
Sleeping
Sleeping
| from langchain_core.tools import tool | |
| from typing import Dict, Any | |
| import base64 | |
| import binascii | |
| import re | |
| from .base_tool import Tool | |
class DecoderTool(Tool):
    """Decode Base64 and Hex encoded strings commonly used to hide malicious commands.

    ``run`` takes a dict with an ``encoded_string`` and an optional
    ``encoding_type`` ("auto", "base64" or "hex"), decodes the payload to
    text and scans the decoded text for substrings associated with common
    attack techniques.
    """

    # Longest prefix of the caller's input echoed back in result dicts.
    _ECHO_LIMIT = 100

    # Text codecs tried on decoded bytes. Order only matters as a tie-break:
    # the candidate with the best readability score wins (see _bytes_to_text).
    _TEXT_CODECS = ("utf-8", "utf-16le", "utf-16be", "latin-1")

    # Only the first part of a decoded candidate is scored, to keep scoring
    # cheap on very large payloads.
    _SCORE_SAMPLE = 4096

    # (technique, {lower-case substring: description}) in reporting order.
    # Matching is plain substring search on the lower-cased decoded text, so
    # short tokens ("sam", "post", "tcp") can also match inside longer words.
    _PATTERN_GROUPS = (
        ("execution", {
            "iex": "Invoke-Expression - executes arbitrary code",
            "invoke-expression": "Executes arbitrary PowerShell code",
            "invoke-command": "Remote command execution",
            "invoke-webrequest": "Downloads content from internet",
            "downloadstring": "Downloads and executes remote code",
            "downloadfile": "Downloads file from internet",
            "webclient": "Network client for downloading content",
            "net.webclient": "Network client object",
            "bitstransfer": "Background file transfer (potential data exfiltration)",
            "start-bitstransfer": "BITS transfer for file download",
        }),
        ("defense_evasion", {
            "-nop": "NoProfile flag - avoids loading profile scripts",
            "-noprofile": "Skips PowerShell profile loading",
            "-w hidden": "Hidden window - runs invisibly",
            "-windowstyle hidden": "Hides PowerShell window",
            "-ep bypass": "Execution policy bypass",
            "-executionpolicy bypass": "Disables script execution restrictions",
            "-enc": "Encoded command (nested encoding)",
            "-encodedcommand": "Base64 encoded command",
            "frombase64string": "Additional decoding layer",
        }),
        ("credential_access", {
            "mimikatz": "Credential dumping tool",
            "invoke-mimikatz": "PowerShell wrapper for Mimikatz",
            "get-credential": "Prompts for credentials",
            "convertto-securestring": "Password manipulation",
            "sekurlsa": "Mimikatz module for credential extraction",
            "lsadump": "LSA secrets dumping",
            "password": "Potential credential theft",
            "sam": "Security Account Manager access",
        }),
        ("persistence", {
            "schtasks": "Scheduled task creation",
            "new-scheduledtask": "Creates scheduled task for persistence",
            "register-scheduledtask": "Registers scheduled task",
            "startup": "Startup folder modification",
            "registry": "Registry modification",
            "wmi": "WMI-based persistence",
            "new-service": "Service creation",
        }),
        ("lateral_movement", {
            "psexec": "Remote execution tool",
            "winrm": "Windows Remote Management",
            "invoke-command -computername": "Remote command execution",
            "enter-pssession": "Interactive remote session",
            "wmic": "WMI command-line tool",
        }),
        ("command_and_control", {
            "http://": "HTTP connection (potential C2)",
            "https://": "HTTPS connection (potential C2)",
            "://": "URL connection",
            "tcp": "TCP network connection",
            "socket": "Network socket creation",
            "getstream": "Network stream (potential C2 channel)",
        }),
        ("exfiltration", {
            "compress-archive": "File compression before exfiltration",
            "out-file": "Writing to file (staging for exfiltration)",
            "set-content": "File creation/modification",
            "send-mailmessage": "Email-based exfiltration",
            "ftp": "FTP transfer",
            "post": "HTTP POST (potential data upload)",
        }),
    )

    # Any match in one of these techniques escalates straight to CRITICAL.
    _HIGH_RISK_TECHNIQUES = ("credential_access", "command_and_control", "exfiltration")

    # Technique identifier -> wording used in the human-readable summary.
    _TECHNIQUE_NAMES = {
        "execution": "arbitrary code execution",
        "defense_evasion": "defense evasion",
        "credential_access": "credential theft",
        "persistence": "persistence mechanisms",
        "lateral_movement": "lateral movement",
        "command_and_control": "C2 communication",
        "exfiltration": "data exfiltration",
    }

    def name(self) -> str:
        """Return the identifier of this tool."""
        return "decoder"

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Decode ``input_data["encoded_string"]`` and analyze the result.

        Args:
            input_data: Dict with ``encoded_string`` (required) and
                ``encoding_type`` (optional: "auto", "base64" or "hex",
                matched case-insensitively; defaults to "auto").

        Returns:
            A dict that always carries ``tool`` and ``success``. On success
            it also holds the (truncated) input, the encoding used, the
            decoded text and a ``threat_analysis`` dict; on failure it holds
            an ``error`` message instead. This method does not raise: any
            unexpected exception is reported through ``error``.
        """
        try:
            encoded_string = input_data.get("encoded_string", "")
            requested_type = input_data.get("encoding_type") or "auto"
            # Accept "Base64", " HEX " etc. instead of failing the decode.
            encoding_type = str(requested_type).strip().lower()

            if not encoded_string:
                return {
                    "tool": "decoder",
                    "success": False,
                    "error": "No encoded string provided",
                }

            if encoding_type == "auto":
                encoding_type = self._detect_encoding(encoded_string)

            decoded_text, success = self._decode_string(encoded_string, encoding_type)

            result = {
                "tool": "decoder",
                "encoded_string": self._echo(encoded_string),
                "encoding_detected": encoding_type,
                "decoded_text": decoded_text if success else None,
                "success": success,
            }
            if not success:
                result["error"] = "Failed to decode - invalid encoding or corrupted data"
                return result

            result["threat_analysis"] = self._analyze_decoded_content(decoded_text)
            return result
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            return {
                "tool": "decoder",
                "success": False,
                "error": f"{type(e).__name__}: {str(e)}",
            }

    def _echo(self, encoded_string: str) -> str:
        """Return the input, truncated to ``_ECHO_LIMIT`` characters plus "..."."""
        if len(encoded_string) > self._ECHO_LIMIT:
            return encoded_string[:self._ECHO_LIMIT] + "..."
        return encoded_string

    @staticmethod
    def _strip_hex_prefix(text: str) -> str:
        """Drop a leading ``0x``/``0X`` marker, if present."""
        return text[2:] if text[:2] in ("0x", "0X") else text

    def _detect_encoding(self, string: str) -> str:
        """Auto-detect whether ``string`` is hex or base64.

        Hex is reported only for an even number of more than 10 hex digits
        (optionally prefixed with ``0x``); shorter all-hex strings are
        ambiguous with base64. Everything else is treated as base64, which
        is the more common wrapper for PowerShell payloads.
        """
        hex_digits = self._strip_hex_prefix(string.strip())
        is_hex = re.fullmatch(r"[0-9A-Fa-f]+", hex_digits) is not None
        if is_hex and len(hex_digits) % 2 == 0 and len(hex_digits) > 10:
            return "hex"
        return "base64"

    def _decode_string(self, encoded_string: str, encoding_type: str) -> tuple:
        """Decode with the named scheme and return ``(decoded_text, success)``.

        An unsupported ``encoding_type`` yields ``(None, False)``.
        """
        if encoding_type == "base64":
            return self._decode_base64(encoded_string)
        if encoding_type == "hex":
            return self._decode_hex(encoded_string)
        return None, False

    @staticmethod
    def _readability(text: str) -> float:
        """Score how much ``text`` looks like real text (0.0 worst, 3.0 best).

        ASCII printable characters and tab/CR/LF score 3, other printable
        characters score 2, and control or unassigned characters score 0.
        Weighting ASCII above other printable characters keeps ASCII bytes
        from being accepted as UTF-16 pairs, which are always "printable"
        CJK characters.
        """
        sample = text[:DecoderTool._SCORE_SAMPLE]
        if not sample:
            return 0.0
        points = 0
        for ch in sample:
            if " " <= ch <= "~" or ch in "\t\r\n":
                points += 3
            elif ch.isprintable():
                points += 2
        return points / len(sample)

    def _bytes_to_text(self, raw: bytes):
        """Pick the most plausible text decoding of ``raw``.

        Every codec in ``_TEXT_CODECS`` that can decode ``raw`` is scored
        with ``_readability`` and the best candidate wins (earlier codecs
        win ties). Scoring happens before NUL removal so that UTF-16 data
        mis-read as UTF-8 (every other character NUL) is penalised.

        Returns:
            The decoded text with NUL characters removed, or ``None`` when
            the best candidate is empty or whitespace only.
        """
        best_text = None
        best_score = -1.0
        for codec in self._TEXT_CODECS:
            try:
                candidate = raw.decode(codec)
            except UnicodeDecodeError:
                continue
            score = self._readability(candidate)
            if score > best_score:
                best_text, best_score = candidate, score

        if best_text is None:
            return None
        best_text = best_text.replace("\x00", "")
        return best_text if best_text.strip() else None

    def _decode_base64(self, encoded_string: str) -> tuple:
        """Decode a base64 string and return ``(decoded_text, success)``.

        Missing trailing ``=`` padding is tolerated. If the decoded bytes
        contain no readable text, their hex representation is returned
        instead (still with ``success`` True).
        """
        try:
            clean_string = encoded_string.strip()
            try:
                decoded_bytes = base64.b64decode(clean_string)
            except binascii.Error:
                # Commonly just stripped padding: re-pad from the number of
                # alphabet characters and retry. A genuinely invalid string
                # raises again and is reported as a failure below.
                data_chars = len(re.findall(r"[A-Za-z0-9+/]", clean_string))
                repadded = clean_string.rstrip("=") + "=" * (-data_chars % 4)
                decoded_bytes = base64.b64decode(repadded)
        except (ValueError, TypeError, AttributeError):
            return None, False

        decoded_text = self._bytes_to_text(decoded_bytes)
        if decoded_text is None:
            return decoded_bytes.hex(), True
        return decoded_text, True

    def _decode_hex(self, encoded_string: str) -> tuple:
        """Decode a hex string and return ``(decoded_text, success)``.

        A leading ``0x`` is accepted. Returns ``(None, False)`` for invalid
        hex or when the bytes contain no readable text.
        """
        try:
            clean_string = self._strip_hex_prefix(encoded_string.strip())
            decoded_bytes = bytes.fromhex(clean_string)
        except (ValueError, TypeError, AttributeError):
            return None, False

        decoded_text = self._bytes_to_text(decoded_bytes)
        return decoded_text, decoded_text is not None

    def _analyze_decoded_content(self, decoded_text: str) -> Dict[str, Any]:
        """Scan decoded text for substrings listed in ``_PATTERN_GROUPS``.

        Returns:
            Dict with ``is_suspicious``, ``threat_level``, ``indicators``
            (at most 10 descriptions), ``indicator_count`` (all matches),
            ``attack_techniques`` and ``threat_summary``. Empty input yields
            the same keys with ``threat_level`` "UNKNOWN".
        """
        if not decoded_text:
            return {
                "is_suspicious": False,
                "threat_level": "UNKNOWN",
                "indicators": [],
                "indicator_count": 0,
                "attack_techniques": [],
                "threat_summary": "No decoded content to analyze",
            }

        decoded_lower = decoded_text.lower()
        indicators = []
        attack_techniques = []
        for technique, patterns in self._PATTERN_GROUPS:
            matched = [
                description
                for pattern, description in patterns.items()
                if pattern in decoded_lower
            ]
            if matched:
                indicators.extend(matched)
                attack_techniques.append(technique)

        threat_level = self._calculate_threat_level(len(indicators), attack_techniques)
        threat_summary = self._generate_threat_summary(decoded_text, indicators, attack_techniques)

        return {
            "is_suspicious": len(indicators) > 0,
            "threat_level": threat_level,
            "indicators": indicators[:10],  # Limit to the first 10 indicators
            "indicator_count": len(indicators),
            "attack_techniques": attack_techniques,
            "threat_summary": threat_summary,
        }

    def _calculate_threat_level(self, indicator_count: int, attack_techniques: list) -> str:
        """Map the indicator count and matched techniques to a threat level.

        No indicators is "LOW". Any high-risk technique, or five or more
        indicators, is "CRITICAL"; three or more is "HIGH"; otherwise
        "MEDIUM".
        """
        if indicator_count == 0:
            return "LOW"

        has_high_risk = any(
            technique in attack_techniques for technique in self._HIGH_RISK_TECHNIQUES
        )
        if has_high_risk or indicator_count >= 5:
            return "CRITICAL"
        if indicator_count >= 3:
            return "HIGH"
        return "MEDIUM" if indicator_count >= 1 else "LOW"

    def _generate_threat_summary(self, decoded_text: str, indicators: list, attack_techniques: list) -> str:
        """Build a one-line, human-readable summary of the findings.

        The summary names the first indicator, up to three techniques, and
        a preview of at most 100 characters of the decoded text.
        """
        if not indicators:
            return "No suspicious patterns detected in decoded content"

        summary_parts = []

        if len(indicators) == 1:
            summary_parts.append(f"Found 1 suspicious indicator: {indicators[0]}")
        else:
            summary_parts.append(f"Found {len(indicators)} suspicious indicators including: {indicators[0]}")

        if attack_techniques:
            readable_techniques = [
                self._TECHNIQUE_NAMES.get(t, t) for t in attack_techniques[:3]
            ]
            if len(readable_techniques) == 1:
                summary_parts.append(f"Indicates {readable_techniques[0]}.")
            else:
                summary_parts.append(f"Indicates {', '.join(readable_techniques[:-1])} and {readable_techniques[-1]}.")

        preview = decoded_text[:100].strip()
        if len(decoded_text) > 100:
            preview += "..."
        summary_parts.append(f"Command preview: {preview}")

        return " ".join(summary_parts)
# Module-level singleton: built once at import time and reused by the
# ``decoder`` function for every call.
_decoder_tool = DecoderTool()
def decoder(encoded_string: str, encoding_type: str = "auto") -> dict:
    """Decodes Base64 or hex-encoded strings commonly used to hide malicious commands.

    Use this tool when you see:
    - PowerShell with -enc, -e, or -encodedcommand flags
    - Long strings of random-looking characters (A-Z, a-z, 0-9, +, /, =)
    - Commands that look obfuscated or unreadable
    - Hex strings (0-9, A-F only) in unusual contexts

    The tool automatically detects the encoding type, decodes the string, and
    analyzes it for malicious patterns including code execution, credential
    theft, C2 communication, and more.

    Args:
        encoded_string: The encoded string to decode (can be base64 or hex)
        encoding_type: Type of encoding - "auto", "base64", or "hex" (default: "auto")

    Returns:
        Decoded content with detailed threat analysis including indicators,
        attack techniques, and threat level assessment.

    Examples:
        - decoder("cG93ZXJzaGVsbC5leGU=") → decodes PowerShell commands
        - decoder("496e766f6b652d576562526571756573742068747470733a2f2f6576696c2e636f6d", "hex")
    """
    # Package the arguments in the dict shape the shared tool instance expects.
    request = {
        "encoded_string": encoded_string,
        "encoding_type": encoding_type,
    }
    return _decoder_tool.run(request)