|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import asyncio |
|
|
import httpx |
|
|
import time |
|
|
import ssl |
|
|
import json |
|
|
import os |
|
|
import csv |
|
|
from collections import deque, defaultdict |
|
|
import random |
|
|
from typing import Dict, Any, Optional, List |
|
|
|
|
|
|
|
|
from .config import DEFAULT_NETWORK_CONFIGS, COINGECKO_BASE_URL |
|
|
|
|
|
|
|
|
RPC_HEALTH_CHECK_WINDOW = 10 |
|
|
RPC_ERROR_THRESHOLD = 3 |
|
|
RPC_CIRCUIT_BREAKER_DURATION = 300 |
|
|
|
|
|
|
|
|
COINGECKO_REQUEST_DELAY = 2.0 |
|
|
|
|
|
|
|
|
_CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) |
|
|
|
|
|
CUSTOM_RPC_CSV_FILE = os.path.join(_CURRENT_DIR, 'rpc_endpoints (1).csv') |
|
|
|
|
|
class AdaptiveRpcManager: |
|
|
""" |
|
|
(محدث V2.1) |
|
|
مدير RPC و API ذكي: |
|
|
1. يدير صحة نقاط RPC العامة (Public). |
|
|
2. يدير منظمات الطلبات (Rate Limiters) للمفاتيح الخاصة (Infura, Moralis, Scanners, Solscan). |
|
|
3. يتتبع إحصائيات الجلسة (Session Stats) لجميع الطلبات. |
|
|
""" |
|
|
def __init__(self, http_client: httpx.AsyncClient): |
|
|
self.http_client = http_client |
|
|
|
|
|
|
|
|
self.api_keys = { |
|
|
"infura": os.getenv("INFURA_KEY"), |
|
|
"moralis": os.getenv("MORALIS_KEY"), |
|
|
"etherscan": os.getenv("ETHERSCAN_KEY"), |
|
|
"bscscan": os.getenv("BSCSCAN_KEY"), |
|
|
"polygonscan": os.getenv("POLYGONSCAN_KEY"), |
|
|
|
|
|
"solscan": os.getenv("SOLSCAN_KEY"), |
|
|
|
|
|
} |
|
|
print("✅ [RPCManager V2.1] تم تحميل المفاتيح الخاصة من البيئة.") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.infura_semaphore = asyncio.Semaphore(450) |
|
|
|
|
|
|
|
|
self.moralis_semaphore = asyncio.Semaphore(1) |
|
|
|
|
|
|
|
|
self.scanner_semaphore = asyncio.Semaphore(5) |
|
|
|
|
|
|
|
|
|
|
|
self.solscan_semaphore = asyncio.Semaphore(15) |
|
|
|
|
|
|
|
|
|
|
|
self.coingecko_semaphore = asyncio.Semaphore(1) |
|
|
|
|
|
|
|
|
self.public_rpc_semaphore = asyncio.Semaphore(10) |
|
|
|
|
|
self.last_coingecko_call = 0.0 |
|
|
self.last_moralis_call = 0.0 |
|
|
|
|
|
|
|
|
self.session_stats = defaultdict(int) |
|
|
|
|
|
|
|
|
self.network_configs = self._initialize_network_configs(DEFAULT_NETWORK_CONFIGS) |
|
|
|
|
|
|
|
|
self.endpoint_health = defaultdict(lambda: defaultdict(lambda: { |
|
|
"latency": deque(maxlen=RPC_HEALTH_CHECK_WINDOW), |
|
|
"consecutive_errors": 0, "total_errors": 0, "last_error_time": None, |
|
|
"circuit_open": False, |
|
|
})) |
|
|
|
|
|
print("✅ [RPCManager V2.1] مدير RPC/API الذكي (V2.1) مهيأ.") |
|
|
|
|
|
def _load_rpc_from_csv(self, csv_file_path: str) -> Dict[str, List[str]]: |
|
|
""" |
|
|
(جديد V2) |
|
|
قراءة ملف rpc_endpoints (1).csv ودمج النقاط. |
|
|
يتوقع الملف أن يحتوي على عمودين: 'network' و 'url' |
|
|
""" |
|
|
custom_rpcs = defaultdict(list) |
|
|
if not os.path.exists(csv_file_path): |
|
|
print(f"⚠️ [RPCManager V2] ملف CSV المخصص '{csv_file_path}' غير موجود. سيتم تخطيه.") |
|
|
return custom_rpcs |
|
|
|
|
|
try: |
|
|
with open(csv_file_path, mode='r', encoding='utf-8') as f: |
|
|
reader = csv.DictReader(f) |
|
|
for row in reader: |
|
|
network = row.get('network') |
|
|
url = row.get('url') |
|
|
if network and url and url.startswith('http'): |
|
|
custom_rpcs[network].append(url) |
|
|
print(f"✅ [RPCManager V2] تم تحميل {sum(len(v) for v in custom_rpcs.values())} نقطة RPC مخصصة من {csv_file_path}") |
|
|
return custom_rpcs |
|
|
except Exception as e: |
|
|
print(f"❌ [RPCManager V2] فشل في قراءة ملف CSV '{csv_file_path}': {e}") |
|
|
return defaultdict(list) |
|
|
|
|
|
def _initialize_network_configs(self, configs): |
|
|
""" |
|
|
(محدث V2.1) |
|
|
يقوم بدمج CSV، وحقن مفاتيح API، وإعداد الشبكات. |
|
|
""" |
|
|
initialized_configs = {} |
|
|
|
|
|
|
|
|
custom_rpcs = self._load_rpc_from_csv(CUSTOM_RPC_CSV_FILE) |
|
|
|
|
|
for network, config in configs.items(): |
|
|
new_config = config.copy() |
|
|
|
|
|
|
|
|
new_config['rpc_endpoints'] = self._inject_api_keys( |
|
|
config['rpc_endpoints'], |
|
|
self.api_keys.get('infura') |
|
|
) |
|
|
|
|
|
|
|
|
if network in custom_rpcs: |
|
|
new_config['rpc_endpoints'].extend(custom_rpcs[network]) |
|
|
print(f" ... دمج {len(custom_rpcs[network])} نقاط مخصصة لشبكة {network}") |
|
|
|
|
|
|
|
|
random.shuffle(new_config['rpc_endpoints']) |
|
|
|
|
|
|
|
|
if config.get('explorer'): |
|
|
explorer_key_name = config['explorer'].get('api_key_name') |
|
|
if explorer_key_name and explorer_key_name in self.api_keys: |
|
|
new_config['explorer']['api_key'] = self.api_keys[explorer_key_name] |
|
|
else: |
|
|
new_config['explorer']['api_key'] = None |
|
|
|
|
|
initialized_configs[network] = new_config |
|
|
return initialized_configs |
|
|
|
|
|
def _inject_api_keys(self, endpoints: list, infura_key: str): |
|
|
""" |
|
|
يستبدل <INFURA_KEY> بالمفتاح الفعلي. |
|
|
""" |
|
|
if not infura_key: |
|
|
|
|
|
return [ep for ep in endpoints if "<INFURA_KEY>" not in ep] |
|
|
|
|
|
return [ep.replace("<INFURA_KEY>", infura_key) for ep in endpoints] |
|
|
|
|
|
|
|
|
|
|
|
def get_network_configs(self): |
|
|
return self.network_configs |
|
|
|
|
|
def get_explorer_config(self, network: str): |
|
|
config = self.network_configs.get(network, {}) |
|
|
return config.get('explorer') |
|
|
|
|
|
def get_api_key(self, key_name: str) -> Optional[str]: |
|
|
"""(جديد V2) جلب مفتاح API بأمان""" |
|
|
return self.api_keys.get(key_name) |
|
|
|
|
|
|
|
|
|
|
|
def reset_session_stats(self): |
|
|
"""(جديد V2) تصفير عدادات الإحصائيات للجلسة الجديدة""" |
|
|
self.session_stats = defaultdict(int) |
|
|
print("📊 [RPCManager V2.1] تم تصفير عدادات إحصائيات الجلسة.") |
|
|
|
|
|
def get_session_stats(self) -> Dict[str, int]: |
|
|
"""(جديد V2) إرجاع نسخة من الإحصائيات الحالية""" |
|
|
return self.session_stats.copy() |
|
|
|
|
|
|
|
|
|
|
|
def _get_healthy_public_endpoints(self, network: str): |
|
|
""" |
|
|
(معدل V2) |
|
|
يرتب نقاط RPC العامة (فقط) بناءً على الصحة. |
|
|
""" |
|
|
if network not in self.network_configs: return [] |
|
|
|
|
|
endpoints = self.network_configs[network]['rpc_endpoints'] |
|
|
healthy_endpoints = [] |
|
|
current_time = time.time() |
|
|
|
|
|
for ep in endpoints: |
|
|
|
|
|
if "infura.io" in ep: |
|
|
continue |
|
|
|
|
|
health = self.endpoint_health[network][ep] |
|
|
if health['circuit_open']: |
|
|
if current_time - health['last_error_time'] > RPC_CIRCUIT_BREAKER_DURATION: |
|
|
health['circuit_open'] = False; health['consecutive_errors'] = 0 |
|
|
else: |
|
|
continue |
|
|
|
|
|
avg_latency = sum(health['latency']) / len(health['latency']) if health['latency'] else float('inf') |
|
|
healthy_endpoints.append((ep, avg_latency, health['consecutive_errors'])) |
|
|
|
|
|
healthy_endpoints.sort(key=lambda x: (x[2], x[1])) |
|
|
|
|
|
|
|
|
healthy_list = [ep[0] for ep in healthy_endpoints if ep[2] == 0] |
|
|
random.shuffle(healthy_list) |
|
|
unhealthy_list = [ep[0] for ep in healthy_endpoints if ep[2] > 0] |
|
|
|
|
|
return healthy_list + unhealthy_list |
|
|
|
|
|
def _update_health(self, network: str, endpoint: str, success: bool, latency: float): |
|
|
"""(لا تغيير) تحديث إحصائيات صحة نقطة RPC العامة.""" |
|
|
health = self.endpoint_health[network][endpoint] |
|
|
if success: |
|
|
health['latency'].append(latency) |
|
|
health['consecutive_errors'] = 0 |
|
|
health['circuit_open'] = False |
|
|
else: |
|
|
health['consecutive_errors'] += 1; health['total_errors'] += 1 |
|
|
health['last_error_time'] = time.time() |
|
|
if health['consecutive_errors'] >= RPC_ERROR_THRESHOLD: |
|
|
health['circuit_open'] = True |
|
|
print(f"🚨 [RPCManager V2.1] قاطع الدائرة مفعل (عام)! إيقاف مؤقت لـ: {endpoint.split('//')[-1]}") |
|
|
|
|
|
|
|
|
|
|
|
async def post_rpc(self, network: str, payload: dict, timeout: float = 20.0): |
|
|
""" |
|
|
(محدث V2) |
|
|
إرسال طلب POST (JSON-RPC) |
|
|
سيحاول مع Infura أولاً (إذا كان متاحاً)، ثم يلجأ إلى المجمع العام. |
|
|
""" |
|
|
|
|
|
|
|
|
infura_key = self.api_keys.get('infura') |
|
|
infura_endpoint = next((ep for ep in self.network_configs.get(network, {}).get('rpc_endpoints', []) if "infura.io" in ep), None) |
|
|
|
|
|
if infura_key and infura_endpoint: |
|
|
start_time = time.time() |
|
|
try: |
|
|
async with self.infura_semaphore: |
|
|
response = await self.http_client.post(infura_endpoint, json=payload, timeout=timeout) |
|
|
response.raise_for_status() |
|
|
|
|
|
self.session_stats['infura_success'] += 1 |
|
|
latency = time.time() - start_time |
|
|
print(f"✅ [RPC Infura] {network} - {latency:.2f}s") |
|
|
return response.json() |
|
|
|
|
|
except Exception as e: |
|
|
self.session_stats['infura_fail'] += 1 |
|
|
print(f"⚠️ [RPC Infura] فشل {network}: {type(e).__name__}. اللجوء إلى المجمع العام...") |
|
|
|
|
|
|
|
|
endpoints = self._get_healthy_public_endpoints(network) |
|
|
if not endpoints: |
|
|
print(f"❌ [RPCManager V2.1] لا توجد نقاط RPC عامة متاحة لشبكة {network}") |
|
|
return None |
|
|
|
|
|
for endpoint in endpoints[:3]: |
|
|
start_time = time.time() |
|
|
ep_name = endpoint.split('//')[-1].split('/')[0] |
|
|
try: |
|
|
async with self.public_rpc_semaphore: |
|
|
response = await self.http_client.post(endpoint, json=payload, timeout=timeout) |
|
|
response.raise_for_status() |
|
|
|
|
|
latency = time.time() - start_time |
|
|
self._update_health(network, endpoint, success=True, latency=latency) |
|
|
self.session_stats['public_rpc_success'] += 1 |
|
|
print(f"✅ [RPC Public] {network} ({ep_name}) - {latency:.2f}s") |
|
|
return response.json() |
|
|
|
|
|
except Exception as e: |
|
|
latency = time.time() - start_time |
|
|
self._update_health(network, endpoint, success=False, latency=latency) |
|
|
self.session_stats['public_rpc_fail'] += 1 |
|
|
print(f"⚠️ [RPC Public] فشل {network} ({ep_name}): {type(e).__name__}") |
|
|
continue |
|
|
|
|
|
print(f"❌ [RPCManager V2.1] فشلت جميع محاولات RPC لشبكة {network}") |
|
|
return None |
|
|
|
|
|
async def get_scanner_api(self, base_url: str, params: dict, timeout: float = 15.0): |
|
|
""" |
|
|
(جديد V2) |
|
|
إجراء طلب GET لواجهات Scanners (Etherscan, BscScan, etc.) |
|
|
يستخدم المنظم المشترك (5/ثانية). |
|
|
""" |
|
|
self.session_stats['scanner_total'] += 1 |
|
|
try: |
|
|
async with self.scanner_semaphore: |
|
|
response = await self.http_client.get(base_url, params=params, headers=None, timeout=timeout) |
|
|
response.raise_for_status() |
|
|
self.session_stats['scanner_success'] += 1 |
|
|
return response.json() |
|
|
except Exception as e: |
|
|
self.session_stats['scanner_fail'] += 1 |
|
|
print(f"❌ [Scanner API] فشل الطلب من {base_url.split('//')[-1]}: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
async def get_solscan_api(self, path: str, params: dict, timeout: float = 15.0): |
|
|
""" |
|
|
(جديد V2.1) |
|
|
إجراء طلب GET لـ Solscan Pro API. |
|
|
يستخدم المنظم الخاص به (15/ثانية) ومفتاح API. |
|
|
""" |
|
|
solscan_key = self.api_keys.get('solscan') |
|
|
if not solscan_key: |
|
|
print("❌ [Solscan API] لا يوجد مفتاح SOLSCAN_KEY.") |
|
|
self.session_stats['solscan_fail_key'] += 1 |
|
|
return None |
|
|
|
|
|
base_url = self.network_configs.get('solana', {}).get('explorer', {}).get('api_url', 'https://pro-api.solscan.io') |
|
|
full_url = f"{base_url}{path}" |
|
|
headers = {"accept": "application/json", "token": solscan_key} |
|
|
self.session_stats['solscan_total'] += 1 |
|
|
|
|
|
try: |
|
|
async with self.solscan_semaphore: |
|
|
response = await self.http_client.get(full_url, params=params, headers=headers, timeout=timeout) |
|
|
response.raise_for_status() |
|
|
|
|
|
self.session_stats['solscan_success'] += 1 |
|
|
return response.json() |
|
|
except Exception as e: |
|
|
self.session_stats['solscan_fail'] += 1 |
|
|
print(f"❌ [Solscan API] فشل الطلب من {path}: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
async def get_moralis_api(self, base_url: str, params: dict, timeout: float = 20.0): |
|
|
""" |
|
|
(جديد V2) |
|
|
إجراء طلب GET لـ Moralis API. |
|
|
يستخدم المنظم الخاص به (1/ثانية) ومفتاح API. |
|
|
""" |
|
|
moralis_key = self.api_keys.get('moralis') |
|
|
if not moralis_key: |
|
|
print("❌ [Moralis API] لا يوجد مفتاح MORALIS_KEY.") |
|
|
return None |
|
|
|
|
|
headers = {"accept": "application/json", "X-API-Key": moralis_key} |
|
|
self.session_stats['moralis_total'] += 1 |
|
|
|
|
|
try: |
|
|
async with self.moralis_semaphore: |
|
|
|
|
|
current_time = time.time() |
|
|
if current_time - self.last_moralis_call < 1.0: |
|
|
await asyncio.sleep(1.0 - (current_time - self.last_moralis_call)) |
|
|
self.last_moralis_call = time.time() |
|
|
|
|
|
response = await self.http_client.get(base_url, params=params, headers=headers, timeout=timeout) |
|
|
response.raise_for_status() |
|
|
|
|
|
self.session_stats['moralis_success'] += 1 |
|
|
return response.json() |
|
|
except Exception as e: |
|
|
self.session_stats['moralis_fail'] += 1 |
|
|
print(f"❌ [Moralis API] فشل الطلب: {e}") |
|
|
return None |
|
|
|
|
|
async def get_coingecko_api(self, params: dict, headers: dict = None, timeout: float = 15.0): |
|
|
""" |
|
|
(معدل V2) |
|
|
إجراء طلب GET لـ CoinGecko (يستخدم الآن إحصائيات ومنظم خاص). |
|
|
""" |
|
|
base_url = COINGECKO_BASE_URL |
|
|
self.session_stats['coingecko_total'] += 1 |
|
|
try: |
|
|
async with self.coingecko_semaphore: |
|
|
|
|
|
current_time = time.time() |
|
|
time_since_last = current_time - self.last_coingecko_call |
|
|
if time_since_last < COINGECKO_REQUEST_DELAY: |
|
|
wait_time = COINGECKO_REQUEST_DELAY - time_since_last |
|
|
await asyncio.sleep(wait_time) |
|
|
self.last_coingecko_call = time.time() |
|
|
|
|
|
response = await self.http_client.get(base_url, params=params, headers=headers, timeout=timeout) |
|
|
|
|
|
if response.status_code == 429: |
|
|
wait_duration = 15.0 |
|
|
print(f"⚠️ [CoinGecko] خطأ 429 (Rate Limit). الانتظار {wait_duration} ثوان...") |
|
|
await asyncio.sleep(wait_duration) |
|
|
self.last_coingecko_call = time.time() |
|
|
response = await self.http_client.get(base_url, params=params, headers=headers, timeout=timeout) |
|
|
|
|
|
response.raise_for_status() |
|
|
|
|
|
self.session_stats['coingecko_success'] += 1 |
|
|
return response.json() |
|
|
|
|
|
except Exception as e: |
|
|
self.session_stats['coingecko_fail'] += 1 |
|
|
print(f"❌ [CoinGecko] فشل الطلب: {e}") |
|
|
return None |