Trad / ML.py
Riy777's picture
Update ML.py
ee8c551
raw
history blame
61.3 kB
import pandas as pd
import pandas_ta as ta
import numpy as np
from datetime import datetime
import asyncio
class AdvancedTechnicalAnalyzer:
def __init__(self):
self.indicators_config = {
'trend': ['ema_9', 'ema_21', 'ema_50', 'ema_200', 'ichimoku', 'adx', 'parabolic_sar', 'dmi'],
'momentum': ['rsi', 'stoch_rsi', 'macd', 'williams_r', 'cci', 'awesome_oscillator', 'momentum'],
'volatility': ['bbands', 'atr', 'keltner', 'donchian', 'rvi'],
'volume': ['vwap', 'obv', 'mfi', 'volume_profile', 'ad', 'volume_oscillator'],
'cycle': ['hull_ma', 'supertrend', 'zigzag', 'fisher_transform']
}
def calculate_all_indicators(self, dataframe, timeframe):
if dataframe.empty: return {}
indicators = {}
indicators.update(self._calculate_trend_indicators(dataframe))
indicators.update(self._calculate_momentum_indicators(dataframe))
indicators.update(self._calculate_volatility_indicators(dataframe))
indicators.update(self._calculate_volume_indicators(dataframe))
indicators.update(self._calculate_cycle_indicators(dataframe))
return indicators
def _calculate_trend_indicators(self, dataframe):
trend = {}
if len(dataframe) >= 9: trend['ema_9'] = float(ta.ema(dataframe['close'], length=9).iloc[-1])
if len(dataframe) >= 21: trend['ema_21'] = float(ta.ema(dataframe['close'], length=21).iloc[-1])
if len(dataframe) >= 50: trend['ema_50'] = float(ta.ema(dataframe['close'], length=50).iloc[-1])
if len(dataframe) >= 200: trend['ema_200'] = float(ta.ema(dataframe['close'], length=200).iloc[-1])
if len(dataframe) >= 26:
ichimoku = ta.ichimoku(dataframe['high'], dataframe['low'], dataframe['close'])
if ichimoku is not None:
if not ichimoku[0]['ITS_9'].empty: trend['ichimoku_conversion'] = float(ichimoku[0]['ITS_9'].iloc[-1])
if not ichimoku[0]['IKS_26'].empty: trend['ichimoku_base'] = float(ichimoku[0]['IKS_26'].iloc[-1])
if not ichimoku[0]['ISA_9'].empty: trend['ichimoku_span_a'] = float(ichimoku[0]['ISA_9'].iloc[-1])
if not ichimoku[0]['ISB_26'].empty: trend['ichimoku_span_b'] = float(ichimoku[0]['ISB_26'].iloc[-1])
if len(dataframe) >= 14:
adx_result = ta.adx(dataframe['high'], dataframe['low'], dataframe['close'], length=14)
if adx_result is not None:
if not adx_result['ADX_14'].empty: trend['adx'] = float(adx_result['ADX_14'].iloc[-1])
if not adx_result['DMP_14'].empty: trend['dmi_plus'] = float(adx_result['DMP_14'].iloc[-1])
if not adx_result['DMN_14'].empty: trend['dmi_minus'] = float(adx_result['DMN_14'].iloc[-1])
if len(dataframe) >= 5:
psar = ta.psar(dataframe['high'], dataframe['low'], dataframe['close'])
if psar is not None and not psar['PSARl_0.02_0.2'].empty: trend['psar'] = float(psar['PSARl_0.02_0.2'].iloc[-1])
return {key: value for key, value in trend.items() if value is not None}
def _calculate_momentum_indicators(self, dataframe):
momentum = {}
if len(dataframe) >= 14:
rsi = ta.rsi(dataframe['close'], length=14)
if not rsi.empty: momentum['rsi'] = float(rsi.iloc[-1])
if len(dataframe) >= 14:
stoch_rsi = ta.stochrsi(dataframe['close'], length=14)
if stoch_rsi is not None:
if not stoch_rsi['STOCHRSIk_14_14_3_3'].empty: momentum['stoch_rsi_k'] = float(stoch_rsi['STOCHRSIk_14_14_3_3'].iloc[-1])
if not stoch_rsi['STOCHRSId_14_14_3_3'].empty: momentum['stoch_rsi_d'] = float(stoch_rsi['STOCHRSId_14_14_3_3'].iloc[-1])
if len(dataframe) >= 26:
macd = ta.macd(dataframe['close'])
if macd is not None:
if not macd['MACD_12_26_9'].empty: momentum['macd_line'] = float(macd['MACD_12_26_9'].iloc[-1])
if not macd['MACDs_12_26_9'].empty: momentum['macd_signal'] = float(macd['MACDs_12_26_9'].iloc[-1])
if not macd['MACDh_12_26_9'].empty: momentum['macd_hist'] = float(macd['MACDh_12_26_9'].iloc[-1])
if len(dataframe) >= 14:
williams = ta.willr(dataframe['high'], dataframe['low'], dataframe['close'], length=14)
if not williams.empty: momentum['williams_r'] = float(williams.iloc[-1])
if len(dataframe) >= 20:
cci = ta.cci(dataframe['high'], dataframe['low'], dataframe['close'], length=20)
if not cci.empty: momentum['cci'] = float(cci.iloc[-1])
if len(dataframe) >= 34:
awesome_oscillator = ta.ao(dataframe['high'], dataframe['low'])
if not awesome_oscillator.empty: momentum['awesome_oscillator'] = float(awesome_oscillator.iloc[-1])
if len(dataframe) >= 10:
momentum_indicator = ta.mom(dataframe['close'], length=10)
if not momentum_indicator.empty: momentum['momentum'] = float(momentum_indicator.iloc[-1])
return {key: value for key, value in momentum.items() if value is not None}
def _calculate_volatility_indicators(self, dataframe):
volatility = {}
if len(dataframe) >= 20:
bollinger_bands = ta.bbands(dataframe['close'], length=20, std=2)
if bollinger_bands is not None:
if not bollinger_bands['BBU_20_2.0'].empty: volatility['bb_upper'] = float(bollinger_bands['BBU_20_2.0'].iloc[-1])
if not bollinger_bands['BBM_20_2.0'].empty: volatility['bb_middle'] = float(bollinger_bands['BBM_20_2.0'].iloc[-1])
if not bollinger_bands['BBL_20_2.0'].empty: volatility['bb_lower'] = float(bollinger_bands['BBL_20_2.0'].iloc[-1])
if all(key in volatility for key in ['bb_upper', 'bb_lower', 'bb_middle']): volatility['bb_width'] = (volatility['bb_upper'] - volatility['bb_lower']) / volatility['bb_middle']
if len(dataframe) >= 14:
average_true_range = ta.atr(dataframe['high'], dataframe['low'], dataframe['close'], length=14)
if not average_true_range.empty:
volatility['atr'] = float(average_true_range.iloc[-1])
if volatility['atr']: volatility['atr_percent'] = volatility['atr'] / dataframe['close'].iloc[-1]
if len(dataframe) >= 20:
keltner_channel = ta.kc(dataframe['high'], dataframe['low'], dataframe['close'], length=20)
if keltner_channel is not None:
if not keltner_channel['KCUe_20_2'].empty: volatility['kc_upper'] = float(keltner_channel['KCUe_20_2'].iloc[-1])
if not keltner_channel['KCLe_20_2'].empty: volatility['kc_lower'] = float(keltner_channel['KCLe_20_2'].iloc[-1])
if len(dataframe) >= 20:
donchian_channel = ta.donchian(dataframe['high'], dataframe['low'], length=20)
if donchian_channel is not None:
if not donchian_channel['DCU_20_20'].empty: volatility['dc_upper'] = float(donchian_channel['DCU_20_20'].iloc[-1])
if not donchian_channel['DCL_20_20'].empty: volatility['dc_lower'] = float(donchian_channel['DCL_20_20'].iloc[-1])
if len(dataframe) >= 14:
relative_volatility_index = ta.rvi(dataframe['close'], length=14)
if not relative_volatility_index.empty: volatility['rvi'] = float(relative_volatility_index.iloc[-1])
return {key: value for key, value in volatility.items() if value is not None}
def _calculate_volume_indicators(self, dataframe):
volume = {}
if len(dataframe) >= 1:
volume_weighted_average_price = ta.vwap(dataframe['high'], dataframe['low'], dataframe['close'], dataframe['volume'])
if not volume_weighted_average_price.empty: volume['vwap'] = float(volume_weighted_average_price.iloc[-1])
on_balance_volume = ta.obv(dataframe['close'], dataframe['volume'])
if not on_balance_volume.empty: volume['obv'] = float(on_balance_volume.iloc[-1])
if len(dataframe) >= 14:
money_flow_index = ta.mfi(dataframe['high'], dataframe['low'], dataframe['close'], dataframe['volume'], length=14)
if not money_flow_index.empty: volume['mfi'] = float(money_flow_index.iloc[-1])
accumulation_distribution = ta.ad(dataframe['high'], dataframe['low'], dataframe['close'], dataframe['volume'])
if not accumulation_distribution.empty: volume['ad_line'] = float(accumulation_distribution.iloc[-1])
if len(dataframe) >= 20:
volume_oscillator = ta.pvo(dataframe['volume'])
if volume_oscillator is not None and not volume_oscillator['PVO_12_26_9'].empty: volume['volume_oscillator'] = float(volume_oscillator['PVO_12_26_9'].iloc[-1])
volume['volume_avg_20'] = float(dataframe['volume'].tail(20).mean()) if len(dataframe) >= 20 else None
if volume['volume_avg_20'] and volume['volume_avg_20'] > 0: volume['volume_ratio'] = float(dataframe['volume'].iloc[-1] / volume['volume_avg_20'])
return {key: value for key, value in volume.items() if value is not None}
def _calculate_cycle_indicators(self, dataframe):
cycle = {}
if len(dataframe) >= 9:
hull_moving_average = ta.hma(dataframe['close'], length=9)
if not hull_moving_average.empty: cycle['hull_ma'] = float(hull_moving_average.iloc[-1])
if len(dataframe) >= 10:
supertrend = ta.supertrend(dataframe['high'], dataframe['low'], dataframe['close'], length=10, multiplier=3)
if supertrend is not None:
if not supertrend['SUPERT_10_3.0'].empty: cycle['supertrend'] = float(supertrend['SUPERT_10_3.0'].iloc[-1])
if not supertrend['SUPERTd_10_3.0'].empty: cycle['supertrend_direction'] = float(supertrend['SUPERTd_10_3.0'].iloc[-1])
if len(dataframe) >= 10:
fisher_transform = ta.fisher(dataframe['high'], dataframe['low'], length=10)
if fisher_transform is not None and not fisher_transform['FISHERT_10_1'].empty: cycle['fisher_transform'] = float(fisher_transform['FISHERT_10_1'].iloc[-1])
return {key: value for key, value in cycle.items() if value is not None}
class PatternEnhancedStrategyEngine:
def __init__(self, data_manager, learning_engine):
self.data_manager = data_manager
self.learning_engine = learning_engine
async def enhance_strategy_with_patterns(self, strategy_scores, pattern_analysis, symbol):
"""تعزيز الاستراتيجيات بناءً على الأنماط المكتشفة (لا يتم إرسالها للنموذج الضخم)"""
if not pattern_analysis or pattern_analysis.get('pattern_detected') in ['no_clear_pattern', 'insufficient_data']:
return strategy_scores
pattern_confidence = pattern_analysis.get('pattern_confidence', 0)
pattern_name = pattern_analysis.get('pattern_detected', '')
predicted_direction = pattern_analysis.get('predicted_direction', '')
if pattern_confidence >= 0.6: # تخفيض العتبة لزيادة الفعالية
enhancement_factor = self._calculate_pattern_enhancement(pattern_confidence, pattern_name)
enhanced_strategies = self._get_pattern_appropriate_strategies(pattern_name, predicted_direction)
print(f"🎯 تعزيز استراتيجيات {symbol} بناءً على نمط {pattern_name} (ثقة: {pattern_confidence:.2f})")
for strategy in enhanced_strategies:
if strategy in strategy_scores:
original_score = strategy_scores[strategy]
strategy_scores[strategy] = min(original_score * enhancement_factor, 1.0)
print(f" 📈 {strategy}: {original_score:.3f}{strategy_scores[strategy]:.3f}")
return strategy_scores
def _calculate_pattern_enhancement(self, pattern_confidence, pattern_name):
"""حساب عامل التعزيز بناءً على ثقة النمط ونوعه"""
base_enhancement = 1.0 + (pattern_confidence * 0.3) # تخفيض عامل التعزيز
high_reliability_patterns = ['Double Top', 'Double Bottom', 'Head & Shoulders', 'Cup and Handle']
if pattern_name in high_reliability_patterns:
base_enhancement *= 1.1 # تخفيض التعزيز للأنماط عالية الموثوقية
return min(base_enhancement, 1.5) # تحديد أقصى تعزيز
def _get_pattern_appropriate_strategies(self, pattern_name, direction):
"""تحديد الاستراتيجيات المناسبة للنمط المكتشف"""
reversal_patterns = ['Double Top', 'Double Bottom', 'Head & Shoulders', 'Triple Top', 'Triple Bottom']
continuation_patterns = ['Flags', 'Pennants', 'Triangles', 'Rectangles']
if pattern_name in reversal_patterns:
if direction == 'down':
return ['breakout_momentum', 'trend_following'] # استراتيجيات للهبوط
else:
return ['mean_reversion', 'breakout_momentum'] # استراتيجيات للصعود
elif pattern_name in continuation_patterns:
return ['trend_following', 'breakout_momentum'] # استراتيجيات لاستمرار الاتجاه
else:
return ['breakout_momentum', 'hybrid_ai'] # استراتيجيات عامة
class MultiStrategyEngine:
def __init__(self, data_manager, learning_engine):
self.data_manager = data_manager
self.learning_engine = learning_engine
self.pattern_enhancer = PatternEnhancedStrategyEngine(data_manager, learning_engine)
self.strategies = {
'trend_following': self._trend_following_strategy,
'mean_reversion': self._mean_reversion_strategy,
'breakout_momentum': self._breakout_momentum_strategy,
'volume_spike': self._volume_spike_strategy,
'whale_tracking': self._whale_tracking_strategy,
'pattern_recognition': self._pattern_recognition_strategy,
'hybrid_ai': self._hybrid_ai_strategy
}
async def evaluate_all_strategies(self, symbol_data, market_context):
try:
market_condition = market_context.get('market_trend', 'sideways_market')
if self.learning_engine and hasattr(self.learning_engine, 'initialized') and self.learning_engine.initialized:
try:
optimized_weights = await self.learning_engine.get_optimized_strategy_weights(market_condition)
except Exception as e:
optimized_weights = await self.get_default_weights()
else:
optimized_weights = await self.get_default_weights()
strategy_scores = {}
base_scores = {}
for strategy_name, strategy_function in self.strategies.items():
try:
base_score = await strategy_function(symbol_data, market_context)
base_scores[strategy_name] = base_score
weight = optimized_weights.get(strategy_name, 0.1)
weighted_score = base_score * weight
strategy_scores[strategy_name] = min(weighted_score, 1.0)
except Exception as error:
base_score = await self._fallback_strategy_score(strategy_name, symbol_data, market_context)
base_scores[strategy_name] = base_score
strategy_scores[strategy_name] = base_score * optimized_weights.get(strategy_name, 0.1)
# تطبيق تعزيز الأنماط (لا يتم إرسالها للنموذج الضخم)
pattern_analysis = symbol_data.get('pattern_analysis')
if pattern_analysis:
strategy_scores = await self.pattern_enhancer.enhance_strategy_with_patterns(
strategy_scores, pattern_analysis, symbol_data.get('symbol')
)
if base_scores:
best_strategy = max(base_scores.items(), key=lambda x: x[1])
best_strategy_name = best_strategy[0]
best_strategy_score = best_strategy[1]
symbol_data['recommended_strategy'] = best_strategy_name
symbol_data['strategy_confidence'] = best_strategy_score
if (pattern_analysis and pattern_analysis.get('pattern_confidence', 0) > 0.6 and
self._is_strategy_pattern_aligned(best_strategy_name, pattern_analysis)):
pattern_bonus = pattern_analysis.get('pattern_confidence', 0) * 0.2
enhanced_confidence = min(best_strategy_score + pattern_bonus, 1.0)
symbol_data['strategy_confidence'] = enhanced_confidence
return strategy_scores, base_scores
except Exception as error:
print(f"❌ خطأ في تقييم الاستراتيجيات: {error}")
fallback_scores = await self.get_fallback_scores()
return fallback_scores, fallback_scores
def _is_strategy_pattern_aligned(self, strategy_name, pattern_analysis):
pattern_direction = pattern_analysis.get('predicted_direction', '')
pattern_type = pattern_analysis.get('pattern_detected', '')
bullish_strategies = ['trend_following', 'breakout_momentum']
bearish_strategies = ['mean_reversion', 'breakout_momentum']
if pattern_direction == 'up' and strategy_name in bullish_strategies:
return True
elif pattern_direction == 'down' and strategy_name in bearish_strategies:
return True
return False
async def get_default_weights(self):
return {
'trend_following': 0.15,
'mean_reversion': 0.12,
'breakout_momentum': 0.18,
'volume_spike': 0.10,
'whale_tracking': 0.20,
'pattern_recognition': 0.15,
'hybrid_ai': 0.10
}
async def get_fallback_scores(self):
return {
'trend_following': 0.5,
'mean_reversion': 0.5,
'breakout_momentum': 0.5,
'volume_spike': 0.5,
'whale_tracking': 0.5,
'pattern_recognition': 0.5,
'hybrid_ai': 0.5
}
async def _trend_following_strategy(self, symbol_data, market_context):
try:
score = 0.0
indicators = symbol_data.get('advanced_indicators', {})
timeframes = ['4h', '1h', '15m']
for timeframe in timeframes:
if timeframe in indicators:
timeframe_indicators = indicators[timeframe]
if self._check_ema_alignment(timeframe_indicators):
score += 0.20
adx_value = timeframe_indicators.get('adx', 0)
if adx_value > 20:
score += 0.15
volume_ratio = timeframe_indicators.get('volume_ratio', 0)
if volume_ratio > 1.2:
score += 0.10
return min(score, 1.0)
except Exception as error:
return 0.3
def _check_ema_alignment(self, indicators):
required_emas = ['ema_9', 'ema_21', 'ema_50']
if all(ema in indicators for ema in required_emas):
return (indicators['ema_9'] > indicators['ema_21'] > indicators['ema_50'])
return False
async def _mean_reversion_strategy(self, symbol_data, market_context):
try:
score = 0.0
current_price = symbol_data['current_price']
indicators = symbol_data.get('advanced_indicators', {})
if '1h' in indicators:
hourly_indicators = indicators['1h']
if all(key in hourly_indicators for key in ['bb_upper', 'bb_lower', 'bb_middle']):
position_in_band = (current_price - hourly_indicators['bb_lower']) / (hourly_indicators['bb_upper'] - hourly_indicators['bb_lower'])
if position_in_band < 0.1 and hourly_indicators.get('rsi', 50) < 35:
score += 0.45
if position_in_band > 0.9 and hourly_indicators.get('rsi', 50) > 65:
score += 0.45
rsi_value = hourly_indicators.get('rsi', 50)
if rsi_value < 30:
score += 0.35
elif rsi_value > 70:
score += 0.35
return min(score, 1.0)
except Exception as error:
return 0.3
async def _breakout_momentum_strategy(self, symbol_data, market_context):
try:
score = 0.0
indicators = symbol_data.get('advanced_indicators', {})
for timeframe in ['1h', '15m', '5m']:
if timeframe in indicators:
timeframe_indicators = indicators[timeframe]
volume_ratio = timeframe_indicators.get('volume_ratio', 0)
if volume_ratio > 1.8:
score += 0.25
elif volume_ratio > 1.3:
score += 0.15
if timeframe_indicators.get('macd_hist', 0) > 0:
score += 0.20
if 'vwap' in timeframe_indicators and symbol_data['current_price'] > timeframe_indicators['vwap']:
score += 0.15
rsi_value = timeframe_indicators.get('rsi', 50)
if 40 <= rsi_value <= 70:
score += 0.10
if score > 0.2:
score = max(score, 0.4)
return min(score, 1.0)
except Exception as error:
return 0.4
async def _volume_spike_strategy(self, symbol_data, market_context):
try:
score = 0.0
indicators = symbol_data.get('advanced_indicators', {})
for timeframe in ['1h', '15m', '5m']:
if timeframe in indicators:
volume_ratio = indicators[timeframe].get('volume_ratio', 0)
if volume_ratio > 3.0:
score += 0.45
elif volume_ratio > 2.0:
score += 0.25
elif volume_ratio > 1.5:
score += 0.15
return min(score, 1.0)
except Exception as error:
return 0.3
async def _whale_tracking_strategy(self, symbol_data, market_context):
try:
whale_data = symbol_data.get('whale_data', {})
if not whale_data.get('data_available', False):
return 0.2
whale_signal = await self.data_manager.get_whale_trading_signal(
symbol_data['symbol'], whale_data, market_context
)
if whale_signal and whale_signal.get('action') != 'HOLD':
confidence = whale_signal.get('confidence', 0)
if whale_signal.get('action') in ['STRONG_BUY', 'BUY']:
return min(confidence * 1.2, 1.0)
elif whale_signal.get('action') in ['STRONG_SELL', 'SELL']:
return min(confidence * 0.8, 1.0)
total_transactions = whale_data.get('transfer_count', 0)
whale_volume = whale_data.get('total_volume', 0)
score = 0.0
if total_transactions >= 2:
score += 0.35
elif total_transactions >= 1:
score += 0.25
if whale_volume > 25000:
score += 0.25
elif whale_volume > 5000:
score += 0.15
general_whale = market_context.get('general_whale_activity', {})
if general_whale.get('data_available', False) and general_whale.get('transaction_count', 0) > 0:
score += 0.15
return min(score, 1.0)
except Exception as error:
return 0.2
async def _pattern_recognition_strategy(self, symbol_data, market_context):
try:
score = 0.0
indicators = symbol_data.get('advanced_indicators', {})
pattern_analysis = symbol_data.get('pattern_analysis')
if pattern_analysis and pattern_analysis.get('pattern_confidence', 0) > 0.6:
score += pattern_analysis.get('pattern_confidence', 0) * 0.8
else:
for timeframe in ['4h', '1h']:
if timeframe in indicators:
timeframe_indicators = indicators[timeframe]
if (timeframe_indicators.get('rsi', 50) > 60 and
timeframe_indicators.get('macd_hist', 0) > 0 and
timeframe_indicators.get('volume_ratio', 0) > 1.5):
score += 0.35
if (timeframe_indicators.get('rsi', 50) < 40 and
timeframe_indicators.get('stoch_rsi_k', 50) < 20):
score += 0.35
return min(score, 1.0)
except Exception as error:
return 0.3
async def _hybrid_ai_strategy(self, symbol_data, market_context):
try:
score = 0.0
monte_carlo_probability = symbol_data.get('monte_carlo_probability', 0.5)
final_score = symbol_data.get('final_score', 0.5)
score += monte_carlo_probability * 0.4
score += final_score * 0.3
if market_context.get('btc_sentiment') == 'BULLISH':
score += 0.25
elif market_context.get('btc_sentiment') == 'BEARISH':
score -= 0.08
whale_activity = market_context.get('general_whale_activity', {})
if whale_activity.get('sentiment') == 'BULLISH':
score += 0.15
pattern_analysis = symbol_data.get('pattern_analysis')
if pattern_analysis and pattern_analysis.get('pattern_confidence', 0) > 0.6:
pattern_bonus = pattern_analysis.get('pattern_confidence', 0) * 0.25
score += pattern_bonus
return max(0.0, min(score, 1.0))
except Exception as error:
return 0.3
async def _fallback_strategy_score(self, strategy_name, symbol_data, market_context):
try:
base_score = symbol_data.get('final_score', 0.5)
if strategy_name == 'trend_following':
indicators = symbol_data.get('advanced_indicators', {})
if '1h' in indicators:
rsi_value = indicators['1h'].get('rsi', 50)
ema_9 = indicators['1h'].get('ema_9')
ema_21 = indicators['1h'].get('ema_21')
if ema_9 and ema_21 and ema_9 > ema_21 and 40 <= rsi_value <= 60:
return 0.6
return 0.4
elif strategy_name == 'mean_reversion':
current_price = symbol_data.get('current_price', 0)
indicators = symbol_data.get('advanced_indicators', {})
if '1h' in indicators:
rsi_value = indicators['1h'].get('rsi', 50)
bb_lower = indicators['1h'].get('bb_lower')
if bb_lower and current_price <= bb_lower * 1.02 and rsi_value < 35:
return 0.7
return 0.3
elif strategy_name == 'breakout_momentum':
volume_ratio = symbol_data.get('advanced_indicators', {}).get('1h', {}).get('volume_ratio', 0)
if volume_ratio > 1.5:
return 0.6
return 0.4
elif strategy_name == 'whale_tracking':
whale_data = symbol_data.get('whale_data', {})
if not whale_data.get('data_available', False):
return 0.2
total_transactions = whale_data.get('transfer_count', 0)
if total_transactions >= 3:
return 0.5
return 0.3
return base_score
except Exception as error:
return 0.3
class MLProcessor:
def __init__(self, market_context, data_manager, learning_engine):
self.market_context = market_context
self.data_manager = data_manager
self.learning_engine = learning_engine
self.technical_analyzer = AdvancedTechnicalAnalyzer()
self.strategy_engine = MultiStrategyEngine(data_manager, learning_engine)
def _validate_rsi_safety(self, indicators):
rsi_warnings = []
critical_issues = 0
timeframes_to_check = ['5m', '15m', '1h', '4h']
for timeframe in timeframes_to_check:
if timeframe in indicators:
rsi_value = indicators[timeframe].get('rsi')
if rsi_value:
if rsi_value > 80:
rsi_warnings.append(f"🚨 RSI CRITICAL in {timeframe}: {rsi_value} - EXTREME OVERBOUGHT")
critical_issues += 1
elif rsi_value > 75:
rsi_warnings.append(f"⚠️ RSI WARNING in {timeframe}: {rsi_value} - STRONG OVERBOUGHT")
elif rsi_value > 70:
rsi_warnings.append(f"📈 RSI HIGH in {timeframe}: {rsi_value} - OVERBOUGHT")
is_safe = critical_issues < 2
return is_safe, rsi_warnings
def _validate_indicators_quality_enhanced(self, indicators, current_price):
quality_issues = []
rsi_safe, rsi_warnings = self._validate_rsi_safety(indicators)
if not rsi_safe:
quality_issues.extend(rsi_warnings)
bullish_signals = bearish_signals = 0
for timeframe, data in indicators.items():
if data.get('macd_hist', 0) > 0:
bullish_signals += 1
if data.get('rsi', 50) > 70:
bearish_signals += 1
if 'ema_9' in data and 'ema_21' in data:
if data['ema_9'] > data['ema_21']:
bullish_signals += 1
if bullish_signals > 0 and bearish_signals > bullish_signals:
quality_issues.append("⚠️ Conflicting signals: More bearish than bullish indicators")
return quality_issues
def _calculate_enhanced_score_with_safety(self, base_analysis, strategy_scores, quality_issues):
base_score = base_analysis.get('final_score', 0.5)
strategy_average = sum(strategy_scores.values()) / len(strategy_scores) if strategy_scores else 0.5
safety_penalty = 0.0
for issue in quality_issues:
if '🚨 RSI CRITICAL' in issue:
safety_penalty += 0.3
elif '⚠️ RSI WARNING' in issue:
safety_penalty += 0.15
elif '📈 RSI HIGH' in issue:
safety_penalty += 0.05
enhanced_score = (base_score * 0.4) + (strategy_average * 0.6)
enhanced_score = max(0.0, enhanced_score - safety_penalty)
return min(enhanced_score, 1.0)
async def process_and_score_symbol_enhanced(self, raw_data):
try:
if not raw_data or not raw_data.get('ohlcv'):
print(f"❌ بيانات غير صالحة للرمز {raw_data.get('symbol', 'unknown')}")
return None
symbol = raw_data['symbol']
print(f"🔍 معالجة الرمز {symbol}...")
raw_data['raw_ohlcv'] = raw_data.get('ohlcv', {})
base_analysis = await self.process_and_score_symbol(raw_data)
if not base_analysis:
print(f"❌ فشل التحليل الأساسي للرمز {symbol}")
return None
try:
current_price = base_analysis.get('current_price', 0)
quality_issues = self._validate_indicators_quality_enhanced(base_analysis.get('advanced_indicators', {}), current_price)
if quality_issues:
print(f"⚠️ مشاكل جودة في {symbol}:")
for issue in quality_issues:
print(f" {issue}")
if hasattr(self, 'strategy_engine') and self.strategy_engine:
strategy_scores, base_scores = await self.strategy_engine.evaluate_all_strategies(base_analysis, self.market_context)
base_analysis['strategy_scores'] = strategy_scores
base_analysis['base_strategy_scores'] = base_scores
if base_scores:
best_strategy = max(base_scores.items(), key=lambda x: x[1])
best_strategy_name = best_strategy[0]
best_strategy_score = best_strategy[1]
base_analysis['recommended_strategy'] = best_strategy_name
base_analysis['strategy_confidence'] = best_strategy_score
if best_strategy_score > 0.3:
base_analysis['target_strategy'] = best_strategy_name
else:
base_analysis['target_strategy'] = 'GENERIC'
print(f"🎯 أفضل استراتيجية لـ {symbol}: {best_strategy_name} (ثقة: {best_strategy_score:.2f})")
else:
base_analysis['recommended_strategy'] = 'GENERIC'
base_analysis['strategy_confidence'] = 0.3
base_analysis['target_strategy'] = 'GENERIC'
print(f"⚠️ لا توجد استراتيجيات مناسبة لـ {symbol}")
enhanced_score = self._calculate_enhanced_score_with_safety(base_analysis, strategy_scores, quality_issues)
base_analysis['enhanced_final_score'] = enhanced_score
print(f"📊 النتيجة النهائية لـ {symbol}: {enhanced_score:.3f}")
else:
base_analysis['strategy_scores'] = {}
base_analysis['enhanced_final_score'] = base_analysis.get('final_score', 0.5)
base_analysis['recommended_strategy'] = 'GENERIC'
base_analysis['strategy_confidence'] = 0.3
base_analysis['target_strategy'] = 'GENERIC'
print(f"⚠️ محرك الاستراتيجيات غير متوفر لـ {symbol}")
base_analysis['quality_warnings'] = quality_issues
except Exception as strategy_error:
print(f"❌ خطأ في تقييم الاستراتيجية لـ {symbol}: {strategy_error}")
base_analysis['strategy_scores'] = {}
base_analysis['enhanced_final_score'] = base_analysis.get('final_score', 0.5)
base_analysis['recommended_strategy'] = 'GENERIC'
base_analysis['strategy_confidence'] = 0.3
base_analysis['target_strategy'] = 'GENERIC'
base_analysis['quality_warnings'] = ['Strategy evaluation failed']
return base_analysis
except Exception as error:
print(f"❌ خطأ في المعالجة المحسنة للرمز {raw_data.get('symbol', 'unknown')}: {error}")
return await self.process_and_score_symbol(raw_data)
def _improve_fibonacci_levels(self, daily_dataframe, current_price):
if len(daily_dataframe) < 50:
return {}
recent_high = float(daily_dataframe['high'].iloc[-50:].max())
recent_low = float(daily_dataframe['low'].iloc[-50:].min())
if current_price > recent_high:
recent_high = current_price * 1.05
if current_price < recent_low:
recent_low = current_price * 0.95
difference = recent_high - recent_low
if difference <= 0:
return {}
return {
"0.0%": recent_high,
"23.6%": recent_high - 0.236 * difference,
"38.2%": recent_high - 0.382 * difference,
"50.0%": recent_high - 0.50 * difference,
"61.8%": recent_high - 0.618 * difference,
"78.6%": recent_high - 0.786 * difference,
"100.0%": recent_low
}
async def process_and_score_symbol(self, raw_data):
symbol = raw_data['symbol']
ohlcv_data = raw_data['ohlcv']
reasons_for_candidacy = raw_data.get('reasons', [])
if not ohlcv_data:
print(f"❌ لا توجد بيانات OHLCV للرمز {symbol}")
return None
try:
print(f"📈 تحليل المؤشرات للرمز {symbol}...")
all_indicators = {}
for timeframe, candles in ohlcv_data.items():
if candles:
dataframe = pd.DataFrame(candles, columns=['time', 'open', 'high', 'low', 'close', 'volume'])
dataframe[['open', 'high', 'low', 'close', 'volume']] = dataframe[['open', 'high', 'low', 'close', 'volume']].astype(float)
all_indicators[timeframe] = self._calculate_indicators(dataframe, timeframe)
hourly_dataframe = pd.DataFrame(ohlcv_data.get('1h', []), columns=['time', 'open', 'high', 'low', 'close', 'volume'])
if hourly_dataframe.empty:
print(f"❌ لا توجد بيانات ساعة للرمز {symbol}")
return None
hourly_dataframe[['open', 'high', 'low', 'close', 'volume']] = hourly_dataframe[['open', 'high', 'low', 'close', 'volume']].astype(float)
try:
current_price = float(hourly_dataframe['close'].iloc[-1])
if ohlcv_data.get('5m'):
five_minute_dataframe = pd.DataFrame(ohlcv_data['5m'], columns=['time', 'open', 'high', 'low', 'close', 'volume'])
if not five_minute_dataframe.empty:
five_minute_dataframe[['open', 'high', 'low', 'close', 'volume']] = five_minute_dataframe[['open', 'high', 'low', 'close', 'volume']].astype(float)
current_price = float(five_minute_dataframe['close'].iloc[-1])
liquidity_score = self._calculate_liquidity_score(hourly_dataframe)
daily_dataframe = pd.DataFrame(ohlcv_data.get('1d', []), columns=['time', 'open', 'high', 'low', 'close', 'volume'])
if not daily_dataframe.empty:
daily_dataframe[['open', 'high', 'low', 'close', 'volume']] = daily_dataframe[['open', 'high', 'low', 'close', 'volume']].astype(float)
average_daily_volume = float(daily_dataframe['volume'].mean()) if not daily_dataframe.empty else 0.0
fibonacci_levels = self._improve_fibonacci_levels(daily_dataframe, current_price)
try:
whale_data = await self.data_manager.get_symbol_specific_whale_data(symbol)
print(f"🐋 بيانات الحيتان لـ {symbol}: {whale_data.get('transfer_count', 0)} تحويل")
except Exception as whale_error:
whale_data = {"transfer_count": 0, "total_volume": 0, "source": "no_data", "data_available": False}
print(f"⚠️ لا توجد بيانات حيتان لـ {symbol}")
# إصلاح: إزالة await لأن الدالة ليست async
whale_score = self.data_manager.whale_monitor._calculate_whale_activity_score(whale_data)
# اكتشاف الأنماط لجميع الأطر الزمنية
pattern_analysis = self._detect_chart_patterns(all_indicators, hourly_dataframe)
if pattern_analysis['pattern_detected'] != 'no_clear_pattern':
print(f"🎯 نمط مكتشف لـ {symbol}: {pattern_analysis['pattern_detected']} (ثقة: {pattern_analysis['pattern_confidence']:.2f})")
opportunity_classification = self.classify_opportunity_type(all_indicators, current_price)
initial_score = self._calculate_initial_score(all_indicators, current_price, self.market_context)
print(f"🎲 تشغيل محاكاة مونت كارلو لـ {symbol}...")
monte_carlo_probability = self._run_monte_carlo_simulation(hourly_dataframe)
print(f"📊 نتيجة مونت كارلو لـ {symbol}: {monte_carlo_probability:.3f}")
final_score = (0.35 * initial_score) + (0.50 * monte_carlo_probability) + (0.15 * whale_score)
final_score *= opportunity_classification['confidence']
normalized_indicators = {timeframe: self._normalize_features_corrected(indicators) for timeframe, indicators in all_indicators.items()}
print(f"✅ اكتمل تحليل {symbol} - النتيجة: {final_score:.3f}")
return {
'symbol': symbol,
'reasons_for_candidacy': reasons_for_candidacy,
'current_price': float(current_price),
'liquidity_score': float(liquidity_score) if not np.isnan(liquidity_score) else 0.0,
'avg_daily_volume': float(average_daily_volume),
'whale_data': whale_data,
'whale_score': float(whale_score),
'pattern_analysis': pattern_analysis, # إضافة تحليل الأنماط
'opportunity_type': opportunity_classification,
'sentiment_data': self.market_context,
'fibonacci_levels': fibonacci_levels,
'final_score': float(final_score),
'initial_score': float(initial_score),
'monte_carlo_probability': float(monte_carlo_probability),
'indicators': normalized_indicators,
'advanced_indicators': all_indicators,
'strategy_scores': {},
'recommended_strategy': 'GENERIC',
'enhanced_final_score': float(final_score),
'target_strategy': 'GENERIC',
'raw_ohlcv': ohlcv_data
}
except (KeyError, IndexError) as error:
print(f"❌ خطأ في فهارس البيانات لـ {symbol}: {error}")
return None
except Exception as error:
print(f"❌ خطأ عام في معالجة الرمز {symbol}: {error}")
return None
def _detect_chart_patterns(self, indicators, dataframe):
"""اكتشاف الأنماط السعرية لجميع الأطر الزمنية"""
patterns = {
'pattern_detected': 'no_clear_pattern',
'pattern_confidence': 0,
'predicted_direction': 'neutral',
'timeframe_analysis': {}
}
# تحليل كل إطار زمني
for timeframe, timeframe_indicators in indicators.items():
timeframe_pattern = self._analyze_timeframe_patterns(timeframe_indicators, dataframe, timeframe)
patterns['timeframe_analysis'][timeframe] = timeframe_pattern
# اختيار النمط الأعلى ثقة
if timeframe_pattern['confidence'] > patterns['pattern_confidence']:
patterns.update({
'pattern_detected': timeframe_pattern['pattern'],
'pattern_confidence': timeframe_pattern['confidence'],
'predicted_direction': timeframe_pattern['direction']
})
return patterns
def _analyze_timeframe_patterns(self, indicators, dataframe, timeframe):
"""تحليل الأنماط لإطار زمني محدد"""
pattern_info = {
'pattern': 'no_clear_pattern',
'confidence': 0,
'direction': 'neutral'
}
# شروط بسيطة للكشف عن الأنماط الأساسية
if len(dataframe) >= 20:
closes = dataframe['close'].values
# Double Top/Bottom
if self._detect_double_pattern(closes):
recent_trend = 'up' if closes[-1] > closes[-10] else 'down'
pattern_info.update({
'pattern': 'Double Top' if recent_trend == 'up' else 'Double Bottom',
'confidence': 0.7,
'direction': 'down' if recent_trend == 'up' else 'up'
})
# Support/Resistance Breakout
elif self._detect_breakout_pattern(indicators, closes):
pattern_info.update({
'pattern': 'Breakout',
'confidence': 0.6,
'direction': 'up' if closes[-1] > closes[-5] else 'down'
})
# Trend Detection
elif self._detect_trend_pattern(indicators):
trend = 'up' if indicators.get('ema_9', 0) > indicators.get('ema_21', 0) else 'down'
pattern_info.update({
'pattern': 'Trend Continuation',
'confidence': 0.5,
'direction': trend
})
return pattern_info
def _detect_double_pattern(self, closes):
"""كشف النمط المزدوج (قمة/قاع)"""
if len(closes) < 20:
return False
# تحليل بسيط للقمة/القاع المزدوج
recent_high = max(closes[-10:])
recent_low = min(closes[-10:])
high_count = sum(1 for price in closes[-10:] if price >= recent_high * 0.98)
low_count = sum(1 for price in closes[-10:] if price <= recent_low * 1.02)
return high_count >= 2 or low_count >= 2
def _detect_breakout_pattern(self, indicators, closes):
"""كشف نمط الاختراق"""
if 'bb_upper' not in indicators or 'bb_lower' not in indicators:
return False
current_price = closes[-1]
bb_upper = indicators['bb_upper']
bb_lower = indicators['bb_lower']
# اختراق المقاومة أو الدعم
return current_price > bb_upper * 1.02 or current_price < bb_lower * 0.98
def _detect_trend_pattern(self, indicators):
"""كشف نمط الاستمرارية"""
if 'ema_9' not in indicators or 'ema_21' not in indicators:
return False
# تأكيد الاتجاه
ema_trend = indicators['ema_9'] > indicators['ema_21']
macd_trend = indicators.get('macd_hist', 0) > 0
return ema_trend == macd_trend
def _calculate_indicators(self, dataframe, timeframe):
indicators = {}
if dataframe.empty:
return indicators
if not isinstance(dataframe.index, pd.DatetimeIndex):
try:
dataframe['time'] = pd.to_datetime(dataframe['time'], unit='ms')
dataframe = dataframe.set_index('time', drop=True)
except:
dataframe['time'] = pd.to_datetime(dataframe['time'])
dataframe = dataframe.set_index('time', drop=True)
dataframe = dataframe.sort_index()
if len(dataframe) >= 1 and all(column in dataframe.columns for column in ['high', 'low', 'close', 'volume']):
try:
typical_price = (dataframe['high'] + dataframe['low'] + dataframe['close']) / 3
volume_weighted_average_price = (typical_price * dataframe['volume']).cumsum() / dataframe['volume'].cumsum()
if not volume_weighted_average_price.empty and not pd.isna(volume_weighted_average_price.iloc[-1]):
indicators['vwap'] = float(volume_weighted_average_price.iloc[-1])
except Exception as error:
pass
if len(dataframe) >= 14:
rsi_series = ta.rsi(dataframe['close'], length=14)
if rsi_series is not None and not rsi_series.empty and rsi_series.iloc[-1] is not np.nan:
indicators['rsi'] = float(rsi_series.iloc[-1])
if len(dataframe) >= 26:
macd = ta.macd(dataframe['close'])
if macd is not None and not macd.empty:
if 'MACDh_12_26_9' in macd.columns and macd['MACDh_12_26_9'].iloc[-1] is not np.nan:
indicators['macd_hist'] = float(macd['MACDh_12_26_9'].iloc[-1])
if 'MACD_12_26_9' in macd.columns and macd['MACD_12_26_9'].iloc[-1] is not np.nan:
indicators['macd_line'] = float(macd['MACD_12_26_9'].iloc[-1])
if 'MACDs_12_26_9' in macd.columns and macd['MACDs_12_26_9'].iloc[-1] is not np.nan:
indicators['macd_signal'] = float(macd['MACDs_12_26_9'].iloc[-1])
if len(dataframe) >= 20:
bollinger_bands = ta.bbands(dataframe['close'], length=20, std=2)
if bollinger_bands is not None and not bollinger_bands.empty:
if 'BBL_20_2.0' in bollinger_bands.columns and bollinger_bands['BBL_20_2.0'].iloc[-1] is not np.nan:
indicators['bb_lower'] = float(bollinger_bands['BBL_20_2.0'].iloc[-1])
if 'BBU_20_2.0' in bollinger_bands.columns and bollinger_bands['BBU_20_2.0'].iloc[-1] is not np.nan:
indicators['bb_upper'] = float(bollinger_bands['BBU_20_2.0'].iloc[-1])
if 'BBM_20_2.0' in bollinger_bands.columns and bollinger_bands['BBM_20_2.0'].iloc[-1] is not np.nan:
indicators['bb_middle'] = float(bollinger_bands['BBM_20_2.0'].iloc[-1])
if len(dataframe) >= 14:
average_true_range = ta.atr(high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], length=14)
if average_true_range is not None and not average_true_range.empty and average_true_range.iloc[-1] is not np.nan:
indicators['atr'] = float(average_true_range.iloc[-1])
if len(dataframe) >= 26:
ema_12 = ta.ema(dataframe['close'], length=12)
ema_26 = ta.ema(dataframe['close'], length=26)
if ema_12 is not None and not ema_12.empty and ema_12.iloc[-1] is not np.nan:
indicators['ema_12'] = float(ema_12.iloc[-1])
if ema_26 is not None and not ema_26.empty and ema_26.iloc[-1] is not np.nan:
indicators['ema_26'] = float(ema_26.iloc[-1])
# إضافة المتوسطات المتحركة الإضافية
if len(dataframe) >= 9:
ema_9 = ta.ema(dataframe['close'], length=9)
if ema_9 is not None and not ema_9.empty and ema_9.iloc[-1] is not np.nan:
indicators['ema_9'] = float(ema_9.iloc[-1])
if len(dataframe) >= 21:
ema_21 = ta.ema(dataframe['close'], length=21)
if ema_21 is not None and not ema_21.empty and ema_21.iloc[-1] is not np.nan:
indicators['ema_21'] = float(ema_21.iloc[-1])
return indicators
def _normalize_features_corrected(self, features):
normalized_features = {}
for key, value in features.items():
if value is None:
normalized_features[key] = 0.0
continue
if key == 'rsi':
normalized_features[key] = max(0, min(100, value))
elif key in ['macd_hist', 'macd_line', 'macd_signal', 'vwap', 'atr']:
normalized_features[key] = value
elif 'ema' in key or 'bb_' in key:
normalized_features[key] = value
else:
try:
if abs(value) > 1000:
normalized_features[key] = value / 1000
else:
normalized_features[key] = value
except:
normalized_features[key] = value
return normalized_features
def _run_monte_carlo_simulation(self, dataframe, number_of_simulations=1000, number_of_steps=20):
if dataframe.empty or len(dataframe) < 2:
return 0.0
log_returns = np.log(dataframe['close'] / dataframe['close'].shift(1)).dropna()
if log_returns.empty:
return 0.0
mean_return = log_returns.mean()
volatility = log_returns.std()
initial_price = dataframe['close'].iloc[-1]
success_count = 0
for _ in range(number_of_simulations):
random_values = np.random.normal(0, 1, number_of_steps)
daily_returns = np.exp(mean_return - 0.5 * volatility**2 + volatility * random_values)
simulated_prices = initial_price * daily_returns.cumprod()
if (simulated_prices[-1] / initial_price) > 1.02:
success_count += 1
probability = success_count / number_of_simulations
return probability
def _calculate_initial_score(self, indicators, current_price, market_context):
score = 0.5
fast_timeframes = ['5m', '15m']
for timeframe in fast_timeframes:
timeframe_indicators = indicators.get(timeframe, {})
if not timeframe_indicators:
continue
if 'rsi' in timeframe_indicators:
rsi_value = timeframe_indicators['rsi']
if isinstance(rsi_value, (int, float)):
if rsi_value < 30:
score += 0.2
elif rsi_value < 40:
score += 0.1
elif rsi_value > 70:
score -= 0.1
if 'macd_hist' in timeframe_indicators and timeframe_indicators['macd_hist'] > 0:
score += 0.15
if all(key in timeframe_indicators for key in ['ema_12', 'ema_26']):
if timeframe_indicators['ema_12'] > timeframe_indicators['ema_26']:
score += 0.15
slow_timeframes = ['1h', '4h', '1d']
for timeframe in slow_timeframes:
timeframe_indicators = indicators.get(timeframe, {})
if not timeframe_indicators:
continue
if all(key in timeframe_indicators for key in ['ema_12', 'ema_26']):
if timeframe_indicators['ema_12'] > timeframe_indicators['ema_26']:
score += 0.10
if all(key in timeframe_indicators for key in ['bb_upper', 'bb_lower']):
if current_price > timeframe_indicators['bb_upper']:
score += 0.10
elif current_price <= timeframe_indicators['bb_lower']:
score += 0.05
if '5m' in indicators and 'vwap' in indicators['5m'] and current_price > indicators['5m']['vwap']:
score += 0.10
if market_context:
bitcoin_sentiment = market_context.get('btc_sentiment')
fear_greed_index = market_context.get('fear_and_greed_index', 50)
if bitcoin_sentiment == 'BULLISH' and fear_greed_index > 60:
score *= 1.2
elif bitcoin_sentiment == 'BEARISH' or fear_greed_index < 30:
score *= 0.8
return min(max(score, 0.0), 1.0)
def _normalize_features(self, features):
return self._normalize_features_corrected(features)
def _prepare_data_for_ml(self, all_indicators, current_price):
feature_vector = []
timeframes = ['5m', '15m', '1h', '4h', '1d']
indicator_keys = ['rsi', 'macd_hist', 'macd_line', 'bb_upper', 'bb_lower', 'atr', 'ema_12', 'ema_26', 'vwap']
for timeframe in timeframes:
timeframe_indicators = all_indicators.get(timeframe, {})
for key in indicator_keys:
feature_vector.append(timeframe_indicators.get(key, 0.0))
feature_vector.append(current_price)
return feature_vector
def _calculate_liquidity_score(self, hourly_dataframe):
if hourly_dataframe.empty:
return 0.0
hourly_dataframe['dollar_volume'] = hourly_dataframe['volume'] * hourly_dataframe['close']
return float(hourly_dataframe['dollar_volume'].mean())
def _calculate_fibonacci_levels(self, daily_dataframe):
return self._improve_fibonacci_levels(daily_dataframe, 0)
def classify_opportunity_type(self, indicators, current_price):
fast_signals = slow_signals = 0
for timeframe in ['5m', '15m']:
timeframe_indicators = indicators.get(timeframe, {})
if not timeframe_indicators:
continue
if timeframe_indicators.get('rsi', 100) < 35:
fast_signals += 1
if timeframe_indicators.get('macd_hist', 0) > 0:
fast_signals += 1
if all(key in timeframe_indicators for key in ['ema_12', 'ema_26']):
if timeframe_indicators['ema_12'] > timeframe_indicators['ema_26']:
fast_signals += 1
if timeframe == '5m' and timeframe_indicators.get('vwap') and current_price > timeframe_indicators['vwap'] * 1.02:
fast_signals += 1
for timeframe in ['1h', '4h', '1d']:
timeframe_indicators = indicators.get(timeframe, {})
if not timeframe_indicators:
continue
if 40 <= timeframe_indicators.get('rsi', 50) <= 60:
slow_signals += 1
if all(key in timeframe_indicators for key in ['ema_12', 'ema_26']):
if timeframe_indicators['ema_12'] > timeframe_indicators['ema_26']:
slow_signals += 1
if timeframe_indicators.get('bb_middle') and current_price > timeframe_indicators['bb_middle']:
slow_signals += 1
if fast_signals >= 3:
return {
"type": "FAST_PUMP",
"timeframe": "15m-1h",
"take_profit_multiplier": 1.08,
"stop_loss_multiplier": 0.97,
"confidence": min(fast_signals / 6.0, 1.0),
"description": "Strong fast pump opportunity on short timeframes"
}
elif slow_signals >= 3:
return {
"type": "SLOW_GROWTH",
"timeframe": "4h-1d",
"take_profit_multiplier": 1.05,
"stop_loss_multiplier": 0.95,
"confidence": min(slow_signals / 6.0, 1.0),
"description": "Sustainable growth opportunity on long timeframes"
}
return {
"type": "NEUTRAL",
"timeframe": "N/A",
"take_profit_multiplier": 1.05,
"stop_loss_multiplier": 0.95,
"confidence": 0.3,
"description": "No clear signals for specific opportunity type"
}
def filter_top_candidates(self, candidates, number_of_candidates=10):
valid_candidates = [candidate for candidate in candidates if candidate is not None]
if not valid_candidates:
print("❌ لا توجد مرشحات صالحة للتصفية")
return []
sorted_candidates = sorted(valid_candidates, key=lambda candidate: candidate.get('enhanced_final_score', candidate.get('final_score', 0)), reverse=True)
top_candidates = sorted_candidates[:number_of_candidates]
print(f"🎖️ أفضل {len(top_candidates)} مرشح:")
for i, candidate in enumerate(top_candidates):
score = candidate.get('enhanced_final_score', 0)
strategy = candidate.get('recommended_strategy', 'GENERIC')
print(f" {i+1}. {candidate['symbol']}: {score:.3f} - {strategy}")
return top_candidates