Trad / learning_hub /statistical_analyzer.py
Riy777's picture
Update learning_hub/statistical_analyzer.py
6512921 verified
# learning_hub/statistical_analyzer.py
# (محدث بالكامل - V2 - تعلم تأثير VADER)
# وهو يمثل "التعلم البطيء" (الإحصائي)
import json
import asyncio
import traceback # (إضافة)
from datetime import datetime
from typing import Dict, Any, List
import numpy as np
# (نفترض أن هذه الدوال المساعدة سيتم نقلها إلى ملف helpers.py عام)
# (لأغراض هذا الملف، سنعرفها هنا مؤقتاً)
def normalize_weights(weights_dict):
total = sum(weights_dict.values())
if total > 0:
for key in weights_dict:
weights_dict[key] /= total
return weights_dict
def should_update_weights(history_length):
return history_length % 5 == 0 # (تحديث الأوزان كل 5 صفقات)
class StatisticalAnalyzer:
def __init__(self, r2_service: Any, data_manager: Any):
self.r2_service = r2_service
self.data_manager = data_manager # (لجلب سياق السوق)
# --- (هذه هي نفس متغيرات الحالة من learning_engine القديم) ---
self.weights = {} # (أوزان استراتيجيات الدخول)
self.performance_history = []
self.strategy_effectiveness = {} # (إحصائيات استراتيجيات الدخول)
self.exit_profile_effectiveness = {} # (إحصائيات مزيج الدخول+الخروج)
self.market_patterns = {}
# 🔴 --- START OF CHANGE (V2 - VADER Learning) --- 🔴
self.vader_bin_effectiveness = {} # (جديد: لتتبع أداء VADER)
# 🔴 --- END OF CHANGE --- 🔴
self.initialized = False
self.lock = asyncio.Lock()
print("✅ Learning Hub Module: Statistical Analyzer (Slow-Learner) loaded")
async def initialize(self):
"""تهيئة المحلل الإحصائي (التعلم البطيء)"""
async with self.lock:
if self.initialized:
return
print("🔄 [StatsAnalyzer] تهيئة نظام التعلم الإحصائي (البطيء)...")
try:
await self.load_weights_from_r2()
await self.load_performance_history()
await self.load_exit_profile_effectiveness()
# 🔴 --- START OF CHANGE (V2 - VADER Learning) --- 🔴
await self.load_vader_effectiveness()
# 🔴 --- END OF CHANGE --- 🔴
if not self.weights or not self.strategy_effectiveness:
await self.initialize_default_weights()
self.initialized = True
print("✅ [StatsAnalyzer] نظام التعلم الإحصائي جاهز.")
except Exception as e:
print(f"❌ [StatsAnalyzer] فشل التهيئة: {e}")
await self.initialize_default_weights()
self.initialized = True
# ---------------------------------------------------------------------------
# (الدوال التالية مأخوذة مباشرة من learning_engine (39).py القديم)
# (مع تعديلات طفيفة)
# ---------------------------------------------------------------------------
async def initialize_default_weights(self):
"""إعادة تعيين الأوزان إلى الوضع الافتراضي"""
# 🔴 --- START OF CHANGE --- 🔴
self.weights = {
# 1. أوزان اختيار الاستراتيجية (MLProcessor)
"strategy_weights": {
"trend_following": 0.18, "mean_reversion": 0.15, "breakout_momentum": 0.22,
"volume_spike": 0.12, "whale_tracking": 0.15, "pattern_recognition": 0.10,
"hybrid_ai": 0.08
},
# 2. أوزان المؤشرات العامة (MLProcessor)
"indicator_weights": {
"rsi": 0.2, "macd": 0.2, "bbands": 0.15, "atr": 0.1,
"volume_ratio": 0.2, "vwap": 0.15
},
# 3. أوزان الأنماط العامة (MLProcessor)
"pattern_weights": {
"Double Bottom": 0.3, "Breakout Up": 0.3, "Uptrend": 0.2,
"Near Support": 0.2, "Double Top": -0.3 # (وزن سلبي)
},
# 4. أوزان كاشف الانعكاس 5m (للحارس)
"reversal_indicator_weights": {
"pattern": 0.4,
"rsi": 0.3,
"macd": 0.3
},
# 5. أوزان زناد الدخول 1m (للحارس)
"entry_trigger_weights": {
"cvd": 0.25,
"order_book": 0.25,
"ema_1m": 0.25,
"macd_1m": 0.25
},
# 6. عتبة تفعيل زناد الدخول
"entry_trigger_threshold": 0.75
}
# 🔴 --- END OF CHANGE --- 🔴
self.strategy_effectiveness = {}
self.exit_profile_effectiveness = {}
self.market_patterns = {}
# 🔴 --- START OF CHANGE (V2 - VADER Learning) --- 🔴
# (إعادة تعيين إحصائيات VADER أيضاً)
self.vader_bin_effectiveness = {
"Strong_Positive": {"total_trades": 0, "total_pnl_percent": 0},
"Positive": {"total_trades": 0, "total_pnl_percent": 0},
"Neutral": {"total_trades": 0, "total_pnl_percent": 0},
"Negative": {"total_trades": 0, "total_pnl_percent": 0},
"Strong_Negative": {"total_trades": 0, "total_pnl_percent": 0}
}
# 🔴 --- END OF CHANGE --- 🔴
async def load_weights_from_r2(self):
key = "learning_statistical_weights.json" # (ملف جديد)
try:
response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key)
data = json.loads(response['Body'].read())
self.weights = data.get("weights", {})
# (إضافة: التحقق من وجود الأوزان الجديدة، وإلا إضافتها من الافتراضيات)
if "reversal_indicator_weights" not in self.weights:
defaults = await self.get_default_strategy_weights() # (سيحتوي على كل شيء)
self.weights["reversal_indicator_weights"] = defaults.get("reversal_indicator_weights")
self.weights["entry_trigger_weights"] = defaults.get("entry_trigger_weights")
self.weights["entry_trigger_threshold"] = defaults.get("entry_trigger_threshold")
print("ℹ️ [StatsAnalyzer] تم تحديث ملف الأوزان ببيانات الحارس الجديدة.")
self.strategy_effectiveness = data.get("strategy_effectiveness", {})
self.market_patterns = data.get("market_patterns", {})
print(f"✅ [StatsAnalyzer] تم تحميل الأوزان والإحصائيات من R2.")
except Exception as e:
print(f"ℹ️ [StatsAnalyzer] فشل تحميل الأوزان ({e}). استخدام الافتراضيات.")
await self.initialize_default_weights()
async def save_weights_to_r2(self):
key = "learning_statistical_weights.json"
try:
data = {
"weights": self.weights,
"strategy_effectiveness": self.strategy_effectiveness,
"market_patterns": self.market_patterns,
"last_updated": datetime.now().isoformat()
}
data_json = json.dumps(data, indent=2, ensure_ascii=False).encode('utf-8')
self.r2_service.s3_client.put_object(
Bucket="trading", Key=key, Body=data_json, ContentType="application/json"
)
except Exception as e:
print(f"❌ [StatsAnalyzer] فشل حفظ الأوزان في R2: {e}")
async def load_performance_history(self):
key = "learning_performance_history.json" # (مشترك)
try:
response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key)
data = json.loads(response['Body'].read())
self.performance_history = data.get("history", [])
except Exception as e:
self.performance_history = []
async def save_performance_history(self):
key = "learning_performance_history.json"
try:
data = {"history": self.performance_history[-1000:]} # (آخر 1000 صفقة فقط)
data_json = json.dumps(data, indent=2, ensure_ascii=False).encode('utf-8')
self.r2_service.s3_client.put_object(
Bucket="trading", Key=key, Body=data_json, ContentType="application/json"
)
except Exception as e:
print(f"❌ [StatsAnalyzer] فشل حفظ تاريخ الأداء: {e}")
async def load_exit_profile_effectiveness(self):
key = "learning_exit_profile_effectiveness.json" # (مشترك)
try:
response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key)
data = json.loads(response['Body'].read())
self.exit_profile_effectiveness = data.get("effectiveness", {})
except Exception as e:
self.exit_profile_effectiveness = {}
async def save_exit_profile_effectiveness(self):
key = "learning_exit_profile_effectiveness.json"
try:
data = {
"effectiveness": self.exit_profile_effectiveness,
"last_updated": datetime.now().isoformat()
}
data_json = json.dumps(data, indent=2, ensure_ascii=False).encode('utf-8')
self.r2_service.s3_client.put_object(
Bucket="trading", Key=key, Body=data_json, ContentType="application/json"
)
except Exception as e:
print(f"❌ [StatsAnalyzer] فشل حفظ أداء ملف الخروج: {e}")
# 🔴 --- START OF CHANGE (V2 - VADER Learning) --- 🔴
async def load_vader_effectiveness(self):
"""تحميل إحصائيات VADER من R2"""
key = "learning_vader_effectiveness.json" # (ملف جديد)
try:
response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key)
data = json.loads(response['Body'].read())
self.vader_bin_effectiveness = data.get("effectiveness", {})
if not self.vader_bin_effectiveness:
await self.initialize_default_weights() # (سيقوم بملء القيم الافتراضية)
except Exception as e:
# (إذا فشل، ستقوم initialize_default_weights بملء القيم الافتراضية)
pass
async def save_vader_effectiveness(self):
"""حفظ إحصائيات VADER إلى R2"""
key = "learning_vader_effectiveness.json"
try:
data = {
"effectiveness": self.vader_bin_effectiveness,
"last_updated": datetime.now().isoformat()
}
data_json = json.dumps(data, indent=2, ensure_ascii=False).encode('utf-8')
self.r2_service.s3_client.put_object(
Bucket="trading", Key=key, Body=data_json, ContentType="application/json"
)
except Exception as e:
print(f"❌ [StatsAnalyzer] فشل حفظ أداء VADER: {e}")
def _get_vader_bin(self, vader_score: float) -> str:
"""تصنيف درجة VADER الخام (-1 إلى +1) إلى سلال"""
if vader_score > 0.5:
return "Strong_Positive"
elif vader_score > 0.05:
return "Positive"
elif vader_score < -0.5:
return "Strong_Negative"
elif vader_score < -0.05:
return "Negative"
else:
return "Neutral"
# 🔴 --- END OF CHANGE --- 🔴
async def update_statistics(self, trade_object: Dict[str, Any], close_reason: str):
"""
هذه هي الدالة الرئيسية التي تحدث الإحصائيات (التعلم البطيء).
(تدمج update_strategy_effectiveness و update_market_patterns من الملف القديم)
"""
if not self.initialized:
await self.initialize()
try:
strategy = trade_object.get('strategy', 'unknown')
decision_data = trade_object.get('decision_data', {})
exit_profile = decision_data.get('exit_profile', 'unknown')
combined_key = f"{strategy}_{exit_profile}"
pnl_percent = trade_object.get('pnl_percent', 0)
is_success = pnl_percent > 0.1 # (اعتبار الربح الطفيف نجاحاً)
# 🔴 --- START OF CHANGE --- 🔴
# (استخدام بيانات السوق وقت القرار إذا كانت مخزنة، وإلا جلب الحالية)
market_context = decision_data.get('market_context_at_decision', {})
if not market_context:
market_context = await self.get_current_market_conditions()
market_condition = market_context.get('current_trend', 'sideways_market')
# (V2 - VADER Learning) جلب درجة VADER وقت القرار
# (نفترض أن TradeManager حفظها هنا)
vader_score_at_decision = decision_data.get('news_score', 0.0)
vader_bin = self._get_vader_bin(vader_score_at_decision)
# 🔴 --- END OF CHANGE --- 🔴
# --- 1. تحديث تاريخ الأداء (للتتبع العام) ---
analysis_entry = {
"timestamp": datetime.now().isoformat(),
"trade_id": trade_object.get('id', 'N/A'),
"symbol": trade_object.get('symbol', 'N/A'),
"outcome": close_reason,
"market_conditions": market_context,
"strategy_used": strategy,
"exit_profile_used": exit_profile,
"pnl_percent": pnl_percent,
"vader_score": vader_score_at_decision, # (إضافة)
"vader_bin": vader_bin # (إضافة)
}
self.performance_history.append(analysis_entry)
# --- 2. تحديث إحصائيات استراتيجية الدخول (strategy_effectiveness) ---
if strategy not in self.strategy_effectiveness:
self.strategy_effectiveness[strategy] = {"total_trades": 0, "successful_trades": 0, "total_pnl_percent": 0}
self.strategy_effectiveness[strategy]["total_trades"] += 1
self.strategy_effectiveness[strategy]["total_pnl_percent"] += pnl_percent
if is_success:
self.strategy_effectiveness[strategy]["successful_trades"] += 1
# --- 3. تحديث إحصائيات مزيج (الدخول + الخروج) (exit_profile_effectiveness) ---
if combined_key not in self.exit_profile_effectiveness:
self.exit_profile_effectiveness[combined_key] = {"total_trades": 0, "successful_trades": 0, "total_pnl_percent": 0, "pnl_list": []}
self.exit_profile_effectiveness[combined_key]["total_trades"] += 1
self.exit_profile_effectiveness[combined_key]["total_pnl_percent"] += pnl_percent
self.exit_profile_effectiveness[combined_key]["pnl_list"].append(pnl_percent)
if len(self.exit_profile_effectiveness[combined_key]["pnl_list"]) > 100:
self.exit_profile_effectiveness[combined_key]["pnl_list"] = self.exit_profile_effectiveness[combined_key]["pnl_list"][-100:]
if is_success:
self.exit_profile_effectiveness[combined_key]["successful_trades"] += 1
# --- 4. تحديث إحصائيات ظروف السوق (market_patterns) ---
if market_condition not in self.market_patterns:
self.market_patterns[market_condition] = {"total_trades": 0, "successful_trades": 0, "total_pnl_percent": 0}
self.market_patterns[market_condition]["total_trades"] += 1
self.market_patterns[market_condition]["total_pnl_percent"] += pnl_percent
if is_success:
self.market_patterns[market_condition]["successful_trades"] += 1
# 🔴 --- START OF CHANGE (V2 - VADER Learning) --- 🔴
# --- 5. تحديث إحصائيات VADER ---
if vader_bin not in self.vader_bin_effectiveness:
# (لضمان عدم حدوث خطأ إذا كانت السلة غير موجودة)
self.vader_bin_effectiveness[vader_bin] = {"total_trades": 0, "total_pnl_percent": 0}
self.vader_bin_effectiveness[vader_bin]["total_trades"] += 1
self.vader_bin_effectiveness[vader_bin]["total_pnl_percent"] += pnl_percent
# 🔴 --- END OF CHANGE --- 🔴
# --- 6. تكييف الأوزان والحفظ (إذا لزم الأمر) ---
# (ملاحظة: نحتاج إلى إضافة منطق لتعلم أوزان الحارس هنا مستقبلاً)
if should_update_weights(len(self.performance_history)):
await self.adapt_weights_based_on_performance()
await self.save_weights_to_r2()
await self.save_performance_history()
await self.save_exit_profile_effectiveness()
# 🔴 --- START OF CHANGE (V2 - VADER Learning) --- 🔴
await self.save_vader_effectiveness() # (حفظ إحصائيات VADER)
# 🔴 --- END OF CHANGE --- 🔴
print(f"✅ [StatsAnalyzer] تم تحديث الإحصائيات لـ {strategy} (News Bin: {vader_bin})")
except Exception as e:
print(f"❌ [StatsAnalyzer] فشل تحديث الإحصائيات: {e}")
traceback.print_exc()
async def adapt_weights_based_on_performance(self):
"""تكييف أوزان استراتيجيات الدخول بناءً على الأداء الإحصائي"""
# (ملاحظة: هذا المنطق حالياً يكيف فقط strategy_weights)
# (سنحتاج لتطويره لاحقاً ليكيف أوزان الحارس)
print("🔄 [StatsAnalyzer] تكييف أوزان الاستراتيجيات (التعلم البطيء)...")
try:
strategy_performance = {}
total_performance = 0
for strategy, data in self.strategy_effectiveness.items():
if data.get("total_trades", 0) > 2: # (يتطلب 3 صفقات على الأقل للتكيف)
success_rate = data["successful_trades"] / data["total_trades"]
avg_pnl = data["total_pnl_percent"] / data["total_trades"]
# مقياس مركب: (معدل النجاح * 60%) + (متوسط الربح * 40%)
# (يتم تقييد متوسط الربح بين -5 و +5)
normalized_pnl = min(max(avg_pnl, -5.0), 5.0) / 5.0 # (من -1 إلى 1)
composite_performance = (success_rate * 0.6) + (normalized_pnl * 0.4)
strategy_performance[strategy] = composite_performance
total_performance += composite_performance
if total_performance > 0 and strategy_performance:
base_weights = self.weights.get("strategy_weights", {})
for strategy, performance in strategy_performance.items():
current_weight = base_weights.get(strategy, 0.1)
# (تعديل طفيف: 80% من الوزن الحالي + 20% من الأداء)
new_weight = (current_weight * 0.8) + (performance * 0.2)
base_weights[strategy] = max(new_weight, 0.05) # (الحد الأدنى للوزن 5%)
normalize_weights(base_weights)
self.weights["strategy_weights"] = base_weights
print(f"✅ [StatsAnalyzer] تم تكييف الأوزان: {base_weights}")
except Exception as e:
print(f"❌ [StatsAnalyzer] فشل تكييف الأوزان: {e}")
# --- (الدوال المساعدة لجلب البيانات - مأخوذة من الملف القديم) ---
async def get_best_exit_profile(self, entry_strategy: str) -> str:
"""يجد أفضل ملف خروج إحصائياً لاستراتيجية دخول معينة."""
if not self.initialized or not self.exit_profile_effectiveness:
return "unknown"
relevant_profiles = {}
for combined_key, data in self.exit_profile_effectiveness.items():
if combined_key.startswith(f"{entry_strategy}_"):
if data.get("total_trades", 0) >= 3: # (يتطلب 3 صفقات)
exit_profile_name = combined_key.replace(f"{entry_strategy}_", "", 1)
avg_pnl = data["total_pnl_percent"] / data["total_trades"]
relevant_profiles[exit_profile_name] = avg_pnl
if not relevant_profiles:
return "unknown"
best_profile = max(relevant_profiles, key=relevant_profiles.get)
return best_profile
# 🔴 --- START OF CHANGE (V2 - VADER Learning) --- 🔴
async def get_statistical_vader_pnl(self, vader_score: float) -> float:
"""
جلب متوسط الربح/الخسارة التاريخي لدرجة VADER
"""
if not self.initialized:
return 0.0 # (العودة بقيمة محايدة)
vader_bin = self._get_vader_bin(vader_score)
bin_data = self.vader_bin_effectiveness.get(vader_bin)
if not bin_data or bin_data.get("total_trades", 0) < 3:
# (لا توجد بيانات كافية، العودة بقيمة محايدة)
return 0.0
# (إرجاع متوسط الربح/الخسارة الفعلي لهذه السلة)
avg_pnl = bin_data["total_pnl_percent"] / bin_data["total_trades"]
return avg_pnl
# 🔴 --- END OF CHANGE --- 🔴
# 🔴 --- START OF CHANGE --- 🔴
async def get_optimized_weights(self, market_condition: str) -> Dict[str, float]:
"""
جلب جميع الأوزان المعدلة إحصائياً (لكل من MLProcessor والحارس).
"""
if not self.initialized or "strategy_weights" not in self.weights:
await self.initialize()
base_weights = self.weights.copy()
# (يمكننا إضافة منطق تعديل الأوزان بناءً على ظروف السوق هنا)
# (لكن في الوقت الحالي، سنعيد الأوزان المعدلة إحصائياً كما هي)
if not base_weights:
# (العودة إلى الافتراضيات إذا كانت الأوزان فارغة)
return await self.get_default_strategy_weights()
return base_weights
# 🔴 --- END OF CHANGE --- 🔴
async def get_default_strategy_weights(self) -> Dict[str, float]:
"""إرجاع الأوزان الافتراضية عند الفشل"""
# 🔴 --- START OF CHANGE --- 🔴
# (إرجاع كل شيء، وليس فقط أوزان الاستراتيجية)
return {
"strategy_weights": {
"trend_following": 0.18, "mean_reversion": 0.15, "breakout_momentum": 0.22,
"volume_spike": 0.12, "whale_tracking": 0.15, "pattern_recognition": 0.10,
"hybrid_ai": 0.08
},
"indicator_weights": {
"rsi": 0.2, "macd": 0.2, "bbands": 0.15, "atr": 0.1,
"volume_ratio": 0.2, "vwap": 0.15
},
"pattern_weights": {
"Double Bottom": 0.3, "Breakout Up": 0.3, "Uptrend": 0.2,
"Near Support": 0.2, "Double Top": -0.3
},
"reversal_indicator_weights": {
"pattern": 0.4, "rsi": 0.3, "macd": 0.3
},
"entry_trigger_weights": {
"cvd": 0.25, "order_book": 0.25, "ema_1m": 0.25, "macd_1m": 0.25
},
"entry_trigger_threshold": 0.75
}
# 🔴 --- END OF CHANGE --- 🔴
async def get_current_market_conditions(self) -> Dict[str, Any]:
"""جلب سياق السوق الحالي (من الملف القديم)"""
try:
if not self.data_manager:
raise ValueError("DataManager unavailable")
market_context = await self.data_manager.get_market_context_async()
if not market_context:
raise ValueError("Market context fetch failed")
# (نحتاج دالة لحساب التقلب - نفترض أنها في helpers)
# volatility = calculate_market_volatility(market_context)
return {
"current_trend": market_context.get('market_trend', 'sideways_market'),
"volatility": "medium", # (قيمة مؤقتة)
"market_sentiment": market_context.get('btc_sentiment', 'NEUTRAL'),
}
except Exception as e:
return {"current_trend": "sideways_market", "volatility": "medium", "market_sentiment": "NEUTRAL"}
# 🔴 --- START OF CHANGE --- 🔴
# (تم حذف القوس } الزائد من هنا)
# 🔴 --- END OF CHANGE --- 🔴