Riy777 commited on
Commit
82e6b70
·
1 Parent(s): ed906df

Update data_manager.py

Browse files
Files changed (1) hide show
  1. data_manager.py +52 -98
data_manager.py CHANGED
@@ -1,5 +1,5 @@
1
  # ml_engine/data_manager.py
2
- # (V12.0 - Titan Ready Data Pipeline)
3
 
4
  import os
5
  import asyncio
@@ -23,7 +23,12 @@ except ImportError:
23
  from ml_engine.indicators import AdvancedTechnicalAnalyzer
24
  from ml_engine.monte_carlo import MonteCarloAnalyzer
25
  from ml_engine.ranker import Layer1Ranker
26
- # لاحظ: لم نعد بحاجة لمحرك الأنماط القديم هنا، Titan سيقوم بالمهمة في Processor
 
 
 
 
 
27
 
28
  logging.getLogger("httpx").setLevel(logging.WARNING)
29
  logging.getLogger("httpcore").setLevel(logging.WARNING)
@@ -32,11 +37,10 @@ logging.getLogger("ccxt").setLevel(logging.WARNING)
32
  class DataManager:
33
  def __init__(self, contracts_db, whale_monitor, r2_service=None):
34
  # ==================================================================
35
- # ⚙️ إعدادات التحكم المركزية (V12.0 Titan Thresholds)
36
  # ==================================================================
37
- self.SCREENING_THRESHOLD = 0.40 # عتبة الغربلة الأولية السريعة (Layer 1)
38
- self.TITAN_ENTRY_THRESHOLD = 0.90 # عتبة دخول Titan الصارمة (Layer 2)
39
- self.PROFIT_SAVE_THRESHOLD = 0.30 # عتبة الخروج الذكي (إذا انخفضت ثقة Titan عن هذا الحد)
40
  # ==================================================================
41
 
42
  self.contracts_db = contracts_db or {}
@@ -50,33 +54,40 @@ class DataManager:
50
 
51
  self.http_client = None
52
  self.market_cache = {}
53
- self.last_market_load = None
54
 
55
  self.technical_analyzer = AdvancedTechnicalAnalyzer()
56
  self.mc_analyzer = MonteCarloAnalyzer()
 
 
57
  self.layer1_ranker = None
 
58
 
59
  async def initialize(self):
60
- """تهيئة الاتصالات وتحميل نماذج الطبقة الأولى"""
61
  self.http_client = httpx.AsyncClient(timeout=30.0)
62
  await self._load_markets()
63
 
64
- print(" > [DataManager V12.0] تهيئة الكاشف المصغر (Layer1 Ranker)...")
65
  try:
66
- # تأكد من وجود ملف النموذج هذا في مساره
67
  self.layer1_ranker = Layer1Ranker(model_path="ml_models/layer1_ranker.lgbm")
68
  await self.layer1_ranker.initialize()
 
 
 
 
 
 
69
  except Exception as e:
70
- print(f"⚠️ [DataManager] تحذير: فشل تهيئة Layer1 Ranker: {e}")
71
 
72
- print(f"✅ DataManager V12.0 initialized. Titan Threshold: {self.TITAN_ENTRY_THRESHOLD}")
73
 
74
  async def _load_markets(self):
75
  try:
76
  if self.exchange:
77
  await self.exchange.load_markets()
78
  self.market_cache = self.exchange.markets
79
- self.last_market_load = datetime.now()
80
  except Exception as e:
81
  print(f"❌ [DataManager] فشل تحميل الأسواق: {e}")
82
 
@@ -84,123 +95,66 @@ class DataManager:
84
  if self.http_client: await self.http_client.aclose()
85
  if self.exchange: await self.exchange.close()
86
 
87
- async def get_market_context_async(self):
88
- try:
89
- sentiment = await self.get_sentiment_live_async()
90
- prices = await self._get_prices_live()
91
- return {
92
- 'timestamp': datetime.now().isoformat(),
93
- 'bitcoin_price_usd': prices.get('bitcoin'),
94
- 'fear_and_greed_index': sentiment.get('feargreed_value') if sentiment else 50,
95
- 'market_trend': self._determine_market_trend(prices.get('bitcoin'), sentiment)
96
- }
97
- except: return {'market_trend': 'UNKNOWN'}
98
-
99
- async def get_sentiment_live_async(self):
100
- try:
101
- async with httpx.AsyncClient(timeout=10) as client:
102
- resp = await client.get("https://api.alternative.me/fng/")
103
- if resp.status_code == 200:
104
- data = resp.json()
105
- return {"feargreed_value": int(data['data'][0]['value'])}
106
- except: return None
107
-
108
- async def _get_prices_live(self):
109
- if not self.exchange: return {}
110
- try:
111
- btc = await self.exchange.fetch_ticker('BTC/USDT')
112
- return {'bitcoin': btc['last']}
113
- except: return {}
114
-
115
- def _determine_market_trend(self, btc_price, sentiment):
116
- # منطق بسيط لتحديد الاتجاه العام
117
- if not btc_price: return "NEUTRAL"
118
- score = 0
119
- if btc_price > 65000: score += 1 # مثال
120
- if sentiment and sentiment.get('feargreed_value', 50) > 60: score += 1
121
- return "BULLISH" if score > 0 else "BEARISH"
122
 
123
  # ==================================================================
124
- # 🔴 الطبقة 1: الغربلة السريعة (Layer 1 Rapid Screening)
125
  # ==================================================================
126
  async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
127
- """غربلة أولية سريعة لتقليل عدد العملات التي يحللها Titan"""
128
  print(f"🔍 [Layer 1] بدء الغربلة السريعة...")
129
  volume_data = await self._get_volume_data_live()
130
  if not volume_data: return []
131
 
132
- # نأخذ أعلى 150 عملة سيولة لتوسيع نطاق البحث لـ Titan
133
  candidates = volume_data[:150]
134
- qualified = []
135
-
136
- # تحليل سريع جداً (فريم 1H فقط)
137
- batch_size = 30 # دفعة أكبر قليلاً للسرعة
138
- for i in range(0, len(candidates), batch_size):
139
- batch = candidates[i:i+batch_size]
140
- tasks = [self._fetch_ohlcv_live(c['symbol'], '1h', 150) for c in batch]
141
- results = await asyncio.gather(*tasks, return_exceptions=True)
142
-
143
- for j, candles in enumerate(results):
144
- if isinstance(candles, list) and len(candles) >= 100:
145
- # هنا يمكن إضافة منطق فلترة بسيط (مثلاً: فوق متوسط 50)
146
- # حالياً سنمرر معظم العملات الجيدة لـ Titan ليقرر هو
147
- candidates[i+j]['layer1_score'] = 0.5 # درجة مبدئية
148
- qualified.append(candidates[i+j])
149
-
150
- print(f"✅ [Layer 1] تأهل {len(qualified)} عملة للتحليل العميق بواسطة Titan.")
151
- return qualified
152
 
153
  async def _get_volume_data_live(self):
154
  try:
155
  tickers = await self.exchange.fetch_tickers()
156
  data = []
157
  for s, t in tickers.items():
 
158
  if s.endswith('/USDT') and t['quoteVolume'] and t['quoteVolume'] > 100000:
159
  data.append({'symbol': s, 'dollar_volume': t['quoteVolume'], 'current_price': t['last']})
160
  data.sort(key=lambda x: x['dollar_volume'], reverse=True)
161
  return data
162
  except: return []
163
 
164
- async def _fetch_ohlcv_live(self, symbol, tf, limit):
165
- try: return await self.exchange.fetch_ohlcv(symbol, tf, limit=limit)
166
- except: return None
167
-
168
  # ==================================================================
169
- # 🔵 خط أنابيب بيانات Titan (Layer 2 Data Stream)
170
  # ==================================================================
171
  async def stream_ohlcv_data(self, symbols: List[Dict], queue: asyncio.Queue):
172
- """
173
- يجلب البيانات العميقة (500 شمعة) لجميع الأطر التي يحتاجها Titan.
174
- الأطر المطلوبة: 5m, 15m, 1h, 4h, 1d
175
- """
176
- print(f"🚚 [Titan Data Stream] جلب بيانات عميقة لـ {len(symbols)} عملة...")
177
-
178
- tfs = ['5m', '15m', '1h', '4h', '1d'] # أطر Titan الكاملة
179
- limit = 500 # عمق كافٍ لحساب كل المؤشرات بدقة (مثل EMA200)
180
 
181
  for sym_data in symbols:
182
  sym = sym_data['symbol']
183
- # جلب متوازي لكل الأطر الزمنية للعملة الواحدة
184
  tasks = [self._fetch_ohlcv_live(sym, tf, limit) for tf in tfs]
185
  results = await asyncio.gather(*tasks, return_exceptions=False)
186
 
187
- ohlcv_package = {}
188
- complete_data = True
189
  for i, res in enumerate(results):
190
- if res and isinstance(res, list) and len(res) >= 200: # نحتاج 200 شمعة على الأقل
191
- ohlcv_package[tfs[i]] = res
192
- else:
193
- # إذا فقدنا إطاراً رئيسياً (مثل 5m)، قد لا يعمل Titan بدقة
194
- if tfs[i] == '5m': complete_data = False
195
 
196
- if complete_data and len(ohlcv_package) >= 4: # نتسامح مع فقدان إطار واحد غير رئيسي
197
- sym_data['ohlcv'] = ohlcv_package
198
- await queue.put([sym_data]) # إرسال العملة للمعالجة فوراً
 
199
 
200
- # فاصل زمني صغير لتجنب حظر الـ API
201
- await asyncio.sleep(0.1)
202
 
203
- await queue.put(None) # إشارة انتهاء الدفق
204
- print("🏁 [Titan Data Stream] اكتمل النقل.")
 
 
 
205
 
206
- print("✅ DataManager V12.0 (Titan Pipeline) loaded.")
 
1
  # ml_engine/data_manager.py
2
+ # (V12.1 - Titan + Hybrid Support Pipeline)
3
 
4
  import os
5
  import asyncio
 
23
  from ml_engine.indicators import AdvancedTechnicalAnalyzer
24
  from ml_engine.monte_carlo import MonteCarloAnalyzer
25
  from ml_engine.ranker import Layer1Ranker
26
+ # 🔥 إعادة استيراد محرك الأنماط لدعم النظام الهجين
27
+ try:
28
+ from ml_engine.patterns import ChartPatternAnalyzer
29
+ except ImportError:
30
+ print("⚠️ [DataManager] لم يتم العثور على ml_engine/patterns.py")
31
+ ChartPatternAnalyzer = None
32
 
33
  logging.getLogger("httpx").setLevel(logging.WARNING)
34
  logging.getLogger("httpcore").setLevel(logging.WARNING)
 
37
  class DataManager:
38
  def __init__(self, contracts_db, whale_monitor, r2_service=None):
39
  # ==================================================================
40
+ # ⚙️ إعدادات التحكم المركزية (V12 Hybrid Thresholds)
41
  # ==================================================================
42
+ self.SCREENING_THRESHOLD = 0.40 # غربلة أولية
43
+ self.TITAN_ENTRY_THRESHOLD = 0.90 # عتبة Titan الصارمة
 
44
  # ==================================================================
45
 
46
  self.contracts_db = contracts_db or {}
 
54
 
55
  self.http_client = None
56
  self.market_cache = {}
 
57
 
58
  self.technical_analyzer = AdvancedTechnicalAnalyzer()
59
  self.mc_analyzer = MonteCarloAnalyzer()
60
+
61
+ # نماذج الطبقة الأولى والثانية المساندة
62
  self.layer1_ranker = None
63
+ self.pattern_analyzer = None # 🔥 تمت إعادته لتجنب AttributeError
64
 
65
  async def initialize(self):
66
+ """تهيئة الاتصالات وتحميل جميع النماذج المساندة"""
67
  self.http_client = httpx.AsyncClient(timeout=30.0)
68
  await self._load_markets()
69
 
70
+ print(" > [DataManager] تهيئة النماذج المساندة (Ranker + Patterns)...")
71
  try:
72
+ # 1. الكاشف المصغر (Ranker)
73
  self.layer1_ranker = Layer1Ranker(model_path="ml_models/layer1_ranker.lgbm")
74
  await self.layer1_ranker.initialize()
75
+
76
+ # 2. محرك الأنماط (Patterns) - للنظام الهجين
77
+ if ChartPatternAnalyzer:
78
+ self.pattern_analyzer = ChartPatternAnalyzer(r2_service=self.r2_service)
79
+ await self.pattern_analyzer.initialize()
80
+
81
  except Exception as e:
82
+ print(f"⚠️ [DataManager] تحذير أثناء تهيئة النماذج المساندة: {e}")
83
 
84
+ print(f"✅ DataManager V12.1 initialized (Hybrid Ready).")
85
 
86
  async def _load_markets(self):
87
  try:
88
  if self.exchange:
89
  await self.exchange.load_markets()
90
  self.market_cache = self.exchange.markets
 
91
  except Exception as e:
92
  print(f"❌ [DataManager] فشل تحميل الأسواق: {e}")
93
 
 
95
  if self.http_client: await self.http_client.aclose()
96
  if self.exchange: await self.exchange.close()
97
 
98
+ # تنظيف الذاكرة عند الإغلاق
99
+ if self.pattern_analyzer and hasattr(self.pattern_analyzer, 'clear_memory'):
100
+ self.pattern_analyzer.clear_memory()
101
+ if self.layer1_ranker and hasattr(self.layer1_ranker, 'clear_memory'):
102
+ self.layer1_ranker.clear_memory()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
103
 
104
  # ==================================================================
105
+ # 🔴 الطبقة 1: الغربلة السريعة
106
  # ==================================================================
107
  async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
 
108
  print(f"🔍 [Layer 1] بدء الغربلة السريعة...")
109
  volume_data = await self._get_volume_data_live()
110
  if not volume_data: return []
111
 
112
+ # توسيع النطاق لـ 150 عملة لزيادة فرص Titan
113
  candidates = volume_data[:150]
114
+ print(f"✅ [Layer 1] تم تمرير {len(candidates)} عملة للتحليل الهجين.")
115
+ return candidates
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
116
 
117
  async def _get_volume_data_live(self):
118
  try:
119
  tickers = await self.exchange.fetch_tickers()
120
  data = []
121
  for s, t in tickers.items():
122
+ # فلترة العملات ذات السيولة الضعيفة جداً (< 100k)
123
  if s.endswith('/USDT') and t['quoteVolume'] and t['quoteVolume'] > 100000:
124
  data.append({'symbol': s, 'dollar_volume': t['quoteVolume'], 'current_price': t['last']})
125
  data.sort(key=lambda x: x['dollar_volume'], reverse=True)
126
  return data
127
  except: return []
128
 
 
 
 
 
129
  # ==================================================================
130
+ # 🔵 خط أنابيب البيانات الهجين (Hybrid Data Stream)
131
  # ==================================================================
132
  async def stream_ohlcv_data(self, symbols: List[Dict], queue: asyncio.Queue):
133
+ """جلب بيانات عميقة (500 شمعة) لكافة الأطر المطلوبة"""
134
+ tfs = ['5m', '15m', '1h', '4h', '1d']
135
+ limit = 500
 
 
 
 
 
136
 
137
  for sym_data in symbols:
138
  sym = sym_data['symbol']
 
139
  tasks = [self._fetch_ohlcv_live(sym, tf, limit) for tf in tfs]
140
  results = await asyncio.gather(*tasks, return_exceptions=False)
141
 
142
+ ohlcv = {}
 
143
  for i, res in enumerate(results):
144
+ if res and isinstance(res, list) and len(res) >= 200:
145
+ ohlcv[tfs[i]] = res
 
 
 
146
 
147
+ # يجب توفر معظم الأطر لعمل النظام الهجين بدقة
148
+ if len(ohlcv) >= 4 and '5m' in ohlcv:
149
+ sym_data['ohlcv'] = ohlcv
150
+ await queue.put([sym_data])
151
 
152
+ await asyncio.sleep(0.1) # تجنب حظر API
 
153
 
154
+ await queue.put(None)
155
+
156
+ async def _fetch_ohlcv_live(self, symbol, tf, limit):
157
+ try: return await self.exchange.fetch_ohlcv(symbol, tf, limit=limit)
158
+ except: return None
159
 
160
+ print("✅ DataManager V12.1 (Hybrid Support) loaded.")