Trad / whale_news_data.py
Riy777's picture
Update whale_news_data.py
3b15070
raw
history blame
53.5 kB
# whale_news_data.py
import os
import asyncio
import httpx
import json
import traceback
import time
from datetime import datetime, timedelta
from collections import defaultdict, deque
import ccxt.pro as ccxt
import numpy as np
import logging
# تعطيل تسجيل HTTP المزعج
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
class EnhancedWhaleMonitor:
def __init__(self, contracts_db=None, r2_service=None):
self.http_client = httpx.AsyncClient(
timeout=30.0,
limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
follow_redirects=True
)
# الاحتفاظ بالمفاتيح المطلوبة فقط
self.moralis_key = os.getenv("MORALIS_KEY")
self.etherscan_key = os.getenv("ETHERSCAN_KEY")
self.infura_key = os.getenv("INFURA_KEY")
# التحقق من صحة المفاتيح
self._validate_api_keys()
self.whale_threshold_usd = 50000
self.contracts_db = contracts_db or {}
self.data_manager = None
self.r2_service = r2_service
self.address_labels = {}
self._initialize_comprehensive_exchange_addresses()
self.contract_cache = {}
self.symbol_networks = {}
# إضافة جميع الشبكات المدعومة مع Solana
self.supported_networks = {
'ethereum': {
'name': 'Ethereum',
'chain_id': '0x1',
'native_coin': 'ETH',
'explorer': 'etherscan',
'rpc_endpoints': self._get_ethereum_rpc_endpoints(),
'type': 'evm'
},
'bsc': {
'name': 'Binance Smart Chain',
'chain_id': '0x38',
'native_coin': 'BNB',
'explorer': 'bscscan',
'rpc_endpoints': self._get_bsc_rpc_endpoints(),
'type': 'evm'
},
'polygon': {
'name': 'Polygon',
'chain_id': '0x89',
'native_coin': 'MATIC',
'explorer': 'polygonscan',
'rpc_endpoints': self._get_polygon_rpc_endpoints(),
'type': 'evm'
},
'arbitrum': {
'name': 'Arbitrum',
'chain_id': '0xa4b1',
'native_coin': 'ETH',
'explorer': 'arbiscan',
'rpc_endpoints': self._get_arbitrum_rpc_endpoints(),
'type': 'evm'
},
'optimism': {
'name': 'Optimism',
'chain_id': '0xa',
'native_coin': 'ETH',
'explorer': 'optimistic',
'rpc_endpoints': self._get_optimism_rpc_endpoints(),
'type': 'evm'
},
'avalanche': {
'name': 'Avalanche',
'chain_id': '0xa86a',
'native_coin': 'AVAX',
'explorer': 'snowtrace',
'rpc_endpoints': self._get_avalanche_rpc_endpoints(),
'type': 'evm'
},
'fantom': {
'name': 'Fantom',
'chain_id': '0xfa',
'native_coin': 'FTM',
'explorer': 'ftmscan',
'rpc_endpoints': self._get_fantom_rpc_endpoints(),
'type': 'evm'
},
'solana': {
'name': 'Solana',
'chain_id': 'mainnet-beta',
'native_coin': 'SOL',
'explorer': 'solscan',
'rpc_endpoints': self._get_solana_rpc_endpoints(),
'type': 'solana'
}
}
if self.r2_service:
asyncio.create_task(self._load_contracts_from_r2())
def _validate_api_keys(self):
"""التحقق من صحة مفاتيح API المطلوبة فقط"""
print("🔍 التحقق من صحة مفاتيح API...")
api_keys = {
'MORALIS_KEY': self.moralis_key,
'ETHERSCAN_KEY': self.etherscan_key,
'INFURA_KEY': self.infura_key
}
for key_name, key_value in api_keys.items():
if key_value:
if len(key_value) < 10:
print(f"❌ مفتاح {key_name} غير صالح - سيتم التعطيل")
setattr(self, key_name.lower(), None)
else:
print(f"✅ مفتاح {key_name} صالح")
else:
print(f"⚠️ مفتاح {key_name} غير متوفر")
def _get_ethereum_rpc_endpoints(self):
"""الحصول على نقاط RPC لشبكة Ethereum"""
endpoints = [
'https://rpc.ankr.com/eth',
'https://cloudflare-eth.com',
'https://eth.llamarpc.com',
'https://ethereum.publicnode.com',
'https://1rpc.io/eth',
'https://rpc.flashbots.net'
]
if self.infura_key:
endpoints.insert(0, f'https://mainnet.infura.io/v3/{self.infura_key}')
return endpoints
def _get_bsc_rpc_endpoints(self):
"""الحصول على نقاط RPC لشبكة BSC"""
return [
'https://bsc-dataseed.binance.org/',
'https://bsc-dataseed1.defibit.io/',
'https://bsc-dataseed1.ninicoin.io/',
'https://bsc.publicnode.com',
'https://binance.llamarpc.com',
'https://1rpc.io/bnb'
]
def _get_polygon_rpc_endpoints(self):
"""الحصول على نقاط RPC لشبكة Polygon"""
endpoints = [
'https://polygon-rpc.com',
'https://rpc-mainnet.matic.network',
'https://polygon-bor.publicnode.com',
'https://1rpc.io/matic',
'https://polygon-rpc-public.com'
]
if self.infura_key:
endpoints.insert(0, f'https://polygon-mainnet.infura.io/v3/{self.infura_key}')
return endpoints
def _get_arbitrum_rpc_endpoints(self):
"""الحصول على نقاط RPC لشبكة Arbitrum"""
endpoints = [
'https://arb1.arbitrum.io/rpc',
'https://arbitrum.publicnode.com',
'https://1rpc.io/arb',
'https://rpc.ankr.com/arbitrum'
]
if self.infura_key:
endpoints.insert(0, f'https://arbitrum-mainnet.infura.io/v3/{self.infura_key}')
return endpoints
def _get_optimism_rpc_endpoints(self):
"""الحصول على نقاط RPC لشبكة Optimism"""
endpoints = [
'https://mainnet.optimism.io',
'https://optimism.publicnode.com',
'https://1rpc.io/op',
'https://rpc.ankr.com/optimism'
]
if self.infura_key:
endpoints.insert(0, f'https://optimism-mainnet.infura.io/v3/{self.infura_key}')
return endpoints
def _get_avalanche_rpc_endpoints(self):
"""الحصول على نقاط RPC لشبكة Avalanche"""
return [
'https://api.avax.network/ext/bc/C/rpc',
'https://avalanche-c-chain.publicnode.com',
'https://1rpc.io/avax/c',
'https://rpc.ankr.com/avalanche'
]
def _get_fantom_rpc_endpoints(self):
"""الحصول على نقاط RPC لشبكة Fantom"""
return [
'https://rpc.ftm.tools/',
'https://fantom.publicnode.com',
'https://1rpc.io/ftm',
'https://rpc.ankr.com/fantom'
]
def _get_solana_rpc_endpoints(self):
"""الحصول على نقاط RPC لشبكة Solana"""
return [
'https://api.mainnet-beta.solana.com',
'https://solana-api.projectserum.com',
'https://rpc.ankr.com/solana',
'https://solana.public-rpc.com',
'https://ssc-dao.genesysgo.net'
]
def _initialize_comprehensive_exchange_addresses(self):
"""تهيئة قاعدة بيانات شاملة لعناوين كل المنصات"""
self.exchange_addresses = {
'kucoin': [
'0x2b5634c42055806a59e9107ed44d43c426e58258',
'0x689c56aef474df92d44a1b70850f808488f9769c',
'0xa1d8d972560c2f8144af871db508f0b0b10a3fbf',
'0x4ad64983349c49defe8d7a4686202d24b25d0ce8',
'0x1692e170361cefd1eb7240ec13d048fd9af6d667',
'0xd6216fc19db775df9774a6e33526131da7d19a2c',
'0xe59cd29be3be4461d79c0881a238c467a2b3775c',
'0x899b5d52671830f567bf43a14684eb14e1f945fe'
],
'binance': [
'0x3f5ce5fbfe3e9af3971dd833d26ba9b5c936f0be',
'0xd551234ae421e3bcba99a0da6d736074f22192ff',
'0x564286362092d8e7936f0549571a803b203aaced',
'0x0681d8db095565fe8a346fa0277bffde9c0edbbf',
'0xfe9e8709d3215310075d67e3ed32a380ccf451c8'
],
'coinbase': [
'0x71660c4005ba85c37ccec55d0c4493e66fe775d3',
'0x503828976d22510aad0201ac7ec88293211d23da',
'0xddfabcdc4d8ffc6d5beaf154f18b778f892a0740'
],
'kraken': [
'0x2910543af39aba0cd09dbb2d50200b3e800a63d2',
'0xa160cdab225685da1d56aa342ad8841c3b53f291'
],
'okx': [
'0x6cc5f688a315f3dc28a7781717a9a798a59fda7b',
'0x2c8fbb630289363ac80705a1a61273f76fd5a161'
],
'uniswap': [
'0x3fc91a3afd70395cd496c647d5a6cc9d4b2b7fad',
'0x68b3465833fb72a70ecdf485e0e4c7bd8665fc45'
],
'pancakeswap': [
'0x10ed43c718714eb63d5aa57b78b54704e256024e',
'0x13f4ea83d0bd40e75c8222255bc855a974568dd4'
],
'solana_kucoin': [
'F3a5ZLPKUCrCj6CGKP2m9wS9Y2L8RcUBoJq9L2D2sUYi',
'8Wq7Z6Z1J7t1J7t1J7t1J7t1J7t1J7t1J7t1J7t1J7t'
],
'solana_binance': [
'5tzFkiKscXHK5ZXCGbXZJpXaWuBtZ6RrH8eL2a7Yi7Vn',
'8Wq7Z6Z1J7t1J7t1J7t1J7t1J7t1J7t1J7t1J7t1J7t'
],
'solana_coinbase': [
'H8q7Z6Z1J7t1J7t1J7t1J7t1J7t1J7t1J7t1J7t1J7t',
'G8q7Z6Z1J7t1J7t1J7t1J7t1J7t1J7t1J7t1J7t1J7t'
]
}
# تحميل جميع العناوين في التصنيفات
self.address_categories = {
'exchange': set(),
'cex': set(),
'dex': set(),
'whale': set(),
'unknown': set()
}
for exchange_type, addresses in self.exchange_addresses.items():
for address in addresses:
self.address_labels[address.lower()] = exchange_type
if exchange_type in ['binance', 'kucoin', 'coinbase', 'kraken', 'okx', 'solana_kucoin', 'solana_binance', 'solana_coinbase']:
self.address_categories['cex'].add(address.lower())
else:
self.address_categories['dex'].add(address.lower())
self.address_categories['exchange'].add(address.lower())
async def _load_contracts_from_r2(self):
"""تحميل قاعدة بيانات العقود من R2"""
try:
key = "contracts.json"
response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key)
contracts_data = json.loads(response['Body'].read())
self.contracts_db.update(contracts_data)
print(f"✅ تم تحميل {len(contracts_data)} عقد من R2")
except Exception as e:
print(f"⚠️ لم يتم العثور على قاعدة بيانات العقود في R2: {e}")
async def get_symbol_whale_activity(self, symbol, contract_address=None):
"""
مراقبة تحركات الحيتان لعملة محددة مع معالجة العملات الأصلية
"""
try:
print(f"🔍 بدء مراقبة الحيتان المتقدمة للعملة: {symbol}")
# فحص إذا كانت العملة أصلية (مثل BTC, LTC, ETH) وليس لها token
native_coins = ['BTC', 'LTC', 'ETH', 'BNB', 'ADA', 'DOT', 'XRP', 'DOGE', 'BCH', 'XLM', 'TRX', 'EOS', 'XMR']
base_symbol = symbol.split("/")[0] if '/' in symbol else symbol
if base_symbol in native_coins:
print(f"ℹ️ {symbol} عملة أصلية - لا توجد بيانات حيتان للتوكنات")
return self._create_native_coin_response(symbol)
# 1. البحث عن عقد العملة إذا لم يتم توفيره
if not contract_address:
contract_address = await self._find_contract_address_enhanced(symbol)
if not contract_address:
print(f"❌ لم يتم العثور على عقد للعملة {symbol}")
return self._create_no_contract_response(symbol)
# 2. تحديد الشبكة المناسبة فقط
network = self.symbol_networks.get(symbol.lower())
if not network:
print(f"❌ لم يتم تحديد شبكة للعملة {symbol}")
return self._create_no_contract_response(symbol)
print(f"🌐 البحث في الشبكة المحددة: {network}")
# 3. جلب بيانات التحويلات من الشبكة المحددة فقط
transfers = await self._get_targeted_transfer_data(contract_address, network)
if not transfers:
print(f"⚠️ لم يتم العثور على تحويلات للعملة {symbol} على شبكة {network}")
return self._create_no_transfers_response(symbol)
# 4. تحليل التحركات الحديثة فقط (آخر 120 دقيقة)
recent_transfers = self._filter_recent_transfers(transfers, max_minutes=120)
if not recent_transfers:
print(f"⚠️ لا توجد تحويلات حديثة للعملة {symbol}")
return self._create_no_recent_activity_response(symbol)
# 5. تحليل تأثير الحيتان المحسن
analysis = self._analyze_enhanced_whale_impact(recent_transfers, symbol)
# 6. طباعة ملخص مفصل
self._print_detailed_whale_summary(symbol, analysis)
return analysis
except Exception as e:
print(f"❌ خطأ في مراقبة الحيتان للعملة {symbol}: {e}")
traceback.print_exc()
return self._create_error_response(symbol, str(e))
def _create_native_coin_response(self, symbol):
"""إنشاء رد للعملات الأصلية التي ليس لها tokens"""
return {
'symbol': symbol,
'data_available': False,
'error': 'NATIVE_COIN_NO_TOKEN',
'trading_signal': {
'action': 'HOLD',
'confidence': 0.5,
'reason': f'{symbol} عملة أصلية - لا توجد بيانات حيتان للتوكنات'
},
'llm_friendly_summary': {
'whale_activity_summary': f'{symbol} عملة أصلية - نظام مراقبة الحيتان الحالي مصمم للتوكنات فقط',
'recommended_action': 'HOLD',
'confidence': 0.5,
'key_metrics': {'whale_movement_impact': 'NOT_APPLICABLE'}
}
}
async def _get_targeted_transfer_data(self, contract_address, network):
"""جلب بيانات التحويلات من الشبكة المحددة فقط"""
print(f"🌐 جلب بيانات التحويلات من شبكة {network} للعقد: {contract_address}")
# التأكد من أن contract_address هو نصي
if isinstance(contract_address, dict):
# استخراج العنوان للشبكة المحددة
contract_address = contract_address.get(network)
if not contract_address:
print(f"❌ لم يتم العثور على عنوان للشبكة {network} في {contract_address}")
return []
if not isinstance(contract_address, str):
print(f"❌ عنوان العقد غير صالح: {type(contract_address)}")
return []
# جلب البيانات من الشبكة المحددة فقط
if network in self.supported_networks:
net_config = self.supported_networks[network]
if net_config['type'] == 'evm':
return await self._get_evm_network_transfer_data(contract_address, network)
elif net_config['type'] == 'solana':
return await self._get_solana_transfer_data(contract_address, network)
return []
async def _get_evm_network_transfer_data(self, contract_address, network):
"""جلب بيانات التحويلات من شبكة EVM محددة"""
try:
transfers = []
# المحاولة عبر RPC أولاً
rpc_transfers = await self._get_rpc_token_data_targeted(contract_address, network)
transfers.extend(rpc_transfers)
# المحاولة عبر Explorer إذا كان متوفراً
explorer_transfers = await self._get_explorer_token_data(contract_address, network)
transfers.extend(explorer_transfers)
return transfers
except Exception as e:
print(f"❌ خطأ في جلب بيانات {network}: {e}")
return []
async def _get_rpc_token_data_targeted(self, contract_address, network='ethereum', blocks=50):
"""جلب بيانات التحويلات عبر RPC للشبكة المحددة فقط"""
try:
if network not in self.supported_networks or self.supported_networks[network]['type'] != 'evm':
return []
# التأكد من أن contract_address هو نصي
if not isinstance(contract_address, str):
print(f"⚠️ contract_address ليس نصي في {network}: {type(contract_address)}")
return []
endpoints = self.supported_networks[network]['rpc_endpoints']
transfers = []
for endpoint in endpoints:
try:
endpoint_name = endpoint.split('//')[1].split('/')[0] if '//' in endpoint else endpoint
print(f" 🔍 فحص {blocks} كتلة في {network} عبر {endpoint_name}")
# الحصول على أحدث كتلة
payload = {"jsonrpc": "2.0", "method": "eth_blockNumber", "params": [], "id": 1}
async with httpx.AsyncClient(timeout=15.0) as client:
response = await client.post(endpoint, json=payload)
if response.status_code != 200:
continue
result = response.json().get('result')
if not result:
continue
latest_block = int(result, 16)
print(f" 📦 أحدث كتلة في {network}: {latest_block}")
# فحص آخر `blocks` كتلة
successful_blocks = 0
for block_offset in range(blocks):
block_number = latest_block - block_offset
if block_number < 0:
break
payload = {
"jsonrpc": "2.0",
"method": "eth_getBlockByNumber",
"params": [hex(block_number), True],
"id": 1
}
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(endpoint, json=payload)
if response.status_code != 200:
continue
block_data = response.json().get('result')
if not block_data:
continue
successful_blocks += 1
# فحص جميع المعاملات في الكتلة
for tx in block_data.get('transactions', []):
tx_to = tx.get('to', '')
if not isinstance(tx_to, str):
continue
# فحص إذا كانت المعاملة مرسلة إلى عقد العملة
if (tx_to.lower() == contract_address.lower() and
tx.get('input', '0x') != '0x'):
transfers.append({
'hash': tx.get('hash'),
'from': tx.get('from'),
'to': tx.get('to'),
'value': str(int(tx.get('value', '0x0'), 16)),
'timeStamp': str(int(block_data.get('timestamp', '0x0'), 16)),
'blockNumber': str(block_number),
'network': network
})
except Exception as block_error:
continue
print(f" ✅ {network}: تم فحص {successful_blocks} كتلة بنجاح، تم العثور على {len([t for t in transfers if t['network'] == network])} تحويلة")
if successful_blocks > 0:
break # إذا نجح endpoint واحد، نتوقف
except Exception as e:
print(f" ❌ فشل endpoint في {network}: {e}")
continue
return transfers
except Exception as e:
print(f"❌ فشل جلب بيانات RPC لـ {network}: {e}")
return []
async def _get_solana_transfer_data(self, token_address, network='solana', limit=50):
"""جلب بيانات التحويلات من شبكة Solana"""
try:
print(f" 🔍 جلب تحويلات Solana للتوكن: {token_address}")
transfers = []
for endpoint in self.supported_networks[network]['rpc_endpoints']:
try:
# جلب تحويلات التوكن من Solana
payload = {
"jsonrpc": "2.0",
"id": 1,
"method": "getSignaturesForAddress",
"params": [
token_address,
{"limit": limit}
]
}
async with httpx.AsyncClient(timeout=15.0) as client:
response = await client.post(endpoint, json=payload)
if response.status_code != 200:
continue
data = response.json()
if 'result' not in data:
continue
signatures = [tx['signature'] for tx in data['result']]
# جلب تفاصيل كل معاملة
for signature in signatures[:10]: # الحد لتحسين الأداء
try:
tx_detail = await self._get_solana_transaction_detail(signature, endpoint)
if tx_detail and self._is_solana_token_transfer(tx_detail, token_address):
transfer = await self._parse_solana_transfer(tx_detail, token_address)
if transfer:
transfers.append(transfer)
except Exception as e:
continue
print(f" ✅ Solana: تم العثور على {len(transfers)} تحويلة")
break # إذا نجح endpoint واحد، نتوقف
except Exception as e:
print(f" ❌ فشل endpoint في Solana: {e}")
continue
return transfers
except Exception as e:
print(f"❌ فشل جلب بيانات Solana: {e}")
return []
async def _get_solana_transaction_detail(self, signature, endpoint):
"""جلب تفاصيل معاملة Solana"""
try:
payload = {
"jsonrpc": "2.0",
"id": 1,
"method": "getTransaction",
"params": [
signature,
{"encoding": "jsonParsed"}
]
}
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(endpoint, json=payload)
if response.status_code == 200:
return response.json().get('result')
return None
except Exception as e:
return None
def _is_solana_token_transfer(self, transaction, token_address):
"""التحقق إذا كانت المعاملة تحويل توكن في Solana"""
try:
if not transaction or 'meta' not in transaction:
return False
meta = transaction['meta']
if 'postTokenBalances' not in meta or 'preTokenBalances' not in meta:
return False
# التحقق من وجود تغييرات في أرصدة التوكن المحدد
for balance in meta['postTokenBalances'] + meta['preTokenBalances']:
if balance.get('mint') == token_address:
return True
return False
except Exception:
return False
async def _parse_solana_transfer(self, transaction, token_address):
"""تحليل معاملة Solana لاستخراج بيانات التحويل"""
try:
# استخراج المعلومات الأساسية
signature = transaction['transaction']['signatures'][0]
block_time = transaction.get('blockTime', int(time.time()))
# استخراج المرسل والمستلم من تغييرات الأرصدة
sender = ""
receiver = ""
amount = 0
meta = transaction['meta']
pre_balances = {bal['owner']: bal for bal in meta.get('preTokenBalances', []) if bal.get('mint') == token_address}
post_balances = {bal['owner']: bal for bal in meta.get('postTokenBalances', []) if bal.get('mint') == token_address}
for owner in set(list(pre_balances.keys()) + list(post_balances.keys())):
pre_balance = pre_balances.get(owner, {}).get('uiTokenAmount', {}).get('uiAmount', 0)
post_balance = post_balances.get(owner, {}).get('uiTokenAmount', {}).get('uiAmount', 0)
if post_balance > pre_balance:
receiver = owner
amount = post_balance - pre_balance
elif pre_balance > post_balance:
sender = owner
return {
'hash': signature,
'from': sender,
'to': receiver,
'value': str(amount),
'timeStamp': str(block_time),
'blockNumber': str(transaction.get('slot', 0)),
'network': 'solana',
'type': 'solana_transfer'
}
except Exception as e:
print(f"❌ خطأ في تحليل معاملة Solana: {e}")
return None
async def _get_explorer_token_data(self, contract_address, network):
"""جلب بيانات التحويلات من Explorer"""
if network not in self.supported_networks:
return []
try:
# استخدام Etherscan فقط للشبكات المدعومة
explorer_config = {
'ethereum': {'url': 'https://api.etherscan.io/api', 'key': self.etherscan_key},
}
if network not in explorer_config or not explorer_config[network]['key']:
return []
config = explorer_config[network]
params = {
"module": "account",
"action": "tokentx",
"contractaddress": contract_address,
"page": 1,
"offset": 50, # تقليل العدد لتحسين الأداء
"sort": "desc",
"apikey": config['key']
}
async with httpx.AsyncClient(timeout=15.0) as client:
response = await client.get(config['url'], params=params)
if response.status_code == 200:
data = response.json()
if data.get('status') == '1' and data.get('message') == 'OK':
transfers = data.get('result', [])
# إضافة معلومات الشبكة
for transfer in transfers:
transfer['network'] = network
return transfers
else:
print(f"⚠️ Explorer {network} returned error: {data.get('message')}")
return []
else:
print(f"⚠️ Explorer {network} request failed: {response.status_code}")
return []
except Exception as e:
print(f"❌ فشل جلب بيانات Explorer لـ {network}: {e}")
return []
def _filter_recent_transfers(self, transfers, max_minutes=120):
"""ترشيح التحويلات الحديثة فقط"""
recent_transfers = []
current_time = datetime.now()
for transfer in transfers:
try:
if 'timeStamp' in transfer:
timestamp = int(transfer['timeStamp'])
elif 'block_timestamp' in transfer:
timestamp = int(transfer['block_timestamp'])
else:
continue
transfer_time = datetime.fromtimestamp(timestamp)
time_diff = current_time - transfer_time
if time_diff.total_seconds() <= max_minutes * 60:
transfer['minutes_ago'] = time_diff.total_seconds() / 60
transfer['human_time'] = transfer_time.isoformat()
recent_transfers.append(transfer)
except Exception as e:
continue
return recent_transfers
def _analyze_enhanced_whale_impact(self, transfers, symbol):
"""تحليل تأثير تحركات الحيتان المحسن"""
# تحليل التدفقات إلى/من المنصات
exchange_flows = {
'to_exchanges': [],
'from_exchanges': [],
'other_transfers': [],
'network_breakdown': defaultdict(lambda: {'to_exchanges': 0, 'from_exchanges': 0, 'other': 0})
}
total_volume = 0
whale_transfers = 0
network_stats = defaultdict(int)
for transfer in transfers:
from_addr = transfer.get('from', '')
to_addr = transfer.get('to', '')
value = float(transfer.get('value', 0))
network = transfer.get('network', 'unknown')
# التأكد من أن العناوين نصوص
if not isinstance(from_addr, str):
from_addr = str(from_addr)
if not isinstance(to_addr, str):
to_addr = str(to_addr)
# تحويل القيمة إلى دولار (تقديري)
value_usd = value / 1e18 * 1000 # تقدير - يحتاج لدمج مع سعر العملة
total_volume += value_usd
network_stats[network] += 1
if value_usd >= self.whale_threshold_usd:
whale_transfers += 1
# تصنيف اتجاه التدفق
if self._is_exchange_address(to_addr):
exchange_flows['to_exchanges'].append({
'value_usd': value_usd,
'from_type': self._classify_address(from_addr),
'to_exchange': self._classify_address(to_addr),
'transaction': transfer,
'network': network
})
exchange_flows['network_breakdown'][network]['to_exchanges'] += 1
elif self._is_exchange_address(from_addr):
exchange_flows['from_exchanges'].append({
'value_usd': value_usd,
'from_exchange': self._classify_address(from_addr),
'to_type': self._classify_address(to_addr),
'transaction': transfer,
'network': network
})
exchange_flows['network_breakdown'][network]['from_exchanges'] += 1
else:
exchange_flows['other_transfers'].append(transfer)
exchange_flows['network_breakdown'][network]['other'] += 1
# حساب الإحصائيات
total_to_exchanges = sum(t['value_usd'] for t in exchange_flows['to_exchanges'])
total_from_exchanges = sum(t['value_usd'] for t in exchange_flows['from_exchanges'])
net_flow = total_to_exchanges - total_from_exchanges
# توليد الإشارة المحسنة
signal = self._generate_enhanced_trading_signal(
total_to_exchanges,
total_from_exchanges,
net_flow,
len(exchange_flows['to_exchanges']),
len(exchange_flows['from_exchanges']),
whale_transfers,
network_stats
)
return {
'symbol': symbol,
'data_available': True,
'analysis_timestamp': datetime.now().isoformat(),
'summary': {
'total_transfers_analyzed': len(transfers),
'whale_transfers_count': whale_transfers,
'total_volume_usd': total_volume,
'time_window_minutes': 120,
'networks_analyzed': dict(network_stats)
},
'exchange_flows': {
'to_exchanges_usd': total_to_exchanges,
'from_exchanges_usd': total_from_exchanges,
'net_flow_usd': net_flow,
'deposit_count': len(exchange_flows['to_exchanges']),
'withdrawal_count': len(exchange_flows['from_exchanges']),
'network_breakdown': dict(exchange_flows['network_breakdown'])
},
'trading_signal': signal,
'llm_friendly_summary': self._create_enhanced_llm_summary(signal, exchange_flows, network_stats)
}
def _generate_enhanced_trading_signal(self, to_exchanges, from_exchanges, net_flow, deposit_count, withdrawal_count, whale_count, network_stats):
"""توليد إشارة تداول محسنة بناء على تحركات الحيتان"""
# حساب قوة الإشارة بناء على عدد الشبكات
network_strength = min(len(network_stats) / 3, 1.0)
if net_flow > 500000 and deposit_count >= 2:
confidence = min(0.95, (net_flow / 1000000) * (1 + network_strength * 0.2))
return {
'action': 'STRONG_SELL',
'confidence': confidence,
'reason': f'ضغط بيعي قوي عبر {len(network_stats)} شبكة: ${net_flow:,.0f} إيداع إلى المنصات',
'critical_alert': net_flow > 1000000
}
elif net_flow > 250000 and deposit_count >= 1:
confidence = min(0.80, (net_flow / 500000) * (1 + network_strength * 0.1))
return {
'action': 'SELL',
'confidence': confidence,
'reason': f'ضغط بيعي عبر {len(network_stats)} شبكة: ${net_flow:,.0f} إيداع إلى المنصات',
'critical_alert': False
}
elif net_flow < -500000 and withdrawal_count >= 2:
confidence = min(0.95, (abs(net_flow) / 1000000) * (1 + network_strength * 0.2))
return {
'action': 'STRONG_BUY',
'confidence': confidence,
'reason': f'تراكم شرائي قوي عبر {len(network_stats)} شبكة: ${abs(net_flow):,.0f} سحب من المنصات',
'critical_alert': abs(net_flow) > 1000000
}
elif net_flow < -250000 and withdrawal_count >= 1:
confidence = min(0.80, (abs(net_flow) / 500000) * (1 + network_strength * 0.1))
return {
'action': 'BUY',
'confidence': confidence,
'reason': f'تراكم شرائي عبر {len(network_stats)} شبكة: ${abs(net_flow):,.0f} سحب من المنصات',
'critical_alert': False
}
else:
return {
'action': 'HOLD',
'confidence': 0.5 + (network_strength * 0.1),
'reason': f'نشاط حيتان طبيعي عبر {len(network_stats)} شبكة: {whale_count} تحويلة كبيرة',
'critical_alert': False
}
def _create_enhanced_llm_summary(self, signal, exchange_flows, network_stats):
"""إنشاء ملخص محسن للنموذج الضخم"""
return {
'whale_activity_summary': signal['reason'],
'recommended_action': signal['action'],
'confidence': signal['confidence'],
'key_metrics': {
'net_flow_direction': 'TO_EXCHANGES' if signal['action'] in ['SELL', 'STRONG_SELL'] else 'FROM_EXCHANGES' if signal['action'] in ['BUY', 'STRONG_BUY'] else 'BALANCED',
'whale_movement_impact': 'HIGH' if signal['confidence'] > 0.8 else 'MEDIUM' if signal['confidence'] > 0.6 else 'LOW',
'exchange_involvement': 'HIGH' if len(exchange_flows['to_exchanges']) + len(exchange_flows['from_exchanges']) > 5 else 'MEDIUM' if len(exchange_flows['to_exchanges']) + len(exchange_flows['from_exchanges']) > 2 else 'LOW',
'network_coverage': f"{len(network_stats)} شبكات",
'total_networks_analyzed': len(network_stats)
}
}
def _print_detailed_whale_summary(self, symbol, analysis):
"""طباعة ملخص مفصل لتحركات الحيتان"""
summary = analysis['summary']
signal = analysis['trading_signal']
flows = analysis['exchange_flows']
print(f"🐋 ملخص الحيتان المحسن لـ {symbol}:")
print(f" • التحويلات الكبيرة: {summary['whale_transfers_count']}")
print(f" • الشبكات التي تم تحليلها: {', '.join(summary['networks_analyzed'].keys())}")
print(f" • الإيداعات للمنصات: {flows['deposit_count']} (${flows['to_exchanges_usd']:,.0f})")
print(f" • السحوبات من المنصات: {flows['withdrawal_count']} (${flows['from_exchanges_usd']:,.0f})")
print(f" • صافي التدفق: ${flows['net_flow_usd']:,.0f}")
print(f" • الإشارة: {signal['action']} (ثقة: {signal['confidence']:.2f})")
print(f" • السبب: {signal['reason']}")
# طباعة تفصيل الشبكات
for network, stats in flows['network_breakdown'].items():
print(f" • {network}: إيداعات={stats['to_exchanges']}, سحوبات={stats['from_exchanges']}")
async def _find_contract_address_enhanced(self, symbol):
"""بحث متقدم عن عقد العملة مع تحديد الشبكة المناسبة"""
base_symbol = symbol.split("/")[0] if '/' in symbol else symbol
symbol_lower = base_symbol.lower()
# 1. البحث في الكاش أولاً
if symbol_lower in self.contract_cache:
return self.contract_cache[symbol_lower]
# 2. البحث في قاعدة البيانات المحلية
for key, address in self.contracts_db.items():
if symbol_lower in key.lower():
self.contract_cache[symbol_lower] = address
return address
# 3. البحث في CoinGecko مع تحديد الشبكة
print(f"🔍 البحث عن عقد {symbol} في CoinGecko...")
coingecko_result = await self._find_contract_via_coingecko(base_symbol)
if coingecko_result:
address, network = coingecko_result
self.contract_cache[symbol_lower] = address
self.symbol_networks[symbol_lower] = network
self.contracts_db[symbol_lower] = address
if self.r2_service:
await self._save_contracts_to_r2()
print(f"✅ تم العثور على عقد {symbol} عبر CoinGecko على شبكة {network}")
return address
return None
async def _find_contract_via_coingecko(self, symbol):
"""البحث عن عقد العملة عبر CoinGecko مع تحديد الشبكة"""
try:
search_url = f"https://api.coingecko.com/api/v3/search?query={symbol}"
async with httpx.AsyncClient(timeout=15) as client:
response = await client.get(search_url)
if response.status_code != 200:
return None
data = response.json()
coins = data.get('coins', [])
if not coins:
return None
# البحث عن أفضل تطابق
best_coin = None
for coin in coins:
if coin.get('symbol', '').lower() == symbol.lower() or coin.get('name', '').lower() == symbol.lower():
best_coin = coin
break
if not best_coin:
best_coin = coins[0]
coin_id = best_coin.get('id')
if not coin_id:
return None
detail_url = f"https://api.coingecko.com/api/v3/coins/{coin_id}"
detail_response = await client.get(detail_url)
if detail_response.status_code != 200:
return None
detail_data = detail_response.json()
platforms = detail_data.get('platforms', {})
network_priority = ['ethereum', 'binance-smart-chain', 'polygon-pos', 'arbitrum-one', 'optimistic-ethereum', 'avalanche', 'fantom', 'solana']
for platform in network_priority:
if platform in platforms and platforms[platform]:
address = platforms[platform]
if address:
network_map = {
'ethereum': 'ethereum',
'binance-smart-chain': 'bsc',
'polygon-pos': 'polygon',
'arbitrum-one': 'arbitrum',
'optimistic-ethereum': 'optimism',
'avalanche': 'avalanche',
'fantom': 'fantom',
'solana': 'solana'
}
network = network_map.get(platform, 'ethereum')
return address, network
return None
except Exception as e:
print(f"❌ فشل البحث في CoinGecko: {e}")
return None
def _is_exchange_address(self, address):
"""التحقق إذا كان العنوان ينتمي إلى منصة"""
try:
if not isinstance(address, str):
address = str(address)
address_lower = address.lower()
return address_lower in self.address_categories['exchange']
except Exception as e:
return False
def _classify_address(self, address):
"""تصنيف العنوان إلى نوع محدد"""
try:
if not isinstance(address, str):
address = str(address)
address_lower = address.lower()
if address_lower in self.address_labels:
return self.address_labels[address_lower]
return 'unknown'
except Exception as e:
return 'unknown'
def _create_no_contract_response(self, symbol):
return {
'symbol': symbol,
'data_available': False,
'error': 'NO_CONTRACT_FOUND',
'trading_signal': {
'action': 'HOLD',
'confidence': 0.3,
'reason': 'لم يتم العثور على عقد العملة'
},
'llm_friendly_summary': {
'whale_activity_summary': 'لا توجد بيانات عن تحركات الحيتان - لم يتم العثور على عقد العملة',
'recommended_action': 'HOLD',
'confidence': 0.3,
'key_metrics': {'whale_movement_impact': 'NO_DATA'}
}
}
def _create_no_transfers_response(self, symbol):
return {
'symbol': symbol,
'data_available': False,
'error': 'NO_TRANSFERS_FOUND',
'trading_signal': {
'action': 'HOLD',
'confidence': 0.5,
'reason': 'لا توجد تحويلات للعملة'
},
'llm_friendly_summary': {
'whale_activity_summary': 'لا توجد تحويلات حديثة للعملة',
'recommended_action': 'HOLD',
'confidence': 0.5,
'key_metrics': {'whale_movement_impact': 'NO_DATA'}
}
}
def _create_no_recent_activity_response(self, symbol):
return {
'symbol': symbol,
'data_available': False,
'error': 'NO_RECENT_ACTIVITY',
'trading_signal': {
'action': 'HOLD',
'confidence': 0.5,
'reason': 'لا توجد تحويلات حديثة للعملة'
},
'llm_friendly_summary': {
'whale_activity_summary': 'لا توجد تحويلات حيتان حديثة في آخر 120 دقيقة',
'recommended_action': 'HOLD',
'confidence': 0.5,
'key_metrics': {'whale_movement_impact': 'LOW'}
}
}
def _create_error_response(self, symbol, error_msg):
return {
'symbol': symbol,
'data_available': False,
'error': f'ANALYSIS_ERROR: {error_msg}',
'trading_signal': {
'action': 'HOLD',
'confidence': 0.3,
'reason': f'خطأ في تحليل الحيتان: {error_msg}'
},
'llm_friendly_summary': {
'whale_activity_summary': f'فشل في تحليل تحركات الحيتان: {error_msg}',
'recommended_action': 'HOLD',
'confidence': 0.3,
'key_metrics': {'whale_movement_impact': 'ERROR'}
}
}
async def _save_contracts_to_r2(self):
"""حفظ قاعدة البيانات العقود إلى R2"""
try:
key = "contracts.json"
data_json = json.dumps(self.contracts_db, indent=2).encode('utf-8')
self.r2_service.s3_client.put_object(
Bucket="trading", Key=key, Body=data_json, ContentType="application/json"
)
print(f"✅ تم حفظ قاعدة بيانات العقود إلى R2")
except Exception as e:
print(f"❌ فشل حفظ قاعدة البيانات العقود: {e}")
def _calculate_whale_activity_score(self, whale_data):
"""حساب درجة نشاط الحيتان"""
try:
if not whale_data or not whale_data.get('data_available', False):
return 0.0
signal = whale_data.get('trading_signal', {})
confidence = signal.get('confidence', 0.0)
action = signal.get('action', 'HOLD')
# تعيين أوزان بناءً على نوع الإجراء
action_weights = {
'STRONG_BUY': 1.0,
'BUY': 0.7,
'STRONG_SELL': -1.0,
'SELL': -0.7,
'HOLD': 0.0
}
base_score = action_weights.get(action, 0.0)
weighted_score = base_score * confidence
# تطبيع النتيجة بين 0 و 1
normalized_score = (weighted_score + 1) / 2
return max(0.0, min(1.0, normalized_score))
except Exception as e:
print(f"❌ خطأ في حساب درجة نشاط الحيتان: {e}")
return 0.0
async def generate_whale_trading_signal(self, symbol, whale_data, market_context):
"""توليد إشارة تداول بناءً على بيانات الحيتان"""
try:
if not whale_data or not whale_data.get('data_available', False):
return {
'action': 'HOLD',
'confidence': 0.3,
'reason': 'لا توجد بيانات كافية عن نشاط الحيتان',
'source': 'whale_analysis'
}
whale_signal = whale_data.get('trading_signal', {})
return {
'action': whale_signal.get('action', 'HOLD'),
'confidence': whale_signal.get('confidence', 0.5),
'reason': whale_signal.get('reason', 'تحليل الحيتان غير متوفر'),
'source': 'whale_analysis',
'critical_alert': whale_signal.get('critical_alert', False),
'whale_activity_score': self._calculate_whale_activity_score(whale_data)
}
except Exception as e:
print(f"❌ خطأ في توليد إشارة تداول الحيتان لـ {symbol}: {e}")
return {
'action': 'HOLD',
'confidence': 0.3,
'reason': f'خطأ في تحليل الحيتان: {str(e)}',
'source': 'whale_analysis'
}
async def cleanup(self):
await self.http_client.aclose()
print("✅ EnhancedWhaleMonitor loaded - Advanced targeted whale monitoring ready")