File size: 26,275 Bytes
bd99dc8
00bb5c9
 
 
 
 
 
79a9e95
d69dead
28fa18b
394e2c7
164b380
 
 
 
 
4ace337
164b380
11b4dc5
4ace337
11b4dc5
 
 
4ace337
 
53cf6c0
28fa18b
 
53cf6c0
 
6b00681
53cf6c0
d2775f3
4ace337
c6f72fe
 
 
4ace337
 
c6f72fe
79a9e95
c6f72fe
dceb75a
c6f72fe
 
87e3669
56e3f87
 
11b4dc5
4ace337
11b4dc5
 
4ace337
 
 
d69dead
53cf6c0
248e033
56e3f87
6b00681
 
 
 
4ace337
6b00681
 
 
4ace337
bd99dc8
 
4ace337
bd99dc8
4ace337
 
 
 
bd99dc8
4ace337
bd99dc8
4ace337
bd99dc8
4ace337
6b00681
bd99dc8
24a0949
56e3f87
 
4ace337
dceb75a
7985470
56e3f87
 
dceb75a
4ace337
56e3f87
53cf6c0
4ace337
b866e29
4ace337
 
 
24a0949
d69dead
248e033
 
4ace337
 
24a0949
4ace337
24a0949
 
 
 
 
 
4ace337
d69dead
24a0949
d69dead
248e033
24a0949
4ace337
 
24a0949
4ace337
 
24a0949
4ace337
11b4dc5
 
 
248e033
 
11b4dc5
 
 
 
 
ea38153
11b4dc5
 
 
 
ea38153
 
c6f72fe
4ace337
248e033
4ace337
c6f72fe
11b4dc5
ea38153
c6f72fe
4ace337
11b4dc5
4ace337
11b4dc5
c6f72fe
4ace337
ea38153
 
248e033
ea38153
11b4dc5
eb48a52
248e033
4ace337
 
 
 
 
 
87e3669
4ace337
53cf6c0
11b4dc5
4ace337
11b4dc5
 
 
 
 
 
 
 
 
4ace337
11b4dc5
 
24a0949
 
bd99dc8
 
24a0949
bd99dc8
79a9e95
4ace337
bd99dc8
4ace337
 
164b380
11b4dc5
4ace337
79a9e95
bd99dc8
79a9e95
 
 
164b380
bd99dc8
79a9e95
4ace337
 
 
164b380
4ace337
 
 
24a0949
4ace337
 
 
 
 
 
 
 
bd99dc8
4ace337
11b4dc5
bd99dc8
11b4dc5
bd99dc8
4ace337
 
 
 
 
 
bd99dc8
4ace337
 
 
 
 
11b4dc5
4ace337
bd99dc8
4ace337
 
 
bd99dc8
4ace337
 
4c1a71a
bd99dc8
 
808315b
 
 
 
bd99dc8
808315b
bd99dc8
808315b
 
bd99dc8
4ace337
 
4c1a71a
bd99dc8
 
4ace337
bd99dc8
4ace337
bd99dc8
4ace337
 
f375b09
bd99dc8
f375b09
4ace337
 
 
 
 
4c1a71a
bd99dc8
164b380
4ace337
164b380
4ace337
164b380
4ace337
 
11b4dc5
4ace337
164b380
b44825a
 
11b4dc5
6690430
 
b44825a
11b4dc5
6690430
 
11b4dc5
6690430
 
 
b44825a
 
11b4dc5
b44825a
6690430
11b4dc5
6690430
 
 
 
 
 
 
 
 
 
4ace337
b44825a
6690430
 
 
 
b44825a
 
 
 
6690430
 
 
11b4dc5
6690430
 
 
b44825a
 
11b4dc5
b44825a
 
6690430
 
 
 
 
11b4dc5
6690430
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ace337
b44825a
6690430
 
 
 
3fd2d9a
4ace337
6690430
 
 
 
dc2c23a
113d926
6690430
 
 
 
 
 
dc2c23a
6690430
113d926
dc2c23a
6690430
 
 
 
 
dc2c23a
6690430
 
 
 
 
 
113d926
6690430
 
 
 
 
 
 
 
 
 
 
 
6598c39
dc2c23a
7f28923
6690430
 
 
 
 
 
 
 
 
 
 
dc2c23a
 
11b4dc5
6690430
 
 
 
 
afa0eeb
7aab55a
231c23d
6690430
4ace337
 
11b4dc5
6690430
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
08e895c
6690430
 
08e895c
 
7985470
6690430
 
 
 
4ace337
6690430
 
 
 
 
6598c39
 
11b4dc5
7985470
4ace337
6690430
 
 
20a2029
6690430
 
 
 
 
20a2029
6690430
 
 
 
20a2029
bd99dc8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
# data_manager.py (Updated to V9.8 - Sniper Threshold 60%)
import os
import asyncio
import httpx
import traceback
import time
from datetime import datetime
import ccxt
import numpy as np
import logging
from typing import List, Dict, Any
import pandas as pd

try:
    import pandas_ta as ta
except ImportError:
    print("⚠️ مكتبة pandas_ta غير موجودة. النظام سيفشل.")
    ta = None

# (V9.1) استيراد العقل الحسابي (الذي يحتوي على V9.1 Features)
from ml_engine.indicators import AdvancedTechnicalAnalyzer
from ml_engine.monte_carlo import MonteCarloAnalyzer
from ml_engine.patterns import ChartPatternAnalyzer
# (V9.1) استيراد "العقل الذكي" الجديد
from ml_engine.ranker import Layer1Ranker 

logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)

class DataManager:
    def __init__(self, contracts_db, whale_monitor, r2_service=None):
        self.contracts_db = contracts_db or {}
        self.whale_monitor = whale_monitor
        self.r2_service = r2_service
        
        try:
            self.exchange = ccxt.kucoin({
                'sandbox': False, 'enableRateLimit': True,
                'timeout': 30000, 'verbose': False,
            })
            print("✅ تم تهيئة اتصال KuCoin بنجاح")
        except Exception as e:
            print(f"❌ فشل تهيئة اتصال KuCoin: {e}")
            self.exchange = None
            
        self.http_client = None
        self.market_cache = {}
        self.last_market_load = None

        # (V9.1) استيراد العقل الحسابي (الذي يحتوي على V9.1 Features)
        self.technical_analyzer = AdvancedTechnicalAnalyzer()
        self.monte_carlo_analyzer = MonteCarloAnalyzer()
        self.pattern_analyzer = None
        # (V9.1) تهيئة "العقل الذكي" (النموذج)
        self.layer1_ranker = None 
        
    async def initialize(self):
        self.http_client = httpx.AsyncClient(timeout=30.0)
        await self._load_markets()
        
        print("   > [DataManager] تهيئة محرك الأنماط V8 (ML-Based)...")
        try:
            self.pattern_analyzer = ChartPatternAnalyzer(r2_service=self.r2_service)
            await self.pattern_analyzer.initialize()
        except Exception as e:
            print(f"❌ [DataManager] فشل تهيئة محرك الأنماط V8: {e}")
            self.pattern_analyzer = ChartPatternAnalyzer(r2_service=None)
            
        # (تهيئة "العقل الذكي" - نموذج الرانكر V9.8)
        print("   > [DataManager] تهيئة الكاشف المصغر (Layer1 Ranker V9.8)...")
        try:
            # (تأكد من أن النموذج V9.8 موجود ومسمى بهذا الاسم)
            model_file_path = "ml_models/layer1_ranker.lgbm"
            self.layer1_ranker = Layer1Ranker(model_path=model_file_path)
            await self.layer1_ranker.initialize()
            if self.layer1_ranker.model is None:
                print("   ⚠️ [DataManager V9.8] الرانكر في وضع 'وهمي' (Placeholder).")
            else:
                 print(f"   ✅ [DataManager V9.8] الرانكر {self.layer1_ranker.model_name} جاهز للعمل.")
        except Exception as e:
            print(f"❌ [DataManager V9.8] فشل تهيئة الرانكر V9.8: {e}")
            self.layer1_ranker = None 

        print("✅ DataManager initialized - V9.8 (Sniper Threshold 60%)")

    async def _load_markets(self):
        try:
            if not self.exchange: return
            print("🔄 جلب أحدث بيانات الأسواق من KuCoin...")
            self.exchange.load_markets()
            self.market_cache = self.exchange.markets
            self.last_market_load = datetime.now()
            print(f"✅ تم تحميل {len(self.market_cache)} سوق من KuCoin")
        except Exception as e: print(f"❌ فشل تحميل بيانات الأسواق: {e}")

    async def close(self):
        if self.http_client and not self.http_client.is_closed: await self.http_client.aclose()
        if self.exchange:
            try: await self.exchange.close()
            except Exception: pass

    async def get_market_context_async(self):
        try:
            sentiment_data = await self.get_sentiment_safe_async()
            price_data = await self._get_prices_with_fallback()
            bitcoin_price = price_data.get('bitcoin'); ethereum_price = price_data.get('ethereum')
            return {
                'timestamp': datetime.now().isoformat(),
                'bitcoin_price_usd': bitcoin_price, 'ethereum_price_usd': ethereum_price,
                'fear_and_greed_index': sentiment_data.get('feargreed_value') if sentiment_data else None,
                'sentiment_class': sentiment_data.get('feargreed_class') if sentiment_data else 'NEUTRAL',
                'market_trend': self._determine_market_trend(bitcoin_price, sentiment_data),
                'btc_sentiment': self._get_btc_sentiment(bitcoin_price),
                'data_quality': 'HIGH' if bitcoin_price and ethereum_price else 'LOW'
            }
        except Exception as e: return self._get_minimal_market_context()

    async def get_sentiment_safe_async(self):
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                response = await client.get("https://api.alternative.me/fng/")
                response.raise_for_status(); data = response.json()
                if 'data' not in data or not data['data']: raise ValueError("بيانات المشاعر غير متوفرة")
                latest_data = data['data'][0]
                return { "feargreed_value": int(latest_data['value']), "feargreed_class": latest_data['value_classification'], "source": "alternative.me", "timestamp": datetime.now().isoformat() }
        except Exception as e: return None
    def _determine_market_trend(self, bitcoin_price, sentiment_data):
        if bitcoin_price is None: return "UNKNOWN"
        if bitcoin_price > 60000: score = 1
        elif bitcoin_price < 55000: score = -1
        else: score = 0
        if sentiment_data and sentiment_data.get('feargreed_value') is not None:
            fear_greed = sentiment_data.get('feargreed_value')
            if fear_greed > 60: score += 1
            elif fear_greed < 40: score -= 1
        if score >= 1: return "bull_market"
        elif score <= -1: return "bear_market"
        else: return "sideways_market"
    def _get_btc_sentiment(self, bitcoin_price):
        if bitcoin_price is None: return 'UNKNOWN'
        elif bitcoin_price > 60000: return 'BULLISH'
        elif bitcoin_price < 55000: return 'BEARISH'
        else: return 'NEUTRAL'
    async def _get_prices_with_fallback(self):
        try:
            prices = await self._get_prices_from_kucoin_safe()
            if prices.get('bitcoin') and prices.get('ethereum'): return prices
            return await self._get_prices_from_coingecko()
        except Exception as e: return {'bitcoin': None, 'ethereum': None}
    async def _get_prices_from_kucoin_safe(self):
        if not self.exchange: return {'bitcoin': None, 'ethereum': None}
        try:
            prices = {'bitcoin': None, 'ethereum': None}
            btc_ticker = self.exchange.fetch_ticker('BTC/USDT'); btc_price = float(btc_ticker.get('last', 0)) if btc_ticker.get('last') else None
            if btc_price and btc_price > 0: prices['bitcoin'] = btc_price
            eth_ticker = self.exchange.fetch_ticker('ETH/USDT'); eth_price = float(eth_ticker.get('last', 0)) if eth_ticker.get('last') else None
            if eth_price and eth_price > 0: prices['ethereum'] = eth_price
            return prices
        except Exception as e: return {'bitcoin': None, 'ethereum': None}
    async def _get_prices_from_coingecko(self):
        try:
            await asyncio.sleep(0.5)
            url = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin,ethereum&vs_currencies=usd"
            headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'application/json'}
            async with httpx.AsyncClient(headers=headers) as client:
                response = await client.get(url, timeout=10)
                if response.status_code == 429: await asyncio.sleep(2); response = await client.get(url, timeout=10)
                response.raise_for_status(); data = response.json()
                btc_price = data.get('bitcoin', {}).get('usd'); eth_price = data.get('ethereum', {}).get('usd')
                if btc_price and eth_price: return {'bitcoin': btc_price, 'ethereum': eth_price}
                else: return {'bitcoin': None, 'ethereum': None}
        except Exception as e: return {'bitcoin': None, 'ethereum': None}
    def _get_minimal_market_context(self):
        return { 'timestamp': datetime.now().isoformat(), 'data_available': False, 'market_trend': 'UNKNOWN', 'btc_sentiment': 'UNKNOWN', 'data_quality': 'LOW' }

    def _create_dataframe(self, candles: List) -> pd.DataFrame:
        """(V9.1) إنشاء DataFrame (تحتاج 200 شمعة للميزات)"""
        try:
            if not candles: return pd.DataFrame()
            df = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
            df[['open', 'high', 'low', 'close', 'volume']] = df[['open', 'high', 'low', 'close', 'volume']].astype(float)
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
            df.set_index('timestamp', inplace=True)
            df.sort_index(inplace=True)
            return df
        except Exception as e:
            print(f"❌ خطأ في إنشاء DataFrame: {e}")
            return pd.DataFrame()

    async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
        """
        الطبقة 1: فحص سريع - (محدث V9.8)
        - استخدام عتبة القناص 60% (0.60).
        """
        print("📊 الطبقة 1 (V9.8 - Sniper Threshold 60%): بدء الغربلة...")
        
        if not self.layer1_ranker:
            print("❌ [V9.8] الرانكر غير مهيأ. إيقاف الغربلة.")
            return []

        # الخطوة 1: جلب أفضل 100 عملة حسب الحجم
        volume_data = await self._get_volume_data_optimal()
        if not volume_data: volume_data = await self._get_volume_data_direct_api()
        if not volume_data:
            print("❌ [V9.8] فشل جلب بيانات الأحجام.")
            return []
        
        volume_data.sort(key=lambda x: x['dollar_volume'], reverse=True)
        top_100_by_volume = volume_data[:100]
        print(f"✅ [V9.8] تم تحديد أفضل {len(top_100_by_volume)} عملة. بدء حساب الميزات الذكية...")
        
        final_candidates_with_scores = []
        batch_symbols_data = top_100_by_volume
        batch_symbols = [s['symbol'] for s in batch_symbols_data]
            
        # الخطوة 2: جلب بيانات 1H (200 شمعة)
        tasks = [self._fetch_1h_ohlcv_for_screening(symbol, limit=200) for symbol in batch_symbols]
        results_candles = await asyncio.gather(*tasks, return_exceptions=True)
        
        valid_symbol_data_for_ranking = []
        for j, (candles) in enumerate(results_candles):
            symbol_data = batch_symbols_data[j]
            if isinstance(candles, Exception) or not candles or len(candles) < 200: continue
            symbol_data['ohlcv_1h_raw'] = candles 
            valid_symbol_data_for_ranking.append(symbol_data)

        if not valid_symbol_data_for_ranking:
            print("❌ [V9.8] لا توجد عملات صالحة (تحتاج 200 شمعة 1H).")
            return []

        print(f"   🔄 [V9.8] حساب الميزات الذكية لـ {len(valid_symbol_data_for_ranking)} عملة...")
        
        # الخطوة 3: حساب "الميزات الذكية V9.8"
        all_features_list = []
        symbols_in_order = []
        for symbol_data in valid_symbol_data_for_ranking:
            try:
                df = self._create_dataframe(symbol_data['ohlcv_1h_raw'])
                if df.empty: continue
                # (استدعاء العقل الحسابي V9.8)
                smart_features = self.technical_analyzer.calculate_v9_smart_features(df)
                if smart_features:
                    all_features_list.append(smart_features)
                    symbols_in_order.append(symbol_data)
            except Exception: pass
        
        if not all_features_list:
            print("❌ [V9.8] فشل حساب الميزات الذكية.")
            return []

        # الخطوة 4: التنبؤ (التصنيف)
        print(f"   🧠 [V9.8] إرسال {len(all_features_list)} عملة إلى نموذج الرانكر...")
        features_dataframe = pd.DataFrame(all_features_list)
        probabilities = self.layer1_ranker.predict_proba(features_dataframe)
        
        # (الطباعة التشخيصية V9.5 - لا تزال مفيدة)
        print(f"   🔍 [V9.8 DEBUG] تم استلام {len(probabilities)} نتيجة من الرانكر.")
        debug_scores = []
        for i, (symbol_data) in enumerate(symbols_in_order):
            debug_scores.append((symbol_data['symbol'], probabilities[i]))
        debug_scores.sort(key=lambda x: x[1], reverse=True)
        print("   --- 📋 [V9.8 DEBUG] أعلى 10 درجات خام (قبل الفلترة) ---")
        for i, (symbol, score) in enumerate(debug_scores[:10]):
            print(f"      {i+1}. {symbol}: {score:.4f}") 
        print("   -------------------------------------------------")

        # الخطوة 5: تجميع النتائج (مع عتبة القناص 60%)
        for i, (symbol_data) in enumerate(symbols_in_order):
            score = probabilities[i]
            
            # 🔴 (V9.8: عتبة القناص 60% - بناءً على نتائجنا) 🔴
            if score >= 0.60: 
                symbol = symbol_data['symbol']
                print(f"      ✅ {symbol}: نجح (الاحتمالية: {score:.3f})")
                symbol_data['layer1_score'] = float(score)
                symbol_data['reasons_for_candidacy'] = ["V9_SMART_RANKER_SNIPER_60"]
                if 'ohlcv_1h_raw' in symbol_data: del symbol_data['ohlcv_1h_raw'] 
                final_candidates_with_scores.append(symbol_data)

        print(f"🎯 اكتملت الغربلة (V9.8). تم تأهيل {len(final_candidates_with_scores)} عملة (ثقة >= 60%).")
        
        if final_candidates_with_scores:
            final_candidates_with_scores.sort(key=lambda x: x['layer1_score'], reverse=True)
            print("🏆 المرشحون الناجحون (Top Candidates):")
            for k, candidate in enumerate(final_candidates_with_scores[:5]):
                print(f"   {k+1}. {candidate['symbol']}: (Score: {candidate.get('layer1_score'):.3f})")
        else:
            print("⚠️ [V9.8] لم تنجح أي عملة في تجاوز عتبة الثقة 60%. (هذا جيد، النظام حذر)")

        return final_candidates_with_scores[:20]

    async def _fetch_1h_ohlcv_for_screening(self, symbol: str, limit: int = 200) -> List:
        try:
            ohlcv_data = self.exchange.fetch_ohlcv(symbol, '1h', limit=limit)
            if not ohlcv_data or len(ohlcv_data) < limit: return None
            return ohlcv_data
        except Exception: return None

    async def _get_volume_data_optimal(self) -> List[Dict[str, Any]]:
        try:
            if not self.exchange: return []
            tickers = self.exchange.fetch_tickers()
            volume_data = []
            for symbol, ticker in tickers.items():
                if not symbol.endswith('/USDT') or not ticker.get('active', True): continue
                current_price = ticker.get('last', 0)
                quote_volume = ticker.get('quoteVolume', 0)
                if current_price is None or current_price <= 0: continue
                
                if quote_volume is not None and quote_volume > 0:
                    dollar_volume = quote_volume
                else:
                    base_volume = ticker.get('baseVolume', 0)
                    if base_volume is None: continue
                    dollar_volume = base_volume * current_price
                    
                if dollar_volume is None or dollar_volume < 50000: continue
                
                price_change_24h = ticker.get('percentage', 0) or 0

                volume_data.append({ 
                    'symbol': symbol, 
                    'dollar_volume': dollar_volume, 
                    'current_price': current_price, 
                    'volume_24h': ticker.get('baseVolume', 0) or 0, 
                    'price_change_24h': price_change_24h 
                })
            print(f"✅ تم معالجة {len(volume_data)} عملة في الطريقة المثلى")
            return volume_data
        except Exception as e: 
            print(f"❌ خطأ في جلب بيانات الحجم المثلى: {e}")
            return []
            
    async def _get_volume_data_direct_api(self) -> List[Dict[str, Any]]:
        try:
            url = "https://api.kucoin.com/api/v1/market/allTickers"
            async with httpx.AsyncClient(timeout=15) as client:
                response = await client.get(url)
                response.raise_for_status()
                data = response.json()
                if data.get('code') != '200000': raise ValueError(f"استجابة API غير متوقعة: {data.get('code')}")
                
                tickers = data['data']['ticker']
                volume_data = []
                for ticker in tickers:
                    symbol = ticker['symbol']
                    if not symbol.endswith('USDT'): continue
                    formatted_symbol = symbol.replace('-', '/')
                    try:
                        vol_value = ticker.get('volValue')
                        last_price = ticker.get('last')
                        change_rate = ticker.get('changeRate')
                        vol = ticker.get('vol')
                        
                        if vol_value is None or last_price is None or change_rate is None or vol is None: continue
                        
                        dollar_volume = float(vol_value) if vol_value else 0
                        current_price = float(last_price) if last_price else 0
                        price_change = (float(change_rate) * 100) if change_rate else 0
                        volume_24h = float(vol) if vol else 0
                        
                        if dollar_volume >= 50000 and current_price > 0:
                            volume_data.append({ 
                                'symbol': formatted_symbol, 
                                'dollar_volume': dollar_volume, 
                                'current_price': current_price, 
                                'volume_24h': volume_24h, 
                                'price_change_24h': price_change 
                            })
                    except (ValueError, TypeError, KeyError):
                        continue
                        
                print(f"✅ تم معالجة {len(volume_data)} عملة في الطريقة المباشرة")
                return volume_data
        except Exception as e:
            print(f"❌ خطأ في جلب بيانات الحجم المباشر: {e}")
            return []
            
    async def stream_ohlcv_data(self, symbols: List[Dict[str, Any]], queue: asyncio.Queue):
        print(f"📊 بدء تدفق بيانات OHLCV (الكاملة) لـ {len(symbols)} عملة (مصنفة)...")
        batch_size = 15
        batches = [symbols[i:i + batch_size] for i in range(0, len(symbols), batch_size)]
        total_successful = 0
        
        for batch_num, batch in enumerate(batches):
            print(f"   🔄 [المنتج] جلب الدفعة {batch_num + 1}/{len(batches)} ({len(batch)} عملة)...")
            batch_tasks = []
            for symbol_data in batch:
                symbol_str = symbol_data['symbol'] 
                task = asyncio.create_task(self._fetch_complete_ohlcv_parallel(symbol_str))
                batch_tasks.append(task)
                
            batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
            
            successful_data_for_batch = []
            for i, result in enumerate(batch_results):
                original_symbol_data = batch[i]
                symbol_str = original_symbol_data['symbol']
                
                if isinstance(result, Exception):
                    print(f"      ❌ [المنتج] فشل جلب {symbol_str}: {result}")
                elif result is not None:
                    result.update(original_symbol_data) 
                    successful_data_for_batch.append(result)
                    print(f"      ✅ [المنتج] {symbol_str}: {result.get('successful_timeframes', 0)}/6 أطر زمنية")
                else:
                    print(f"      ⚠️ [المنتج] {symbol_str}: بيانات غير كافية، تم التجاهل")

            if successful_data_for_batch:
                try:
                    await queue.put(successful_data_for_batch)
                    total_successful += len(successful_data_for_batch)
                except Exception as q_err:
                    print(f"      ❌ [المنتج] فشل إرسال الدفعة للطابور: {q_err}")

            if batch_num < len(batches) - 1:
                await asyncio.sleep(1) # (إضافة تأخير بسيط بين الدفعات)
                
        print(f"✅ [المنتج] اكتمل التدفق. تم إرسال {total_successful} عملة للمعالجة.")
        await queue.put(None)
        print("      📬 [المنتج] تم إرسال إشارة الإنهاء (None) إلى الطابور.")

    async def _fetch_complete_ohlcv_parallel(self, symbol: str) -> Dict[str, Any]:
        try:
            ohlcv_data = {}
            timeframes = [('5m', 200), ('15m', 200), ('1h', 200), ('4h', 200), ('1d', 200), ('1w', 200)]
            timeframe_tasks = []
            
            for timeframe, limit in timeframes:
                task = asyncio.create_task(self._fetch_single_timeframe_improved(symbol, timeframe, limit))
                timeframe_tasks.append(task)
                
            timeframe_results = await asyncio.gather(*timeframe_tasks, return_exceptions=True)
            successful_timeframes = 0
            
            for i, (timeframe, limit) in enumerate(timeframes):
                result = timeframe_results[i]
                if isinstance(result, Exception): continue
                if result and len(result) >= 200: # (التأكد من وجود بيانات كافية)
                    ohlcv_data[timeframe] = result
                    successful_timeframes += 1
                    
            if successful_timeframes >= 3 and ohlcv_data: # (نحتاج 3 أطر زمنية على الأقل)
                try:
                    current_price = await self.get_latest_price_async(symbol)
                    if current_price is None:
                        # (محاولة احتياطية للحصول على السعر من آخر شمعة)
                        for tf_data in ohlcv_data.values():
                            if tf_data and len(tf_data) > 0: current_price = tf_data[-1][4]; break
                    if current_price is None: return None
                    
                    return { 
                        'symbol': symbol, 
                        'ohlcv': ohlcv_data, 
                        'raw_ohlcv': ohlcv_data, 
                        'current_price': current_price, 
                        'timestamp': datetime.now().isoformat(), 
                        'candles_count': {tf: len(data) for tf, data in ohlcv_data.items()},
                        'successful_timeframes': successful_timeframes 
                    }
                except Exception: 
                    return None
            else: 
                return None
        except Exception: 
            return None
            
    async def _fetch_single_timeframe_improved(self, symbol: str, timeframe: str, limit: int):
        max_retries = 3
        retry_delay = 2
        for attempt in range(max_retries):
            try:
                ohlcv_data = self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
                if ohlcv_data and len(ohlcv_data) > 0: 
                    return ohlcv_data
                else: 
                    return []
            except Exception:
                if attempt < max_retries - 1:
                    await asyncio.sleep(retry_delay * (attempt + 1))
                else: 
                    return []
                    
    async def get_latest_price_async(self, symbol):
        try:
            if not self.exchange: return None
            ticker = self.exchange.fetch_ticker(symbol)
            return float(ticker['last']) if ticker and ticker.get('last') else None
        except Exception: 
            return None
            
    async def get_whale_data_for_symbol(self, symbol):
        try:
            return await self.whale_monitor.get_symbol_whale_activity(symbol) if self.whale_monitor else None
        except Exception: 
            return None
            
    async def get_whale_trading_signal(self, symbol, whale_data, market_context):
        try:
            return await self.whale_monitor.generate_whale_trading_signal(symbol, whale_data, market_context) if self.whale_monitor else {'action': 'HOLD', 'confidence': 0.3, 'reason': 'Whale monitor not available', 'source': 'whale_analysis'}
        except Exception as e:
            return {'action': 'HOLD', 'confidence': 0.3, 'reason': f'Error: {str(e)}', 'source': 'whale_analysis'}

print("✅ DataManager loaded - V9.8 (Sniper Threshold 60%)")