Riy777 commited on
Commit
d21f065
·
1 Parent(s): 5243bd9

Update data_manager.py

Browse files
Files changed (1) hide show
  1. data_manager.py +100 -21
data_manager.py CHANGED
@@ -1,5 +1,5 @@
1
  # ml_engine/data_manager.py
2
- # (V12.3 - Hybrid Data Pipeline with Correct Naming)
3
 
4
  import os
5
  import asyncio
@@ -45,6 +45,7 @@ class DataManager:
45
  self.whale_monitor = whale_monitor
46
  self.r2_service = r2_service
47
 
 
48
  self.exchange = ccxt.kucoin({
49
  'enableRateLimit': True,
50
  'timeout': 30000,
@@ -60,6 +61,7 @@ class DataManager:
60
  self.pattern_analyzer = None
61
 
62
  async def initialize(self):
 
63
  self.http_client = httpx.AsyncClient(timeout=30.0)
64
  await self._load_markets()
65
 
@@ -75,9 +77,10 @@ class DataManager:
75
  except Exception as e:
76
  print(f"⚠️ [DataManager] تحذير أثناء تهيئة النماذج المساندة: {e}")
77
 
78
- print(f"✅ DataManager V12.3 initialized (Hybrid Threshold: {self.HYBRID_ENTRY_THRESHOLD})")
79
 
80
  async def _load_markets(self):
 
81
  try:
82
  if self.exchange:
83
  await self.exchange.load_markets()
@@ -86,49 +89,125 @@ class DataManager:
86
  print(f"❌ [DataManager] فشل تحميل الأسواق: {e}")
87
 
88
  async def close(self):
 
89
  if self.http_client: await self.http_client.aclose()
90
  if self.exchange: await self.exchange.close()
 
91
  if self.pattern_analyzer and hasattr(self.pattern_analyzer, 'clear_memory'):
92
  self.pattern_analyzer.clear_memory()
93
  if self.layer1_ranker and hasattr(self.layer1_ranker, 'clear_memory'):
94
  self.layer1_ranker.clear_memory()
95
 
 
 
 
96
  async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
97
- print(f"🔍 [Layer 1] بدء الغربلة السريعة...")
 
 
 
 
98
  volume_data = await self._get_volume_data_live()
99
- if not volume_data: return []
 
 
 
 
 
100
  candidates = volume_data[:150]
101
  print(f"✅ [Layer 1] تم تمرير {len(candidates)} عملة للتحليل الهجين.")
102
  return candidates
103
 
104
  async def _get_volume_data_live(self):
 
105
  try:
106
  tickers = await self.exchange.fetch_tickers()
107
  data = []
108
- for s, t in tickers.items():
109
- if s.endswith('/USDT') and t['quoteVolume'] and t['quoteVolume'] > 100000:
110
- data.append({'symbol': s, 'dollar_volume': t['quoteVolume'], 'current_price': t['last']})
 
 
 
 
 
 
111
  data.sort(key=lambda x: x['dollar_volume'], reverse=True)
112
  return data
113
- except: return []
 
 
114
 
 
 
 
115
  async def stream_ohlcv_data(self, symbols: List[Dict], queue: asyncio.Queue):
116
- tfs = ['5m', '15m', '1h', '4h', '1d']
117
- limit = 500
 
 
 
 
 
118
  for sym_data in symbols:
119
- sym = sym_data['symbol']
120
- tasks = [self._fetch_ohlcv_live(sym, tf, limit) for tf in tfs]
 
121
  results = await asyncio.gather(*tasks, return_exceptions=False)
122
- ohlcv = {}
 
 
123
  for i, res in enumerate(results):
 
 
124
  if res and isinstance(res, list) and len(res) >= 200:
125
- ohlcv[tfs[i]] = res
126
- if len(ohlcv) >= 4 and '5m' in ohlcv:
127
- sym_data['ohlcv'] = ohlcv
128
- await queue.put([sym_data])
129
- await asyncio.sleep(0.1)
 
 
 
 
 
 
 
 
 
130
  await queue.put(None)
131
 
132
- async def _fetch_ohlcv_live(self, symbol, tf, limit):
133
- try: return await self.exchange.fetch_ohlcv(symbol, tf, limit=limit)
134
- except: return None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  # ml_engine/data_manager.py
2
+ # (V12.4 - Hybrid Data Pipeline with 'get_latest_ohlcv' Fix)
3
 
4
  import os
5
  import asyncio
 
45
  self.whale_monitor = whale_monitor
46
  self.r2_service = r2_service
47
 
48
+ # تهيئة منصة التبادل (KuCoin كمثال)
49
  self.exchange = ccxt.kucoin({
50
  'enableRateLimit': True,
51
  'timeout': 30000,
 
61
  self.pattern_analyzer = None
62
 
63
  async def initialize(self):
64
+ """تهيئة مدير البيانات والاتصالات"""
65
  self.http_client = httpx.AsyncClient(timeout=30.0)
66
  await self._load_markets()
67
 
 
77
  except Exception as e:
78
  print(f"⚠️ [DataManager] تحذير أثناء تهيئة النماذج المساندة: {e}")
79
 
80
+ print(f"✅ DataManager V12.4 initialized (Hybrid Threshold: {self.HYBRID_ENTRY_THRESHOLD})")
81
 
82
  async def _load_markets(self):
83
+ """تحميل بيانات الأسواق وتخزينها مؤقتاً"""
84
  try:
85
  if self.exchange:
86
  await self.exchange.load_markets()
 
89
  print(f"❌ [DataManager] فشل تحميل الأسواق: {e}")
90
 
91
  async def close(self):
92
+ """إغلاق جميع الاتصالات بأمان"""
93
  if self.http_client: await self.http_client.aclose()
94
  if self.exchange: await self.exchange.close()
95
+ # تنظيف ذاكرة النماذج الفرعية إذا لزم الأمر
96
  if self.pattern_analyzer and hasattr(self.pattern_analyzer, 'clear_memory'):
97
  self.pattern_analyzer.clear_memory()
98
  if self.layer1_ranker and hasattr(self.layer1_ranker, 'clear_memory'):
99
  self.layer1_ranker.clear_memory()
100
 
101
+ # ==================================================================
102
+ # 🛡️ دوال الطبقة الأولى (Layer 1 Screening)
103
+ # ==================================================================
104
  async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
105
+ """
106
+ الغربلة الأولية السريعة جداً بناءً على الحجم فقط.
107
+ تختار أعلى 150 عملة سيولةً لتمريرها للتحليل الهجين المعمق.
108
+ """
109
+ print(f"🔍 [Layer 1] بدء الغربلة السريعة (Top Liquid Assets)...")
110
  volume_data = await self._get_volume_data_live()
111
+
112
+ if not volume_data:
113
+ print("⚠️ [Layer 1 Warning] لم يتم العثور على بيانات حجم تداول.")
114
+ return []
115
+
116
+ # اختيار أعلى 150 عملة من حيث حجم التداول الدولاري
117
  candidates = volume_data[:150]
118
  print(f"✅ [Layer 1] تم تمرير {len(candidates)} عملة للتحليل الهجين.")
119
  return candidates
120
 
121
  async def _get_volume_data_live(self):
122
+ """جلب بيانات الحجم الحية لجميع الأزواج"""
123
  try:
124
  tickers = await self.exchange.fetch_tickers()
125
  data = []
126
+ for symbol, ticker in tickers.items():
127
+ # تصفية الأزواج: USDT فقط، وحجم تداول معقول (> 100k$) لتجنب العملات الميتة
128
+ if symbol.endswith('/USDT') and ticker.get('quoteVolume') and ticker['quoteVolume'] > 100000:
129
+ data.append({
130
+ 'symbol': symbol,
131
+ 'dollar_volume': ticker['quoteVolume'],
132
+ 'current_price': ticker['last']
133
+ })
134
+ # الترتيب التنازلي حسب الحجم
135
  data.sort(key=lambda x: x['dollar_volume'], reverse=True)
136
  return data
137
+ except Exception as e:
138
+ print(f"❌ [DataManager] خطأ في جلب بيانات الحجم: {e}")
139
+ return []
140
 
141
+ # ==================================================================
142
+ # 📊 دوال جلب البيانات (Data Fetching Pipeline)
143
+ # ==================================================================
144
  async def stream_ohlcv_data(self, symbols: List[Dict], queue: asyncio.Queue):
145
+ """
146
+ مولد بيانات متدفق (Streaming Generator) يجلب شموع OHLCV
147
+ لعدة إطارات زمنية لكل عملة، ويضع النتائج في طابور المعالجة.
148
+ """
149
+ timeframes = ['5m', '15m', '1h', '4h', '1d']
150
+ limit = 500 # نحتاج تاريخ كافي للمؤشرات المعقدة
151
+
152
  for sym_data in symbols:
153
+ symbol = sym_data['symbol']
154
+ # جلب جميع الإطارات الزمنية بالتوازي للسرعة القصوى
155
+ tasks = [self._fetch_ohlcv_live(symbol, tf, limit) for tf in timeframes]
156
  results = await asyncio.gather(*tasks, return_exceptions=False)
157
+
158
+ ohlcv_packet = {}
159
+ valid_packet = True
160
  for i, res in enumerate(results):
161
+ tf = timeframes[i]
162
+ # التحقق من جودة البيانات (على الأقل 200 شمعة للتحليل الدقيق)
163
  if res and isinstance(res, list) and len(res) >= 200:
164
+ ohlcv_packet[tf] = res
165
+ else:
166
+ # إذا فقدنا إطاراً زمنياً حيوياً (مثل 5m أو 1h)، قد نعتبر الحزمة غير صالحة
167
+ if tf in ['5m', '1h']: valid_packet = False
168
+
169
+ # إذا كانت الحزمة مكتملة بما يكفي، نرسلها للمعالجة
170
+ if valid_packet and len(ohlcv_packet) >= 4:
171
+ sym_data['ohlcv'] = ohlcv_packet
172
+ await queue.put([sym_data]) # نرسل كقائمة لتوافق المعالج
173
+
174
+ # فاصل زمني صغير جداً لتجنب تجاوز حدود الـ API بعنف
175
+ await asyncio.sleep(0.05)
176
+
177
+ # علامة نهاية التدفق
178
  await queue.put(None)
179
 
180
+ async def _fetch_ohlcv_live(self, symbol, timeframe, limit):
181
+ """دالة مساعدة لجلب الشموع مع معالجة الأخطاء البسيطة"""
182
+ try:
183
+ return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
184
+ except Exception:
185
+ # نتجاهل الأخطاء الفردية لعدم إيقاف التدفق الكامل
186
+ return None
187
+
188
+ # ==================================================================
189
+ # 🎯 دوال مساعدة للحارس والدماغ (Sentry & Brain Helpers)
190
+ # ==================================================================
191
+ async def get_latest_price_async(self, symbol: str) -> float:
192
+ """جلب آخر سعر حقيقي (للتنفيذ والمراقبة)"""
193
+ try:
194
+ ticker = await self.exchange.fetch_ticker(symbol)
195
+ return float(ticker['last'])
196
+ except Exception as e:
197
+ print(f"⚠️ [DataManager] Failed to fetch price for {symbol}: {e}")
198
+ return 0.0
199
+
200
+ # [NEW FIX V12.4] الدالة التي كانت مفقودة وتمت إضافتها
201
+ async def get_latest_ohlcv(self, symbol: str, timeframe: str = '5m', limit: int = 100) -> List[List[float]]:
202
+ """
203
+ جلب عدد محدود من الشموع الأخيرة بسرعة.
204
+ يستخدمها 'القناص' (Sniper) للمراقبة اللحظية الخفيفة.
205
+ """
206
+ try:
207
+ candles = await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
208
+ if candles and len(candles) > 0:
209
+ return candles
210
+ return []
211
+ except Exception as e:
212
+ # print(f"⚠️ [DataManager] Failed rapid OHLCV fetch for {symbol}: {e}")
213
+ return []