# ml_engine/monte_carlo.py (Updated to V10.0 - Added Simple Sim for Ranker)
"""Monte Carlo one-step price simulations (Student-t diffusion plus jumps).

Three entry points live on :class:`MonteCarloAnalyzer`:

* ``generate_1h_price_distribution_simple`` - synchronous, NumPy-only,
  returns two features for the ranker.
* ``generate_1h_price_distribution`` - "Phase 1": sample mean/std of log
  returns drive the simulation.
* ``generate_1h_distribution_advanced`` - "Phase 2+3": GARCH(1,1) volatility
  and a LightGBM drift forecast drive the simulation, with fallbacks to the
  sample statistics and, on a fatal error, to Phase 1.
"""
import json
import traceback

import numpy as np
import pandas as pd

# The advanced path already falls back to sample statistics when GARCH or
# LightGBM fail at runtime, so a missing package is handled the same way as
# the optional pandas_ta dependency instead of breaking the whole module.
try:
    from arch import arch_model
except ImportError:
    print("⚠️ arch is not installed; GARCH volatility forecasts will fall back to the sample std.")
    arch_model = None

try:
    import lightgbm as lgb
except ImportError:
    print("⚠️ lightgbm is not installed; drift forecasts will fall back to the sample mean.")
    lgb = None

try:
    import pandas_ta as ta
except ImportError:
    print("⚠️ مكتبة pandas_ta غير موجودة، سيتم استخدام حسابات يدوية للمؤشرات.")
    ta = None

# Parameters shared by every simulation in this module.
_T_DIST_DF = 10               # degrees of freedom of the Student-t shocks
_JUMP_PROBABILITY = 0.05      # chance that a simulated path receives a jump
_JUMP_MEAN = 0.0              # mean of the (normal) jump size
_JUMP_STD_MULTIPLIER = 3.0    # jump std expressed as a multiple of the return std
_MIN_VOLATILITY = 1e-5        # below this the series is treated as flat (stablecoin guard)
_DRIFT_FEATURES = ['rsi', 'macd_hist', 'lag_1', 'lag_2']


def _sanitize_results_for_json(results_dict):
    """Recursively convert NumPy values into plain Python values.

    Dicts, lists and tuples are walked recursively (the container type is
    preserved); ``ndarray`` becomes a list; NumPy bool/integer/float scalars
    of any width become ``bool``/``int``/``float``. Anything else is returned
    unchanged.
    """
    if isinstance(results_dict, dict):
        return {k: _sanitize_results_for_json(v) for k, v in results_dict.items()}
    if isinstance(results_dict, list):
        return [_sanitize_results_for_json(v) for v in results_dict]
    if isinstance(results_dict, tuple):
        return tuple(_sanitize_results_for_json(v) for v in results_dict)
    if isinstance(results_dict, np.ndarray):
        return results_dict.tolist()
    if isinstance(results_dict, np.bool_):
        return bool(results_dict)
    if isinstance(results_dict, np.integer):
        return int(results_dict)
    if isinstance(results_dict, np.floating):
        return float(results_dict)
    return results_dict


def _neutral_ranker_features():
    """Return a fresh neutral result for the ranker (used on any failure)."""
    return {'mc_prob_gain': 0.5, 'mc_var_95_pct': 0.0, 'error': True}


def _clean_log_returns(closes):
    """Return the finite log returns of consecutive closes (NaN/inf dropped)."""
    # Zero or negative prices produce inf/NaN here; they are filtered out
    # right after, so the NumPy warnings would only be noise.
    with np.errstate(divide='ignore', invalid='ignore'):
        log_returns = np.log(closes[1:] / closes[:-1])
    return log_returns[np.isfinite(log_returns)]


def _simulate_next_prices(current_price, mean_return, std_return, num_simulations):
    """Draw ``num_simulations`` one-step prices from the jump-diffusion model.

    The global NumPy RNG is consumed in a fixed order: Student-t shocks,
    uniform jump mask, normal jump sizes.
    """
    drift = mean_return - 0.5 * std_return ** 2
    diffusion = std_return * np.random.standard_t(df=_T_DIST_DF, size=num_simulations)
    jump_mask = np.random.rand(num_simulations) < _JUMP_PROBABILITY
    jump_sizes = np.random.normal(
        _JUMP_MEAN, std_return * _JUMP_STD_MULTIPLIER, size=num_simulations
    )
    jump_component = np.where(jump_mask, jump_sizes, 0.0)
    return current_price * np.exp(drift + diffusion + jump_component)


def _build_results(model_name, num_simulations, current_price, simulated_prices,
                   target_profit_percent, extra=None):
    """Summarise simulated prices into the result dict shared by Phase 1 and 2.

    ``extra`` entries are inserted right after ``current_price``. Values are
    left as NumPy types; callers sanitise them before returning.
    """
    p2_5, p5, p25, p75, p95, p97_5 = np.percentile(
        simulated_prices, [2.5, 5, 25, 75, 95, 97.5]
    )
    var_95_price = p5
    # CVaR: average simulated price in the tail at or below the VaR price.
    tail_prices = simulated_prices[simulated_prices <= var_95_price]
    cvar_95_price = np.mean(tail_prices) if len(tail_prices) > 0 else var_95_price
    target_price = current_price * (1 + target_profit_percent)

    results = {
        'simulation_model': model_name,
        'num_simulations': num_simulations,
        'current_price': current_price,
    }
    results.update(extra or {})
    results.update({
        'distribution_summary': {
            'mean_price': np.mean(simulated_prices),
            'median_price': np.median(simulated_prices),
        },
        'prediction_interval_50': [p25, p75],
        'prediction_interval_90': [p5, p95],
        'prediction_interval_95': [p2_5, p97_5],
        'risk_metrics': {
            'VaR_95_price': var_95_price,
            'VaR_95_value': current_price - var_95_price,
            'CVaR_95_price': cvar_95_price,
            'CVaR_95_value': current_price - cvar_95_price,
        },
        'probability_of_gain': np.mean(simulated_prices >= target_price),
        'raw_simulated_prices': simulated_prices[:100],
    })
    return results


def _build_drift_features(df):
    """Return a copy of ``df`` with the drift-model features and target added.

    Adds ``rsi``, ``macd_hist``, ``lag_1``, ``lag_2`` and ``target`` (the next
    candle's log return, NaN on the latest row). Uses pandas_ta when it is
    available and a manual calculation otherwise.
    """
    feats = df.copy()
    close = feats['close']
    if ta is not None:
        feats['rsi'] = ta.rsi(close, length=14)
        macd = ta.macd(close, fast=12, slow=26, signal=9)
        feats['macd_hist'] = macd['MACDh_12_26_9']
    else:
        delta = close.diff()
        gain = delta.where(delta > 0, 0).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rs = gain / (loss + 1e-9)  # epsilon guards against a zero average loss
        feats['rsi'] = 100 - (100 / (1 + rs))
        macd_line = close.ewm(span=12).mean() - close.ewm(span=26).mean()
        # Histogram = MACD line minus its 9-span signal line, so the column
        # means the same thing as in the pandas_ta branch.
        feats['macd_hist'] = macd_line - macd_line.ewm(span=9).mean()
    feats['lag_1'] = feats['log_returns'].shift(1)
    feats['lag_2'] = feats['log_returns'].shift(2)
    feats['target'] = feats['log_returns'].shift(-1)
    return feats


class MonteCarloAnalyzer:
    """Runs the Monte Carlo simulations and keeps the latest raw result.

    ``simulation_results`` holds the most recent Phase 1 / Phase 2 result dict
    (unsanitised) or an ``{'error': ...}`` dict describing the last failure.
    """

    def __init__(self):
        self.simulation_results = {}

    # --- V10.0: simple simulation used by the ranker (V9.8) ---
    def generate_1h_price_distribution_simple(self, closes_np: np.ndarray,
                                              target_profit_percent=0.005) -> dict:
        """Fast synchronous simulation returning only the ranker features.

        Args:
            closes_np: Close prices, oldest first. Any array-like of numbers
                is accepted; at least 30 values are required.
            target_profit_percent: Fractional gain that counts as a "win".

        Returns:
            ``{'mc_prob_gain', 'mc_var_95_pct', 'error'}``. ``mc_var_95_pct``
            is the 95% VaR as a fraction of the current price. On any failure
            (too little data, invalid price, flat series, exception) the
            neutral values ``0.5`` / ``0.0`` are returned with ``error=True``.
        """
        try:
            closes = np.asarray(closes_np, dtype=float)
            if len(closes) < 30:
                return _neutral_ranker_features()

            current_price = closes[-1]
            if not (np.isfinite(current_price) and current_price > 0):
                return _neutral_ranker_features()

            log_returns = _clean_log_returns(closes)
            if len(log_returns) < 20:
                return _neutral_ranker_features()

            mean_return = np.mean(log_returns)
            std_return = np.std(log_returns)
            if std_return < _MIN_VOLATILITY:  # stablecoin guard
                return _neutral_ranker_features()

            # 1000 paths keeps this cheap enough to call per ranked symbol.
            simulated_prices = _simulate_next_prices(
                current_price, mean_return, std_return, num_simulations=1000
            )

            var_95_price = np.percentile(simulated_prices, 5)
            # Expressed as a fraction; the epsilon guards the division.
            var_95_pct = (current_price - var_95_price) / (current_price + 1e-9)
            target_price = current_price * (1 + target_profit_percent)
            probability_of_gain = np.mean(simulated_prices >= target_price)

            return {
                'mc_prob_gain': float(probability_of_gain),
                'mc_var_95_pct': float(var_95_pct),
                'error': False,
            }
        except Exception:
            # Best effort by design: the ranker gets neutral values on failure.
            return _neutral_ranker_features()

    async def generate_1h_price_distribution(self, ohlcv_data, target_profit_percent=0.005):
        """Phase 1 (fast) simulation driven by sample mean/std of log returns.

        Args:
            ohlcv_data: Mapping of timeframe to candle rows; index 4 of each
                row is read as the close. Uses ``'1h'`` (>= 30 candles) plus
                the last 16 ``'15m'`` closes when available, or ``'15m'``
                alone (>= 50 candles) when 1h data is insufficient.
            target_profit_percent: Fractional gain that counts as a "win".

        Returns:
            The JSON-safe result dict, or ``None`` on failure, in which case
            ``self.simulation_results`` holds ``{'error': ...}``.
        """
        try:
            data = ohlcv_data or {}
            hourly = data.get('1h') or []
            quarter_hourly = data.get('15m') or []

            if len(hourly) < 30:
                if len(quarter_hourly) < 50:
                    self.simulation_results = {'error': 'Insufficient OHLCV data (< 30 candles 1h)'}
                    return None
                all_closes = [candle[4] for candle in quarter_hourly]
            else:
                all_closes = [candle[4] for candle in hourly]
                # NOTE(review): this mixes 1h and 15m bars in one return
                # series - confirm that is intended.
                if len(quarter_hourly) >= 16:
                    all_closes.extend(candle[4] for candle in quarter_hourly[-16:])

            closes = np.asarray(all_closes, dtype=float)
            if len(closes) < 30:
                self.simulation_results = {'error': 'Insufficient combined OHLCV data (< 30 candles)'}
                return None

            current_price = closes[-1]
            if not (np.isfinite(current_price) and current_price > 0):
                self.simulation_results = {'error': 'Invalid current price <= 0'}
                return None

            log_returns = _clean_log_returns(closes)
            if len(log_returns) < 20:
                self.simulation_results = {'error': 'Insufficient log returns (< 20)'}
                return None

            mean_return = np.mean(log_returns)
            std_return = np.std(log_returns)
            if std_return < _MIN_VOLATILITY:  # stablecoin guard
                self.simulation_results = {'error': 'Zero volatility detected (Stablecoin?)'}
                return None

            num_simulations = 5000
            simulated_prices = _simulate_next_prices(
                current_price, mean_return, std_return, num_simulations
            )
            self.simulation_results = _build_results(
                'Phase1_Student-t_JumpDiffusion', num_simulations, current_price,
                simulated_prices, target_profit_percent,
            )
            return _sanitize_results_for_json(self.simulation_results)

        except Exception as e:
            print(f"❌ خطأ فادح في محاكاة مونت كارلو (Phase 1): {e}")
            traceback.print_exc()
            self.simulation_results = {'error': f'Phase 1 MC Error: {str(e)}'}
            return None

    async def generate_1h_distribution_advanced(self, ohlcv_data, target_profit_percent=0.005):
        """Phase 2+3 simulation: GARCH(1,1) volatility and LightGBM drift.

        Args:
            ohlcv_data: Mapping of timeframe to candle rows. ``'1h'`` must
                hold at least 50 rows of
                ``[timestamp_ms, open, high, low, close, volume]``.
            target_profit_percent: Fractional gain that counts as a "win".

        Returns:
            The JSON-safe result dict. With insufficient data, a flat series,
            an invalid price or any fatal error, the call is delegated to
            :meth:`generate_1h_price_distribution` and its return value
            (possibly ``None``) is passed through. A GARCH or LightGBM failure
            alone only swaps in the sample std / mean of the last 30 returns.
        """
        try:
            if not ohlcv_data or '1h' not in ohlcv_data or len(ohlcv_data['1h']) < 50:
                self.simulation_results = {'error': 'Advanced MC requires 1h data (>= 50 candles)'}
                return await self.generate_1h_price_distribution(ohlcv_data, target_profit_percent)

            value_cols = ['open', 'high', 'low', 'close', 'volume']
            df = pd.DataFrame(ohlcv_data['1h'], columns=['timestamp'] + value_cols)
            df[value_cols] = df[value_cols].astype(float)
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
            df = df.set_index('timestamp').sort_index()
            if len(df) < 50:
                raise ValueError("DataFrame creation failed or insufficient data after processing")

            current_price = df['close'].iloc[-1]
            if not (np.isfinite(current_price) and current_price > 0):
                self.simulation_results = {'error': 'Invalid current price <= 0'}
                return await self.generate_1h_price_distribution(ohlcv_data, target_profit_percent)

            # Undefined returns (first row, zero prices) are set to 0 and
            # stored back so the lag features and target never carry inf.
            with np.errstate(divide='ignore', invalid='ignore'):
                raw_returns = np.log(df['close'] / df['close'].shift(1))
            df['log_returns'] = raw_returns.replace([np.inf, -np.inf], np.nan).fillna(0)
            log_returns_series = df['log_returns']
            recent_returns = log_returns_series.iloc[-30:].to_numpy()

            std_return_check = np.std(recent_returns)
            if std_return_check < _MIN_VOLATILITY:  # stablecoin guard
                self.simulation_results = {'error': 'Zero volatility detected (Stablecoin?)'}
                return await self.generate_1h_price_distribution(ohlcv_data, target_profit_percent)

            # (Phase 2) Volatility forecast with GARCH(1,1). Returns are
            # scaled by 100 for the fit and the variance is scaled back.
            try:
                if arch_model is None:
                    raise ImportError("arch is not installed")
                garch_model = arch_model(log_returns_series * 100, vol='Garch',
                                         p=1, q=1, dist='t', rescale=False)
                res = garch_model.fit(update_freq=0, disp='off')
                forecast = res.forecast(horizon=1)
                forecasted_var = forecast.variance.iloc[-1, 0] / (100 ** 2)
                forecasted_std_return = np.sqrt(forecasted_var)
                if not (np.isfinite(forecasted_std_return) and forecasted_std_return > 0):
                    raise ValueError("GARCH produced a non-finite or non-positive volatility")
            except Exception as garch_err:
                forecasted_std_return = std_return_check
                print(f"⚠️ GARCH failed, using std: {garch_err}")

            # (Phase 3) Drift forecast with LightGBM.
            try:
                if lgb is None:
                    raise ImportError("lightgbm is not installed")
                feats = _build_drift_features(df).dropna(subset=_DRIFT_FEATURES)
                if len(feats) < 20:
                    raise ValueError("Insufficient data after feature engineering")
                if feats.index[-1] != df.index[-1]:
                    raise ValueError("Latest candle has incomplete features")

                # Predict from the LATEST candle (its target, the next return,
                # is unknown) and train on every row with a known target.
                X_predict = feats[_DRIFT_FEATURES].iloc[-1:]
                train = feats.dropna(subset=['target'])
                lgbm_model = lgb.LGBMRegressor(n_estimators=100, learning_rate=0.1,
                                               n_jobs=1, verbose=-1)
                lgbm_model.fit(train[_DRIFT_FEATURES], train['target'])
                forecasted_mean_return = lgbm_model.predict(X_predict)[0]
                if not np.isfinite(forecasted_mean_return):
                    raise ValueError("LightGBM produced a non-finite drift forecast")
            except Exception as lgbm_err:
                forecasted_mean_return = np.mean(recent_returns)
                print(f"⚠️ LGBM failed, using mean: {lgbm_err}")

            # Run the simulation with the forecasted drift and volatility.
            num_simulations = 5000
            simulated_prices = _simulate_next_prices(
                current_price, forecasted_mean_return, forecasted_std_return, num_simulations
            )
            self.simulation_results = _build_results(
                'Phase2_GARCH_LGBM', num_simulations, current_price,
                simulated_prices, target_profit_percent,
                extra={
                    'forecasted_drift_lgbm': forecasted_mean_return,
                    'forecasted_vol_garch': forecasted_std_return,
                },
            )
            return _sanitize_results_for_json(self.simulation_results)

        except Exception as e:
            print(f"❌ خطأ فادح في محاكاة مونت كارلو المتقدمة (GARCH/LGBM): {e}")
            traceback.print_exc()
            self.simulation_results = {'error': f'Advanced MC Error: {str(e)}'}
            # Fall back to Phase 1 on a fatal error.
            return await self.generate_1h_price_distribution(ohlcv_data, target_profit_percent)

    def _calculate_trend_adjustment(self, closes):
        """Map the 10-candle price change to a multiplier (currently unused).

        Returns 1.2 / 1.1 for a rise above 2% / 1%, 0.8 / 0.9 for a fall
        beyond 2% / 1%, and 1.0 otherwise, including for fewer than 10
        closes, a zero base price, or any error.
        """
        try:
            if len(closes) < 10:
                return 1.0
            base_price = closes[-10]
            # NumPy would turn a zero base into +/-inf and pick an extreme
            # multiplier; treat it as "no information" instead.
            if base_price == 0:
                return 1.0
            recent_trend = (closes[-1] - base_price) / base_price
            if recent_trend > 0.02:
                return 1.2
            elif recent_trend > 0.01:
                return 1.1
            elif recent_trend < -0.02:
                return 0.8
            elif recent_trend < -0.01:
                return 0.9
            else:
                return 1.0
        except Exception:
            return 1.0


print("✅ ML Module: Advanced Monte Carlo Analyzer loaded (V10.0 - Simple Sim Added)")