|
|
|
|
|
import pandas as pd |
|
|
import pandas_ta as ta |
|
|
import numpy as np |
|
|
|
|
|
class AdvancedTechnicalAnalyzer:
    """Compute the most recent value of several technical indicators.

    Each ``_calculate_*`` method receives a pandas DataFrame of price data
    and returns a flat ``{name: float}`` dict holding only the value on the
    last row of each indicator series. An indicator that cannot be computed
    (too few rows, missing column, NaN result, error raised by the indicator
    library) is left out of the result instead of raising.
    """

    def __init__(self):
        """Store the indicator names grouped by category."""
        # Catalogue of indicator names; none of the methods in this class
        # read it.
        self.indicators_config = {
            'trend': ['ema_9', 'ema_21', 'ema_50', 'ema_200', 'ichimoku', 'adx', 'parabolic_sar', 'dmi'],
            'momentum': ['rsi', 'stoch_rsi', 'macd', 'williams_r', 'cci', 'awesome_oscillator', 'momentum'],
            'volatility': ['bbands', 'atr', 'keltner', 'donchian', 'rvi'],
            'volume': ['vwap', 'obv', 'mfi', 'volume_profile', 'ad', 'volume_oscillator'],
            'cycle': ['hull_ma', 'supertrend', 'zigzag', 'fisher_transform']
        }

    @staticmethod
    def _last_valid(series):
        """Return the last element of ``series`` as a float, or None.

        None is returned when ``series`` is None, empty, or ends in NaN.
        """
        if series is None or series.empty:
            return None
        last = series.iloc[-1]
        if pd.isna(last):
            return None
        return float(last)

    @staticmethod
    def _last_from_frame(frame, column):
        """Return the last valid value of ``frame[column]``, or None.

        None is returned when ``frame`` is None or empty, when ``column`` is
        absent, or when the column ends in NaN.
        """
        if frame is None or frame.empty:
            return None
        return AdvancedTechnicalAnalyzer._last_valid(frame.get(column))

    @staticmethod
    def _drop_invalid(values):
        """Return a copy of ``values`` without None or NaN entries."""
        return {
            key: value
            for key, value in values.items()
            if value is not None and not np.isnan(value)
        }

    def calculate_all_indicators(self, dataframe, timeframe):
        """Calculate every indicator group for the given timeframe.

        Args:
            dataframe: price data; the group calculations read the ``close``,
                ``high``, ``low`` and ``volume`` columns and, for VWAP, an
                optional ``timestamp`` column.
            timeframe: label forwarded to the volume calculation and used in
                error messages.

        Returns:
            dict mapping indicator name to its latest float value; empty when
            ``dataframe`` is None or empty.
        """
        # None must be tested first: evaluating ``None.empty`` raises
        # AttributeError before the None check would ever run.
        if dataframe is None or dataframe.empty:
            return {}

        indicators = {}

        try:
            indicators.update(self._calculate_trend_indicators(dataframe))
            indicators.update(self._calculate_momentum_indicators(dataframe))
            indicators.update(self._calculate_volatility_indicators(dataframe))
            indicators.update(self._calculate_volume_indicators(dataframe, timeframe))
            indicators.update(self._calculate_cycle_indicators(dataframe))
        except Exception as e:
            print(f"⚠️ خطأ في حساب المؤشرات لـ {timeframe}: {e}")

        return indicators

    def _calculate_trend_indicators(self, dataframe):
        """Calculate trend indicators: EMA 9/21/50/200, Ichimoku lines, ADX.

        Returns:
            dict with any of ``ema_9``, ``ema_21``, ``ema_50``, ``ema_200``,
            ``ichimoku_conversion``, ``ichimoku_base`` and ``adx``.
        """
        trend = {}

        try:
            if dataframe is None or dataframe.empty or 'close' not in dataframe.columns:
                return {}

            close = dataframe['close']
            rows = len(dataframe)

            # Each EMA is attempted only when there are at least `length` rows.
            for length in (9, 21, 50, 200):
                if rows >= length:
                    value = self._last_valid(ta.ema(close, length=length))
                    if value is not None:
                        trend[f'ema_{length}'] = value

            if rows >= 26:
                try:
                    ichimoku = ta.ichimoku(dataframe['high'], dataframe['low'], close)
                    if ichimoku is not None and len(ichimoku) > 0:
                        # Only the first element of the result is read.
                        lines = ichimoku[0]
                        conversion = self._last_from_frame(lines, 'ITS_9')
                        if conversion is not None:
                            trend['ichimoku_conversion'] = conversion
                        base = self._last_from_frame(lines, 'IKS_26')
                        if base is not None:
                            trend['ichimoku_base'] = base
                except Exception:
                    # Deliberate best effort: an Ichimoku failure must not
                    # prevent the ADX calculation below.
                    pass

            if rows >= 14:
                try:
                    adx_result = ta.adx(dataframe['high'], dataframe['low'], close, length=14)
                    adx = self._last_from_frame(adx_result, 'ADX_14')
                    if adx is not None:
                        trend['adx'] = adx
                except Exception:
                    # Deliberate best effort: keep what was computed so far.
                    pass

        except Exception as e:
            print(f"⚠️ خطأ في حساب مؤشرات الاتجاه: {e}")

        return self._drop_invalid(trend)

    def _calculate_momentum_indicators(self, dataframe):
        """Calculate momentum indicators: RSI, MACD, Stochastic RSI, Williams %R.

        Returns:
            dict with any of ``rsi``, ``macd_hist``, ``macd_line``,
            ``stoch_rsi_k`` and ``williams_r``.
        """
        momentum = {}

        try:
            if dataframe is None or dataframe.empty or 'close' not in dataframe.columns:
                return {}

            close = dataframe['close']
            rows = len(dataframe)

            if rows >= 14:
                rsi = self._last_valid(ta.rsi(close, length=14))
                if rsi is not None:
                    momentum['rsi'] = rsi

            if rows >= 26:
                macd = ta.macd(close)
                macd_hist = self._last_from_frame(macd, 'MACDh_12_26_9')
                if macd_hist is not None:
                    momentum['macd_hist'] = macd_hist
                macd_line = self._last_from_frame(macd, 'MACD_12_26_9')
                if macd_line is not None:
                    momentum['macd_line'] = macd_line

            if rows >= 14:
                stoch_rsi = ta.stochrsi(close, length=14)
                stoch_k = self._last_from_frame(stoch_rsi, 'STOCHRSIk_14_14_3_3')
                if stoch_k is not None:
                    momentum['stoch_rsi_k'] = stoch_k

                williams = ta.willr(dataframe['high'], dataframe['low'], close, length=14)
                williams_r = self._last_valid(williams)
                if williams_r is not None:
                    momentum['williams_r'] = williams_r

        except Exception as e:
            print(f"⚠️ خطأ في حساب مؤشرات الزخم: {e}")

        return self._drop_invalid(momentum)

    def _calculate_volatility_indicators(self, dataframe):
        """Calculate volatility indicators: Bollinger Bands and ATR.

        Returns:
            dict with any of ``bb_lower``, ``bb_upper``, ``bb_middle``,
            ``atr`` and ``atr_percent`` (ATR as a percentage of the last
            close).
        """
        volatility = {}

        try:
            if dataframe is None or dataframe.empty or 'close' not in dataframe.columns:
                return {}

            close = dataframe['close']
            rows = len(dataframe)

            if rows >= 20:
                bollinger_bands = ta.bbands(close, length=20, std=2)
                for key, column in (
                    ('bb_lower', 'BBL_20_2.0'),
                    ('bb_upper', 'BBU_20_2.0'),
                    ('bb_middle', 'BBM_20_2.0'),
                ):
                    band = self._last_from_frame(bollinger_bands, column)
                    if band is not None:
                        volatility[key] = band

            if rows >= 14:
                atr_series = ta.atr(dataframe['high'], dataframe['low'], close, length=14)
                atr_value = self._last_valid(atr_series)
                if atr_value is not None:
                    volatility['atr'] = atr_value
                    current_close = close.iloc[-1]
                    # Skip the percentage when ATR is zero or the close is
                    # not a positive number (also avoids dividing by zero).
                    if atr_value and current_close > 0:
                        volatility['atr_percent'] = (atr_value / current_close) * 100

        except Exception as e:
            print(f"⚠️ خطأ في حساب مؤشرات التقلب: {e}")

        return self._drop_invalid(volatility)

    def _calculate_vwap(self, dataframe, timeframe):
        """Return the latest VWAP as a float, or None when unavailable.

        The library VWAP is computed on a copy of ``dataframe`` indexed by a
        sorted DatetimeIndex, built from the ``timestamp`` column (read as
        epoch milliseconds) when the frame is not already datetime-indexed.
        If that attempt raises and at least 20 rows exist, a volume-weighted
        mean of the typical price over the whole frame is returned instead.
        """
        try:
            df_vwap = dataframe.copy()

            if not isinstance(df_vwap.index, pd.DatetimeIndex):
                if 'timestamp' not in df_vwap.columns:
                    raise ValueError("DataFrame needs 'timestamp' column or DatetimeIndex")
                df_vwap['timestamp'] = pd.to_datetime(df_vwap['timestamp'], unit='ms')
                df_vwap.set_index('timestamp', inplace=True)

            df_vwap.sort_index(inplace=True)

            vwap_series = ta.vwap(
                high=df_vwap['high'],
                low=df_vwap['low'],
                close=df_vwap['close'],
                volume=df_vwap['volume']
            )
            return self._last_valid(vwap_series)

        except Exception as vwap_error:
            # Index-related failures are expected and stay silent; any
            # message mentioning "Index" (including "VWAP requires an
            # ordered DatetimeIndex") is suppressed.
            if "Index" not in str(vwap_error):
                print(f"⚠️ خطأ في حساب VWAP لـ {timeframe}: {vwap_error}")

            # NOTE(review): the fallback is assumed to run only when the
            # library call above failed — confirm against the original
            # layout of this method.
            if len(dataframe) >= 20:
                try:
                    typical_price = (dataframe['high'] + dataframe['low'] + dataframe['close']) / 3
                    vwap_simple = (typical_price * dataframe['volume']).sum() / dataframe['volume'].sum()
                    if not np.isnan(vwap_simple):
                        return float(vwap_simple)
                except Exception:
                    # Deliberate best effort: no VWAP is better than a crash.
                    pass

            return None

    def _calculate_volume_indicators(self, dataframe, timeframe):
        """Calculate volume indicators: VWAP, OBV, MFI and a volume ratio.

        Args:
            dataframe: price data with at least ``close`` and ``volume``.
            timeframe: label used in the VWAP error message.

        Returns:
            dict with any of ``vwap``, ``obv``, ``mfi`` and ``volume_ratio``
            (last volume divided by the mean volume of the last 20 rows).
        """
        volume = {}

        try:
            if (
                dataframe is None
                or dataframe.empty
                or 'close' not in dataframe.columns
                or 'volume' not in dataframe.columns
            ):
                return {}

            rows = len(dataframe)

            vwap = self._calculate_vwap(dataframe, timeframe)
            if vwap is not None:
                volume['vwap'] = vwap

            try:
                obv = self._last_valid(ta.obv(dataframe['close'], dataframe['volume']))
                if obv is not None:
                    volume['obv'] = obv
            except Exception:
                # Deliberate best effort: continue with the other indicators.
                pass

            if rows >= 14:
                try:
                    mfi_series = ta.mfi(
                        dataframe['high'],
                        dataframe['low'],
                        dataframe['close'],
                        dataframe['volume'],
                        length=14
                    )
                    mfi = self._last_valid(mfi_series)
                    if mfi is not None:
                        volume['mfi'] = mfi
                except Exception:
                    # Deliberate best effort: continue with the volume ratio.
                    pass

            if rows >= 20:
                try:
                    volume_avg_20 = float(dataframe['volume'].tail(20).mean())
                    current_volume = float(dataframe['volume'].iloc[-1])
                    # Both values must be positive; NaN fails either test.
                    if volume_avg_20 > 0 and current_volume > 0:
                        volume_ratio = current_volume / volume_avg_20
                        if not np.isnan(volume_ratio):
                            volume['volume_ratio'] = volume_ratio
                except Exception:
                    # Deliberate best effort: the ratio is optional.
                    pass

        except Exception as e:
            print(f"⚠️ خطأ في حساب مؤشرات الحجم: {e}")

        return self._drop_invalid(volume)

    def _calculate_cycle_indicators(self, dataframe):
        """Calculate cycle indicators: Hull moving average and Supertrend.

        Returns:
            dict with any of ``hull_ma`` and ``supertrend``.
        """
        cycle = {}

        try:
            if dataframe is None or dataframe.empty or 'close' not in dataframe.columns:
                return {}

            close = dataframe['close']
            rows = len(dataframe)

            if rows >= 9:
                hull_ma = self._last_valid(ta.hma(close, length=9))
                if hull_ma is not None:
                    cycle['hull_ma'] = hull_ma

            if rows >= 10:
                supertrend = ta.supertrend(
                    dataframe['high'], dataframe['low'], close, length=10, multiplier=3
                )
                supertrend_value = self._last_from_frame(supertrend, 'SUPERT_10_3.0')
                if supertrend_value is not None:
                    cycle['supertrend'] = supertrend_value

        except Exception as e:
            print(f"⚠️ خطأ في حساب مؤشرات الدورة: {e}")

        return self._drop_invalid(cycle)
|
|
|
|
|
# Import-time banner: printed once when this module is loaded.
print("✅ ML Module: Technical Indicators loaded (V5.3 - VWAP DatetimeIndex Fix)")