# ml_engine/data_manager.py
# (V12.3 - Hybrid Data Pipeline with Correct Naming)
"""Market-data access layer: ticker screening and multi-timeframe OHLCV streaming."""

import os
import asyncio
import httpx
import traceback
import time
from datetime import datetime
import ccxt.async_support as ccxt
import numpy as np
import logging
from typing import List, Dict, Any, Optional
import pandas as pd

try:
    import pandas_ta as ta
except ImportError:
    print("❌ [DataManager] مكتبة pandas_ta غير موجودة.")
    ta = None

from ml_engine.indicators import AdvancedTechnicalAnalyzer
from ml_engine.monte_carlo import MonteCarloAnalyzer
from ml_engine.ranker import Layer1Ranker

try:
    from ml_engine.patterns import ChartPatternAnalyzer
except ImportError:
    print("⚠️ [DataManager] لم يتم العثور على ml_engine/patterns.py")
    ChartPatternAnalyzer = None

# Keep chatty HTTP / exchange libraries quiet unless something is wrong.
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("ccxt").setLevel(logging.WARNING)

_logger = logging.getLogger(__name__)


class DataManager:
    """Owns the exchange / HTTP clients and feeds market data to the analysis layers.

    Lifecycle: construct, ``await initialize()``, use, then ``await close()``.
    """

    # Layer-1 screening: minimum quote volume a /USDT ticker must exceed,
    # and how many of the highest-volume symbols are passed on.
    MIN_DOLLAR_VOLUME = 100000
    MAX_LAYER1_CANDIDATES = 150

    # OHLCV streaming: timeframes requested per symbol, candles requested per
    # timeframe, candles needed for a timeframe to count as usable, and how
    # many usable timeframes a symbol needs before it is enqueued.
    OHLCV_TIMEFRAMES = ('5m', '15m', '1h', '4h', '1d')
    OHLCV_FETCH_LIMIT = 500
    MIN_CANDLES_PER_TIMEFRAME = 200
    MIN_TIMEFRAMES_REQUIRED = 4

    def __init__(self, contracts_db, whale_monitor, r2_service=None):
        """Store collaborators and build the exchange client; performs no network I/O.

        Args:
            contracts_db: Stored as ``self.contracts_db``; a falsy value is
                replaced by an empty dict.
            whale_monitor: Stored as ``self.whale_monitor`` (not used by the
                methods visible in this class).
            r2_service: Optional service handed to ``ChartPatternAnalyzer``
                during ``initialize``.
        """
        # ==================================================================
        # ⚙️ Central control settings (V12.3 Hybrid Thresholds)
        # ==================================================================
        # Main threshold for the hybrid system (Titan + Patterns + MC)
        self.HYBRID_ENTRY_THRESHOLD = 0.60
        # ==================================================================

        self.contracts_db = contracts_db or {}
        self.whale_monitor = whale_monitor
        self.r2_service = r2_service

        self.exchange = ccxt.kucoin({
            'enableRateLimit': True,
            'timeout': 30000,
        })

        # Created lazily in initialize() so construction stays side-effect free.
        self.http_client = None
        self.market_cache = {}

        self.technical_analyzer = AdvancedTechnicalAnalyzer()
        self.mc_analyzer = MonteCarloAnalyzer()
        self.layer1_ranker = None
        self.pattern_analyzer = None

    async def initialize(self):
        """Open the HTTP client, load exchange markets and set up the helper models.

        Failures while initializing the ranker / pattern analyzer are reported
        and otherwise ignored, so the manager still comes up without them.
        """
        self.http_client = httpx.AsyncClient(timeout=30.0)
        await self._load_markets()

        print(" > [DataManager] تهيئة النماذج المساندة (Ranker + Patterns)...")
        try:
            self.layer1_ranker = Layer1Ranker(model_path="ml_models/layer1_ranker.lgbm")
            await self.layer1_ranker.initialize()

            # ChartPatternAnalyzer is None when ml_engine.patterns is missing.
            if ChartPatternAnalyzer:
                self.pattern_analyzer = ChartPatternAnalyzer(r2_service=self.r2_service)
                await self.pattern_analyzer.initialize()
        except Exception as e:
            print(f"⚠️ [DataManager] تحذير أثناء تهيئة النماذج المساندة: {e}")

        print(f"✅ DataManager V12.3 initialized (Hybrid Threshold: {self.HYBRID_ENTRY_THRESHOLD})")

    async def _load_markets(self):
        """Load the exchange's market list into ``self.market_cache``; report failures."""
        try:
            if self.exchange:
                await self.exchange.load_markets()
                self.market_cache = self.exchange.markets
        except Exception as e:
            print(f"❌ [DataManager] فشل تحميل الأسواق: {e}")

    async def close(self):
        """Close the HTTP and exchange clients and release helper-model memory."""
        if self.http_client:
            await self.http_client.aclose()
        if self.exchange:
            await self.exchange.close()
        # clear_memory is optional on the helper models, hence the hasattr guard.
        if self.pattern_analyzer and hasattr(self.pattern_analyzer, 'clear_memory'):
            self.pattern_analyzer.clear_memory()
        if self.layer1_ranker and hasattr(self.layer1_ranker, 'clear_memory'):
            self.layer1_ranker.clear_memory()

    async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
        """Return the highest-volume /USDT symbols as candidates for deeper analysis.

        Returns:
            Up to ``MAX_LAYER1_CANDIDATES`` dicts with keys ``symbol``,
            ``dollar_volume`` and ``current_price``, sorted by volume
            descending; an empty list when no volume data is available.
        """
        print("🔍 [Layer 1] بدء الغربلة السريعة...")
        volume_data = await self._get_volume_data_live()
        if not volume_data:
            return []

        candidates = volume_data[:self.MAX_LAYER1_CANDIDATES]
        print(f"✅ [Layer 1] تم تمرير {len(candidates)} عملة للتحليل الهجين.")
        return candidates

    async def _get_volume_data_live(self) -> List[Dict[str, Any]]:
        """Fetch all tickers and keep /USDT pairs above ``MIN_DOLLAR_VOLUME``.

        Returns:
            Ticker summaries sorted by ``dollar_volume`` descending, or an
            empty list if the exchange request fails.
        """
        try:
            tickers = await self.exchange.fetch_tickers()
        except Exception as e:
            # `except Exception` (not a bare except) so task cancellation and
            # interpreter shutdown signals are never swallowed here.
            print(f"❌ [DataManager] fetch_tickers failed: {e}")
            return []

        data = []
        for symbol, ticker in tickers.items():
            if not symbol.endswith('/USDT'):
                continue
            # .get(): one ticker lacking a field must not discard the whole market.
            quote_volume = ticker.get('quoteVolume')
            if quote_volume and quote_volume > self.MIN_DOLLAR_VOLUME:
                data.append({
                    'symbol': symbol,
                    'dollar_volume': quote_volume,
                    'current_price': ticker.get('last'),
                })

        data.sort(key=lambda item: item['dollar_volume'], reverse=True)
        return data

    async def stream_ohlcv_data(self, symbols: List[Dict], queue: asyncio.Queue):
        """Fetch multi-timeframe OHLCV per symbol and push usable symbols onto ``queue``.

        For each symbol all ``OHLCV_TIMEFRAMES`` are fetched concurrently. A
        timeframe is kept only if it returned at least
        ``MIN_CANDLES_PER_TIMEFRAME`` candles. A symbol is enqueued (as a
        one-element list, with the data attached under ``'ohlcv'``) only if at
        least ``MIN_TIMEFRAMES_REQUIRED`` timeframes were kept and ``'5m'`` is
        among them. A final ``None`` is enqueued as the end-of-stream marker.

        Args:
            symbols: Dicts that each contain a ``'symbol'`` key; mutated in
                place when OHLCV data is attached.
            queue: Destination queue for the results and the ``None`` sentinel.
        """
        timeframes = self.OHLCV_TIMEFRAMES

        for sym_data in symbols:
            symbol = sym_data['symbol']
            results = await asyncio.gather(
                *(self._fetch_ohlcv_live(symbol, tf, self.OHLCV_FETCH_LIMIT) for tf in timeframes)
            )

            # gather preserves argument order, so results line up with timeframes.
            ohlcv = {
                tf: candles
                for tf, candles in zip(timeframes, results)
                if isinstance(candles, list) and len(candles) >= self.MIN_CANDLES_PER_TIMEFRAME
            }

            if len(ohlcv) >= self.MIN_TIMEFRAMES_REQUIRED and '5m' in ohlcv:
                sym_data['ohlcv'] = ohlcv
                await queue.put([sym_data])

            # Short pause between symbols.
            await asyncio.sleep(0.1)

        await queue.put(None)

    async def _fetch_ohlcv_live(self, symbol, tf, limit) -> Optional[list]:
        """Fetch up to ``limit`` candles for ``symbol`` on timeframe ``tf``.

        Returns:
            The exchange's OHLCV result, or ``None`` if the request fails.
        """
        try:
            return await self.exchange.fetch_ohlcv(symbol, tf, limit=limit)
        except Exception as e:
            # Best-effort by design: a failed timeframe is just skipped by the
            # caller. Cancellation still propagates because it is not an Exception.
            _logger.debug("OHLCV fetch failed for %s %s: %s", symbol, tf, e)
            return None