Trad / trade_manager.py
Riy777's picture
Update trade_manager.py
624c411
raw
history blame
31.1 kB
# trade_manager.py (V11.0 - Full Real Data & Shared XGBoost Integrated)
import asyncio
import json
import time
import traceback
import os
from datetime import datetime, timedelta
from typing import Dict, Any, List
from collections import deque, defaultdict
import pandas as pd
import numpy as np
# محاولة استيراد المكتبات الاختيارية مع تنبيهات
try:
import pandas_ta as ta
except ImportError:
print("⚠️ [TradeManager] مكتبة pandas_ta غير موجودة. المؤشرات الثانوية ستفشل.")
ta = None
try:
import ccxt.async_support as ccxtasync
from ccxt.base.errors import RequestTimeout, RateLimitExceeded, NetworkError
CCXT_ASYNC_AVAILABLE = True
except ImportError:
print("❌ [TradeManager] خطأ فادح: فشل استيراد 'ccxt.async_support'.")
CCXT_ASYNC_AVAILABLE = False
# استيراد المحللات
from ml_engine.indicators import AdvancedTechnicalAnalyzer
# لا نحتاج لاستيراد ChartPatternAnalyzer هنا لأننا سنستخدم النسخة المشتركة
class TacticalData:
"""
(V11.0) مخزن البيانات التكتيكية الحية (1m, 5m, 1h) للحارس.
يعتمد فقط على بيانات حقيقية من الـ Polling.
"""
def __init__(self, symbol):
self.symbol = symbol
self.order_book = None
self.trades = deque(maxlen=1000) # زيادة السعة لتحليل تدفق الأوامر بدقة
self.cvd = 0.0
self.large_trades = []
self.last_update = time.time()
# لتأكيد الصفقات من منصات أخرى
self.confirmation_trades = defaultdict(lambda: deque(maxlen=100))
self.confirmation_cvd = defaultdict(float)
self.last_kucoin_trade_id = None
self.last_confirmation_trade_ids = defaultdict(lambda: None)
# تخزين الشموع للأطر الزمنية المختلفة (نحتاج 200 على الأقل لـ XGBoost)
self.ohlcv_1m = deque(maxlen=300)
self.ohlcv_5m = deque(maxlen=300)
self.ohlcv_1h = deque(maxlen=300)
self.indicators_1m = {}
self.indicators_1h = {}
self.last_1m_candle_timestamp = None
self.last_5m_candle_timestamp = None
self.last_1h_candle_timestamp = None
self.new_5m_data_added = False
def add_trade(self, trade):
"""إضافة صفقة حية وتحديث CVD"""
trade_id = trade.get('id')
# تجنب التكرار
if trade_id and trade_id == self.last_kucoin_trade_id:
return
self.last_kucoin_trade_id = trade_id
self.trades.append(trade)
self.last_update = time.time()
try:
amount = float(trade['amount'])
price = float(trade['price'])
cost = amount * price
# تحديث CVD (Cumulative Volume Delta)
if trade['side'] == 'buy': self.cvd += amount
else: self.cvd -= amount
# تسجيل الصفقات الكبيرة (> $20k)
if cost > 20000:
self.large_trades.append(trade)
# نحتفظ بآخر 50 صفقة كبيرة فقط لتوفير الذاكرة
if len(self.large_trades) > 50: self.large_trades.pop(0)
except Exception: pass
def set_order_book(self, ob):
self.order_book = ob
self.last_update = time.time()
def analyze_order_book(self):
"""تحليل عمق السوق الحقيقي (Order Book Imbalance)"""
if not self.order_book:
return {"ob_score": 0.0, "ob_ratio": 1.0}
try:
bids = self.order_book.get('bids', [])
asks = self.order_book.get('asks', [])
# حساب عمق أول 20 مستوى (أكثر دقة من 10)
bids_depth = sum(p * a for p, a in bids[:20])
asks_depth = sum(p * a for p, a in asks[:20])
if asks_depth <= 0: return {"ob_score": 0.0, "ob_ratio": 1.0}
ob_ratio = bids_depth / asks_depth
# تطبيع النسبة لتكون درجة بين -1 (ضغط بيع قوي) و +1 (ضغط شراء قوي)
ob_score = np.clip((ob_ratio - 1.0), -1.0, 1.0)
return {"ob_ratio": ob_ratio, "ob_score": ob_score}
except Exception:
return {"ob_score": 0.0, "ob_ratio": 1.0}
def add_ohlcv(self, timeframe, candles):
"""إضافة شموع جديدة وتحديث المؤشرات تلقائياً"""
if not candles: return
target_deque = getattr(self, f"ohlcv_{timeframe}", None)
last_ts_attr = f"last_{timeframe}_candle_timestamp"
last_ts = getattr(self, last_ts_attr)
new_data = False
for candle in candles:
ts = candle[0]
# إضافة الشموع الجديدة فقط
if ts and ts != last_ts:
# التأكد من الترتيب الزمني
if target_deque and len(target_deque) > 0 and ts <= target_deque[-1][0]:
continue
target_deque.append(candle)
setattr(self, last_ts_attr, ts)
new_data = True
if new_data:
# تحديث المؤشرات عند توفر بيانات جديدة كافية
if timeframe == '1m' and len(self.ohlcv_1m) >= 50:
self._analyze_1m_indicators()
elif timeframe == '5m':
self.new_5m_data_added = True # إشارة لتشغيل XGBoost 5m
elif timeframe == '1h' and len(self.ohlcv_1h) >= 50:
self._analyze_1h_indicators()
def _analyze_1m_indicators(self):
"""حساب مؤشرات 1m السريعة للدخول اللحظي"""
if ta is None: return
try:
df = pd.DataFrame(list(self.ohlcv_1m), columns=['ts', 'o', 'h', 'l', 'c', 'v'])
close = df['c']
self.indicators_1m = {
'rsi': ta.rsi(close, length=14).iloc[-1],
'ema_fast': ta.ema(close, length=9).iloc[-1],
'macd': ta.macd(close).iloc[-1]['MACDh_12_26_9']
}
except: self.indicators_1m = {}
def _analyze_1h_indicators(self):
"""حساب مؤشرات 1h (مثل ATR لوقف الخسارة المتحرك)"""
if ta is None: return
try:
df = pd.DataFrame(list(self.ohlcv_1h), columns=['ts', 'o', 'h', 'l', 'c', 'v'])
self.indicators_1h = {
'atr': ta.atr(df['h'], df['l'], df['c'], length=14).iloc[-1]
}
except: self.indicators_1h = {}
def get_tactical_snapshot(self):
"""لقطة شاملة للوضع التكتيكي الحالي"""
ob_analysis = self.analyze_order_book()
# تطبيع CVD بالنسبة للحجم التقريبي (فرضية: 100k حجم كبير لـ 1m)
cvd_score = np.clip(self.cvd / 100000.0, -1.0, 1.0)
return {
"cvd_score": cvd_score,
"ob_score": ob_analysis['ob_score'],
"indicators_1m": self.indicators_1m,
"indicators_1h": self.indicators_1h,
# عدد الصفقات الكبيرة في آخر 5 دقائق
"large_trades_last_5m": len([t for t in self.large_trades if (time.time() - t['timestamp']/1000) < 300])
}
class TradeManager:
def __init__(self, r2_service, learning_hub=None, data_manager=None, state_manager=None, callback_on_close=None):
if not CCXT_ASYNC_AVAILABLE:
raise RuntimeError("مكتبة ccxt.async_support مطلوبة للعمل.")
self.r2_service = r2_service
self.learning_hub = learning_hub
self.data_manager = data_manager
self.state_manager = state_manager
self.callback_on_close = callback_on_close
self.is_running = False
self.sentry_watchlist = {}
self.sentry_tasks = {}
self.tactical_data_cache = {}
self.sentry_lock = asyncio.Lock()
# اتصالات المنصات
self.kucoin_rest = None
self.confirmation_exchanges = {}
# إعدادات الاتصال الحقيقي
self.polling_interval = 2.0 # تحديث كل ثانيتين
self.exchange_config = {
'enableRateLimit': True,
'timeout': 20000, # مهلة 20 ثانية
}
# (V11.0) استخدام محرك الأنماط المشترك من DataManager لتوفير الموارد
if self.data_manager and self.data_manager.pattern_analyzer:
self.pattern_engine = self.data_manager.pattern_analyzer
print("✅ [TradeManager V11.0] تم ربط محرك XGBoost المشترك.")
else:
self.pattern_engine = None
print("⚠️ [TradeManager] تحذير: محرك XGBoost المشترك غير متوفر.")
# الأوزان المتكيفة (يتم تحديثها من Learning Hub)
self.optimized_weights = {}
self.last_weights_update = 0
async def initialize_sentry_exchanges(self):
"""تهيئة الاتصالات الحقيقية بالمنصات"""
try:
print("🔄 [Sentry V11.0] تهيئة الاتصالات الحقيقية...")
# 1. تهيئة منصة KuCoin الأساسية
self.kucoin_rest = ccxtasync.kucoin(self.exchange_config)
# محاولة تحميل الأسواق للتأكد من الاتصال
await self.kucoin_rest.load_markets()
print(" ✅ [Sentry] الاتصال بـ KuCoin جاهز (بيانات حية).")
# 2. تهيئة منصات التأكيد (اختياري)
# (يمكن تفعيل المزيد من المنصات هنا حسب الحاجة)
conf_exs = ['bybit', 'binance']
for ex_id in conf_exs:
try:
if hasattr(ccxtasync, ex_id):
exchange = getattr(ccxtasync, ex_id)(self.exchange_config)
self.confirmation_exchanges[ex_id] = exchange
print(f" ✅ [Sentry] منصة التأكيد {ex_id} جاهزة.")
except Exception: pass
await self._refresh_weights()
print("✅ [Sentry] اكتملت التهيئة. الحارس جاهز للمراقبة الحية.")
except Exception as e:
print(f"❌ [Sentry] فشل فادح في تهيئة الاتصالات: {e}")
raise
async def _refresh_weights(self):
"""تحديث الأوزان من محور التعلم دورياً"""
if self.learning_hub and (time.time() - self.last_weights_update > 300):
try:
self.optimized_weights = await self.learning_hub.get_optimized_weights("general")
self.last_weights_update = time.time()
except: pass
async def start_sentry_and_monitoring_loops(self):
"""بدء حلقة المراقبة الرئيسية للحارس"""
self.is_running = True
print("🛡️ [Sentry] بدء حلقات المراقبة الحية...")
while self.is_running:
try:
await self._refresh_weights()
# تحديد العملات المطلوب مراقبتها (من قائمة المراقبة + الصفقات المفتوحة)
async with self.sentry_lock:
targets = set(self.sentry_watchlist.keys())
open_trades = await self.get_open_trades()
targets.update([t['symbol'] for t in open_trades if t['status'] == 'OPEN'])
current_tasks = set(self.sentry_tasks.keys())
# إضافة مراقبين جدد
for symbol in targets - current_tasks:
print(f" 👀 [Sentry] بدء مراقبة {symbol}")
self.tactical_data_cache[symbol] = TacticalData(symbol)
self.sentry_tasks[symbol] = asyncio.create_task(self._monitor_symbol(symbol))
# إيقاف مراقبين لم يعودوا مطلوبين
for symbol in current_tasks - targets:
if symbol in self.sentry_tasks:
self.sentry_tasks[symbol].cancel()
del self.sentry_tasks[symbol]
if symbol in self.tactical_data_cache:
del self.tactical_data_cache[symbol]
await asyncio.sleep(5) # مراجعة القائمة كل 5 ثواني
except Exception as e:
print(f"❌ [Sentry Loop Error] {e}")
await asyncio.sleep(10)
async def stop_sentry_loops(self):
"""إيقاف جميع أنشطة الحارس وإغلاق الاتصالات"""
self.is_running = False
for task in self.sentry_tasks.values(): task.cancel()
if self.kucoin_rest: await self.kucoin_rest.close()
for ex in self.confirmation_exchanges.values(): await ex.close()
async def update_sentry_watchlist(self, candidates: List[Dict]):
"""تحديث قائمة المراقبة من المستكشف"""
async with self.sentry_lock:
self.sentry_watchlist = {c['symbol']: c for c in candidates}
print(f"📋 [Sentry] تم تحديث قائمة المراقبة: {len(candidates)} عملة.")
async def _monitor_symbol(self, symbol: str):
"""مراقب مخصص لكل عملة (يعمل في الخلفية بشكل مستقل)"""
try:
# تشغيل مهام جمع البيانات وتحليلها بالتوازي
await asyncio.gather(
self._poll_market_data(symbol),
self._analyze_symbol_tactics(symbol)
)
except asyncio.CancelledError:
pass
except Exception as e:
print(f"⚠️ [Sentry Monitor] خطأ غير متوقع في مراقبة {symbol}: {e}")
async def _poll_market_data(self, symbol):
"""حلقة جمع البيانات الحية من المنصة (Polling Loop)"""
backoff = 1.0
while self.is_running:
try:
if not self.kucoin_rest: await asyncio.sleep(5); continue
# جلب البيانات الحية بالتوازي لتقليل التأخير الكلي
t_start = time.time()
# نطلب بيانات كافية لملء المخازن (خاصة 5m لـ XGBoost تحتاج 200)
results = await asyncio.gather(
self.kucoin_rest.fetch_order_book(symbol, limit=50),
self.kucoin_rest.fetch_trades(symbol, limit=100),
self.kucoin_rest.fetch_ohlcv(symbol, '1m', limit=100),
self.kucoin_rest.fetch_ohlcv(symbol, '5m', limit=300), # 300 للأمان
self.kucoin_rest.fetch_ohlcv(symbol, '1h', limit=100),
return_exceptions=True
)
data = self.tactical_data_cache.get(symbol)
if not data: break # توقفت المراقبة
# تحديث TacticalData بالنتائج الناجحة فقط
if not isinstance(results[0], Exception): data.set_order_book(results[0])
if not isinstance(results[1], Exception):
for t in results[1]: data.add_trade(t)
if not isinstance(results[2], Exception): data.add_ohlcv('1m', results[2])
if not isinstance(results[3], Exception): data.add_ohlcv('5m', results[3])
if not isinstance(results[4], Exception): data.add_ohlcv('1h', results[4])
backoff = 1.0 # إعادة تعيين الـ backoff عند النجاح
# انتظار ديناميكي للحفاظ على معدل التحديث المطلوب بدقة
elapsed = time.time() - t_start
await asyncio.sleep(max(0.1, self.polling_interval - elapsed))
except (RateLimitExceeded, RequestTimeout, NetworkError) as e:
# التعامل الذكي مع أخطاء الشبكة والحدود
print(f"⏳ [Polling] {symbol} - مشكلة اتصال ({type(e).__name__}). انتظار {backoff:.1f}s...")
await asyncio.sleep(backoff)
backoff = min(backoff * 1.5, 30.0) # زيادة وقت الانتظار تدريجياً حتى 30 ثانية
except asyncio.CancelledError: raise
except Exception as e:
print(f"⚠️ [Polling] خطأ غير متوقع لـ {symbol}: {e}")
await asyncio.sleep(5)
async def _analyze_symbol_tactics(self, symbol):
"""(دماغ الحارس) حلقة التحليل واتخاذ القرارات اللحظية"""
while self.is_running:
await asyncio.sleep(1.0) # دورة تحليل كل ثانية
try:
data = self.tactical_data_cache.get(symbol)
if not data: continue
# هل لدينا صفقة مفتوحة لهذه العملة؟
trade = await self.get_trade_by_symbol(symbol)
snapshot = data.get_tactical_snapshot()
if trade:
# --- وضع مراقبة الصفقة المفتوحة (Exit Monitor) ---
current_price = self._get_current_price(data)
if not current_price: continue
# 1. التحقق من الوقف/الهدف الثابت (الأولوية القصوى)
exit_reason = self._check_hard_exits(trade, current_price)
# 2. التحقق من الوقف المتحرك (ATR Trailing)
if not exit_reason:
exit_reason = await self._check_atr_trailing(trade, snapshot, current_price)
# 3. (V11.0) التحقق من "حماية الأرباح الذكية" باستخدام XGBoost 5m
# يتم تفعيله فقط عند إغلاق شمعة 5m جديدة
if not exit_reason and data.new_5m_data_added:
ml_exit = await self._run_5m_ml_profit_saver(trade, list(data.ohlcv_5m))
if ml_exit: exit_reason = ml_exit
data.new_5m_data_added = False # إعادة تعيين العلم
# تنفيذ الخروج الفوري إذا لزم الأمر
if exit_reason:
print(f"🛑 [Sentry Executor] خروج فوري لـ {symbol}: {exit_reason}")
await self.immediate_close_trade(symbol, current_price, exit_reason)
else:
# --- وضع مراقبة فرص الدخول (Entry Monitor) ---
watchlist_item = self.sentry_watchlist.get(symbol)
if watchlist_item:
# التحقق من شروط الدخول التكتيكية (المؤشرات + ML)
if await self._check_entry_trigger(symbol, snapshot, data):
print(f"🚀 [Sentry Executor] تنفيذ دخول لـ {symbol}!")
await self._execute_paper_entry(symbol, watchlist_item, data)
except asyncio.CancelledError: raise
except Exception as e:
print(f"⚠️ [Tactics] خطأ في تحليل {symbol}: {e}")
traceback.print_exc()
await asyncio.sleep(5)
def _get_current_price(self, data: TacticalData):
"""أفضل تقدير للسعر الحالي من البيانات المتاحة (Order Book أو آخر صفقة)"""
if data.order_book and data.order_book['bids'] and len(data.order_book['bids']) > 0:
return float(data.order_book['bids'][0][0]) # أفضل سعر طلب
if data.trades:
return float(data.trades[-1]['price']) # آخر سعر تنفيذ
return None
# ==================================================================
# 🧠 منطق القرارات الذكي (V11.0 Integrated)
# ==================================================================
async def _check_entry_trigger(self, symbol, snapshot, data: TacticalData) -> bool:
"""
(V11.0) زناد الدخول الذكي. يدمج:
1. المؤشرات اللحظية السريعة (CVD, OrderBook, RSI 1m).
2. تأكيد قوي من نموذج XGBoost 5m (إذا توفرت بيانات كافية).
"""
# 1. الشروط الأساسية اللحظية (Momentum Filters)
cvd_score = snapshot.get('cvd_score', 0)
ob_score = snapshot.get('ob_score', 0)
rsi_1m = snapshot.get('indicators_1m', {}).get('rsi', 50)
# شروط الزخم: ضغط شراء طفيف على الأقل، RSI في منطقة آمنة
momentum_ok = (cvd_score > 0.05 and ob_score > -0.1 and 40 < rsi_1m < 70)
if not momentum_ok: return False
# 2. التأكيد الذكي (XGBoost 5m)
xgb_conf = 0.0
# نحتاج 200 شمعة على الأقل ليعمل النموذج بدقة
if self.pattern_engine and self.pattern_engine.initialized and len(data.ohlcv_5m) >= 200:
res = await self.pattern_engine.detect_chart_patterns({'5m': list(data.ohlcv_5m)})
xgb_conf = res.get('details', {}).get('5m') or 0.0
# شرط الدخول النهائي: زخم لحظي جيد + تأكيد ML متوسط على الأقل (>50%)
if momentum_ok and xgb_conf > 0.50:
print(f" ✅ [Trigger] {symbol} زناد دخول! (Mom: OK, XGB_5m: {xgb_conf:.2f})")
return True
return False
async def _run_5m_ml_profit_saver(self, trade, candles_5m) -> str:
"""
(V11.0) حماية الأرباح باستخدام XGBoost 5m.
يخرج إذا اكتشف النموذج انعكاساً هبوطياً قوياً (>75%) ونحن في وضع جيد.
"""
if not self.pattern_engine or not self.pattern_engine.initialized or len(candles_5m) < 200:
return None
try:
# تحليل إطار 5 دقائق باستخدام النموذج المشترك
res = await self.pattern_engine.detect_chart_patterns({'5m': candles_5m})
prob_up = res.get('details', {}).get('5m')
if prob_up is not None:
prob_down = 1.0 - prob_up # لأن النموذج ثنائي
# إذا كان احتمال الهبوط قوياً جداً (> 75%)
if prob_down > 0.75:
entry_price = trade.get('entry_price', 0)
current_price = candles_5m[-1][4]
pnl = (current_price - entry_price) / entry_price
# نخرج فقط إذا كنا رابحين (لحماية الربح) أو خاسرين قليلاً جداً (لوقف النزيف)
if pnl > 0.005 or pnl > -0.005:
return f"ML Profit Saver: Bearish Signal on 5m (Prob Down: {prob_down:.2f})"
except Exception as e:
print(f"⚠️ [Sentry] خطأ في ML Profit Saver: {e}")
return None
# ==================================================================
# ⚙️ وظائف التنفيذ وإدارة الصفقات
# ==================================================================
def _check_hard_exits(self, trade, current_price):
"""فحص وقف الخسارة وجني الأرباح الثابت"""
sl = trade.get('dynamic_stop_loss') or trade.get('stop_loss')
tp = trade.get('take_profit')
if sl and current_price <= sl:
return f"Stop Loss hit ({current_price:.4f} <= {sl:.4f})"
if tp and current_price >= tp:
return f"Take Profit hit ({current_price:.4f} >= {tp:.4f})"
return None
async def _check_atr_trailing(self, trade, snapshot, current_price):
"""تحديث وفحص الوقف المتحرك (ATR Trailing)"""
if trade.get('decision_data', {}).get('exit_profile') != 'ATR_TRAILING': return None
atr = snapshot.get('indicators_1h', {}).get('atr')
if not atr or atr <= 0: return None
# إعدادات الوقف المتحرك (الافتراضي: 3 أضعاف ATR)
multiplier = trade.get('decision_data', {}).get('exit_parameters', {}).get('atr_multiplier', 3.0)
new_sl = current_price - (atr * multiplier)
current_sl = trade.get('dynamic_stop_loss', 0)
# تحديث الوقف للأعلى فقط (لا نسمح بنزوله أبداً في صفقات الشراء)
if new_sl > current_sl:
trade['dynamic_stop_loss'] = new_sl
await self._update_trade_in_db(trade)
print(f" ⛓️ [Trailing] رفع وقف {trade['symbol']} إلى {new_sl:.4f}")
# الفحص الفوري بعد التحديث
if current_price <= trade['dynamic_stop_loss']:
return f"ATR Trailing Stop hit ({current_price:.4f} <= {trade['dynamic_stop_loss']:.4f})"
return None
async def _execute_paper_entry(self, symbol, watchlist_item, data):
"""تنفيذ دخول (Paper Trading) بأسعار حقيقية"""
entry_price = self._get_current_price(data)
if not entry_price: return
llm_decision = watchlist_item.get('llm_decision_context', {}).get('decision', {})
# استخدام قيم SL/TP من المستكشف أو قيم افتراضية آمنة (2% وقف، 4% هدف)
sl = llm_decision.get('stop_loss', entry_price * 0.98)
tp = llm_decision.get('take_profit', entry_price * 1.04)
trade = {
"id": f"trade_{int(time.time())}",
"symbol": symbol,
"entry_price": entry_price,
"entry_timestamp": datetime.now().isoformat(),
"status": "OPEN",
"type": "PAPER",
"position_size_usd": 1000.0, # حجم ثابت للتجربة
"stop_loss": sl,
"take_profit": tp,
"dynamic_stop_loss": sl, # يبدأ كـ وقف ثابت
"strategy": watchlist_item.get('strategy_hint', 'GENERIC'),
"decision_data": llm_decision
}
await self._save_new_trade(trade)
# إزالة العملة من قائمة المراقبة لتجنب تكرار الدخول
async with self.sentry_lock:
self.sentry_watchlist.pop(symbol, None)
print(f"🎉 [EXECUTION] تم فتح صفقة جديدة لـ {symbol} بسعر {entry_price:.4f}")
async def immediate_close_trade(self, symbol, close_price, reason):
"""إغلاق فوري للصفقة (Paper) وحساب النتائج"""
trade = await self.get_trade_by_symbol(symbol)
if not trade: return
trade['status'] = 'CLOSED'
trade['close_price'] = close_price
trade['close_timestamp'] = datetime.now().isoformat()
trade['close_reason'] = reason
# حساب الأداء
entry = trade['entry_price']
pnl_pct = ((close_price - entry) / entry) * 100
pnl_usd = (trade['position_size_usd'] * pnl_pct) / 100
trade['pnl_percent'] = pnl_pct
trade['pnl_usd'] = pnl_usd
await self._archive_trade(trade)
print(f"🏁 [CLOSED] {symbol} | PnL: {pnl_pct:+.2f}% (${pnl_usd:+.2f}) | Reason: {reason}")
# استدعاء دورة التعلم (اختياري)
if self.callback_on_close:
asyncio.create_task(self.callback_on_close())
# ==================================================================
# 💾 دوال مساعدة لقاعدة البيانات (R2 Wrapper)
# ==================================================================
async def get_open_trades(self):
return await self.r2_service.get_open_trades_async()
async def get_trade_by_symbol(self, symbol):
trades = await self.get_open_trades()
return next((t for t in trades if t['symbol'] == symbol and t['status'] == 'OPEN'), None)
async def _save_new_trade(self, trade):
trades = await self.get_open_trades()
trades.append(trade)
await self.r2_service.save_open_trades_async(trades)
async def _update_trade_in_db(self, updated_trade):
trades = await self.get_open_trades()
for i, t in enumerate(trades):
if t['id'] == updated_trade['id']:
trades[i] = updated_trade
break
await self.r2_service.save_open_trades_async(trades)
async def _archive_trade(self, closed_trade):
# نقل الصفقة من القائمة المفتوحة إلى الأرشيف (نظرياً)
open_trades = await self.get_open_trades()
open_trades = [t for t in open_trades if t['id'] != closed_trade['id']]
await self.r2_service.save_open_trades_async(open_trades)
# (يمكن إضافة دالة لحفظ الأرشيف هنا إذا لزم الأمر)
async def update_trade_strategy(self, trade, new_decision):
"""تحديث استراتيجية الصفقة بناءً على إعادة التحليل"""
trade['decision_data'].update(new_decision)
if new_decision.get('new_stop_loss'):
trade['stop_loss'] = new_decision['new_stop_loss']
trade['dynamic_stop_loss'] = new_decision['new_stop_loss']
if new_decision.get('new_take_profit'):
trade['take_profit'] = new_decision['new_take_profit']
await self._update_trade_in_db(trade)
print(f"🔄 [Strategy Update] تم تحديث أهداف {trade['symbol']}")
def get_sentry_status(self):
return {
"is_running": self.is_running,
"monitored_symbols": list(self.sentry_tasks.keys()),
"watchlist_size": len(self.sentry_watchlist)
}
print("✅ TradeManager loaded - V11.0 (Full Real Data & Shared XGBoost 5m)")