Spaces:
Running
Running
File size: 7,392 Bytes
172cf4d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# ml_engine/titan_engine.py
# (V1.0 - Titan Inference Engine)
import os
import joblib
import numpy as np
import pandas as pd
import pandas_ta as ta
import xgboost as xgb
import json
class TitanEngine:
def __init__(self, model_dir="ml_models/layer2"):
self.model_path = os.path.join(model_dir, "Titan_XGB_V1.json")
self.features_path = os.path.join(model_dir, "Titan_Features.pkl")
self.model = None
self.feature_names = None
self.initialized = False
async def initialize(self):
"""تحميل النموذج وقائمة الميزات من القرص"""
print(f"🛡️ [Titan] جاري تهيئة المحرك من {self.model_path}...")
try:
if os.path.exists(self.model_path) and os.path.exists(self.features_path):
# تحميل نموذج XGBoost
self.model = xgb.Booster()
self.model.load_model(self.model_path)
# تحميل قائمة الميزات لضمان الترتيب الصحيح
self.feature_names = joblib.load(self.features_path)
self.initialized = True
print(f"✅ [Titan] تم التحميل بنجاح. جاهز بـ {len(self.feature_names)} ميزة.")
else:
print(f"❌ [Titan] ملفات النموذج مفقودة!")
except Exception as e:
print(f"❌ [Titan] خطأ فادح أثناء التهيئة: {e}")
def apply_inverted_pyramid(self, df, tf):
"""نفس منطق هندسة الميزات المستخدم في التدريب تماماً"""
df = df.copy().sort_values('timestamp').reset_index(drop=True)
# تعيين الفهرس للسهولة في pandas_ta
df = df.set_index(pd.DatetimeIndex(pd.to_datetime(df['timestamp'], unit='ms')))
# --- المستوى 1: دقيق (5m, 15m) ---
if tf in ['5m', '15m']:
df['RSI'] = ta.rsi(df['close'], length=14)
df['MACD'] = ta.macd(df['close']).iloc[:, 0]
df['MACD_h'] = ta.macd(df['close']).iloc[:, 1]
df['CCI'] = ta.cci(df['high'], df['low'], df['close'], length=20)
df['ADX'] = ta.adx(df['high'], df['low'], df['close'], length=14).iloc[:, 0]
for p in [9, 21, 50, 200]:
ema = ta.ema(df['close'], length=p)
df[f'EMA_{p}_dist'] = (df['close'] / ema) - 1
bb = ta.bbands(df['close'], length=20, std=2.0)
df['BB_w'] = (bb.iloc[:, 2] - bb.iloc[:, 0]) / bb.iloc[:, 1]
df['BB_p'] = (df['close'] - bb.iloc[:, 0]) / (bb.iloc[:, 2] - bb.iloc[:, 0])
df['MFI'] = ta.mfi(df['high'], df['low'], df['close'], df['volume'], length=14)
vwap = ta.vwap(df['high'], df['low'], df['close'], df['volume'])
df['VWAP_dist'] = (df['close'] / vwap) - 1
# --- المستوى 2: تكتيكي (1h, 4h) ---
elif tf in ['1h', '4h']:
df['RSI'] = ta.rsi(df['close'], length=14)
df['MACD_h'] = ta.macd(df['close']).iloc[:, 1]
df['EMA_50_dist'] = (df['close'] / ta.ema(df['close'], length=50)) - 1
df['EMA_200_dist'] = (df['close'] / ta.ema(df['close'], length=200)) - 1
df['ATR_pct'] = ta.atr(df['high'], df['low'], df['close'], length=14) / df['close']
# --- المستوى 3: استراتيجي (1d) ---
elif tf == '1d':
df['RSI'] = ta.rsi(df['close'], length=14)
df['EMA_200_dist'] = (df['close'] / ta.ema(df['close'], length=200)) - 1
adx = ta.adx(df['high'], df['low'], df['close'])
if adx is not None and not adx.empty:
df['Trend_Strong'] = np.where(adx.iloc[:, 0] > 25, 1, 0)
else:
df['Trend_Strong'] = 0
return df.reset_index(drop=True)
def predict(self, ohlcv_data: dict) -> dict:
"""
استقبال البيانات الخام (Dictionary of DataFrames/Lists)،
تجهيزها، ثم استدعاء النموذج للتنبؤ.
"""
if not self.initialized or not self.model:
return {'score': 0.0, 'error': 'Titan not initialized'}
try:
# 1. تجهيز البيانات لكل إطار
processed_tfs = {}
for tf, data in ohlcv_data.items():
if not data: continue
# تحويل القوائم إلى DataFrame إذا لزم الأمر
if isinstance(data, list):
df = pd.DataFrame(data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
else:
df = data.copy()
# تطبيق المؤشرات حسب الإطار
df = self.apply_inverted_pyramid(df, tf)
processed_tfs[tf] = df
# 2. الدمج (Alignment) للحصول على آخر لقطة (Latest Snapshot)
if '5m' not in processed_tfs:
return {'score': 0.0, 'error': 'Missing 5m base timeframe'}
# نأخذ آخر صف فقط من الـ 5m كأساس
latest_5m = processed_tfs['5m'].iloc[-1:].copy()
latest_ts = latest_5m['timestamp'].iloc[0]
base_row = latest_5m.add_prefix('5m_').rename(columns={'5m_timestamp': 'timestamp'})
# دمج باقي الأطر (نأخذ آخر شمعة أغلقت قبل أو مع شمعة الـ 5m الحالية)
for tf, df in processed_tfs.items():
if tf == '5m': continue
# العثور على الشمعة المناسبة زمنياً
relevant_row = df[df['timestamp'] <= latest_ts].iloc[-1:].copy()
if relevant_row.empty: continue
# تجهيز الأعمدة للدمج
cols = [c for c in relevant_row.columns if c not in ['timestamp','open','high','low','close','volume']]
for col in cols:
base_row[f"{tf}_{col}"] = relevant_row[col].values[0]
# 3. تجهيز شعاع الإدخال (Feature Vector)
# التأكد من وجود كل الميزات المطلوبة بالترتيب الصحيح
input_data = []
for feat in self.feature_names:
val = base_row.get(feat, np.nan)
# إذا كانت القيمة مصفوفة أو سلسلة بانداز، نأخذ القيمة الأولى
if isinstance(val, (pd.Series, np.ndarray)):
val = val.iloc[0] if len(val) > 0 else np.nan
input_data.append(val)
# 4. التنبؤ
# تحويل إلى DMatrix (تنسيق XGBoost السريع)
dtest = xgb.DMatrix([input_data], feature_names=self.feature_names)
prediction = self.model.predict(dtest)[0] # إرجاع الاحتمالية الأولى
return {
'score': float(prediction),
'timestamp': int(latest_ts),
'status': 'OK'
}
except Exception as e:
# print(f"⚠️ [Titan Error] {e}")
import traceback
traceback.print_exc()
return {'score': 0.0, 'error': str(e)} |