# data_manager.py import os import asyncio import httpx import traceback import time from datetime import datetime import ccxt import numpy as np import logging from typing import List, Dict, Any logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("httpcore").setLevel(logging.WARNING) class DataManager: def __init__(self, contracts_db, whale_monitor): self.contracts_db = contracts_db or {} self.whale_monitor = whale_monitor try: self.exchange = ccxt.kucoin({ 'sandbox': False, 'enableRateLimit': True, 'timeout': 30000, 'verbose': False, }) print("✅ تم تهيئة اتصال KuCoin بنجاح") except Exception as e: print(f"❌ فشل تهيئة اتصال KuCoin: {e}") self.exchange = None self.http_client = None self.market_cache = {} self.last_market_load = None async def initialize(self): self.http_client = httpx.AsyncClient(timeout=30.0) await self._load_markets() print("✅ DataManager initialized - Efficient Volume-Based Screening") async def _load_markets(self): try: if not self.exchange: return print("🔄 جلب أحدث بيانات الأسواق من KuCoin...") self.exchange.load_markets() self.market_cache = self.exchange.markets self.last_market_load = datetime.now() print(f"✅ تم تحميل {len(self.market_cache)} سوق من KuCoin") except Exception as e: print(f"❌ فشل تحميل بيانات الأسواق: {e}") async def close(self): if self.http_client: await self.http_client.aclose() async def get_market_context_async(self): """جلب سياق السوق الأساسي فقط""" try: sentiment_data = await self.get_sentiment_safe_async() price_data = await self._get_prices_with_fallback() bitcoin_price = price_data.get('bitcoin') ethereum_price = price_data.get('ethereum') market_context = { 'timestamp': datetime.now().isoformat(), 'bitcoin_price_usd': bitcoin_price, 'ethereum_price_usd': ethereum_price, 'fear_and_greed_index': sentiment_data.get('feargreed_value') if sentiment_data else None, 'sentiment_class': sentiment_data.get('feargreed_class') if sentiment_data else 'NEUTRAL', 'market_trend': self._determine_market_trend(bitcoin_price, sentiment_data), 'btc_sentiment': self._get_btc_sentiment(bitcoin_price), 'data_quality': 'HIGH' if bitcoin_price and ethereum_price else 'LOW' } return market_context except Exception as e: return self._get_minimal_market_context() async def get_sentiment_safe_async(self): """جلب بيانات المشاعر""" try: async with httpx.AsyncClient(timeout=10) as client: response = await client.get("https://api.alternative.me/fng/") response.raise_for_status() data = response.json() if 'data' not in data or not data['data']: raise ValueError("بيانات المشاعر غير متوفرة") latest_data = data['data'][0] return { "feargreed_value": int(latest_data['value']), "feargreed_class": latest_data['value_classification'], "source": "alternative.me", "timestamp": datetime.now().isoformat() } except Exception as e: return None def _determine_market_trend(self, bitcoin_price, sentiment_data): """تحديد اتجاه السوق""" if bitcoin_price is None: return "UNKNOWN" score = 0 if bitcoin_price > 60000: score += 1 elif bitcoin_price < 55000: score -= 1 if sentiment_data and sentiment_data.get('feargreed_value') is not None: fear_greed = sentiment_data.get('feargreed_value') if fear_greed > 60: score += 1 elif fear_greed < 40: score -= 1 if score >= 1: return "bull_market" elif score <= -1: return "bear_market" else: return "sideways_market" def _get_btc_sentiment(self, bitcoin_price): if bitcoin_price is None: return 'UNKNOWN' elif bitcoin_price > 60000: return 'BULLISH' elif bitcoin_price < 55000: return 'BEARISH' else: return 'NEUTRAL' async def _get_prices_with_fallback(self): """جلب أسعار البيتكوين والإيثيريوم""" try: prices = await self._get_prices_from_kucoin_safe() if prices.get('bitcoin') and prices.get('ethereum'): return prices return await self._get_prices_from_coingecko() except Exception as e: return {'bitcoin': None, 'ethereum': None} async def _get_prices_from_kucoin_safe(self): if not self.exchange: return {'bitcoin': None, 'ethereum': None} try: prices = {'bitcoin': None, 'ethereum': None} btc_ticker = self.exchange.fetch_ticker('BTC/USDT') btc_price = float(btc_ticker.get('last', 0)) if btc_ticker.get('last') else None if btc_price and btc_price > 0: prices['bitcoin'] = btc_price eth_ticker = self.exchange.fetch_ticker('ETH/USDT') eth_price = float(eth_ticker.get('last', 0)) if eth_ticker.get('last') else None if eth_price and eth_price > 0: prices['ethereum'] = eth_price return prices except Exception as e: print(f"⚠️ فشل جلب الأسعار من KuCoin: {e}") return {'bitcoin': None, 'ethereum': None} async def _get_prices_from_coingecko(self): """الاحتياطي: جلب الأسعار من CoinGecko""" try: await asyncio.sleep(0.5) url = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin,ethereum&vs_currencies=usd" headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'application/json' } async with httpx.AsyncClient(headers=headers) as client: response = await client.get(url, timeout=10) if response.status_code == 429: await asyncio.sleep(2) response = await client.get(url, timeout=10) response.raise_for_status() data = response.json() btc_price = data.get('bitcoin', {}).get('usd') eth_price = data.get('ethereum', {}).get('usd') if btc_price and eth_price: return {'bitcoin': btc_price, 'ethereum': eth_price} else: return {'bitcoin': None, 'ethereum': None} except Exception as e: print(f"⚠️ فشل جلب الأسعار من CoinGecko: {e}") return {'bitcoin': None, 'ethereum': None} def _get_minimal_market_context(self): """سياق سوق بدائي عند الفشل""" return { 'timestamp': datetime.now().isoformat(), 'data_available': False, 'market_trend': 'UNKNOWN', 'btc_sentiment': 'UNKNOWN', 'data_quality': 'LOW' } async def layer1_rapid_screening(self) -> List[Dict[str, Any]]: """ الطبقة 1: فحص سريع - جلب أفضل 200 عملة حسب الحجم مباشرة """ print("📊 الطبقة 1: جلب أفضل 200 عملة حسب حجم التداول...") # المحاولة 1: الطريقة المثلى - استخدام fetch_tickers volume_data = await self._get_volume_data_optimal() if not volume_data: # المحاولة 2: الطريقة البديلة - استخدام API المباشر volume_data = await self._get_volume_data_direct_api() if not volume_data: # المحاولة 3: الطريقة التقليدية (الاحتياطية) volume_data = await self._get_volume_data_traditional() if not volume_data: print("❌ فشل جميع محاولات جلب بيانات الأحجام") return [] # أخذ أفضل 200 عملة حسب الحجم فقط volume_data.sort(key=lambda x: x['dollar_volume'], reverse=True) top_200_by_volume = volume_data[:200] print(f"✅ تم اختيار أفضل {len(top_200_by_volume)} عملة حسب الحجم") # المرحلة 2: تطبيق المؤشرات الأخرى على الـ200 فقط final_candidates = await self._apply_advanced_indicators(top_200_by_volume) print(f"🎯 تم تحليل {len(final_candidates)} عملة للطبقة 2") # عرض أفضل 15 عملة print("🏆 أفضل 15 عملة من الطبقة 1:") for i, candidate in enumerate(final_candidates[:15]): score = candidate.get('layer1_score', 0) volume = candidate.get('dollar_volume', 0) change = candidate.get('price_change_24h', 0) print(f" {i+1:2d}. {candidate['symbol']}: {score:.3f} | ${volume:>10,.0f} | {change:>+6.1f}%") return final_candidates async def _get_volume_data_optimal(self) -> List[Dict[str, Any]]: """الطريقة المثلى: استخدام fetch_tickers لجميع البيانات مرة واحدة""" try: if not self.exchange: return [] tickers = self.exchange.fetch_tickers() volume_data = [] processed = 0 for symbol, ticker in tickers.items(): # تصفية أزواج USDT النشطة فقط if not symbol.endswith('/USDT'): continue if not ticker.get('active', True): continue # استخدام quoteVolume (الحجم بالدولار) إذا متوفر current_price = ticker.get('last', 0) quote_volume = ticker.get('quoteVolume', 0) # ✅ الإصلاح: التحقق من القيم قبل العمليات الحسابية if current_price is None or current_price <= 0: continue if quote_volume is not None and quote_volume > 0: dollar_volume = quote_volume else: # fallback: baseVolume * السعر base_volume = ticker.get('baseVolume', 0) if base_volume is None: continue dollar_volume = base_volume * current_price # ✅ الإصلاح: التحقق من أن dollar_volume قيمة صالحة if dollar_volume is None or dollar_volume < 50000: # أقل من 50K دولار continue # ✅ الإصلاح: التحقق من أن price_change_24h قيمة صالحة price_change_24h = (ticker.get('percentage', 0) or 0) * 100 if price_change_24h is None: price_change_24h = 0 volume_data.append({ 'symbol': symbol, 'dollar_volume': dollar_volume, 'current_price': current_price, 'volume_24h': ticker.get('baseVolume', 0) or 0, 'price_change_24h': price_change_24h }) processed += 1 print(f"✅ تم معالجة {processed} عملة في الطريقة المثلى") return volume_data except Exception as e: print(f"❌ خطأ في جلب بيانات الحجم المثلى: {e}") traceback.print_exc() return [] async def _get_volume_data_direct_api(self) -> List[Dict[str, Any]]: """الطريقة الثانية: استخدام KuCoin API مباشرة""" try: url = "https://api.kucoin.com/api/v1/market/allTickers" async with httpx.AsyncClient(timeout=15) as client: response = await client.get(url) response.raise_for_status() data = response.json() if data.get('code') != '200000': raise ValueError(f"استجابة API غير متوقعة: {data.get('code')}") tickers = data['data']['ticker'] volume_data = [] for ticker in tickers: symbol = ticker['symbol'] # تصفية أزواج USDT فقط if not symbol.endswith('USDT'): continue # تحويل الرمز للتنسيق القياسي (BTC-USDT → BTC/USDT) formatted_symbol = symbol.replace('-', '/') try: # ✅ الإصلاح: التحقق من القيم قبل التحويل vol_value = ticker.get('volValue') last_price = ticker.get('last') change_rate = ticker.get('changeRate') vol = ticker.get('vol') if vol_value is None or last_price is None or change_rate is None or vol is None: continue # ✅ الإصلاح: استخدام القيم الافتراضية للتحويل dollar_volume = float(vol_value) if vol_value else 0 current_price = float(last_price) if last_price else 0 price_change = (float(change_rate) * 100) if change_rate else 0 volume_24h = float(vol) if vol else 0 if dollar_volume >= 50000 and current_price > 0: volume_data.append({ 'symbol': formatted_symbol, 'dollar_volume': dollar_volume, 'current_price': current_price, 'volume_24h': volume_24h, 'price_change_24h': price_change }) except (ValueError, TypeError, KeyError) as e: continue print(f"✅ تم معالجة {len(volume_data)} عملة في الطريقة المباشرة") return volume_data except Exception as e: print(f"❌ خطأ في جلب بيانات الحجم المباشر: {e}") traceback.print_exc() return [] async def _get_volume_data_traditional(self) -> List[Dict[str, Any]]: """الطريقة التقليدية: جلب كل رمز على حدة (الاحتياطي)""" try: if not self.exchange or not self.market_cache: return [] usdt_symbols = [ symbol for symbol in self.market_cache.keys() if symbol.endswith('/USDT') and self.market_cache[symbol].get('active', False) ] volume_data = [] processed = 0 # معالجة دفعات لتجنب rate limits batch_size = 20 # تقليل حجم الدفعة لتحسين الاستقرار for i in range(0, len(usdt_symbols), batch_size): batch = usdt_symbols[i:i + batch_size] batch_tasks = [self._process_single_symbol(sym) for sym in batch] batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True) for result in batch_results: if isinstance(result, dict): volume_data.append(result) processed += len(batch) # انتظار قصير بين الدفعات if i + batch_size < len(usdt_symbols): await asyncio.sleep(1) print(f"✅ تم معالجة {len(volume_data)} عملة في الطريقة التقليدية") return volume_data except Exception as e: print(f"❌ خطأ في جلب بيانات الحجم التقليدية: {e}") traceback.print_exc() return [] async def _process_single_symbol(self, symbol: str) -> Dict[str, Any]: """معالجة رمز واحد لجلب بيانات الحجم""" try: ticker = self.exchange.fetch_ticker(symbol) if not ticker: return None current_price = ticker.get('last', 0) quote_volume = ticker.get('quoteVolume', 0) # ✅ الإصلاح: التحقق من القيم قبل العمليات الحسابية if current_price is None or current_price <= 0: return None if quote_volume is not None and quote_volume > 0: dollar_volume = quote_volume else: base_volume = ticker.get('baseVolume', 0) if base_volume is None: return None dollar_volume = base_volume * current_price if dollar_volume is None or dollar_volume < 50000: return None # ✅ الإصلاح: التحقق من أن price_change_24h قيمة صالحة price_change_24h = (ticker.get('percentage', 0) or 0) * 100 if price_change_24h is None: price_change_24h = 0 return { 'symbol': symbol, 'dollar_volume': dollar_volume, 'current_price': current_price, 'volume_24h': ticker.get('baseVolume', 0) or 0, 'price_change_24h': price_change_24h } except Exception: return None async def _apply_advanced_indicators(self, volume_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """تطبيق المؤشرات المتقدمة على أفضل العملات حسب الحجم""" candidates = [] for i, symbol_data in enumerate(volume_data): try: symbol = symbol_data['symbol'] # جلب بيانات إضافية للرمز detailed_data = await self._get_detailed_symbol_data(symbol) if not detailed_data: continue # دمج البيانات symbol_data.update(detailed_data) # حساب الدرجة النهائية score = self._calculate_advanced_score(symbol_data) symbol_data['layer1_score'] = score candidates.append(symbol_data) except Exception as e: continue # ترتيب المرشحين حسب الدرجة النهائية candidates.sort(key=lambda x: x.get('layer1_score', 0), reverse=True) return candidates async def _get_detailed_symbol_data(self, symbol: str) -> Dict[str, Any]: """جلب بيانات تفصيلية للرمز""" try: ticker = self.exchange.fetch_ticker(symbol) if not ticker: return None current_price = ticker.get('last', 0) high_24h = ticker.get('high', 0) low_24h = ticker.get('low', 0) open_price = ticker.get('open', 0) price_change_24h = (ticker.get('percentage', 0) or 0) * 100 # ✅ الإصلاح: استخدام القيم الافتراضية للتحويل current_price = current_price or 0 high_24h = high_24h or 0 low_24h = low_24h or 0 open_price = open_price or 0 price_change_24h = price_change_24h or 0 # حساب المؤشرات المتقدمة volatility = self._calculate_volatility(high_24h, low_24h, current_price) price_strength = self._calculate_price_strength(current_price, open_price, price_change_24h) momentum = self._calculate_momentum(price_change_24h) return { 'price_change_24h': price_change_24h, 'high_24h': high_24h, 'low_24h': low_24h, 'open_price': open_price, 'volatility': volatility, 'price_strength': price_strength, 'momentum': momentum, 'reasons': [] } except Exception as e: return None def _calculate_advanced_score(self, symbol_data: Dict[str, Any]) -> float: """حساب درجة متقدمة تجمع بين الحجم والمؤشرات الأخرى""" dollar_volume = symbol_data.get('dollar_volume', 0) price_change = symbol_data.get('price_change_24h', 0) volatility = symbol_data.get('volatility', 0) price_strength = symbol_data.get('price_strength', 0) momentum = symbol_data.get('momentum', 0) # 1. درجة الحجم (40%) - الأهم volume_score = self._calculate_volume_score(dollar_volume) # 2. درجة الزخم (25%) momentum_score = momentum # 3. درجة التقلب (20%) volatility_score = self._calculate_volatility_score(volatility) # 4. درجة قوة السعر (15%) strength_score = price_strength # الدرجة النهائية final_score = ( volume_score * 0.40 + momentum_score * 0.25 + volatility_score * 0.20 + strength_score * 0.15 ) # تحديث أسباب الترشيح reasons = [] if volume_score >= 0.7: reasons.append('high_volume') if momentum_score >= 0.7: reasons.append('strong_momentum') if volatility_score >= 0.7: reasons.append('good_volatility') symbol_data['reasons'] = reasons return final_score def _calculate_volume_score(self, dollar_volume: float) -> float: """حساب درجة الحجم""" if dollar_volume >= 10000000: # 10M+ return 1.0 elif dollar_volume >= 5000000: # 5M+ return 0.9 elif dollar_volume >= 2000000: # 2M+ return 0.8 elif dollar_volume >= 1000000: # 1M+ return 0.7 elif dollar_volume >= 500000: # 500K+ return 0.6 elif dollar_volume >= 250000: # 250K+ return 0.5 elif dollar_volume >= 100000: # 100K+ return 0.4 else: return 0.3 def _calculate_volatility(self, high_24h: float, low_24h: float, current_price: float) -> float: """حساب التقلب""" if current_price == 0: return 0 return (high_24h - low_24h) / current_price def _calculate_volatility_score(self, volatility: float) -> float: """حساب درجة التقلب""" if 0.02 <= volatility <= 0.15: # تقلب مثالي 2%-15% return 1.0 elif 0.01 <= volatility <= 0.20: # مقبول 1%-20% return 0.8 elif volatility <= 0.01: # قليل جداً return 0.4 elif volatility > 0.20: # عالي جداً return 0.3 else: return 0.5 def _calculate_price_strength(self, current_price: float, open_price: float, price_change: float) -> float: """حساب قوة السعر""" if open_price == 0: return 0.5 # قوة السعر تعتمد على المسافة من سعر الافتتاح ونسبة التغير distance_from_open = abs(current_price - open_price) / open_price change_strength = min(abs(price_change) / 50, 1.0) return (distance_from_open * 0.6 + change_strength * 0.4) def _calculate_momentum(self, price_change: float) -> float: """حساب الزخم""" if price_change >= 15: # +15%+ return 1.0 elif price_change >= 10: # +10%+ return 0.9 elif price_change >= 5: # +5%+ return 0.8 elif price_change >= 2: # +2%+ return 0.7 elif price_change >= 0: # موجب return 0.6 elif price_change >= -5: # حتى -5% return 0.5 elif price_change >= -10: # حتى -10% return 0.4 else: # أكثر من -10% return 0.3 # 🔴 --- بدء التعديل الجوهري --- 🔴 # تم تعديل هذه الدالة لتعمل كـ "منتج" (Producer) يضخ البيانات في طابور async def stream_ohlcv_data(self, symbols: List[str], queue: asyncio.Queue): """ (معدلة) جلب بيانات OHLCV بشكل متدفق وإرسالها إلى طابور """ print(f"📊 بدء تدفق بيانات OHLCV لـ {len(symbols)} عملة...") batch_size = 15 batches = [symbols[i:i + batch_size] for i in range(0, len(symbols), batch_size)] total_successful = 0 for batch_num, batch in enumerate(batches): print(f" 🔄 [المنتج] جلب الدفعة {batch_num + 1}/{len(batches)} ({len(batch)} عملة)...") batch_tasks = [] for symbol in batch: task = asyncio.create_task(self._fetch_complete_ohlcv_parallel(symbol)) batch_tasks.append(task) batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True) # معالجة نتائج الدفعة successful_data_for_batch = [] successful_count = 0 for i, result in enumerate(batch_results): symbol = batch[i] if isinstance(result, Exception): print(f" ❌ [المنتج] فشل جلب {symbol}: {result}") elif result is not None: successful_data_for_batch.append(result) successful_count += 1 timeframes_count = result.get('successful_timeframes', 0) print(f" ✅ [المنتج] {symbol}: {timeframes_count}/6 أطر زمنية") else: print(f" ⚠️ [المنتج] {symbol}: بيانات غير كافية، تم التجاهل") print(f" 📦 [المنتج] اكتملت الدفعة {batch_num + 1}: {successful_count}/{len(batch)} ناجحة") # 🔴 الإرسال إلى الطابور # نرسل فقط إذا كانت هناك بيانات ناجحة في الدفعة if successful_data_for_batch: try: await queue.put(successful_data_for_batch) print(f" 📬 [المنتج] تم إرسال {len(successful_data_for_batch)} عملة إلى طابور المعالجة") total_successful += len(successful_data_for_batch) except Exception as q_err: print(f" ❌ [المنتج] فشل إرسال الدفعة للطابور: {q_err}") # انتظار قصير بين الدفعات لتجنب rate limits if batch_num < len(batches) - 1: await asyncio.sleep(1) print(f"✅ [المنتج] اكتمل تدفق بيانات OHLCV. تم إرسال {total_successful} عملة للمعالجة.") # 🔴 --- START OF CHANGE --- 🔴 # (إرسال إشارة "None" لإنهاء المستهلك) try: await queue.put(None) print(" 📬 [المنتج] تم إرسال إشارة الإنهاء (None) إلى الطابور.") except Exception as q_err: print(f" ❌ [المنتج] فشل إرسال إشارة الإنهاء (None) للطابور: {q_err}") # 🔴 --- END OF CHANGE --- 🔴 # 🔴 --- نهاية التعديل الجوهري --- 🔴 async def _fetch_complete_ohlcv_parallel(self, symbol: str) -> Dict[str, Any]: """جلب بيانات OHLCV كاملة مع جميع الإطارات الزمنية بشكل متوازي""" try: ohlcv_data = {} # جلب 200 شمعة لكل إطار زمني مع تحسين التعامل مع الأخطاء timeframes = [ ('5m', 200), ('15m', 200), ('1h', 200), ('4h', 200), ('1d', 200), ('1w', 200), ] # إنشاء مهام لجميع الإطارات الزمنية بشكل متوازي timeframe_tasks = [] for timeframe, limit in timeframes: task = asyncio.create_task(self._fetch_single_timeframe_improved(symbol, timeframe, limit)) timeframe_tasks.append(task) # انتظار جميع المهام timeframe_results = await asyncio.gather(*timeframe_tasks, return_exceptions=True) # تحسين: قبول البيانات حتى لو كانت غير مكتملة successful_timeframes = 0 min_required_timeframes = 2 # تخفيف الشرط من 3 إلى 2 أطر زمنية # معالجة النتائج for i, (timeframe, limit) in enumerate(timeframes): result = timeframe_results[i] if isinstance(result, Exception): continue if result and len(result) >= 10: # تخفيف الشرط من 50 إلى 10 شموع ohlcv_data[timeframe] = result successful_timeframes += 1 # تحسين: قبول العملة إذا كان لديها عدد كافٍ من الأطر الزمنية if successful_timeframes >= min_required_timeframes and ohlcv_data: try: # ✅ الحصول على السعر الحالي مباشرة current_price = await self.get_latest_price_async(symbol) # ✅ الإصلاح: إذا لم نتمكن من جلب السعر، نستخدم آخر سعر إغلاق if current_price is None: # البحث عن آخر سعر إغلاق من بيانات OHLCV for timeframe_data in ohlcv_data.values(): if timeframe_data and len(timeframe_data) > 0: last_candle = timeframe_data[-1] if len(last_candle) >= 5: current_price = last_candle[4] # سعر الإغلاق break if current_price is None: print(f"❌ فشل جلب السعر لـ {symbol} من جميع المصادر") return None result_data = { 'symbol': symbol, 'ohlcv': ohlcv_data, 'raw_ohlcv': ohlcv_data, # ✅ إضافة البيانات الخام مباشرة 'current_price': current_price, 'timestamp': datetime.now().isoformat(), 'candles_count': {tf: len(data) for tf, data in ohlcv_data.items()}, 'successful_timeframes': successful_timeframes } return result_data except Exception as price_error: print(f"❌ فشل جلب السعر لـ {symbol}: {price_error}") return None else: return None except Exception as e: print(f"❌ خطأ في جلب بيانات OHLCV لـ {symbol}: {e}") return None async def _fetch_single_timeframe_improved(self, symbol: str, timeframe: str, limit: int): """جلب بيانات إطار زمني واحد مع تحسين التعامل مع الأخطاء""" max_retries = 3 retry_delay = 2 for attempt in range(max_retries): try: ohlcv_data = self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit) if ohlcv_data and len(ohlcv_data) > 0: return ohlcv_data else: return [] except Exception as e: if attempt < max_retries - 1: await asyncio.sleep(retry_delay * (attempt + 1)) else: return [] async def get_latest_price_async(self, symbol): """جلب السعر الحالي لعملة محددة - الإصلاح الرئيسي هنا""" try: if not self.exchange: print(f"❌ Exchange غير مهيأ لـ {symbol}") return None # ✅ الإصلاح الرئيسي: استخدام fetch_ticker مباشرة بدون asyncio.create_task # التأكد من أن symbol صالح للاستخدام if not symbol or '/' not in symbol: print(f"❌ رمز غير صالح: {symbol}") return None # ✅ الإصلاح: استخدام fetch_ticker بشكل متزامن (ليست async) ticker = self.exchange.fetch_ticker(symbol) if not ticker: print(f"❌ لم يتم العثور على ticker لـ {symbol}") return None current_price = ticker.get('last') if current_price is None: print(f"❌ لا يوجد سعر حالي في ticker لـ {symbol}") return None return float(current_price) except Exception as e: print(f"❌ فشل جلب السعر من KuCoin لـ {symbol}: {e}") return None async def get_available_symbols(self): """الحصول على جميع الرموز المتاحة""" try: if not self.exchange: return [] if not self.market_cache: await self._load_markets() usdt_symbols = [ symbol for symbol in self.market_cache.keys() if symbol.endswith('/USDT') and self.market_cache[symbol].get('active', False) ] return usdt_symbols except Exception as e: return [] async def validate_symbol(self, symbol): """التحقق من صحة الرمز""" try: if not self.exchange: return False if not self.market_cache: await self._load_markets() return symbol in self.market_cache and self.market_cache[symbol].get('active', False) except Exception as e: return False # === الدوال الجديدة لدعم بيانات الحيتان === async def get_whale_data_for_symbol(self, symbol): """جلب بيانات الحيتان لعملة محددة""" try: if self.whale_monitor: whale_data = await self.whale_monitor.get_symbol_whale_activity(symbol) return whale_data else: return None except Exception as e: return None async def get_whale_trading_signal(self, symbol, whale_data, market_context): """جلب إشارة التداول بناءً على بيانات الحيتان""" try: if self.whale_monitor: return await self.whale_monitor.generate_whale_trading_signal(symbol, whale_data, market_context) else: return { 'action': 'HOLD', 'confidence': 0.3, 'reason': 'Whale monitor not available', 'source': 'whale_analysis' } except Exception as e: return { 'action': 'HOLD', 'confidence': 0.3, 'reason': f'Error: {str(e)}', 'source': 'whale_analysis' } print("✅ DataManager loaded - (FIXED: Added stream_ohlcv_data 'None' terminator)")