# whale_monitor/core.py
# (V3.3 - GEM-Architect: Enterprise Edition - Smart Web3 Limits)
# Includes: Web3 Engine, Smart Retry for Limits, Solscan Fixes, & Full Logic.

import os
import asyncio
import httpx
import json
import traceback
import time
from datetime import datetime, timedelta, timezone
from collections import defaultdict, deque
import logging
import ssl
from typing import List, Dict, Any

# Web3 Integration
from web3 import AsyncWeb3

# Local Imports
from .config import (
    DEFAULT_WHALE_THRESHOLD_USD,
    TRANSFER_EVENT_SIGNATURE,
    NATIVE_COINS,
    DEFAULT_EXCHANGE_ADDRESSES,
    COINGECKO_BASE_URL,
    COINGECKO_SYMBOL_MAPPING,
    ERC20_ABI
)
from .rpc_manager import AdaptiveRpcManager

# The Moralis fallback reads DEFAULT_NETWORK_CONFIGS, which was never imported
# (the resulting NameError was silently swallowed, so that fallback could not
# run). NOTE(review): assumes the mapping lives in .config - confirm. The
# guarded import keeps this module importable if it does not.
try:
    from .config import DEFAULT_NETWORK_CONFIGS
except ImportError:
    DEFAULT_NETWORK_CONFIGS = {}

# Silence noisy HTTP logging
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("web3").setLevel(logging.WARNING)


class EnhancedWhaleMonitor:
    """Collect large token transfers and turn them into an exchange-flow signal.

    Transfers are fetched through the injected ``AdaptiveRpcManager`` (Web3
    logs first, then Moralis and explorer fallbacks; Solscan for Solana),
    classified against the known exchange addresses and summarised per time
    window.
    """

    # Transfers worth less than this many USD are dropped by every fetcher.
    MIN_TRANSFER_USD = 20000.0

    # Seconds a cleanup() call waits for pending background tasks.
    _CLEANUP_TIMEOUT_SECONDS = 5

    def __init__(self, contracts_db=None, r2_service=None):
        """Build the monitor.

        Args:
            contracts_db: Optional mapping ``symbol -> address`` or
                ``symbol -> {'address': ..., 'network': ...}``.
            r2_service: Optional storage service. When given, extra contracts
                are loaded from it in the background (only if an event loop is
                already running).
        """
        print("🔄 [WhaleMonitor V3.3] Initializing Enterprise Engine...")
        self.r2_service = r2_service
        self.data_manager = None
        self.rpc_manager: AdaptiveRpcManager = None
        self.whale_threshold_usd = DEFAULT_WHALE_THRESHOLD_USD

        self.contracts_db = {}
        self._initialize_contracts_db(contracts_db or {})

        self.address_labels = {}
        self.address_categories = {
            'exchange': set(),
            'cex': set(),
            'dex': set(),
            'bridge': set(),
            'whale': set(),
            'unknown': set(),
        }
        self._initialize_comprehensive_exchange_addresses()

        self.token_price_cache = {}
        self.token_decimals_cache = {}

        # Strong references to fire-and-forget tasks; the event loop itself
        # only keeps weak references to them.
        self._background_tasks = set()

        if self.r2_service:
            self._spawn_background(self._load_contracts_from_r2())
        print("✅ [WhaleMonitor V3.3] System Ready.")

    def set_rpc_manager(self, rpc_manager: AdaptiveRpcManager):
        """Inject the RPC manager used for every network/API call."""
        self.rpc_manager = rpc_manager
        print("✅ [WhaleMonitor] RPC Manager Linked.")

    def _spawn_background(self, coro):
        """Schedule ``coro`` as a background task, if a loop is running.

        Returns the task, or ``None`` when no event loop is running (the
        coroutine is closed in that case so it does not emit a
        "never awaited" warning).
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            coro.close()
            return None
        task = loop.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    # ==============================================================================
    # 📚 Initialization & Database Management
    # ==============================================================================
    def _initialize_contracts_db(self, initial_contracts):
        """Normalise ``initial_contracts`` into ``self.contracts_db``.

        Dict entries need both 'address' and 'network'; plain string entries
        get their network guessed from the address format. Anything else is
        ignored. Keys are stored lower-cased.
        """
        for symbol, contract_data in initial_contracts.items():
            symbol_lower = symbol.lower()
            if (isinstance(contract_data, dict)
                    and 'address' in contract_data
                    and 'network' in contract_data):
                self.contracts_db[symbol_lower] = contract_data
            elif isinstance(contract_data, str):
                self.contracts_db[symbol_lower] = {
                    'address': contract_data,
                    'network': self._detect_network_from_address(contract_data)
                }

    def _detect_network_from_address(self, address):
        """Guess 'ethereum' or 'solana' from the address shape.

        Long addresses without a ``0x`` prefix are treated as Solana; every
        other input (including non-strings) defaults to 'ethereum'.
        """
        if not isinstance(address, str):
            return 'ethereum'
        if not address.lower().startswith('0x') and not address.startswith('0x') and len(address) > 30:
            return 'solana'
        return 'ethereum'

    def _initialize_comprehensive_exchange_addresses(self):
        """Index DEFAULT_EXCHANGE_ADDRESSES into labels and category sets.

        Every address is added to 'exchange' and to exactly one of
        'dex' / 'bridge' / 'cex', chosen from its category name.
        """
        dex_names = ('uniswap', 'pancakeswap', 'sushiswap')
        for category, addresses in DEFAULT_EXCHANGE_ADDRESSES.items():
            if category in dex_names:
                bucket = 'dex'
            elif 'wormhole' in category or 'bridge' in category:
                bucket = 'bridge'
            else:
                bucket = 'cex'
            for address in addresses:
                if not isinstance(address, str):
                    continue
                addr_lower = address.lower()
                self.address_labels[addr_lower] = category
                self.address_categories['exchange'].add(addr_lower)
                self.address_categories[bucket].add(addr_lower)

    async def _load_contracts_from_r2(self):
        """Merge ``contracts.json`` from the storage bucket into contracts_db.

        Existing entries win. Failures are logged and otherwise ignored: this
        is a best-effort enrichment.
        """
        if not self.r2_service:
            return
        if not hasattr(self.r2_service, 's3_client'):
            return
        key = "contracts.json"
        s3_client = self.r2_service.s3_client

        def _download():
            response = s3_client.get_object(Bucket="trading", Key=key)
            return response['Body'].read()

        try:
            # get_object is called synchronously (it was never awaited), so it
            # is run in a worker thread to avoid stalling the event loop.
            loop = asyncio.get_running_loop()
            raw = await loop.run_in_executor(None, _download)
            contracts_data = json.loads(raw)
            for sym, data in contracts_data.items():
                self.contracts_db.setdefault(sym.lower(), data)
        except Exception as e:
            print(f" ⚠️ [WhaleMonitor] Could not load {key}: {e}")

    # ==============================================================================
    # 🧠 Core Analysis Logic
    # ==============================================================================
    async def get_symbol_whale_activity(self, symbol: str, known_price: float = 0.0) -> Dict[str, Any]:
        """Analyse recent large transfers for ``symbol``.

        Args:
            symbol: Base symbol or pair such as ``"PEPE/USDT"``.
            known_price: USD price if the caller already has it; otherwise the
                price is fetched from CoinGecko.

        Returns:
            A response dict. ``data_available`` is False (with an ``error``
            string) when the analysis could not be performed; this method does
            not raise for ordinary failures.
        """
        try:
            if not self.rpc_manager:
                return self._create_error_response(symbol, "RPC Manager Not Injected")
            self.rpc_manager.reset_session_stats()

            base_symbol = symbol.split("/")[0].upper() if '/' in symbol else symbol.upper()
            if base_symbol in NATIVE_COINS:
                return self._create_native_coin_response(symbol)

            contract_info = await self._find_contract_address_enhanced(symbol)
            if not contract_info or not contract_info.get('address'):
                return self._create_no_contract_response(symbol)
            contract_address = contract_info['address']
            network = contract_info['network']

            # ``or 0.0`` guards against a caller passing None.
            current_price = known_price or 0.0
            if current_price <= 0:
                current_price = await self._get_token_price(symbol)
            if not current_price or current_price <= 0:
                return self._create_error_response(symbol, "Price unavailable")

            decimals = await self._get_token_decimals(contract_address, network)
            if decimals is None:
                return self._create_error_response(symbol, f"Decimals missing on {network}")

            # [CONFIG] Scan window
            scan_hours = 4
            all_transfers = await self._get_targeted_transfer_data(
                contract_address, network, hours=scan_hours,
                price=current_price, decimals=decimals
            )
            if not all_transfers:
                return self._create_no_transfers_response(symbol)

            print(f"📊 [WhaleMonitor] Analyzed {symbol}: {len(all_transfers)} transfers found ({scan_hours}h).")

            analysis_windows = [
                {'name': '1h', 'minutes': 60},
                {'name': '4h', 'minutes': 240},
                {'name': '24h', 'minutes': 1440}
            ]
            multi_window_analysis = {}
            current_time_utc = datetime.now(timezone.utc)
            timestamp_cutoff_base = current_time_utc.timestamp()

            for window in analysis_windows:
                cutoff_ts = timestamp_cutoff_base - (window['minutes'] * 60)
                window_transfers = [
                    t for t in all_transfers
                    if float(t.get('timeStamp', 0)) >= cutoff_ts
                ]
                multi_window_analysis[window['name']] = self._analyze_transfer_list(
                    symbol=symbol, transfers=window_transfers, daily_volume_usd=0
                )

            if self.r2_service:
                self._spawn_background(self._save_learning_record(
                    symbol, current_price, multi_window_analysis,
                    self.rpc_manager.get_session_stats()
                ))

            short_term = multi_window_analysis.get('1h', {})
            # NOTE(review): the 4h window is published below under the key
            # 'accumulation_analysis_24h'. Only `scan_hours` of data is
            # fetched, so this is kept as-is for compatibility - confirm intent.
            long_term = multi_window_analysis.get('4h', {})

            signal = self._generate_enhanced_trading_signal(short_term)
            llm_summary = self._create_enhanced_llm_summary(signal, short_term)

            return {
                'symbol': symbol,
                'data_available': True,
                'analysis_timestamp': current_time_utc.isoformat(),
                'summary': {
                    'total_transfers_scanned': len(all_transfers),
                    'whale_count_1h': short_term.get('whale_transfers_count', 0),
                    'net_flow_1h': short_term.get('net_flow_usd', 0)
                },
                'exchange_flows': short_term,
                'accumulation_analysis_24h': long_term,
                'trading_signal': signal,
                'llm_friendly_summary': llm_summary
            }
        except Exception as e:
            # Top-level boundary: report the failure instead of propagating it.
            traceback.print_exc()
            return self._create_error_response(symbol, str(e))

    # ==============================================================================
    # 🕵️‍♂️ Data Fetching & Web3 Logic (Smart Limits)
    # ==============================================================================
    async def _get_targeted_transfer_data(self, contract_address: str, network: str, hours: int, price: float, decimals: int) -> List[Dict]:
        """Fetch large transfers, trying each data source in priority order.

        Solana uses Solscan only. EVM networks try Web3 logs, then Moralis,
        then the block-explorer API; the first non-empty result wins.

        Returns:
            A list of transfer dicts, or ``[]`` when every source failed or
            found nothing.
        """
        # A. Solana Logic
        if network == 'solana':
            try:
                transfers = await self._get_solscan_token_data(contract_address, hours, price)
                if transfers:
                    return transfers
            except Exception as e:
                print(f" ⚠️ [Solana] Solscan failed: {e}")
            return []

        # B. EVM Logic
        # 1. Web3 Direct (with Smart Retry)
        try:
            print(f" ⚡ [Web3] Scanning {network} logs...")
            web3_transfers = await self._get_web3_transfers(contract_address, network, hours, price, decimals)
            if web3_transfers:
                print(f" ✅ [Web3] Found {len(web3_transfers)} transfers.")
                return web3_transfers
        except Exception as e:
            print(f" ⚠️ [Web3] Failed: {e}. Trying fallbacks...")

        # 2. Moralis Fallback
        try:
            chain_id = DEFAULT_NETWORK_CONFIGS.get(network, {}).get('moralis_chain_id')
            if chain_id:
                print(f" 🛡️ [Moralis] Fallback scan...")
                moralis_transfers = await self._get_moralis_token_data(contract_address, chain_id, hours, price, decimals)
                if moralis_transfers:
                    return moralis_transfers
        except Exception as e:
            print(f" ⚠️ [Moralis] Failed: {e}")

        # 3. Scanners Fallback
        try:
            print(f" 🔍 [Scanner] Fallback scan...")
            scanner_transfers = await self._get_scanner_token_data(contract_address, network, hours, price, decimals)
            if scanner_transfers:
                return scanner_transfers
        except Exception as e:
            print(f" ⚠️ [Scanner] Failed: {e}")

        return []

    @staticmethod
    def _log_data_to_int(data):
        """Decode a log's ``data`` field (bytes-like or hex string) to an int."""
        if isinstance(data, (bytes, bytearray)):
            return int.from_bytes(data, 'big')
        return int(str(data), 16)

    async def _get_web3_transfers(self, address: str, network: str, hours: int, price: float, decimals: int):
        """Web3 Fetcher with Limit Exceeded Handling.

        Reads Transfer logs for ``address`` over roughly the last ``hours``
        hours. If the provider rejects the block range, retries once with a
        one-hour range.

        Returns:
            Transfers worth at least ``MIN_TRANSFER_USD``. ``timeStamp`` is an
            estimate derived from the block number and the assumed block time.

        Raises:
            RuntimeError: No Web3 provider is available for ``network``.
            Exception: Any provider error that is not a range/limit error.
        """
        w3 = self.rpc_manager.get_web3(network)
        if not w3:
            raise RuntimeError(f"No Web3 provider for {network}")

        latest = await w3.eth.block_number
        # Assumed average seconds per block.
        block_time = {'bsc': 3, 'ethereum': 12}.get(network, 2)
        checksum_address = w3.to_checksum_address(address)

        async def _query_logs(window_hours):
            blocks_back = int((window_hours * 3600) / block_time)
            return await w3.eth.get_logs({
                'fromBlock': max(0, latest - blocks_back),
                'toBlock': 'latest',
                'address': checksum_address,
                'topics': [TRANSFER_EVENT_SIGNATURE]
            })

        try:
            # First attempt: the full requested duration.
            logs = await _query_logs(hours)
        except Exception as e:
            # [SMART RETRY] If the provider complains about the range size,
            # shrink the window to one hour; anything else is not recoverable.
            err_str = str(e).lower()
            if not ('limit' in err_str or 'range' in err_str or 'exceeded' in err_str):
                raise
            print(f" ⚠️ [Web3] Range too big. Retrying with 1 hour window...")
            logs = await _query_logs(1)

        now = time.time()
        transfers = []
        for log in logs:
            try:
                if len(log['topics']) < 3:
                    continue
                amount = self._log_data_to_int(log['data']) / (10 ** decimals)
                val_usd = amount * price
                # Minimum size filter tuned for short-term trading.
                if val_usd < self.MIN_TRANSFER_USD:
                    continue

                # Estimate when the transfer happened from its block height so
                # the 1h/4h windows can actually differ; fall back to "now"
                # when the log carries no block number.
                try:
                    age_blocks = max(0, latest - int(log['blockNumber']))
                    estimated_ts = now - age_blocks * block_time
                except (KeyError, TypeError, ValueError):
                    estimated_ts = now

                transfers.append({
                    'hash': log['transactionHash'].hex(),
                    'from': '0x' + log['topics'][1].hex()[-40:],
                    'to': '0x' + log['topics'][2].hex()[-40:],
                    'value_usd': val_usd,
                    'timeStamp': estimated_ts,
                    'network': network,
                    'source': 'web3'
                })
            except Exception:
                # Best effort: skip any log that cannot be decoded.
                continue
        return transfers

    async def _get_solscan_token_data(self, address: str, hours: int, price: float):
        """Fetch recent large transfers of a Solana token from Solscan.

        Only the first 50 transfers returned by the API are inspected;
        entries older than ``hours`` or below ``MIN_TRANSFER_USD`` are dropped.
        """
        params = {"address": address, "limit": 50}
        data = await self.rpc_manager.get_solscan_api("/v2.0/token/transfer", params)
        if not data or not data.get('data'):
            return []

        transfers = []
        cutoff = time.time() - (hours * 3600)
        for tx in data['data']:
            try:
                ts = tx.get('blockTime', 0)
                if ts < cutoff:
                    continue
                amount_raw = int(tx.get('amount', 0))
                dec = tx.get('decimals', 9)
                val_usd = (amount_raw / (10 ** dec)) * price
                if val_usd < self.MIN_TRANSFER_USD:
                    continue
                transfers.append({
                    'hash': tx.get('signature'),
                    'from': tx.get('from_address'),
                    'to': tx.get('to_address'),
                    'value_usd': val_usd,
                    'timeStamp': ts,
                    'network': 'solana',
                    'source': 'solscan'
                })
            except Exception:
                # Best effort: skip malformed entries.
                continue
        return transfers

    async def _get_moralis_token_data(self, address, chain, hours, price, decimals):
        """Fetch recent large transfers from Moralis (latest 100, newest first).

        Entries older than ``hours`` or below ``MIN_TRANSFER_USD`` are dropped.
        """
        params = {
            "chain": chain,
            "contract_address": address,
            "order": "DESC",
            "limit": 100
        }
        data = await self.rpc_manager.get_moralis_api(params)
        if not data or not data.get('result'):
            return []

        transfers = []
        cutoff = time.time() - (hours * 3600)
        for tx in data['result']:
            try:
                val = int(tx['value']) / (10 ** decimals) * price
                if val < self.MIN_TRANSFER_USD:
                    continue
                # The trailing character (the 'Z' suffix) is stripped before
                # parsing; the value is then pinned to UTC explicitly, because
                # a naive datetime's .timestamp() assumes local time.
                parsed = datetime.fromisoformat(tx['block_timestamp'][:-1])
                ts = parsed.replace(tzinfo=timezone.utc).timestamp()
                if ts < cutoff:
                    continue
                transfers.append({
                    'hash': tx['transaction_hash'],
                    'from': tx['from_address'],
                    'to': tx['to_address'],
                    'value_usd': val,
                    'timeStamp': ts,
                    'source': 'moralis'
                })
            except Exception:
                # Best effort: skip malformed entries.
                continue
        return transfers

    async def _get_scanner_token_data(self, address, network, hours, price, decimals):
        """Fetch recent large transfers from the network's explorer API.

        Uses the ``tokentx`` action (latest 100, newest first). Entries older
        than ``hours`` or below ``MIN_TRANSFER_USD`` are dropped.
        """
        config = self.rpc_manager.get_explorer_config(network)
        if not config:
            return []
        params = {
            "module": "account",
            "action": "tokentx",
            "contractaddress": address,
            "page": 1,
            "offset": 100,
            "sort": "desc",
            "apikey": config.get('api_key')
        }
        data = await self.rpc_manager.get_scanner_api(config['api_url'], params)
        # A string 'result' is treated as "no data" rather than a row list.
        if not data or not data.get('result') or isinstance(data['result'], str):
            return []

        transfers = []
        cutoff = time.time() - (hours * 3600)
        for tx in data['result']:
            try:
                val = int(tx['value']) / (10 ** decimals) * price
                if val < self.MIN_TRANSFER_USD:
                    continue
                ts = int(tx['timeStamp'])
                if ts < cutoff:
                    continue
                transfers.append({
                    'hash': tx['hash'],
                    'from': tx['from'],
                    'to': tx['to'],
                    'value_usd': val,
                    'timeStamp': ts,
                    'source': 'scanner'
                })
            except Exception:
                # Best effort: skip malformed entries.
                continue
        return transfers

    # ==============================================================================
    # 🕵️‍♂️ Contract Finding & Search
    # ==============================================================================
    async def _find_contract_address_enhanced(self, symbol: str):
        """Resolve ``symbol`` to ``{'address', 'network'}``.

        Looks in the local contracts DB first, then searches CoinGecko and
        picks the first available platform in priority order. Successful
        lookups are cached in ``self.contracts_db``.

        Returns:
            The contract info dict, or ``None`` when nothing was found or the
            lookup failed.
        """
        base_symbol = symbol.split('/')[0].lower()
        if base_symbol in self.contracts_db:
            return self.contracts_db[base_symbol]

        print(f" 🔍 Searching CoinGecko for {base_symbol}...")
        try:
            data = await self.rpc_manager.get_coingecko_api('/search', params={'query': base_symbol})
            if not data or not data.get('coins'):
                return None

            # Prefer an exact symbol match; otherwise take the top hit.
            best_id = next(
                (coin['id'] for coin in data['coins']
                 if coin['symbol'].lower() == base_symbol),
                None
            )
            if not best_id:
                best_id = data['coins'][0]['id']

            details = await self.rpc_manager.get_coingecko_api(f'/coins/{best_id}', params={
                "localization": "false",
                "tickers": "false",
                "market_data": "false",
                "community_data": "false",
                "developer_data": "false"
            })
            if not details or 'platforms' not in details:
                return None

            platforms = details['platforms']
            priority_nets = ['binance-smart-chain', 'polygon-pos', 'ethereum', 'solana', 'avalanche']
            mapping = {
                'binance-smart-chain': 'bsc',
                'polygon-pos': 'polygon',
                'ethereum': 'ethereum',
                'solana': 'solana',
                'avalanche': 'avalanche'
            }
            for net_key in priority_nets:
                if net_key in platforms and platforms[net_key]:
                    res = {'address': platforms[net_key], 'network': mapping[net_key]}
                    self.contracts_db[base_symbol] = res
                    return res
        except Exception as e:
            print(f" ⚠️ [CoinGecko] Contract lookup failed for {base_symbol}: {e}")
        return None

    # ==============================================================================
    # 📐 Analysis Logic
    # ==============================================================================
    def _analyze_transfer_list(self, symbol: str, transfers: List[Dict], daily_volume_usd: float) -> Dict[str, Any]:
        """Aggregate exchange inflow/outflow over ``transfers``.

        A transfer whose 'to' is a known exchange address counts as a deposit,
        one whose 'from' is a known exchange address as a withdrawal (a
        transfer can be both). Missing or ``None`` addresses and values are
        treated as empty / zero. ``symbol`` and ``daily_volume_usd`` are
        accepted for interface compatibility and not used.

        Returns:
            Stats dict; ``net_flow_usd`` is deposits minus withdrawals.
        """
        stats = {
            'to_exchanges_usd': 0.0,
            'from_exchanges_usd': 0.0,
            'deposit_count': 0,
            'withdrawal_count': 0,
            'whale_transfers_count': 0,
            'total_volume': 0.0
        }
        exchange_addresses = self.address_categories['exchange']

        for tx in transfers:
            val = tx.get('value_usd', 0) or 0
            stats['whale_transfers_count'] += 1
            stats['total_volume'] += val

            # ``or ''`` covers keys that are present but None.
            to_addr = (tx.get('to') or '').lower()
            from_addr = (tx.get('from') or '').lower()

            if to_addr in exchange_addresses:
                stats['to_exchanges_usd'] += val
                stats['deposit_count'] += 1
            if from_addr in exchange_addresses:
                stats['from_exchanges_usd'] += val
                stats['withdrawal_count'] += 1

        stats['net_flow_usd'] = stats['to_exchanges_usd'] - stats['from_exchanges_usd']
        return stats

    def _generate_enhanced_trading_signal(self, analysis: Dict) -> Dict:
        """Map net exchange flow to an action.

        Fewer than 3 transfers -> HOLD; net inflow above 500k USD -> SELL;
        net outflow above 500k USD -> BUY; otherwise WATCH.
        """
        net = analysis.get('net_flow_usd', 0)
        whales = analysis.get('whale_transfers_count', 0)

        if whales < 3:
            return {'action': 'HOLD', 'confidence': 0.1, 'reason': 'Low Activity'}
        if net > 500_000:
            return {'action': 'SELL', 'confidence': 0.8, 'reason': 'Exchange Inflow'}
        if net < -500_000:
            return {'action': 'BUY', 'confidence': 0.8, 'reason': 'Exchange Outflow'}
        return {'action': 'WATCH', 'confidence': 0.5, 'reason': 'Mixed Activity'}

    def _create_enhanced_llm_summary(self, signal, analysis):
        """Return a compact summary dict: whale count text plus the action."""
        return {
            'summary': f"Whales: {analysis.get('whale_transfers_count')}",
            'action': signal['action']
        }

    # ==============================================================================
    # ⚙️ Helpers
    # ==============================================================================
    async def _get_token_price(self, symbol):
        """Return the USD price for ``symbol`` from CoinGecko.

        Returns:
            The price as reported, or ``0.0`` when it cannot be determined
            (empty response, missing 'usd' field, or request failure).
        """
        base = symbol.split('/')[0]
        # The mapping is tried with the symbol as given and in both cases;
        # failing that, the lower-cased base symbol is used as the id (the
        # full "BASE/QUOTE" pair string cannot be a CoinGecko id).
        coin_id = (
            COINGECKO_SYMBOL_MAPPING.get(base)
            or COINGECKO_SYMBOL_MAPPING.get(base.upper())
            or COINGECKO_SYMBOL_MAPPING.get(base.lower())
            or base.lower()
        )
        try:
            res = await self.rpc_manager.get_coingecko_api(
                '/simple/price',
                params={'ids': coin_id, 'vs_currencies': 'usd'}
            )
            if not res:
                return 0.0
            entry = res.get(coin_id) or list(res.values())[0]
            return entry['usd']
        except Exception:
            return 0.0

    async def _get_token_decimals(self, address, network):
        """Return the token's decimals, cached per ``address``/``network``.

        Reads ``decimals()`` from the contract when a Web3 provider exists;
        otherwise (or on failure) falls back to 9 for Solana and 18 elsewhere.
        Fallback values are not cached.
        """
        key = f"{address}_{network}"
        if key in self.token_decimals_cache:
            return self.token_decimals_cache[key]
        try:
            w3 = self.rpc_manager.get_web3(network)
            if w3:
                contract = w3.eth.contract(address=w3.to_checksum_address(address), abi=ERC20_ABI)
                dec = await contract.functions.decimals().call()
                self.token_decimals_cache[key] = dec
                return dec
        except Exception:
            # Fall through to the per-network default below.
            pass
        return 9 if network == 'solana' else 18

    async def _save_learning_record(self, symbol, price, analysis, api_stats):
        """Persist one analysis record via ``r2_service.save_record``.

        Best effort: failures are logged and never propagate.
        """
        if not self.r2_service:
            return
        try:
            record = {
                'id': f"{symbol}_{int(time.time())}",
                'symbol': symbol,
                'price': price,
                'analysis': analysis,
                'api_stats': api_stats,
                'timestamp': datetime.now(timezone.utc).isoformat()
            }
            if hasattr(self.r2_service, 'save_record'):
                await self.r2_service.save_record(record)
        except Exception as e:
            print(f" ⚠️ [WhaleMonitor] Could not save learning record: {e}")

    def _create_error_response(self, symbol, err):
        """Build the response for a failed analysis."""
        return {'symbol': symbol, 'data_available': False, 'error': err, 'trading_signal': {}}

    def _create_native_coin_response(self, symbol):
        """Build the response for native coins (no token contract to scan)."""
        return {'symbol': symbol, 'data_available': False, 'error': 'Native Coin', 'trading_signal': {}}

    def _create_no_contract_response(self, symbol):
        """Build the response used when no contract address was found."""
        return {'symbol': symbol, 'data_available': False, 'error': 'No Contract', 'trading_signal': {}}

    def _create_no_transfers_response(self, symbol):
        """Build the response used when the scan found no large transfers."""
        return {'symbol': symbol, 'data_available': True, 'summary': {'total': 0}, 'trading_signal': {'action': 'HOLD'}}

    async def cleanup(self):
        """Let pending background tasks finish briefly, then cancel the rest."""
        pending = [t for t in self._background_tasks if not t.done()]
        if pending:
            _, still_running = await asyncio.wait(pending, timeout=self._CLEANUP_TIMEOUT_SECONDS)
            for task in still_running:
                task.cancel()
        print("🛑 [WhaleMonitor] Cleanup.")