Trad / data_manager.py
Riy777's picture
Update data_manager.py
6c7250f
raw
history blame
33.8 kB
# data_manager.py (Updated to V7.4 - 1H Momentum Burst Filter)
import os
import asyncio
import httpx
import traceback
import time
from datetime import datetime
import ccxt
import numpy as np
import logging
from typing import List, Dict, Any
import pandas as pd
try:
import pandas_ta as ta
except ImportError:
print("⚠️ مكتبة pandas_ta غير موجودة، فلتر الغربلة المتقدم سيفشل.")
ta = None
from ml_engine.indicators import AdvancedTechnicalAnalyzer
from ml_engine.monte_carlo import MonteCarloAnalyzer
# (V8-MODIFICATION) استيراد المحرك الصحيح (V8)
from ml_engine.patterns import ChartPatternAnalyzer
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
class DataManager:
# (V8-MODIFICATION) قبول r2_service
def __init__(self, contracts_db, whale_monitor, r2_service=None):
self.contracts_db = contracts_db or {}
self.whale_monitor = whale_monitor
self.r2_service = r2_service # (V8-MODIFICATION) الإضافة الجديدة
try:
self.exchange = ccxt.kucoin({
'sandbox': False,
'enableRateLimit': True,
'timeout': 30000,
'verbose': False,
})
print("✅ تم تهيئة اتصال KuCoin بنجاح")
except Exception as e:
print(f"❌ فشل تهيئة اتصال KuCoin: {e}")
self.exchange = None
self.http_client = None
self.market_cache = {}
self.last_market_load = None
self.technical_analyzer = AdvancedTechnicalAnalyzer()
self.monte_carlo_analyzer = MonteCarloAnalyzer()
self.pattern_analyzer = None # (V8-MODIFICATION) سيتم تهيئته في initialize
async def initialize(self):
self.http_client = httpx.AsyncClient(timeout=30.0)
await self._load_markets()
# (V8-MODIFICATION) تهيئة محرك الأنماط V8 (ML-Based)
print(" > [DataManager] تهيئة محرك الأنماط V8 (ML-Based)...")
try:
self.pattern_analyzer = ChartPatternAnalyzer(r2_service=self.r2_service)
await self.pattern_analyzer.initialize() # (تحميل النموذج والمقياس من R2)
except Exception as e:
print(f"❌ [DataManager] فشل تهيئة محرك الأنماط V8: {e}")
# (العودة للوضع الآمن إذا فشل التحميل)
self.pattern_analyzer = ChartPatternAnalyzer(r2_service=None)
# --- (نهاية الإضافة) ---
print("✅ DataManager initialized - V7.4 (1H Momentum Burst Filter)")
async def _load_markets(self):
try:
if not self.exchange:
return
print("🔄 جلب أحدث بيانات الأسواق من KuCoin...")
self.exchange.load_markets()
self.market_cache = self.exchange.markets
self.last_market_load = datetime.now()
print(f"✅ تم تحميل {len(self.market_cache)} سوق من KuCoin")
except Exception as e:
print(f"❌ فشل تحميل بيانات الأسواق: {e}")
async def close(self):
if self.http_client and not self.http_client.is_closed:
await self.http_client.aclose()
print(" ✅ DataManager: http_client closed.")
if self.exchange:
try:
await self.exchange.close()
print(" ✅ DataManager: ccxt.kucoin exchange closed.")
except Exception as e:
print(f" ⚠️ DataManager: Error closing ccxt.kucoin: {e}")
async def get_market_context_async(self):
try:
sentiment_data = await self.get_sentiment_safe_async()
price_data = await self._get_prices_with_fallback()
bitcoin_price = price_data.get('bitcoin')
ethereum_price = price_data.get('ethereum')
market_context = {
'timestamp': datetime.now().isoformat(),
'bitcoin_price_usd': bitcoin_price,
'ethereum_price_usd': ethereum_price,
'fear_and_greed_index': sentiment_data.get('feargreed_value') if sentiment_data else None,
'sentiment_class': sentiment_data.get('feargreed_class') if sentiment_data else 'NEUTRAL',
'market_trend': self._determine_market_trend(bitcoin_price, sentiment_data),
'btc_sentiment': self._get_btc_sentiment(bitcoin_price),
'data_quality': 'HIGH' if bitcoin_price and ethereum_price else 'LOW'
}
return market_context
except Exception as e:
return self._get_minimal_market_context()
async def get_sentiment_safe_async(self):
try:
async with httpx.AsyncClient(timeout=10) as client:
response = await client.get("https://api.alternative.me/fng/")
response.raise_for_status()
data = response.json()
if 'data' not in data or not data['data']:
raise ValueError("بيانات المشاعر غير متوفرة")
latest_data = data['data'][0]
return {
"feargreed_value": int(latest_data['value']),
"feargreed_class": latest_data['value_classification'],
"source": "alternative.me",
"timestamp": datetime.now().isoformat()
}
except Exception as e:
return None
def _determine_market_trend(self, bitcoin_price, sentiment_data):
if bitcoin_price is None:
return "UNKNOWN"
if bitcoin_price > 60000: score = 1
elif bitcoin_price < 55000: score = -1
else: score = 0
if sentiment_data and sentiment_data.get('feargreed_value') is not None:
fear_greed = sentiment_data.get('feargreed_value')
if fear_greed > 60: score += 1
elif fear_greed < 40: score -= 1
if score >= 1: return "bull_market"
elif score <= -1: return "bear_market"
else: return "sideways_market"
def _get_btc_sentiment(self, bitcoin_price):
if bitcoin_price is None: return 'UNKNOWN'
elif bitcoin_price > 60000: return 'BULLISH'
elif bitcoin_price < 55000: return 'BEARISH'
else: return 'NEUTRAL'
async def _get_prices_with_fallback(self):
try:
prices = await self._get_prices_from_kucoin_safe()
if prices.get('bitcoin') and prices.get('ethereum'):
return prices
return await self._get_prices_from_coingecko()
except Exception as e:
return {'bitcoin': None, 'ethereum': None}
async def _get_prices_from_kucoin_safe(self):
if not self.exchange: return {'bitcoin': None, 'ethereum': None}
try:
prices = {'bitcoin': None, 'ethereum': None}
btc_ticker = self.exchange.fetch_ticker('BTC/USDT')
btc_price = float(btc_ticker.get('last', 0)) if btc_ticker.get('last') else None
if btc_price and btc_price > 0: prices['bitcoin'] = btc_price
eth_ticker = self.exchange.fetch_ticker('ETH/USDT')
eth_price = float(eth_ticker.get('last', 0)) if eth_ticker.get('last') else None
if eth_price and eth_price > 0: prices['ethereum'] = eth_price
return prices
except Exception as e:
return {'bitcoin': None, 'ethereum': None}
async def _get_prices_from_coingecko(self):
try:
await asyncio.sleep(0.5)
url = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin,ethereum&vs_currencies=usd"
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'application/json'}
async with httpx.AsyncClient(headers=headers) as client:
response = await client.get(url, timeout=10)
if response.status_code == 429:
await asyncio.sleep(2)
response = await client.get(url, timeout=10)
response.raise_for_status()
data = response.json()
btc_price = data.get('bitcoin', {}).get('usd')
eth_price = data.get('ethereum', {}).get('usd')
if btc_price and eth_price:
return {'bitcoin': btc_price, 'ethereum': eth_price}
else:
return {'bitcoin': None, 'ethereum': None}
except Exception as e:
return {'bitcoin': None, 'ethereum': None}
def _get_minimal_market_context(self):
return {
'timestamp': datetime.now().isoformat(),
'data_available': False,
'market_trend': 'UNKNOWN',
'btc_sentiment': 'UNKNOWN',
'data_quality': 'LOW'
}
def _create_dataframe(self, candles: List) -> pd.DataFrame:
"""(V7.1) دالة مساعدة لإنشاء DataFrame لتحليل 1H"""
try:
if not candles: return pd.DataFrame()
df = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df[['open', 'high', 'low', 'close', 'volume']] = df[['open', 'high', 'low', 'close', 'volume']].astype(float)
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
df.set_index('timestamp', inplace=True)
df.sort_index(inplace=True)
return df
except Exception as e:
print(f"❌ خطأ في إنشاء DataFrame لمرشح 1H: {e}")
return pd.DataFrame()
# 🔴 --- START OF CHANGE (V7.4) --- 🔴
# (دالة مساعدة جديدة لتقسيم منطق MC)
def _get_mc_score_for_filter(self, analysis: Dict) -> float:
"""(V7.4) (دالة مساعدة) لحساب درجة مونت كارلو للفلتر"""
mc_distribution = analysis.get('monte_carlo_distribution')
monte_carlo_score = 0
if mc_distribution and mc_distribution.get('error') is None:
prob_gain = mc_distribution.get('probability_of_gain', 0)
var_95_value = mc_distribution.get('risk_metrics', {}).get('VaR_95_value', 0)
current_price = analysis.get('current_price', 1)
if current_price > 0:
normalized_var = var_95_value / current_price
risk_penalty = 1.0
if normalized_var > 0.05: risk_penalty = 0.5
elif normalized_var > 0.03: risk_penalty = 0.8
normalized_prob_score = max(0.0, (prob_gain - 0.5) * 2)
monte_carlo_score = normalized_prob_score * risk_penalty
return monte_carlo_score
def _calculate_1h_filter_score(self, analysis: Dict) -> float:
"""
(محدث V7.4 - فلتر الزخم المتفجر 1H)
"فلتر شمس منتصف الظهر"
يبحث عن:
1. انفجار الحجم (Volume Explosion)
2. قوة الاتجاه (Trend Strength - ADX)
3. المنطقة الآمنة (RSI Safe Zone)
4. (يحتوي على واقي العملات المستقرة V7.2)
"""
try:
# (V7.2) واقي العملات المستقرة (لا تغيير)
ohlcv_candles = analysis.get('ohlcv_1h', {}).get('1h', [])
if not ohlcv_candles or len(ohlcv_candles) < 30: # (تحتاج 30 لـ ADX و Vol MA)
return 0.0
closes_1h = [c[4] for c in ohlcv_candles]
if len(closes_1h) > 20: # (التحقق من العملة المستقرة أولاً)
std_dev = np.std(closes_1h[-20:])
if std_dev < 1e-5:
# print(f" - {analysis.get('symbol', 'N/A')}: تم الاستبعاد (عملة مستقرة)")
return 0.0
# --- (الإضافة الجديدة: حساب المؤشرات المتقدمة للفلتر) ---
if ta is None: # (التحقق من pandas_ta)
return 0.0 # لا يمكن الحساب بدون المكتبة
df = self._create_dataframe(ohlcv_candles) # (إعادة إنشاء DF لحساب ADX/Vol)
if df.empty:
return 0.0
# 1. حساب مؤشرات الزخم المتفجر
volume = df['volume']
vol_ma = ta.sma(volume, length=20)
if vol_ma is None or vol_ma.empty: return 0.0
current_volume = volume.iloc[-1]
avg_volume = vol_ma.iloc[-1]
adx_data = ta.adx(df['high'], df['low'], df['close'], length=14)
current_adx = adx_data['ADX_14'].iloc[-1] if adx_data is not None and not adx_data.empty else 0
# 2. جلب المؤشرات الأساسية (المحسوبة مسبقاً)
indicators = analysis.get('advanced_indicators', {}).get('1h', {})
rsi = indicators.get('rsi', 50)
# 3. جلب درجة مونت كارلو (المحسوبة مسبقاً)
monte_carlo_score = self._get_mc_score_for_filter(analysis)
# 4. جلب درجة الأنماط (المحسوبة مسبقاً)
pattern_confidence = analysis.get('pattern_analysis', {}).get('pattern_confidence', 0)
# --- (منطق الفلترة الجديد) ---
# المعايير الصارمة لـ "شمس منتصف الظهر"
VOL_MULTIPLIER = 1.75 # (يجب أن يكون الحجم الحالي 1.75x المتوسط)
ADX_THRESHOLD = 25.0 # (يجب أن يكون الاتجاه قوياً)
RSI_MIN = 60 # (يجب أن يكون في منطقة صاعدة)
RSI_MAX = 85 # (يجب ألا يكون منهكاً تماماً)
vol_score = 0.0
if avg_volume > 0:
# (تطبيع درجة الحجم: 1.0 إذا كان يساوي أو يفوق المضاعف)
vol_score = min(1.0, max(0.0, (current_volume / avg_volume) / VOL_MULTIPLIER))
# (تطبيع درجة ADX: 0.0 عند 25، و 1.0 عند 40+)
adx_score = min(1.0, max(0.0, (current_adx - ADX_THRESHOLD) / 15.0))
rsi_score = 0.0
if RSI_MIN <= rsi <= RSI_MAX:
rsi_score = 1.0
elif rsi > RSI_MAX: # (عقوبة بسيطة للإرهاق)
rsi_score = 0.5
# (الأوزان الجديدة) - إعطاء الأولوية للزخم والحجم
WEIGHT_VOL = 0.30
WEIGHT_ADX = 0.30
WEIGHT_RSI = 0.15
WEIGHT_MC = 0.15
WEIGHT_PATTERN = 0.10 # (تقليل أهمية النمط أثناء الانفجار)
final_score = (
(vol_score * WEIGHT_VOL) +
(adx_score * WEIGHT_ADX) +
(rsi_score * WEIGHT_RSI) +
(monte_carlo_score * WEIGHT_MC) +
(pattern_confidence * WEIGHT_PATTERN)
)
# (العتبة (Threshold) لا تزال 0.50 كما هي في V7.3)
return min(max(final_score, 0.0), 1.0)
except Exception as e:
# print(f"❌ خطأ في حساب درجة فلتر 1H (V-Burst): {e}")
return 0.0
# 🔴 --- END OF CHANGE --- 🔴
async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
"""
الطبقة 1: فحص سريع - (محدث بالكامل V7.3)
"""
print("📊 الطبقة 1 (V7.4): بدء الغربلة (الكاشف المتفجر 1H)...")
# الخطوة 1: جلب أفضل 100 عملة حسب الحجم
volume_data = await self._get_volume_data_optimal()
if not volume_data:
volume_data = await self._get_volume_data_direct_api()
if not volume_data:
print("❌ فشل جلب بيانات الأحجام للطبقة 1")
return []
volume_data.sort(key=lambda x: x['dollar_volume'], reverse=True)
top_100_by_volume = volume_data[:100]
print(f"✅ تم تحديد أفضل {len(top_100_by_volume)} عملة. بدء تشغيل الكاشف المتفجر (1H)...")
final_candidates = []
batch_size = 20
for i in range(0, len(top_100_by_volume), batch_size):
batch_symbols_data = top_100_by_volume[i:i + batch_size]
batch_symbols = [s['symbol'] for s in batch_symbols_data]
print(f" 🔄 معالجة دفعة {int(i/batch_size) + 1}/{(len(top_100_by_volume) + batch_size - 1) // batch_size} ({len(batch_symbols)} عملة)...")
# الخطوة 2: جلب بيانات 1H بالتوازي
tasks = [self._fetch_1h_ohlcv_for_screening(symbol) for symbol in batch_symbols]
results_candles = await asyncio.gather(*tasks, return_exceptions=True)
analysis_tasks = []
valid_symbol_data = []
for j, (candles) in enumerate(results_candles):
symbol_data = batch_symbols_data[j]
symbol = symbol_data['symbol']
if isinstance(candles, Exception) or not candles or len(candles) < 50:
continue
ohlcv_1h_only = {'1h': candles}
symbol_data['ohlcv_1h'] = ohlcv_1h_only
symbol_data['current_price'] = candles[-1][4]
analysis_tasks.append(self._run_mini_detector(symbol_data))
valid_symbol_data.append(symbol_data)
if not analysis_tasks:
continue
analysis_results = await asyncio.gather(*analysis_tasks, return_exceptions=True)
for j, (analysis_output) in enumerate(analysis_results):
symbol_data = valid_symbol_data[j]
symbol = symbol_data['symbol']
if isinstance(analysis_output, Exception):
print(f" - {symbol}: فشل الكاشف المصغر ({analysis_output})")
continue
analysis_output['ohlcv_1h'] = symbol_data['ohlcv_1h']
analysis_output['symbol'] = symbol
# (استدعاء الدالة الجديدة V7.4)
filter_score = self._calculate_1h_filter_score(analysis_output)
# (لا تغيير في العتبة، V7.3)
if filter_score >= 0.50:
print(f" ✅ {symbol}: نجح (الدرجة: {filter_score:.2f})")
symbol_data['layer1_score'] = filter_score
symbol_data['reasons_for_candidacy'] = [f'1H_MOMENTUM_BURST']
if 'ohlcv_1h' in symbol_data: del symbol_data['ohlcv_1h']
final_candidates.append(symbol_data)
print(f"🎯 اكتملت الغربلة (V7.4). تم تأهيل {len(final_candidates)} عملة من أصل 100 للطبقة 2.")
print("🏆 المرشحون الناجحون:")
for k, candidate in enumerate(final_candidates[:15]):
score = candidate.get('layer1_score', 0)
volume = candidate.get('dollar_volume', 0)
print(f" {k+1:2d}. {candidate['symbol']}: (الدرجة: {score:.2f}) | ${volume:,.0f}")
return final_candidates
async def _run_mini_detector(self, symbol_data: Dict) -> Dict:
"""(V7.1) يشغل المحللات الأساسية بالتوازي على بيانات 1H فقط"""
ohlcv_1h = symbol_data.get('ohlcv_1h')
current_price = symbol_data.get('current_price')
df = self._create_dataframe(ohlcv_1h.get('1h'))
if df.empty:
raise ValueError("DataFrame فارغ لتحليل 1H")
analysis_dict = {'current_price': current_price}
task_indicators = self.technical_analyzer.calculate_all_indicators(df, '1h')
task_mc = self.monte_carlo_analyzer.generate_1h_price_distribution(ohlcv_1h)
# (V8-MODIFICATION) استخدام الدالة الجديدة
task_pattern = self.pattern_analyzer.detect_chart_patterns(ohlcv_1h)
results = await asyncio.gather(task_mc, task_pattern, return_exceptions=True)
analysis_dict['advanced_indicators'] = {'1h': task_indicators}
if not isinstance(results[0], Exception):
analysis_dict['monte_carlo_distribution'] = results[0]
if not isinstance(results[1], Exception):
analysis_dict['pattern_analysis'] = results[1]
return analysis_dict
async def _fetch_1h_ohlcv_for_screening(self, symbol: str) -> List:
"""(V7.1) جلب 100 شمعة لإطار الساعة (1H) للغربلة السريعة"""
try:
ohlcv_data = self.exchange.fetch_ohlcv(symbol, '1h', limit=100)
if not ohlcv_data or len(ohlcv_data) < 50:
return None
return ohlcv_data
except Exception:
return None
async def _get_volume_data_optimal(self) -> List[Dict[str, Any]]:
try:
if not self.exchange: return []
tickers = self.exchange.fetch_tickers()
volume_data = []
for symbol, ticker in tickers.items():
if not symbol.endswith('/USDT') or not ticker.get('active', True): continue
current_price = ticker.get('last', 0)
quote_volume = ticker.get('quoteVolume', 0)
if current_price is None or current_price <= 0: continue
if quote_volume is not None and quote_volume > 0:
dollar_volume = quote_volume
else:
base_volume = ticker.get('baseVolume', 0)
if base_volume is None: continue
dollar_volume = base_volume * current_price
if dollar_volume is None or dollar_volume < 50000: continue
price_change_24h = ticker.get('percentage', 0) or 0
if price_change_24h is None: price_change_24h = 0
volume_data.append({
'symbol': symbol, 'dollar_volume': dollar_volume,
'current_price': current_price, 'volume_24h': ticker.get('baseVolume', 0) or 0,
'price_change_24h': price_change_24h
})
print(f"✅ تم معالجة {len(volume_data)} عملة في الطريقة المثلى (لجلب الحجم)")
return volume_data
except Exception as e:
print(f"❌ خطأ في جلب بيانات الحجم المثلى: {e}")
return []
async def _get_volume_data_direct_api(self) -> List[Dict[str, Any]]:
try:
url = "https://api.kucoin.com/api/v1/market/allTickers"
async with httpx.AsyncClient(timeout=15) as client:
response = await client.get(url)
response.raise_for_status()
data = response.json()
if data.get('code') != '200000': raise ValueError(f"استجابة API غير متوقعة: {data.get('code')}")
tickers = data['data']['ticker']
volume_data = []
for ticker in tickers:
symbol = ticker['symbol']
if not symbol.endswith('USDT'): continue
formatted_symbol = symbol.replace('-', '/')
try:
vol_value = ticker.get('volValue')
last_price = ticker.get('last')
change_rate = ticker.get('changeRate')
vol = ticker.get('vol')
if vol_value is None or last_price is None or change_rate is None or vol is None: continue
dollar_volume = float(vol_value) if vol_value else 0
current_price = float(last_price) if last_price else 0
price_change = (float(change_rate) * 100) if change_rate else 0
volume_24h = float(vol) if vol else 0
if dollar_volume >= 50000 and current_price > 0:
volume_data.append({
'symbol': formatted_symbol, 'dollar_volume': dollar_volume,
'current_price': current_price, 'volume_24h': volume_24h,
'price_change_24h': price_change
})
except (ValueError, TypeError, KeyError) as e: continue
print(f"✅ تم معالجة {len(volume_data)} عملة في الطريقة المباشرة (لجلب الحجم)")
return volume_data
except Exception as e:
print(f"❌ خطأ في جلب بيانات الحجم المباشر: {e}")
return []
async def stream_ohlcv_data(self, symbols: List[Dict[str, Any]], queue: asyncio.Queue):
"""
(محدث V7.2)
جلب بيانات OHLCV كاملة (6 أطر زمنية) للعملات الناجحة فقط
"""
print(f"📊 بدء تدفق بيانات OHLCV (الكاملة) لـ {len(symbols)} عملة (الناجحين من الغربلة)...")
batch_size = 15
batches = [symbols[i:i + batch_size] for i in range(0, len(symbols), batch_size)]
total_successful = 0
for batch_num, batch in enumerate(batches):
print(f" 🔄 [المنتج] جلب الدفعة {batch_num + 1}/{len(batches)} ({len(batch)} عملة)...")
batch_tasks = []
# (V7.2 FIX)
for symbol_data in batch:
symbol_str = symbol_data['symbol']
task = asyncio.create_task(self._fetch_complete_ohlcv_parallel(symbol_str))
batch_tasks.append(task)
batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
successful_data_for_batch = []
successful_count = 0
for i, result in enumerate(batch_results):
original_symbol_data = batch[i]
symbol_str = original_symbol_data['symbol']
if isinstance(result, Exception):
print(f" ❌ [المنتج] فشل جلب {symbol_str}: {result}")
elif result is not None:
result.update(original_symbol_data)
successful_data_for_batch.append(result)
successful_count += 1
timeframes_count = result.get('successful_timeframes', 0)
print(f" ✅ [المنتج] {symbol_str}: {timeframes_count}/6 أطر زمنية")
else:
print(f" ⚠️ [المنتج] {symbol_str}: بيانات غير كافية، تم التجاهل")
print(f" 📦 [المنتج] اكتملت الدفعة {batch_num + 1}: {successful_count}/{len(batch)} ناجحة")
if successful_data_for_batch:
try:
await queue.put(successful_data_for_batch)
print(f" 📬 [المنتج] تم إرسال {len(successful_data_for_batch)} عملة إلى طابور المعالجة")
total_successful += len(successful_data_for_batch)
except Exception as q_err:
print(f" ❌ [المنتج] فشل إرسال الدفعة للطابور: {q_err}")
if batch_num < len(batches) - 1:
await asyncio.sleep(1)
print(f"✅ [المنتج] اكتمل تدفق بيانات OHLCV (الكاملة). تم إرسال {total_successful} عملة للمعالجة.")
try:
await queue.put(None)
print(" 📬 [المنتج] تم إرسال إشارة الإنهاء (None) إلى الطابور.")
except Exception as q_err:
print(f" ❌ [المنتج] فشل إرسال إشارة الإنهاء (None) للطابور: {q_err}")
async def _fetch_complete_ohlcv_parallel(self, symbol: str) -> Dict[str, Any]:
"""(V7.2) جلب بيانات OHLCV كاملة - يتوقع 'symbol' كنص"""
try:
ohlcv_data = {}
timeframes = [
('5m', 200), ('15m', 200), ('1h', 200),
('4h', 200), ('1d', 200), ('1w', 200),
]
timeframe_tasks = []
for timeframe, limit in timeframes:
task = asyncio.create_task(self._fetch_single_timeframe_improved(symbol, timeframe, limit))
timeframe_tasks.append(task)
timeframe_results = await asyncio.gather(*timeframe_tasks, return_exceptions=True)
successful_timeframes = 0
min_required_timeframes = 2
for i, (timeframe, limit) in enumerate(timeframes):
result = timeframe_results[i]
if isinstance(result, Exception): continue
# (V8-MODIFICATION) زيادة الحد الأدنى من الشموع لاستيعاب المؤشرات
if result and len(result) >= 200: # (كان 10)
ohlcv_data[timeframe] = result
successful_timeframes += 1
# (V8-MODIFICATION) زيادة الحد الأدنى للأطر الزمنية
if successful_timeframes >= 3 and ohlcv_data: # (كان 2)
try:
current_price = await self.get_latest_price_async(symbol)
if current_price is None:
for timeframe_data in ohlcv_data.values():
if timeframe_data and len(timeframe_data) > 0:
last_candle = timeframe_data[-1]
if len(last_candle) >= 5:
current_price = last_candle[4]; break
if current_price is None: return None
result_data = {
'symbol': symbol, 'ohlcv': ohlcv_data, 'raw_ohlcv': ohlcv_data,
'current_price': current_price, 'timestamp': datetime.now().isoformat(),
'candles_count': {tf: len(data) for tf, data in ohlcv_data.items()},
'successful_timeframes': successful_timeframes
}
return result_data
except Exception as price_error: return None
else: return None
except Exception as e: return None
async def _fetch_single_timeframe_improved(self, symbol: str, timeframe: str, limit: int):
"""(V7.2) جلب بيانات إطار زمني واحد - يتوقع 'symbol' كنص"""
max_retries = 3
retry_delay = 2
for attempt in range(max_retries):
try:
ohlcv_data = self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
if ohlcv_data and len(ohlcv_data) > 0:
return ohlcv_data
else:
return []
except Exception as e:
if attempt < max_retries - 1:
await asyncio.sleep(retry_delay * (attempt + 1))
else:
return []
async def get_latest_price_async(self, symbol):
"""(V7.2) جلب السعر الحالي - يتوقع 'symbol' كنص"""
try:
if not self.exchange: return None
if not symbol or '/' not in symbol: return None
ticker = self.exchange.fetch_ticker(symbol)
if not ticker: return None
current_price = ticker.get('last')
if current_price is None: return None
return float(current_price)
except Exception as e: return None
async def get_whale_data_for_symbol(self, symbol):
try:
if self.whale_monitor:
whale_data = await self.whale_monitor.get_symbol_whale_activity(symbol)
return whale_data
else: return None
except Exception as e: return None
async def get_whale_trading_signal(self, symbol, whale_data, market_context):
try:
if self.whale_monitor:
return await self.whale_monitor.generate_whale_trading_signal(symbol, whale_data, market_context)
else:
return {'action': 'HOLD', 'confidence': 0.3, 'reason': 'Whale monitor not available', 'source': 'whale_analysis'}
except Exception as e:
return {'action': 'HOLD', 'confidence': 0.3, 'reason': f'Error: {str(e)}', 'source': 'whale_analysis'}
print("✅ DataManager loaded - V7.4 (1H Momentum Burst Filter)")