Riy777 commited on
Commit
db0d805
·
verified ·
1 Parent(s): 4b858f5

Update data_manager.py

Browse files
Files changed (1) hide show
  1. data_manager.py +13 -7
data_manager.py CHANGED
@@ -54,8 +54,19 @@ class DataManager:
54
  print(f"❌ فشل تحميل بيانات الأسواق: {e}")
55
 
56
  async def close(self):
57
- if self.http_client:
 
 
58
  await self.http_client.aclose()
 
 
 
 
 
 
 
 
 
59
 
60
  async def get_market_context_async(self):
61
  """جلب سياق السوق الأساسي فقط"""
@@ -636,8 +647,7 @@ class DataManager:
636
  else: # أكثر من -10%
637
  return 0.3
638
 
639
- # 🔴 --- بدء التعديل الجوهري --- 🔴
640
- # تم تعديل هذه الدالة لتعمل كـ "منتج" (Producer) يضخ البيانات في طابور
641
  async def stream_ohlcv_data(self, symbols: List[str], queue: asyncio.Queue):
642
  """
643
  (معدلة) جلب بيانات OHLCV بشكل متدفق وإرسالها إلى طابور
@@ -692,15 +702,11 @@ class DataManager:
692
 
693
  print(f"✅ [المنتج] اكتمل تدفق بيانات OHLCV. تم إرسال {total_successful} عملة للمعالجة.")
694
 
695
- # 🔴 --- START OF CHANGE --- 🔴
696
- # (إرسال إشارة "None" لإنهاء المستهلك)
697
  try:
698
  await queue.put(None)
699
  print(" 📬 [المنتج] تم إرسال إشارة الإنهاء (None) إلى الطابور.")
700
  except Exception as q_err:
701
  print(f" ❌ [المنتج] فشل إرسال إشارة الإنهاء (None) للطابور: {q_err}")
702
- # 🔴 --- END OF CHANGE --- 🔴
703
- # 🔴 --- نهاية التعديل الجوهري --- 🔴
704
 
705
 
706
  async def _fetch_complete_ohlcv_parallel(self, symbol: str) -> Dict[str, Any]:
 
54
  print(f"❌ فشل تحميل بيانات الأسواق: {e}")
55
 
56
  async def close(self):
57
+ # 🔴 --- START OF CHANGE --- 🔴
58
+ # (إصلاح تسريب الموارد)
59
+ if self.http_client and not self.http_client.is_closed:
60
  await self.http_client.aclose()
61
+ print(" ✅ DataManager: http_client closed.")
62
+
63
+ if self.exchange:
64
+ try:
65
+ await self.exchange.close()
66
+ print(" ✅ DataManager: ccxt.kucoin exchange closed.")
67
+ except Exception as e:
68
+ print(f" ⚠️ DataManager: Error closing ccxt.kucoin: {e}")
69
+ # 🔴 --- END OF CHANGE --- 🔴
70
 
71
  async def get_market_context_async(self):
72
  """جلب سياق السوق الأساسي فقط"""
 
647
  else: # أكثر من -10%
648
  return 0.3
649
 
650
+ # (لا تغيير في دالة التدفق)
 
651
  async def stream_ohlcv_data(self, symbols: List[str], queue: asyncio.Queue):
652
  """
653
  (معدلة) جلب بيانات OHLCV بشكل متدفق وإرسالها إلى طابور
 
702
 
703
  print(f"✅ [المنتج] اكتمل تدفق بيانات OHLCV. تم إرسال {total_successful} عملة للمعالجة.")
704
 
 
 
705
  try:
706
  await queue.put(None)
707
  print(" 📬 [المنتج] تم إرسال إشارة الإنهاء (None) إلى الطابور.")
708
  except Exception as q_err:
709
  print(f" ❌ [المنتج] فشل إرسال إشارة الإنهاء (None) للطابور: {q_err}")
 
 
710
 
711
 
712
  async def _fetch_complete_ohlcv_parallel(self, symbol: str) -> Dict[str, Any]: