Trad / ml_engine /patterns.py
Riy777's picture
Update ml_engine/patterns.py
5b4dadd verified
# ml_engine/patterns.py
# (V11.1 - XGBoost Multi-Timeframe Engine with Memory Management)
import os
import json
import numpy as np
import pandas as pd
import xgboost as xgb
import asyncio
import io
import logging
import gc # استيراد garbage collector لتنظيف الذاكرة
# استيراد الـ Pipeline الجديد
try:
from .xgboost_pattern_v2 import transform_candles_for_ml
except ImportError:
print("❌ [PatternEngineV11.1] فشل استيراد 'xgboost_pattern_v2'. تأكد من وجود الملف.")
transform_candles_for_ml = None
# إعداد التسجيل
logging.basicConfig(level=logging.INFO, format='%(asctime)s - [PatternEngine] - %(message)s')
logger = logging.getLogger(__name__)
class ChartPatternAnalyzer:
def __init__(self, r2_service=None, models_dir="ml_models/xgboost_pattern2"):
"""
تهيئة محرك الأنماط الجديد المعتمد على XGBoost.
Args:
r2_service: خدمة R2 (اختياري).
models_dir: المجلد المحلي الذي يحتوي على نماذج JSON.
"""
self.r2_service = r2_service
self.models_dir = models_dir
self.models = {} # لتخزين النماذج المحملة
# الأطر الزمنية المدعومة وأوزان التصويت الجديدة (التركيز على القصير)
self.timeframe_weights = {
'15m': 0.40,
'1h': 0.30,
'5m': 0.20,
'4h': 0.10,
'1d': 0.00
}
self.supported_timeframes = list(self.timeframe_weights.keys())
self.initialized = False
async def initialize(self):
"""
تحميل جميع نماذج XGBoost المتوفرة.
"""
if self.initialized: return True
logger.info(f"بدء تحميل نماذج XGBoost من: {self.models_dir}...")
if not os.path.exists(self.models_dir):
logger.error(f"❌ المجلد غير موجود: {self.models_dir}")
return False
loaded_count = 0
for tf in self.supported_timeframes:
model_path = os.path.join(self.models_dir, f"xgb_{tf}.json")
if os.path.exists(model_path):
try:
model = xgb.Booster()
model.load_model(model_path)
self.models[tf] = model
loaded_count += 1
logger.info(f" ✅ تم تحميل نموذج {tf}")
except Exception as e:
logger.error(f" ❌ فشل تحميل نموذج {tf}: {e}")
else:
if self.timeframe_weights.get(tf, 0) > 0:
logger.warning(f" ⚠️ نموذج {tf} غير موجود (مطلوب بوزن {self.timeframe_weights[tf]}).")
if loaded_count > 0:
self.initialized = True
logger.info(f"✅ تم تهيئة المحرك. النماذج المحملة: {loaded_count}/{len(self.supported_timeframes)}")
return True
else:
logger.error("❌ لم يتم تحميل أي نموذج.")
return False
async def detect_chart_patterns(self, ohlcv_data: dict) -> dict:
"""
تحليل الأنماط باستخدام النماذج المتاحة وتطبيق التصويت الموزون.
"""
if not self.initialized or not transform_candles_for_ml:
return self._get_empty_result("Engine not initialized or pipeline missing")
details = {}
weighted_score_sum = 0.0
total_weight_used = 0.0
for tf, model in self.models.items():
candles = ohlcv_data.get(tf)
if candles and len(candles) >= 200:
try:
df = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
X_features = transform_candles_for_ml(df)
if X_features is not None:
dtest = xgb.DMatrix(X_features)
prob_up = model.predict(dtest)[0]
details[tf] = float(prob_up)
weight = self.timeframe_weights.get(tf, 0.0)
if weight > 0:
weighted_score_sum += prob_up * weight
total_weight_used += weight
except Exception:
details[tf] = None
else:
details[tf] = None
final_score = 0.0
if total_weight_used > 0:
final_score = weighted_score_sum / total_weight_used
pattern_text = "Neutral"
if final_score >= 0.60: pattern_text = "Bullish Signal"
elif final_score <= 0.40: pattern_text = "Bearish Signal"
return {
'pattern_detected': pattern_text,
'pattern_confidence': float(final_score),
'details': details
}
def _get_empty_result(self, reason=""):
return {'pattern_detected': 'Neutral / Error', 'pattern_confidence': 0.0, 'details': {'error': reason}}
# 🔴 دالة جديدة لتنظيف الذاكرة
def clear_memory(self):
"""تحرير النماذج من الذاكرة"""
self.models.clear()
self.initialized = False
gc.collect()
logger.info("🧹 [PatternEngine] تم تنظيف الذاكرة وتحرير النماذج.")
print("✅ ML Module: Pattern Engine V11.1 (Memory Managed) loaded")