File size: 11,735 Bytes
2bf9457
12cf278
2bf9457
00bb5c9
 
 
 
 
 
2bf9457
d69dead
28fa18b
394e2c7
164b380
 
 
 
 
909d04f
164b380
11b4dc5
 
2bf9457
 
82e6b70
 
 
 
 
53cf6c0
28fa18b
 
2bf9457
53cf6c0
 
29d027f
4437a6f
29d027f
4437a6f
29d027f
19ecbac
4437a6f
 
53cf6c0
d2775f3
4ace337
c6f72fe
d21f065
29d027f
 
 
 
c6f72fe
87e3669
56e3f87
11b4dc5
 
2bf9457
82e6b70
909d04f
d50c5b6
909d04f
53cf6c0
d21f065
248e033
56e3f87
6b00681
62a43d9
29d027f
 
62a43d9
 
 
29d027f
 
 
62a43d9
 
 
29d027f
 
62a43d9
6b00681
62a43d9
24a0949
56e3f87
d21f065
56e3f87
2bf9457
 
29d027f
2bf9457
909d04f
56e3f87
53cf6c0
d21f065
909d04f
 
d21f065
82e6b70
 
 
 
11b4dc5
12cf278
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d21f065
 
 
24a0949
d21f065
 
 
 
 
2bf9457
d21f065
 
 
 
 
 
29d027f
 
 
4437a6f
909d04f
d21f065
b44825a
2bf9457
909d04f
d21f065
 
 
 
 
 
 
 
 
909d04f
 
d21f065
 
 
909d04f
d21f065
 
 
909d04f
d21f065
 
 
 
 
 
 
4437a6f
d21f065
 
 
909d04f
d21f065
 
 
909d04f
d21f065
 
82e6b70
d21f065
 
 
 
 
 
 
 
 
 
 
 
 
 
82e6b70
 
d21f065
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# ml_engine/data_manager.py
# (V12.5 - Lazy Loading Fix + V15.6 App-Compat Fix)

import os
import asyncio
import httpx
import traceback
import time
from datetime import datetime
import ccxt.async_support as ccxt
import numpy as np
import logging
from typing import List, Dict, Any
import pandas as pd

try:
    import pandas_ta as ta
except ImportError:
    print("❌ [DataManager] مكتبة pandas_ta غير موجودة.")
    ta = None

from ml_engine.indicators import AdvancedTechnicalAnalyzer
from ml_engine.monte_carlo import MonteCarloAnalyzer
from ml_engine.ranker import Layer1Ranker
try:
    from ml_engine.patterns import ChartPatternAnalyzer
except ImportError:
    print("⚠️ [DataManager] لم يتم العثور على ml_engine/patterns.py")
    ChartPatternAnalyzer = None

logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("ccxt").setLevel(logging.WARNING)

class DataManager:
    def __init__(self, contracts_db, whale_monitor, r2_service=None):
        # ==================================================================
        # ⚙️ إعدادات التحكم المركزية (V12.3 Hybrid Thresholds)
        # ==================================================================
        # العتبة الرئيسية للنظام الهجين (Titan + Patterns + MC)
        self.HYBRID_ENTRY_THRESHOLD = 0.60
        # ==================================================================

        self.contracts_db = contracts_db or {}
        self.whale_monitor = whale_monitor
        self.r2_service = r2_service
        
        # تهيئة منصة التبادل (KuCoin كمثال)
        self.exchange = ccxt.kucoin({
            'enableRateLimit': True,
            'timeout': 30000,
        })
            
        self.http_client = None
        self.market_cache = {}

        self.technical_analyzer = AdvancedTechnicalAnalyzer()
        self.mc_analyzer = MonteCarloAnalyzer()
        
        self.layer1_ranker = None
        self.pattern_analyzer = None

    async def initialize(self):
        """تهيئة مدير البيانات والاتصالات"""
        self.http_client = httpx.AsyncClient(timeout=30.0)
        await self._load_markets()
        
        print("   > [DataManager] إنشاء النماذج المساندة (Lazy Load)...") # (تغيير الرسالة)
        try:
            self.layer1_ranker = Layer1Ranker(model_path="ml_models/layer1_ranker.lgbm")
            # [ 🔴🔴🔴 LAZY LOAD 🔴🔴🔴 ]
            # await self.layer1_ranker.initialize() # (تم الإلغاء - سيتم تحميله عند الحاجة)
            # [ 🔴🔴🔴 END LAZY LOAD 🔴🔴🔴 ]
            
            if ChartPatternAnalyzer:
                self.pattern_analyzer = ChartPatternAnalyzer(r2_service=self.r2_service)
                # [ 🔴🔴🔴 LAZY LOAD 🔴🔴🔴 ]
                # await self.pattern_analyzer.initialize() # (تم الإلغاء - سيتم تحميله عند الحاجة)
                # [ 🔴🔴🔴 END LAZY LOAD 🔴🔴🔴 ]
                
        except Exception as e:
            print(f"⚠️ [DataManager] تحذير أثناء إنشاء النماذج المساندة: {e}")

        print(f"✅ DataManager V12.5 initialized (Hybrid Threshold: {self.HYBRID_ENTRY_THRESHOLD})")

    async def _load_markets(self):
        """تحميل بيانات الأسواق وتخزينها مؤقتاً"""
        try:
            if self.exchange:
                await self.exchange.load_markets()
                self.market_cache = self.exchange.markets
        except Exception as e:
            print(f"❌ [DataManager] فشل تحميل الأسواق: {e}")

    async def close(self):
        """إغلاق جميع الاتصالات بأمان"""
        if self.http_client: await self.http_client.aclose()
        if self.exchange: await self.exchange.close()
        # تنظيف ذاكرة النماذج الفرعية إذا لزم الأمر
        if self.pattern_analyzer and hasattr(self.pattern_analyzer, 'clear_memory'):
            self.pattern_analyzer.clear_memory()
        if self.layer1_ranker and hasattr(self.layer1_ranker, 'clear_memory'):
            self.layer1_ranker.clear_memory()

    # ==================================================================
    # 🚀 [إضافة جديدة V15.6] دوال التوافق مع App
    # ==================================================================
    async def load_contracts_from_r2(self):
        """
        [جديد] يقوم بتحميل قاعدة بيانات العقود من R2 عند بدء التشغيل.
        (مطلوب بواسطة app.py V15.6)
        """
        print("   > [DataManager] Loading contracts database from R2...")
        if not self.r2_service:
            print("❌ [DataManager] R2Service not available. Cannot load contracts.")
            self.contracts_db = {}
            return
        
        try:
            self.contracts_db = await self.r2_service.load_contracts_db_async()
            print(f"✅ [DataManager] Contracts loaded. Total entries: {len(self.contracts_db)}")
        except Exception as e:
            print(f"❌ [DataManager] Failed to load contracts from R2: {e}")
            self.contracts_db = {}

    def get_contracts_db(self) -> Dict[str, Any]:
        """
        [جديد] إرجاع قاعدة بيانات العقود التي تم تحميلها.
        (مطلوب بواسطة app.py V15.6)
        """
        return self.contracts_db
    
    # ==================================================================
    # 🛡️ دوال الطبقة الأولى (Layer 1 Screening)
    # ==================================================================
    async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
        """
        الغربلة الأولية السريعة جداً بناءً على الحجم فقط.
        تختار أعلى 150 عملة سيولةً لتمريرها للتحليل الهجين المعمق.
        """
        print(f"🔍 [Layer 1] بدء الغربلة السريعة (Top Liquid Assets)...")
        volume_data = await self._get_volume_data_live()
        
        if not volume_data:
            print("⚠️ [Layer 1 Warning] لم يتم العثور على بيانات حجم تداول.")
            return []

        # اختيار أعلى 150 عملة من حيث حجم التداول الدولاري
        candidates = volume_data[:150]
        print(f"✅ [Layer 1] تم تمرير {len(candidates)} عملة للتحليل الهجين.")
        return candidates

    async def _get_volume_data_live(self):
        """جلب بيانات الحجم الحية لجميع الأزواج"""
        try:
            tickers = await self.exchange.fetch_tickers()
            data = []
            for symbol, ticker in tickers.items():
                # تصفية الأزواج: USDT فقط، وحجم تداول معقول (> 100k$) لتجنب العملات الميتة
                if symbol.endswith('/USDT') and ticker.get('quoteVolume') and ticker['quoteVolume'] > 100000:
                    data.append({
                        'symbol': symbol,
                        'dollar_volume': ticker['quoteVolume'],
                        'current_price': ticker['last']
                    })
            # الترتيب التنازلي حسب الحجم
            data.sort(key=lambda x: x['dollar_volume'], reverse=True)
            return data
        except Exception as e:
            print(f"❌ [DataManager] خطأ في جلب بيانات الحجم: {e}")
            return []

    # ==================================================================
    # 📊 دوال جلب البيانات (Data Fetching Pipeline)
    # ==================================================================
    async def stream_ohlcv_data(self, symbols: List[Dict], queue: asyncio.Queue):
        """
        مولد بيانات متدفق (Streaming Generator) يجلب شموع OHLCV
        لعدة إطارات زمنية لكل عملة، ويضع النتائج في طابور المعالجة.
        """
        timeframes = ['5m', '15m', '1h', '4h', '1d']
        limit = 500 # نحتاج تاريخ كافي للمؤشرات المعقدة

        for sym_data in symbols:
            symbol = sym_data['symbol']
            # جلب جميع الإطارات الزمنية بالتوازي للسرعة القصوى
            tasks = [self._fetch_ohlcv_live(symbol, tf, limit) for tf in timeframes]
            results = await asyncio.gather(*tasks, return_exceptions=False)
            
            ohlcv_packet = {}
            valid_packet = True
            for i, res in enumerate(results):
                tf = timeframes[i]
                # التحقق من جودة البيانات (على الأقل 200 شمعة للتحليل الدقيق)
                if res and isinstance(res, list) and len(res) >= 200:
                    ohlcv_packet[tf] = res
                else:
                    # إذا فقدنا إطاراً زمنياً حيوياً (مثل 5m أو 1h)، قد نعتبر الحزمة غير صالحة
                    if tf in ['5m', '1h']: valid_packet = False

            # إذا كانت الحزمة مكتملة بما يكفي، نرسلها للمعالجة
            if valid_packet and len(ohlcv_packet) >= 4:
                sym_data['ohlcv'] = ohlcv_packet
                await queue.put([sym_data]) # نرسل كقائمة لتوافق المعالج
            
            # فاصل زمني صغير جداً لتجنب تجاوز حدود الـ API بعنف
            await asyncio.sleep(0.05)

        # علامة نهاية التدفق
        await queue.put(None)

    async def _fetch_ohlcv_live(self, symbol, timeframe, limit):
        """دالة مساعدة لجلب الشموع مع معالجة الأخطاء البسيطة"""
        try:
            return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
        except Exception:
            # نتجاهل الأخطاء الفردية لعدم إيقاف التدفق الكامل
            return None

    # ==================================================================
    # 🎯 دوال مساعدة للحارس والدماغ (Sentry & Brain Helpers)
    # ==================================================================
    async def get_latest_price_async(self, symbol: str) -> float:
        """جلب آخر سعر حقيقي (للتنفيذ والمراقبة)"""
        try:
            ticker = await self.exchange.fetch_ticker(symbol)
            return float(ticker['last'])
        except Exception as e:
            print(f"⚠️ [DataManager] Failed to fetch price for {symbol}: {e}")
            return 0.0

    # [NEW FIX V12.4] الدالة التي كانت مفقودة وتمت إضافتها
    async def get_latest_ohlcv(self, symbol: str, timeframe: str = '5m', limit: int = 100) -> List[List[float]]:
        """
        جلب عدد محدود من الشموع الأخيرة بسرعة.
        يستخدمها 'القناص' (Sniper) للمراقبة اللحظية الخفيفة.
        """
        try:
            candles = await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
            if candles and len(candles) > 0:
                return candles
            return []
        except Exception as e:
            # print(f"⚠️ [DataManager] Failed rapid OHLCV fetch for {symbol}: {e}")
            return []