# ml_engine/patterns.py
import logging

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)


class ChartPatternAnalyzer:
    """Heuristic chart-pattern detection over OHLCV candle data.

    Each timeframe is analysed independently by four detectors (double
    top/bottom, breakout, moving-average trend, support/resistance
    proximity). The detector result with the highest confidence wins for
    that timeframe, and the highest-confidence timeframe wins overall.
    """

    # Minimum number of candles a timeframe needs before it is analysed.
    MIN_CANDLES = 20

    def __init__(self):
        self.pattern_cache = {}

    async def detect_chart_patterns(self, ohlcv_data):
        """Detect chart patterns for every timeframe in ``ohlcv_data``.

        Args:
            ohlcv_data: Mapping of timeframe label to a sequence of candles.
                Each candle is expected to be a 6-item row
                ``[timestamp, open, high, low, close, volume]``. Lists and
                array-like containers (e.g. NumPy arrays) are both accepted.

        Returns:
            A dict with keys ``pattern_detected``, ``pattern_confidence``,
            ``predicted_direction`` (taken from the highest-confidence
            timeframe), ``timeframe_analysis`` (per-timeframe result keyed
            by timeframe) and ``all_patterns`` (list of the same results).
            Timeframes with fewer than ``MIN_CANDLES`` candles are skipped.
            On any error the defaults / partial results are returned; this
            method does not raise.
        """
        patterns = {
            'pattern_detected': 'no_clear_pattern',
            'pattern_confidence': 0,
            'predicted_direction': 'neutral',
            'timeframe_analysis': {},
            'all_patterns': []
        }

        try:
            timeframes = list(ohlcv_data.items())
        except Exception as e:
            print(f"❌ خطأ في اكتشاف الأنماط: {e}")
            return patterns

        for timeframe, candles in timeframes:
            # Each timeframe is isolated so that one malformed entry cannot
            # abort the analysis of the remaining timeframes.
            try:
                # Explicit None/len checks instead of truthiness: the truth
                # value of a multi-element NumPy array is ambiguous and
                # raises ValueError.
                if candles is None or len(candles) < self.MIN_CANDLES:
                    continue

                dataframe = self._create_dataframe(candles)
                timeframe_pattern = await self._analyze_timeframe_patterns(dataframe, timeframe)
                patterns['timeframe_analysis'][timeframe] = timeframe_pattern
                patterns['all_patterns'].append(timeframe_pattern)

                if timeframe_pattern['confidence'] > patterns['pattern_confidence']:
                    patterns.update({
                        'pattern_detected': timeframe_pattern['pattern'],
                        'pattern_confidence': timeframe_pattern['confidence'],
                        'predicted_direction': timeframe_pattern['direction']
                    })
            except Exception as e:
                print(f"❌ خطأ في اكتشاف الأنماط: {e}")

        return patterns

    def _create_dataframe(self, candles):
        """Build a DataFrame from raw candle rows.

        Columns are ``timestamp, open, high, low, close, volume``; all
        columns except ``timestamp`` are cast to float. Returns an empty
        DataFrame if construction or casting fails.
        """
        try:
            df = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
            df[['open', 'high', 'low', 'close', 'volume']] = df[['open', 'high', 'low', 'close', 'volume']].astype(float)
            return df
        except Exception as e:
            print(f"❌ خطأ في إنشاء DataFrame: {e}")
            return pd.DataFrame()

    async def _analyze_timeframe_patterns(self, dataframe, timeframe):
        """Run every detector on one timeframe and keep the best result.

        Args:
            dataframe: OHLCV DataFrame as produced by ``_create_dataframe``.
            timeframe: Label copied into the result's ``timeframe`` field.

        Returns:
            A dict with keys ``pattern``, ``confidence``, ``direction``,
            ``timeframe`` and ``details``. If the frame is missing, empty,
            shorter than ``MIN_CANDLES``, or nothing is detected, the
            neutral defaults are returned.
        """
        pattern_info = {
            'pattern': 'no_clear_pattern',
            'confidence': 0,
            'direction': 'neutral',
            'timeframe': timeframe,
            'details': {}
        }

        try:
            if dataframe is None or dataframe.empty or len(dataframe) < self.MIN_CANDLES:
                return pattern_info

            closes = dataframe['close'].values
            highs = dataframe['high'].values
            lows = dataframe['low'].values

            # Detector order matters only for ties: max() keeps the first
            # candidate among equal confidences.
            candidates = [
                self._detect_double_pattern(highs, lows, closes),
                self._detect_breakout_pattern(highs, lows, closes),
                self._detect_trend_pattern(dataframe),
                self._detect_support_resistance(highs, lows, closes),
            ]
            patterns_detected = [c for c in candidates if c['detected']]

            if patterns_detected:
                best_pattern = max(patterns_detected, key=lambda x: x['confidence'])
                pattern_info.update({
                    'pattern': best_pattern['pattern'],
                    'confidence': best_pattern['confidence'],
                    'direction': best_pattern.get('direction', 'neutral'),
                    'details': best_pattern.get('details', {})
                })

            return pattern_info
        except Exception as e:
            print(f"❌ خطأ في تحليل الأنماط للإطار {timeframe}: {e}")
            return pattern_info

    def _detect_double_pattern(self, highs, lows, closes):
        """Detect a double top or double bottom in the last 15 candles.

        The two highest highs (or two lowest lows) of the window must be
        within 2% of each other and 3 to 10 candles apart. For a double
        top the last close must be below both highs; for a double bottom
        it must be above both lows. A double top takes precedence over a
        double bottom.

        Returns:
            ``{'detected': False}`` or a dict with ``detected``,
            ``pattern``, ``confidence`` (0.75), ``direction`` and
            ``details``.
        """
        try:
            if len(highs) < 15:
                return {'detected': False}

            recent_highs = highs[-15:]
            recent_lows = lows[-15:]
            last_close = closes[-1]

            # Window positions of the two highest highs / two lowest lows,
            # put back into chronological order.
            high_indices = np.sort(np.argsort(recent_highs)[-2:])
            low_indices = np.sort(np.argsort(recent_lows)[:2])

            if len(high_indices) == 2:
                high1 = recent_highs[high_indices[0]]
                high2 = recent_highs[high_indices[1]]
                time_diff = high_indices[1] - high_indices[0]
                # high1 != 0 guards the relative-difference division.
                if (high1 != 0
                        and abs(high1 - high2) / high1 < 0.02
                        and 3 <= time_diff <= 10
                        and last_close < min(high1, high2)):
                    return {
                        'detected': True,
                        'pattern': 'Double Top',
                        'confidence': 0.75,
                        'direction': 'down',
                        'details': {
                            'resistance_level': np.mean([high1, high2]),
                            'breakdown_level': min(lows[-5:])
                        }
                    }

            if len(low_indices) == 2:
                low1 = recent_lows[low_indices[0]]
                low2 = recent_lows[low_indices[1]]
                time_diff = low_indices[1] - low_indices[0]
                # low1 != 0 guards the relative-difference division.
                if (low1 != 0
                        and abs(low1 - low2) / low1 < 0.02
                        and 3 <= time_diff <= 10
                        and last_close > max(low1, low2)):
                    return {
                        'detected': True,
                        'pattern': 'Double Bottom',
                        'confidence': 0.75,
                        'direction': 'up',
                        'details': {
                            'support_level': np.mean([low1, low2]),
                            'breakout_level': max(highs[-5:])
                        }
                    }

            return {'detected': False}
        except Exception:
            # Best-effort detector: report the failure instead of hiding it.
            logger.exception("Double top/bottom detection failed")
            return {'detected': False}

    def _detect_breakout_pattern(self, highs, lows, closes):
        """Detect a breakout beyond the prior 20-candle range.

        Resistance and support are the max high and min low of candles
        ``[-25:-5]`` (the five most recent candles are excluded). A last
        close more than 1% above resistance is ``Breakout Up``; more than
        1% below support is ``Breakout Down``. Requires at least 25
        candles.

        Returns:
            ``{'detected': False}`` or a dict with ``detected``,
            ``pattern``, ``confidence`` (0.8), ``direction`` and
            ``details``.
        """
        try:
            if len(highs) < 25:
                return {'detected': False}

            current_price = closes[-1]
            resistance = np.max(highs[-25:-5])
            support = np.min(lows[-25:-5])

            if current_price > resistance * 1.01:
                return {
                    'detected': True,
                    'pattern': 'Breakout Up',
                    'confidence': 0.8,
                    'direction': 'up',
                    'details': {
                        'breakout_level': resistance,
                        'target_level': resistance * 1.05
                    }
                }
            if current_price < support * 0.99:
                return {
                    'detected': True,
                    'pattern': 'Breakout Down',
                    'confidence': 0.8,
                    'direction': 'down',
                    'details': {
                        'breakdown_level': support,
                        'target_level': support * 0.95
                    }
                }

            return {'detected': False}
        except Exception:
            # Best-effort detector: report the failure instead of hiding it.
            logger.exception("Breakout detection failed")
            return {'detected': False}

    def _detect_trend_pattern(self, dataframe):
        """Detect an up- or downtrend from stacked simple averages of closes.

        Averages are taken over the last 5, 13 and 21 closes. An uptrend
        needs short > medium > long with the last close above the short
        average; a downtrend is the mirror image. Confidence is
        ``0.3 + 10 * trend_strength`` capped at 0.8, where
        ``trend_strength`` is the short/long gap relative to the long
        average.

        Note: the length guard is ``MIN_CANDLES`` (20), so with exactly 20
        rows the "long" window averages only those 20 closes.

        Returns:
            ``{'detected': False}`` or a dict with ``detected``,
            ``pattern``, ``confidence``, ``direction`` and ``details``.
        """
        try:
            if dataframe is None or dataframe.empty or len(dataframe) < self.MIN_CANDLES:
                return {'detected': False}

            closes = dataframe['close'].values
            ma_short = np.mean(closes[-5:])
            ma_medium = np.mean(closes[-13:])
            ma_long = np.mean(closes[-21:])

            if ma_short > ma_medium > ma_long and closes[-1] > ma_short:
                trend_strength = (ma_short - ma_long) / ma_long
                confidence = min(0.3 + trend_strength * 10, 0.8)
                return {
                    'detected': True,
                    'pattern': 'Uptrend',
                    'confidence': confidence,
                    'direction': 'up',
                    'details': {
                        'trend_strength': trend_strength,
                        'support_level': ma_medium
                    }
                }
            if ma_short < ma_medium < ma_long and closes[-1] < ma_short:
                trend_strength = (ma_long - ma_short) / ma_long
                confidence = min(0.3 + trend_strength * 10, 0.8)
                return {
                    'detected': True,
                    'pattern': 'Downtrend',
                    'confidence': confidence,
                    'direction': 'down',
                    'details': {
                        'trend_strength': trend_strength,
                        'resistance_level': ma_medium
                    }
                }

            return {'detected': False}
        except Exception:
            # Best-effort detector: report the failure instead of hiding it.
            logger.exception("Trend detection failed")
            return {'detected': False}

    def _detect_support_resistance(self, highs, lows, closes):
        """Detect whether the last close sits near support or resistance.

        Support and resistance are the min low and max high of the last 20
        candles. ``position`` is where the last close falls inside that
        range (0 = support, 1 = resistance): below 0.2 is ``Near Support``,
        above 0.8 is ``Near Resistance``.

        Returns:
            ``{'detected': False}`` or a dict with ``detected``,
            ``pattern``, ``confidence`` (0.6), ``direction`` and
            ``details``. A flat, inverted or NaN range is reported as not
            detected.
        """
        try:
            if len(highs) < 20:
                return {'detected': False}

            current_price = closes[-1]
            resistance_level = np.max(highs[-20:])
            support_level = np.min(lows[-20:])

            price_range = resistance_level - support_level
            # A flat market would make the position 0/0; "not (x > 0)"
            # also rejects NaN ranges.
            if not price_range > 0:
                return {'detected': False}

            position = (current_price - support_level) / price_range

            if position < 0.2:
                return {
                    'detected': True,
                    'pattern': 'Near Support',
                    'confidence': 0.6,
                    'direction': 'up',
                    'details': {
                        'support_level': support_level,
                        'resistance_level': resistance_level,
                        'position': position
                    }
                }
            if position > 0.8:
                return {
                    'detected': True,
                    'pattern': 'Near Resistance',
                    'confidence': 0.6,
                    'direction': 'down',
                    'details': {
                        'support_level': support_level,
                        'resistance_level': resistance_level,
                        'position': position
                    }
                }

            return {'detected': False}
        except Exception:
            # Best-effort detector: report the failure instead of hiding it.
            logger.exception("Support/resistance detection failed")
            return {'detected': False}


print("✅ ML Module: Chart Pattern Analyzer loaded")