Trad / whale_monitor /core.py
Riy777's picture
Update whale_monitor/core.py
b03be3d
raw
history blame
67.2 kB
# whale_monitor/core.py
# المنطق الأساسي لـ EnhancedWhaleMonitor (محدث ومعاد هيكلته)
import os
import asyncio
import httpx
import json
import traceback
import time
from datetime import datetime, timedelta
from collections import defaultdict, deque
import ccxt.pro as ccxt # ✅ ملاحظة: هذا هو ccxt.pro (غير متزامن)
import numpy as np
import logging
import ssl
from botocore.exceptions import ClientError
import base58
# --- استيراد من الملفات المحلية الجديدة ---
from .config import (
DEFAULT_WHALE_THRESHOLD_USD, TRANSFER_EVENT_SIGNATURE, NATIVE_COINS,
DEFAULT_EXCHANGE_ADDRESSES, COINGECKO_BASE_URL, COINGECKO_SYMBOL_MAPPING
)
from .rpc_manager import AdaptiveRpcManager
# تعطيل تسجيل HTTP المزعج
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
class EnhancedWhaleMonitor:
def __init__(self, contracts_db=None, r2_service=None):
print("🔄 [WhaleMonitor] بدء التهيئة...")
# 1. الخدمات الخارجية
self.r2_service = r2_service
self.data_manager = None # سيتم تعيينه لاحقاً إذا لزم الأمر
# 2. إعدادات الاتصال
self.ssl_context = ssl.create_default_context()
self.http_client = httpx.AsyncClient(
timeout=30.0,
limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
follow_redirects=True,
verify=self.ssl_context
)
# 3. جلب مفاتيح API
self.api_keys = {
"moralis": os.getenv("MORALIS_KEY"),
"etherscan": os.getenv("ETHERSCAN_KEY"),
"infura": os.getenv("INFURA_KEY"),
"bscscan": os.getenv("BSCSCAN_KEY"),
"polygonscan": os.getenv("POLYGONSCAN_KEY"),
}
self._validate_api_keys()
# 4. تهيئة "الوكيل الذكي" (RPC Manager)
# هذا هو التغيير الجوهري
self.rpc_manager = AdaptiveRpcManager(self.http_client, self.api_keys)
# 5. تحميل الإعدادات من config.py
self.whale_threshold_usd = DEFAULT_WHALE_THRESHOLD_USD
self.supported_networks = self.rpc_manager.get_network_configs()
# 6. قواعد البيانات الداخلية و Caches
self.contracts_db = {}
self._initialize_contracts_db(contracts_db or {})
self.address_labels = {}
self.address_categories = {'exchange': set(), 'cex': set(), 'dex': set(), 'bridge': set(), 'whale': set(), 'unknown': set()}
self._initialize_comprehensive_exchange_addresses()
self.token_price_cache = {}
self.token_decimals_cache = {}
# 7. تحميل العقود من R2 (إن وجد)
if self.r2_service:
asyncio.create_task(self._load_contracts_from_r2())
print("✅ [WhaleMonitor] تم التهيئة بنجاح باستخدام مدير RPC الذكي.")
def _initialize_contracts_db(self, initial_contracts):
"""تهيئة قاعدة بيانات العقود مع تحسين تخزين الشبكات"""
print("🔄 [WhaleMonitor] تهيئة قاعدة بيانات العقود...")
for symbol, contract_data in initial_contracts.items():
symbol_lower = symbol.lower()
if isinstance(contract_data, dict) and 'address' in contract_data and 'network' in contract_data:
self.contracts_db[symbol_lower] = contract_data
elif isinstance(contract_data, str):
self.contracts_db[symbol_lower] = {
'address': contract_data,
'network': self._detect_network_from_address(contract_data)
}
print(f"✅ [WhaleMonitor] تم تحميل {len(self.contracts_db)} عقد في قاعدة البيانات")
def _detect_network_from_address(self, address):
"""اكتشاف الشبكة من عنوان العقد"""
if not isinstance(address, str): return 'ethereum'
address_lower = address.lower()
if address_lower.startswith('0x') and len(address_lower) == 42:
return 'ethereum' # افتراضي لـ EVM
elif len(address_lower) >= 32 and len(address_lower) <= 44:
base58_chars = set("123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz")
if all(char in base58_chars for char in address):
try:
base58.b58decode(address); return 'solana'
except ValueError: return 'ethereum'
else: return 'ethereum'
else: return 'ethereum'
def _validate_api_keys(self):
"""التحقق من صحة مفاتيح API"""
print("🔍 [WhaleMonitor] التحقق من صحة مفاتيح API...")
for key_name, key_value in self.api_keys.items():
if key_value and isinstance(key_value, str) and len(key_value) >= 10:
print(f" ✅ مفتاح {key_name.upper()} موجود.")
else:
print(f" ⚠️ مفتاح {key_name.upper()} غير متوفر أو غير صالح.")
self.api_keys[key_name] = None
def _initialize_comprehensive_exchange_addresses(self):
"""
تهيئة قاعدة بيانات عناوين المنصات (تعتمد الآن على config.py).
"""
for category, addresses in DEFAULT_EXCHANGE_ADDRESSES.items():
for address in addresses:
if not isinstance(address, str): continue
addr_lower = address.lower()
self.address_labels[addr_lower] = category
self.address_categories['exchange'].add(addr_lower)
if category in ['kucoin', 'binance', 'coinbase', 'kraken', 'okx', 'gate', 'solana_kucoin', 'solana_binance', 'solana_coinbase']:
self.address_categories['cex'].add(addr_lower)
elif category in ['uniswap', 'pancakeswap']:
self.address_categories['dex'].add(addr_lower)
elif 'wormhole' in category:
self.address_categories['bridge'].add(addr_lower)
print(f"✅ [WhaleMonitor] تم تهيئة {len(self.address_labels)} عنوان منصة/بروتوكول معروف.")
# --- دوال جلب البيانات المعتمدة على R2 (لا تغيير) ---
async def _load_contracts_from_r2(self):
if not self.r2_service: return
try:
key = "contracts.json"; response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key)
contracts_data = json.loads(response['Body'].read()); loaded_count = 0; updated_count = 0
updated_contracts_db = self.contracts_db.copy()
for symbol, contract_data in contracts_data.items():
symbol_lower = symbol.lower()
if isinstance(contract_data, dict) and 'address' in contract_data and 'network' in contract_data:
updated_contracts_db[symbol_lower] = contract_data; loaded_count += 1
elif isinstance(contract_data, str):
new_format = {'address': contract_data, 'network': self._detect_network_from_address(contract_data)}
updated_contracts_db[symbol_lower] = new_format; loaded_count += 1; updated_count +=1
self.contracts_db = updated_contracts_db
print(f"✅ [WhaleMonitor] تم تحميل {loaded_count} عقد من R2.")
if updated_count > 0: print(f" ℹ️ تم تحديث صيغة {updated_count} عقد."); await self._save_contracts_to_r2()
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey': print("⚠️ [WhaleMonitor] لم يتم العثور على قاعدة بيانات العقود في R2.")
else: print(f"❌ [WhaleMonitor] خطأ ClientError أثناء تحميل العقود من R2: {e}")
except Exception as e: print(f"❌ [WhaleMonitor] خطأ عام أثناء تحميل العقود من R2: {e}")
async def _save_contracts_to_r2(self):
if not self.r2_service: return
try:
key = "contracts.json"; contracts_to_save = {}
for symbol, data in self.contracts_db.items():
if isinstance(data, dict) and 'address' in data and 'network' in data: contracts_to_save[symbol] = data
elif isinstance(data, str): contracts_to_save[symbol] = {'address': data, 'network': self._detect_network_from_address(data)}
if not contracts_to_save: print("⚠️ [WhaleMonitor] لا توجد بيانات عقود صالحة للحفظ في R2."); return
data_json = json.dumps(contracts_to_save, indent=2).encode('utf-8')
self.r2_service.s3_client.put_object(Bucket="trading", Key=key, Body=data_json, ContentType="application/json")
print(f"✅ [WhaleMonitor] تم حفظ قاعدة بيانات العقود ({len(contracts_to_save)} إدخال) إلى R2")
except Exception as e: print(f"❌ [WhaleMonitor] فشل حفظ قاعدة البيانات العقود: {e}")
# --- المنطق الأساسي (لا تغيير) ---
async def get_symbol_whale_activity(self, symbol, contract_address=None):
try:
print(f"🔍 [WhaleMonitor] بدء مراقبة الحيتان للعملة: {symbol}")
base_symbol = symbol.split("/")[0].upper() if '/' in symbol else symbol.upper()
if base_symbol in NATIVE_COINS:
print(f"ℹ️ {symbol} عملة أصلية - لا توجد بيانات حيتان للتوكنات")
return self._create_native_coin_response(symbol)
contract_info = None; network = None
if contract_address and isinstance(contract_address, str):
network = self._detect_network_from_address(contract_address)
contract_info_from_db = await self._find_contract_address_enhanced(symbol)
if contract_info_from_db and contract_info_from_db.get('address', '').lower() != contract_address.lower():
print(f" ⚠️ تم توفير عنوان عقد ({contract_address}) يختلف عن المخزن لـ {symbol}. سيتم استخدام العنوان المُقدم.")
contract_info = {'address': contract_address, 'network': network}
elif contract_info_from_db: contract_info = contract_info_from_db
else: contract_info = {'address': contract_address, 'network': network}
else: contract_info = await self._find_contract_address_enhanced(symbol)
if not contract_info or not contract_info.get('address') or not contract_info.get('network'):
print(f"❌ لم يتم العثور على عقد أو شبكة للعملة {symbol}")
return self._create_no_contract_response(symbol)
contract_address_final = contract_info['address']
network_final = contract_info['network']
print(f"🌐 البحث في الشبكة المحددة: {network_final} للعقد: {contract_address_final}")
transfers = await self._get_targeted_transfer_data(contract_address_final, network_final)
if not transfers:
print(f"⚠️ لم يتم العثور على تحويلات للعملة {symbol} على شبكة {network_final}")
return self._create_no_transfers_response(symbol)
recent_transfers = self._filter_recent_transfers(transfers, max_minutes=120)
if not recent_transfers:
print(f"⚠️ لا توجد تحويلات حديثة للعملة {symbol}")
return self._create_no_recent_activity_response(symbol)
print(f"📊 تم جلب {len(recent_transfers)} تحويلة حديثة فريدة للعملة {symbol}")
analysis = await self._analyze_enhanced_whale_impact(recent_transfers, symbol, contract_address_final, network_final)
return analysis
except Exception as e:
print(f"❌ خطأ في مراقبة الحيتان للعملة {symbol}: {e}"); traceback.print_exc()
return self._create_error_response(symbol, str(e))
async def _get_targeted_transfer_data(self, contract_address, network):
"""
(مبسط) جلب بيانات التحويلات من الشبكة المحددة فقط.
"""
print(f"🌐 جلب بيانات التحويلات من شبكة {network} للعقد: {contract_address}")
if isinstance(contract_address, dict):
contract_address = contract_address.get(network);
if not contract_address: print(f"❌ لم يتم العثور على عنوان للشبكة {network}"); return []
if not isinstance(contract_address, str): print(f"❌ عنوان العقد غير صالح: {type(contract_address)}"); return []
if network in self.supported_networks:
net_config = self.supported_networks[network]
if net_config['type'] == 'evm':
return await self._get_evm_network_transfer_data(contract_address, network)
elif net_config['type'] == 'solana':
return await self._get_solana_transfer_data(contract_address, network)
print(f"⚠️ نوع الشبكة {network} غير مدعوم حاليًا."); return []
async def _get_evm_network_transfer_data(self, contract_address, network):
"""
(مبسط) جلب بيانات التحويلات من شبكة EVM (باستخدام RPC و Explorer).
"""
try:
transfers = []; rpc_successful = False
try:
rpc_transfers = await self._get_rpc_token_data_targeted(contract_address, network)
if rpc_transfers:
transfers.extend(rpc_transfers); rpc_successful = True
print(f"✅ RPC {network}: تم جلب {len(rpc_transfers)} تحويلة عبر eth_getLogs")
else: print(f"⚠️ RPC {network}: لم يتم العثور على تحويلات عبر eth_getLogs")
except Exception as rpc_error: print(f"❌ خطأ في جلب بيانات RPC {network}: {rpc_error}")
if not rpc_successful or len(transfers) < 5:
print(f"ℹ️ محاولة جلب بيانات إضافية من Explorer لـ {network}...")
try:
explorer_transfers = await self._get_explorer_token_data(contract_address, network)
if explorer_transfers:
existing_hashes = {f"{t.get('hash')}-{t.get('logIndex','N/A')}" for t in transfers}
new_explorer_transfers = []
for et in explorer_transfers:
explorer_key = f"{et.get('hash')}-{et.get('logIndex','N/A')}"
if explorer_key not in existing_hashes: new_explorer_transfers.append(et)
if new_explorer_transfers:
transfers.extend(new_explorer_transfers)
print(f"✅ Explorer {network}: تم إضافة {len(new_explorer_transfers)} تحويلة جديدة.")
else: print(f"⚠️ Explorer {network}: لم يتم العثور على تحويلات جديدة.")
else: print(f"⚠️ Explorer {network}: لم يتم العثور على تحويلات.")
except Exception as explorer_error: print(f"❌ خطأ في جلب بيانات Explorer {network}: {explorer_error}")
print(f"📊 إجمالي التحويلات المجمعة لـ {network}: {len(transfers)}")
final_transfers = []; seen_keys = set()
for t in transfers:
key = f"{t.get('hash')}-{t.get('logIndex','N/A')}"
if key not in seen_keys: final_transfers.append(t); seen_keys.add(key)
return final_transfers
except Exception as e: print(f"❌ خطأ عام في جلب بيانات {network}: {e}"); return []
# --- دوال الاتصال الشبكي (محدثة بالكامل) ---
async def _get_rpc_token_data_targeted(self, contract_address, network='ethereum', blocks_to_scan=500):
"""
(محدث) جلب تحويلات التوكن ERC20 عبر RPC (يستخدم الآن الوكيل الذكي).
"""
try:
if network not in self.supported_networks or self.supported_networks[network]['type'] != 'evm':
print(f"⚠️ شبكة {network} غير مدعومة أو ليست EVM لـ eth_getLogs."); return []
if not isinstance(contract_address, str) or not contract_address.startswith('0x'):
print(f"❌ عنوان العقد غير صالح لـ EVM: {contract_address}"); return []
transfers = []
# 1. جلب رقم أحدث كتلة
payload_block = {"jsonrpc": "2.0", "method": "eth_blockNumber", "params": [], "id": int(time.time())}
json_response_block = await self.rpc_manager.post(network, payload_block, timeout=15.0)
# (إصلاح خطأ 'NoneType' - يجب التحقق من None أولاً)
if json_response_block is None:
print(f" ❌ [RPC Manager] فشلت جميع محاولات جلب رقم الكتلة لشبكة {network}.")
return []
if not json_response_block.get('result'):
error_msg = json_response_block.get('error', {}).get('message', 'No block number result')
print(f" ❌ [RPC Manager] لم يتم الحصول على رقم أحدث كتلة من {network}: {error_msg}")
return []
latest_block = int(json_response_block['result'], 16)
# 2. جلب وقت أحدث كتلة (تقريبي)
payload_latest_block_time = {"jsonrpc": "2.0", "method": "eth_getBlockByNumber", "params": [hex(latest_block), False], "id": int(time.time())+1}
response_latest_time = await self.rpc_manager.post(network, payload_latest_block_time, timeout=15.0)
latest_block_timestamp_approx = int(time.time())
if response_latest_time and response_latest_time.get('result'):
latest_block_data = response_latest_time['result']
if latest_block_data and latest_block_data.get('timestamp'):
latest_block_timestamp_approx = int(latest_block_data['timestamp'], 16)
from_block = max(0, latest_block - blocks_to_scan)
print(f" 📦 [RPC Manager] فحص الكتل من {from_block} إلى {latest_block} على {network}")
# 3. جلب السجلات (Logs)
payload_logs = {
"jsonrpc": "2.0", "method": "eth_getLogs",
"params": [{"fromBlock": hex(from_block), "toBlock": hex(latest_block), "address": contract_address, "topics": [TRANSFER_EVENT_SIGNATURE]}],
"id": int(time.time())+2
}
json_response_logs = await self.rpc_manager.post(network, payload_logs, timeout=45.0)
if not json_response_logs:
print(f" ❌ [RPC Manager] فشل طلب eth_getLogs لـ {network}."); return []
logs = json_response_logs.get('result')
if logs is None:
error_msg = json_response_logs.get('error', {}).get('message', 'Invalid logs response')
print(f" ❌ [RPC Manager] استجابة السجلات غير صالحة من {network}: {error_msg}"); return []
if not logs:
print(f" ✅ [RPC Manager] لا توجد سجلات تحويل لـ {contract_address} في آخر {blocks_to_scan} كتل."); return []
print(f" 📊 [RPC Manager] تم العثور على {len(logs)} سجل تحويل محتمل.")
for log in logs:
try:
topics = log.get('topics', []); data = log.get('data', '0x')
block_num_hex = log.get('blockNumber'); tx_hash = log.get('transactionHash')
log_index_hex = log.get('logIndex', '0x0')
if len(topics) == 3 and data != '0x' and block_num_hex and tx_hash:
sender = '0x' + topics[1][26:]; receiver = '0x' + topics[2][26:]
value = str(int(data, 16)); block_num = str(int(block_num_hex, 16))
log_index = str(int(log_index_hex, 16))
transfers.append({
'hash': tx_hash, 'from': sender.lower(), 'to': receiver.lower(),
'value': value, 'timeStamp': str(latest_block_timestamp_approx),
'blockNumber': block_num, 'network': network, 'logIndex': log_index
})
except (ValueError, TypeError, KeyError, IndexError) as log_parse_error:
print(f" ⚠️ خطأ في تحليل سجل فردي: {log_parse_error} - Log: {log}")
continue
print(f" ✅ [RPC Manager] تم تحليل {len(transfers)} تحويلة بنجاح من {network}")
return transfers
except Exception as e:
print(f"❌ فشل عام في جلب بيانات RPC لـ {network}: {e}"); traceback.print_exc()
return []
async def _get_solana_transfer_data(self, token_address, network='solana', limit=50):
"""
(محدث) جلب بيانات التحويلات من شبكة Solana (يستخدم الآن الوكيل الذكي).
"""
try:
print(f" 🔍 [RPC Manager] جلب تحويلات Solana للتوكن: {token_address}")
transfers = []
# 1. جلب التوقيعات (Signatures)
payload_signatures = {
"jsonrpc": "2.0", "id": int(time.time()),
"method": "getSignaturesForAddress",
"params": [ token_address, {"limit": limit, "commitment": "confirmed"} ]
}
data_signatures = await self.rpc_manager.post(network, payload_signatures, timeout=20.0)
if not data_signatures:
print(f" ❌ [RPC Manager] فشل جلب توقيعات Solana."); return []
if 'error' in data_signatures:
print(f" ❌ خطأ RPC (getSignatures) من {network}: {data_signatures['error'].get('message', 'Unknown RPC error')}"); return []
if 'result' not in data_signatures or data_signatures['result'] is None or not data_signatures['result']:
print(f" ✅ [RPC Manager] Solana: لا توجد معاملات حديثة."); return []
signatures_data = data_signatures['result']
signatures = [tx['signature'] for tx in signatures_data]
print(f" 📊 [RPC Manager] Solana: وجد {len(signatures)} توقيع معاملة محتملة.")
# 2. جلب تفاصيل المعاملات (بشكل متوازي)
transaction_tasks = []
for signature in signatures[:20]: # حد أقصى 20 لتقليل الحمل
transaction_tasks.append(
self._get_solana_transaction_detail(signature, network)
)
transaction_details = await asyncio.gather(*transaction_tasks)
# 3. تحليل المعاملات
processed_count = 0
for detail_result in transaction_details:
if detail_result and self._is_solana_token_transfer(detail_result, token_address):
transfer = await self._parse_solana_transfer(detail_result, token_address)
if transfer:
transfers.append(transfer)
processed_count += 1
print(f" ✅ [RPC Manager] Solana: تم تحليل {processed_count} تحويلة توكن فعلية.")
return transfers
except Exception as e:
print(f"❌ فشل عام في جلب بيانات Solana: {e}"); traceback.print_exc()
return []
async def _get_solana_transaction_detail(self, signature, network='solana'):
"""
(محدث) جلب تفاصيل معاملة Solana (يستخدم الآن الوكيل الذكي).
"""
try:
payload = {
"jsonrpc": "2.0",
"id": f"gtx-{signature[:8]}-{int(time.time()*1000)}",
"method": "getTransaction",
"params": [ signature, {"encoding": "jsonParsed", "maxSupportedTransactionVersion": 0, "commitment": "confirmed"} ]
}
data = await self.rpc_manager.post(network, payload, timeout=20.0)
if not data:
print(f" ⚠️ فشل طلب getTransaction لـ {signature[:10]}...")
return None
if 'error' in data:
print(f" ❌ خطأ RPC في getTransaction لـ {signature[:10]}... : {data['error'].get('message')}")
return None
result = data.get('result')
if result is None:
print(f" ℹ️ لم يتم العثور على نتيجة للمعاملة {signature[:10]}...")
return None
return result
except Exception as e:
print(f" ❌ استثناء غير متوقع في getTransaction لـ {signature[:10]}... : {e}")
return None
async def _get_explorer_token_data(self, contract_address, network):
"""
(محدث) جلب بيانات التحويلات من Explorer (يستخدم الآن الوكيل الذكي).
"""
try:
explorer_config = self.rpc_manager.get_explorer_config(network)
if not explorer_config or not explorer_config.get('api_key') or not explorer_config.get('api_url'):
print(f" ⚠️ لا توجد إعدادات مستكشف أو مفتاح API لشبكة {network}")
return []
config = explorer_config
address_param = "contractaddress" if network == 'ethereum' else "address"
params = {
"module": "account", "action": "tokentx",
address_param: contract_address,
"page": 1, "offset": 100, "startblock": 0,
"endblock": 999999999, "sort": "desc", "apikey": config['api_key']
}
# استخدام الوكيل الذكي لإجراء طلب GET
data = await self.rpc_manager.get(config['api_url'], params=params, timeout=20.0)
if not data:
print(f"⚠️ Explorer {network} request failed or returned empty data.")
return []
status = str(data.get('status', '0'))
message = data.get('message', '').upper()
if status == '1' and message == 'OK':
transfers = data.get('result', [])
processed_transfers = []
for tf in transfers:
if tf.get('tokenSymbol') and tf.get('contractAddress','').lower() == contract_address.lower():
tf['network'] = network
if 'from' in tf: tf['from'] = tf['from'].lower()
if 'to' in tf: tf['to'] = tf['to'].lower()
if 'logIndex' not in tf: tf['logIndex'] = 'N/A'
processed_transfers.append(tf)
print(f" ✅ Explorer {network}: تم جلب {len(processed_transfers)} تحويلة توكن للعقد المحدد.")
return processed_transfers
elif status == '0' and "NO TRANSACTIONS FOUND" in message:
print(f" ✅ Explorer {network}: لا توجد تحويلات.")
return []
else:
error_message = data.get('result', message)
print(f"⚠️ Explorer {network} returned error: Status={status}, Message={message}, Result={error_message}")
if "INVALID API KEY" in str(error_message).upper():
print(f"🚨 خطأ فادح: مفتاح API الخاص بـ {network.upper()} غير صالح!")
return []
except Exception as e:
print(f"❌ فشل جلب بيانات Explorer لـ {network}: {e}")
return []
async def _get_token_decimals(self, contract_address, network):
"""
(محدث) جلب عدد الكسور العشرية للعملة (يستخدم الآن الوكيل الذكي).
"""
try:
if isinstance(contract_address, dict):
addr_for_network = contract_address.get(network)
if not addr_for_network: return None
contract_address = addr_for_network
if not isinstance(contract_address, str): return None
cache_key = f"{contract_address.lower()}_{network}"
if cache_key in self.token_decimals_cache:
return self.token_decimals_cache[cache_key]
if network in self.supported_networks and self.supported_networks[network]['type'] == 'evm':
payload = {
"jsonrpc": "2.0", "method": "eth_call",
"params": [{"to": contract_address, "data": "0x313ce567"}, "latest"],
"id": int(time.time())
}
response_json = await self.rpc_manager.post(network, payload, timeout=10.0)
if response_json and response_json.get('result') not in [None, '0x', '']:
try:
decimals = int(response_json['result'], 16)
if isinstance(decimals, int) and decimals >= 0:
self.token_decimals_cache[cache_key] = decimals
print(f" ℹ️ تم جلب Decimals لـ {contract_address[:10]}... على {network}: {decimals}")
return decimals
except ValueError:
print(f" ⚠️ فشل تحويل نتيجة decimals من RPC ({response_json['result']}) لـ {contract_address} على {network}")
elif network == 'solana':
# (لا يزال تقديرياً، يمكن تحسينه بجلب بيانات الحساب)
estimated_decimals = 9
self.token_decimals_cache[cache_key] = estimated_decimals
print(f" ⚠️ استخدام قيمة decimals تقديرية لـ Solana: {estimated_decimals}")
return estimated_decimals
print(f"❌ فشل جلب الكسور العشرية للعقد {contract_address} على شبكة {network}.")
return None
except Exception as e:
print(f"❌ فشل عام في جلب الكسور العشرية للعقد {contract_address} على شبكة {network}: {e}")
return None
async def _get_token_price_from_coingecko(self, symbol):
"""
(محدث) جلب سعر العملة من CoinGecko (يستخدم الآن الوكيل الذكي).
"""
try:
base_symbol = symbol.split('/')[0].upper()
coingecko_id = COINGECKO_SYMBOL_MAPPING.get(base_symbol, base_symbol.lower())
url = f"{COINGECKO_BASE_URL}/simple/price"
params = {"ids": coingecko_id, "vs_currencies": "usd"}
headers = {'User-Agent': 'Mozilla/5.0', 'Accept': 'application/json'}
# استخدام الوكيل الذكي مع منظم طلبات CoinGecko
data = await self.rpc_manager.get(url, params=params, headers=headers, timeout=10.0, use_coingecko_semaphore=True)
if data and data.get(coingecko_id) and 'usd' in data[coingecko_id]:
price = data[coingecko_id]['usd']
if isinstance(price, (int, float)) and price > 0: return price
else: print(f"⚠️ استجابة CoinGecko تحتوي على سعر غير صالح لـ {symbol}: {price}"); return 0
# محاولة البحث إذا فشل المعرف الافتراضي
print(f"⚠️ لم يتم العثور على سعر لـ {coingecko_id} في CoinGecko، محاولة البحث...")
search_url = f"{COINGECKO_BASE_URL}/search"
search_params = {"query": base_symbol}
search_data = await self.rpc_manager.get(search_url, params=search_params, headers=headers, timeout=10.0, use_coingecko_semaphore=True)
if search_data and search_data.get('coins'):
coins = search_data['coins']
found_id = None
search_symbol_lower = base_symbol.lower()
for coin in coins:
if coin.get('symbol', '').lower() == search_symbol_lower: found_id = coin.get('id'); break
if not found_id: found_id = coins[0].get('id') # أخذ أول نتيجة
if found_id:
print(f" 🔄 تم العثور على معرف بديل: {found_id}. إعادة المحاولة...")
params["ids"] = found_id
data = await self.rpc_manager.get(url, params=params, headers=headers, timeout=10.0, use_coingecko_semaphore=True)
if data and data.get(found_id) and 'usd' in data[found_id]:
return data[found_id]['usd']
print(f"❌ فشل جلب السعر من CoinGecko لـ {symbol} بعد البحث.")
return 0
except Exception as e:
print(f"❌ فشل عام في _get_token_price_from_coingecko لـ {symbol}: {e}")
return 0
async def _find_contract_via_coingecko(self, symbol):
"""
(محدث) البحث عن عقد العملة عبر CoinGecko (يستخدم الآن الوكيل الذكي).
"""
try:
search_url = f"{COINGECKO_BASE_URL}/search"
params = {"query": symbol}
headers = {'User-Agent': 'Mozilla/5.0', 'Accept': 'application/json'}
# 1. البحث عن العملة
data = await self.rpc_manager.get(search_url, params=params, headers=headers, timeout=15.0, use_coingecko_semaphore=True)
if not data or not data.get('coins'):
print(f" ❌ No matching coins found for {symbol} on CoinGecko"); return None
coins = data.get('coins', [])
best_coin = None; search_symbol_lower = symbol.lower()
for coin in coins:
if coin.get('symbol', '').lower() == search_symbol_lower: best_coin = coin; break
if not best_coin: best_coin = coins[0]
coin_id = best_coin.get('id')
if not coin_id: print(f" ❌ No ID found"); return None
# 2. جلب تفاصيل العملة
print(f" 🔍 Fetching details for CoinGecko ID: {coin_id}")
detail_url = f"{COINGECKO_BASE_URL}/coins/{coin_id}"
detail_data = await self.rpc_manager.get(detail_url, params=None, headers=headers, timeout=15.0, use_coingecko_semaphore=True)
if not detail_data: print(f" ❌ Failed to fetch details for {coin_id}"); return None
platforms = detail_data.get('platforms', {})
if not platforms: print(f" ❌ No platform data found"); return None
network_priority = ['ethereum', 'binance-smart-chain', 'polygon-pos', 'arbitrum-one', 'optimistic-ethereum', 'avalanche', 'fantom', 'solana']
network_map = {'ethereum': 'ethereum', 'binance-smart-chain': 'bsc', 'polygon-pos': 'polygon', 'arbitrum-one': 'arbitrum', 'optimistic-ethereum': 'optimism', 'avalanche': 'avalanche', 'fantom': 'fantom', 'solana': 'solana'}
for platform_cg in network_priority:
address = platforms.get(platform_cg)
if address and isinstance(address, str) and address.strip():
network = network_map.get(platform_cg)
if network:
print(f" ✅ Found contract on {network}: {address}")
return address, network
print(f" ❌ No contract found on supported platforms for {coin_id}")
return None
except Exception as e:
print(f"❌ General failure in _find_contract_via_coingecko for {symbol}: {e}"); traceback.print_exc()
return None
# --- باقي الدوال (التحليل، المساعدة، ccxt) - لا تغيير جوهري لأنها لا تعتمد على RPC/HTTP ---
def _is_solana_token_transfer(self, transaction, token_address):
"""(لا تغيير) التحقق إذا كانت المعاملة تحويل توكن في Solana"""
try:
if not transaction or 'meta' not in transaction or transaction.get('meta') is None: return False
meta = transaction['meta']; post_balances = meta.get('postTokenBalances'); pre_balances = meta.get('preTokenBalances')
if post_balances is None and pre_balances is None:
inner_instructions = meta.get('innerInstructions', [])
for inner_inst_set in inner_instructions:
for inst in inner_inst_set.get('instructions', []):
parsed = inst.get('parsed')
if isinstance(parsed, dict) and parsed.get('type') in ['transfer', 'transferChecked'] and parsed.get('info', {}).get('mint') == token_address: return True
return False
all_balances = (post_balances or []) + (pre_balances or [])
for balance in all_balances:
if isinstance(balance, dict) and balance.get('mint') == token_address:
pre_amount = next((pb.get('uiTokenAmount',{}).get('amount','0') for pb in (pre_balances or []) if isinstance(pb, dict) and pb.get('owner') == balance.get('owner') and pb.get('mint') == token_address), '0')
post_amount = balance.get('uiTokenAmount',{}).get('amount','0')
try:
if int(post_amount) != int(pre_amount): return True
except (ValueError, TypeError): pass
return False
except Exception as e: return False
async def _parse_solana_transfer(self, transaction, token_address):
"""(محدث مع إصلاح SyntaxError) تحليل معاملة Solana لاستخراج بيانات التحويل"""
try:
if not transaction or 'transaction' not in transaction or not transaction['transaction'].get('signatures'): return None
signature = transaction['transaction']['signatures'][0]; block_time = transaction.get('blockTime')
timestamp_to_use = str(block_time) if block_time else str(int(time.time())); slot = transaction.get('slot', 0)
sender = None; receiver = None; amount_raw = 0; meta = transaction.get('meta')
if not meta: return None
pre_balances = {bal.get('owner'): bal for bal in meta.get('preTokenBalances', []) if isinstance(bal, dict) and bal.get('mint') == token_address}
post_balances = {bal.get('owner'): bal for bal in meta.get('postTokenBalances', []) if isinstance(bal, dict) and bal.get('mint') == token_address}
all_owners = set(list(pre_balances.keys()) + list(post_balances.keys()))
for owner in all_owners:
pre_bal_data = pre_balances.get(owner, {}).get('uiTokenAmount', {}); post_bal_data = post_balances.get(owner, {}).get('uiTokenAmount', {})
pre_amount_str = pre_bal_data.get('amount', '0'); post_amount_str = post_bal_data.get('amount', '0')
try:
pre_amount_raw = int(pre_amount_str); post_amount_raw = int(post_amount_str); diff_raw = post_amount_raw - pre_amount_raw
if diff_raw > 0: receiver = owner; amount_raw = diff_raw
elif diff_raw < 0: sender = owner
except (ValueError, TypeError):
pre_ui = pre_bal_data.get('uiAmount', 0.0) or 0.0; post_ui = post_bal_data.get('uiAmount', 0.0) or 0.0; diff_ui = post_ui - pre_ui
if diff_ui > 1e-9:
receiver = owner; decimals = await self._get_token_decimals(token_address, 'solana')
# (السطر الذي تم إصلاحه)
if decimals is not None:
amount_raw = int(diff_ui * (10**decimals))
else:
amount_raw = 0
elif diff_ui < -1e-9: sender = owner
if not sender or not receiver or amount_raw == 0:
inner_instructions = meta.get('innerInstructions', [])
for inner_inst_set in inner_instructions:
for inst in inner_inst_set.get('instructions', []):
parsed = inst.get('parsed')
if isinstance(parsed, dict) and parsed.get('type') in ['transfer', 'transferChecked'] and parsed.get('info', {}).get('mint') == token_address:
info = parsed.get('info', {}); sender_inner = info.get('source') or info.get('authority'); receiver_inner = info.get('destination')
amount_inner_str = info.get('tokenAmount', {}).get('amount') or info.get('amount')
try:
amount_inner_raw = int(amount_inner_str)
if sender_inner and receiver_inner and amount_inner_raw > 0:
sender = sender_inner; receiver = receiver_inner; amount_raw = amount_inner_raw; break
except (ValueError, TypeError): continue
if sender and receiver and amount_raw > 0: break
if sender and receiver and amount_raw > 0: break
if sender and receiver and amount_raw > 0:
return {'hash': signature, 'from': sender, 'to': receiver, 'value': str(amount_raw), 'timeStamp': timestamp_to_use, 'blockNumber': str(slot), 'network': 'solana', 'type': 'solana_transfer', 'logIndex': '0'}
else: return None
except Exception as e:
sig_short = signature[:10] if 'signature' in locals() and signature else "N/A"; print(f"❌ خطأ في تحليل معاملة Solana ({sig_short}...): {e}"); traceback.print_exc(); return None
def _filter_recent_transfers(self, transfers, max_minutes=120):
"""(لا تغيير) ترشيح التحويلات الحديثة فقط"""
recent_transfers = []; current_time_dt = datetime.now()
cutoff_timestamp = int((current_time_dt - timedelta(minutes=max_minutes)).timestamp())
processed_keys = set()
for transfer in transfers:
try:
tx_hash = transfer.get('hash', 'N/A'); log_index = transfer.get('logIndex', 'N/A')
unique_key = f"{tx_hash}-{log_index}";
if unique_key in processed_keys: continue
timestamp_str = transfer.get('timeStamp')
if not timestamp_str or not timestamp_str.isdigit(): continue
timestamp = int(timestamp_str)
if timestamp >= cutoff_timestamp:
transfer_time_dt = datetime.fromtimestamp(timestamp); time_diff = current_time_dt - transfer_time_dt
transfer['minutes_ago'] = round(time_diff.total_seconds() / 60, 2)
transfer['human_time'] = transfer_time_dt.isoformat()
recent_transfers.append(transfer); processed_keys.add(unique_key)
except (ValueError, TypeError, KeyError) as e: print(f" ⚠️ خطأ في ترشيح تحويلة: {e} - البيانات: {transfer}"); continue
print(f" 🔍 تم ترشيح {len(recent_transfers)} تحويلة حديثة فريدة (آخر {max_minutes} دقيقة).")
recent_transfers.sort(key=lambda x: int(x.get('timeStamp', 0)), reverse=True); return recent_transfers
async def _analyze_enhanced_whale_impact(self, transfers, symbol, contract_address, network):
"""(لا تغيير) تحليل تأثير تحركات الحيتان المحسن"""
print(f"📊 تحليل {len(transfers)} تحويلة حديثة للعملة {symbol}")
exchange_flows = {'to_exchanges': [], 'from_exchanges': [], 'other_transfers': [], 'network_breakdown': defaultdict(lambda: {'to_exchanges': 0, 'from_exchanges': 0, 'other': 0})}
total_volume_usd = 0.0; whale_transfers_count = 0; network_stats = defaultdict(int); unique_transfers = {}
decimals = await self._get_token_decimals(contract_address, network)
if decimals is None: print(f"❌ لا يمكن تحليل الحيتان بدون decimals لـ {symbol} على {network}."); return self._create_error_response(symbol, f"Failed to get decimals on {network}")
for transfer in transfers:
try:
unique_key = f"{transfer.get('hash', '')}-{transfer.get('logIndex', 'N/A')}"
if unique_key in unique_transfers: continue
unique_transfers[unique_key] = True
from_addr = str(transfer.get('from', '')).lower(); to_addr = str(transfer.get('to', '')).lower()
raw_value_str = transfer.get('value', '0'); transfer_network = transfer.get('network', 'unknown')
if not raw_value_str or not raw_value_str.isdigit(): continue
raw_value = int(raw_value_str);
if raw_value == 0: continue
value_usd = await self._get_accurate_token_value_optimized(raw_value, decimals, symbol)
if value_usd is None or value_usd < 0: print(f" ⚠️ قيمة USD غير صالحة ({value_usd}) للتحويلة {unique_key}. التخطي."); continue
total_volume_usd += value_usd; network_stats[transfer_network] += 1
if value_usd >= self.whale_threshold_usd:
whale_transfers_count += 1
is_to_exchange = to_addr in self.address_categories['exchange']
is_from_exchange = from_addr in self.address_categories['exchange']
if is_to_exchange:
exchange_flows['to_exchanges'].append({'value_usd': value_usd, 'from_type': self._classify_address(from_addr), 'to_exchange': self._classify_address(to_addr), 'transaction': {'hash': transfer.get('hash'), 'minutes_ago': transfer.get('minutes_ago')}, 'network': transfer_network})
exchange_flows['network_breakdown'][transfer_network]['to_exchanges'] += 1
elif is_from_exchange:
exchange_flows['from_exchanges'].append({'value_usd': value_usd, 'from_exchange': self._classify_address(from_addr), 'to_type': self._classify_address(to_addr), 'transaction': {'hash': transfer.get('hash'), 'minutes_ago': transfer.get('minutes_ago')}, 'network': transfer_network})
exchange_flows['network_breakdown'][transfer_network]['from_exchanges'] += 1
else:
exchange_flows['other_transfers'].append({'value_usd': value_usd, 'from_type': self._classify_address(from_addr), 'to_type': self._classify_address(to_addr), 'transaction': {'hash': transfer.get('hash'), 'minutes_ago': transfer.get('minutes_ago')}, 'network': transfer_network})
exchange_flows['network_breakdown'][transfer_network]['other'] += 1
except Exception as analysis_loop_error: print(f" ⚠️ خطأ في تحليل حلقة التحويلات: {analysis_loop_error}"); continue
total_to_exchanges_usd = sum(t['value_usd'] for t in exchange_flows['to_exchanges']); total_from_exchanges_usd = sum(t['value_usd'] for t in exchange_flows['from_exchanges'])
deposit_count = len(exchange_flows['to_exchanges']); withdrawal_count = len(exchange_flows['from_exchanges']); net_flow_usd = total_to_exchanges_usd - total_from_exchanges_usd
exchange_flows['deposit_count'] = deposit_count; exchange_flows['withdrawal_count'] = withdrawal_count; exchange_flows['net_flow_usd'] = net_flow_usd
print(f"🐋 تحليل الحيتان لـ {symbol}:"); print(f" • إجمالي التحويلات الفريدة: {len(unique_transfers)}"); print(f" • تحويلات الحيتان (>${self.whale_threshold_usd:,.0f}): {whale_transfers_count}"); print(f" • إيداعات للمنصات: {deposit_count} (${total_to_exchanges_usd:,.2f})"); print(f" • سحوبات من المنصات: {withdrawal_count} (${total_from_exchanges_usd:,.2f})"); print(f" • صافي التدفق (للمنصات): ${net_flow_usd:,.2f}")
signal = self._generate_enhanced_trading_signal(total_to_exchanges_usd, total_from_exchanges_usd, net_flow_usd, deposit_count, withdrawal_count, whale_transfers_count, network_stats)
llm_summary = self._create_enhanced_llm_summary(signal, exchange_flows, network_stats, total_volume_usd, whale_transfers_count)
return {'symbol': symbol, 'data_available': True, 'analysis_timestamp': datetime.now().isoformat(), 'summary': {'total_transfers_analyzed': len(unique_transfers), 'whale_transfers_count': whale_transfers_count, 'total_volume_usd': total_volume_usd, 'time_window_minutes': 120, 'networks_analyzed': dict(network_stats)}, 'exchange_flows': {'to_exchanges_usd': total_to_exchanges_usd, 'from_exchanges_usd': total_from_exchanges_usd, 'net_flow_usd': net_flow_usd, 'deposit_count': deposit_count, 'withdrawal_count': withdrawal_count, 'network_breakdown': dict(exchange_flows['network_breakdown']), 'top_deposits': sorted([{'value_usd': t['value_usd'], 'network': t['network'], 'to': t['to_exchange']} for t in exchange_flows['to_exchanges']], key=lambda x: x['value_usd'], reverse=True)[:3], 'top_withdrawals': sorted([{'value_usd': t['value_usd'], 'network': t['network'], 'from': t['from_exchange']} for t in exchange_flows['from_exchanges']], key=lambda x: x['value_usd'], reverse=True)[:3]}, 'trading_signal': signal, 'llm_friendly_summary': llm_summary}
async def _get_accurate_token_value_optimized(self, raw_value, decimals, symbol):
"""(لا تغيير) حساب القيمة بالدولار"""
try:
if decimals is None or decimals < 0: print(f"⚠️ قيمة decimals غير صالحة ({decimals}) لـ {symbol}."); return 0
price = await self._get_token_price(symbol)
if price is None or price <= 0: return 0
token_amount = raw_value / (10 ** decimals); value_usd = token_amount * price
return value_usd
except OverflowError: print(f"⚠️ خطأ OverflowError أثناء حساب قيمة {symbol} (Raw={raw_value}, Decimals={decimals})"); return 0
except Exception as e: print(f"❌ خطأ في _get_accurate_token_value_optimized لـ {symbol}: {e}"); return 0
async def _get_token_price(self, symbol):
"""
(محدث) جلب سعر العملة (KuCoin أولاً، ثم CoinGecko كاحتياطي).
"""
try:
base_symbol = symbol.split('/')[0].upper()
cache_entry = self.token_price_cache.get(base_symbol)
if cache_entry and time.time() - cache_entry.get('timestamp', 0) < 300:
return cache_entry['price']
# 🔴 --- START OF CHANGE --- 🔴
# (تغيير الأولوية حسب طلبك)
# 1. جرب KuCoin أولاً (سريع وله حدود أعلى)
price = await self._get_token_price_from_kucoin(symbol)
if price is not None and price > 0:
self.token_price_cache[base_symbol] = {'price': price, 'timestamp': time.time()}; return price
# 2. جرب CoinGecko كاحتياطي (أبطأ وله حدود صارمة)
price = await self._get_token_price_from_coingecko(symbol)
if price is not None and price > 0:
self.token_price_cache[base_symbol] = {'price': price, 'timestamp': time.time()}; return price
# 🔴 --- END OF CHANGE --- 🔴
print(f"❌ فشل جميع محاولات جلب سعر العملة {symbol}"); self.token_price_cache[base_symbol] = {'price': 0, 'timestamp': time.time()}; return 0
except Exception as e: print(f"❌ فشل عام في جلب سعر العملة {symbol}: {e}"); return 0
async def _get_token_price_from_kucoin(self, symbol):
"""(لا تغيير) جلب سعر العملة من KuCoin (يستخدم ccxt)"""
exchange = None
try:
# (نستخدم اتصالاً مؤقتاً هنا بدلاً من الاعتماد على self.kucoin_rest لتجنب التعارض)
exchange = ccxtasync.kucoin({'enableRateLimit': True}); markets = await exchange.load_markets()
if symbol not in markets:
# print(f"⚠️ الرمز {symbol} غير موجود في أسواق KuCoin.")
await exchange.close(); return 0
ticker = await exchange.fetch_ticker(symbol); price = ticker.get('last')
if price is not None:
try:
price_float = float(price)
if price_float > 0: return price_float
else:
# print(f"⚠️ KuCoin أعاد سعراً غير موجب لـ {symbol}: {price}")
return 0
except (ValueError, TypeError):
# print(f"⚠️ KuCoin أعاد سعراً غير رقمي لـ {symbol}: {price}")
return 0
else:
# print(f"⚠️ KuCoin لم يُعد مفتاح 'last' في ticker لـ {symbol}")
return 0
except ccxt.NetworkError as e:
# print(f"❌ خطأ شبكة أثناء جلب السعر من KuCoin لـ {symbol}: {e}")
return 0
except ccxt.ExchangeError as e:
# print(f"❌ خطأ منصة KuCoin ({type(e).__name__}) لـ {symbol}: {e}")
return 0
except Exception as e:
# print(f"❌ خطأ غير متوقع أثناء جلب السعر من KuCoin لـ {symbol}: {e}")
return 0
finally:
if exchange:
try: await exchange.close()
except Exception: pass
def _generate_enhanced_trading_signal(self, to_exchanges_usd, from_exchanges_usd, net_flow_usd, deposit_count, withdrawal_count, whale_transfers_count, network_stats):
"""(لا تغيير) توليد إشارة تداول محسنة"""
network_strength = min(len(network_stats) / 3.0, 1.0); abs_net_flow = abs(net_flow_usd); total_flow_volume = to_exchanges_usd + from_exchanges_usd
confidence_multiplier = min( (abs_net_flow / 500000.0) + (whale_transfers_count / 5.0) , 1.5); base_confidence = 0.5 + (network_strength * 0.1)
action = 'HOLD'; reason = f'نشاط حيتان عبر {len(network_stats)} شبكة: {whale_transfers_count} تحويلة كبيرة بقيمة إجمالية ${total_flow_volume:,.0f}. صافي التدفق ${net_flow_usd:,.0f}'; critical_alert = False
if net_flow_usd > 500000 and deposit_count >= 2:
action = 'STRONG_SELL'; confidence = min(0.7 + (base_confidence * confidence_multiplier * 0.3), 0.95); reason = f'ضغط بيعي قوي عبر {len(network_stats)} شبكة: ${net_flow_usd:,.0f} إيداع للمنصات ({deposit_count} تحويلة).'; critical_alert = abs_net_flow > 1000000
elif net_flow_usd > 150000 and (deposit_count >= 1 or whale_transfers_count > 0):
action = 'SELL'; confidence = min(0.6 + (base_confidence * confidence_multiplier * 0.2), 0.85); reason = f'ضغط بيعي محتمل عبر {len(network_stats)} شبكة: ${net_flow_usd:,.0f} إيداع للمنصات.'; critical_alert = abs_net_flow > 750000
elif net_flow_usd < -500000 and withdrawal_count >= 2:
action = 'STRONG_BUY'; confidence = min(0.7 + (base_confidence * confidence_multiplier * 0.3), 0.95); reason = f'تراكم شرائي قوي عبر {len(network_stats)} شبكة: ${abs_net_flow:,.0f} سحب من المنصات ({withdrawal_count} تحويلة).'; critical_alert = abs_net_flow > 1000000
elif net_flow_usd < -150000 and (withdrawal_count >= 1 or whale_transfers_count > 0):
action = 'BUY'; confidence = min(0.6 + (base_confidence * confidence_multiplier * 0.2), 0.85); reason = f'تراكم شرائي محتمل عبر {len(network_stats)} شبكة: ${abs_net_flow:,.0f} سحب من المنصات.'; critical_alert = abs_net_flow > 750000
else:
if whale_transfers_count > 0 and abs_net_flow < 100000: confidence = min(base_confidence * 1.1, 0.6); reason += " (صافي التدفق منخفض نسبياً)"
elif whale_transfers_count > 0 and abs_net_flow < 50000: confidence = max(0.4, base_confidence * 0.9); reason += " (صافي التدفق شبه متعادل)"
elif whale_transfers_count == 0: confidence = 0.3; reason = 'لا توجد تحركات حيتان كبيرة في الساعات الأخيرة'
else: confidence = base_confidence
final_confidence = max(0.3, min(confidence, 0.95))
return {'action': action, 'confidence': final_confidence, 'reason': reason, 'critical_alert': critical_alert}
def _create_enhanced_llm_summary(self, signal, exchange_flows, network_stats, total_volume, whale_count):
"""(لا تغيير) إنشاء ملخص محسن للنموذج الضخم"""
deposit_count_val = exchange_flows.get('deposit_count', 0); withdrawal_count_val = exchange_flows.get('withdrawal_count', 0)
exchange_involvement_str = f"{deposit_count_val + withdrawal_count_val} معاملة مع المنصات"
return {'whale_activity_summary': signal['reason'], 'recommended_action': signal['action'], 'confidence': signal['confidence'], 'key_metrics': {'total_whale_transfers': whale_count, 'total_volume_analyzed': f"${total_volume:,.0f}", 'net_flow_direction': 'TO_EXCHANGES' if signal['action'] in ['SELL', 'STRONG_SELL'] else 'FROM_EXCHANGES' if signal['action'] in ['BUY', 'STRONG_BUY'] else 'BALANCED', 'whale_movement_impact': 'HIGH' if signal['confidence'] > 0.8 else 'MEDIUM' if signal['confidence'] > 0.6 else 'LOW', 'exchange_involvement': exchange_involvement_str, 'network_coverage': f"{len(network_stats)} شبكات", 'data_quality': 'REAL_TIME'}}
async def _find_contract_address_enhanced(self, symbol):
"""(محدث) بحث متقدم عن عقد العملة (يستخدم الآن دالة محدثة)"""
base_symbol = symbol.split("/")[0].upper() if '/' in symbol else symbol.upper(); symbol_lower = base_symbol.lower()
print(f"🔍 [WhaleMonitor] البحث عن عقد للعملة: {symbol}")
if symbol_lower in self.contracts_db:
contract_info = self.contracts_db[symbol_lower]
if isinstance(contract_info, str):
network = self._detect_network_from_address(contract_info)
contract_info = {'address': contract_info, 'network': network}; self.contracts_db[symbol_lower] = contract_info
if 'address' in contract_info and 'network' in contract_info: print(f" ✅ وجد في قاعدة البيانات المحلية: {contract_info}"); return contract_info
else: print(f" ⚠️ بيانات العقد غير مكتملة في القاعدة المحلية لـ {symbol}: {contract_info}")
print(f" 🔍 البحث في CoinGecko عن {base_symbol}...")
coingecko_result = await self._find_contract_via_coingecko(base_symbol)
if coingecko_result:
address, network = coingecko_result; contract_info = {'address': address, 'network': network}
self.contracts_db[symbol_lower] = contract_info
print(f" ✅ تم العثور على عقد {symbol} عبر CoinGecko على شبكة {network}: {address}")
if self.r2_service: await self._save_contracts_to_r2()
return contract_info
print(f" ❌ فشل العثور على عقد لـ {symbol} في جميع المصادر"); return None
def _is_exchange_address(self, address):
"""(لا تغيير) التحقق إذا كان العنوان ينتمي إلى منصة"""
try:
if not isinstance(address, str): return False
return address.lower() in self.address_categories['exchange']
except Exception: return False
def _classify_address(self, address):
"""(لا تغيير) تصنيف العنوان إلى نوع محدد"""
try:
if not isinstance(address, str): return 'unknown'
return self.address_labels.get(address.lower(), 'unknown')
except Exception: return 'unknown'
def _create_native_coin_response(self, symbol):
"""(لا تغيير) إنشاء رد للعملات الأصلية"""
return {'symbol': symbol, 'data_available': False, 'error': 'NATIVE_COIN_NO_TOKEN', 'trading_signal': {'action': 'HOLD', 'confidence': 0.3, 'reason': f'{symbol} عملة أصلية - لا توجد بيانات حيتان للتوكنات'}, 'llm_friendly_summary': {'whale_activity_summary': f'{symbol} عملة أصلية - نظام مراقبة الحيتان الحالي مصمم للتوكنات فقط', 'recommended_action': 'HOLD', 'confidence': 0.3, 'key_metrics': {'whale_movement_impact': 'NOT_APPLICABLE'}}}
def _create_no_contract_response(self, symbol):
"""(لا تغيير) إنشاء رد عند عدم العثور على عقد"""
return {'symbol': symbol, 'data_available': False, 'error': 'NO_CONTRACT_FOUND', 'trading_signal': {'action': 'HOLD', 'confidence': 0.3, 'reason': 'لم يتم العثور على عقد العملة - لا توجد بيانات حيتان'}, 'llm_friendly_summary': {'whale_activity_summary': 'لا توجد بيانات عن تحركات الحيتان - لم يتم العثور على عقد العملة', 'recommended_action': 'HOLD', 'confidence': 0.3, 'key_metrics': {'whale_movement_impact': 'NO_DATA'}}}
def _create_no_transfers_response(self, symbol):
"""(لا تغيير) إنشاء رد عند عدم العثور على تحويلات"""
return {'symbol': symbol, 'data_available': False, 'error': 'NO_TRANSFERS_FOUND', 'trading_signal': {'action': 'HOLD', 'confidence': 0.3, 'reason': 'لا توجد تحويلات للعملة - لا توجد بيانات حيتان'}, 'llm_friendly_summary': {'whale_activity_summary': 'لا توجد تحويلات حديثة للعملة - لا توجد بيانات حيتان', 'recommended_action': 'HOLD', 'confidence': 0.3, 'key_metrics': {'whale_movement_impact': 'NO_DATA'}}}
def _create_no_recent_activity_response(self, symbol):
"""(لا تغيير) إنشاء رد عند عدم وجود نشاط حديث"""
return {'symbol': symbol, 'data_available': False, 'error': 'NO_RECENT_ACTIVITY', 'trading_signal': {'action': 'HOLD', 'confidence': 0.3, 'reason': 'لا توجد تحويلات حديثة للعملة - لا توجد بيانات حيتان'}, 'llm_friendly_summary': {'whale_activity_summary': 'لا توجد تحويلات حيتان حديثة في آخر 120 دقيقة - لا توجد بيانات حيتان', 'recommended_action': 'HOLD', 'confidence': 0.3, 'key_metrics': {'whale_movement_impact': 'NO_DATA'}}}
def _create_error_response(self, symbol, error_msg):
"""(لا تغيير) إنشاء رد في حالة حدوث خطأ"""
return {'symbol': symbol, 'data_available': False, 'error': f'ANALYSIS_ERROR: {error_msg}', 'trading_signal': {'action': 'HOLD', 'confidence': 0.3, 'reason': f'خطأ في تحليل الحيتان: {error_msg} - لا توجد بيانات حيتان'}, 'llm_friendly_summary': {'whale_activity_summary': f'فشل في تحليل تحركات الحيتان: {error_msg} - لا توجد بيانات حيتان', 'recommended_action': 'HOLD', 'confidence': 0.3, 'key_metrics': {'whale_movement_impact': 'ERROR'}}}
async def generate_whale_trading_signal(self, symbol, whale_data, market_context):
"""(لا تغيير) توليد إشارة تداول بناءً على بيانات الحيتان"""
try:
if not whale_data or not whale_data.get('data_available', False):
return {'action': 'HOLD', 'confidence': 0.3, 'reason': 'لا توجد بيانات كافية عن نشاط الحيتان', 'source': 'whale_analysis', 'data_available': False}
whale_signal = whale_data.get('trading_signal', {}); analysis_details = whale_data
return {'action': whale_signal.get('action', 'HOLD'), 'confidence': whale_signal.get('confidence', 0.3), 'reason': whale_signal.get('reason', 'تحليل الحيتان غير متوفر'), 'source': 'whale_analysis', 'critical_alert': whale_signal.get('critical_alert', False), 'data_available': True, 'whale_analysis_details': analysis_details}
except Exception as e:
print(f"❌ خطأ في توليد إشارة تداول الحيتان لـ {symbol}: {e}")
return {'action': 'HOLD', 'confidence': 0.3, 'reason': f'خطأ في تحليل الحيتان: {str(e)} - لا توجد بيانات حيتان', 'source': 'whale_analysis', 'data_available': False}
async def cleanup(self):
"""(لا تغيير) تنظيف الموارد عند الإغلاق"""
if hasattr(self, 'http_client') and self.http_client and not self.http_client.is_closed:
try:
await self.http_client.aclose()
print("✅ تم إغلاق اتصال HTTP client لمراقب الحيتان.")
except Exception as e:
print(f"⚠️ خطأ أثناء إغلاق HTTP client لمراقب الحيتان: {e}")