Spaces:
Running
Running
| # whale_monitor/rpc_manager.py | |
| # هذا هو "الوكيل الذكي" لإدارة اتصالات RPC والمستكشفات | |
| import asyncio | |
| import httpx | |
| import time | |
| import ssl | |
| import json | |
| import os | |
| from collections import deque, defaultdict | |
| import random | |
| # استيراد الإعدادات الثابتة | |
| from .config import DEFAULT_NETWORK_CONFIGS, COINGECKO_BASE_URL | |
| # إعدادات الوكيل الذكي | |
| RPC_HEALTH_CHECK_WINDOW = 10 # تتبع آخر 10 طلبات | |
| RPC_ERROR_THRESHOLD = 3 # عدد الأخطاء المتتالية لإيقاف مؤقت | |
| RPC_CIRCUIT_BREAKER_DURATION = 300 # 5 دقائق إيقاف مؤقت | |
| # (إضافة ثابت لفرض تأخير بين طلبات CoinGecko) | |
| COINGECKO_REQUEST_DELAY = 2.0 # 2.0 ثانية (يساوي 30 طلب/دقيقة كحد أقصى) | |
| class AdaptiveRpcManager: | |
| """ | |
| مدير RPC ذكي يتتبع صحة النقاط، ويدير حدود الطلبات، | |
| ويختار أفضل نقطة متاحة تلقائياً. | |
| """ | |
| def __init__(self, http_client: httpx.AsyncClient, api_keys: dict): | |
| self.http_client = http_client | |
| self.api_keys = api_keys | |
| # منظمات الطلبات (Semaphores) | |
| self.rpc_semaphore = asyncio.Semaphore(5) # حد عام لـ RPC | |
| self.coingecko_semaphore = asyncio.Semaphore(1) # حد خاص لـ CoinGecko | |
| # (متغير لتتبع وقت آخر طلب لـ CoinGecko) | |
| self.last_coingecko_call = 0.0 | |
| # تهيئة إعدادات الشبكة ونقاط RPC | |
| self.network_configs = self._initialize_network_configs(DEFAULT_NETWORK_CONFIGS) | |
| # نظام تتبع الصحة (التعلم) | |
| self.endpoint_health = defaultdict(lambda: defaultdict(lambda: { | |
| "latency": deque(maxlen=RPC_HEALTH_CHECK_WINDOW), | |
| "consecutive_errors": 0, | |
| "total_errors": 0, | |
| "last_error_time": None, | |
| "circuit_open": False, | |
| })) | |
| print("✅ مدير RPC الذكي (الوكيل) مهيأ.") | |
| def _initialize_network_configs(self, configs): | |
| """ | |
| يقوم بحقن مفاتيح API في إعدادات الشبكة عند بدء التشغيل. | |
| """ | |
| initialized_configs = {} | |
| for network, config in configs.items(): | |
| new_config = config.copy() | |
| # حقن مفاتيح RPC | |
| new_config['rpc_endpoints'] = self._inject_api_keys( | |
| config['rpc_endpoints'], | |
| self.api_keys.get('infura') | |
| ) | |
| # حقن مفاتيح المستكشف (Explorer) | |
| if config.get('explorer'): | |
| explorer_key_name = config['explorer'].get('api_key_name') | |
| if explorer_key_name and explorer_key_name in self.api_keys: | |
| new_config['explorer']['api_key'] = self.api_keys[explorer_key_name] | |
| else: | |
| new_config['explorer']['api_key'] = None # مفتاح غير متوفر | |
| initialized_configs[network] = new_config | |
| return initialized_configs | |
| def _inject_api_keys(self, endpoints: list, infura_key: str): | |
| """ | |
| يستبدل <INFURA_KEY> بالمفتاح الفعلي. | |
| """ | |
| if not infura_key: | |
| # إزالة النقاط التي تعتمد على مفتاح غير متوفر | |
| return [ep for ep in endpoints if "<INFURA_KEY>" not in ep] | |
| return [ep.replace("<INFURA_KEY>", infura_key) for ep in endpoints] | |
| def get_network_configs(self): | |
| """ | |
| إرجاع إعدادات الشبكة المهيأة (للاستخدام في core.py) | |
| """ | |
| return self.network_configs | |
| def get_explorer_config(self, network: str): | |
| """ | |
| إرجاع إعدادات المستكشف (URL والمفتاح) لشبكة معينة. | |
| """ | |
| config = self.network_configs.get(network, {}) | |
| return config.get('explorer') | |
| def _get_healthy_endpoints(self, network: str): | |
| """ | |
| "الذكاء" - يرتب نقاط RPC بناءً على الصحة. | |
| يستبعد النقاط التي في وضع "قاطع الدائرة" (Circuit Breaker). | |
| """ | |
| if network not in self.network_configs: | |
| return [] | |
| endpoints = self.network_configs[network]['rpc_endpoints'] | |
| healthy_endpoints = [] | |
| current_time = time.time() | |
| for ep in endpoints: | |
| health = self.endpoint_health[network][ep] | |
| # التحقق من قاطع الدائرة | |
| if health['circuit_open']: | |
| if current_time - health['last_error_time'] > RPC_CIRCUIT_BREAKER_DURATION: | |
| # إعادة فتح الدائرة للمحاولة | |
| health['circuit_open'] = False | |
| health['consecutive_errors'] = 0 | |
| print(f"ℹ️ [RPC Manager] إعادة تفعيل نقطة RPC: {ep.split('//')[-1]}") | |
| else: | |
| continue # النقطة لا تزال معطلة مؤقتاً | |
| # حساب متوسط زمن الاستجابة | |
| avg_latency = sum(health['latency']) / len(health['latency']) if health['latency'] else float('inf') | |
| # (يمكن إضافة منطق ترتيب أكثر تعقيداً هنا مستقبلاً) | |
| healthy_endpoints.append((ep, avg_latency, health['consecutive_errors'])) | |
| # الترتيب: الأقل أخطاء متتالية، ثم الأقل زمن استجابة | |
| healthy_endpoints.sort(key=lambda x: (x[2], x[1])) | |
| # خلط بسيط للنقاط السليمة لمنع التحميل الزائد على نقطة واحدة | |
| healthy_list = [ep[0] for ep in healthy_endpoints if ep[2] == 0] | |
| random.shuffle(healthy_list) | |
| unhealthy_list = [ep[0] for ep in healthy_endpoints if ep[2] > 0] | |
| return healthy_list + unhealthy_list | |
| def _update_health(self, network: str, endpoint: str, success: bool, latency: float): | |
| """ | |
| "التعلم" - تحديث إحصائيات صحة نقطة RPC. | |
| """ | |
| health = self.endpoint_health[network][endpoint] | |
| if success: | |
| health['latency'].append(latency) | |
| health['consecutive_errors'] = 0 | |
| health['circuit_open'] = False | |
| else: | |
| health['consecutive_errors'] += 1 | |
| health['total_errors'] += 1 | |
| health['last_error_time'] = time.time() | |
| # 🔴 --- START OF CHANGE --- 🔴 | |
| # (تصحيح الخطأ الطباعي: استخدام RPC_ERROR_THRESHOLD) | |
| if health['consecutive_errors'] >= RPC_ERROR_THRESHOLD: | |
| # 🔴 --- END OF CHANGE --- 🔴 | |
| health['circuit_open'] = True | |
| print(f"🚨 [RPC Manager] قاطع الدائرة مفعل! إيقاف مؤقت لـ: {endpoint.split('//')[-1]}") | |
| async def post(self, network: str, payload: dict, timeout: float = 20.0): | |
| """ | |
| إرسال طلب POST إلى أفضل نقطة RPC متاحة للشبكة. | |
| يعيد المحاولة تلقائياً عند الفشل. | |
| """ | |
| endpoints = self._get_healthy_endpoints(network) | |
| if not endpoints: | |
| print(f"❌ [RPC Manager] لا توجد نقاط RPC متاحة أو سليمة لشبكة {network}") | |
| return None | |
| # محاولة أفضل 3 نقاط متاحة | |
| for endpoint in endpoints[:3]: | |
| start_time = time.time() | |
| ep_name = endpoint.split('//')[-1].split('/')[0] # للاختصار | |
| try: | |
| async with self.rpc_semaphore: | |
| response = await self.http_client.post(endpoint, json=payload, timeout=timeout) | |
| response.raise_for_status() # يطلق استثناء لأخطاء 4xx/5xx | |
| latency = time.time() - start_time | |
| self._update_health(network, endpoint, success=True, latency=latency) | |
| print(f"✅ [RPC] {network} ({ep_name}) - {latency:.2f}s") | |
| return response.json() | |
| except (httpx.HTTPStatusError, httpx.RequestError, ssl.SSLError, asyncio.TimeoutError, json.JSONDecodeError) as e: | |
| latency = time.time() - start_time | |
| self._update_health(network, endpoint, success=False, latency=latency) | |
| print(f"⚠️ [RPC] فشل {network} ({ep_name}): {type(e).__name__}") | |
| continue # انتقل إلى النقطة التالية | |
| except Exception as e: | |
| latency = time.time() - start_time | |
| self._update_health(network, endpoint, success=False, latency=latency) | |
| print(f"❌ [RPC] فشل فادح {network} ({ep_name}): {e}") | |
| continue # انتقل إلى النقطة التالية | |
| print(f"❌ [RPC Manager] فشلت جميع المحاولات لشبكة {network} للطلب: {payload.get('method', 'N/A')}") | |
| return None | |
| async def get(self, base_url: str, params: dict, headers: dict = None, timeout: float = 15.0, use_coingecko_semaphore: bool = False): | |
| """ | |
| (محدث) إرسال طلب GET (للمستكشفات أو CoinGecko). | |
| يدير حدود الطلبات بذكاء (Throttling). | |
| """ | |
| semaphore = self.coingecko_semaphore if use_coingecko_semaphore else self.rpc_semaphore | |
| try: | |
| async with semaphore: | |
| # (تطبيق "الخنق" لـ CoinGecko) | |
| if use_coingecko_semaphore: | |
| current_time = time.time() | |
| time_since_last = current_time - self.last_coingecko_call | |
| if time_since_last < COINGECKO_REQUEST_DELAY: | |
| wait_time = COINGECKO_REQUEST_DELAY - time_since_last | |
| # print(f" [CoinGecko Throttler] الانتظار {wait_time:.2f} ثانية...") | |
| await asyncio.sleep(wait_time) | |
| # (تحديث وقت آخر استدعاء *قبل* الطلب) | |
| self.last_coingecko_call = time.time() | |
| response = await self.http_client.get(base_url, params=params, headers=headers, timeout=timeout) | |
| if response.status_code == 429: # Too Many Requests | |
| # (زيادة وقت الانتظار) | |
| wait_duration = 15.0 | |
| print(f"⚠️ [GET] خطأ 429 (Rate Limit) من {base_url}. الانتظار {wait_duration} ثوان...") | |
| await asyncio.sleep(wait_duration) | |
| # (تحديث وقت آخر استدعاء مرة أخرى بعد الانتظار) | |
| if use_coingecko_semaphore: | |
| self.last_coingecko_call = time.time() | |
| # إعادة المحاولة مرة واحدة | |
| response = await self.http_client.get(base_url, params=params, headers=headers, timeout=timeout) | |
| response.raise_for_status() | |
| return response.json() | |
| except httpx.HTTPStatusError as e: | |
| print(f"❌ [GET] خطأ HTTP من {base_url}: {e.response.status_code}") | |
| return None | |
| except (httpx.RequestError, asyncio.TimeoutError) as e: | |
| print(f"❌ [GET] خطأ اتصال/مهلة من {base_url}: {e}") | |
| return None | |
| except json.JSONDecodeError as e: | |
| print(f"❌ [GET] خطأ JSON من {base_url}: {e}") | |
| return None | |
| except Exception as e: | |
| print(f"❌ [GET] خطأ غير متوقع من {base_url}: {e}") | |
| return None |