Spaces:
Runtime error
Runtime error
File size: 20,851 Bytes
2bf9457 00bb5c9 2bf9457 d69dead 28fa18b 394e2c7 164b380 2bf9457 164b380 2bf9457 164b380 11b4dc5 2bf9457 11b4dc5 2bf9457 53cf6c0 2bf9457 28fa18b 2bf9457 53cf6c0 6b00681 53cf6c0 d2775f3 4ace337 c6f72fe 2bf9457 c6f72fe 87e3669 56e3f87 11b4dc5 2bf9457 11b4dc5 2bf9457 d69dead 53cf6c0 2bf9457 248e033 56e3f87 6b00681 2bf9457 6b00681 4ace337 6b00681 2bf9457 4ace337 2bf9457 4ace337 2bf9457 4ace337 2bf9457 6b00681 2bf9457 24a0949 56e3f87 2bf9457 56e3f87 2bf9457 56e3f87 53cf6c0 2bf9457 b866e29 2bf9457 4ace337 24a0949 2bf9457 d69dead 2bf9457 4ace337 24a0949 2bf9457 24a0949 2bf9457 d69dead 2bf9457 d69dead 248e033 24a0949 2bf9457 ea38153 2bf9457 53cf6c0 2bf9457 185f2b0 11b4dc5 2bf9457 11b4dc5 2bf9457 11b4dc5 2bf9457 11b4dc5 2bf9457 24a0949 2bf9457 24a0949 2bf9457 79a9e95 2bf9457 4ace337 2bf9457 79a9e95 2bf9457 79a9e95 2bf9457 79a9e95 2bf9457 79a9e95 2bf9457 164b380 2bf9457 11b4dc5 2bf9457 185f2b0 2bf9457 185f2b0 2bf9457 185f2b0 2bf9457 4ace337 2bf9457 164b380 2bf9457 164b380 2bf9457 164b380 2bf9457 164b380 2bf9457 b44825a 2bf9457 6690430 b44825a 2bf9457 6690430 2bf9457 b44825a 6690430 2bf9457 6690430 2bf9457 3fd2d9a 2bf9457 6690430 2bf9457 6690430 2bf9457 6690430 2bf9457 6690430 2bf9457 6598c39 2bf9457 7f28923 2bf9457 2aa4cc2 2bf9457 2aa4cc2 2bf9457 20a2029 2bf9457 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 |
# ml_engine/data_manager.py
# (V11.0 - Full Real Data Integration: XGBoost 1h + Ranker + MC in Layer 1.1)
import os
import asyncio
import httpx
import traceback
import time
from datetime import datetime
import ccxt.async_support as ccxt
import numpy as np
import logging
from typing import List, Dict, Any
import pandas as pd
# محاولة استيراد pandas_ta (ضرورية للمؤشرات)
try:
import pandas_ta as ta
except ImportError:
print("❌ [DataManager] مكتبة pandas_ta غير موجودة. النظام لن يعمل بشكل صحيح.")
ta = None
# استيراد المحركات التحليلية
from ml_engine.indicators import AdvancedTechnicalAnalyzer
from ml_engine.monte_carlo import MonteCarloAnalyzer
from ml_engine.patterns import ChartPatternAnalyzer # المحرك الجديد V9.0
from ml_engine.ranker import Layer1Ranker
# تقليل ضجيج السجلات للمكتبات الخارجية
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("ccxt").setLevel(logging.WARNING)
class DataManager:
def __init__(self, contracts_db, whale_monitor, r2_service=None):
self.contracts_db = contracts_db or {}
self.whale_monitor = whale_monitor
self.r2_service = r2_service
# إعداد عميل KuCoin غير المتزامن (لبيانات حقيقية فقط)
self.exchange = ccxt.kucoin({
'enableRateLimit': True,
'timeout': 30000, # مهلة 30 ثانية
})
self.http_client = None
self.market_cache = {}
self.last_market_load = None
# تهيئة المحركات التحليلية
self.technical_analyzer = AdvancedTechnicalAnalyzer()
self.mc_analyzer = MonteCarloAnalyzer()
self.pattern_analyzer = None # سيتم تهيئته في initialize
self.layer1_ranker = None # سيتم تهيئته في initialize
async def initialize(self):
"""تهيئة الاتصالات وتحميل نماذج الذكاء الاصطناعي"""
self.http_client = httpx.AsyncClient(timeout=30.0)
await self._load_markets()
print(" > [DataManager V11.0] تهيئة محرك الأنماط (V9.0 XGBoost)...")
try:
self.pattern_analyzer = ChartPatternAnalyzer(r2_service=self.r2_service)
await self.pattern_analyzer.initialize()
except Exception as e:
print(f"❌ [DataManager] فشل تهيئة محرك الأنماط V9.0: {e}")
print(" > [DataManager V11.0] تهيئة الكاشف المصغر (Layer1 Ranker)...")
try:
self.layer1_ranker = Layer1Ranker(model_path="ml_models/layer1_ranker.lgbm")
await self.layer1_ranker.initialize()
except Exception as e:
print(f"❌ [DataManager] فشل تهيئة الرانكر: {e}")
print("✅ DataManager initialized - V11.0 (Real Data Only)")
async def _load_markets(self):
"""تحميل بيانات الأسواق المتاحة من المنصة"""
try:
if self.exchange:
await self.exchange.load_markets()
self.market_cache = self.exchange.markets
self.last_market_load = datetime.now()
except Exception as e:
print(f"❌ [DataManager] فشل تحميل الأسواق من KuCoin: {e}")
async def close(self):
"""إغلاق الاتصالات بأمان عند إيقاف النظام"""
if self.http_client:
await self.http_client.aclose()
if self.exchange:
await self.exchange.close()
async def get_market_context_async(self):
"""جلب نظرة عامة حية على السوق (أسعار BTC/ETH ومؤشر الخوف والجشع)"""
try:
sentiment_data = await self.get_sentiment_live_async()
price_data = await self._get_prices_live()
bitcoin_price = price_data.get('bitcoin')
ethereum_price = price_data.get('ethereum')
return {
'timestamp': datetime.now().isoformat(),
'bitcoin_price_usd': bitcoin_price,
'ethereum_price_usd': ethereum_price,
'fear_and_greed_index': sentiment_data.get('feargreed_value') if sentiment_data else None,
'sentiment_class': sentiment_data.get('feargreed_class') if sentiment_data else 'NEUTRAL',
'market_trend': self._determine_market_trend(bitcoin_price, sentiment_data),
'btc_sentiment': self._get_btc_sentiment(bitcoin_price),
'data_quality': 'HIGH' if bitcoin_price and ethereum_price else 'LOW'
}
except Exception as e:
print(f"⚠️ [DataManager] خطأ في جلب سياق السوق: {e}")
return self._get_minimal_market_context()
async def get_sentiment_live_async(self):
"""جلب مؤشر الخوف والجشع من API خارجي حي"""
try:
async with httpx.AsyncClient(timeout=10) as client:
response = await client.get("https://api.alternative.me/fng/")
if response.status_code == 200:
data = response.json()
if 'data' in data and len(data['data']) > 0:
latest = data['data'][0]
return {
"feargreed_value": int(latest['value']),
"feargreed_class": latest['value_classification']
}
return None
except Exception:
return None
async def _get_prices_live(self):
"""جلب أسعار BTC و ETH الحية من المنصة"""
if not self.exchange:
return {'bitcoin': None, 'ethereum': None}
try:
# جلب السعرين بالتوازي لتقليل وقت الانتظار
btc_task = asyncio.create_task(self.exchange.fetch_ticker('BTC/USDT'))
eth_task = asyncio.create_task(self.exchange.fetch_ticker('ETH/USDT'))
btc_ticker, eth_ticker = await asyncio.gather(btc_task, eth_task, return_exceptions=True)
btc_price = btc_ticker['last'] if isinstance(btc_ticker, dict) else None
eth_price = eth_ticker['last'] if isinstance(eth_ticker, dict) else None
return {'bitcoin': btc_price, 'ethereum': eth_price}
except Exception:
return {'bitcoin': None, 'ethereum': None}
def _determine_market_trend(self, btc_price, sentiment):
"""تحديد اتجاه السوق بناءً على قواعد سعرية ومشاعرية (قابلة للتعديل)"""
if not btc_price:
return "UNKNOWN"
score = 0
# قواعد بسيطة لتحديد الاتجاه (يمكن جعلها ديناميكية أكثر مستقبلاً)
if btc_price > 95000: score += 1
elif btc_price < 90000: score -= 1
if sentiment:
fg_val = sentiment.get('feargreed_value', 50)
if fg_val > 65: score += 1 # طمع شديد يدعم الصعود
elif fg_val < 35: score -= 1 # خوف شديد يدعم الهبوط
if score > 0: return "bull_market"
elif score < 0: return "bear_market"
else: return "sideways_market"
def _get_btc_sentiment(self, btc_price):
if not btc_price: return "UNKNOWN"
# تصنيف بسيط بناء على مستويات سعرية
if btc_price > 95000: return "BULLISH"
elif btc_price < 90000: return "BEARISH"
else: return "NEUTRAL"
def _get_minimal_market_context(self):
"""سياق احتياطي في حال فشل الاتصال"""
return {
'timestamp': datetime.now().isoformat(),
'data_available': False,
'market_trend': 'UNKNOWN',
'data_quality': 'LOW'
}
def _create_dataframe(self, candles: List) -> pd.DataFrame:
"""تحويل قائمة الشموع الخام إلى Pandas DataFrame للمعالجة"""
if not candles:
return pd.DataFrame()
try:
df = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df[['open', 'high', 'low', 'close', 'volume']] = df[['open', 'high', 'low', 'close', 'volume']].astype(float)
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
df.set_index('timestamp', inplace=True)
df.sort_index(inplace=True)
# إزالة أي صفوف مكررة قد تنتج عن مشاكل في API
df = df[~df.index.duplicated(keep='last')]
return df
except Exception as e:
print(f"⚠️ [DataManager] خطأ في إنشاء DataFrame: {e}")
return pd.DataFrame()
# ==================================================================
# 🔴 الطبقة 1.1: الغربلة السريعة (V11.0 - Real Data & ML Integrated)
# ==================================================================
async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
"""
تنفيذ الغربلة الأولية باستخدام بيانات حية و 3 نماذج تقييم.
المعايير: حجم تداول عالي + درجة مجمعة >= 50%.
"""
print("📊 [Layer 1.1] بدء الغربلة الحية (Ranker + XGB 1h + MC)...")
# التحقق من جاهزية المحركات
if not self.layer1_ranker or not self.layer1_ranker.model:
print("⚠️ [Layer 1.1] تحذير: الرانكر (Ranker) غير جاهز.")
if not self.pattern_analyzer or not self.pattern_analyzer.initialized:
print("⚠️ [Layer 1.1] تحذير: محرك الأنماط (XGBoost) غير جاهز.")
# 1. جلب جميع العملات وتصفية أفضل 100 حسب سيولة الدولار (Quote Volume)
volume_data = await self._get_volume_data_live()
if not volume_data:
print("❌ [Layer 1.1] فشل جلب بيانات الأحجام الحية. إيقاف الغربلة.")
return []
# ترتيب تنازلي حسب السيولة وأخذ أفضل 100
volume_data.sort(key=lambda x: x['dollar_volume'], reverse=True)
top_100_candidates = volume_data[:100]
print(f" 🔄 تحليل {len(top_100_candidates)} عملة (إطار 1H) بالتوازي...")
final_qualified_candidates = []
# 2. معالجة العملات على دفعات (لتجنب تجاوز حدود API)
batch_size = 20 # دفعة آمنة لمعظم المنصات
for i in range(0, len(top_100_candidates), batch_size):
batch = top_100_candidates[i:i+batch_size]
# جلب شموع الساعة (1H) لكل عملة في الدفعة بالتوازي
fetch_tasks = [self._fetch_ohlcv_live(c['symbol'], '1h', limit=200) for c in batch]
batch_results = await asyncio.gather(*fetch_tasks, return_exceptions=True)
for j, candles in enumerate(batch_results):
# التحقق من صلاحية البيانات المستلمة
if isinstance(candles, Exception) or not candles or len(candles) < 150:
continue
candidate_data = batch[j]
symbol = candidate_data['symbol']
try:
# تحويل الشموع إلى DataFrame للحسابات
df_1h = self._create_dataframe(candles)
if df_1h.empty: continue
closes_np = df_1h['close'].to_numpy()
# --- أ. حساب درجة الرانكر (LightGBM) [وزن 40%] ---
ranker_score = 0.0
if self.layer1_ranker and self.layer1_ranker.model:
# حساب الميزات الذكية التي يحتاجها الرانكر
features = self.technical_analyzer.calculate_v9_smart_features(df_1h)
if features:
# إضافة ميزات مونت كارلو البسيطة للرانكر (يتوقعها النموذج)
mc_simple_for_ranker = self.mc_analyzer.generate_1h_price_distribution_simple(closes_np[-100:])
features.update(mc_simple_for_ranker)
# التنبؤ
ranker_prob = self.layer1_ranker.predict_proba(pd.DataFrame([features]))
ranker_score = float(ranker_prob[0]) if len(ranker_prob) > 0 else 0.0
# --- ب. حساب درجة XGBoost 1h (النمط) [وزن 40%] ---
xgb_score = 0.0
if self.pattern_analyzer and self.pattern_analyzer.initialized:
# نمرر بيانات 1H فقط للمحرك للحصول على نتيجة سريعة
pattern_result = await self.pattern_analyzer.detect_chart_patterns({'1h': candles})
# استخراج احتمالية الصعود لنموذج 1h
xgb_score = pattern_result.get('details', {}).get('1h') or 0.0
# --- ج. حساب درجة مونت كارلو (احتمالية الربح) [وزن 20%] ---
mc_score = 0.0
mc_result = self.mc_analyzer.generate_1h_price_distribution_simple(closes_np[-100:])
if not mc_result.get('error'):
mc_prob_gain = mc_result.get('mc_prob_gain', 0.5)
# تطبيع الدرجة: 50% احتمال ربح = 0 نقاط، 100% احتمال ربح = 1 نقطة
mc_score = max(0.0, (mc_prob_gain - 0.5) * 2.0)
# --- د. حساب الدرجة النهائية الموزونة ---
final_l1_score = (ranker_score * 0.40) + (xgb_score * 0.40) + (mc_score * 0.20)
# التأهيل إذا تجاوزت العتبة (50%)
if final_l1_score >= 0.50:
candidate_data['layer1_score'] = final_l1_score
# تخزين أسباب الترشح للشفافية
candidate_data['reasons_for_candidacy'] = [
f"Ranker(40%): {ranker_score:.2f}",
f"XGB_1h(40%): {xgb_score:.2f}",
f"MC_Simple(20%): {mc_score:.2f}"
]
final_qualified_candidates.append(candidate_data)
except Exception as e:
# طباعة خطأ صامتة لعدم إغراق السجلات
# print(f"⚠️ خطأ في تحليل {symbol}: {e}")
continue
# ترتيب المرشحين النهائيين حسب الدرجة
final_qualified_candidates.sort(key=lambda x: x['layer1_score'], reverse=True)
print(f"🎯 [Layer 1.1] اكتملت الغربلة. تم تأهيل {len(final_qualified_candidates)} عملة للمرحلة القادمة.")
# إرجاع أفضل 20 عملة فقط للتحليل العميق المكلف
return final_qualified_candidates[:20]
async def _fetch_ohlcv_live(self, symbol, timeframe, limit):
"""جلب شموع حية من المنصة مع التعامل مع الأخطاء"""
if not self.exchange: return None
try:
return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
except Exception:
return None
async def _get_volume_data_live(self) -> List[Dict[str, Any]]:
"""جلب بيانات الحجم الحية لجميع أزواج USDT"""
if not self.exchange: return []
try:
# جلب كل الأسعار والأحجام في طلب واحد سريع
tickers = await self.exchange.fetch_tickers()
volume_data = []
for symbol, ticker in tickers.items():
# فلترة أزواج USDT النشطة فقط ذات السيولة المقبولة (> 50k)
if (symbol.endswith('/USDT') and
ticker.get('quoteVolume') and
ticker['quoteVolume'] > 50000):
volume_data.append({
'symbol': symbol,
'dollar_volume': float(ticker['quoteVolume']),
'current_price': float(ticker['last']) if ticker.get('last') else 0.0,
'price_change_24h': float(ticker['percentage']) if ticker.get('percentage') else 0.0
})
return volume_data
except Exception as e:
print(f"❌ [DataManager] فشل جلب بيانات الحجم الحية: {e}")
return []
async def stream_ohlcv_data(self, symbols: List[Dict[str, Any]], queue: asyncio.Queue):
"""
(للطبقة 1.2) جلب البيانات الكاملة (4 أطر زمنية) للعملات المتأهلة.
يتم إرسال البيانات عبر Queue للمعالجة المتوازية في Processor.
"""
print(f"📊 [Layer 1.2 Producer] بدء جلب البيانات العميقة لـ {len(symbols)} عملة...")
for symbol_data in symbols:
symbol = symbol_data['symbol']
# جلب الأطر الزمنية الأربعة المطلوبة لمحرك الأنماط الجديد
# (نستخدم 300 شمعة لضمان وجود بيانات كافية للمؤشرات)
tasks = [
self._fetch_ohlcv_live(symbol, '15m', 300),
self._fetch_ohlcv_live(symbol, '1h', 300),
self._fetch_ohlcv_live(symbol, '4h', 300),
self._fetch_ohlcv_live(symbol, '1d', 300),
]
# تنفيذ الطلبات الأربعة بالتوازي للعملة الواحدة
results = await asyncio.gather(*tasks, return_exceptions=False)
ohlcv_complete = {}
tfs = ['15m', '1h', '4h', '1d']
valid_tfs_count = 0
for i, res in enumerate(results):
# التأكد من أن البيانات صالحة وكافية (على الأقل 200 شمعة للتحليل الدقيق)
if res and isinstance(res, list) and len(res) >= 200:
ohlcv_complete[tfs[i]] = res
valid_tfs_count += 1
# إرسال العملة للمعالجة فقط إذا توفرت معظم الأطر الزمنية (3 على الأقل)
if valid_tfs_count >= 3:
symbol_data['ohlcv'] = ohlcv_complete
# نضع العملة في قائمة (كدُفعة صغيرة من عنصر واحد) في الطابور
await queue.put([symbol_data])
else:
# تجاهل العملة لعدم كفاية البيانات
pass
# إرسال إشارة نهاية التدفق للمستهلك (Processor)
await queue.put(None)
print("🏁 [Layer 1.2 Producer] اكتمل تدفق البيانات.")
async def get_symbol_daily_volume(self, symbol):
"""جلب حجم التداول اليومي لعملة محددة (يستخدم عند إعادة التحليل)"""
try:
if self.exchange:
ticker = await self.exchange.fetch_ticker(symbol)
return float(ticker['quoteVolume']) if ticker and ticker.get('quoteVolume') else 0.0
return 0.0
except: return 0.0
async def get_whale_data_for_symbol(self, symbol, daily_volume_usd=0):
"""واجهة لجلب بيانات الحيتان من المراقب (مع تمرير الحجم اليومي للتحليل النسبي)"""
if self.whale_monitor:
return await self.whale_monitor.get_symbol_whale_activity(symbol, daily_volume_usd)
return None
# طباعة رسالة تأكيد عند استيراد الملف بنجاح
print("✅ DataManager loaded - V11.0 (Full Real Data Integration)") |