# ml_engine/patterns.py # (V8.3 - إصلاح KeyError: استخدام الاستدعاء الوظيفي المباشر) import pandas as pd import numpy as np import joblib import asyncio import io # (يجب التأكد من أن pandas-ta مثبت في بيئة Hugging Face) try: import pandas_ta as ta except ImportError: print("❌❌ [PatternEngineV8] مكتبة pandas_ta غير موجودة! هذا المحرك سيفشل.") ta = None class ChartPatternAnalyzer: def __init__(self, r2_service=None, model_key="lgbm_pattern_model_combined.pkl", scaler_key="scaler_combined.pkl", window_size=60): # (سنستخدم window_size كحد أدنى للبيانات) """ تهيئة المحرك بتحميل النماذج من R2. """ self.window_size = window_size self.model = None self.scaler = None # (هذه هي الفئات من ملف evaluation_results.txt، مرتبة) self.class_names = ["Bearish Pattern", "Neutral / No Pattern", "Bullish Pattern"] # (IDs: -1, 0, 1) self.r2_service = r2_service self.model_key = model_key self.scaler_key = scaler_key # (هذه هي "الوصفة" الدقيقة من X_test_combined.csv) self.feature_names = [ 'RSI_14', 'MACD_12_26_9', 'MACDh_12_26_9', 'MACDs_12_26_9', 'SMA_20', 'EMA_20', 'BBL_5_2.0_2.0', 'BBM_5_2.0_2.0', 'BBU_5_2.0_2.0', 'BBB_5_2.0_2.0', 'BBP_5_2.0_2.0', 'STOCHk_14_3_3', 'STOCHd_14_3_3', 'STOCHh_14_3_3', 'ADX_14', 'ADXR_14_2', 'DMP_14', 'DMN_14', 'VWAP_D', 'MIDPOINT_14', 'TEMA_20', 'OBV', 'AD', 'ATRr_14', 'DPO_20', 'KVO_34_55_13', 'KVOs_34_55_13', 'CMO_14', 'ROC_10', 'WILLR_14' ] # (إزالة الأعمدة الأساسية التي لا تُستخدم كخصائص) self.base_cols = ['timestamp', 'open', 'high', 'low', 'close', 'volume'] # (خصائص المؤشرات فقط) self.indicator_features = [col for col in self.feature_names if col not in self.base_cols] if not self.r2_service: print("⚠️ [PatternEngineV8] R2Service غير متوفر. يجب التحميل يدوياً.") async def initialize(self): """ يجب استدعاؤها من app.py أو data_manager لتحميل النماذج. """ if self.model and self.scaler: return True # (تم التحميل مسبقاً) if not self.r2_service: print("❌ [PatternEngineV8] لا يمكن التهيئة بدون R2 Service.") return False try: # 1. تحميل النموذج print(f" > [PatternEngineV8] تحميل {self.model_key} من R2...") model_obj = self.r2_service.s3_client.get_object(Bucket=self.r2_service.BUCKET_NAME, Key=self.model_key) # (استخدام io.BytesIO لقراءة الجسم) model_bytes = io.BytesIO(model_obj['Body'].read()) self.model = joblib.load(model_bytes) # 2. تحميل المقياس (Scaler) print(f" > [PatternEngineV8] تحميل {self.scaler_key} من R2...") scaler_obj = self.r2_service.s3_client.get_object(Bucket=self.r2_service.BUCKET_NAME, Key=self.scaler_key) scaler_bytes = io.BytesIO(scaler_obj['Body'].read()) self.scaler = joblib.load(scaler_bytes) print("✅ [PatternEngineV8] تم تحميل النموذج (58%) والمقياس بنجاح.") # (التحقق من الخصائص) if hasattr(self.scaler, 'feature_names_in_'): print(f" > يتوقع المقياس {len(self.scaler.feature_names_in_)} خاصية.") return True except Exception as e: print(f"❌❌ [PatternEngineV8] فشل فادح في تحميل النماذج من R2: {e}") self.model = None self.scaler = None return False # 🔴 --- START OF CHANGE (V8.3) --- 🔴 # (V8.3 - إصلاح KeyError: استخدام الاستدعاء الوظيفي المباشر بدلاً من ملحق .ta) def _extract_features(self, df_window: pd.DataFrame) -> pd.DataFrame: """ (الوصفة V8 - معدلة - V8.3) حساب الـ 30 مؤشراً (وظيفياً) لتجنب أخطاء ملحق .ta """ if not ta: raise ImportError("مكتبة pandas-ta غير مثبتة.") # (إنشاء DF فارغ بنفس الفهرس (Index) الخاص بآخر صف) # (هذا يضمن أننا نأخذ آخر قيمة فقط من حسابات المؤشرات) df = pd.DataFrame(index=df_window.iloc[-1:].index) # (تمرير الأعمدة كسلاسل (Series) مباشرة) c = df_window['close'] h = df_window['high'] l = df_window['low'] v = df_window['volume'] try: # --- حساب المؤشرات وظيفياً --- df['RSI_14'] = ta.rsi(c, length=14) macd_data = ta.macd(c, fast=12, slow=26, signal=9) if macd_data is not None and not macd_data.empty: df['MACD_12_26_9'] = macd_data['MACD_12_26_9'] df['MACDh_12_26_9'] = macd_data['MACDh_12_26_9'] df['MACDs_12_26_9'] = macd_data['MACDs_12_26_9'] df['SMA_20'] = ta.sma(c, length=20) df['EMA_20'] = ta.ema(c, length=20) bb_data = ta.bbands(c, length=5, std=2.0) if bb_data is not None and not bb_data.empty: # (إعادة التسمية لتطابق توقعات النموذج) df['BBL_5_2.0_2.0'] = bb_data['BBL_5_2.0'] df['BBM_5_2.0_2.0'] = bb_data['BBM_5_2.0'] df['BBU_5_2.0_2.0'] = bb_data['BBU_5_2.0'] df['BBB_5_2.0_2.0'] = bb_data['BBB_5_2.0'] df['BBP_5_2.0_2.0'] = bb_data['BBP_5_2.0'] stoch_data = ta.stoch(h, l, c, k=14, d=3, smooth_k=3) if stoch_data is not None and not stoch_data.empty: df['STOCHk_14_3_3'] = stoch_data['STOCHk_14_3_3'] df['STOCHd_14_3_3'] = stoch_data['STOCHd_14_3_3'] df['STOCHh_14_3_3'] = stoch_data['STOCHh_14_3_3'] adx_data = ta.adx(h, l, c, length=14, adxr=2) if adx_data is not None and not adx_data.empty: df['ADX_14'] = adx_data['ADX_14'] df['ADXR_14_2'] = adx_data['ADXR_14_2'] df['DMP_14'] = adx_data['DMP_14'] df['DMN_14'] = adx_data['DMN_14'] # (VWAP يحتاج تمرير البيانات بهذه الطريقة) vwap_series = ta.vwap(h, l, c, v) if vwap_series is not None: df['VWAP_D'] = vwap_series df['MIDPOINT_14'] = ta.midpoint(c, length=14) df['TEMA_20'] = ta.tema(c, length=20) df['OBV'] = ta.obv(c, v) df['AD'] = ta.ad(h, l, c, v) df['ATRr_14'] = ta.atr(h, l, c, percent=True, length=14) df['DPO_20'] = ta.dpo(c, length=20) kvo_data = ta.kvo(h, l, c, v, fast=34, slow=55, signal=13) if kvo_data is not None and not kvo_data.empty: df['KVO_34_55_13'] = kvo_data['KVO_34_55_13'] df['KVOs_34_55_13'] = kvo_data['KVOs_34_55_13'] df['CMO_14'] = ta.cmo(c, length=14) df['ROC_10'] = ta.roc(c, length=10) df['WILLR_14'] = ta.willr(h, l, c, length=14) except Exception as e: print(f"❌ [PatternEngineV8.3] خطأ أثناء حساب المؤشرات وظيفياً: {e}") # (سنستمر، والصفوف المفقودة سيتم ملؤها بـ 0) pass # --- (نهاية حساب المؤشرات) --- # (نأخذ الصف الأخير فقط، لأن المؤشرات السابقة حسبت كل شيء) last_features = df.iloc[-1:].copy() # (إصلاح FutureWarning) last_features.ffill(inplace=True) last_features.fillna(0, inplace=True) # (التأكد من أننا نمرر فقط الخصائص الـ 30 التي يتوقعها النموذج، وبالترتيب) final_features = pd.DataFrame(columns=self.indicator_features) for col in self.indicator_features: if col in last_features: final_features[col] = last_features[col].values else: # (إذا فشل حساب المؤشر، نضع 0) final_features[col] = 0 return final_features # 🔴 --- END OF CHANGE (V8.3) --- 🔴 async def detect_chart_patterns(self, ohlcv_data: dict) -> dict: """ (الدالة الرئيسية التي يستدعيها النظام) تستخدم نموذج 58% للتنبؤ بالنمط. """ best_match = { 'pattern_detected': 'no_clear_pattern', 'pattern_confidence': 0, 'predicted_direction': 'neutral', 'timeframe': None, 'details': {} } if not self.model or not self.scaler: if not hasattr(self, '_init_warned'): print("⚠️ [PatternEngineV8] النموذج/المقياس غير محمل. يجب استدعاء .initialize() أولاً.") self._init_warned = True return best_match all_results = [] for timeframe, candles in ohlcv_data.items(): # (نحتاج إلى بيانات كافية لحساب المؤشرات، 200 شمعة هي الأفضل) if len(candles) >= max(self.window_size, 200): try: window_candles = candles[-200:] df_window = pd.DataFrame(window_candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) # (V8.3) ملاحظة: لا نضع 'timestamp' كفهرس هنا # df_window['timestamp'] = pd.to_datetime(df_window['timestamp'], unit='ms') # df_window.set_index('timestamp', inplace=True) # 1. استخراج الخصائص (الوصفة V8.3 اليدوية) features_df = self._extract_features(df_window) if features_df is None or features_df.empty: continue # 2. تطبيع الخصائص (Scaler) # (التأكد من مطابقة الأسماء التي يتوقعها المقياس) features_df_ordered = features_df[self.scaler.feature_names_in_] features_scaled = self.scaler.transform(features_df_ordered) # 3. التنبؤ بالاحتماليات (Probabilities) probabilities = self.model.predict_proba(features_scaled)[0] best_class_index = np.argmax(probabilities) confidence = probabilities[best_class_index] pattern_name = self.class_names[best_class_index] if pattern_name != "Neutral / No Pattern" and confidence > 0.5: all_results.append({ 'pattern': pattern_name, 'confidence': float(confidence), 'timeframe': timeframe }) except Exception as e: print(f"❌ [PatternEngineV8.3] فشل التنبؤ لـ {timeframe}: {e}") # 4. اختيار أفضل نمط من *جميع* الأطر الزمنية if all_results: best_result = max(all_results, key=lambda x: x['confidence']) direction = 'neutral' if "Bullish" in best_result['pattern']: direction = 'up' elif "Bearish" in best_result['pattern']: direction = 'down' best_match['pattern_detected'] = best_result['pattern'] best_match['pattern_confidence'] = best_result['confidence'] best_match['timeframe'] = best_result['timeframe'] best_match['predicted_direction'] = direction best_match['details'] = {'ml_confidence': best_result['confidence']} return best_match print("✅ ML Module: Pattern Engine V8.3 (Direct Functional Calls) loaded")