Trad / data_manager.py
Riy777's picture
Update data_manager.py
af73eb9
raw
history blame
23.6 kB
# data_manager.py
import os
import asyncio
import httpx
import traceback
import time
from datetime import datetime
import ccxt # استخدام ccxt العادي بدلاً من pro
import numpy as np
import logging
from typing import List, Dict, Any
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
class DataManager:
def __init__(self, contracts_db, whale_monitor):
self.contracts_db = contracts_db or {}
self.whale_monitor = whale_monitor
# إعدادات الأداء المحسنة
self.batch_size = 25 # حجم دفعة معقول
self.cache_duration = 300
try:
self.exchange = ccxt.kucoin({
'sandbox': False,
'enableRateLimit': True,
'timeout': 30000,
'verbose': False,
})
print("✅ تم تهيئة اتصال KuCoin بنجاح (CCXT Standard)")
except Exception as e:
print(f"❌ فشل تهيئة اتصال KuCoin: {e}")
self.exchange = None
self.http_client = None
self.market_cache = {}
self.last_market_load = None
self.symbol_cache = {}
self.cache_timestamp = {}
async def initialize(self):
self.http_client = httpx.AsyncClient(timeout=30.0)
await self._load_markets()
print("✅ DataManager initialized - Focused on Top 200 Symbols")
async def _load_markets(self):
try:
if not self.exchange:
return
print("🔄 جلب أحدث بيانات الأسواق من KuCoin...")
self.exchange.load_markets() # sync call instead of await
self.market_cache = self.exchange.markets
self.last_market_load = datetime.now()
print(f"✅ تم تحميل {len(self.market_cache)} سوق من KuCoin")
except Exception as e:
print(f"❌ فشل تحميل بيانات الأسواق: {e}")
async def close(self):
if self.http_client:
await self.http_client.aclose()
# لا داعي لـ close في ccxt العادي
async def get_market_context_async(self):
"""جلب سياق السوق الأساسي فقط"""
try:
sentiment_data = await self.get_sentiment_safe_async()
price_data = await self._get_prices_with_fallback()
bitcoin_price = price_data.get('bitcoin')
ethereum_price = price_data.get('ethereum')
market_context = {
'timestamp': datetime.now().isoformat(),
'bitcoin_price_usd': bitcoin_price,
'ethereum_price_usd': ethereum_price,
'fear_and_greed_index': sentiment_data.get('feargreed_value') if sentiment_data else None,
'sentiment_class': sentiment_data.get('feargreed_class') if sentiment_data else 'NEUTRAL',
'market_trend': self._determine_market_trend(bitcoin_price, sentiment_data),
'btc_sentiment': self._get_btc_sentiment(bitcoin_price),
'data_quality': 'HIGH' if bitcoin_price and ethereum_price else 'LOW'
}
return market_context
except Exception as e:
print(f"❌ فشل جلب سياق السوق: {e}")
return self._get_minimal_market_context()
async def get_sentiment_safe_async(self):
"""جلب بيانات المشاعر"""
try:
async with httpx.AsyncClient(timeout=10) as client:
response = await client.get("https://api.alternative.me/fng/")
response.raise_for_status()
data = response.json()
if 'data' not in data or not data['data']:
raise ValueError("بيانات المشاعر غير متوفرة")
latest_data = data['data'][0]
return {
"feargreed_value": int(latest_data['value']),
"feargreed_class": latest_data['value_classification'],
"source": "alternative.me",
"timestamp": datetime.now().isoformat()
}
except Exception as e:
print(f"❌ فشل جلب بيانات المشاعر: {e}")
return None
def _determine_market_trend(self, bitcoin_price, sentiment_data):
"""تحديد اتجاه السوق"""
if bitcoin_price is None:
return "UNKNOWN"
score = 0
if bitcoin_price > 60000:
score += 1
elif bitcoin_price < 55000:
score -= 1
if sentiment_data and sentiment_data.get('feargreed_value') is not None:
fear_greed = sentiment_data.get('feargreed_value')
if fear_greed > 60:
score += 1
elif fear_greed < 40:
score -= 1
if score >= 1:
return "bull_market"
elif score <= -1:
return "bear_market"
else:
return "sideways_market"
def _get_btc_sentiment(self, bitcoin_price):
if bitcoin_price is None:
return 'UNKNOWN'
elif bitcoin_price > 60000:
return 'BULLISH'
elif bitcoin_price < 55000:
return 'BEARISH'
else:
return 'NEUTRAL'
async def _get_prices_with_fallback(self):
"""جلب أسعار البيتكوين والإيثيريوم"""
try:
prices = await self._get_prices_from_kucoin_safe()
if prices.get('bitcoin') and prices.get('ethereum'):
return prices
return await self._get_prices_from_coingecko()
except Exception as e:
print(f"❌ فشل جلب الأسعار: {e}")
return {'bitcoin': None, 'ethereum': None}
async def _get_prices_from_kucoin_safe(self):
if not self.exchange:
return {'bitcoin': None, 'ethereum': None}
try:
prices = {'bitcoin': None, 'ethereum': None}
btc_ticker = self.exchange.fetch_ticker('BTC/USDT') # sync call
btc_price = float(btc_ticker.get('last', 0)) if btc_ticker.get('last') else None
if btc_price and btc_price > 0:
prices['bitcoin'] = btc_price
eth_ticker = self.exchange.fetch_ticker('ETH/USDT') # sync call
eth_price = float(eth_ticker.get('last', 0)) if eth_ticker.get('last') else None
if eth_price and eth_price > 0:
prices['ethereum'] = eth_price
return prices
except Exception as e:
print(f"❌ خطأ في جلب الأسعار من KuCoin: {e}")
return {'bitcoin': None, 'ethereum': None}
async def _get_prices_from_coingecko(self):
"""الاحتياطي: جلب الأسعار من CoinGecko"""
try:
await asyncio.sleep(0.5)
url = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin,ethereum&vs_currencies=usd"
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=10)
response.raise_for_status()
data = response.json()
btc_price = data.get('bitcoin', {}).get('usd')
eth_price = data.get('ethereum', {}).get('usd')
if btc_price and eth_price:
return {'bitcoin': btc_price, 'ethereum': eth_price}
else:
return {'bitcoin': None, 'ethereum': None}
except Exception as e:
print(f"❌ فشل جلب الأسعار من CoinGecko: {e}")
return {'bitcoin': None, 'ethereum': None}
def _get_minimal_market_context(self):
"""سياق سوق بدائي عند الفشل"""
return {
'timestamp': datetime.now().isoformat(),
'data_available': False,
'market_trend': 'UNKNOWN',
'btc_sentiment': 'UNKNOWN',
'data_quality': 'LOW'
}
async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
"""
الطبقة 1: فحص سريع لأفضل 200 عملة بناءً على مؤشرات هامة
"""
print("📊 الطبقة 1: فحص سريع لأفضل العملات بناءً على مؤشرات هامة...")
# الحصول على جميع الرموز النشطة
usdt_symbols = [
symbol for symbol in self.market_cache.keys()
if symbol.endswith('/USDT') and self.market_cache[symbol].get('active', False)
]
print(f"🔍 تحليل {len(usdt_symbols)} عملة متاحة...")
# جلب بيانات التداول لجميع الرموز وتقييمها
all_symbols_data = []
for i, symbol in enumerate(usdt_symbols):
try:
symbol_data = await self._get_symbol_trading_data(symbol)
if symbol_data:
all_symbols_data.append(symbol_data)
if i % 50 == 0:
print(f" 📈 معالجة {i}/{len(usdt_symbols)} عملة...")
await asyncio.sleep(0.05) # وقت انتظار قصير جداً
except Exception as e:
continue
print(f"✅ تم جمع بيانات {len(all_symbols_data)} عملة")
if not all_symbols_data:
return []
# تصنيف العملات بناءً على مؤشرات هامة
scored_symbols = []
for symbol_data in all_symbols_data:
try:
score = self._calculate_comprehensive_score(symbol_data)
if score > 0: # قبول العملات ذات الدرجة الإيجابية فقط
symbol_data['layer1_score'] = score
scored_symbols.append(symbol_data)
except Exception as e:
continue
# ترتيب العملات حسب الدرجة (من الأعلى للأدنى)
scored_symbols.sort(key=lambda x: x.get('layer1_score', 0), reverse=True)
# أخذ أفضل 200 عملة فقط
top_200 = scored_symbols[:200]
print(f"🎯 تم اختيار أفضل {len(top_200)} عملة للطبقة 2")
# عرض أفضل 15 عملة
print("🏆 أفضل 15 عملة من الطبقة 1:")
for i, symbol_data in enumerate(top_200[:15]):
score = symbol_data.get('layer1_score', 0)
volume = symbol_data.get('dollar_volume', 0)
change = symbol_data.get('price_change_24h', 0)
volatility = symbol_data.get('volatility_score', 0)
print(f" {i+1:2d}. {symbol_data['symbol']}: {score:.3f} | ${volume:>8,.0f} | {change:>+6.1f}% | تقلب: {volatility:.3f}")
return top_200
async def _get_symbol_trading_data(self, symbol: str) -> Dict[str, Any]:
"""جلب بيانات التداول لرمز واحد"""
try:
ticker = self.exchange.fetch_ticker(symbol)
if not ticker:
return None
current_price = ticker.get('last', 0)
volume_24h = ticker.get('baseVolume', 0)
dollar_volume = volume_24h * current_price
price_change_24h = ticker.get('percentage', 0) or 0
high_24h = ticker.get('high', 0)
low_24h = ticker.get('low', 0)
open_price = ticker.get('open', 0)
# حساب مؤشرات إضافية هامة
volatility = self._calculate_volatility_score(high_24h, low_24h, current_price)
volume_trend = self._calculate_volume_trend(dollar_volume)
price_strength = self._calculate_price_strength(current_price, open_price, price_change_24h)
return {
'symbol': symbol,
'current_price': current_price,
'volume_24h': volume_24h,
'dollar_volume': dollar_volume,
'price_change_24h': price_change_24h,
'high_24h': high_24h,
'low_24h': low_24h,
'open_price': open_price,
'volatility_score': volatility,
'volume_trend': volume_trend,
'price_strength': price_strength,
'reasons': []
}
except Exception as e:
return None
def _calculate_comprehensive_score(self, symbol_data: Dict[str, Any]) -> float:
"""حساب درجة شاملة تعتمد على مؤشرات هامة للربحية"""
dollar_volume = symbol_data.get('dollar_volume', 0)
price_change = symbol_data.get('price_change_24h', 0)
volatility = symbol_data.get('volatility_score', 0)
volume_trend = symbol_data.get('volume_trend', 0)
price_strength = symbol_data.get('price_strength', 0)
# 1. السيولة والحجم (35%) - الأهم لتجنب العملات الميتة
if dollar_volume < 100000: # أقل من 100K دولار - رفض
return 0
volume_score = 0
if dollar_volume >= 10000000: # 10M+
volume_score = 1.0
elif dollar_volume >= 5000000: # 5M+
volume_score = 0.9
elif dollar_volume >= 2000000: # 2M+
volume_score = 0.8
elif dollar_volume >= 1000000: # 1M+
volume_score = 0.7
elif dollar_volume >= 500000: # 500K+
volume_score = 0.6
elif dollar_volume >= 250000: # 250K+
volume_score = 0.5
elif dollar_volume >= 100000: # 100K+
volume_score = 0.4
# 2. الزخم السعري (25%) - البحث عن حركة قوية
momentum_score = 0
if price_change >= 20: # +20%+ - قوي جداً
momentum_score = 1.0
elif price_change >= 15: # +15%+
momentum_score = 0.9
elif price_change >= 10: # +10%+
momentum_score = 0.8
elif price_change >= 5: # +5%+
momentum_score = 0.7
elif price_change >= 2: # +2%+
momentum_score = 0.6
elif price_change >= 0: # موجب
momentum_score = 0.5
elif price_change >= -5: # حتى -5% (فرصة شراء)
momentum_score = 0.4
elif price_change >= -10: # حتى -10%
momentum_score = 0.3
else: # أكثر من -10% - تجنب
momentum_score = 0.1
# 3. التقلب المناسب (20%) - ليس عالي جداً ولا منخفض جداً
volatility_score = 0
if 0.02 <= volatility <= 0.15: # تقلب مثالي 2%-15%
volatility_score = 1.0
elif 0.01 <= volatility <= 0.20: # مقبول 1%-20%
volatility_score = 0.8
elif volatility <= 0.01: # قليل جداً
volatility_score = 0.3
elif volatility > 0.20: # عالي جداً
volatility_score = 0.2
# 4. قوة السعر (20%) - مزيج من الاتجاه والقوة
strength_score = price_strength
# الدرجة النهائية
final_score = (
volume_score * 0.35 +
momentum_score * 0.25 +
volatility_score * 0.20 +
strength_score * 0.20
)
# تحديث أسباب الترشيح
reasons = []
if volume_score >= 0.7:
reasons.append('high_liquidity')
if momentum_score >= 0.7:
reasons.append('strong_momentum')
if volatility_score >= 0.8:
reasons.append('optimal_volatility')
if strength_score >= 0.7:
reasons.append('price_strength')
symbol_data['reasons'] = reasons
return final_score
def _calculate_volatility_score(self, high_24h: float, low_24h: float, current_price: float) -> float:
"""حساب درجة التقلب"""
if current_price == 0:
return 0
return (high_24h - low_24h) / current_price
def _calculate_volume_trend(self, dollar_volume: float) -> float:
"""حساب اتجاه الحجم"""
if dollar_volume >= 10000000:
return 1.0
elif dollar_volume >= 5000000:
return 0.8
elif dollar_volume >= 1000000:
return 0.6
elif dollar_volume >= 500000:
return 0.4
elif dollar_volume >= 100000:
return 0.2
else:
return 0.1
def _calculate_price_strength(self, current_price: float, open_price: float, price_change: float) -> float:
"""حساب قوة السعر"""
if open_price == 0:
return 0.5
# قوة السعر تعتمد على المسافة من سعر الافتتاح ونسبة التغير
distance_from_open = abs(current_price - open_price) / open_price
change_strength = min(abs(price_change) / 50, 1.0) # تطبيع قوة التغير
return (distance_from_open * 0.6 + change_strength * 0.4)
async def get_ohlcv_data_for_symbols(self, symbols: List[str]) -> List[Dict[str, Any]]:
"""
جلب بيانات OHLCV كاملة للرموز المحددة مع جميع الإطارات الزمنية الـ6
"""
results = []
print(f"📊 جلب بيانات OHLCV كاملة لـ {len(symbols)} عملة...")
print(" 📈 الإطارات الزمنية: 5m, 15m, 1h, 4h, 1d, 1w")
for i, symbol in enumerate(symbols):
try:
ohlcv_data = await self._fetch_complete_ohlcv(symbol)
if ohlcv_data:
results.append(ohlcv_data)
print(f" ✅ ({i+1}/{len(symbols)}) تم جلب بيانات {symbol}")
else:
print(f" ❌ ({i+1}/{len(symbols)}) فشل جلب بيانات {symbol}")
# وقت انتظار لتجنب rate limits
await asyncio.sleep(0.3)
except Exception as symbol_error:
print(f" ❌ ({i+1}/{len(symbols)}) خطأ في {symbol}: {symbol_error}")
continue
print(f"✅ تم تجميع بيانات OHLCV كاملة لـ {len(results)} عملة")
return results
async def _fetch_complete_ohlcv(self, symbol: str) -> Dict[str, Any]:
"""جلب بيانات OHLCV كاملة مع جميع الإطارات الزمنية الـ6"""
try:
ohlcv_data = {}
# جميع الإطارات الزمنية الـ6 المطلوبة
timeframes = [
('5m', 100), # 5 دقائق - 100 شمعة
('15m', 100), # 15 دقيقة - 100 شمعة
('1h', 100), # 1 ساعة - 100 شمعة
('4h', 100), # 4 ساعات - 100 شمعة
('1d', 100), # 1 يوم - 100 شمعة
('1w', 50), # 1 أسبوع - 50 شمعة
]
has_sufficient_data = True
for timeframe, limit in timeframes:
try:
# استخدام fetch_ohlcv العادي (ليس pro)
ohlcv = self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
if ohlcv and len(ohlcv) >= 20: # تأكد من وجود بيانات كافية
ohlcv_data[timeframe] = ohlcv
else:
print(f" ⚠️ بيانات غير كافية لـ {symbol} على {timeframe}")
has_sufficient_data = False
break
except Exception as e:
print(f" ⚠️ خطأ في {symbol} على {timeframe}: {e}")
has_sufficient_data = False
break
if has_sufficient_data and ohlcv_data:
# جلب السعر الحالي
try:
ticker = self.exchange.fetch_ticker(symbol)
current_price = ticker.get('last', 0) if ticker else 0
return {
'symbol': symbol,
'ohlcv': ohlcv_data,
'current_price': current_price,
'timestamp': datetime.now().isoformat()
}
except Exception as price_error:
print(f" ⚠️ خطأ في جلب السعر لـ {symbol}: {price_error}")
return None
else:
return None
except Exception as e:
print(f" ❌ خطأ عام في {symbol}: {e}")
return None
async def get_latest_price_async(self, symbol):
"""جلب السعر الحالي لعملة محددة"""
try:
if not self.exchange:
return None
ticker = self.exchange.fetch_ticker(symbol)
current_price = ticker.get('last')
if current_price:
return float(current_price)
else:
return None
except Exception as e:
print(f"❌ خطأ في جلب السعر لـ {symbol}: {e}")
return None
async def get_available_symbols(self):
"""الحصول على جميع الرموز المتاحة"""
try:
if not self.exchange:
return []
if not self.market_cache:
await self._load_markets()
usdt_symbols = [
symbol for symbol in self.market_cache.keys()
if symbol.endswith('/USDT') and self.market_cache[symbol].get('active', False)
]
return usdt_symbols
except Exception as e:
print(f"❌ خطأ في جلب الرموز المتاحة: {e}")
return []
async def validate_symbol(self, symbol):
"""التحقق من صحة الرمز"""
try:
if not self.exchange:
return False
if not self.market_cache:
await self._load_markets()
return symbol in self.market_cache and self.market_cache[symbol].get('active', False)
except Exception as e:
print(f"❌ خطأ في التحقق من الرمز {symbol}: {e}")
return False
print("✅ DataManager loaded - CCXT Standard with Top 200 Symbols & Full OHLCV Timeframes")