Riy777 commited on
Commit
d50c5b6
·
1 Parent(s): c88a877

Update data_manager.py

Browse files
Files changed (1) hide show
  1. data_manager.py +8 -34
data_manager.py CHANGED
@@ -1,5 +1,5 @@
1
  # ml_engine/data_manager.py
2
- # (V12.1 - Titan + Hybrid Support Pipeline)
3
 
4
  import os
5
  import asyncio
@@ -13,7 +13,6 @@ import logging
13
  from typing import List, Dict, Any
14
  import pandas as pd
15
 
16
- # محاولة استيراد pandas_ta
17
  try:
18
  import pandas_ta as ta
19
  except ImportError:
@@ -23,7 +22,6 @@ except ImportError:
23
  from ml_engine.indicators import AdvancedTechnicalAnalyzer
24
  from ml_engine.monte_carlo import MonteCarloAnalyzer
25
  from ml_engine.ranker import Layer1Ranker
26
- # 🔥 إعادة استيراد محرك الأنماط لدعم النظام الهجين
27
  try:
28
  from ml_engine.patterns import ChartPatternAnalyzer
29
  except ImportError:
@@ -37,10 +35,10 @@ logging.getLogger("ccxt").setLevel(logging.WARNING)
37
  class DataManager:
38
  def __init__(self, contracts_db, whale_monitor, r2_service=None):
39
  # ==================================================================
40
- # ⚙️ إعدادات التحكم المركزية (V12 Hybrid Thresholds)
41
  # ==================================================================
42
- self.SCREENING_THRESHOLD = 0.40 # غربلة أولية
43
- self.TITAN_ENTRY_THRESHOLD = 0.90 # عتبة Titan الصارمة
44
  # ==================================================================
45
 
46
  self.contracts_db = contracts_db or {}
@@ -58,22 +56,18 @@ class DataManager:
58
  self.technical_analyzer = AdvancedTechnicalAnalyzer()
59
  self.mc_analyzer = MonteCarloAnalyzer()
60
 
61
- # نماذج الطبقة الأولى والثانية المساندة
62
  self.layer1_ranker = None
63
- self.pattern_analyzer = None # 🔥 تمت إعادته لتجنب AttributeError
64
 
65
  async def initialize(self):
66
- """تهيئة الاتصالات وتحميل جميع النماذج المساندة"""
67
  self.http_client = httpx.AsyncClient(timeout=30.0)
68
  await self._load_markets()
69
 
70
  print(" > [DataManager] تهيئة النماذج المساندة (Ranker + Patterns)...")
71
  try:
72
- # 1. الكاشف المصغر (Ranker)
73
  self.layer1_ranker = Layer1Ranker(model_path="ml_models/layer1_ranker.lgbm")
74
  await self.layer1_ranker.initialize()
75
 
76
- # 2. محرك الأنماط (Patterns) - للنظام الهجين
77
  if ChartPatternAnalyzer:
78
  self.pattern_analyzer = ChartPatternAnalyzer(r2_service=self.r2_service)
79
  await self.pattern_analyzer.initialize()
@@ -81,7 +75,7 @@ class DataManager:
81
  except Exception as e:
82
  print(f"⚠️ [DataManager] تحذير أثناء تهيئة النماذج المساندة: {e}")
83
 
84
- print(f"✅ DataManager V12.1 initialized (Hybrid Ready).")
85
 
86
  async def _load_markets(self):
87
  try:
@@ -94,22 +88,15 @@ class DataManager:
94
  async def close(self):
95
  if self.http_client: await self.http_client.aclose()
96
  if self.exchange: await self.exchange.close()
97
-
98
- # تنظيف الذاكرة عند الإغلاق
99
  if self.pattern_analyzer and hasattr(self.pattern_analyzer, 'clear_memory'):
100
  self.pattern_analyzer.clear_memory()
101
  if self.layer1_ranker and hasattr(self.layer1_ranker, 'clear_memory'):
102
  self.layer1_ranker.clear_memory()
103
 
104
- # ==================================================================
105
- # 🔴 الطبقة 1: الغربلة السريعة
106
- # ==================================================================
107
  async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
108
  print(f"🔍 [Layer 1] بدء الغربلة السريعة...")
109
  volume_data = await self._get_volume_data_live()
110
  if not volume_data: return []
111
-
112
- # توسيع النطاق لـ 150 عملة لزيادة فرص Titan
113
  candidates = volume_data[:150]
114
  print(f"✅ [Layer 1] تم تمرير {len(candidates)} عملة للتحليل الهجين.")
115
  return candidates
@@ -119,42 +106,29 @@ class DataManager:
119
  tickers = await self.exchange.fetch_tickers()
120
  data = []
121
  for s, t in tickers.items():
122
- # فلترة العملات ذات السيولة الضعيفة جداً (< 100k)
123
  if s.endswith('/USDT') and t['quoteVolume'] and t['quoteVolume'] > 100000:
124
  data.append({'symbol': s, 'dollar_volume': t['quoteVolume'], 'current_price': t['last']})
125
  data.sort(key=lambda x: x['dollar_volume'], reverse=True)
126
  return data
127
  except: return []
128
 
129
- # ==================================================================
130
- # 🔵 خط أنابيب البيانات الهجين (Hybrid Data Stream)
131
- # ==================================================================
132
  async def stream_ohlcv_data(self, symbols: List[Dict], queue: asyncio.Queue):
133
- """جلب بيانات عميقة (500 شمعة) لكافة الأطر المطلوبة"""
134
  tfs = ['5m', '15m', '1h', '4h', '1d']
135
  limit = 500
136
-
137
  for sym_data in symbols:
138
  sym = sym_data['symbol']
139
  tasks = [self._fetch_ohlcv_live(sym, tf, limit) for tf in tfs]
140
  results = await asyncio.gather(*tasks, return_exceptions=False)
141
-
142
  ohlcv = {}
143
  for i, res in enumerate(results):
144
  if res and isinstance(res, list) and len(res) >= 200:
145
  ohlcv[tfs[i]] = res
146
-
147
- # يجب توفر معظم الأطر لعمل النظام الهجين بدقة
148
  if len(ohlcv) >= 4 and '5m' in ohlcv:
149
  sym_data['ohlcv'] = ohlcv
150
  await queue.put([sym_data])
151
-
152
- await asyncio.sleep(0.1) # تجنب حظر API
153
-
154
  await queue.put(None)
155
 
156
  async def _fetch_ohlcv_live(self, symbol, tf, limit):
157
  try: return await self.exchange.fetch_ohlcv(symbol, tf, limit=limit)
158
- except: return None
159
-
160
- print("✅ DataManager V12.1 (Hybrid Support) loaded.")
 
1
  # ml_engine/data_manager.py
2
+ # (V12.3 - Hybrid Data Pipeline with Correct Naming)
3
 
4
  import os
5
  import asyncio
 
13
  from typing import List, Dict, Any
14
  import pandas as pd
15
 
 
16
  try:
17
  import pandas_ta as ta
18
  except ImportError:
 
22
  from ml_engine.indicators import AdvancedTechnicalAnalyzer
23
  from ml_engine.monte_carlo import MonteCarloAnalyzer
24
  from ml_engine.ranker import Layer1Ranker
 
25
  try:
26
  from ml_engine.patterns import ChartPatternAnalyzer
27
  except ImportError:
 
35
  class DataManager:
36
  def __init__(self, contracts_db, whale_monitor, r2_service=None):
37
  # ==================================================================
38
+ # ⚙️ إعدادات التحكم المركزية (V12.3 Hybrid Thresholds)
39
  # ==================================================================
40
+ # العتبة الرئيسية للنظام الهجين (Titan + Patterns + MC)
41
+ self.HYBRID_ENTRY_THRESHOLD = 0.90
42
  # ==================================================================
43
 
44
  self.contracts_db = contracts_db or {}
 
56
  self.technical_analyzer = AdvancedTechnicalAnalyzer()
57
  self.mc_analyzer = MonteCarloAnalyzer()
58
 
 
59
  self.layer1_ranker = None
60
+ self.pattern_analyzer = None
61
 
62
  async def initialize(self):
 
63
  self.http_client = httpx.AsyncClient(timeout=30.0)
64
  await self._load_markets()
65
 
66
  print(" > [DataManager] تهيئة النماذج المساندة (Ranker + Patterns)...")
67
  try:
 
68
  self.layer1_ranker = Layer1Ranker(model_path="ml_models/layer1_ranker.lgbm")
69
  await self.layer1_ranker.initialize()
70
 
 
71
  if ChartPatternAnalyzer:
72
  self.pattern_analyzer = ChartPatternAnalyzer(r2_service=self.r2_service)
73
  await self.pattern_analyzer.initialize()
 
75
  except Exception as e:
76
  print(f"⚠️ [DataManager] تحذير أثناء تهيئة النماذج المساندة: {e}")
77
 
78
+ print(f"✅ DataManager V12.3 initialized (Hybrid Threshold: {self.HYBRID_ENTRY_THRESHOLD})")
79
 
80
  async def _load_markets(self):
81
  try:
 
88
  async def close(self):
89
  if self.http_client: await self.http_client.aclose()
90
  if self.exchange: await self.exchange.close()
 
 
91
  if self.pattern_analyzer and hasattr(self.pattern_analyzer, 'clear_memory'):
92
  self.pattern_analyzer.clear_memory()
93
  if self.layer1_ranker and hasattr(self.layer1_ranker, 'clear_memory'):
94
  self.layer1_ranker.clear_memory()
95
 
 
 
 
96
  async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
97
  print(f"🔍 [Layer 1] بدء الغربلة السريعة...")
98
  volume_data = await self._get_volume_data_live()
99
  if not volume_data: return []
 
 
100
  candidates = volume_data[:150]
101
  print(f"✅ [Layer 1] تم تمرير {len(candidates)} عملة للتحليل الهجين.")
102
  return candidates
 
106
  tickers = await self.exchange.fetch_tickers()
107
  data = []
108
  for s, t in tickers.items():
 
109
  if s.endswith('/USDT') and t['quoteVolume'] and t['quoteVolume'] > 100000:
110
  data.append({'symbol': s, 'dollar_volume': t['quoteVolume'], 'current_price': t['last']})
111
  data.sort(key=lambda x: x['dollar_volume'], reverse=True)
112
  return data
113
  except: return []
114
 
 
 
 
115
  async def stream_ohlcv_data(self, symbols: List[Dict], queue: asyncio.Queue):
 
116
  tfs = ['5m', '15m', '1h', '4h', '1d']
117
  limit = 500
 
118
  for sym_data in symbols:
119
  sym = sym_data['symbol']
120
  tasks = [self._fetch_ohlcv_live(sym, tf, limit) for tf in tfs]
121
  results = await asyncio.gather(*tasks, return_exceptions=False)
 
122
  ohlcv = {}
123
  for i, res in enumerate(results):
124
  if res and isinstance(res, list) and len(res) >= 200:
125
  ohlcv[tfs[i]] = res
 
 
126
  if len(ohlcv) >= 4 and '5m' in ohlcv:
127
  sym_data['ohlcv'] = ohlcv
128
  await queue.put([sym_data])
129
+ await asyncio.sleep(0.1)
 
 
130
  await queue.put(None)
131
 
132
  async def _fetch_ohlcv_live(self, symbol, tf, limit):
133
  try: return await self.exchange.fetch_ohlcv(symbol, tf, limit=limit)
134
+ except: return None