# Source: Trad / ml_engine/monte_carlo.py
# Last commit: "Update ml_engine/monte_carlo.py" (22184d7) by Riy777
# ml_engine/monte_carlo.py
import numpy as np
import pandas as pd
from arch import arch_model
import lightgbm as lgb
import traceback # (Import traceback)
import json # (Import json for sanitizing)
# (Import pandas_ta or set to None)
# pandas_ta is optional: when it is missing, `ta` is set to None and the
# indicator code falls back to its manual RSI / MACD calculations.
try:
    import pandas_ta as ta
except ImportError:
    print("⚠️ مكتبة pandas_ta غير موجودة، سيتم استخدام حسابات يدوية للمؤشرات.")
    ta = None
# 🔴 --- START OF CHANGE --- 🔴
# (New Helper function to fix JSON serialization)
def _sanitize_results_for_json(results_dict):
"""
Recursively converts numpy types (ndarray, np.float64, etc.)
in a dictionary to standard Python types (list, float)
to make it JSON serializable.
"""
if isinstance(results_dict, dict):
return {k: _sanitize_results_for_json(v) for k, v in results_dict.items()}
elif isinstance(results_dict, list):
return [_sanitize_results_for_json(v) for v in results_dict]
elif isinstance(results_dict, np.ndarray):
return results_dict.tolist() # (Fixes ndarray error)
elif isinstance(results_dict, (np.float64, np.float32, np.float_)):
return float(results_dict) # (Fixes np.float error)
elif isinstance(results_dict, (np.int64, np.int32, np.int_)):
return int(results_dict) # (Proactive fix for int types)
else:
return results_dict
# 🔴 --- END OF CHANGE --- 🔴
class MonteCarloAnalyzer:
    """One-step Monte Carlo simulator for the next price of an asset.

    Two entry points are provided:

    * ``generate_1h_price_distribution`` (Phase 1, fast): drift and volatility
      are the sample mean / standard deviation of historical log returns.
    * ``generate_1h_distribution_advanced`` (Phase 2+3): volatility comes from
      a GARCH(1,1) forecast and drift from a LightGBM regressor. Any failure
      falls back to Phase 1.

    Both draw ``_NUM_SIMULATIONS`` one-step log returns made of a Student-t
    diffusion term plus an occasional normal jump, and summarise the resulting
    prices. The most recent outcome (or an ``{'error': ...}`` dict) is stored,
    unsanitized, in ``self.simulation_results``.

    Note: the coroutines contain no awaits other than the Phase 1 fallback, so
    model fitting and simulation run synchronously inside the caller's task.
    """

    # Simulation hyper-parameters shared by both phases.
    _NUM_SIMULATIONS = 5000
    _T_DF = 10                  # degrees of freedom of the Student-t shocks
    _JUMP_LAMBDA = 0.05         # probability that a path receives a jump
    _JUMP_MEAN = 0.0
    _JUMP_STD_MULTIPLIER = 3.0  # jump std = multiplier * return std
    _RAW_SAMPLE_SIZE = 100      # number of simulated prices kept in the output
    _FEATURES = ['rsi', 'macd_hist', 'lag_1', 'lag_2']

    def __init__(self):
        self.simulation_results = {}

    async def generate_1h_price_distribution(self, ohlcv_data, target_profit_percent=0.005):
        """(Phase 1 - fast) Simulate the next price from historical statistics.

        Args:
            ohlcv_data: Mapping of timeframe -> list of candles, where index 4
                of each candle is the close. Uses ``'1h'`` (>= 30 candles),
                optionally extended with the last 16 ``'15m'`` closes; with
                fewer than 30 1h candles it uses ``'15m'`` alone (>= 50).
            target_profit_percent: Fractional gain over the current price used
                for ``probability_of_gain``.

        Returns:
            A JSON-serializable results dict, or ``None`` on failure, in which
            case ``self.simulation_results`` holds ``{'error': ...}``.
        """
        try:
            # Treat None / missing timeframes as empty instead of raising.
            data = ohlcv_data or {}
            candles_1h = data.get('1h') or []
            candles_15m = data.get('15m') or []

            if len(candles_1h) < 30:
                if len(candles_15m) < 50:
                    self.simulation_results = {'error': 'Insufficient OHLCV data (< 30 candles 1h)'}
                    return None
                close_values = [candle[4] for candle in candles_15m]
            else:
                close_values = [candle[4] for candle in candles_1h]
                # NOTE(review): the 15m closes are appended after the 1h closes,
                # so the return statistics mix two timeframes — confirm intent.
                if len(candles_15m) >= 16:
                    close_values.extend(candle[4] for candle in candles_15m[-16:])
            closes = np.asarray(close_values, dtype=float)

            current_price = closes[-1]
            # NaN compares False against 0, so finiteness is checked explicitly.
            if not np.isfinite(current_price) or current_price <= 0:
                self.simulation_results = {'error': 'Invalid current price <= 0'}
                return None

            # Non-finite returns (zero or negative closes) are filtered out
            # right below, so the floating-point warnings are just noise.
            with np.errstate(divide='ignore', invalid='ignore'):
                log_returns = np.log(closes[1:] / closes[:-1])
            log_returns = log_returns[np.isfinite(log_returns)]
            if len(log_returns) < 20:
                self.simulation_results = {'error': 'Insufficient log returns (< 20)'}
                return None

            self.simulation_results = self._simulate_price_distribution(
                current_price=current_price,
                mean_return=np.mean(log_returns),
                std_return=np.std(log_returns),
                target_profit_percent=target_profit_percent,
                model_name='Phase1_Student-t_JumpDiffusion',
            )
            # The stored dict keeps NumPy values; the caller gets a plain copy.
            return _sanitize_results_for_json(self.simulation_results)
        except Exception as e:
            print(f"❌ خطأ فادح في محاكاة مونت كارلو (Phase 1): {e}")
            traceback.print_exc()
            self.simulation_results = {'error': f'Phase 1 MC Error: {str(e)}'}
            return None

    async def generate_1h_distribution_advanced(self, ohlcv_data, target_profit_percent=0.005):
        """(Phase 2+3 - advanced) Simulate with GARCH volatility and LightGBM drift.

        Requires at least 50 ``'1h'`` candles shaped as
        ``[timestamp_ms, open, high, low, close, volume]``. If the data is
        insufficient or anything raises, the Phase 1 result is returned
        instead. A failing GARCH or LightGBM step alone only replaces that
        forecast with the std / mean of the last 30 log returns.

        Args:
            ohlcv_data: Mapping of timeframe -> list of candles.
            target_profit_percent: Fractional gain over the current price used
                for ``probability_of_gain``.

        Returns:
            A JSON-serializable results dict, or whatever Phase 1 returns when
            falling back (possibly ``None``).
        """
        try:
            # 1. Data preparation (DataFrame).
            candles = (ohlcv_data or {}).get('1h') or []
            if len(candles) < 50:
                self.simulation_results = {'error': 'Advanced MC requires 1h data (>= 50 candles)'}
                return await self.generate_1h_price_distribution(ohlcv_data, target_profit_percent)

            numeric_cols = ['open', 'high', 'low', 'close', 'volume']
            df = pd.DataFrame(candles, columns=['timestamp'] + numeric_cols)
            df[numeric_cols] = df[numeric_cols].astype(float)
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
            df = df.set_index('timestamp').sort_index()
            if df.empty or len(df) < 50:
                raise ValueError("DataFrame creation failed or insufficient data after processing")

            current_price = df['close'].iloc[-1]
            if not np.isfinite(current_price) or current_price <= 0:
                # Same validation as Phase 1; handled by the fallback below.
                raise ValueError("Invalid current price <= 0")

            # 2. Log returns; NaN and +/-inf are replaced by 0 so that both the
            #    GARCH input and the lag features / targets stay finite.
            raw_returns = np.log(df['close'] / df['close'].shift(1))
            df['log_returns'] = raw_returns.replace([np.inf, -np.inf], np.nan).fillna(0)
            log_returns_series = df['log_returns']

            # 3. (Phase 2) Volatility forecast with GARCH(1,1).
            try:
                # Returns are rescaled by 10000 for the fit (avoids arch's
                # DataScaleWarning); the variance is scaled back by 10000^2.
                garch_model = arch_model(log_returns_series * 10000, vol='Garch', p=1, q=1, dist='t')
                res = garch_model.fit(update_freq=0, disp='off')
                forecast = res.forecast(horizon=1)
                forecasted_var = forecast.variance.iloc[-1, 0] / (10000**2)
                forecasted_std_return = np.sqrt(forecasted_var)
                if not np.isfinite(forecasted_std_return) or forecasted_std_return <= 0:
                    raise ValueError(f"non-finite GARCH volatility forecast: {forecasted_std_return}")
            except Exception as garch_err:
                forecasted_std_return = np.std(log_returns_series.iloc[-30:])
                print(f"⚠️ GARCH failed, using std: {garch_err}")

            # 4. (Phase 3) Drift forecast with LightGBM.
            try:
                X_train, y_train, X_predict = self._build_drift_dataset(df, ta)
                lgbm_model = lgb.LGBMRegressor(n_estimators=100, learning_rate=0.1, n_jobs=1, verbose=-1)
                lgbm_model.fit(X_train, y_train)
                forecasted_mean_return = lgbm_model.predict(X_predict)[0]
                if not np.isfinite(forecasted_mean_return):
                    raise ValueError(f"non-finite LGBM drift forecast: {forecasted_mean_return}")
            except Exception as lgbm_err:
                forecasted_mean_return = np.mean(log_returns_series.iloc[-30:])
                print(f"⚠️ LGBM failed, using mean: {lgbm_err}")

            # 5./6. Run the simulation with the forecasted values and summarise.
            self.simulation_results = self._simulate_price_distribution(
                current_price=current_price,
                mean_return=forecasted_mean_return,
                std_return=forecasted_std_return,
                target_profit_percent=target_profit_percent,
                model_name='Phase2_GARCH_LGBM',
                extra_fields={
                    'forecasted_drift_lgbm': forecasted_mean_return,
                    'forecasted_vol_garch': forecasted_std_return,
                },
            )
            return _sanitize_results_for_json(self.simulation_results)
        except Exception as e:
            print(f"❌ خطأ فادح في محاكاة مونت كارلو المتقدمة (GARCH/LGBM): {e}")
            traceback.print_exc()
            self.simulation_results = {'error': f'Advanced MC Error: {str(e)}'}
            # Fall back to Phase 1, which sanitizes its own output.
            return await self.generate_1h_price_distribution(ohlcv_data, target_profit_percent)

    @staticmethod
    def _build_drift_dataset(frame, ta_module=None):
        """Build the LightGBM training set and the row to predict from.

        Features are RSI(14), a MACD-based column and the two previous log
        returns. The training target is the next candle's log return.

        Args:
            frame: DataFrame with at least ``'close'`` and ``'log_returns'``
                columns, ordered oldest to newest. It is not modified.
            ta_module: The ``pandas_ta`` module, or ``None`` to use the manual
                indicator formulas.

        Returns:
            ``(X_train, y_train, X_predict)``. ``X_predict`` holds the features
            of the most recent candle, whose next return is still unknown.
            ``X_train`` / ``y_train`` hold every earlier row.

        Raises:
            ValueError: Fewer than 20 rows remain after dropping NaNs.
        """
        feats = frame.copy()
        close = feats['close']
        if ta_module is not None:
            feats['rsi'] = ta_module.rsi(close, length=14)
            macd = ta_module.macd(close, fast=12, slow=26, signal=9)
            feats['macd_hist'] = macd['MACDh_12_26_9']
        else:
            delta = close.diff()
            gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
            loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
            rs = gain / (loss + 1e-9)  # 1e-9 prevents division by zero
            feats['rsi'] = 100 - (100 / (1 + rs))
            feats['macd_hist'] = close.ewm(span=12).mean() - close.ewm(span=26).mean()
        feats['lag_1'] = feats['log_returns'].shift(1)
        feats['lag_2'] = feats['log_returns'].shift(2)

        feats = feats.dropna()
        if len(feats) < 20:
            raise ValueError("Insufficient data after feature engineering")

        columns = MonteCarloAnalyzer._FEATURES
        # Take the prediction row BEFORE building the target: the newest candle
        # has no next return yet, and it is exactly the row to forecast from.
        X_predict = feats[columns].iloc[-1:]
        next_return = feats['log_returns'].shift(-1)
        X_train = feats[columns].iloc[:-1]
        y_train = next_return.iloc[:-1]
        return X_train, y_train, X_predict

    def _simulate_price_distribution(self, current_price, mean_return, std_return,
                                     target_profit_percent, model_name, extra_fields=None):
        """Draw one-step prices and summarise their distribution.

        Each simulated log return is ``drift + Student-t diffusion + jump``,
        where a jump (normal, std ``_JUMP_STD_MULTIPLIER * std_return``) is
        added with probability ``_JUMP_LAMBDA``. Random numbers come from the
        global ``np.random`` state.

        Args:
            current_price: Price the simulation starts from.
            mean_return: Expected one-step log return.
            std_return: One-step log-return volatility.
            target_profit_percent: Fractional gain defining the target price.
            model_name: Value stored under ``'simulation_model'``.
            extra_fields: Optional dict inserted right after ``'current_price'``.

        Returns:
            The (unsanitized) results dictionary.
        """
        n = self._NUM_SIMULATIONS
        # NOTE(review): mean_return is already a mean of log returns, so the
        # -0.5*sigma^2 term may double-count the convexity correction — confirm.
        drift = mean_return - 0.5 * std_return**2
        # The order of the three random draws is kept fixed so that seeding
        # np.random reproduces the same paths.
        diffusion = std_return * np.random.standard_t(df=self._T_DF, size=n)
        jump_mask = np.random.rand(n) < self._JUMP_LAMBDA
        jump_sizes = np.random.normal(self._JUMP_MEAN, std_return * self._JUMP_STD_MULTIPLIER, size=n)
        jump_component = np.where(jump_mask, jump_sizes, 0.0)
        simulated_prices = current_price * np.exp(drift + diffusion + jump_component)

        p2_5, p5, p25, _p50, p75, p95, p97_5 = np.percentile(
            simulated_prices, [2.5, 5, 25, 50, 75, 95, 97.5]
        )
        var_95_price = p5
        tail_prices = simulated_prices[simulated_prices <= var_95_price]
        cvar_95_price = np.mean(tail_prices) if len(tail_prices) > 0 else var_95_price
        target_price = current_price * (1 + target_profit_percent)

        results = {
            'simulation_model': model_name,
            'num_simulations': n,
            'current_price': current_price,
        }
        if extra_fields:
            results.update(extra_fields)
        results.update({
            'distribution_summary': {
                'mean_price': np.mean(simulated_prices),
                'median_price': np.median(simulated_prices),
            },
            'prediction_interval_50': [p25, p75],
            'prediction_interval_90': [p5, p95],
            'prediction_interval_95': [p2_5, p97_5],
            'risk_metrics': {
                'VaR_95_price': var_95_price,
                'VaR_95_value': current_price - var_95_price,
                'CVaR_95_price': cvar_95_price,
                'CVaR_95_value': current_price - cvar_95_price,
            },
            'probability_of_gain': np.mean(simulated_prices >= target_price),
            'raw_simulated_prices': simulated_prices[:self._RAW_SAMPLE_SIZE],  # ndarray
        })
        return results

    def _calculate_trend_adjustment(self, closes):
        """(Currently unused) Map the 10-candle price change to a multiplier.

        Returns 1.2 / 1.1 for a rise above 2% / 1%, 0.8 / 0.9 for a drop beyond
        2% / 1%, and 1.0 otherwise, for fewer than 10 closes, or on any error.
        """
        try:
            if len(closes) < 10:
                return 1.0
            recent_trend = (closes[-1] - closes[-10]) / closes[-10]
            if recent_trend > 0.02:
                return 1.2
            if recent_trend > 0.01:
                return 1.1
            if recent_trend < -0.02:
                return 0.8
            if recent_trend < -0.01:
                return 0.9
            return 1.0
        except Exception:
            # Best-effort helper: any bad input yields the neutral multiplier.
            return 1.0
# Import-time banner: this statement runs once, when the module is first loaded.
print("✅ ML Module: Advanced Monte Carlo Analyzer loaded (FIXED: JSON Serializable & GARCH Scale)")