Riy777 commited on
Commit
e73bb85
·
1 Parent(s): 10bcb5f

Update trade_manager.py

Browse files
Files changed (1) hide show
  1. trade_manager.py +89 -34
trade_manager.py CHANGED
@@ -1,4 +1,4 @@
1
- # trade_manager.py (Updated to V5.7 - Fixed Tactical Triggers Logic)
2
  import asyncio
3
  import json
4
  import time
@@ -133,6 +133,10 @@ class TradeManager:
133
  self.sentry_tasks = {}
134
  self.tactical_data_cache = {}
135
 
 
 
 
 
136
  self.kucoin_rest = None # (المنصة الأساسية)
137
  self.confirmation_exchanges = {} # (المنصات الثانوية للتأكيد)
138
  self.polling_interval = 1.5 # (KuCoin - سريع)
@@ -185,33 +189,54 @@ class TradeManager:
185
  for ex in self.confirmation_exchanges.values(): await ex.close()
186
  raise
187
 
188
- async def start_sentry_and_monitoring_loops(self): # (لا تغيير هنا)
 
189
  """الحلقة الرئيسية للحارس (Sentry) ومراقب الخروج (Exit Monitor)."""
190
  self.is_running = True
191
  print(f"✅ [Sentry] بدء حلقات المراقبة التكتيكية (Layer 2 - API Polling)...")
192
  while self.is_running:
193
  try:
194
- # (نفس منطق إدارة المهام)
195
- watchlist_symbols = set(self.sentry_watchlist.keys())
 
 
 
196
  open_trades = await self.get_open_trades()
197
  open_trade_symbols = {t['symbol'] for t in open_trades}
 
198
  symbols_to_monitor = watchlist_symbols.union(open_trade_symbols)
199
- for symbol in symbols_to_monitor:
200
- if symbol not in self.sentry_tasks:
201
- print(f" [Sentry] بدء المراقبة التكتيكية (Polling) لـ {symbol}")
 
 
 
 
 
 
202
  strategy_hint = self.sentry_watchlist.get(symbol, {}).get('strategy_hint', 'generic')
203
- if symbol not in self.tactical_data_cache: self.tactical_data_cache[symbol] = TacticalData(symbol)
204
- task = asyncio.create_task(self._monitor_symbol_activity_polling(symbol, strategy_hint))
205
- self.sentry_tasks[symbol] = task
206
- for symbol in list(self.sentry_tasks.keys()):
207
- if symbol not in symbols_to_monitor:
208
- print(f" [Sentry] إيقاف المراقبة التكتيكية (Polling) لـ {symbol}")
209
- task = self.sentry_tasks.pop(symbol, None)
210
- if task: task.cancel()
211
- if symbol in self.tactical_data_cache: del self.tactical_data_cache[symbol]
212
- await asyncio.sleep(15)
 
 
 
 
 
 
 
 
 
213
  except Exception as error:
214
  print(f"❌ [Sentry] خطأ في الحلقة الرئيسية: {error}"); traceback.print_exc(); await asyncio.sleep(60)
 
215
 
216
  async def stop_sentry_loops(self):
217
  """(محدث) إي��اف جميع مهام المراقبة وإغلاق جميع اتصالات REST"""
@@ -234,17 +259,24 @@ class TradeManager:
234
  print("✅ [Sentry] تم إغلاق اتصالات التداول (REST).")
235
  except Exception as e: print(f"⚠️ [Sentry] خطأ أثناء إغلاق الاتصالات: {e}")
236
 
237
- async def update_sentry_watchlist(self, candidates: List[Dict]): # (لا تغيير هنا)
 
238
  """تحديث قائمة المراقبة التي يستخدمها الحارس (Sentry)."""
239
- self.sentry_watchlist = {c['symbol']: c for c in candidates}
240
- print(f"ℹ️ [Sentry] تم تحديث Watchlist. عدد المرشحين: {len(self.sentry_watchlist)}")
 
 
241
 
242
  def get_sentry_status(self):
243
  """(محدث) لواجهة برمجة التطبيقات /system-status"""
 
 
 
 
244
  return {
245
  'is_running': self.is_running,
246
- 'active_monitoring_tasks': len(self.sentry_tasks),
247
- 'watchlist_symbols': list(self.sentry_watchlist.keys()),
248
  'monitored_symbols': list(self.sentry_tasks.keys()),
249
  'confirmation_exchanges_active': list(self.confirmation_exchanges.keys())
250
  }
@@ -272,8 +304,11 @@ class TradeManager:
272
  traceback.print_exc()
273
  finally:
274
  print(f"🛑 [Sentry] إنهاء جميع مهام (Polling) {symbol}")
275
- if symbol in self.sentry_tasks: del self.sentry_tasks[symbol]
276
- if symbol in self.tactical_data_cache: del self.tactical_data_cache[symbol]
 
 
 
277
 
278
  async def _poll_kucoin_data(self, symbol):
279
  """(لا تغيير) حلقة استقصاء (Polling) لبيانات KuCoin (الأساسية)"""
@@ -302,6 +337,8 @@ class TradeManager:
302
  except ccxtasync.RateLimitExceeded as e:
303
  print(f"⏳ [Sentry Polling] {symbol} KuCoin Rate Limit Exceeded: {e}. زيادة فترة الانتظار...")
304
  await asyncio.sleep(10)
 
 
305
  except Exception as e:
306
  print(f"⚠️ [Sentry Polling] خطأ في {symbol} KuCoin data polling: {e}")
307
  await asyncio.sleep(5)
@@ -354,6 +391,8 @@ class TradeManager:
354
  except ccxtasync.RateLimitExceeded:
355
  print(f"⏳ [Sentry Conf] {ex_id} Rate Limit لـ {symbol}. الانتظار...")
356
  await asyncio.sleep(15) # انتظار أطول لهذه المنصة
 
 
357
  except Exception as e:
358
  # (الفشل في منصة واحدة لا يجب أن يوقف الحلقة، فقط نتجاهل بياناتها هذه المرة)
359
  pass
@@ -381,17 +420,28 @@ class TradeManager:
381
  if current_price: await self.immediate_close_trade(symbol, current_price, f"Tactical Exit: {exit_reason}")
382
  else:
383
  # (منطق الدخول - محدث)
384
- if symbol in self.sentry_watchlist:
 
 
 
 
 
385
  trigger = self._check_entry_trigger(symbol, strategy_hint, snapshot)
386
  if trigger:
387
  print(f"✅ [Sentry] زناد دخول تكتيكي لـ {symbol} (استراتيجية: {strategy_hint})")
388
- watchlist_entry = self.sentry_watchlist.pop(symbol, None)
 
 
 
 
 
389
  if watchlist_entry:
390
  explorer_context = watchlist_entry.get('llm_decision_context', {})
391
  await self._execute_smart_entry(symbol, strategy_hint, snapshot, explorer_context)
 
 
392
  except Exception as e: print(f"❌ [Sentry] خطأ في حلقة التحليل التكتيكي لـ {symbol}: {e}"); traceback.print_exc()
393
 
394
- # 🔴 --- START OF CHANGE (LOGIC FIXED V5.7) --- 🔴
395
  def _check_entry_trigger(self, symbol: str, strategy_hint: str, data: Dict) -> bool:
396
  """(محدث V5.7) إصلاح منطق الزناد للسماح باستراتيجيات الانعكاس."""
397
 
@@ -436,7 +486,7 @@ class TradeManager:
436
  return True
437
 
438
  return False
439
- # 🔴 --- END OF CHANGE (LOGIC FIXED V5.7) --- 🔴
440
 
441
  def _check_exit_trigger(self, trade: Dict, data: Dict) -> str: # (لا تغيير هنا)
442
  """يحدد ما إذا كان يجب الخروج من صفقة مفتوحة تكتيكياً."""
@@ -503,10 +553,12 @@ class TradeManager:
503
  open_trades_after_cancel = await self.get_open_trades()
504
  if not any(t['symbol'] == symbol for t in open_trades_after_cancel):
505
  print(f" [Sentry] إعادة {symbol} إلى Watchlist بعد فشل التنفيذ.")
506
- if explorer_context:
507
- self.sentry_watchlist[symbol] = {"symbol": symbol, "strategy_hint": strategy_hint, "llm_decision_context": explorer_context}
508
- else:
509
- self.sentry_watchlist[symbol] = {"symbol": symbol, "strategy_hint": strategy_hint}
 
 
510
 
511
 
512
  except Exception as e: print(f"❌ [Executor] فشل فادح أثناء التنفيذ لـ {symbol}: {e}"); traceback.print_exc()
@@ -573,7 +625,10 @@ class TradeManager:
573
  open_trades = await self.r2_service.get_open_trades_async()
574
  trades_to_keep = [t for t in open_trades if t.get('id') != trade_to_close.get('id')]
575
  await self.r2_service.save_open_trades_async(trades_to_keep)
576
- if symbol in self.sentry_tasks: print(f"ℹ️ [Sentry] الصفقة {symbol} أغلقت، ستتوقف المراقبة.")
 
 
 
577
  await self.r2_service.save_system_logs_async({
578
  "trade_closed": True, "symbol": symbol, "pnl_usd": pnl, "pnl_percent": pnl_percent,
579
  "new_capital": new_capital, "strategy": strategy, "reason": reason
@@ -660,4 +715,4 @@ class TradeManager:
660
  except Exception as e: print(f"❌ Failed to get trade by symbol {symbol}: {e}"); return None
661
 
662
 
663
- print(f"✅ Trade Manager loaded - V5.7 (Fixed Tactical Triggers & ccxt.async_support: {CCXT_ASYNC_AVAILABLE})")
 
1
+ # trade_manager.py (Updated to V5.8 - Added Asyncio Lock for Watchlist)
2
  import asyncio
3
  import json
4
  import time
 
133
  self.sentry_tasks = {}
134
  self.tactical_data_cache = {}
135
 
136
+ # 🔴 --- START OF CHANGE (FIX RACE CONDITION) --- 🔴
137
+ self.sentry_lock = asyncio.Lock() # إضافة قفل لحماية Watchlist
138
+ # 🔴 --- END OF CHANGE --- 🔴
139
+
140
  self.kucoin_rest = None # (المنصة الأساسية)
141
  self.confirmation_exchanges = {} # (المنصات الثانوية للتأكيد)
142
  self.polling_interval = 1.5 # (KuCoin - سريع)
 
189
  for ex in self.confirmation_exchanges.values(): await ex.close()
190
  raise
191
 
192
+ # 🔴 --- START OF CHANGE (FIX RACE CONDITION) --- 🔴
193
+ async def start_sentry_and_monitoring_loops(self):
194
  """الحلقة الرئيسية للحارس (Sentry) ومراقب الخروج (Exit Monitor)."""
195
  self.is_running = True
196
  print(f"✅ [Sentry] بدء حلقات المراقبة التكتيكية (Layer 2 - API Polling)...")
197
  while self.is_running:
198
  try:
199
+ # الحصول على قائمة الرموز المطلوب مراقبتها (داخل القفل)
200
+ async with self.sentry_lock:
201
+ watchlist_symbols = set(self.sentry_watchlist.keys())
202
+
203
+ # (جلب الصفقات المفتوحة - هذا I/O، نتركه خارج القفل)
204
  open_trades = await self.get_open_trades()
205
  open_trade_symbols = {t['symbol'] for t in open_trades}
206
+
207
  symbols_to_monitor = watchlist_symbols.union(open_trade_symbols)
208
+ current_tasks = set(self.sentry_tasks.keys())
209
+
210
+ # الرموز التي يجب إضافتها
211
+ symbols_to_add = symbols_to_monitor - current_tasks
212
+ for symbol in symbols_to_add:
213
+ print(f" [Sentry] بدء المراقبة التكتيكية (Polling) لـ {symbol}")
214
+
215
+ # (الحصول على التلميح داخل القفل لضمان عدم تغييره أثناء القراءة)
216
+ async with self.sentry_lock:
217
  strategy_hint = self.sentry_watchlist.get(symbol, {}).get('strategy_hint', 'generic')
218
+
219
+ if symbol not in self.tactical_data_cache:
220
+ self.tactical_data_cache[symbol] = TacticalData(symbol)
221
+
222
+ task = asyncio.create_task(self._monitor_symbol_activity_polling(symbol, strategy_hint))
223
+ self.sentry_tasks[symbol] = task
224
+
225
+ # الرموز التي يجب إزالتها
226
+ symbols_to_remove = current_tasks - symbols_to_monitor
227
+ for symbol in symbols_to_remove:
228
+ print(f" [Sentry] إيقاف المراقبة التكتيكية (Polling) لـ {symbol}")
229
+ task = self.sentry_tasks.pop(symbol, None)
230
+ if task:
231
+ task.cancel()
232
+ if symbol in self.tactical_data_cache:
233
+ del self.tactical_data_cache[symbol]
234
+
235
+ await asyncio.sleep(15) # فاصل زمني للحلقة الرئيسية
236
+
237
  except Exception as error:
238
  print(f"❌ [Sentry] خطأ في الحلقة الرئيسية: {error}"); traceback.print_exc(); await asyncio.sleep(60)
239
+ # 🔴 --- END OF CHANGE --- 🔴
240
 
241
  async def stop_sentry_loops(self):
242
  """(محدث) إي��اف جميع مهام المراقبة وإغلاق جميع اتصالات REST"""
 
259
  print("✅ [Sentry] تم إغلاق اتصالات التداول (REST).")
260
  except Exception as e: print(f"⚠️ [Sentry] خطأ أثناء إغلاق الاتصالات: {e}")
261
 
262
+ # 🔴 --- START OF CHANGE (FIX RACE CONDITION) --- 🔴
263
+ async def update_sentry_watchlist(self, candidates: List[Dict]):
264
  """تحديث قائمة المراقبة التي يستخدمها الحارس (Sentry)."""
265
+ async with self.sentry_lock: # (استخدام القفل قبل الكتابة)
266
+ self.sentry_watchlist = {c['symbol']: c for c in candidates}
267
+ print(f"ℹ️ [Sentry] تم تحديث Watchlist. عدد المرشحين: {len(self.sentry_watchlist)}")
268
+ # 🔴 --- END OF CHANGE --- 🔴
269
 
270
  def get_sentry_status(self):
271
  """(محدث) لواجهة برمجة التطبيقات /system-status"""
272
+ # (لا حاجة للقفل هنا، هذه قراءة سريعة لعدد العناصر فقط)
273
+ active_monitoring_count = len(self.sentry_tasks)
274
+ watchlist_symbols_list = list(self.sentry_watchlist.keys())
275
+
276
  return {
277
  'is_running': self.is_running,
278
+ 'active_monitoring_tasks': active_monitoring_count,
279
+ 'watchlist_symbols': watchlist_symbols_list,
280
  'monitored_symbols': list(self.sentry_tasks.keys()),
281
  'confirmation_exchanges_active': list(self.confirmation_exchanges.keys())
282
  }
 
304
  traceback.print_exc()
305
  finally:
306
  print(f"🛑 [Sentry] إنهاء جميع مهام (Polling) {symbol}")
307
+ # (التنظيف يحدث بالفعل في الحلقة الرئيسية، ولكن هذا ضمان إضافي)
308
+ if symbol in self.sentry_tasks:
309
+ self.sentry_tasks.pop(symbol, None)
310
+ if symbol in self.tactical_data_cache:
311
+ del self.tactical_data_cache[symbol]
312
 
313
  async def _poll_kucoin_data(self, symbol):
314
  """(لا تغيير) حلقة استقصاء (Polling) لبيانات KuCoin (الأساسية)"""
 
337
  except ccxtasync.RateLimitExceeded as e:
338
  print(f"⏳ [Sentry Polling] {symbol} KuCoin Rate Limit Exceeded: {e}. زيادة فترة الانتظار...")
339
  await asyncio.sleep(10)
340
+ except asyncio.CancelledError:
341
+ raise # تمرير الإلغاء
342
  except Exception as e:
343
  print(f"⚠️ [Sentry Polling] خطأ في {symbol} KuCoin data polling: {e}")
344
  await asyncio.sleep(5)
 
391
  except ccxtasync.RateLimitExceeded:
392
  print(f"⏳ [Sentry Conf] {ex_id} Rate Limit لـ {symbol}. الانتظار...")
393
  await asyncio.sleep(15) # انتظار أطول لهذه المنصة
394
+ except asyncio.CancelledError:
395
+ raise # تمرير الإلغاء
396
  except Exception as e:
397
  # (الفشل في منصة واحدة لا يجب أن يوقف الحلقة، فقط نتجاهل بياناتها هذه المرة)
398
  pass
 
420
  if current_price: await self.immediate_close_trade(symbol, current_price, f"Tactical Exit: {exit_reason}")
421
  else:
422
  # (منطق الدخول - محدث)
423
+ # (التحقق من قائمة المراقبة يتم الآن في الحلقة الرئيسية)
424
+ # (لكننا نحتاج إلى التحقق مرة أخرى هنا لضمان عدم حدوث سباق حالات)
425
+ async with self.sentry_lock:
426
+ is_still_on_watchlist = symbol in self.sentry_watchlist
427
+
428
+ if is_still_on_watchlist:
429
  trigger = self._check_entry_trigger(symbol, strategy_hint, snapshot)
430
  if trigger:
431
  print(f"✅ [Sentry] زناد دخول تكتيكي لـ {symbol} (استراتيجية: {strategy_hint})")
432
+
433
+ # (إزالة الع��صر من قائمة المراقبة باستخدام القفل)
434
+ watchlist_entry = None
435
+ async with self.sentry_lock:
436
+ watchlist_entry = self.sentry_watchlist.pop(symbol, None)
437
+
438
  if watchlist_entry:
439
  explorer_context = watchlist_entry.get('llm_decision_context', {})
440
  await self._execute_smart_entry(symbol, strategy_hint, snapshot, explorer_context)
441
+ except asyncio.CancelledError:
442
+ raise # تمرير الإلغاء
443
  except Exception as e: print(f"❌ [Sentry] خطأ في حلقة التحليل التكتيكي لـ {symbol}: {e}"); traceback.print_exc()
444
 
 
445
  def _check_entry_trigger(self, symbol: str, strategy_hint: str, data: Dict) -> bool:
446
  """(محدث V5.7) إصلاح منطق الزناد للسماح باستراتيجيات الانعكاس."""
447
 
 
486
  return True
487
 
488
  return False
489
+ # 🔴 --- END OF LOGIC FIX (V5.7) --- 🔴
490
 
491
  def _check_exit_trigger(self, trade: Dict, data: Dict) -> str: # (لا تغيير هنا)
492
  """يحدد ما إذا كان يجب الخروج من صفقة مفتوحة تكتيكياً."""
 
553
  open_trades_after_cancel = await self.get_open_trades()
554
  if not any(t['symbol'] == symbol for t in open_trades_after_cancel):
555
  print(f" [Sentry] إعادة {symbol} إلى Watchlist بعد فشل التنفيذ.")
556
+ # (استخدام القفل لإعادة الإضافة بأمان)
557
+ async with self.sentry_lock:
558
+ if explorer_context:
559
+ self.sentry_watchlist[symbol] = {"symbol": symbol, "strategy_hint": strategy_hint, "llm_decision_context": explorer_context}
560
+ else:
561
+ self.sentry_watchlist[symbol] = {"symbol": symbol, "strategy_hint": strategy_hint}
562
 
563
 
564
  except Exception as e: print(f"❌ [Executor] فشل فادح أثناء التنفيذ لـ {symbol}: {e}"); traceback.print_exc()
 
625
  open_trades = await self.r2_service.get_open_trades_async()
626
  trades_to_keep = [t for t in open_trades if t.get('id') != trade_to_close.get('id')]
627
  await self.r2_service.save_open_trades_async(trades_to_keep)
628
+
629
+ # (لا حاجة لإزالة المهمة يدوياً، الحلقة الرئيسية ستفعل ذلك)
630
+ # if symbol in self.sentry_tasks: print(f"ℹ️ [Sentry] الصفقة {symbol} أغلقت، ستتوقف المراقبة.")
631
+
632
  await self.r2_service.save_system_logs_async({
633
  "trade_closed": True, "symbol": symbol, "pnl_usd": pnl, "pnl_percent": pnl_percent,
634
  "new_capital": new_capital, "strategy": strategy, "reason": reason
 
715
  except Exception as e: print(f"❌ Failed to get trade by symbol {symbol}: {e}"); return None
716
 
717
 
718
+ print(f"✅ Trade Manager loaded - V5.8 (Added Watchlist Lock) (ccxt.async_support: {CCXT_ASYNC_AVAILABLE})")