Trad / trade_manager.py
Riy777's picture
Update trade_manager.py
f3ca3ad
raw
history blame
22.2 kB
import asyncio
import json
import time
import traceback
from datetime import datetime, timedelta
from helpers import safe_float_conversion, _apply_patience_logic
class TradeManager:
def __init__(self, r2_service, learning_engine=None, data_manager=None):
self.r2_service = r2_service
self.learning_engine = learning_engine
self.data_manager = data_manager
self.monitoring_tasks = {}
self.is_running = False
self.monitoring_errors = {}
self.max_consecutive_errors = 5
async def open_trade(self, symbol, decision, current_price):
try:
portfolio_state = await self.r2_service.get_portfolio_state_async()
available_capital = portfolio_state.get("current_capital_usd", 0)
if available_capital < 1:
print(f"❌ رأس المال غير كافي لفتح صفقة لـ {symbol}: {available_capital}")
return None
expected_target_minutes = decision.get('expected_target_minutes', 15)
expected_target_minutes = max(5, min(expected_target_minutes, 45))
expected_target_time = (datetime.now() + timedelta(minutes=expected_target_minutes)).isoformat()
strategy = decision.get('strategy')
if not strategy or strategy == 'unknown':
strategy = 'GENERIC'
trades = await self.r2_service.get_open_trades_async()
new_trade = {
"id": str(int(datetime.now().timestamp())),
"symbol": symbol,
"entry_price": current_price,
"entry_timestamp": datetime.now().isoformat(),
"decision_data": decision,
"status": "OPEN",
"stop_loss": decision.get("stop_loss"),
"take_profit": decision.get("take_profit"),
"trade_type": decision.get("trade_type"),
"position_size_usd": available_capital,
"expected_target_minutes": expected_target_minutes,
"expected_target_time": expected_target_time,
"is_monitored": True,
"strategy": strategy,
"monitoring_started": False
}
trades.append(new_trade)
await self.r2_service.save_open_trades_async(trades)
portfolio_state["invested_capital_usd"] = available_capital
portfolio_state["current_capital_usd"] = 0.0
portfolio_state["total_trades"] = portfolio_state.get("total_trades", 0) + 1
await self.r2_service.save_portfolio_state_async(portfolio_state)
await self.r2_service.save_system_logs_async({
"new_trade_opened": True,
"symbol": symbol,
"position_size": available_capital,
"expected_minutes": expected_target_minutes,
"trade_type": decision.get("trade_type", "LONG"),
"strategy": strategy
})
print(f"✅ تم فتح صفقة جديدة لـ {symbol} باستراتيجية {strategy}")
return new_trade
except Exception as e:
print(f"❌ فشل فتح صفقة لـ {symbol}: {e}")
await self.r2_service.save_system_logs_async({
"trade_open_failed": True,
"symbol": symbol,
"error": str(e)
})
raise
async def close_trade(self, trade_to_close, close_price, reason="إغلاق بالنظام"):
try:
symbol = trade_to_close.get('symbol')
trade_to_close['status'] = 'CLOSED'
trade_to_close['close_price'] = close_price
trade_to_close['close_timestamp'] = datetime.now().isoformat()
trade_to_close['is_monitored'] = False
entry_price = trade_to_close['entry_price']
position_size = trade_to_close['position_size_usd']
trade_type = trade_to_close.get('trade_type', 'LONG')
strategy = trade_to_close.get('strategy', 'unknown')
pnl = 0.0
pnl_percent = 0.0
if entry_price and entry_price > 0 and close_price and close_price > 0:
try:
if trade_type == 'LONG':
pnl_percent = ((close_price - entry_price) / entry_price) * 100
pnl = position_size * (pnl_percent / 100)
elif trade_type == 'SHORT':
pnl_percent = ((entry_price - close_price) / entry_price) * 100
pnl = position_size * (pnl_percent / 100)
except (TypeError, ZeroDivisionError) as calc_error:
pnl = 0.0
pnl_percent = 0.0
trade_to_close['pnl_usd'] = pnl
trade_to_close['pnl_percent'] = pnl_percent
await self._archive_closed_trade(trade_to_close)
await self._update_trade_summary(trade_to_close)
portfolio_state = await self.r2_service.get_portfolio_state_async()
current_capital = portfolio_state.get("current_capital_usd", 0)
invested_capital = portfolio_state.get("invested_capital_usd", 0)
new_capital = current_capital + position_size + pnl
portfolio_state["current_capital_usd"] = new_capital
portfolio_state["invested_capital_usd"] = 0.0
if pnl > 0:
portfolio_state["winning_trades"] = portfolio_state.get("winning_trades", 0) + 1
portfolio_state["total_profit_usd"] = portfolio_state.get("total_profit_usd", 0.0) + pnl
elif pnl < 0:
portfolio_state["total_loss_usd"] = portfolio_state.get("total_loss_usd", 0.0) + abs(pnl)
await self.r2_service.save_portfolio_state_async(portfolio_state)
open_trades = await self.r2_service.get_open_trades_async()
trades_to_keep = [t for t in open_trades if t.get('id') != trade_to_close.get('id')]
await self.r2_service.save_open_trades_async(trades_to_keep)
# إزالة من المهام النشطة
if symbol in self.monitoring_tasks:
del self.monitoring_tasks[symbol]
if symbol in self.monitoring_errors:
del self.monitoring_errors[symbol]
await self.r2_service.save_system_logs_async({
"trade_closed": True,
"symbol": symbol,
"entry_price": entry_price,
"close_price": close_price,
"pnl_usd": pnl,
"pnl_percent": pnl_percent,
"new_capital": new_capital,
"strategy": strategy,
"position_size": position_size,
"trade_type": trade_type,
"reason": reason
})
if self.learning_engine and self.learning_engine.initialized:
await self.learning_engine.analyze_trade_outcome(trade_to_close, reason)
print(f"✅ تم إغلاق صفقة {symbol} - السبب: {reason} - الربح: {pnl_percent:+.2f}%")
return True
except Exception as e:
print(f"❌ فشل إغلاق صفقة {trade_to_close.get('symbol')}: {e}")
await self.r2_service.save_system_logs_async({
"trade_close_failed": True,
"symbol": trade_to_close.get('symbol'),
"error": str(e)
})
raise
async def update_trade(self, trade_to_update, re_analysis_decision):
try:
symbol = trade_to_update.get('symbol')
if re_analysis_decision.get('new_stop_loss'):
trade_to_update['stop_loss'] = re_analysis_decision['new_stop_loss']
if re_analysis_decision.get('new_take_profit'):
trade_to_update['take_profit'] = re_analysis_decision['new_take_profit']
new_expected_minutes = re_analysis_decision.get('new_expected_minutes')
if new_expected_minutes:
new_expected_minutes = max(5, min(new_expected_minutes, 45))
trade_to_update['expected_target_minutes'] = new_expected_minutes
trade_to_update['expected_target_time'] = (datetime.now() + timedelta(minutes=new_expected_minutes)).isoformat()
original_strategy = trade_to_update.get('strategy')
if not original_strategy or original_strategy == 'unknown':
original_strategy = re_analysis_decision.get('strategy', 'GENERIC')
trade_to_update['strategy'] = original_strategy
trade_to_update['decision_data'] = re_analysis_decision
trade_to_update['is_monitored'] = True
open_trades = await self.r2_service.get_open_trades_async()
for i, trade in enumerate(open_trades):
if trade.get('id') == trade_to_update.get('id'):
open_trades[i] = trade_to_update
break
await self.r2_service.save_open_trades_async(open_trades)
await self.r2_service.save_system_logs_async({
"trade_updated": True,
"symbol": symbol,
"new_expected_minutes": new_expected_minutes,
"action": "UPDATE_TRADE",
"strategy": original_strategy
})
print(f"✅ تم تحديث صفقة {symbol}")
return True
except Exception as e:
print(f"❌ فشل تحديث صفقة {trade_to_update.get('symbol')}: {e}")
raise
async def immediate_close_trade(self, symbol, close_price, reason="المراقبة الفورية"):
try:
open_trades = await self.r2_service.get_open_trades_async()
trade_to_close = None
for trade in open_trades:
if trade['symbol'] == symbol and trade['status'] == 'OPEN':
trade_to_close = trade
break
if not trade_to_close:
print(f"⚠️ لم يتم العثور على صفقة مفتوحة لـ {symbol}")
return False
await self.close_trade(trade_to_close, close_price, reason)
return True
except Exception as e:
print(f"❌ فشل الإغلاق الفوري لـ {symbol}: {e}")
return False
async def start_trade_monitoring(self):
self.is_running = True
print("🔍 بدء مراقبة الصفقات...")
while self.is_running:
try:
open_trades = await self.r2_service.get_open_trades_async()
current_time = time.time()
for trade in open_trades:
symbol = trade['symbol']
# تخطي الصفقات التي تجاوزت حد الأخطاء
if self.monitoring_errors.get(symbol, 0) >= self.max_consecutive_errors:
if symbol in self.monitoring_tasks:
del self.monitoring_tasks[symbol]
continue
# بدء المراقبة إذا لم تكن نشطة
if symbol not in self.monitoring_tasks:
task = asyncio.create_task(self._monitor_single_trade(trade))
self.monitoring_tasks[symbol] = {
'task': task,
'start_time': current_time,
'trade': trade
}
trade['monitoring_started'] = True
# تنظيف المهام المنتهية
current_symbols = {trade['symbol'] for trade in open_trades}
for symbol in list(self.monitoring_tasks.keys()):
if symbol not in current_symbols:
task_info = self.monitoring_tasks[symbol]
if not task_info['task'].done():
task_info['task'].cancel()
del self.monitoring_tasks[symbol]
if symbol in self.monitoring_errors:
del self.monitoring_errors[symbol]
await asyncio.sleep(10)
except Exception as error:
print(f"❌ خطأ في مراقبة الصفقات: {error}")
await asyncio.sleep(30)
async def _monitor_single_trade(self, trade):
symbol = trade['symbol']
max_monitoring_time = 3600 # أقصى وقت مراقبة: ساعة واحدة
print(f"🔍 بدء مراقبة الصفقة: {symbol}")
while (symbol in self.monitoring_tasks and
self.is_running and
self.monitoring_errors.get(symbol, 0) < self.max_consecutive_errors):
try:
start_time = time.time()
if not self.data_manager:
print(f"⚠️ DataManager غير متوفر لـ {symbol}")
await asyncio.sleep(15)
continue
# الحصول على السعر الحالي مع مهلة زمنية
try:
current_price = await asyncio.wait_for(
self.data_manager.get_latest_price_async(symbol),
timeout=10
)
except asyncio.TimeoutError:
print(f"⏰ مهلة انتظار السعر لـ {symbol}")
self._increment_monitoring_error(symbol)
await asyncio.sleep(15)
continue
if not current_price:
print(f"⚠️ لم يتم الحصول على سعر لـ {symbol}")
self._increment_monitoring_error(symbol)
await asyncio.sleep(15)
continue
entry_price = trade['entry_price']
stop_loss = trade.get('stop_loss')
take_profit = trade.get('take_profit')
should_close, close_reason = False, ""
# التحقق من شروط الإغلاق
if stop_loss and current_price <= stop_loss:
should_close, close_reason = True, f"وصول وقف الخسارة: {current_price} <= {stop_loss}"
elif take_profit and current_price >= take_profit:
should_close, close_reason = True, f"وصول جني الأرباح: {current_price} >= {take_profit}"
# تحديث وقف الخسارة الديناميكي
if not should_close and current_price > entry_price:
dynamic_stop = current_price * 0.98
if dynamic_stop > (stop_loss or 0):
trade['stop_loss'] = dynamic_stop
# إغلاق الصفقة إذا لزم الأمر
if should_close:
if self.r2_service.acquire_lock():
try:
await self.immediate_close_trade(symbol, current_price, close_reason)
except Exception as close_error:
print(f"❌ فشل الإغلاق التلقائي لـ {symbol}: {close_error}")
finally:
self.r2_service.release_lock()
break
# إعادة تعيين عداد الأخطاء عند النجاح
if symbol in self.monitoring_errors:
self.monitoring_errors[symbol] = 0
# التحقق من وقت المراقبة الطويل
monitoring_duration = time.time() - start_time
if monitoring_duration > max_monitoring_time:
print(f"🕒 انتهى وقت مراقبة الصفقة {symbol}")
break
await asyncio.sleep(15)
except Exception as error:
error_count = self._increment_monitoring_error(symbol)
print(f"❌ خطأ في مراقبة {symbol} (الخطأ #{error_count}): {error}")
if error_count >= self.max_consecutive_errors:
print(f"🚨 إيقاف مراقبة {symbol} بسبب الأخطاء المتتالية")
await self.r2_service.save_system_logs_async({
"monitoring_stopped": True,
"symbol": symbol,
"error_count": error_count,
"error": str(error)
})
break
await asyncio.sleep(30)
print(f"🛑 توقيف مراقبة الصفقة: {symbol}")
def _increment_monitoring_error(self, symbol):
if symbol not in self.monitoring_errors:
self.monitoring_errors[symbol] = 0
self.monitoring_errors[symbol] += 1
return self.monitoring_errors[symbol]
def stop_monitoring(self):
self.is_running = False
print("🛑 إيقاف جميع مهام المراقبة...")
for symbol, task_info in self.monitoring_tasks.items():
if not task_info['task'].done():
task_info['task'].cancel()
print(f"✅ تم إلغاء مهمة مراقبة {symbol}")
self.monitoring_tasks.clear()
self.monitoring_errors.clear()
async def _archive_closed_trade(self, closed_trade):
try:
key = "closed_trades_history.json"
try:
response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key)
history = json.loads(response['Body'].read())
except Exception:
history = []
history.append(closed_trade)
# حفظ آخر 1000 صفقة فقط
if len(history) > 1000:
history = history[-1000:]
data_json = json.dumps(history, indent=2).encode('utf-8')
self.r2_service.s3_client.put_object(
Bucket="trading", Key=key, Body=data_json, ContentType="application/json"
)
except Exception as e:
print(f"❌ فشل أرشفة الصفقة: {e}")
async def _update_trade_summary(self, closed_trade):
try:
key = "trade_summary.json"
try:
response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key)
summary = json.loads(response['Body'].read())
except Exception:
summary = {
"total_trades": 0, "winning_trades": 0, "losing_trades": 0,
"total_profit_usd": 0.0, "total_loss_usd": 0.0, "win_percentage": 0.0,
"avg_profit_per_trade": 0.0, "avg_loss_per_trade": 0.0,
"largest_win": 0.0, "largest_loss": 0.0,
"last_updated": datetime.now().isoformat()
}
pnl = closed_trade.get('pnl_usd', 0.0)
summary['total_trades'] += 1
if pnl >= 0:
summary['winning_trades'] += 1
summary['total_profit_usd'] += pnl
if pnl > summary.get('largest_win', 0):
summary['largest_win'] = pnl
else:
summary['losing_trades'] += 1
summary['total_loss_usd'] += abs(pnl)
if abs(pnl) > summary.get('largest_loss', 0):
summary['largest_loss'] = abs(pnl)
if summary['total_trades'] > 0:
summary['win_percentage'] = (summary['winning_trades'] / summary['total_trades']) * 100
if summary['winning_trades'] > 0:
summary['avg_profit_per_trade'] = summary['total_profit_usd'] / summary['winning_trades']
if summary['losing_trades'] > 0:
summary['avg_loss_per_trade'] = summary['total_loss_usd'] / summary['losing_trades']
summary['last_updated'] = datetime.now().isoformat()
data_json = json.dumps(summary, indent=2).encode('utf-8')
self.r2_service.s3_client.put_object(
Bucket="trading", Key=key, Body=data_json, ContentType="application/json"
)
except Exception as e:
print(f"❌ فشل تحديث ملخص التداول: {e}")
async def get_open_trades(self):
try:
return await self.r2_service.get_open_trades_async()
except Exception as e:
print(f"❌ فشل جلب الصفقات المفتوحة: {e}")
return []
async def get_trade_by_symbol(self, symbol):
try:
open_trades = await self.get_open_trades()
for trade in open_trades:
if trade['symbol'] == symbol and trade['status'] == 'OPEN':
return trade
return None
except Exception as e:
print(f"❌ فشل البحث عن صفقة {symbol}: {e}")
return None
def get_monitoring_status(self):
return {
'is_running': self.is_running,
'active_tasks': len(self.monitoring_tasks),
'monitoring_errors': dict(self.monitoring_errors),
'max_consecutive_errors': self.max_consecutive_errors
}