# whale_monitor/rpc_manager.py # (V2.1 - إضافة دعم Solscan API) # هذا هو "الوكيل الذكي" لإدارة اتصالات RPC والمستكشفات (APIs) # يدير منظمات الطلبات (Rate Limiters) والإحصائيات import asyncio import httpx import time import ssl import json import os import csv from collections import deque, defaultdict import random from typing import Dict, Any, Optional, List # استيراد الإعدادات الثابتة from .config import DEFAULT_NETWORK_CONFIGS, COINGECKO_BASE_URL # إعدادات الوكيل الذكي RPC_HEALTH_CHECK_WINDOW = 10 # تتبع آخر 10 طلبات RPC_ERROR_THRESHOLD = 3 # عدد الأخطاء المتتالية لإيقاف مؤقت RPC_CIRCUIT_BREAKER_DURATION = 300 # 5 دقائق إيقاف مؤقت # (إضافة ثابت لفرض تأخير بين طلبات CoinGecko) COINGECKO_REQUEST_DELAY = 2.0 # 2.0 ثانية (يساوي 30 طلب/دقيقة كحد أقصى) # (تحديد المسار الحالي لملف rpc_manager.py) _CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) # (تحديد مسار ملف CSV داخل نفس المجلد) CUSTOM_RPC_CSV_FILE = os.path.join(_CURRENT_DIR, 'rpc_endpoints (1).csv') class AdaptiveRpcManager: """ (محدث V2.1) مدير RPC و API ذكي: 1. يدير صحة نقاط RPC العامة (Public). 2. يدير منظمات الطلبات (Rate Limiters) للمفاتيح الخاصة (Infura, Moralis, Scanners, Solscan). 3. يتتبع إحصائيات الجلسة (Session Stats) لجميع الطلبات. """ def __init__(self, http_client: httpx.AsyncClient): self.http_client = http_client # 1. تحميل المفاتيح الخاصة من متغيرات البيئة self.api_keys = { "infura": os.getenv("INFURA_KEY"), "moralis": os.getenv("MORALIS_KEY"), "etherscan": os.getenv("ETHERSCAN_KEY"), "bscscan": os.getenv("BSCSCAN_KEY"), "polygonscan": os.getenv("POLYGONSCAN_KEY"), # 🔴 --- START OF CHANGE (V2.1) --- 🔴 "solscan": os.getenv("SOLSCAN_KEY"), # (إضافة مفتاح Solscan) # 🔴 --- END OF CHANGE --- 🔴 } print("✅ [RPCManager V2.1] تم تحميل المفاتيح الخاصة من البيئة.") # 2. تهيئة منظمات الطلبات (Semaphores) # لـ Infura (500 credits/sec) - سنستخدم 450 كحد أمان self.infura_semaphore = asyncio.Semaphore(450) # لـ Moralis (40k/day ~ 0.46/sec) - سنستخدم 1 لضمان طلب واحد في كل مرة self.moralis_semaphore = asyncio.Semaphore(1) # لـ Etherscan, BscScan, PolygonScan (الحد المشترك 5/sec) self.scanner_semaphore = asyncio.Semaphore(5) # 🔴 --- START OF CHANGE (V2.1) --- 🔴 # لـ Solscan (1000 reqs/60 sec ~ 16.6/sec) - سنستخدم 15 كحد أمان self.solscan_semaphore = asyncio.Semaphore(15) # 🔴 --- END OF CHANGE --- 🔴 # لـ CoinGecko (عام، سنكون حذرين) self.coingecko_semaphore = asyncio.Semaphore(1) # لـ مجمع RPC العام (Public Pool) (لحمايتهم من الضغط) self.public_rpc_semaphore = asyncio.Semaphore(10) self.last_coingecko_call = 0.0 self.last_moralis_call = 0.0 # 3. تهيئة إحصائيات الجلسة self.session_stats = defaultdict(int) # 4. تهيئة إعدادات الشبكة ونقاط RPC self.network_configs = self._initialize_network_configs(DEFAULT_NETWORK_CONFIGS) # 5. نظام تتبع الصحة (فقط لنقاط RPC العامة) self.endpoint_health = defaultdict(lambda: defaultdict(lambda: { "latency": deque(maxlen=RPC_HEALTH_CHECK_WINDOW), "consecutive_errors": 0, "total_errors": 0, "last_error_time": None, "circuit_open": False, })) print("✅ [RPCManager V2.1] مدير RPC/API الذكي (V2.1) مهيأ.") def _load_rpc_from_csv(self, csv_file_path: str) -> Dict[str, List[str]]: """ (جديد V2) قراءة ملف rpc_endpoints (1).csv ودمج النقاط. يتوقع الملف أن يحتوي على عمودين: 'network' و 'url' """ custom_rpcs = defaultdict(list) if not os.path.exists(csv_file_path): print(f"⚠️ [RPCManager V2] ملف CSV المخصص '{csv_file_path}' غير موجود. سيتم تخطيه.") return custom_rpcs try: with open(csv_file_path, mode='r', encoding='utf-8') as f: reader = csv.DictReader(f) for row in reader: network = row.get('network') url = row.get('url') if network and url and url.startswith('http'): custom_rpcs[network].append(url) print(f"✅ [RPCManager V2] تم تحميل {sum(len(v) for v in custom_rpcs.values())} نقطة RPC مخصصة من {csv_file_path}") return custom_rpcs except Exception as e: print(f"❌ [RPCManager V2] فشل في قراءة ملف CSV '{csv_file_path}': {e}") return defaultdict(list) def _initialize_network_configs(self, configs): """ (محدث V2.1) يقوم بدمج CSV، وحقن مفاتيح API، وإعداد الشبكات. """ initialized_configs = {} # 1. تحميل نقاط CSV المخصصة custom_rpcs = self._load_rpc_from_csv(CUSTOM_RPC_CSV_FILE) for network, config in configs.items(): new_config = config.copy() # 2. حقن مفاتيح Infura new_config['rpc_endpoints'] = self._inject_api_keys( config['rpc_endpoints'], self.api_keys.get('infura') ) # 3. دمج نقاط CSV if network in custom_rpcs: new_config['rpc_endpoints'].extend(custom_rpcs[network]) print(f" ... دمج {len(custom_rpcs[network])} نقاط مخصصة لشبكة {network}") # 4. خلط القائمة النهائية (لضمان التوزيع) random.shuffle(new_config['rpc_endpoints']) # 5. حقن مفاتيح المستكشف (Explorer) if config.get('explorer'): explorer_key_name = config['explorer'].get('api_key_name') if explorer_key_name and explorer_key_name in self.api_keys: new_config['explorer']['api_key'] = self.api_keys[explorer_key_name] else: new_config['explorer']['api_key'] = None # مفتاح غير متوفر initialized_configs[network] = new_config return initialized_configs def _inject_api_keys(self, endpoints: list, infura_key: str): """ يستبدل بالمفتاح الفعلي. """ if not infura_key: # إزالة النقاط التي تعتمد على مفتاح غير متوفر return [ep for ep in endpoints if "" not in ep] return [ep.replace("", infura_key) for ep in endpoints] # --- (دوال مساعدة للحصول على الإعدادات) --- def get_network_configs(self): return self.network_configs def get_explorer_config(self, network: str): config = self.network_configs.get(network, {}) return config.get('explorer') def get_api_key(self, key_name: str) -> Optional[str]: """(جديد V2) جلب مفتاح API بأمان""" return self.api_keys.get(key_name) # --- (دوال إدارة الإحصائيات V2) --- def reset_session_stats(self): """(جديد V2) تصفير عدادات الإحصائيات للجلسة الجديدة""" self.session_stats = defaultdict(int) print("📊 [RPCManager V2.1] تم تصفير عدادات إحصائيات الجلسة.") def get_session_stats(self) -> Dict[str, int]: """(جديد V2) إرجاع نسخة من الإحصائيات الحالية""" return self.session_stats.copy() # --- (دوال الصحة لـ Public RPCs - لا تغيير) --- def _get_healthy_public_endpoints(self, network: str): """ (معدل V2) يرتب نقاط RPC العامة (فقط) بناءً على الصحة. """ if network not in self.network_configs: return [] endpoints = self.network_configs[network]['rpc_endpoints'] healthy_endpoints = [] current_time = time.time() for ep in endpoints: # (تخطي Infura، هذا المجمع للعام فقط) if "infura.io" in ep: continue health = self.endpoint_health[network][ep] if health['circuit_open']: if current_time - health['last_error_time'] > RPC_CIRCUIT_BREAKER_DURATION: health['circuit_open'] = False; health['consecutive_errors'] = 0 else: continue # النقطة لا تزال معطلة مؤقتاً avg_latency = sum(health['latency']) / len(health['latency']) if health['latency'] else float('inf') healthy_endpoints.append((ep, avg_latency, health['consecutive_errors'])) healthy_endpoints.sort(key=lambda x: (x[2], x[1])) # (فصل السليم تماماً عن غير السليم) healthy_list = [ep[0] for ep in healthy_endpoints if ep[2] == 0] random.shuffle(healthy_list) # خلط السليم للتوزيع unhealthy_list = [ep[0] for ep in healthy_endpoints if ep[2] > 0] return healthy_list + unhealthy_list def _update_health(self, network: str, endpoint: str, success: bool, latency: float): """(لا تغيير) تحديث إحصائيات صحة نقطة RPC العامة.""" health = self.endpoint_health[network][endpoint] if success: health['latency'].append(latency) health['consecutive_errors'] = 0 health['circuit_open'] = False else: health['consecutive_errors'] += 1; health['total_errors'] += 1 health['last_error_time'] = time.time() if health['consecutive_errors'] >= RPC_ERROR_THRESHOLD: health['circuit_open'] = True print(f"🚨 [RPCManager V2.1] قاطع الدائرة مفعل (عام)! إيقاف مؤقت لـ: {endpoint.split('//')[-1]}") # --- (دوال الاتصال الأساسية V2.1 - محدثة بالكامل) --- async def post_rpc(self, network: str, payload: dict, timeout: float = 20.0): """ (محدث V2) إرسال طلب POST (JSON-RPC) سيحاول مع Infura أولاً (إذا كان متاحاً)، ثم يلجأ إلى المجمع العام. """ # 1. محاولة Infura أولاً (الأولوية) infura_key = self.api_keys.get('infura') infura_endpoint = next((ep for ep in self.network_configs.get(network, {}).get('rpc_endpoints', []) if "infura.io" in ep), None) if infura_key and infura_endpoint: start_time = time.time() try: async with self.infura_semaphore: response = await self.http_client.post(infura_endpoint, json=payload, timeout=timeout) response.raise_for_status() self.session_stats['infura_success'] += 1 latency = time.time() - start_time print(f"✅ [RPC Infura] {network} - {latency:.2f}s") return response.json() except Exception as e: self.session_stats['infura_fail'] += 1 print(f"⚠️ [RPC Infura] فشل {network}: {type(e).__name__}. اللجوء إلى المجمع العام...") # 2. اللجوء إلى المجمع العام (Public Pool) endpoints = self._get_healthy_public_endpoints(network) if not endpoints: print(f"❌ [RPCManager V2.1] لا توجد نقاط RPC عامة متاحة لشبكة {network}") return None for endpoint in endpoints[:3]: # محاولة أفضل 3 start_time = time.time() ep_name = endpoint.split('//')[-1].split('/')[0] try: async with self.public_rpc_semaphore: response = await self.http_client.post(endpoint, json=payload, timeout=timeout) response.raise_for_status() latency = time.time() - start_time self._update_health(network, endpoint, success=True, latency=latency) self.session_stats['public_rpc_success'] += 1 print(f"✅ [RPC Public] {network} ({ep_name}) - {latency:.2f}s") return response.json() except Exception as e: latency = time.time() - start_time self._update_health(network, endpoint, success=False, latency=latency) self.session_stats['public_rpc_fail'] += 1 print(f"⚠️ [RPC Public] فشل {network} ({ep_name}): {type(e).__name__}") continue print(f"❌ [RPCManager V2.1] فشلت جميع محاولات RPC لشبكة {network}") return None async def get_scanner_api(self, base_url: str, params: dict, timeout: float = 15.0): """ (جديد V2) إجراء طلب GET لواجهات Scanners (Etherscan, BscScan, etc.) يستخدم المنظم المشترك (5/ثانية). """ self.session_stats['scanner_total'] += 1 try: async with self.scanner_semaphore: response = await self.http_client.get(base_url, params=params, headers=None, timeout=timeout) response.raise_for_status() self.session_stats['scanner_success'] += 1 return response.json() except Exception as e: self.session_stats['scanner_fail'] += 1 print(f"❌ [Scanner API] فشل الطلب من {base_url.split('//')[-1]}: {e}") return None # 🔴 --- START OF CHANGE (V2.1) --- 🔴 async def get_solscan_api(self, path: str, params: dict, timeout: float = 15.0): """ (جديد V2.1) إجراء طلب GET لـ Solscan Pro API. يستخدم المنظم الخاص به (15/ثانية) ومفتاح API. """ solscan_key = self.api_keys.get('solscan') if not solscan_key: print("❌ [Solscan API] لا يوجد مفتاح SOLSCAN_KEY.") self.session_stats['solscan_fail_key'] += 1 return None base_url = self.network_configs.get('solana', {}).get('explorer', {}).get('api_url', 'https://pro-api.solscan.io') full_url = f"{base_url}{path}" headers = {"accept": "application/json", "token": solscan_key} self.session_stats['solscan_total'] += 1 try: async with self.solscan_semaphore: response = await self.http_client.get(full_url, params=params, headers=headers, timeout=timeout) response.raise_for_status() self.session_stats['solscan_success'] += 1 return response.json() except Exception as e: self.session_stats['solscan_fail'] += 1 print(f"❌ [Solscan API] فشل الطلب من {path}: {e}") return None # 🔴 --- END OF CHANGE --- 🔴 async def get_moralis_api(self, base_url: str, params: dict, timeout: float = 20.0): """ (جديد V2) إجراء طلب GET لـ Moralis API. يستخدم المنظم الخاص به (1/ثانية) ومفتاح API. """ moralis_key = self.api_keys.get('moralis') if not moralis_key: print("❌ [Moralis API] لا يوجد مفتاح MORALIS_KEY.") return None headers = {"accept": "application/json", "X-API-Key": moralis_key} self.session_stats['moralis_total'] += 1 try: async with self.moralis_semaphore: # (ضمان وجود ثانية واحدة على الأقل بين الطلبات لتوزيع 40k على اليوم) current_time = time.time() if current_time - self.last_moralis_call < 1.0: await asyncio.sleep(1.0 - (current_time - self.last_moralis_call)) self.last_moralis_call = time.time() response = await self.http_client.get(base_url, params=params, headers=headers, timeout=timeout) response.raise_for_status() self.session_stats['moralis_success'] += 1 return response.json() except Exception as e: self.session_stats['moralis_fail'] += 1 print(f"❌ [Moralis API] فشل الطلب: {e}") return None async def get_coingecko_api(self, params: dict, headers: dict = None, timeout: float = 15.0): """ (معدل V2) إجراء طلب GET لـ CoinGecko (يستخدم الآن إحصائيات ومنظم خاص). """ base_url = COINGECKO_BASE_URL self.session_stats['coingecko_total'] += 1 try: async with self.coingecko_semaphore: # (تطبيق "الخنق" لـ CoinGecko) current_time = time.time() time_since_last = current_time - self.last_coingecko_call if time_since_last < COINGECKO_REQUEST_DELAY: wait_time = COINGECKO_REQUEST_DELAY - time_since_last await asyncio.sleep(wait_time) self.last_coingecko_call = time.time() response = await self.http_client.get(base_url, params=params, headers=headers, timeout=timeout) if response.status_code == 429: # Too Many Requests wait_duration = 15.0 print(f"⚠️ [CoinGecko] خطأ 429 (Rate Limit). الانتظار {wait_duration} ثوان...") await asyncio.sleep(wait_duration) self.last_coingecko_call = time.time() response = await self.http_client.get(base_url, params=params, headers=headers, timeout=timeout) response.raise_for_status() self.session_stats['coingecko_success'] += 1 return response.json() except Exception as e: self.session_stats['coingecko_fail'] += 1 print(f"❌ [CoinGecko] فشل الطلب: {e}") return None