# ml_engine/titan_engine.py # (V1.0 - Titan Inference Engine) import os import joblib import numpy as np import pandas as pd import pandas_ta as ta import xgboost as xgb import json class TitanEngine: def __init__(self, model_dir="ml_models/layer2"): self.model_path = os.path.join(model_dir, "Titan_XGB_V1.json") self.features_path = os.path.join(model_dir, "Titan_Features.pkl") self.model = None self.feature_names = None self.initialized = False async def initialize(self): """تحميل النموذج وقائمة الميزات من القرص""" print(f"🛡️ [Titan] جاري تهيئة المحرك من {self.model_path}...") try: if os.path.exists(self.model_path) and os.path.exists(self.features_path): # تحميل نموذج XGBoost self.model = xgb.Booster() self.model.load_model(self.model_path) # تحميل قائمة الميزات لضمان الترتيب الصحيح self.feature_names = joblib.load(self.features_path) self.initialized = True print(f"✅ [Titan] تم التحميل بنجاح. جاهز بـ {len(self.feature_names)} ميزة.") else: print(f"❌ [Titan] ملفات النموذج مفقودة!") except Exception as e: print(f"❌ [Titan] خطأ فادح أثناء التهيئة: {e}") def apply_inverted_pyramid(self, df, tf): """نفس منطق هندسة الميزات المستخدم في التدريب تماماً""" df = df.copy().sort_values('timestamp').reset_index(drop=True) # تعيين الفهرس للسهولة في pandas_ta df = df.set_index(pd.DatetimeIndex(pd.to_datetime(df['timestamp'], unit='ms'))) # --- المستوى 1: دقيق (5m, 15m) --- if tf in ['5m', '15m']: df['RSI'] = ta.rsi(df['close'], length=14) df['MACD'] = ta.macd(df['close']).iloc[:, 0] df['MACD_h'] = ta.macd(df['close']).iloc[:, 1] df['CCI'] = ta.cci(df['high'], df['low'], df['close'], length=20) df['ADX'] = ta.adx(df['high'], df['low'], df['close'], length=14).iloc[:, 0] for p in [9, 21, 50, 200]: ema = ta.ema(df['close'], length=p) df[f'EMA_{p}_dist'] = (df['close'] / ema) - 1 bb = ta.bbands(df['close'], length=20, std=2.0) df['BB_w'] = (bb.iloc[:, 2] - bb.iloc[:, 0]) / bb.iloc[:, 1] df['BB_p'] = (df['close'] - bb.iloc[:, 0]) / (bb.iloc[:, 2] - bb.iloc[:, 0]) df['MFI'] = ta.mfi(df['high'], df['low'], df['close'], df['volume'], length=14) vwap = ta.vwap(df['high'], df['low'], df['close'], df['volume']) df['VWAP_dist'] = (df['close'] / vwap) - 1 # --- المستوى 2: تكتيكي (1h, 4h) --- elif tf in ['1h', '4h']: df['RSI'] = ta.rsi(df['close'], length=14) df['MACD_h'] = ta.macd(df['close']).iloc[:, 1] df['EMA_50_dist'] = (df['close'] / ta.ema(df['close'], length=50)) - 1 df['EMA_200_dist'] = (df['close'] / ta.ema(df['close'], length=200)) - 1 df['ATR_pct'] = ta.atr(df['high'], df['low'], df['close'], length=14) / df['close'] # --- المستوى 3: استراتيجي (1d) --- elif tf == '1d': df['RSI'] = ta.rsi(df['close'], length=14) df['EMA_200_dist'] = (df['close'] / ta.ema(df['close'], length=200)) - 1 adx = ta.adx(df['high'], df['low'], df['close']) if adx is not None and not adx.empty: df['Trend_Strong'] = np.where(adx.iloc[:, 0] > 25, 1, 0) else: df['Trend_Strong'] = 0 return df.reset_index(drop=True) def predict(self, ohlcv_data: dict) -> dict: """ استقبال البيانات الخام (Dictionary of DataFrames/Lists)، تجهيزها، ثم استدعاء النموذج للتنبؤ. """ if not self.initialized or not self.model: return {'score': 0.0, 'error': 'Titan not initialized'} try: # 1. تجهيز البيانات لكل إطار processed_tfs = {} for tf, data in ohlcv_data.items(): if not data: continue # تحويل القوائم إلى DataFrame إذا لزم الأمر if isinstance(data, list): df = pd.DataFrame(data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) else: df = data.copy() # تطبيق المؤشرات حسب الإطار df = self.apply_inverted_pyramid(df, tf) processed_tfs[tf] = df # 2. الدمج (Alignment) للحصول على آخر لقطة (Latest Snapshot) if '5m' not in processed_tfs: return {'score': 0.0, 'error': 'Missing 5m base timeframe'} # نأخذ آخر صف فقط من الـ 5m كأساس latest_5m = processed_tfs['5m'].iloc[-1:].copy() latest_ts = latest_5m['timestamp'].iloc[0] base_row = latest_5m.add_prefix('5m_').rename(columns={'5m_timestamp': 'timestamp'}) # دمج باقي الأطر (نأخذ آخر شمعة أغلقت قبل أو مع شمعة الـ 5m الحالية) for tf, df in processed_tfs.items(): if tf == '5m': continue # العثور على الشمعة المناسبة زمنياً relevant_row = df[df['timestamp'] <= latest_ts].iloc[-1:].copy() if relevant_row.empty: continue # تجهيز الأعمدة للدمج cols = [c for c in relevant_row.columns if c not in ['timestamp','open','high','low','close','volume']] for col in cols: base_row[f"{tf}_{col}"] = relevant_row[col].values[0] # 3. تجهيز شعاع الإدخال (Feature Vector) # التأكد من وجود كل الميزات المطلوبة بالترتيب الصحيح input_data = [] for feat in self.feature_names: val = base_row.get(feat, np.nan) # إذا كانت القيمة مصفوفة أو سلسلة بانداز، نأخذ القيمة الأولى if isinstance(val, (pd.Series, np.ndarray)): val = val.iloc[0] if len(val) > 0 else np.nan input_data.append(val) # 4. التنبؤ # تحويل إلى DMatrix (تنسيق XGBoost السريع) dtest = xgb.DMatrix([input_data], feature_names=self.feature_names) prediction = self.model.predict(dtest)[0] # إرجاع الاحتمالية الأولى return { 'score': float(prediction), 'timestamp': int(latest_ts), 'status': 'OK' } except Exception as e: # print(f"⚠️ [Titan Error] {e}") import traceback traceback.print_exc() return {'score': 0.0, 'error': str(e)}