Trad / ml_engine /monte_carlo.py
Riy777's picture
Update ml_engine/monte_carlo.py
fd9c5b3
raw
history blame
17 kB
# ml_engine/monte_carlo.py (Updated to V10.0 - Added Simple Sim for Ranker)
import numpy as np
import pandas as pd
from arch import arch_model
import lightgbm as lgb
import traceback
import json
try:
import pandas_ta as ta
except ImportError:
print("⚠️ مكتبة pandas_ta غير موجودة، سيتم استخدام حسابات يدوية للمؤشرات.")
ta = None
def _sanitize_results_for_json(results_dict):
"""
Recursively converts numpy types (ndarray, np.float64, etc.)
in a dictionary to standard Python types (list, float)
to make it JSON serializable.
"""
if isinstance(results_dict, dict):
return {k: _sanitize_results_for_json(v) for k, v in results_dict.items()}
elif isinstance(results_dict, list):
return [_sanitize_results_for_json(v) for v in results_dict]
elif isinstance(results_dict, np.ndarray):
return results_dict.tolist()
elif isinstance(results_dict, (np.float64, np.float32)):
return float(results_dict)
elif isinstance(results_dict, (np.int64, np.int32)):
return int(results_dict)
else:
return results_dict
class MonteCarloAnalyzer:
def __init__(self):
self.simulation_results = {}
# 🔴 --- START OF NEW FUNCTION (V10.0) --- 🔴
# (هذه الدالة هي التي يحتاجها الرانكر V9.8)
def generate_1h_price_distribution_simple(self, closes_np: np.ndarray, target_profit_percent=0.005) -> dict:
"""
(V10.0) نسخة سريعة جداً (غير متزامنة) مخصصة للرانكر.
تقبل numpy array مباشرة وتُرجع الميزات المطلوبة فقط.
"""
try:
# (نحتاج 100 شمعة كما في Colab)
if len(closes_np) < 30:
return {'mc_prob_gain': 0.5, 'mc_var_95_pct': 0.0, 'error': True}
current_price = closes_np[-1]
if current_price <= 0:
return {'mc_prob_gain': 0.5, 'mc_var_95_pct': 0.0, 'error': True}
log_returns = np.log(closes_np[1:] / closes_np[:-1])
log_returns = log_returns[~np.isnan(log_returns) & ~np.isinf(log_returns)]
if len(log_returns) < 20:
return {'mc_prob_gain': 0.5, 'mc_var_95_pct': 0.0, 'error': True}
mean_return = np.mean(log_returns)
std_return = np.std(log_returns)
if std_return < 1e-5: # (واقي العملة المستقرة)
return {'mc_prob_gain': 0.5, 'mc_var_95_pct': 0.0, 'error': True}
num_simulations = 1000 # (سريعة، كما في Colab)
t_df = 10
jump_lambda = 0.05
jump_mean = 0.0
jump_std = std_return * 3.0
drift = (mean_return - 0.5 * std_return**2)
diffusion = std_return * np.random.standard_t(df=t_df, size=num_simulations)
jump_mask = np.random.rand(num_simulations) < jump_lambda
jump_sizes = np.random.normal(jump_mean, jump_std, size=num_simulations)
jump_component = np.zeros(num_simulations)
jump_component[jump_mask] = jump_sizes[jump_mask]
simulated_log_returns = drift + diffusion + jump_component
simulated_prices = current_price * np.exp(simulated_log_returns)
percentiles = np.percentile(simulated_prices, [5])
VaR_95_price = percentiles[0]
# (كنسبة مئوية، مع واقي من القسمة على صفر)
VaR_95_value = (current_price - VaR_95_price) / (current_price + 1e-9)
target_price = current_price * (1 + target_profit_percent)
probability_of_gain = np.mean(simulated_prices >= target_price)
return {
'mc_prob_gain': probability_of_gain,
'mc_var_95_pct': VaR_95_value,
'error': False
}
except Exception:
# (إرجاع قيم محايدة في حالة الفشل)
return {'mc_prob_gain': 0.5, 'mc_var_95_pct': 0.0, 'error': True}
# 🔴 --- END OF NEW FUNCTION (V10.0) --- 🔴
async def generate_1h_price_distribution(self, ohlcv_data, target_profit_percent=0.005):
"""
(المرحلة 1 - سريعة) - (للاستخدامات القديمة إن وجدت)
"""
try:
if not ohlcv_data or '1h' not in ohlcv_data or len(ohlcv_data['1h']) < 30:
if '15m' in ohlcv_data and len(ohlcv_data['15m']) >= 50:
closes = np.array([candle[4] for candle in ohlcv_data['15m']])
else:
self.simulation_results = {'error': 'Insufficient OHLCV data (< 30 candles 1h)'}
return None
else:
all_closes = [candle[4] for candle in ohlcv_data['1h']]
if '15m' in ohlcv_data and len(ohlcv_data['15m']) >= 16:
all_closes.extend([candle[4] for candle in ohlcv_data['15m'][-16:]])
closes = np.array(all_closes)
if len(closes) < 30:
self.simulation_results = {'error': 'Insufficient combined OHLCV data (< 30 candles)'}
return None
current_price = closes[-1]
if current_price <= 0:
self.simulation_results = {'error': 'Invalid current price <= 0'}
return None
log_returns = np.log(closes[1:] / closes[:-1])
log_returns = log_returns[~np.isnan(log_returns) & ~np.isinf(log_returns)]
if len(log_returns) < 20:
self.simulation_results = {'error': 'Insufficient log returns (< 20)'}
return None
mean_return = np.mean(log_returns)
std_return = np.std(log_returns)
if std_return < 1e-5: # (واقي العملة المستقرة)
self.simulation_results = {'error': 'Zero volatility detected (Stablecoin?)'}
return None
num_simulations = 5000
t_df = 10
jump_lambda = 0.05
jump_mean = 0.0
jump_std = std_return * 3.0
drift = (mean_return - 0.5 * std_return**2)
diffusion = std_return * np.random.standard_t(df=t_df, size=num_simulations)
jump_mask = np.random.rand(num_simulations) < jump_lambda
jump_sizes = np.random.normal(jump_mean, jump_std, size=num_simulations)
jump_component = np.zeros(num_simulations)
jump_component[jump_mask] = jump_sizes[jump_mask]
simulated_log_returns = drift + diffusion + jump_component
simulated_prices = current_price * np.exp(simulated_log_returns)
mean_price = np.mean(simulated_prices)
median_price = np.median(simulated_prices)
percentiles = np.percentile(simulated_prices, [2.5, 5, 25, 50, 75, 95, 97.5])
pi_95 = [percentiles[0], percentiles[-1]]
pi_90 = [percentiles[1], percentiles[-2]]
pi_50 = [percentiles[2], percentiles[4]]
VaR_95_price = percentiles[1]
VaR_95_value = current_price - VaR_95_price
losses_beyond_var = simulated_prices[simulated_prices <= VaR_95_price]
CVR_95_price = np.mean(losses_beyond_var) if len(losses_beyond_var) > 0 else VaR_95_price
CVaR_95_value = current_price - CVR_95_price
target_price = current_price * (1 + target_profit_percent)
probability_of_gain = np.mean(simulated_prices >= target_price)
self.simulation_results = {
'simulation_model': 'Phase1_Student-t_JumpDiffusion',
'num_simulations': num_simulations,
'current_price': current_price,
'distribution_summary': {'mean_price': mean_price, 'median_price': median_price},
'prediction_interval_50': pi_50,
'prediction_interval_90': pi_90,
'prediction_interval_95': pi_95,
'risk_metrics': {
'VaR_95_price': VaR_95_price,
'VaR_95_value': VaR_95_value,
'CVaR_95_price': CVR_95_price,
'CVaR_95_value': CVaR_95_value,
},
'probability_of_gain': probability_of_gain,
'raw_simulated_prices': simulated_prices[:100]
}
return _sanitize_results_for_json(self.simulation_results)
except Exception as e:
print(f"❌ خطأ فادح في محاكاة مونت كارلو (Phase 1): {e}")
traceback.print_exc()
self.simulation_results = {'error': f'Phase 1 MC Error: {str(e)}'}
return None
async def generate_1h_distribution_advanced(self, ohlcv_data, target_profit_percent=0.005):
"""
(المرحلة 2+3 - متقدمة) - (للاستخدام في الطبقة 2)
"""
try:
if not ohlcv_data or '1h' not in ohlcv_data or len(ohlcv_data['1h']) < 50:
self.simulation_results = {'error': 'Advanced MC requires 1h data (>= 50 candles)'}
return await self.generate_1h_price_distribution(ohlcv_data, target_profit_percent)
candles = ohlcv_data['1h']
df = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df[['open', 'high', 'low', 'close', 'volume']] = df[['open', 'high', 'low', 'close', 'volume']].astype(float)
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
df.set_index('timestamp', inplace=True)
df.sort_index(inplace=True)
if df.empty or len(df) < 50:
raise ValueError("DataFrame creation failed or insufficient data after processing")
current_price = df['close'].iloc[-1]
df['log_returns'] = np.log(df['close'] / df['close'].shift(1)).fillna(0)
log_returns_series = df['log_returns'].replace([np.inf, -np.inf], 0)
std_return_check = np.std(log_returns_series.iloc[-30:])
if std_return_check < 1e-5: # (واقي العملة المستقرة)
self.simulation_results = {'error': 'Zero volatility detected (Stablecoin?)'}
return await self.generate_1h_price_distribution(ohlcv_data, target_profit_percent)
# 3. (Phase 2) توقع التقلب باستخدام GARCH(1,1)
try:
garch_model = arch_model(log_returns_series * 100, vol='Garch', p=1, q=1, dist='t', rescale=False)
res = garch_model.fit(update_freq=0, disp='off')
forecast = res.forecast(horizon=1)
forecasted_var = forecast.variance.iloc[-1, 0] / (100**2)
forecasted_std_return = np.sqrt(forecasted_var)
except Exception as garch_err:
forecasted_std_return = std_return_check
print(f"⚠️ GARCH failed, using std: {garch_err}")
# 4. (Phase 3) توقع الميل (Drift) باستخدام LightGBM
try:
if ta:
df['rsi'] = ta.rsi(df['close'], length=14)
macd = ta.macd(df['close'], fast=12, slow=26, signal=9)
df['macd_hist'] = macd['MACDh_12_26_9']
else:
delta = df['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / (loss + 1e-9)
df['rsi'] = 100 - (100 / (1 + rs))
df['macd_hist'] = df['close'].ewm(span=12).mean() - df['close'].ewm(span=26).mean()
df['lag_1'] = df['log_returns'].shift(1)
df['lag_2'] = df['log_returns'].shift(2)
features = ['rsi', 'macd_hist', 'lag_1', 'lag_2']
df.dropna(inplace=True)
if df.empty or len(df) < 20:
raise ValueError("Insufficient data after feature engineering")
df['target'] = df['log_returns'].shift(-1)
df.dropna(inplace=True)
X = df[features]
y = df['target']
X_train, y_train = X.iloc[:-1], y.iloc[:-1]
X_predict = X.iloc[-1:]
lgbm_model = lgb.LGBMRegressor(n_estimators=100, learning_rate=0.1, n_jobs=1, verbose=-1)
lgbm_model.fit(X_train, y_train)
forecasted_mean_return = lgbm_model.predict(X_predict)[0]
except Exception as lgbm_err:
forecasted_mean_return = np.mean(log_returns_series.iloc[-30:])
print(f"⚠️ LGBM failed, using mean: {lgbm_err}")
# 5. تشغيل المحاكاة بالقيم الديناميكية
num_simulations = 5000
t_df = 10
jump_lambda = 0.05
jump_mean = 0.0
jump_std = forecasted_std_return * 3.0
mean_return = forecasted_mean_return
std_return = forecasted_std_return
drift = (mean_return - 0.5 * std_return**2)
diffusion = std_return * np.random.standard_t(df=t_df, size=num_simulations)
jump_mask = np.random.rand(num_simulations) < jump_lambda
jump_sizes = np.random.normal(jump_mean, jump_std, size=num_simulations)
jump_component = np.zeros(num_simulations)
jump_component[jump_mask] = jump_sizes[jump_mask]
simulated_log_returns = drift + diffusion + jump_component
simulated_prices = current_price * np.exp(simulated_log_returns)
# 6. حساب المخرجات والتوزيع
mean_price = np.mean(simulated_prices)
median_price = np.median(simulated_prices)
percentiles = np.percentile(simulated_prices, [2.5, 5, 25, 50, 75, 95, 97.5])
pi_95 = [percentiles[0], percentiles[-1]]
pi_90 = [percentiles[1], percentiles[-2]]
pi_50 = [percentiles[2], percentiles[4]]
VaR_95_price = percentiles[1]
VaR_95_value = current_price - VaR_95_price
losses_beyond_var = simulated_prices[simulated_prices <= VaR_95_price]
CVR_95_price = np.mean(losses_beyond_var) if len(losses_beyond_var) > 0 else VaR_95_price
CVaR_95_value = current_price - CVR_95_price
target_price = current_price * (1 + target_profit_percent)
probability_of_gain = np.mean(simulated_prices >= target_price)
self.simulation_results = {
'simulation_model': 'Phase2_GARCH_LGBM',
'num_simulations': num_simulations,
'current_price': current_price,
'forecasted_drift_lgbm': forecasted_mean_return,
'forecasted_vol_garch': forecasted_std_return,
'distribution_summary': {'mean_price': mean_price, 'median_price': median_price},
'prediction_interval_50': pi_50,
'prediction_interval_90': pi_90,
'prediction_interval_95': pi_95,
'risk_metrics': {
'VaR_95_price': VaR_95_price,
'VaR_95_value': VaR_95_value,
'CVaR_95_price': CVR_95_price,
'CVaR_95_value': CVaR_95_value,
},
'probability_of_gain': probability_of_gain,
'raw_simulated_prices': simulated_prices[:100]
}
return _sanitize_results_for_json(self.simulation_results)
except Exception as e:
print(f"❌ خطأ فادح في محاكاة مونت كارلو المتقدمة (GARCH/LGBM): {e}")
traceback.print_exc()
self.simulation_results = {'error': f'Advanced MC Error: {str(e)}'}
# العودة إلى المرحلة 1 في حالة الفشل الفادح
return await self.generate_1h_price_distribution(ohlcv_data, target_profit_percent)
def _calculate_trend_adjustment(self, closes):
"""(غير مستخدمة حالياً)"""
try:
if len(closes) < 10: return 1.0
recent_trend = (closes[-1] - closes[-10]) / closes[-10]
if recent_trend > 0.02: return 1.2
elif recent_trend > 0.01: return 1.1
elif recent_trend < -0.02: return 0.8
elif recent_trend < -0.01: return 0.9
else: return 1.0
except Exception: return 1.0
print("✅ ML Module: Advanced Monte Carlo Analyzer loaded (V10.0 - Simple Sim Added)")