# Trad / data_manager.py
# Provenance: copied from the Hugging Face file view
# (author: Riy777, commit 00bb5c9 "Update data_manager.py", 19.4 kB).
import os
import asyncio
import httpx
import traceback
import time
from datetime import datetime
import ccxt.pro as ccxt
from state import MARKET_STATE_OK
from whale_news_data import whale_monitor_global
from helpers import safe_float_conversion
class DataManager:
    """Collect market data (prices, sentiment, whale activity) for the trading system.

    Prices come from KuCoin (public, key-less access through ccxt) with
    CoinGecko as a fallback, the Fear & Greed index comes from alternative.me,
    and whale / net-flow data is delegated to the shared ``whale_monitor_global``
    object.  All network helpers are best-effort: they log failures and return
    ``None`` / placeholder dictionaries instead of raising.
    """

    # BTC/USD levels used to classify the spot price as bullish / bearish.
    BTC_BULLISH_THRESHOLD = 60000
    BTC_BEARISH_THRESHOLD = 55000
    # Minimum absolute net flow (USD) for a direction-only BUY/SELL signal.
    NETFLOW_SIGNAL_THRESHOLD = 500000
    # Minimum absolute net flow (USD) to exchanges counted as a risk factor.
    NETFLOW_RISK_THRESHOLD = 1000000

    def __init__(self, contracts_db):
        """Create the manager and try to build the KuCoin exchange client.

        Args:
            contracts_db: Mapping of contract data; ``None`` is stored as ``{}``.

        If the exchange client cannot be created, ``self.exchange`` is set to
        ``None`` and the KuCoin price path is skipped later on.
        """
        self.contracts_db = contracts_db or {}
        try:
            self.exchange = ccxt.kucoin({
                'sandbox': False,
                'enableRateLimit': True
            })
            self.exchange.rateLimit = 800
            print("✅ تم تهيئة KuCoin (الوضع العام)")
        except Exception as e:
            print(f"⚠️ فشل تهيئة KuCoin: {e}")
            self.exchange = None
        self._whale_data_cache = {}
        self.http_client = None
        self.fetch_stats = {'successful_fetches': 0, 'failed_fetches': 0, 'rate_limit_hits': 0}
        self.whale_monitor = whale_monitor_global
        self.price_cache = {}

    async def initialize(self):
        """Create the shared HTTP client and print which API keys are configured."""
        self.http_client = httpx.AsyncClient(timeout=20.0)
        api_status = {
            'KUCOIN': '🟢 عام (بدون مفتاح)',
            'MORALIS_KEY': "🟢 متوفر" if os.getenv('MORALIS_KEY') else "🔴 غير متوفر",
            'ETHERSCAN_KEY': "🟢 متوفر" if os.getenv('ETHERSCAN_KEY') else "🔴 غير متوفر",
            'INFURA_KEY': "🟢 متوفر" if os.getenv('INFURA_KEY') else "🔴 غير متوفر"
        }
        print("✅ DataManager initialized - تحليل صافي التدفق المتقدم")
        for key, status in api_status.items():
            print(f" {key}: {status}")

    async def close(self):
        """Close the HTTP client and the exchange connection.

        The exchange is closed in a ``finally`` clause so that a failure while
        closing the HTTP client cannot leak the exchange connection.
        """
        try:
            if self.http_client:
                await self.http_client.aclose()
        finally:
            if self.exchange:
                await self.exchange.close()

    async def get_sentiment_safe_async(self):
        """Fetch the Fear & Greed index from alternative.me.

        Makes up to two attempts, one second apart.

        Returns:
            A dict with ``feargreed_value`` (int), ``feargreed_class``,
            ``source`` and ``timestamp``, or ``None`` if every attempt failed.
        """
        max_retries = 2
        for attempt in range(max_retries):
            try:
                async with httpx.AsyncClient(timeout=8) as client:
                    print(f"🎭 جلب بيانات المشاعر - المحاولة {attempt + 1}/{max_retries}...")
                    r = await client.get("https://api.alternative.me/fng/")
                    r.raise_for_status()
                    data = r.json()
                    if 'data' not in data or not data['data']:
                        raise ValueError("بيانات المشاعر غير متوفرة في الاستجابة")
                    latest_data = data['data'][0]
                    return {
                        "feargreed_value": int(latest_data['value']),
                        "feargreed_class": latest_data['value_classification'],
                        "source": "alternative.me",
                        "timestamp": datetime.now().isoformat()
                    }
            except Exception as e:
                print(f"⚠️ فشل محاولة {attempt + 1}/{max_retries} لجلب بيانات المشاعر: {e}")
                if attempt < max_retries - 1:
                    await asyncio.sleep(1)
        return None

    async def get_market_context_async(self):
        """Build the full market-context dictionary.

        Sentiment, prices and general whale activity are fetched concurrently,
        each under its own timeout.  BTC and ETH prices are mandatory: if either
        is missing the whole fetch is retried once, and after the last attempt
        the minimal placeholder context is returned.  Missing sentiment or
        whale data only degrades the corresponding fields.

        Returns:
            The market-context dict, or the result of
            ``_get_minimal_market_context()`` when prices cannot be obtained.
        """
        max_retries = 2
        for attempt in range(max_retries):
            try:
                print(f"📊 جلب سياق السوق - المحاولة {attempt + 1}/{max_retries}")
                sentiment_task = asyncio.wait_for(self.get_sentiment_safe_async(), timeout=10)
                price_task = asyncio.wait_for(self._get_prices_with_fallback(), timeout=15)
                whale_task = asyncio.wait_for(self.whale_monitor.get_general_whale_activity(), timeout=30)
                # return_exceptions=True: one failing source must not cancel the others.
                results = await asyncio.gather(sentiment_task, price_task, whale_task,
                                               return_exceptions=True)
                sentiment_data = results[0] if not isinstance(results[0], Exception) else None
                price_data = results[1] if not isinstance(results[1], Exception) else {}
                general_whale_activity = results[2] if not isinstance(results[2], Exception) else None

                bitcoin_price = price_data.get('bitcoin')
                ethereum_price = price_data.get('ethereum')
                if bitcoin_price is None or ethereum_price is None:
                    if attempt < max_retries - 1:
                        await asyncio.sleep(2)
                        continue
                    return self._get_minimal_market_context()

                market_trend = self._determine_market_trend(bitcoin_price, sentiment_data, general_whale_activity)
                trading_decision = self._analyze_advanced_trading_signals(general_whale_activity, sentiment_data)

                if general_whale_activity:
                    whale_data_available = general_whale_activity.get('data_available', False)
                    # "or {}" tolerates an explicit None under 'netflow_analysis'.
                    netflow_impact = (general_whale_activity.get('netflow_analysis') or {}).get(
                        'market_impact', 'UNKNOWN')
                else:
                    whale_data_available = False
                    netflow_impact = 'UNKNOWN'

                market_context = {
                    'timestamp': datetime.now().isoformat(),
                    'bitcoin_price_usd': bitcoin_price,
                    'ethereum_price_usd': ethereum_price,
                    'fear_and_greed_index': sentiment_data.get('feargreed_value') if sentiment_data else None,
                    'sentiment_class': sentiment_data.get('feargreed_class') if sentiment_data else 'UNKNOWN',
                    'general_whale_activity': general_whale_activity or {
                        'data_available': False,
                        'description': 'غير متوفر - فشل في مراقبة الحيتان',
                        'critical_alert': False,
                        'sentiment': 'UNKNOWN',
                        'trading_signals': []
                    },
                    'market_trend': market_trend,
                    'trading_decision': trading_decision,
                    'btc_sentiment': self._get_btc_sentiment(bitcoin_price),
                    'data_sources': {
                        'prices': bitcoin_price is not None and ethereum_price is not None,
                        'sentiment': sentiment_data is not None,
                        'general_whale_data': whale_data_available,
                        'netflow_analysis': netflow_impact
                    },
                    'data_quality': 'HIGH',
                    'risk_assessment': self._assess_market_risk(general_whale_activity, sentiment_data)
                }

                print(f"📊 سياق السوق: BTC=${bitcoin_price:,.0f}, ETH=${ethereum_price:,.0f}")
                # Logging below uses .get(): a missing key in the whale payload
                # must not raise here and throw away the context built above.
                if general_whale_activity and general_whale_activity.get('netflow_analysis'):
                    netflow = general_whale_activity['netflow_analysis']
                    print(f"📈 تحليل التدفق: صافي ${(netflow.get('net_flow') or 0):,.0f}")
                if general_whale_activity and general_whale_activity.get('trading_signals'):
                    for signal in general_whale_activity['trading_signals']:
                        print(f"🎯 {signal.get('action')}: {signal.get('reason')}")
                return market_context
            except Exception as e:
                print(f"❌ فشل محاولة {attempt + 1}/{max_retries} لجلب سياق السوق: {e}")
                if attempt < max_retries - 1:
                    await asyncio.sleep(3)
        return self._get_minimal_market_context()

    def _analyze_advanced_trading_signals(self, whale_activity, sentiment_data):
        """Derive a BUY / SELL / HOLD decision from whale activity.

        Args:
            whale_activity: Whale-monitor payload (dict) or ``None``.
            sentiment_data: Accepted for interface compatibility; not used.

        Returns:
            A dict with ``action``, ``confidence``, ``reason``, ``risk_level``
            and, when whale data is available, ``source``.

        Decision order: (1) the highest-confidence entry of
        ``trading_signals`` if its confidence exceeds 0.7; (2) the net-flow
        direction if the flow magnitude exceeds ``NETFLOW_SIGNAL_THRESHOLD``;
        (3) HOLD.
        """
        if not whale_activity or not whale_activity.get('data_available'):
            return {
                'action': 'HOLD',
                'confidence': 0.0,
                'reason': 'غير متوفر - لا توجد بيانات كافية عن الحيتان',
                'risk_level': 'UNKNOWN'
            }

        signals = whale_activity.get('trading_signals') or []
        netflow_analysis = whale_activity.get('netflow_analysis') or {}
        whale_sentiment = whale_activity.get('sentiment', 'NEUTRAL')

        # max() keeps the first signal among equal confidences.
        strongest_signal = max(signals, key=lambda s: s.get('confidence', 0), default=None)
        if strongest_signal and strongest_signal.get('confidence', 0) > 0.7:
            action = strongest_signal['action']
            is_sell = 'SELL' in action
            if 'STRONG_' in action:
                risk_level = 'HIGH' if is_sell else 'LOW'
            else:
                risk_level = 'MEDIUM' if is_sell else 'LOW'
            return {
                'action': action,
                'confidence': strongest_signal['confidence'],
                'reason': strongest_signal['reason'],
                'risk_level': risk_level,
                'source': 'netflow_analysis'
            }

        net_flow = netflow_analysis.get('net_flow') or 0
        flow_direction = netflow_analysis.get('flow_direction', 'BALANCED')
        # The direction is carried by flow_direction, so both branches compare
        # the magnitude; a signed comparison would ignore outflows that the
        # monitor reports as negative numbers.
        flow_size = abs(net_flow)
        if flow_direction == 'TO_EXCHANGES' and flow_size > self.NETFLOW_SIGNAL_THRESHOLD:
            return {
                'action': 'SELL',
                'confidence': 0.6,
                'reason': f'تدفق بيعي إلى المنصات: ${flow_size:,.0f}',
                'risk_level': 'MEDIUM',
                'source': 'netflow_direction'
            }
        if flow_direction == 'FROM_EXCHANGES' and flow_size > self.NETFLOW_SIGNAL_THRESHOLD:
            return {
                'action': 'BUY',
                'confidence': 0.6,
                'reason': f'تراكم شرائي من المنصات: ${flow_size:,.0f}',
                'risk_level': 'LOW',
                'source': 'netflow_direction'
            }
        return {
            'action': 'HOLD',
            'confidence': 0.5,
            'reason': f'أنماط التدفق طبيعية - مشاعر الحيتان: {whale_sentiment}',
            'risk_level': 'LOW',
            'source': 'balanced_flow'
        }

    def _assess_market_risk(self, whale_activity, sentiment_data):
        """Score market risk from whale activity and the Fear & Greed index.

        Points: critical whale alert +3, net flow to exchanges above
        ``NETFLOW_RISK_THRESHOLD`` +2, bearish whale sentiment +1,
        Fear & Greed below 30 +2, Fear & Greed above 70 +1.

        Returns:
            ``{'level': 'HIGH'|'MEDIUM'|'LOW', 'score': int, 'factors': list}``
            where HIGH means score >= 4 and MEDIUM means score >= 2.
        """
        risk_factors = []
        risk_score = 0

        if whale_activity and whale_activity.get('data_available'):
            if whale_activity.get('critical_alert', False):
                risk_factors.append("نشاط حيتان حرج")
                risk_score += 3
            netflow = whale_activity.get('netflow_analysis') or {}
            if (netflow.get('flow_direction') == 'TO_EXCHANGES'
                    and abs(netflow.get('net_flow') or 0) > self.NETFLOW_RISK_THRESHOLD):
                risk_factors.append("تدفق بيعي كبير إلى المنصات")
                risk_score += 2
            if whale_activity.get('sentiment') == 'BEARISH':
                risk_factors.append("مشاعر حيتان هبوطية")
                risk_score += 1

        # A missing or None index is treated as the neutral value 50.
        fear_greed = sentiment_data.get('feargreed_value') if sentiment_data else None
        if fear_greed is None:
            fear_greed = 50
        if fear_greed < 30:
            risk_factors.append("مخاوف السوق عالية")
            risk_score += 2
        elif fear_greed > 70:
            risk_factors.append("جشع السوق مرتفع")
            risk_score += 1

        if risk_score >= 4:
            level = 'HIGH'
        elif risk_score >= 2:
            level = 'MEDIUM'
        else:
            level = 'LOW'
        return {'level': level, 'score': risk_score, 'factors': risk_factors}

    def _get_btc_sentiment(self, bitcoin_price):
        """Classify the BTC price against the class-level thresholds.

        Returns ``'UNKNOWN'`` for ``None``, ``'BULLISH'`` above
        ``BTC_BULLISH_THRESHOLD``, ``'BEARISH'`` below
        ``BTC_BEARISH_THRESHOLD`` and ``'NEUTRAL'`` otherwise.
        """
        if bitcoin_price is None:
            return 'UNKNOWN'
        if bitcoin_price > self.BTC_BULLISH_THRESHOLD:
            return 'BULLISH'
        if bitcoin_price < self.BTC_BEARISH_THRESHOLD:
            return 'BEARISH'
        return 'NEUTRAL'

    async def _get_prices_with_fallback(self):
        """Return BTC and ETH prices from KuCoin, falling back to CoinGecko.

        Returns:
            ``{'bitcoin': price, 'ethereum': price}``; both values are ``None``
            unless a single source delivered both prices.
        """
        try:
            prices = await self._get_prices_from_kucoin_safe()
            if prices.get('bitcoin') and prices.get('ethereum'):
                return prices
            prices = await self._get_prices_from_coingecko()
            if prices.get('bitcoin') and prices.get('ethereum'):
                return prices
            return {'bitcoin': None, 'ethereum': None}
        except Exception as e:
            print(f"❌ فشل جلب الأسعار: {e}")
            return {'bitcoin': None, 'ethereum': None}

    async def _fetch_kucoin_last_price(self, pair, cache_key, label):
        """Fetch the last traded price of ``pair`` from KuCoin.

        On success the price is stored in ``self.price_cache[cache_key]``.
        Never raises: failures are logged and ``None`` is returned.
        """
        try:
            ticker = await self.exchange.fetch_ticker(pair)
            last = ticker.get('last')
            price = float(last) if last else None
            if price and price > 0:
                self.price_cache[cache_key] = price
                print(f"✅ {label} من KuCoin: ${price:.0f}")
                return price
        except Exception as e:
            print(f"⚠️ فشل جلب سعر {label} من KuCoin: {e}")
        return None

    async def _get_prices_from_kucoin_safe(self):
        """Return BTC/USDT and ETH/USDT last prices from KuCoin.

        Each symbol is fetched independently, so one failure does not prevent
        the other price from being returned.  Returns ``None`` values when the
        exchange client is unavailable.
        """
        if not self.exchange:
            return {'bitcoin': None, 'ethereum': None}
        try:
            btc_price = await self._fetch_kucoin_last_price('BTC/USDT', 'bitcoin', 'BTC')
            eth_price = await self._fetch_kucoin_last_price('ETH/USDT', 'ethereum', 'ETH')
            return {'bitcoin': btc_price, 'ethereum': eth_price}
        except Exception as e:
            print(f"⚠️ فشل جلب الأسعار من KuCoin: {e}")
            return {'bitcoin': None, 'ethereum': None}

    async def _get_prices_from_coingecko(self):
        """Return BTC and ETH USD prices from the CoinGecko simple-price API.

        Both prices must be present for the result to be used and cached;
        otherwise (or on any error) both values are ``None``.
        """
        try:
            # Short pause before calling the fallback API.
            await asyncio.sleep(0.5)
            url = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin,ethereum&vs_currencies=usd"
            async with httpx.AsyncClient() as client:
                response = await client.get(url, timeout=10)
                response.raise_for_status()
                data = response.json()
            btc_price = data.get('bitcoin', {}).get('usd')
            eth_price = data.get('ethereum', {}).get('usd')
            if btc_price and eth_price:
                self.price_cache['bitcoin'] = btc_price
                self.price_cache['ethereum'] = eth_price
                print(f"✅ الأسعار من CoinGecko: BTC=${btc_price:.0f}, ETH=${eth_price:.0f}")
                return {'bitcoin': btc_price, 'ethereum': eth_price}
            return {'bitcoin': None, 'ethereum': None}
        except Exception as e:
            print(f"⚠️ فشل جلب الأسعار من CoinGecko: {e}")
            return {'bitcoin': None, 'ethereum': None}

    def _get_minimal_market_context(self):
        """Return the placeholder context used when market data is unavailable.

        ``general_whale_activity`` and ``data_sources`` carry the same keys as
        the whale-data-unavailable case of the full context
        (``trading_signals`` and ``netflow_analysis``).
        """
        return {
            'timestamp': datetime.now().isoformat(),
            'data_available': False,
            'data_sources': {
                'prices': False,
                'sentiment': False,
                'general_whale_data': False,
                'netflow_analysis': 'UNKNOWN'
            },
            'error': 'غير متوفر - فشل في جلب بيانات السوق من المصادر الخارجية',
            'market_trend': 'UNKNOWN',
            'btc_sentiment': 'UNKNOWN',
            'data_quality': 'LOW',
            'general_whale_activity': {
                'data_available': False,
                'description': 'غير متوفر - فشل في مراقبة الحيتان',
                'critical_alert': False,
                'sentiment': 'UNKNOWN',
                'trading_signals': []
            },
            'bitcoin_price_usd': None,
            'ethereum_price_usd': None,
            'fear_and_greed_index': None,
            'sentiment_class': 'UNKNOWN',
            'missing_data': ['غير متوفر - أسعار البيتكوين', 'غير متوفر - أسعار الإيثيريوم', 'غير متوفر - بيانات المشاعر', 'غير متوفر - بيانات الحيتان']
        }

    def _determine_market_trend(self, bitcoin_price, sentiment_data, whale_activity):
        """Combine price, sentiment and whale sentiment into a trend label.

        Each available source contributes -1, 0 or +1 to an integer score; a
        critical whale alert forces the score to -2.  At least two sources
        (the price plus one other) are required.

        Returns:
            ``'bull_market'`` (score >= 2), ``'bear_market'`` (score <= -2),
            ``'sideways_market'`` otherwise, or ``'UNKNOWN'`` when the price is
            missing, fewer than two sources are available, or an error occurs.
        """
        try:
            if bitcoin_price is None:
                return "UNKNOWN"

            score = 0
            data_points = 1  # the BTC price itself
            if bitcoin_price > self.BTC_BULLISH_THRESHOLD:
                score += 1
            elif bitcoin_price < self.BTC_BEARISH_THRESHOLD:
                score -= 1

            if sentiment_data and sentiment_data.get('feargreed_value') is not None:
                fear_greed = sentiment_data.get('feargreed_value')
                if fear_greed > 60:
                    score += 1
                elif fear_greed < 40:
                    score -= 1
                data_points += 1

            if (whale_activity and whale_activity.get('data_available') and
                    whale_activity.get('sentiment') != 'UNKNOWN'):
                whale_sentiment = whale_activity.get('sentiment')
                if whale_sentiment == 'BULLISH':
                    score += 1
                elif whale_sentiment == 'BEARISH':
                    score -= 1
                data_points += 1
                # A critical alert overrides every other input.
                if whale_activity.get('critical_alert', False):
                    score = -2

            if data_points < 2:
                return "UNKNOWN"
            if score >= 2:
                return "bull_market"
            if score <= -2:
                return "bear_market"
            # The score is an integer, so only -1, 0 and 1 remain here.
            return "sideways_market"
        except Exception as e:
            print(f"⚠️ خطأ في تحديد اتجاه السوق: {e}")
            return "UNKNOWN"

    def get_performance_stats(self):
        """Return fetch counters, success rate and whale-monitor API usage."""
        successful = self.fetch_stats['successful_fetches']
        failed = self.fetch_stats['failed_fetches']
        total_attempts = successful + failed
        success_rate = (successful / total_attempts * 100) if total_attempts > 0 else 0
        stats = {
            'total_attempts': total_attempts,
            'successful_fetches': successful,
            'failed_fetches': failed,
            'rate_limit_hits': self.fetch_stats['rate_limit_hits'],
            'success_rate': f"{success_rate:.1f}%",
            'timestamp': datetime.now().isoformat(),
            'exchange_available': self.exchange is not None
        }
        stats['api_usage'] = self.whale_monitor.get_api_usage_stats()
        return stats

    async def get_symbol_specific_whale_data(self, symbol, contract_address=None):
        """Delegate to the whale monitor's per-symbol lookup and return its result."""
        return await self.whale_monitor.get_symbol_specific_whale_data(symbol, contract_address)
# Load banner: this statement follows the last method's return, so it is
# presumably module-level and runs once at import time (original indentation
# was lost in this copy - confirm).
print("✅ Enhanced Data Manager Loaded - جميع البيانات حقيقية")