Trad / trade_manager.py
Riy777's picture
Update trade_manager.py
708a613
raw
history blame
35.1 kB
# trade_manager.py (محدث بالكامل مع محرك الخروج الديناميكي وإدارة التعارض)
import asyncio
import json
import time
import traceback
from datetime import datetime, timedelta
from helpers import safe_float_conversion, _apply_patience_logic
# 🔴 جديد: نحتاج إلى pandas لحساب ATR في المراقب
try:
import pandas as pd
import pandas_ta as ta
PANDAS_TA_AVAILABLE = True
except ImportError:
PANDAS_TA_AVAILABLE = False
print("⚠️ pandas_ta not available. ATR Trailing Stop will be disabled.")
# 🔴 جديد: محرك الخروج الديناميكي (كلاس داخلي أو مساعد)
class _DynamicExitEngine:
def __init__(self, data_manager):
self.data_manager = data_manager
async def evaluate_exit(self, trade: dict, current_price: float):
"""
يقيّم ما إذا كان يجب إغلاق الصفقة بناءً على ملف الخروج الخاص بها.
يرجع: (should_close, close_reason, updated_trade_object)
"""
if not PANDAS_TA_AVAILABLE:
# إذا لم تكن المكتبات متاحة، نعود إلى الوقف الثابت البسيط
return await self._evaluate_fixed_exit(trade, current_price)
try:
exit_profile = trade.get('decision_data', {}).get('exit_profile', 'FIXED_TARGET')
exit_params = trade.get('decision_data', {}).get('exit_parameters', {})
# 1. التحقق من الوقف الكارثي (Hard Stop) والهدف النهائي (Take Profit) دائماً
hard_stop = trade.get('stop_loss')
take_profit = trade.get('take_profit')
if hard_stop and current_price <= hard_stop:
return True, f"Hard Stop Loss hit: {current_price} <= {hard_stop}", trade
if take_profit and current_price >= take_profit:
return True, f"Final Take Profit hit: {current_price} >= {take_profit}", trade
# 2. التحقق من الوقف الديناميكي (المتحرك) إذا كان موجوداً
dynamic_stop = trade.get('dynamic_stop_loss')
if dynamic_stop and current_price <= dynamic_stop:
return True, f"Dynamic Trailing Stop hit: {current_price} <= {dynamic_stop}", trade
# 3. تنفيذ منطق ملف الخروج
if exit_profile == "ATR_TRAILING":
return await self._evaluate_atr_trailing(trade, current_price, exit_params)
elif exit_profile == "TIME_BASED":
return await self._evaluate_time_based(trade, current_price, exit_params)
elif exit_profile == "FIXED_TARGET":
# تم التعامل معه بالفعل (TP/SL)، لكن يمكن إضافة منطق نقطة التعادل
return await self._evaluate_break_even(trade, current_price, exit_params.get("break_even_trigger_percent", 0))
elif exit_profile == "SIGNAL_BASED":
# (منطق الطوارئ - يمكن توسيعه لاحقاً)
# مثال: التحقق من ارتفاع حجم البيع
# volume_spike = await self._check_emergency_signals(trade['symbol'])
# if volume_spike:
# return True, f"Emergency Signal Stop (Volume Spike)", trade
pass
# إذا لم يتم تشغيل أي شيء، تحقق من نقطة التعادل (إذا لم تكن جزءاً من ATR)
if exit_profile != "ATR_TRAILING":
return await self._evaluate_break_even(trade, current_price, exit_params.get("break_even_trigger_percent", 0))
return False, "No exit criteria met", trade
except Exception as e:
print(f"❌ Error in DynamicExitEngine for {trade.get('symbol')}: {e}")
traceback.print_exc()
# في حالة الخطأ، نعود إلى الوقف الثابت الآمن
return await self._evaluate_fixed_exit(trade, current_price)
async def _evaluate_fixed_exit(self, trade: dict, current_price: float):
"""الخروج الآمن (الافتراضي): الوقف الثابت والهدف الثابت فقط"""
hard_stop = trade.get('stop_loss')
take_profit = trade.get('take_profit')
if hard_stop and current_price <= hard_stop:
return True, f"Hard Stop Loss hit: {current_price} <= {hard_stop}", trade
if take_profit and current_price >= take_profit:
return True, f"Final Take Profit hit: {current_price} >= {take_profit}", trade
# التحقق من الوقف الديناميكي (إذا تم تعيينه مسبقاً)
dynamic_stop = trade.get('dynamic_stop_loss')
if dynamic_stop and current_price <= dynamic_stop:
return True, f"Dynamic Stop hit: {current_price} <= {dynamic_stop}", trade
return False, "No exit criteria met", trade
async def _evaluate_atr_trailing(self, trade: dict, current_price: float, params: dict):
"""حساب الوقف المتحرك بناءً على ATR"""
try:
atr_period = params.get("atr_period", 14)
atr_multiplier = params.get("atr_multiplier", 2.0)
break_even_trigger_percent = params.get("break_even_trigger_percent", 0) # مثال: 1.5%
# 1. جلب بيانات الشموع لحساب ATR (نستخدم إطار زمني قصير مثل 15m)
ohlcv_data = await self._fetch_ohlcv_for_atr(trade['symbol'], '15m', atr_period + 50) # +50 لتهيئة المؤشر
if ohlcv_data is None:
return False, "ATR data unavailable", trade # لا يمكن الحساب، استمر
# 2. حساب ATR
df = pd.DataFrame(ohlcv_data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df['atr'] = ta.atr(df['high'], df['low'], df['close'], length=atr_period)
if df['atr'].dropna().empty:
return False, "ATR calculation failed", trade
current_atr = df['atr'].iloc[-1]
if current_atr is None or current_atr == 0:
return False, "Invalid ATR value", trade
# 3. حساب سعر الوقف الجديد بناءً على ATR
new_atr_stop = current_price - (current_atr * atr_multiplier)
# 4. التحقق من نقطة التعادل (Break Even)
current_dynamic_stop = trade.get('dynamic_stop_loss', 0)
entry_price = trade.get('entry_price')
if break_even_trigger_percent > 0:
trigger_price = entry_price * (1 + break_even_trigger_percent / 100)
if current_price >= trigger_price:
# إذا وصلنا لهدف نقطة التعادل، الوقف الجديد *على الأقل* سعر الدخول
new_atr_stop = max(new_atr_stop, entry_price)
# 5. تحديث الوقف الديناميكي
# الوقف المتحرك يجب أن يرتفع فقط، لا ينخفض
if new_atr_stop > current_dynamic_stop:
trade['dynamic_stop_loss'] = new_atr_stop
print(f"📈 {trade['symbol']}: ATR Trailing Stop updated to {new_atr_stop:.6f}")
# ملاحظة: هذا التحديث محلي للمهمة فقط (لا يحفظ في R2 كل دقيقة)
# سيتم التحقق من الوقف المحدث في الدورة التالية (أو في بداية هذه الدالة)
return False, "ATR Trailing logic executed", trade
except Exception as e:
print(f"❌ Error in ATR Trailing logic for {trade.get('symbol')}: {e}")
return False, "ATR logic failed", trade # فشل آمن، لا تغلق الصفقة
async def _evaluate_time_based(self, trade: dict, current_price: float, params: dict):
"""التحقق من الوقف الزمني"""
exit_after_minutes = params.get("exit_after_minutes", 0)
if exit_after_minutes <= 0:
return False, "Time stop not configured", trade
entry_time = datetime.fromisoformat(trade.get('entry_timestamp'))
elapsed_minutes = (datetime.now() - entry_time).total_seconds() / 60
if elapsed_minutes >= exit_after_minutes:
return True, f"Time Stop hit: {elapsed_minutes:.1f}m >= {exit_after_minutes}m", trade
return False, "Time stop not hit", trade
async def _evaluate_break_even(self, trade: dict, current_price: float, break_even_trigger_percent: float):
"""منطق نقطة التعادل (إذا لم يكن جزءاً من ATR)"""
if break_even_trigger_percent <= 0:
return False, "Break-even not configured", trade
entry_price = trade.get('entry_price')
current_dynamic_stop = trade.get('dynamic_stop_loss', 0)
# إذا كان الوقف الحالي أقل من سعر الدخول
if current_dynamic_stop < entry_price:
trigger_price = entry_price * (1 + break_even_trigger_percent / 100)
if current_price >= trigger_price:
trade['dynamic_stop_loss'] = entry_price # ارفع الوقف إلى سعر الدخول
print(f"📈 {trade['symbol']}: Break-Even Stop activated at {entry_price:.6f}")
return False, "Break-even logic executed", trade
async def _fetch_ohlcv_for_atr(self, symbol, timeframe, limit):
"""جلب بيانات الشموع لحساب ATR (دالة مساعدة)"""
try:
if not self.data_manager or not self.data_manager.exchange:
return None
# جلب البيانات بشكل متزامن (لأننا داخل مهمة asyncio بالفعل)
# هذه الدالة (fetch_ohlcv) في ccxt ليست async
loop = asyncio.get_event_loop()
ohlcv_data = await loop.run_in_executor(
None,
lambda: self.data_manager.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
)
if ohlcv_data and len(ohlcv_data) >= limit:
return ohlcv_data
else:
return None
except Exception as e:
print(f"⚠️ Failed to fetch OHLCV for ATR ({symbol}): {e}")
return None
class TradeManager:
def __init__(self, r2_service, learning_engine=None, data_manager=None, state_manager=None): # 🔴 جديد
self.r2_service = r2_service
self.learning_engine = learning_engine
self.data_manager = data_manager
self.state_manager = state_manager # 🔴 جديد: لإدارة التعارض
self.monitoring_tasks = {}
self.is_running = False
self.monitoring_errors = {}
self.max_consecutive_errors = 5
self.exit_engine = _DynamicExitEngine(self.data_manager) # 🔴 جديد: تهيئة محرك الخروج
async def open_trade(self, symbol, decision, current_price):
try:
trade_type = decision.get("trade_type", "LONG")
if trade_type == "SHORT":
print(f"⚠️ تم رفض فتح صفقة SHORT لـ {symbol}. النظام مصمم لـ SPOT فقط.")
await self.r2_service.save_system_logs_async({
"trade_open_rejected": True, "reason": "SHORT trade not allowed", "symbol": symbol
})
return None
portfolio_state = await self.r2_service.get_portfolio_state_async()
available_capital = portfolio_state.get("current_capital_usd", 0)
if available_capital < 1:
print(f"❌ رأس المال غير كافي لـ {symbol}: {available_capital}")
return None
expected_target_minutes = decision.get('expected_target_minutes', 15)
expected_target_minutes = max(5, min(expected_target_minutes, 45))
expected_target_time = (datetime.now() + timedelta(minutes=expected_target_minutes)).isoformat()
strategy = decision.get('strategy')
if not strategy or strategy == 'unknown':
strategy = 'GENERIC'
# 🔴 جديد: جلب ملف الخروج والمعاملات
exit_profile = decision.get('exit_profile', 'FIXED_TARGET') # الافتراضي هو الوقف الثابت
exit_parameters = decision.get('exit_parameters', {})
trades = await self.r2_service.get_open_trades_async()
new_trade = {
"id": str(int(datetime.now().timestamp())),
"symbol": symbol,
"entry_price": current_price,
"entry_timestamp": datetime.now().isoformat(),
"decision_data": decision, # 🔴 هذا يحتوي الآن على ملف الخروج
"status": "OPEN",
"stop_loss": decision.get("stop_loss"), # الوقف الكارثي
"take_profit": decision.get("take_profit"), # الهدف النهائي
"dynamic_stop_loss": decision.get("stop_loss"), # 🔴 جديد: يبدأ الوقف المتحرك عند الوقف الكارثي
"trade_type": "LONG",
"position_size_usd": available_capital,
"expected_target_minutes": expected_target_minutes,
"expected_target_time": expected_target_time,
"is_monitored": True,
"strategy": strategy,
"monitoring_started": False
}
trades.append(new_trade)
await self.r2_service.save_open_trades_async(trades)
portfolio_state["invested_capital_usd"] = available_capital
portfolio_state["current_capital_usd"] = 0.0
portfolio_state["total_trades"] = portfolio_state.get("total_trades", 0) + 1
await self.r2_service.save_portfolio_state_async(portfolio_state)
await self.r2_service.save_system_logs_async({
"new_trade_opened": True, "symbol": symbol, "position_size": available_capital,
"strategy": strategy, "exit_profile": exit_profile # 🔴 جديد: تسجيل ملف الخروج
})
print(f"✅ تم فتح صفقة جديدة (LONG) لـ {symbol} (استراتيجية: {strategy} | ملف خروج: {exit_profile})")
return new_trade
except Exception as e:
print(f"❌ فشل فتح صفقة لـ {symbol}: {e}")
await self.r2_service.save_system_logs_async({
"trade_open_failed": True, "symbol": symbol, "error": str(e)
})
raise
async def close_trade(self, trade_to_close, close_price, reason="إغلاق بالنظام"):
try:
symbol = trade_to_close.get('symbol')
trade_to_close['status'] = 'CLOSED'
trade_to_close['close_price'] = close_price
trade_to_close['close_timestamp'] = datetime.now().isoformat()
trade_to_close['is_monitored'] = False
entry_price = trade_to_close['entry_price']
position_size = trade_to_close['position_size_usd']
trade_type = "LONG"
strategy = trade_to_close.get('strategy', 'unknown')
pnl = 0.0
pnl_percent = 0.0
if entry_price and entry_price > 0 and close_price and close_price > 0:
try:
pnl_percent = ((close_price - entry_price) / entry_price) * 100
pnl = position_size * (pnl_percent / 100)
except (TypeError, ZeroDivisionError) as calc_error:
pnl = 0.0
pnl_percent = 0.0
trade_to_close['pnl_usd'] = pnl
trade_to_close['pnl_percent'] = pnl_percent
await self._archive_closed_trade(trade_to_close)
await self._update_trade_summary(trade_to_close)
portfolio_state = await self.r2_service.get_portfolio_state_async()
current_capital = portfolio_state.get("current_capital_usd", 0)
new_capital = current_capital + position_size + pnl
portfolio_state["current_capital_usd"] = new_capital
portfolio_state["invested_capital_usd"] = 0.0
if pnl > 0:
portfolio_state["winning_trades"] = portfolio_state.get("winning_trades", 0) + 1
portfolio_state["total_profit_usd"] = portfolio_state.get("total_profit_usd", 0.0) + pnl
elif pnl < 0:
portfolio_state["total_loss_usd"] = portfolio_state.get("total_loss_usd", 0.0) + abs(pnl)
await self.r2_service.save_portfolio_state_async(portfolio_state)
open_trades = await self.r2_service.get_open_trades_async()
trades_to_keep = [t for t in open_trades if t.get('id') != trade_to_close.get('id')]
await self.r2_service.save_open_trades_async(trades_to_keep)
if symbol in self.monitoring_tasks:
del self.monitoring_tasks[symbol]
if symbol in self.monitoring_errors:
del self.monitoring_errors[symbol]
await self.r2_service.save_system_logs_async({
"trade_closed": True, "symbol": symbol, "pnl_usd": pnl, "pnl_percent": pnl_percent,
"new_capital": new_capital, "strategy": strategy, "trade_type": trade_type, "reason": reason
})
if self.learning_engine and self.learning_engine.initialized:
# 🔴 نمرر الصفقة كاملة (تحتوي على decision_data -> exit_profile)
await self.learning_engine.analyze_trade_outcome(trade_to_close, reason)
print(f"✅ تم إغلاق صفقة {symbol} - السبب: {reason} - الربح: {pnl_percent:+.2f}%")
return True
except Exception as e:
print(f"❌ فشل إغلاق صفقة {trade_to_close.get('symbol')}: {e}")
await self.r2_service.save_system_logs_async({
"trade_close_failed": True, "symbol": trade_to_close.get('symbol'), "error": str(e)
})
raise
async def update_trade(self, trade_to_update, re_analysis_decision):
try:
symbol = trade_to_update.get('symbol')
# 🔴 جديد: تحديث ملف الخروج والمعاملات
if re_analysis_decision.get('action') == "UPDATE_TRADE":
trade_to_update['stop_loss'] = re_analysis_decision['new_stop_loss']
trade_to_update['take_profit'] = re_analysis_decision['new_take_profit']
# تحديث الوقف المتحرك ليبدأ من الوقف الجديد
trade_to_update['dynamic_stop_loss'] = re_analysis_decision['new_stop_loss']
trade_to_update['decision_data']['exit_profile'] = re_analysis_decision['new_exit_profile']
trade_to_update['decision_data']['exit_parameters'] = re_analysis_decision['new_exit_parameters']
print(f" 🔄 {symbol}: تم تحديث ملف الخروج إلى {re_analysis_decision['new_exit_profile']}")
new_expected_minutes = re_analysis_decision.get('new_expected_minutes')
if new_expected_minutes:
new_expected_minutes = max(5, min(new_expected_minutes, 45))
trade_to_update['expected_target_minutes'] = new_expected_minutes
trade_to_update['expected_target_time'] = (datetime.now() + timedelta(minutes=new_expected_minutes)).isoformat()
original_strategy = trade_to_update.get('strategy')
if not original_strategy or original_strategy == 'unknown':
original_strategy = re_analysis_decision.get('strategy', 'GENERIC')
trade_to_update['strategy'] = original_strategy
trade_to_update['decision_data']['reasoning'] = re_analysis_decision.get('reasoning') # تحديث سبب القرار
trade_to_update['is_monitored'] = True
trade_to_update['trade_type'] = "LONG"
open_trades = await self.r2_service.get_open_trades_async()
for i, trade in enumerate(open_trades):
if trade.get('id') == trade_to_update.get('id'):
open_trades[i] = trade_to_update
break
await self.r2_service.save_open_trades_async(open_trades)
await self.r2_service.save_system_logs_async({
"trade_updated": True, "symbol": symbol, "action": "UPDATE_TRADE",
"strategy": original_strategy,
"new_exit_profile": re_analysis_decision.get('new_exit_profile', 'N/A')
})
print(f"✅ تم تحديث صفقة {symbol}")
return True
except Exception as e:
print(f"❌ فشل تحديث صفقة {trade_to_update.get('symbol')}: {e}")
raise
async def immediate_close_trade(self, symbol, close_price, reason="المراقبة الفورية"):
try:
open_trades = await self.r2_service.get_open_trades_async()
trade_to_close = None
for trade in open_trades:
if trade['symbol'] == symbol and trade['status'] == 'OPEN':
trade_to_close = trade
break
if not trade_to_close:
print(f"⚠️ لم يتم العثور على صفقة مفتوحة لـ {symbol}")
return False
await self.close_trade(trade_to_close, close_price, reason)
return True
except Exception as e:
print(f"❌ فشل الإغلاق الفوري لـ {symbol}: {e}")
return False
async def start_trade_monitoring(self):
self.is_running = True
print(f"🔍 بدء مراقبة الصفقات (Dynamic Exit Engine) (Pandas: {PANDAS_TA_AVAILABLE})...")
while self.is_running:
try:
open_trades = await self.r2_service.get_open_trades_async()
current_time = time.time()
for trade in open_trades:
symbol = trade['symbol']
if self.monitoring_errors.get(symbol, 0) >= self.max_consecutive_errors:
if symbol in self.monitoring_tasks:
del self.monitoring_tasks[symbol]
continue
if symbol not in self.monitoring_tasks:
task = asyncio.create_task(self._monitor_single_trade(trade))
self.monitoring_tasks[symbol] = {
'task': task,
'start_time': current_time,
'trade_object': trade # 🔴 حفظ نسخة محلية من الصفقة
}
trade['monitoring_started'] = True
current_symbols = {trade['symbol'] for trade in open_trades}
for symbol in list(self.monitoring_tasks.keys()):
if symbol not in current_symbols:
task_info = self.monitoring_tasks[symbol]
if not task_info['task'].done():
task_info['task'].cancel()
del self.monitoring_tasks[symbol]
if symbol in self.monitoring_errors:
del self.monitoring_errors[symbol]
await asyncio.sleep(10) # الفحص الرئيسي كل 10 ثوانٍ
except Exception as error:
print(f"❌ خطأ في مراقبة الصفقات: {error}")
await asyncio.sleep(30)
async def _monitor_single_trade(self, trade_object):
"""
🔴 (أعيدت كتابته بالكامل)
المراقب التكتيكي (1 دقيقة) الذي ينفذ ملف الخروج الديناميكي.
"""
symbol = trade_object['symbol']
# 🔴 نستخدم النسخة المحلية من الصفقة لتتبع الوقف المتحرك
local_trade = trade_object.copy()
print(f"🔍 بدء مراقبة الصفقة (تكتيكي): {symbol} (ملف: {local_trade.get('decision_data', {}).get('exit_profile', 'N/A')})")
while (symbol in self.monitoring_tasks and
self.is_running and
self.monitoring_errors.get(symbol, 0) < self.max_consecutive_errors):
try:
# 🔴 1. التحقق من قفل التحليل الاستراتيجي (منع التعارض)
if self.state_manager and self.state_manager.trade_analysis_lock.locked():
print(f"⏸️ [Monitor] إيقاف مؤقت لـ {symbol} (التحليل الاستراتيجي يعمل...)")
await asyncio.sleep(10) # انتظار انتهاء التحليل
continue # تخطي هذه الدورة
if not self.data_manager:
print(f"⚠️ DataManager غير متوفر لـ {symbol}")
await asyncio.sleep(15)
continue
# 🔴 2. جلب السعر الحالي
try:
current_price = await asyncio.wait_for(
self.data_manager.get_latest_price_async(symbol),
timeout=10
)
except asyncio.TimeoutError:
print(f"⏰ مهلة انتظار السعر لـ {symbol}")
self._increment_monitoring_error(symbol)
await asyncio.sleep(15)
continue
if not current_price:
print(f"⚠️ لم يتم الحصول على سعر لـ {symbol}")
self._increment_monitoring_error(symbol)
await asyncio.sleep(15)
continue
# 🔴 3. تنفيذ محرك الخروج الديناميكي
# نمرر النسخة المحلية من الصفقة (local_trade)
should_close, close_reason, updated_local_trade = await self.exit_engine.evaluate_exit(
local_trade,
current_price
)
# تحديث النسخة المحلية (تحتوي على dynamic_stop_loss المحدث)
local_trade = updated_local_trade
# 🔴 4. اتخاذ قرار الإغلاق
if should_close:
print(f"🛑 [Monitor] قرار إغلاق لـ {symbol}: {close_reason}")
if self.r2_service.acquire_lock(): # محاولة قفل R2 للإغلاق
try:
# قبل الإغلاق، تأكد من جلب أحدث نسخة من R2
latest_trade_data = await self.r2_service.get_trade_by_symbol_async(symbol)
if latest_trade_data and latest_trade_data['status'] == 'OPEN':
await self.immediate_close_trade(symbol, current_price, f"Tactical Monitor: {close_reason}")
else:
print(f"⚠️ الصفقة {symbol} لم تعد مفتوحة، تم إلغاء الإغلاق التكتيكي.")
except Exception as close_error:
print(f"❌ فشل الإغلاق التلقائي لـ {symbol}: {close_error}")
finally:
self.r2_service.release_lock()
break # الخروج من حلقة المراقبة بعد محاولة الإغلاق
# 🔴 5. إعادة تعيين عداد الأخطاء والانتظار
if symbol in self.monitoring_errors:
self.monitoring_errors[symbol] = 0
# انتظار 60 ثانية (دورة المراقب التكتيكي)
await asyncio.sleep(60)
except Exception as error:
error_count = self._increment_monitoring_error(symbol)
print(f"❌ خطأ في مراقبة {symbol} (الخطأ #{error_count}): {error}")
traceback.print_exc()
if error_count >= self.max_consecutive_errors:
print(f"🚨 إيقاف مراقبة {symbol} بسبب الأخطاء المتتالية")
await self.r2_service.save_system_logs_async({
"monitoring_stopped": True, "symbol": symbol, "error_count": error_count, "error": str(error)
})
break # الخروج من الحلقة بسبب الأخطاء
await asyncio.sleep(30) # انتظار أطول بعد الخطأ
print(f"🛑 توقيف مراقبة الصفقة (تكتيكي): {symbol}")
if symbol in self.monitoring_tasks:
del self.monitoring_tasks[symbol]
if symbol in self.monitoring_errors:
del self.monitoring_errors[symbol]
def _increment_monitoring_error(self, symbol):
if symbol not in self.monitoring_errors:
self.monitoring_errors[symbol] = 0
self.monitoring_errors[symbol] += 1
return self.monitoring_errors[symbol]
def stop_monitoring(self):
self.is_running = False
print("🛑 إيقاف جميع مهام المراقبة...")
for symbol, task_info in self.monitoring_tasks.items():
if not task_info['task'].done():
task_info['task'].cancel()
print(f"✅ تم إلغاء مهمة مراقبة {symbol}")
self.monitoring_tasks.clear()
self.monitoring_errors.clear()
async def _archive_closed_trade(self, closed_trade):
try:
key = "closed_trades_history.json"
try:
response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key)
history = json.loads(response['Body'].read())
except Exception:
history = []
history.append(closed_trade)
if len(history) > 1000:
history = history[-1000:]
data_json = json.dumps(history, indent=2).encode('utf-8')
self.r2_service.s3_client.put_object(
Bucket="trading", Key=key, Body=data_json, ContentType="application/json"
)
except Exception as e:
print(f"❌ فشل أرشفة الصفقة: {e}")
async def _update_trade_summary(self, closed_trade):
try:
key = "trade_summary.json"
try:
response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key)
summary = json.loads(response['Body'].read())
except Exception:
summary = {
"total_trades": 0, "winning_trades": 0, "losing_trades": 0,
"total_profit_usd": 0.0, "total_loss_usd": 0.0, "win_percentage": 0.0,
"avg_profit_per_trade": 0.0, "avg_loss_per_trade": 0.0,
"largest_win": 0.0, "largest_loss": 0.0,
"last_updated": datetime.now().isoformat()
}
pnl = closed_trade.get('pnl_usd', 0.0)
summary['total_trades'] += 1
if pnl >= 0:
summary['winning_trades'] += 1
summary['total_profit_usd'] += pnl
if pnl > summary.get('largest_win', 0):
summary['largest_win'] = pnl
else:
summary['losing_trades'] += 1
summary['total_loss_usd'] += abs(pnl)
if abs(pnl) > summary.get('largest_loss', 0):
summary['largest_loss'] = abs(pnl)
if summary['total_trades'] > 0:
summary['win_percentage'] = (summary['winning_trades'] / summary['total_trades']) * 100
if summary['winning_trades'] > 0:
summary['avg_profit_per_trade'] = summary['total_profit_usd'] / summary['winning_trades']
if summary['losing_trades'] > 0:
summary['avg_loss_per_trade'] = summary['total_loss_usd'] / summary['losing_trades']
summary['last_updated'] = datetime.now().isoformat()
data_json = json.dumps(summary, indent=2).encode('utf-8')
self.r2_service.s3_client.put_object(
Bucket="trading", Key=key, Body=data_json, ContentType="application/json"
)
except Exception as e:
print(f"❌ فشل تحديث ملخص التداول: {e}")
async def get_open_trades(self):
try:
return await self.r2_service.get_open_trades_async()
except Exception as e:
print(f"❌ فشل جلب الصفقات المفتوحة: {e}")
return []
async def get_trade_by_symbol(self, symbol):
try:
open_trades = await self.get_open_trades()
for trade in open_trades:
if trade['symbol'] == symbol and trade['status'] == 'OPEN':
return trade
return None
except Exception as e:
print(f"❌ فشل البحث عن صفقة {symbol}: {e}")
return None
def get_monitoring_status(self):
return {
'is_running': self.is_running,
'active_tasks': len(self.monitoring_tasks),
'monitoring_errors': dict(self.monitoring_errors),
'max_consecutive_errors': self.max_consecutive_errors
}
print(f"✅ Trade Manager loaded - V2 (Dynamic Exit Engine & Lock-Aware Monitor / Pandas: {PANDAS_TA_AVAILABLE})")