File size: 6,852 Bytes
2bf9457
82e6b70
2bf9457
00bb5c9
 
 
 
 
 
2bf9457
d69dead
28fa18b
394e2c7
164b380
 
909d04f
164b380
 
 
909d04f
164b380
11b4dc5
 
2bf9457
 
82e6b70
 
 
 
 
 
53cf6c0
28fa18b
 
2bf9457
53cf6c0
 
6b00681
4437a6f
82e6b70
4437a6f
82e6b70
 
4437a6f
 
53cf6c0
d2775f3
4ace337
c6f72fe
2bf9457
 
909d04f
2bf9457
c6f72fe
87e3669
56e3f87
11b4dc5
 
2bf9457
82e6b70
 
909d04f
82e6b70
909d04f
53cf6c0
82e6b70
248e033
56e3f87
6b00681
82e6b70
4ace337
82e6b70
2bf9457
4ace337
82e6b70
 
 
 
 
 
4ace337
82e6b70
6b00681
82e6b70
24a0949
56e3f87
 
2bf9457
 
 
 
909d04f
56e3f87
53cf6c0
909d04f
 
4ace337
82e6b70
 
 
 
 
11b4dc5
2bf9457
82e6b70
2bf9457
24a0949
909d04f
2bf9457
909d04f
79a9e95
82e6b70
909d04f
82e6b70
 
4437a6f
909d04f
b44825a
2bf9457
909d04f
4437a6f
82e6b70
909d04f
 
 
 
 
 
 
82e6b70
909d04f
 
82e6b70
 
 
909d04f
4437a6f
 
909d04f
 
 
82e6b70
909d04f
82e6b70
 
909d04f
82e6b70
 
 
 
2bf9457
82e6b70
4437a6f
82e6b70
 
 
 
 
20a2029
82e6b70
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# ml_engine/data_manager.py
# (V12.1 - Titan + Hybrid Support Pipeline)

import os
import asyncio
import httpx
import traceback
import time
from datetime import datetime
import ccxt.async_support as ccxt
import numpy as np
import logging
from typing import List, Dict, Any
import pandas as pd

# محاولة استيراد pandas_ta
try:
    import pandas_ta as ta
except ImportError:
    print("❌ [DataManager] مكتبة pandas_ta غير موجودة.")
    ta = None

from ml_engine.indicators import AdvancedTechnicalAnalyzer
from ml_engine.monte_carlo import MonteCarloAnalyzer
from ml_engine.ranker import Layer1Ranker
# 🔥 إعادة استيراد محرك الأنماط لدعم النظام الهجين
try:
    from ml_engine.patterns import ChartPatternAnalyzer
except ImportError:
    print("⚠️ [DataManager] لم يتم العثور على ml_engine/patterns.py")
    ChartPatternAnalyzer = None

logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("ccxt").setLevel(logging.WARNING)

class DataManager:
    def __init__(self, contracts_db, whale_monitor, r2_service=None):
        # ==================================================================
        # ⚙️ إعدادات التحكم المركزية (V12 Hybrid Thresholds)
        # ==================================================================
        self.SCREENING_THRESHOLD = 0.40   # غربلة أولية
        self.TITAN_ENTRY_THRESHOLD = 0.90 # عتبة Titan الصارمة
        # ==================================================================

        self.contracts_db = contracts_db or {}
        self.whale_monitor = whale_monitor
        self.r2_service = r2_service
        
        self.exchange = ccxt.kucoin({
            'enableRateLimit': True,
            'timeout': 30000,
        })
            
        self.http_client = None
        self.market_cache = {}

        self.technical_analyzer = AdvancedTechnicalAnalyzer()
        self.mc_analyzer = MonteCarloAnalyzer()
        
        # نماذج الطبقة الأولى والثانية المساندة
        self.layer1_ranker = None
        self.pattern_analyzer = None # 🔥 تمت إعادته لتجنب AttributeError

    async def initialize(self):
        """تهيئة الاتصالات وتحميل جميع النماذج المساندة"""
        self.http_client = httpx.AsyncClient(timeout=30.0)
        await self._load_markets()
        
        print("   > [DataManager] تهيئة النماذج المساندة (Ranker + Patterns)...")
        try:
            # 1. الكاشف المصغر (Ranker)
            self.layer1_ranker = Layer1Ranker(model_path="ml_models/layer1_ranker.lgbm")
            await self.layer1_ranker.initialize()
            
            # 2. محرك الأنماط (Patterns) - للنظام الهجين
            if ChartPatternAnalyzer:
                self.pattern_analyzer = ChartPatternAnalyzer(r2_service=self.r2_service)
                await self.pattern_analyzer.initialize()
                
        except Exception as e:
            print(f"⚠️ [DataManager] تحذير أثناء تهيئة النماذج المساندة: {e}")

        print(f"✅ DataManager V12.1 initialized (Hybrid Ready).")

    async def _load_markets(self):
        try:
            if self.exchange:
                await self.exchange.load_markets()
                self.market_cache = self.exchange.markets
        except Exception as e:
            print(f"❌ [DataManager] فشل تحميل الأسواق: {e}")

    async def close(self):
        if self.http_client: await self.http_client.aclose()
        if self.exchange: await self.exchange.close()

        # تنظيف الذاكرة عند الإغلاق
        if self.pattern_analyzer and hasattr(self.pattern_analyzer, 'clear_memory'):
            self.pattern_analyzer.clear_memory()
        if self.layer1_ranker and hasattr(self.layer1_ranker, 'clear_memory'):
            self.layer1_ranker.clear_memory()

    # ==================================================================
    # 🔴 الطبقة 1: الغربلة السريعة
    # ==================================================================
    async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
        print(f"🔍 [Layer 1] بدء الغربلة السريعة...")
        volume_data = await self._get_volume_data_live()
        if not volume_data: return []
        
        # توسيع النطاق لـ 150 عملة لزيادة فرص Titan
        candidates = volume_data[:150]
        print(f"✅ [Layer 1] تم تمرير {len(candidates)} عملة للتحليل الهجين.")
        return candidates

    async def _get_volume_data_live(self):
        try:
            tickers = await self.exchange.fetch_tickers()
            data = []
            for s, t in tickers.items():
                # فلترة العملات ذات السيولة الضعيفة جداً (< 100k)
                if s.endswith('/USDT') and t['quoteVolume'] and t['quoteVolume'] > 100000:
                    data.append({'symbol': s, 'dollar_volume': t['quoteVolume'], 'current_price': t['last']})
            data.sort(key=lambda x: x['dollar_volume'], reverse=True)
            return data
        except: return []

    # ==================================================================
    # 🔵 خط أنابيب البيانات الهجين (Hybrid Data Stream)
    # ==================================================================
    async def stream_ohlcv_data(self, symbols: List[Dict], queue: asyncio.Queue):
        """جلب بيانات عميقة (500 شمعة) لكافة الأطر المطلوبة"""
        tfs = ['5m', '15m', '1h', '4h', '1d']
        limit = 500
        
        for sym_data in symbols:
            sym = sym_data['symbol']
            tasks = [self._fetch_ohlcv_live(sym, tf, limit) for tf in tfs]
            results = await asyncio.gather(*tasks, return_exceptions=False)
            
            ohlcv = {}
            for i, res in enumerate(results):
                if res and isinstance(res, list) and len(res) >= 200:
                    ohlcv[tfs[i]] = res
            
            # يجب توفر معظم الأطر لعمل النظام الهجين بدقة
            if len(ohlcv) >= 4 and '5m' in ohlcv:
                sym_data['ohlcv'] = ohlcv
                await queue.put([sym_data])
            
            await asyncio.sleep(0.1) # تجنب حظر API
        
        await queue.put(None)

    async def _fetch_ohlcv_live(self, symbol, tf, limit):
        try: return await self.exchange.fetch_ohlcv(symbol, tf, limit=limit)
        except: return None

print("✅ DataManager V12.1 (Hybrid Support) loaded.")