# Source: Trad / data_manager.py (copied from a repository file view)
# Last commit: "Update data_manager.py" by Riy777 (6b00681), file size 31.4 kB
# data_manager.py (Updated to V7.3 - V8 Pattern Engine Init)
import asyncio
import inspect
import logging
import os
import time
import traceback
from datetime import datetime
from typing import Any, Dict, List, Optional

import ccxt
import httpx
import numpy as np
import pandas as pd

try:
    import pandas_ta as ta
except ImportError:
    print("⚠️ مكتبة pandas_ta غير موجودة، فلتر الغربلة المتقدم سيفشل.")
    ta = None

from ml_engine.indicators import AdvancedTechnicalAnalyzer
from ml_engine.monte_carlo import MonteCarloAnalyzer
# (V8-MODIFICATION) Import the correct (V8) pattern engine.
from ml_engine.patterns import ChartPatternAnalyzer
# Raise the httpx / httpcore loggers to WARNING so lower-severity records are dropped.
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
class DataManager:
    """Fetch KuCoin market data and pre-screen symbols for the later analysis layers.

    The class owns a ccxt ``kucoin`` client, a few public HTTP fallbacks
    (alternative.me, CoinGecko, KuCoin REST) and the three analyzers that make
    up the 1H "mini detector" (indicators, Monte Carlo, chart patterns).

    NOTE(review): every ``self.exchange.*`` call below is invoked without
    ``await``, i.e. the client is used synchronously inside ``async def``
    methods. Those calls therefore run one after another even when wrapped in
    ``asyncio.gather``. Left as is on purpose: moving them to threads would
    change the request rate seen by the exchange.
    """

    # BTC/USDT price levels used by both the trend and the BTC sentiment helpers.
    BTC_BULLISH_ABOVE = 60000
    BTC_BEARISH_BELOW = 55000
    # Minimum 24h dollar volume for a symbol to enter layer 1.
    MIN_DOLLAR_VOLUME = 50000
    # Layer 1 screens the top-N symbols by dollar volume and keeps scores >= the pass mark.
    LAYER1_TOP_N = 100
    LAYER1_PASS_SCORE = 0.50

    # (V8-MODIFICATION) accepts r2_service
    def __init__(self, contracts_db, whale_monitor, r2_service=None):
        """Store collaborators and build the exchange client and analyzers.

        Args:
            contracts_db: Contract lookup table; any falsy value becomes ``{}``.
            whale_monitor: Object used by the whale helpers; may be ``None``.
            r2_service: Handed to ``ChartPatternAnalyzer`` in ``initialize``.
        """
        self.contracts_db = contracts_db or {}
        self.whale_monitor = whale_monitor
        self.r2_service = r2_service  # (V8-MODIFICATION) new addition
        try:
            self.exchange = ccxt.kucoin({
                'sandbox': False,
                'enableRateLimit': True,
                'timeout': 30000,
                'verbose': False,
            })
            print("✅ تم تهيئة اتصال KuCoin بنجاح")
        except Exception as e:
            print(f"❌ فشل تهيئة اتصال KuCoin: {e}")
            self.exchange = None
        self.http_client = None
        self.market_cache = {}
        self.last_market_load = None
        self.technical_analyzer = AdvancedTechnicalAnalyzer()
        self.monte_carlo_analyzer = MonteCarloAnalyzer()
        self.pattern_analyzer = None  # (V8-MODIFICATION) created in initialize()

    async def initialize(self):
        """Open the shared HTTP client, load markets and start the pattern engine.

        If the pattern engine fails to initialize, a fresh analyzer without an
        R2 service is installed instead so ``pattern_analyzer`` is never ``None``
        after this call.
        """
        self.http_client = httpx.AsyncClient(timeout=30.0)
        await self._load_markets()
        # (V8-MODIFICATION) initialize the V8 (ML-based) pattern engine
        print(" > [DataManager] تهيئة محرك الأنماط V8 (ML-Based)...")
        try:
            self.pattern_analyzer = ChartPatternAnalyzer(r2_service=self.r2_service)
            # Loads the model and the scaler from R2 (per the original author's note).
            await self.pattern_analyzer.initialize()
        except Exception as e:
            print(f"❌ [DataManager] فشل تهيئة محرك الأنماط V8: {e}")
            # Safe mode: keep an analyzer object around even if loading failed.
            self.pattern_analyzer = ChartPatternAnalyzer(r2_service=None)
        print("✅ DataManager initialized - V7.3 (L1 Threshold @ 0.50)")

    async def _load_markets(self):
        """Load the exchange market list into ``market_cache``; errors are only logged."""
        try:
            if not self.exchange:
                return
            print("🔄 جلب أحدث بيانات الأسواق من KuCoin...")
            self.exchange.load_markets()
            self.market_cache = self.exchange.markets
            self.last_market_load = datetime.now()
            print(f"✅ تم تحميل {len(self.market_cache)} سوق من KuCoin")
        except Exception as e:
            print(f"❌ فشل تحميل بيانات الأسواق: {e}")

    async def close(self):
        """Release the HTTP client and the exchange client.

        ``exchange.close()`` is awaited only when it actually returns an
        awaitable. The rest of this class drives the exchange synchronously, and
        unconditionally awaiting a non-awaitable result made every shutdown end
        in the warning branch.
        """
        if self.http_client and not self.http_client.is_closed:
            await self.http_client.aclose()
            print(" ✅ DataManager: http_client closed.")
        if self.exchange:
            try:
                closer = getattr(self.exchange, 'close', None)
                if callable(closer):
                    outcome = closer()
                    if inspect.isawaitable(outcome):
                        await outcome
                    print(" ✅ DataManager: ccxt.kucoin exchange closed.")
            except Exception as e:
                print(f" ⚠️ DataManager: Error closing ccxt.kucoin: {e}")

    async def get_market_context_async(self):
        """Return a market snapshot dict (BTC/ETH price, fear & greed, trend).

        Any failure yields the minimal context from
        ``_get_minimal_market_context`` instead of raising.
        """
        try:
            sentiment_data = await self.get_sentiment_safe_async()
            price_data = await self._get_prices_with_fallback()
            bitcoin_price = price_data.get('bitcoin')
            ethereum_price = price_data.get('ethereum')
            return {
                'timestamp': datetime.now().isoformat(),
                'bitcoin_price_usd': bitcoin_price,
                'ethereum_price_usd': ethereum_price,
                'fear_and_greed_index': sentiment_data.get('feargreed_value') if sentiment_data else None,
                'sentiment_class': sentiment_data.get('feargreed_class') if sentiment_data else 'NEUTRAL',
                'market_trend': self._determine_market_trend(bitcoin_price, sentiment_data),
                'btc_sentiment': self._get_btc_sentiment(bitcoin_price),
                'data_quality': 'HIGH' if bitcoin_price and ethereum_price else 'LOW'
            }
        except Exception:
            return self._get_minimal_market_context()

    async def get_sentiment_safe_async(self):
        """Fetch the latest fear & greed reading from alternative.me.

        Returns:
            A dict with ``feargreed_value`` (int), ``feargreed_class``, ``source``
            and ``timestamp``, or ``None`` on any error (best effort).
        """
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                response = await client.get("https://api.alternative.me/fng/")
                response.raise_for_status()
                data = response.json()
                if 'data' not in data or not data['data']:
                    raise ValueError("بيانات المشاعر غير متوفرة")
                latest_data = data['data'][0]
                return {
                    "feargreed_value": int(latest_data['value']),
                    "feargreed_class": latest_data['value_classification'],
                    "source": "alternative.me",
                    "timestamp": datetime.now().isoformat()
                }
        except Exception:
            return None

    def _determine_market_trend(self, bitcoin_price, sentiment_data):
        """Combine the BTC price band and the fear & greed value into a trend label.

        Each input contributes -1, 0 or +1; a positive sum is ``bull_market``,
        a negative sum ``bear_market``, zero ``sideways_market``. A missing
        price short-circuits to ``UNKNOWN``.
        """
        if bitcoin_price is None:
            return "UNKNOWN"
        if bitcoin_price > self.BTC_BULLISH_ABOVE:
            score = 1
        elif bitcoin_price < self.BTC_BEARISH_BELOW:
            score = -1
        else:
            score = 0
        if sentiment_data and sentiment_data.get('feargreed_value') is not None:
            fear_greed = sentiment_data.get('feargreed_value')
            if fear_greed > 60:
                score += 1
            elif fear_greed < 40:
                score -= 1
        if score >= 1:
            return "bull_market"
        if score <= -1:
            return "bear_market"
        return "sideways_market"

    def _get_btc_sentiment(self, bitcoin_price):
        """Map the BTC price to ``BULLISH`` / ``BEARISH`` / ``NEUTRAL`` (``UNKNOWN`` if missing)."""
        if bitcoin_price is None:
            return 'UNKNOWN'
        if bitcoin_price > self.BTC_BULLISH_ABOVE:
            return 'BULLISH'
        if bitcoin_price < self.BTC_BEARISH_BELOW:
            return 'BEARISH'
        return 'NEUTRAL'

    async def _get_prices_with_fallback(self):
        """Return BTC/ETH prices from KuCoin, falling back to CoinGecko if either is missing."""
        try:
            prices = await self._get_prices_from_kucoin_safe()
            if prices.get('bitcoin') and prices.get('ethereum'):
                return prices
            return await self._get_prices_from_coingecko()
        except Exception:
            return {'bitcoin': None, 'ethereum': None}

    async def _get_prices_from_kucoin_safe(self):
        """Read the last BTC/USDT and ETH/USDT prices from the exchange client.

        Returns both values as ``None`` when the client is missing or when any
        ticker request raises; a non-positive or missing ``last`` leaves that
        single entry as ``None``.
        """
        if not self.exchange:
            return {'bitcoin': None, 'ethereum': None}
        try:
            prices = {'bitcoin': None, 'ethereum': None}
            for key, pair in (('bitcoin', 'BTC/USDT'), ('ethereum', 'ETH/USDT')):
                last = self.exchange.fetch_ticker(pair).get('last')
                price = float(last) if last else None
                if price and price > 0:
                    prices[key] = price
            return prices
        except Exception:
            return {'bitcoin': None, 'ethereum': None}

    async def _get_prices_from_coingecko(self):
        """Fetch BTC/ETH USD prices from CoinGecko, retrying once after HTTP 429.

        Returns both prices or, if either is missing or anything fails, both as ``None``.
        """
        try:
            await asyncio.sleep(0.5)
            url = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin,ethereum&vs_currencies=usd"
            headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'application/json'}
            async with httpx.AsyncClient(headers=headers) as client:
                response = await client.get(url, timeout=10)
                if response.status_code == 429:
                    await asyncio.sleep(2)
                    response = await client.get(url, timeout=10)
                response.raise_for_status()
                data = response.json()
                btc_price = data.get('bitcoin', {}).get('usd')
                eth_price = data.get('ethereum', {}).get('usd')
                if btc_price and eth_price:
                    return {'bitcoin': btc_price, 'ethereum': eth_price}
                return {'bitcoin': None, 'ethereum': None}
        except Exception:
            return {'bitcoin': None, 'ethereum': None}

    def _get_minimal_market_context(self):
        """Return the placeholder context used when no market data could be gathered."""
        return {
            'timestamp': datetime.now().isoformat(),
            'data_available': False,
            'market_trend': 'UNKNOWN',
            'btc_sentiment': 'UNKNOWN',
            'data_quality': 'LOW'
        }

    def _create_dataframe(self, candles: List) -> pd.DataFrame:
        """(V7.1) Build a time-indexed OHLCV DataFrame for the 1H analysis.

        Args:
            candles: Rows of ``[timestamp_ms, open, high, low, close, volume]``.

        Returns:
            A float DataFrame indexed and sorted by timestamp, or an empty
            DataFrame for empty input or on any conversion error.
        """
        try:
            if not candles:
                return pd.DataFrame()
            df = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
            df[['open', 'high', 'low', 'close', 'volume']] = df[['open', 'high', 'low', 'close', 'volume']].astype(float)
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
            df.set_index('timestamp', inplace=True)
            df.sort_index(inplace=True)
            return df
        except Exception as e:
            print(f"❌ خطأ في إنشاء DataFrame لمرشح 1H: {e}")
            return pd.DataFrame()

    def _calculate_1h_filter_score(self, analysis: Dict) -> float:
        """(V7.2) "Mini detector" score in ``[0, 1]`` for one symbol.

        The score is the weighted mean (0.40 / 0.30 / 0.30) of the Monte Carlo,
        pattern and indicator components that are strictly positive; components
        equal to zero are left out of both the sum and the weights.

        A stablecoin guard returns 0.0 when the last 20 hourly closes are flat.
        Flatness is judged relative to the mean close (std < 1e-5 * mean): an
        absolute 1e-5 cut-off would also reject every token whose *price* is
        around 1e-5 or below, no matter how volatile it is.

        Returns:
            The clamped score, or 0.0 when nothing scored or on any error.
        """
        try:
            # (V7.2) stablecoin guard
            if 'ohlcv_1h' in analysis and '1h' in analysis['ohlcv_1h']:
                closes_1h = [c[4] for c in analysis['ohlcv_1h']['1h']]
                if len(closes_1h) > 20:
                    recent = np.asarray(closes_1h[-20:], dtype=float)
                    mean_close = abs(float(np.mean(recent)))
                    # Fall back to the absolute limit if the mean is not usable as a scale.
                    flat_limit = 1e-5 * mean_close if mean_close > 0 else 1e-5
                    if float(np.std(recent)) < flat_limit:
                        print(f" - {analysis.get('symbol', 'N/A')}: تم الاستبعاد (عملة مستقرة)")
                        return 0.0

            # 1. Pattern score
            pattern_confidence = analysis.get('pattern_analysis', {}).get('pattern_confidence', 0)

            # 2. Monte Carlo score: probability of gain above 50%, damped by VaR.
            mc_distribution = analysis.get('monte_carlo_distribution')
            monte_carlo_score = 0
            if mc_distribution and mc_distribution.get('error') is None:
                prob_gain = mc_distribution.get('probability_of_gain', 0)
                var_95_value = mc_distribution.get('risk_metrics', {}).get('VaR_95_value', 0)
                current_price = analysis.get('current_price', 1)
                if current_price > 0:
                    normalized_var = var_95_value / current_price
                    risk_penalty = 1.0
                    if normalized_var > 0.05:
                        risk_penalty = 0.5
                    elif normalized_var > 0.03:
                        risk_penalty = 0.8
                    normalized_prob_score = max(0.0, (prob_gain - 0.5) * 2)
                    monte_carlo_score = normalized_prob_score * risk_penalty

            # 3. Indicator score: momentum setup, or an oversold bounce capped at 0.8.
            indicator_score = 0
            indicators = analysis.get('advanced_indicators', {}).get('1h', {})
            if indicators:
                rsi = indicators.get('rsi', 50)
                macd_hist = indicators.get('macd_hist', 0)
                ema_9 = indicators.get('ema_9', 0)
                ema_21 = indicators.get('ema_21', 0)
                if rsi > 55 and macd_hist > 0 and ema_9 > ema_21:
                    indicator_score = min(0.5 + (rsi - 55) / 50 + (macd_hist / (analysis.get('current_price', 1) * 0.001)), 1.0)
                elif rsi < 35:
                    indicator_score = min(0.4 + (35 - rsi) / 35, 0.8)

            # 4. Final score (no strategies or whale data at this layer).
            components = []
            weights = []
            if monte_carlo_score > 0:
                components.append(monte_carlo_score)
                weights.append(0.40)
            if pattern_confidence > 0:
                components.append(pattern_confidence)
                weights.append(0.30)
            if indicator_score > 0:
                components.append(indicator_score)
                weights.append(0.30)
            if not components:
                return 0.0
            total_weight = sum(weights)
            if total_weight == 0:
                return 0.0
            enhanced_score = sum(comp * weight for comp, weight in zip(components, weights)) / total_weight
            return min(max(enhanced_score, 0.0), 1.0)
        except Exception as e:
            print(f"❌ خطأ في حساب درجة فلتر 1H: {e}")
            return 0.0

    async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
        """Layer 1 (V7.3): screen the top symbols by volume with the 1H mini detector.

        Steps: get volume data (ccxt first, direct REST as fallback), keep the
        ``LAYER1_TOP_N`` largest by dollar volume, then, in batches of 20, fetch
        1H candles, run the mini detector and keep symbols whose filter score
        reaches ``LAYER1_PASS_SCORE``.

        Returns:
            The passing volume entries, each extended in place with
            ``current_price``, ``layer1_score`` and ``reasons_for_candidacy``
            (the bulky ``ohlcv_1h`` payload is removed again). Empty list when
            no volume data is available.
        """
        print("📊 الطبقة 1 (V7.3): بدء الغربلة (الكاشف المصغر 1H)...")
        # Step 1: top symbols by dollar volume.
        volume_data = await self._get_volume_data_optimal()
        if not volume_data:
            volume_data = await self._get_volume_data_direct_api()
        if not volume_data:
            print("❌ فشل جلب بيانات الأحجام للطبقة 1")
            return []
        volume_data.sort(key=lambda x: x['dollar_volume'], reverse=True)
        top_by_volume = volume_data[:self.LAYER1_TOP_N]
        print(f"✅ تم تحديد أفضل {len(top_by_volume)} عملة. بدء تشغيل الكاشف المصغر (1H)...")

        final_candidates = []
        batch_size = 20
        total_batches = (len(top_by_volume) + batch_size - 1) // batch_size
        for batch_no, start in enumerate(range(0, len(top_by_volume), batch_size), start=1):
            batch = top_by_volume[start:start + batch_size]
            print(f" 🔄 معالجة دفعة {batch_no}/{total_batches} ({len(batch)} عملة)...")

            # Step 2: 1H candles for the whole batch.
            candle_sets = await asyncio.gather(
                *(self._fetch_1h_ohlcv_for_screening(item['symbol']) for item in batch),
                return_exceptions=True,
            )
            analysis_tasks = []
            valid_symbol_data = []
            for symbol_data, candles in zip(batch, candle_sets):
                if isinstance(candles, Exception) or not candles or len(candles) < 50:
                    continue
                symbol_data['ohlcv_1h'] = {'1h': candles}
                symbol_data['current_price'] = candles[-1][4]  # close of the latest candle
                analysis_tasks.append(self._run_mini_detector(symbol_data))
                valid_symbol_data.append(symbol_data)
            if not analysis_tasks:
                continue

            # Step 3: score every symbol that produced enough candles.
            analysis_results = await asyncio.gather(*analysis_tasks, return_exceptions=True)
            for symbol_data, analysis_output in zip(valid_symbol_data, analysis_results):
                symbol = symbol_data['symbol']
                if isinstance(analysis_output, Exception):
                    print(f" - {symbol}: فشل الكاشف المصغر ({analysis_output})")
                    continue
                analysis_output['ohlcv_1h'] = symbol_data['ohlcv_1h']
                analysis_output['symbol'] = symbol
                filter_score = self._calculate_1h_filter_score(analysis_output)
                # (V7.3) pass mark raised from 0.20 to 0.50
                if filter_score >= self.LAYER1_PASS_SCORE:
                    print(f" ✅ {symbol}: نجح (الدرجة: {filter_score:.2f})")
                    symbol_data['layer1_score'] = filter_score
                    symbol_data['reasons_for_candidacy'] = ['1H_DETECTOR_PASS']
                    symbol_data.pop('ohlcv_1h', None)
                    final_candidates.append(symbol_data)

        # Report against the number actually screened (it can be below LAYER1_TOP_N).
        print(f"🎯 اكتملت الغربلة (V7.3). تم تأهيل {len(final_candidates)} عملة من أصل {len(top_by_volume)} للطبقة 2.")
        print("🏆 المرشحون الناجحون:")
        for k, candidate in enumerate(final_candidates[:15]):
            score = candidate.get('layer1_score', 0)
            volume = candidate.get('dollar_volume', 0)
            print(f" {k+1:2d}. {candidate['symbol']}: (الدرجة: {score:.2f}) | ${volume:,.0f}")
        return final_candidates

    async def _run_mini_detector(self, symbol_data: Dict) -> Dict:
        """(V7.1) Run the indicator, Monte Carlo and pattern analyzers on 1H data only.

        Args:
            symbol_data: Must carry ``ohlcv_1h`` (``{'1h': candles}``) and
                ``current_price``.

        Returns:
            A dict with ``current_price`` and ``advanced_indicators``; the
            ``monte_carlo_distribution`` / ``pattern_analysis`` keys are present
            only when the respective analyzer did not raise.

        Raises:
            ValueError: If no DataFrame could be built from the candles.
        """
        ohlcv_1h = symbol_data.get('ohlcv_1h')
        current_price = symbol_data.get('current_price')
        df = self._create_dataframe(ohlcv_1h.get('1h'))
        if df.empty:
            raise ValueError("DataFrame فارغ لتحليل 1H")
        analysis_dict = {'current_price': current_price}
        # Called synchronously; the two awaitables below are created only afterwards.
        indicators_1h = self.technical_analyzer.calculate_all_indicators(df, '1h')
        task_mc = self.monte_carlo_analyzer.generate_1h_price_distribution(ohlcv_1h)
        # (V8-MODIFICATION) uses the new pattern-detection entry point
        task_pattern = self.pattern_analyzer.detect_chart_patterns(ohlcv_1h)
        mc_result, pattern_result = await asyncio.gather(task_mc, task_pattern, return_exceptions=True)
        analysis_dict['advanced_indicators'] = {'1h': indicators_1h}
        if not isinstance(mc_result, Exception):
            analysis_dict['monte_carlo_distribution'] = mc_result
        if not isinstance(pattern_result, Exception):
            analysis_dict['pattern_analysis'] = pattern_result
        return analysis_dict

    async def _fetch_1h_ohlcv_for_screening(self, symbol: str) -> Optional[List]:
        """(V7.1) Fetch up to 100 1H candles for quick screening.

        Returns:
            The candle list, or ``None`` when fewer than 50 candles came back
            or the request failed.
        """
        try:
            ohlcv_data = self.exchange.fetch_ohlcv(symbol, '1h', limit=100)
            if not ohlcv_data or len(ohlcv_data) < 50:
                return None
            return ohlcv_data
        except Exception:
            return None

    async def _get_volume_data_optimal(self) -> List[Dict[str, Any]]:
        """Collect 24h volume entries for ``*/USDT`` symbols via ``fetch_tickers``.

        Dollar volume is the ticker's ``quoteVolume`` when positive, otherwise
        ``baseVolume * last``. Entries below ``MIN_DOLLAR_VOLUME`` or without a
        positive last price are dropped.

        Returns:
            A list of dicts with ``symbol``, ``dollar_volume``, ``current_price``,
            ``volume_24h`` and ``price_change_24h``; empty on error or when the
            exchange client is missing.
        """
        try:
            if not self.exchange:
                return []
            tickers = self.exchange.fetch_tickers()
            volume_data = []
            for symbol, ticker in tickers.items():
                if not symbol.endswith('/USDT') or not ticker.get('active', True):
                    continue
                current_price = ticker.get('last', 0)
                if current_price is None or current_price <= 0:
                    continue
                quote_volume = ticker.get('quoteVolume', 0)
                if quote_volume is not None and quote_volume > 0:
                    dollar_volume = quote_volume
                else:
                    base_volume = ticker.get('baseVolume', 0)
                    if base_volume is None:
                        continue
                    dollar_volume = base_volume * current_price
                if dollar_volume is None or dollar_volume < self.MIN_DOLLAR_VOLUME:
                    continue
                volume_data.append({
                    'symbol': symbol, 'dollar_volume': dollar_volume,
                    'current_price': current_price, 'volume_24h': ticker.get('baseVolume', 0) or 0,
                    'price_change_24h': ticker.get('percentage', 0) or 0
                })
            print(f"✅ تم معالجة {len(volume_data)} عملة في الطريقة المثلى (لجلب الحجم)")
            return volume_data
        except Exception as e:
            print(f"❌ خطأ في جلب بيانات الحجم المثلى: {e}")
            return []

    async def _get_volume_data_direct_api(self) -> List[Dict[str, Any]]:
        """Fallback volume source: KuCoin's public ``allTickers`` REST endpoint.

        Produces the same entry shape as ``_get_volume_data_optimal``; symbols
        are converted from ``BASE-QUOTE`` to ``BASE/QUOTE``. Rows with missing or
        unparsable numeric fields are skipped.

        Returns:
            The list of entries, or an empty list on any request/format error.
        """
        try:
            url = "https://api.kucoin.com/api/v1/market/allTickers"
            async with httpx.AsyncClient(timeout=15) as client:
                response = await client.get(url)
                response.raise_for_status()
                data = response.json()
                if data.get('code') != '200000':
                    raise ValueError(f"استجابة API غير متوقعة: {data.get('code')}")
                tickers = data['data']['ticker']
                volume_data = []
                for ticker in tickers:
                    symbol = ticker['symbol']
                    if not symbol.endswith('USDT'):
                        continue
                    formatted_symbol = symbol.replace('-', '/')
                    try:
                        vol_value = ticker.get('volValue')
                        last_price = ticker.get('last')
                        change_rate = ticker.get('changeRate')
                        vol = ticker.get('vol')
                        if vol_value is None or last_price is None or change_rate is None or vol is None:
                            continue
                        dollar_volume = float(vol_value) if vol_value else 0
                        current_price = float(last_price) if last_price else 0
                        price_change = (float(change_rate) * 100) if change_rate else 0
                        volume_24h = float(vol) if vol else 0
                        if dollar_volume >= self.MIN_DOLLAR_VOLUME and current_price > 0:
                            volume_data.append({
                                'symbol': formatted_symbol, 'dollar_volume': dollar_volume,
                                'current_price': current_price, 'volume_24h': volume_24h,
                                'price_change_24h': price_change
                            })
                    except (ValueError, TypeError, KeyError):
                        continue
                print(f"✅ تم معالجة {len(volume_data)} عملة في الطريقة المباشرة (لجلب الحجم)")
                return volume_data
        except Exception as e:
            print(f"❌ خطأ في جلب بيانات الحجم المباشر: {e}")
            return []

    async def stream_ohlcv_data(self, symbols: List[Dict[str, Any]], queue: asyncio.Queue):
        """(V7.2) Producer: fetch full OHLCV (6 timeframes) for the screened symbols.

        Symbols are processed in batches of 15. Each batch's successful results
        (the fetched data merged with the original symbol entry) are put on
        ``queue`` as one list; a final ``None`` is always put as the end marker.

        Args:
            symbols: Entries that each contain at least a ``'symbol'`` string.
            queue: Receives one list per non-empty batch, then ``None``.
        """
        print(f"📊 بدء تدفق بيانات OHLCV (الكاملة) لـ {len(symbols)} عملة (الناجحين من الغربلة)...")
        batch_size = 15
        batches = [symbols[i:i + batch_size] for i in range(0, len(symbols), batch_size)]
        total_successful = 0
        for batch_num, batch in enumerate(batches):
            print(f" 🔄 [المنتج] جلب الدفعة {batch_num + 1}/{len(batches)} ({len(batch)} عملة)...")
            # (V7.2 FIX) the fetcher expects the symbol string, not the whole entry
            batch_tasks = [
                asyncio.create_task(self._fetch_complete_ohlcv_parallel(symbol_data['symbol']))
                for symbol_data in batch
            ]
            batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
            successful_data_for_batch = []
            successful_count = 0
            for original_symbol_data, result in zip(batch, batch_results):
                symbol_str = original_symbol_data['symbol']
                if isinstance(result, Exception):
                    print(f" ❌ [المنتج] فشل جلب {symbol_str}: {result}")
                elif result is not None:
                    # Keys of the screening entry override those of the fetch result.
                    result.update(original_symbol_data)
                    successful_data_for_batch.append(result)
                    successful_count += 1
                    timeframes_count = result.get('successful_timeframes', 0)
                    print(f" ✅ [المنتج] {symbol_str}: {timeframes_count}/6 أطر زمنية")
                else:
                    print(f" ⚠️ [المنتج] {symbol_str}: بيانات غير كافية، تم التجاهل")
            print(f" 📦 [المنتج] اكتملت الدفعة {batch_num + 1}: {successful_count}/{len(batch)} ناجحة")
            if successful_data_for_batch:
                try:
                    await queue.put(successful_data_for_batch)
                    print(f" 📬 [المنتج] تم إرسال {len(successful_data_for_batch)} عملة إلى طابور المعالجة")
                    total_successful += len(successful_data_for_batch)
                except Exception as q_err:
                    print(f" ❌ [المنتج] فشل إرسال الدفعة للطابور: {q_err}")
            # Pause between batches, but not after the last one.
            if batch_num < len(batches) - 1:
                await asyncio.sleep(1)
        print(f"✅ [المنتج] اكتمل تدفق بيانات OHLCV (الكاملة). تم إرسال {total_successful} عملة للمعالجة.")
        try:
            await queue.put(None)
            print(" 📬 [المنتج] تم إرسال إشارة الإنهاء (None) إلى الطابور.")
        except Exception as q_err:
            print(f" ❌ [المنتج] فشل إرسال إشارة الإنهاء (None) للطابور: {q_err}")

    async def _fetch_complete_ohlcv_parallel(self, symbol: str) -> Optional[Dict[str, Any]]:
        """(V7.2) Fetch all six timeframes for ``symbol`` (a string).

        A timeframe counts only if it returned at least 200 candles, and at
        least 3 timeframes must count. The current price comes from the live
        ticker, falling back to the close of the last candle of the first
        usable timeframe.

        Returns:
            A dict with ``symbol``, ``ohlcv`` / ``raw_ohlcv`` (the same mapping),
            ``current_price``, ``timestamp``, ``candles_count`` and
            ``successful_timeframes``; ``None`` when the data is insufficient or
            anything fails.
        """
        # (V8-MODIFICATION) raised from 10 candles / 2 timeframes to fit the indicators
        min_candles = 200
        min_timeframes = 3
        try:
            timeframes = [
                ('5m', 200), ('15m', 200), ('1h', 200),
                ('4h', 200), ('1d', 200), ('1w', 200),
            ]
            timeframe_tasks = [
                asyncio.create_task(self._fetch_single_timeframe_improved(symbol, timeframe, limit))
                for timeframe, limit in timeframes
            ]
            timeframe_results = await asyncio.gather(*timeframe_tasks, return_exceptions=True)
            ohlcv_data = {}
            for (timeframe, _limit), result in zip(timeframes, timeframe_results):
                if isinstance(result, Exception):
                    continue
                if result and len(result) >= min_candles:
                    ohlcv_data[timeframe] = result
            successful_timeframes = len(ohlcv_data)
            if successful_timeframes < min_timeframes:
                return None
            try:
                current_price = await self.get_latest_price_async(symbol)
                if current_price is None:
                    for timeframe_data in ohlcv_data.values():
                        if timeframe_data and len(timeframe_data) > 0:
                            last_candle = timeframe_data[-1]
                            if len(last_candle) >= 5:
                                current_price = last_candle[4]
                                break
                if current_price is None:
                    return None
                return {
                    'symbol': symbol, 'ohlcv': ohlcv_data, 'raw_ohlcv': ohlcv_data,
                    'current_price': current_price, 'timestamp': datetime.now().isoformat(),
                    'candles_count': {tf: len(data) for tf, data in ohlcv_data.items()},
                    'successful_timeframes': successful_timeframes
                }
            except Exception:
                return None
        except Exception:
            return None

    async def _fetch_single_timeframe_improved(self, symbol: str, timeframe: str, limit: int):
        """(V7.2) Fetch one timeframe for ``symbol`` (a string), retrying on errors.

        Up to 3 attempts, sleeping 2s then 4s between them. An empty response is
        returned immediately as ``[]`` (no retry); so is the final failure.
        """
        max_retries = 3
        retry_delay = 2
        for attempt in range(max_retries):
            try:
                ohlcv_data = self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
                return ohlcv_data if ohlcv_data else []
            except Exception:
                if attempt < max_retries - 1:
                    await asyncio.sleep(retry_delay * (attempt + 1))
                else:
                    return []
        return []

    async def get_latest_price_async(self, symbol):
        """(V7.2) Return the last traded price of ``symbol`` as float, or ``None``.

        ``None`` is returned when there is no exchange client, the symbol is
        empty or has no ``/``, the ticker has no ``last`` value, or the request fails.
        """
        try:
            if not self.exchange:
                return None
            if not symbol or '/' not in symbol:
                return None
            ticker = self.exchange.fetch_ticker(symbol)
            if not ticker:
                return None
            current_price = ticker.get('last')
            if current_price is None:
                return None
            return float(current_price)
        except Exception:
            return None

    async def get_whale_data_for_symbol(self, symbol):
        """Return the whale monitor's activity data for ``symbol``; ``None`` if unavailable or on error."""
        try:
            if self.whale_monitor:
                return await self.whale_monitor.get_symbol_whale_activity(symbol)
            return None
        except Exception:
            return None

    async def get_whale_trading_signal(self, symbol, whale_data, market_context):
        """Return the whale monitor's trading signal, or a low-confidence HOLD.

        The HOLD fallback is used when no whale monitor is configured or when
        the monitor raises (the error text is put into ``reason``).
        """
        try:
            if self.whale_monitor:
                return await self.whale_monitor.generate_whale_trading_signal(symbol, whale_data, market_context)
            return {'action': 'HOLD', 'confidence': 0.3, 'reason': 'Whale monitor not available', 'source': 'whale_analysis'}
        except Exception as e:
            return {'action': 'HOLD', 'confidence': 0.3, 'reason': f'Error: {str(e)}', 'source': 'whale_analysis'}
# Module-level statement: runs once whenever this module is imported.
print("✅ DataManager loaded - V7.3 (L1 Threshold @ 0.50)")