Spaces:
Running
Running
| # data_manager.py (Updated to V7.3 - Increased L1 Threshold to 0.50) | |
| import os | |
| import asyncio | |
| import httpx | |
| import traceback | |
| import time | |
| from datetime import datetime | |
| import ccxt | |
| import numpy as np | |
| import logging | |
| from typing import List, Dict, Any | |
| import pandas as pd | |
| try: | |
| import pandas_ta as ta | |
| except ImportError: | |
| print("⚠️ مكتبة pandas_ta غير موجودة، فلتر الغربلة المتقدم سيفشل.") | |
| ta = None | |
| from ml_engine.indicators import AdvancedTechnicalAnalyzer | |
| from ml_engine.monte_carlo import MonteCarloAnalyzer | |
| from ml_engine.patterns import ChartPatternAnalyzer | |
# Raise the log level of the "httpx" and "httpcore" loggers to WARNING so their
# lower-severity records are suppressed.
| logging.getLogger("httpx").setLevel(logging.WARNING) | |
| logging.getLogger("httpcore").setLevel(logging.WARNING) | |
class DataManager:
    """Market-data access layer (V7.3, Layer 1 threshold at 0.50).

    Wraps a synchronous ``ccxt.kucoin`` client plus a few public HTTP APIs and
    exposes coroutine helpers for:

    * building a coarse market context (BTC/ETH price, fear & greed index),
    * Layer 1 screening of the top USDT pairs by dollar volume using a 1H
      "mini detector" (indicators, Monte Carlo, chart patterns),
    * streaming full multi-timeframe OHLCV data for the surviving symbols,
    * delegating whale-activity queries to the injected ``whale_monitor``.

    All blocking ccxt calls are funnelled through ``_exchange_call`` so they
    never run directly on the event loop.
    """

    def __init__(self, contracts_db, whale_monitor):
        """Create the exchange client and the analyzers.

        Args:
            contracts_db: Contracts mapping; a falsy value becomes ``{}``.
            whale_monitor: Object used by the whale helpers; may be ``None``.
        """
        self.contracts_db = contracts_db or {}
        self.whale_monitor = whale_monitor

        try:
            self.exchange = ccxt.kucoin({
                'sandbox': False,
                'enableRateLimit': True,
                'timeout': 30000,
                'verbose': False,
            })
            print("✅ تم تهيئة اتصال KuCoin بنجاح")
        except Exception as e:
            print(f"❌ فشل تهيئة اتصال KuCoin: {e}")
            self.exchange = None

        # Created lazily inside the running loop by _exchange_call.
        self._exchange_lock = None

        self.http_client = None
        self.market_cache = {}
        self.last_market_load = None

        self.technical_analyzer = AdvancedTechnicalAnalyzer()
        self.monte_carlo_analyzer = MonteCarloAnalyzer()
        self.pattern_analyzer = ChartPatternAnalyzer()

    async def _exchange_call(self, method_name, *args, **kwargs):
        """Run a blocking method of ``self.exchange`` without blocking the loop.

        ``self.exchange`` is the synchronous ccxt client, so calling it
        directly inside a coroutine stalls every other task. The call is
        executed in the loop's default executor instead. A lock keeps the
        requests strictly one-at-a-time, which is the request pattern the
        original inline calls produced.

        Args:
            method_name: Name of the exchange method (e.g. ``'fetch_ohlcv'``).
            *args: Positional arguments forwarded to the method.
            **kwargs: Keyword arguments forwarded to the method.

        Returns:
            Whatever the exchange method returns.

        Raises:
            RuntimeError: If no exchange client is available.
            Exception: Anything raised by the exchange method itself.
        """
        if not self.exchange:
            raise RuntimeError("KuCoin exchange client is not available")

        lock = getattr(self, '_exchange_lock', None)
        if lock is None:
            lock = self._exchange_lock = asyncio.Lock()

        bound_method = getattr(self.exchange, method_name)
        loop = asyncio.get_running_loop()
        async with lock:
            return await loop.run_in_executor(
                None, lambda: bound_method(*args, **kwargs)
            )

    async def initialize(self):
        """Open the shared HTTP client and load the exchange markets."""
        self.http_client = httpx.AsyncClient(timeout=30.0)
        await self._load_markets()
        print("✅ DataManager initialized - V7.3 (L1 Threshold @ 0.50)")

    async def _load_markets(self):
        """Load KuCoin markets into ``market_cache``; failures are only logged."""
        try:
            if not self.exchange:
                return
            print("🔄 جلب أحدث بيانات الأسواق من KuCoin...")
            await self._exchange_call('load_markets')
            self.market_cache = self.exchange.markets
            self.last_market_load = datetime.now()
            print(f"✅ تم تحميل {len(self.market_cache)} سوق من KuCoin")
        except Exception as e:
            print(f"❌ فشل تحميل بيانات الأسواق: {e}")

    async def close(self):
        """Release the HTTP client and the exchange client.

        The exchange is created with the synchronous ccxt class, whose
        ``close`` (when present) does not return an awaitable; the result is
        awaited only when it actually is a coroutine so both sync and async
        clients are handled.
        """
        if self.http_client and not self.http_client.is_closed:
            await self.http_client.aclose()
            print(" ✅ DataManager: http_client closed.")

        if self.exchange:
            try:
                closer = getattr(self.exchange, 'close', None)
                outcome = closer() if callable(closer) else None
                if asyncio.iscoroutine(outcome):
                    await outcome
                print(" ✅ DataManager: ccxt.kucoin exchange closed.")
            except Exception as e:
                print(f" ⚠️ DataManager: Error closing ccxt.kucoin: {e}")

    async def get_market_context_async(self):
        """Build the market-context dict (prices, sentiment, derived trend).

        Returns:
            A dict with timestamp, BTC/ETH prices, fear & greed data, trend,
            BTC sentiment and a data-quality flag. On any error the minimal
            context from ``_get_minimal_market_context`` is returned instead.
        """
        try:
            sentiment_data = await self.get_sentiment_safe_async()
            price_data = await self._get_prices_with_fallback()

            bitcoin_price = price_data.get('bitcoin')
            ethereum_price = price_data.get('ethereum')

            return {
                'timestamp': datetime.now().isoformat(),
                'bitcoin_price_usd': bitcoin_price,
                'ethereum_price_usd': ethereum_price,
                'fear_and_greed_index': sentiment_data.get('feargreed_value') if sentiment_data else None,
                'sentiment_class': sentiment_data.get('feargreed_class') if sentiment_data else 'NEUTRAL',
                'market_trend': self._determine_market_trend(bitcoin_price, sentiment_data),
                'btc_sentiment': self._get_btc_sentiment(bitcoin_price),
                'data_quality': 'HIGH' if bitcoin_price and ethereum_price else 'LOW',
            }
        except Exception:
            # Best-effort: callers always get a usable dict.
            return self._get_minimal_market_context()

    async def get_sentiment_safe_async(self):
        """Fetch the latest fear & greed reading from alternative.me.

        Returns:
            A dict with ``feargreed_value``, ``feargreed_class``, ``source``
            and ``timestamp``, or ``None`` when the request or parsing fails.
        """
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                response = await client.get("https://api.alternative.me/fng/")
                response.raise_for_status()
                data = response.json()

            if 'data' not in data or not data['data']:
                raise ValueError("بيانات المشاعر غير متوفرة")

            latest_data = data['data'][0]
            return {
                "feargreed_value": int(latest_data['value']),
                "feargreed_class": latest_data['value_classification'],
                "source": "alternative.me",
                "timestamp": datetime.now().isoformat(),
            }
        except Exception:
            return None

    def _determine_market_trend(self, bitcoin_price, sentiment_data):
        """Classify the market from the BTC price and the fear & greed value.

        The price contributes +1 above 60000, -1 below 55000; the fear & greed
        value contributes +1 above 60, -1 below 40.

        Returns:
            ``"UNKNOWN"`` when the price is missing, otherwise
            ``"bull_market"``, ``"bear_market"`` or ``"sideways_market"``.
        """
        if bitcoin_price is None:
            return "UNKNOWN"

        if bitcoin_price > 60000:
            score = 1
        elif bitcoin_price < 55000:
            score = -1
        else:
            score = 0

        if sentiment_data and sentiment_data.get('feargreed_value') is not None:
            fear_greed = sentiment_data.get('feargreed_value')
            if fear_greed > 60:
                score += 1
            elif fear_greed < 40:
                score -= 1

        if score >= 1:
            return "bull_market"
        if score <= -1:
            return "bear_market"
        return "sideways_market"

    def _get_btc_sentiment(self, bitcoin_price):
        """Map the BTC price to ``'BULLISH'``/``'BEARISH'``/``'NEUTRAL'``/``'UNKNOWN'``."""
        if bitcoin_price is None:
            return 'UNKNOWN'
        if bitcoin_price > 60000:
            return 'BULLISH'
        if bitcoin_price < 55000:
            return 'BEARISH'
        return 'NEUTRAL'

    async def _get_prices_with_fallback(self):
        """Return BTC/ETH prices from KuCoin, falling back to CoinGecko."""
        try:
            prices = await self._get_prices_from_kucoin_safe()
            if prices.get('bitcoin') and prices.get('ethereum'):
                return prices
            return await self._get_prices_from_coingecko()
        except Exception:
            return {'bitcoin': None, 'ethereum': None}

    async def _get_prices_from_kucoin_safe(self):
        """Read the last BTC/USDT and ETH/USDT prices from the exchange.

        Returns:
            ``{'bitcoin': float | None, 'ethereum': float | None}``. Any
            exception while fetching yields ``None`` for both entries.
        """
        if not self.exchange:
            return {'bitcoin': None, 'ethereum': None}
        try:
            prices = {'bitcoin': None, 'ethereum': None}
            for key, market in (('bitcoin', 'BTC/USDT'), ('ethereum', 'ETH/USDT')):
                ticker = await self._exchange_call('fetch_ticker', market)
                last = ticker.get('last')
                price = float(last) if last else None
                if price and price > 0:
                    prices[key] = price
            return prices
        except Exception:
            return {'bitcoin': None, 'ethereum': None}

    async def _get_prices_from_coingecko(self):
        """Read BTC/ETH USD prices from CoinGecko, retrying once on HTTP 429.

        Returns:
            Both prices when both are present, otherwise ``None`` for both.
        """
        try:
            await asyncio.sleep(0.5)
            url = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin,ethereum&vs_currencies=usd"
            headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'application/json'}
            async with httpx.AsyncClient(headers=headers) as client:
                response = await client.get(url, timeout=10)
                if response.status_code == 429:
                    # Rate limited: wait briefly and try exactly once more.
                    await asyncio.sleep(2)
                    response = await client.get(url, timeout=10)
                response.raise_for_status()
                data = response.json()

            btc_price = data.get('bitcoin', {}).get('usd')
            eth_price = data.get('ethereum', {}).get('usd')
            if btc_price and eth_price:
                return {'bitcoin': btc_price, 'ethereum': eth_price}
            return {'bitcoin': None, 'ethereum': None}
        except Exception:
            return {'bitcoin': None, 'ethereum': None}

    def _get_minimal_market_context(self):
        """Return the placeholder context used when real data is unavailable."""
        return {
            'timestamp': datetime.now().isoformat(),
            'data_available': False,
            'market_trend': 'UNKNOWN',
            'btc_sentiment': 'UNKNOWN',
            'data_quality': 'LOW',
        }

    def _create_dataframe(self, candles: List) -> pd.DataFrame:
        """(V7.1) Build a time-indexed OHLCV DataFrame for the 1H analysis.

        Args:
            candles: Rows of ``[timestamp_ms, open, high, low, close, volume]``.

        Returns:
            A DataFrame indexed and sorted by timestamp, or an empty DataFrame
            when ``candles`` is empty or cannot be converted.
        """
        try:
            if not candles:
                return pd.DataFrame()
            price_columns = ['open', 'high', 'low', 'close', 'volume']
            df = pd.DataFrame(candles, columns=['timestamp'] + price_columns)
            df[price_columns] = df[price_columns].astype(float)
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
            df.set_index('timestamp', inplace=True)
            df.sort_index(inplace=True)
            return df
        except Exception as e:
            print(f"❌ خطأ في إنشاء DataFrame لمرشح 1H: {e}")
            return pd.DataFrame()

    def _calculate_1h_filter_score(self, analysis: Dict) -> float:
        """(V7.2) "Mini detector" score in ``[0, 1]`` with a stablecoin guard.

        The score is the weighted mean of the components that are strictly
        positive: Monte Carlo (0.40), pattern confidence (0.30) and indicator
        score (0.30).

        The stablecoin guard rejects a symbol whose last 20 one-hour closes
        are essentially flat. Flatness is measured as standard deviation
        *relative to the mean close*: an absolute threshold would reject
        every asset whose price is itself tiny, regardless of how volatile
        it is.

        Args:
            analysis: Mini-detector output; reads ``ohlcv_1h``, ``symbol``,
                ``current_price``, ``pattern_analysis``,
                ``monte_carlo_distribution`` and ``advanced_indicators``.

        Returns:
            The clamped score, or ``0.0`` when rejected or on any error.
        """
        try:
            # (V7.2) Stablecoin guard.
            if 'ohlcv_1h' in analysis and '1h' in analysis['ohlcv_1h']:
                closes_1h = [c[4] for c in analysis['ohlcv_1h']['1h']]
                if len(closes_1h) > 20:
                    recent = np.asarray(closes_1h[-20:], dtype=float)
                    mean_close = abs(float(np.mean(recent)))
                    # A zero/NaN mean is treated as flat (relative std 0).
                    relative_std = float(np.std(recent)) / mean_close if mean_close > 0 else 0.0
                    if relative_std < 1e-5:
                        print(f" - {analysis.get('symbol', 'N/A')}: تم الاستبعاد (عملة مستقرة)")
                        return 0.0

            # 1. Pattern score.
            pattern_confidence = analysis.get('pattern_analysis', {}).get('pattern_confidence', 0)

            # 2. Monte Carlo score: probability of gain rescaled from
            #    [0.5, 1] to [0, 1], penalised when VaR is large vs. price.
            mc_distribution = analysis.get('monte_carlo_distribution')
            monte_carlo_score = 0
            if mc_distribution and mc_distribution.get('error') is None:
                prob_gain = mc_distribution.get('probability_of_gain', 0)
                var_95_value = mc_distribution.get('risk_metrics', {}).get('VaR_95_value', 0)
                current_price = analysis.get('current_price', 1)
                if current_price > 0:
                    normalized_var = var_95_value / current_price
                    risk_penalty = 1.0
                    if normalized_var > 0.05:
                        risk_penalty = 0.5
                    elif normalized_var > 0.03:
                        risk_penalty = 0.8
                    normalized_prob_score = max(0.0, (prob_gain - 0.5) * 2)
                    monte_carlo_score = normalized_prob_score * risk_penalty

            # 3. Indicator score.
            indicator_score = 0
            indicators = analysis.get('advanced_indicators', {}).get('1h', {})
            if indicators:
                rsi = indicators.get('rsi', 50)
                macd_hist = indicators.get('macd_hist', 0)
                ema_9 = indicators.get('ema_9', 0)
                ema_21 = indicators.get('ema_21', 0)
                if rsi > 55 and macd_hist > 0 and ema_9 > ema_21:
                    indicator_score = min(0.5 + (rsi - 55) / 50 + (macd_hist / (analysis.get('current_price', 1) * 0.001)), 1.0)
                elif rsi < 35:
                    indicator_score = min(0.4 + (35 - rsi) / 35, 0.8)

            # 4. Final score (no strategies or whale data at this layer).
            weighted_parts = [
                (value, weight)
                for value, weight in (
                    (monte_carlo_score, 0.40),
                    (pattern_confidence, 0.30),
                    (indicator_score, 0.30),
                )
                if value > 0
            ]
            if not weighted_parts:
                return 0.0
            total_weight = sum(weight for _, weight in weighted_parts)
            if total_weight == 0:
                return 0.0
            enhanced_score = sum(value * weight for value, weight in weighted_parts) / total_weight
            return min(max(enhanced_score, 0.0), 1.0)
        except Exception as e:
            print(f"❌ خطأ في حساب درجة فلتر 1H: {e}")
            return 0.0

    async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
        """Layer 1: rapid screening (V7.3).

        Takes the top 100 symbols by dollar volume, runs the 1H mini detector
        on them in batches of 20 and keeps those whose filter score is at
        least 0.50.

        Returns:
            The passing symbol dicts, each extended with ``layer1_score`` and
            ``reasons_for_candidacy``; an empty list if no volume data.
        """
        print("📊 الطبقة 1 (V7.3): بدء الغربلة (الكاشف المصغر 1H)...")

        # Step 1: top 100 symbols by volume (ccxt first, raw REST fallback).
        volume_data = await self._get_volume_data_optimal()
        if not volume_data:
            volume_data = await self._get_volume_data_direct_api()
        if not volume_data:
            print("❌ فشل جلب بيانات الأحجام للطبقة 1")
            return []

        volume_data.sort(key=lambda x: x['dollar_volume'], reverse=True)
        top_100_by_volume = volume_data[:100]
        print(f"✅ تم تحديد أفضل {len(top_100_by_volume)} عملة. بدء تشغيل الكاشف المصغر (1H)...")

        final_candidates = []
        batch_size = 20
        total_batches = (len(top_100_by_volume) + batch_size - 1) // batch_size

        for batch_number, start in enumerate(range(0, len(top_100_by_volume), batch_size), start=1):
            batch_symbols_data = top_100_by_volume[start:start + batch_size]
            batch_symbols = [s['symbol'] for s in batch_symbols_data]
            print(f" 🔄 معالجة دفعة {batch_number}/{total_batches} ({len(batch_symbols)} عملة)...")

            # Step 2: fetch the 1H candles for the batch.
            tasks = [self._fetch_1h_ohlcv_for_screening(symbol) for symbol in batch_symbols]
            results_candles = await asyncio.gather(*tasks, return_exceptions=True)

            analysis_tasks = []
            valid_symbol_data = []
            for candles, symbol_data in zip(results_candles, batch_symbols_data):
                if isinstance(candles, Exception) or not candles or len(candles) < 50:
                    continue
                symbol_data['ohlcv_1h'] = {'1h': candles}
                # The close of the latest candle replaces the ticker price.
                symbol_data['current_price'] = candles[-1][4]
                analysis_tasks.append(self._run_mini_detector(symbol_data))
                valid_symbol_data.append(symbol_data)

            if not analysis_tasks:
                continue

            analysis_results = await asyncio.gather(*analysis_tasks, return_exceptions=True)

            for analysis_output, symbol_data in zip(analysis_results, valid_symbol_data):
                symbol = symbol_data['symbol']
                if isinstance(analysis_output, Exception):
                    print(f" - {symbol}: فشل الكاشف المصغر ({analysis_output})")
                    continue

                analysis_output['ohlcv_1h'] = symbol_data['ohlcv_1h']
                analysis_output['symbol'] = symbol
                filter_score = self._calculate_1h_filter_score(analysis_output)

                # (V7.3) Threshold raised from 0.20 to 0.50.
                if filter_score >= 0.50:
                    print(f" ✅ {symbol}: نجح (الدرجة: {filter_score:.2f})")
                    symbol_data['layer1_score'] = filter_score
                    symbol_data['reasons_for_candidacy'] = ['1H_DETECTOR_PASS']
                    # Candles are not forwarded to Layer 2.
                    symbol_data.pop('ohlcv_1h', None)
                    final_candidates.append(symbol_data)

        # Report the number actually screened, which can be below 100.
        print(f"🎯 اكتملت الغربلة (V7.3). تم تأهيل {len(final_candidates)} عملة من أصل {len(top_100_by_volume)} للطبقة 2.")
        print("🏆 المرشحون الناجحون:")
        for rank, candidate in enumerate(final_candidates[:15], start=1):
            score = candidate.get('layer1_score', 0)
            volume = candidate.get('dollar_volume', 0)
            print(f" {rank:2d}. {candidate['symbol']}: (الدرجة: {score:.2f}) | ${volume:,.0f}")
        return final_candidates

    async def _run_mini_detector(self, symbol_data: Dict) -> Dict:
        """(V7.1) Run the core analyzers on the 1H data only.

        Args:
            symbol_data: Must hold ``ohlcv_1h`` (``{'1h': candles}``) and
                ``current_price``.

        Returns:
            A dict with ``current_price``, ``advanced_indicators`` and, when
            the corresponding analyzer succeeded, ``monte_carlo_distribution``
            and ``pattern_analysis``.

        Raises:
            ValueError: If the 1H DataFrame is empty.
        """
        ohlcv_1h = symbol_data.get('ohlcv_1h')
        current_price = symbol_data.get('current_price')

        df = self._create_dataframe(ohlcv_1h.get('1h'))
        if df.empty:
            raise ValueError("DataFrame فارغ لتحليل 1H")

        analysis_dict = {'current_price': current_price}

        # The indicator result is used directly (not awaited), while the other
        # two are awaited together.
        # NOTE(review): presumably the Monte Carlo and pattern analyzers
        # return awaitables - confirm against ml_engine.
        indicators_1h = self.technical_analyzer.calculate_all_indicators(df, '1h')
        task_mc = self.monte_carlo_analyzer.generate_1h_price_distribution(ohlcv_1h)
        task_pattern = self.pattern_analyzer.detect_chart_patterns(ohlcv_1h)
        mc_result, pattern_result = await asyncio.gather(task_mc, task_pattern, return_exceptions=True)

        analysis_dict['advanced_indicators'] = {'1h': indicators_1h}
        if not isinstance(mc_result, Exception):
            analysis_dict['monte_carlo_distribution'] = mc_result
        if not isinstance(pattern_result, Exception):
            analysis_dict['pattern_analysis'] = pattern_result
        return analysis_dict

    async def _fetch_1h_ohlcv_for_screening(self, symbol: str) -> List:
        """(V7.1) Fetch 100 one-hour candles for the rapid screening.

        Returns:
            The candle list, or ``None`` when fewer than 50 candles are
            returned or the request fails.
        """
        try:
            ohlcv_data = await self._exchange_call('fetch_ohlcv', symbol, '1h', limit=100)
            if not ohlcv_data or len(ohlcv_data) < 50:
                return None
            return ohlcv_data
        except Exception:
            return None

    async def _get_volume_data_optimal(self) -> List[Dict[str, Any]]:
        """Collect dollar-volume data for USDT pairs via ccxt ``fetch_tickers``.

        Pairs with a non-positive price or under 50000 in dollar volume are
        skipped. When the quote volume is missing the dollar volume is
        estimated as base volume times last price.

        Returns:
            A list of dicts with ``symbol``, ``dollar_volume``,
            ``current_price``, ``volume_24h`` and ``price_change_24h``;
            an empty list on failure.
        """
        try:
            if not self.exchange:
                return []
            tickers = await self._exchange_call('fetch_tickers')

            volume_data = []
            for symbol, ticker in tickers.items():
                if not symbol.endswith('/USDT') or not ticker.get('active', True):
                    continue

                current_price = ticker.get('last', 0)
                quote_volume = ticker.get('quoteVolume', 0)
                if current_price is None or current_price <= 0:
                    continue

                if quote_volume is not None and quote_volume > 0:
                    dollar_volume = quote_volume
                else:
                    base_volume = ticker.get('baseVolume', 0)
                    if base_volume is None:
                        continue
                    dollar_volume = base_volume * current_price

                if dollar_volume is None or dollar_volume < 50000:
                    continue

                volume_data.append({
                    'symbol': symbol, 'dollar_volume': dollar_volume,
                    'current_price': current_price, 'volume_24h': ticker.get('baseVolume', 0) or 0,
                    'price_change_24h': ticker.get('percentage', 0) or 0,
                })

            print(f"✅ تم معالجة {len(volume_data)} عملة في الطريقة المثلى (لجلب الحجم)")
            return volume_data
        except Exception as e:
            print(f"❌ خطأ في جلب بيانات الحجم المثلى: {e}")
            return []

    async def _get_volume_data_direct_api(self) -> List[Dict[str, Any]]:
        """Collect the same volume data straight from KuCoin's REST API.

        Used as a fallback when the ccxt path yields nothing. Tickers with
        missing or unparsable fields are skipped individually.

        Returns:
            Same record shape as ``_get_volume_data_optimal``; an empty list
            on failure.
        """
        try:
            url = "https://api.kucoin.com/api/v1/market/allTickers"
            async with httpx.AsyncClient(timeout=15) as client:
                response = await client.get(url)
                response.raise_for_status()
                data = response.json()

            if data.get('code') != '200000':
                raise ValueError(f"استجابة API غير متوقعة: {data.get('code')}")

            volume_data = []
            for ticker in data['data']['ticker']:
                # A ticker without a symbol is skipped rather than aborting
                # the whole fetch.
                symbol = ticker.get('symbol') or ''
                if not symbol.endswith('USDT'):
                    continue
                formatted_symbol = symbol.replace('-', '/')
                try:
                    vol_value = ticker.get('volValue')
                    last_price = ticker.get('last')
                    change_rate = ticker.get('changeRate')
                    vol = ticker.get('vol')
                    if vol_value is None or last_price is None or change_rate is None or vol is None:
                        continue

                    dollar_volume = float(vol_value) if vol_value else 0
                    current_price = float(last_price) if last_price else 0
                    price_change = (float(change_rate) * 100) if change_rate else 0
                    volume_24h = float(vol) if vol else 0

                    if dollar_volume >= 50000 and current_price > 0:
                        volume_data.append({
                            'symbol': formatted_symbol, 'dollar_volume': dollar_volume,
                            'current_price': current_price, 'volume_24h': volume_24h,
                            'price_change_24h': price_change,
                        })
                except (ValueError, TypeError, KeyError):
                    continue

            print(f"✅ تم معالجة {len(volume_data)} عملة في الطريقة المباشرة (لجلب الحجم)")
            return volume_data
        except Exception as e:
            print(f"❌ خطأ في جلب بيانات الحجم المباشر: {e}")
            return []

    async def stream_ohlcv_data(self, symbols: List[Dict[str, Any]], queue: asyncio.Queue):
        """(V7.2) Producer: stream full OHLCV data (6 timeframes) into ``queue``.

        Symbols are processed in batches of 15. Each batch with at least one
        success is put on the queue as a list of result dicts. A final
        ``None`` sentinel is always put on the queue - even if the loop fails
        unexpectedly - so the consumer is never left waiting.

        Args:
            symbols: Symbol dicts (each with a ``'symbol'`` key) that passed
                the screening.
            queue: Queue receiving the batches and the ``None`` sentinel.
        """
        print(f"📊 بدء تدفق بيانات OHLCV (الكاملة) لـ {len(symbols)} عملة (الناجحين من الغربلة)...")

        batch_size = 15
        batches = [symbols[i:i + batch_size] for i in range(0, len(symbols), batch_size)]
        total_successful = 0

        try:
            for batch_num, batch in enumerate(batches):
                print(f" 🔄 [المنتج] جلب الدفعة {batch_num + 1}/{len(batches)} ({len(batch)} عملة)...")

                batch_tasks = [
                    asyncio.create_task(self._fetch_complete_ohlcv_parallel(symbol_data['symbol']))
                    for symbol_data in batch
                ]
                batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)

                successful_data_for_batch = []
                for original_symbol_data, result in zip(batch, batch_results):
                    symbol_str = original_symbol_data['symbol']
                    if isinstance(result, Exception):
                        print(f" ❌ [المنتج] فشل جلب {symbol_str}: {result}")
                    elif result is not None:
                        # NOTE(review): this overwrites the freshly fetched
                        # 'current_price' with the screening-time value held
                        # in original_symbol_data - confirm that is intended.
                        result.update(original_symbol_data)
                        successful_data_for_batch.append(result)
                        timeframes_count = result.get('successful_timeframes', 0)
                        print(f" ✅ [المنتج] {symbol_str}: {timeframes_count}/6 أطر زمنية")
                    else:
                        print(f" ⚠️ [المنتج] {symbol_str}: بيانات غير كافية، تم التجاهل")

                successful_count = len(successful_data_for_batch)
                print(f" 📦 [المنتج] اكتملت الدفعة {batch_num + 1}: {successful_count}/{len(batch)} ناجحة")

                if successful_data_for_batch:
                    try:
                        await queue.put(successful_data_for_batch)
                        print(f" 📬 [المنتج] تم إرسال {len(successful_data_for_batch)} عملة إلى طابور المعالجة")
                        total_successful += len(successful_data_for_batch)
                    except Exception as q_err:
                        print(f" ❌ [المنتج] فشل إرسال الدفعة للطابور: {q_err}")

                # Short pause between batches, none after the last one.
                if batch_num < len(batches) - 1:
                    await asyncio.sleep(1)

            print(f"✅ [المنتج] اكتمل تدفق بيانات OHLCV (الكاملة). تم إرسال {total_successful} عملة للمعالجة.")
        finally:
            try:
                await queue.put(None)
                print(" 📬 [المنتج] تم إرسال إشارة الإنهاء (None) إلى الطابور.")
            except Exception as q_err:
                print(f" ❌ [المنتج] فشل إرسال إشارة الإنهاء (None) للطابور: {q_err}")

    async def _fetch_complete_ohlcv_parallel(self, symbol: str) -> Dict[str, Any]:
        """(V7.2) Fetch OHLCV for six timeframes of ``symbol`` (a string).

        A timeframe counts as successful when it has at least 10 candles; at
        least two successful timeframes are required. The current price comes
        from the ticker, falling back to the close of the last candle of the
        first timeframe that has one.

        Returns:
            A dict with ``symbol``, ``ohlcv``/``raw_ohlcv`` (the same mapping),
            ``current_price``, ``timestamp``, ``candles_count`` and
            ``successful_timeframes``; ``None`` when data is insufficient or
            any error occurs.
        """
        try:
            timeframes = [
                ('5m', 200), ('15m', 200), ('1h', 200),
                ('4h', 200), ('1d', 200), ('1w', 200),
            ]
            min_required_timeframes = 2

            timeframe_tasks = [
                asyncio.create_task(self._fetch_single_timeframe_improved(symbol, timeframe, limit))
                for timeframe, limit in timeframes
            ]
            timeframe_results = await asyncio.gather(*timeframe_tasks, return_exceptions=True)

            ohlcv_data = {}
            for (timeframe, _limit), result in zip(timeframes, timeframe_results):
                if isinstance(result, Exception):
                    continue
                if result and len(result) >= 10:
                    ohlcv_data[timeframe] = result

            successful_timeframes = len(ohlcv_data)
            if successful_timeframes < min_required_timeframes:
                return None

            current_price = await self.get_latest_price_async(symbol)
            if current_price is None:
                for timeframe_data in ohlcv_data.values():
                    last_candle = timeframe_data[-1]
                    if len(last_candle) >= 5:
                        current_price = last_candle[4]
                        break
            if current_price is None:
                return None

            return {
                'symbol': symbol, 'ohlcv': ohlcv_data, 'raw_ohlcv': ohlcv_data,
                'current_price': current_price, 'timestamp': datetime.now().isoformat(),
                'candles_count': {tf: len(data) for tf, data in ohlcv_data.items()},
                'successful_timeframes': successful_timeframes,
            }
        except Exception:
            return None

    async def _fetch_single_timeframe_improved(self, symbol: str, timeframe: str, limit: int):
        """(V7.2) Fetch one timeframe for ``symbol`` with up to three attempts.

        Failed attempts are followed by a growing delay (2s, then 4s).

        Returns:
            The candle list, or ``[]`` when the exchange is unavailable, the
            response is empty, or every attempt failed.
        """
        if not self.exchange:
            # Without a client every attempt would fail; skip the retry sleeps.
            return []

        max_retries = 3
        retry_delay = 2
        for attempt in range(max_retries):
            try:
                ohlcv_data = await self._exchange_call('fetch_ohlcv', symbol, timeframe, limit=limit)
                return ohlcv_data if ohlcv_data else []
            except Exception:
                if attempt < max_retries - 1:
                    await asyncio.sleep(retry_delay * (attempt + 1))
                else:
                    return []

    async def get_latest_price_async(self, symbol):
        """(V7.2) Return the last traded price of ``symbol`` (a string).

        Returns:
            The price as ``float``, or ``None`` when the exchange is missing,
            the symbol does not contain ``'/'``, the ticker has no ``last``
            value, or the request fails.
        """
        try:
            if not self.exchange:
                return None
            if not symbol or '/' not in symbol:
                return None
            ticker = await self._exchange_call('fetch_ticker', symbol)
            if not ticker:
                return None
            current_price = ticker.get('last')
            if current_price is None:
                return None
            return float(current_price)
        except Exception:
            return None

    async def get_whale_data_for_symbol(self, symbol):
        """Return the whale monitor's activity for ``symbol``, or ``None``.

        ``None`` is returned when no monitor is configured or the call fails.
        """
        try:
            if not self.whale_monitor:
                return None
            return await self.whale_monitor.get_symbol_whale_activity(symbol)
        except Exception:
            return None

    async def get_whale_trading_signal(self, symbol, whale_data, market_context):
        """Ask the whale monitor for a trading signal.

        Returns:
            The monitor's signal, or a ``HOLD`` signal with confidence 0.3
            when the monitor is missing or raises.
        """
        try:
            if self.whale_monitor:
                return await self.whale_monitor.generate_whale_trading_signal(symbol, whale_data, market_context)
            return {'action': 'HOLD', 'confidence': 0.3, 'reason': 'Whale monitor not available', 'source': 'whale_analysis'}
        except Exception as e:
            return {'action': 'HOLD', 'confidence': 0.3, 'reason': f'Error: {str(e)}', 'source': 'whale_analysis'}
# Module-level side effect: this banner is printed each time the module body is
# executed (i.e. on import).
| print("✅ DataManager loaded - V7.3 (L1 Threshold @ 0.50)") | |