Trad / data_manager.py
Riy777's picture
Update data_manager.py
2bf9457
raw
history blame
20.9 kB
# ml_engine/data_manager.py
# (V11.0 - Full Real Data Integration: XGBoost 1h + Ranker + MC in Layer 1.1)
import os
import asyncio
import httpx
import traceback
import time
from datetime import datetime
import ccxt.async_support as ccxt
import numpy as np
import logging
from typing import List, Dict, Any
import pandas as pd
# محاولة استيراد pandas_ta (ضرورية للمؤشرات)
try:
import pandas_ta as ta
except ImportError:
print("❌ [DataManager] مكتبة pandas_ta غير موجودة. النظام لن يعمل بشكل صحيح.")
ta = None
# استيراد المحركات التحليلية
from ml_engine.indicators import AdvancedTechnicalAnalyzer
from ml_engine.monte_carlo import MonteCarloAnalyzer
from ml_engine.patterns import ChartPatternAnalyzer # المحرك الجديد V9.0
from ml_engine.ranker import Layer1Ranker
# تقليل ضجيج السجلات للمكتبات الخارجية
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("ccxt").setLevel(logging.WARNING)
class DataManager:
def __init__(self, contracts_db, whale_monitor, r2_service=None):
self.contracts_db = contracts_db or {}
self.whale_monitor = whale_monitor
self.r2_service = r2_service
# إعداد عميل KuCoin غير المتزامن (لبيانات حقيقية فقط)
self.exchange = ccxt.kucoin({
'enableRateLimit': True,
'timeout': 30000, # مهلة 30 ثانية
})
self.http_client = None
self.market_cache = {}
self.last_market_load = None
# تهيئة المحركات التحليلية
self.technical_analyzer = AdvancedTechnicalAnalyzer()
self.mc_analyzer = MonteCarloAnalyzer()
self.pattern_analyzer = None # سيتم تهيئته في initialize
self.layer1_ranker = None # سيتم تهيئته في initialize
async def initialize(self):
"""تهيئة الاتصالات وتحميل نماذج الذكاء الاصطناعي"""
self.http_client = httpx.AsyncClient(timeout=30.0)
await self._load_markets()
print(" > [DataManager V11.0] تهيئة محرك الأنماط (V9.0 XGBoost)...")
try:
self.pattern_analyzer = ChartPatternAnalyzer(r2_service=self.r2_service)
await self.pattern_analyzer.initialize()
except Exception as e:
print(f"❌ [DataManager] فشل تهيئة محرك الأنماط V9.0: {e}")
print(" > [DataManager V11.0] تهيئة الكاشف المصغر (Layer1 Ranker)...")
try:
self.layer1_ranker = Layer1Ranker(model_path="ml_models/layer1_ranker.lgbm")
await self.layer1_ranker.initialize()
except Exception as e:
print(f"❌ [DataManager] فشل تهيئة الرانكر: {e}")
print("✅ DataManager initialized - V11.0 (Real Data Only)")
async def _load_markets(self):
"""تحميل بيانات الأسواق المتاحة من المنصة"""
try:
if self.exchange:
await self.exchange.load_markets()
self.market_cache = self.exchange.markets
self.last_market_load = datetime.now()
except Exception as e:
print(f"❌ [DataManager] فشل تحميل الأسواق من KuCoin: {e}")
async def close(self):
"""إغلاق الاتصالات بأمان عند إيقاف النظام"""
if self.http_client:
await self.http_client.aclose()
if self.exchange:
await self.exchange.close()
async def get_market_context_async(self):
"""جلب نظرة عامة حية على السوق (أسعار BTC/ETH ومؤشر الخوف والجشع)"""
try:
sentiment_data = await self.get_sentiment_live_async()
price_data = await self._get_prices_live()
bitcoin_price = price_data.get('bitcoin')
ethereum_price = price_data.get('ethereum')
return {
'timestamp': datetime.now().isoformat(),
'bitcoin_price_usd': bitcoin_price,
'ethereum_price_usd': ethereum_price,
'fear_and_greed_index': sentiment_data.get('feargreed_value') if sentiment_data else None,
'sentiment_class': sentiment_data.get('feargreed_class') if sentiment_data else 'NEUTRAL',
'market_trend': self._determine_market_trend(bitcoin_price, sentiment_data),
'btc_sentiment': self._get_btc_sentiment(bitcoin_price),
'data_quality': 'HIGH' if bitcoin_price and ethereum_price else 'LOW'
}
except Exception as e:
print(f"⚠️ [DataManager] خطأ في جلب سياق السوق: {e}")
return self._get_minimal_market_context()
async def get_sentiment_live_async(self):
"""جلب مؤشر الخوف والجشع من API خارجي حي"""
try:
async with httpx.AsyncClient(timeout=10) as client:
response = await client.get("https://api.alternative.me/fng/")
if response.status_code == 200:
data = response.json()
if 'data' in data and len(data['data']) > 0:
latest = data['data'][0]
return {
"feargreed_value": int(latest['value']),
"feargreed_class": latest['value_classification']
}
return None
except Exception:
return None
async def _get_prices_live(self):
"""جلب أسعار BTC و ETH الحية من المنصة"""
if not self.exchange:
return {'bitcoin': None, 'ethereum': None}
try:
# جلب السعرين بالتوازي لتقليل وقت الانتظار
btc_task = asyncio.create_task(self.exchange.fetch_ticker('BTC/USDT'))
eth_task = asyncio.create_task(self.exchange.fetch_ticker('ETH/USDT'))
btc_ticker, eth_ticker = await asyncio.gather(btc_task, eth_task, return_exceptions=True)
btc_price = btc_ticker['last'] if isinstance(btc_ticker, dict) else None
eth_price = eth_ticker['last'] if isinstance(eth_ticker, dict) else None
return {'bitcoin': btc_price, 'ethereum': eth_price}
except Exception:
return {'bitcoin': None, 'ethereum': None}
def _determine_market_trend(self, btc_price, sentiment):
"""تحديد اتجاه السوق بناءً على قواعد سعرية ومشاعرية (قابلة للتعديل)"""
if not btc_price:
return "UNKNOWN"
score = 0
# قواعد بسيطة لتحديد الاتجاه (يمكن جعلها ديناميكية أكثر مستقبلاً)
if btc_price > 95000: score += 1
elif btc_price < 90000: score -= 1
if sentiment:
fg_val = sentiment.get('feargreed_value', 50)
if fg_val > 65: score += 1 # طمع شديد يدعم الصعود
elif fg_val < 35: score -= 1 # خوف شديد يدعم الهبوط
if score > 0: return "bull_market"
elif score < 0: return "bear_market"
else: return "sideways_market"
def _get_btc_sentiment(self, btc_price):
if not btc_price: return "UNKNOWN"
# تصنيف بسيط بناء على مستويات سعرية
if btc_price > 95000: return "BULLISH"
elif btc_price < 90000: return "BEARISH"
else: return "NEUTRAL"
def _get_minimal_market_context(self):
"""سياق احتياطي في حال فشل الاتصال"""
return {
'timestamp': datetime.now().isoformat(),
'data_available': False,
'market_trend': 'UNKNOWN',
'data_quality': 'LOW'
}
def _create_dataframe(self, candles: List) -> pd.DataFrame:
"""تحويل قائمة الشموع الخام إلى Pandas DataFrame للمعالجة"""
if not candles:
return pd.DataFrame()
try:
df = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df[['open', 'high', 'low', 'close', 'volume']] = df[['open', 'high', 'low', 'close', 'volume']].astype(float)
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
df.set_index('timestamp', inplace=True)
df.sort_index(inplace=True)
# إزالة أي صفوف مكررة قد تنتج عن مشاكل في API
df = df[~df.index.duplicated(keep='last')]
return df
except Exception as e:
print(f"⚠️ [DataManager] خطأ في إنشاء DataFrame: {e}")
return pd.DataFrame()
# ==================================================================
# 🔴 الطبقة 1.1: الغربلة السريعة (V11.0 - Real Data & ML Integrated)
# ==================================================================
async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
"""
تنفيذ الغربلة الأولية باستخدام بيانات حية و 3 نماذج تقييم.
المعايير: حجم تداول عالي + درجة مجمعة >= 50%.
"""
print("📊 [Layer 1.1] بدء الغربلة الحية (Ranker + XGB 1h + MC)...")
# التحقق من جاهزية المحركات
if not self.layer1_ranker or not self.layer1_ranker.model:
print("⚠️ [Layer 1.1] تحذير: الرانكر (Ranker) غير جاهز.")
if not self.pattern_analyzer or not self.pattern_analyzer.initialized:
print("⚠️ [Layer 1.1] تحذير: محرك الأنماط (XGBoost) غير جاهز.")
# 1. جلب جميع العملات وتصفية أفضل 100 حسب سيولة الدولار (Quote Volume)
volume_data = await self._get_volume_data_live()
if not volume_data:
print("❌ [Layer 1.1] فشل جلب بيانات الأحجام الحية. إيقاف الغربلة.")
return []
# ترتيب تنازلي حسب السيولة وأخذ أفضل 100
volume_data.sort(key=lambda x: x['dollar_volume'], reverse=True)
top_100_candidates = volume_data[:100]
print(f" 🔄 تحليل {len(top_100_candidates)} عملة (إطار 1H) بالتوازي...")
final_qualified_candidates = []
# 2. معالجة العملات على دفعات (لتجنب تجاوز حدود API)
batch_size = 20 # دفعة آمنة لمعظم المنصات
for i in range(0, len(top_100_candidates), batch_size):
batch = top_100_candidates[i:i+batch_size]
# جلب شموع الساعة (1H) لكل عملة في الدفعة بالتوازي
fetch_tasks = [self._fetch_ohlcv_live(c['symbol'], '1h', limit=200) for c in batch]
batch_results = await asyncio.gather(*fetch_tasks, return_exceptions=True)
for j, candles in enumerate(batch_results):
# التحقق من صلاحية البيانات المستلمة
if isinstance(candles, Exception) or not candles or len(candles) < 150:
continue
candidate_data = batch[j]
symbol = candidate_data['symbol']
try:
# تحويل الشموع إلى DataFrame للحسابات
df_1h = self._create_dataframe(candles)
if df_1h.empty: continue
closes_np = df_1h['close'].to_numpy()
# --- أ. حساب درجة الرانكر (LightGBM) [وزن 40%] ---
ranker_score = 0.0
if self.layer1_ranker and self.layer1_ranker.model:
# حساب الميزات الذكية التي يحتاجها الرانكر
features = self.technical_analyzer.calculate_v9_smart_features(df_1h)
if features:
# إضافة ميزات مونت كارلو البسيطة للرانكر (يتوقعها النموذج)
mc_simple_for_ranker = self.mc_analyzer.generate_1h_price_distribution_simple(closes_np[-100:])
features.update(mc_simple_for_ranker)
# التنبؤ
ranker_prob = self.layer1_ranker.predict_proba(pd.DataFrame([features]))
ranker_score = float(ranker_prob[0]) if len(ranker_prob) > 0 else 0.0
# --- ب. حساب درجة XGBoost 1h (النمط) [وزن 40%] ---
xgb_score = 0.0
if self.pattern_analyzer and self.pattern_analyzer.initialized:
# نمرر بيانات 1H فقط للمحرك للحصول على نتيجة سريعة
pattern_result = await self.pattern_analyzer.detect_chart_patterns({'1h': candles})
# استخراج احتمالية الصعود لنموذج 1h
xgb_score = pattern_result.get('details', {}).get('1h') or 0.0
# --- ج. حساب درجة مونت كارلو (احتمالية الربح) [وزن 20%] ---
mc_score = 0.0
mc_result = self.mc_analyzer.generate_1h_price_distribution_simple(closes_np[-100:])
if not mc_result.get('error'):
mc_prob_gain = mc_result.get('mc_prob_gain', 0.5)
# تطبيع الدرجة: 50% احتمال ربح = 0 نقاط، 100% احتمال ربح = 1 نقطة
mc_score = max(0.0, (mc_prob_gain - 0.5) * 2.0)
# --- د. حساب الدرجة النهائية الموزونة ---
final_l1_score = (ranker_score * 0.40) + (xgb_score * 0.40) + (mc_score * 0.20)
# التأهيل إذا تجاوزت العتبة (50%)
if final_l1_score >= 0.50:
candidate_data['layer1_score'] = final_l1_score
# تخزين أسباب الترشح للشفافية
candidate_data['reasons_for_candidacy'] = [
f"Ranker(40%): {ranker_score:.2f}",
f"XGB_1h(40%): {xgb_score:.2f}",
f"MC_Simple(20%): {mc_score:.2f}"
]
final_qualified_candidates.append(candidate_data)
except Exception as e:
# طباعة خطأ صامتة لعدم إغراق السجلات
# print(f"⚠️ خطأ في تحليل {symbol}: {e}")
continue
# ترتيب المرشحين النهائيين حسب الدرجة
final_qualified_candidates.sort(key=lambda x: x['layer1_score'], reverse=True)
print(f"🎯 [Layer 1.1] اكتملت الغربلة. تم تأهيل {len(final_qualified_candidates)} عملة للمرحلة القادمة.")
# إرجاع أفضل 20 عملة فقط للتحليل العميق المكلف
return final_qualified_candidates[:20]
async def _fetch_ohlcv_live(self, symbol, timeframe, limit):
"""جلب شموع حية من المنصة مع التعامل مع الأخطاء"""
if not self.exchange: return None
try:
return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
except Exception:
return None
async def _get_volume_data_live(self) -> List[Dict[str, Any]]:
"""جلب بيانات الحجم الحية لجميع أزواج USDT"""
if not self.exchange: return []
try:
# جلب كل الأسعار والأحجام في طلب واحد سريع
tickers = await self.exchange.fetch_tickers()
volume_data = []
for symbol, ticker in tickers.items():
# فلترة أزواج USDT النشطة فقط ذات السيولة المقبولة (> 50k)
if (symbol.endswith('/USDT') and
ticker.get('quoteVolume') and
ticker['quoteVolume'] > 50000):
volume_data.append({
'symbol': symbol,
'dollar_volume': float(ticker['quoteVolume']),
'current_price': float(ticker['last']) if ticker.get('last') else 0.0,
'price_change_24h': float(ticker['percentage']) if ticker.get('percentage') else 0.0
})
return volume_data
except Exception as e:
print(f"❌ [DataManager] فشل جلب بيانات الحجم الحية: {e}")
return []
async def stream_ohlcv_data(self, symbols: List[Dict[str, Any]], queue: asyncio.Queue):
"""
(للطبقة 1.2) جلب البيانات الكاملة (4 أطر زمنية) للعملات المتأهلة.
يتم إرسال البيانات عبر Queue للمعالجة المتوازية في Processor.
"""
print(f"📊 [Layer 1.2 Producer] بدء جلب البيانات العميقة لـ {len(symbols)} عملة...")
for symbol_data in symbols:
symbol = symbol_data['symbol']
# جلب الأطر الزمنية الأربعة المطلوبة لمحرك الأنماط الجديد
# (نستخدم 300 شمعة لضمان وجود بيانات كافية للمؤشرات)
tasks = [
self._fetch_ohlcv_live(symbol, '15m', 300),
self._fetch_ohlcv_live(symbol, '1h', 300),
self._fetch_ohlcv_live(symbol, '4h', 300),
self._fetch_ohlcv_live(symbol, '1d', 300),
]
# تنفيذ الطلبات الأربعة بالتوازي للعملة الواحدة
results = await asyncio.gather(*tasks, return_exceptions=False)
ohlcv_complete = {}
tfs = ['15m', '1h', '4h', '1d']
valid_tfs_count = 0
for i, res in enumerate(results):
# التأكد من أن البيانات صالحة وكافية (على الأقل 200 شمعة للتحليل الدقيق)
if res and isinstance(res, list) and len(res) >= 200:
ohlcv_complete[tfs[i]] = res
valid_tfs_count += 1
# إرسال العملة للمعالجة فقط إذا توفرت معظم الأطر الزمنية (3 على الأقل)
if valid_tfs_count >= 3:
symbol_data['ohlcv'] = ohlcv_complete
# نضع العملة في قائمة (كدُفعة صغيرة من عنصر واحد) في الطابور
await queue.put([symbol_data])
else:
# تجاهل العملة لعدم كفاية البيانات
pass
# إرسال إشارة نهاية التدفق للمستهلك (Processor)
await queue.put(None)
print("🏁 [Layer 1.2 Producer] اكتمل تدفق البيانات.")
async def get_symbol_daily_volume(self, symbol):
"""جلب حجم التداول اليومي لعملة محددة (يستخدم عند إعادة التحليل)"""
try:
if self.exchange:
ticker = await self.exchange.fetch_ticker(symbol)
return float(ticker['quoteVolume']) if ticker and ticker.get('quoteVolume') else 0.0
return 0.0
except: return 0.0
async def get_whale_data_for_symbol(self, symbol, daily_volume_usd=0):
"""واجهة لجلب بيانات الحيتان من المراقب (مع تمرير الحجم اليومي للتحليل النسبي)"""
if self.whale_monitor:
return await self.whale_monitor.get_symbol_whale_activity(symbol, daily_volume_usd)
return None
# طباعة رسالة تأكيد عند استيراد الملف بنجاح
print("✅ DataManager loaded - V11.0 (Full Real Data Integration)")