Trad / learning_engine.py
Riy777's picture
Rename learning_engine (19).py to learning_engine.py
64100b4
raw
history blame
31.6 kB
import os
import json
import asyncio
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Any
import hashlib
class LearningEngine:
def __init__(self, r2_service, data_manager):
self.r2_service = r2_service
self.data_manager = data_manager
self.weights = {}
self.performance_history = []
self.strategy_effectiveness = {}
self.market_patterns = {}
self.risk_profiles = {}
self.initialized = False
self.initialization_lock = asyncio.Lock()
async def initialize(self):
"""تهيئة نظام التعلم من R2"""
async with self.initialization_lock:
if self.initialized:
return
print("🧠 تهيئة نظام التعلم الذاتي...")
try:
await self.load_weights_from_r2()
await self.load_performance_history()
self.initialized = True
print("✅ نظام التعلم جاهز - الأوزان محملة بنجاح")
except Exception as e:
print(f"⚠️ لم يتم تحميل الأوزان السابقة: {e} - سيتم البدء بأوزان افتراضية")
await self.initialize_default_weights()
self.initialized = True
async def initialize_enhanced(self):
"""تهيئة محسنة لنظام التعلم"""
async with self.initialization_lock:
if self.initialized:
return
print("🧠 تهيئة نظام التعلم الذاتي المحسّن...")
try:
await self.load_weights_from_r2()
await self.load_performance_history()
# إصلاح هيكل الأوزان إذا لزم الأمر
await self.fix_weights_structure()
# إذا لم تكن هناك بيانات كافية، بدء التعلم من الصفر
if not self.performance_history:
print("🔰 بدء التعلم من الصفر - لا توجد بيانات تاريخية")
await self.initialize_default_weights()
self.initialized = True
print("✅ نظام التعلم المحسّن جاهز")
except Exception as e:
print(f"⚠️ فشل التهيئة المحسنة: {e}")
await self.initialize_default_weights()
self.initialized = True
async def fix_weights_structure(self):
"""إصلاح هيكل الأوزان ليتوافق مع الكود"""
try:
# تحميل البيانات الحالية
key = "learning_engine_weights.json"
response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key)
current_data = json.loads(response['Body'].read())
# إذا كان الهيكل قديماً، قم بتحديثه
if 'strategy_weights' in current_data and 'last_updated' not in current_data:
fixed_data = {
"weights": current_data,
"last_updated": datetime.now().isoformat(),
"version": "2.0",
"performance_metrics": await self.calculate_performance_metrics()
}
data_json = json.dumps(fixed_data, indent=2, ensure_ascii=False).encode('utf-8')
self.r2_service.s3_client.put_object(
Bucket="trading", Key=key, Body=data_json, ContentType="application/json"
)
print("✅ تم إصلاح هيكل الأوزان بنجاح")
except Exception as e:
print(f"⚠️ لم يتم إصلاح هيكل الأوزان: {e}")
async def initialize_default_weights(self):
"""تهيئة الأوزان الافتراضية - موزعة بشكل أفضل"""
self.weights = {
"strategy_weights": {
"trend_following": 0.18,
"mean_reversion": 0.15,
"breakout_momentum": 0.22,
"volume_spike": 0.12,
"whale_tracking": 0.15,
"pattern_recognition": 0.10,
"hybrid_ai": 0.08
},
"technical_weights": {
"rsi": 0.15,
"macd": 0.18,
"ema_cross": 0.12,
"bollinger_bands": 0.10,
"volume_analysis": 0.15,
"support_resistance": 0.12,
"market_sentiment": 0.18
},
"risk_parameters": {
"max_position_size": 0.1,
"max_daily_loss": 0.02,
"stop_loss_base": 0.02,
"risk_reward_ratio": 2.0,
"volatility_adjustment": 1.0
},
"market_condition_weights": {
"bull_market": {
"trend_following": 0.25,
"breakout_momentum": 0.20,
"whale_tracking": 0.15
},
"bear_market": {
"mean_reversion": 0.25,
"pattern_recognition": 0.20,
"hybrid_ai": 0.15
},
"sideways_market": {
"mean_reversion": 0.30,
"volume_spike": 0.20,
"pattern_recognition": 0.15
}
}
}
async def load_weights_from_r2(self):
"""تحميل الأوزان من R2"""
try:
key = "learning_engine_weights.json"
response = self.r2_service.s3_client.get_object(
Bucket="trading", Key=key
)
weights_data = json.loads(response['Body'].read())
# التعامل مع الهيكل الجديد والقديم
if isinstance(weights_data, dict):
if 'weights' in weights_data:
self.weights = weights_data['weights']
else:
self.weights = weights_data
print(f"✅ تم تحميل الأوزان من R2 بنجاح. إصدار الهيكل: {'جديد' if 'weights' in weights_data else 'قديم'}")
else:
raise ValueError("هيكل الأوزان غير صحيح")
except Exception as e:
print(f"❌ فشل تحميل الأوزان: {e}")
await self.initialize_default_weights()
await self.save_weights_to_r2()
async def save_weights_to_r2(self):
"""حفظ الأوزان إلى R2"""
try:
key = "learning_engine_weights.json"
weights_data = {
"weights": self.weights,
"last_updated": datetime.now().isoformat(),
"version": "2.0",
"performance_metrics": await self.calculate_performance_metrics()
}
data_json = json.dumps(weights_data, indent=2, ensure_ascii=False).encode('utf-8')
self.r2_service.s3_client.put_object(
Bucket="trading",
Key=key,
Body=data_json,
ContentType="application/json"
)
print("✅ تم حفظ الأوزان إلى R2 بنجاح")
except Exception as e:
print(f"❌ فشل حفظ الأوزان: {e}")
async def load_performance_history(self):
"""تحميل سجل الأداء"""
try:
key = "learning_performance_history.json"
response = self.r2_service.s3_client.get_object(
Bucket="trading", Key=key
)
history_data = json.loads(response['Body'].read())
self.performance_history = history_data.get("history", [])
print(f"✅ تم تحميل سجل الأداء - {len(self.performance_history)} تسجيل")
except Exception as e:
print(f"⚠️ لم يتم تحميل سجل الأداء: {e}")
self.performance_history = []
async def save_performance_history(self):
"""حفظ سجل الأداء"""
try:
key = "learning_performance_history.json"
history_data = {
"history": self.performance_history[-1000:],
"last_updated": datetime.now().isoformat()
}
data_json = json.dumps(history_data, indent=2, ensure_ascii=False).encode('utf-8')
self.r2_service.s3_client.put_object(
Bucket="trading",
Key=key,
Body=data_json,
ContentType="application/json"
)
except Exception as e:
print(f"❌ فشل حفظ سجل الأداء: {e}")
async def analyze_trade_outcome(self, trade_data, outcome):
"""تحليل نتيجة الصفقة وتحديث الأوزان - الإصدار المحسّن"""
if not self.initialized:
await self.initialize()
try:
# استخراج الاستراتيجية من بيانات الصفقة
strategy = trade_data.get('strategy', 'unknown')
if strategy == 'unknown':
decision_data = trade_data.get('decision_data', {})
strategy = decision_data.get('strategy', 'unknown')
# الحصول على سياق السوق الحقيقي
market_context = await self.get_current_market_conditions()
analysis_entry = {
"timestamp": datetime.now().isoformat(),
"trade_data": trade_data,
"outcome": outcome,
"market_conditions": market_context,
"strategy_used": strategy,
"symbol": trade_data.get('symbol', 'unknown'),
"pnl_usd": trade_data.get('pnl_usd', 0),
"pnl_percent": trade_data.get('pnl_percent', 0)
}
self.performance_history.append(analysis_entry)
await self.update_strategy_effectiveness(analysis_entry)
await self.update_market_patterns(analysis_entry)
# ✅ التحديث الهام: تحديث الأوزان بعد كل صفقة في البداية
if len(self.performance_history) <= 10: # أول 10 صفقات
await self.adapt_weights_based_on_performance()
await self.save_weights_to_r2()
await self.save_performance_history()
else:
# بعد ذلك، تحديث كل 3 صفقات
if len(self.performance_history) % 3 == 0:
await self.adapt_weights_based_on_performance()
await self.save_weights_to_r2()
await self.save_performance_history()
print(f"📊 تم تحليل صفقة {trade_data.get('symbol')} - الاستراتيجية: {strategy} - النتيجة: {outcome} - PnL: {trade_data.get('pnl_percent', 0):.2f}%")
except Exception as e:
print(f"❌ فشل تحليل نتيجة الصفقة: {e}")
async def update_strategy_effectiveness(self, analysis_entry):
"""تحديث فعالية الاستراتيجيات"""
strategy = analysis_entry['strategy_used']
outcome = analysis_entry['outcome']
market_condition = analysis_entry['market_conditions']['current_trend']
pnl_percent = analysis_entry.get('pnl_percent', 0)
if strategy not in self.strategy_effectiveness:
self.strategy_effectiveness[strategy] = {
"total_trades": 0,
"successful_trades": 0,
"total_profit": 0,
"total_pnl_percent": 0,
"market_conditions": {}
}
self.strategy_effectiveness[strategy]["total_trades"] += 1
self.strategy_effectiveness[strategy]["total_pnl_percent"] += pnl_percent
# تحديد النجاح بناءً على النتيجة والأداء
is_success = outcome in ["SUCCESS", "CLOSED_BY_REANALYSIS", "CLOSED_BY_MONITOR"] and pnl_percent > 0
if is_success:
self.strategy_effectiveness[strategy]["successful_trades"] += 1
if market_condition not in self.strategy_effectiveness[strategy]["market_conditions"]:
self.strategy_effectiveness[strategy]["market_conditions"][market_condition] = {
"trades": 0,
"successes": 0,
"total_pnl": 0
}
self.strategy_effectiveness[strategy]["market_conditions"][market_condition]["trades"] += 1
self.strategy_effectiveness[strategy]["market_conditions"][market_condition]["total_pnl"] += pnl_percent
if is_success:
self.strategy_effectiveness[strategy]["market_conditions"][market_condition]["successes"] += 1
async def update_market_patterns(self, analysis_entry):
"""تحديث أنماط السوق"""
market_condition = analysis_entry['market_conditions']['current_trend']
symbol = analysis_entry['symbol']
outcome = analysis_entry['outcome']
pnl_percent = analysis_entry.get('pnl_percent', 0)
if market_condition not in self.market_patterns:
self.market_patterns[market_condition] = {
"total_trades": 0,
"successful_trades": 0,
"total_pnl_percent": 0,
"best_performing_strategies": {},
"best_performing_symbols": {}
}
self.market_patterns[market_condition]["total_trades"] += 1
self.market_patterns[market_condition]["total_pnl_percent"] += pnl_percent
is_success = outcome in ["SUCCESS", "CLOSED_BY_REANALYSIS", "CLOSED_BY_MONITOR"] and pnl_percent > 0
if is_success:
self.market_patterns[market_condition]["successful_trades"] += 1
strategy = analysis_entry['strategy_used']
if strategy not in self.market_patterns[market_condition]["best_performing_strategies"]:
self.market_patterns[market_condition]["best_performing_strategies"][strategy] = {
"count": 0,
"total_pnl": 0
}
self.market_patterns[market_condition]["best_performing_strategies"][strategy]["count"] += 1
self.market_patterns[market_condition]["best_performing_strategies"][strategy]["total_pnl"] += pnl_percent
if symbol not in self.market_patterns[market_condition]["best_performing_symbols"]:
self.market_patterns[market_condition]["best_performing_symbols"][symbol] = {
"count": 0,
"total_pnl": 0
}
self.market_patterns[market_condition]["best_performing_symbols"][symbol]["count"] += 1
self.market_patterns[market_condition]["best_performing_symbols"][symbol]["total_pnl"] += pnl_percent
async def adapt_weights_based_on_performance(self):
"""تعديل الأوزان بناءً على الأداء - الإصدار المحسّن"""
print("🔄 تحديث الأوزان بناءً على الأداء...")
try:
# إذا لم تكن هناك بيانات كافية، استخدم تحديثاً تدريجياً
if not self.strategy_effectiveness:
print("⚠️ لا توجد بيانات أداء كافية، استخدام تحديث تدريجي")
await self.gradual_weights_adjustment()
return
# تحديث أوزان الاستراتيجيات بناءً على الأداء الحقيقي
total_performance = 0
strategy_performance = {}
for strategy, data in self.strategy_effectiveness.items():
if data["total_trades"] > 0:
success_rate = data["successful_trades"] / data["total_trades"]
avg_pnl = data["total_pnl_percent"] / data["total_trades"]
# حساب الأداء المركب
composite_performance = (success_rate * 0.7) + (min(avg_pnl, 10) / 10 * 0.3)
strategy_performance[strategy] = composite_performance
total_performance += composite_performance
# إذا كان هناك أداء كافٍ، قم بالتحديث
if total_performance > 0 and strategy_performance:
for strategy, performance in strategy_performance.items():
current_weight = self.weights["strategy_weights"].get(strategy, 0.1)
# تحديث تدريجي لتجنب التغيرات المفاجئة
new_weight = current_weight * 0.7 + (performance * 0.3)
self.weights["strategy_weights"][strategy] = new_weight
# تطبيع الأوزان
self.normalize_weights()
print("✅ تم تحديث الأوزان بناءً على الأداء الحقيقي")
else:
await self.gradual_weights_adjustment()
except Exception as e:
print(f"❌ فشل تحديث الأوزان: {e}")
await self.gradual_weights_adjustment()
async def gradual_weights_adjustment(self):
"""تعديل تدريجي للأوزان لتحسين الأداء"""
print("📈 إجراء تعديل تدريجي على الأوزان...")
# زيادة وزن الاستراتيجيات التي تعتمد على البيانات المتاحة
if self.market_patterns:
for market_condition, data in self.market_patterns.items():
if data.get("total_trades", 0) > 0:
best_strategy = max(data["best_performing_strategies"].items(),
key=lambda x: x[1]["total_pnl"])[0] if data["best_performing_strategies"] else None
if best_strategy:
current_weight = self.weights["strategy_weights"].get(best_strategy, 0.1)
self.weights["strategy_weights"][best_strategy] = min(current_weight * 1.1, 0.3)
self.normalize_weights()
print("✅ تم التعديل التدريجي للأوزان")
def normalize_weights(self):
"""تطبيع الأوزان للتأكد من أن مجموعها 1"""
total = sum(self.weights["strategy_weights"].values())
if total > 0:
for strategy in self.weights["strategy_weights"]:
self.weights["strategy_weights"][strategy] /= total
async def get_current_market_conditions(self):
"""الحصول على ظروف السوق الحالية - بيانات حقيقية"""
try:
if not self.data_manager:
raise ValueError("DataManager غير متوفر")
market_context = await self.data_manager.get_market_context_async()
if not market_context:
raise ValueError("فشل جلب سياق السوق")
return {
"current_trend": market_context.get('market_trend', 'sideways_market'),
"volatility": self._calculate_market_volatility(market_context),
"market_sentiment": market_context.get('btc_sentiment', 'NEUTRAL'),
"whale_activity": market_context.get('general_whale_activity', {}).get('sentiment', 'NEUTRAL'),
"fear_greed_index": market_context.get('fear_and_greed_index', 50)
}
except Exception as e:
print(f"⚠️ فشل الحصول على ظروف السوق: {e}")
return {
"current_trend": "sideways_market",
"volatility": "medium",
"market_sentiment": "neutral",
"whale_activity": "low",
"fear_greed_index": 50
}
def _calculate_market_volatility(self, market_context):
"""حساب تقلبية السوق بناءً على البيانات الحقيقية"""
try:
btc_price = market_context.get('bitcoin_price_usd', 0)
fear_greed = market_context.get('fear_and_greed_index', 50)
whale_sentiment = market_context.get('general_whale_activity', {}).get('sentiment', 'NEUTRAL')
volatility_score = 0
# تحليل سعر البيتكوين (تغيرات كبيرة = تقلبية عالية)
if btc_price > 0:
# هذا مؤشر مبسط - في التطبيق الحقيقي نحتاج بيانات تاريخية
if abs(fear_greed - 50) > 20:
volatility_score += 1
# تحليل نشاط الحيتان
if whale_sentiment in ['BULLISH', 'BEARISH']:
volatility_score += 1
elif whale_sentiment == 'SLIGHTLY_BULLISH':
volatility_score += 0.5
if volatility_score >= 1.5:
return "high"
elif volatility_score >= 0.5:
return "medium"
else:
return "low"
except Exception as e:
print(f"⚠️ خطأ في حساب التقلبية: {e}")
return "medium"
async def calculate_performance_metrics(self):
"""حساب مقاييس الأداء"""
if not self.performance_history:
return {"status": "لا توجد بيانات أداء بعد"}
recent_trades = self.performance_history[-50:] # آخر 50 صفقة فقط
total_trades = len(recent_trades)
successful_trades = sum(1 for trade in recent_trades
if trade['outcome'] in ["SUCCESS", "CLOSED_BY_REANALYSIS", "CLOSED_BY_MONITOR"] and trade.get('pnl_percent', 0) > 0)
success_rate = successful_trades / total_trades if total_trades > 0 else 0
total_pnl = sum(trade.get('pnl_percent', 0) for trade in recent_trades)
avg_pnl = total_pnl / total_trades if total_trades > 0 else 0
strategy_performance = {}
for strategy, data in self.strategy_effectiveness.items():
if data["total_trades"] > 0:
strategy_success_rate = data["successful_trades"] / data["total_trades"]
strategy_avg_pnl = data["total_pnl_percent"] / data["total_trades"]
strategy_performance[strategy] = {
"success_rate": strategy_success_rate,
"avg_pnl_percent": strategy_avg_pnl,
"total_trades": data["total_trades"],
"successful_trades": data["successful_trades"]
}
market_performance = {}
for condition, data in self.market_patterns.items():
if data["total_trades"] > 0:
market_success_rate = data["successful_trades"] / data["total_trades"]
market_avg_pnl = data["total_pnl_percent"] / data["total_trades"]
market_performance[condition] = {
"success_rate": market_success_rate,
"avg_pnl_percent": market_avg_pnl,
"total_trades": data["total_trades"]
}
return {
"overall_success_rate": success_rate,
"overall_avg_pnl_percent": avg_pnl,
"total_analyzed_trades": len(self.performance_history),
"recent_trades_analyzed": total_trades,
"strategy_performance": strategy_performance,
"market_performance": market_performance,
"last_updated": datetime.now().isoformat()
}
async def get_optimized_strategy_weights(self, market_condition):
"""الحصول على أوزان استراتيجية محسنة - الإصدار المصحح"""
try:
if not self.initialized:
print("⚠️ نظام التعلم غير مهيء، استخدام الأوزان الافتراضية")
return await self.get_default_strategy_weights()
# ✅ التحقق من وجود الأوزان وهيكلتها بشكل صحيح
if (not self.weights or
"strategy_weights" not in self.weights or
not self.weights["strategy_weights"]):
print("⚠️ الأوزان غير متوفرة أو فارغة، استخدام الأوزان الافتراضية")
return await self.get_default_strategy_weights()
base_weights = self.weights["strategy_weights"].copy()
# ✅ التحقق من أن الأوزان تحتوي على استراتيجيات فعلية
if not any(weight > 0 for weight in base_weights.values()):
print("⚠️ جميع الأوزان صفر، استخدام الأوزان الافتراضية")
return await self.get_default_strategy_weights()
print(f"✅ استخدام الأوزان المتعلمة: {base_weights}")
return base_weights
except Exception as e:
print(f"❌ فشل في حساب الأوزان المحسنة: {e}")
return await self.get_default_strategy_weights()
async def get_default_strategy_weights(self):
"""إرجاع الأوزان الافتراضية"""
return {
"trend_following": 0.18,
"mean_reversion": 0.15,
"breakout_momentum": 0.22,
"volume_spike": 0.12,
"whale_tracking": 0.15,
"pattern_recognition": 0.10,
"hybrid_ai": 0.08
}
async def get_risk_parameters(self, symbol_volatility):
"""الحصول على معايير المخاطرة المحسنة"""
if not self.weights or "risk_parameters" not in self.weights:
await self.initialize_default_weights()
risk_params = self.weights.get("risk_parameters", {}).copy()
# تعديل معايير المخاطرة بناءً على تقلبية الرمز
if symbol_volatility == "HIGH":
risk_params["stop_loss_base"] *= 1.5
risk_params["max_position_size"] *= 0.7
risk_params["risk_reward_ratio"] = 1.5 # تخفيض نسبة المكافأة/المخاطرة للتقليل العالي
elif symbol_volatility == "LOW":
risk_params["stop_loss_base"] *= 0.7
risk_params["max_position_size"] *= 1.2
risk_params["risk_reward_ratio"] = 2.5 # زيادة النسبة للتقليل المنخفض
return risk_params
async def suggest_improvements(self):
"""اقتراح تحسينات بناءً على تحليل الأداء"""
improvements = []
if not self.performance_history:
improvements.append("📊 ابدأ بجمع بيانات الأداء من الصفقات الأولى")
return improvements
# تحليل أداء الاستراتيجيات
for strategy, data in self.strategy_effectiveness.items():
if data["total_trades"] >= 3:
success_rate = data["successful_trades"] / data["total_trades"]
avg_pnl = data["total_pnl_percent"] / data["total_trades"]
if success_rate < 0.3 and avg_pnl < 0:
improvements.append(f"🚨 استراتيجية {strategy} ضعيفة الأداء ({success_rate:.1%} نجاح، {avg_pnl:+.1f}% متوسط) - يقترح تقليل استخدامها")
elif success_rate > 0.6 and avg_pnl > 2:
improvements.append(f"✅ استراتيجية {strategy} ممتازة الأداء ({success_rate:.1%} نجاح، {avg_pnl:+.1f}% متوسط) - يقترح زيادة استخدامها")
elif success_rate > 0.7:
improvements.append(f"🎯 استراتيجية {strategy} عالية النجاح ({success_rate:.1%}) - التركيز على جودة الصفقات")
# تحليل أداء ظروف السوق
for market_condition, data in self.market_patterns.items():
if data["total_trades"] >= 5:
success_rate = data["successful_trades"] / data["total_trades"]
avg_pnl = data["total_pnl_percent"] / data["total_trades"]
if success_rate < 0.4:
improvements.append(f"⚠️ الأداء ضعيف في سوق {market_condition} ({success_rate:.1%} نجاح) - يحتاج مراجعة الاستراتيجيات")
# العثور على أفضل استراتيجية لهذا السوق
best_strategy = None
best_performance = -100
for strategy, stats in data["best_performing_strategies"].items():
if stats["count"] >= 2:
strategy_avg_pnl = stats["total_pnl"] / stats["count"]
if strategy_avg_pnl > best_performance:
best_performance = strategy_avg_pnl
best_strategy = strategy
if best_strategy and best_performance > 1:
improvements.append(f"📈 أفضل استراتيجية في {market_condition}: {best_strategy} ({best_performance:+.1f}% متوسط ربح)")
if not improvements:
improvements.append("📊 لا توجد تحسينات مقترحة حالياً - استمر في جمع البيانات")
return improvements
async def force_strategy_learning(self):
"""إجبار النظام على التعلم من البيانات الحالية"""
print("🧠 إجبار تحديث الاستراتيجيات من البيانات الحالية...")
if not self.performance_history:
print("⚠️ لا توجد بيانات أداء للتعلم منها")
return
# تحديث فعالية الاستراتيجيات من البيانات التاريخية
for entry in self.performance_history:
await self.update_strategy_effectiveness(entry)
await self.update_market_patterns(entry)
# تحديث الأوزان فوراً
await self.adapt_weights_based_on_performance()
await self.save_weights_to_r2()
print("✅ تم إجبار تحديث الاستراتيجيات بنجاح")
print("✅ نظام التعلم الذاتي المحسن محمل - جاهز للتعلم والتكيف المستمر")