Trad / whale_monitor /rpc_manager.py
Riy777's picture
Update whale_monitor/rpc_manager.py
b08a9b4
# whale_monitor/rpc_manager.py
# (V2.1 - إضافة دعم Solscan API)
# هذا هو "الوكيل الذكي" لإدارة اتصالات RPC والمستكشفات (APIs)
# يدير منظمات الطلبات (Rate Limiters) والإحصائيات
import asyncio
import httpx
import time
import ssl
import json
import os
import csv
from collections import deque, defaultdict
import random
from typing import Dict, Any, Optional, List
# استيراد الإعدادات الثابتة
from .config import DEFAULT_NETWORK_CONFIGS, COINGECKO_BASE_URL
# إعدادات الوكيل الذكي
RPC_HEALTH_CHECK_WINDOW = 10 # تتبع آخر 10 طلبات
RPC_ERROR_THRESHOLD = 3 # عدد الأخطاء المتتالية لإيقاف مؤقت
RPC_CIRCUIT_BREAKER_DURATION = 300 # 5 دقائق إيقاف مؤقت
# (إضافة ثابت لفرض تأخير بين طلبات CoinGecko)
COINGECKO_REQUEST_DELAY = 2.0 # 2.0 ثانية (يساوي 30 طلب/دقيقة كحد أقصى)
# (تحديد المسار الحالي لملف rpc_manager.py)
_CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
# (تحديد مسار ملف CSV داخل نفس المجلد)
CUSTOM_RPC_CSV_FILE = os.path.join(_CURRENT_DIR, 'rpc_endpoints (1).csv')
class AdaptiveRpcManager:
"""
(محدث V2.1)
مدير RPC و API ذكي:
1. يدير صحة نقاط RPC العامة (Public).
2. يدير منظمات الطلبات (Rate Limiters) للمفاتيح الخاصة (Infura, Moralis, Scanners, Solscan).
3. يتتبع إحصائيات الجلسة (Session Stats) لجميع الطلبات.
"""
def __init__(self, http_client: httpx.AsyncClient):
self.http_client = http_client
# 1. تحميل المفاتيح الخاصة من متغيرات البيئة
self.api_keys = {
"infura": os.getenv("INFURA_KEY"),
"moralis": os.getenv("MORALIS_KEY"),
"etherscan": os.getenv("ETHERSCAN_KEY"),
"bscscan": os.getenv("BSCSCAN_KEY"),
"polygonscan": os.getenv("POLYGONSCAN_KEY"),
# 🔴 --- START OF CHANGE (V2.1) --- 🔴
"solscan": os.getenv("SOLSCAN_KEY"), # (إضافة مفتاح Solscan)
# 🔴 --- END OF CHANGE --- 🔴
}
print("✅ [RPCManager V2.1] تم تحميل المفاتيح الخاصة من البيئة.")
# 2. تهيئة منظمات الطلبات (Semaphores)
# لـ Infura (500 credits/sec) - سنستخدم 450 كحد أمان
self.infura_semaphore = asyncio.Semaphore(450)
# لـ Moralis (40k/day ~ 0.46/sec) - سنستخدم 1 لضمان طلب واحد في كل مرة
self.moralis_semaphore = asyncio.Semaphore(1)
# لـ Etherscan, BscScan, PolygonScan (الحد المشترك 5/sec)
self.scanner_semaphore = asyncio.Semaphore(5)
# 🔴 --- START OF CHANGE (V2.1) --- 🔴
# لـ Solscan (1000 reqs/60 sec ~ 16.6/sec) - سنستخدم 15 كحد أمان
self.solscan_semaphore = asyncio.Semaphore(15)
# 🔴 --- END OF CHANGE --- 🔴
# لـ CoinGecko (عام، سنكون حذرين)
self.coingecko_semaphore = asyncio.Semaphore(1)
# لـ مجمع RPC العام (Public Pool) (لحمايتهم من الضغط)
self.public_rpc_semaphore = asyncio.Semaphore(10)
self.last_coingecko_call = 0.0
self.last_moralis_call = 0.0
# 3. تهيئة إحصائيات الجلسة
self.session_stats = defaultdict(int)
# 4. تهيئة إعدادات الشبكة ونقاط RPC
self.network_configs = self._initialize_network_configs(DEFAULT_NETWORK_CONFIGS)
# 5. نظام تتبع الصحة (فقط لنقاط RPC العامة)
self.endpoint_health = defaultdict(lambda: defaultdict(lambda: {
"latency": deque(maxlen=RPC_HEALTH_CHECK_WINDOW),
"consecutive_errors": 0, "total_errors": 0, "last_error_time": None,
"circuit_open": False,
}))
print("✅ [RPCManager V2.1] مدير RPC/API الذكي (V2.1) مهيأ.")
def _load_rpc_from_csv(self, csv_file_path: str) -> Dict[str, List[str]]:
"""
(جديد V2)
قراءة ملف rpc_endpoints (1).csv ودمج النقاط.
يتوقع الملف أن يحتوي على عمودين: 'network' و 'url'
"""
custom_rpcs = defaultdict(list)
if not os.path.exists(csv_file_path):
print(f"⚠️ [RPCManager V2] ملف CSV المخصص '{csv_file_path}' غير موجود. سيتم تخطيه.")
return custom_rpcs
try:
with open(csv_file_path, mode='r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
network = row.get('network')
url = row.get('url')
if network and url and url.startswith('http'):
custom_rpcs[network].append(url)
print(f"✅ [RPCManager V2] تم تحميل {sum(len(v) for v in custom_rpcs.values())} نقطة RPC مخصصة من {csv_file_path}")
return custom_rpcs
except Exception as e:
print(f"❌ [RPCManager V2] فشل في قراءة ملف CSV '{csv_file_path}': {e}")
return defaultdict(list)
def _initialize_network_configs(self, configs):
"""
(محدث V2.1)
يقوم بدمج CSV، وحقن مفاتيح API، وإعداد الشبكات.
"""
initialized_configs = {}
# 1. تحميل نقاط CSV المخصصة
custom_rpcs = self._load_rpc_from_csv(CUSTOM_RPC_CSV_FILE)
for network, config in configs.items():
new_config = config.copy()
# 2. حقن مفاتيح Infura
new_config['rpc_endpoints'] = self._inject_api_keys(
config['rpc_endpoints'],
self.api_keys.get('infura')
)
# 3. دمج نقاط CSV
if network in custom_rpcs:
new_config['rpc_endpoints'].extend(custom_rpcs[network])
print(f" ... دمج {len(custom_rpcs[network])} نقاط مخصصة لشبكة {network}")
# 4. خلط القائمة النهائية (لضمان التوزيع)
random.shuffle(new_config['rpc_endpoints'])
# 5. حقن مفاتيح المستكشف (Explorer)
if config.get('explorer'):
explorer_key_name = config['explorer'].get('api_key_name')
if explorer_key_name and explorer_key_name in self.api_keys:
new_config['explorer']['api_key'] = self.api_keys[explorer_key_name]
else:
new_config['explorer']['api_key'] = None # مفتاح غير متوفر
initialized_configs[network] = new_config
return initialized_configs
def _inject_api_keys(self, endpoints: list, infura_key: str):
"""
يستبدل <INFURA_KEY> بالمفتاح الفعلي.
"""
if not infura_key:
# إزالة النقاط التي تعتمد على مفتاح غير متوفر
return [ep for ep in endpoints if "<INFURA_KEY>" not in ep]
return [ep.replace("<INFURA_KEY>", infura_key) for ep in endpoints]
# --- (دوال مساعدة للحصول على الإعدادات) ---
def get_network_configs(self):
return self.network_configs
def get_explorer_config(self, network: str):
config = self.network_configs.get(network, {})
return config.get('explorer')
def get_api_key(self, key_name: str) -> Optional[str]:
"""(جديد V2) جلب مفتاح API بأمان"""
return self.api_keys.get(key_name)
# --- (دوال إدارة الإحصائيات V2) ---
def reset_session_stats(self):
"""(جديد V2) تصفير عدادات الإحصائيات للجلسة الجديدة"""
self.session_stats = defaultdict(int)
print("📊 [RPCManager V2.1] تم تصفير عدادات إحصائيات الجلسة.")
def get_session_stats(self) -> Dict[str, int]:
"""(جديد V2) إرجاع نسخة من الإحصائيات الحالية"""
return self.session_stats.copy()
# --- (دوال الصحة لـ Public RPCs - لا تغيير) ---
def _get_healthy_public_endpoints(self, network: str):
"""
(معدل V2)
يرتب نقاط RPC العامة (فقط) بناءً على الصحة.
"""
if network not in self.network_configs: return []
endpoints = self.network_configs[network]['rpc_endpoints']
healthy_endpoints = []
current_time = time.time()
for ep in endpoints:
# (تخطي Infura، هذا المجمع للعام فقط)
if "infura.io" in ep:
continue
health = self.endpoint_health[network][ep]
if health['circuit_open']:
if current_time - health['last_error_time'] > RPC_CIRCUIT_BREAKER_DURATION:
health['circuit_open'] = False; health['consecutive_errors'] = 0
else:
continue # النقطة لا تزال معطلة مؤقتاً
avg_latency = sum(health['latency']) / len(health['latency']) if health['latency'] else float('inf')
healthy_endpoints.append((ep, avg_latency, health['consecutive_errors']))
healthy_endpoints.sort(key=lambda x: (x[2], x[1]))
# (فصل السليم تماماً عن غير السليم)
healthy_list = [ep[0] for ep in healthy_endpoints if ep[2] == 0]
random.shuffle(healthy_list) # خلط السليم للتوزيع
unhealthy_list = [ep[0] for ep in healthy_endpoints if ep[2] > 0]
return healthy_list + unhealthy_list
def _update_health(self, network: str, endpoint: str, success: bool, latency: float):
"""(لا تغيير) تحديث إحصائيات صحة نقطة RPC العامة."""
health = self.endpoint_health[network][endpoint]
if success:
health['latency'].append(latency)
health['consecutive_errors'] = 0
health['circuit_open'] = False
else:
health['consecutive_errors'] += 1; health['total_errors'] += 1
health['last_error_time'] = time.time()
if health['consecutive_errors'] >= RPC_ERROR_THRESHOLD:
health['circuit_open'] = True
print(f"🚨 [RPCManager V2.1] قاطع الدائرة مفعل (عام)! إيقاف مؤقت لـ: {endpoint.split('//')[-1]}")
# --- (دوال الاتصال الأساسية V2.1 - محدثة بالكامل) ---
async def post_rpc(self, network: str, payload: dict, timeout: float = 20.0):
"""
(محدث V2)
إرسال طلب POST (JSON-RPC)
سيحاول مع Infura أولاً (إذا كان متاحاً)، ثم يلجأ إلى المجمع العام.
"""
# 1. محاولة Infura أولاً (الأولوية)
infura_key = self.api_keys.get('infura')
infura_endpoint = next((ep for ep in self.network_configs.get(network, {}).get('rpc_endpoints', []) if "infura.io" in ep), None)
if infura_key and infura_endpoint:
start_time = time.time()
try:
async with self.infura_semaphore:
response = await self.http_client.post(infura_endpoint, json=payload, timeout=timeout)
response.raise_for_status()
self.session_stats['infura_success'] += 1
latency = time.time() - start_time
print(f"✅ [RPC Infura] {network} - {latency:.2f}s")
return response.json()
except Exception as e:
self.session_stats['infura_fail'] += 1
print(f"⚠️ [RPC Infura] فشل {network}: {type(e).__name__}. اللجوء إلى المجمع العام...")
# 2. اللجوء إلى المجمع العام (Public Pool)
endpoints = self._get_healthy_public_endpoints(network)
if not endpoints:
print(f"❌ [RPCManager V2.1] لا توجد نقاط RPC عامة متاحة لشبكة {network}")
return None
for endpoint in endpoints[:3]: # محاولة أفضل 3
start_time = time.time()
ep_name = endpoint.split('//')[-1].split('/')[0]
try:
async with self.public_rpc_semaphore:
response = await self.http_client.post(endpoint, json=payload, timeout=timeout)
response.raise_for_status()
latency = time.time() - start_time
self._update_health(network, endpoint, success=True, latency=latency)
self.session_stats['public_rpc_success'] += 1
print(f"✅ [RPC Public] {network} ({ep_name}) - {latency:.2f}s")
return response.json()
except Exception as e:
latency = time.time() - start_time
self._update_health(network, endpoint, success=False, latency=latency)
self.session_stats['public_rpc_fail'] += 1
print(f"⚠️ [RPC Public] فشل {network} ({ep_name}): {type(e).__name__}")
continue
print(f"❌ [RPCManager V2.1] فشلت جميع محاولات RPC لشبكة {network}")
return None
async def get_scanner_api(self, base_url: str, params: dict, timeout: float = 15.0):
"""
(جديد V2)
إجراء طلب GET لواجهات Scanners (Etherscan, BscScan, etc.)
يستخدم المنظم المشترك (5/ثانية).
"""
self.session_stats['scanner_total'] += 1
try:
async with self.scanner_semaphore:
response = await self.http_client.get(base_url, params=params, headers=None, timeout=timeout)
response.raise_for_status()
self.session_stats['scanner_success'] += 1
return response.json()
except Exception as e:
self.session_stats['scanner_fail'] += 1
print(f"❌ [Scanner API] فشل الطلب من {base_url.split('//')[-1]}: {e}")
return None
# 🔴 --- START OF CHANGE (V2.1) --- 🔴
async def get_solscan_api(self, path: str, params: dict, timeout: float = 15.0):
"""
(جديد V2.1)
إجراء طلب GET لـ Solscan Pro API.
يستخدم المنظم الخاص به (15/ثانية) ومفتاح API.
"""
solscan_key = self.api_keys.get('solscan')
if not solscan_key:
print("❌ [Solscan API] لا يوجد مفتاح SOLSCAN_KEY.")
self.session_stats['solscan_fail_key'] += 1
return None
base_url = self.network_configs.get('solana', {}).get('explorer', {}).get('api_url', 'https://pro-api.solscan.io')
full_url = f"{base_url}{path}"
headers = {"accept": "application/json", "token": solscan_key}
self.session_stats['solscan_total'] += 1
try:
async with self.solscan_semaphore:
response = await self.http_client.get(full_url, params=params, headers=headers, timeout=timeout)
response.raise_for_status()
self.session_stats['solscan_success'] += 1
return response.json()
except Exception as e:
self.session_stats['solscan_fail'] += 1
print(f"❌ [Solscan API] فشل الطلب من {path}: {e}")
return None
# 🔴 --- END OF CHANGE --- 🔴
async def get_moralis_api(self, base_url: str, params: dict, timeout: float = 20.0):
"""
(جديد V2)
إجراء طلب GET لـ Moralis API.
يستخدم المنظم الخاص به (1/ثانية) ومفتاح API.
"""
moralis_key = self.api_keys.get('moralis')
if not moralis_key:
print("❌ [Moralis API] لا يوجد مفتاح MORALIS_KEY.")
return None
headers = {"accept": "application/json", "X-API-Key": moralis_key}
self.session_stats['moralis_total'] += 1
try:
async with self.moralis_semaphore:
# (ضمان وجود ثانية واحدة على الأقل بين الطلبات لتوزيع 40k على اليوم)
current_time = time.time()
if current_time - self.last_moralis_call < 1.0:
await asyncio.sleep(1.0 - (current_time - self.last_moralis_call))
self.last_moralis_call = time.time()
response = await self.http_client.get(base_url, params=params, headers=headers, timeout=timeout)
response.raise_for_status()
self.session_stats['moralis_success'] += 1
return response.json()
except Exception as e:
self.session_stats['moralis_fail'] += 1
print(f"❌ [Moralis API] فشل الطلب: {e}")
return None
async def get_coingecko_api(self, params: dict, headers: dict = None, timeout: float = 15.0):
"""
(معدل V2)
إجراء طلب GET لـ CoinGecko (يستخدم الآن إحصائيات ومنظم خاص).
"""
base_url = COINGECKO_BASE_URL
self.session_stats['coingecko_total'] += 1
try:
async with self.coingecko_semaphore:
# (تطبيق "الخنق" لـ CoinGecko)
current_time = time.time()
time_since_last = current_time - self.last_coingecko_call
if time_since_last < COINGECKO_REQUEST_DELAY:
wait_time = COINGECKO_REQUEST_DELAY - time_since_last
await asyncio.sleep(wait_time)
self.last_coingecko_call = time.time()
response = await self.http_client.get(base_url, params=params, headers=headers, timeout=timeout)
if response.status_code == 429: # Too Many Requests
wait_duration = 15.0
print(f"⚠️ [CoinGecko] خطأ 429 (Rate Limit). الانتظار {wait_duration} ثوان...")
await asyncio.sleep(wait_duration)
self.last_coingecko_call = time.time()
response = await self.http_client.get(base_url, params=params, headers=headers, timeout=timeout)
response.raise_for_status()
self.session_stats['coingecko_success'] += 1
return response.json()
except Exception as e:
self.session_stats['coingecko_fail'] += 1
print(f"❌ [CoinGecko] فشل الطلب: {e}")
return None