Trad / trade_manager.py
Riy777's picture
Update trade_manager.py
a4364da verified
# trade_manager.py (V8.7 - Full Production Version - NO ABBREVIATIONS)
import asyncio
import json
import uuid
import traceback
from datetime import datetime
class TradeManager:
def __init__(self, r2_service, data_manager, titan_engine, pattern_engine):
self.r2 = r2_service
self.data_manager = data_manager
self.titan = titan_engine
self.pattern_engine = pattern_engine
# قواميس تتبع الحالة في الذاكرة (In-Memory State)
self.open_positions = {}
self.watchlist = {}
self.sentry_tasks = {}
self.running = True
# 🔒 قفل لضمان عدم تضارب عمليات الشراء/البيع المتزامنة
# هذا القفل يمنع حالتين من الدخول إلى منطقة التعديل على المحفظة في نفس الوقت
self.execution_lock = asyncio.Lock()
print("🛡️ [TradeManager V8.7] Sentry Module Initialized (Sync & Lock Enabled).")
async def initialize_sentry_exchanges(self):
"""
تحميل الصفقات المفتوحة من R2 عند بدء تشغيل النظام لاستكمال إدارتها.
تعتمد الآن على وظيفة المزامنة المركزية.
"""
print("🛡️ [TradeManager] Initializing and syncing state with R2...")
await self.sync_internal_state()
async def sync_internal_state(self):
"""
مزامنة الحالة الداخلية مع R2 لضمان عدم فقدان أي صفقة مفتوحة.
تستخدم عند بدء النظام وعند بدء كل دورة تحليل جديدة لضمان تطابق البيانات.
"""
try:
# جلب الصفقات المسجلة في R2
loaded_trades = await self.r2.get_open_trades_async()
if loaded_trades:
# إنشاء مجموعة من الرموز الموجودة في R2 للمقارنة
r2_symbols = set(t['symbol'] for t in loaded_trades)
# 1. إضافة الصفقات المفقودة من الذاكرة
for t in loaded_trades:
symbol = t['symbol']
if symbol not in self.open_positions:
self.open_positions[symbol] = t
print(f"🔄 [TradeManager] SYNC: Resumed watching trade from R2: {symbol}")
# 2. (اختياري) تنظيف الذاكرة من الصفقات التي أغلقت في R2 ولكنها علقت هنا
# يتم ذلك بمقارنة عكسية إذا تطلب الأمر مستقبلاً
# التأكد من أن جميع الصفقات المفتوحة لديها حراس يعملون
await self.start_sentry_loops()
except Exception as e:
print(f"⚠️ [TradeManager] Critical State Sync Warning: {e}")
traceback.print_exc()
async def update_sentry_watchlist(self, approved_candidates):
"""
تحديث قائمة المراقبة النشطة للحارس (Sniper) بناءً على ترشيحات الدماغ الجديدة.
"""
# إيقاف المهام القديمة التي لم تعد في القائمة الجديدة (تنظيف)
new_symbols = set(c['symbol'] for c in approved_candidates)
for symbol in list(self.watchlist.keys()):
if symbol not in new_symbols:
if symbol in self.sentry_tasks:
self.sentry_tasks[symbol].cancel()
del self.watchlist[symbol]
# إضافة المرشحين الجدد
for cand in approved_candidates:
symbol = cand['symbol']
if symbol not in self.watchlist:
self.watchlist[symbol] = {
'data': cand,
'added_at': datetime.utcnow().isoformat(),
'monitoring_started': False
}
print(f"📋 [Sniper] Watchlist updated. Active targets: {len(self.watchlist)}")
# بدء مهام المراقبة الجديدة فوراً
for symbol in list(self.watchlist.keys()):
# نتأكد من عدم وجود مهمة قائمة بالفعل لنفس الرمز
if symbol not in self.sentry_tasks or self.sentry_tasks[symbol].done():
print(f"🎯 [Sniper] Locking radar on target: {symbol}")
self.sentry_tasks[symbol] = asyncio.create_task(self._sniper_loop(symbol))
async def _sniper_loop(self, symbol):
"""
حلقة المراقبة التكتيكية (القناص) - تراقب الرمز لحظة بلحظة لاقتناص فرصة الدخول.
"""
print(f"👁️ [Sniper] Monitoring {symbol} for tactical entry...")
consecutive_signals = 0
required_signals = 2 # عدد الشموع المتتالية المطلوبة لتأكيد الزخم
try:
while self.running and symbol in self.watchlist and symbol not in self.open_positions:
# 1. فحص سريع: هل وصلنا للحد الأقصى للصفقات؟
# إذا نعم، نهدئ اللعب لتوفير الموارد
if len(self.open_positions) >= 1:
await asyncio.sleep(60) # انتظار دقيقة كاملة قبل الفحص التالي
continue
# 2. جلب بيانات الشموع اللحظية (5 دقائق)
ohlcv = await self.data_manager.get_latest_ohlcv(symbol, timeframe='5m', limit=30)
if not ohlcv or len(ohlcv) < 20:
await asyncio.sleep(30)
continue
# 3. تحليل فوري باستخدام محرك تيتان
titan_res = self.titan.predict({'5m': ohlcv})
current_score = titan_res.get('score', 0.0)
# 4. تقييم الإشارة
is_signal = False
entry_reason = ""
# شرط الدخول: زخم قوي جداً (فوق 0.80)
if current_score > 0.80:
is_signal = True
entry_reason = f"Titan Surge ({current_score:.2f})"
if is_signal:
consecutive_signals += 1
print(f"🔥 [Sniper] {symbol} Signal detected! ({consecutive_signals}/{required_signals}) - {entry_reason}")
if consecutive_signals >= required_signals:
# محاولة تنفيذ الشراء
current_price = ohlcv[-1][4] # آخر سعر إغلاق
# نستخدم await هنا لأن الدالة execute_buy_order تحتوي على قفل async
executed = await self.execute_buy_order(symbol, entry_reason, current_price)
if executed:
# تم الشراء بنجاح، ننهي مهمة القناص هذه
print(f"✅ [Sniper] Mission accomplished for {symbol}. Standing down.")
break
else:
# فشل الشراء (ربما بسبب اكتمال العدد أو نقص الرصيد)، نعيد المحاولة لاحقاً
consecutive_signals = 0
else:
# إذا انقطعت سلسلة الإشارات، نعيد العداد للصفر
if consecutive_signals > 0:
# print(f"❄️ [Sniper] {symbol} signal cooled off.")
consecutive_signals = 0
await asyncio.sleep(30) # فاصل زمني 30 ثانية بين كل فحص
except asyncio.CancelledError:
# تم إلغاء المهمة يدوياً (مثلاً عند تحديث القائمة)
pass
except Exception as e:
print(f"❌ [Sniper Error] Loop crashed for {symbol}: {e}")
traceback.print_exc()
async def execute_buy_order(self, symbol, reason, price):
"""
تنفيذ أمر الشراء. هذه الدالة حرجة جداً ومحمية بقفل (Lock).
"""
async with self.execution_lock:
# --- منطقة حرجة (Critical Section) ---
# لا يمكن لأي كود آخر تعديل الصفقات أو المحفظة أثناء وجودنا هنا
# 1. التحقق النهائي من عدد الصفقات
if len(self.open_positions) >= 1:
print(f"⚠️ [Execution Blocked] Cannot buy {symbol}. Max positions (1) reached.")
return False
# 2. جلب وتحديث المحفظة
portfolio = await self.r2.get_portfolio_state_async()
capital = portfolio.get('current_capital_usd', 0.0)
if capital < 5.0: # حد أدنى للرصيد (مثلاً 5 دولار)
print(f"⚠️ [Execution Failed] Insufficient capital for {symbol}: ${capital:.2f}")
return False
# 3. حساب كمية الاستثمار
invest_amount = capital * 0.99 # استثمار 99% من الرصيد المتاح
quantity = invest_amount / price
# 4. إنشاء سجل الصفقة
trade_id = str(uuid.uuid4())[:8]
new_trade = {
'trade_id': trade_id,
'symbol': symbol,
'status': 'OPEN',
'entry_time': datetime.utcnow().isoformat(),
'entry_price': price,
'quantity': quantity,
'invested_usd': invest_amount,
'entry_reason': reason,
'tp_price': price * 1.05, # هدف أولي +5%
'sl_price': price * 0.95, # وقف أولي -5%
'highest_price': price # لتتبع الوقف المتحرك
}
# 5. تحديث الحالة الداخلية
self.open_positions[symbol] = new_trade
if symbol in self.watchlist:
del self.watchlist[symbol]
# 6. تحديث وحفظ البيانات في R2 (دفعة واحدة لضمان التناسق)
portfolio['current_capital_usd'] -= invest_amount
portfolio['invested_capital_usd'] += invest_amount
portfolio['total_trades'] += 1
await self.r2.save_portfolio_state_async(portfolio)
await self.r2.save_open_trades_async(list(self.open_positions.values()))
print(f"🚀 [EXECUTED] BUY {symbol} @ {price:.4f} | Invested: ${invest_amount:.2f} | Reason: {reason}")
# 7. تشغيل الحارس فوراً
if symbol in self.sentry_tasks and not self.sentry_tasks[symbol].done():
self.sentry_tasks[symbol].cancel() # إلغاء مهمة القناص القديمة إن وجدت
self.sentry_tasks[symbol] = asyncio.create_task(self._guardian_loop(symbol))
return True
async def _guardian_loop(self, symbol):
"""
حلقة الحماية (Guardian) - تراقب الصفقة المفتوحة وتدير الخروج.
"""
print(f"🛡️ [Guardian] Activated for position: {symbol}")
try:
while self.running and symbol in self.open_positions:
trade = self.open_positions[symbol]
# جلب السعر الحالي
current_price = await self.data_manager.get_latest_price_async(symbol)
if not current_price or current_price <= 0:
await asyncio.sleep(30)
continue
# تحديث أعلى سعر وصل له (مهم للوقف المتحرك المستقبلي)
if current_price > trade.get('highest_price', 0):
trade['highest_price'] = current_price
# ملاحظة: لا نحفظ في R2 هنا لتجنب كثرة الكتابة، نعتمد على الذاكرة
# التحقق من شروط الخروج (SL / TP)
if current_price <= trade['sl_price']:
print(f"🛡️ [Guardian] STOP LOSS triggered for {symbol} @ {current_price:.4f} (SL: {trade['sl_price']:.4f})")
await self.execute_sell_order(symbol, "Stop Loss Hit", current_price)
break
elif current_price >= trade['tp_price']:
print(f"🛡️ [Guardian] TAKE PROFIT triggered for {symbol} @ {current_price:.4f} (TP: {trade['tp_price']:.4f})")
await self.execute_sell_order(symbol, "Take Profit Hit", current_price)
break
await asyncio.sleep(10) # مراقبة نشطة كل 10 ثواني
except asyncio.CancelledError:
print(f"🛡️ [Guardian] Task cancelled for {symbol}")
except Exception as e:
print(f"❌ [Guardian Error] Loop crashed for {symbol}: {e}")
traceback.print_exc()
async def execute_sell_order(self, symbol, reason, price):
"""
تنفيذ أمر البيع. محمي بـ execution_lock.
"""
async with self.execution_lock:
# --- منطقة حرجة ---
if symbol not in self.open_positions:
print(f"⚠️ [Sell Failed] {symbol} not found in open positions during execution.")
return
trade = self.open_positions.pop(symbol)
# حسابات الأرباح
revenue = trade['quantity'] * price
profit_usd = revenue - trade['invested_usd']
profit_pct = (profit_usd / trade['invested_usd']) * 100
# تحديث المحفظة
portfolio = await self.r2.get_portfolio_state_async()
portfolio['current_capital_usd'] += revenue
portfolio['invested_capital_usd'] -= trade['invested_usd']
portfolio['total_profit_usd'] += profit_usd
if profit_usd > 0:
portfolio['winning_trades'] += 1
# حفظ التحديثات النهائية في R2
await self.r2.save_portfolio_state_async(portfolio)
await self.r2.save_open_trades_async(list(self.open_positions.values()))
print(f"💰 [SOLD] {symbol} @ {price:.4f} | PnL: ${profit_usd:.2f} ({profit_pct:+.2f}%) | Reason: {reason}")
async def execute_emergency_exit(self, symbol, reason):
"""واجهة للخروج الطارئ اليدوي أو من الدماغ"""
print(f"🚨 [Emergency] Requested exit for {symbol} due to: {reason}")
current_price = await self.data_manager.get_latest_price_async(symbol)
if current_price and current_price > 0:
await self.execute_sell_order(symbol, f"EMERGENCY: {reason}", current_price)
else:
print(f"❌ [Emergency Failed] Could not fetch valid price for {symbol}")
async def update_trade_targets(self, symbol, new_tp, new_sl, reason):
"""تحديث أهداف الصفقة بناءً على أوامر الدماغ"""
if symbol in self.open_positions:
trade = self.open_positions[symbol]
old_tp = trade['tp_price']
old_sl = trade['sl_price']
if new_tp is not None: trade['tp_price'] = float(new_tp)
if new_sl is not None: trade['sl_price'] = float(new_sl)
self.open_positions[symbol] = trade
# حفظ فوري لضمان عدم ضياع الأهداف الجديدة في حال إعادة التشغيل
await self.r2.save_open_trades_async(list(self.open_positions.values()))
print(f"🎯 [Targets Updated] {symbol} | TP: {old_tp:.4f}->{trade['tp_price']:.4f} | SL: {old_sl:.4f}->{trade['sl_price']:.4f} | Reason: {reason}")
else:
print(f"⚠️ [Target Update Failed] {symbol} is not currently open.")
async def start_sentry_loops(self):
"""بدء أو استئناف جميع مهام الحراسة"""
for symbol in list(self.open_positions.keys()):
# إذا لم يكن هناك حارس نشط، نعين واحداً
if symbol not in self.sentry_tasks or self.sentry_tasks[symbol].done():
print(f"🛡️ [Sentry Restart] Activating guardian for existing trade: {symbol}")
self.sentry_tasks[symbol] = asyncio.create_task(self._guardian_loop(symbol))
async def stop_sentry_loops(self):
"""إيقاف نظيف لجميع المهام الخلفية"""
print("🛑 [TradeManager] Stopping all sentry loops...")
self.running = False
for sym, task in self.sentry_tasks.items():
task.cancel()
# انتظار انتهاء المهام (اختياري لكن أفضل للأمان)
if self.sentry_tasks:
await asyncio.gather(*self.sentry_tasks.values(), return_exceptions=True)
print("✅ [TradeManager] All loops stopped.")