Spaces:
Running
Running
| # ml_engine/data_manager.py | |
| # (V11.0 - Full Real Data Integration: XGBoost 1h + Ranker + MC in Layer 1.1) | |
| import os | |
| import asyncio | |
| import httpx | |
| import traceback | |
| import time | |
| from datetime import datetime | |
| import ccxt.async_support as ccxt | |
| import numpy as np | |
| import logging | |
| from typing import List, Dict, Any | |
| import pandas as pd | |
| # محاولة استيراد pandas_ta (ضرورية للمؤشرات) | |
| try: | |
| import pandas_ta as ta | |
| except ImportError: | |
| print("❌ [DataManager] مكتبة pandas_ta غير موجودة. النظام لن يعمل بشكل صحيح.") | |
| ta = None | |
| # استيراد المحركات التحليلية | |
| from ml_engine.indicators import AdvancedTechnicalAnalyzer | |
| from ml_engine.monte_carlo import MonteCarloAnalyzer | |
| from ml_engine.patterns import ChartPatternAnalyzer # المحرك الجديد V9.0 | |
| from ml_engine.ranker import Layer1Ranker | |
| # تقليل ضجيج السجلات للمكتبات الخارجية | |
| logging.getLogger("httpx").setLevel(logging.WARNING) | |
| logging.getLogger("httpcore").setLevel(logging.WARNING) | |
| logging.getLogger("ccxt").setLevel(logging.WARNING) | |
| class DataManager: | |
| def __init__(self, contracts_db, whale_monitor, r2_service=None): | |
| self.contracts_db = contracts_db or {} | |
| self.whale_monitor = whale_monitor | |
| self.r2_service = r2_service | |
| # إعداد عميل KuCoin غير المتزامن (لبيانات حقيقية فقط) | |
| self.exchange = ccxt.kucoin({ | |
| 'enableRateLimit': True, | |
| 'timeout': 30000, # مهلة 30 ثانية | |
| }) | |
| self.http_client = None | |
| self.market_cache = {} | |
| self.last_market_load = None | |
| # تهيئة المحركات التحليلية | |
| self.technical_analyzer = AdvancedTechnicalAnalyzer() | |
| self.mc_analyzer = MonteCarloAnalyzer() | |
| self.pattern_analyzer = None # سيتم تهيئته في initialize | |
| self.layer1_ranker = None # سيتم تهيئته في initialize | |
| async def initialize(self): | |
| """تهيئة الاتصالات وتحميل نماذج الذكاء الاصطناعي""" | |
| self.http_client = httpx.AsyncClient(timeout=30.0) | |
| await self._load_markets() | |
| print(" > [DataManager V11.0] تهيئة محرك الأنماط (V9.0 XGBoost)...") | |
| try: | |
| self.pattern_analyzer = ChartPatternAnalyzer(r2_service=self.r2_service) | |
| await self.pattern_analyzer.initialize() | |
| except Exception as e: | |
| print(f"❌ [DataManager] فشل تهيئة محرك الأنماط V9.0: {e}") | |
| print(" > [DataManager V11.0] تهيئة الكاشف المصغر (Layer1 Ranker)...") | |
| try: | |
| self.layer1_ranker = Layer1Ranker(model_path="ml_models/layer1_ranker.lgbm") | |
| await self.layer1_ranker.initialize() | |
| except Exception as e: | |
| print(f"❌ [DataManager] فشل تهيئة الرانكر: {e}") | |
| print("✅ DataManager initialized - V11.0 (Real Data Only)") | |
| async def _load_markets(self): | |
| """تحميل بيانات الأسواق المتاحة من المنصة""" | |
| try: | |
| if self.exchange: | |
| await self.exchange.load_markets() | |
| self.market_cache = self.exchange.markets | |
| self.last_market_load = datetime.now() | |
| except Exception as e: | |
| print(f"❌ [DataManager] فشل تحميل الأسواق من KuCoin: {e}") | |
| async def close(self): | |
| """إغلاق الاتصالات بأمان عند إيقاف النظام""" | |
| if self.http_client: | |
| await self.http_client.aclose() | |
| if self.exchange: | |
| await self.exchange.close() | |
| async def get_market_context_async(self): | |
| """جلب نظرة عامة حية على السوق (أسعار BTC/ETH ومؤشر الخوف والجشع)""" | |
| try: | |
| sentiment_data = await self.get_sentiment_live_async() | |
| price_data = await self._get_prices_live() | |
| bitcoin_price = price_data.get('bitcoin') | |
| ethereum_price = price_data.get('ethereum') | |
| return { | |
| 'timestamp': datetime.now().isoformat(), | |
| 'bitcoin_price_usd': bitcoin_price, | |
| 'ethereum_price_usd': ethereum_price, | |
| 'fear_and_greed_index': sentiment_data.get('feargreed_value') if sentiment_data else None, | |
| 'sentiment_class': sentiment_data.get('feargreed_class') if sentiment_data else 'NEUTRAL', | |
| 'market_trend': self._determine_market_trend(bitcoin_price, sentiment_data), | |
| 'btc_sentiment': self._get_btc_sentiment(bitcoin_price), | |
| 'data_quality': 'HIGH' if bitcoin_price and ethereum_price else 'LOW' | |
| } | |
| except Exception as e: | |
| print(f"⚠️ [DataManager] خطأ في جلب سياق السوق: {e}") | |
| return self._get_minimal_market_context() | |
| async def get_sentiment_live_async(self): | |
| """جلب مؤشر الخوف والجشع من API خارجي حي""" | |
| try: | |
| async with httpx.AsyncClient(timeout=10) as client: | |
| response = await client.get("https://api.alternative.me/fng/") | |
| if response.status_code == 200: | |
| data = response.json() | |
| if 'data' in data and len(data['data']) > 0: | |
| latest = data['data'][0] | |
| return { | |
| "feargreed_value": int(latest['value']), | |
| "feargreed_class": latest['value_classification'] | |
| } | |
| return None | |
| except Exception: | |
| return None | |
| async def _get_prices_live(self): | |
| """جلب أسعار BTC و ETH الحية من المنصة""" | |
| if not self.exchange: | |
| return {'bitcoin': None, 'ethereum': None} | |
| try: | |
| # جلب السعرين بالتوازي لتقليل وقت الانتظار | |
| btc_task = asyncio.create_task(self.exchange.fetch_ticker('BTC/USDT')) | |
| eth_task = asyncio.create_task(self.exchange.fetch_ticker('ETH/USDT')) | |
| btc_ticker, eth_ticker = await asyncio.gather(btc_task, eth_task, return_exceptions=True) | |
| btc_price = btc_ticker['last'] if isinstance(btc_ticker, dict) else None | |
| eth_price = eth_ticker['last'] if isinstance(eth_ticker, dict) else None | |
| return {'bitcoin': btc_price, 'ethereum': eth_price} | |
| except Exception: | |
| return {'bitcoin': None, 'ethereum': None} | |
| def _determine_market_trend(self, btc_price, sentiment): | |
| """تحديد اتجاه السوق بناءً على قواعد سعرية ومشاعرية (قابلة للتعديل)""" | |
| if not btc_price: | |
| return "UNKNOWN" | |
| score = 0 | |
| # قواعد بسيطة لتحديد الاتجاه (يمكن جعلها ديناميكية أكثر مستقبلاً) | |
| if btc_price > 95000: score += 1 | |
| elif btc_price < 90000: score -= 1 | |
| if sentiment: | |
| fg_val = sentiment.get('feargreed_value', 50) | |
| if fg_val > 65: score += 1 # طمع شديد يدعم الصعود | |
| elif fg_val < 35: score -= 1 # خوف شديد يدعم الهبوط | |
| if score > 0: return "bull_market" | |
| elif score < 0: return "bear_market" | |
| else: return "sideways_market" | |
| def _get_btc_sentiment(self, btc_price): | |
| if not btc_price: return "UNKNOWN" | |
| # تصنيف بسيط بناء على مستويات سعرية | |
| if btc_price > 95000: return "BULLISH" | |
| elif btc_price < 90000: return "BEARISH" | |
| else: return "NEUTRAL" | |
| def _get_minimal_market_context(self): | |
| """سياق احتياطي في حال فشل الاتصال""" | |
| return { | |
| 'timestamp': datetime.now().isoformat(), | |
| 'data_available': False, | |
| 'market_trend': 'UNKNOWN', | |
| 'data_quality': 'LOW' | |
| } | |
| def _create_dataframe(self, candles: List) -> pd.DataFrame: | |
| """تحويل قائمة الشموع الخام إلى Pandas DataFrame للمعالجة""" | |
| if not candles: | |
| return pd.DataFrame() | |
| try: | |
| df = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) | |
| df[['open', 'high', 'low', 'close', 'volume']] = df[['open', 'high', 'low', 'close', 'volume']].astype(float) | |
| df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') | |
| df.set_index('timestamp', inplace=True) | |
| df.sort_index(inplace=True) | |
| # إزالة أي صفوف مكررة قد تنتج عن مشاكل في API | |
| df = df[~df.index.duplicated(keep='last')] | |
| return df | |
| except Exception as e: | |
| print(f"⚠️ [DataManager] خطأ في إنشاء DataFrame: {e}") | |
| return pd.DataFrame() | |
| # ================================================================== | |
| # 🔴 الطبقة 1.1: الغربلة السريعة (V11.0 - Real Data & ML Integrated) | |
| # ================================================================== | |
| async def layer1_rapid_screening(self) -> List[Dict[str, Any]]: | |
| """ | |
| تنفيذ الغربلة الأولية باستخدام بيانات حية و 3 نماذج تقييم. | |
| المعايير: حجم تداول عالي + درجة مجمعة >= 50%. | |
| """ | |
| print("📊 [Layer 1.1] بدء الغربلة الحية (Ranker + XGB 1h + MC)...") | |
| # التحقق من جاهزية المحركات | |
| if not self.layer1_ranker or not self.layer1_ranker.model: | |
| print("⚠️ [Layer 1.1] تحذير: الرانكر (Ranker) غير جاهز.") | |
| if not self.pattern_analyzer or not self.pattern_analyzer.initialized: | |
| print("⚠️ [Layer 1.1] تحذير: محرك الأنماط (XGBoost) غير جاهز.") | |
| # 1. جلب جميع العملات وتصفية أفضل 100 حسب سيولة الدولار (Quote Volume) | |
| volume_data = await self._get_volume_data_live() | |
| if not volume_data: | |
| print("❌ [Layer 1.1] فشل جلب بيانات الأحجام الحية. إيقاف الغربلة.") | |
| return [] | |
| # ترتيب تنازلي حسب السيولة وأخذ أفضل 100 | |
| volume_data.sort(key=lambda x: x['dollar_volume'], reverse=True) | |
| top_100_candidates = volume_data[:100] | |
| print(f" 🔄 تحليل {len(top_100_candidates)} عملة (إطار 1H) بالتوازي...") | |
| final_qualified_candidates = [] | |
| # 2. معالجة العملات على دفعات (لتجنب تجاوز حدود API) | |
| batch_size = 20 # دفعة آمنة لمعظم المنصات | |
| for i in range(0, len(top_100_candidates), batch_size): | |
| batch = top_100_candidates[i:i+batch_size] | |
| # جلب شموع الساعة (1H) لكل عملة في الدفعة بالتوازي | |
| fetch_tasks = [self._fetch_ohlcv_live(c['symbol'], '1h', limit=200) for c in batch] | |
| batch_results = await asyncio.gather(*fetch_tasks, return_exceptions=True) | |
| for j, candles in enumerate(batch_results): | |
| # التحقق من صلاحية البيانات المستلمة | |
| if isinstance(candles, Exception) or not candles or len(candles) < 150: | |
| continue | |
| candidate_data = batch[j] | |
| symbol = candidate_data['symbol'] | |
| try: | |
| # تحويل الشموع إلى DataFrame للحسابات | |
| df_1h = self._create_dataframe(candles) | |
| if df_1h.empty: continue | |
| closes_np = df_1h['close'].to_numpy() | |
| # --- أ. حساب درجة الرانكر (LightGBM) [وزن 40%] --- | |
| ranker_score = 0.0 | |
| if self.layer1_ranker and self.layer1_ranker.model: | |
| # حساب الميزات الذكية التي يحتاجها الرانكر | |
| features = self.technical_analyzer.calculate_v9_smart_features(df_1h) | |
| if features: | |
| # إضافة ميزات مونت كارلو البسيطة للرانكر (يتوقعها النموذج) | |
| mc_simple_for_ranker = self.mc_analyzer.generate_1h_price_distribution_simple(closes_np[-100:]) | |
| features.update(mc_simple_for_ranker) | |
| # التنبؤ | |
| ranker_prob = self.layer1_ranker.predict_proba(pd.DataFrame([features])) | |
| ranker_score = float(ranker_prob[0]) if len(ranker_prob) > 0 else 0.0 | |
| # --- ب. حساب درجة XGBoost 1h (النمط) [وزن 40%] --- | |
| xgb_score = 0.0 | |
| if self.pattern_analyzer and self.pattern_analyzer.initialized: | |
| # نمرر بيانات 1H فقط للمحرك للحصول على نتيجة سريعة | |
| pattern_result = await self.pattern_analyzer.detect_chart_patterns({'1h': candles}) | |
| # استخراج احتمالية الصعود لنموذج 1h | |
| xgb_score = pattern_result.get('details', {}).get('1h') or 0.0 | |
| # --- ج. حساب درجة مونت كارلو (احتمالية الربح) [وزن 20%] --- | |
| mc_score = 0.0 | |
| mc_result = self.mc_analyzer.generate_1h_price_distribution_simple(closes_np[-100:]) | |
| if not mc_result.get('error'): | |
| mc_prob_gain = mc_result.get('mc_prob_gain', 0.5) | |
| # تطبيع الدرجة: 50% احتمال ربح = 0 نقاط، 100% احتمال ربح = 1 نقطة | |
| mc_score = max(0.0, (mc_prob_gain - 0.5) * 2.0) | |
| # --- د. حساب الدرجة النهائية الموزونة --- | |
| final_l1_score = (ranker_score * 0.40) + (xgb_score * 0.40) + (mc_score * 0.20) | |
| # التأهيل إذا تجاوزت العتبة (50%) | |
| if final_l1_score >= 0.50: | |
| candidate_data['layer1_score'] = final_l1_score | |
| # تخزين أسباب الترشح للشفافية | |
| candidate_data['reasons_for_candidacy'] = [ | |
| f"Ranker(40%): {ranker_score:.2f}", | |
| f"XGB_1h(40%): {xgb_score:.2f}", | |
| f"MC_Simple(20%): {mc_score:.2f}" | |
| ] | |
| final_qualified_candidates.append(candidate_data) | |
| except Exception as e: | |
| # طباعة خطأ صامتة لعدم إغراق السجلات | |
| # print(f"⚠️ خطأ في تحليل {symbol}: {e}") | |
| continue | |
| # ترتيب المرشحين النهائيين حسب الدرجة | |
| final_qualified_candidates.sort(key=lambda x: x['layer1_score'], reverse=True) | |
| print(f"🎯 [Layer 1.1] اكتملت الغربلة. تم تأهيل {len(final_qualified_candidates)} عملة للمرحلة القادمة.") | |
| # إرجاع أفضل 20 عملة فقط للتحليل العميق المكلف | |
| return final_qualified_candidates[:20] | |
| async def _fetch_ohlcv_live(self, symbol, timeframe, limit): | |
| """جلب شموع حية من المنصة مع التعامل مع الأخطاء""" | |
| if not self.exchange: return None | |
| try: | |
| return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit) | |
| except Exception: | |
| return None | |
| async def _get_volume_data_live(self) -> List[Dict[str, Any]]: | |
| """جلب بيانات الحجم الحية لجميع أزواج USDT""" | |
| if not self.exchange: return [] | |
| try: | |
| # جلب كل الأسعار والأحجام في طلب واحد سريع | |
| tickers = await self.exchange.fetch_tickers() | |
| volume_data = [] | |
| for symbol, ticker in tickers.items(): | |
| # فلترة أزواج USDT النشطة فقط ذات السيولة المقبولة (> 50k) | |
| if (symbol.endswith('/USDT') and | |
| ticker.get('quoteVolume') and | |
| ticker['quoteVolume'] > 50000): | |
| volume_data.append({ | |
| 'symbol': symbol, | |
| 'dollar_volume': float(ticker['quoteVolume']), | |
| 'current_price': float(ticker['last']) if ticker.get('last') else 0.0, | |
| 'price_change_24h': float(ticker['percentage']) if ticker.get('percentage') else 0.0 | |
| }) | |
| return volume_data | |
| except Exception as e: | |
| print(f"❌ [DataManager] فشل جلب بيانات الحجم الحية: {e}") | |
| return [] | |
| async def stream_ohlcv_data(self, symbols: List[Dict[str, Any]], queue: asyncio.Queue): | |
| """ | |
| (للطبقة 1.2) جلب البيانات الكاملة (4 أطر زمنية) للعملات المتأهلة. | |
| يتم إرسال البيانات عبر Queue للمعالجة المتوازية في Processor. | |
| """ | |
| print(f"📊 [Layer 1.2 Producer] بدء جلب البيانات العميقة لـ {len(symbols)} عملة...") | |
| for symbol_data in symbols: | |
| symbol = symbol_data['symbol'] | |
| # جلب الأطر الزمنية الأربعة المطلوبة لمحرك الأنماط الجديد | |
| # (نستخدم 300 شمعة لضمان وجود بيانات كافية للمؤشرات) | |
| tasks = [ | |
| self._fetch_ohlcv_live(symbol, '15m', 300), | |
| self._fetch_ohlcv_live(symbol, '1h', 300), | |
| self._fetch_ohlcv_live(symbol, '4h', 300), | |
| self._fetch_ohlcv_live(symbol, '1d', 300), | |
| ] | |
| # تنفيذ الطلبات الأربعة بالتوازي للعملة الواحدة | |
| results = await asyncio.gather(*tasks, return_exceptions=False) | |
| ohlcv_complete = {} | |
| tfs = ['15m', '1h', '4h', '1d'] | |
| valid_tfs_count = 0 | |
| for i, res in enumerate(results): | |
| # التأكد من أن البيانات صالحة وكافية (على الأقل 200 شمعة للتحليل الدقيق) | |
| if res and isinstance(res, list) and len(res) >= 200: | |
| ohlcv_complete[tfs[i]] = res | |
| valid_tfs_count += 1 | |
| # إرسال العملة للمعالجة فقط إذا توفرت معظم الأطر الزمنية (3 على الأقل) | |
| if valid_tfs_count >= 3: | |
| symbol_data['ohlcv'] = ohlcv_complete | |
| # نضع العملة في قائمة (كدُفعة صغيرة من عنصر واحد) في الطابور | |
| await queue.put([symbol_data]) | |
| else: | |
| # تجاهل العملة لعدم كفاية البيانات | |
| pass | |
| # إرسال إشارة نهاية التدفق للمستهلك (Processor) | |
| await queue.put(None) | |
| print("🏁 [Layer 1.2 Producer] اكتمل تدفق البيانات.") | |
| async def get_symbol_daily_volume(self, symbol): | |
| """جلب حجم التداول اليومي لعملة محددة (يستخدم عند إعادة التحليل)""" | |
| try: | |
| if self.exchange: | |
| ticker = await self.exchange.fetch_ticker(symbol) | |
| return float(ticker['quoteVolume']) if ticker and ticker.get('quoteVolume') else 0.0 | |
| return 0.0 | |
| except: return 0.0 | |
| async def get_whale_data_for_symbol(self, symbol, daily_volume_usd=0): | |
| """واجهة لجلب بيانات الحيتان من المراقب (مع تمرير الحجم اليومي للتحليل النسبي)""" | |
| if self.whale_monitor: | |
| return await self.whale_monitor.get_symbol_whale_activity(symbol, daily_volume_usd) | |
| return None | |
| # طباعة رسالة تأكيد عند استيراد الملف بنجاح | |
| print("✅ DataManager loaded - V11.0 (Full Real Data Integration)") |