Trad / ml_engine /titan_engine.py
Riy777's picture
Create titan_engine.py
172cf4d
raw
history blame
7.39 kB
# ml_engine/titan_engine.py
# (V1.0 - Titan Inference Engine)
import os
import joblib
import numpy as np
import pandas as pd
import pandas_ta as ta
import xgboost as xgb
import json
class TitanEngine:
def __init__(self, model_dir="ml_models/layer2"):
self.model_path = os.path.join(model_dir, "Titan_XGB_V1.json")
self.features_path = os.path.join(model_dir, "Titan_Features.pkl")
self.model = None
self.feature_names = None
self.initialized = False
async def initialize(self):
"""تحميل النموذج وقائمة الميزات من القرص"""
print(f"🛡️ [Titan] جاري تهيئة المحرك من {self.model_path}...")
try:
if os.path.exists(self.model_path) and os.path.exists(self.features_path):
# تحميل نموذج XGBoost
self.model = xgb.Booster()
self.model.load_model(self.model_path)
# تحميل قائمة الميزات لضمان الترتيب الصحيح
self.feature_names = joblib.load(self.features_path)
self.initialized = True
print(f"✅ [Titan] تم التحميل بنجاح. جاهز بـ {len(self.feature_names)} ميزة.")
else:
print(f"❌ [Titan] ملفات النموذج مفقودة!")
except Exception as e:
print(f"❌ [Titan] خطأ فادح أثناء التهيئة: {e}")
def apply_inverted_pyramid(self, df, tf):
"""نفس منطق هندسة الميزات المستخدم في التدريب تماماً"""
df = df.copy().sort_values('timestamp').reset_index(drop=True)
# تعيين الفهرس للسهولة في pandas_ta
df = df.set_index(pd.DatetimeIndex(pd.to_datetime(df['timestamp'], unit='ms')))
# --- المستوى 1: دقيق (5m, 15m) ---
if tf in ['5m', '15m']:
df['RSI'] = ta.rsi(df['close'], length=14)
df['MACD'] = ta.macd(df['close']).iloc[:, 0]
df['MACD_h'] = ta.macd(df['close']).iloc[:, 1]
df['CCI'] = ta.cci(df['high'], df['low'], df['close'], length=20)
df['ADX'] = ta.adx(df['high'], df['low'], df['close'], length=14).iloc[:, 0]
for p in [9, 21, 50, 200]:
ema = ta.ema(df['close'], length=p)
df[f'EMA_{p}_dist'] = (df['close'] / ema) - 1
bb = ta.bbands(df['close'], length=20, std=2.0)
df['BB_w'] = (bb.iloc[:, 2] - bb.iloc[:, 0]) / bb.iloc[:, 1]
df['BB_p'] = (df['close'] - bb.iloc[:, 0]) / (bb.iloc[:, 2] - bb.iloc[:, 0])
df['MFI'] = ta.mfi(df['high'], df['low'], df['close'], df['volume'], length=14)
vwap = ta.vwap(df['high'], df['low'], df['close'], df['volume'])
df['VWAP_dist'] = (df['close'] / vwap) - 1
# --- المستوى 2: تكتيكي (1h, 4h) ---
elif tf in ['1h', '4h']:
df['RSI'] = ta.rsi(df['close'], length=14)
df['MACD_h'] = ta.macd(df['close']).iloc[:, 1]
df['EMA_50_dist'] = (df['close'] / ta.ema(df['close'], length=50)) - 1
df['EMA_200_dist'] = (df['close'] / ta.ema(df['close'], length=200)) - 1
df['ATR_pct'] = ta.atr(df['high'], df['low'], df['close'], length=14) / df['close']
# --- المستوى 3: استراتيجي (1d) ---
elif tf == '1d':
df['RSI'] = ta.rsi(df['close'], length=14)
df['EMA_200_dist'] = (df['close'] / ta.ema(df['close'], length=200)) - 1
adx = ta.adx(df['high'], df['low'], df['close'])
if adx is not None and not adx.empty:
df['Trend_Strong'] = np.where(adx.iloc[:, 0] > 25, 1, 0)
else:
df['Trend_Strong'] = 0
return df.reset_index(drop=True)
def predict(self, ohlcv_data: dict) -> dict:
"""
استقبال البيانات الخام (Dictionary of DataFrames/Lists)،
تجهيزها، ثم استدعاء النموذج للتنبؤ.
"""
if not self.initialized or not self.model:
return {'score': 0.0, 'error': 'Titan not initialized'}
try:
# 1. تجهيز البيانات لكل إطار
processed_tfs = {}
for tf, data in ohlcv_data.items():
if not data: continue
# تحويل القوائم إلى DataFrame إذا لزم الأمر
if isinstance(data, list):
df = pd.DataFrame(data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
else:
df = data.copy()
# تطبيق المؤشرات حسب الإطار
df = self.apply_inverted_pyramid(df, tf)
processed_tfs[tf] = df
# 2. الدمج (Alignment) للحصول على آخر لقطة (Latest Snapshot)
if '5m' not in processed_tfs:
return {'score': 0.0, 'error': 'Missing 5m base timeframe'}
# نأخذ آخر صف فقط من الـ 5m كأساس
latest_5m = processed_tfs['5m'].iloc[-1:].copy()
latest_ts = latest_5m['timestamp'].iloc[0]
base_row = latest_5m.add_prefix('5m_').rename(columns={'5m_timestamp': 'timestamp'})
# دمج باقي الأطر (نأخذ آخر شمعة أغلقت قبل أو مع شمعة الـ 5m الحالية)
for tf, df in processed_tfs.items():
if tf == '5m': continue
# العثور على الشمعة المناسبة زمنياً
relevant_row = df[df['timestamp'] <= latest_ts].iloc[-1:].copy()
if relevant_row.empty: continue
# تجهيز الأعمدة للدمج
cols = [c for c in relevant_row.columns if c not in ['timestamp','open','high','low','close','volume']]
for col in cols:
base_row[f"{tf}_{col}"] = relevant_row[col].values[0]
# 3. تجهيز شعاع الإدخال (Feature Vector)
# التأكد من وجود كل الميزات المطلوبة بالترتيب الصحيح
input_data = []
for feat in self.feature_names:
val = base_row.get(feat, np.nan)
# إذا كانت القيمة مصفوفة أو سلسلة بانداز، نأخذ القيمة الأولى
if isinstance(val, (pd.Series, np.ndarray)):
val = val.iloc[0] if len(val) > 0 else np.nan
input_data.append(val)
# 4. التنبؤ
# تحويل إلى DMatrix (تنسيق XGBoost السريع)
dtest = xgb.DMatrix([input_data], feature_names=self.feature_names)
prediction = self.model.predict(dtest)[0] # إرجاع الاحتمالية الأولى
return {
'score': float(prediction),
'timestamp': int(latest_ts),
'status': 'OK'
}
except Exception as e:
# print(f"⚠️ [Titan Error] {e}")
import traceback
traceback.print_exc()
return {'score': 0.0, 'error': str(e)}