import os import json import asyncio import pandas as pd import numpy as np from datetime import datetime, timedelta from typing import Dict, List, Any import hashlib class LearningEngine: def __init__(self, r2_service, data_manager): self.r2_service = r2_service self.data_manager = data_manager self.weights = {} self.performance_history = [] self.strategy_effectiveness = {} self.market_patterns = {} self.risk_profiles = {} self.initialized = False self.initialization_lock = asyncio.Lock() async def initialize(self): """تهيئة نظام التعلم من R2""" async with self.initialization_lock: if self.initialized: return print("🧠 تهيئة نظام التعلم الذاتي...") try: await self.load_weights_from_r2() await self.load_performance_history() self.initialized = True print("✅ نظام التعلم جاهز - الأوزان محملة بنجاح") except Exception as e: print(f"⚠️ لم يتم تحميل الأوزان السابقة: {e} - سيتم البدء بأوزان افتراضية") await self.initialize_default_weights() self.initialized = True async def initialize_enhanced(self): """تهيئة محسنة لنظام التعلم""" async with self.initialization_lock: if self.initialized: return print("🧠 تهيئة نظام التعلم الذاتي المحسّن...") try: await self.load_weights_from_r2() await self.load_performance_history() # إصلاح هيكل الأوزان إذا لزم الأمر await self.fix_weights_structure() # إذا لم تكن هناك بيانات كافية، بدء التعلم من الصفر if not self.performance_history: print("🔰 بدء التعلم من الصفر - لا توجد بيانات تاريخية") await self.initialize_default_weights() self.initialized = True print("✅ نظام التعلم المحسّن جاهز") except Exception as e: print(f"⚠️ فشل التهيئة المحسنة: {e}") await self.initialize_default_weights() self.initialized = True async def fix_weights_structure(self): """إصلاح هيكل الأوزان ليتوافق مع الكود""" try: # تحميل البيانات الحالية key = "learning_engine_weights.json" response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key) current_data = json.loads(response['Body'].read()) # إذا كان الهيكل قديماً، قم بتحديثه if 'strategy_weights' in current_data and 'last_updated' not in current_data: fixed_data = { "weights": current_data, "last_updated": datetime.now().isoformat(), "version": "2.0", "performance_metrics": await self.calculate_performance_metrics() } data_json = json.dumps(fixed_data, indent=2, ensure_ascii=False).encode('utf-8') self.r2_service.s3_client.put_object( Bucket="trading", Key=key, Body=data_json, ContentType="application/json" ) print("✅ تم إصلاح هيكل الأوزان بنجاح") except Exception as e: print(f"⚠️ لم يتم إصلاح هيكل الأوزان: {e}") async def initialize_default_weights(self): """تهيئة الأوزان الافتراضية - موزعة بشكل أفضل""" self.weights = { "strategy_weights": { "trend_following": 0.18, "mean_reversion": 0.15, "breakout_momentum": 0.22, "volume_spike": 0.12, "whale_tracking": 0.15, "pattern_recognition": 0.10, "hybrid_ai": 0.08 }, "technical_weights": { "rsi": 0.15, "macd": 0.18, "ema_cross": 0.12, "bollinger_bands": 0.10, "volume_analysis": 0.15, "support_resistance": 0.12, "market_sentiment": 0.18 }, "risk_parameters": { "max_position_size": 0.1, "max_daily_loss": 0.02, "stop_loss_base": 0.02, "risk_reward_ratio": 2.0, "volatility_adjustment": 1.0 }, "market_condition_weights": { "bull_market": { "trend_following": 0.25, "breakout_momentum": 0.20, "whale_tracking": 0.15 }, "bear_market": { "mean_reversion": 0.25, "pattern_recognition": 0.20, "hybrid_ai": 0.15 }, "sideways_market": { "mean_reversion": 0.30, "volume_spike": 0.20, "pattern_recognition": 0.15 } } } async def load_weights_from_r2(self): """تحميل الأوزان من R2""" try: key = "learning_engine_weights.json" response = self.r2_service.s3_client.get_object( Bucket="trading", Key=key ) weights_data = json.loads(response['Body'].read()) # التعامل مع الهيكل الجديد والقديم if isinstance(weights_data, dict): if 'weights' in weights_data: self.weights = weights_data['weights'] else: self.weights = weights_data print(f"✅ تم تحميل الأوزان من R2 بنجاح. إصدار الهيكل: {'جديد' if 'weights' in weights_data else 'قديم'}") else: raise ValueError("هيكل الأوزان غير صحيح") except Exception as e: print(f"❌ فشل تحميل الأوزان: {e}") await self.initialize_default_weights() await self.save_weights_to_r2() async def save_weights_to_r2(self): """حفظ الأوزان إلى R2""" try: key = "learning_engine_weights.json" weights_data = { "weights": self.weights, "last_updated": datetime.now().isoformat(), "version": "2.0", "performance_metrics": await self.calculate_performance_metrics() } data_json = json.dumps(weights_data, indent=2, ensure_ascii=False).encode('utf-8') self.r2_service.s3_client.put_object( Bucket="trading", Key=key, Body=data_json, ContentType="application/json" ) print("✅ تم حفظ الأوزان إلى R2 بنجاح") except Exception as e: print(f"❌ فشل حفظ الأوزان: {e}") async def load_performance_history(self): """تحميل سجل الأداء""" try: key = "learning_performance_history.json" response = self.r2_service.s3_client.get_object( Bucket="trading", Key=key ) history_data = json.loads(response['Body'].read()) self.performance_history = history_data.get("history", []) print(f"✅ تم تحميل سجل الأداء - {len(self.performance_history)} تسجيل") except Exception as e: print(f"⚠️ لم يتم تحميل سجل الأداء: {e}") self.performance_history = [] async def save_performance_history(self): """حفظ سجل الأداء""" try: key = "learning_performance_history.json" history_data = { "history": self.performance_history[-1000:], "last_updated": datetime.now().isoformat() } data_json = json.dumps(history_data, indent=2, ensure_ascii=False).encode('utf-8') self.r2_service.s3_client.put_object( Bucket="trading", Key=key, Body=data_json, ContentType="application/json" ) except Exception as e: print(f"❌ فشل حفظ سجل الأداء: {e}") async def analyze_trade_outcome(self, trade_data, outcome): """تحليل نتيجة الصفقة وتحديث الأوزان - الإصدار المحسّن""" if not self.initialized: await self.initialize() try: # استخراج الاستراتيجية من بيانات الصفقة strategy = trade_data.get('strategy', 'unknown') if strategy == 'unknown': decision_data = trade_data.get('decision_data', {}) strategy = decision_data.get('strategy', 'unknown') # الحصول على سياق السوق الحقيقي market_context = await self.get_current_market_conditions() analysis_entry = { "timestamp": datetime.now().isoformat(), "trade_data": trade_data, "outcome": outcome, "market_conditions": market_context, "strategy_used": strategy, "symbol": trade_data.get('symbol', 'unknown'), "pnl_usd": trade_data.get('pnl_usd', 0), "pnl_percent": trade_data.get('pnl_percent', 0) } self.performance_history.append(analysis_entry) await self.update_strategy_effectiveness(analysis_entry) await self.update_market_patterns(analysis_entry) # ✅ التحديث الهام: تحديث الأوزان بعد كل صفقة في البداية if len(self.performance_history) <= 10: # أول 10 صفقات await self.adapt_weights_based_on_performance() await self.save_weights_to_r2() await self.save_performance_history() else: # بعد ذلك، تحديث كل 3 صفقات if len(self.performance_history) % 3 == 0: await self.adapt_weights_based_on_performance() await self.save_weights_to_r2() await self.save_performance_history() print(f"📊 تم تحليل صفقة {trade_data.get('symbol')} - الاستراتيجية: {strategy} - النتيجة: {outcome} - PnL: {trade_data.get('pnl_percent', 0):.2f}%") except Exception as e: print(f"❌ فشل تحليل نتيجة الصفقة: {e}") async def update_strategy_effectiveness(self, analysis_entry): """تحديث فعالية الاستراتيجيات""" strategy = analysis_entry['strategy_used'] outcome = analysis_entry['outcome'] market_condition = analysis_entry['market_conditions']['current_trend'] pnl_percent = analysis_entry.get('pnl_percent', 0) if strategy not in self.strategy_effectiveness: self.strategy_effectiveness[strategy] = { "total_trades": 0, "successful_trades": 0, "total_profit": 0, "total_pnl_percent": 0, "market_conditions": {} } self.strategy_effectiveness[strategy]["total_trades"] += 1 self.strategy_effectiveness[strategy]["total_pnl_percent"] += pnl_percent # تحديد النجاح بناءً على النتيجة والأداء is_success = outcome in ["SUCCESS", "CLOSED_BY_REANALYSIS", "CLOSED_BY_MONITOR"] and pnl_percent > 0 if is_success: self.strategy_effectiveness[strategy]["successful_trades"] += 1 if market_condition not in self.strategy_effectiveness[strategy]["market_conditions"]: self.strategy_effectiveness[strategy]["market_conditions"][market_condition] = { "trades": 0, "successes": 0, "total_pnl": 0 } self.strategy_effectiveness[strategy]["market_conditions"][market_condition]["trades"] += 1 self.strategy_effectiveness[strategy]["market_conditions"][market_condition]["total_pnl"] += pnl_percent if is_success: self.strategy_effectiveness[strategy]["market_conditions"][market_condition]["successes"] += 1 async def update_market_patterns(self, analysis_entry): """تحديث أنماط السوق""" market_condition = analysis_entry['market_conditions']['current_trend'] symbol = analysis_entry['symbol'] outcome = analysis_entry['outcome'] pnl_percent = analysis_entry.get('pnl_percent', 0) if market_condition not in self.market_patterns: self.market_patterns[market_condition] = { "total_trades": 0, "successful_trades": 0, "total_pnl_percent": 0, "best_performing_strategies": {}, "best_performing_symbols": {} } self.market_patterns[market_condition]["total_trades"] += 1 self.market_patterns[market_condition]["total_pnl_percent"] += pnl_percent is_success = outcome in ["SUCCESS", "CLOSED_BY_REANALYSIS", "CLOSED_BY_MONITOR"] and pnl_percent > 0 if is_success: self.market_patterns[market_condition]["successful_trades"] += 1 strategy = analysis_entry['strategy_used'] if strategy not in self.market_patterns[market_condition]["best_performing_strategies"]: self.market_patterns[market_condition]["best_performing_strategies"][strategy] = { "count": 0, "total_pnl": 0 } self.market_patterns[market_condition]["best_performing_strategies"][strategy]["count"] += 1 self.market_patterns[market_condition]["best_performing_strategies"][strategy]["total_pnl"] += pnl_percent if symbol not in self.market_patterns[market_condition]["best_performing_symbols"]: self.market_patterns[market_condition]["best_performing_symbols"][symbol] = { "count": 0, "total_pnl": 0 } self.market_patterns[market_condition]["best_performing_symbols"][symbol]["count"] += 1 self.market_patterns[market_condition]["best_performing_symbols"][symbol]["total_pnl"] += pnl_percent async def adapt_weights_based_on_performance(self): """تعديل الأوزان بناءً على الأداء - الإصدار المحسّن""" print("🔄 تحديث الأوزان بناءً على الأداء...") try: # إذا لم تكن هناك بيانات كافية، استخدم تحديثاً تدريجياً if not self.strategy_effectiveness: print("⚠️ لا توجد بيانات أداء كافية، استخدام تحديث تدريجي") await self.gradual_weights_adjustment() return # تحديث أوزان الاستراتيجيات بناءً على الأداء الحقيقي total_performance = 0 strategy_performance = {} for strategy, data in self.strategy_effectiveness.items(): if data["total_trades"] > 0: success_rate = data["successful_trades"] / data["total_trades"] avg_pnl = data["total_pnl_percent"] / data["total_trades"] # حساب الأداء المركب composite_performance = (success_rate * 0.7) + (min(avg_pnl, 10) / 10 * 0.3) strategy_performance[strategy] = composite_performance total_performance += composite_performance # إذا كان هناك أداء كافٍ، قم بالتحديث if total_performance > 0 and strategy_performance: for strategy, performance in strategy_performance.items(): current_weight = self.weights["strategy_weights"].get(strategy, 0.1) # تحديث تدريجي لتجنب التغيرات المفاجئة new_weight = current_weight * 0.7 + (performance * 0.3) self.weights["strategy_weights"][strategy] = new_weight # تطبيع الأوزان self.normalize_weights() print("✅ تم تحديث الأوزان بناءً على الأداء الحقيقي") else: await self.gradual_weights_adjustment() except Exception as e: print(f"❌ فشل تحديث الأوزان: {e}") await self.gradual_weights_adjustment() async def gradual_weights_adjustment(self): """تعديل تدريجي للأوزان لتحسين الأداء""" print("📈 إجراء تعديل تدريجي على الأوزان...") # زيادة وزن الاستراتيجيات التي تعتمد على البيانات المتاحة if self.market_patterns: for market_condition, data in self.market_patterns.items(): if data.get("total_trades", 0) > 0: best_strategy = max(data["best_performing_strategies"].items(), key=lambda x: x[1]["total_pnl"])[0] if data["best_performing_strategies"] else None if best_strategy: current_weight = self.weights["strategy_weights"].get(best_strategy, 0.1) self.weights["strategy_weights"][best_strategy] = min(current_weight * 1.1, 0.3) self.normalize_weights() print("✅ تم التعديل التدريجي للأوزان") def normalize_weights(self): """تطبيع الأوزان للتأكد من أن مجموعها 1""" total = sum(self.weights["strategy_weights"].values()) if total > 0: for strategy in self.weights["strategy_weights"]: self.weights["strategy_weights"][strategy] /= total async def get_current_market_conditions(self): """الحصول على ظروف السوق الحالية - بيانات حقيقية""" try: if not self.data_manager: raise ValueError("DataManager غير متوفر") market_context = await self.data_manager.get_market_context_async() if not market_context: raise ValueError("فشل جلب سياق السوق") return { "current_trend": market_context.get('market_trend', 'sideways_market'), "volatility": self._calculate_market_volatility(market_context), "market_sentiment": market_context.get('btc_sentiment', 'NEUTRAL'), "whale_activity": market_context.get('general_whale_activity', {}).get('sentiment', 'NEUTRAL'), "fear_greed_index": market_context.get('fear_and_greed_index', 50) } except Exception as e: print(f"⚠️ فشل الحصول على ظروف السوق: {e}") return { "current_trend": "sideways_market", "volatility": "medium", "market_sentiment": "neutral", "whale_activity": "low", "fear_greed_index": 50 } def _calculate_market_volatility(self, market_context): """حساب تقلبية السوق بناءً على البيانات الحقيقية""" try: btc_price = market_context.get('bitcoin_price_usd', 0) fear_greed = market_context.get('fear_and_greed_index', 50) whale_sentiment = market_context.get('general_whale_activity', {}).get('sentiment', 'NEUTRAL') volatility_score = 0 # تحليل سعر البيتكوين (تغيرات كبيرة = تقلبية عالية) if btc_price > 0: # هذا مؤشر مبسط - في التطبيق الحقيقي نحتاج بيانات تاريخية if abs(fear_greed - 50) > 20: volatility_score += 1 # تحليل نشاط الحيتان if whale_sentiment in ['BULLISH', 'BEARISH']: volatility_score += 1 elif whale_sentiment == 'SLIGHTLY_BULLISH': volatility_score += 0.5 if volatility_score >= 1.5: return "high" elif volatility_score >= 0.5: return "medium" else: return "low" except Exception as e: print(f"⚠️ خطأ في حساب التقلبية: {e}") return "medium" async def calculate_performance_metrics(self): """حساب مقاييس الأداء""" if not self.performance_history: return {"status": "لا توجد بيانات أداء بعد"} recent_trades = self.performance_history[-50:] # آخر 50 صفقة فقط total_trades = len(recent_trades) successful_trades = sum(1 for trade in recent_trades if trade['outcome'] in ["SUCCESS", "CLOSED_BY_REANALYSIS", "CLOSED_BY_MONITOR"] and trade.get('pnl_percent', 0) > 0) success_rate = successful_trades / total_trades if total_trades > 0 else 0 total_pnl = sum(trade.get('pnl_percent', 0) for trade in recent_trades) avg_pnl = total_pnl / total_trades if total_trades > 0 else 0 strategy_performance = {} for strategy, data in self.strategy_effectiveness.items(): if data["total_trades"] > 0: strategy_success_rate = data["successful_trades"] / data["total_trades"] strategy_avg_pnl = data["total_pnl_percent"] / data["total_trades"] strategy_performance[strategy] = { "success_rate": strategy_success_rate, "avg_pnl_percent": strategy_avg_pnl, "total_trades": data["total_trades"], "successful_trades": data["successful_trades"] } market_performance = {} for condition, data in self.market_patterns.items(): if data["total_trades"] > 0: market_success_rate = data["successful_trades"] / data["total_trades"] market_avg_pnl = data["total_pnl_percent"] / data["total_trades"] market_performance[condition] = { "success_rate": market_success_rate, "avg_pnl_percent": market_avg_pnl, "total_trades": data["total_trades"] } return { "overall_success_rate": success_rate, "overall_avg_pnl_percent": avg_pnl, "total_analyzed_trades": len(self.performance_history), "recent_trades_analyzed": total_trades, "strategy_performance": strategy_performance, "market_performance": market_performance, "last_updated": datetime.now().isoformat() } async def get_optimized_strategy_weights(self, market_condition): """الحصول على أوزان استراتيجية محسنة - الإصدار المصحح""" try: if not self.initialized: print("⚠️ نظام التعلم غير مهيء، استخدام الأوزان الافتراضية") return await self.get_default_strategy_weights() # ✅ التحقق من وجود الأوزان وهيكلتها بشكل صحيح if (not self.weights or "strategy_weights" not in self.weights or not self.weights["strategy_weights"]): print("⚠️ الأوزان غير متوفرة أو فارغة، استخدام الأوزان الافتراضية") return await self.get_default_strategy_weights() base_weights = self.weights["strategy_weights"].copy() # ✅ التحقق من أن الأوزان تحتوي على استراتيجيات فعلية if not any(weight > 0 for weight in base_weights.values()): print("⚠️ جميع الأوزان صفر، استخدام الأوزان الافتراضية") return await self.get_default_strategy_weights() print(f"✅ استخدام الأوزان المتعلمة: {base_weights}") return base_weights except Exception as e: print(f"❌ فشل في حساب الأوزان المحسنة: {e}") return await self.get_default_strategy_weights() async def get_default_strategy_weights(self): """إرجاع الأوزان الافتراضية""" return { "trend_following": 0.18, "mean_reversion": 0.15, "breakout_momentum": 0.22, "volume_spike": 0.12, "whale_tracking": 0.15, "pattern_recognition": 0.10, "hybrid_ai": 0.08 } async def get_risk_parameters(self, symbol_volatility): """الحصول على معايير المخاطرة المحسنة""" if not self.weights or "risk_parameters" not in self.weights: await self.initialize_default_weights() risk_params = self.weights.get("risk_parameters", {}).copy() # تعديل معايير المخاطرة بناءً على تقلبية الرمز if symbol_volatility == "HIGH": risk_params["stop_loss_base"] *= 1.5 risk_params["max_position_size"] *= 0.7 risk_params["risk_reward_ratio"] = 1.5 # تخفيض نسبة المكافأة/المخاطرة للتقليل العالي elif symbol_volatility == "LOW": risk_params["stop_loss_base"] *= 0.7 risk_params["max_position_size"] *= 1.2 risk_params["risk_reward_ratio"] = 2.5 # زيادة النسبة للتقليل المنخفض return risk_params async def suggest_improvements(self): """اقتراح تحسينات بناءً على تحليل الأداء""" improvements = [] if not self.performance_history: improvements.append("📊 ابدأ بجمع بيانات الأداء من الصفقات الأولى") return improvements # تحليل أداء الاستراتيجيات for strategy, data in self.strategy_effectiveness.items(): if data["total_trades"] >= 3: success_rate = data["successful_trades"] / data["total_trades"] avg_pnl = data["total_pnl_percent"] / data["total_trades"] if success_rate < 0.3 and avg_pnl < 0: improvements.append(f"🚨 استراتيجية {strategy} ضعيفة الأداء ({success_rate:.1%} نجاح، {avg_pnl:+.1f}% متوسط) - يقترح تقليل استخدامها") elif success_rate > 0.6 and avg_pnl > 2: improvements.append(f"✅ استراتيجية {strategy} ممتازة الأداء ({success_rate:.1%} نجاح، {avg_pnl:+.1f}% متوسط) - يقترح زيادة استخدامها") elif success_rate > 0.7: improvements.append(f"🎯 استراتيجية {strategy} عالية النجاح ({success_rate:.1%}) - التركيز على جودة الصفقات") # تحليل أداء ظروف السوق for market_condition, data in self.market_patterns.items(): if data["total_trades"] >= 5: success_rate = data["successful_trades"] / data["total_trades"] avg_pnl = data["total_pnl_percent"] / data["total_trades"] if success_rate < 0.4: improvements.append(f"⚠️ الأداء ضعيف في سوق {market_condition} ({success_rate:.1%} نجاح) - يحتاج مراجعة الاستراتيجيات") # العثور على أفضل استراتيجية لهذا السوق best_strategy = None best_performance = -100 for strategy, stats in data["best_performing_strategies"].items(): if stats["count"] >= 2: strategy_avg_pnl = stats["total_pnl"] / stats["count"] if strategy_avg_pnl > best_performance: best_performance = strategy_avg_pnl best_strategy = strategy if best_strategy and best_performance > 1: improvements.append(f"📈 أفضل استراتيجية في {market_condition}: {best_strategy} ({best_performance:+.1f}% متوسط ربح)") if not improvements: improvements.append("📊 لا توجد تحسينات مقترحة حالياً - استمر في جمع البيانات") return improvements async def force_strategy_learning(self): """إجبار النظام على التعلم من البيانات الحالية""" print("🧠 إجبار تحديث الاستراتيجيات من البيانات الحالية...") if not self.performance_history: print("⚠️ لا توجد بيانات أداء للتعلم منها") return # تحديث فعالية الاستراتيجيات من البيانات التاريخية for entry in self.performance_history: await self.update_strategy_effectiveness(entry) await self.update_market_patterns(entry) # تحديث الأوزان فوراً await self.adapt_weights_based_on_performance() await self.save_weights_to_r2() print("✅ تم إجبار تحديث الاستراتيجيات بنجاح") print("✅ نظام التعلم الذاتي المحسن محمل - جاهز للتعلم والتكيف المستمر")