# Trad / whale_news_data.py
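"""EnhancedWhaleMonitor: on-chain whale tracking and exchange netflow analysis.

Scans recent Ethereum/BSC blocks over public RPC endpoints for large native
transfers, classifies counterparty addresses (CEX, whale, contract user),
aggregates exchange inflow/outflow into rolling windows, and turns the
resulting netflow z-scores into BUY/SELL/HOLD signals. Optional Moralis and
Etherscan keys enable token-specific transfer lookups.
"""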
import os
import asyncio
import httpx
import json
import traceback
import time
import math
from datetime import datetime, timedelta
from collections import defaultdict, deque
import ccxt.pro as ccxt
import numpy as np
class EnhancedWhaleMonitor:
def __init__(self, contracts_db=None):
self.http_client = httpx.AsyncClient(timeout=15.0, limits=httpx.Limits(max_connections=50, max_keepalive_connections=10))
self.moralis_key = os.getenv("MORALIS_KEY")
self.etherscan_key = os.getenv("ETHERSCAN_KEY")
self.infura_key = os.getenv("INFURA_KEY")
self.whale_threshold_usd = 100000
self.contracts_db = contracts_db or {}
self.address_labels = {}
self._initialize_dynamic_labels()
self.netflow_data = {
'ethereum': defaultdict(lambda: {
'inflow': deque(maxlen=288),
'outflow': deque(maxlen=288),
'netflow': deque(maxlen=288),
'timestamps': deque(maxlen=288),
'volume_24h': 0
}),
'bsc': defaultdict(lambda: {
'inflow': deque(maxlen=288),
'outflow': deque(maxlen=288),
'netflow': deque(maxlen=288),
'timestamps': deque(maxlen=288),
'volume_24h': 0
})
}
self.api_usage_stats = {
'etherscan': {
'requests_today': 0,
'requests_per_second': 0,
'last_request_time': time.time(),
'last_reset': datetime.now().date()
},
'infura': {
'requests_today': 0,
'requests_per_second': 0,
'last_request_time': time.time(),
'last_reset': datetime.now().date()
}
}
self.rpc_endpoints = {
'ethereum': [
'https://rpc.ankr.com/eth',
'https://cloudflare-eth.com',
'https://eth.llamarpc.com'
],
'bsc': [
'https://bsc-dataseed.binance.org/',
'https://bsc-dataseed1.defibit.io/',
'https://bsc-dataseed1.ninicoin.io/'
]
}
if self.infura_key:
infura_endpoint = f"https://mainnet.infura.io/v3/{self.infura_key}"
self.rpc_endpoints['ethereum'].insert(0, infura_endpoint)
print(f"✅ تم تكوين Infura بنجاح - الشبكة: Ethereum")
self.current_rpc_index = {network: 0 for network in self.rpc_endpoints.keys()}
self.rpc_failures = {network: 0 for network in self.rpc_endpoints.keys()}
self.price_cache = {}
self.last_scan_time = {}
self.kucoin_symbols = {
'ethereum': 'ETH',
'bsc': 'BNB'
}
# Whale pattern performance tracking
self.pattern_performance = {}
self.pattern_success_rates = {}
print("🎯 نظام تتبع الحيتان المحسن - تحليل صافي التدفق الذكي مفعل")
def _initialize_dynamic_labels(self):
self.address_categories = {
'exchange': set(),
'cex': set(),
'dex': set(),
'institution': set(),
'whale': set(),
'contract': set(),
'unknown': set()
}
self.exchange_patterns = {
'binance': ['0x3f5ce5fbfe3e9af3971dd833d26ba9b5c936f0be', '0xd551234ae421e3bcba99a0da6d736074f22192ff'],
'coinbase': ['0x71660c4005ba85c37ccec55d0c4493e66fe775d3', '0x503828976d22510aad0201ac7ec88293211d23da'],
'kraken': ['0x2910543af39aba0cd09dbb2d50200b3e800a63d2', '0xa160cdab225685da1d56aa342ad8841c3b53f291'],
'kucoin': ['0x2b5634c42055806a59e9107ed44d43c426e58258', '0x689c56aef474df92d44a1b70850f808488f9769c']
}
self._load_initial_exchange_addresses()
def _load_initial_exchange_addresses(self):
for exchange, addresses in self.exchange_patterns.items():
for address in addresses:
self.address_labels[address.lower()] = 'cex'
self.address_categories['cex'].add(address.lower())
self.address_categories['exchange'].add(address.lower())
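# Heuristic address classification: known CEX addresses are labeled up front;
# otherwise, when transaction history is supplied, an address is tagged
# 'suspected_cex' (many unique senders, few receivers), 'suspected_whale'
# (3+ transfers above $100k) or 'contract_user' (>70% of txs carry calldata).
# Results are cached in self.address_labels.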
def _classify_address_dynamic(self, address, transaction_history=None):
address_lower = address.lower()
if address_lower in self.address_labels:
return self.address_labels[address_lower]
if transaction_history:
if self._detect_exchange_pattern(transaction_history):
self.address_labels[address_lower] = 'suspected_cex'
self.address_categories['cex'].add(address_lower)
return 'suspected_cex'
if self._detect_whale_pattern(transaction_history):
self.address_labels[address_lower] = 'suspected_whale'
self.address_categories['whale'].add(address_lower)
return 'suspected_whale'
if self._detect_contract_pattern(transaction_history):
self.address_labels[address_lower] = 'contract_user'
self.address_categories['contract'].add(address_lower)
return 'contract_user'
self.address_labels[address_lower] = 'unknown'
self.address_categories['unknown'].add(address_lower)
return 'unknown'
def _detect_exchange_pattern(self, transactions):
if len(transactions) < 10:
return False
unique_senders = set()
unique_receivers = set()
for tx in transactions[-20:]:
if 'from' in tx:
unique_senders.add(tx['from'])
if 'to' in tx:
unique_receivers.add(tx['to'])
if len(unique_senders) > 15 and len(unique_receivers) < 5:
return True
return False
def _detect_whale_pattern(self, transactions):
large_txs = [tx for tx in transactions if tx.get('value_usd', 0) > 100000]
return len(large_txs) >= 3
def _detect_contract_pattern(self, transactions):
contract_interactions = [tx for tx in transactions if tx.get('to', '') and len(tx.get('to', '')) == 42 and tx.get('input', '0x') != '0x']
return len(contract_interactions) > len(transactions) * 0.7
def _is_exchange_address(self, address):
address_lower = address.lower()
return (address_lower in self.address_categories['cex'] or
address_lower in self.address_categories['exchange'] or
self.address_labels.get(address_lower) in ['cex', 'suspected_cex'])
async def _update_netflow_metrics(self, network, token_symbol, from_address, to_address, value_usd, transaction_hash):
try:
from_label = self._classify_address_dynamic(from_address)
to_label = self._classify_address_dynamic(to_address)
if self._is_exchange_address(to_address):
if token_symbol not in self.netflow_data[network]:
self._initialize_token_metrics(network, token_symbol)
self.netflow_data[network][token_symbol]['inflow'].append(value_usd)
self.netflow_data[network][token_symbol]['timestamps'].append(datetime.now())
print(f"📥 تدفق إلى منصة: {value_usd:,.0f} USD ({token_symbol})")
if self._is_exchange_address(from_address):
if token_symbol not in self.netflow_data[network]:
self._initialize_token_metrics(network, token_symbol)
self.netflow_data[network][token_symbol]['outflow'].append(value_usd)
self.netflow_data[network][token_symbol]['timestamps'].append(datetime.now())
print(f"📤 تدفق من منصة: {value_usd:,.0f} USD ({token_symbol})")
if token_symbol in self.netflow_data[network]:
current_inflow = sum(list(self.netflow_data[network][token_symbol]['inflow'])[-12:])
current_outflow = sum(list(self.netflow_data[network][token_symbol]['outflow'])[-12:])
current_netflow = current_inflow - current_outflow
self.netflow_data[network][token_symbol]['netflow'].append(current_netflow)
except Exception as e:
print(f"⚠️ خطأ في تحديث مقاييس صافي التدفق: {e}")
def _initialize_token_metrics(self, network, token_symbol):
self.netflow_data[network][token_symbol] = {
'inflow': deque(maxlen=288),
'outflow': deque(maxlen=288),
'netflow': deque(maxlen=288),
'timestamps': deque(maxlen=288),
'volume_24h': 0
}
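# Netflow statistics: each token keeps rolling deques capped at 288 samples,
# nominally about 24 hours of 5-minute observations; the z-score window below
# likewise assumes 12 samples per hour (window_hours * 12). Samples are
# appended whenever a qualifying whale transfer is observed.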
def _calculate_netflow_zscore(self, network, token_symbol, window_hours=24):
try:
if token_symbol not in self.netflow_data[network]:
return 0
data = self.netflow_data[network][token_symbol]
netflow_values = list(data['netflow'])
if len(netflow_values) < 10:
return 0
window_size = min(len(netflow_values), window_hours * 12)
recent_values = netflow_values[-window_size:]
if len(recent_values) < 5:
return 0
mean_val = np.mean(recent_values)
std_val = np.std(recent_values)
if std_val == 0:
return 0
current_netflow = recent_values[-1] if recent_values else 0
zscore = (current_netflow - mean_val) / std_val
return zscore
except Exception as e:
print(f"⚠️ خطأ في حساب Z-score: {e}")
return 0
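# Signal bands used below (netflow = inflow - outflow over the last 12 samples,
# compared against its z-score):
#   netflow < -$500k and z < -2.5  -> STRONG_SELL
#   netflow < -$100k and z < -1.5  -> SELL
#   netflow > +$500k and z > +2.5  -> STRONG_BUY
#   netflow > +$100k and z > +1.5  -> BUY
#   otherwise                      -> HOLD (confidence 0.5)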
def _generate_netflow_signal(self, network, token_symbol):
try:
if token_symbol not in self.netflow_data[network]:
return None
data = self.netflow_data[network][token_symbol]
netflow_values = list(data['netflow'])
if len(netflow_values) < 12:
return None
recent_inflow = sum(list(data['inflow'])[-12:])
recent_outflow = sum(list(data['outflow'])[-12:])
recent_netflow = recent_inflow - recent_outflow
zscore = self._calculate_netflow_zscore(network, token_symbol)
signal = {
'symbol': token_symbol,
'network': network,
'netflow_1h': recent_netflow,
'inflow_1h': recent_inflow,
'outflow_1h': recent_outflow,
'z_score': zscore,
'timestamp': datetime.now().isoformat()
}
if recent_netflow < -500000 and zscore < -2.5:
signal.update({
'action': 'STRONG_SELL',
'confidence': min(0.95, abs(zscore) / 3),
'reason': f'Strong sell-side flow: ${abs(recent_netflow):,.0f} to exchanges',
'critical_alert': abs(recent_netflow) > 1000000
})
return signal
elif recent_netflow < -100000 and zscore < -1.5:
signal.update({
'action': 'SELL',
'confidence': min(0.8, abs(zscore) / 2),
'reason': f'Sell-side flow: ${abs(recent_netflow):,.0f} to exchanges',
'critical_alert': False
})
return signal
elif recent_netflow > 500000 and zscore > 2.5:
signal.update({
'action': 'STRONG_BUY',
'confidence': min(0.95, zscore / 3),
'reason': f'Strong buy-side flow: ${recent_netflow:,.0f} from exchanges',
'critical_alert': recent_netflow > 1000000
})
return signal
elif recent_netflow > 100000 and zscore > 1.5:
signal.update({
'action': 'BUY',
'confidence': min(0.8, zscore / 2),
'reason': f'Buy-side flow: ${recent_netflow:,.0f} from exchanges',
'critical_alert': False
})
return signal
signal.update({
'action': 'HOLD',
'confidence': 0.5,
'reason': f'Balanced flow: ${recent_netflow:,.0f}',
'critical_alert': False
})
return signal
except Exception as e:
print(f"⚠️ خطأ في توليد إشارة التداول: {e}")
return None
async def _scan_single_evm_network(self, network):
whale_alerts = []
trading_signals = []
try:
# Get the price from DataManager instead of fetching it here
price_usd = await self._get_native_price_from_external()
if price_usd is None:
print(f"⚠️ سعر {network} غير متوفر، تخطي المسح")
return [], []
latest_block_hex = await self._call_rpc_async(network, 'eth_blockNumber')
if not latest_block_hex:
return [], []
latest_block = int(latest_block_hex, 16)
blocks_to_scan = 15
scanned_blocks = 0
for block_offset in range(blocks_to_scan):
block_number = latest_block - block_offset
if block_number < 0:
break
block_data = await self._call_rpc_async(network, 'eth_getBlockByNumber', [hex(block_number), True])
if not block_data or 'transactions' not in block_data:
continue
scanned_blocks += 1
block_timestamp_hex = block_data.get('timestamp', '0x0')
block_timestamp = int(block_timestamp_hex, 16)
block_time = datetime.fromtimestamp(block_timestamp)
time_ago = datetime.now() - block_time
for tx in block_data.get('transactions', []):
value_wei = int(tx.get('value', '0x0'), 16)
if value_wei > 0:
value_native = value_wei / 1e18
value_usd = value_native * price_usd
if value_usd >= self.whale_threshold_usd:
from_address = tx.get('from', '')
to_address = tx.get('to', '')
tx_hash = tx.get('hash', '')
await self._update_netflow_metrics(network, 'NATIVE', from_address, to_address, value_usd, tx_hash)
from_label = self._classify_address_dynamic(from_address)
to_label = self._classify_address_dynamic(to_address)
whale_alerts.append({
'network': network,
'value_usd': value_usd,
'from': from_address,
'to': to_address,
'from_label': from_label,
'to_label': to_label,
'hash': tx_hash,
'block_number': block_number,
'timestamp': block_timestamp,
'human_time': block_time.isoformat(),
'minutes_ago': time_ago.total_seconds() / 60,
'transaction_type': 'native_transfer',
'flow_direction': 'TO_EXCHANGE' if self._is_exchange_address(to_address) else
'FROM_EXCHANGE' if self._is_exchange_address(from_address) else 'UNKNOWN'
})
if block_offset % 3 == 0:
await asyncio.sleep(0.1)
signal = self._generate_netflow_signal(network, 'NATIVE')
if signal:
trading_signals.append(signal)
print(f"✅ مسح {network}: {scanned_blocks} كتل، {len(whale_alerts)} تنبيهات، {len(trading_signals)} إشارات")
except Exception as e:
print(f"⚠️ خطأ في مسح شبكة {network}: {e}")
return whale_alerts, trading_signals
async def get_general_whale_activity(self):
print("🌊 بدء مراقبة الحيتان وتحليل صافي التدفق...")
try:
tasks = []
networks_to_scan = ['ethereum', 'bsc']
for network in networks_to_scan:
tasks.append(self._scan_single_evm_network(network))
results = await asyncio.gather(*tasks, return_exceptions=True)
all_alerts = []
all_signals = []
successful_networks = 0
for res in results:
if isinstance(res, tuple) and len(res) == 2:
alerts, signals = res
all_alerts.extend(alerts)
all_signals.extend(signals)
successful_networks += 1
all_alerts.sort(key=lambda x: x['timestamp'], reverse=True)
total_volume = sum(alert['value_usd'] for alert in all_alerts)
alert_count = len(all_alerts)
exchange_inflow = sum(alert['value_usd'] for alert in all_alerts
if alert['flow_direction'] == 'TO_EXCHANGE')
exchange_outflow = sum(alert['value_usd'] for alert in all_alerts
if alert['flow_direction'] == 'FROM_EXCHANGE')
net_exchange_flow = exchange_inflow - exchange_outflow
critical_signals = [s for s in all_signals if s.get('critical_alert', False)]
if not all_alerts:
return {
'data_available': False,
'description': 'Not available - no significant whale activity detected',
'critical_alert': False,
'sentiment': 'UNKNOWN',
'total_volume_usd': 0,
'transaction_count': 0,
'data_quality': 'HIGH',
'networks_scanned': successful_networks,
'trading_signals': all_signals,
'netflow_analysis': {
'inflow_to_exchanges': 0,
'outflow_from_exchanges': 0,
'net_flow': 0,
'flow_direction': 'BALANCED'
}
}
latest_alert = all_alerts[0] if all_alerts else None
latest_time_info = f"Latest activity {latest_alert['minutes_ago']:.1f} minutes ago" if latest_alert else ""
if net_exchange_flow < -1000000:
sentiment = 'BEARISH'
flow_description = f"Strong selling pressure: ${abs(net_exchange_flow):,.0f} to exchanges"
market_impact = "HIGH"
elif net_exchange_flow < -500000:
sentiment = 'SLIGHTLY_BEARISH'
flow_description = f"Selling pressure: ${abs(net_exchange_flow):,.0f} to exchanges"
market_impact = "MEDIUM"
elif net_exchange_flow > 1000000:
sentiment = 'BULLISH'
flow_description = f"Strong accumulation: ${net_exchange_flow:,.0f} from exchanges"
market_impact = "HIGH"
elif net_exchange_flow > 500000:
sentiment = 'SLIGHTLY_BULLISH'
flow_description = f"Accumulation: ${net_exchange_flow:,.0f} from exchanges"
market_impact = "MEDIUM"
else:
sentiment = 'NEUTRAL'
flow_description = f"Balanced flow: ${net_exchange_flow:,.0f} net"
market_impact = "LOW"
critical_alert = (
total_volume > 10_000_000 or
any(tx['value_usd'] > 5_000_000 for tx in all_alerts) or
abs(net_exchange_flow) > 5_000_000 or
len(critical_signals) > 0
)
description = f"تم اكتشاف {alert_count} معاملة حوت بإجمالي ${total_volume:,.0f} عبر {successful_networks} شبكات. {flow_description}. {latest_time_info}"
return {
'data_available': True,
'description': description,
'critical_alert': critical_alert,
'sentiment': sentiment,
'market_impact': market_impact,
'total_volume_usd': total_volume,
'transaction_count': alert_count,
'netflow_analysis': {
'inflow_to_exchanges': exchange_inflow,
'outflow_from_exchanges': exchange_outflow,
'net_flow': net_exchange_flow,
'flow_direction': 'TO_EXCHANGES' if net_exchange_flow < 0 else 'FROM_EXCHANGES',
'market_impact': market_impact
},
'recent_alerts': all_alerts[:10],
'latest_activity': latest_alert['human_time'] if latest_alert else None,
'trading_signals': all_signals,
'critical_signals_count': len(critical_signals),
'address_classification_stats': {
'total_classified': len(self.address_labels),
'exchange_addresses': len(self.address_categories['cex']),
'whale_addresses': len(self.address_categories['whale']),
'unknown_addresses': len(self.address_categories['unknown'])
},
'data_quality': 'HIGH',
'networks_scanned': successful_networks
}
except Exception as e:
print(f"❌ فشل مراقبة الحيتان العامة: {e}")
return {
'data_available': False,
'description': f'Not available - whale monitoring failed: {str(e)}',
'critical_alert': False,
'sentiment': 'UNKNOWN',
'total_volume_usd': 0,
'transaction_count': 0,
'data_quality': 'LOW',
'error': str(e),
'trading_signals': []
}
async def _get_native_price_from_external(self):
"""الحصول على الأسعار من DataManager بدلاً من هنا"""
try:
# محاكاة الحصول على السعر من DataManager
# في التطبيق الفعلي، سيتم استدعاء DataManager
return 3500.0 # قيمة افتراضية
except Exception as e:
print(f"⚠️ فشل جلب السعر من DataManager: {e}")
return None
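# JSON-RPC helper: makes up to two attempts, rotating round-robin through the
# endpoints in self.rpc_endpoints, skipping endpoints that return 401/429,
# backing off linearly between attempts, and tracking Infura usage against its
# rate limits before each Infura call.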
async def _call_rpc_async(self, network, method, params=[]):
max_retries = 2
for attempt in range(max_retries):
endpoint = self._get_next_rpc_endpoint(network)
if not endpoint:
print(f"❌ لا توجد نقاط نهاية RPC متاحة لـ {network}")
return None
try:
if 'infura' in endpoint and self.infura_key:
self._update_api_usage_stats('infura')
if await self._api_rate_limit_delay('infura'):
continue
payload = {"jsonrpc": "2.0", "method": method, "params": params, "id": 1}
timeout = 25.0 if method == 'eth_getBlockByNumber' else 12.0
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(endpoint, json=payload)
if response.status_code == 401:
print(f"🔐 خطأ مصادقة في {endpoint}")
self.rpc_failures[network] += 1
continue
elif response.status_code == 429:
print(f"⏳ Rate limit على {endpoint}")
await asyncio.sleep(2 * (attempt + 1))
continue
response.raise_for_status()
result = response.json().get('result')
self.rpc_failures[network] = 0
return result
except httpx.HTTPStatusError as e:
if e.response.status_code == 429:
print(f"⚠️ Rate limit على {endpoint} لـ {network}")
self.rpc_failures[network] += 1
await asyncio.sleep(3 * (attempt + 1))
continue
elif e.response.status_code == 401:
print(f"🔐 خطأ مصادقة في {endpoint}")
self.rpc_failures[network] += 1
continue
else:
print(f"⚠️ خطأ HTTP {e.response.status_code} في {endpoint}")
self.rpc_failures[network] += 1
except Exception as e:
print(f"⚠️ فشل اتصال RPC لـ {network}: {e}")
self.rpc_failures[network] += 1
if attempt < max_retries - 1:
await asyncio.sleep(1 * (attempt + 1))
print(f"❌ فشل جميع محاولات RPC لـ {network}")
return None
def _get_next_rpc_endpoint(self, network):
if network not in self.rpc_endpoints:
return None
endpoints = self.rpc_endpoints[network]
if not endpoints:
return None
index = self.current_rpc_index[network]
endpoint = endpoints[index]
self.current_rpc_index[network] = (index + 1) % len(endpoints)
return endpoint
def _update_api_usage_stats(self, api_name):
now = datetime.now()
current_date = now.date()
stats = self.api_usage_stats[api_name]
if current_date != stats['last_reset']:
stats['requests_today'] = 0
stats['last_reset'] = current_date
current_time = time.time()
time_diff = current_time - stats['last_request_time']
if time_diff < 1.0:
stats['requests_per_second'] += 1
else:
stats['requests_per_second'] = 1
stats['last_request_time'] = current_time
stats['requests_today'] += 1
if api_name == 'etherscan':
if stats['requests_today'] > 90000:
print(f"🚨 تحذير: طلبات {api_name} اليومية تقترب من الحد")
if stats['requests_per_second'] > 4:
print(f"🚨 تحذير: طلبات {api_name} في الثانية تقترب من الحد")
elif api_name == 'infura':
if stats['requests_today'] > 2500000:
print(f"🚨 تحذير: طلبات {api_name} اليومية تقترب من الحد")
if stats['requests_per_second'] > 450:
print(f"🚨 تحذير: طلبات {api_name} في الثانية تقترب من الحد")
async def _api_rate_limit_delay(self, api_name):
stats = self.api_usage_stats[api_name]
if api_name == 'etherscan':
if stats['requests_per_second'] > 4:
delay = 0.2 * (stats['requests_per_second'] - 4)
print(f"⏳ تأخير {delay:.2f} ثانية لـ {api_name}")
await asyncio.sleep(delay)
if stats['requests_today'] > 95000:
print(f"🚨 تجاوز الحد اليومي لطلبات {api_name}")
return True
elif api_name == 'infura':
if stats['requests_per_second'] > 400:
delay = 0.1 * (stats['requests_per_second'] - 400)
print(f"⏳ تأخير {delay:.2f} ثانية لـ {api_name}")
await asyncio.sleep(delay)
if stats['requests_today'] > 2800000:
print(f"🚨 تجاوز الحد اليومي لطلبات {api_name}")
return True
return False
async def get_symbol_specific_whale_data(self, symbol, contract_address=None):
try:
base_symbol = symbol.split("/")[0] if '/' in symbol else symbol
if not contract_address:
contract_address = await self._find_contract_address(base_symbol)
if not contract_address:
return await self._scan_networks_for_symbol(symbol, base_symbol)
print(f"🔍 جلب بيانات الحيتان لـ {symbol}")
api_data = await self._get_combined_api_data(contract_address)
if api_data:
enriched_data = await self._enrich_api_data_with_timing(api_data)
return self._analyze_symbol_specific_data(enriched_data, symbol)
else:
return await self._scan_networks_for_symbol(symbol, base_symbol)
except Exception as e:
print(f"❌ فشل جلب بيانات الحيتان لـ {symbol}: {e}")
return {
'data_available': False,
'description': 'Not available - error fetching whale data',
'total_volume': 0,
'transfer_count': 0,
'source': 'error'
}
async def _get_combined_api_data(self, contract_address):
tasks = []
if self.moralis_key:
tasks.append(self._get_moralis_token_data(contract_address))
if self.etherscan_key:
tasks.append(self._get_etherscan_token_data_v2(contract_address))
if not tasks:
return []
results = await asyncio.gather(*tasks, return_exceptions=True)
all_transfers = []
for res in results:
if isinstance(res, list):
all_transfers.extend(res)
return all_transfers
async def _get_etherscan_token_data_v2(self, contract_address):
if not self.etherscan_key:
return []
try:
self._update_api_usage_stats('etherscan')
if await self._api_rate_limit_delay('etherscan'):
print("⚠️ تجاوز حدود Etherscan، تخطي الطلب")
return []
params = {
"module": "account",
"action": "tokentx",
"contractaddress": contract_address,
"page": 1,
"offset": 10,
"sort": "desc",
"apikey": self.etherscan_key
}
base_url = "https://api.etherscan.io/api"
print(f"🔍 جلب بيانات Etherscan للعقد: {contract_address[:10]}...")
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(base_url, params=params)
if response.status_code == 429:
print("⏳ تجاوز حد معدل Etherscan")
await asyncio.sleep(2)
return []
response.raise_for_status()
data = response.json()
if data.get('status') == '1' and data.get('message') == 'OK':
result = data.get('result', [])
print(f"✅ بيانات Etherscan: {len(result)} تحويل")
return result
else:
error_message = data.get('message', 'Unknown error')
print(f"⚠️ خطأ في استجابة Etherscan: {error_message}")
return []
except httpx.HTTPStatusError as e:
print(f"⚠️ خطأ HTTP في Etherscan API: {e.response.status_code}")
return []
except Exception as e:
print(f"⚠️ فشل جلب بيانات Etherscan: {e}")
return []
async def _get_moralis_token_data(self, contract_address):
if not self.moralis_key:
return []
try:
response = await self.http_client.get(
f"https://deep-index.moralis.io/api/v2/erc20/{contract_address}/transfers",
headers={"X-API-Key": self.moralis_key},
params={"chain": "eth", "limit": 10}
)
if response.status_code == 200:
result = response.json().get('result', [])
print(f"✅ بيانات Moralis: {len(result)} تحويل")
return result
else:
print(f"⚠️ خطأ Moralis API: {response.status_code}")
return []
except Exception as e:
print(f"⚠️ Moralis API error: {e}")
return []
async def _enrich_api_data_with_timing(self, api_data):
enriched_data = []
for transfer in api_data:
try:
if 'timeStamp' in transfer:
timestamp = int(transfer['timeStamp'])
elif 'block_timestamp' in transfer:
timestamp = int(transfer['block_timestamp'])
else:
timestamp = int(time.time())
transfer_time = datetime.fromtimestamp(timestamp)
time_ago = datetime.now() - transfer_time
enriched_transfer = {
**transfer,
'human_time': transfer_time.isoformat(),
'minutes_ago': time_ago.total_seconds() / 60,
'timestamp': timestamp
}
enriched_data.append(enriched_transfer)
except Exception as e:
print(f"⚠️ خطأ في إثراء بيانات التحويل: {e}")
continue
return enriched_data
def _analyze_symbol_specific_data(self, enriched_data, symbol):
if not enriched_data:
return {
'data_available': False,
'description': f'Not available - no transfer data for {symbol}',
'total_volume': 0,
'transfer_count': 0,
'source': 'no_data'
}
try:
volumes = []
large_transfers = []
for transfer in enriched_data:
value = float(transfer.get('value', 0))
volumes.append(value)
if value > 10000:
large_transfers.append(transfer)
total_volume = sum(volumes)
transfer_count = len(volumes)
avg_volume = total_volume / transfer_count if transfer_count > 0 else 0
latest_transfer = max(enriched_data, key=lambda x: x['timestamp'])
oldest_transfer = min(enriched_data, key=lambda x: x['timestamp'])
time_range_hours = (latest_transfer['timestamp'] - oldest_transfer['timestamp']) / 3600
if len(large_transfers) > 5:
activity_level = 'HIGH'
description = f"نشاط حيتان مرتفع لـ {symbol}: {len(large_transfers)} تحويل كبير"
elif len(large_transfers) > 2:
activity_level = 'MEDIUM'
description = f"نشاط حيتان متوسط لـ {symbol}: {len(large_transfers)} تحويل كبير"
else:
activity_level = 'LOW'
description = f"نشاط حيتان منخفض لـ {symbol}: {len(large_transfers)} تحويل كبير"
return {
'data_available': True,
'description': description,
'total_volume': total_volume,
'transfer_count': transfer_count,
'average_volume': avg_volume,
'large_transfers_count': len(large_transfers),
'activity_level': activity_level,
'latest_transfer_time': latest_transfer['human_time'],
'time_range_hours': time_range_hours,
'source': 'api_combined',
'recent_large_transfers': large_transfers[:5]
}
except Exception as e:
print(f"❌ خطأ في تحليل بيانات {symbol}: {e}")
return {
'data_available': False,
'description': 'Not available - error analyzing data',
'total_volume': 0,
'transfer_count': 0,
'source': 'error'
}
async def _find_contract_address(self, symbol):
symbol_lower = symbol.lower()
for key, address in self.contracts_db.items():
if symbol_lower in key.lower():
return address
print(f"🔍 لم يتم العثور على عقد لـ {symbol} في قاعدة البيانات")
return None
async def _scan_networks_for_symbol(self, symbol, base_symbol):
print(f"🔍 مسح الشبكات للعثور على {symbol}...")
networks_to_scan = ['ethereum', 'bsc']
for network in networks_to_scan:
try:
price = await self._get_native_price_from_external()
if price:
print(f"✅ تم العثور على {symbol} على شبكة {network} بسعر ${price:.2f}")
return {
'data_available': True,
'description': f'Detected {symbol} on the {network} network',
'network': network,
'price_usd': price,
'source': 'network_scan'
}
except Exception as e:
print(f"⚠️ فشل مسح {network} لـ {symbol}: {e}")
continue
return {
'data_available': False,
'description': f'Not available - {symbol} not found on any network',
'source': 'not_found'
}
def get_api_usage_stats(self):
stats = {}
for api_name, api_stats in self.api_usage_stats.items():
if api_name == 'etherscan':
daily_limit = 100000
per_second_limit = 5
elif api_name == 'infura':
daily_limit = 3000000
per_second_limit = 500
else:
continue
stats[api_name] = {
'requests_today': api_stats['requests_today'],
'requests_per_second': api_stats['requests_per_second'],
'daily_limit_remaining': daily_limit - api_stats['requests_today'],
'usage_percentage': (api_stats['requests_today'] / daily_limit) * 100,
'per_second_usage_percentage': (api_stats['requests_per_second'] / per_second_limit) * 100,
'last_reset': api_stats['last_reset'].isoformat(),
'api_available': getattr(self, f'{api_name}_key') is not None
}
return stats
# Whale analysis helpers moved from ML.py
def _calculate_whale_activity_score(self, whale_data):
"""حساب درجة نشاط الحيتان - منقول من ML.py"""
if not whale_data.get('data_available', False):
return 0.0
total_transactions = whale_data.get('transfer_count', 0)
total_volume = whale_data.get('total_volume', 0)
score = 0.0
if total_transactions >= 10:
score += 0.3
elif total_transactions >= 5:
score += 0.15
if total_volume > 500000:
score += 0.2
elif total_volume > 100000:
score += 0.1
return min(score, 0.5)
async def generate_whale_trading_signal(self, symbol, whale_data, market_context):
"""توليد إشارة تداول مباشرة من بيانات الحيتان"""
try:
if not whale_data.get('data_available', False):
return None
whale_score = self._calculate_whale_activity_score(whale_data)
total_volume = whale_data.get('total_volume', 0)
transfer_count = whale_data.get('transfer_count', 0)
signal = {
'symbol': symbol,
'whale_score': whale_score,
'total_volume': total_volume,
'transfer_count': transfer_count,
'timestamp': datetime.now().isoformat()
}
if whale_score > 0.4 and total_volume > 500000:
signal.update({
'action': 'STRONG_BUY',
'confidence': min(0.9, whale_score),
'reason': f'Strong whale activity: {transfer_count} transfers totaling ${total_volume:,.0f}',
'critical_alert': total_volume > 1000000
})
elif whale_score > 0.2 and total_volume > 100000:
signal.update({
'action': 'BUY',
'confidence': min(0.7, whale_score),
'reason': f'Moderate whale activity: {transfer_count} transfers totaling ${total_volume:,.0f}',
'critical_alert': False
})
else:
signal.update({
'action': 'HOLD',
'confidence': 0.5,
'reason': f'Normal whale activity: {transfer_count} transfers',
'critical_alert': False
})
return signal
except Exception as e:
print(f"⚠️ خطأ في توليد إشارة تداول الحيتان: {e}")
return None
async def track_pattern_outcome(self, symbol, pattern_analysis, success, profit_percent):
"""تتبع أداء أنماط الحيتان - منقول من ML.py"""
if not pattern_analysis:
return
pattern_name = pattern_analysis.get('pattern_detected')
confidence = pattern_analysis.get('pattern_confidence', 0)
if pattern_name not in ['no_clear_pattern', 'insufficient_data']:
if pattern_name not in self.pattern_performance:
self.pattern_performance[pattern_name] = {
'total_trades': 0,
'successful_trades': 0,
'total_profit': 0,
'total_confidence': 0
}
stats = self.pattern_performance[pattern_name]
stats['total_trades'] += 1
stats['total_confidence'] += confidence
if success:
stats['successful_trades'] += 1
stats['total_profit'] += profit_percent
success_rate = stats['successful_trades'] / stats['total_trades']
avg_profit = stats['total_profit'] / stats['successful_trades'] if stats['successful_trades'] > 0 else 0
avg_confidence = stats['total_confidence'] / stats['total_trades']
print(f"📊 تحديث أداء النمط {pattern_name}: نجاح {success_rate:.1%}, ربح متوسط {avg_profit:.2f}%")
def get_pattern_reliability(self, pattern_name):
"""الحصول على موثوقية النمط - منقول من ML.py"""
if pattern_name in self.pattern_performance:
stats = self.pattern_performance[pattern_name]
if stats['total_trades'] > 0:
return stats['successful_trades'] / stats['total_trades']
return 0.5
whale_monitor_global = EnhancedWhaleMonitor()
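# Minimal usage sketch (illustrative): runs a single general scan and prints the
# summary. Assumes the public RPC endpoints configured above are reachable; the
# MORALIS_KEY / ETHERSCAN_KEY / INFURA_KEY environment variables remain optional.
if __name__ == "__main__":
    result = asyncio.run(whale_monitor_global.get_general_whale_activity())
    print(result.get("description"))
    print(whale_monitor_global.get_api_usage_stats())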