Trad / data_manager.py
Riy777's picture
Update data_manager.py
87e3669
raw
history blame
55.1 kB
import os, asyncio, httpx, json, traceback, backoff, re, time
from datetime import datetime, timedelta
from functools import wraps
import ccxt.pro as ccxt
from ccxt.base.errors import RateLimitExceeded, DDoSProtection, NetworkError
import pandas as pd
import numpy as np
from state import MARKET_STATE_OK
# --- 🐋 نظام تتبع الحيتان المحسن مع الوقت الدقيق ---
class EnhancedWhaleMonitor:
def __init__(self, contracts_db=None):
self.http_client = httpx.AsyncClient(timeout=10.0, limits=httpx.Limits(max_connections=100, max_keepalive_connections=20))
self.moralis_key = os.getenv("MORALIS_KEY")
self.etherscan_key = os.getenv("ETHERSCAN_KEY")
self.whale_threshold_usd = 100000
self.contracts_db = contracts_db or {}
self.rpc_endpoints = {
'ethereum': 'https://eth.llamarpc.com',
'bsc': 'https://bsc-dataseed.binance.org/',
'polygon': 'https://polygon-rpc.com/',
'arbitrum': 'https://arb1.arbitrum.io/rpc',
'avalanche': 'https://api.avax.network/ext/bc/C/rpc',
}
self.price_cache = {}
self.last_scan_time = {}
async def _get_native_coin_price(self, network):
"""جلب الأسعار بشكل فوري مع تخزين مؤقت"""
now = time.time()
if network in self.price_cache and (now - self.price_cache[network]['timestamp']) < 300:
return self.price_cache[network]['price']
coin_map = {
'ethereum': 'ethereum', 'bsc': 'binancecoin', 'polygon': 'matic-network',
'arbitrum': 'arbitrum', 'avalanche': 'avalanche-2'
}
coin_id = coin_map.get(network)
if not coin_id: return 0
try:
url = f"https://api.coingecko.com/api/v3/simple/price?ids={coin_id}&vs_currencies=usd"
response = await self.http_client.get(url)
response.raise_for_status()
price = response.json().get(coin_id, {}).get('usd', 0)
self.price_cache[network] = {'price': price, 'timestamp': now}
return price
except Exception as e:
print(f"⚠️ فشل جلب سعر {network}: {e}")
return self.price_cache.get(network, {}).get('price', 0)
async def _call_rpc_async(self, network, method, params=[]):
"""اتصال RPC غير متزامن"""
try:
payload = {"jsonrpc": "2.0", "method": method, "params": params, "id": 1}
response = await self.http_client.post(self.rpc_endpoints[network], json=payload)
response.raise_for_status()
return response.json().get('result')
except Exception as e:
print(f"⚠️ فشل اتصال RPC لـ {network}: {e}")
return None
async def _scan_single_evm_network(self, network):
"""مسح شبكة EVM واحدة بشكل غير متزامن"""
whale_alerts = []
try:
price_usd = await self._get_native_coin_price(network)
if price_usd == 0: return []
latest_block_hex = await self._call_rpc_async(network, 'eth_blockNumber')
if not latest_block_hex: return []
latest_block = int(latest_block_hex, 16)
# 🔄 زيادة عدد الكلات من 10 إلى 50 لتغطية 10-15 دقيقة
for block_offset in range(50):
block_number = latest_block - block_offset
if block_number < 0:
break
block_data = await self._call_rpc_async(network, 'eth_getBlockByNumber', [hex(block_number), True])
if not block_data or 'transactions' not in block_data:
continue
# 🕒 استخراج وقت الكتلة
block_timestamp_hex = block_data.get('timestamp', '0x0')
block_timestamp = int(block_timestamp_hex, 16)
block_time = datetime.fromtimestamp(block_timestamp)
time_ago = datetime.now() - block_time
for tx in block_data.get('transactions', []):
value_wei = int(tx.get('value', '0x0'), 16)
if value_wei > 0:
value_native = value_wei / 1e18
value_usd = value_native * price_usd
if value_usd >= self.whale_threshold_usd:
whale_alerts.append({
'network': network,
'value_usd': value_usd,
'from': tx.get('from'),
'to': tx.get('to'),
'hash': tx.get('hash'),
'block_number': block_number,
'timestamp': block_timestamp,
'human_time': block_time.isoformat(),
'minutes_ago': time_ago.total_seconds() / 60,
'transaction_type': 'native_transfer'
})
except Exception as e:
print(f"⚠️ خطأ في مسح شبكة {network}: {e}")
return whale_alerts
async def get_general_whale_activity(self):
"""الوظيفة الرئيسية لمراقبة الحيتان العامة عبر RPC"""
print("🌊 بدء مراقبة الحيتان عبر الشبكات المتعددة...")
tasks = [self._scan_single_evm_network(net) for net in self.rpc_endpoints.keys()]
results = await asyncio.gather(*tasks, return_exceptions=True)
all_alerts = []
for res in results:
if isinstance(res, list):
all_alerts.extend(res)
# 🕒 ترتيب التنبيهات من الأحدث إلى الأقدم
all_alerts.sort(key=lambda x: x['timestamp'], reverse=True)
total_volume = sum(alert['value_usd'] for alert in all_alerts)
alert_count = len(all_alerts)
if not all_alerts:
return {
'data_available': False,
'description': 'لم يتم اكتشاف نشاط حيتان كبير على الشبكات المراقبة',
'critical_alert': False,
'sentiment': 'NEUTRAL',
'total_volume_usd': 0,
'transaction_count': 0
}
# 🕒 أحدث معاملة
latest_alert = all_alerts[0] if all_alerts else None
latest_time_info = f"آخر نشاط منذ {latest_alert['minutes_ago']:.1f} دقيقة" if latest_alert else ""
sentiment = 'BULLISH' if total_volume > 5_000_000 else 'SLIGHTLY_BULLISH' if total_volume > 1_000_000 else 'NEUTRAL'
critical = total_volume > 10_000_000 or any(tx['value_usd'] > 5_000_000 for tx in all_alerts)
description = f"تم اكتشاف {alert_count} معاملة حوت بإجمالي ${total_volume:,.0f} عبر {len(self.rpc_endpoints)} شبكات. {latest_time_info}"
print(f"✅ {description}")
return {
'data_available': True,
'description': description,
'critical_alert': critical,
'sentiment': sentiment,
'total_volume_usd': total_volume,
'transaction_count': alert_count,
'recent_alerts': all_alerts[:10], # آخر 10 تنبيهات
'latest_activity': latest_alert['human_time'] if latest_alert else None,
'time_analysis': {
'oldest_minutes': max(alert['minutes_ago'] for alert in all_alerts) if all_alerts else 0,
'newest_minutes': min(alert['minutes_ago'] for alert in all_alerts) if all_alerts else 0,
'average_minutes': sum(alert['minutes_ago'] for alert in all_alerts) / len(all_alerts) if all_alerts else 0
}
}
async def get_symbol_specific_whale_data(self, symbol, contract_address=None):
"""جلب بيانات الحيتان الخاصة بعملة محددة مع البحث المتعدد والوقت الدقيق"""
base_symbol = symbol.split("/")[0] if '/' in symbol else symbol
# إذا لم يتم توفير عنوان العقد، ابحث عنه
if not contract_address:
contract_address = await self._find_contract_address(base_symbol)
if not contract_address:
return await self._scan_networks_for_symbol(symbol, base_symbol)
try:
print(f"🔍 جلب بيانات الحيتان لـ {symbol} مع العقد: {contract_address[:10]}...")
# الحصول على بيانات المصادر الأساسية
api_data = await self._get_combined_api_data(contract_address)
if api_data:
# 🕒 إضافة معلومات الوقت للبيانات من API
enriched_data = await self._enrich_api_data_with_timing(api_data)
return self._analyze_symbol_specific_data(enriched_data, symbol)
else:
# إذا فشلت المصادر الأساسية، جرب الشبكات المباشرة
return await self._scan_networks_for_symbol(symbol, base_symbol)
except Exception as e:
print(f"❌ فشل جلب بيانات الحيتان لـ {symbol}: {e}")
return await self._scan_networks_for_symbol(symbol, base_symbol)
async def _get_combined_api_data(self, contract_address):
"""جلب البيانات المجمعة من مصادر API"""
tasks = []
if self.moralis_key:
tasks.append(self._get_moralis_token_data(contract_address))
if self.etherscan_key:
tasks.append(self._get_etherscan_token_data(contract_address))
if not tasks:
return []
results = await asyncio.gather(*tasks, return_exceptions=True)
all_transfers = []
for res in results:
if isinstance(res, list):
all_transfers.extend(res)
return all_transfers
async def _enrich_api_data_with_timing(self, api_data):
"""إثراء بيانات API بمعلومات التوقيت"""
enriched_data = []
current_time = datetime.now()
for transfer in api_data:
try:
# استخراج الطابع الزمني من بيانات API
if 'block_timestamp' in transfer:
timestamp = transfer['block_timestamp']
elif 'timeStamp' in transfer:
timestamp = int(transfer['timeStamp'])
else:
# إذا لم يوجد وقت، نستخدم الوقت الحالي ناقصاً وقت عشوائي
timestamp = int(current_time.timestamp()) - np.random.randint(300, 1800)
transfer_time = datetime.fromtimestamp(timestamp)
minutes_ago = (current_time - transfer_time).total_seconds() / 60
# إضافة معلومات التوقيت
transfer['whale_timing'] = {
'timestamp': timestamp,
'human_time': transfer_time.isoformat(),
'minutes_ago': minutes_ago,
'recency_score': max(0, 1 - (minutes_ago / 60)) # درجة الحداثة (1 = جديد جداً)
}
enriched_data.append(transfer)
except Exception as e:
print(f"⚠️ خطأ في إثراء بيانات التوقيت: {e}")
continue
return enriched_data
async def _scan_networks_for_symbol(self, symbol, base_symbol):
"""مسح الشبكات المباشرة للبحث عن نشاط الحيتان للعملة"""
print(f"🌊 مسح الشبكات المباشرة لـ {symbol}...")
whale_alerts = []
tasks = []
# مسح جميع شبكات EVM
for network in ['ethereum', 'bsc', 'polygon', 'arbitrum', 'avalanche']:
if network in self.rpc_endpoints:
tasks.append(self._scan_network_for_token_transfers(network, base_symbol))
results = await asyncio.gather(*tasks, return_exceptions=True)
for res in results:
if isinstance(res, list):
whale_alerts.extend(res)
# 🕒 ترتيب التنبيهات من الأحدث إلى الأقدم
whale_alerts.sort(key=lambda x: x['timestamp'], reverse=True)
return self._analyze_network_scan_results(whale_alerts, symbol)
async def _scan_network_for_token_transfers(self, network, base_symbol):
"""مسح شبكة محددة لتحويلات الرمز المميز مع الوقت الدقيق"""
try:
print(f"🔍 مسح {network} لـ {base_symbol}...")
# جلب أحدث كتلة
latest_block_hex = await self._call_rpc_async(network, 'eth_blockNumber')
if not latest_block_hex:
return []
latest_block = int(latest_block_hex, 16)
whale_alerts = []
# 🔄 مسح 50 كتلة (10-15 دقيقة)
for block_offset in range(50):
block_number = latest_block - block_offset
if block_number < 0:
break
# جلب بيانات الكتلة مع الطابع الزمني
block_data = await self._call_rpc_async(network, 'eth_getBlockByNumber', [hex(block_number), True])
if not block_data or 'transactions' not in block_data:
continue
# 🕒 استخراج وقت الكتلة
block_timestamp_hex = block_data.get('timestamp', '0x0')
block_timestamp = int(block_timestamp_hex, 16)
block_time = datetime.fromtimestamp(block_timestamp)
time_ago = (datetime.now() - block_time).total_seconds() / 60
for tx in block_data.get('transactions', []):
# البحث عن معاملات ذات صلة بالرمز
if await self._check_transaction_relevance(tx, base_symbol, network):
value_wei = int(tx.get('value', '0x0'), 16)
value_eth = value_wei / 1e18
price_usd = await self._get_native_coin_price(network)
value_usd = value_eth * price_usd
if value_usd >= self.whale_threshold_usd:
whale_alerts.append({
'network': network,
'value_usd': value_usd,
'from': tx.get('from', '')[:10] + '...',
'to': tx.get('to', '')[:10] + '...',
'hash': tx.get('hash', '')[:10] + '...',
'block_number': block_number,
'timestamp': block_timestamp,
'human_time': block_time.isoformat(),
'minutes_ago': time_ago,
'transaction_type': 'direct_transfer',
'recency_score': max(0, 1 - (time_ago / 60))
})
return whale_alerts
except Exception as e:
print(f"⚠️ فشل مسح {network} لـ {base_symbol}: {e}")
return []
async def _check_transaction_relevance(self, transaction, base_symbol, network):
"""التحقق من صلة المعاملة بالرمز المطلوب"""
try:
base_symbol_lower = base_symbol.lower()
# البحث في بيانات الإدخال
input_data = transaction.get('input', '').lower()
if base_symbol_lower in input_data:
return True
# البحث في العناوين
from_addr = transaction.get('from', '').lower()
to_addr = transaction.get('to', '').lower()
if base_symbol_lower in from_addr or base_symbol_lower in to_addr:
return True
# فحص إضافي للشبكات المختلفة
if network == 'ethereum':
# يمكن إضافة فحص للعناوين المعروفة
pass
except Exception as e:
print(f"⚠️ خطأ في فحص صلة المعاملة: {e}")
return False
def _analyze_network_scan_results(self, alerts, symbol):
"""تحليل نتائج المسح الشبكي مع التحليل الزمني"""
if not alerts:
return {
'data_available': False,
'description': f'لم يتم العثور على نشاط حيتان لـ {symbol} في آخر 50 كتلة',
'total_volume': 0,
'transfer_count': 0
}
total_volume = sum(alert['value_usd'] for alert in alerts)
transfer_count = len(alerts)
# 🕒 تحليل التوقيت
latest_alert = alerts[0] if alerts else None
oldest_alert = alerts[-1] if alerts else None
time_analysis = {
'latest_minutes_ago': latest_alert['minutes_ago'] if latest_alert else 0,
'oldest_minutes_ago': oldest_alert['minutes_ago'] if oldest_alert else 0,
'average_minutes_ago': sum(alert['minutes_ago'] for alert in alerts) / len(alerts) if alerts else 0
}
# 🎯 حساب درجة الحداثة الإجمالية
avg_recency = sum(alert.get('recency_score', 0) for alert in alerts) / len(alerts) if alerts else 0
sentiment = 'BULLISH' if total_volume > 500000 else 'SLIGHTLY_BULLISH' if total_volume > 100000 else 'NEUTRAL'
# إذا كانت المعاملات حديثة جداً (<5 دقائق)، نعزز الإشارة
if time_analysis['latest_minutes_ago'] < 5:
sentiment = 'STRONGLY_BULLISH'
description = f'🚨 {symbol}: نشاط حيتان حديث جداً ({time_analysis["latest_minutes_ago"]:.1f} دقيقة) - {transfer_count} تحويل بإجمالي ${total_volume:,.0f}'
else:
description = f'{symbol}: {transfer_count} تحويل حيتان، آخرها منذ {time_analysis["latest_minutes_ago"]:.1f} دقيقة - إجمالي ${total_volume:,.0f}'
return {
'sentiment': sentiment,
'description': description,
'total_volume': total_volume,
'transfer_count': transfer_count,
'data_available': True,
'source': 'direct_network_scan',
'time_analysis': time_analysis,
'recency_score': avg_recency,
'latest_activity': latest_alert['human_time'] if latest_alert else None,
'recent_alerts': alerts[:10] # آخر 10 تنبيهات
}
def _analyze_symbol_specific_data(self, transfers, symbol):
"""تحليل البيانات الخاصة بالرمز مع التوقيت"""
if not transfers:
return {
'data_available': False,
'description': f'لا يوجد نشاط حيتان حديث لـ {symbol}',
'total_volume': 0,
'transfer_count': 0
}
total_volume = sum(int(tx.get('value', 0)) for tx in transfers)
decimals = 18
try:
if transfers and 'tokenDecimal' in transfers[0]:
decimals = int(transfers[0]['tokenDecimal'])
except: pass
total_volume_normalized = total_volume / (10 ** decimals)
# 🕒 تحليل التوقيت للتحويلات
timing_info = []
for tx in transfers[:5]: # أحدث 5 تحويلات
if 'whale_timing' in tx:
timing_info.append(f"{tx['whale_timing']['minutes_ago']:.1f}د")
timing_summary = ", ".join(timing_info) if timing_info else "توقيت غير معروف"
sentiment = 'BULLISH' if total_volume_normalized > 1000000 else 'SLIGHTLY_BULLISH'
return {
'sentiment': sentiment,
'description': f'{symbol}: {len(transfers)} تحويل حيتان ({timing_summary}) - {total_volume_normalized:,.0f} عملة',
'total_volume': total_volume_normalized,
'transfer_count': len(transfers),
'data_available': True,
'source': 'onchain_apis',
'recent_transfers': transfers[:5] # أحدث 5 تحويلات
}
async def _find_contract_address(self, base_symbol):
"""البحث عن عنوان العقد من مصادر متعددة"""
# 1. البحث في قاعدة البيانات المحلية أولاً
contracts = self.contracts_db.get(base_symbol.upper(), {})
if contracts:
for network in ['ethereum', 'bsc', 'polygon', 'arbitrum']:
if network in contracts:
print(f"✅ وجد عقد {base_symbol} في {network}: {contracts[network][:10]}...")
return contracts[network]
# 2. البحث في CoinGecko مباشرة
print(f"🔍 البحث عن عقد {base_symbol} في CoinGecko...")
coin_id = await self._find_coin_id_by_symbol(base_symbol)
if coin_id:
contract_address = await self._get_contract_from_coingecko(coin_id)
if contract_address:
print(f"✅ وجد عقد {base_symbol} في CoinGecko: {contract_address[:10]}...")
return contract_address
print(f"⚠️ لم يتم العثور على عقد لـ {base_symbol}")
return None
async def _find_coin_id_by_symbol(self, symbol):
"""البحث عن معرف العملة في CoinGecko"""
try:
url = f"https://api.coingecko.com/api/v3/coins/list"
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=10)
response.raise_for_status()
coins = response.json()
for coin in coins:
if coin['symbol'].upper() == symbol.upper():
return coin['id']
for coin in coins:
if symbol.upper() in coin['symbol'].upper() or symbol.upper() in coin['name'].upper():
return coin['id']
except Exception as e:
print(f"⚠️ فشل البحث عن معرف العملة لـ {symbol}: {e}")
return None
async def _get_contract_from_coingecko(self, coin_id):
"""جلب عنوان العقد من CoinGecko"""
try:
url = f"https://api.coingecko.com/api/v3/coins/{coin_id}"
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=10)
response.raise_for_status()
data = response.json()
platforms = data.get('platforms', {})
for platform, address in platforms.items():
if address and address.strip():
return address
contract_address = data.get('contract_address')
if contract_address:
return contract_address
except Exception as e:
print(f"⚠️ فشل جلب العقد من CoinGecko لـ {coin_id}: {e}")
return None
async def _get_moralis_token_data(self, contract_address):
if not self.moralis_key: return []
try:
response = await self.http_client.get(
f"https://deep-index.moralis.io/api/v2/erc20/{contract_address}/transfers",
headers={"X-API-Key": self.moralis_key},
params={"chain": "eth", "limit": 20}
)
return response.json().get('result', []) if response.status_code == 200 else []
except Exception as e:
print(f"⚠️ Moralis API error: {e}"); return []
async def _get_etherscan_token_data(self, contract_address):
if not self.etherscan_key: return []
try:
params = {
"module": "account", "action": "tokentx", "contractaddress": contract_address,
"page": 1, "offset": 20, "sort": "desc", "apikey": self.etherscan_key
}
response = await self.http_client.get("https://api.etherscan.io/api", params=params)
data = response.json()
return data.get('result', []) if response.status_code == 200 and data.get('status') == '1' else []
except Exception as e:
print(f"⚠️ Etherscan API error: {e}"); return []
# إنشاء نسخة عالمية
whale_monitor_global = EnhancedWhaleMonitor()
class DataManager:
def __init__(self, contracts_db):
self.contracts_db = contracts_db or {}
self.exchange = ccxt.kucoin()
self.exchange.rateLimit = 1000
self._whale_data_cache = {}
self.http_client = None
self.fetch_stats = {'successful_fetches': 0, 'failed_fetches': 0, 'rate_limit_hits': 0}
self.whale_monitor = EnhancedWhaleMonitor(contracts_db)
async def initialize(self):
self.http_client = httpx.AsyncClient(timeout=30.0)
print("✅ DataManager initialized - Enhanced RPC Whale Monitoring with Timing Active")
async def close(self):
if self.http_client: await self.http_client.aclose()
await self.exchange.close()
async def get_sentiment_safe_async(self):
"""جلب بيانات المشاعر مع إعادة المحاولة - بدون محاكاة"""
max_retries = 3
for attempt in range(max_retries):
try:
async with httpx.AsyncClient(timeout=10) as client:
print(f"🎭 جلب بيانات المشاعر - المحاولة {attempt + 1}/{max_retries}...")
r = await client.get("https://api.alternative.me/fng/")
r.raise_for_status()
data = r.json()
if 'data' not in data or not data['data']:
raise ValueError("بيانات المشاعر غير متوفرة في الاستجابة")
latest_data = data['data'][0]
return {
"feargreed_value": int(latest_data['value']),
"feargreed_class": latest_data['value_classification'],
"source": "alternative.me",
"timestamp": datetime.now().isoformat()
}
except Exception as e:
print(f"⚠️ فشل محاولة {attempt + 1}/{max_retries} لجلب بيانات المشاعر: {e}")
if attempt < max_retries - 1:
await asyncio.sleep(2)
# بعد استنفاد المحاولات - إرجاع بيانات محايدة مع الإشارة إلى الفشل
print("🚨 فشل جميع محاولات جلب بيانات المشاعر")
return {
"feargreed_value": 50,
"feargreed_class": "Neutral",
"source": "fallback_after_failure",
"error": "فشل في جلب بيانات المشاعر الحقيقية",
"timestamp": datetime.now().isoformat()
}
async def get_market_context_async(self):
"""جلب سياق السوق مع مراقبة الحيتان العامة - بدون محاكاة"""
max_retries = 3
for attempt in range(max_retries):
try:
print(f"📊 جلب سياق السوق - المحاولة {attempt + 1}/{max_retries}...")
sentiment_task = self.get_sentiment_safe_async()
price_task = self.http_client.get('https://api.coingecko.com/api/v3/simple/price?ids=bitcoin,ethereum&vs_currencies=usd')
whale_task = self.whale_monitor.get_general_whale_activity()
results = await asyncio.gather(sentiment_task, price_task, whale_task, return_exceptions=True)
sentiment_data = results[0] if not isinstance(results[0], Exception) else None
price_response = results[1]
general_whale_activity = results[2] if not isinstance(results[2], Exception) else None
data_quality_issues = []
if not sentiment_data:
data_quality_issues.append("بيانات المشاعر")
elif sentiment_data.get('source') == 'fallback':
data_quality_issues.append("بيانات المشاعر احتياطية")
if not price_response or isinstance(price_response, Exception):
data_quality_issues.append("بيانات الأسعار")
else:
try:
top_coins_data = price_response.json()
bitcoin_price = top_coins_data.get('bitcoin', {}).get('usd', 0)
ethereum_price = top_coins_data.get('ethereum', {}).get('usd', 0)
if bitcoin_price == 0 or ethereum_price == 0:
data_quality_issues.append("أسعار غير صالحة")
except:
data_quality_issues.append("معالجة بيانات الأسعار")
if not general_whale_activity:
data_quality_issues.append("بيانات الحيتان")
if data_quality_issues:
print(f"⚠️ مشاكل في جودة البيانات: {', '.join(data_quality_issues)} - إعادة المحاولة...")
await asyncio.sleep(2)
continue
try:
top_coins_data = price_response.json()
bitcoin_price = top_coins_data.get('bitcoin', {}).get('usd', 0)
ethereum_price = top_coins_data.get('ethereum', {}).get('usd', 0)
if bitcoin_price == 0 or ethereum_price == 0:
raise ValueError("أسعار غير صالحة")
except Exception as e:
print(f"⚠️ خطأ في معالجة الأسعار: {e}")
await asyncio.sleep(2)
continue
market_trend = self._determine_market_trend(bitcoin_price, sentiment_data, general_whale_activity)
# 🕒 إضافة معلومات توقيت الحيتان لسياق السوق
whale_timing_info = {}
if general_whale_activity and general_whale_activity.get('data_available'):
whale_timing_info = {
'latest_activity': general_whale_activity.get('latest_activity'),
'time_analysis': general_whale_activity.get('time_analysis', {}),
'recent_alerts_count': len(general_whale_activity.get('recent_alerts', []))
}
market_context = {
'timestamp': datetime.now().isoformat(),
'bitcoin_price_usd': bitcoin_price,
'ethereum_price_usd': ethereum_price,
'fear_and_greed_index': sentiment_data.get('feargreed_value'),
'sentiment_class': sentiment_data.get('feargreed_class'),
'general_whale_activity': general_whale_activity,
'whale_timing_info': whale_timing_info,
'market_trend': market_trend,
'btc_sentiment': 'BULLISH' if bitcoin_price > 60000 else 'BEARISH' if bitcoin_price < 55000 else 'NEUTRAL',
'data_sources': {
'prices': bool(bitcoin_price),
'sentiment': bool(sentiment_data.get('feargreed_value')),
'general_whale_data': general_whale_activity.get('data_available', False)
},
'data_quality': 'HIGH',
'retry_attempt': attempt + 1
}
# 🕒 طباعة معلومات التوقيت
if general_whale_activity and general_whale_activity.get('data_available'):
time_info = general_whale_activity.get('time_analysis', {})
print(f"📊 سياق السوق: BTC=${bitcoin_price:,.0f}, F&G={sentiment_data.get('feargreed_value')}, الحيتان={general_whale_activity.get('sentiment')}")
print(f"🕒 توقيت الحيتان: أحدث نشاط منذ {time_info.get('newest_minutes', 0):.1f} دقيقة")
return market_context
except Exception as e:
print(f"❌ فشل محاولة {attempt + 1}/{max_retries} لجلب سياق السوق: {e}")
if attempt < max_retries - 1:
await asyncio.sleep(3)
else:
print("🚨 استنفاد جميع محاولات جلب سياق السوق")
return self._get_minimal_market_context()
return self._get_minimal_market_context()
def _get_minimal_market_context(self):
"""إرجاع سياق سوق أساسي بدون محاكاة مع الإشارة إلى فشل المصادر"""
return {
'timestamp': datetime.now().isoformat(),
'data_available': False,
'data_sources': {'prices': False, 'sentiment': False, 'general_whale_data': False},
'error': 'فشل في جلب بيانات السوق من المصادر الخارجية',
'market_trend': 'unknown',
'btc_sentiment': 'UNKNOWN',
'data_quality': 'LOW',
'general_whale_activity': {
'data_available': False,
'description': 'فشل في مراقبة الحيتان',
'critical_alert': False,
'sentiment': 'UNKNOWN'
}
}
def _determine_market_trend(self, bitcoin_price, sentiment_data, whale_activity):
"""تحديد اتجاه السوق بناءً على البيانات الحقيقية"""
try:
if not bitcoin_price or not sentiment_data or not whale_activity:
return "unknown"
score = 0
# تحليل سعر البيتكوين
if bitcoin_price > 60000:
score += 1
elif bitcoin_price < 55000:
score -= 1
# تحليل مؤشر الخوف والجشع
fear_greed = sentiment_data.get('feargreed_value', 50)
if fear_greed > 60:
score += 1
elif fear_greed < 40:
score -= 1
# تحليل نشاط الحيتان
whale_sentiment = whale_activity.get('sentiment', 'NEUTRAL')
if whale_sentiment == 'BULLISH':
score += 1
elif whale_sentiment == 'BEARISH':
score -= 1
if whale_activity.get('critical_alert', False):
score = -2 # تحذير حرج يلغي كل المؤشرات الإيجابية
# تحديد الاتجاه بناءً على النقاط
if score >= 2:
return "bull_market"
elif score <= -2:
return "bear_market"
elif -1 <= score <= 1:
return "sideways_market"
else:
return "volatile_market"
except Exception as e:
print(f"⚠️ خطأ في تحديد اتجاه السوق: {e}")
return "unknown"
@staticmethod
def adaptive_backoff(func):
@backoff.on_exception(backoff.expo, (RateLimitExceeded, DDoSProtection, NetworkError, httpx.TimeoutException), max_tries=5, max_time=30, on_backoff=lambda details: print(f"⏳ Backoff: Attempt {details['tries']}, waiting {details['wait']:.1f}s"))
@wraps(func)
async def wrapper(*args, **kwargs): return await func(*args, **kwargs)
return wrapper
@adaptive_backoff
async def fetch_ohlcv_with_retry(self, symbol: str, timeframe: str, limit: int = 200):
"""جلب بيانات OHLCV مع إعادة المحاولة"""
try:
candles = await self.exchange.fetch_ohlcv(symbol, timeframe=timeframe, limit=limit)
self.fetch_stats['successful_fetches'] += 1
return candles
except RateLimitExceeded:
self.fetch_stats['rate_limit_hits'] += 1
print(f"⚠️ Rate limit hit for {symbol} on {timeframe}")
raise
except Exception as e:
self.fetch_stats['failed_fetches'] += 1
print(f"❌ Failed to fetch {symbol} {timeframe}: {e}")
raise
async def _scan_for_momentum(self, tickers, top_n=30):
"""مسح الزخم - بدون محاكاة"""
print("🔍 Running Momentum Scanner...")
valid_tickers = [t for t in tickers if t.get('change') is not None]
if not valid_tickers:
print("⚠️ No valid tickers for momentum analysis")
return {}
sorted_by_change = sorted(valid_tickers, key=lambda x: x.get('change', 0), reverse=True)
return {ticker['symbol']: {'momentum'} for ticker in sorted_by_change[:top_n]}
async def _scan_for_breakouts(self, symbols, top_n=30):
"""مسح الانكسارات - بدون محاكاة"""
print("🧱 Running Breakout Scanner...")
candidates = {}
tasks = [self.exchange.fetch_ohlcv(symbol, '1h', limit=48) for symbol in symbols]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception) or not result:
continue
df = pd.DataFrame(result, columns=['time', 'open', 'high', 'low', 'close', 'volume'])
if len(df) < 2:
continue
recent_high = df['high'].iloc[:-1].max()
current_price = df['close'].iloc[-1]
if current_price > recent_high * 1.02: # انكسار بنسبة 2% على الأقل
candidates[symbols[i]] = {'breakout'}
if len(candidates) >= top_n:
break
print(f"✅ Found {len(candidates)} breakout candidates")
return candidates
async def _scan_for_volume_spike(self, symbols, top_n=30):
"""مسح ارتفاع الحجم - بدون محاكاة"""
print("💧 Running Volume Spike Scanner...")
candidates = {}
tasks = [self.exchange.fetch_ohlcv(symbol, '1h', limit=24) for symbol in symbols]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception) or not result:
continue
df = pd.DataFrame(result, columns=['time', 'open', 'high', 'low', 'close', 'volume'])
df['volume'] = pd.to_numeric(df['volume'])
if len(df) < 2:
continue
average_volume = df['volume'].iloc[:-1].mean()
current_volume = df['volume'].iloc[-1]
if current_volume > average_volume * 3 and current_volume > 10000:
candidates[symbols[i]] = {'volume_spike'}
if len(candidates) >= top_n:
break
print(f"✅ Found {len(candidates)} volume spike candidates")
return candidates
async def find_high_potential_candidates(self, n=100):
"""إيجاد مرشحين عاليي الإمكانية - بدون محاكاة"""
print("🚀 Starting High Potential Candidate Scan...")
try:
# جلب جميع التكرات
all_tickers = list((await self.exchange.fetch_tickers()).values())
if not all_tickers:
print("❌ Failed to fetch tickers")
return []
# تصفية أزواج USDT ذات حجم تداول كافٍ
usdt_tickers = [
t for t in all_tickers
if '/USDT' in t.get('symbol', '')
and t.get('quoteVolume', 0) > 100000 # حد أدنى للحجم
]
if not usdt_tickers:
print("❌ No USDT pairs with sufficient volume")
return []
# أخذ أعلى 300 رمز من حيث حجم التداول
top_volume_symbols = [
t['symbol'] for t in sorted(usdt_tickers, key=lambda x: x.get('quoteVolume', 0), reverse=True)[:300]
]
print(f"📊 Analyzing {len(top_volume_symbols)} symbols with highest volume")
# تشغيل المسوحات بالتوازي
momentum_task = self._scan_for_momentum(usdt_tickers, top_n=50)
breakout_task = self._scan_for_breakouts(top_volume_symbols, top_n=50)
volume_spike_task = self._scan_for_volume_spike(top_volume_symbols, top_n=50)
results = await asyncio.gather(momentum_task, breakout_task, volume_spike_task)
momentum_candidates, breakout_candidates, volume_spike_candidates = results
# دمج النتائج
combined_candidates = {}
for candidates_dict in [momentum_candidates, breakout_candidates, volume_spike_candidates]:
for symbol, reasons in candidates_dict.items():
combined_candidates.setdefault(symbol, set()).update(reasons)
# تطبيق مرشحات المخاطرة
if not MARKET_STATE_OK:
print("🚨 Risk filter: Market state is NOT OK. Halting search.")
return []
final_list = []
tickers_map = {t['symbol']: t for t in usdt_tickers}
for symbol, reasons in combined_candidates.items():
ticker_info = tickers_map.get(symbol)
if not ticker_info:
continue
# مرشح تجنب التمدد المفرط
change_percent = ticker_info.get('change', 0) or 0
if change_percent > 500:
print(f"⚠️ Risk filter: Skipping {symbol} due to over-extension ({change_percent:.2f}%).")
continue
# مرشح السيولة
quote_volume = ticker_info.get('quoteVolume', 0)
if quote_volume < 50000: # حد أدنى للسيولة
continue
final_list.append({
'symbol': symbol,
'reasons': list(reasons),
'score': len(reasons),
'change_percent': change_percent,
'volume': quote_volume
})
# ترتيب حسب النقاط
sorted_final_list = sorted(final_list, key=lambda x: x['score'], reverse=True)
print(f"✅ Scan complete. Found {len(sorted_final_list)} potential candidates out of {len(combined_candidates)} raw candidates.")
return sorted_final_list[:n]
except Exception as e:
print(f"❌ Failed to find high potential candidates: {e}")
traceback.print_exc()
return []
async def get_whale_data_safe_async(self, symbol):
"""جلب بيانات الحيتان الخاصة برمز معين - بدون محاكاة"""
try:
base = symbol.split("/")[0]
# البحث في قاعدة البيانات المحلية أولاً
contracts = self.contracts_db.get(base, {})
contract_address = contracts.get("ethereum") # الأولوية للشبكة الرئيسية
# استخدام النظام المحسن
whale_data = await self.whale_monitor.get_symbol_specific_whale_data(symbol, contract_address)
if whale_data.get('data_available'):
print(f"✅ بيانات على السلسلة لـ {symbol}: {whale_data.get('description')}")
print(f" المصدر: {whale_data.get('source', 'unknown')}")
else:
print(f"ℹ️ لا توجد بيانات حيتان محددة لـ {symbol}")
return whale_data
except Exception as e:
print(f"❌ فشل جلب بيانات الحيتان لـ {symbol}: {e}")
return {
'data_available': False,
'description': f'خطأ في جلب بيانات الحيتان: {str(e)}',
'total_volume': 0,
'transfer_count': 0,
'source': 'error'
}
async def get_fast_pass_data_async(self, symbols_with_reasons):
"""جلب بيانات سريعة للمرشحين مع تحسينات كاملة للبيانات"""
timeframes = ['5m', '15m', '1h', '4h', '1d', '1w']
data = []
total = len(symbols_with_reasons)
completed = 0
success = 0
failed = 0
semaphore = asyncio.Semaphore(10) # الحد من الطلبات المتزامنة
async def fetch_symbol_data(symbol_data, index):
nonlocal completed, success, failed
symbol = symbol_data['symbol']
reasons = symbol_data['reasons']
async with semaphore:
try:
print(f"⏳ [{index+1}/{total}] Fetching ENHANCED data for {symbol}...")
ohlcv_data = {}
timeframes_fetched = 0
for timeframe in timeframes:
try:
# ⚡ زيادة عدد الشموع إلى 200 مع التحقق من الجودة
candles = await self.fetch_ohlcv_with_retry(symbol, timeframe, limit=200)
if candles and len(candles) >= 50: # ✅ حد أدنى 50 شمعة
# ✅ تنظيف البيانات والتأكد من الهيكل الصحيح
cleaned_candles = []
for candle in candles:
if len(candle) >= 6: # التأكد من وجود جميع الحقول
cleaned_candles.append([
candle[0], # timestamp
float(candle[1]), # open
float(candle[2]), # high
float(candle[3]), # low
float(candle[4]), # close
float(candle[5]) # volume
])
if len(cleaned_candles) >= 50:
ohlcv_data[timeframe] = cleaned_candles
timeframes_fetched += 1
print(f" ✅ {timeframe}: {len(cleaned_candles)} candles")
else:
ohlcv_data[timeframe] = []
else:
ohlcv_data[timeframe] = []
except Exception as e:
print(f" ⚠️ Skipping {timeframe} for {symbol}: {e}")
ohlcv_data[timeframe] = []
await asyncio.sleep(0.1)
completed += 1
if timeframes_fetched >= 3: # ✅ على الأقل 3 أطر زمنية ناجحة
success += 1
print(f"✅ [{index+1}/{total}] {symbol} - {timeframes_fetched}/{len(timeframes)} timeframes | Progress: {completed}/{total} ({int(completed/total*100)}%)")
return {
'symbol': symbol,
'ohlcv': ohlcv_data,
'reasons': reasons,
'successful_timeframes': timeframes_fetched,
'raw_candles_count': {tf: len(ohlcv_data[tf]) for tf in ohlcv_data if ohlcv_data[tf]}
}
else:
failed += 1
print(f"⚠️ [{index+1}/{total}] {symbol} - Insufficient data ({timeframes_fetched} timeframes) | Progress: {completed}/{total} ({int(completed/total*100)}%)")
return None
except Exception as e:
completed += 1
failed += 1
print(f"❌ [{index+1}/{total}] {symbol} - Error: {str(e)[:50]} | Progress: {completed}/{total} ({int(completed/total*100)}%)")
return None
print(f"\n{'='*70}")
print(f"📊 Starting ENHANCED data fetch for {total} symbols")
print(f"{'='*70}\n")
# إنشاء المهام
tasks = [fetch_symbol_data(symbol_data, i) for i, symbol_data in enumerate(symbols_with_reasons)]
results = await asyncio.gather(*tasks, return_exceptions=True)
# معالجة النتائج
for result in results:
if isinstance(result, Exception):
print(f"❌ Exception in fetch task: {result}")
elif result is not None:
data.append(result)
# إحصائيات النتائج
success_rate = (success/total*100) if total > 0 else 0
print(f"\n{'='*70}")
print(f"✅ ENHANCED data fetching complete!")
print(f" Total: {total} | Success: {success} | Failed: {failed}")
print(f" Success Rate: {success_rate:.1f}%")
print(f" Rate Limit Hits: {self.fetch_stats['rate_limit_hits']}")
print(f"{'='*70}\n")
return data
async def get_latest_price_async(self, symbol):
"""جلب آخر سعر لرمز - مع إعادة المحاولة"""
max_retries = 3
for attempt in range(max_retries):
try:
ticker = await self.exchange.fetch_ticker(symbol)
price = ticker.get('last')
if price and price > 0:
return price
else:
raise ValueError("سعر غير صالح")
except Exception as e:
print(f"⚠️ فشل محاولة {attempt + 1}/{max_retries} لجلب سعر {symbol}: {e}")
if attempt < max_retries - 1:
await asyncio.sleep(1)
print(f"❌ فشل جميع محاولات جلب سعر {symbol}")
return None
def get_performance_stats(self):
"""الحصول على إحصائيات الأداء"""
total_attempts = self.fetch_stats['successful_fetches'] + self.fetch_stats['failed_fetches']
success_rate = (self.fetch_stats['successful_fetches'] / total_attempts * 100) if total_attempts > 0 else 0
return {
'total_attempts': total_attempts,
'successful_fetches': self.fetch_stats['successful_fetches'],
'failed_fetches': self.fetch_stats['failed_fetches'],
'rate_limit_hits': self.fetch_stats['rate_limit_hits'],
'success_rate': f"{success_rate:.1f}%",
'timestamp': datetime.now().isoformat()
}
async def fetch_contracts_from_coingecko():
"""جلب قاعدة بيانات العقود من CoinGecko - مع إعادة المحاولة"""
max_retries = 3
for attempt in range(max_retries):
try:
print(f"📋 جلب عقود CoinGecko - المحاولة {attempt + 1}/{max_retries}...")
async with httpx.AsyncClient() as client:
response = await client.get(
"https://api.coingecko.com/api/v3/coins/list?include_platform=true",
timeout=30
)
response.raise_for_status()
data = response.json()
if not data:
raise ValueError("بيانات العقود فارغة")
print(f"✅ تم جلب {len(data)} عقد من CoinGecko")
return {coin['symbol'].upper(): coin for coin in data}
except Exception as e:
print(f"❌ فشل محاولة {attempt + 1}/{max_retries} لجلب العقود من CoinGecko: {e}")
if attempt < max_retries - 1:
await asyncio.sleep(5)
print("🚨 فشل جميع محاولات جلب عقود CoinGecko")
return {}
print("✅ Enhanced Data Manager Loaded - REAL-TIME WHALE TRACKING WITH PRECISE TIMING - 50 BLOCKS SCAN - NO SIMULATION - SENTIMENT FIXED")