Trad / ml_engine /patterns.py
Riy777's picture
Update ml_engine/patterns.py
bf478e8
raw
history blame
12.9 kB
# ml_engine/patterns.py
# (V8.3 - إصلاح KeyError: استخدام الاستدعاء الوظيفي المباشر)
import pandas as pd
import numpy as np
import joblib
import asyncio
import io
# (يجب التأكد من أن pandas-ta مثبت في بيئة Hugging Face)
try:
import pandas_ta as ta
except ImportError:
print("❌❌ [PatternEngineV8] مكتبة pandas_ta غير موجودة! هذا المحرك سيفشل.")
ta = None
class ChartPatternAnalyzer:
def __init__(self, r2_service=None,
model_key="lgbm_pattern_model_combined.pkl",
scaler_key="scaler_combined.pkl",
window_size=60): # (سنستخدم window_size كحد أدنى للبيانات)
"""
تهيئة المحرك بتحميل النماذج من R2.
"""
self.window_size = window_size
self.model = None
self.scaler = None
# (هذه هي الفئات من ملف evaluation_results.txt، مرتبة)
self.class_names = ["Bearish Pattern", "Neutral / No Pattern", "Bullish Pattern"] # (IDs: -1, 0, 1)
self.r2_service = r2_service
self.model_key = model_key
self.scaler_key = scaler_key
# (هذه هي "الوصفة" الدقيقة من X_test_combined.csv)
self.feature_names = [
'RSI_14', 'MACD_12_26_9', 'MACDh_12_26_9', 'MACDs_12_26_9', 'SMA_20',
'EMA_20', 'BBL_5_2.0_2.0', 'BBM_5_2.0_2.0', 'BBU_5_2.0_2.0', 'BBB_5_2.0_2.0',
'BBP_5_2.0_2.0', 'STOCHk_14_3_3', 'STOCHd_14_3_3', 'STOCHh_14_3_3',
'ADX_14', 'ADXR_14_2', 'DMP_14', 'DMN_14', 'VWAP_D', 'MIDPOINT_14',
'TEMA_20', 'OBV', 'AD', 'ATRr_14', 'DPO_20', 'KVO_34_55_13',
'KVOs_34_55_13', 'CMO_14', 'ROC_10', 'WILLR_14'
]
# (إزالة الأعمدة الأساسية التي لا تُستخدم كخصائص)
self.base_cols = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
# (خصائص المؤشرات فقط)
self.indicator_features = [col for col in self.feature_names if col not in self.base_cols]
if not self.r2_service:
print("⚠️ [PatternEngineV8] R2Service غير متوفر. يجب التحميل يدوياً.")
async def initialize(self):
"""
يجب استدعاؤها من app.py أو data_manager لتحميل النماذج.
"""
if self.model and self.scaler:
return True # (تم التحميل مسبقاً)
if not self.r2_service:
print("❌ [PatternEngineV8] لا يمكن التهيئة بدون R2 Service.")
return False
try:
# 1. تحميل النموذج
print(f" > [PatternEngineV8] تحميل {self.model_key} من R2...")
model_obj = self.r2_service.s3_client.get_object(Bucket=self.r2_service.BUCKET_NAME, Key=self.model_key)
# (استخدام io.BytesIO لقراءة الجسم)
model_bytes = io.BytesIO(model_obj['Body'].read())
self.model = joblib.load(model_bytes)
# 2. تحميل المقياس (Scaler)
print(f" > [PatternEngineV8] تحميل {self.scaler_key} من R2...")
scaler_obj = self.r2_service.s3_client.get_object(Bucket=self.r2_service.BUCKET_NAME, Key=self.scaler_key)
scaler_bytes = io.BytesIO(scaler_obj['Body'].read())
self.scaler = joblib.load(scaler_bytes)
print("✅ [PatternEngineV8] تم تحميل النموذج (58%) والمقياس بنجاح.")
# (التحقق من الخصائص)
if hasattr(self.scaler, 'feature_names_in_'):
print(f" > يتوقع المقياس {len(self.scaler.feature_names_in_)} خاصية.")
return True
except Exception as e:
print(f"❌❌ [PatternEngineV8] فشل فادح في تحميل النماذج من R2: {e}")
self.model = None
self.scaler = None
return False
# 🔴 --- START OF CHANGE (V8.3) --- 🔴
# (V8.3 - إصلاح KeyError: استخدام الاستدعاء الوظيفي المباشر بدلاً من ملحق .ta)
def _extract_features(self, df_window: pd.DataFrame) -> pd.DataFrame:
"""
(الوصفة V8 - معدلة - V8.3)
حساب الـ 30 مؤشراً (وظيفياً) لتجنب أخطاء ملحق .ta
"""
if not ta:
raise ImportError("مكتبة pandas-ta غير مثبتة.")
# (إنشاء DF فارغ بنفس الفهرس (Index) الخاص بآخر صف)
# (هذا يضمن أننا نأخذ آخر قيمة فقط من حسابات المؤشرات)
df = pd.DataFrame(index=df_window.iloc[-1:].index)
# (تمرير الأعمدة كسلاسل (Series) مباشرة)
c = df_window['close']
h = df_window['high']
l = df_window['low']
v = df_window['volume']
try:
# --- حساب المؤشرات وظيفياً ---
df['RSI_14'] = ta.rsi(c, length=14)
macd_data = ta.macd(c, fast=12, slow=26, signal=9)
if macd_data is not None and not macd_data.empty:
df['MACD_12_26_9'] = macd_data['MACD_12_26_9']
df['MACDh_12_26_9'] = macd_data['MACDh_12_26_9']
df['MACDs_12_26_9'] = macd_data['MACDs_12_26_9']
df['SMA_20'] = ta.sma(c, length=20)
df['EMA_20'] = ta.ema(c, length=20)
bb_data = ta.bbands(c, length=5, std=2.0)
if bb_data is not None and not bb_data.empty:
# (إعادة التسمية لتطابق توقعات النموذج)
df['BBL_5_2.0_2.0'] = bb_data['BBL_5_2.0']
df['BBM_5_2.0_2.0'] = bb_data['BBM_5_2.0']
df['BBU_5_2.0_2.0'] = bb_data['BBU_5_2.0']
df['BBB_5_2.0_2.0'] = bb_data['BBB_5_2.0']
df['BBP_5_2.0_2.0'] = bb_data['BBP_5_2.0']
stoch_data = ta.stoch(h, l, c, k=14, d=3, smooth_k=3)
if stoch_data is not None and not stoch_data.empty:
df['STOCHk_14_3_3'] = stoch_data['STOCHk_14_3_3']
df['STOCHd_14_3_3'] = stoch_data['STOCHd_14_3_3']
df['STOCHh_14_3_3'] = stoch_data['STOCHh_14_3_3']
adx_data = ta.adx(h, l, c, length=14, adxr=2)
if adx_data is not None and not adx_data.empty:
df['ADX_14'] = adx_data['ADX_14']
df['ADXR_14_2'] = adx_data['ADXR_14_2']
df['DMP_14'] = adx_data['DMP_14']
df['DMN_14'] = adx_data['DMN_14']
# (VWAP يحتاج تمرير البيانات بهذه الطريقة)
vwap_series = ta.vwap(h, l, c, v)
if vwap_series is not None: df['VWAP_D'] = vwap_series
df['MIDPOINT_14'] = ta.midpoint(c, length=14)
df['TEMA_20'] = ta.tema(c, length=20)
df['OBV'] = ta.obv(c, v)
df['AD'] = ta.ad(h, l, c, v)
df['ATRr_14'] = ta.atr(h, l, c, percent=True, length=14)
df['DPO_20'] = ta.dpo(c, length=20)
kvo_data = ta.kvo(h, l, c, v, fast=34, slow=55, signal=13)
if kvo_data is not None and not kvo_data.empty:
df['KVO_34_55_13'] = kvo_data['KVO_34_55_13']
df['KVOs_34_55_13'] = kvo_data['KVOs_34_55_13']
df['CMO_14'] = ta.cmo(c, length=14)
df['ROC_10'] = ta.roc(c, length=10)
df['WILLR_14'] = ta.willr(h, l, c, length=14)
except Exception as e:
print(f"❌ [PatternEngineV8.3] خطأ أثناء حساب المؤشرات وظيفياً: {e}")
# (سنستمر، والصفوف المفقودة سيتم ملؤها بـ 0)
pass
# --- (نهاية حساب المؤشرات) ---
# (نأخذ الصف الأخير فقط، لأن المؤشرات السابقة حسبت كل شيء)
last_features = df.iloc[-1:].copy()
# (إصلاح FutureWarning)
last_features.ffill(inplace=True)
last_features.fillna(0, inplace=True)
# (التأكد من أننا نمرر فقط الخصائص الـ 30 التي يتوقعها النموذج، وبالترتيب)
final_features = pd.DataFrame(columns=self.indicator_features)
for col in self.indicator_features:
if col in last_features:
final_features[col] = last_features[col].values
else:
# (إذا فشل حساب المؤشر، نضع 0)
final_features[col] = 0
return final_features
# 🔴 --- END OF CHANGE (V8.3) --- 🔴
async def detect_chart_patterns(self, ohlcv_data: dict) -> dict:
"""
(الدالة الرئيسية التي يستدعيها النظام)
تستخدم نموذج 58% للتنبؤ بالنمط.
"""
best_match = {
'pattern_detected': 'no_clear_pattern',
'pattern_confidence': 0,
'predicted_direction': 'neutral',
'timeframe': None,
'details': {}
}
if not self.model or not self.scaler:
if not hasattr(self, '_init_warned'):
print("⚠️ [PatternEngineV8] النموذج/المقياس غير محمل. يجب استدعاء .initialize() أولاً.")
self._init_warned = True
return best_match
all_results = []
for timeframe, candles in ohlcv_data.items():
# (نحتاج إلى بيانات كافية لحساب المؤشرات، 200 شمعة هي الأفضل)
if len(candles) >= max(self.window_size, 200):
try:
window_candles = candles[-200:]
df_window = pd.DataFrame(window_candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
# (V8.3) ملاحظة: لا نضع 'timestamp' كفهرس هنا
# df_window['timestamp'] = pd.to_datetime(df_window['timestamp'], unit='ms')
# df_window.set_index('timestamp', inplace=True)
# 1. استخراج الخصائص (الوصفة V8.3 اليدوية)
features_df = self._extract_features(df_window)
if features_df is None or features_df.empty:
continue
# 2. تطبيع الخصائص (Scaler)
# (التأكد من مطابقة الأسماء التي يتوقعها المقياس)
features_df_ordered = features_df[self.scaler.feature_names_in_]
features_scaled = self.scaler.transform(features_df_ordered)
# 3. التنبؤ بالاحتماليات (Probabilities)
probabilities = self.model.predict_proba(features_scaled)[0]
best_class_index = np.argmax(probabilities)
confidence = probabilities[best_class_index]
pattern_name = self.class_names[best_class_index]
if pattern_name != "Neutral / No Pattern" and confidence > 0.5:
all_results.append({
'pattern': pattern_name,
'confidence': float(confidence),
'timeframe': timeframe
})
except Exception as e:
print(f"❌ [PatternEngineV8.3] فشل التنبؤ لـ {timeframe}: {e}")
# 4. اختيار أفضل نمط من *جميع* الأطر الزمنية
if all_results:
best_result = max(all_results, key=lambda x: x['confidence'])
direction = 'neutral'
if "Bullish" in best_result['pattern']: direction = 'up'
elif "Bearish" in best_result['pattern']: direction = 'down'
best_match['pattern_detected'] = best_result['pattern']
best_match['pattern_confidence'] = best_result['confidence']
best_match['timeframe'] = best_result['timeframe']
best_match['predicted_direction'] = direction
best_match['details'] = {'ml_confidence': best_result['confidence']}
return best_match
print("✅ ML Module: Pattern Engine V8.3 (Direct Functional Calls) loaded")