Spaces:
Running
Running
| # ml_engine/titan_engine.py | |
| # (V1.0 - Titan Inference Engine) | |
| import os | |
| import joblib | |
| import numpy as np | |
| import pandas as pd | |
| import pandas_ta as ta | |
| import xgboost as xgb | |
| import json | |
| class TitanEngine: | |
| def __init__(self, model_dir="ml_models/layer2"): | |
| self.model_path = os.path.join(model_dir, "Titan_XGB_V1.json") | |
| self.features_path = os.path.join(model_dir, "Titan_Features.pkl") | |
| self.model = None | |
| self.feature_names = None | |
| self.initialized = False | |
| async def initialize(self): | |
| """تحميل النموذج وقائمة الميزات من القرص""" | |
| print(f"🛡️ [Titan] جاري تهيئة المحرك من {self.model_path}...") | |
| try: | |
| if os.path.exists(self.model_path) and os.path.exists(self.features_path): | |
| # تحميل نموذج XGBoost | |
| self.model = xgb.Booster() | |
| self.model.load_model(self.model_path) | |
| # تحميل قائمة الميزات لضمان الترتيب الصحيح | |
| self.feature_names = joblib.load(self.features_path) | |
| self.initialized = True | |
| print(f"✅ [Titan] تم التحميل بنجاح. جاهز بـ {len(self.feature_names)} ميزة.") | |
| else: | |
| print(f"❌ [Titan] ملفات النموذج مفقودة!") | |
| except Exception as e: | |
| print(f"❌ [Titan] خطأ فادح أثناء التهيئة: {e}") | |
| def apply_inverted_pyramid(self, df, tf): | |
| """نفس منطق هندسة الميزات المستخدم في التدريب تماماً""" | |
| df = df.copy().sort_values('timestamp').reset_index(drop=True) | |
| # تعيين الفهرس للسهولة في pandas_ta | |
| df = df.set_index(pd.DatetimeIndex(pd.to_datetime(df['timestamp'], unit='ms'))) | |
| # --- المستوى 1: دقيق (5m, 15m) --- | |
| if tf in ['5m', '15m']: | |
| df['RSI'] = ta.rsi(df['close'], length=14) | |
| df['MACD'] = ta.macd(df['close']).iloc[:, 0] | |
| df['MACD_h'] = ta.macd(df['close']).iloc[:, 1] | |
| df['CCI'] = ta.cci(df['high'], df['low'], df['close'], length=20) | |
| df['ADX'] = ta.adx(df['high'], df['low'], df['close'], length=14).iloc[:, 0] | |
| for p in [9, 21, 50, 200]: | |
| ema = ta.ema(df['close'], length=p) | |
| df[f'EMA_{p}_dist'] = (df['close'] / ema) - 1 | |
| bb = ta.bbands(df['close'], length=20, std=2.0) | |
| df['BB_w'] = (bb.iloc[:, 2] - bb.iloc[:, 0]) / bb.iloc[:, 1] | |
| df['BB_p'] = (df['close'] - bb.iloc[:, 0]) / (bb.iloc[:, 2] - bb.iloc[:, 0]) | |
| df['MFI'] = ta.mfi(df['high'], df['low'], df['close'], df['volume'], length=14) | |
| vwap = ta.vwap(df['high'], df['low'], df['close'], df['volume']) | |
| df['VWAP_dist'] = (df['close'] / vwap) - 1 | |
| # --- المستوى 2: تكتيكي (1h, 4h) --- | |
| elif tf in ['1h', '4h']: | |
| df['RSI'] = ta.rsi(df['close'], length=14) | |
| df['MACD_h'] = ta.macd(df['close']).iloc[:, 1] | |
| df['EMA_50_dist'] = (df['close'] / ta.ema(df['close'], length=50)) - 1 | |
| df['EMA_200_dist'] = (df['close'] / ta.ema(df['close'], length=200)) - 1 | |
| df['ATR_pct'] = ta.atr(df['high'], df['low'], df['close'], length=14) / df['close'] | |
| # --- المستوى 3: استراتيجي (1d) --- | |
| elif tf == '1d': | |
| df['RSI'] = ta.rsi(df['close'], length=14) | |
| df['EMA_200_dist'] = (df['close'] / ta.ema(df['close'], length=200)) - 1 | |
| adx = ta.adx(df['high'], df['low'], df['close']) | |
| if adx is not None and not adx.empty: | |
| df['Trend_Strong'] = np.where(adx.iloc[:, 0] > 25, 1, 0) | |
| else: | |
| df['Trend_Strong'] = 0 | |
| return df.reset_index(drop=True) | |
| def predict(self, ohlcv_data: dict) -> dict: | |
| """ | |
| استقبال البيانات الخام (Dictionary of DataFrames/Lists)، | |
| تجهيزها، ثم استدعاء النموذج للتنبؤ. | |
| """ | |
| if not self.initialized or not self.model: | |
| return {'score': 0.0, 'error': 'Titan not initialized'} | |
| try: | |
| # 1. تجهيز البيانات لكل إطار | |
| processed_tfs = {} | |
| for tf, data in ohlcv_data.items(): | |
| if not data: continue | |
| # تحويل القوائم إلى DataFrame إذا لزم الأمر | |
| if isinstance(data, list): | |
| df = pd.DataFrame(data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) | |
| else: | |
| df = data.copy() | |
| # تطبيق المؤشرات حسب الإطار | |
| df = self.apply_inverted_pyramid(df, tf) | |
| processed_tfs[tf] = df | |
| # 2. الدمج (Alignment) للحصول على آخر لقطة (Latest Snapshot) | |
| if '5m' not in processed_tfs: | |
| return {'score': 0.0, 'error': 'Missing 5m base timeframe'} | |
| # نأخذ آخر صف فقط من الـ 5m كأساس | |
| latest_5m = processed_tfs['5m'].iloc[-1:].copy() | |
| latest_ts = latest_5m['timestamp'].iloc[0] | |
| base_row = latest_5m.add_prefix('5m_').rename(columns={'5m_timestamp': 'timestamp'}) | |
| # دمج باقي الأطر (نأخذ آخر شمعة أغلقت قبل أو مع شمعة الـ 5m الحالية) | |
| for tf, df in processed_tfs.items(): | |
| if tf == '5m': continue | |
| # العثور على الشمعة المناسبة زمنياً | |
| relevant_row = df[df['timestamp'] <= latest_ts].iloc[-1:].copy() | |
| if relevant_row.empty: continue | |
| # تجهيز الأعمدة للدمج | |
| cols = [c for c in relevant_row.columns if c not in ['timestamp','open','high','low','close','volume']] | |
| for col in cols: | |
| base_row[f"{tf}_{col}"] = relevant_row[col].values[0] | |
| # 3. تجهيز شعاع الإدخال (Feature Vector) | |
| # التأكد من وجود كل الميزات المطلوبة بالترتيب الصحيح | |
| input_data = [] | |
| for feat in self.feature_names: | |
| val = base_row.get(feat, np.nan) | |
| # إذا كانت القيمة مصفوفة أو سلسلة بانداز، نأخذ القيمة الأولى | |
| if isinstance(val, (pd.Series, np.ndarray)): | |
| val = val.iloc[0] if len(val) > 0 else np.nan | |
| input_data.append(val) | |
| # 4. التنبؤ | |
| # تحويل إلى DMatrix (تنسيق XGBoost السريع) | |
| dtest = xgb.DMatrix([input_data], feature_names=self.feature_names) | |
| prediction = self.model.predict(dtest)[0] # إرجاع الاحتمالية الأولى | |
| return { | |
| 'score': float(prediction), | |
| 'timestamp': int(latest_ts), | |
| 'status': 'OK' | |
| } | |
| except Exception as e: | |
| # print(f"⚠️ [Titan Error] {e}") | |
| import traceback | |
| traceback.print_exc() | |
| return {'score': 0.0, 'error': str(e)} |