# ml_engine/data_manager.py # (V12.1 - Titan + Hybrid Support Pipeline) import os import asyncio import httpx import traceback import time from datetime import datetime import ccxt.async_support as ccxt import numpy as np import logging from typing import List, Dict, Any import pandas as pd # محاولة استيراد pandas_ta try: import pandas_ta as ta except ImportError: print("❌ [DataManager] مكتبة pandas_ta غير موجودة.") ta = None from ml_engine.indicators import AdvancedTechnicalAnalyzer from ml_engine.monte_carlo import MonteCarloAnalyzer from ml_engine.ranker import Layer1Ranker # 🔥 إعادة استيراد محرك الأنماط لدعم النظام الهجين try: from ml_engine.patterns import ChartPatternAnalyzer except ImportError: print("⚠️ [DataManager] لم يتم العثور على ml_engine/patterns.py") ChartPatternAnalyzer = None logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("httpcore").setLevel(logging.WARNING) logging.getLogger("ccxt").setLevel(logging.WARNING) class DataManager: def __init__(self, contracts_db, whale_monitor, r2_service=None): # ================================================================== # ⚙️ إعدادات التحكم المركزية (V12 Hybrid Thresholds) # ================================================================== self.SCREENING_THRESHOLD = 0.40 # غربلة أولية self.TITAN_ENTRY_THRESHOLD = 0.90 # عتبة Titan الصارمة # ================================================================== self.contracts_db = contracts_db or {} self.whale_monitor = whale_monitor self.r2_service = r2_service self.exchange = ccxt.kucoin({ 'enableRateLimit': True, 'timeout': 30000, }) self.http_client = None self.market_cache = {} self.technical_analyzer = AdvancedTechnicalAnalyzer() self.mc_analyzer = MonteCarloAnalyzer() # نماذج الطبقة الأولى والثانية المساندة self.layer1_ranker = None self.pattern_analyzer = None # 🔥 تمت إعادته لتجنب AttributeError async def initialize(self): """تهيئة الاتصالات وتحميل جميع النماذج المساندة""" self.http_client = httpx.AsyncClient(timeout=30.0) await self._load_markets() print(" > [DataManager] تهيئة النماذج المساندة (Ranker + Patterns)...") try: # 1. الكاشف المصغر (Ranker) self.layer1_ranker = Layer1Ranker(model_path="ml_models/layer1_ranker.lgbm") await self.layer1_ranker.initialize() # 2. محرك الأنماط (Patterns) - للنظام الهجين if ChartPatternAnalyzer: self.pattern_analyzer = ChartPatternAnalyzer(r2_service=self.r2_service) await self.pattern_analyzer.initialize() except Exception as e: print(f"⚠️ [DataManager] تحذير أثناء تهيئة النماذج المساندة: {e}") print(f"✅ DataManager V12.1 initialized (Hybrid Ready).") async def _load_markets(self): try: if self.exchange: await self.exchange.load_markets() self.market_cache = self.exchange.markets except Exception as e: print(f"❌ [DataManager] فشل تحميل الأسواق: {e}") async def close(self): if self.http_client: await self.http_client.aclose() if self.exchange: await self.exchange.close() # تنظيف الذاكرة عند الإغلاق if self.pattern_analyzer and hasattr(self.pattern_analyzer, 'clear_memory'): self.pattern_analyzer.clear_memory() if self.layer1_ranker and hasattr(self.layer1_ranker, 'clear_memory'): self.layer1_ranker.clear_memory() # ================================================================== # 🔴 الطبقة 1: الغربلة السريعة # ================================================================== async def layer1_rapid_screening(self) -> List[Dict[str, Any]]: print(f"🔍 [Layer 1] بدء الغربلة السريعة...") volume_data = await self._get_volume_data_live() if not volume_data: return [] # توسيع النطاق لـ 150 عملة لزيادة فرص Titan candidates = volume_data[:150] print(f"✅ [Layer 1] تم تمرير {len(candidates)} عملة للتحليل الهجين.") return candidates async def _get_volume_data_live(self): try: tickers = await self.exchange.fetch_tickers() data = [] for s, t in tickers.items(): # فلترة العملات ذات السيولة الضعيفة جداً (< 100k) if s.endswith('/USDT') and t['quoteVolume'] and t['quoteVolume'] > 100000: data.append({'symbol': s, 'dollar_volume': t['quoteVolume'], 'current_price': t['last']}) data.sort(key=lambda x: x['dollar_volume'], reverse=True) return data except: return [] # ================================================================== # 🔵 خط أنابيب البيانات الهجين (Hybrid Data Stream) # ================================================================== async def stream_ohlcv_data(self, symbols: List[Dict], queue: asyncio.Queue): """جلب بيانات عميقة (500 شمعة) لكافة الأطر المطلوبة""" tfs = ['5m', '15m', '1h', '4h', '1d'] limit = 500 for sym_data in symbols: sym = sym_data['symbol'] tasks = [self._fetch_ohlcv_live(sym, tf, limit) for tf in tfs] results = await asyncio.gather(*tasks, return_exceptions=False) ohlcv = {} for i, res in enumerate(results): if res and isinstance(res, list) and len(res) >= 200: ohlcv[tfs[i]] = res # يجب توفر معظم الأطر لعمل النظام الهجين بدقة if len(ohlcv) >= 4 and '5m' in ohlcv: sym_data['ohlcv'] = ohlcv await queue.put([sym_data]) await asyncio.sleep(0.1) # تجنب حظر API await queue.put(None) async def _fetch_ohlcv_live(self, symbol, tf, limit): try: return await self.exchange.fetch_ohlcv(symbol, tf, limit=limit) except: return None print("✅ DataManager V12.1 (Hybrid Support) loaded.")