Trad / whale_news_data.py
Riy777's picture
Update whale_news_data.py
0a3a574
raw
history blame
56.2 kB
# whale_news_data.py
import os
import asyncio
import httpx
import json
import traceback
import time
from datetime import datetime, timedelta
from collections import defaultdict, deque
import ccxt.pro as ccxt
import numpy as np
import logging
# تعطيل تسجيل HTTP المزعج
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
class EnhancedWhaleMonitor:
def __init__(self, contracts_db=None, r2_service=None):
self.http_client = httpx.AsyncClient(
timeout=30.0,
limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
follow_redirects=True
)
# الاحتفاظ بالمفاتيح المطلوبة فقط
self.moralis_key = os.getenv("MORALIS_KEY")
self.etherscan_key = os.getenv("ETHERSCAN_KEY")
self.infura_key = os.getenv("INFURA_KEY")
# التحقق من صحة المفاتيح
self._validate_api_keys()
self.whale_threshold_usd = 50000
self.contracts_db = contracts_db or {}
self.data_manager = None
self.r2_service = r2_service
self.address_labels = {}
self._initialize_comprehensive_exchange_addresses()
self.contract_cache = {}
self.symbol_networks = {}
# إضافة جميع الشبكات المدعومة مع Solana
self.supported_networks = {
'ethereum': {
'name': 'Ethereum',
'chain_id': '0x1',
'native_coin': 'ETH',
'explorer': 'etherscan',
'rpc_endpoints': self._get_ethereum_rpc_endpoints(),
'type': 'evm'
},
'bsc': {
'name': 'Binance Smart Chain',
'chain_id': '0x38',
'native_coin': 'BNB',
'explorer': 'bscscan',
'rpc_endpoints': self._get_bsc_rpc_endpoints(),
'type': 'evm'
},
'polygon': {
'name': 'Polygon',
'chain_id': '0x89',
'native_coin': 'MATIC',
'explorer': 'polygonscan',
'rpc_endpoints': self._get_polygon_rpc_endpoints(),
'type': 'evm'
},
'arbitrum': {
'name': 'Arbitrum',
'chain_id': '0xa4b1',
'native_coin': 'ETH',
'explorer': 'arbiscan',
'rpc_endpoints': self._get_arbitrum_rpc_endpoints(),
'type': 'evm'
},
'optimism': {
'name': 'Optimism',
'chain_id': '0xa',
'native_coin': 'ETH',
'explorer': 'optimistic',
'rpc_endpoints': self._get_optimism_rpc_endpoints(),
'type': 'evm'
},
'avalanche': {
'name': 'Avalanche',
'chain_id': '0xa86a',
'native_coin': 'AVAX',
'explorer': 'snowtrace',
'rpc_endpoints': self._get_avalanche_rpc_endpoints(),
'type': 'evm'
},
'fantom': {
'name': 'Fantom',
'chain_id': '0xfa',
'native_coin': 'FTM',
'explorer': 'ftmscan',
'rpc_endpoints': self._get_fantom_rpc_endpoints(),
'type': 'evm'
},
'solana': {
'name': 'Solana',
'chain_id': 'mainnet-beta',
'native_coin': 'SOL',
'explorer': 'solscan',
'rpc_endpoints': self._get_solana_rpc_endpoints(),
'type': 'solana'
}
}
if self.r2_service:
asyncio.create_task(self._load_contracts_from_r2())
def _validate_api_keys(self):
"""التحقق من صحة مفاتيح API المطلوبة فقط"""
print("🔍 التحقق من صحة مفاتيح API...")
api_keys = {
'MORALIS_KEY': self.moralis_key,
'ETHERSCAN_KEY': self.etherscan_key,
'INFURA_KEY': self.infura_key
}
for key_name, key_value in api_keys.items():
if key_value:
if len(key_value) < 10:
print(f"❌ مفتاح {key_name} غير صالح - سيتم التعطيل")
setattr(self, key_name.lower(), None)
else:
print(f"✅ مفتاح {key_name} صالح")
else:
print(f"⚠️ مفتاح {key_name} غير متوفر")
def _get_ethereum_rpc_endpoints(self):
"""الحصول على نقاط RPC لشبكة Ethereum"""
endpoints = [
'https://rpc.ankr.com/eth',
'https://cloudflare-eth.com',
'https://eth.llamarpc.com',
'https://ethereum.publicnode.com',
'https://1rpc.io/eth',
'https://rpc.flashbots.net'
]
if self.infura_key:
endpoints.insert(0, f'https://mainnet.infura.io/v3/{self.infura_key}')
return endpoints
def _get_bsc_rpc_endpoints(self):
"""الحصول على نقاط RPC لشبكة BSC"""
return [
'https://bsc-dataseed.binance.org/',
'https://bsc-dataseed1.defibit.io/',
'https://bsc-dataseed1.ninicoin.io/',
'https://bsc.publicnode.com',
'https://binance.llamarpc.com',
'https://1rpc.io/bnb'
]
def _get_polygon_rpc_endpoints(self):
"""الحصول على نقاط RPC لشبكة Polygon"""
endpoints = [
'https://polygon-rpc.com',
'https://rpc-mainnet.matic.network',
'https://polygon-bor.publicnode.com',
'https://1rpc.io/matic',
'https://polygon-rpc-public.com'
]
if self.infura_key:
endpoints.insert(0, f'https://polygon-mainnet.infura.io/v3/{self.infura_key}')
return endpoints
def _get_arbitrum_rpc_endpoints(self):
"""الحصول على نقاط RPC لشبكة Arbitrum"""
endpoints = [
'https://arb1.arbitrum.io/rpc',
'https://arbitrum.publicnode.com',
'https://1rpc.io/arb',
'https://rpc.ankr.com/arbitrum'
]
if self.infura_key:
endpoints.insert(0, f'https://arbitrum-mainnet.infura.io/v3/{self.infura_key}')
return endpoints
def _get_optimism_rpc_endpoints(self):
"""الحصول على نقاط RPC لشبكة Optimism"""
endpoints = [
'https://mainnet.optimism.io',
'https://optimism.publicnode.com',
'https://1rpc.io/op',
'https://rpc.ankr.com/optimism'
]
if self.infura_key:
endpoints.insert(0, f'https://optimism-mainnet.infura.io/v3/{self.infura_key}')
return endpoints
def _get_avalanche_rpc_endpoints(self):
"""الحصول على نقاط RPC لشبكة Avalanche"""
return [
'https://api.avax.network/ext/bc/C/rpc',
'https://avalanche-c-chain.publicnode.com',
'https://1rpc.io/avax/c',
'https://rpc.ankr.com/avalanche'
]
def _get_fantom_rpc_endpoints(self):
"""الحصول على نقاط RPC لشبكة Fantom"""
return [
'https://rpc.ftm.tools/',
'https://fantom.publicnode.com',
'https://1rpc.io/ftm',
'https://rpc.ankr.com/fantom'
]
def _get_solana_rpc_endpoints(self):
"""الحصول على نقاط RPC لشبكة Solana"""
return [
'https://api.mainnet-beta.solana.com',
'https://solana-api.projectserum.com',
'https://rpc.ankr.com/solana',
'https://solana.public-rpc.com',
'https://ssc-dao.genesysgo.net'
]
def _initialize_comprehensive_exchange_addresses(self):
"""تهيئة قاعدة بيانات شاملة لعناوين كل المنصات"""
self.exchange_addresses = {
# 🔵 منصات KuCoin (شاملة)
'kucoin': [
'0x2b5634c42055806a59e9107ed44d43c426e58258',
'0x689c56aef474df92d44a1b70850f808488f9769c',
'0xa1d8d972560c2f8144af871db508f0b0b10a3fbf',
'0x4ad64983349c49defe8d7a4686202d24b25d0ce8',
'0x1692e170361cefd1eb7240ec13d048fd9af6d667',
'0xd6216fc19db775df9774a6e33526131da7d19a2c',
'0xe59cd29be3be4461d79c0881a238c467a2b3775c',
'0x899b5d52671830f567bf43a14684eb14e1f945fe'
],
# 🔵 منصات مركزية أخرى
'binance': [
'0x3f5ce5fbfe3e9af3971dd833d26ba9b5c936f0be',
'0xd551234ae421e3bcba99a0da6d736074f22192ff',
'0x564286362092d8e7936f0549571a803b203aaced',
'0x0681d8db095565fe8a346fa0277bffde9c0edbbf',
'0xfe9e8709d3215310075d67e3ed32a380ccf451c8'
],
'coinbase': [
'0x71660c4005ba85c37ccec55d0c4493e66fe775d3',
'0x503828976d22510aad0201ac7ec88293211d23da',
'0xddfabcdc4d8ffc6d5beaf154f18b778f892a0740'
],
'kraken': [
'0x2910543af39aba0cd09dbb2d50200b3e800a63d2',
'0xa160cdab225685da1d56aa342ad8841c3b53f291'
],
'okx': [
'0x6cc5f688a315f3dc28a7781717a9a798a59fda7b',
'0x2c8fbb630289363ac80705a1a61273f76fd5a161'
],
# 🟢 منصات لامركزية
'uniswap': [
'0x3fc91a3afd70395cd496c647d5a6cc9d4b2b7fad',
'0x68b3465833fb72a70ecdf485e0e4c7bd8665fc45'
],
'pancakeswap': [
'0x10ed43c718714eb63d5aa57b78b54704e256024e',
'0x13f4ea83d0bd40e75c8222255bc855a974568dd4'
],
# 🔵 عناوين Solana للمنصات
'solana_kucoin': [
'F3a5ZLPKUCrCj6CGKP2m9wS9Y2L8RcUBoJq9L2D2sUYi',
'8Wq7Z6Z1J7t1J7t1J7t1J7t1J7t1J7t1J7t1J7t1J7t'
],
'solana_binance': [
'5tzFkiKscXHK5ZXCGbXZJpXaWuBtZ6RrH8eL2a7Yi7Vn',
'8Wq7Z6Z1J7t1J7t1J7t1J7t1J7t1J7t1J7t1J7t1J7t'
],
'solana_coinbase': [
'H8q7Z6Z1J7t1J7t1J7t1J7t1J7t1J7t1J7t1J7t1J7t',
'G8q7Z6Z1J7t1J7t1J7t1J7t1J7t1J7t1J7t1J7t1J7t'
]
}
# تحميل جميع العناوين في التصنيفات
self.address_categories = {
'exchange': set(),
'cex': set(),
'dex': set(),
'whale': set(),
'unknown': set()
}
for exchange_type, addresses in self.exchange_addresses.items():
for address in addresses:
self.address_labels[address.lower()] = exchange_type
if exchange_type in ['binance', 'kucoin', 'coinbase', 'kraken', 'okx', 'solana_kucoin', 'solana_binance', 'solana_coinbase']:
self.address_categories['cex'].add(address.lower())
else:
self.address_categories['dex'].add(address.lower())
self.address_categories['exchange'].add(address.lower())
async def _load_contracts_from_r2(self):
"""تحميل قاعدة بيانات العقود من R2"""
try:
key = "contracts.json"
response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key)
contracts_data = json.loads(response['Body'].read())
self.contracts_db.update(contracts_data)
print(f"✅ تم تحميل {len(contracts_data)} عقد من R2")
except Exception as e:
print(f"⚠️ لم يتم العثور على قاعدة بيانات العقود في R2: {e}")
async def get_symbol_whale_activity(self, symbol, contract_address=None):
"""
مراقبة تحركات الحيتان لعملة محددة مع دعم Solana
"""
try:
print(f"🔍 بدء مراقبة الحيتان المتقدمة للعملة: {symbol}")
# 1. البحث عن عقد العملة إذا لم يتم توفيره
if not contract_address:
contract_address = await self._find_contract_address_enhanced(symbol)
if not contract_address:
print(f"❌ لم يتم العثور على عقد للعملة {symbol}")
return self._create_no_contract_response(symbol)
# 2. تحديد الشبكة
network = self.symbol_networks.get(symbol.lower(), 'ethereum')
# 3. جلب بيانات التحويلات من جميع المصادر بما فيها Solana
transfers = await self._get_enhanced_transfer_data(contract_address, network)
if not transfers:
print(f"⚠️ لم يتم العثور على تحويلات للعملة {symbol}")
return self._create_no_transfers_response(symbol)
# 4. تحليل التحركات الحديثة فقط (آخر 120 دقيقة)
recent_transfers = self._filter_recent_transfers(transfers, max_minutes=120)
if not recent_transfers:
print(f"⚠️ لا توجد تحويلات حديثة للعملة {symbol}")
return self._create_no_recent_activity_response(symbol)
# 5. تحليل تأثير الحيتان المحسن
analysis = self._analyze_enhanced_whale_impact(recent_transfers, symbol)
# 6. طباعة ملخص مفصل
self._print_detailed_whale_summary(symbol, analysis)
return analysis
except Exception as e:
print(f"❌ خطأ في مراقبة الحيتان للعملة {symbol}: {e}")
return self._create_error_response(symbol, str(e))
async def _get_enhanced_transfer_data(self, contract_address, network):
"""جلب بيانات التحويلات من مصادر متعددة محسنة مع دعم Solana"""
print(f"🌐 جلب بيانات التحويلات من جميع الشبكات للعقد: {contract_address}")
# التأكد من أن contract_address هو نصي وليس قاموس
if isinstance(contract_address, dict):
print(f"⚠️ contract_address هو قاموس، تحويل إلى نص: {contract_address}")
contract_address = str(contract_address)
tasks = []
# جلب البيانات من جميع الشبكات المدعومة
for net_name, net_config in self.supported_networks.items():
if net_config['type'] == 'evm':
tasks.append(self._get_evm_network_transfer_data(contract_address, net_name))
elif net_config['type'] == 'solana':
tasks.append(self._get_solana_transfer_data(contract_address, net_name))
# إضافة مصادر خارجية
if self.moralis_key:
tasks.append(self._get_moralis_token_data(contract_address, network))
results = await asyncio.gather(*tasks, return_exceptions=True)
all_transfers = []
for i, res in enumerate(results):
if isinstance(res, list):
all_transfers.extend(res)
net_name = list(self.supported_networks.keys())[i] if i < len(self.supported_networks) else 'external'
print(f" ✅ {net_name}: {len(res)} تحويلة")
elif isinstance(res, Exception):
net_name = list(self.supported_networks.keys())[i] if i < len(self.supported_networks) else 'external'
print(f" ❌ {net_name}: {str(res)}")
print(f"📊 إجمالي التحويلات المجمعة: {len(all_transfers)}")
return all_transfers
async def _get_evm_network_transfer_data(self, contract_address, network):
"""جلب بيانات التحويلات من شبكة EVM محددة"""
try:
transfers = []
# المحاولة عبر RPC أولاً
rpc_transfers = await self._get_rpc_token_data_enhanced(contract_address, network)
transfers.extend(rpc_transfers)
# المحاولة عبر Explorer إذا كان متوفراً
explorer_transfers = await self._get_explorer_token_data(contract_address, network)
transfers.extend(explorer_transfers)
return transfers
except Exception as e:
print(f"❌ خطأ في جلب بيانات {network}: {e}")
return []
async def _get_solana_transfer_data(self, token_address, network='solana', limit=100):
"""جلب بيانات التحويلات من شبكة Solana"""
try:
print(f" 🔍 جلب تحويلات Solana للتوكن: {token_address}")
transfers = []
for endpoint in self.supported_networks[network]['rpc_endpoints']:
try:
# جلب تحويلات التوكن من Solana
payload = {
"jsonrpc": "2.0",
"id": 1,
"method": "getSignaturesForAddress",
"params": [
token_address,
{"limit": limit}
]
}
async with httpx.AsyncClient(timeout=20.0) as client:
response = await client.post(endpoint, json=payload)
if response.status_code != 200:
continue
data = response.json()
if 'result' not in data:
continue
signatures = [tx['signature'] for tx in data['result']]
# جلب تفاصيل كل معاملة
for signature in signatures:
try:
tx_detail = await self._get_solana_transaction_detail(signature, endpoint)
if tx_detail and self._is_solana_token_transfer(tx_detail, token_address):
transfer = await self._parse_solana_transfer(tx_detail, token_address)
if transfer:
transfers.append(transfer)
except Exception as e:
continue
print(f" ✅ Solana: تم العثور على {len(transfers)} تحويلة")
break # إذا نجح endpoint واحد، نتوقف
except Exception as e:
print(f" ❌ فشل endpoint في Solana: {e}")
continue
return transfers
except Exception as e:
print(f"❌ فشل جلب بيانات Solana: {e}")
return []
async def _get_solana_transaction_detail(self, signature, endpoint):
"""جلب تفاصيل معاملة Solana"""
try:
payload = {
"jsonrpc": "2.0",
"id": 1,
"method": "getTransaction",
"params": [
signature,
{"encoding": "jsonParsed"}
]
}
async with httpx.AsyncClient(timeout=15.0) as client:
response = await client.post(endpoint, json=payload)
if response.status_code == 200:
return response.json().get('result')
return None
except Exception as e:
return None
def _is_solana_token_transfer(self, transaction, token_address):
"""التحقق إذا كانت المعاملة تحويل توكن في Solana"""
try:
if not transaction or 'meta' not in transaction:
return False
meta = transaction['meta']
if 'postTokenBalances' not in meta or 'preTokenBalances' not in meta:
return False
# التحقق من وجود تغييرات في أرصدة التوكن المحدد
for balance in meta['postTokenBalances'] + meta['preTokenBalances']:
if balance.get('mint') == token_address:
return True
return False
except Exception:
return False
async def _parse_solana_transfer(self, transaction, token_address):
"""تحليل معاملة Solana لاستخراج بيانات التحويل"""
try:
# استخراج المعلومات الأساسية
signature = transaction['transaction']['signatures'][0]
block_time = transaction.get('blockTime', int(time.time()))
# استخراج المرسل والمستلم من تغييرات الأرصدة
sender = ""
receiver = ""
amount = 0
meta = transaction['meta']
pre_balances = {bal['owner']: bal for bal in meta.get('preTokenBalances', []) if bal.get('mint') == token_address}
post_balances = {bal['owner']: bal for bal in meta.get('postTokenBalances', []) if bal.get('mint') == token_address}
for owner in set(list(pre_balances.keys()) + list(post_balances.keys())):
pre_balance = pre_balances.get(owner, {}).get('uiTokenAmount', {}).get('uiAmount', 0)
post_balance = post_balances.get(owner, {}).get('uiTokenAmount', {}).get('uiAmount', 0)
if post_balance > pre_balance:
receiver = owner
amount = post_balance - pre_balance
elif pre_balance > post_balance:
sender = owner
return {
'hash': signature,
'from': sender,
'to': receiver,
'value': str(amount),
'timeStamp': str(block_time),
'blockNumber': str(transaction.get('slot', 0)),
'network': 'solana',
'type': 'solana_transfer'
}
except Exception as e:
print(f"❌ خطأ في تحليل معاملة Solana: {e}")
return None
async def _get_rpc_token_data_enhanced(self, contract_address, network='ethereum', blocks=100):
"""جلب بيانات التحويلات عبر RPC مع معالجة الأخطاء المحسنة"""
try:
if network not in self.supported_networks or self.supported_networks[network]['type'] != 'evm':
return []
# التأكد من أن contract_address هو نصي
if not isinstance(contract_address, str):
print(f"⚠️ contract_address ليس نصي في {network}: {type(contract_address)}")
contract_address = str(contract_address)
endpoints = self.supported_networks[network]['rpc_endpoints']
transfers = []
for endpoint in endpoints:
try:
endpoint_name = endpoint.split('//')[1].split('/')[0] if '//' in endpoint else endpoint
print(f" 🔍 فحص {blocks} كتلة في {network} عبر {endpoint_name}")
# الحصول على أحدث كتلة
payload = {"jsonrpc": "2.0", "method": "eth_blockNumber", "params": [], "id": 1}
async with httpx.AsyncClient(timeout=20.0) as client:
response = await client.post(endpoint, json=payload)
if response.status_code != 200:
print(f" ❌ فشل في الحصول على أحدث كتلة من {endpoint_name}")
continue
result = response.json().get('result')
if not result:
print(f" ❌ لم يتم العثور على نتيجة من {endpoint_name}")
continue
latest_block = int(result, 16)
print(f" 📦 أحدث كتلة في {network}: {latest_block}")
# فحص آخر `blocks` كتلة
successful_blocks = 0
for block_offset in range(blocks):
block_number = latest_block - block_offset
if block_number < 0:
break
payload = {
"jsonrpc": "2.0",
"method": "eth_getBlockByNumber",
"params": [hex(block_number), True],
"id": 1
}
try:
async with httpx.AsyncClient(timeout=15.0) as client:
response = await client.post(endpoint, json=payload)
if response.status_code != 200:
continue
block_data = response.json().get('result')
if not block_data:
continue
successful_blocks += 1
# فحص جميع المعاملات في الكتلة
for tx in block_data.get('transactions', []):
# التأكد من أن to هو نصي قبل استخدام lower()
tx_to = tx.get('to', '')
if not isinstance(tx_to, str):
continue
# فحص إذا كانت المعاملة مرسلة إلى عقد العملة
if (tx_to.lower() == contract_address.lower() and
tx.get('input', '0x') != '0x'):
transfers.append({
'hash': tx.get('hash'),
'from': tx.get('from'),
'to': tx.get('to'),
'value': str(int(tx.get('value', '0x0'), 16)),
'timeStamp': str(int(block_data.get('timestamp', '0x0'), 16)),
'blockNumber': str(block_number),
'network': network
})
if block_offset % 20 == 0:
print(f" 📦 تم فحص {block_offset} كتلة من {blocks} في {network}")
except Exception as block_error:
print(f" ⚠️ خطأ في كتلة {block_number}: {block_error}")
continue
print(f" ✅ {network}: تم فحص {successful_blocks} كتلة بنجاح، تم العثور على {len([t for t in transfers if t['network'] == network])} تحويلة")
if successful_blocks > 0:
break # إذا نجح endpoint واحد، نتوقف
except Exception as e:
print(f" ❌ فشل endpoint في {network}: {e}")
continue
return transfers
except Exception as e:
print(f"❌ فشل جلب بيانات RPC لـ {network}: {e}")
return []
async def _get_explorer_token_data(self, contract_address, network):
"""جلب بيانات التحويلات من Explorer"""
if network not in self.supported_networks:
return []
try:
# استخدام Etherscan فقط للشبكات المدعومة
explorer_config = {
'ethereum': {'url': 'https://api.etherscan.io/api', 'key': self.etherscan_key},
}
if network not in explorer_config or not explorer_config[network]['key']:
return []
config = explorer_config[network]
params = {
"module": "account",
"action": "tokentx",
"contractaddress": contract_address,
"page": 1,
"offset": 100,
"sort": "desc",
"apikey": config['key']
}
async with httpx.AsyncClient(timeout=20.0) as client:
response = await client.get(config['url'], params=params)
if response.status_code == 200:
data = response.json()
if data.get('status') == '1' and data.get('message') == 'OK':
transfers = data.get('result', [])
# إضافة معلومات الشبكة
for transfer in transfers:
transfer['network'] = network
return transfers
else:
print(f"⚠️ Explorer {network} returned error: {data.get('message')}")
return []
else:
print(f"⚠️ Explorer {network} request failed: {response.status_code}")
return []
except Exception as e:
print(f"❌ فشل جلب بيانات Explorer لـ {network}: {e}")
return []
async def _get_moralis_token_data(self, contract_address, network='ethereum'):
"""جلب بيانات التحويلات من Moralis"""
if not self.moralis_key:
return []
try:
chains = {
'ethereum': '0x1',
'bsc': '0x38',
'polygon': '0x89',
'arbitrum': '0xa4b1',
'optimism': '0xa',
'avalanche': '0xa86a',
'fantom': '0xfa'
}
chain_id = chains.get(network)
if not chain_id:
return []
url = f"https://deep-index.moralis.io/api/v2/erc20/{contract_address}/transfers"
headers = {
"X-API-Key": self.moralis_key,
"Accept": "application/json"
}
params = {
"chain": chain_id,
"limit": 100
}
async with httpx.AsyncClient(timeout=20.0) as client:
response = await client.get(url, headers=headers, params=params)
if response.status_code == 200:
result = response.json().get('result', [])
# إضافة معلومات الشبكة
for transfer in result:
transfer['network'] = network
return result
else:
return []
except Exception as e:
return []
def _filter_recent_transfers(self, transfers, max_minutes=120):
"""ترشيح التحويلات الحديثة فقط"""
recent_transfers = []
current_time = datetime.now()
for transfer in transfers:
try:
if 'timeStamp' in transfer:
timestamp = int(transfer['timeStamp'])
elif 'block_timestamp' in transfer:
timestamp = int(transfer['block_timestamp'])
else:
continue
transfer_time = datetime.fromtimestamp(timestamp)
time_diff = current_time - transfer_time
if time_diff.total_seconds() <= max_minutes * 60:
transfer['minutes_ago'] = time_diff.total_seconds() / 60
transfer['human_time'] = transfer_time.isoformat()
recent_transfers.append(transfer)
except Exception as e:
continue
return recent_transfers
def _analyze_enhanced_whale_impact(self, transfers, symbol):
"""تحليل تأثير تحركات الحيتان المحسن"""
# تحليل التدفقات إلى/من المنصات
exchange_flows = {
'to_exchanges': [],
'from_exchanges': [],
'other_transfers': [],
'network_breakdown': defaultdict(lambda: {'to_exchanges': 0, 'from_exchanges': 0, 'other': 0})
}
total_volume = 0
whale_transfers = 0
network_stats = defaultdict(int)
for transfer in transfers:
from_addr = transfer.get('from', '')
to_addr = transfer.get('to', '')
value = float(transfer.get('value', 0))
network = transfer.get('network', 'unknown')
# التأكد من أن العناوين نصوص
if not isinstance(from_addr, str):
from_addr = str(from_addr)
if not isinstance(to_addr, str):
to_addr = str(to_addr)
# تحويل القيمة إلى دولار (تقديري)
value_usd = value / 1e18 * 1000 # تقدير - يحتاج لدمج مع سعر العملة
total_volume += value_usd
network_stats[network] += 1
if value_usd >= self.whale_threshold_usd:
whale_transfers += 1
# تصنيف اتجاه التدفق
if self._is_exchange_address(to_addr):
exchange_flows['to_exchanges'].append({
'value_usd': value_usd,
'from_type': self._classify_address(from_addr),
'to_exchange': self._classify_address(to_addr),
'transaction': transfer,
'network': network
})
exchange_flows['network_breakdown'][network]['to_exchanges'] += 1
elif self._is_exchange_address(from_addr):
exchange_flows['from_exchanges'].append({
'value_usd': value_usd,
'from_exchange': self._classify_address(from_addr),
'to_type': self._classify_address(to_addr),
'transaction': transfer,
'network': network
})
exchange_flows['network_breakdown'][network]['from_exchanges'] += 1
else:
exchange_flows['other_transfers'].append(transfer)
exchange_flows['network_breakdown'][network]['other'] += 1
# حساب الإحصائيات
total_to_exchanges = sum(t['value_usd'] for t in exchange_flows['to_exchanges'])
total_from_exchanges = sum(t['value_usd'] for t in exchange_flows['from_exchanges'])
net_flow = total_to_exchanges - total_from_exchanges
# توليد الإشارة المحسنة
signal = self._generate_enhanced_trading_signal(
total_to_exchanges,
total_from_exchanges,
net_flow,
len(exchange_flows['to_exchanges']),
len(exchange_flows['from_exchanges']),
whale_transfers,
network_stats
)
return {
'symbol': symbol,
'data_available': True,
'analysis_timestamp': datetime.now().isoformat(),
'summary': {
'total_transfers_analyzed': len(transfers),
'whale_transfers_count': whale_transfers,
'total_volume_usd': total_volume,
'time_window_minutes': 120,
'networks_analyzed': dict(network_stats)
},
'exchange_flows': {
'to_exchanges_usd': total_to_exchanges,
'from_exchanges_usd': total_from_exchanges,
'net_flow_usd': net_flow,
'deposit_count': len(exchange_flows['to_exchanges']),
'withdrawal_count': len(exchange_flows['from_exchanges']),
'network_breakdown': dict(exchange_flows['network_breakdown'])
},
'trading_signal': signal,
'llm_friendly_summary': self._create_enhanced_llm_summary(signal, exchange_flows, network_stats)
}
def _generate_enhanced_trading_signal(self, to_exchanges, from_exchanges, net_flow, deposit_count, withdrawal_count, whale_count, network_stats):
"""توليد إشارة تداول محسنة بناء على تحركات الحيتان"""
# حساب قوة الإشارة بناء على عدد الشبكات
network_strength = min(len(network_stats) / 3, 1.0)
if net_flow > 500000 and deposit_count >= 2:
confidence = min(0.95, (net_flow / 1000000) * (1 + network_strength * 0.2))
return {
'action': 'STRONG_SELL',
'confidence': confidence,
'reason': f'ضغط بيعي قوي عبر {len(network_stats)} شبكة: ${net_flow:,.0f} إيداع إلى المنصات',
'critical_alert': net_flow > 1000000
}
elif net_flow > 250000 and deposit_count >= 1:
confidence = min(0.80, (net_flow / 500000) * (1 + network_strength * 0.1))
return {
'action': 'SELL',
'confidence': confidence,
'reason': f'ضغط بيعي عبر {len(network_stats)} شبكة: ${net_flow:,.0f} إيداع إلى المنصات',
'critical_alert': False
}
elif net_flow < -500000 and withdrawal_count >= 2:
confidence = min(0.95, (abs(net_flow) / 1000000) * (1 + network_strength * 0.2))
return {
'action': 'STRONG_BUY',
'confidence': confidence,
'reason': f'تراكم شرائي قوي عبر {len(network_stats)} شبكة: ${abs(net_flow):,.0f} سحب من المنصات',
'critical_alert': abs(net_flow) > 1000000
}
elif net_flow < -250000 and withdrawal_count >= 1:
confidence = min(0.80, (abs(net_flow) / 500000) * (1 + network_strength * 0.1))
return {
'action': 'BUY',
'confidence': confidence,
'reason': f'تراكم شرائي عبر {len(network_stats)} شبكة: ${abs(net_flow):,.0f} سحب من المنصات',
'critical_alert': False
}
else:
return {
'action': 'HOLD',
'confidence': 0.5 + (network_strength * 0.1),
'reason': f'نشاط حيتان طبيعي عبر {len(network_stats)} شبكة: {whale_count} تحويلة كبيرة',
'critical_alert': False
}
def _create_enhanced_llm_summary(self, signal, exchange_flows, network_stats):
"""إنشاء ملخص محسن للنموذج الضخم"""
return {
'whale_activity_summary': signal['reason'],
'recommended_action': signal['action'],
'confidence': signal['confidence'],
'key_metrics': {
'net_flow_direction': 'TO_EXCHANGES' if signal['action'] in ['SELL', 'STRONG_SELL'] else 'FROM_EXCHANGES' if signal['action'] in ['BUY', 'STRONG_BUY'] else 'BALANCED',
'whale_movement_impact': 'HIGH' if signal['confidence'] > 0.8 else 'MEDIUM' if signal['confidence'] > 0.6 else 'LOW',
'exchange_involvement': 'HIGH' if len(exchange_flows['to_exchanges']) + len(exchange_flows['from_exchanges']) > 5 else 'MEDIUM' if len(exchange_flows['to_exchanges']) + len(exchange_flows['from_exchanges']) > 2 else 'LOW',
'network_coverage': f"{len(network_stats)} شبكات",
'total_networks_analyzed': len(network_stats)
}
}
def _print_detailed_whale_summary(self, symbol, analysis):
"""طباعة ملخص مفصل لتحركات الحيتان"""
summary = analysis['summary']
signal = analysis['trading_signal']
flows = analysis['exchange_flows']
print(f"🐋 ملخص الحيتان المحسن لـ {symbol}:")
print(f" • التحويلات الكبيرة: {summary['whale_transfers_count']}")
print(f" • الشبكات التي تم تحليلها: {', '.join(summary['networks_analyzed'].keys())}")
print(f" • الإيداعات للمنصات: {flows['deposit_count']} (${flows['to_exchanges_usd']:,.0f})")
print(f" • السحوبات من المنصات: {flows['withdrawal_count']} (${flows['from_exchanges_usd']:,.0f})")
print(f" • صافي التدفق: ${flows['net_flow_usd']:,.0f}")
print(f" • الإشارة: {signal['action']} (ثقة: {signal['confidence']:.2f})")
print(f" • السبب: {signal['reason']}")
# طباعة تفصيل الشبكات
for network, stats in flows['network_breakdown'].items():
print(f" • {network}: إيداعات={stats['to_exchanges']}, سحوبات={stats['from_exchanges']}")
async def _find_contract_address_enhanced(self, symbol):
"""بحث متقدم عن عقد العملة مع دعم Solana"""
base_symbol = symbol.split("/")[0] if '/' in symbol else symbol
symbol_lower = base_symbol.lower()
# 1. البحث في الكاش أولاً
if symbol_lower in self.contract_cache:
return self.contract_cache[symbol_lower]
# 2. البحث في قاعدة البيانات المحلية
for key, address in self.contracts_db.items():
if symbol_lower in key.lower():
self.contract_cache[symbol_lower] = address
return address
# 3. البحث في CoinGecko
print(f"🔍 البحث عن عقد {symbol} في CoinGecko...")
coingecko_address = await self._find_contract_via_coingecko(base_symbol)
if coingecko_address:
self.contract_cache[symbol_lower] = coingecko_address
self.contracts_db[symbol_lower] = coingecko_address
if self.r2_service:
await self._save_contracts_to_r2()
print(f"✅ تم العثور على عقد {symbol} عبر CoinGecko")
return coingecko_address
# 4. البحث في Moralis
print(f"🔍 البحث عن عقد {symbol} في Moralis...")
moralis_address = await self._find_contract_via_moralis(base_symbol)
if moralis_address:
self.contract_cache[symbol_lower] = moralis_address
self.contracts_db[symbol_lower] = moralis_address
if self.r2_service:
await self._save_contracts_to_r2()
print(f"✅ تم العثور على عقد {symbol} عبر Moralis")
return moralis_address
return None
async def _find_contract_via_coingecko(self, symbol):
"""البحث عن عقد العملة عبر CoinGecko"""
try:
search_url = f"https://api.coingecko.com/api/v3/search?query={symbol}"
async with httpx.AsyncClient(timeout=15) as client:
response = await client.get(search_url)
if response.status_code != 200:
return None
data = response.json()
coins = data.get('coins', [])
if not coins:
return None
# البحث عن أفضل تطابق
best_coin = None
for coin in coins:
if coin.get('symbol', '').lower() == symbol.lower() or coin.get('name', '').lower() == symbol.lower():
best_coin = coin
break
if not best_coin:
best_coin = coins[0]
coin_id = best_coin.get('id')
if not coin_id:
return None
detail_url = f"https://api.coingecko.com/api/v3/coins/{coin_id}"
detail_response = await client.get(detail_url)
if detail_response.status_code != 200:
return None
detail_data = detail_response.json()
platforms = detail_data.get('platforms', {})
network_priority = ['ethereum', 'binance-smart-chain', 'polygon-pos', 'arbitrum-one', 'optimistic-ethereum', 'avalanche', 'fantom', 'solana']
for platform in network_priority:
if platform in platforms and platforms[platform]:
address = platforms[platform]
if address:
network_map = {
'ethereum': 'ethereum',
'binance-smart-chain': 'bsc',
'polygon-pos': 'polygon',
'arbitrum-one': 'arbitrum',
'optimistic-ethereum': 'optimism',
'avalanche': 'avalanche',
'fantom': 'fantom',
'solana': 'solana'
}
self.symbol_networks[symbol.lower()] = network_map.get(platform, 'ethereum')
return address
return None
except Exception as e:
print(f"❌ فشل البحث في CoinGecko: {e}")
return None
async def _find_contract_via_moralis(self, symbol):
"""البحث عن عقد العملة عبر Moralis"""
if not self.moralis_key:
return None
try:
url = f"https://deep-index.moralis.io/api/v2/erc20/search?chain=eth&q={symbol}"
headers = {
"X-API-Key": self.moralis_key,
"Accept": "application/json"
}
async with httpx.AsyncClient(timeout=15) as client:
response = await client.get(url, headers=headers)
if response.status_code == 200:
data = response.json()
if data.get('result') and len(data['result']) > 0:
token = data['result'][0]
address = token.get('address')
if address and len(address) == 42:
return address
return None
except Exception as e:
print(f"❌ فشل البحث في Moralis: {e}")
return None
def _is_exchange_address(self, address):
"""التحقق إذا كان العنوان ينتمي إلى منصة"""
try:
if not isinstance(address, str):
address = str(address)
address_lower = address.lower()
return address_lower in self.address_categories['exchange']
except Exception as e:
return False
def _classify_address(self, address):
"""تصنيف العنوان إلى نوع محدد"""
try:
if not isinstance(address, str):
address = str(address)
address_lower = address.lower()
if address_lower in self.address_labels:
return self.address_labels[address_lower]
return 'unknown'
except Exception as e:
return 'unknown'
def _create_no_contract_response(self, symbol):
return {
'symbol': symbol,
'data_available': False,
'error': 'NO_CONTRACT_FOUND',
'trading_signal': {
'action': 'HOLD',
'confidence': 0.3,
'reason': 'لم يتم العثور على عقد العملة'
},
'llm_friendly_summary': {
'whale_activity_summary': 'لا توجد بيانات عن تحركات الحيتان - لم يتم العثور على عقد العملة',
'recommended_action': 'HOLD',
'confidence': 0.3,
'key_metrics': {'whale_movement_impact': 'NO_DATA'}
}
}
def _create_no_transfers_response(self, symbol):
return {
'symbol': symbol,
'data_available': False,
'error': 'NO_TRANSFERS_FOUND',
'trading_signal': {
'action': 'HOLD',
'confidence': 0.5,
'reason': 'لا توجد تحويلات للعملة'
},
'llm_friendly_summary': {
'whale_activity_summary': 'لا توجد تحويلات حديثة للعملة',
'recommended_action': 'HOLD',
'confidence': 0.5,
'key_metrics': {'whale_movement_impact': 'NO_DATA'}
}
}
def _create_no_recent_activity_response(self, symbol):
return {
'symbol': symbol,
'data_available': False,
'error': 'NO_RECENT_ACTIVITY',
'trading_signal': {
'action': 'HOLD',
'confidence': 0.5,
'reason': 'لا توجد تحويلات حديثة للعملة'
},
'llm_friendly_summary': {
'whale_activity_summary': 'لا توجد تحويلات حيتان حديثة في آخر 120 دقيقة',
'recommended_action': 'HOLD',
'confidence': 0.5,
'key_metrics': {'whale_movement_impact': 'LOW'}
}
}
def _create_error_response(self, symbol, error_msg):
return {
'symbol': symbol,
'data_available': False,
'error': f'ANALYSIS_ERROR: {error_msg}',
'trading_signal': {
'action': 'HOLD',
'confidence': 0.3,
'reason': f'خطأ في تحليل الحيتان: {error_msg}'
},
'llm_friendly_summary': {
'whale_activity_summary': f'فشل في تحليل تحركات الحيتان: {error_msg}',
'recommended_action': 'HOLD',
'confidence': 0.3,
'key_metrics': {'whale_movement_impact': 'ERROR'}
}
}
async def _save_contracts_to_r2(self):
"""حفظ قاعدة البيانات العقود إلى R2"""
try:
key = "contracts.json"
data_json = json.dumps(self.contracts_db, indent=2).encode('utf-8')
self.r2_service.s3_client.put_object(
Bucket="trading", Key=key, Body=data_json, ContentType="application/json"
)
print(f"✅ تم حفظ قاعدة بيانات العقود إلى R2")
except Exception as e:
print(f"❌ فشل حفظ قاعدة البيانات العقود: {e}")
def _calculate_whale_activity_score(self, whale_data):
"""حساب درجة نشاط الحيتان"""
try:
if not whale_data or not whale_data.get('data_available', False):
return 0.0
signal = whale_data.get('trading_signal', {})
confidence = signal.get('confidence', 0.0)
action = signal.get('action', 'HOLD')
# تعيين أوزان بناءً على نوع الإجراء
action_weights = {
'STRONG_BUY': 1.0,
'BUY': 0.7,
'STRONG_SELL': -1.0,
'SELL': -0.7,
'HOLD': 0.0
}
base_score = action_weights.get(action, 0.0)
weighted_score = base_score * confidence
# تطبيع النتيجة بين 0 و 1
normalized_score = (weighted_score + 1) / 2
return max(0.0, min(1.0, normalized_score))
except Exception as e:
print(f"❌ خطأ في حساب درجة نشاط الحيتان: {e}")
return 0.0
async def generate_whale_trading_signal(self, symbol, whale_data, market_context):
"""توليد إشارة تداول بناءً على بيانات الحيتان"""
try:
if not whale_data or not whale_data.get('data_available', False):
return {
'action': 'HOLD',
'confidence': 0.3,
'reason': 'لا توجد بيانات كافية عن نشاط الحيتان',
'source': 'whale_analysis'
}
whale_signal = whale_data.get('trading_signal', {})
return {
'action': whale_signal.get('action', 'HOLD'),
'confidence': whale_signal.get('confidence', 0.5),
'reason': whale_signal.get('reason', 'تحليل الحيتان غير متوفر'),
'source': 'whale_analysis',
'critical_alert': whale_signal.get('critical_alert', False),
'whale_activity_score': self._calculate_whale_activity_score(whale_data)
}
except Exception as e:
print(f"❌ خطأ في توليد إشارة تداول الحيتان لـ {symbol}: {e}")
return {
'action': 'HOLD',
'confidence': 0.3,
'reason': f'خطأ في تحليل الحيتان: {str(e)}',
'source': 'whale_analysis'
}
async def cleanup(self):
await self.http_client.aclose()
print("✅ EnhancedWhaleMonitor loaded - Advanced multi-network whale monitoring with Solana support ready")