# ml_engine/data_manager.py # (V12.5 - Lazy Loading Fix + V15.6 App-Compat Fix + Detailed Logging) import os import asyncio import httpx import traceback import time from datetime import datetime import ccxt.async_support as ccxt import numpy as np import logging from typing import List, Dict, Any import pandas as pd try: import pandas_ta as ta except ImportError: print("❌ [DataManager] مكتبة pandas_ta غير موجودة.") ta = None from ml_engine.indicators import AdvancedTechnicalAnalyzer from ml_engine.monte_carlo import MonteCarloAnalyzer from ml_engine.ranker import Layer1Ranker try: from ml_engine.patterns import ChartPatternAnalyzer except ImportError: print("⚠️ [DataManager] لم يتم العثور على ml_engine/patterns.py") ChartPatternAnalyzer = None logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("httpcore").setLevel(logging.WARNING) logging.getLogger("ccxt").setLevel(logging.WARNING) class DataManager: def __init__(self, contracts_db, whale_monitor, r2_service=None): # ================================================================== # ⚙️ إعدادات التحكم المركزية (V12.3 Hybrid Thresholds) # ================================================================== self.HYBRID_ENTRY_THRESHOLD = 0.60 # ================================================================== self.contracts_db = contracts_db or {} self.whale_monitor = whale_monitor self.r2_service = r2_service self.exchange = ccxt.kucoin({ 'enableRateLimit': True, 'timeout': 30000, }) self.http_client = None self.market_cache = {} self.technical_analyzer = AdvancedTechnicalAnalyzer() self.mc_analyzer = MonteCarloAnalyzer() self.layer1_ranker = None self.pattern_analyzer = None async def initialize(self): """تهيئة مدير البيانات والاتصالات""" print(" > [DM Log] 0. بدء تهيئة DataManager...") self.http_client = httpx.AsyncClient(timeout=30.0) await self._load_markets() print(" > [DataManager] إنشاء النماذج المساندة (Lazy Load)...") try: self.layer1_ranker = Layer1Ranker(model_path="ml_models/layer1_ranker.lgbm") if ChartPatternAnalyzer: self.pattern_analyzer = ChartPatternAnalyzer(r2_service=self.r2_service) except Exception as e: print(f"⚠️ [DataManager] تحذير أثناء إنشاء النماذج المساندة: {e}") print(f"✅ DataManager V12.5 initialized (Hybrid Threshold: {self.HYBRID_ENTRY_THRESHOLD})") print(" > [DM Log] 4. اكتملت تهيئة DataManager.") async def _load_markets(self): """تحميل بيانات الأسواق وتخزينها مؤقتاً""" print(" > [DM Log] 1. بدء _load_markets...") try: if self.exchange: print(" > [DM Log] 2. استدعاء exchange.load_markets()... (قد يستغرق وقتاً)") await self.exchange.load_markets() self.market_cache = self.exchange.markets if self.market_cache and len(self.market_cache) > 0: print(f" > [DM Log] 3. ✅ نجاح! تم تحميل {len(self.market_cache)} سوق.") else: print(" > [DM Log] 3. ⚠️ تحذير: load_markets() نجح ولكن لم يتم إرجاع أسواق.") else: print(" > [DM Log] 2. ❌ خطأ: self.exchange هو None.") except Exception as e: print(f" > [DM Log] 3. ❌❌❌ فشل فادح في _load_markets: {e}") traceback.print_exc() async def close(self): """إغلاق جميع الاتصالات بأمان""" print(" > [DM Log] 7. إغلاق اتصالات DataManager...") if self.http_client: await self.http_client.aclose() if self.exchange: await self.exchange.close() if self.pattern_analyzer and hasattr(self.pattern_analyzer, 'clear_memory'): self.pattern_analyzer.clear_memory() if self.layer1_ranker and hasattr(self.layer1_ranker, 'clear_memory'): self.layer1_ranker.clear_memory() # ================================================================== # 🚀 [إضافة جديدة V15.6] دوال التوافق مع App # ================================================================== async def load_contracts_from_r2(self): """ [جديد] يقوم بتحميل قاعدة بيانات العقود من R2 عند بدء التشغيل. """ print(" > [DataManager] Loading contracts database from R2...") if not self.r2_service: print("❌ [DataManager] R2Service not available. Cannot load contracts.") self.contracts_db = {} return try: self.contracts_db = await self.r2_service.load_contracts_db_async() print(f"✅ [DataManager] Contracts loaded. Total entries: {len(self.contracts_db)}") except Exception as e: print(f"❌ [DataManager] Failed to load contracts from R2: {e}") self.contracts_db = {} def get_contracts_db(self) -> Dict[str, Any]: """ [جديد] إرجاع قاعدة بيانات العقود التي تم تحميلها. """ return self.contracts_db # ================================================================== # 🛡️ دوال الطبقة الأولى (Layer 1 Screening) # ================================================================== async def layer1_rapid_screening(self) -> List[Dict[str, Any]]: """ الغربلة الأولية السريعة جداً بناءً على الحجم فقط. """ print(f"🔍 [Layer 1] بدء الغربلة السريعة (Top Liquid Assets)...") volume_data = await self._get_volume_data_live() if not volume_data: print("⚠️ [Layer 1 Warning] لم يتم العثور على بيانات حجم تداول.") return [] candidates = volume_data[:150] print(f"✅ [Layer 1] تم تمرير {len(candidates)} عملة للتحليل الهجين.") return candidates async def _get_volume_data_live(self): """جلب بيانات الحجم الحية لجميع الأزواج""" try: tickers = await self.exchange.fetch_tickers() data = [] for symbol, ticker in tickers.items(): if symbol.endswith('/USDT') and ticker.get('quoteVolume') and ticker['quoteVolume'] > 100000: data.append({ 'symbol': symbol, 'dollar_volume': ticker['quoteVolume'], 'current_price': ticker['last'] }) data.sort(key=lambda x: x['dollar_volume'], reverse=True) return data except Exception as e: print(f"❌ [DataManager] خطأ في جلب بيانات الحجم: {e}") return [] # ================================================================== # 📊 دوال جلب البيانات (Data Fetching Pipeline) # ================================================================== async def stream_ohlcv_data(self, symbols: List[Dict], queue: asyncio.Queue): """ مولد بيانات متدفق (Streaming Generator) يجلب شموع OHLCV """ timeframes = ['5m', '15m', '1h', '4h', '1d'] limit = 500 for sym_data in symbols: symbol = sym_data['symbol'] tasks = [self._fetch_ohlcv_live(symbol, tf, limit) for tf in timeframes] results = await asyncio.gather(*tasks, return_exceptions=False) ohlcv_packet = {} valid_packet = True for i, res in enumerate(results): tf = timeframes[i] if res and isinstance(res, list) and len(res) >= 200: ohlcv_packet[tf] = res else: if tf in ['5m', '1h']: valid_packet = False if valid_packet and len(ohlcv_packet) >= 4: sym_data['ohlcv'] = ohlcv_packet await queue.put([sym_data]) await asyncio.sleep(0.05) await queue.put(None) async def _fetch_ohlcv_live(self, symbol, timeframe, limit): """دالة مساعدة لجلب الشموع مع معالجة الأخطاء البسيطة""" try: return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit) except Exception: return None # ================================================================== # 🎯 دوال مساعدة للحارس والدماغ (Sentry & Brain Helpers) # ================================================================== async def get_latest_price_async(self, symbol: str) -> float: """جلب آخر سعر حقيقي (للتنفيذ والمراقبة)""" try: ticker = await self.exchange.fetch_ticker(symbol) return float(ticker['last']) except Exception as e: print(f"⚠️ [DataManager] Failed to fetch price for {symbol}: {e}") return 0.0 async def get_latest_ohlcv(self, symbol: str, timeframe: str = '5m', limit: int = 100) -> List[List[float]]: """ جلب عدد محدود من الشموع الأخيرة بسرعة. """ # (إضافة طابع خفيف لتجنب إغراق السجلات) # print(f" > [DM Log] 5. [get_latest_ohlcv] طلب {symbol} {timeframe}...") try: candles = await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit) if candles and len(candles) > 0: # (لا نطبع هذا لأنه سينجح) return candles else: # (هذا هو الطابع المهم الذي يكشف الفشل الصامت) print(f" > [DM Log] 6. ⚠️ [get_latest_ohlcv] فشل صامت لـ {symbol} {timeframe}. (أرجع قائمة فارغة).") return [] except Exception as e: # (هذا هو الطابع المهم الذي يكشف الفشل الفادح) print(f" > [DM Log] 6. ❌ [get_latest_ohlcv] فشل فادح لـ {symbol} {timeframe}: {e}") return []