File size: 5,391 Bytes
2bf9457
29d027f
2bf9457
00bb5c9
 
 
 
 
 
2bf9457
d69dead
28fa18b
394e2c7
164b380
 
 
 
 
909d04f
164b380
11b4dc5
 
2bf9457
 
82e6b70
 
 
 
 
53cf6c0
28fa18b
 
2bf9457
53cf6c0
 
29d027f
4437a6f
29d027f
4437a6f
29d027f
19ecbac
4437a6f
 
53cf6c0
d2775f3
4ace337
c6f72fe
29d027f
 
 
 
c6f72fe
87e3669
56e3f87
11b4dc5
 
2bf9457
82e6b70
909d04f
d50c5b6
909d04f
53cf6c0
248e033
56e3f87
6b00681
29d027f
 
 
 
 
 
 
 
 
 
 
6b00681
29d027f
24a0949
56e3f87
 
2bf9457
 
29d027f
2bf9457
909d04f
56e3f87
53cf6c0
909d04f
 
82e6b70
 
 
 
11b4dc5
24a0949
29d027f
2bf9457
909d04f
29d027f
 
 
4437a6f
909d04f
b44825a
2bf9457
909d04f
4437a6f
29d027f
909d04f
 
 
 
 
 
82e6b70
29d027f
4437a6f
 
909d04f
 
82e6b70
909d04f
82e6b70
 
 
 
 
29d027f
82e6b70
 
 
 
d50c5b6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# ml_engine/data_manager.py
# (V12.3 - Hybrid Data Pipeline with Correct Naming)

import os
import asyncio
import httpx
import traceback
import time
from datetime import datetime
import ccxt.async_support as ccxt
import numpy as np
import logging
from typing import List, Dict, Any
import pandas as pd

try:
    import pandas_ta as ta
except ImportError:
    print("❌ [DataManager] مكتبة pandas_ta غير موجودة.")
    ta = None

from ml_engine.indicators import AdvancedTechnicalAnalyzer
from ml_engine.monte_carlo import MonteCarloAnalyzer
from ml_engine.ranker import Layer1Ranker
try:
    from ml_engine.patterns import ChartPatternAnalyzer
except ImportError:
    print("⚠️ [DataManager] لم يتم العثور على ml_engine/patterns.py")
    ChartPatternAnalyzer = None

# Quiet the chatty third-party loggers: only WARNING and above get through.
for _noisy_logger in ("httpx", "httpcore", "ccxt"):
    logging.getLogger(_noisy_logger).setLevel(logging.WARNING)

class DataManager:
    """Exchange-facing data layer for the hybrid data pipeline.

    Wraps an async ccxt KuCoin client and provides:

    * market loading and caching (``_load_markets``),
    * a first-pass screen of ``*/USDT`` tickers by quote volume
      (``layer1_rapid_screening``),
    * multi-timeframe OHLCV streaming into an ``asyncio.Queue``
      (``stream_ohlcv_data``).

    It also owns the supporting analyzers: technical, Monte Carlo, the
    Layer-1 ranker and, when its module could be imported, the chart-pattern
    analyzer.
    """

    def __init__(self, contracts_db, whale_monitor, r2_service=None):
        """Store collaborators and build the exchange client.

        No network I/O happens here; call ``initialize()`` afterwards.

        Args:
            contracts_db: Stored as ``self.contracts_db``; a falsy value is
                replaced by an empty dict.
            whale_monitor: Stored as-is on ``self.whale_monitor``.
            r2_service: Optional service handed to ``ChartPatternAnalyzer``
                during ``initialize()``.
        """
        # ==================================================================
        # Central control settings (V12.3 Hybrid Thresholds)
        # ==================================================================
        # Main threshold for the hybrid system (Titan + Patterns + MC)
        self.HYBRID_ENTRY_THRESHOLD = 0.60
        # ==================================================================

        self.contracts_db = contracts_db or {}
        self.whale_monitor = whale_monitor
        self.r2_service = r2_service

        self.exchange = ccxt.kucoin({
            'enableRateLimit': True,
            'timeout': 30000,
        })

        # Created lazily in initialize().
        self.http_client = None
        self.market_cache = {}

        self.technical_analyzer = AdvancedTechnicalAnalyzer()
        self.mc_analyzer = MonteCarloAnalyzer()

        # Supporting models, loaded in initialize().
        self.layer1_ranker = None
        self.pattern_analyzer = None

    async def initialize(self):
        """Open the HTTP client, load markets and initialise supporting models.

        A failure while initialising the ranker or the pattern analyzer is
        printed as a warning and not re-raised.
        """
        self.http_client = httpx.AsyncClient(timeout=30.0)
        await self._load_markets()

        print("   > [DataManager] تهيئة النماذج المساندة (Ranker + Patterns)...")
        try:
            self.layer1_ranker = Layer1Ranker(model_path="ml_models/layer1_ranker.lgbm")
            await self.layer1_ranker.initialize()

            # ChartPatternAnalyzer is None when ml_engine/patterns.py is missing.
            if ChartPatternAnalyzer:
                self.pattern_analyzer = ChartPatternAnalyzer(r2_service=self.r2_service)
                await self.pattern_analyzer.initialize()

        except Exception as e:
            print(f"⚠️ [DataManager] تحذير أثناء تهيئة النماذج المساندة: {e}")

        print(f"✅ DataManager V12.3 initialized (Hybrid Threshold: {self.HYBRID_ENTRY_THRESHOLD})")

    async def _load_markets(self):
        """Load exchange markets into ``self.market_cache``; print on failure."""
        try:
            if self.exchange:
                await self.exchange.load_markets()
                self.market_cache = self.exchange.markets
        except Exception as e:
            print(f"❌ [DataManager] فشل تحميل الأسواق: {e}")

    async def close(self):
        """Release the HTTP client, the exchange session and model memory.

        Each step runs even if an earlier one raises, so a failing
        ``aclose()`` no longer leaves the exchange session open. Exceptions
        still propagate to the caller after the remaining steps have run.
        """
        try:
            if self.http_client:
                await self.http_client.aclose()
        finally:
            try:
                if self.exchange:
                    await self.exchange.close()
            finally:
                # Same order as before: pattern analyzer first, then ranker.
                for helper in (self.pattern_analyzer, self.layer1_ranker):
                    if helper and hasattr(helper, 'clear_memory'):
                        helper.clear_memory()

    async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
        """Return up to 150 of the highest quote-volume USDT pairs.

        Returns:
            A list of ``{'symbol', 'dollar_volume', 'current_price'}`` dicts
            sorted by ``dollar_volume`` descending, or ``[]`` when no volume
            data could be obtained.
        """
        print("🔍 [Layer 1] بدء الغربلة السريعة...")
        volume_data = await self._get_volume_data_live()
        if not volume_data:
            return []
        candidates = volume_data[:150]
        print(f"✅ [Layer 1] تم تمرير {len(candidates)} عملة للتحليل الهجين.")
        return candidates

    async def _get_volume_data_live(self):
        """Fetch all tickers and keep ``*/USDT`` pairs with quote volume > 100000.

        Best-effort: any ordinary error yields ``[]`` (and is printed), but
        task cancellation and interpreter-exit signals are allowed to
        propagate. A ticker without ``quoteVolume`` is skipped instead of
        discarding the whole result.

        Returns:
            List of ``{'symbol', 'dollar_volume', 'current_price'}`` dicts
            sorted by ``dollar_volume`` descending.
        """
        try:
            tickers = await self.exchange.fetch_tickers()
            data = []
            for symbol, ticker in tickers.items():
                if not symbol.endswith('/USDT') or not ticker:
                    continue
                quote_volume = ticker.get('quoteVolume')
                if quote_volume and quote_volume > 100000:
                    data.append({
                        'symbol': symbol,
                        'dollar_volume': quote_volume,
                        'current_price': ticker.get('last'),
                    })
            data.sort(key=lambda x: x['dollar_volume'], reverse=True)
            return data
        except Exception as e:
            # `except Exception` (not bare) so CancelledError is not swallowed.
            print(f"❌ [DataManager] fetch_tickers failed: {e}")
            return []

    async def stream_ohlcv_data(self, symbols: List[Dict], queue: asyncio.Queue):
        """Fetch OHLCV for each symbol and push usable results onto ``queue``.

        For every entry in ``symbols`` the five timeframes are requested
        concurrently (500 candles each). A timeframe is kept only if it
        returned a list of at least 200 candles. The symbol is enqueued, as a
        one-element list, when at least four timeframes were kept and ``'5m'``
        is among them; the kept data is attached in place under
        ``sym_data['ohlcv']``. After all symbols have been processed a
        ``None`` sentinel is put on the queue.

        Args:
            symbols: Dicts that each carry at least a ``'symbol'`` key.
            queue: Destination queue for ``[sym_data]`` items and the final
                ``None``.
        """
        tfs = ['5m', '15m', '1h', '4h', '1d']
        limit = 500
        for sym_data in symbols:
            sym = sym_data['symbol']
            tasks = [self._fetch_ohlcv_live(sym, tf, limit) for tf in tfs]
            results = await asyncio.gather(*tasks, return_exceptions=False)
            ohlcv = {
                tf: candles
                for tf, candles in zip(tfs, results)
                if isinstance(candles, list) and len(candles) >= 200
            }
            if len(ohlcv) >= 4 and '5m' in ohlcv:
                sym_data['ohlcv'] = ohlcv
                await queue.put([sym_data])
            # Short pause between symbols.
            await asyncio.sleep(0.1)
        await queue.put(None)

    async def _fetch_ohlcv_live(self, symbol, tf, limit):
        """Fetch candles for one symbol/timeframe; return ``None`` on error.

        Ordinary errors are logged at DEBUG level and turned into ``None``.
        ``except Exception`` is used instead of a bare ``except`` so that
        ``asyncio.CancelledError`` (a ``BaseException`` on Python 3.8+) still
        cancels the task.
        """
        try:
            return await self.exchange.fetch_ohlcv(symbol, tf, limit=limit)
        except Exception as e:
            logging.getLogger(__name__).debug(
                "OHLCV fetch failed for %s [%s]: %s", symbol, tf, e
            )
            return None