Trad / whale_monitor /rpc_manager.py
Riy777's picture
Update whale_monitor/rpc_manager.py
2eac7ea
raw
history blame
12.2 kB
# whale_monitor/rpc_manager.py
# هذا هو "الوكيل الذكي" لإدارة اتصالات RPC والمستكشفات
import asyncio
import httpx
import time
import ssl
import json
import os
from collections import deque, defaultdict
import random
# استيراد الإعدادات الثابتة
from .config import DEFAULT_NETWORK_CONFIGS, COINGECKO_BASE_URL
# إعدادات الوكيل الذكي
RPC_HEALTH_CHECK_WINDOW = 10 # تتبع آخر 10 طلبات
RPC_ERROR_THRESHOLD = 3 # عدد الأخطاء المتتالية لإيقاف مؤقت
RPC_CIRCUIT_BREAKER_DURATION = 300 # 5 دقائق إيقاف مؤقت
# (إضافة ثابت لفرض تأخير بين طلبات CoinGecko)
COINGECKO_REQUEST_DELAY = 2.0 # 2.0 ثانية (يساوي 30 طلب/دقيقة كحد أقصى)
class AdaptiveRpcManager:
"""
مدير RPC ذكي يتتبع صحة النقاط، ويدير حدود الطلبات،
ويختار أفضل نقطة متاحة تلقائياً.
"""
def __init__(self, http_client: httpx.AsyncClient, api_keys: dict):
self.http_client = http_client
self.api_keys = api_keys
# منظمات الطلبات (Semaphores)
self.rpc_semaphore = asyncio.Semaphore(5) # حد عام لـ RPC
self.coingecko_semaphore = asyncio.Semaphore(1) # حد خاص لـ CoinGecko
# (متغير لتتبع وقت آخر طلب لـ CoinGecko)
self.last_coingecko_call = 0.0
# تهيئة إعدادات الشبكة ونقاط RPC
self.network_configs = self._initialize_network_configs(DEFAULT_NETWORK_CONFIGS)
# نظام تتبع الصحة (التعلم)
self.endpoint_health = defaultdict(lambda: defaultdict(lambda: {
"latency": deque(maxlen=RPC_HEALTH_CHECK_WINDOW),
"consecutive_errors": 0,
"total_errors": 0,
"last_error_time": None,
"circuit_open": False,
}))
print("✅ مدير RPC الذكي (الوكيل) مهيأ.")
def _initialize_network_configs(self, configs):
"""
يقوم بحقن مفاتيح API في إعدادات الشبكة عند بدء التشغيل.
"""
initialized_configs = {}
for network, config in configs.items():
new_config = config.copy()
# حقن مفاتيح RPC
new_config['rpc_endpoints'] = self._inject_api_keys(
config['rpc_endpoints'],
self.api_keys.get('infura')
)
# حقن مفاتيح المستكشف (Explorer)
if config.get('explorer'):
explorer_key_name = config['explorer'].get('api_key_name')
if explorer_key_name and explorer_key_name in self.api_keys:
new_config['explorer']['api_key'] = self.api_keys[explorer_key_name]
else:
new_config['explorer']['api_key'] = None # مفتاح غير متوفر
initialized_configs[network] = new_config
return initialized_configs
def _inject_api_keys(self, endpoints: list, infura_key: str):
"""
يستبدل <INFURA_KEY> بالمفتاح الفعلي.
"""
if not infura_key:
# إزالة النقاط التي تعتمد على مفتاح غير متوفر
return [ep for ep in endpoints if "<INFURA_KEY>" not in ep]
return [ep.replace("<INFURA_KEY>", infura_key) for ep in endpoints]
def get_network_configs(self):
"""
إرجاع إعدادات الشبكة المهيأة (للاستخدام في core.py)
"""
return self.network_configs
def get_explorer_config(self, network: str):
"""
إرجاع إعدادات المستكشف (URL والمفتاح) لشبكة معينة.
"""
config = self.network_configs.get(network, {})
return config.get('explorer')
def _get_healthy_endpoints(self, network: str):
"""
"الذكاء" - يرتب نقاط RPC بناءً على الصحة.
يستبعد النقاط التي في وضع "قاطع الدائرة" (Circuit Breaker).
"""
if network not in self.network_configs:
return []
endpoints = self.network_configs[network]['rpc_endpoints']
healthy_endpoints = []
current_time = time.time()
for ep in endpoints:
health = self.endpoint_health[network][ep]
# التحقق من قاطع الدائرة
if health['circuit_open']:
if current_time - health['last_error_time'] > RPC_CIRCUIT_BREAKER_DURATION:
# إعادة فتح الدائرة للمحاولة
health['circuit_open'] = False
health['consecutive_errors'] = 0
print(f"ℹ️ [RPC Manager] إعادة تفعيل نقطة RPC: {ep.split('//')[-1]}")
else:
continue # النقطة لا تزال معطلة مؤقتاً
# حساب متوسط زمن الاستجابة
avg_latency = sum(health['latency']) / len(health['latency']) if health['latency'] else float('inf')
# (يمكن إضافة منطق ترتيب أكثر تعقيداً هنا مستقبلاً)
healthy_endpoints.append((ep, avg_latency, health['consecutive_errors']))
# الترتيب: الأقل أخطاء متتالية، ثم الأقل زمن استجابة
healthy_endpoints.sort(key=lambda x: (x[2], x[1]))
# خلط بسيط للنقاط السليمة لمنع التحميل الزائد على نقطة واحدة
healthy_list = [ep[0] for ep in healthy_endpoints if ep[2] == 0]
random.shuffle(healthy_list)
unhealthy_list = [ep[0] for ep in healthy_endpoints if ep[2] > 0]
return healthy_list + unhealthy_list
def _update_health(self, network: str, endpoint: str, success: bool, latency: float):
"""
"التعلم" - تحديث إحصائيات صحة نقطة RPC.
"""
health = self.endpoint_health[network][endpoint]
if success:
health['latency'].append(latency)
health['consecutive_errors'] = 0
health['circuit_open'] = False
else:
health['consecutive_errors'] += 1
health['total_errors'] += 1
health['last_error_time'] = time.time()
# 🔴 --- START OF CHANGE --- 🔴
# (تصحيح الخطأ الطباعي: استخدام RPC_ERROR_THRESHOLD)
if health['consecutive_errors'] >= RPC_ERROR_THRESHOLD:
# 🔴 --- END OF CHANGE --- 🔴
health['circuit_open'] = True
print(f"🚨 [RPC Manager] قاطع الدائرة مفعل! إيقاف مؤقت لـ: {endpoint.split('//')[-1]}")
async def post(self, network: str, payload: dict, timeout: float = 20.0):
"""
إرسال طلب POST إلى أفضل نقطة RPC متاحة للشبكة.
يعيد المحاولة تلقائياً عند الفشل.
"""
endpoints = self._get_healthy_endpoints(network)
if not endpoints:
print(f"❌ [RPC Manager] لا توجد نقاط RPC متاحة أو سليمة لشبكة {network}")
return None
# محاولة أفضل 3 نقاط متاحة
for endpoint in endpoints[:3]:
start_time = time.time()
ep_name = endpoint.split('//')[-1].split('/')[0] # للاختصار
try:
async with self.rpc_semaphore:
response = await self.http_client.post(endpoint, json=payload, timeout=timeout)
response.raise_for_status() # يطلق استثناء لأخطاء 4xx/5xx
latency = time.time() - start_time
self._update_health(network, endpoint, success=True, latency=latency)
print(f"✅ [RPC] {network} ({ep_name}) - {latency:.2f}s")
return response.json()
except (httpx.HTTPStatusError, httpx.RequestError, ssl.SSLError, asyncio.TimeoutError, json.JSONDecodeError) as e:
latency = time.time() - start_time
self._update_health(network, endpoint, success=False, latency=latency)
print(f"⚠️ [RPC] فشل {network} ({ep_name}): {type(e).__name__}")
continue # انتقل إلى النقطة التالية
except Exception as e:
latency = time.time() - start_time
self._update_health(network, endpoint, success=False, latency=latency)
print(f"❌ [RPC] فشل فادح {network} ({ep_name}): {e}")
continue # انتقل إلى النقطة التالية
print(f"❌ [RPC Manager] فشلت جميع المحاولات لشبكة {network} للطلب: {payload.get('method', 'N/A')}")
return None
async def get(self, base_url: str, params: dict, headers: dict = None, timeout: float = 15.0, use_coingecko_semaphore: bool = False):
"""
(محدث) إرسال طلب GET (للمستكشفات أو CoinGecko).
يدير حدود الطلبات بذكاء (Throttling).
"""
semaphore = self.coingecko_semaphore if use_coingecko_semaphore else self.rpc_semaphore
try:
async with semaphore:
# (تطبيق "الخنق" لـ CoinGecko)
if use_coingecko_semaphore:
current_time = time.time()
time_since_last = current_time - self.last_coingecko_call
if time_since_last < COINGECKO_REQUEST_DELAY:
wait_time = COINGECKO_REQUEST_DELAY - time_since_last
# print(f" [CoinGecko Throttler] الانتظار {wait_time:.2f} ثانية...")
await asyncio.sleep(wait_time)
# (تحديث وقت آخر استدعاء *قبل* الطلب)
self.last_coingecko_call = time.time()
response = await self.http_client.get(base_url, params=params, headers=headers, timeout=timeout)
if response.status_code == 429: # Too Many Requests
# (زيادة وقت الانتظار)
wait_duration = 15.0
print(f"⚠️ [GET] خطأ 429 (Rate Limit) من {base_url}. الانتظار {wait_duration} ثوان...")
await asyncio.sleep(wait_duration)
# (تحديث وقت آخر استدعاء مرة أخرى بعد الانتظار)
if use_coingecko_semaphore:
self.last_coingecko_call = time.time()
# إعادة المحاولة مرة واحدة
response = await self.http_client.get(base_url, params=params, headers=headers, timeout=timeout)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
print(f"❌ [GET] خطأ HTTP من {base_url}: {e.response.status_code}")
return None
except (httpx.RequestError, asyncio.TimeoutError) as e:
print(f"❌ [GET] خطأ اتصال/مهلة من {base_url}: {e}")
return None
except json.JSONDecodeError as e:
print(f"❌ [GET] خطأ JSON من {base_url}: {e}")
return None
except Exception as e:
print(f"❌ [GET] خطأ غير متوقع من {base_url}: {e}")
return None