Trad / whale_news_data.py
Riy777's picture
Update whale_news_data.py
781de6c
raw
history blame
98.9 kB
# whale_news_data.py
import os
import asyncio
import httpx
import json
import traceback
import time
from datetime import datetime, timedelta
from collections import defaultdict, deque
import ccxt.pro as ccxt # ✅ ملاحظة: هذا هو ccxt.pro (غير متزامن)
import numpy as np
import logging
import ssl # ✅ استيراد مكتبة ssl للتحقق
from botocore.exceptions import ClientError # ✅ استيراد ClientError
import base58 # ✅ استيراد base58 للتحقق من عناوين Solana
# تعطيل تسجيل HTTP المزعج
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
# ✅ الإصلاح: تعريف ثابت لتوقيع حدث التحويل ERC20
TRANSFER_EVENT_SIGNATURE = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
class EnhancedWhaleMonitor:
def __init__(self, contracts_db=None, r2_service=None):
# ✅ الإصلاح: إنشاء سياق SSL آمن للتحقق من الشهادات
self.ssl_context = ssl.create_default_context()
self.http_client = httpx.AsyncClient(
timeout=30.0,
limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
follow_redirects=True,
verify=self.ssl_context # ✅ استخدام سياق SSL للتحقق
)
# الاحتفاظ بالمفاتيح المطلوبة فقط
self.moralis_key = os.getenv("MORALIS_KEY")
self.etherscan_key = os.getenv("ETHERSCAN_KEY")
self.infura_key = os.getenv("INFURA_KEY")
# ✅ إضافة مفاتيح اختيارية للمستكشفات الأخرى
self.bscscan_key = os.getenv("BSCSCAN_KEY")
self.polygonscan_key = os.getenv("POLYGONSCAN_KEY")
# التحقق من صحة المفاتيح
self._validate_api_keys()
self.whale_threshold_usd = 50000
# تحسين تخزين العقود والشبكات
self.contracts_db = {}
self._initialize_contracts_db(contracts_db or {})
self.data_manager = None
# ✅ الإصلاح: إضافة منظم طلبات لـ CoinGecko
self.coingecko_semaphore = asyncio.Semaphore(1) # 1 طلب متزامن فقط لـ CoinGecko
# ✅ الإصلاح: إضافة منظم طلبات لـ RPC
self.rpc_semaphore = asyncio.Semaphore(5) # 5 طلبات RPC متزامنة كحد أقصى (لجميع الشبكات)
self.r2_service = r2_service
self.address_labels = {}
self._initialize_comprehensive_exchange_addresses()
self.token_price_cache = {}
self.token_decimals_cache = {}
# إضافة جميع الشبكات المدعومة مع Solana
self.supported_networks = {
'ethereum': {
'name': 'Ethereum',
'chain_id': '0x1',
'native_coin': 'ETH',
'explorer': 'etherscan',
'rpc_endpoints': self._get_ethereum_rpc_endpoints(),
'type': 'evm'
},
'bsc': {
'name': 'Binance Smart Chain',
'chain_id': '0x38',
'native_coin': 'BNB',
'explorer': 'bscscan',
'rpc_endpoints': self._get_bsc_rpc_endpoints(),
'type': 'evm'
},
'polygon': {
'name': 'Polygon',
'chain_id': '0x89',
'native_coin': 'MATIC',
'explorer': 'polygonscan',
'rpc_endpoints': self._get_polygon_rpc_endpoints(),
'type': 'evm'
},
'arbitrum': {
'name': 'Arbitrum',
'chain_id': '0xa4b1',
'native_coin': 'ETH',
'explorer': 'arbiscan',
'rpc_endpoints': self._get_arbitrum_rpc_endpoints(),
'type': 'evm'
},
'optimism': {
'name': 'Optimism',
'chain_id': '0xa',
'native_coin': 'ETH',
'explorer': 'optimistic',
'rpc_endpoints': self._get_optimism_rpc_endpoints(),
'type': 'evm'
},
'avalanche': {
'name': 'Avalanche',
'chain_id': '0xa86a',
'native_coin': 'AVAX',
'explorer': 'snowtrace',
'rpc_endpoints': self._get_avalanche_rpc_endpoints(),
'type': 'evm'
},
'fantom': {
'name': 'Fantom',
'chain_id': '0xfa',
'native_coin': 'FTM',
'explorer': 'ftmscan',
'rpc_endpoints': self._get_fantom_rpc_endpoints(),
'type': 'evm'
},
'solana': {
'name': 'Solana',
'chain_id': 'mainnet-beta',
'native_coin': 'SOL',
'explorer': 'solscan',
'rpc_endpoints': self._get_solana_rpc_endpoints(), # ✅ استخدام الدالة المحدثة
'type': 'solana'
}
}
if self.r2_service:
asyncio.create_task(self._load_contracts_from_r2())
def _initialize_contracts_db(self, initial_contracts):
"""تهيئة قاعدة بيانات العقود مع تحسين تخزين الشبكات"""
print("🔄 تهيئة قاعدة بيانات العقود...")
for symbol, contract_data in initial_contracts.items():
symbol_lower = symbol.lower() # استخدام lower لتوحيد المفاتيح
if isinstance(contract_data, dict) and 'address' in contract_data and 'network' in contract_data:
self.contracts_db[symbol_lower] = contract_data
elif isinstance(contract_data, str):
self.contracts_db[symbol_lower] = {
'address': contract_data,
'network': self._detect_network_from_address(contract_data)
}
# تجاهل الإدخالات غير الصالحة
print(f"✅ تم تحميل {len(self.contracts_db)} عقد في قاعدة البيانات")
def _detect_network_from_address(self, address):
"""اكتشاف الشبكة من عنوان العقد"""
if not isinstance(address, str):
return 'ethereum'
address_lower = address.lower()
if address_lower.startswith('0x') and len(address_lower) == 42:
# هنا يمكن إضافة منطق أدق مستقبلاً للتمييز بين شبكات EVM
# حاليًا، الافتراضي هو Ethereum
return 'ethereum'
elif len(address_lower) >= 32 and len(address_lower) <= 44: # طول عناوين Solana Base58
base58_chars = set("123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz")
if all(char in base58_chars for char in address):
try:
base58.b58decode(address)
return 'solana'
except ValueError:
return 'ethereum' # ليس Base58 صالحًا
else:
return 'ethereum' # يحتوي أحرف غير صالحة
else:
return 'ethereum' # افتراضي
def _validate_api_keys(self):
"""التحقق من صحة مفاتيح API المطلوبة فقط"""
print("🔍 التحقق من صحة مفاتيح API...")
# إضافة المفاتيح الاختيارية الجديدة
api_keys_to_check = {
'MORALIS_KEY': 'moralis_key',
'ETHERSCAN_KEY': 'etherscan_key',
'INFURA_KEY': 'infura_key',
'BSCSCAN_KEY': 'bscscan_key',
'POLYGONSCAN_KEY': 'polygonscan_key'
# يمكن إضافة مفاتيح أخرى هنا لمستكشفات أخرى
}
for env_var, attr_name in api_keys_to_check.items():
# التأكد من أن الكلاس لديه الخاصية قبل الوصول إليها
if hasattr(self, attr_name):
key_value = getattr(self, attr_name, None)
if key_value:
if isinstance(key_value, str) and len(key_value) >= 10:
print(f"✅ مفتاح {env_var} موجود وصالح مبدئيًا.")
else:
print(f"❌ مفتاح {env_var} موجود ولكنه غير صالح. سيتم التعطيل.")
setattr(self, attr_name, None)
else:
print(f"⚠️ مفتاح {env_var} غير متوفر أو فارغ.")
setattr(self, attr_name, None)
else:
# إذا لم تكن الخاصية موجودة أصلاً
print(f"ℹ️ خاصية المفتاح {attr_name} (لـ {env_var}) غير معرفة في الكلاس.")
setattr(self, attr_name, None)
def _get_ethereum_rpc_endpoints(self):
"""الحصول على نقاط RPC لشبكة Ethereum"""
endpoints = []
if self.infura_key:
endpoints.append(f'https://mainnet.infura.io/v3/{self.infura_key}')
endpoints.extend([
'https://rpc.ankr.com/eth',
'https://cloudflare-eth.com',
'https://eth.llamarpc.com',
'https://ethereum.publicnode.com',
'https://1rpc.io/eth',
])
return endpoints
def _get_bsc_rpc_endpoints(self):
"""الحصول على نقاط RPC لشبكة BSC"""
return [
'https://bsc-dataseed.binance.org/',
'https://bsc-dataseed1.defibit.io/',
'https://bsc.publicnode.com',
'https://binance.llamarpc.com',
'https://1rpc.io/bnb',
'https://rpc.ankr.com/bsc'
]
def _get_polygon_rpc_endpoints(self):
"""الحصول على نقاط RPC لشبكة Polygon"""
endpoints = []
if self.infura_key:
endpoints.append(f'https://polygon-mainnet.infura.io/v3/{self.infura_key}')
endpoints.extend([
'https://polygon-rpc.com',
'https://polygon.llamarpc.com',
'https://polygon-bor.publicnode.com',
'https://1rpc.io/matic',
'https://rpc.ankr.com/polygon'
])
return endpoints
def _get_arbitrum_rpc_endpoints(self):
"""الحصول على نقاط RPC لشبكة Arbitrum"""
endpoints = []
if self.infura_key:
endpoints.append(f'https://arbitrum-mainnet.infura.io/v3/{self.infura_key}')
endpoints.extend([
'https://arb1.arbitrum.io/rpc',
'https://arbitrum.llamarpc.com',
'https://arbitrum.publicnode.com',
'https://1rpc.io/arb',
'https://rpc.ankr.com/arbitrum'
])
return endpoints
def _get_optimism_rpc_endpoints(self):
"""الحصول على نقاط RPC لشبكة Optimism"""
endpoints = []
if self.infura_key:
endpoints.append(f'https://optimism-mainnet.infura.io/v3/{self.infura_key}')
endpoints.extend([
'https://mainnet.optimism.io',
'https://optimism.llamarpc.com',
'https://optimism.publicnode.com',
'https://1rpc.io/op',
'https://rpc.ankr.com/optimism'
])
return endpoints
def _get_avalanche_rpc_endpoints(self):
"""الحصول على نقاط RPC لشبكة Avalanche"""
return [
'https://api.avax.network/ext/bc/C/rpc',
'https://avalanche.publicnode.com',
'https://avax.llamarpc.com',
'https://1rpc.io/avax/c',
'https://rpc.ankr.com/avalanche'
]
def _get_fantom_rpc_endpoints(self):
"""الحصول على نقاط RPC لشبكة Fantom"""
return [
'https://rpc.ftm.tools/',
'https://fantom.publicnode.com',
'https://fantom.llamarpc.com',
'https://1rpc.io/ftm',
'https://rpc.ankr.com/fantom'
]
def _get_solana_rpc_endpoints(self):
"""الحصول على نقاط RPC مجانية لشبكة Solana (بدون مفاتيح API) - محدثة"""
endpoints = [
'https://api.mainnet-beta.solana.com', # الرسمي (قد يكون محدود الطلبات بشدة)
'https://solana-mainnet.publicnode.com', # PublicNode (جديد وموثوق)
'https://ssc-dao.genesysgo.net/', # GenesysGo (قديم نسبياً)
'https://solana-mainnet.rpc.extrnode.com', # Extrnode (مزود عام)
'https://solana-mainnet.phantom.tech/', # Phantom Wallet RPC (قد يكون للاستخدام المحدود)
'https://rpc.ankr.com/solana', # Ankr (يسبب 403 غالباً بدون مفتاح)
# 'https://mainnet.rpc.solana.pyth.network', # تمت إزالته (خطأ DNS)
]
print(f"ℹ️ قائمة Solana RPC Endpoints (مجانية وبدون مفاتيح) المستخدمة: {endpoints}")
return endpoints
def _initialize_comprehensive_exchange_addresses(self):
"""تهيئة قاعدة بيانات شاملة لعناوين كل المنصات"""
self.exchange_addresses = {
'kucoin': [
'0x2b5634c42055806a59e9107ed44d43c426e58258', '0x689c56aef474df92d44a1b70850f808488f9769c',
'0xa1d8d972560c2f8144af871db508f0b0b10a3fbf', '0x4ad64983349c49defe8d7a4686202d24b25d0ce8',
'0x1692e170361cefd1eb7240ec13d048fd9af6d667', '0xd6216fc19db775df9774a6e33526131da7d19a2c',
'0xe59cd29be3be4461d79c0881a238c467a2b3775c', '0x899b5d52671830f567bf43a14684eb14e1f945fe'
],
'binance': [
'0x3f5ce5fbfe3e9af3971dd833d26ba9b5c936f0be', '0xd551234ae421e3bcba99a0da6d736074f22192ff',
'0x564286362092d8e7936f0549571a803b203aaced', '0x0681d8db095565fe8a346fa0277bffde9c0edbbf',
'0xfe9e8709d3215310075d67e3ed32a380ccf451c8', '0x28c6c06298d514db089934071355e5743bf21d60'
],
'coinbase': [
'0x71660c4005ba85c37ccec55d0c4493e66fe775d3', '0x503828976d22510aad0201ac7ec88293211d23da',
'0xddfabcdc4d8ffc6d5beaf154f18b778f892a0740'
],
'kraken': [ '0x2910543af39aba0cd09dbb2d50200b3e800a63d2', '0xa160cdab225685da1d56aa342ad8841c3b53f291' ],
'okx': [ '0x6cc5f688a315f3dc28a7781717a9a798a59fda7b', '0x2c8fbb630289363ac80705a1a61273f76fd5a161' ],
'gate': [ '0x0d0707963952f2fba59dd06f2b425ace40b492fe', '0xc6f9741f5a5a676b91171804cf3c500ab438bb6e' ],
'uniswap': [ '0x3fc91a3afd70395cd496c647d5a6cc9d4b2b7fad', '0x68b3465833fb72a70ecdf485e0e4c7bd8665fc45' ],
'pancakeswap': [ '0x10ed43c718714eb63d5aa57b78b54704e256024e', '0x13f4ea83d0bd40e75c8222255bc855a974568dd4' ],
'solana_kucoin': ['F3a5ZLPKUCrCj6CGKP2m9wS9Y2L8RcUBoJq9L2D2sUYi', '2AQdpHJ2JpcEgPiATFnAKfV9hPMvouWJAhKvamQ2Krqy' ],
'solana_binance': ['5tzFkiKscXHK5ZXCGbXZJpXaWuBtZ6RrH8eL2a7Yi7Vn', '9WzDXwBbmkg8ZTbNMqUxvQRApF22xwsgMj2SUZrchK2E'],
'solana_coinbase': ['CeBiLnAE3UAW9mCDSjD1VULKLeQjefjN4d941fVKazSM'],
'solana_wormhole': ['worm2ZoG2kUd4vFXhvjh93UUH596ayRfgQ2MgjNMTth']
}
self.address_categories = {'exchange': set(), 'cex': set(), 'dex': set(), 'bridge': set(), 'whale': set(), 'unknown': set()}
for category, addresses in self.exchange_addresses.items():
for address in addresses:
if not isinstance(address, str): continue
addr_lower = address.lower()
self.address_labels[addr_lower] = category
self.address_categories['exchange'].add(addr_lower)
if category in ['kucoin', 'binance', 'coinbase', 'kraken', 'okx', 'gate', 'solana_kucoin', 'solana_binance', 'solana_coinbase']:
self.address_categories['cex'].add(addr_lower)
elif category in ['uniswap', 'pancakeswap']:
self.address_categories['dex'].add(addr_lower)
elif 'wormhole' in category:
self.address_categories['bridge'].add(addr_lower)
print(f"✅ تم تهيئة {len(self.address_labels)} عنوان منصة/بروتوكول معروف.")
async def _load_contracts_from_r2(self):
"""تحميل قاعدة بيانات العقود من R2"""
if not self.r2_service: return
try:
key = "contracts.json"
response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key)
contracts_data = json.loads(response['Body'].read())
loaded_count = 0
updated_count = 0
updated_contracts_db = self.contracts_db.copy()
for symbol, contract_data in contracts_data.items():
symbol_lower = symbol.lower()
if isinstance(contract_data, dict) and 'address' in contract_data and 'network' in contract_data:
updated_contracts_db[symbol_lower] = contract_data
loaded_count += 1
elif isinstance(contract_data, str):
new_format = {
'address': contract_data,
'network': self._detect_network_from_address(contract_data)
}
updated_contracts_db[symbol_lower] = new_format
loaded_count += 1
updated_count +=1
self.contracts_db = updated_contracts_db
print(f"✅ تم تحميل {loaded_count} عقد من R2.")
if updated_count > 0:
print(f" ℹ️ تم تحديث صيغة {updated_count} عقد إلى الصيغة الجديدة.")
await self._save_contracts_to_r2()
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
print("⚠️ لم يتم العثور على قاعدة بيانات العقود في R2. ستبدأ فارغة.")
else:
print(f"❌ خطأ ClientError أثناء تحميل العقود من R2: {e}")
except Exception as e:
print(f"❌ خطأ عام أثناء تحميل العقود من R2: {e}")
async def get_symbol_whale_activity(self, symbol, contract_address=None):
"""
مراقبة تحركات الحيتان لعملة محددة مع معالجة العملات الأصلية
"""
try:
print(f"🔍 بدء مراقبة الحيتان الحقيقية للعملة: {symbol}")
native_coins = ['BTC', 'LTC', 'ETH', 'BNB', 'ADA', 'DOT', 'XRP', 'DOGE', 'BCH', 'XLM', 'TRX', 'EOS', 'XMR']
base_symbol = symbol.split("/")[0].upper() if '/' in symbol else symbol.upper()
if base_symbol in native_coins:
print(f"ℹ️ {symbol} عملة أصلية - لا توجد بيانات حيتان للتوكنات")
return self._create_native_coin_response(symbol)
contract_info = None
network = None
if contract_address and isinstance(contract_address, str):
network = self._detect_network_from_address(contract_address)
contract_info_from_db = await self._find_contract_address_enhanced(symbol)
if contract_info_from_db and contract_info_from_db.get('address', '').lower() != contract_address.lower():
print(f" ⚠️ تم توفير عنوان عقد ({contract_address}) يختلف عن المخزن لـ {symbol}. سيتم استخدام العنوان المُقدم.")
contract_info = {'address': contract_address, 'network': network}
elif contract_info_from_db:
contract_info = contract_info_from_db
else:
contract_info = {'address': contract_address, 'network': network}
else:
contract_info = await self._find_contract_address_enhanced(symbol)
if not contract_info or not contract_info.get('address') or not contract_info.get('network'):
print(f"❌ لم يتم العثور على عقد أو شبكة للعملة {symbol}")
return self._create_no_contract_response(symbol)
contract_address_final = contract_info['address']
network_final = contract_info['network']
print(f"🌐 البحث في الشبكة المحددة: {network_final} للعقد: {contract_address_final}")
transfers = await self._get_targeted_transfer_data(contract_address_final, network_final)
if not transfers:
print(f"⚠️ لم يتم العثور على تحويلات للعملة {symbol} على شبكة {network_final}")
return self._create_no_transfers_response(symbol)
recent_transfers = self._filter_recent_transfers(transfers, max_minutes=120)
if not recent_transfers:
print(f"⚠️ لا توجد تحويلات حديثة للعملة {symbol}")
return self._create_no_recent_activity_response(symbol)
print(f"📊 تم جلب {len(recent_transfers)} تحويلة حديثة فريدة للعملة {symbol}")
analysis = await self._analyze_enhanced_whale_impact(recent_transfers, symbol, contract_address_final, network_final)
return analysis
except Exception as e:
print(f"❌ خطأ في مراقبة الحيتان للعملة {symbol}: {e}")
traceback.print_exc()
return self._create_error_response(symbol, str(e))
def _create_native_coin_response(self, symbol):
"""إنشاء رد للعملات الأصلية التي ليس لها tokens"""
return {
'symbol': symbol, 'data_available': False, 'error': 'NATIVE_COIN_NO_TOKEN',
'trading_signal': {'action': 'HOLD', 'confidence': 0.3, 'reason': f'{symbol} عملة أصلية - لا توجد بيانات حيتان للتوكنات'},
'llm_friendly_summary': {'whale_activity_summary': f'{symbol} عملة أصلية - نظام مراقبة الحيتان الحالي مصمم للتوكنات فقط', 'recommended_action': 'HOLD', 'confidence': 0.3, 'key_metrics': {'whale_movement_impact': 'NOT_APPLICABLE'}}
}
async def _get_targeted_transfer_data(self, contract_address, network):
"""جلب بيانات التحويلات من الشبكة المحددة فقط"""
print(f"🌐 جلب بيانات التحويلات من شبكة {network} للعقد: {contract_address}")
if isinstance(contract_address, dict):
contract_address = contract_address.get(network)
if not contract_address:
print(f"❌ لم يتم العثور على عنوان للشبكة {network}")
return []
if not isinstance(contract_address, str):
print(f"❌ عنوان العقد غير صالح: {type(contract_address)}")
return []
if network in self.supported_networks:
net_config = self.supported_networks[network]
if net_config['type'] == 'evm':
return await self._get_evm_network_transfer_data(contract_address, network)
elif net_config['type'] == 'solana':
return await self._get_solana_transfer_data(contract_address, network)
print(f"⚠️ نوع الشبكة {network} غير مدعوم حاليًا.")
return []
async def _get_evm_network_transfer_data(self, contract_address, network):
"""جلب بيانات التحويلات من شبكة EVM محددة (باستخدام RPC و Explorer)"""
try:
transfers = []
rpc_successful = False
try:
rpc_transfers = await self._get_rpc_token_data_targeted(contract_address, network)
if rpc_transfers:
transfers.extend(rpc_transfers)
rpc_successful = True
print(f"✅ RPC {network}: تم جلب {len(rpc_transfers)} تحويلة عبر eth_getLogs")
else:
print(f"⚠️ RPC {network}: لم يتم العثور على تحويلات عبر eth_getLogs")
except Exception as rpc_error:
print(f"❌ خطأ في جلب بيانات RPC {network}: {rpc_error}")
if not rpc_successful or len(transfers) < 5:
print(f"ℹ️ محاولة جلب بيانات إضافية من Explorer لـ {network}...")
try:
explorer_transfers = await self._get_explorer_token_data(contract_address, network)
if explorer_transfers:
existing_hashes = {f"{t.get('hash')}-{t.get('logIndex','N/A')}" for t in transfers}
new_explorer_transfers = []
for et in explorer_transfers:
explorer_key = f"{et.get('hash')}-{et.get('logIndex','N/A')}"
if explorer_key not in existing_hashes:
new_explorer_transfers.append(et)
if new_explorer_transfers:
transfers.extend(new_explorer_transfers)
print(f"✅ Explorer {network}: تم إضافة {len(new_explorer_transfers)} تحويلة جديدة.")
else:
print(f"⚠️ Explorer {network}: لم يتم العثور على تحويلات جديدة.")
else:
print(f"⚠️ Explorer {network}: لم يتم العثور على تحويلات.")
except Exception as explorer_error:
print(f"❌ خطأ في جلب بيانات Explorer {network}: {explorer_error}")
print(f"📊 إجمالي التحويلات المجمعة لـ {network}: {len(transfers)}")
final_transfers = []
seen_keys = set()
for t in transfers:
key = f"{t.get('hash')}-{t.get('logIndex','N/A')}"
if key not in seen_keys:
final_transfers.append(t)
seen_keys.add(key)
return final_transfers
except Exception as e:
print(f"❌ خطأ عام في جلب بيانات {network}: {e}")
return []
async def _get_rpc_token_data_targeted(self, contract_address, network='ethereum', blocks_to_scan=500):
"""جلب تحويلات التوكن ERC20 عبر RPC باستخدام eth_getLogs"""
try:
if network not in self.supported_networks or self.supported_networks[network]['type'] != 'evm':
print(f"⚠️ شبكة {network} غير مدعومة أو ليست EVM لـ eth_getLogs.")
return []
if not isinstance(contract_address, str) or not contract_address.startswith('0x'):
print(f"❌ عنوان العقد غير صالح لـ EVM: {contract_address}")
return []
contract_address_checksum = contract_address
endpoints = self.supported_networks[network]['rpc_endpoints']
if not endpoints:
print(f" ❌ لا توجد نقاط RPC معرفة لشبكة {network}.")
return []
transfers = []
successful_endpoint = None
for endpoint in endpoints:
try:
endpoint_name = endpoint.split('//')[1].split('/')[0] if '//' in endpoint else endpoint
print(f" 🔍 محاولة جلب سجلات التحويلات لـ {network} عبر {endpoint_name}...")
# ✅ الإصلاح: استخدام "self.rpc_semaphore"
async with self.rpc_semaphore:
payload_block = {"jsonrpc": "2.0", "method": "eth_blockNumber", "params": [], "id": int(time.time())}
response_block = await self.http_client.post(endpoint, json=payload_block, timeout=15.0)
response_block.raise_for_status()
json_response_block = response_block.json()
result_block = json_response_block.get('result')
if not result_block:
rpc_error = json_response_block.get('error')
error_msg = rpc_error.get('message') if rpc_error else 'No block number result'
print(f" ❌ لم يتم الحصول على رقم أحدث كتلة من {endpoint_name}: {error_msg}")
continue
latest_block = int(result_block, 16)
async with self.rpc_semaphore:
payload_latest_block_time = {"jsonrpc": "2.0", "method": "eth_getBlockByNumber", "params": [hex(latest_block), False], "id": int(time.time())+1}
response_latest_time = await self.http_client.post(endpoint, json=payload_latest_block_time, timeout=15.0)
latest_block_timestamp_approx = int(time.time())
if response_latest_time.status_code == 200:
latest_block_data = response_latest_time.json().get('result')
if latest_block_data and latest_block_data.get('timestamp'):
latest_block_timestamp_approx = int(latest_block_data['timestamp'], 16)
from_block = max(0, latest_block - blocks_to_scan)
print(f" 📦 فحص الكتل من {from_block} إلى {latest_block} (آخر {blocks_to_scan} كتل تقريباً)")
payload_logs = {
"jsonrpc": "2.0", "method": "eth_getLogs",
"params": [{"fromBlock": hex(from_block), "toBlock": hex(latest_block), "address": contract_address_checksum, "topics": [TRANSFER_EVENT_SIGNATURE]}],
"id": int(time.time())+2
}
async with self.rpc_semaphore:
response_logs = await self.http_client.post(endpoint, json=payload_logs, timeout=45.0)
response_logs.raise_for_status()
json_response_logs = response_logs.json()
logs = json_response_logs.get('result')
if logs is None:
rpc_error = json_response_logs.get('error')
error_msg = rpc_error.get('message') if rpc_error else 'Invalid logs response'
print(f" ❌ استجابة السجلات غير صالحة من {endpoint_name}: {error_msg}")
continue
if not logs:
print(f" ✅ لا توجد سجلات تحويل لـ {contract_address} في آخر {blocks_to_scan} كتل عبر {endpoint_name}")
successful_endpoint = endpoint_name
break
print(f" 📊 تم العثور على {len(logs)} سجل تحويل محتمل.")
for log in logs:
try:
topics = log.get('topics', [])
data = log.get('data', '0x')
block_num_hex = log.get('blockNumber')
tx_hash = log.get('transactionHash')
log_index_hex = log.get('logIndex', '0x0')
if len(topics) == 3 and data != '0x' and block_num_hex and tx_hash:
sender = '0x' + topics[1][26:]
receiver = '0x' + topics[2][26:]
value = str(int(data, 16))
block_num = str(int(block_num_hex, 16))
log_index = str(int(log_index_hex, 16))
transfers.append({
'hash': tx_hash, 'from': sender.lower(), 'to': receiver.lower(),
'value': value, 'timeStamp': str(latest_block_timestamp_approx),
'blockNumber': block_num, 'network': network, 'logIndex': log_index
})
else:
print(f" ⚠️ سجل غير متوافق مع Transfer Event: Topics={len(topics)}, Data={data[:10]}...")
except (ValueError, TypeError, KeyError, IndexError) as log_parse_error:
print(f" ⚠️ خطأ في تحليل سجل فردي: {log_parse_error} - Log: {log}")
continue
print(f" ✅ تم تحليل {len(transfers)} تحويلة بنجاح من {endpoint_name}")
successful_endpoint = endpoint_name
break
except ssl.SSLCertVerificationError as ssl_err:
print(f" ❌ خطأ SSL مع {endpoint_name}: {ssl_err}")
continue
except httpx.HTTPStatusError as http_err:
print(f" ❌ خطأ HTTP من {endpoint_name}: {http_err.response.status_code} - {http_err}")
continue
except (httpx.RequestError, asyncio.TimeoutError) as req_err:
print(f" ❌ خطأ اتصال/مهلة بـ {endpoint_name}: {req_err}")
continue
except json.JSONDecodeError as json_err:
print(f" ❌ خطأ في تحليل استجابة JSON من {endpoint_name}: {json_err}")
continue
except Exception as e:
if "Cannot reopen a client instance" in str(e):
print(f" ❌ فشل فادح: محاولة استخدام client مغلق. {e}")
return transfers
print(f" ❌ فشل غير متوقع مع {endpoint_name}: {e}")
traceback.print_exc()
continue
if successful_endpoint:
print(f"✅ اكتمل جلب بيانات RPC لـ {network} بنجاح باستخدام {successful_endpoint}. إجمالي التحويلات الأولية: {len(transfers)}")
else:
print(f"❌ فشلت جميع نقاط RPC لـ {network} في الاستجابة بشكل صحيح.")
return transfers
except Exception as e:
print(f"❌ فشل عام في جلب بيانات RPC لـ {network}: {e}")
traceback.print_exc()
return []
async def _get_solana_transfer_data(self, token_address, network='solana', limit=50):
"""جلب بيانات التحويلات من شبكة Solana مع تحسين التعامل مع الأخطاء"""
try:
print(f" 🔍 جلب تحويلات Solana للتوكن: {token_address}")
transfers = []
endpoints = self.supported_networks[network]['rpc_endpoints']
if not endpoints:
print(f" ❌ لا توجد نقاط RPC معرفة لشبكة Solana.")
return []
signatures_found = False
successful_endpoint_name = None
for endpoint in endpoints:
endpoint_name = endpoint.split('//')[-1]
try:
payload_signatures = {
"jsonrpc": "2.0", "id": int(time.time()),
"method": "getSignaturesForAddress",
"params": [ token_address, {"limit": limit, "commitment": "confirmed"} ]
}
# ✅ الإصلاح: استخدام "self.rpc_semaphore"
async with self.rpc_semaphore:
response_signatures = await self.http_client.post(endpoint, json=payload_signatures, timeout=20.0)
if response_signatures.status_code == 403:
print(f" ⚠️ Solana endpoint {endpoint_name} failed: 403 Forbidden")
continue
elif response_signatures.status_code != 200:
print(f" ⚠️ Solana endpoint {endpoint_name} failed: {response_signatures.status_code}")
continue
data_signatures = response_signatures.json()
if 'error' in data_signatures:
print(f" ❌ خطأ RPC (getSignatures) من {endpoint_name}: {data_signatures['error'].get('message', 'Unknown RPC error')}")
continue
if 'result' not in data_signatures or data_signatures['result'] is None:
print(f" ✅ Solana: لا توجد معاملات حديثة ({endpoint_name})")
successful_endpoint_name = endpoint_name
signatures_found = True
break
signatures_data = data_signatures['result']
if not signatures_data:
print(f" ✅ Solana: لا توجد معاملات حديثة ({endpoint_name})")
successful_endpoint_name = endpoint_name
signatures_found = True
break
signatures = [tx['signature'] for tx in signatures_data]
print(f" 📊 Solana: وجد {len(signatures)} توقيع معاملة محتملة ({endpoint_name})")
signatures_found = True
successful_endpoint_name = endpoint_name
processed_count = 0
transaction_tasks = []
# ✅ الإصلاح: تمرير "self" إلى الدالة المساعدة
for signature in signatures[:20]:
transaction_tasks.append(
self._get_solana_transaction_detail_with_retry(self, signature, endpoint, self.http_client)
)
transaction_details = await asyncio.gather(*transaction_tasks)
for detail_result in transaction_details:
if detail_result and self._is_solana_token_transfer(detail_result, token_address):
transfer = await self._parse_solana_transfer(detail_result, token_address)
if transfer:
transfers.append(transfer)
processed_count += 1
print(f" ✅ Solana: تم تحليل {processed_count} تحويلة توكن فعلية من {endpoint_name}")
break
except ssl.SSLCertVerificationError as ssl_err:
print(f" ❌ فشل endpoint في Solana ({endpoint_name}): خطأ في شهادة SSL - {ssl_err}")
continue
except httpx.HTTPStatusError as http_err:
if http_err.response.status_code == 403: print(f" ⚠️ Solana endpoint {endpoint_name} failed: 403 Forbidden")
else: print(f" ❌ خطأ HTTP من Solana ({endpoint_name}): {http_err.response.status_code}")
continue
except (httpx.RequestError, asyncio.TimeoutError) as req_err:
print(f" ❌ خطأ اتصال/مهلة بـ Solana ({endpoint_name}): {req_err}")
continue
except json.JSONDecodeError as json_err:
print(f" ❌ خطأ في تحليل استجابة JSON من Solana ({endpoint_name}): {json_err}")
continue
except Exception as e:
if "Cannot reopen a client instance" in str(e):
print(f" ❌ فشل فادح: محاولة استخدام client مغلق. {e}")
return transfers
print(f" ❌ فشل غير متوقع مع endpoint Solana ({endpoint_name}): {e}")
traceback.print_exc()
continue
if successful_endpoint_name:
print(f"✅ اكتمل جلب بيانات Solana بنجاح باستخدام {successful_endpoint_name}. إجمالي التحويلات النهائية: {len(transfers)}")
elif not signatures_found:
print(f" ❌ فشلت جميع نقاط RPC لـ Solana في الاستجابة بشكل صحيح لجلب التوقيعات.")
else:
print(f"⚠️ تم العثور على توقيعات Solana ولكن حدثت مشاكل في جلب التفاصيل أو تحليلها. التحويلات النهائية: {len(transfers)}")
return transfers
except Exception as e:
print(f"❌ فشل عام في جلب بيانات Solana: {e}")
traceback.print_exc()
return []
# ✅ الإصلاح: إضافة "self" كباراميتر
async def _get_solana_transaction_detail_with_retry(self, signature, endpoint, client: httpx.AsyncClient, retries=2, delay=0.5):
"""جلب تفاصيل معاملة Solana مع محاولات إعادة بسيطة باستخدام client مشترك"""
last_exception = None
for attempt in range(retries):
try:
# ✅ الإصلاح: استخدام "self.rpc_semaphore"
async with self.rpc_semaphore:
detail = await self._get_solana_transaction_detail(signature, endpoint, client)
if detail:
return detail
elif detail is None:
return None
print(f" ⚠️ نتيجة فارغة غير متوقعة لـ {signature[:10]}... (محاولة {attempt + 1}/{retries})")
last_exception = Exception("Empty result without error")
except (httpx.RequestError, asyncio.TimeoutError, ssl.SSLError) as e:
print(f" ⏳ خطأ مؤقت ({type(e).__name__}) في جلب تفاصيل {signature[:10]}... (محاولة {attempt + 1}/{retries})")
last_exception = e
if attempt < retries - 1:
await asyncio.sleep(delay * (attempt + 1))
else:
print(f" ❌ فشل جلب تفاصيل {signature[:10]}... بعد {retries} محاولات.")
return None
except Exception as e:
print(f" ❌ خطأ غير متوقع في جلب تفاصيل {signature[:10]}...: {e}")
return None
if attempt < retries - 1 and not isinstance(last_exception, (httpx.RequestError, asyncio.TimeoutError, ssl.SSLError)):
await asyncio.sleep(delay * (attempt + 1))
return None
async def _get_solana_transaction_detail(self, signature, endpoint, client: httpx.AsyncClient):
"""جلب تفاصيل معاملة Solana (يستخدم الآن client مُمرر)"""
try:
payload = {
"jsonrpc": "2.0",
"id": f"gtx-{signature[:8]}-{int(time.time()*1000)}",
"method": "getTransaction",
"params": [ signature, {"encoding": "jsonParsed", "maxSupportedTransactionVersion": 0, "commitment": "confirmed"} ]
}
# ✅ الإصلاح: لا يوجد semaphore هنا، يتم تطبيقه في الدالة التي تستدعيها
response = await client.post(endpoint, json=payload, timeout=20.0)
response.raise_for_status()
data = response.json()
if 'error' in data:
print(f" ❌ خطأ RPC في getTransaction لـ {signature[:10]}... : {data['error'].get('message')}")
return None
result = data.get('result')
if result is None:
print(f" ℹ️ لم يتم العثور على نتيجة للمعاملة {signature[:10]}...")
return None
return result
except httpx.HTTPStatusError as http_err:
print(f" ⚠️ فشل HTTP في getTransaction لـ {signature[:10]}... : {http_err.response.status_code}")
return None
except (httpx.RequestError, asyncio.TimeoutError) as req_err:
print(f" ⏳ خطأ اتصال/مهلة في getTransaction لـ {signature[:10]}...")
raise req_err
except ssl.SSLError as ssl_err:
print(f" ❌ خطأ SSL في getTransaction لـ {signature[:10]}... : {ssl_err}")
raise ssl_err
except json.JSONDecodeError as json_err:
print(f" ❌ خطأ JSON في getTransaction لـ {signature[:10]}... : {response.text[:200]}")
return None
except Exception as e:
print(f" ❌ استثناء غير متوقع في getTransaction لـ {signature[:10]}... : {e}")
traceback.print_exc()
return None
def _is_solana_token_transfer(self, transaction, token_address):
"""التحقق إذا كانت المعاملة تحويل توكن في Solana"""
try:
if not transaction or 'meta' not in transaction or transaction.get('meta') is None:
return False
meta = transaction['meta']
post_balances = meta.get('postTokenBalances')
pre_balances = meta.get('preTokenBalances')
if post_balances is None and pre_balances is None:
inner_instructions = meta.get('innerInstructions', [])
for inner_inst_set in inner_instructions:
for inst in inner_inst_set.get('instructions', []):
parsed = inst.get('parsed')
if isinstance(parsed, dict) and parsed.get('type') in ['transfer', 'transferChecked'] and parsed.get('info', {}).get('mint') == token_address:
return True
return False
all_balances = (post_balances or []) + (pre_balances or [])
for balance in all_balances:
if isinstance(balance, dict) and balance.get('mint') == token_address:
pre_amount = next((pb.get('uiTokenAmount',{}).get('amount','0') for pb in (pre_balances or []) if isinstance(pb, dict) and pb.get('owner') == balance.get('owner') and pb.get('mint') == token_address), '0')
post_amount = balance.get('uiTokenAmount',{}).get('amount','0')
try:
if int(post_amount) != int(pre_amount):
return True
except (ValueError, TypeError):
pass
return False
except Exception as e:
return False
async def _parse_solana_transfer(self, transaction, token_address):
"""تحليل معاملة Solana لاستخراج بيانات التحويل"""
try:
if not transaction or 'transaction' not in transaction or not transaction['transaction'].get('signatures'):
return None
signature = transaction['transaction']['signatures'][0]
block_time = transaction.get('blockTime')
timestamp_to_use = str(block_time) if block_time else str(int(time.time()))
slot = transaction.get('slot', 0)
sender = None
receiver = None
amount_raw = 0
meta = transaction.get('meta')
if not meta: return None
pre_balances = {bal.get('owner'): bal for bal in meta.get('preTokenBalances', []) if isinstance(bal, dict) and bal.get('mint') == token_address}
post_balances = {bal.get('owner'): bal for bal in meta.get('postTokenBalances', []) if isinstance(bal, dict) and bal.get('mint') == token_address}
all_owners = set(list(pre_balances.keys()) + list(post_balances.keys()))
for owner in all_owners:
pre_bal_data = pre_balances.get(owner, {}).get('uiTokenAmount', {})
post_bal_data = post_balances.get(owner, {}).get('uiTokenAmount', {})
pre_amount_str = pre_bal_data.get('amount', '0')
post_amount_str = post_bal_data.get('amount', '0')
try:
pre_amount_raw = int(pre_amount_str)
post_amount_raw = int(post_amount_str)
diff_raw = post_amount_raw - pre_amount_raw
if diff_raw > 0:
receiver = owner
amount_raw = diff_raw
elif diff_raw < 0:
sender = owner
except (ValueError, TypeError):
print(f" ⚠️ تحذير: فشل تحليل القيمة الخام لـ Solana ({owner}).")
pre_ui = pre_bal_data.get('uiAmount', 0.0) or 0.0
post_ui = post_bal_data.get('uiAmount', 0.0) or 0.0
diff_ui = post_ui - pre_ui
if diff_ui > 1e-9:
receiver = owner
decimals = await self._get_token_decimals(token_address, 'solana')
if decimals is not None: amount_raw = int(diff_ui * (10**decimals))
else: amount_raw = 0
elif diff_ui < -1e-9:
sender = owner
if not sender or not receiver or amount_raw == 0:
inner_instructions = meta.get('innerInstructions', [])
for inner_inst_set in inner_instructions:
for inst in inner_inst_set.get('instructions', []):
parsed = inst.get('parsed')
if isinstance(parsed, dict) and parsed.get('type') in ['transfer', 'transferChecked'] and parsed.get('info', {}).get('mint') == token_address:
info = parsed.get('info', {})
sender_inner = info.get('source') or info.get('authority')
receiver_inner = info.get('destination')
amount_inner_str = info.get('tokenAmount', {}).get('amount') or info.get('amount')
try:
amount_inner_raw = int(amount_inner_str)
if sender_inner and receiver_inner and amount_inner_raw > 0:
sender = sender_inner
receiver = receiver_inner
amount_raw = amount_inner_raw
break
except (ValueError, TypeError): continue
if sender and receiver and amount_raw > 0: break
if sender and receiver and amount_raw > 0: break
if sender and receiver and amount_raw > 0:
return {
'hash': signature, 'from': sender, 'to': receiver,
'value': str(amount_raw), 'timeStamp': timestamp_to_use,
'blockNumber': str(slot), 'network': 'solana',
'type': 'solana_transfer', 'logIndex': '0'
}
else:
return None
except Exception as e:
sig_short = signature[:10] if 'signature' in locals() and signature else "N/A"
print(f"❌ خطأ في تحليل معاملة Solana ({sig_short}...): {e}")
traceback.print_exc()
return None
async def _get_explorer_token_data(self, contract_address, network):
"""جلب بيانات التحويلات من Explorer"""
if network not in self.supported_networks:
return []
try:
explorer_config = {
'ethereum': {'url': 'https://api.etherscan.io/api', 'key': self.etherscan_key},
'bsc': {'url': 'https://api.bscscan.com/api', 'key': self.bscscan_key},
'polygon': {'url': 'https://api.polygonscan.com/api', 'key': self.polygonscan_key}
}
if network not in explorer_config or not explorer_config[network].get('key'):
return []
config = explorer_config[network]
address_param = "contractaddress" if network == 'ethereum' else "address"
params = {
"module": "account", "action": "tokentx",
address_param: contract_address,
"page": 1, "offset": 100, "startblock": 0,
"endblock": 999999999, "sort": "desc", "apikey": config['key']
}
# ✅ الإصلاح: استخدام "self.http_client" بدلاً من إنشاء client جديد
# ويفضل استخدام semaphore خاص بالمستكشفات إذا كان لديهم حد طلبات صارم
# حالياً، سنستخدم semaphore الـ RPC العام كإجراء وقائي
async with self.rpc_semaphore:
response = await self.http_client.get(config['url'], params=params, timeout=20.0)
if response.status_code == 200:
data = response.json()
status = str(data.get('status', '0'))
message = data.get('message', '').upper()
if status == '1' and message == 'OK':
transfers = data.get('result', [])
processed_transfers = []
for tf in transfers:
if tf.get('tokenSymbol') and tf.get('contractAddress','').lower() == contract_address.lower():
tf['network'] = network
if 'from' in tf: tf['from'] = tf['from'].lower()
if 'to' in tf: tf['to'] = tf['to'].lower()
if 'logIndex' not in tf: tf['logIndex'] = 'N/A'
processed_transfers.append(tf)
print(f" ✅ Explorer {network}: تم جلب {len(processed_transfers)} تحويلة توكن للعقد المحدد.")
return processed_transfers
elif status == '0' and "NO TRANSACTIONS FOUND" in message:
print(f" ✅ Explorer {network}: لا توجد تحويلات.")
return []
else:
error_message = data.get('result', message)
print(f"⚠️ Explorer {network} returned error: Status={status}, Message={message}, Result={error_message}")
if "INVALID API KEY" in str(error_message).upper():
print(f"🚨 خطأ فادح: مفتاح API الخاص بـ {network.upper()} غير صالح!")
return []
else:
print(f"⚠️ Explorer {network} request failed: {response.status_code}")
return []
except Exception as e:
if "Cannot reopen a client instance" in str(e):
print(f" ❌ فشل فادح (Explorer): محاولة استخدام client مغلق. {e}")
return []
print(f"❌ فشل جلب بيانات Explorer لـ {network}: {e}")
return []
def _filter_recent_transfers(self, transfers, max_minutes=120):
"""ترشيح التحويلات الحديثة فقط"""
recent_transfers = []
current_time_dt = datetime.now()
cutoff_timestamp = int((current_time_dt - timedelta(minutes=max_minutes)).timestamp())
processed_keys = set()
for transfer in transfers:
try:
tx_hash = transfer.get('hash', 'N/A')
log_index = transfer.get('logIndex', 'N/A')
unique_key = f"{tx_hash}-{log_index}"
if unique_key in processed_keys:
continue
timestamp_str = transfer.get('timeStamp')
if not timestamp_str or not timestamp_str.isdigit():
continue
timestamp = int(timestamp_str)
if timestamp >= cutoff_timestamp:
transfer_time_dt = datetime.fromtimestamp(timestamp)
time_diff = current_time_dt - transfer_time_dt
transfer['minutes_ago'] = round(time_diff.total_seconds() / 60, 2)
transfer['human_time'] = transfer_time_dt.isoformat()
recent_transfers.append(transfer)
processed_keys.add(unique_key)
except (ValueError, TypeError, KeyError) as e:
print(f" ⚠️ خطأ في ترشيح تحويلة: {e} - البيانات: {transfer}")
continue
print(f" 🔍 تم ترشيح {len(recent_transfers)} تحويلة حديثة فريدة (آخر {max_minutes} دقيقة).")
recent_transfers.sort(key=lambda x: int(x.get('timeStamp', 0)), reverse=True)
return recent_transfers
async def _analyze_enhanced_whale_impact(self, transfers, symbol, contract_address, network):
"""تحليل تأثير تحركات الحيتان المحسن (مع إصلاح KeyError)"""
print(f"📊 تحليل {len(transfers)} تحويلة حديثة للعملة {symbol}")
exchange_flows = {
'to_exchanges': [], 'from_exchanges': [], 'other_transfers': [],
'network_breakdown': defaultdict(lambda: {'to_exchanges': 0, 'from_exchanges': 0, 'other': 0})
}
total_volume_usd = 0.0
whale_transfers_count = 0
network_stats = defaultdict(int)
unique_transfers = {}
decimals = await self._get_token_decimals(contract_address, network)
if decimals is None:
print(f"❌ لا يمكن تحليل الحيتان بدون decimals لـ {symbol} على {network}.")
return self._create_error_response(symbol, f"Failed to get decimals on {network}")
for transfer in transfers:
try:
unique_key = f"{transfer.get('hash', '')}-{transfer.get('logIndex', 'N/A')}"
if unique_key in unique_transfers: continue
unique_transfers[unique_key] = True
from_addr = str(transfer.get('from', '')).lower()
to_addr = str(transfer.get('to', '')).lower()
raw_value_str = transfer.get('value', '0')
transfer_network = transfer.get('network', 'unknown')
if not raw_value_str or not raw_value_str.isdigit(): continue
raw_value = int(raw_value_str)
if raw_value == 0: continue
value_usd = await self._get_accurate_token_value_optimized(raw_value, decimals, symbol)
if value_usd is None or value_usd < 0:
print(f" ⚠️ قيمة USD غير صالحة ({value_usd}) للتحويلة {unique_key}. التخطي.")
continue
total_volume_usd += value_usd
network_stats[transfer_network] += 1
if value_usd >= self.whale_threshold_usd:
whale_transfers_count += 1
is_to_exchange = to_addr in self.address_categories['exchange']
is_from_exchange = from_addr in self.address_categories['exchange']
if is_to_exchange:
exchange_flows['to_exchanges'].append({
'value_usd': value_usd, 'from_type': self._classify_address(from_addr),
'to_exchange': self._classify_address(to_addr),
'transaction': {'hash': transfer.get('hash'), 'minutes_ago': transfer.get('minutes_ago')},
'network': transfer_network
})
exchange_flows['network_breakdown'][transfer_network]['to_exchanges'] += 1
elif is_from_exchange:
exchange_flows['from_exchanges'].append({
'value_usd': value_usd, 'from_exchange': self._classify_address(from_addr),
'to_type': self._classify_address(to_addr),
'transaction': {'hash': transfer.get('hash'), 'minutes_ago': transfer.get('minutes_ago')},
'network': transfer_network
})
exchange_flows['network_breakdown'][transfer_network]['from_exchanges'] += 1
else:
exchange_flows['other_transfers'].append({
'value_usd': value_usd, 'from_type': self._classify_address(from_addr),
'to_type': self._classify_address(to_addr),
'transaction': {'hash': transfer.get('hash'), 'minutes_ago': transfer.get('minutes_ago')},
'network': transfer_network
})
exchange_flows['network_breakdown'][transfer_network]['other'] += 1
except Exception as analysis_loop_error:
print(f" ⚠️ خطأ في تحليل حلقة التحويلات: {analysis_loop_error}")
continue
total_to_exchanges_usd = sum(t['value_usd'] for t in exchange_flows['to_exchanges'])
total_from_exchanges_usd = sum(t['value_usd'] for t in exchange_flows['from_exchanges'])
deposit_count = len(exchange_flows['to_exchanges'])
withdrawal_count = len(exchange_flows['from_exchanges'])
net_flow_usd = total_to_exchanges_usd - total_from_exchanges_usd
exchange_flows['deposit_count'] = deposit_count
exchange_flows['withdrawal_count'] = withdrawal_count
exchange_flows['net_flow_usd'] = net_flow_usd
print(f"🐋 تحليل الحيتان لـ {symbol}:")
print(f" • إجمالي التحويلات الفريدة: {len(unique_transfers)}")
print(f" • تحويلات الحيتان (>${self.whale_threshold_usd:,.0f}): {whale_transfers_count}")
print(f" • إيداعات للمنصات: {deposit_count} (${total_to_exchanges_usd:,.2f})")
print(f" • سحوبات من المنصات: {withdrawal_count} (${total_from_exchanges_usd:,.2f})")
print(f" • صافي التدفق (للمنصات): ${net_flow_usd:,.2f}")
signal = self._generate_enhanced_trading_signal(
total_to_exchanges_usd, total_from_exchanges_usd, net_flow_usd,
deposit_count, withdrawal_count, whale_transfers_count, network_stats
)
llm_summary = self._create_enhanced_llm_summary(signal, exchange_flows, network_stats, total_volume_usd, whale_transfers_count)
return {
'symbol': symbol, 'data_available': True, 'analysis_timestamp': datetime.now().isoformat(),
'summary': {
'total_transfers_analyzed': len(unique_transfers), 'whale_transfers_count': whale_transfers_count,
'total_volume_usd': total_volume_usd, 'time_window_minutes': 120,
'networks_analyzed': dict(network_stats)
},
'exchange_flows': {
'to_exchanges_usd': total_to_exchanges_usd, 'from_exchanges_usd': total_from_exchanges_usd,
'net_flow_usd': net_flow_usd, 'deposit_count': deposit_count, 'withdrawal_count': withdrawal_count,
'network_breakdown': dict(exchange_flows['network_breakdown']),
'top_deposits': sorted([{'value_usd': t['value_usd'], 'network': t['network'], 'to': t['to_exchange']} for t in exchange_flows['to_exchanges']], key=lambda x: x['value_usd'], reverse=True)[:3],
'top_withdrawals': sorted([{'value_usd': t['value_usd'], 'network': t['network'], 'from': t['from_exchange']} for t in exchange_flows['from_exchanges']], key=lambda x: x['value_usd'], reverse=True)[:3]
},
'trading_signal': signal,
'llm_friendly_summary': llm_summary
}
async def _get_accurate_token_value_optimized(self, raw_value, decimals, symbol):
"""حساب القيمة بالدولار باستخدام decimals مُمررة والسعر الحالي"""
try:
if decimals is None or decimals < 0:
print(f"⚠️ قيمة decimals غير صالحة ({decimals}) لـ {symbol}. لا يمكن حساب القيمة.")
return 0
price = await self._get_token_price(symbol)
if price is None or price <= 0:
return 0
token_amount = raw_value / (10 ** decimals)
value_usd = token_amount * price
return value_usd
except OverflowError:
print(f"⚠️ خطأ OverflowError أثناء حساب قيمة {symbol} (Raw={raw_value}, Decimals={decimals})")
return 0
except Exception as e:
print(f"❌ خطأ في _get_accurate_token_value_optimized لـ {symbol}: {e}")
return 0
async def _get_token_decimals(self, contract_address, network):
"""جلب عدد الكسور العشرية للعملة"""
try:
if isinstance(contract_address, dict):
addr_for_network = contract_address.get(network)
if not addr_for_network: return None
contract_address = addr_for_network
if not isinstance(contract_address, str): return None
cache_key = f"{contract_address.lower()}_{network}"
if cache_key in self.token_decimals_cache:
return self.token_decimals_cache[cache_key]
if network in self.supported_networks and self.supported_networks[network]['type'] == 'evm':
endpoints = self.supported_networks[network]['rpc_endpoints']
for endpoint in endpoints:
try:
payload = {
"jsonrpc": "2.0", "method": "eth_call",
"params": [{"to": contract_address, "data": "0x313ce567"}, "latest"],
"id": int(time.time())
}
# ✅ الإصلاح: استخدام "self.rpc_semaphore"
async with self.rpc_semaphore:
response = await self.http_client.post(endpoint, json=payload, timeout=10.0)
if response.status_code == 200:
result = response.json().get('result')
if result and result != '0x' and result.startswith('0x'):
try:
decimals = int(result, 16)
if isinstance(decimals, int) and decimals >= 0:
self.token_decimals_cache[cache_key] = decimals
print(f" ℹ️ تم جلب Decimals لـ {contract_address[:10]}... على {network}: {decimals}")
return decimals
except ValueError:
print(f" ⚠️ فشل تحويل نتيجة decimals من RPC ({result}) لـ {contract_address} على {network}")
continue
except Exception as rpc_decimal_error:
if "Cannot reopen a client instance" in str(rpc_decimal_error):
print(f" ❌ فشل فادح (Decimals): محاولة استخدام client مغلق. {rpc_decimal_error}")
return None
print(f" ⚠️ خطأ RPC أثناء جلب decimals من {endpoint.split('//')[-1]}: {rpc_decimal_error}")
continue
elif network == 'solana':
print(f"ℹ️ جلب decimals لـ Solana ({contract_address}) غير مدعوم حاليًا عبر RPC.")
estimated_decimals = 9
self.token_decimals_cache[cache_key] = estimated_decimals
print(f" ⚠️ استخدام قيمة decimals تقديرية لـ Solana: {estimated_decimals}")
return estimated_decimals
print(f"❌ فشل جلب الكسور العشرية للعقد {contract_address} على شبكة {network} من جميع المصادر.")
return None
except Exception as e:
print(f"❌ فشل عام في جلب الكسور العشرية للعقد {contract_address} على شبكة {network}: {e}")
return None
async def _get_token_price(self, symbol):
"""جلب سعر العملة من CoinGecko مع fallbacks"""
try:
base_symbol = symbol.split('/')[0].upper()
cache_entry = self.token_price_cache.get(base_symbol)
if cache_entry and time.time() - cache_entry.get('timestamp', 0) < 300:
return cache_entry['price']
price = await self._get_token_price_from_coingecko(symbol)
if price is not None and price > 0:
self.token_price_cache[base_symbol] = {'price': price, 'timestamp': time.time()}
return price
price = await self._get_token_price_from_kucoin(symbol)
if price is not None and price > 0:
self.token_price_cache[base_symbol] = {'price': price, 'timestamp': time.time()}
return price
print(f"❌ فشل جميع محاولات جلب سعر العملة {symbol}")
self.token_price_cache[base_symbol] = {'price': 0, 'timestamp': time.time()}
return 0
except Exception as e:
print(f"❌ فشل عام في جلب سعر العملة {symbol}: {e}")
return 0
async def _get_token_price_from_coingecko(self, symbol):
"""جلب سعر العملة من CoinGecko مع التعامل مع rate limiting"""
try:
symbol_mapping = {
'BTC': 'bitcoin', 'ETH': 'ethereum', 'BNB': 'binancecoin', 'ADA': 'cardano',
'DOT': 'polkadot', 'XRP': 'ripple', 'DOGE': 'dogecoin', 'BCH': 'bitcoin-cash',
'LTC': 'litecoin', 'XLM': 'stellar', 'TRX': 'tron', 'EOS': 'eos', 'XMR': 'monero',
'SOL': 'solana', 'MATIC': 'matic-network', 'AVAX': 'avalanche-2', 'LINK': 'chainlink',
'ATOM': 'cosmos', 'UNI': 'uniswap', 'AAVE': 'aave', 'KCS': 'kucoin-shares',
'MAVIA': 'heroes-of-mavia', 'COMMON': 'commonwealth', 'WLFI': 'wolfi',
'PINGPONG': 'pingpong', 'YB': 'yourbusd', 'REACT': 'react', 'XMN': 'xmine',
'ANOME': 'anome', 'ZEN': 'zencash', 'AKT': 'akash-network', 'UB': 'unibit', 'WLD': 'worldcoin'
}
base_symbol = symbol.split('/')[0].upper()
coingecko_id = symbol_mapping.get(base_symbol)
if not coingecko_id:
coingecko_id = base_symbol.lower()
print(f"ℹ️ رمز {base_symbol} غير موجود في symbol_mapping، استخدام {coingecko_id} لـ CoinGecko.")
url = f"https://api.coingecko.com/api/v3/simple/price?ids={coingecko_id}&vs_currencies=usd"
headers = {'User-Agent': 'Mozilla/5.0', 'Accept': 'application/json'}
max_retries = 2
for attempt in range(max_retries):
async with self.coingecko_semaphore:
# ✅ الإصلاح: استخدام "self.http_client"
try:
response = await self.http_client.get(url, headers=headers, timeout=10.0) # استخدام headers محلي
if response.status_code == 429:
wait_time = 3 * (attempt + 1)
print(f"⏰ Rate limit من CoinGecko لـ {symbol} (محاولة {attempt + 1}) - الانتظار {wait_time}s")
await asyncio.sleep(wait_time)
continue
response.raise_for_status()
data = response.json()
price_data = data.get(coingecko_id)
if price_data and 'usd' in price_data:
price = price_data['usd']
if isinstance(price, (int, float)) and price > 0:
return price
else:
print(f"⚠️ استجابة CoinGecko تحتوي على سعر غير صالح لـ {symbol}: {price}")
return 0
else:
if attempt == 0:
print(f"⚠️ لم يتم العثور على سعر لـ {coingecko_id} في CoinGecko، محاولة البحث عن المعرف...")
await asyncio.sleep(1.1)
# ✅ الإصلاح: استخدام "self.http_client" للبحث
found_id = await self._find_coingecko_id_via_search(base_symbol, self.http_client)
if found_id and found_id != coingecko_id:
print(f" 🔄 تم العثور على معرف بديل: {found_id}. إعادة المحاولة...")
coingecko_id = found_id
url = f"https://api.coingecko.com/api/v3/simple/price?ids={coingecko_id}&vs_currencies=usd"
continue
else:
print(f" ❌ لم يتم العثور على معرف بديل لـ {base_symbol}.")
return 0
else:
print(f"⚠️ لم يتم العثور على سعر لـ {coingecko_id} في استجابة CoinGecko بعد البحث.")
return 0
except Exception as inner_e:
if "Cannot reopen a client instance" in str(inner_e):
print(f" ❌ فشل فادح (CoinGecko): محاولة استخدام client مغلق. {inner_e}")
return 0 # لا يمكن المتابعة
# التعامل مع الأخطاء الأخرى
if isinstance(inner_e, httpx.HTTPStatusError):
print(f"❌ خطأ HTTP من CoinGecko لـ {symbol}: {inner_e.response.status_code}")
elif isinstance(inner_e, httpx.RequestError):
print(f"❌ خطأ اتصال بـ CoinGecko لـ {symbol}: {inner_e}")
else:
print(f"❌ خطأ غير متوقع أثناء جلب السعر من CoinGecko لـ {symbol}: {inner_e}")
return 0 # فشل في هذه المحاولة
print(f"❌ فشل جلب السعر من CoinGecko لـ {symbol} بعد {max_retries} محاولات.")
return 0
except Exception as e:
print(f"❌ فشل عام في _get_token_price_from_coingecko لـ {symbol}: {e}")
return 0
async def _find_coingecko_id_via_search(self, symbol, client: httpx.AsyncClient):
"""دالة مساعدة للبحث عن معرف CoinGecko"""
try:
search_url = f"https://api.coingecko.com/api/v3/search?query={symbol}"
# ✅ الإصلاح: استخدام "self.coingecko_semaphore"
async with self.coingecko_semaphore:
response = await client.get(search_url, timeout=10.0)
response.raise_for_status()
data = response.json()
coins = data.get('coins', [])
if coins:
search_symbol_lower = symbol.lower()
for coin in coins:
if coin.get('symbol', '').lower() == search_symbol_lower:
return coin.get('id')
return coins[0].get('id')
return None
except Exception as e:
print(f" ⚠️ خطأ أثناء البحث عن معرف CoinGecko لـ {symbol}: {e}")
return None
async def _get_token_price_from_kucoin(self, symbol):
"""جلب سعر العملة من KuCoin كبديل (بشكل غير متزامن)"""
exchange = None
try:
exchange = ccxt.kucoin({'enableRateLimit': True})
markets = await exchange.load_markets()
if symbol not in markets:
print(f"⚠️ الرمز {symbol} غير موجود في أسواق KuCoin.")
await exchange.close()
return 0
ticker = await exchange.fetch_ticker(symbol)
price = ticker.get('last')
if price is not None:
try:
price_float = float(price)
if price_float > 0:
return price_float
else:
print(f"⚠️ KuCoin أعاد سعراً غير موجب لـ {symbol}: {price}")
return 0
except (ValueError, TypeError):
print(f"⚠️ KuCoin أعاد سعراً غير رقمي لـ {symbol}: {price}")
return 0
else:
print(f"⚠️ KuCoin لم يُعد مفتاح 'last' في ticker لـ {symbol}")
return 0
except ccxt.NetworkError as e:
print(f"❌ خطأ شبكة أثناء جلب السعر من KuCoin لـ {symbol}: {e}")
return 0
except ccxt.ExchangeError as e:
print(f"❌ خطأ منصة KuCoin ({type(e).__name__}) لـ {symbol}: {e}")
return 0
except Exception as e:
print(f"❌ خطأ غير متوقع أثناء جلب السعر من KuCoin لـ {symbol}: {e}")
return 0
finally:
if exchange:
try:
await exchange.close()
except Exception:
pass
def _generate_enhanced_trading_signal(self, to_exchanges_usd, from_exchanges_usd, net_flow_usd, deposit_count, withdrawal_count, whale_transfers_count, network_stats):
"""توليد إشارة تداول محسنة بناء على تحركات الحيتان"""
network_strength = min(len(network_stats) / 3.0, 1.0)
abs_net_flow = abs(net_flow_usd)
# ✅ الإصلاح: استخدام أسماء الباراميترات الصحيحة
total_flow_volume = to_exchanges_usd + from_exchanges_usd
confidence_multiplier = min( (abs_net_flow / 500000.0) + (whale_transfers_count / 5.0) , 1.5)
base_confidence = 0.5 + (network_strength * 0.1)
action = 'HOLD'
reason = f'نشاط حيتان عبر {len(network_stats)} شبكة: {whale_transfers_count} تحويلة كبيرة بقيمة إجمالية ${total_flow_volume:,.0f}. صافي التدفق ${net_flow_usd:,.0f}'
critical_alert = False
if net_flow_usd > 500000 and deposit_count >= 2:
action = 'STRONG_SELL'
confidence = min(0.7 + (base_confidence * confidence_multiplier * 0.3), 0.95)
reason = f'ضغط بيعي قوي عبر {len(network_stats)} شبكة: ${net_flow_usd:,.0f} إيداع للمنصات ({deposit_count} تحويلة).'
critical_alert = abs_net_flow > 1000000
elif net_flow_usd > 150000 and (deposit_count >= 1 or whale_transfers_count > 0):
action = 'SELL'
confidence = min(0.6 + (base_confidence * confidence_multiplier * 0.2), 0.85)
reason = f'ضغط بيعي محتمل عبر {len(network_stats)} شبكة: ${net_flow_usd:,.0f} إيداع للمنصات.'
critical_alert = abs_net_flow > 750000
elif net_flow_usd < -500000 and withdrawal_count >= 2:
action = 'STRONG_BUY'
confidence = min(0.7 + (base_confidence * confidence_multiplier * 0.3), 0.95)
reason = f'تراكم شرائي قوي عبر {len(network_stats)} شبكة: ${abs_net_flow:,.0f} سحب من المنصات ({withdrawal_count} تحويلة).'
critical_alert = abs_net_flow > 1000000
elif net_flow_usd < -150000 and (withdrawal_count >= 1 or whale_transfers_count > 0):
action = 'BUY'
confidence = min(0.6 + (base_confidence * confidence_multiplier * 0.2), 0.85)
reason = f'تراكم شرائي محتمل عبر {len(network_stats)} شبكة: ${abs_net_flow:,.0f} سحب من المنصات.'
critical_alert = abs_net_flow > 750000
else:
if whale_transfers_count > 0 and abs_net_flow < 100000:
confidence = min(base_confidence * 1.1, 0.6)
reason += " (صافي التدفق منخفض نسبياً)"
elif whale_transfers_count > 0 and abs_net_flow < 50000:
confidence = max(0.4, base_confidence * 0.9)
reason += " (صافي التدفق شبه متعادل)"
elif whale_transfers_count == 0:
confidence = 0.3
reason = 'لا توجد تحركات حيتان كبيرة في الساعات الأخيرة'
else:
confidence = base_confidence
final_confidence = max(0.3, min(confidence, 0.95))
return {'action': action, 'confidence': final_confidence, 'reason': reason, 'critical_alert': critical_alert}
def _create_enhanced_llm_summary(self, signal, exchange_flows, network_stats, total_volume, whale_count):
"""إنشاء ملخص محسن للنموذج الضخم (مع إصلاح KeyError)"""
deposit_count_val = exchange_flows.get('deposit_count', 0)
withdrawal_count_val = exchange_flows.get('withdrawal_count', 0)
exchange_involvement_str = f"{deposit_count_val + withdrawal_count_val} معاملة مع المنصات"
return {
'whale_activity_summary': signal['reason'],
'recommended_action': signal['action'],
'confidence': signal['confidence'],
'key_metrics': {
'total_whale_transfers': whale_count,
'total_volume_analyzed': f"${total_volume:,.0f}",
'net_flow_direction': 'TO_EXCHANGES' if signal['action'] in ['SELL', 'STRONG_SELL'] else 'FROM_EXCHANGES' if signal['action'] in ['BUY', 'STRONG_BUY'] else 'BALANCED',
'whale_movement_impact': 'HIGH' if signal['confidence'] > 0.8 else 'MEDIUM' if signal['confidence'] > 0.6 else 'LOW',
'exchange_involvement': exchange_involvement_str,
'network_coverage': f"{len(network_stats)} شبكات",
'data_quality': 'REAL_TIME'
}
}
async def _find_contract_address_enhanced(self, symbol):
"""بحث متقدم عن عقد العملة مع تحديد الشبكة المناسبة"""
base_symbol = symbol.split("/")[0].upper() if '/' in symbol else symbol.upper()
symbol_lower = base_symbol.lower()
print(f"🔍 البحث عن عقد للعملة: {symbol}")
if symbol_lower in self.contracts_db:
contract_info = self.contracts_db[symbol_lower]
if isinstance(contract_info, str):
network = self._detect_network_from_address(contract_info)
contract_info = {'address': contract_info, 'network': network}
self.contracts_db[symbol_lower] = contract_info
if 'address' in contract_info and 'network' in contract_info:
print(f" ✅ وجد في قاعدة البيانات المحلية: {contract_info}")
return contract_info
else:
print(f" ⚠️ بيانات العقد غير مكتملة في القاعدة المحلية لـ {symbol}: {contract_info}")
print(f" 🔍 البحث في CoinGecko عن {base_symbol}...")
coingecko_result = await self._find_contract_via_coingecko(base_symbol)
if coingecko_result:
address, network = coingecko_result
contract_info = {'address': address, 'network': network}
self.contracts_db[symbol_lower] = contract_info
print(f" ✅ تم العثور على عقد {symbol} عبر CoinGecko على شبكة {network}: {address}")
if self.r2_service:
await self._save_contracts_to_r2()
return contract_info
print(f" ❌ فشل العثور على عقد لـ {symbol} في جميع المصادر")
return None
async def _find_contract_via_coingecko(self, symbol):
"""البحث عن عقد العملة عبر CoinGecko مع التعامل مع الأخطاء ومعدل الطلبات"""
try:
search_url = f"https://api.coingecko.com/api/v3/search?query={symbol}"
headers = {'User-Agent': 'Mozilla/5.0', 'Accept': 'application/json'}
max_retries = 2
for attempt in range(max_retries):
async with self.coingecko_semaphore:
# ✅ الإصلاح: استخدام "self.http_client"
try:
print(f" 🔍 CoinGecko Search API Call (Attempt {attempt + 1}) for {symbol}")
response = await self.http_client.get(search_url, headers=headers, timeout=15.0)
if response.status_code == 429:
wait_time = 3 * (attempt + 1)
print(f" ⏰ Rate limit (Search) for {symbol} - Waiting {wait_time}s")
await asyncio.sleep(wait_time)
continue
response.raise_for_status()
data = response.json()
coins = data.get('coins', [])
if not coins:
print(f" ❌ No matching coins found for {symbol} on CoinGecko")
return None
print(f" 📊 Found {len(coins)} potential matches for {symbol}")
best_coin = None
search_symbol_lower = symbol.lower()
for coin in coins:
coin_symbol = coin.get('symbol', '').lower()
coin_name = coin.get('name', '').lower()
if coin_symbol == search_symbol_lower:
best_coin = coin; print(f" ✅ Exact symbol match: {coin.get('name')}"); break
if not best_coin and coin_name == search_symbol_lower:
best_coin = coin; print(f" ✅ Exact name match: {coin.get('name')}")
if not best_coin:
best_coin = coins[0]; print(f" ⚠️ Using first result: {best_coin.get('name')}")
coin_id = best_coin.get('id')
if not coin_id: print(f" ❌ No ID found"); return None
print(f" 🔍 Fetching details for CoinGecko ID: {coin_id}")
detail_url = f"https://api.coingecko.com/api/v3/coins/{coin_id}"
await asyncio.sleep(1.1)
detail_response = await self.http_client.get(detail_url, headers=headers, timeout=15.0)
if detail_response.status_code == 429:
wait_time = 5 * (attempt + 1)
print(f" ⏰ Rate limit (Details) for {coin_id} - Waiting {wait_time}s")
await asyncio.sleep(wait_time)
detail_response = await self.http_client.get(detail_url, headers=headers, timeout=15.0)
detail_response.raise_for_status()
detail_data = detail_response.json()
platforms = detail_data.get('platforms', {})
if not platforms: print(f" ❌ No platform data found"); return None
network_priority = ['ethereum', 'binance-smart-chain', 'polygon-pos', 'arbitrum-one', 'optimistic-ethereum', 'avalanche', 'fantom', 'solana']
network_map = {'ethereum': 'ethereum', 'binance-smart-chain': 'bsc', 'polygon-pos': 'polygon', 'arbitrum-one': 'arbitrum', 'optimistic-ethereum': 'optimism', 'avalanche': 'avalanche', 'fantom': 'fantom', 'solana': 'solana'}
for platform_cg in network_priority:
address = platforms.get(platform_cg)
if address and isinstance(address, str) and address.strip():
network = network_map.get(platform_cg)
if network:
print(f" ✅ Found contract on {network}: {address}")
return address, network
print(f" ❌ No contract found on supported platforms for {coin_id}")
return None
except Exception as inner_e:
if "Cannot reopen a client instance" in str(inner_e):
print(f" ❌ فشل فادح (CoinGecko): محاولة استخدام client مغلق. {inner_e}")
return None
if isinstance(inner_e, httpx.HTTPStatusError):
print(f" ❌ HTTP Error from CoinGecko ({inner_e.request.url}): {inner_e.response.status_code}")
elif isinstance(inner_e, httpx.RequestError):
print(f" ❌ Connection Error with CoinGecko ({inner_e.request.url}): {inner_e}")
else:
print(f" ❌ Unexpected error during CoinGecko API call (Attempt {attempt + 1}): {inner_e}")
if attempt < max_retries - 1: await asyncio.sleep(1); continue
else: return None
print(f" ❌ Failed to get contract from CoinGecko for {symbol} after {max_retries} attempts.")
return None
except Exception as e:
print(f"❌ General failure in _find_contract_via_coingecko for {symbol}: {e}")
traceback.print_exc()
return None
def _is_exchange_address(self, address):
"""التحقق إذا كان العنوان ينتمي إلى منصة"""
try:
if not isinstance(address, str): return False
return address.lower() in self.address_categories['exchange']
except Exception: return False
def _classify_address(self, address):
"""تصنيف العنوان إلى نوع محدد"""
try:
if not isinstance(address, str): return 'unknown'
return self.address_labels.get(address.lower(), 'unknown')
except Exception: return 'unknown'
def _create_no_contract_response(self, symbol):
"""إنشاء رد عند عدم العثور على عقد"""
return {
'symbol': symbol, 'data_available': False, 'error': 'NO_CONTRACT_FOUND',
'trading_signal': {'action': 'HOLD', 'confidence': 0.3, 'reason': 'لم يتم العثور على عقد العملة - لا توجد بيانات حيتان'},
'llm_friendly_summary': {'whale_activity_summary': 'لا توجد بيانات عن تحركات الحيتان - لم يتم العثور على عقد العملة', 'recommended_action': 'HOLD', 'confidence': 0.3, 'key_metrics': {'whale_movement_impact': 'NO_DATA'}}
}
def _create_no_transfers_response(self, symbol):
"""إنشاء رد عند عدم العثور على تحويلات"""
return {
'symbol': symbol, 'data_available': False, 'error': 'NO_TRANSFERS_FOUND',
'trading_signal': {'action': 'HOLD', 'confidence': 0.3, 'reason': 'لا توجد تحويلات للعملة - لا توجد بيانات حيتان'},
'llm_friendly_summary': {'whale_activity_summary': 'لا توجد تحويلات حديثة للعملة - لا توجد بيانات حيتان', 'recommended_action': 'HOLD', 'confidence': 0.3, 'key_metrics': {'whale_movement_impact': 'NO_DATA'}}
}
def _create_no_recent_activity_response(self, symbol):
"""إنشاء رد عند عدم وجود نشاط حديث"""
return {
'symbol': symbol, 'data_available': False, 'error': 'NO_RECENT_ACTIVITY',
'trading_signal': {'action': 'HOLD', 'confidence': 0.3, 'reason': 'لا توجد تحويلات حديثة للعملة - لا توجد بيانات حيتان'},
'llm_friendly_summary': {'whale_activity_summary': 'لا توجد تحويلات حيتان حديثة في آخر 120 دقيقة - لا توجد بيانات حيتان', 'recommended_action': 'HOLD', 'confidence': 0.3, 'key_metrics': {'whale_movement_impact': 'NO_DATA'}}
}
def _create_error_response(self, symbol, error_msg):
"""إنشاء رد في حالة حدوث خطأ"""
return {
'symbol': symbol, 'data_available': False, 'error': f'ANALYSIS_ERROR: {error_msg}',
'trading_signal': {'action': 'HOLD', 'confidence': 0.3, 'reason': f'خطأ في تحليل الحيتان: {error_msg} - لا توجد بيانات حيتان'},
'llm_friendly_summary': {'whale_activity_summary': f'فشل في تحليل تحركات الحيتان: {error_msg} - لا توجد بيانات حيتان', 'recommended_action': 'HOLD', 'confidence': 0.3, 'key_metrics': {'whale_movement_impact': 'ERROR'}}
}
async def _save_contracts_to_r2(self):
"""حفظ قاعدة البيانات العقود إلى R2"""
if not self.r2_service: return
try:
key = "contracts.json"
contracts_to_save = {}
for symbol, data in self.contracts_db.items():
if isinstance(data, dict) and 'address' in data and 'network' in data:
contracts_to_save[symbol] = data
elif isinstance(data, str):
contracts_to_save[symbol] = {'address': data, 'network': self._detect_network_from_address(data)}
if not contracts_to_save:
print("⚠️ لا توجد بيانات عقود صالحة للحفظ في R2.")
return
data_json = json.dumps(contracts_to_save, indent=2).encode('utf-8')
self.r2_service.s3_client.put_object(
Bucket="trading", Key=key, Body=data_json, ContentType="application/json"
)
print(f"✅ تم حفظ قاعدة بيانات العقود ({len(contracts_to_save)} إدخال) إلى R2")
except Exception as e:
print(f"❌ فشل حفظ قاعدة البيانات العقود: {e}")
async def generate_whale_trading_signal(self, symbol, whale_data, market_context):
"""توليد إشارة تداول بناءً على بيانات الحيتان"""
try:
if not whale_data or not whale_data.get('data_available', False):
return {
'action': 'HOLD', 'confidence': 0.3,
'reason': 'لا توجد بيانات كافية عن نشاط الحيتان',
'source': 'whale_analysis', 'data_available': False
}
whale_signal = whale_data.get('trading_signal', {})
analysis_details = whale_data
return {
'action': whale_signal.get('action', 'HOLD'),
'confidence': whale_signal.get('confidence', 0.3),
'reason': whale_signal.get('reason', 'تحليل الحيتان غير متوفر'),
'source': 'whale_analysis',
'critical_alert': whale_signal.get('critical_alert', False),
'data_available': True,
'whale_analysis_details': analysis_details
}
except Exception as e:
print(f"❌ خطأ في توليد إشارة تداول الحيتان لـ {symbol}: {e}")
return {
'action': 'HOLD', 'confidence': 0.3,
'reason': f'خطأ في تحليل الحيتان: {str(e)} - لا توجد بيانات حيتان',
'source': 'whale_analysis', 'data_available': False
}
async def cleanup(self):
"""تنظيف الموارد عند الإغلاق"""
if hasattr(self, 'http_client') and self.http_client and not self.http_client.is_closed:
try:
await self.http_client.aclose()
print("✅ تم إغلاق اتصال HTTP client لمراقب الحيتان.")
except Exception as e:
print(f"⚠️ خطأ أثناء إغلاق HTTP client لمراقب الحيتان: {e}")
print("✅ EnhancedWhaleMonitor loaded - RPC Rate Limiting (Semaphore) & Updated Endpoints")