Trad / learning_hub /hub_manager.py
Riy777's picture
Update learning_hub/hub_manager.py
370e8b2 verified
# learning_hub/hub_manager.py
# (محدث بالكامل - V3 - Whale Learning Loop)
import asyncio
import traceback
from typing import Any, Dict, List
from datetime import datetime, timezone
# (استيراد جميع المكونات الداخلية للمركز)
from .schemas import *
from .policy_engine import PolicyEngine
from .memory_store import MemoryStore
from .statistical_analyzer import StatisticalAnalyzer
from .reflector import Reflector
from .curator import Curator
# 🔴 --- (جديد V3) استيراد لتحليل الارتباط --- 🔴
try:
import numpy as np
from scipy.stats import pearsonr
NUMPY_AVAILABLE = True
except ImportError:
print("❌ [HubManager] مكتبة numpy أو scipy غير مثبتة! لن يعمل تعلم الحيتان.")
NUMPY_AVAILABLE = False
class LearningHubManager:
def __init__(self, r2_service: Any, llm_service: Any, data_manager: Any):
print("🚀 Initializing Learning Hub Manager (V3)...")
# 1. الخدمات الأساسية (يتم تمريرها من app.py)
self.r2_service = r2_service
self.llm_service = llm_service
self.data_manager = data_manager
# 2. تهيئة المكونات (بناء النظام)
self.policy_engine = PolicyEngine()
self.memory_store = MemoryStore(
r2_service=self.r2_service,
policy_engine=self.policy_engine,
llm_service=self.llm_service
)
self.reflector = Reflector(
llm_service=self.llm_service,
memory_store=self.memory_store
)
self.curator = Curator(
llm_service=self.llm_service,
memory_store=self.memory_store
)
self.statistical_analyzer = StatisticalAnalyzer(
r2_service=self.r2_service,
data_manager=self.data_manager
)
# 🔴 --- (جديد V3) متغيرات حالة لتعلم الحيتان --- 🔴
self.whale_learning_lock = asyncio.Lock()
self.optimal_whale_config = {} # (الأوزان المتعلمة)
self.initialized = False
print("✅ Learning Hub Manager constructed. Ready for initialization.")
async def initialize(self):
"""
تهيئة جميع الأنظمة الفرعية، وخاصة تحميل الإحصائيات والأوزان.
"""
if self.initialized:
return
print("🔄 [HubManager] Initializing all sub-modules...")
await self.statistical_analyzer.initialize()
# 🔴 --- (جديد V3) تحميل إعدادات الحيتان المتعلمة --- 🔴
if hasattr(self.r2_service, 'load_whale_learning_config_async'):
self.optimal_whale_config = await self.r2_service.load_whale_learning_config_async()
if self.optimal_whale_config:
print(f"✅ [HubManager] تم تحميل إعدادات تعلم الحيتان المثلى: {self.optimal_whale_config}")
else:
print("ℹ️ [HubManager] لم يتم العثور على إعدادات تعلم حيتان سابقة.")
self.initialized = True
print("✅ [HubManager] All sub-modules initialized. Learning Hub is LIVE.")
async def analyze_trade_and_learn(self, trade_object: Dict[str, Any], close_reason: str):
"""
هذه هي الدالة الرئيسية التي يستدعيها TradeManager.
إنها تشغل كلاً من نظام التعلم السريع (Reflector) والبطيء (StatsAnalyzer).
"""
if not self.initialized:
print("⚠️ [HubManager] Learning Hub not initialized. Skipping learning.")
return
print(f"🧠 [HubManager] Learning from trade {trade_object.get('symbol')}...")
try:
# 1. التعلم السريع (Reflector):
await self.reflector.analyze_trade_outcome(trade_object, close_reason)
except Exception as e:
print(f"❌ [HubManager] Reflector (Fast-Learner) failed: {e}")
try:
# 2. التعلم البطيء (StatisticalAnalyzer):
await self.statistical_analyzer.update_statistics(trade_object, close_reason)
except Exception as e:
print(f"❌ [HubManager] StatisticalAnalyzer (Slow-Learner) failed: {e}")
print(f"✅ [HubManager] Learning complete for {trade_object.get('symbol')}.")
async def get_active_context_for_llm(self, domain: str, query: str) -> str:
"""
يُستخدم بواسطة LLMService لجلب "الدفتر" (Playbook) / القواعد (Deltas).
"""
if not self.initialized:
return "Learning Hub not initialized."
return await self.memory_store.get_active_context(domain, query)
async def get_statistical_feedback_for_llm(self, entry_strategy: str) -> str:
"""
يُستخدم بواسطة LLMService لجلب أفضل ملف خروج (إحصائياً).
"""
if not self.initialized:
return "Learning Hub not initialized."
best_profile = await self.statistical_analyzer.get_best_exit_profile(entry_strategy)
if best_profile != "unknown":
# (Prompt in English as requested)
feedback = f"Statistical Feedback: For the '{entry_strategy}' strategy, the '{best_profile}' exit profile has historically performed best."
return feedback
else:
return "No statistical feedback available for this strategy yet."
# 🔴 --- START OF CHANGE (V2 - VADER Learning) --- 🔴
async def get_statistical_news_score(self, raw_vader_score: float) -> float:
"""
يحول درجة VADER الخام إلى متوسط الربح/الخسارة التاريخي المتوقع.
(يُستخدم بواسطة app.py / MLProcessor للترتيب الداخلي)
"""
if not self.initialized:
return 0.0 # محايد
# (جلب متوسط الربح/الخسارة الفعلي من المحلل الإحصائي)
historical_pnl = await self.statistical_analyzer.get_statistical_vader_pnl(raw_vader_score)
# (إرجاع النسبة المئوية للربح/الخسارة مباشرة، مثلاً: 1.1 أو -0.5)
return historical_pnl
# 🔴 --- END OF CHANGE --- 🔴
# 🔴 --- START OF CHANGE --- 🔴
async def get_optimized_weights(self, market_condition: str) -> Dict[str, float]:
"""
يُستخدم بواسطة MLProcessor/StrategyEngine/Sentry لجلب الأوزان المعدلة إحصائياً.
"""
if not self.initialized:
# (الحصول على كل الأوزان الافتراضية)
return await self.statistical_analyzer.get_default_strategy_weights()
# (الحصول على كل الأوزان المحسنة)
return await self.statistical_analyzer.get_optimized_weights(market_condition)
# 🔴 --- END OF CHANGE --- 🔴
async def run_distillation_check(self):
"""
(يتم استدعاؤها دورياً من app.py)
للتحقق من جميع المجالات وتشغيل التقطير إذا لزم الأمر.
"""
if not self.initialized:
return
print("ℹ️ [HubManager] Running periodic distillation check...")
for domain in self.memory_store.domain_files.keys():
await self.curator.check_and_distill_domain(domain)
print("✅ [HubManager] Distillation check complete.")
# (No change to shutdown function)
async def shutdown(self):
"""
Saves all persistent data from the statistical analyzer.
"""
if not self.initialized:
return
print("🔄 [HubManager] Shutting down... Saving all learning data.")
try:
await self.statistical_analyzer.save_weights_to_r2()
await self.statistical_analyzer.save_performance_history()
await self.statistical_analyzer.save_exit_profile_effectiveness()
# 🔴 --- START OF CHANGE (V2 - VADER Learning) --- 🔴
await self.statistical_analyzer.save_vader_effectiveness()
# 🔴 --- END OF CHANGE --- 🔴
print("✅ [HubManager] All statistical (slow-learner) data saved.")
except Exception as e:
print(f"❌ [HubManager] Failed to save learning data on shutdown: {e}")
# 🔴 --- START OF CHANGE (V3 - Whale Learning Loop) --- 🔴
async def run_whale_learning_check(self):
"""
(جديد V3 - "المُسجّل" Logger)
يعمل في الخلفية لإكمال سجلات تعلم الحيتان المعلقة.
"""
if not self.initialized:
await asyncio.sleep(60) # انتظر حتى تتم التهيئة
print(f"🧠 [Whale-Logger] بدء تشغيل حلقة تعلم الحيتان (المُسجّل)...")
# (الانتظار 10 دقائق عند بدء التشغيل للسماح بجمع بعض البيانات)
await asyncio.sleep(600)
while True:
try:
# (1. جلب السجلات المعلقة)
pending_records = await self.r2_service.get_pending_whale_learning_records_async()
if not pending_records:
# (لا توجد سجلات، انتظر 10 دقائق)
await asyncio.sleep(600)
continue
print(f"🧠 [Whale-Logger] تم العثور على {len(pending_records)} سجل تعلم معلق. بدء المعالجة...")
now_utc = datetime.now(timezone.utc)
for record in pending_records:
try:
target_time_utc = datetime.fromisoformat(record['target_time_utc'])
# (2. التحقق من الوقت)
if now_utc >= target_time_utc:
print(f" -> [Whale-Logger] معالجة سجل {record['symbol']} (ID: {record['record_id']})...")
# (حان وقت جلب السعر المستقبلي)
symbol = record['symbol']
target_price = await self.data_manager.get_latest_price_async(symbol)
if target_price and target_price > 0 and record['start_price_usd'] > 0:
# (3. حساب النتيجة)
price_change_pct = ((target_price - record['start_price_usd']) / record['start_price_usd']) * 100
record['target_price_usd'] = target_price
record['price_change_percentage'] = price_change_pct
record['status'] = "COMPLETED"
# (4. تحديث السجل في R2)
await self.r2_service.update_completed_whale_learning_record_async(record)
else:
print(f" ⚠️ [Whale-Logger] فشل جلب السعر المستقبلي لـ {symbol}. سيعاد المحاولة لاحقاً.")
else:
# (لم يحن الوقت بعد)
pass
except Exception as e_inner:
print(f"❌ [Whale-Logger] فشل معالجة سجل فردي: {e_inner}")
# (تشغيل "المعلّم" بعد كل دورة تسجيل)
await self.update_optimal_whale_window()
# (الانتظار 5 دقائق قبل التحقق مرة أخرى)
await asyncio.sleep(300)
except Exception as e_outer:
print(f"❌ [Whale-Logger] خطأ فادح في حلقة تعلم الحيتان: {e_outer}")
traceback.print_exc()
await asyncio.sleep(600) # (انتظار 10 دقائق عند الفشل الفادح)
async def update_optimal_whale_window(self):
"""
(جديد V3 - "المعلّم" Teacher)
يحلل جميع السجلات المكتملة ويجد أفضل "مقياس + نافذة" للارتباط.
"""
if not NUMPY_AVAILABLE:
print("⚠️ [Whale-Teacher] لا يمكن تشغيل تحليل الارتباط (numpy/scipy مفقودة).")
return
async with self.whale_learning_lock:
print("👨‍🏫 [Whale-Teacher] بدء تحليل الارتباط الإحصائي...")
try:
# (1. جلب جميع السجلات المكتملة)
all_completed = await self.r2_service.get_all_completed_whale_records_async()
if len(all_completed) < 20: # (نحتاج 20 عينة على الأقل لبدء التعلم)
print(f"👨‍🏫 [Whale-Teacher] نحتاج 20 سجل مكتمل على الأقل (الحالي: {len(all_completed)}). تخطي التحليل.")
return
# (2. استخراج البيانات في مصفوفات Numpy)
price_changes = []
metrics_data = defaultdict(lambda: defaultdict(list))
# (قائمة بجميع المقاييس التي نريد اختبارها)
windows = ['30m', '1h', '2h', '4h', '24h']
metric_keys = ['relative_net_flow_percent', 'transaction_density', 'net_flow_usd']
for record in all_completed:
if record.get('price_change_percentage') is None: continue
price_changes.append(record['price_change_percentage'])
analysis = record.get('window_analysis', {})
for w in windows:
window_data = analysis.get(w, {})
for k in metric_keys:
metrics_data[w][k].append(window_data.get(k, 0.0))
price_changes_np = np.array(price_changes)
if len(price_changes_np) < 20:
print("👨‍🏫 [Whale-Teacher] لا توجد بيانات كافية (NP) للارتباط.")
return
# (3. حساب الارتباط)
correlation_results = {}
for w in windows:
for k in metric_keys:
metric_np = np.array(metrics_data[w][k])
if len(metric_np) != len(price_changes_np): continue
# (حساب ارتباط بيرسون)
corr, p_value = pearsonr(metric_np, price_changes_np)
if not np.isnan(corr) and p_value < 0.1: # (نهتم فقط بالارتباطات ذات الدلالة الإحصائية)
correlation_results[f"{w}_{k}"] = abs(corr) # (نهتم بقوة الارتباط، بغض النظر عن الاتجاه)
if not correlation_results:
print("👨‍🏫 [Whale-Teacher] لم يتم العثور على ارتباطات إحصائية ذات دلالة.")
return
# (4. العثور على الفائز وحفظه)
best_metric_key = max(correlation_results, key=correlation_results.get)
best_correlation = correlation_results[best_metric_key]
# (تقسيم المفتاح: '1h_relative_net_flow_percent')
best_window, best_metric = best_metric_key.split('_', 1)
new_config = {
"best_window": best_window,
"best_metric": best_metric,
"correlation_score": best_correlation,
"total_samples": len(price_changes_np),
"last_updated_utc": datetime.now(timezone.utc).isoformat()
}
# (حفظ الإعدادات الجديدة ومشاركتها مع النظام)
self.optimal_whale_config = new_config
await self.r2_service.save_whale_learning_config_async(new_config)
print(f"🏆 [Whale-Teacher] تم العثور على أفضل إشارة جديدة!")
print(f" -> المقياس: {best_metric}")
print(f" -> النافذة: {best_window}")
print(f" -> الارتباط: {best_correlation:.4f} (على {len(price_changes_np)} عينة)")
except Exception as e:
print(f"❌ [Whale-Teacher] فشل تحليل الارتباط: {e}")
traceback.print_exc()
# 🔴 --- END OF CHANGE --- 🔴