# learning_hub/memory_store.py import json import asyncio from datetime import datetime # 🔴 --- START OF CHANGE --- 🔴 from typing import List, Dict, Optional, Any # (Import Any here) # 🔴 --- END OF CHANGE --- 🔴 from .schemas import Delta, ReflectorOutput from .policy_engine import PolicyEngine # للتوافق مع R2Service (نفترض أنه سيتم تمريره) # (لا يمكننا استيراده مباشرة لتجنب التبعيات الدائرية) class MemoryStore: # (The __init__ signature now correctly uses the imported Any) def __init__(self, r2_service: Any, policy_engine: PolicyEngine, llm_service: Any): self.r2_service = r2_service self.policy_engine = policy_engine self.llm_service = llm_service # نحتاجه لعملية "التقطير" (Distillation) self.domain_files = { "strategy": "learning_deltas_strategy.json", "pattern": "learning_deltas_pattern.json", "indicator": "learning_deltas_indicator.json", "monte_carlo": "learning_deltas_monte_carlo.json", "general": "learning_deltas_general.json" } self.distill_threshold = 50 # (من النقطة 6) print("✅ Learning Hub Module: Memory Store loaded (FIXED: Imported Any)") async def _load_deltas_from_r2(self, domain: str) -> List[Dict]: """تحميل ملف الدلتا المحدد من R2""" key = self.domain_files.get(domain, self.domain_files["general"]) try: response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key) deltas_list = json.loads(response['Body'].read()) return deltas_list except Exception as e: # إذا فشل التحميل (مثل الملف غير موجود)، أعد قائمة فارغة print(f"ℹ️ [MemoryStore] لم يتم العثور على ملف دلتا لـ {domain}، سيتم إنشاء واحد جديد.") return [] async def _save_deltas_to_r2(self, domain: str, deltas_list: List[Dict]): """حفظ ملف الدلتا المحدث إلى R2""" key = self.domain_files.get(domain, self.domain_files["general"]) try: # (Ensure list contains dicts before saving) deltas_to_save = [d.model_dump() if isinstance(d, Delta) else d for d in deltas_list] data_json = json.dumps(deltas_to_save, indent=2, ensure_ascii=False).encode('utf-8') self.r2_service.s3_client.put_object( Bucket="trading", Key=key, Body=data_json, ContentType="application/json" ) except Exception as e: print(f"❌ [MemoryStore] فشل حفظ الدلتا إلى R2: {e}") async def save_new_delta(self, reflector_output: ReflectorOutput, trade_object: Dict[str, Any], domain: str = "strategy"): """ حفظ "دلتا" جديدة بناءً على مخرجات المنعكس وسياسة القبول. """ try: trade_pnl_percent = trade_object.get('pnl_percent', 0) # 1. التحقق من سياسة القبول is_approved, approval_reason = self.policy_engine.get_delta_acceptance( reflector_output, trade_pnl_percent ) # 2. إنشاء كائن الدلتا new_delta = Delta( text=reflector_output.suggested_rule, domain=domain, score=reflector_output.confidence, evidence_refs=[trade_object.get('id', 'unknown_trade_id')], approved=is_approved, trade_strategy=trade_object.get('strategy', 'unknown'), exit_profile=trade_object.get('decision_data', {}).get('exit_profile', 'unknown') ) # 3. تحميل، إضافة، وحفظ الدلتا deltas_list = await self._load_deltas_from_r2(domain) deltas_list.append(new_delta.model_dump()) # Use model_dump() for pydantic models await self._save_deltas_to_r2(domain, deltas_list) print(f"✅ [MemoryStore] تم حفظ دلتا جديدة لـ {domain}. الحالة: {approval_reason}") # 4. تفعيل عملية "التقطير" (لا يتم تنفيذها هنا مباشرة) if len([d for d in deltas_list if d.get('approved')]) % self.distill_threshold == 0 and is_approved: print(f"ℹ️ [MemoryStore] تم الوصول إلى حد {self.distill_threshold} دلتا لـ {domain}. التقطير سيتم جدولته.") # (Curator will handle the actual check and distillation) except Exception as e: print(f"❌ [MemoryStore] فشل فادح في حفظ الدلتا: {e}") traceback.print_exc() async def get_active_context(self, domain: str, query: str, top_k: int = 3) -> str: """ جلب "السياق النشط" (Active Context) لإرساله إلى النموذج. """ try: all_deltas_dicts = await self._load_deltas_from_r2(domain) # 1. تصفية الدلتا المعتمدة فقط approved_deltas = [Delta(**d) for d in all_deltas_dicts if d.get('approved', False)] if not approved_deltas: # (Return English text for consistency) return "Playbook: No approved learning rules (Deltas) found for this domain yet." # 2. خوارزمية الاسترجاع (نسخة مبسطة) scored_deltas = [] for delta in approved_deltas: priority_map = {"high": 1.0, "medium": 0.6, "low": 0.2} priority_score = priority_map.get(delta.priority, 0.6) try: age_days = (datetime.now() - datetime.fromisoformat(delta.created_at)).days freshness_score = max(0, 1.0 - (age_days / 90.0)) except Exception: freshness_score = 0.5 relevance_score = 0.5 query_words = set(query.lower().split()) delta_words = set(delta.text.lower().split()) if query_words.intersection(delta_words): relevance_score = 1.0 elif delta.trade_strategy and delta.trade_strategy.lower() in query_words: relevance_score = 0.8 final_score = (0.6 * relevance_score) + (0.3 * priority_score) + (0.1 * freshness_score) scored_deltas.append((final_score, delta)) # 3. فرز واختيار أفضل K scored_deltas.sort(key=lambda x: x[0], reverse=True) top_deltas = [delta for score, delta in scored_deltas[:top_k]] # 4. تنسيق الموجه (باللغة الإنجليزية) if not top_deltas: return "Playbook: No relevant learning rules (Deltas) found for this query." playbook_header = f"Playbook (Top {len(top_deltas)} Rules - Domain: {domain}):" # (Added score for context) delta_lines = [f"• {delta.text} (Score: {delta.score:.2f}, Prio: {delta.priority})" for delta in top_deltas] # (Distilled rules are just high-priority deltas in this implementation) return "\n".join([playbook_header] + delta_lines) except Exception as e: print(f"❌ [MemoryStore] فشل جلب السياق النشط: {e}") # (Return English text for consistency) return "Playbook: Error retrieving learning context."