Trad / ML.py
Riy777's picture
Update ML.py
ffe93c4
raw
history blame
61.8 kB
# ML.py
import pandas as pd
import pandas_ta as ta
import numpy as np
from datetime import datetime, timedelta
import asyncio
import json
import re
class AdvancedTechnicalAnalyzer:
def __init__(self):
self.indicators_config = {
'trend': ['ema_9', 'ema_21', 'ema_50', 'ema_200', 'ichimoku', 'adx', 'parabolic_sar', 'dmi'],
'momentum': ['rsi', 'stoch_rsi', 'macd', 'williams_r', 'cci', 'awesome_oscillator', 'momentum'],
'volatility': ['bbands', 'atr', 'keltner', 'donchian', 'rvi'],
'volume': ['vwap', 'obv', 'mfi', 'volume_profile', 'ad', 'volume_oscillator'],
'cycle': ['hull_ma', 'supertrend', 'zigzag', 'fisher_transform']
}
def calculate_all_indicators(self, dataframe, timeframe):
"""حساب جميع المؤشرات الفنية للإطار الزمني المحدد"""
if dataframe.empty or dataframe is None:
return {}
indicators = {}
try:
indicators.update(self._calculate_trend_indicators(dataframe))
indicators.update(self._calculate_momentum_indicators(dataframe))
indicators.update(self._calculate_volatility_indicators(dataframe))
indicators.update(self._calculate_volume_indicators(dataframe, timeframe))
indicators.update(self._calculate_cycle_indicators(dataframe))
except Exception as e:
print(f"⚠️ خطأ في حساب المؤشرات لـ {timeframe}: {e}")
return indicators
def _calculate_trend_indicators(self, dataframe):
"""حساب مؤشرات الاتجاه"""
trend = {}
try:
# التحقق من وجود البيانات الأساسية
if dataframe is None or dataframe.empty or 'close' not in dataframe.columns:
return {}
# المتوسطات المتحركة
if len(dataframe) >= 9:
ema_9 = ta.ema(dataframe['close'], length=9)
if ema_9 is not None and not ema_9.empty and not pd.isna(ema_9.iloc[-1]):
trend['ema_9'] = float(ema_9.iloc[-1])
if len(dataframe) >= 21:
ema_21 = ta.ema(dataframe['close'], length=21)
if ema_21 is not None and not ema_21.empty and not pd.isna(ema_21.iloc[-1]):
trend['ema_21'] = float(ema_21.iloc[-1])
if len(dataframe) >= 50:
ema_50 = ta.ema(dataframe['close'], length=50)
if ema_50 is not None and not ema_50.empty and not pd.isna(ema_50.iloc[-1]):
trend['ema_50'] = float(ema_50.iloc[-1])
if len(dataframe) >= 200:
ema_200 = ta.ema(dataframe['close'], length=200)
if ema_200 is not None and not ema_200.empty and not pd.isna(ema_200.iloc[-1]):
trend['ema_200'] = float(ema_200.iloc[-1])
# إيشيموكو
if len(dataframe) >= 26:
try:
ichimoku = ta.ichimoku(dataframe['high'], dataframe['low'], dataframe['close'])
if ichimoku is not None and len(ichimoku) > 0:
# التحقق من أن ichimoku ليس None وأنه يحتوي على بيانات
conversion_line = ichimoku[0].get('ITS_9') if ichimoku[0] is not None else None
base_line = ichimoku[0].get('IKS_26') if ichimoku[0] is not None else None
if conversion_line is not None and not conversion_line.empty and not pd.isna(conversion_line.iloc[-1]):
trend['ichimoku_conversion'] = float(conversion_line.iloc[-1])
if base_line is not None and not base_line.empty and not pd.isna(base_line.iloc[-1]):
trend['ichimoku_base'] = float(base_line.iloc[-1])
except Exception as ichimoku_error:
print(f"⚠️ خطأ في حساب إيشيموكو: {ichimoku_error}")
# ADX - قوة الاتجاه
if len(dataframe) >= 14:
try:
adx_result = ta.adx(dataframe['high'], dataframe['low'], dataframe['close'], length=14)
if adx_result is not None and not adx_result.empty:
adx_value = adx_result.get('ADX_14')
if adx_value is not None and not adx_value.empty and not pd.isna(adx_value.iloc[-1]):
trend['adx'] = float(adx_value.iloc[-1])
except Exception as adx_error:
print(f"⚠️ خطأ في حساب ADX: {adx_error}")
except Exception as e:
print(f"⚠️ خطأ في حساب مؤشرات الاتجاه: {e}")
return {key: value for key, value in trend.items() if value is not None and not np.isnan(value)}
def _calculate_momentum_indicators(self, dataframe):
"""حساب مؤشرات الزخم"""
momentum = {}
try:
# التحقق من وجود البيانات الأساسية
if dataframe is None or dataframe.empty or 'close' not in dataframe.columns:
return {}
# RSI
if len(dataframe) >= 14:
rsi = ta.rsi(dataframe['close'], length=14)
if rsi is not None and not rsi.empty and not pd.isna(rsi.iloc[-1]):
momentum['rsi'] = float(rsi.iloc[-1])
# MACD
if len(dataframe) >= 26:
macd = ta.macd(dataframe['close'])
if macd is not None and not macd.empty:
macd_hist = macd.get('MACDh_12_26_9')
macd_line = macd.get('MACD_12_26_9')
if macd_hist is not None and not macd_hist.empty and not pd.isna(macd_hist.iloc[-1]):
momentum['macd_hist'] = float(macd_hist.iloc[-1])
if macd_line is not None and not macd_line.empty and not pd.isna(macd_line.iloc[-1]):
momentum['macd_line'] = float(macd_line.iloc[-1])
# ستوكاستك RSI
if len(dataframe) >= 14:
stoch_rsi = ta.stochrsi(dataframe['close'], length=14)
if stoch_rsi is not None and not stoch_rsi.empty:
stoch_k = stoch_rsi.get('STOCHRSIk_14_14_3_3')
if stoch_k is not None and not stoch_k.empty and not pd.isna(stoch_k.iloc[-1]):
momentum['stoch_rsi_k'] = float(stoch_k.iloc[-1])
# ويليامز %R
if len(dataframe) >= 14:
williams = ta.willr(dataframe['high'], dataframe['low'], dataframe['close'], length=14)
if williams is not None and not williams.empty and not pd.isna(williams.iloc[-1]):
momentum['williams_r'] = float(williams.iloc[-1])
except Exception as e:
print(f"⚠️ خطأ في حساب مؤشرات الزخم: {e}")
return {key: value for key, value in momentum.items() if value is not None and not np.isnan(value)}
def _calculate_volatility_indicators(self, dataframe):
"""حساب مؤشرات التقلب"""
volatility = {}
try:
# التحقق من وجود البيانات الأساسية
if dataframe is None or dataframe.empty or 'close' not in dataframe.columns:
return {}
# بولينجر باندز
if len(dataframe) >= 20:
bollinger_bands = ta.bbands(dataframe['close'], length=20, std=2)
if bollinger_bands is not None and not bollinger_bands.empty:
bb_lower = bollinger_bands.get('BBL_20_2.0')
bb_upper = bollinger_bands.get('BBU_20_2.0')
bb_middle = bollinger_bands.get('BBM_20_2.0')
if bb_lower is not None and not bb_lower.empty and not pd.isna(bb_lower.iloc[-1]):
volatility['bb_lower'] = float(bb_lower.iloc[-1])
if bb_upper is not None and not bb_upper.empty and not pd.isna(bb_upper.iloc[-1]):
volatility['bb_upper'] = float(bb_upper.iloc[-1])
if bb_middle is not None and not bb_middle.empty and not pd.isna(bb_middle.iloc[-1]):
volatility['bb_middle'] = float(bb_middle.iloc[-1])
# متوسط المدى الحقيقي (ATR)
if len(dataframe) >= 14:
average_true_range = ta.atr(dataframe['high'], dataframe['low'], dataframe['close'], length=14)
if average_true_range is not None and not average_true_range.empty and not pd.isna(average_true_range.iloc[-1]):
atr_value = float(average_true_range.iloc[-1])
volatility['atr'] = atr_value
current_close = dataframe['close'].iloc[-1] if not dataframe['close'].empty else 0
if atr_value and current_close > 0:
volatility['atr_percent'] = (atr_value / current_close) * 100
except Exception as e:
print(f"⚠️ خطأ في حساب مؤشرات التقلب: {e}")
return {key: value for key, value in volatility.items() if value is not None and not np.isnan(value)}
def _calculate_volume_indicators(self, dataframe, timeframe):
"""حساب مؤشرات الحجم"""
volume = {}
try:
# التحقق من وجود البيانات الأساسية
if dataframe is None or dataframe.empty or 'close' not in dataframe.columns or 'volume' not in dataframe.columns:
return {}
# VWAP - إصلاح المشكلة هنا
if len(dataframe) >= 1:
try:
# إنشاء نسخة من البيانات مع DatetimeIndex مرتب
df_vwap = dataframe.copy()
# تحويل timestamp إلى datetime وضبطه كـ index
if not isinstance(df_vwap.index, pd.DatetimeIndex):
if 'timestamp' in df_vwap.columns:
df_vwap['timestamp'] = pd.to_datetime(df_vwap['timestamp'], unit='ms')
df_vwap.set_index('timestamp', inplace=True)
# التأكد من أن الفهرس مرتب
df_vwap.sort_index(inplace=True)
# حساب VWAP
volume_weighted_average_price = ta.vwap(
high=df_vwap['high'],
low=df_vwap['low'],
close=df_vwap['close'],
volume=df_vwap['volume']
)
if volume_weighted_average_price is not None and not volume_weighted_average_price.empty and not pd.isna(volume_weighted_average_price.iloc[-1]):
volume['vwap'] = float(volume_weighted_average_price.iloc[-1])
except Exception as vwap_error:
print(f"⚠️ خطأ في حساب VWAP لـ {timeframe}: {vwap_error}")
# استخدام بديل لـ VWAP في حالة الخطأ
if len(dataframe) >= 20:
try:
typical_price = (dataframe['high'] + dataframe['low'] + dataframe['close']) / 3
vwap_simple = (typical_price * dataframe['volume']).sum() / dataframe['volume'].sum()
if not np.isnan(vwap_simple):
volume['vwap'] = float(vwap_simple)
except Exception as simple_vwap_error:
print(f"⚠️ خطأ في حساب VWAP البديل: {simple_vwap_error}")
# OBV
try:
on_balance_volume = ta.obv(dataframe['close'], dataframe['volume'])
if on_balance_volume is not None and not on_balance_volume.empty and not pd.isna(on_balance_volume.iloc[-1]):
volume['obv'] = float(on_balance_volume.iloc[-1])
except Exception as obv_error:
print(f"⚠️ خطأ في حساب OBV: {obv_error}")
# MFI
if len(dataframe) >= 14:
try:
money_flow_index = ta.mfi(dataframe['high'], dataframe['low'], dataframe['close'], dataframe['volume'], length=14)
if money_flow_index is not None and not money_flow_index.empty and not pd.isna(money_flow_index.iloc[-1]):
volume['mfi'] = float(money_flow_index.iloc[-1])
except Exception as mfi_error:
print(f"⚠️ خطأ في حساب MFI: {mfi_error}")
# نسبة الحجم
if len(dataframe) >= 20:
try:
volume_avg_20 = float(dataframe['volume'].tail(20).mean())
current_volume = float(dataframe['volume'].iloc[-1]) if not dataframe['volume'].empty else 0
if volume_avg_20 and volume_avg_20 > 0 and current_volume > 0:
volume_ratio = current_volume / volume_avg_20
if not np.isnan(volume_ratio):
volume['volume_ratio'] = volume_ratio
except Exception as volume_error:
print(f"⚠️ خطأ في حساب نسبة الحجم: {volume_error}")
except Exception as e:
print(f"⚠️ خطأ في حساب مؤشرات الحجم: {e}")
return {key: value for key, value in volume.items() if value is not None and not np.isnan(value)}
def _calculate_cycle_indicators(self, dataframe):
"""حساب مؤشرات الدورة"""
cycle = {}
try:
# التحقق من وجود البيانات الأساسية
if dataframe is None or dataframe.empty or 'close' not in dataframe.columns:
return {}
# هول موفينج افريج
if len(dataframe) >= 9:
hull_moving_average = ta.hma(dataframe['close'], length=9)
if hull_moving_average is not None and not hull_moving_average.empty and not pd.isna(hull_moving_average.iloc[-1]):
cycle['hull_ma'] = float(hull_moving_average.iloc[-1])
# سوبرتريند
if len(dataframe) >= 10:
supertrend = ta.supertrend(dataframe['high'], dataframe['low'], dataframe['close'], length=10, multiplier=3)
if supertrend is not None and not supertrend.empty:
supertrend_value = supertrend.get('SUPERT_10_3.0')
if supertrend_value is not None and not supertrend_value.empty and not pd.isna(supertrend_value.iloc[-1]):
cycle['supertrend'] = float(supertrend_value.iloc[-1])
except Exception as e:
print(f"⚠️ خطأ في حساب مؤشرات الدورة: {e}")
return {key: value for key, value in cycle.items() if value is not None and not np.isnan(value)}
class MonteCarloAnalyzer:
def __init__(self):
self.simulation_results = {}
async def predict_1h_probability(self, ohlcv_data):
"""
محاكاة مونت كارلو للتنبؤ بالساعة القادمة
تركز على احتمالية تحقيق ربح 0.5% في الساعة القادمة
"""
try:
if not ohlcv_data or '1h' not in ohlcv_data or len(ohlcv_data['1h']) < 24:
return None # ❌ لا نرجع قيمة افتراضية
# استخدام بيانات 1h و 15m معاً لدقة أفضل
all_closes = []
# إضافة بيانات 1h
all_closes.extend([candle[4] for candle in ohlcv_data['1h']])
# إضافة بيانات 15m إن وجدت
if '15m' in ohlcv_data and len(ohlcv_data['15m']) >= 16:
recent_15m = [candle[4] for candle in ohlcv_data['15m'][-16:]]
all_closes.extend(recent_15m)
if len(all_closes) < 30:
return None # ❌ لا نرجع قيمة افتراضية
closes = np.array(all_closes)
current_price = closes[-1]
# حساب العوائد اللوغاريتمية بدقة
log_returns = []
for i in range(1, len(closes)):
if closes[i-1] > 0:
log_return = np.log(closes[i] / closes[i-1])
log_returns.append(log_return)
if len(log_returns) < 20:
return None # ❌ لا نرجع قيمة افتراضية
log_returns = np.array(log_returns)
mean_return = np.mean(log_returns)
std_return = np.std(log_returns)
# محاكاة مونت كارلو للساعة القادمة
num_simulations = 2000
target_periods = 1
profit_threshold = 0.005
success_count = 0
simulation_details = []
for i in range(num_simulations):
simulated_price = current_price
# محاكاة حركة السعر للساعة القادمة
for period in range(target_periods):
random_return = np.random.normal(mean_return, std_return)
simulated_price *= np.exp(random_return)
price_change = (simulated_price - current_price) / current_price
if price_change >= profit_threshold:
success_count += 1
if i < 100:
simulation_details.append({
'simulation': i,
'final_price': simulated_price,
'profit_percent': price_change * 100
})
probability = success_count / num_simulations
trend_adjustment = self._calculate_trend_adjustment(closes)
adjusted_probability = probability * trend_adjustment
self.simulation_results = {
'base_probability': probability,
'adjusted_probability': adjusted_probability,
'success_count': success_count,
'total_simulations': num_simulations,
'mean_return': mean_return,
'std_return': std_return,
'trend_adjustment': trend_adjustment,
'simulation_details': simulation_details[:10]
}
return min(max(adjusted_probability, 0.01), 0.99)
except Exception as e:
print(f"❌ خطأ في محاكاة مونت كارلو: {e}")
return None # ❌ لا نرجع قيمة افتراضية
def _calculate_trend_adjustment(self, closes):
"""حساب معامل تعديل الاتجاه"""
try:
if len(closes) < 10:
return 1.0
recent_trend = (closes[-1] - closes[-10]) / closes[-10]
gains = []
losses = []
for i in range(1, min(14, len(closes))):
change = closes[-(i+1)] - closes[-i]
if change > 0:
gains.append(change)
else:
losses.append(abs(change))
avg_gain = np.mean(gains) if gains else 0
avg_loss = np.mean(losses) if losses else 1
rs = avg_gain / avg_loss
trend_strength = 100 - (100 / (1 + rs))
if recent_trend > 0.02 and trend_strength > 60:
return 1.3
elif recent_trend > 0.01 and trend_strength > 50:
return 1.15
elif recent_trend < -0.02 and trend_strength < 40:
return 0.7
elif recent_trend < -0.01 and trend_strength < 50:
return 0.85
else:
return 1.0
except Exception as e:
print(f"❌ خطأ في حساب تعديل الاتجاه: {e}")
return 1.0
class PatternEnhancedStrategyEngine:
def __init__(self, data_manager, learning_engine):
self.data_manager = data_manager
self.learning_engine = learning_engine
self.pattern_analyzer = ChartPatternAnalyzer()
async def enhance_strategy_with_patterns(self, strategy_scores, pattern_analysis, symbol):
"""تعزيز الاستراتيجيات بناءً على الأنماط المكتشفة"""
if not pattern_analysis or pattern_analysis.get('pattern_detected') in ['no_clear_pattern', 'insufficient_data']:
return strategy_scores
pattern_confidence = pattern_analysis.get('pattern_confidence', 0)
pattern_name = pattern_analysis.get('pattern_detected', '')
predicted_direction = pattern_analysis.get('predicted_direction', '')
if pattern_confidence >= 0.6:
enhancement_factor = self._calculate_pattern_enhancement(pattern_confidence, pattern_name)
enhanced_strategies = self._get_pattern_appropriate_strategies(pattern_name, predicted_direction)
print(f"🎯 تعزيز استراتيجيات {symbol} بناءً على نمط {pattern_name} (ثقة: {pattern_confidence:.2f})")
for strategy in enhanced_strategies:
if strategy in strategy_scores:
original_score = strategy_scores[strategy]
strategy_scores[strategy] = min(original_score * enhancement_factor, 1.0)
print(f" 📈 {strategy}: {original_score:.3f}{strategy_scores[strategy]:.3f}")
return strategy_scores
def _calculate_pattern_enhancement(self, pattern_confidence, pattern_name):
"""حساب عامل التعزيز بناءً على ثقة النمط ونوعه"""
base_enhancement = 1.0 + (pattern_confidence * 0.3)
high_reliability_patterns = ['Double Top', 'Double Bottom', 'Head & Shoulders', 'Cup and Handle']
if pattern_name in high_reliability_patterns:
base_enhancement *= 1.1
return min(base_enhancement, 1.5)
def _get_pattern_appropriate_strategies(self, pattern_name, direction):
"""تحديد الاستراتيجيات المناسبة للنمط المكتشف"""
reversal_patterns = ['Double Top', 'Double Bottom', 'Head & Shoulders', 'Triple Top', 'Triple Bottom']
continuation_patterns = ['Flags', 'Pennants', 'Triangles', 'Rectangles']
if pattern_name in reversal_patterns:
if direction == 'down':
return ['breakout_momentum', 'trend_following']
else:
return ['mean_reversion', 'breakout_momentum']
elif pattern_name in continuation_patterns:
return ['trend_following', 'breakout_momentum']
else:
return ['breakout_momentum', 'hybrid_ai']
class ChartPatternAnalyzer:
def __init__(self):
self.pattern_cache = {}
async def detect_chart_patterns(self, ohlcv_data):
"""اكتشاف الأنماط البيانية لجميع الأطر الزمنية"""
patterns = {
'pattern_detected': 'no_clear_pattern',
'pattern_confidence': 0,
'predicted_direction': 'neutral',
'timeframe_analysis': {},
'all_patterns': []
}
try:
for timeframe, candles in ohlcv_data.items():
if candles and len(candles) >= 20:
dataframe = self._create_dataframe(candles)
timeframe_pattern = await self._analyze_timeframe_patterns(dataframe, timeframe)
patterns['timeframe_analysis'][timeframe] = timeframe_pattern
patterns['all_patterns'].append(timeframe_pattern)
if timeframe_pattern['confidence'] > patterns['pattern_confidence']:
patterns.update({
'pattern_detected': timeframe_pattern['pattern'],
'pattern_confidence': timeframe_pattern['confidence'],
'predicted_direction': timeframe_pattern['direction']
})
return patterns
except Exception as e:
print(f"❌ خطأ في اكتشاف الأنماط: {e}")
return patterns
def _create_dataframe(self, candles):
"""إنشاء DataFrame من بيانات الشموع"""
try:
df = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df[['open', 'high', 'low', 'close', 'volume']] = df[['open', 'high', 'low', 'close', 'volume']].astype(float)
return df
except Exception as e:
print(f"❌ خطأ في إنشاء DataFrame: {e}")
return pd.DataFrame()
async def _analyze_timeframe_patterns(self, dataframe, timeframe):
"""تحليل الأنماط لإطار زمني محدد"""
pattern_info = {
'pattern': 'no_clear_pattern',
'confidence': 0,
'direction': 'neutral',
'timeframe': timeframe,
'details': {}
}
try:
if dataframe is None or dataframe.empty or len(dataframe) < 20:
return pattern_info
closes = dataframe['close'].values
highs = dataframe['high'].values
lows = dataframe['low'].values
current_price = closes[-1]
patterns_detected = []
double_pattern = self._detect_double_pattern(highs, lows, closes)
if double_pattern['detected']:
patterns_detected.append(double_pattern)
breakout_pattern = self._detect_breakout_pattern(highs, lows, closes)
if breakout_pattern['detected']:
patterns_detected.append(breakout_pattern)
trend_pattern = self._detect_trend_pattern(dataframe)
if trend_pattern['detected']:
patterns_detected.append(trend_pattern)
support_resistance_pattern = self._detect_support_resistance(highs, lows, closes)
if support_resistance_pattern['detected']:
patterns_detected.append(support_resistance_pattern)
if patterns_detected:
best_pattern = max(patterns_detected, key=lambda x: x['confidence'])
pattern_info.update({
'pattern': best_pattern['pattern'],
'confidence': best_pattern['confidence'],
'direction': best_pattern.get('direction', 'neutral'),
'details': best_pattern.get('details', {})
})
return pattern_info
except Exception as e:
print(f"❌ خطأ في تحليل الأنماط للإطار {timeframe}: {e}")
return pattern_info
def _detect_double_pattern(self, highs, lows, closes):
"""كشف نمط القمة المزدوجة أو القاع المزدوج"""
try:
if len(highs) < 15:
return {'detected': False}
recent_highs = highs[-15:]
recent_lows = lows[-15:]
high_indices = np.argsort(recent_highs)[-2:]
high_indices.sort()
low_indices = np.argsort(recent_lows)[:2]
low_indices.sort()
double_top = False
double_bottom = False
if len(high_indices) == 2:
high1 = recent_highs[high_indices[0]]
high2 = recent_highs[high_indices[1]]
time_diff = high_indices[1] - high_indices[0]
if (abs(high1 - high2) / high1 < 0.02 and
time_diff >= 3 and time_diff <= 10 and
closes[-1] < min(high1, high2)):
double_top = True
if len(low_indices) == 2:
low1 = recent_lows[low_indices[0]]
low2 = recent_lows[low_indices[1]]
time_diff = low_indices[1] - low_indices[0]
if (abs(low1 - low2) / low1 < 0.02 and
time_diff >= 3 and time_diff <= 10 and
closes[-1] > max(low1, low2)):
double_bottom = True
if double_top:
return {
'detected': True,
'pattern': 'Double Top',
'confidence': 0.75,
'direction': 'down',
'details': {
'resistance_level': np.mean([high1, high2]),
'breakdown_level': min(lows[-5:])
}
}
elif double_bottom:
return {
'detected': True,
'pattern': 'Double Bottom',
'confidence': 0.75,
'direction': 'up',
'details': {
'support_level': np.mean([low1, low2]),
'breakout_level': max(highs[-5:])
}
}
return {'detected': False}
except Exception as e:
return {'detected': False}
def _detect_breakout_pattern(self, highs, lows, closes):
"""كشف نمط الاختراق"""
try:
if len(highs) < 25:
return {'detected': False}
current_price = closes[-1]
resistance = np.max(highs[-25:-5])
support = np.min(lows[-25:-5])
if current_price > resistance * 1.01:
return {
'detected': True,
'pattern': 'Breakout Up',
'confidence': 0.8,
'direction': 'up',
'details': {
'breakout_level': resistance,
'target_level': resistance * 1.05
}
}
elif current_price < support * 0.99:
return {
'detected': True,
'pattern': 'Breakout Down',
'confidence': 0.8,
'direction': 'down',
'details': {
'breakdown_level': support,
'target_level': support * 0.95
}
}
return {'detected': False}
except Exception as e:
return {'detected': False}
def _detect_trend_pattern(self, dataframe):
"""كشف نمط الاتجاه"""
try:
if dataframe is None or dataframe.empty or len(dataframe) < 20:
return {'detected': False}
closes = dataframe['close'].values
ma_short = np.mean(closes[-5:])
ma_medium = np.mean(closes[-13:])
ma_long = np.mean(closes[-21:])
if ma_short > ma_medium > ma_long and closes[-1] > ma_short:
trend_strength = (ma_short - ma_long) / ma_long
confidence = min(0.3 + trend_strength * 10, 0.8)
return {
'detected': True,
'pattern': 'Uptrend',
'confidence': confidence,
'direction': 'up',
'details': {
'trend_strength': trend_strength,
'support_level': ma_medium
}
}
elif ma_short < ma_medium < ma_long and closes[-1] < ma_short:
trend_strength = (ma_long - ma_short) / ma_long
confidence = min(0.3 + trend_strength * 10, 0.8)
return {
'detected': True,
'pattern': 'Downtrend',
'confidence': confidence,
'direction': 'down',
'details': {
'trend_strength': trend_strength,
'resistance_level': ma_medium
}
}
return {'detected': False}
except Exception as e:
return {'detected': False}
def _detect_support_resistance(self, highs, lows, closes):
"""كشف مستويات الدعم والمقاومة"""
try:
if len(highs) < 20:
return {'detected': False}
current_price = closes[-1]
resistance_level = np.max(highs[-20:])
support_level = np.min(lows[-20:])
position = (current_price - support_level) / (resistance_level - support_level)
if position < 0.2:
return {
'detected': True,
'pattern': 'Near Support',
'confidence': 0.6,
'direction': 'up',
'details': {
'support_level': support_level,
'resistance_level': resistance_level,
'position': position
}
}
elif position > 0.8:
return {
'detected': True,
'pattern': 'Near Resistance',
'confidence': 0.6,
'direction': 'down',
'details': {
'support_level': support_level,
'resistance_level': resistance_level,
'position': position
}
}
return {'detected': False}
except Exception as e:
return {'detected': False}
class MultiStrategyEngine:
def __init__(self, data_manager, learning_engine):
self.data_manager = data_manager
self.learning_engine = learning_engine
self.technical_analyzer = AdvancedTechnicalAnalyzer()
self.pattern_enhancer = PatternEnhancedStrategyEngine(data_manager, learning_engine)
self.monte_carlo_analyzer = MonteCarloAnalyzer()
self.pattern_analyzer = ChartPatternAnalyzer()
self.strategies = {
'trend_following': self._trend_following_strategy,
'mean_reversion': self._mean_reversion_strategy,
'breakout_momentum': self._breakout_momentum_strategy,
'volume_spike': self._volume_spike_strategy,
'whale_tracking': self._whale_tracking_strategy,
'pattern_recognition': self._pattern_recognition_strategy,
'hybrid_ai': self._hybrid_ai_strategy
}
async def evaluate_all_strategies(self, symbol_data, market_context):
"""تقييم جميع استراتيجيات التداول"""
try:
if self.learning_engine and hasattr(self.learning_engine, 'initialized') and self.learning_engine.initialized:
try:
market_condition = market_context.get('market_trend', 'sideways_market')
optimized_weights = await self.learning_engine.get_optimized_strategy_weights(market_condition)
except Exception as e:
# ❌ لا نستخدم قيم افتراضية، نستخدم الأوزان الأساسية
optimized_weights = await self.get_default_weights()
else:
optimized_weights = await self.get_default_weights()
strategy_scores = {}
base_scores = {}
for strategy_name, strategy_function in self.strategies.items():
try:
base_score = await strategy_function(symbol_data, market_context)
if base_score is None: # ❌ إذا فشلت الاستراتيجية، لا نستخدم قيم افتراضية
continue
base_scores[strategy_name] = base_score
weight = optimized_weights.get(strategy_name, 0.1)
weighted_score = base_score * weight
strategy_scores[strategy_name] = min(weighted_score, 1.0)
except Exception as error:
print(f"❌ خطأ في تقييم استراتيجية {strategy_name}: {error}")
# ❌ لا نستخدم أي محاكاة أو قيم افتراضية
continue
pattern_analysis = symbol_data.get('pattern_analysis')
if pattern_analysis:
strategy_scores = await self.pattern_enhancer.enhance_strategy_with_patterns(
strategy_scores, pattern_analysis, symbol_data.get('symbol')
)
if base_scores:
best_strategy = max(base_scores.items(), key=lambda x: x[1])
best_strategy_name = best_strategy[0]
best_strategy_score = best_strategy[1]
symbol_data['recommended_strategy'] = best_strategy_name
symbol_data['strategy_confidence'] = best_strategy_score
return strategy_scores, base_scores
except Exception as error:
print(f"❌ خطأ في تقييم الاستراتيجيات: {error}")
# ❌ لا نستخدم أي محاكاة
return {}, {}
async def get_default_weights(self):
"""الأوزان الافتراضية للاستراتيجيات - هذه ليست محاكاة ولكن أوزان ابتدائية"""
return {
'trend_following': 0.15,
'mean_reversion': 0.12,
'breakout_momentum': 0.18,
'volume_spike': 0.10,
'whale_tracking': 0.20,
'pattern_recognition': 0.15,
'hybrid_ai': 0.10
}
async def _trend_following_strategy(self, symbol_data, market_context):
"""استراتيجية تتبع الاتجاه"""
try:
score = 0.0
indicators = symbol_data.get('advanced_indicators', {})
for timeframe in ['4h', '1h', '15m']:
if timeframe in indicators:
timeframe_indicators = indicators[timeframe]
if self._check_ema_alignment(timeframe_indicators):
score += 0.20
adx_value = timeframe_indicators.get('adx', 0)
if adx_value > 25:
score += 0.15
if (timeframe_indicators.get('ichimoku_conversion', 0) >
timeframe_indicators.get('ichimoku_base', 0)):
score += 0.10
return min(score, 1.0)
except Exception as error:
print(f"❌ خطأ في استراتيجية تتبع الاتجاه: {error}")
return None # ❌ لا نرجع قيمة افتراضية
def _check_ema_alignment(self, indicators):
"""التحقق من محاذاة المتوسطات المتحركة"""
required_emas = ['ema_9', 'ema_21', 'ema_50']
if all(ema in indicators for ema in required_emas):
return (indicators['ema_9'] > indicators['ema_21'] > indicators['ema_50'])
return False
async def _mean_reversion_strategy(self, symbol_data, market_context):
"""استراتيجية العودة للمتوسط"""
try:
score = 0.0
current_price = symbol_data['current_price']
indicators = symbol_data.get('advanced_indicators', {})
if '1h' in indicators:
hourly_indicators = indicators['1h']
if all(key in hourly_indicators for key in ['bb_upper', 'bb_lower', 'bb_middle']):
position_in_band = (current_price - hourly_indicators['bb_lower']) / (
hourly_indicators['bb_upper'] - hourly_indicators['bb_lower'])
if position_in_band < 0.1 and hourly_indicators.get('rsi', 50) < 35:
score += 0.45
if position_in_band > 0.9 and hourly_indicators.get('rsi', 50) > 65:
score += 0.45
rsi_value = hourly_indicators.get('rsi', 50)
if rsi_value < 30:
score += 0.35
elif rsi_value > 70:
score += 0.35
return min(score, 1.0)
except Exception as error:
print(f"❌ خطأ في استراتيجية العودة للمتوسط: {error}")
return None # ❌ لا نرجع قيمة افتراضية
async def _breakout_momentum_strategy(self, symbol_data, market_context):
"""استراتيجية زخم الاختراق"""
try:
score = 0.0
indicators = symbol_data.get('advanced_indicators', {})
for timeframe in ['1h', '15m', '5m']:
if timeframe in indicators:
timeframe_indicators = indicators[timeframe]
volume_ratio = timeframe_indicators.get('volume_ratio', 0)
if volume_ratio > 1.8:
score += 0.25
elif volume_ratio > 1.3:
score += 0.15
if timeframe_indicators.get('macd_hist', 0) > 0:
score += 0.20
if 'vwap' in timeframe_indicators and symbol_data['current_price'] > timeframe_indicators['vwap']:
score += 0.15
rsi_value = timeframe_indicators.get('rsi', 50)
if 40 <= rsi_value <= 70:
score += 0.10
if score > 0.2:
score = max(score, 0.4)
return min(score, 1.0)
except Exception as error:
print(f"❌ خطأ في استراتيجية زخم الاختراق: {error}")
return None # ❌ لا نرجع قيمة افتراضية
async def _volume_spike_strategy(self, symbol_data, market_context):
"""استراتيجية ارتفاع الحجم"""
try:
score = 0.0
indicators = symbol_data.get('advanced_indicators', {})
for timeframe in ['1h', '15m', '5m']:
if timeframe in indicators:
volume_ratio = indicators[timeframe].get('volume_ratio', 0)
if volume_ratio > 3.0:
score += 0.45
elif volume_ratio > 2.0:
score += 0.25
elif volume_ratio > 1.5:
score += 0.15
return min(score, 1.0)
except Exception as error:
print(f"❌ خطأ في استراتيجية ارتفاع الحجم: {error}")
return None # ❌ لا نرجع قيمة افتراضية
async def _whale_tracking_strategy(self, symbol_data, market_context):
"""استراتيجية تتبع الحيتان"""
try:
whale_data = symbol_data.get('whale_data', {})
if not whale_data.get('data_available', False):
return None # ❌ لا نرجع قيمة افتراضية
whale_signal = await self.data_manager.get_whale_trading_signal(
symbol_data['symbol'], whale_data, market_context
)
if whale_signal and whale_signal.get('action') != 'HOLD':
confidence = whale_signal.get('confidence', 0)
if whale_signal.get('action') in ['STRONG_BUY', 'BUY']:
return min(confidence * 1.2, 1.0)
elif whale_signal.get('action') in ['STRONG_SELL', 'SELL']:
return min(confidence * 0.8, 1.0)
return None # ❌ لا نرجع قيمة افتراضية
except Exception as error:
print(f"❌ خطأ في استراتيجية تتبع الحيتان: {error}")
return None # ❌ لا نرجع قيمة افتراضية
async def _pattern_recognition_strategy(self, symbol_data, market_context):
"""استراتيجية التعرف على الأنماط"""
try:
score = 0.0
pattern_analysis = symbol_data.get('pattern_analysis')
if pattern_analysis and pattern_analysis.get('pattern_confidence', 0) > 0.6:
score += pattern_analysis.get('pattern_confidence', 0) * 0.8
else:
indicators = symbol_data.get('advanced_indicators', {})
for timeframe in ['4h', '1h']:
if timeframe in indicators:
timeframe_indicators = indicators[timeframe]
if (timeframe_indicators.get('rsi', 50) > 60 and
timeframe_indicators.get('macd_hist', 0) > 0 and
timeframe_indicators.get('volume_ratio', 0) > 1.5):
score += 0.35
return min(score, 1.0)
except Exception as error:
print(f"❌ خطأ في استراتيجية التعرف على الأنماط: {error}")
return None # ❌ لا نرجع قيمة افتراضية
async def _hybrid_ai_strategy(self, symbol_data, market_context):
"""استراتيجية الهجين الذكية"""
try:
score = 0.0
monte_carlo_probability = symbol_data.get('monte_carlo_probability')
if monte_carlo_probability is not None:
score += monte_carlo_probability * 0.4
final_score = symbol_data.get('final_score', 0)
if final_score > 0:
score += final_score * 0.3
if market_context.get('btc_sentiment') == 'BULLISH':
score += 0.15
elif market_context.get('btc_sentiment') == 'BEARISH':
score -= 0.08
pattern_analysis = symbol_data.get('pattern_analysis')
if pattern_analysis and pattern_analysis.get('pattern_confidence', 0) > 0.6:
pattern_bonus = pattern_analysis.get('pattern_confidence', 0) * 0.15
score += pattern_bonus
return max(0.0, min(score, 1.0))
except Exception as error:
print(f"❌ خطأ في استراتيجية الهجين الذكية: {error}")
return None # ❌ لا نرجع قيمة افتراضية
class MLProcessor:
def __init__(self, market_context, data_manager, learning_engine):
self.market_context = market_context
self.data_manager = data_manager
self.learning_engine = learning_engine
self.technical_analyzer = AdvancedTechnicalAnalyzer()
self.strategy_engine = MultiStrategyEngine(data_manager, learning_engine)
self.monte_carlo_analyzer = MonteCarloAnalyzer()
self.pattern_analyzer = ChartPatternAnalyzer()
async def process_and_score_symbol_enhanced(self, raw_data):
"""المعالجة المحسنة للرموز مع كل التحليلات المتقدمة"""
try:
if not raw_data or not raw_data.get('ohlcv'):
print(f"❌ بيانات غير صالحة للرمز {raw_data.get('symbol', 'unknown')}")
return None
symbol = raw_data['symbol']
print(f"🔍 معالجة الرمز {symbol} بالتحليلات المتقدمة...")
base_analysis = await self.process_and_score_symbol(raw_data)
if not base_analysis:
return None
try:
advanced_indicators = {}
for timeframe, candles in raw_data['ohlcv'].items():
if candles and len(candles) >= 20:
dataframe = self._create_dataframe(candles)
indicators = self.technical_analyzer.calculate_all_indicators(dataframe, timeframe)
advanced_indicators[timeframe] = indicators
base_analysis['advanced_indicators'] = advanced_indicators
monte_carlo_probability = await self.monte_carlo_analyzer.predict_1h_probability(raw_data['ohlcv'])
if monte_carlo_probability is not None:
base_analysis['monte_carlo_probability'] = monte_carlo_probability
base_analysis['monte_carlo_details'] = self.monte_carlo_analyzer.simulation_results
pattern_analysis = await self.pattern_analyzer.detect_chart_patterns(raw_data['ohlcv'])
base_analysis['pattern_analysis'] = pattern_analysis
# جلب بيانات الحيتان للعملة
whale_data = await self.data_manager.get_whale_data_for_symbol(symbol)
if whale_data:
base_analysis['whale_data'] = whale_data
strategy_scores, base_scores = await self.strategy_engine.evaluate_all_strategies(base_analysis, self.market_context)
base_analysis['strategy_scores'] = strategy_scores
base_analysis['base_strategy_scores'] = base_scores
if base_scores:
best_strategy = max(base_scores.items(), key=lambda x: x[1])
best_strategy_name = best_strategy[0]
best_strategy_score = best_strategy[1]
base_analysis['recommended_strategy'] = best_strategy_name
base_analysis['strategy_confidence'] = best_strategy_score
if best_strategy_score > 0.3:
base_analysis['target_strategy'] = best_strategy_name
else:
base_analysis['target_strategy'] = 'GENERIC'
print(f"🎯 أفضل استراتيجية لـ {symbol}: {best_strategy_name} (ثقة: {best_strategy_score:.2f})")
enhanced_score = self._calculate_enhanced_final_score(base_analysis)
base_analysis['enhanced_final_score'] = enhanced_score
print(f"✅ اكتمل التحليل المتقدم لـ {symbol}:")
print(f" 📊 النهائي: {enhanced_score:.3f}")
if monte_carlo_probability is not None:
print(f" 🎯 مونت كارلو: {monte_carlo_probability:.3f}")
print(f" 🎯 نمط: {pattern_analysis.get('pattern_detected')} (ثقة: {pattern_analysis.get('pattern_confidence', 0):.2f})")
if whale_data and whale_data.get('data_available'):
print(f" 🐋 حيتان: {whale_data.get('trading_signal', {}).get('action', 'HOLD')} (ثقة: {whale_data.get('trading_signal', {}).get('confidence', 0):.2f})")
return base_analysis
except Exception as strategy_error:
print(f"❌ خطأ في التحليل المتقدم لـ {symbol}: {strategy_error}")
return base_analysis
except Exception as error:
print(f"❌ خطأ في المعالجة المحسنة للرمز {raw_data.get('symbol', 'unknown')}: {error}")
return await self.process_and_score_symbol(raw_data)
def _create_dataframe(self, candles):
"""إنشاء DataFrame من بيانات الشموع مع DatetimeIndex مرتب"""
try:
df = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df[['open', 'high', 'low', 'close', 'volume']] = df[['open', 'high', 'low', 'close', 'volume']].astype(float)
# تحويل timestamp إلى datetime وضبطه كـ index
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
df.set_index('timestamp', inplace=True)
# التأكد من أن الفهرس مرتب
df.sort_index(inplace=True)
return df
except Exception as e:
print(f"❌ خطأ في إنشاء DataFrame: {e}")
return pd.DataFrame()
def _calculate_enhanced_final_score(self, analysis):
"""حساب الدرجة النهائية المحسنة"""
try:
base_score = analysis.get('final_score', 0)
monte_carlo_score = analysis.get('monte_carlo_probability', 0)
pattern_confidence = analysis.get('pattern_analysis', {}).get('pattern_confidence', 0)
strategy_confidence = analysis.get('strategy_confidence', 0)
# استخدام فقط البيانات المتاحة
components = []
weights = []
if base_score > 0:
components.append(base_score)
weights.append(0.25)
if monte_carlo_score > 0:
components.append(monte_carlo_score)
weights.append(0.30)
if pattern_confidence > 0:
components.append(pattern_confidence)
weights.append(0.25)
if strategy_confidence > 0:
components.append(strategy_confidence)
weights.append(0.20)
if not components:
return 0 # ❌ لا توجد بيانات صالحة
# حساب المتوسط المرجح
total_weight = sum(weights)
if total_weight == 0:
return 0
enhanced_score = sum(comp * weight for comp, weight in zip(components, weights)) / total_weight
return min(enhanced_score, 1.0)
except Exception as e:
print(f"❌ خطأ في حساب الدرجة المحسنة: {e}")
return analysis.get('final_score', 0)
async def process_and_score_symbol(self, raw_data):
"""المعالجة الأساسية للرمز"""
try:
symbol = raw_data['symbol']
ohlcv_data = raw_data['ohlcv']
if not ohlcv_data:
return None
current_price = raw_data.get('current_price', 0)
layer1_score = raw_data.get('layer1_score', 0)
reasons = raw_data.get('reasons_for_candidacy', [])
final_score = layer1_score
return {
'symbol': symbol,
'current_price': current_price,
'final_score': final_score,
'enhanced_final_score': final_score,
'reasons_for_candidacy': reasons,
'layer1_score': layer1_score
}
except Exception as error:
print(f"❌ خطأ في المعالجة الأساسية للرمز {raw_data.get('symbol', 'unknown')}: {error}")
return None
def filter_top_candidates(self, candidates, number_of_candidates=10):
"""تصفية أفضل المرشحين"""
valid_candidates = [candidate for candidate in candidates if candidate is not None]
if not valid_candidates:
print("❌ لا توجد مرشحات صالحة للتصفية")
return []
sorted_candidates = sorted(valid_candidates,
key=lambda candidate: candidate.get('enhanced_final_score', 0),
reverse=True)
top_candidates = sorted_candidates[:number_of_candidates]
print(f"🎖️ أفضل {len(top_candidates)} مرشح:")
for i, candidate in enumerate(top_candidates):
score = candidate.get('enhanced_final_score', 0)
strategy = candidate.get('recommended_strategy', 'GENERIC')
mc_score = candidate.get('monte_carlo_probability', 0)
pattern = candidate.get('pattern_analysis', {}).get('pattern_detected', 'no_pattern')
print(f" {i+1}. {candidate['symbol']}:")
print(f" 📊 النهائي: {score:.3f}")
if mc_score > 0:
print(f" 🎯 مونت كارلو: {mc_score:.3f}")
print(f" 🎯 استراتيجية: {strategy} | نمط: {pattern}")
return top_candidates
def safe_json_parse(json_string):
"""تحليل JSON آمن مع معالجة الأخطاء"""
try:
return json.loads(json_string)
except json.JSONDecodeError as e:
try:
json_string = json_string.replace("'", '"')
json_string = re.sub(r'(\w+):', r'"\1":', json_string)
json_string = re.sub(r',\s*}', '}', json_string)
json_string = re.sub(r',\s*]', ']', json_string)
return json.loads(json_string)
except json.JSONDecodeError:
print(f"❌ فشل تحليل JSON بعد الإصلاح: {e}")
return None
async def process_multiple_symbols_parallel(self, symbols_data_list, max_concurrent=20):
"""معالجة متعددة للرموز بشكل متوازي مع التحكم في التزامن"""
try:
print(f"🚀 بدء المعالجة المتوازية لـ {len(symbols_data_list)} رمز (بحد أقصى {max_concurrent} متزامنة)...")
# تقسيم العمل إلى دفعات لتجنب الحمل الزائد
batches = [symbols_data_list[i:i + max_concurrent]
for i in range(0, len(symbols_data_list), max_concurrent)]
all_results = []
for batch_num, batch in enumerate(batches):
print(f" 🔄 معالجة الدفعة {batch_num + 1}/{len(batches)} ({len(batch)} رمز)...")
# إنشاء مهام للدفعة الحالية
batch_tasks = []
for symbol_data in batch:
task = asyncio.create_task(self.process_and_score_symbol_enhanced(symbol_data))
batch_tasks.append(task)
# انتظار انتهاء الدفعة الحالية
batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
# تصفية النتائج الناجحة
successful_results = []
for result in batch_results:
if isinstance(result, Exception):
continue
if result and result.get('enhanced_final_score', 0) > 0.4:
successful_results.append(result)
all_results.extend(successful_results)
print(f" ✅ اكتملت الدفعة {batch_num + 1}: {len(successful_results)}/{len(batch)} ناجحة")
# انتظار قصير بين الدفعات لتجنب rate limits
if batch_num < len(batches) - 1:
await asyncio.sleep(1)
print(f"🎯 اكتملت المعالجة المتوازية: {len(all_results)}/{len(symbols_data_list)} رمز تم تحليلها بنجاح")
return all_results
except Exception as error:
print(f"❌ خطأ في المعالجة المتوازية: {error}")
return []
print("✅ ML Processor loaded - No Default Values & Enhanced Analysis")