File size: 11,735 Bytes
2bf9457 12cf278 2bf9457 00bb5c9 2bf9457 d69dead 28fa18b 394e2c7 164b380 909d04f 164b380 11b4dc5 2bf9457 82e6b70 53cf6c0 28fa18b 2bf9457 53cf6c0 29d027f 4437a6f 29d027f 4437a6f 29d027f 19ecbac 4437a6f 53cf6c0 d2775f3 4ace337 c6f72fe d21f065 29d027f c6f72fe 87e3669 56e3f87 11b4dc5 2bf9457 82e6b70 909d04f d50c5b6 909d04f 53cf6c0 d21f065 248e033 56e3f87 6b00681 62a43d9 29d027f 62a43d9 29d027f 62a43d9 29d027f 62a43d9 6b00681 62a43d9 24a0949 56e3f87 d21f065 56e3f87 2bf9457 29d027f 2bf9457 909d04f 56e3f87 53cf6c0 d21f065 909d04f d21f065 82e6b70 11b4dc5 12cf278 d21f065 24a0949 d21f065 2bf9457 d21f065 29d027f 4437a6f 909d04f d21f065 b44825a 2bf9457 909d04f d21f065 909d04f d21f065 909d04f d21f065 909d04f d21f065 4437a6f d21f065 909d04f d21f065 909d04f d21f065 82e6b70 d21f065 82e6b70 d21f065 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 |
# ml_engine/data_manager.py
# (V12.5 - Lazy Loading Fix + V15.6 App-Compat Fix)
import os
import asyncio
import httpx
import traceback
import time
from datetime import datetime
import ccxt.async_support as ccxt
import numpy as np
import logging
from typing import List, Dict, Any
import pandas as pd
try:
import pandas_ta as ta
except ImportError:
print("❌ [DataManager] مكتبة pandas_ta غير موجودة.")
ta = None
from ml_engine.indicators import AdvancedTechnicalAnalyzer
from ml_engine.monte_carlo import MonteCarloAnalyzer
from ml_engine.ranker import Layer1Ranker
try:
from ml_engine.patterns import ChartPatternAnalyzer
except ImportError:
print("⚠️ [DataManager] لم يتم العثور على ml_engine/patterns.py")
ChartPatternAnalyzer = None
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("ccxt").setLevel(logging.WARNING)
class DataManager:
def __init__(self, contracts_db, whale_monitor, r2_service=None):
# ==================================================================
# ⚙️ إعدادات التحكم المركزية (V12.3 Hybrid Thresholds)
# ==================================================================
# العتبة الرئيسية للنظام الهجين (Titan + Patterns + MC)
self.HYBRID_ENTRY_THRESHOLD = 0.60
# ==================================================================
self.contracts_db = contracts_db or {}
self.whale_monitor = whale_monitor
self.r2_service = r2_service
# تهيئة منصة التبادل (KuCoin كمثال)
self.exchange = ccxt.kucoin({
'enableRateLimit': True,
'timeout': 30000,
})
self.http_client = None
self.market_cache = {}
self.technical_analyzer = AdvancedTechnicalAnalyzer()
self.mc_analyzer = MonteCarloAnalyzer()
self.layer1_ranker = None
self.pattern_analyzer = None
async def initialize(self):
"""تهيئة مدير البيانات والاتصالات"""
self.http_client = httpx.AsyncClient(timeout=30.0)
await self._load_markets()
print(" > [DataManager] إنشاء النماذج المساندة (Lazy Load)...") # (تغيير الرسالة)
try:
self.layer1_ranker = Layer1Ranker(model_path="ml_models/layer1_ranker.lgbm")
# [ 🔴🔴🔴 LAZY LOAD 🔴🔴🔴 ]
# await self.layer1_ranker.initialize() # (تم الإلغاء - سيتم تحميله عند الحاجة)
# [ 🔴🔴🔴 END LAZY LOAD 🔴🔴🔴 ]
if ChartPatternAnalyzer:
self.pattern_analyzer = ChartPatternAnalyzer(r2_service=self.r2_service)
# [ 🔴🔴🔴 LAZY LOAD 🔴🔴🔴 ]
# await self.pattern_analyzer.initialize() # (تم الإلغاء - سيتم تحميله عند الحاجة)
# [ 🔴🔴🔴 END LAZY LOAD 🔴🔴🔴 ]
except Exception as e:
print(f"⚠️ [DataManager] تحذير أثناء إنشاء النماذج المساندة: {e}")
print(f"✅ DataManager V12.5 initialized (Hybrid Threshold: {self.HYBRID_ENTRY_THRESHOLD})")
async def _load_markets(self):
"""تحميل بيانات الأسواق وتخزينها مؤقتاً"""
try:
if self.exchange:
await self.exchange.load_markets()
self.market_cache = self.exchange.markets
except Exception as e:
print(f"❌ [DataManager] فشل تحميل الأسواق: {e}")
async def close(self):
"""إغلاق جميع الاتصالات بأمان"""
if self.http_client: await self.http_client.aclose()
if self.exchange: await self.exchange.close()
# تنظيف ذاكرة النماذج الفرعية إذا لزم الأمر
if self.pattern_analyzer and hasattr(self.pattern_analyzer, 'clear_memory'):
self.pattern_analyzer.clear_memory()
if self.layer1_ranker and hasattr(self.layer1_ranker, 'clear_memory'):
self.layer1_ranker.clear_memory()
# ==================================================================
# 🚀 [إضافة جديدة V15.6] دوال التوافق مع App
# ==================================================================
async def load_contracts_from_r2(self):
"""
[جديد] يقوم بتحميل قاعدة بيانات العقود من R2 عند بدء التشغيل.
(مطلوب بواسطة app.py V15.6)
"""
print(" > [DataManager] Loading contracts database from R2...")
if not self.r2_service:
print("❌ [DataManager] R2Service not available. Cannot load contracts.")
self.contracts_db = {}
return
try:
self.contracts_db = await self.r2_service.load_contracts_db_async()
print(f"✅ [DataManager] Contracts loaded. Total entries: {len(self.contracts_db)}")
except Exception as e:
print(f"❌ [DataManager] Failed to load contracts from R2: {e}")
self.contracts_db = {}
def get_contracts_db(self) -> Dict[str, Any]:
"""
[جديد] إرجاع قاعدة بيانات العقود التي تم تحميلها.
(مطلوب بواسطة app.py V15.6)
"""
return self.contracts_db
# ==================================================================
# 🛡️ دوال الطبقة الأولى (Layer 1 Screening)
# ==================================================================
async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
"""
الغربلة الأولية السريعة جداً بناءً على الحجم فقط.
تختار أعلى 150 عملة سيولةً لتمريرها للتحليل الهجين المعمق.
"""
print(f"🔍 [Layer 1] بدء الغربلة السريعة (Top Liquid Assets)...")
volume_data = await self._get_volume_data_live()
if not volume_data:
print("⚠️ [Layer 1 Warning] لم يتم العثور على بيانات حجم تداول.")
return []
# اختيار أعلى 150 عملة من حيث حجم التداول الدولاري
candidates = volume_data[:150]
print(f"✅ [Layer 1] تم تمرير {len(candidates)} عملة للتحليل الهجين.")
return candidates
async def _get_volume_data_live(self):
"""جلب بيانات الحجم الحية لجميع الأزواج"""
try:
tickers = await self.exchange.fetch_tickers()
data = []
for symbol, ticker in tickers.items():
# تصفية الأزواج: USDT فقط، وحجم تداول معقول (> 100k$) لتجنب العملات الميتة
if symbol.endswith('/USDT') and ticker.get('quoteVolume') and ticker['quoteVolume'] > 100000:
data.append({
'symbol': symbol,
'dollar_volume': ticker['quoteVolume'],
'current_price': ticker['last']
})
# الترتيب التنازلي حسب الحجم
data.sort(key=lambda x: x['dollar_volume'], reverse=True)
return data
except Exception as e:
print(f"❌ [DataManager] خطأ في جلب بيانات الحجم: {e}")
return []
# ==================================================================
# 📊 دوال جلب البيانات (Data Fetching Pipeline)
# ==================================================================
async def stream_ohlcv_data(self, symbols: List[Dict], queue: asyncio.Queue):
"""
مولد بيانات متدفق (Streaming Generator) يجلب شموع OHLCV
لعدة إطارات زمنية لكل عملة، ويضع النتائج في طابور المعالجة.
"""
timeframes = ['5m', '15m', '1h', '4h', '1d']
limit = 500 # نحتاج تاريخ كافي للمؤشرات المعقدة
for sym_data in symbols:
symbol = sym_data['symbol']
# جلب جميع الإطارات الزمنية بالتوازي للسرعة القصوى
tasks = [self._fetch_ohlcv_live(symbol, tf, limit) for tf in timeframes]
results = await asyncio.gather(*tasks, return_exceptions=False)
ohlcv_packet = {}
valid_packet = True
for i, res in enumerate(results):
tf = timeframes[i]
# التحقق من جودة البيانات (على الأقل 200 شمعة للتحليل الدقيق)
if res and isinstance(res, list) and len(res) >= 200:
ohlcv_packet[tf] = res
else:
# إذا فقدنا إطاراً زمنياً حيوياً (مثل 5m أو 1h)، قد نعتبر الحزمة غير صالحة
if tf in ['5m', '1h']: valid_packet = False
# إذا كانت الحزمة مكتملة بما يكفي، نرسلها للمعالجة
if valid_packet and len(ohlcv_packet) >= 4:
sym_data['ohlcv'] = ohlcv_packet
await queue.put([sym_data]) # نرسل كقائمة لتوافق المعالج
# فاصل زمني صغير جداً لتجنب تجاوز حدود الـ API بعنف
await asyncio.sleep(0.05)
# علامة نهاية التدفق
await queue.put(None)
async def _fetch_ohlcv_live(self, symbol, timeframe, limit):
"""دالة مساعدة لجلب الشموع مع معالجة الأخطاء البسيطة"""
try:
return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
except Exception:
# نتجاهل الأخطاء الفردية لعدم إيقاف التدفق الكامل
return None
# ==================================================================
# 🎯 دوال مساعدة للحارس والدماغ (Sentry & Brain Helpers)
# ==================================================================
async def get_latest_price_async(self, symbol: str) -> float:
"""جلب آخر سعر حقيقي (للتنفيذ والمراقبة)"""
try:
ticker = await self.exchange.fetch_ticker(symbol)
return float(ticker['last'])
except Exception as e:
print(f"⚠️ [DataManager] Failed to fetch price for {symbol}: {e}")
return 0.0
# [NEW FIX V12.4] الدالة التي كانت مفقودة وتمت إضافتها
async def get_latest_ohlcv(self, symbol: str, timeframe: str = '5m', limit: int = 100) -> List[List[float]]:
"""
جلب عدد محدود من الشموع الأخيرة بسرعة.
يستخدمها 'القناص' (Sniper) للمراقبة اللحظية الخفيفة.
"""
try:
candles = await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
if candles and len(candles) > 0:
return candles
return []
except Exception as e:
# print(f"⚠️ [DataManager] Failed rapid OHLCV fetch for {symbol}: {e}")
return [] |