|
|
|
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
|
|
|
class ChartPatternAnalyzer: |
|
|
def __init__(self): |
|
|
self.pattern_cache = {} |
|
|
|
|
|
async def detect_chart_patterns(self, ohlcv_data): |
|
|
"""اكتشاف الأنماط البيانية لجميع الأطر الزمنية""" |
|
|
patterns = { |
|
|
'pattern_detected': 'no_clear_pattern', |
|
|
'pattern_confidence': 0, |
|
|
'predicted_direction': 'neutral', |
|
|
'timeframe_analysis': {}, |
|
|
'all_patterns': [] |
|
|
} |
|
|
|
|
|
try: |
|
|
for timeframe, candles in ohlcv_data.items(): |
|
|
if candles and len(candles) >= 20: |
|
|
dataframe = self._create_dataframe(candles) |
|
|
timeframe_pattern = await self._analyze_timeframe_patterns(dataframe, timeframe) |
|
|
patterns['timeframe_analysis'][timeframe] = timeframe_pattern |
|
|
patterns['all_patterns'].append(timeframe_pattern) |
|
|
|
|
|
if timeframe_pattern['confidence'] > patterns['pattern_confidence']: |
|
|
patterns.update({ |
|
|
'pattern_detected': timeframe_pattern['pattern'], |
|
|
'pattern_confidence': timeframe_pattern['confidence'], |
|
|
'predicted_direction': timeframe_pattern['direction'] |
|
|
}) |
|
|
|
|
|
return patterns |
|
|
|
|
|
except Exception as e: |
|
|
print(f"❌ خطأ في اكتشاف الأنماط: {e}") |
|
|
return patterns |
|
|
|
|
|
def _create_dataframe(self, candles): |
|
|
"""إنشاء DataFrame من بيانات الشموع""" |
|
|
try: |
|
|
df = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) |
|
|
df[['open', 'high', 'low', 'close', 'volume']] = df[['open', 'high', 'low', 'close', 'volume']].astype(float) |
|
|
return df |
|
|
except Exception as e: |
|
|
print(f"❌ خطأ في إنشاء DataFrame: {e}") |
|
|
return pd.DataFrame() |
|
|
|
|
|
async def _analyze_timeframe_patterns(self, dataframe, timeframe): |
|
|
"""تحليل الأنماط لإطار زمني محدد""" |
|
|
pattern_info = { |
|
|
'pattern': 'no_clear_pattern', |
|
|
'confidence': 0, |
|
|
'direction': 'neutral', |
|
|
'timeframe': timeframe, |
|
|
'details': {} |
|
|
} |
|
|
|
|
|
try: |
|
|
if dataframe is None or dataframe.empty or len(dataframe) < 20: |
|
|
return pattern_info |
|
|
|
|
|
closes = dataframe['close'].values |
|
|
highs = dataframe['high'].values |
|
|
lows = dataframe['low'].values |
|
|
current_price = closes[-1] |
|
|
|
|
|
patterns_detected = [] |
|
|
|
|
|
double_pattern = self._detect_double_pattern(highs, lows, closes) |
|
|
if double_pattern['detected']: |
|
|
patterns_detected.append(double_pattern) |
|
|
|
|
|
breakout_pattern = self._detect_breakout_pattern(highs, lows, closes) |
|
|
if breakout_pattern['detected']: |
|
|
patterns_detected.append(breakout_pattern) |
|
|
|
|
|
trend_pattern = self._detect_trend_pattern(dataframe) |
|
|
if trend_pattern['detected']: |
|
|
patterns_detected.append(trend_pattern) |
|
|
|
|
|
support_resistance_pattern = self._detect_support_resistance(highs, lows, closes) |
|
|
if support_resistance_pattern['detected']: |
|
|
patterns_detected.append(support_resistance_pattern) |
|
|
|
|
|
if patterns_detected: |
|
|
best_pattern = max(patterns_detected, key=lambda x: x['confidence']) |
|
|
pattern_info.update({ |
|
|
'pattern': best_pattern['pattern'], |
|
|
'confidence': best_pattern['confidence'], |
|
|
'direction': best_pattern.get('direction', 'neutral'), |
|
|
'details': best_pattern.get('details', {}) |
|
|
}) |
|
|
|
|
|
return pattern_info |
|
|
|
|
|
except Exception as e: |
|
|
print(f"❌ خطأ في تحليل الأنماط للإطار {timeframe}: {e}") |
|
|
return pattern_info |
|
|
|
|
|
def _detect_double_pattern(self, highs, lows, closes): |
|
|
"""كشف نمط القمة المزدوجة أو القاع المزدوج""" |
|
|
try: |
|
|
if len(highs) < 15: |
|
|
return {'detected': False} |
|
|
|
|
|
recent_highs = highs[-15:] |
|
|
recent_lows = lows[-15:] |
|
|
|
|
|
high_indices = np.argsort(recent_highs)[-2:] |
|
|
high_indices.sort() |
|
|
|
|
|
low_indices = np.argsort(recent_lows)[:2] |
|
|
low_indices.sort() |
|
|
|
|
|
double_top = False |
|
|
double_bottom = False |
|
|
|
|
|
if len(high_indices) == 2: |
|
|
high1 = recent_highs[high_indices[0]] |
|
|
high2 = recent_highs[high_indices[1]] |
|
|
time_diff = high_indices[1] - high_indices[0] |
|
|
|
|
|
if (abs(high1 - high2) / high1 < 0.02 and |
|
|
time_diff >= 3 and time_diff <= 10 and |
|
|
closes[-1] < min(high1, high2)): |
|
|
double_top = True |
|
|
|
|
|
if len(low_indices) == 2: |
|
|
low1 = recent_lows[low_indices[0]] |
|
|
low2 = recent_lows[low_indices[1]] |
|
|
time_diff = low_indices[1] - low_indices[0] |
|
|
|
|
|
if (abs(low1 - low2) / low1 < 0.02 and |
|
|
time_diff >= 3 and time_diff <= 10 and |
|
|
closes[-1] > max(low1, low2)): |
|
|
double_bottom = True |
|
|
|
|
|
if double_top: |
|
|
return { |
|
|
'detected': True, |
|
|
'pattern': 'Double Top', |
|
|
'confidence': 0.75, |
|
|
'direction': 'down', |
|
|
'details': { |
|
|
'resistance_level': np.mean([high1, high2]), |
|
|
'breakdown_level': min(lows[-5:]) |
|
|
} |
|
|
} |
|
|
elif double_bottom: |
|
|
return { |
|
|
'detected': True, |
|
|
'pattern': 'Double Bottom', |
|
|
'confidence': 0.75, |
|
|
'direction': 'up', |
|
|
'details': { |
|
|
'support_level': np.mean([low1, low2]), |
|
|
'breakout_level': max(highs[-5:]) |
|
|
} |
|
|
} |
|
|
|
|
|
return {'detected': False} |
|
|
|
|
|
except Exception as e: |
|
|
return {'detected': False} |
|
|
|
|
|
def _detect_breakout_pattern(self, highs, lows, closes): |
|
|
"""كشف نمط الاختراق""" |
|
|
try: |
|
|
if len(highs) < 25: |
|
|
return {'detected': False} |
|
|
|
|
|
current_price = closes[-1] |
|
|
|
|
|
resistance = np.max(highs[-25:-5]) |
|
|
support = np.min(lows[-25:-5]) |
|
|
|
|
|
if current_price > resistance * 1.01: |
|
|
return { |
|
|
'detected': True, |
|
|
'pattern': 'Breakout Up', |
|
|
'confidence': 0.8, |
|
|
'direction': 'up', |
|
|
'details': { |
|
|
'breakout_level': resistance, |
|
|
'target_level': resistance * 1.05 |
|
|
} |
|
|
} |
|
|
elif current_price < support * 0.99: |
|
|
return { |
|
|
'detected': True, |
|
|
'pattern': 'Breakout Down', |
|
|
'confidence': 0.8, |
|
|
'direction': 'down', |
|
|
'details': { |
|
|
'breakdown_level': support, |
|
|
'target_level': support * 0.95 |
|
|
} |
|
|
} |
|
|
|
|
|
return {'detected': False} |
|
|
|
|
|
except Exception as e: |
|
|
return {'detected': False} |
|
|
|
|
|
def _detect_trend_pattern(self, dataframe): |
|
|
"""كشف نمط الاتجاه""" |
|
|
try: |
|
|
if dataframe is None or dataframe.empty or len(dataframe) < 20: |
|
|
return {'detected': False} |
|
|
|
|
|
closes = dataframe['close'].values |
|
|
|
|
|
ma_short = np.mean(closes[-5:]) |
|
|
ma_medium = np.mean(closes[-13:]) |
|
|
ma_long = np.mean(closes[-21:]) |
|
|
|
|
|
if ma_short > ma_medium > ma_long and closes[-1] > ma_short: |
|
|
trend_strength = (ma_short - ma_long) / ma_long |
|
|
confidence = min(0.3 + trend_strength * 10, 0.8) |
|
|
return { |
|
|
'detected': True, |
|
|
'pattern': 'Uptrend', |
|
|
'confidence': confidence, |
|
|
'direction': 'up', |
|
|
'details': { |
|
|
'trend_strength': trend_strength, |
|
|
'support_level': ma_medium |
|
|
} |
|
|
} |
|
|
elif ma_short < ma_medium < ma_long and closes[-1] < ma_short: |
|
|
trend_strength = (ma_long - ma_short) / ma_long |
|
|
confidence = min(0.3 + trend_strength * 10, 0.8) |
|
|
return { |
|
|
'detected': True, |
|
|
'pattern': 'Downtrend', |
|
|
'confidence': confidence, |
|
|
'direction': 'down', |
|
|
'details': { |
|
|
'trend_strength': trend_strength, |
|
|
'resistance_level': ma_medium |
|
|
} |
|
|
} |
|
|
|
|
|
return {'detected': False} |
|
|
|
|
|
except Exception as e: |
|
|
return {'detected': False} |
|
|
|
|
|
def _detect_support_resistance(self, highs, lows, closes): |
|
|
"""كشف مستويات الدعم والمقاومة""" |
|
|
try: |
|
|
if len(highs) < 20: |
|
|
return {'detected': False} |
|
|
|
|
|
current_price = closes[-1] |
|
|
|
|
|
resistance_level = np.max(highs[-20:]) |
|
|
support_level = np.min(lows[-20:]) |
|
|
|
|
|
position = (current_price - support_level) / (resistance_level - support_level) |
|
|
|
|
|
if position < 0.2: |
|
|
return { |
|
|
'detected': True, |
|
|
'pattern': 'Near Support', |
|
|
'confidence': 0.6, |
|
|
'direction': 'up', |
|
|
'details': { |
|
|
'support_level': support_level, |
|
|
'resistance_level': resistance_level, |
|
|
'position': position |
|
|
} |
|
|
} |
|
|
elif position > 0.8: |
|
|
return { |
|
|
'detected': True, |
|
|
'pattern': 'Near Resistance', |
|
|
'confidence': 0.6, |
|
|
'direction': 'down', |
|
|
'details': { |
|
|
'support_level': support_level, |
|
|
'resistance_level': resistance_level, |
|
|
'position': position |
|
|
} |
|
|
} |
|
|
|
|
|
return {'detected': False} |
|
|
|
|
|
except Exception as e: |
|
|
return {'detected': False} |
|
|
|
|
|
print("✅ ML Module: Chart Pattern Analyzer loaded") |