# r2.py (محدث V10.3 - Typing Fix) import os, traceback, json, time from datetime import datetime, timedelta import asyncio import boto3 from botocore.exceptions import NoCredentialsError, ClientError from typing import List, Dict, Any, Optional # <-- 🔴 السطر المضاف لإصلاح الخطأ R2_ACCOUNT_ID = os.getenv("R2_ACCOUNT_ID") R2_ACCESS_KEY_ID = os.getenv("R2_ACCESS_KEY_ID") R2_SECRET_ACCESS_KEY = os.getenv("R2_SECRET_ACCESS_KEY") BUCKET_NAME = "trading" INITIAL_CAPITAL = 10.0 # 🔴 --- (جديد V10.2) أسماء ملفات التعلم --- 🔴 WHALE_LEARNING_PENDING_KEY = "learning_whale_pending_records.json" WHALE_LEARNING_COMPLETED_KEY = "learning_whale_completed_records.json" WHALE_LEARNING_CONFIG_KEY = "learning_whale_optimal_config.json" class R2Service: def __init__(self): try: endpoint_url = f"https://{R2_ACCOUNT_ID}.r2.cloudflarestorage.com" self.s3_client = boto3.client( 's3', endpoint_url=endpoint_url, aws_access_key_id=R2_ACCESS_KEY_ID, aws_secret_access_key=R2_SECRET_ACCESS_KEY, ) self.lock_acquired = False self.BUCKET_NAME = BUCKET_NAME self._open_trades_warning_printed = False self._portfolio_warning_printed = False self._contracts_warning_printed = False except Exception as e: raise RuntimeError(f"Failed to initialize S3 client: {e}") def acquire_lock(self, max_retries=3): lock_path = "lock.txt" for attempt in range(max_retries): try: try: self.s3_client.head_object(Bucket=BUCKET_NAME, Key=lock_path) print(f"🔒 Lock file exists. Attempt {attempt + 1}/{max_retries}. Waiting...") time.sleep(1) except ClientError as e: if e.response['Error']['Code'] == '404': self.s3_client.put_object(Bucket=BUCKET_NAME, Key=lock_path, Body=b'') self.lock_acquired = True print("✅ Lock acquired.") return True else: raise except Exception as e: print(f"❌ Failed to acquire lock: {e}") time.sleep(1) print(f"❌ Failed to acquire lock after {max_retries} attempts.") return False def release_lock(self): lock_path = "lock.txt" if self.lock_acquired: try: self.s3_client.delete_object(Bucket=BUCKET_NAME, Key=lock_path) print("✅ Lock released.") self.lock_acquired = False except Exception as e: print(f"❌ Failed to release lock: {e}") async def save_candidates_async(self, candidates): """حفظ بيانات المرشحين العشرة في ملف منفصل في R2""" try: key = "Candidates.json" data = { "timestamp": datetime.now().isoformat(), "total_candidates": len(candidates), "candidates": candidates } data_json = json.dumps(data, indent=2, ensure_ascii=False).encode('utf-8') self.s3_client.put_object( Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json" ) print(f"✅ تم حفظ {len(candidates)} مرشح في ملف Candidates في R2") # عرض معلومات المرشحين المحفوظين print("📊 المرشحون المحفوظون:") for i, candidate in enumerate(candidates): symbol = candidate.get('symbol', 'Unknown') score = candidate.get('enhanced_final_score', 0) strategy = candidate.get('target_strategy', 'GENERIC') print(f" {i+1}. {symbol}: {score:.3f} - {strategy}") except Exception as e: print(f"❌ فشل حفظ المرشحين في R2: {e}") async def load_candidates_async(self): """تحميل بيانات المرشحين من R2""" try: key = "Candidates.json" response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key) data = json.loads(response['Body'].read()) candidates = data.get('candidates', []) print(f"✅ تم تحميل {len(candidates)} مرشح من R2") return candidates except ClientError as e: if e.response['Error']['Code'] == 'NoSuchKey': print("⚠️ لا يوجد ملف مرشحين سابق") return [] else: raise async def save_llm_prompts_async(self, symbol, prompt_type, prompt_content, analysis_data=None): """حفظ الـ Prompts المرسلة إلى النموذج الضخم""" try: key = "llm_prompts.json" try: response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key) existing_data = json.loads(response['Body'].read()) except ClientError as e: if e.response['Error']['Code'] == 'NoSuchKey': existing_data = {"prompts": []} else: raise new_prompt = { "timestamp": datetime.now().isoformat(), "symbol": symbol, "prompt_type": prompt_type, # 'trading_decision' or 'trade_reanalysis' "prompt_content": prompt_content, "analysis_data": analysis_data } existing_data["prompts"].append(new_prompt) if len(existing_data["prompts"]) > 2000: existing_data["prompts"] = existing_data["prompts"][-2000:] data_json = json.dumps(existing_data, indent=2, ensure_ascii=False).encode('utf-8') self.s3_client.put_object( Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json" ) print(f"✅ تم حفظ prompt لـ {symbol} ({prompt_type}) في R2") except Exception as e: print(f"❌ فشل حفظ prompt لـ {symbol}: {e}") async def save_system_logs_async(self, log_data): try: key = "system_logs.json" try: response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key) existing_logs = json.loads(response['Body'].read()) except ClientError as e: if e.response['Error']['Code'] == 'NoSuchKey': existing_logs = {"logs": []} else: raise log_entry = { "timestamp": datetime.now().isoformat(), **log_data } existing_logs["logs"].append(log_entry) if len(existing_logs["logs"]) > 2000: existing_logs["logs"] = existing_logs["logs"][-2000:] data_json = json.dumps(existing_logs, indent=2, ensure_ascii=False).encode('utf-8') self.s3_client.put_object( Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json" ) print(f"✅ System log saved: {log_data.get('cycle_started', log_data.get('cycle_completed', 'event'))}") except Exception as e: print(f"❌ Failed to save system logs: {e}") async def save_learning_data_async(self, learning_data): try: key = "learning_data.json" data = { "timestamp": datetime.now().isoformat(), "learning_data": learning_data } data_json = json.dumps(data, indent=2, ensure_ascii=False).encode('utf-8') self.s3_client.put_object( Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json" ) print("✅ Learning data saved to R2") except Exception as e: print(f"❌ Failed to save learning data: {e}") async def load_learning_data_async(self): try: key = "learning_data.json" response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key) data = json.loads(response['Body'].read()) print("✅ Learning data loaded from R2") return data except ClientError as e: if e.response['Error']['Code'] == 'NoSuchKey': print("⚠️ No learning data found. Starting fresh.") return {} else: raise async def get_portfolio_state_async(self): key = "portfolio_state.json" try: response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key) state = json.loads(response['Body'].read()) if hasattr(self, '_portfolio_warning_printed'): delattr(self, '_portfolio_warning_printed') print(f"💰 Portfolio state loaded: Current Capital ${state.get('current_capital_usd', 0):.2f}") return state except ClientError as e: if e.response['Error']['Code'] == 'NoSuchKey': if not hasattr(self, '_portfolio_warning_printed'): print(f"⚠️ No portfolio state file found. Initializing with ${INITIAL_CAPITAL:.2f}") self._portfolio_warning_printed = True initial_state = { "current_capital_usd": INITIAL_CAPITAL, "invested_capital_usd": 0.0, "initial_capital_usd": INITIAL_CAPITAL, "total_trades": 0, "winning_trades": 0, "total_profit_usd": 0.0, "total_loss_usd": 0.0 } await self.save_portfolio_state_async(initial_state) return initial_state else: raise async def save_portfolio_state_async(self, state): key = "portfolio_state.json" try: data_json = json.dumps(state, indent=2).encode('utf-8') self.s3_client.put_object( Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json" ) print(f"💾 Portfolio state saved: Current Capital ${state.get('current_capital_usd', 0):.2f}") except Exception as e: print(f"❌ Failed to save portfolio state: {e}") raise async def get_open_trades_async(self): try: response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key="open_trades.json") trades = json.loads(response['Body'].read()) if hasattr(self, '_open_trades_warning_printed'): delattr(self, '_open_trades_warning_printed') return trades except ClientError as e: if e.response['Error']['Code'] == 'NoSuchKey': if not hasattr(self, '_open_trades_warning_printed'): print("⚠️ No open trades file found. Starting with an empty list.") print("💡 This is normal for first-time runs or when all trades are closed.") self._open_trades_warning_printed = True return [] else: raise async def save_open_trades_async(self, trades): try: data_json = json.dumps(trades, indent=2).encode('utf-8') self.s3_client.put_object( Bucket=BUCKET_NAME, Key="open_trades.json", Body=data_json, ContentType="application/json" ) print(f"✅ Open trades saved to R2. Total open trades: {len(trades)}") except Exception as e: print(f"❌ Failed to save open trades: {e}") raise async def load_contracts_db_async(self): key = "contracts.json" try: response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key) contracts_db = json.loads(response['Body'].read()) if hasattr(self, '_contracts_warning_printed'): delattr(self, '_contracts_warning_printed') print(f"💾 Contracts database loaded from R2. Total entries: {len(contracts_db)}") return contracts_db except ClientError as e: if e.response['Error']['Code'] == 'NoSuchKey': if not hasattr(self, '_contracts_warning_printed'): print("⚠️ No existing contracts database found. Initializing new one.") self._contracts_warning_printed = True return {} else: raise async def save_contracts_db_async(self, data): key = "contracts.json" try: data_json = json.dumps(data, indent=2).encode('utf-8') self.s3_client.put_object( Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json" ) print(f"✅ Contracts database saved to R2 successfully. Total entries: {len(data)}") except Exception as e: print(f"❌ Failed to save contracts database to R2: {e}") raise async def get_trade_by_symbol_async(self, symbol): try: open_trades = await self.get_open_trades_async() for trade in open_trades: if trade['symbol'] == symbol and trade['status'] == 'OPEN': return trade return None except Exception as e: print(f"❌ Failed to get trade by symbol {symbol}: {e}") return None async def update_trade_monitoring_status_async(self, symbol, is_monitored): try: open_trades = await self.get_open_trades_async() updated = False for trade in open_trades: if trade['symbol'] == symbol: trade['is_monitored'] = is_monitored updated = True break if updated: await self.save_open_trades_async(open_trades) status = "ENABLED" if is_monitored else "DISABLED" print(f"✅ Real-time monitoring {status} for {symbol}") else: print(f"⚠️ Trade {symbol} not found for monitoring status update") return updated except Exception as e: print(f"❌ Failed to update monitoring status for {symbol}: {e}") return False async def get_monitored_trades_async(self): try: open_trades = await self.get_open_trades_async() monitored_trades = [trade for trade in open_trades if trade.get('is_monitored', False)] return monitored_trades except Exception as e: print(f"❌ Failed to get monitored trades: {e}") return [] # # 🔴 دالة جديدة: لحفظ سجل تدقيق التحليل # async def save_analysis_audit_log_async(self, audit_data): """حفظ سجل تدقيق دورة التحليل (يحتفظ بآخر 50 دورة)""" try: key = "analysis_audit_log.json" # 1. جلب السجل الحالي (إن وجد) try: response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key) existing_log_data = json.loads(response['Body'].read()) if isinstance(existing_log_data, list): history = existing_log_data else: history = [] # بدء سجل جديد إذا كان التنسيق غير صالح except ClientError as e: if e.response['Error']['Code'] == 'NoSuchKey': history = [] # ملف جديد else: raise # 2. إضافة الدورة الحالية history.append(audit_data) # 3. الحفاظ على آخر 50 سجل فقط if len(history) > 50: history = history[-50:] # 4. حفظ الملف المحدث data_json = json.dumps(history, indent=2, ensure_ascii=False).encode('utf-8') self.s3_client.put_object( Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json" ) print(f"📊 تم حفظ سجل تدقيق التحليل بنجاح في R2 (إجمالي {len(history)} سجلات)") except Exception as e: print(f"❌ فشل حفظ سجل تدقيق التحليل في R2: {e}") # 🔴 --- START OF CHANGE (V10.2 - Whale Learning Storage) --- 🔴 async def _load_json_file_from_r2(self, key: str, default: Any = []) -> Any: """دالة مساعدة لتحميل ملف JSON بأمان.""" try: response = self.s3_client.get_object(Bucket=self.BUCKET_NAME, Key=key) return json.loads(response['Body'].read()) except ClientError as e: if e.response['Error']['Code'] == 'NoSuchKey': print(f"ℹ️ [R2Service] لم يتم العثور على ملف '{key}'. سيتم استخدام القيمة الافتراضية.") return default else: print(f"❌ [R2Service] خطأ ClientError أثناء تحميل '{key}': {e}") raise except Exception as e: print(f"❌ [R2Service] خطأ عام أثناء تحميل '{key}': {e}") return default async def _save_json_file_to_r2(self, key: str, data: Any): """دالة مساعدة لحفظ ملف JSON بأمان.""" try: data_json = json.dumps(data, indent=2, ensure_ascii=False).encode('utf-8') self.s3_client.put_object( Bucket=self.BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json" ) except Exception as e: print(f"❌ [R2Service] فشل حفظ الملف '{key}' إلى R2: {e}") traceback.print_exc() async def save_whale_learning_record_async(self, record: Dict[str, Any]): """ (جديد V10.2) يحفظ سجلاً "معلقاً" جديداً في ملف PENDING. """ try: pending_records = await self._load_json_file_from_r2(WHALE_LEARNING_PENDING_KEY, default=[]) pending_records.append(record) await self._save_json_file_to_r2(WHALE_LEARNING_PENDING_KEY, pending_records) print(f"✅ [R2Service] تم حفظ سجل تعلم الحيتان (PENDING) لـ {record['symbol']}.") except Exception as e: print(f"❌ [R2Service] فشل في save_whale_learning_record_async: {e}") async def get_pending_whale_learning_records_async(self) -> List[Dict[str, Any]]: """ (جديد V10.2) يجلب جميع السجلات "المعلقة" من ملف PENDING. """ try: return await self._load_json_file_from_r2(WHALE_LEARNING_PENDING_KEY, default=[]) except Exception as e: print(f"❌ [R2Service] فشل في get_pending_whale_learning_records_async: {e}") return [] async def update_completed_whale_learning_record_async(self, completed_record: Dict[str, Any]): """ (جديد V10.2) 1. يحفظ السجل "المكتمل" في ملف COMPLETED. 2. يزيل السجل من ملف PENDING. """ try: record_id = completed_record.get("record_id") if not record_id: print("❌ [R2Service] لا يمكن تحديث سجل مكتمل بدون record_id.") return # 1. الحفظ في ملف المكتمل (يحتفظ بآخر 5000 سجل مكتمل) completed_records = await self._load_json_file_from_r2(WHALE_LEARNING_COMPLETED_KEY, default=[]) completed_records.append(completed_record) if len(completed_records) > 5000: completed_records = completed_records[-5000:] await self._save_json_file_to_r2(WHALE_LEARNING_COMPLETED_KEY, completed_records) # 2. الإزالة من ملف المعلق pending_records = await self._load_json_file_from_r2(WHALE_LEARNING_PENDING_KEY, default=[]) updated_pending_records = [ rec for rec in pending_records if rec.get("record_id") != record_id ] await self._save_json_file_to_r2(WHALE_LEARNING_PENDING_KEY, updated_pending_records) print(f"✅ [R2Service] تم نقل سجل تعلم الحيتان (COMPLETED) لـ {completed_record['symbol']} (ID: {record_id}).") except Exception as e: print(f"❌ [R2Service] فشل في update_completed_whale_learning_record_async: {e}") async def get_all_completed_whale_records_async(self) -> List[Dict[str, Any]]: """ (جديد V10.2) يجلب *جميع* السجلات المكتملة (لتحليل الارتباط). """ try: return await self._load_json_file_from_r2(WHALE_LEARNING_COMPLETED_KEY, default=[]) except Exception as e: print(f"❌ [R2Service] فشل في get_all_completed_whale_records_async: {e}") return [] async def save_whale_learning_config_async(self, config: Dict[str, Any]): """ (جديد V10.2) يحفظ ملف الإعدادات (الأوزان) الذي نتج عن التعلم. """ await self._save_json_file_to_r2(WHALE_LEARNING_CONFIG_KEY, config) print(f"✅ [R2Service] تم حفظ إعدادات تعلم الحيتان المثلى.") async def load_whale_learning_config_async(self) -> Dict[str, Any]: """ (جديد V10.2) يجلب ملف الإعدادات (الأوزان) الذي نتج عن التعلم. """ return await self._load_json_file_from_r2(WHALE_LEARNING_CONFIG_KEY, default={}) # 🔴 --- END OF CHANGE --- 🔴 print("✅ Enhanced R2 Service Loaded - Comprehensive Logging System with Candidates Support")