Trad / data_manager.py
Riy777's picture
Update data_manager.py
bdf5ede
raw
history blame
32.7 kB
# data_manager.py
import os
import asyncio
import httpx
import traceback
import time
from datetime import datetime
import ccxt.pro as ccxt
import numpy as np
import logging
# تعطيل تسجيل HTTP المزعج فقط
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
class DataManager:
def __init__(self, contracts_db, whale_monitor):
self.contracts_db = contracts_db or {}
self.whale_monitor = whale_monitor
try:
self.exchange = ccxt.kucoin({
'sandbox': False,
'enableRateLimit': True,
'timeout': 30000,
'verbose': False
})
self.exchange.rateLimit = 800
print("✅ تم تهيئة اتصال KuCoin بنجاح")
except Exception as e:
print(f"❌ فشل تهيئة اتصال KuCoin: {e}")
self.exchange = None
self._whale_data_cache = {}
self.http_client = None
self.fetch_stats = {'successful_fetches': 0, 'failed_fetches': 0, 'rate_limit_hits': 0}
self.price_cache = {}
self.market_cache = {}
self.last_market_load = None
self.price_sources = {
'kucoin': self._get_prices_from_kucoin_safe,
'coingecko': self._get_prices_from_coingecko
}
async def initialize(self):
self.http_client = httpx.AsyncClient(timeout=20.0)
api_status = {
'KUCOIN': '🟢 عام (بدون مفتاح)',
'MORALIS_KEY': "🟢 متوفر" if os.getenv('MORALIS_KEY') else "🔴 غير متوفر",
'ETHERSCAN_KEY': "🟢 متوفر" if os.getenv('ETHERSCAN_KEY') else "🔴 غير متوفر",
'INFURA_KEY': "🟢 متوفر" if os.getenv('INFURA_KEY') else "🔴 غير متوفر"
}
for key, status in api_status.items():
print(f" {key}: {status}")
await self._load_markets()
async def _load_markets(self):
try:
if not self.exchange:
return
print("🔄 جلب أحدث بيانات الأسواق من KuCoin...")
await self.exchange.load_markets()
self.market_cache = self.exchange.markets
self.last_market_load = datetime.now()
print(f"✅ تم تحميل {len(self.market_cache)} سوق من KuCoin")
except Exception as e:
print(f"❌ فشل تحميل بيانات الأسواق: {e}")
async def close(self):
if self.http_client:
await self.http_client.aclose()
if self.exchange:
await self.exchange.close()
async def get_native_coin_price(self, network):
now = time.time()
cache_key = f"{network}_price"
if cache_key in self.price_cache and (now - self.price_cache[cache_key]['timestamp']) < 300:
return self.price_cache[cache_key]['price']
symbol_map = {
'ethereum': 'ETH',
'bsc': 'BNB',
'bitcoin': 'BTC'
}
symbol = symbol_map.get(network)
if not symbol:
return await self._get_price_from_coingecko_fallback(network)
try:
price = await self._get_price_from_kucoin(symbol)
if price and price > 0:
self.price_cache[cache_key] = {'price': price, 'timestamp': now, 'source': 'kucoin'}
return price
price = await self._get_price_from_coingecko_fallback(network)
if price and price > 0:
self.price_cache[cache_key] = {'price': price, 'timestamp': now, 'source': 'coingecko'}
return price
return None
except Exception as e:
print(f"❌ فشل جلب سعر {network}: {e}")
return None
async def _get_price_from_kucoin(self, symbol):
if not self.exchange:
return None
try:
trading_symbol = f"{symbol}/USDT"
if trading_symbol not in self.market_cache:
print(f"⚠️ السوق {trading_symbol} غير متوفر في KuCoin")
return None
ticker = await self.exchange.fetch_ticker(trading_symbol)
price = ticker.get('last')
if price and price > 0:
return float(price)
else:
print(f"⚠️ لم يتم العثور على سعر صالح لـ {symbol}")
return None
except Exception as e:
print(f"❌ فشل جلب سعر {symbol} من KuCoin: {e}")
return None
async def _get_price_from_coingecko_fallback(self, network):
coin_map = {
'ethereum': 'ethereum',
'bsc': 'binancecoin',
'bitcoin': 'bitcoin'
}
coin_id = coin_map.get(network)
if not coin_id:
return None
try:
await asyncio.sleep(0.5)
url = f"https://api.coingecko.com/api/v3/simple/price?ids={coin_id}&vs_currencies=usd"
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=10)
response.raise_for_status()
data = response.json()
price = data.get(coin_id, {}).get('usd')
if price and price > 0:
print(f"✅ سعر {network.upper()} (CoinGecko): ${price:,.2f}")
return price
else:
print(f"⚠️ لم يتم العثور على سعر {network} في CoinGecko")
return None
except Exception as e:
print(f"❌ فشل جلب سعر {network} من CoinGecko: {e}")
return None
async def get_sentiment_safe_async(self):
max_retries = 2
for attempt in range(max_retries):
try:
async with httpx.AsyncClient(timeout=8) as client:
response = await client.get("https://api.alternative.me/fng/")
response.raise_for_status()
data = response.json()
if 'data' not in data or not data['data']:
raise ValueError("بيانات المشاعر غير متوفرة في الاستجابة")
latest_data = data['data'][0]
return {
"feargreed_value": int(latest_data['value']),
"feargreed_class": latest_data['value_classification'],
"source": "alternative.me",
"timestamp": datetime.now().isoformat()
}
except Exception as e:
print(f"❌ فشل جلب بيانات المشاعر (المحاولة {attempt + 1}): {e}")
if attempt < max_retries - 1:
await asyncio.sleep(1)
return None
async def get_market_context_async(self):
max_retries = 2
for attempt in range(max_retries):
try:
sentiment_task = asyncio.wait_for(self.get_sentiment_safe_async(), timeout=10)
price_task = asyncio.wait_for(self._get_prices_with_fallback(), timeout=15)
# ❌ إزالة مهمة الحيتان العامة
results = await asyncio.gather(sentiment_task, price_task, return_exceptions=True)
sentiment_data = results[0] if not isinstance(results[0], Exception) else None
price_data = results[1] if not isinstance(results[1], Exception) else {}
bitcoin_price = price_data.get('bitcoin')
ethereum_price = price_data.get('ethereum')
if bitcoin_price is None or ethereum_price is None:
if attempt < max_retries - 1:
await asyncio.sleep(2)
continue
else:
return self._get_minimal_market_context()
market_trend = self._determine_market_trend(bitcoin_price, sentiment_data)
# ❌ إزالة تحليل إشارات التداول المعتمدة على الحيتان العامة
trading_decision = self._analyze_market_trading_signals(sentiment_data)
market_context = {
'timestamp': datetime.now().isoformat(),
'bitcoin_price_usd': bitcoin_price,
'ethereum_price_usd': ethereum_price,
'fear_and_greed_index': sentiment_data.get('feargreed_value') if sentiment_data else None,
'sentiment_class': sentiment_data.get('feargreed_class') if sentiment_data else 'UNKNOWN',
# ❌ إزالة بيانات الحيتان العامة
'general_whale_activity': {
'data_available': False,
'description': 'نظام الحيتان العام معطل - يركز النظام على تحليل الحيتان للعملات المرشحة فقط',
'critical_alert': False,
'sentiment': 'NEUTRAL',
'trading_signals': []
},
'market_trend': market_trend,
'trading_decision': trading_decision,
'btc_sentiment': self._get_btc_sentiment(bitcoin_price),
'data_sources': {
'prices': bitcoin_price is not None and ethereum_price is not None,
'sentiment': sentiment_data is not None,
'general_whale_data': False, # ❌ دائماً false
'netflow_analysis': 'DISABLED' # ❌ معطل
},
'data_quality': 'HIGH',
'risk_assessment': self._assess_market_risk(sentiment_data) # ❌ إزالة الحيتان
}
return market_context
except Exception as e:
print(f"❌ فشل جلب سياق السوق (المحاولة {attempt + 1}): {e}")
if attempt < max_retries - 1:
await asyncio.sleep(3)
return self._get_minimal_market_context()
def _analyze_market_trading_signals(self, sentiment_data):
"""تحليل إشارات التداول بناءً على مشاعر السوق فقط (بدون حيتان عامة)"""
if not sentiment_data:
return {
'action': 'HOLD',
'confidence': 0.0,
'reason': 'غير متوفر - لا توجد بيانات كافية عن مشاعر السوق',
'risk_level': 'UNKNOWN'
}
fear_greed = sentiment_data.get('feargreed_value', 50)
sentiment_class = sentiment_data.get('feargreed_class', 'NEUTRAL')
if fear_greed <= 25:
return {
'action': 'BUY',
'confidence': 0.7,
'reason': f'مستوى خوف مرتفع في السوق: {fear_greed} - فرصة شرائية',
'risk_level': 'LOW'
}
elif fear_greed >= 75:
return {
'action': 'SELL',
'confidence': 0.6,
'reason': f'مستوى جشع مرتفع في السوق: {fear_greed} - احتياط بيعي',
'risk_level': 'MEDIUM'
}
else:
return {
'action': 'HOLD',
'confidence': 0.5,
'reason': f'مشاعر السوق متوازنة: {sentiment_class} ({fear_greed})',
'risk_level': 'LOW'
}
def _assess_market_risk(self, sentiment_data):
"""تقييم مخاطر السوق بناءً على المشاعر فقط (بدون حيتان)"""
risk_factors = []
risk_score = 0
if sentiment_data and sentiment_data.get('feargreed_value', 50) < 30:
risk_factors.append("مخاوف السوق عالية")
risk_score += 2
elif sentiment_data and sentiment_data.get('feargreed_value', 50) > 70:
risk_factors.append("جشع السوق مرتفع")
risk_score += 1
if risk_score >= 2:
return {'level': 'MEDIUM', 'score': risk_score, 'factors': risk_factors}
else:
return {'level': 'LOW', 'score': risk_score, 'factors': risk_factors}
def _get_btc_sentiment(self, bitcoin_price):
if bitcoin_price is None:
return 'UNKNOWN'
elif bitcoin_price > 60000:
return 'BULLISH'
elif bitcoin_price < 55000:
return 'BEARISH'
else:
return 'NEUTRAL'
async def _get_prices_with_fallback(self):
try:
prices = await self._get_prices_from_kucoin_safe()
if prices.get('bitcoin') and prices.get('ethereum'):
return prices
prices = await self._get_prices_from_coingecko()
if prices.get('bitcoin') and prices.get('ethereum'):
return prices
return {'bitcoin': None, 'ethereum': None}
except Exception as e:
print(f"❌ فشل جلب الأسعار: {e}")
return {'bitcoin': None, 'ethereum': None}
async def _get_prices_from_kucoin_safe(self):
if not self.exchange:
return {'bitcoin': None, 'ethereum': None}
try:
prices = {'bitcoin': None, 'ethereum': None}
try:
btc_ticker = await self.exchange.fetch_ticker('BTC/USDT')
btc_price = float(btc_ticker.get('last', 0)) if btc_ticker.get('last') else None
if btc_price and btc_price > 0:
prices['bitcoin'] = btc_price
self.price_cache['bitcoin'] = btc_price
print(f"✅ سعر BTC: ${btc_price:,.2f}")
else:
print("⚠️ لم يتم العثور على سعر BTC صالح")
except Exception as e:
print(f"❌ فشل جلب سعر BTC: {e}")
try:
eth_ticker = await self.exchange.fetch_ticker('ETH/USDT')
eth_price = float(eth_ticker.get('last', 0)) if eth_ticker.get('last') else None
if eth_price and eth_price > 0:
prices['ethereum'] = eth_price
self.price_cache['ethereum'] = eth_price
print(f"✅ سعر ETH: ${eth_price:,.2f}")
else:
print("⚠️ لم يتم العثور على سعر ETH صالح")
except Exception as e:
print(f"❌ فشل جلب سعر ETH: {e}")
return prices
except Exception as e:
print(f"❌ خطأ في _get_prices_from_kucoin_safe: {e}")
return {'bitcoin': None, 'ethereum': None}
async def _get_prices_from_coingecko(self):
try:
await asyncio.sleep(0.5)
url = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin,ethereum&vs_currencies=usd"
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=10)
response.raise_for_status()
data = response.json()
btc_price = data.get('bitcoin', {}).get('usd')
eth_price = data.get('ethereum', {}).get('usd')
if btc_price and eth_price:
self.price_cache['bitcoin'] = btc_price
self.price_cache['ethereum'] = eth_price
print(f"✅ سعر BTC (CoinGecko): ${btc_price:,.2f}")
print(f"✅ سعر ETH (CoinGecko): ${eth_price:,.2f}")
return {'bitcoin': btc_price, 'ethereum': eth_price}
else:
print("⚠️ لم يتم العثور على أسعار صالحة من CoinGecko")
return {'bitcoin': None, 'ethereum': None}
except Exception as e:
print(f"❌ فشل جلب الأسعار من CoinGecko: {e}")
return {'bitcoin': None, 'ethereum': None}
def _get_minimal_market_context(self):
return {
'timestamp': datetime.now().isoformat(),
'data_available': False,
'data_sources': {'prices': False, 'sentiment': False, 'general_whale_data': False},
'error': 'غير متوفر - فشل في جلب بيانات السوق من المصادر الخارجية',
'market_trend': 'UNKNOWN',
'btc_sentiment': 'UNKNOWN',
'data_quality': 'LOW',
'general_whale_activity': {
'data_available': False,
'description': 'نظام الحيتان العام معطل - يركز النظام على تحليل الحيتان للعملات المرشحة فقط',
'critical_alert': False,
'sentiment': 'NEUTRAL'
},
'bitcoin_price_usd': None,
'ethereum_price_usd': None,
'fear_and_greed_index': None,
'sentiment_class': 'UNKNOWN',
'missing_data': ['غير متوفر - أسعار البيتكوين', 'غير متوفر - أسعار الإيثيريوم', 'غير متوفر - بيانات المشاعر']
}
def _determine_market_trend(self, bitcoin_price, sentiment_data):
"""تحديد اتجاه السوق بدون بيانات الحيتان العامة"""
try:
if bitcoin_price is None:
return "UNKNOWN"
score = 0
data_points = 1
if bitcoin_price > 60000:
score += 1
elif bitcoin_price < 55000:
score -= 1
if sentiment_data and sentiment_data.get('feargreed_value') is not None:
fear_greed = sentiment_data.get('feargreed_value')
if fear_greed > 60:
score += 1
elif fear_greed < 40:
score -= 1
data_points += 1
if data_points < 2:
return "UNKNOWN"
if score >= 2:
return "bull_market"
elif score <= -2:
return "bear_market"
elif -1 <= score <= 1:
return "sideways_market"
else:
return "volatile_market"
except Exception as e:
return "UNKNOWN"
def get_performance_stats(self):
total_attempts = self.fetch_stats['successful_fetches'] + self.fetch_stats['failed_fetches']
success_rate = (self.fetch_stats['successful_fetches'] / total_attempts * 100) if total_attempts > 0 else 0
stats = {
'total_attempts': total_attempts,
'successful_fetches': self.fetch_stats['successful_fetches'],
'failed_fetches': self.fetch_stats['failed_fetches'],
'rate_limit_hits': self.fetch_stats['rate_limit_hits'],
'success_rate': f"{success_rate:.1f}%",
'timestamp': datetime.now().isoformat(),
'exchange_available': self.exchange is not None,
'markets_loaded': len(self.market_cache) if self.market_cache else 0,
'last_market_load': self.last_market_load.isoformat() if self.last_market_load else None
}
# ❌ إزالة إحصائيات API للحيتان العامة
stats['api_usage'] = {
'note': 'نظام الحيتان العام معطل - يتم تحليل الحيتان للعملات المرشحة فقط'
}
return stats
async def get_symbol_specific_whale_data(self, symbol, contract_address=None):
"""جلب بيانات الحيتان الخاصة برمز معين (للمرشحين النهائيين فقط)"""
if hasattr(self.whale_monitor, 'get_symbol_whale_activity'):
return await self.whale_monitor.get_symbol_whale_activity(symbol, contract_address)
else:
return {
'data_available': False,
'error': 'نظام تحليل الحيتان غير متوفر',
'symbol': symbol
}
async def get_whale_trading_signal(self, symbol, whale_data, market_context):
"""جلب إشارة تداول بناءً على بيانات الحيتان للرمز المحدد"""
if hasattr(self.whale_monitor, 'generate_whale_trading_signal'):
return await self.whale_monitor.generate_whale_trading_signal(symbol, whale_data, market_context)
else:
return {
'action': 'HOLD',
'confidence': 0.3,
'reason': 'نظام تحليل الحيتان غير متوفر',
'source': 'whale_analysis'
}
async def _calculate_technical_score(self, symbol, ohlcv_data):
try:
if not ohlcv_data or '1h' not in ohlcv_data:
return 0.0
hourly_data = ohlcv_data['1h']
if len(hourly_data) < 50:
return 0.0
closes = np.array([candle[4] for candle in hourly_data])
volumes = np.array([candle[5] for candle in hourly_data])
highs = np.array([candle[2] for candle in hourly_data])
lows = np.array([candle[3] for candle in hourly_data])
avg_volume = np.mean(volumes[-50:]) * np.mean(closes[-50:])
liquidity_score = min(avg_volume / 1000000, 1.0)
recent_volume = np.mean(volumes[-10:])
avg_volume_50 = np.mean(volumes[-50:])
volume_ratio = recent_volume / avg_volume_50 if avg_volume_50 > 0 else 1.0
volume_score = min(volume_ratio / 3.0, 1.0)
true_ranges = []
for i in range(1, len(hourly_data)):
high, low, prev_close = highs[i], lows[i], closes[i-1]
tr1 = high - low
tr2 = abs(high - prev_close)
tr3 = abs(low - prev_close)
true_ranges.append(max(tr1, tr2, tr3))
atr = np.mean(true_ranges[-14:]) if true_ranges else 0
current_price = closes[-1]
volatility_score = min((atr / current_price) * 100 / 10, 1.0)
price_change_1h = ((closes[-1] - closes[-2]) / closes[-2]) * 100 if len(closes) > 1 else 0
price_change_4h = ((closes[-1] - closes[-5]) / closes[-5]) * 100 if len(closes) > 5 else 0
price_change_24h = ((closes[-1] - closes[-24]) / closes[-24]) * 100 if len(closes) > 24 else 0
momentum_score = (abs(price_change_1h) + abs(price_change_4h) + abs(price_change_24h)) / 30
momentum_score = min(momentum_score, 1.0)
final_score = (
liquidity_score * 0.3 +
volume_score * 0.25 +
volatility_score * 0.2 +
momentum_score * 0.25
)
return final_score
except Exception as e:
print(f"❌ خطأ في حساب الدرجة التقنية لـ {symbol}: {e}")
return 0.0
async def find_high_potential_candidates(self, count=20):
try:
print(f"🔍 البحث عن {count} رمز ذو إمكانات عالية بناءً على المؤشرات التقنية...")
if not self.exchange:
print("❌ لا يوجد اتصال بـ KuCoin")
return []
if not self.market_cache or not self.last_market_load or (datetime.now() - self.last_market_load).total_seconds() > 3600:
await self._load_markets()
usdt_symbols = [
symbol for symbol in self.market_cache.keys()
if symbol.endswith('/USDT') and self.market_cache[symbol].get('active', False)
]
print(f"✅ تم العثور على {len(usdt_symbols)} رمز USDT نشط في KuCoin")
candidates_with_scores = []
analyzed_count = 0
for symbol in usdt_symbols[:100]:
try:
analyzed_count += 1
if analyzed_count % 10 == 0:
print(f"📊 تم تحليل {analyzed_count} رمز من أصل {min(100, len(usdt_symbols))}")
ohlcv_1h = await self.exchange.fetch_ohlcv(symbol, '1h', limit=50)
if not ohlcv_1h or len(ohlcv_1h) < 20:
continue
ohlcv_data = {'1h': ohlcv_1h}
technical_score = await self._calculate_technical_score(symbol, ohlcv_data)
if technical_score > 0.3:
ticker = await self.exchange.fetch_ticker(symbol)
reasons = []
if technical_score > 0.7:
reasons.append('high_technical_score')
if ticker.get('baseVolume', 0) > 1000000:
reasons.append('high_liquidity')
if abs(ticker.get('percentage', 0)) > 5:
reasons.append('significant_momentum')
candidates_with_scores.append({
'symbol': symbol,
'technical_score': technical_score,
'reasons': reasons,
'volume': ticker.get('baseVolume', 0),
'price_change': ticker.get('percentage', 0),
'current_price': ticker.get('last', 0)
})
except Exception as e:
if "rate limit" not in str(e).lower():
print(f"⚠️ خطأ في تحليل الرمز {symbol}: {e}")
continue
candidates_with_scores.sort(key=lambda x: x['technical_score'], reverse=True)
top_candidates = candidates_with_scores[:count]
print(f"✅ تم العثور على {len(top_candidates)} مرشح عالي الجودة من أصل {analyzed_count} رمز تم تحليله")
for candidate in top_candidates[:5]:
print(f" 🥇 {candidate['symbol']}: درجة {candidate['technical_score']:.3f}")
return top_candidates
except Exception as e:
print(f"❌ خطأ في find_high_potential_candidates: {e}")
return []
async def get_fast_pass_data_async(self, candidates):
try:
print(f"📊 جلب بيانات OHLCV لـ {len(candidates)} مرشح من KuCoin...")
results = []
timeframes = [
('5m', 200),
('15m', 200),
('1h', 200),
('4h', 200),
('1d', 200),
('1w', 200)
]
for candidate in candidates:
symbol = candidate['symbol']
try:
ohlcv_data = {}
has_sufficient_data = True
for timeframe, limit in timeframes:
try:
ohlcv = await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
if not ohlcv or len(ohlcv) < 50:
print(f"⚠️ بيانات غير كافية للرمز {symbol} في الإطار {timeframe}: {len(ohlcv) if ohlcv else 0} شمعة")
has_sufficient_data = False
break
ohlcv_data[timeframe] = ohlcv
print(f" ✅ {symbol} - {timeframe}: {len(ohlcv)} شمعة")
await asyncio.sleep(0.1)
except Exception as e:
print(f"❌ خطأ في جلب بيانات {symbol} للإطار {timeframe}: {e}")
has_sufficient_data = False
break
if has_sufficient_data:
result_data = {
'symbol': symbol,
'ohlcv': ohlcv_data,
'reasons': candidate.get('reasons', []),
'technical_score': candidate.get('technical_score', 0)
}
results.append(result_data)
print(f"✅ تم تجميع بيانات {symbol} بنجاح")
else:
print(f"❌ فشل تجميع بيانات كافية لـ {symbol}")
except Exception as symbol_error:
print(f"❌ خطأ في معالجة الرمز {symbol}: {symbol_error}")
continue
print(f"✅ تم تجميع بيانات لـ {len(results)} مرشح بنجاح")
return results
except Exception as e:
print(f"❌ خطأ في get_fast_pass_data_async: {e}")
return []
async def get_latest_price_async(self, symbol):
try:
if not self.exchange:
print("❌ لا يوجد اتصال بـ KuCoin")
return None
if symbol not in self.market_cache:
print(f"⚠️ السوق {symbol} غير متوفر في KuCoin")
return None
ticker = await self.exchange.fetch_ticker(symbol)
current_price = ticker.get('last')
if current_price:
return float(current_price)
else:
print(f"❌ لم يتم العثور على سعر لـ {symbol}")
return None
except Exception as e:
print(f"❌ خطأ في get_latest_price_async لـ {symbol}: {e}")
return None
async def get_available_symbols(self):
try:
if not self.exchange:
return []
if not self.market_cache:
await self._load_markets()
usdt_symbols = [
symbol for symbol in self.market_cache.keys()
if symbol.endswith('/USDT') and self.market_cache[symbol].get('active', False)
]
return usdt_symbols
except Exception as e:
print(f"❌ خطأ في get_available_symbols: {e}")
return []
async def validate_symbol(self, symbol):
try:
if not self.exchange:
return False
if not self.market_cache:
await self._load_markets()
return symbol in self.market_cache and self.market_cache[symbol].get('active', False)
except Exception as e:
print(f"❌ خطأ في validate_symbol لـ {symbol}: {e}")
return False
print("✅ DataManager loaded - Real-time market data from KuCoin ready (General Whale Monitoring DISABLED)")