# ml_engine/patterns.py # (V11.1 - XGBoost Multi-Timeframe Engine with Memory Management) import os import json import numpy as np import pandas as pd import xgboost as xgb import asyncio import io import logging import gc # استيراد garbage collector لتنظيف الذاكرة # استيراد الـ Pipeline الجديد try: from .xgboost_pattern_v2 import transform_candles_for_ml except ImportError: print("❌ [PatternEngineV11.1] فشل استيراد 'xgboost_pattern_v2'. تأكد من وجود الملف.") transform_candles_for_ml = None # إعداد التسجيل logging.basicConfig(level=logging.INFO, format='%(asctime)s - [PatternEngine] - %(message)s') logger = logging.getLogger(__name__) class ChartPatternAnalyzer: def __init__(self, r2_service=None, models_dir="ml_models/xgboost_pattern2"): """ تهيئة محرك الأنماط الجديد المعتمد على XGBoost. Args: r2_service: خدمة R2 (اختياري). models_dir: المجلد المحلي الذي يحتوي على نماذج JSON. """ self.r2_service = r2_service self.models_dir = models_dir self.models = {} # لتخزين النماذج المحملة # الأطر الزمنية المدعومة وأوزان التصويت الجديدة (التركيز على القصير) self.timeframe_weights = { '15m': 0.40, '1h': 0.30, '5m': 0.20, '4h': 0.10, '1d': 0.00 } self.supported_timeframes = list(self.timeframe_weights.keys()) self.initialized = False async def initialize(self): """ تحميل جميع نماذج XGBoost المتوفرة. """ if self.initialized: return True logger.info(f"بدء تحميل نماذج XGBoost من: {self.models_dir}...") if not os.path.exists(self.models_dir): logger.error(f"❌ المجلد غير موجود: {self.models_dir}") return False loaded_count = 0 for tf in self.supported_timeframes: model_path = os.path.join(self.models_dir, f"xgb_{tf}.json") if os.path.exists(model_path): try: model = xgb.Booster() model.load_model(model_path) self.models[tf] = model loaded_count += 1 logger.info(f" ✅ تم تحميل نموذج {tf}") except Exception as e: logger.error(f" ❌ فشل تحميل نموذج {tf}: {e}") else: if self.timeframe_weights.get(tf, 0) > 0: logger.warning(f" ⚠️ نموذج {tf} غير موجود (مطلوب بوزن {self.timeframe_weights[tf]}).") if loaded_count > 0: self.initialized = True logger.info(f"✅ تم تهيئة المحرك. النماذج المحملة: {loaded_count}/{len(self.supported_timeframes)}") return True else: logger.error("❌ لم يتم تحميل أي نموذج.") return False async def detect_chart_patterns(self, ohlcv_data: dict) -> dict: """ تحليل الأنماط باستخدام النماذج المتاحة وتطبيق التصويت الموزون. """ if not self.initialized or not transform_candles_for_ml: return self._get_empty_result("Engine not initialized or pipeline missing") details = {} weighted_score_sum = 0.0 total_weight_used = 0.0 for tf, model in self.models.items(): candles = ohlcv_data.get(tf) if candles and len(candles) >= 200: try: df = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) X_features = transform_candles_for_ml(df) if X_features is not None: dtest = xgb.DMatrix(X_features) prob_up = model.predict(dtest)[0] details[tf] = float(prob_up) weight = self.timeframe_weights.get(tf, 0.0) if weight > 0: weighted_score_sum += prob_up * weight total_weight_used += weight except Exception: details[tf] = None else: details[tf] = None final_score = 0.0 if total_weight_used > 0: final_score = weighted_score_sum / total_weight_used pattern_text = "Neutral" if final_score >= 0.60: pattern_text = "Bullish Signal" elif final_score <= 0.40: pattern_text = "Bearish Signal" return { 'pattern_detected': pattern_text, 'pattern_confidence': float(final_score), 'details': details } def _get_empty_result(self, reason=""): return {'pattern_detected': 'Neutral / Error', 'pattern_confidence': 0.0, 'details': {'error': reason}} # 🔴 دالة جديدة لتنظيف الذاكرة def clear_memory(self): """تحرير النماذج من الذاكرة""" self.models.clear() self.initialized = False gc.collect() logger.info("🧹 [PatternEngine] تم تنظيف الذاكرة وتحرير النماذج.") print("✅ ML Module: Pattern Engine V11.1 (Memory Managed) loaded")