|
|
|
|
|
|
|
|
|
|
|
import os |
|
|
import asyncio |
|
|
import httpx |
|
|
import traceback |
|
|
import time |
|
|
from datetime import datetime |
|
|
import ccxt.async_support as ccxt |
|
|
import numpy as np |
|
|
import logging |
|
|
from typing import List, Dict, Any |
|
|
import pandas as pd |
|
|
|
|
|
try: |
|
|
import pandas_ta as ta |
|
|
except ImportError: |
|
|
print("❌ [DataManager] مكتبة pandas_ta غير موجودة.") |
|
|
ta = None |
|
|
|
|
|
from ml_engine.indicators import AdvancedTechnicalAnalyzer |
|
|
from ml_engine.monte_carlo import MonteCarloAnalyzer |
|
|
from ml_engine.ranker import Layer1Ranker |
|
|
try: |
|
|
from ml_engine.patterns import ChartPatternAnalyzer |
|
|
except ImportError: |
|
|
print("⚠️ [DataManager] لم يتم العثور على ml_engine/patterns.py") |
|
|
ChartPatternAnalyzer = None |
|
|
|
|
|
# Raise the log level of the HTTP and exchange client libraries to WARNING.
for _noisy_logger_name in ("httpx", "httpcore", "ccxt"):
    logging.getLogger(_noisy_logger_name).setLevel(logging.WARNING)
|
|
|
|
|
class DataManager:
    """Market-data access layer built on an async ccxt KuCoin client.

    The instance owns the exchange connection, an ``httpx`` client, the
    technical / Monte-Carlo analyzers and the optional ranker and chart
    pattern helpers. Call :meth:`initialize` before use and :meth:`close`
    when done.
    """

    def __init__(self, contracts_db, whale_monitor, r2_service=None):
        """Store collaborators and build the exchange client and analyzers.

        Args:
            contracts_db: Contract data; any falsy value is replaced by ``{}``.
            whale_monitor: Stored on the instance unchanged.
            r2_service: Optional service passed to ``ChartPatternAnalyzer``
                when :meth:`initialize` runs.
        """
        # Within this class the value is only echoed by the initialize() banner.
        self.HYBRID_ENTRY_THRESHOLD = 0.60

        self.contracts_db = contracts_db or {}
        self.whale_monitor = whale_monitor
        self.r2_service = r2_service

        self.exchange = ccxt.kucoin({
            'enableRateLimit': True,
            'timeout': 30000,
        })

        # Created lazily in initialize().
        self.http_client = None
        self.market_cache = {}

        self.technical_analyzer = AdvancedTechnicalAnalyzer()
        self.mc_analyzer = MonteCarloAnalyzer()

        # Optional helper models, populated by initialize().
        self.layer1_ranker = None
        self.pattern_analyzer = None

    async def initialize(self):
        """Open the HTTP client, load markets and start the helper models.

        A failure while starting the ranker or the pattern analyzer is
        reported with a warning and does not abort initialization.
        """
        self.http_client = httpx.AsyncClient(timeout=30.0)
        await self._load_markets()

        print(" > [DataManager] تهيئة النماذج المساندة (Ranker + Patterns)...")
        try:
            self.layer1_ranker = Layer1Ranker(model_path="ml_models/layer1_ranker.lgbm")
            await self.layer1_ranker.initialize()

            # ChartPatternAnalyzer is None when its module failed to import.
            if ChartPatternAnalyzer:
                self.pattern_analyzer = ChartPatternAnalyzer(r2_service=self.r2_service)
                await self.pattern_analyzer.initialize()
        except Exception as e:
            print(f"⚠️ [DataManager] تحذير أثناء تهيئة النماذج المساندة: {e}")

        print(f"✅ DataManager V12.3 initialized (Hybrid Threshold: {self.HYBRID_ENTRY_THRESHOLD})")

    async def _load_markets(self):
        """Load the exchange markets and mirror them into ``market_cache``.

        Errors are printed and swallowed, leaving ``market_cache`` untouched.
        """
        try:
            if self.exchange:
                await self.exchange.load_markets()
                self.market_cache = self.exchange.markets
        except Exception as e:
            print(f"❌ [DataManager] فشل تحميل الأسواق: {e}")

    async def close(self):
        """Release the HTTP client, the exchange session and helper memory.

        Each teardown step runs even when an earlier one raises, so a failing
        HTTP close can no longer leak the exchange session. The first error
        still propagates to the caller.
        """
        try:
            if self.http_client:
                await self.http_client.aclose()
        finally:
            try:
                if self.exchange:
                    await self.exchange.close()
            finally:
                for helper in (self.pattern_analyzer, self.layer1_ranker):
                    if helper and hasattr(helper, 'clear_memory'):
                        helper.clear_memory()

    async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
        """Return up to 150 of the highest-volume USDT symbols.

        Returns:
            A list of dicts with ``symbol``, ``dollar_volume`` and
            ``current_price`` keys, ordered by descending volume; empty when
            no volume data is available.
        """
        print(f"🔍 [Layer 1] بدء الغربلة السريعة...")
        volume_data = await self._get_volume_data_live()
        if not volume_data:
            return []

        candidates = volume_data[:150]
        print(f"✅ [Layer 1] تم تمرير {len(candidates)} عملة للتحليل الهجين.")
        return candidates

    async def _get_volume_data_live(self):
        """Fetch tickers and keep ``*/USDT`` pairs above 100000 quote volume.

        Returns:
            Matching entries sorted by ``dollar_volume`` descending, or ``[]``
            when the fetch or the filtering fails.
        """
        try:
            tickers = await self.exchange.fetch_tickers()
            data = []
            for symbol, ticker in tickers.items():
                if not symbol.endswith('/USDT') or not ticker:
                    continue
                # .get() keeps one incomplete ticker from discarding the scan.
                quote_volume = ticker.get('quoteVolume')
                if quote_volume and quote_volume > 100000:
                    data.append({
                        'symbol': symbol,
                        'dollar_volume': quote_volume,
                        'current_price': ticker.get('last'),
                    })
            data.sort(key=lambda row: row['dollar_volume'], reverse=True)
            return data
        except Exception as e:
            # Exception, not a bare except: task cancellation must propagate.
            print(f"❌ [DataManager] فشل جلب بيانات التداول: {e}")
            return []

    async def stream_ohlcv_data(self, symbols: List[Dict], queue: asyncio.Queue):
        """Fetch multi-timeframe candles per symbol and push them to ``queue``.

        A symbol is enqueued, as a one-element list, only when at least four
        timeframes returned 200 or more candles and ``'5m'`` is among them.
        ``None`` is enqueued last as the end-of-stream marker.

        Args:
            symbols: Dicts carrying a ``'symbol'`` key; accepted entries gain
                an ``'ohlcv'`` key mapping timeframe to candle list.
            queue: Destination queue for the batches and the final ``None``.
        """
        tfs = ['5m', '15m', '1h', '4h', '1d']
        limit = 500

        for sym_data in symbols:
            sym = sym_data['symbol']
            results = await asyncio.gather(
                *(self._fetch_ohlcv_live(sym, tf, limit) for tf in tfs),
                return_exceptions=False,
            )

            ohlcv = {
                tf: candles
                for tf, candles in zip(tfs, results)
                if candles and isinstance(candles, list) and len(candles) >= 200
            }

            if len(ohlcv) >= 4 and '5m' in ohlcv:
                sym_data['ohlcv'] = ohlcv
                await queue.put([sym_data])

            # NOTE(review): assumed to pause once per symbol, whether or not
            # the symbol was enqueued - confirm against the original layout.
            await asyncio.sleep(0.1)

        await queue.put(None)

    async def _fetch_ohlcv_live(self, symbol, tf, limit):
        """Return candles for one symbol/timeframe, or ``None`` on failure."""
        try:
            return await self.exchange.fetch_ohlcv(symbol, tf, limit=limit)
        except Exception:
            # Best-effort per timeframe; cancellation is deliberately not caught.
            return None