Trad / ml_engine /xgboost_pattern_v2.py
Riy777's picture
Create xgboost_pattern_v2.py
a836b38
raw
history blame
7.78 kB
# ml_engine/xgboost_pattern_v2.py
# (Pipeline لتجهيز البيانات لنماذج XGBoost V2)
import numpy as np
import pandas as pd
import pandas_ta as ta
import logging
# إعداد التسجيل (Logging) لتتبع الأخطاء الصامتة
logging.basicConfig(level=logging.WARNING, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
try:
from hurst import compute_Hc
HURST_AVAILABLE = True
except ImportError:
logger.warning("مكتبة 'hurst' غير موجودة. سيتم استخدام قيمة افتراضية 0.5 لمؤشر Hurst.")
HURST_AVAILABLE = False
# === دوال مساعدة (مطابقة تماماً لما استخدم في التدريب) ===
def _zv(x):
"""حساب Z-Score الآمن (يتجنب القسمة على صفر)"""
with np.errstate(divide='ignore', invalid='ignore'):
x = np.asarray(x, dtype="float32")
m = np.nanmean(x)
s = np.nanstd(x) + 1e-9
x_norm = (x - m) / s
return np.nan_to_num(x_norm, nan=0.0).astype("float32")
def ema_np_safe(x, n):
"""حساب المتوسط المتحرك الأسي (EMA) بشكل آمن وسريع باستخدام Numpy"""
x = np.asarray(x, dtype="float32")
k = 2.0 / (n + 1.0)
out = np.empty_like(x)
out[0] = x[0] if not np.isnan(x[0]) else 0.0
for i in range(1, len(x)):
val = x[i] if not np.isnan(x[i]) else out[i-1]
out[i] = out[i-1] + k * (val - out[i-1])
return out
def mc_simple_fast(closes_np: np.ndarray, target_profit=0.005):
"""نسخة سريعة وآمنة من محاكاة مونت كارلو البسيطة (للميزات الإحصائية)"""
try:
if len(closes_np) < 30: return 0.5, 0.0
c = closes_np
cur = float(c[-1])
if cur <= 0: return 0.5, 0.0
# حساب العوائد اللوغاريتمية
lr = np.diff(np.log1p(c))
lr = lr[np.isfinite(lr)]
if len(lr) < 20: return 0.5, 0.0
mu = np.mean(lr)
sigma = np.std(lr)
if sigma < 1e-9: return 0.5, 0.0 # تجنب التقلب الصفري
# محاكاة سريعة (500 مسار) باستخدام توزيع t-student (كما في التدريب)
n_sims = 500
drift = (mu - 0.5 * sigma**2)
diffusion = sigma * np.random.standard_t(df=10, size=n_sims)
sim_prices = cur * np.exp(drift + diffusion)
var95 = np.percentile(sim_prices, 5)
var95_pct = (cur - var95) / (cur + 1e-9)
prob_gain = np.mean(sim_prices >= cur * (1 + target_profit))
return float(prob_gain), float(var95_pct)
except Exception:
return 0.5, 0.0
# ============================================================
# === الدالة الرئيسية: تجهيز البيانات للنظام الحي (V6 Pipeline) ===
# ============================================================
def transform_candles_for_ml(df_window: pd.DataFrame):
"""
تحويل نافذة من الشموع (200 شمعة) إلى متجه ميزات جاهز لنموذج ML.
Input: DataFrame (must have columns: 'open', 'high', 'low', 'close', 'volume')
Output: Numpy Array shape (1, 3803) or None if failed.
"""
try:
# التأكد من وجود بيانات كافية (نحتاج 200 شمعة بالضبط للحفاظ على الشكل)
if len(df_window) < 200:
# في حالة البيانات الناقصة، يمكننا إما الرفض أو التعبئة بأصفار (الرفض أسلم)
# logger.warning(f"بيانات غير كافية للتحويل: {len(df_window)} < 200")
return None
df = df_window.iloc[-200:].copy() # نضمن أخذ آخر 200 فقط
# تحويل الأعمدة إلى numpy arrays (float32)
o = df["open"].to_numpy(dtype="float32")
h = df["high"].to_numpy(dtype="float32")
l = df["low"].to_numpy(dtype="float32")
c = df["close"].to_numpy(dtype="float32")
v = df["volume"].to_numpy(dtype="float32")
# --- 1. الميزات الأساسية (5 ميزات × 200) ---
base = np.stack([o, h, l, c, v], axis=1) # Shape: (200, 5)
base_z = _zv(base)
# --- 2. ميزات إضافية (2 ميزة × 200) ---
lr = np.zeros_like(c)
lr[1:] = np.diff(np.log1p(c)) # Log Returns
rng = (h - l) / (c + 1e-9) # Range
extra = np.stack([lr, rng], axis=1) # Shape: (200, 2)
extra_z = _zv(extra)
# --- 3. المؤشرات الفنية (12 ميزة × 200) - "المحرك المُدرّع V7" ---
ema9 = ema_np_safe(c, 9)
ema21 = ema_np_safe(c, 21)
ema50 = ema_np_safe(c, 50)
ema200 = ema_np_safe(c, 200)
slope21 = np.concatenate([[0.0], np.diff(ema21)])
slope50 = np.concatenate([[0.0], np.diff(ema50)])
# استخدام try...except لكل مؤشر من مكتبة pandas_ta لضمان المتانة
try: rsi = ta.rsi(pd.Series(c), length=14).fillna(50).to_numpy(dtype="float32")
except: rsi = np.full_like(c, 50.0, dtype="float32")
try:
macd_data = ta.macd(pd.Series(c), fast=12, slow=26, signal=9)
macd_line = macd_data.iloc[:, 0].fillna(0).to_numpy(dtype="float32")
macd_hist = macd_data.iloc[:, 2].fillna(0).to_numpy(dtype="float32")
except:
macd_line = np.zeros_like(c, dtype="float32")
macd_hist = np.zeros_like(c, dtype="float32")
try: atr = ta.atr(pd.Series(h), pd.Series(l), pd.Series(c), length=14).fillna(0).to_numpy(dtype="float32")
except: atr = np.zeros_like(c, dtype="float32")
try:
bb = ta.bbands(pd.Series(c), length=20, std=2)
# BB%B: موقع السعر بالنسبة للنطاق
bb_p = ((c - bb.iloc[:, 0]) / (bb.iloc[:, 2] - bb.iloc[:, 0] + 1e-9)).fillna(0.5).to_numpy(dtype="float32")
except: bb_p = np.full_like(c, 0.5, dtype="float32")
try: obv = ta.obv(pd.Series(c), pd.Series(v)).fillna(0).to_numpy(dtype="float32")
except: obv = np.zeros_like(c, dtype="float32")
# تجميع المؤشرات الـ 12 وتطبيق Z-Score عليها
indicators = np.stack([
ema9, ema21, ema50, ema200, slope21, slope50,
rsi, macd_line, macd_hist, atr / (c + 1e-9), bb_p, obv
], axis=1) # Shape: (200, 12)
indicators_z = _zv(indicators)
# --- 4. الدمج والتسطيح (Flattening) ---
# الشكل النهائي قبل التسطيح: (200, 5 + 2 + 12) = (200, 19)
X_seq = np.concatenate([base_z, extra_z, indicators_z], axis=1)
X_seq_flat = X_seq.reshape(1, -1) # Shape: (1, 3800)
# --- 5. الميزات الثابتة (3 ميزات) ---
try: mc_p, mc_var = mc_simple_fast(c[-100:]) # آخر 100 شمعة للمحاكاة السريعة
except: mc_p, mc_var = 0.5, 0.0
hurst_val = 0.5
if HURST_AVAILABLE:
try: hurst_val = compute_Hc(c[-100:], kind='price', simplified=True)[0]
except: pass
X_stat = np.array([[mc_p, mc_var, hurst_val]], dtype="float32") # Shape: (1, 3)
# --- 6. الدمج النهائي ---
X_final = np.concatenate([X_seq_flat, X_stat], axis=1) # Shape: (1, 3803)
# استبدال أي NaN متبقي بـ 0 للأمان التام
X_final = np.nan_to_num(X_final, nan=0.0, posinf=0.0, neginf=0.0)
return X_final
except Exception as e:
# logger.error(f"Pipeline Error during transformation: {e}")
return None