# learning_hub/hub_manager.py # (محدث بالكامل - V3 - Whale Learning Loop) import asyncio import traceback from typing import Any, Dict, List from datetime import datetime, timezone # (استيراد جميع المكونات الداخلية للمركز) from .schemas import * from .policy_engine import PolicyEngine from .memory_store import MemoryStore from .statistical_analyzer import StatisticalAnalyzer from .reflector import Reflector from .curator import Curator # 🔴 --- (جديد V3) استيراد لتحليل الارتباط --- 🔴 try: import numpy as np from scipy.stats import pearsonr NUMPY_AVAILABLE = True except ImportError: print("❌ [HubManager] مكتبة numpy أو scipy غير مثبتة! لن يعمل تعلم الحيتان.") NUMPY_AVAILABLE = False class LearningHubManager: def __init__(self, r2_service: Any, llm_service: Any, data_manager: Any): print("🚀 Initializing Learning Hub Manager (V3)...") # 1. الخدمات الأساسية (يتم تمريرها من app.py) self.r2_service = r2_service self.llm_service = llm_service self.data_manager = data_manager # 2. تهيئة المكونات (بناء النظام) self.policy_engine = PolicyEngine() self.memory_store = MemoryStore( r2_service=self.r2_service, policy_engine=self.policy_engine, llm_service=self.llm_service ) self.reflector = Reflector( llm_service=self.llm_service, memory_store=self.memory_store ) self.curator = Curator( llm_service=self.llm_service, memory_store=self.memory_store ) self.statistical_analyzer = StatisticalAnalyzer( r2_service=self.r2_service, data_manager=self.data_manager ) # 🔴 --- (جديد V3) متغيرات حالة لتعلم الحيتان --- 🔴 self.whale_learning_lock = asyncio.Lock() self.optimal_whale_config = {} # (الأوزان المتعلمة) self.initialized = False print("✅ Learning Hub Manager constructed. Ready for initialization.") async def initialize(self): """ تهيئة جميع الأنظمة الفرعية، وخاصة تحميل الإحصائيات والأوزان. """ if self.initialized: return print("🔄 [HubManager] Initializing all sub-modules...") await self.statistical_analyzer.initialize() # 🔴 --- (جديد V3) تحميل إعدادات الحيتان المتعلمة --- 🔴 if hasattr(self.r2_service, 'load_whale_learning_config_async'): self.optimal_whale_config = await self.r2_service.load_whale_learning_config_async() if self.optimal_whale_config: print(f"✅ [HubManager] تم تحميل إعدادات تعلم الحيتان المثلى: {self.optimal_whale_config}") else: print("ℹ️ [HubManager] لم يتم العثور على إعدادات تعلم حيتان سابقة.") self.initialized = True print("✅ [HubManager] All sub-modules initialized. Learning Hub is LIVE.") async def analyze_trade_and_learn(self, trade_object: Dict[str, Any], close_reason: str): """ هذه هي الدالة الرئيسية التي يستدعيها TradeManager. إنها تشغل كلاً من نظام التعلم السريع (Reflector) والبطيء (StatsAnalyzer). """ if not self.initialized: print("⚠️ [HubManager] Learning Hub not initialized. Skipping learning.") return print(f"🧠 [HubManager] Learning from trade {trade_object.get('symbol')}...") try: # 1. التعلم السريع (Reflector): await self.reflector.analyze_trade_outcome(trade_object, close_reason) except Exception as e: print(f"❌ [HubManager] Reflector (Fast-Learner) failed: {e}") try: # 2. التعلم البطيء (StatisticalAnalyzer): await self.statistical_analyzer.update_statistics(trade_object, close_reason) except Exception as e: print(f"❌ [HubManager] StatisticalAnalyzer (Slow-Learner) failed: {e}") print(f"✅ [HubManager] Learning complete for {trade_object.get('symbol')}.") async def get_active_context_for_llm(self, domain: str, query: str) -> str: """ يُستخدم بواسطة LLMService لجلب "الدفتر" (Playbook) / القواعد (Deltas). """ if not self.initialized: return "Learning Hub not initialized." return await self.memory_store.get_active_context(domain, query) async def get_statistical_feedback_for_llm(self, entry_strategy: str) -> str: """ يُستخدم بواسطة LLMService لجلب أفضل ملف خروج (إحصائياً). """ if not self.initialized: return "Learning Hub not initialized." best_profile = await self.statistical_analyzer.get_best_exit_profile(entry_strategy) if best_profile != "unknown": # (Prompt in English as requested) feedback = f"Statistical Feedback: For the '{entry_strategy}' strategy, the '{best_profile}' exit profile has historically performed best." return feedback else: return "No statistical feedback available for this strategy yet." # 🔴 --- START OF CHANGE (V2 - VADER Learning) --- 🔴 async def get_statistical_news_score(self, raw_vader_score: float) -> float: """ يحول درجة VADER الخام إلى متوسط الربح/الخسارة التاريخي المتوقع. (يُستخدم بواسطة app.py / MLProcessor للترتيب الداخلي) """ if not self.initialized: return 0.0 # محايد # (جلب متوسط الربح/الخسارة الفعلي من المحلل الإحصائي) historical_pnl = await self.statistical_analyzer.get_statistical_vader_pnl(raw_vader_score) # (إرجاع النسبة المئوية للربح/الخسارة مباشرة، مثلاً: 1.1 أو -0.5) return historical_pnl # 🔴 --- END OF CHANGE --- 🔴 # 🔴 --- START OF CHANGE --- 🔴 async def get_optimized_weights(self, market_condition: str) -> Dict[str, float]: """ يُستخدم بواسطة MLProcessor/StrategyEngine/Sentry لجلب الأوزان المعدلة إحصائياً. """ if not self.initialized: # (الحصول على كل الأوزان الافتراضية) return await self.statistical_analyzer.get_default_strategy_weights() # (الحصول على كل الأوزان المحسنة) return await self.statistical_analyzer.get_optimized_weights(market_condition) # 🔴 --- END OF CHANGE --- 🔴 async def run_distillation_check(self): """ (يتم استدعاؤها دورياً من app.py) للتحقق من جميع المجالات وتشغيل التقطير إذا لزم الأمر. """ if not self.initialized: return print("ℹ️ [HubManager] Running periodic distillation check...") for domain in self.memory_store.domain_files.keys(): await self.curator.check_and_distill_domain(domain) print("✅ [HubManager] Distillation check complete.") # (No change to shutdown function) async def shutdown(self): """ Saves all persistent data from the statistical analyzer. """ if not self.initialized: return print("🔄 [HubManager] Shutting down... Saving all learning data.") try: await self.statistical_analyzer.save_weights_to_r2() await self.statistical_analyzer.save_performance_history() await self.statistical_analyzer.save_exit_profile_effectiveness() # 🔴 --- START OF CHANGE (V2 - VADER Learning) --- 🔴 await self.statistical_analyzer.save_vader_effectiveness() # 🔴 --- END OF CHANGE --- 🔴 print("✅ [HubManager] All statistical (slow-learner) data saved.") except Exception as e: print(f"❌ [HubManager] Failed to save learning data on shutdown: {e}") # 🔴 --- START OF CHANGE (V3 - Whale Learning Loop) --- 🔴 async def run_whale_learning_check(self): """ (جديد V3 - "المُسجّل" Logger) يعمل في الخلفية لإكمال سجلات تعلم الحيتان المعلقة. """ if not self.initialized: await asyncio.sleep(60) # انتظر حتى تتم التهيئة print(f"🧠 [Whale-Logger] بدء تشغيل حلقة تعلم الحيتان (المُسجّل)...") # (الانتظار 10 دقائق عند بدء التشغيل للسماح بجمع بعض البيانات) await asyncio.sleep(600) while True: try: # (1. جلب السجلات المعلقة) pending_records = await self.r2_service.get_pending_whale_learning_records_async() if not pending_records: # (لا توجد سجلات، انتظر 10 دقائق) await asyncio.sleep(600) continue print(f"🧠 [Whale-Logger] تم العثور على {len(pending_records)} سجل تعلم معلق. بدء المعالجة...") now_utc = datetime.now(timezone.utc) for record in pending_records: try: target_time_utc = datetime.fromisoformat(record['target_time_utc']) # (2. التحقق من الوقت) if now_utc >= target_time_utc: print(f" -> [Whale-Logger] معالجة سجل {record['symbol']} (ID: {record['record_id']})...") # (حان وقت جلب السعر المستقبلي) symbol = record['symbol'] target_price = await self.data_manager.get_latest_price_async(symbol) if target_price and target_price > 0 and record['start_price_usd'] > 0: # (3. حساب النتيجة) price_change_pct = ((target_price - record['start_price_usd']) / record['start_price_usd']) * 100 record['target_price_usd'] = target_price record['price_change_percentage'] = price_change_pct record['status'] = "COMPLETED" # (4. تحديث السجل في R2) await self.r2_service.update_completed_whale_learning_record_async(record) else: print(f" ⚠️ [Whale-Logger] فشل جلب السعر المستقبلي لـ {symbol}. سيعاد المحاولة لاحقاً.") else: # (لم يحن الوقت بعد) pass except Exception as e_inner: print(f"❌ [Whale-Logger] فشل معالجة سجل فردي: {e_inner}") # (تشغيل "المعلّم" بعد كل دورة تسجيل) await self.update_optimal_whale_window() # (الانتظار 5 دقائق قبل التحقق مرة أخرى) await asyncio.sleep(300) except Exception as e_outer: print(f"❌ [Whale-Logger] خطأ فادح في حلقة تعلم الحيتان: {e_outer}") traceback.print_exc() await asyncio.sleep(600) # (انتظار 10 دقائق عند الفشل الفادح) async def update_optimal_whale_window(self): """ (جديد V3 - "المعلّم" Teacher) يحلل جميع السجلات المكتملة ويجد أفضل "مقياس + نافذة" للارتباط. """ if not NUMPY_AVAILABLE: print("⚠️ [Whale-Teacher] لا يمكن تشغيل تحليل الارتباط (numpy/scipy مفقودة).") return async with self.whale_learning_lock: print("👨‍🏫 [Whale-Teacher] بدء تحليل الارتباط الإحصائي...") try: # (1. جلب جميع السجلات المكتملة) all_completed = await self.r2_service.get_all_completed_whale_records_async() if len(all_completed) < 20: # (نحتاج 20 عينة على الأقل لبدء التعلم) print(f"👨‍🏫 [Whale-Teacher] نحتاج 20 سجل مكتمل على الأقل (الحالي: {len(all_completed)}). تخطي التحليل.") return # (2. استخراج البيانات في مصفوفات Numpy) price_changes = [] metrics_data = defaultdict(lambda: defaultdict(list)) # (قائمة بجميع المقاييس التي نريد اختبارها) windows = ['30m', '1h', '2h', '4h', '24h'] metric_keys = ['relative_net_flow_percent', 'transaction_density', 'net_flow_usd'] for record in all_completed: if record.get('price_change_percentage') is None: continue price_changes.append(record['price_change_percentage']) analysis = record.get('window_analysis', {}) for w in windows: window_data = analysis.get(w, {}) for k in metric_keys: metrics_data[w][k].append(window_data.get(k, 0.0)) price_changes_np = np.array(price_changes) if len(price_changes_np) < 20: print("👨‍🏫 [Whale-Teacher] لا توجد بيانات كافية (NP) للارتباط.") return # (3. حساب الارتباط) correlation_results = {} for w in windows: for k in metric_keys: metric_np = np.array(metrics_data[w][k]) if len(metric_np) != len(price_changes_np): continue # (حساب ارتباط بيرسون) corr, p_value = pearsonr(metric_np, price_changes_np) if not np.isnan(corr) and p_value < 0.1: # (نهتم فقط بالارتباطات ذات الدلالة الإحصائية) correlation_results[f"{w}_{k}"] = abs(corr) # (نهتم بقوة الارتباط، بغض النظر عن الاتجاه) if not correlation_results: print("👨‍🏫 [Whale-Teacher] لم يتم العثور على ارتباطات إحصائية ذات دلالة.") return # (4. العثور على الفائز وحفظه) best_metric_key = max(correlation_results, key=correlation_results.get) best_correlation = correlation_results[best_metric_key] # (تقسيم المفتاح: '1h_relative_net_flow_percent') best_window, best_metric = best_metric_key.split('_', 1) new_config = { "best_window": best_window, "best_metric": best_metric, "correlation_score": best_correlation, "total_samples": len(price_changes_np), "last_updated_utc": datetime.now(timezone.utc).isoformat() } # (حفظ الإعدادات الجديدة ومشاركتها مع النظام) self.optimal_whale_config = new_config await self.r2_service.save_whale_learning_config_async(new_config) print(f"🏆 [Whale-Teacher] تم العثور على أفضل إشارة جديدة!") print(f" -> المقياس: {best_metric}") print(f" -> النافذة: {best_window}") print(f" -> الارتباط: {best_correlation:.4f} (على {len(price_changes_np)} عينة)") except Exception as e: print(f"❌ [Whale-Teacher] فشل تحليل الارتباط: {e}") traceback.print_exc() # 🔴 --- END OF CHANGE --- 🔴