Trad / ML.py
Riy777's picture
Update ML.py
bb03264
raw
history blame
51.1 kB
import pandas as pd
import pandas_ta as ta
import numpy as np
from datetime import datetime
import asyncio
from data_manager import DataManager
class AdvancedTechnicalAnalyzer:
def __init__(self):
self.indicators_config = {
'trend': ['ema_9', 'ema_21', 'ema_50', 'ema_200', 'ichimoku', 'adx', 'parabolic_sar', 'dmi'],
'momentum': ['rsi', 'stoch_rsi', 'macd', 'williams_r', 'cci', 'awesome_oscillator', 'momentum'],
'volatility': ['bbands', 'atr', 'keltner', 'donchian', 'rvi'],
'volume': ['vwap', 'obv', 'mfi', 'volume_profile', 'ad', 'volume_oscillator'],
'cycle': ['hull_ma', 'supertrend', 'zigzag', 'fisher_transform']
}
def calculate_all_indicators(self, dataframe, timeframe):
if dataframe.empty: return {}
indicators = {}
indicators.update(self._calculate_trend_indicators(dataframe))
indicators.update(self._calculate_momentum_indicators(dataframe))
indicators.update(self._calculate_volatility_indicators(dataframe))
indicators.update(self._calculate_volume_indicators(dataframe))
indicators.update(self._calculate_cycle_indicators(dataframe))
return indicators
def _calculate_trend_indicators(self, dataframe):
trend = {}
if len(dataframe) >= 9: trend['ema_9'] = float(ta.ema(dataframe['close'], length=9).iloc[-1])
if len(dataframe) >= 21: trend['ema_21'] = float(ta.ema(dataframe['close'], length=21).iloc[-1])
if len(dataframe) >= 50: trend['ema_50'] = float(ta.ema(dataframe['close'], length=50).iloc[-1])
if len(dataframe) >= 200: trend['ema_200'] = float(ta.ema(dataframe['close'], length=200).iloc[-1])
if len(dataframe) >= 26:
ichimoku = ta.ichimoku(dataframe['high'], dataframe['low'], dataframe['close'])
if ichimoku is not None:
if not ichimoku[0]['ITS_9'].empty: trend['ichimoku_conversion'] = float(ichimoku[0]['ITS_9'].iloc[-1])
if not ichimoku[0]['IKS_26'].empty: trend['ichimoku_base'] = float(ichimoku[0]['IKS_26'].iloc[-1])
if not ichimoku[0]['ISA_9'].empty: trend['ichimoku_span_a'] = float(ichimoku[0]['ISA_9'].iloc[-1])
if not ichimoku[0]['ISB_26'].empty: trend['ichimoku_span_b'] = float(ichimoku[0]['ISB_26'].iloc[-1])
if len(dataframe) >= 14:
adx_result = ta.adx(dataframe['high'], dataframe['low'], dataframe['close'], length=14)
if adx_result is not None:
if not adx_result['ADX_14'].empty: trend['adx'] = float(adx_result['ADX_14'].iloc[-1])
if not adx_result['DMP_14'].empty: trend['dmi_plus'] = float(adx_result['DMP_14'].iloc[-1])
if not adx_result['DMN_14'].empty: trend['dmi_minus'] = float(adx_result['DMN_14'].iloc[-1])
if len(dataframe) >= 5:
psar = ta.psar(dataframe['high'], dataframe['low'], dataframe['close'])
if psar is not None and not psar['PSARl_0.02_0.2'].empty: trend['psar'] = float(psar['PSARl_0.02_0.2'].iloc[-1])
return {key: value for key, value in trend.items() if value is not None}
def _calculate_momentum_indicators(self, dataframe):
momentum = {}
if len(dataframe) >= 14:
rsi = ta.rsi(dataframe['close'], length=14)
if not rsi.empty: momentum['rsi'] = float(rsi.iloc[-1])
if len(dataframe) >= 14:
stoch_rsi = ta.stochrsi(dataframe['close'], length=14)
if stoch_rsi is not None:
if not stoch_rsi['STOCHRSIk_14_14_3_3'].empty: momentum['stoch_rsi_k'] = float(stoch_rsi['STOCHRSIk_14_14_3_3'].iloc[-1])
if not stoch_rsi['STOCHRSId_14_14_3_3'].empty: momentum['stoch_rsi_d'] = float(stoch_rsi['STOCHRSId_14_14_3_3'].iloc[-1])
if len(dataframe) >= 26:
macd = ta.macd(dataframe['close'])
if macd is not None:
if not macd['MACD_12_26_9'].empty: momentum['macd_line'] = float(macd['MACD_12_26_9'].iloc[-1])
if not macd['MACDs_12_26_9'].empty: momentum['macd_signal'] = float(macd['MACDs_12_26_9'].iloc[-1])
if not macd['MACDh_12_26_9'].empty: momentum['macd_hist'] = float(macd['MACDh_12_26_9'].iloc[-1])
if len(dataframe) >= 14:
williams = ta.willr(dataframe['high'], dataframe['low'], dataframe['close'], length=14)
if not williams.empty: momentum['williams_r'] = float(williams.iloc[-1])
if len(dataframe) >= 20:
cci = ta.cci(dataframe['high'], dataframe['low'], dataframe['close'], length=20)
if not cci.empty: momentum['cci'] = float(cci.iloc[-1])
if len(dataframe) >= 34:
awesome_oscillator = ta.ao(dataframe['high'], dataframe['low'])
if not awesome_oscillator.empty: momentum['awesome_oscillator'] = float(awesome_oscillator.iloc[-1])
if len(dataframe) >= 10:
momentum_indicator = ta.mom(dataframe['close'], length=10)
if not momentum_indicator.empty: momentum['momentum'] = float(momentum_indicator.iloc[-1])
return {key: value for key, value in momentum.items() if value is not None}
def _calculate_volatility_indicators(self, dataframe):
volatility = {}
if len(dataframe) >= 20:
bollinger_bands = ta.bbands(dataframe['close'], length=20, std=2)
if bollinger_bands is not None:
if not bollinger_bands['BBU_20_2.0'].empty: volatility['bb_upper'] = float(bollinger_bands['BBU_20_2.0'].iloc[-1])
if not bollinger_bands['BBM_20_2.0'].empty: volatility['bb_middle'] = float(bollinger_bands['BBM_20_2.0'].iloc[-1])
if not bollinger_bands['BBL_20_2.0'].empty: volatility['bb_lower'] = float(bollinger_bands['BBL_20_2.0'].iloc[-1])
if all(key in volatility for key in ['bb_upper', 'bb_lower', 'bb_middle']): volatility['bb_width'] = (volatility['bb_upper'] - volatility['bb_lower']) / volatility['bb_middle']
if len(dataframe) >= 14:
average_true_range = ta.atr(dataframe['high'], dataframe['low'], dataframe['close'], length=14)
if not average_true_range.empty:
volatility['atr'] = float(average_true_range.iloc[-1])
if volatility['atr']: volatility['atr_percent'] = volatility['atr'] / dataframe['close'].iloc[-1]
if len(dataframe) >= 20:
keltner_channel = ta.kc(dataframe['high'], dataframe['low'], dataframe['close'], length=20)
if keltner_channel is not None:
if not keltner_channel['KCUe_20_2'].empty: volatility['kc_upper'] = float(keltner_channel['KCUe_20_2'].iloc[-1])
if not keltner_channel['KCLe_20_2'].empty: volatility['kc_lower'] = float(keltner_channel['KCLe_20_2'].iloc[-1])
if len(dataframe) >= 20:
donchian_channel = ta.donchian(dataframe['high'], dataframe['low'], length=20)
if donchian_channel is not None:
if not donchian_channel['DCU_20_20'].empty: volatility['dc_upper'] = float(donchian_channel['DCU_20_20'].iloc[-1])
if not donchian_channel['DCL_20_20'].empty: volatility['dc_lower'] = float(donchian_channel['DCL_20_20'].iloc[-1])
if len(dataframe) >= 14:
relative_volatility_index = ta.rvi(dataframe['close'], length=14)
if not relative_volatility_index.empty: volatility['rvi'] = float(relative_volatility_index.iloc[-1])
return {key: value for key, value in volatility.items() if value is not None}
def _calculate_volume_indicators(self, dataframe):
volume = {}
if len(dataframe) >= 1:
volume_weighted_average_price = ta.vwap(dataframe['high'], dataframe['low'], dataframe['close'], dataframe['volume'])
if not volume_weighted_average_price.empty: volume['vwap'] = float(volume_weighted_average_price.iloc[-1])
on_balance_volume = ta.obv(dataframe['close'], dataframe['volume'])
if not on_balance_volume.empty: volume['obv'] = float(on_balance_volume.iloc[-1])
if len(dataframe) >= 14:
money_flow_index = ta.mfi(dataframe['high'], dataframe['low'], dataframe['close'], dataframe['volume'], length=14)
if not money_flow_index.empty: volume['mfi'] = float(money_flow_index.iloc[-1])
accumulation_distribution = ta.ad(dataframe['high'], dataframe['low'], dataframe['close'], dataframe['volume'])
if not accumulation_distribution.empty: volume['ad_line'] = float(accumulation_distribution.iloc[-1])
if len(dataframe) >= 20:
volume_oscillator = ta.pvo(dataframe['volume'])
if volume_oscillator is not None and not volume_oscillator['PVO_12_26_9'].empty: volume['volume_oscillator'] = float(volume_oscillator['PVO_12_26_9'].iloc[-1])
volume['volume_avg_20'] = float(dataframe['volume'].tail(20).mean()) if len(dataframe) >= 20 else None
if volume['volume_avg_20'] and volume['volume_avg_20'] > 0: volume['volume_ratio'] = float(dataframe['volume'].iloc[-1] / volume['volume_avg_20'])
return {key: value for key, value in volume.items() if value is not None}
def _calculate_cycle_indicators(self, dataframe):
cycle = {}
if len(dataframe) >= 9:
hull_moving_average = ta.hma(dataframe['close'], length=9)
if not hull_moving_average.empty: cycle['hull_ma'] = float(hull_moving_average.iloc[-1])
if len(dataframe) >= 10:
supertrend = ta.supertrend(dataframe['high'], dataframe['low'], dataframe['close'], length=10, multiplier=3)
if supertrend is not None:
if not supertrend['SUPERT_10_3.0'].empty: cycle['supertrend'] = float(supertrend['SUPERT_10_3.0'].iloc[-1])
if not supertrend['SUPERTd_10_3.0'].empty: cycle['supertrend_direction'] = float(supertrend['SUPERTd_10_3.0'].iloc[-1])
if len(dataframe) >= 10:
fisher_transform = ta.fisher(dataframe['high'], dataframe['low'], length=10)
if fisher_transform is not None and not fisher_transform['FISHERT_10_1'].empty: cycle['fisher_transform'] = float(fisher_transform['FISHERT_10_1'].iloc[-1])
return {key: value for key, value in cycle.items() if value is not None}
class PatternEnhancedStrategyEngine:
def __init__(self, data_manager, learning_engine):
self.data_manager = data_manager
self.learning_engine = learning_engine
async def enhance_strategy_with_patterns(self, strategy_scores, pattern_analysis, symbol):
if not pattern_analysis or pattern_analysis.get('pattern_detected') in ['no_clear_pattern', 'insufficient_data']: return strategy_scores
pattern_confidence = pattern_analysis.get('pattern_confidence', 0)
pattern_name = pattern_analysis.get('pattern_detected', '')
predicted_direction = pattern_analysis.get('predicted_direction', '')
if pattern_confidence >= 0.7:
enhancement_factor = self._calculate_pattern_enhancement(pattern_confidence, pattern_name)
enhanced_strategies = self._get_pattern_appropriate_strategies(pattern_name, predicted_direction)
for strategy in enhanced_strategies:
if strategy in strategy_scores:
strategy_scores[strategy] *= enhancement_factor
return strategy_scores
def _calculate_pattern_enhancement(self, pattern_confidence, pattern_name):
base_enhancement = 1.0 + (pattern_confidence * 0.5)
high_reliability_patterns = ['Double Top', 'Double Bottom', 'Head & Shoulders', 'Cup and Handle']
if pattern_name in high_reliability_patterns: base_enhancement *= 1.2
return min(base_enhancement, 2.0)
def _get_pattern_appropriate_strategies(self, pattern_name, direction):
reversal_patterns = ['Double Top', 'Double Bottom', 'Head & Shoulders', 'Triple Top', 'Triple Bottom']
continuation_patterns = ['Flags', 'Pennants', 'Triangles', 'Rectangles']
if pattern_name in reversal_patterns:
if direction == 'down': return ['breakout_momentum', 'trend_following']
else: return ['mean_reversion', 'breakout_momentum']
elif pattern_name in continuation_patterns: return ['trend_following', 'breakout_momentum']
else: return ['breakout_momentum', 'hybrid_ai']
class MultiStrategyEngine:
def __init__(self, data_manager, learning_engine):
self.data_manager = data_manager
self.learning_engine = learning_engine
self.pattern_enhancer = PatternEnhancedStrategyEngine(data_manager, learning_engine)
self.strategies = {
'trend_following': self._trend_following_strategy,
'mean_reversion': self._mean_reversion_strategy,
'breakout_momentum': self._breakout_momentum_strategy,
'volume_spike': self._volume_spike_strategy,
'whale_tracking': self._whale_tracking_strategy,
'pattern_recognition': self._pattern_recognition_strategy,
'hybrid_ai': self._hybrid_ai_strategy
}
async def evaluate_all_strategies(self, symbol_data, market_context):
try:
market_condition = market_context.get('market_trend', 'sideways_market')
if self.learning_engine and hasattr(self.learning_engine, 'initialized') and self.learning_engine.initialized:
try: optimized_weights = await self.learning_engine.get_optimized_strategy_weights(market_condition)
except Exception as e:
print(f"⚠️ Failed to get optimized weights: {e}")
optimized_weights = await self.get_default_weights()
else:
print("⚠️ Learning engine not available, using default weights")
optimized_weights = await self.get_default_weights()
strategy_scores = {}
base_scores = {}
for strategy_name, strategy_function in self.strategies.items():
try:
base_score = await strategy_function(symbol_data, market_context)
base_scores[strategy_name] = base_score
weight = optimized_weights.get(strategy_name, 0.1)
weighted_score = base_score * weight
strategy_scores[strategy_name] = min(weighted_score, 1.0)
except Exception as error:
print(f"⚠️ Strategy {strategy_name} failed: {error}")
base_score = await self._fallback_strategy_score(strategy_name, symbol_data, market_context)
base_scores[strategy_name] = base_score
strategy_scores[strategy_name] = base_score * optimized_weights.get(strategy_name, 0.1)
pattern_analysis = symbol_data.get('pattern_analysis')
if pattern_analysis: strategy_scores = await self.pattern_enhancer.enhance_strategy_with_patterns(strategy_scores, pattern_analysis, symbol_data.get('symbol'))
if base_scores:
best_strategy = max(base_scores.items(), key=lambda x: x[1])
best_strategy_name = best_strategy[0]
best_strategy_score = best_strategy[1]
symbol_data['recommended_strategy'] = best_strategy_name
symbol_data['strategy_confidence'] = best_strategy_score
if (pattern_analysis and pattern_analysis.get('pattern_confidence', 0) > 0.7 and self._is_strategy_pattern_aligned(best_strategy_name, pattern_analysis)):
pattern_bonus = pattern_analysis.get('pattern_confidence', 0) * 0.3
enhanced_confidence = min(best_strategy_score + pattern_bonus, 1.0)
symbol_data['strategy_confidence'] = enhanced_confidence
return strategy_scores, base_scores
except Exception as error:
print(f"❌ Strategy evaluation failed: {error}")
fallback_scores = await self.get_fallback_scores()
return fallback_scores, fallback_scores
def _is_strategy_pattern_aligned(self, strategy_name, pattern_analysis):
pattern_direction = pattern_analysis.get('predicted_direction', '')
pattern_type = pattern_analysis.get('pattern_detected', '')
bullish_strategies = ['trend_following', 'breakout_momentum']
bearish_strategies = ['mean_reversion', 'breakout_momentum']
if pattern_direction == 'up' and strategy_name in bullish_strategies: return True
elif pattern_direction == 'down' and strategy_name in bearish_strategies: return True
return False
async def get_default_weights(self):
return {'trend_following': 0.15, 'mean_reversion': 0.12,'breakout_momentum': 0.18, 'volume_spike': 0.10,'whale_tracking': 0.20, 'pattern_recognition': 0.15,'hybrid_ai': 0.10}
async def get_fallback_scores(self):
return {'trend_following': 0.5, 'mean_reversion': 0.5,'breakout_momentum': 0.5, 'volume_spike': 0.5,'whale_tracking': 0.5, 'pattern_recognition': 0.5,'hybrid_ai': 0.5}
async def _trend_following_strategy(self, symbol_data, market_context):
try:
score = 0.0
indicators = symbol_data.get('advanced_indicators', {})
timeframes = ['4h', '1h', '15m']
for timeframe in timeframes:
if timeframe in indicators:
timeframe_indicators = indicators[timeframe]
if self._check_ema_alignment(timeframe_indicators): score += 0.20
adx_value = timeframe_indicators.get('adx', 0)
if adx_value > 20: score += 0.15
volume_ratio = timeframe_indicators.get('volume_ratio', 0)
if volume_ratio > 1.2: score += 0.10
pattern_analysis = symbol_data.get('pattern_analysis')
if (pattern_analysis and pattern_analysis.get('pattern_confidence', 0) > 0.7 and pattern_analysis.get('predicted_direction') == 'up'):
pattern_bonus = pattern_analysis.get('pattern_confidence', 0) * 0.2
score += pattern_bonus
return min(score, 1.0)
except Exception as error:
print(f"⚠️ Trend following strategy error: {error}")
return 0.3
def _check_ema_alignment(self, indicators):
required_emas = ['ema_9', 'ema_21', 'ema_50']
if all(ema in indicators for ema in required_emas): return (indicators['ema_9'] > indicators['ema_21'] > indicators['ema_50'])
return False
async def _mean_reversion_strategy(self, symbol_data, market_context):
try:
score = 0.0
current_price = symbol_data['current_price']
indicators = symbol_data.get('advanced_indicators', {})
if '1h' in indicators:
hourly_indicators = indicators['1h']
if all(key in hourly_indicators for key in ['bb_upper', 'bb_lower', 'bb_middle']):
position_in_band = (current_price - hourly_indicators['bb_lower']) / (hourly_indicators['bb_upper'] - hourly_indicators['bb_lower'])
if position_in_band < 0.1 and hourly_indicators.get('rsi', 50) < 35: score += 0.45
if position_in_band > 0.9 and hourly_indicators.get('rsi', 50) > 65: score += 0.45
rsi_value = hourly_indicators.get('rsi', 50)
if rsi_value < 30: score += 0.35
elif rsi_value > 70: score += 0.35
pattern_analysis = symbol_data.get('pattern_analysis')
if (pattern_analysis and pattern_analysis.get('pattern_confidence', 0) > 0.7 and pattern_analysis.get('predicted_direction') in ['up', 'down']):
pattern_bonus = pattern_analysis.get('pattern_confidence', 0) * 0.15
score += pattern_bonus
return min(score, 1.0)
except Exception as error:
print(f"⚠️ Mean reversion strategy error: {error}")
return 0.3
async def _breakout_momentum_strategy(self, symbol_data, market_context):
try:
score = 0.0
indicators = symbol_data.get('advanced_indicators', {})
for timeframe in ['1h', '15m', '5m']:
if timeframe in indicators:
timeframe_indicators = indicators[timeframe]
volume_ratio = timeframe_indicators.get('volume_ratio', 0)
if volume_ratio > 1.8: score += 0.25
elif volume_ratio > 1.3: score += 0.15
if timeframe_indicators.get('macd_hist', 0) > 0: score += 0.20
if 'vwap' in timeframe_indicators and symbol_data['current_price'] > timeframe_indicators['vwap']: score += 0.15
rsi_value = timeframe_indicators.get('rsi', 50)
if 40 <= rsi_value <= 70: score += 0.10
pattern_analysis = symbol_data.get('pattern_analysis')
if pattern_analysis and pattern_analysis.get('pattern_confidence', 0) > 0.6:
pattern_bonus = pattern_analysis.get('pattern_confidence', 0) * 0.3
score += pattern_bonus
if score > 0.2: score = max(score, 0.4)
return min(score, 1.0)
except Exception as error:
print(f"⚠️ Breakout momentum strategy error: {error}")
return 0.4
async def _volume_spike_strategy(self, symbol_data, market_context):
try:
score = 0.0
indicators = symbol_data.get('advanced_indicators', {})
for timeframe in ['1h', '15m', '5m']:
if timeframe in indicators:
volume_ratio = indicators[timeframe].get('volume_ratio', 0)
if volume_ratio > 3.0: score += 0.45
elif volume_ratio > 2.0: score += 0.25
elif volume_ratio > 1.5: score += 0.15
pattern_analysis = symbol_data.get('pattern_analysis')
if (pattern_analysis and pattern_analysis.get('pattern_confidence', 0) > 0.7 and any(indicators[tf].get('volume_ratio', 0) > 2.0 for tf in ['1h', '15m'] if tf in indicators)):
pattern_bonus = pattern_analysis.get('pattern_confidence', 0) * 0.2
score += pattern_bonus
return min(score, 1.0)
except Exception as error:
print(f"⚠️ Volume spike strategy error: {error}")
return 0.3
async def _whale_tracking_strategy(self, symbol_data, market_context):
"""استراتيجية تتبع الحيتان المبسطة - تستخدم بيانات من WhaleMonitor مباشرة"""
try:
whale_data = symbol_data.get('whale_data', {})
if not whale_data.get('data_available', False):
return 0.2
# استخدام إشارة التداول المباشرة من WhaleMonitor
whale_signal = await self.data_manager.get_whale_trading_signal(
symbol_data['symbol'], whale_data, market_context
)
if whale_signal and whale_signal.get('action') != 'HOLD':
confidence = whale_signal.get('confidence', 0)
if whale_signal.get('action') in ['STRONG_BUY', 'BUY']:
return min(confidence * 1.2, 1.0)
elif whale_signal.get('action') in ['STRONG_SELL', 'SELL']:
return min(confidence * 0.8, 1.0)
# منطق احتياطي
total_transactions = whale_data.get('transfer_count', 0)
whale_volume = whale_data.get('total_volume', 0)
score = 0.0
if total_transactions >= 2:
score += 0.35
elif total_transactions >= 1:
score += 0.25
if whale_volume > 25000:
score += 0.25
elif whale_volume > 5000:
score += 0.15
general_whale = market_context.get('general_whale_activity', {})
if general_whale.get('data_available', False) and general_whale.get('transaction_count', 0) > 0:
score += 0.15
return min(score, 1.0)
except Exception as error:
print(f"⚠️ Whale tracking failed: {error}")
return 0.2
async def _pattern_recognition_strategy(self, symbol_data, market_context):
try:
score = 0.0
indicators = symbol_data.get('advanced_indicators', {})
pattern_analysis = symbol_data.get('pattern_analysis')
if pattern_analysis and pattern_analysis.get('pattern_confidence', 0) > 0.6:
score += pattern_analysis.get('pattern_confidence', 0) * 0.8
else:
for timeframe in ['4h', '1h']:
if timeframe in indicators:
timeframe_indicators = indicators[timeframe]
if (timeframe_indicators.get('rsi', 50) > 60 and timeframe_indicators.get('macd_hist', 0) > 0 and timeframe_indicators.get('volume_ratio', 0) > 1.5): score += 0.35
if (timeframe_indicators.get('rsi', 50) < 40 and timeframe_indicators.get('stoch_rsi_k', 50) < 20): score += 0.35
return min(score, 1.0)
except Exception as error:
print(f"⚠️ Pattern recognition strategy error: {error}")
return 0.3
async def _hybrid_ai_strategy(self, symbol_data, market_context):
try:
score = 0.0
monte_carlo_probability = symbol_data.get('monte_carlo_probability', 0.5)
final_score = symbol_data.get('final_score', 0.5)
score += monte_carlo_probability * 0.4
score += final_score * 0.3
if market_context.get('btc_sentiment') == 'BULLISH': score += 0.25
elif market_context.get('btc_sentiment') == 'BEARISH': score -= 0.08
whale_activity = market_context.get('general_whale_activity', {})
if whale_activity.get('sentiment') == 'BULLISH': score += 0.15
pattern_analysis = symbol_data.get('pattern_analysis')
if pattern_analysis and pattern_analysis.get('pattern_confidence', 0) > 0.7:
pattern_bonus = pattern_analysis.get('pattern_confidence', 0) * 0.25
score += pattern_bonus
return max(0.0, min(score, 1.0))
except Exception as error:
print(f"⚠️ Hybrid AI strategy error: {error}")
return 0.3
async def _fallback_strategy_score(self, strategy_name, symbol_data, market_context):
try:
base_score = symbol_data.get('final_score', 0.5)
if strategy_name == 'trend_following':
indicators = symbol_data.get('advanced_indicators', {})
if '1h' in indicators:
rsi_value = indicators['1h'].get('rsi', 50)
ema_9 = indicators['1h'].get('ema_9')
ema_21 = indicators['1h'].get('ema_21')
if ema_9 and ema_21 and ema_9 > ema_21 and 40 <= rsi_value <= 60: return 0.6
return 0.4
elif strategy_name == 'mean_reversion':
current_price = symbol_data.get('current_price', 0)
indicators = symbol_data.get('advanced_indicators', {})
if '1h' in indicators:
rsi_value = indicators['1h'].get('rsi', 50)
bb_lower = indicators['1h'].get('bb_lower')
if bb_lower and current_price <= bb_lower * 1.02 and rsi_value < 35: return 0.7
return 0.3
elif strategy_name == 'breakout_momentum':
volume_ratio = symbol_data.get('advanced_indicators', {}).get('1h', {}).get('volume_ratio', 0)
if volume_ratio > 1.5: return 0.6
return 0.4
elif strategy_name == 'whale_tracking':
whale_data = symbol_data.get('whale_data', {})
if not whale_data.get('data_available', False): return 0.2
total_transactions = whale_data.get('transfer_count', 0)
if total_transactions >= 3: return 0.5
return 0.3
return base_score
except Exception as error:
print(f"⚠️ Fallback strategy failed for {strategy_name}: {error}")
return 0.3
class MLProcessor:
def __init__(self, market_context, data_manager, learning_engine):
self.market_context = market_context
self.data_manager = data_manager
self.learning_engine = learning_engine
self.technical_analyzer = AdvancedTechnicalAnalyzer()
self.strategy_engine = MultiStrategyEngine(data_manager, learning_engine)
def _validate_rsi_safety(self, indicators):
rsi_warnings = []
critical_issues = 0
timeframes_to_check = ['5m', '15m', '1h', '4h']
for timeframe in timeframes_to_check:
if timeframe in indicators:
rsi_value = indicators[timeframe].get('rsi')
if rsi_value:
if rsi_value > 80:
rsi_warnings.append(f"🚨 RSI CRITICAL in {timeframe}: {rsi_value} - EXTREME OVERBOUGHT")
critical_issues += 1
elif rsi_value > 75: rsi_warnings.append(f"⚠️ RSI WARNING in {timeframe}: {rsi_value} - STRONG OVERBOUGHT")
elif rsi_value > 70: rsi_warnings.append(f"📈 RSI HIGH in {timeframe}: {rsi_value} - OVERBOUGHT")
is_safe = critical_issues < 2
return is_safe, rsi_warnings
def _validate_indicators_quality_enhanced(self, indicators, current_price):
quality_issues = []
rsi_safe, rsi_warnings = self._validate_rsi_safety(indicators)
if not rsi_safe: quality_issues.extend(rsi_warnings)
bullish_signals = bearish_signals = 0
for timeframe, data in indicators.items():
if data.get('macd_hist', 0) > 0: bullish_signals += 1
if data.get('rsi', 50) > 70: bearish_signals += 1
if 'ema_9' in data and 'ema_21' in data:
if data['ema_9'] > data['ema_21']: bullish_signals += 1
if bullish_signals > 0 and bearish_signals > bullish_signals: quality_issues.append("⚠️ Conflicting signals: More bearish than bullish indicators")
return quality_issues
def _calculate_enhanced_score_with_safety(self, base_analysis, strategy_scores, quality_issues):
base_score = base_analysis.get('final_score', 0.5)
strategy_average = sum(strategy_scores.values()) / len(strategy_scores) if strategy_scores else 0.5
safety_penalty = 0.0
for issue in quality_issues:
if '🚨 RSI CRITICAL' in issue: safety_penalty += 0.3
elif '⚠️ RSI WARNING' in issue: safety_penalty += 0.15
elif '📈 RSI HIGH' in issue: safety_penalty += 0.05
enhanced_score = (base_score * 0.4) + (strategy_average * 0.6)
enhanced_score = max(0.0, enhanced_score - safety_penalty)
return min(enhanced_score, 1.0)
async def process_and_score_symbol_enhanced(self, raw_data):
try:
if not raw_data or not raw_data.get('ohlcv'): return None
raw_data['raw_ohlcv'] = raw_data.get('ohlcv', {})
base_analysis = await self.process_and_score_symbol(raw_data)
if not base_analysis: return None
try:
current_price = base_analysis.get('current_price', 0)
quality_issues = self._validate_indicators_quality_enhanced(base_analysis.get('advanced_indicators', {}), current_price)
if quality_issues:
print(f"🔍 Quality issues for {base_analysis.get('symbol')}:")
for issue in quality_issues: print(f" {issue}")
if hasattr(self, 'strategy_engine') and self.strategy_engine:
strategy_scores, base_scores = await self.strategy_engine.evaluate_all_strategies(base_analysis, self.market_context)
base_analysis['strategy_scores'] = strategy_scores
base_analysis['base_strategy_scores'] = base_scores
if base_scores:
best_strategy = max(base_scores.items(), key=lambda x: x[1])
best_strategy_name = best_strategy[0]
best_strategy_score = best_strategy[1]
base_analysis['recommended_strategy'] = best_strategy_name
base_analysis['strategy_confidence'] = best_strategy_score
if best_strategy_score > 0.3: base_analysis['target_strategy'] = best_strategy_name
else: base_analysis['target_strategy'] = 'GENERIC'
else:
base_analysis['recommended_strategy'] = 'GENERIC'
base_analysis['strategy_confidence'] = 0.3
base_analysis['target_strategy'] = 'GENERIC'
enhanced_score = self._calculate_enhanced_score_with_safety(base_analysis, strategy_scores, quality_issues)
base_analysis['enhanced_final_score'] = enhanced_score
else:
base_analysis['strategy_scores'] = {}
base_analysis['enhanced_final_score'] = base_analysis.get('final_score', 0.5)
base_analysis['recommended_strategy'] = 'GENERIC'
base_analysis['strategy_confidence'] = 0.3
base_analysis['target_strategy'] = 'GENERIC'
base_analysis['quality_warnings'] = quality_issues
except Exception as strategy_error:
print(f"⚠️ Strategy evaluation failed for {base_analysis.get('symbol')}: {strategy_error}")
base_analysis['strategy_scores'] = {}
base_analysis['enhanced_final_score'] = base_analysis.get('final_score', 0.5)
base_analysis['recommended_strategy'] = 'GENERIC'
base_analysis['strategy_confidence'] = 0.3
base_analysis['target_strategy'] = 'GENERIC'
base_analysis['quality_warnings'] = ['Strategy evaluation failed']
return base_analysis
except Exception as error:
print(f"❌ Enhanced processing failed for {raw_data.get('symbol')}: {error}")
return await self.process_and_score_symbol(raw_data)
def _improve_fibonacci_levels(self, daily_dataframe, current_price):
if len(daily_dataframe) < 50: return {}
recent_high = float(daily_dataframe['high'].iloc[-50:].max())
recent_low = float(daily_dataframe['low'].iloc[-50:].min())
if current_price > recent_high: recent_high = current_price * 1.05
if current_price < recent_low: recent_low = current_price * 0.95
difference = recent_high - recent_low
if difference <= 0: return {}
return {
"0.0%": recent_high, "23.6%": recent_high - 0.236 * difference,
"38.2%": recent_high - 0.382 * difference, "50.0%": recent_high - 0.50 * difference,
"61.8%": recent_high - 0.618 * difference, "78.6%": recent_high - 0.786 * difference,
"100.0%": recent_low
}
async def process_and_score_symbol(self, raw_data):
symbol = raw_data['symbol']
ohlcv_data = raw_data['ohlcv']
reasons_for_candidacy = raw_data.get('reasons', [])
if not ohlcv_data: return None
try:
all_indicators = {}
for timeframe, candles in ohlcv_data.items():
if candles:
dataframe = pd.DataFrame(candles, columns=['time', 'open', 'high', 'low', 'close', 'volume'])
dataframe[['open', 'high', 'low', 'close', 'volume']] = dataframe[['open', 'high', 'low', 'close', 'volume']].astype(float)
all_indicators[timeframe] = self._calculate_indicators(dataframe, timeframe)
hourly_dataframe = pd.DataFrame(ohlcv_data.get('1h', []), columns=['time', 'open', 'high', 'low', 'close', 'volume'])
if hourly_dataframe.empty: return None
hourly_dataframe[['open', 'high', 'low', 'close', 'volume']] = hourly_dataframe[['open', 'high', 'low', 'close', 'volume']].astype(float)
try:
current_price = float(hourly_dataframe['close'].iloc[-1])
if ohlcv_data.get('5m'):
five_minute_dataframe = pd.DataFrame(ohlcv_data['5m'], columns=['time', 'open', 'high', 'low', 'close', 'volume'])
if not five_minute_dataframe.empty:
five_minute_dataframe[['open', 'high', 'low', 'close', 'volume']] = five_minute_dataframe[['open', 'high', 'low', 'close', 'volume']].astype(float)
current_price = float(five_minute_dataframe['close'].iloc[-1])
liquidity_score = self._calculate_liquidity_score(hourly_dataframe)
daily_dataframe = pd.DataFrame(ohlcv_data.get('1d', []), columns=['time', 'open', 'high', 'low', 'close', 'volume'])
if not daily_dataframe.empty: daily_dataframe[['open', 'high', 'low', 'close', 'volume']] = daily_dataframe[['open', 'high', 'low', 'close', 'volume']].astype(float)
average_daily_volume = float(daily_dataframe['volume'].mean()) if not daily_dataframe.empty else 0.0
fibonacci_levels = self._improve_fibonacci_levels(daily_dataframe, current_price)
try: whale_data = await self.data_manager.get_symbol_specific_whale_data(symbol)
except Exception as whale_error:
print(f"⚠️ Whale data failed for {symbol}: {whale_error}.")
whale_data = {"transfer_count": 0, "total_volume": 0, "source": "no_data", "data_available": False}
# استخدام دالة حساب درجة الحيتان من WhaleMonitor مباشرة
whale_score = await self.data_manager.whale_monitor._calculate_whale_activity_score(whale_data)
opportunity_classification = self.classify_opportunity_type(all_indicators, current_price)
initial_score = self._calculate_initial_score(all_indicators, current_price, self.market_context)
monte_carlo_probability = self._run_monte_carlo_simulation(hourly_dataframe)
final_score = (0.35 * initial_score) + (0.50 * monte_carlo_probability) + (0.15 * whale_score)
final_score *= opportunity_classification['confidence']
normalized_indicators = {timeframe: self._normalize_features_corrected(indicators) for timeframe, indicators in all_indicators.items()}
return {
'symbol': symbol, 'reasons_for_candidacy': reasons_for_candidacy, 'current_price': float(current_price),
'liquidity_score': float(liquidity_score) if not np.isnan(liquidity_score) else 0.0, 'avg_daily_volume': float(average_daily_volume),
'whale_data': whale_data, 'whale_score': float(whale_score), 'opportunity_type': opportunity_classification,
'sentiment_data': self.market_context, 'fibonacci_levels': fibonacci_levels, 'final_score': float(final_score),
'initial_score': float(initial_score), 'monte_carlo_probability': float(monte_carlo_probability),
'indicators': normalized_indicators, 'advanced_indicators': all_indicators, 'strategy_scores': {},
'recommended_strategy': 'GENERIC', 'enhanced_final_score': float(final_score), 'target_strategy': 'GENERIC',
'raw_ohlcv': ohlcv_data
}
except (KeyError, IndexError) as error: return None
except Exception as error:
print(f"❌ Failed to process {symbol}: {error}")
return None
def _calculate_indicators(self, dataframe, timeframe):
indicators = {}
if dataframe.empty: return indicators
if not isinstance(dataframe.index, pd.DatetimeIndex):
try:
dataframe['time'] = pd.to_datetime(dataframe['time'], unit='ms')
dataframe = dataframe.set_index('time', drop=True)
except:
dataframe['time'] = pd.to_datetime(dataframe['time'])
dataframe = dataframe.set_index('time', drop=True)
dataframe = dataframe.sort_index()
if len(dataframe) >= 1 and all(column in dataframe.columns for column in ['high', 'low', 'close', 'volume']):
try:
typical_price = (dataframe['high'] + dataframe['low'] + dataframe['close']) / 3
volume_weighted_average_price = (typical_price * dataframe['volume']).cumsum() / dataframe['volume'].cumsum()
if not volume_weighted_average_price.empty and not pd.isna(volume_weighted_average_price.iloc[-1]): indicators['vwap'] = float(volume_weighted_average_price.iloc[-1])
except Exception as error: pass
if len(dataframe) >= 14:
rsi_series = ta.rsi(dataframe['close'], length=14)
if rsi_series is not None and not rsi_series.empty and rsi_series.iloc[-1] is not np.nan: indicators['rsi'] = float(rsi_series.iloc[-1])
if len(dataframe) >= 26:
macd = ta.macd(dataframe['close'])
if macd is not None and not macd.empty:
if 'MACDh_12_26_9' in macd.columns and macd['MACDh_12_26_9'].iloc[-1] is not np.nan: indicators['macd_hist'] = float(macd['MACDh_12_26_9'].iloc[-1])
if 'MACD_12_26_9' in macd.columns and macd['MACD_12_26_9'].iloc[-1] is not np.nan: indicators['macd_line'] = float(macd['MACD_12_26_9'].iloc[-1])
if 'MACDs_12_26_9' in macd.columns and macd['MACDs_12_26_9'].iloc[-1] is not np.nan: indicators['macd_signal'] = float(macd['MACDs_12_26_9'].iloc[-1])
if len(dataframe) >= 20:
bollinger_bands = ta.bbands(dataframe['close'], length=20, std=2)
if bollinger_bands is not None and not bollinger_bands.empty:
if 'BBL_20_2.0' in bollinger_bands.columns and bollinger_bands['BBL_20_2.0'].iloc[-1] is not np.nan: indicators['bb_lower'] = float(bollinger_bands['BBL_20_2.0'].iloc[-1])
if 'BBU_20_2.0' in bollinger_bands.columns and bollinger_bands['BBU_20_2.0'].iloc[-1] is not np.nan: indicators['bb_upper'] = float(bollinger_bands['BBU_20_2.0'].iloc[-1])
if 'BBM_20_2.0' in bollinger_bands.columns and bollinger_bands['BBM_20_2.0'].iloc[-1] is not np.nan: indicators['bb_middle'] = float(bollinger_bands['BBM_20_2.0'].iloc[-1])
if len(dataframe) >= 14:
average_true_range = ta.atr(high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], length=14)
if average_true_range is not None and not average_true_range.empty and average_true_range.iloc[-1] is not np.nan: indicators['atr'] = float(average_true_range.iloc[-1])
if len(dataframe) >= 26:
ema_12 = ta.ema(dataframe['close'], length=12)
ema_26 = ta.ema(dataframe['close'], length=26)
if ema_12 is not None and not ema_12.empty and ema_12.iloc[-1] is not np.nan: indicators['ema_12'] = float(ema_12.iloc[-1])
if ema_26 is not None and not ema_26.empty and ema_26.iloc[-1] is not np.nan: indicators['ema_26'] = float(ema_26.iloc[-1])
return indicators
def _normalize_features_corrected(self, features):
normalized_features = {}
for key, value in features.items():
if value is None: normalized_features[key] = 0.0; continue
if key == 'rsi': normalized_features[key] = max(0, min(100, value))
elif key in ['macd_hist', 'macd_line', 'macd_signal', 'vwap', 'atr']: normalized_features[key] = value
elif 'ema' in key or 'bb_' in key: normalized_features[key] = value
else:
try:
if abs(value) > 1000: normalized_features[key] = value / 1000
else: normalized_features[key] = value
except: normalized_features[key] = value
return normalized_features
def _run_monte_carlo_simulation(self, dataframe, number_of_simulations=1000, number_of_steps=20):
if dataframe.empty or len(dataframe) < 2: return 0.0
log_returns = np.log(dataframe['close'] / dataframe['close'].shift(1)).dropna()
if log_returns.empty: return 0.0
mean_return = log_returns.mean()
volatility = log_returns.std()
initial_price = dataframe['close'].iloc[-1]
success_count = 0
for _ in range(number_of_simulations):
random_values = np.random.normal(0, 1, number_of_steps)
daily_returns = np.exp(mean_return - 0.5 * volatility**2 + volatility * random_values)
simulated_prices = initial_price * daily_returns.cumprod()
if (simulated_prices[-1] / initial_price) > 1.02: success_count += 1
return success_count / number_of_simulations
def _calculate_initial_score(self, indicators, current_price, market_context):
score = 0.5
fast_timeframes = ['5m', '15m']
for timeframe in fast_timeframes:
timeframe_indicators = indicators.get(timeframe, {})
if not timeframe_indicators: continue
if 'rsi' in timeframe_indicators:
rsi_value = timeframe_indicators['rsi']
if isinstance(rsi_value, (int, float)):
if rsi_value < 30: score += 0.2
elif rsi_value < 40: score += 0.1
elif rsi_value > 70: score -= 0.1
if 'macd_hist' in timeframe_indicators and timeframe_indicators['macd_hist'] > 0: score += 0.15
if all(key in timeframe_indicators for key in ['ema_12', 'ema_26']):
if timeframe_indicators['ema_12'] > timeframe_indicators['ema_26']: score += 0.15
slow_timeframes = ['1h', '4h', '1d']
for timeframe in slow_timeframes:
timeframe_indicators = indicators.get(timeframe, {})
if not timeframe_indicators: continue
if all(key in timeframe_indicators for key in ['ema_12', 'ema_26']):
if timeframe_indicators['ema_12'] > timeframe_indicators['ema_26']: score += 0.10
if all(key in timeframe_indicators for key in ['bb_upper', 'bb_lower']):
if current_price > timeframe_indicators['bb_upper']: score += 0.10
elif current_price <= timeframe_indicators['bb_lower']: score += 0.05
if '5m' in indicators and 'vwap' in indicators['5m'] and current_price > indicators['5m']['vwap']: score += 0.10
if market_context:
bitcoin_sentiment = market_context.get('btc_sentiment')
fear_greed_index = market_context.get('fear_and_greed_index', 50)
if bitcoin_sentiment == 'BULLISH' and fear_greed_index > 60: score *= 1.2
elif bitcoin_sentiment == 'BEARISH' or fear_greed_index < 30: score *= 0.8
return min(max(score, 0.0), 1.0)
def _normalize_features(self, features): return self._normalize_features_corrected(features)
def _prepare_data_for_ml(self, all_indicators, current_price):
feature_vector = []
timeframes = ['5m', '15m', '1h', '4h', '1d']
indicator_keys = ['rsi', 'macd_hist', 'macd_line', 'bb_upper', 'bb_lower', 'atr', 'ema_12', 'ema_26', 'vwap']
for timeframe in timeframes:
timeframe_indicators = all_indicators.get(timeframe, {})
for key in indicator_keys: feature_vector.append(timeframe_indicators.get(key, 0.0))
feature_vector.append(current_price)
return feature_vector
def _calculate_liquidity_score(self, hourly_dataframe):
if hourly_dataframe.empty: return 0.0
hourly_dataframe['dollar_volume'] = hourly_dataframe['volume'] * hourly_dataframe['close']
return float(hourly_dataframe['dollar_volume'].mean())
def _calculate_fibonacci_levels(self, daily_dataframe): return self._improve_fibonacci_levels(daily_dataframe, 0)
def classify_opportunity_type(self, indicators, current_price):
fast_signals = slow_signals = 0
for timeframe in ['5m', '15m']:
timeframe_indicators = indicators.get(timeframe, {})
if not timeframe_indicators: continue
if timeframe_indicators.get('rsi', 100) < 35: fast_signals += 1
if timeframe_indicators.get('macd_hist', 0) > 0: fast_signals += 1
if all(key in timeframe_indicators for key in ['ema_12', 'ema_26']):
if timeframe_indicators['ema_12'] > timeframe_indicators['ema_26']: fast_signals += 1
if timeframe == '5m' and timeframe_indicators.get('vwap') and current_price > timeframe_indicators['vwap'] * 1.02: fast_signals += 1
for timeframe in ['1h', '4h', '1d']:
timeframe_indicators = indicators.get(timeframe, {})
if not timeframe_indicators: continue
if 40 <= timeframe_indicators.get('rsi', 50) <= 60: slow_signals += 1
if all(key in timeframe_indicators for key in ['ema_12', 'ema_26']):
if timeframe_indicators['ema_12'] > timeframe_indicators['ema_26']: slow_signals += 1
if timeframe_indicators.get('bb_middle') and current_price > timeframe_indicators['bb_middle']: slow_signals += 1
if fast_signals >= 3:
return {
"type": "FAST_PUMP", "timeframe": "15m-1h", "take_profit_multiplier": 1.08, "stop_loss_multiplier": 0.97,
"confidence": min(fast_signals / 6.0, 1.0), "description": "Strong fast pump opportunity on short timeframes"
}
elif slow_signals >= 3:
return {
"type": "SLOW_GROWTH", "timeframe": "4h-1d", "take_profit_multiplier": 1.05, "stop_loss_multiplier": 0.95,
"confidence": min(slow_signals / 6.0, 1.0), "description": "Sustainable growth opportunity on long timeframes"
}
return {
"type": "NEUTRAL", "timeframe": "N/A", "take_profit_multiplier": 1.05, "stop_loss_multiplier": 0.95,
"confidence": 0.3, "description": "No clear signals for specific opportunity type"
}
def filter_top_candidates(self, candidates, number_of_candidates=10):
valid_candidates = [candidate for candidate in candidates if candidate is not None]
return sorted(valid_candidates, key=lambda candidate: candidate.get('enhanced_final_score', candidate.get('final_score', 0)), reverse=True)[:number_of_candidates]
print("✅ Enhanced ML System Loaded - Integrated with Learning Engine - REAL DATA ONLY - Optimized Strategy Scoring with Pattern Enhancement")