Spaces:
Running
Running
| # whale_monitor/rpc_manager.py | |
| # (V2 - الموجه الذكي) | |
| # هذا هو "الوكيل الذكي" لإدارة اتصالات RPC والمستكشفات (APIs) | |
| # يدير منظمات الطلبات (Rate Limiters) والإحصائيات | |
| import asyncio | |
| import httpx | |
| import time | |
| import ssl | |
| import json | |
| import os | |
| import csv | |
| from collections import deque, defaultdict | |
| import random | |
| from typing import Dict, Any, Optional, List | |
| # استيراد الإعدادات الثابتة | |
| from .config import DEFAULT_NETWORK_CONFIGS, COINGECKO_BASE_URL | |
| # إعدادات الوكيل الذكي | |
| RPC_HEALTH_CHECK_WINDOW = 10 # تتبع آخر 10 طلبات | |
| RPC_ERROR_THRESHOLD = 3 # عدد الأخطاء المتتالية لإيقاف مؤقت | |
| RPC_CIRCUIT_BREAKER_DURATION = 300 # 5 دقائق إيقاف مؤقت | |
| # (إضافة ثابت لفرض تأخير بين طلبات CoinGecko) | |
| COINGECKO_REQUEST_DELAY = 2.0 # 2.0 ثانية (يساوي 30 طلب/دقيقة كحد أقصى) | |
| # (تحديد المسار الحالي لملف rpc_manager.py) | |
| _CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) | |
| # (تحديد مسار ملف CSV داخل نفس المجلد) | |
| CUSTOM_RPC_CSV_FILE = os.path.join(_CURRENT_DIR, 'rpc_endpoints (1).csv') | |
| class AdaptiveRpcManager: | |
| """ | |
| (محدث V2) | |
| مدير RPC و API ذكي: | |
| 1. يدير صحة نقاط RPC العامة (Public). | |
| 2. يدير منظمات الطلبات (Rate Limiters) للمفاتيح الخاصة (Infura, Moralis, Scanners). | |
| 3. يتتبع إحصائيات الجلسة (Session Stats) لجميع الطلبات. | |
| """ | |
| def __init__(self, http_client: httpx.AsyncClient): | |
| self.http_client = http_client | |
| # 1. تحميل المفاتيح الخاصة من متغيرات البيئة | |
| self.api_keys = { | |
| "infura": os.getenv("INFURA_KEY"), | |
| "moralis": os.getenv("MORALIS_KEY"), | |
| "etherscan": os.getenv("ETHERSCAN_KEY"), | |
| "bscscan": os.getenv("BSCSCAN_KEY"), | |
| "polygonscan": os.getenv("POLYGONSCAN_KEY"), | |
| # (يمكن إضافة المزيد من مفاتيح Scanners هنا، مثل ARBISCAN_KEY) | |
| } | |
| print("✅ [RPCManager V2] تم تحميل المفاتيح الخاصة من البيئة.") | |
| # 2. تهيئة منظمات الطلبات (Semaphores) | |
| # (بناءً على الحدود التي قدمتها) | |
| # لـ Infura (500 credits/sec) - سنستخدم 450 كحد أمان | |
| self.infura_semaphore = asyncio.Semaphore(450) | |
| # لـ Moralis (40k/day ~ 0.46/sec) - سنستخدم 1 لضمان طلب واحد في كل مرة | |
| self.moralis_semaphore = asyncio.Semaphore(1) | |
| # لـ Etherscan, BscScan, PolygonScan (الحد المشترك 5/sec) | |
| self.scanner_semaphore = asyncio.Semaphore(5) | |
| # لـ CoinGecko (عام، سنكون حذرين) | |
| self.coingecko_semaphore = asyncio.Semaphore(1) | |
| # لـ مجمع RPC العام (Public Pool) (لحمايتهم من الضغط) | |
| self.public_rpc_semaphore = asyncio.Semaphore(10) | |
| self.last_coingecko_call = 0.0 | |
| self.last_moralis_call = 0.0 | |
| # 3. تهيئة إحصائيات الجلسة | |
| self.session_stats = defaultdict(int) | |
| # 4. تهيئة إعدادات الشبكة ونقاط RPC | |
| self.network_configs = self._initialize_network_configs(DEFAULT_NETWORK_CONFIGS) | |
| # 5. نظام تتبع الصحة (فقط لنقاط RPC العامة) | |
| self.endpoint_health = defaultdict(lambda: defaultdict(lambda: { | |
| "latency": deque(maxlen=RPC_HEALTH_CHECK_WINDOW), | |
| "consecutive_errors": 0, "total_errors": 0, "last_error_time": None, | |
| "circuit_open": False, | |
| })) | |
| print("✅ [RPCManager V2] مدير RPC/API الذكي (V2) مهيأ.") | |
| def _load_rpc_from_csv(self, csv_file_path: str) -> Dict[str, List[str]]: | |
| """ | |
| (جديد V2) | |
| قراءة ملف rpc_endpoints (1).csv ودمج النقاط. | |
| يتوقع الملف أن يحتوي على عمودين: 'network' و 'url' | |
| """ | |
| custom_rpcs = defaultdict(list) | |
| if not os.path.exists(csv_file_path): | |
| print(f"⚠️ [RPCManager V2] ملف CSV المخصص '{csv_file_path}' غير موجود. سيتم تخطيه.") | |
| return custom_rpcs | |
| try: | |
| with open(csv_file_path, mode='r', encoding='utf-8') as f: | |
| reader = csv.DictReader(f) | |
| for row in reader: | |
| network = row.get('network') | |
| url = row.get('url') | |
| if network and url and url.startswith('http'): | |
| custom_rpcs[network].append(url) | |
| print(f"✅ [RPCManager V2] تم تحميل {sum(len(v) for v in custom_rpcs.values())} نقطة RPC مخصصة من {csv_file_path}") | |
| return custom_rpcs | |
| except Exception as e: | |
| print(f"❌ [RPCManager V2] فشل في قراءة ملف CSV '{csv_file_path}': {e}") | |
| return defaultdict(list) | |
| def _initialize_network_configs(self, configs): | |
| """ | |
| (محدث V2) | |
| يقوم بدمج CSV، وحقن مفاتيح API، وإعداد الشبكات. | |
| """ | |
| initialized_configs = {} | |
| # 1. تحميل نقاط CSV المخصصة | |
| custom_rpcs = self._load_rpc_from_csv(CUSTOM_RPC_CSV_FILE) | |
| for network, config in configs.items(): | |
| new_config = config.copy() | |
| # 2. حقن مفاتيح Infura | |
| new_config['rpc_endpoints'] = self._inject_api_keys( | |
| config['rpc_endpoints'], | |
| self.api_keys.get('infura') | |
| ) | |
| # 3. دمج نقاط CSV | |
| if network in custom_rpcs: | |
| new_config['rpc_endpoints'].extend(custom_rpcs[network]) | |
| print(f" ... دمج {len(custom_rpcs[network])} نقاط مخصصة لشبكة {network}") | |
| # 4. خلط القائمة النهائية (لضمان التوزيع) | |
| random.shuffle(new_config['rpc_endpoints']) | |
| # 5. حقن مفاتيح المستكشف (Explorer) | |
| if config.get('explorer'): | |
| explorer_key_name = config['explorer'].get('api_key_name') | |
| if explorer_key_name and explorer_key_name in self.api_keys: | |
| new_config['explorer']['api_key'] = self.api_keys[explorer_key_name] | |
| else: | |
| new_config['explorer']['api_key'] = None # مفتاح غير متوفر | |
| initialized_configs[network] = new_config | |
| return initialized_configs | |
| def _inject_api_keys(self, endpoints: list, infura_key: str): | |
| """ | |
| يستبدل <INFURA_KEY> بالمفتاح الفعلي. | |
| """ | |
| if not infura_key: | |
| # إزالة النقاط التي تعتمد على مفتاح غير متوفر | |
| return [ep for ep in endpoints if "<INFURA_KEY>" not in ep] | |
| return [ep.replace("<INFURA_KEY>", infura_key) for ep in endpoints] | |
| # --- (دوال مساعدة للحصول على الإعدادات) --- | |
| def get_network_configs(self): | |
| return self.network_configs | |
| def get_explorer_config(self, network: str): | |
| config = self.network_configs.get(network, {}) | |
| return config.get('explorer') | |
| def get_api_key(self, key_name: str) -> Optional[str]: | |
| """(جديد V2) جلب مفتاح API بأمان""" | |
| return self.api_keys.get(key_name) | |
| # --- (دوال إدارة الإحصائيات V2) --- | |
| def reset_session_stats(self): | |
| """(جديد V2) تصفير عدادات الإحصائيات للجلسة الجديدة""" | |
| self.session_stats = defaultdict(int) | |
| print("📊 [RPCManager V2] تم تصفير عدادات إحصائيات الجلسة.") | |
| def get_session_stats(self) -> Dict[str, int]: | |
| """(جديد V2) إرجاع نسخة من الإحصائيات الحالية""" | |
| return self.session_stats.copy() | |
| # --- (دوال الصحة لـ Public RPCs - لا تغيير) --- | |
| def _get_healthy_public_endpoints(self, network: str): | |
| """ | |
| (معدل V2) | |
| يرتب نقاط RPC العامة (فقط) بناءً على الصحة. | |
| """ | |
| if network not in self.network_configs: return [] | |
| endpoints = self.network_configs[network]['rpc_endpoints'] | |
| healthy_endpoints = [] | |
| current_time = time.time() | |
| for ep in endpoints: | |
| # (تخطي Infura، هذا المجمع للعام فقط) | |
| if "infura.io" in ep: | |
| continue | |
| health = self.endpoint_health[network][ep] | |
| if health['circuit_open']: | |
| if current_time - health['last_error_time'] > RPC_CIRCUIT_BREAKER_DURATION: | |
| health['circuit_open'] = False; health['consecutive_errors'] = 0 | |
| else: | |
| continue # النقطة لا تزال معطلة مؤقتاً | |
| avg_latency = sum(health['latency']) / len(health['latency']) if health['latency'] else float('inf') | |
| healthy_endpoints.append((ep, avg_latency, health['consecutive_errors'])) | |
| healthy_endpoints.sort(key=lambda x: (x[2], x[1])) | |
| # (فصل السليم تماماً عن غير السليم) | |
| healthy_list = [ep[0] for ep in healthy_endpoints if ep[2] == 0] | |
| random.shuffle(healthy_list) # خلط السليم للتوزيع | |
| unhealthy_list = [ep[0] for ep in healthy_endpoints if ep[2] > 0] | |
| return healthy_list + unhealthy_list | |
| def _update_health(self, network: str, endpoint: str, success: bool, latency: float): | |
| """(لا تغيير) تحديث إحصائيات صحة نقطة RPC العامة.""" | |
| health = self.endpoint_health[network][endpoint] | |
| if success: | |
| health['latency'].append(latency) | |
| health['consecutive_errors'] = 0 | |
| health['circuit_open'] = False | |
| else: | |
| health['consecutive_errors'] += 1; health['total_errors'] += 1 | |
| health['last_error_time'] = time.time() | |
| if health['consecutive_errors'] >= RPC_ERROR_THRESHOLD: | |
| health['circuit_open'] = True | |
| print(f"🚨 [RPCManager V2] قاطع الدائرة مفعل (عام)! إيقاف مؤقت لـ: {endpoint.split('//')[-1]}") | |
| # --- (دوال الاتصال الأساسية V2 - محدثة بالكامل) --- | |
| async def post_rpc(self, network: str, payload: dict, timeout: float = 20.0): | |
| """ | |
| (محدث V2) | |
| إرسال طلب POST (JSON-RPC) | |
| سيحاول مع Infura أولاً (إذا كان متاحاً)، ثم يلجأ إلى المجمع العام. | |
| """ | |
| # 1. محاولة Infura أولاً (الأولوية) | |
| infura_key = self.api_keys.get('infura') | |
| infura_endpoint = next((ep for ep in self.network_configs.get(network, {}).get('rpc_endpoints', []) if "infura.io" in ep), None) | |
| if infura_key and infura_endpoint: | |
| start_time = time.time() | |
| try: | |
| async with self.infura_semaphore: | |
| response = await self.http_client.post(infura_endpoint, json=payload, timeout=timeout) | |
| response.raise_for_status() | |
| self.session_stats['infura_success'] += 1 | |
| latency = time.time() - start_time | |
| print(f"✅ [RPC Infura] {network} - {latency:.2f}s") | |
| return response.json() | |
| except Exception as e: | |
| self.session_stats['infura_fail'] += 1 | |
| print(f"⚠️ [RPC Infura] فشل {network}: {type(e).__name__}. اللجوء إلى المجمع العام...") | |
| # 2. اللجوء إلى المجمع العام (Public Pool) | |
| endpoints = self._get_healthy_public_endpoints(network) | |
| if not endpoints: | |
| print(f"❌ [RPCManager V2] لا توجد نقاط RPC عامة متاحة لشبكة {network}") | |
| return None | |
| for endpoint in endpoints[:3]: # محاولة أفضل 3 | |
| start_time = time.time() | |
| ep_name = endpoint.split('//')[-1].split('/')[0] | |
| try: | |
| async with self.public_rpc_semaphore: | |
| response = await self.http_client.post(endpoint, json=payload, timeout=timeout) | |
| response.raise_for_status() | |
| latency = time.time() - start_time | |
| self._update_health(network, endpoint, success=True, latency=latency) | |
| self.session_stats['public_rpc_success'] += 1 | |
| print(f"✅ [RPC Public] {network} ({ep_name}) - {latency:.2f}s") | |
| return response.json() | |
| except Exception as e: | |
| latency = time.time() - start_time | |
| self._update_health(network, endpoint, success=False, latency=latency) | |
| self.session_stats['public_rpc_fail'] += 1 | |
| print(f"⚠️ [RPC Public] فشل {network} ({ep_name}): {type(e).__name__}") | |
| continue | |
| print(f"❌ [RPCManager V2] فشلت جميع محاولات RPC لشبكة {network}") | |
| return None | |
| async def get_scanner_api(self, base_url: str, params: dict, timeout: float = 15.0): | |
| """ | |
| (جديد V2) | |
| إجراء طلب GET لواجهات Scanners (Etherscan, BscScan, etc.) | |
| يستخدم المنظم المشترك (5/ثانية). | |
| """ | |
| self.session_stats['scanner_total'] += 1 | |
| try: | |
| async with self.scanner_semaphore: | |
| response = await self.http_client.get(base_url, params=params, headers=None, timeout=timeout) | |
| response.raise_for_status() | |
| self.session_stats['scanner_success'] += 1 | |
| return response.json() | |
| except Exception as e: | |
| self.session_stats['scanner_fail'] += 1 | |
| print(f"❌ [Scanner API] فشل الطلب من {base_url.split('//')[-1]}: {e}") | |
| return None | |
| async def get_moralis_api(self, base_url: str, params: dict, timeout: float = 20.0): | |
| """ | |
| (جديد V2) | |
| إجراء طلب GET لـ Moralis API. | |
| يستخدم المنظم الخاص به (1/ثانية) ومفتاح API. | |
| """ | |
| moralis_key = self.api_keys.get('moralis') | |
| if not moralis_key: | |
| print("❌ [Moralis API] لا يوجد مفتاح MORALIS_KEY.") | |
| return None | |
| headers = {"accept": "application/json", "X-API-Key": moralis_key} | |
| self.session_stats['moralis_total'] += 1 | |
| try: | |
| async with self.moralis_semaphore: | |
| # (ضمان وجود ثانية واحدة على الأقل بين الطلبات لتوزيع 40k على اليوم) | |
| current_time = time.time() | |
| if current_time - self.last_moralis_call < 1.0: | |
| await asyncio.sleep(1.0 - (current_time - self.last_moralis_call)) | |
| self.last_moralis_call = time.time() | |
| response = await self.http_client.get(base_url, params=params, headers=headers, timeout=timeout) | |
| response.raise_for_status() | |
| self.session_stats['moralis_success'] += 1 | |
| return response.json() | |
| except Exception as e: | |
| self.session_stats['moralis_fail'] += 1 | |
| print(f"❌ [Moralis API] فشل الطلب: {e}") | |
| return None | |
| async def get_coingecko_api(self, params: dict, headers: dict = None, timeout: float = 15.0): | |
| """ | |
| (معدل V2) | |
| إجراء طلب GET لـ CoinGecko (يستخدم الآن إحصائيات ومنظم خاص). | |
| """ | |
| base_url = COINGECKO_BASE_URL | |
| self.session_stats['coingecko_total'] += 1 | |
| try: | |
| async with self.coingecko_semaphore: | |
| # (تطبيق "الخنق" لـ CoinGecko) | |
| current_time = time.time() | |
| time_since_last = current_time - self.last_coingecko_call | |
| if time_since_last < COINGECKO_REQUEST_DELAY: | |
| wait_time = COINGECKO_REQUEST_DELAY - time_since_last | |
| await asyncio.sleep(wait_time) | |
| self.last_coingecko_call = time.time() | |
| response = await self.http_client.get(base_url, params=params, headers=headers, timeout=timeout) | |
| if response.status_code == 429: # Too Many Requests | |
| wait_duration = 15.0 | |
| print(f"⚠️ [CoinGecko] خطأ 429 (Rate Limit). الانتظار {wait_duration} ثوان...") | |
| await asyncio.sleep(wait_duration) | |
| self.last_coingecko_call = time.time() | |
| response = await self.http_client.get(base_url, params=params, headers=headers, timeout=timeout) | |
| response.raise_for_status() | |
| self.session_stats['coingecko_success'] += 1 | |
| return response.json() | |
| except Exception as e: | |
| self.session_stats['coingecko_fail'] += 1 | |
| print(f"❌ [CoinGecko] فشل الطلب: {e}") | |
| return None |