File size: 28,656 Bytes
6dec69a
952bf71
 
 
 
 
 
6a4cb30
585273b
c7c1fc7
8fb1e4e
97b50d2
 
 
 
 
1b1414d
97b50d2
fa58d6b
33d4073
fa58d6b
33d4073
 
fa58d6b
33d4073
1b1414d
a01528e
c7c1fc7
 
a01528e
 
9aa9464
a01528e
60168e1
1b1414d
c3cfdec
 
 
1b1414d
 
c3cfdec
6a4cb30
c3cfdec
2ecc875
c3cfdec
 
1e59082
667edc4
 
fa58d6b
33d4073
fa58d6b
1b1414d
33d4073
1b1414d
 
585273b
a01528e
d480654
667edc4
9aa9464
 
 
 
1b1414d
9aa9464
 
 
1b1414d
9c58061
1b1414d
9c58061
1b1414d
 
 
 
9c58061
1b1414d
9c58061
1b1414d
9c58061
1b1414d
9aa9464
6dec69a
24495bd
667edc4
 
1b1414d
2ecc875
93d8db3
667edc4
 
2ecc875
1b1414d
667edc4
a01528e
1b1414d
db0d805
1b1414d
 
 
24495bd
585273b
d480654
 
1b1414d
 
24495bd
1b1414d
24495bd
 
 
 
 
 
1b1414d
585273b
24495bd
585273b
d480654
24495bd
1b1414d
 
24495bd
1b1414d
 
24495bd
1b1414d
fa58d6b
 
 
d480654
 
fa58d6b
 
 
 
 
5dfcbf0
fa58d6b
 
 
 
5dfcbf0
 
c3cfdec
1b1414d
d480654
1b1414d
c3cfdec
fa58d6b
5dfcbf0
c3cfdec
1b1414d
fa58d6b
1b1414d
fa58d6b
c3cfdec
1b1414d
5dfcbf0
 
d480654
5dfcbf0
fa58d6b
82a1a9b
d480654
1b1414d
 
 
 
 
 
1e59082
1b1414d
a01528e
33d4073
fa58d6b
1b1414d
fa58d6b
 
 
 
 
 
 
 
 
1b1414d
fa58d6b
 
24495bd
 
33d4073
 
 
24495bd
33d4073
6a4cb30
1b1414d
33d4073
1b1414d
 
97b50d2
fa58d6b
1b1414d
6a4cb30
33d4073
6a4cb30
 
 
97b50d2
33d4073
6a4cb30
1b1414d
 
 
97b50d2
1b1414d
 
 
24495bd
1b1414d
 
 
 
 
 
 
 
33d4073
1b1414d
fa58d6b
33d4073
fa58d6b
33d4073
1b1414d
 
 
 
 
 
33d4073
 
1b1414d
33d4073
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fa58d6b
1b1414d
33d4073
1b1414d
 
 
33d4073
1b1414d
 
ace2d50
33d4073
 
5f0f8a5
 
 
 
33d4073
5f0f8a5
9c58061
5f0f8a5
 
33d4073
1b1414d
 
ace2d50
33d4073
cf306f1
1b1414d
9c58061
1b1414d
33d4073
1b1414d
 
cda0397
33d4073
cda0397
1b1414d
 
 
 
 
ace2d50
33d4073
97b50d2
1b1414d
97b50d2
1b1414d
97b50d2
1b1414d
 
fa58d6b
1b1414d
97b50d2
a97dc52
 
fa58d6b
5e06e3f
 
a97dc52
fa58d6b
5e06e3f
 
fa58d6b
5e06e3f
 
 
a97dc52
 
fa58d6b
a97dc52
5e06e3f
fa58d6b
5e06e3f
 
 
 
 
 
 
 
 
 
1b1414d
a97dc52
5e06e3f
 
 
 
a97dc52
 
 
 
5e06e3f
 
 
fa58d6b
5e06e3f
 
 
a97dc52
 
fa58d6b
a97dc52
 
5e06e3f
 
 
 
 
fa58d6b
5e06e3f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1b1414d
a97dc52
5e06e3f
 
 
 
99679e1
1b1414d
5e06e3f
 
 
 
84a279b
47a0054
5e06e3f
 
 
 
 
 
84a279b
5e06e3f
47a0054
84a279b
5e06e3f
 
 
 
 
84a279b
5e06e3f
 
 
 
 
 
47a0054
5e06e3f
 
 
 
 
 
 
 
 
 
 
 
35778df
84a279b
aed8c82
5e06e3f
 
 
 
 
 
 
 
 
 
 
84a279b
 
fa58d6b
5e06e3f
 
 
 
 
19236f5
eb8d95c
a631ae8
5e06e3f
1b1414d
 
fa58d6b
5e06e3f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
776410b
5e06e3f
 
776410b
 
93d8db3
5e06e3f
 
 
 
1b1414d
5e06e3f
 
 
 
 
35778df
 
fa58d6b
93d8db3
1b1414d
5e06e3f
 
 
6dec69a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5e06e3f
6dec69a
 
 
 
5e06e3f
 
6dec69a
5e06e3f
64b02cc
5e06e3f
 
 
 
64b02cc
6dec69a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
# data_manager.py (Updated to V10.2 - Whale Learning Data Link)
import os
import asyncio
import httpx
import traceback
import time
from datetime import datetime
import ccxt
import numpy as np
import logging
from typing import List, Dict, Any
import pandas as pd

try:
    import pandas_ta as ta
except ImportError:
    print("⚠️ مكتبة pandas_ta غير موجودة. النظام سيفشل.")
    ta = None

# (V10.0) استيراد العقل الحسابي المطور
from ml_engine.indicators import AdvancedTechnicalAnalyzer
# (V10.0) استيراد مونت كارلو المطور
from ml_engine.monte_carlo import MonteCarloAnalyzer 
from ml_engine.patterns import ChartPatternAnalyzer
# (V9.1) استيراد "العقل الذكي"
from ml_engine.ranker import Layer1Ranker 

logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)

class DataManager:
    def __init__(self, contracts_db, whale_monitor, r2_service=None):
        self.contracts_db = contracts_db or {}
        self.whale_monitor = whale_monitor
        self.r2_service = r2_service
        
        try:
            self.exchange = ccxt.kucoin({
                'sandbox': False, 'enableRateLimit': True,
                'timeout': 30000, 'verbose': False,
            })
            print("✅ تم تهيئة اتصال KuCoin بنجاح")
        except Exception as e:
            print(f"❌ فشل تهيئة اتصال KuCoin: {e}")
            self.exchange = None
            
        self.http_client = None
        self.market_cache = {}
        self.last_market_load = None

        # (V10.0) تهيئة العقول الحسابية
        self.technical_analyzer = AdvancedTechnicalAnalyzer()
        self.pattern_analyzer = None
        self.mc_analyzer = MonteCarloAnalyzer() 
        # (V9.1) تهيئة "العقل الذكي" (النموذج)
        self.layer1_ranker = None 
        
    async def initialize(self):
        self.http_client = httpx.AsyncClient(timeout=30.0)
        await self._load_markets()
        
        print("   > [DataManager] تهيئة محرك الأنماط V8 (ML-Based)...")
        try:
            self.pattern_analyzer = ChartPatternAnalyzer(r2_service=self.r2_service)
            await self.pattern_analyzer.initialize()
        except Exception as e:
            print(f"❌ [DataManager] فشل تهيئة محرك الأنماط V8: {e}")
            self.pattern_analyzer = ChartPatternAnalyzer(r2_service=None)
            
        print("   > [DataManager] تهيئة الكاشف المصغر (Layer1 Ranker V9.8)...")
        try:
            # (تأكد من أن النموذج V9.8 موجود ومسمى بهذا الاسم)
            model_file_path = "ml_models/layer1_ranker.lgbm"
            self.layer1_ranker = Layer1Ranker(model_path=model_file_path)
            await self.layer1_ranker.initialize()
            if self.layer1_ranker.model is None:
                print("   ⚠️ [DataManager V9.8] الرانكر في وضع 'وهمي' (Placeholder).")
            else:
                 print(f"   ✅ [DataManager V9.8] الرانكر {self.layer1_ranker.model_name} جاهز للعمل.")
        except Exception as e:
            print(f"❌ [DataManager V9.8] فشل تهيئة الرانكر V9.8: {e}")
            self.layer1_ranker = None 

        print("✅ DataManager initialized - V10.2 (Whale Learning Data Link)")

    async def _load_markets(self):
        try:
            if not self.exchange: return
            print("🔄 جلب أحدث بيانات الأسواق من KuCoin...")
            self.exchange.load_markets()
            self.market_cache = self.exchange.markets
            self.last_market_load = datetime.now()
            print(f"✅ تم تحميل {len(self.market_cache)} سوق من KuCoin")
        except Exception as e: print(f"❌ فشل تحميل بيانات الأسواق: {e}")

    async def close(self):
        if self.http_client and not self.http_client.is_closed: await self.http_client.aclose()
        if self.exchange:
            try: await self.exchange.close()
            except Exception: pass

    async def get_market_context_async(self):
        try:
            sentiment_data = await self.get_sentiment_safe_async()
            price_data = await self._get_prices_with_fallback()
            bitcoin_price = price_data.get('bitcoin'); ethereum_price = price_data.get('ethereum')
            return {
                'timestamp': datetime.now().isoformat(),
                'bitcoin_price_usd': bitcoin_price, 'ethereum_price_usd': ethereum_price,
                'fear_and_greed_index': sentiment_data.get('feargreed_value') if sentiment_data else None,
                'sentiment_class': sentiment_data.get('feargreed_class') if sentiment_data else 'NEUTRAL',
                'market_trend': self._determine_market_trend(bitcoin_price, sentiment_data),
                'btc_sentiment': self._get_btc_sentiment(bitcoin_price),
                'data_quality': 'HIGH' if bitcoin_price and ethereum_price else 'LOW'
            }
        except Exception as e: return self._get_minimal_market_context()

    async def get_sentiment_safe_async(self):
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                response = await client.get("https://api.alternative.me/fng/")
                response.raise_for_status(); data = response.json()
                if 'data' not in data or not data['data']: raise ValueError("بيانات المشاعر غير متوفرة")
                latest_data = data['data'][0]
                return { "feargreed_value": int(latest_data['value']), "feargreed_class": latest_data['value_classification'], "source": "alternative.me", "timestamp": datetime.now().isoformat() }
        except Exception as e: return None
    def _determine_market_trend(self, bitcoin_price, sentiment_data):
        if bitcoin_price is None: return "UNKNOWN"
        if bitcoin_price > 60000: score = 1
        elif bitcoin_price < 55000: score = -1
        else: score = 0
        if sentiment_data and sentiment_data.get('feargreed_value') is not None:
            fear_greed = sentiment_data.get('feargreed_value')
            if fear_greed > 60: score += 1
            elif fear_greed < 40: score -= 1
        if score >= 1: return "bull_market"
        elif score <= -1: return "bear_market"
        else: return "sideways_market"
    def _get_btc_sentiment(self, bitcoin_price):
        if bitcoin_price is None: return 'UNKNOWN'
        elif bitcoin_price > 60000: return 'BULLISH'
        elif bitcoin_price < 55000: return 'BEARISH'
        else: return 'NEUTRAL'
    async def _get_prices_with_fallback(self):
        try:
            prices = await self._get_prices_from_kucoin_safe()
            if prices.get('bitcoin') and prices.get('ethereum'): return prices
            return await self._get_prices_from_coingecko()
        except Exception as e: return {'bitcoin': None, 'ethereum': None}
    async def _get_prices_from_kucoin_safe(self):
        if not self.exchange: return {'bitcoin': None, 'ethereum': None}
        try:
            prices = {'bitcoin': None, 'ethereum': None}
            btc_ticker = self.exchange.fetch_ticker('BTC/USDT'); btc_price = float(btc_ticker.get('last', 0)) if btc_ticker.get('last') else None
            if btc_price and btc_price > 0: prices['bitcoin'] = btc_price
            eth_ticker = self.exchange.fetch_ticker('ETH/USDT'); eth_price = float(eth_ticker.get('last', 0)) if eth_ticker.get('last') else None
            if eth_price and eth_price > 0: prices['ethereum'] = eth_price
            return prices
        except Exception as e: return {'bitcoin': None, 'ethereum': None}
    async def _get_prices_from_coingecko(self):
        try:
            await asyncio.sleep(0.5)
            url = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin,ethereum&vs_currencies=usd"
            headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'application/json'}
            async with httpx.AsyncClient(headers=headers) as client:
                response = await client.get(url, timeout=10)
                if response.status_code == 429: await asyncio.sleep(2); response = await client.get(url, timeout=10)
                response.raise_for_status(); data = response.json()
                btc_price = data.get('bitcoin', {}).get('usd'); eth_price = data.get('ethereum', {}).get('usd')
                if btc_price and eth_price: return {'bitcoin': btc_price, 'ethereum': eth_price}
                else: return {'bitcoin': None, 'ethereum': None}
        except Exception as e: return {'bitcoin': None, 'ethereum': None}
    def _get_minimal_market_context(self):
        return { 'timestamp': datetime.now().isoformat(), 'data_available': False, 'market_trend': 'UNKNOWN', 'btc_sentiment': 'UNKNOWN', 'data_quality': 'LOW' }


    def _create_dataframe(self, candles: List) -> pd.DataFrame:
        """(V9.1) إنشاء DataFrame (تحتاج 200 شمعة للميزات)"""
        try:
            if not candles: return pd.DataFrame()
            df = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
            df[['open', 'high', 'low', 'close', 'volume']] = df[['open', 'high', 'low', 'close', 'volume']].astype(float)
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
            df.set_index('timestamp', inplace=True)
            df.sort_index(inplace=True)
            return df
        except Exception as e:
            print(f"❌ خطأ في إنشاء DataFrame: {e}")
            return pd.DataFrame()

    async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
        """
        الطبقة 1: فحص سريع - (محدث V10.1)
        - إصلاح مونت كارلو المفقود.
        - استخدام عتبة التوازن 53% (0.53) التي اخترتها.
        """
        print("📊 الطبقة 1 (V10.1 - Balanced 53% + MC Fix): بدء الغربلة...")
        
        if not self.layer1_ranker:
            print("❌ [V10.1] الرانكر غير مهيأ. إيقاف الغربلة.")
            return []

        # الخطوة 1: جلب أفضل 100 عملة حسب الحجم
        volume_data = await self._get_volume_data_optimal()
        if not volume_data: volume_data = await self._get_volume_data_direct_api()
        if not volume_data:
            print("❌ [V10.1] فشل جلب بيانات الأحجام.")
            return []
        
        volume_data.sort(key=lambda x: x['dollar_volume'], reverse=True)
        top_100_by_volume = volume_data[:100]
        print(f"✅ [V10.1] تم تحديد أفضل {len(top_100_by_volume)} عملة. بدء حساب الميزات الذكية...")
        
        final_candidates_with_scores = []
        batch_symbols_data = top_100_by_volume
        batch_symbols = [s['symbol'] for s in batch_symbols_data]
            
        # الخطوة 2: جلب بيانات 1H (200 شمعة)
        tasks = [self._fetch_1h_ohlcv_for_screening(symbol, limit=200) for symbol in batch_symbols]
        results_candles = await asyncio.gather(*tasks, return_exceptions=True)
        
        valid_symbol_data_for_ranking = []
        for j, (candles) in enumerate(results_candles):
            symbol_data = batch_symbols_data[j]
            if isinstance(candles, Exception) or not candles or len(candles) < 200: continue
            symbol_data['ohlcv_1h_raw'] = candles 
            valid_symbol_data_for_ranking.append(symbol_data)

        if not valid_symbol_data_for_ranking:
            print("❌ [V10.1] لا توجد عملات صالحة (تحتاج 200 شمعة 1H).")
            return []

        print(f"   🔄 [V10.1] حساب الميزات (الذكية + مونت كارلو) لـ {len(valid_symbol_data_for_ranking)} عملة...")
        
        # الخطوة 3: حساب "الميزات الذكية V9.8" (الكاملة)
        all_features_list = []
        symbols_in_order = []
        for symbol_data in valid_symbol_data_for_ranking:
            try:
                df = self._create_dataframe(symbol_data['ohlcv_1h_raw'])
                if df.empty: continue
                
                # (أ. حساب الميزات الذكية V9.8 - من indicators.py)
                smart_features = self.technical_analyzer.calculate_v9_smart_features(df)
                if not smart_features:
                    # (فشل حساب الميزات الرئيسية، تجاهل العملة)
                    continue
                
                # 🔴 --- (V10.0 - الإصلاح) --- 🔴
                # (ب. حساب ميزات مونت كارلو V9.8 - من monte_carlo.py)
                # (نستخدم آخر 100 شمعة فقط للمحاكاة البسيطة كما في التدريب)
                closes_np = df['close'].tail(100).to_numpy()
                mc_features = self.mc_analyzer.generate_1h_price_distribution_simple(closes_np)
                
                # (إضافة ميزات مونت كارلو إلى قاموس الميزات)
                smart_features['mc_prob_gain'] = mc_features['mc_prob_gain']
                smart_features['mc_var_95_pct'] = mc_features['mc_var_95_pct']
                # 🔴 --- (نهاية الإصلاح) --- 🔴

                all_features_list.append(smart_features)
                symbols_in_order.append(symbol_data)
                
            except Exception:
                pass # (تجاهل العملة إذا فشل أي جزء)
        
        if not all_features_list:
            print("❌ [V10.1] فشل حساب الميزات الذكية.")
            return []

        # الخطوة 4: التنبؤ (التصنيف)
        print(f"   🧠 [V10.1] إرسال {len(all_features_list)} عملة إلى نموذج الرانكر...")
        features_dataframe = pd.DataFrame(all_features_list)
        probabilities = self.layer1_ranker.predict_proba(features_dataframe)
        
        # (الطباعة التشخيصية)
        print(f"   🔍 [V10.1 DEBUG] تم استلام {len(probabilities)} نتيجة من الرانكر.")
        debug_scores = []
        for i, (symbol_data) in enumerate(symbols_in_order):
            debug_scores.append((symbol_data['symbol'], probabilities[i]))
        debug_scores.sort(key=lambda x: x[1], reverse=True)
        print("   --- 📋 [V10.1 DEBUG] أعلى 10 درجات خام (قبل الفلترة) ---")
        for i, (symbol, score) in enumerate(debug_scores[:10]):
            print(f"      {i+1}. {symbol}: {score:.4f}") 
        print("   -------------------------------------------------")

        # الخطوة 5: تجميع النتائج (مع عتبة التوازن 53%)
        for i, (symbol_data) in enumerate(symbols_in_order):
            score = probabilities[i]
            
            # 🔴 (V10.1: عتبة التوازن 53% - بناءً على اختيارك) 🔴
            if score >= 0.50: 
                symbol = symbol_data['symbol']
                print(f"      ✅ {symbol}: نجح (الاحتمالية: {score:.3f})")
                symbol_data['layer1_score'] = float(score)
                symbol_data['reasons_for_candidacy'] = ["V9_SMART_RANKER_BALANCED_53"]
                if 'ohlcv_1h_raw' in symbol_data: del symbol_data['ohlcv_1h_raw'] 
                final_candidates_with_scores.append(symbol_data)

        print(f"🎯 اكتملت الغربلة (V10.1). تم تأهيل {len(final_candidates_with_scores)} عملة (ثقة >= 53%).")
        
        if final_candidates_with_scores:
            final_candidates_with_scores.sort(key=lambda x: x['layer1_score'], reverse=True)
            print("🏆 المرشحون الناجحون (Top Candidates):")
            for k, candidate in enumerate(final_candidates_with_scores[:5]):
                print(f"   {k+1}. {candidate['symbol']}: (Score: {candidate.get('layer1_score'):.3f})")
        else:
            print("⚠️ [V10.1] لم تنجح أي عملة في تجاوز عتبة الثقة 53%.")

        return final_candidates_with_scores[:20]

    async def _fetch_1h_ohlcv_for_screening(self, symbol: str, limit: int = 200) -> List:
        try:
            ohlcv_data = self.exchange.fetch_ohlcv(symbol, '1h', limit=limit)
            if not ohlcv_data or len(ohlcv_data) < limit: return None
            return ohlcv_data
        except Exception: return None

    async def _get_volume_data_optimal(self) -> List[Dict[str, Any]]:
        try:
            if not self.exchange: return []
            tickers = self.exchange.fetch_tickers()
            volume_data = []
            for symbol, ticker in tickers.items():
                if not symbol.endswith('/USDT') or not ticker.get('active', True): continue
                current_price = ticker.get('last', 0)
                quote_volume = ticker.get('quoteVolume', 0)
                if current_price is None or current_price <= 0: continue
                
                if quote_volume is not None and quote_volume > 0:
                    dollar_volume = quote_volume
                else:
                    base_volume = ticker.get('baseVolume', 0)
                    if base_volume is None: continue
                    dollar_volume = base_volume * current_price
                    
                if dollar_volume is None or dollar_volume < 50000: continue
                
                price_change_24h = ticker.get('percentage', 0) or 0

                volume_data.append({ 
                    'symbol': symbol, 
                    'dollar_volume': dollar_volume, 
                    'current_price': current_price, 
                    'volume_24h': ticker.get('baseVolume', 0) or 0, 
                    'price_change_24h': price_change_24h 
                })
            print(f"✅ تم معالجة {len(volume_data)} عملة في الطريقة المثلى")
            return volume_data
        except Exception as e: 
            print(f"❌ خطأ في جلب بيانات الحجم المثلى: {e}")
            return []
            
    async def _get_volume_data_direct_api(self) -> List[Dict[str, Any]]:
        try:
            url = "https://api.kucoin.com/api/v1/market/allTickers"
            async with httpx.AsyncClient(timeout=15) as client:
                response = await client.get(url)
                response.raise_for_status()
                data = response.json()
                if data.get('code') != '200000': raise ValueError(f"استجابة API غير متوقعة: {data.get('code')}")
                
                tickers = data['data']['ticker']
                volume_data = []
                for ticker in tickers:
                    symbol = ticker['symbol']
                    if not symbol.endswith('USDT'): continue
                    formatted_symbol = symbol.replace('-', '/')
                    try:
                        vol_value = ticker.get('volValue')
                        last_price = ticker.get('last')
                        change_rate = ticker.get('changeRate')
                        vol = ticker.get('vol')
                        
                        if vol_value is None or last_price is None or change_rate is None or vol is None: continue
                        
                        dollar_volume = float(vol_value) if vol_value else 0
                        current_price = float(last_price) if last_price else 0
                        price_change = (float(change_rate) * 100) if change_rate else 0
                        volume_24h = float(vol) if vol else 0
                        
                        if dollar_volume >= 50000 and current_price > 0:
                            volume_data.append({ 
                                'symbol': formatted_symbol, 
                                'dollar_volume': dollar_volume, 
                                'current_price': current_price, 
                                'volume_24h': volume_24h, 
                                'price_change_24h': price_change 
                            })
                    except (ValueError, TypeError, KeyError):
                        continue
                        
                print(f"✅ تم معالجة {len(volume_data)} عملة في الطريقة المباشرة")
                return volume_data
        except Exception as e:
            print(f"❌ خطأ في جلب بيانات الحجم المباشر: {e}")
            return []
            
    async def stream_ohlcv_data(self, symbols: List[Dict[str, Any]], queue: asyncio.Queue):
        print(f"📊 بدء تدفق بيانات OHLCV (الكاملة) لـ {len(symbols)} عملة (مصنفة)...")
        batch_size = 15
        batches = [symbols[i:i + batch_size] for i in range(0, len(symbols), batch_size)]
        total_successful = 0
        
        for batch_num, batch in enumerate(batches):
            print(f"   🔄 [المنتج] جلب الدفعة {batch_num + 1}/{len(batches)} ({len(batch)} عملة)...")
            batch_tasks = []
            for symbol_data in batch:
                symbol_str = symbol_data['symbol'] 
                task = asyncio.create_task(self._fetch_complete_ohlcv_parallel(symbol_str))
                batch_tasks.append(task)
                
            batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
            
            successful_data_for_batch = []
            for i, result in enumerate(batch_results):
                original_symbol_data = batch[i]
                symbol_str = original_symbol_data['symbol']
                
                if isinstance(result, Exception):
                    print(f"      ❌ [المنتج] فشل جلب {symbol_str}: {result}")
                elif result is not None:
                    result.update(original_symbol_data) 
                    successful_data_for_batch.append(result)
                    print(f"      ✅ [المنتج] {symbol_str}: {result.get('successful_timeframes', 0)}/6 أطر زمنية")
                else:
                    print(f"      ⚠️ [المنتج] {symbol_str}: بيانات غير كافية، تم التجاهل")

            if successful_data_for_batch:
                try:
                    await queue.put(successful_data_for_batch)
                    total_successful += len(successful_data_for_batch)
                except Exception as q_err:
                    print(f"      ❌ [المنتج] فشل إرسال الدفعة للطابور: {q_err}")

            if batch_num < len(batches) - 1:
                await asyncio.sleep(1) # (إضافة تأخير بسيط بين الدفعات)
                
        print(f"✅ [المنتج] اكتمل التدفق. تم إرسال {total_successful} عملة للمعالجة.")
        await queue.put(None)
        print("      📬 [المنتج] تم إرسال إشارة الإنهاء (None) إلى الطابور.")

    async def _fetch_complete_ohlcv_parallel(self, symbol: str) -> Dict[str, Any]:
        try:
            ohlcv_data = {}
            timeframes = [('5m', 200), ('15m', 200), ('1h', 200), ('4h', 200), ('1d', 200), ('1w', 200)]
            timeframe_tasks = []
            
            for timeframe, limit in timeframes:
                task = asyncio.create_task(self._fetch_single_timeframe_improved(symbol, timeframe, limit))
                timeframe_tasks.append(task)
                
            timeframe_results = await asyncio.gather(*timeframe_tasks, return_exceptions=True)
            successful_timeframes = 0
            
            for i, (timeframe, limit) in enumerate(timeframes):
                result = timeframe_results[i]
                if isinstance(result, Exception): continue
                if result and len(result) >= 200: # (التأكد من وجود بيانات كافية)
                    ohlcv_data[timeframe] = result
                    successful_timeframes += 1
                    
            if successful_timeframes >= 3 and ohlcv_data: # (نحتاج 3 أطر زمنية على الأقل)
                try:
                    current_price = await self.get_latest_price_async(symbol)
                    if current_price is None:
                        # (محاولة احتياطية للحصول على السعر من آخر شمعة)
                        for tf_data in ohlcv_data.values():
                            if tf_data and len(tf_data) > 0: current_price = tf_data[-1][4]; break
                    if current_price is None: return None
                    
                    return { 
                        'symbol': symbol, 
                        'ohlcv': ohlcv_data, 
                        'raw_ohlcv': ohlcv_data, 
                        'current_price': current_price, 
                        'timestamp': datetime.now().isoformat(), 
                        'candles_count': {tf: len(data) for tf, data in ohlcv_data.items()},
                        'successful_timeframes': successful_timeframes 
                    }
                except Exception: 
                    return None
            else: 
                return None
        except Exception: 
            return None
            
    async def _fetch_single_timeframe_improved(self, symbol: str, timeframe: str, limit: int):
        max_retries = 3
        retry_delay = 2
        for attempt in range(max_retries):
            try:
                ohlcv_data = self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
                if ohlcv_data and len(ohlcv_data) > 0: 
                    return ohlcv_data
                else: 
                    return []
            except Exception:
                if attempt < max_retries - 1:
                    await asyncio.sleep(retry_delay * (attempt + 1))
                else: 
                    return []
                    
    async def get_latest_price_async(self, symbol):
        try:
            if not self.exchange: return None
            ticker = self.exchange.fetch_ticker(symbol)
            return float(ticker['last']) if ticker and ticker.get('last') else None
        except Exception: 
            return None
            
    # 🔴 --- START OF CHANGE (V10.2) --- 🔴
    async def get_symbol_daily_volume(self, symbol: str) -> float:
        """
        (جديد) دالة مساعدة لجلب حجم التداول اليومي (بالدولار) لعملة واحدة.
        (تستخدم في إعادة التحليل).
        """
        try:
            if not self.exchange: return 0.0
            ticker = self.exchange.fetch_ticker(symbol)
            if not ticker: return 0.0
            
            current_price = ticker.get('last', 0)
            quote_volume = ticker.get('quoteVolume', 0)
            
            if quote_volume is not None and quote_volume > 0:
                return float(quote_volume)
            elif current_price is not None and current_price > 0:
                base_volume = ticker.get('baseVolume', 0)
                if base_volume is not None:
                    return float(base_volume) * float(current_price)
            
            return 0.0
        except Exception:
            return 0.0

    async def get_whale_data_for_symbol(self, symbol: str, daily_volume_usd: float = 0.0):
        """
        (محدث V10.2)
        تمرير حجم التداول اليومي إلى مراقب الحيتان لتفعيل المقاييس النسبية.
        """
        try:
            if self.whale_monitor:
                return await self.whale_monitor.get_symbol_whale_activity(symbol, daily_volume_usd=daily_volume_usd)
            else:
                return None
        except Exception: 
            return None
    # 🔴 --- END OF CHANGE --- 🔴
            
    async def get_whale_trading_signal(self, symbol, whale_data, market_context):
        try:
            return await self.whale_monitor.generate_whale_trading_signal(symbol, whale_data, market_context) if self.whale_monitor else {'action': 'HOLD', 'confidence': 0.3, 'reason': 'Whale monitor not available', 'source': 'whale_analysis'}
        except Exception as e:
            return {'action': 'HOLD', 'confidence': 0.3, 'reason': f'Error: {str(e)}', 'source': 'whale_analysis'}

print("✅ DataManager loaded - V10.2 (Whale Learning Data Link)")