File size: 20,851 Bytes
2bf9457
 
 
00bb5c9
 
 
 
 
 
2bf9457
d69dead
28fa18b
394e2c7
164b380
 
2bf9457
164b380
 
 
2bf9457
164b380
11b4dc5
2bf9457
11b4dc5
2bf9457
 
 
53cf6c0
2bf9457
28fa18b
 
2bf9457
53cf6c0
 
6b00681
53cf6c0
d2775f3
4ace337
c6f72fe
2bf9457
 
 
 
 
c6f72fe
87e3669
56e3f87
 
11b4dc5
2bf9457
11b4dc5
2bf9457
 
 
d69dead
53cf6c0
2bf9457
248e033
56e3f87
6b00681
2bf9457
6b00681
 
4ace337
6b00681
2bf9457
4ace337
2bf9457
4ace337
2bf9457
4ace337
 
2bf9457
6b00681
2bf9457
24a0949
56e3f87
2bf9457
56e3f87
2bf9457
 
 
 
 
 
56e3f87
53cf6c0
2bf9457
 
 
b866e29
2bf9457
4ace337
24a0949
2bf9457
d69dead
2bf9457
 
 
 
 
 
4ace337
24a0949
2bf9457
 
24a0949
 
 
 
 
 
2bf9457
 
 
d69dead
2bf9457
 
d69dead
248e033
24a0949
2bf9457
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ea38153
2bf9457
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
53cf6c0
2bf9457
 
 
 
 
 
 
 
 
 
 
 
 
 
 
185f2b0
11b4dc5
2bf9457
 
 
11b4dc5
 
 
 
 
 
2bf9457
 
11b4dc5
 
2bf9457
11b4dc5
 
2bf9457
 
 
24a0949
 
2bf9457
 
24a0949
2bf9457
79a9e95
2bf9457
 
 
 
 
4ace337
2bf9457
 
79a9e95
2bf9457
79a9e95
2bf9457
 
79a9e95
2bf9457
79a9e95
2bf9457
 
 
 
 
 
 
164b380
2bf9457
 
 
11b4dc5
2bf9457
 
 
185f2b0
 
2bf9457
 
185f2b0
2bf9457
 
 
 
 
185f2b0
2bf9457
 
 
 
 
 
 
 
 
 
 
 
4ace337
2bf9457
 
 
 
 
 
 
164b380
2bf9457
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
164b380
2bf9457
 
 
164b380
2bf9457
 
 
164b380
2bf9457
 
 
b44825a
2bf9457
 
6690430
b44825a
2bf9457
 
 
 
6690430
2bf9457
 
 
 
 
 
b44825a
6690430
2bf9457
6690430
2bf9457
3fd2d9a
2bf9457
 
 
 
 
6690430
2bf9457
 
 
 
 
 
 
 
 
 
 
 
6690430
2bf9457
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6690430
2bf9457
6690430
2bf9457
6598c39
2bf9457
 
7f28923
2bf9457
 
 
2aa4cc2
2bf9457
2aa4cc2
2bf9457
 
 
 
 
20a2029
2bf9457
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
# ml_engine/data_manager.py
# (V11.0 - Full Real Data Integration: XGBoost 1h + Ranker + MC in Layer 1.1)

import os
import asyncio
import httpx
import traceback
import time
from datetime import datetime
import ccxt.async_support as ccxt
import numpy as np
import logging
from typing import List, Dict, Any
import pandas as pd

# محاولة استيراد pandas_ta (ضرورية للمؤشرات)
try:
    import pandas_ta as ta
except ImportError:
    print("❌ [DataManager] مكتبة pandas_ta غير موجودة. النظام لن يعمل بشكل صحيح.")
    ta = None

# استيراد المحركات التحليلية
from ml_engine.indicators import AdvancedTechnicalAnalyzer
from ml_engine.monte_carlo import MonteCarloAnalyzer
from ml_engine.patterns import ChartPatternAnalyzer # المحرك الجديد V9.0
from ml_engine.ranker import Layer1Ranker

# تقليل ضجيج السجلات للمكتبات الخارجية
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("ccxt").setLevel(logging.WARNING)

class DataManager:
    def __init__(self, contracts_db, whale_monitor, r2_service=None):
        self.contracts_db = contracts_db or {}
        self.whale_monitor = whale_monitor
        self.r2_service = r2_service
        
        # إعداد عميل KuCoin غير المتزامن (لبيانات حقيقية فقط)
        self.exchange = ccxt.kucoin({
            'enableRateLimit': True,
            'timeout': 30000, # مهلة 30 ثانية
        })
            
        self.http_client = None
        self.market_cache = {}
        self.last_market_load = None

        # تهيئة المحركات التحليلية
        self.technical_analyzer = AdvancedTechnicalAnalyzer()
        self.mc_analyzer = MonteCarloAnalyzer()
        self.pattern_analyzer = None # سيتم تهيئته في initialize
        self.layer1_ranker = None    # سيتم تهيئته في initialize
        
    async def initialize(self):
        """تهيئة الاتصالات وتحميل نماذج الذكاء الاصطناعي"""
        self.http_client = httpx.AsyncClient(timeout=30.0)
        await self._load_markets()
        
        print("   > [DataManager V11.0] تهيئة محرك الأنماط (V9.0 XGBoost)...")
        try:
            self.pattern_analyzer = ChartPatternAnalyzer(r2_service=self.r2_service)
            await self.pattern_analyzer.initialize()
        except Exception as e:
            print(f"❌ [DataManager] فشل تهيئة محرك الأنماط V9.0: {e}")
            
        print("   > [DataManager V11.0] تهيئة الكاشف المصغر (Layer1 Ranker)...")
        try:
            self.layer1_ranker = Layer1Ranker(model_path="ml_models/layer1_ranker.lgbm")
            await self.layer1_ranker.initialize()
        except Exception as e:
            print(f"❌ [DataManager] فشل تهيئة الرانكر: {e}")

        print("✅ DataManager initialized - V11.0 (Real Data Only)")

    async def _load_markets(self):
        """تحميل بيانات الأسواق المتاحة من المنصة"""
        try:
            if self.exchange:
                await self.exchange.load_markets()
                self.market_cache = self.exchange.markets
                self.last_market_load = datetime.now()
        except Exception as e:
            print(f"❌ [DataManager] فشل تحميل الأسواق من KuCoin: {e}")

    async def close(self):
        """إغلاق الاتصالات بأمان عند إيقاف النظام"""
        if self.http_client:
            await self.http_client.aclose()
        if self.exchange:
            await self.exchange.close()

    async def get_market_context_async(self):
        """جلب نظرة عامة حية على السوق (أسعار BTC/ETH ومؤشر الخوف والجشع)"""
        try:
            sentiment_data = await self.get_sentiment_live_async()
            price_data = await self._get_prices_live()
            
            bitcoin_price = price_data.get('bitcoin')
            ethereum_price = price_data.get('ethereum')
            
            return {
                'timestamp': datetime.now().isoformat(),
                'bitcoin_price_usd': bitcoin_price,
                'ethereum_price_usd': ethereum_price,
                'fear_and_greed_index': sentiment_data.get('feargreed_value') if sentiment_data else None,
                'sentiment_class': sentiment_data.get('feargreed_class') if sentiment_data else 'NEUTRAL',
                'market_trend': self._determine_market_trend(bitcoin_price, sentiment_data),
                'btc_sentiment': self._get_btc_sentiment(bitcoin_price),
                'data_quality': 'HIGH' if bitcoin_price and ethereum_price else 'LOW'
            }
        except Exception as e:
            print(f"⚠️ [DataManager] خطأ في جلب سياق السوق: {e}")
            return self._get_minimal_market_context()

    async def get_sentiment_live_async(self):
        """جلب مؤشر الخوف والجشع من API خارجي حي"""
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                response = await client.get("https://api.alternative.me/fng/")
                if response.status_code == 200:
                    data = response.json()
                    if 'data' in data and len(data['data']) > 0:
                        latest = data['data'][0]
                        return {
                            "feargreed_value": int(latest['value']),
                            "feargreed_class": latest['value_classification']
                        }
            return None
        except Exception:
            return None

    async def _get_prices_live(self):
        """جلب أسعار BTC و ETH الحية من المنصة"""
        if not self.exchange:
            return {'bitcoin': None, 'ethereum': None}
        try:
            # جلب السعرين بالتوازي لتقليل وقت الانتظار
            btc_task = asyncio.create_task(self.exchange.fetch_ticker('BTC/USDT'))
            eth_task = asyncio.create_task(self.exchange.fetch_ticker('ETH/USDT'))
            btc_ticker, eth_ticker = await asyncio.gather(btc_task, eth_task, return_exceptions=True)
            
            btc_price = btc_ticker['last'] if isinstance(btc_ticker, dict) else None
            eth_price = eth_ticker['last'] if isinstance(eth_ticker, dict) else None
            
            return {'bitcoin': btc_price, 'ethereum': eth_price}
        except Exception:
            return {'bitcoin': None, 'ethereum': None}

    def _determine_market_trend(self, btc_price, sentiment):
        """تحديد اتجاه السوق بناءً على قواعد سعرية ومشاعرية (قابلة للتعديل)"""
        if not btc_price:
            return "UNKNOWN"
        
        score = 0
        # قواعد بسيطة لتحديد الاتجاه (يمكن جعلها ديناميكية أكثر مستقبلاً)
        if btc_price > 95000: score += 1
        elif btc_price < 90000: score -= 1
        
        if sentiment:
            fg_val = sentiment.get('feargreed_value', 50)
            if fg_val > 65: score += 1      # طمع شديد يدعم الصعود
            elif fg_val < 35: score -= 1    # خوف شديد يدعم الهبوط
            
        if score > 0: return "bull_market"
        elif score < 0: return "bear_market"
        else: return "sideways_market"

    def _get_btc_sentiment(self, btc_price):
         if not btc_price: return "UNKNOWN"
         # تصنيف بسيط بناء على مستويات سعرية
         if btc_price > 95000: return "BULLISH"
         elif btc_price < 90000: return "BEARISH"
         else: return "NEUTRAL"

    def _get_minimal_market_context(self):
         """سياق احتياطي في حال فشل الاتصال"""
         return {
             'timestamp': datetime.now().isoformat(),
             'data_available': False,
             'market_trend': 'UNKNOWN',
             'data_quality': 'LOW'
         }

    def _create_dataframe(self, candles: List) -> pd.DataFrame:
        """تحويل قائمة الشموع الخام إلى Pandas DataFrame للمعالجة"""
        if not candles:
            return pd.DataFrame()
        try:
            df = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
            df[['open', 'high', 'low', 'close', 'volume']] = df[['open', 'high', 'low', 'close', 'volume']].astype(float)
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
            df.set_index('timestamp', inplace=True)
            df.sort_index(inplace=True)
            # إزالة أي صفوف مكررة قد تنتج عن مشاكل في API
            df = df[~df.index.duplicated(keep='last')]
            return df
        except Exception as e:
            print(f"⚠️ [DataManager] خطأ في إنشاء DataFrame: {e}")
            return pd.DataFrame()

    # ==================================================================
    # 🔴 الطبقة 1.1: الغربلة السريعة (V11.0 - Real Data & ML Integrated)
    # ==================================================================
    async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
        """
        تنفيذ الغربلة الأولية باستخدام بيانات حية و 3 نماذج تقييم.
        المعايير: حجم تداول عالي + درجة مجمعة >= 50%.
        """
        print("📊 [Layer 1.1] بدء الغربلة الحية (Ranker + XGB 1h + MC)...")
        
        # التحقق من جاهزية المحركات
        if not self.layer1_ranker or not self.layer1_ranker.model:
            print("⚠️ [Layer 1.1] تحذير: الرانكر (Ranker) غير جاهز.")
        if not self.pattern_analyzer or not self.pattern_analyzer.initialized:
            print("⚠️ [Layer 1.1] تحذير: محرك الأنماط (XGBoost) غير جاهز.")

        # 1. جلب جميع العملات وتصفية أفضل 100 حسب سيولة الدولار (Quote Volume)
        volume_data = await self._get_volume_data_live()
        if not volume_data:
            print("❌ [Layer 1.1] فشل جلب بيانات الأحجام الحية. إيقاف الغربلة.")
            return []
            
        # ترتيب تنازلي حسب السيولة وأخذ أفضل 100
        volume_data.sort(key=lambda x: x['dollar_volume'], reverse=True)
        top_100_candidates = volume_data[:100]
        
        print(f"   🔄 تحليل {len(top_100_candidates)} عملة (إطار 1H) بالتوازي...")
        final_qualified_candidates = []

        # 2. معالجة العملات على دفعات (لتجنب تجاوز حدود API)
        batch_size = 20 # دفعة آمنة لمعظم المنصات
        for i in range(0, len(top_100_candidates), batch_size):
            batch = top_100_candidates[i:i+batch_size]
            
            # جلب شموع الساعة (1H) لكل عملة في الدفعة بالتوازي
            fetch_tasks = [self._fetch_ohlcv_live(c['symbol'], '1h', limit=200) for c in batch]
            batch_results = await asyncio.gather(*fetch_tasks, return_exceptions=True)

            for j, candles in enumerate(batch_results):
                # التحقق من صلاحية البيانات المستلمة
                if isinstance(candles, Exception) or not candles or len(candles) < 150:
                    continue
                
                candidate_data = batch[j]
                symbol = candidate_data['symbol']
                
                try:
                    # تحويل الشموع إلى DataFrame للحسابات
                    df_1h = self._create_dataframe(candles)
                    if df_1h.empty: continue
                    closes_np = df_1h['close'].to_numpy()

                    # --- أ. حساب درجة الرانكر (LightGBM) [وزن 40%] ---
                    ranker_score = 0.0
                    if self.layer1_ranker and self.layer1_ranker.model:
                        # حساب الميزات الذكية التي يحتاجها الرانكر
                        features = self.technical_analyzer.calculate_v9_smart_features(df_1h)
                        if features:
                            # إضافة ميزات مونت كارلو البسيطة للرانكر (يتوقعها النموذج)
                            mc_simple_for_ranker = self.mc_analyzer.generate_1h_price_distribution_simple(closes_np[-100:])
                            features.update(mc_simple_for_ranker)
                            # التنبؤ
                            ranker_prob = self.layer1_ranker.predict_proba(pd.DataFrame([features]))
                            ranker_score = float(ranker_prob[0]) if len(ranker_prob) > 0 else 0.0

                    # --- ب. حساب درجة XGBoost 1h (النمط) [وزن 40%] ---
                    xgb_score = 0.0
                    if self.pattern_analyzer and self.pattern_analyzer.initialized:
                        # نمرر بيانات 1H فقط للمحرك للحصول على نتيجة سريعة
                        pattern_result = await self.pattern_analyzer.detect_chart_patterns({'1h': candles})
                        # استخراج احتمالية الصعود لنموذج 1h
                        xgb_score = pattern_result.get('details', {}).get('1h') or 0.0

                    # --- ج. حساب درجة مونت كارلو (احتمالية الربح) [وزن 20%] ---
                    mc_score = 0.0
                    mc_result = self.mc_analyzer.generate_1h_price_distribution_simple(closes_np[-100:])
                    if not mc_result.get('error'):
                        mc_prob_gain = mc_result.get('mc_prob_gain', 0.5)
                        # تطبيع الدرجة: 50% احتمال ربح = 0 نقاط، 100% احتمال ربح = 1 نقطة
                        mc_score = max(0.0, (mc_prob_gain - 0.5) * 2.0)

                    # --- د. حساب الدرجة النهائية الموزونة ---
                    final_l1_score = (ranker_score * 0.40) + (xgb_score * 0.40) + (mc_score * 0.20)

                    # التأهيل إذا تجاوزت العتبة (50%)
                    if final_l1_score >= 0.50:
                        candidate_data['layer1_score'] = final_l1_score
                        # تخزين أسباب الترشح للشفافية
                        candidate_data['reasons_for_candidacy'] = [
                            f"Ranker(40%): {ranker_score:.2f}",
                            f"XGB_1h(40%): {xgb_score:.2f}",
                            f"MC_Simple(20%): {mc_score:.2f}"
                        ]
                        final_qualified_candidates.append(candidate_data)

                except Exception as e:
                    # طباعة خطأ صامتة لعدم إغراق السجلات
                    # print(f"⚠️ خطأ في تحليل {symbol}: {e}")
                    continue

        # ترتيب المرشحين النهائيين حسب الدرجة
        final_qualified_candidates.sort(key=lambda x: x['layer1_score'], reverse=True)
        
        print(f"🎯 [Layer 1.1] اكتملت الغربلة. تم تأهيل {len(final_qualified_candidates)} عملة للمرحلة القادمة.")
        # إرجاع أفضل 20 عملة فقط للتحليل العميق المكلف
        return final_qualified_candidates[:20]

    async def _fetch_ohlcv_live(self, symbol, timeframe, limit):
        """جلب شموع حية من المنصة مع التعامل مع الأخطاء"""
        if not self.exchange: return None
        try:
            return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
        except Exception:
            return None

    async def _get_volume_data_live(self) -> List[Dict[str, Any]]:
        """جلب بيانات الحجم الحية لجميع أزواج USDT"""
        if not self.exchange: return []
        try:
            # جلب كل الأسعار والأحجام في طلب واحد سريع
            tickers = await self.exchange.fetch_tickers()
            volume_data = []
            for symbol, ticker in tickers.items():
                # فلترة أزواج USDT النشطة فقط ذات السيولة المقبولة (> 50k)
                if (symbol.endswith('/USDT') and 
                    ticker.get('quoteVolume') and 
                    ticker['quoteVolume'] > 50000):
                    
                     volume_data.append({
                        'symbol': symbol,
                        'dollar_volume': float(ticker['quoteVolume']),
                        'current_price': float(ticker['last']) if ticker.get('last') else 0.0,
                        'price_change_24h': float(ticker['percentage']) if ticker.get('percentage') else 0.0
                    })
            return volume_data
        except Exception as e:
            print(f"❌ [DataManager] فشل جلب بيانات الحجم الحية: {e}")
            return []

    async def stream_ohlcv_data(self, symbols: List[Dict[str, Any]], queue: asyncio.Queue):
        """
        (للطبقة 1.2) جلب البيانات الكاملة (4 أطر زمنية) للعملات المتأهلة.
        يتم إرسال البيانات عبر Queue للمعالجة المتوازية في Processor.
        """
        print(f"📊 [Layer 1.2 Producer] بدء جلب البيانات العميقة لـ {len(symbols)} عملة...")
        
        for symbol_data in symbols:
            symbol = symbol_data['symbol']
            # جلب الأطر الزمنية الأربعة المطلوبة لمحرك الأنماط الجديد
            # (نستخدم 300 شمعة لضمان وجود بيانات كافية للمؤشرات)
            tasks = [
                self._fetch_ohlcv_live(symbol, '15m', 300),
                self._fetch_ohlcv_live(symbol, '1h', 300),
                self._fetch_ohlcv_live(symbol, '4h', 300),
                self._fetch_ohlcv_live(symbol, '1d', 300),
            ]
            # تنفيذ الطلبات الأربعة بالتوازي للعملة الواحدة
            results = await asyncio.gather(*tasks, return_exceptions=False)
            
            ohlcv_complete = {}
            tfs = ['15m', '1h', '4h', '1d']
            valid_tfs_count = 0
            
            for i, res in enumerate(results):
                # التأكد من أن البيانات صالحة وكافية (على الأقل 200 شمعة للتحليل الدقيق)
                if res and isinstance(res, list) and len(res) >= 200:
                    ohlcv_complete[tfs[i]] = res
                    valid_tfs_count += 1
            
            # إرسال العملة للمعالجة فقط إذا توفرت معظم الأطر الزمنية (3 على الأقل)
            if valid_tfs_count >= 3:
                symbol_data['ohlcv'] = ohlcv_complete
                # نضع العملة في قائمة (كدُفعة صغيرة من عنصر واحد) في الطابور
                await queue.put([symbol_data])
            else:
                 # تجاهل العملة لعدم كفاية البيانات
                 pass

        # إرسال إشارة نهاية التدفق للمستهلك (Processor)
        await queue.put(None)
        print("🏁 [Layer 1.2 Producer] اكتمل تدفق البيانات.")

    async def get_symbol_daily_volume(self, symbol):
        """جلب حجم التداول اليومي لعملة محددة (يستخدم عند إعادة التحليل)"""
        try:
            if self.exchange:
                ticker = await self.exchange.fetch_ticker(symbol)
                return float(ticker['quoteVolume']) if ticker and ticker.get('quoteVolume') else 0.0
            return 0.0
        except: return 0.0

    async def get_whale_data_for_symbol(self, symbol, daily_volume_usd=0):
        """واجهة لجلب بيانات الحيتان من المراقب (مع تمرير الحجم اليومي للتحليل النسبي)"""
        if self.whale_monitor:
             return await self.whale_monitor.get_symbol_whale_activity(symbol, daily_volume_usd)
        return None

# طباعة رسالة تأكيد عند استيراد الملف بنجاح
print("✅ DataManager loaded - V11.0 (Full Real Data Integration)")