Spaces:
Running
Running
# r2 (23).py - fixes
| import os | |
| import traceback | |
| import json | |
| from datetime import datetime, timedelta | |
| import asyncio | |
| import time | |
| import boto3 | |
| from botocore.exceptions import NoCredentialsError, ClientError | |
# --- R2 connection settings ---
# Credentials come from the process environment; an unset variable yields None.
R2_ACCOUNT_ID = os.environ.get("R2_ACCOUNT_ID")
R2_ACCESS_KEY_ID = os.environ.get("R2_ACCESS_KEY_ID")
R2_SECRET_ACCESS_KEY = os.environ.get("R2_SECRET_ACCESS_KEY")

# Bucket that holds every JSON object written by this module.
BUCKET_NAME = "trading"

# Capital (USD) used to seed the portfolio when no portfolio state file exists.
INITIAL_CAPITAL = 10.0
class R2Service:
    """Store and retrieve the trading bot's state as JSON objects in an R2 bucket.

    Each piece of state lives under a fixed object key (``open_trades.json``,
    ``portfolio_state.json``, ``closed_trades_history.json``,
    ``trade_summary.json``, ``system_logs.json``, ...).

    The ``*_async`` methods are coroutines, but the boto3 calls inside them are
    issued directly (no executor), so each call blocks until R2 answers.
    """

    def __init__(self):
        """Create the boto3 S3 client pointed at the R2 endpoint.

        Raises:
            RuntimeError: if the client cannot be constructed; the original
                exception is chained as the cause.
        """
        try:
            endpoint_url = f"https://{R2_ACCOUNT_ID}.r2.cloudflarestorage.com"
            self.s3_client = boto3.client(
                's3',
                endpoint_url=endpoint_url,
                aws_access_key_id=R2_ACCESS_KEY_ID,
                aws_secret_access_key=R2_SECRET_ACCESS_KEY,
            )
            self.lock_acquired = False
            self.BUCKET_NAME = BUCKET_NAME
            # One flag per "file not found" warning so each is printed once
            # until the file is next read successfully.
            self._open_trades_warning_printed = False
            self._portfolio_warning_printed = False
            self._contracts_warning_printed = False
        except Exception as e:
            raise RuntimeError(f"Failed to initialize S3 client: {e}") from e

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _load_json(self, key):
        """Fetch and parse the JSON object stored under ``key``.

        Returns:
            ``(True, parsed)`` when the object exists, ``(False, None)`` when
            R2 reports ``NoSuchKey``.

        Raises:
            ClientError: for any other storage error.
        """
        try:
            response = self.s3_client.get_object(Bucket=self.BUCKET_NAME, Key=key)
        except ClientError as e:
            if e.response['Error']['Code'] == 'NoSuchKey':
                return False, None
            raise
        return True, json.loads(response['Body'].read())

    def _store_json(self, key, payload, ensure_ascii=True):
        """Serialize ``payload`` as indented UTF-8 JSON and upload it under ``key``."""
        body = json.dumps(payload, indent=2, ensure_ascii=ensure_ascii).encode('utf-8')
        self.s3_client.put_object(
            Bucket=self.BUCKET_NAME, Key=key, Body=body, ContentType="application/json"
        )

    @staticmethod
    def _to_number(value):
        """Return ``value`` as a number, or None when it cannot be read as one."""
        if isinstance(value, bool):
            return None
        if isinstance(value, (int, float)):
            return value
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _compute_pnl(entry_price, close_price, position_size, trade_type):
        """Return ``(pnl_usd, pnl_percent)`` for a closed position.

        A missing trade type is treated as ``LONG``; a type that is neither
        ``LONG`` nor ``SHORT`` yields ``(0.0, 0.0)``.
        """
        side = str(trade_type or 'LONG').upper()
        if side == 'LONG':
            pnl_percent = ((close_price - entry_price) / entry_price) * 100
        elif side == 'SHORT':
            pnl_percent = ((entry_price - close_price) / entry_price) * 100
        else:
            return 0.0, 0.0
        return position_size * (pnl_percent / 100), pnl_percent

    @staticmethod
    def _apply_pnl_to_summary(summary, pnl):
        """Fold one closed trade's PnL into the summary statistics dict.

        Only a strictly positive PnL counts as a win and only a strictly
        negative one as a loss; a break-even trade raises ``total_trades`` only.
        """
        summary['total_trades'] = summary.get('total_trades', 0) + 1
        if pnl > 0:
            summary['winning_trades'] = summary.get('winning_trades', 0) + 1
            summary['total_profit_usd'] = summary.get('total_profit_usd', 0.0) + pnl
            summary['largest_win'] = max(summary.get('largest_win', 0), pnl)
        elif pnl < 0:
            loss = abs(pnl)
            summary['losing_trades'] = summary.get('losing_trades', 0) + 1
            summary['total_loss_usd'] = summary.get('total_loss_usd', 0.0) + loss
            summary['largest_loss'] = max(summary.get('largest_loss', 0), loss)

        wins = summary.get('winning_trades', 0)
        losses = summary.get('losing_trades', 0)
        summary['win_percentage'] = (wins / summary['total_trades']) * 100
        if wins > 0:
            summary['avg_profit_per_trade'] = summary.get('total_profit_usd', 0.0) / wins
        if losses > 0:
            summary['avg_loss_per_trade'] = summary.get('total_loss_usd', 0.0) / losses
        return summary

    # ------------------------------------------------------------------
    # Lock file
    # ------------------------------------------------------------------
    def acquire_lock(self, max_retries=3):
        """Try to create ``lock.txt`` in the bucket, retrying up to ``max_retries`` times.

        Returns:
            True when the lock file was created by this call, False otherwise.

        NOTE(review): the existence check and the upload are two separate
        requests, so two processes can both see "no lock" and both succeed.
        """
        lock_path = "lock.txt"
        for attempt in range(max_retries):
            try:
                try:
                    self.s3_client.head_object(Bucket=self.BUCKET_NAME, Key=lock_path)
                except ClientError as e:
                    # Only a "not found" answer means the lock is free.
                    if e.response['Error']['Code'] not in ('404', 'NoSuchKey', 'NotFound'):
                        raise
                    self.s3_client.put_object(Bucket=self.BUCKET_NAME, Key=lock_path, Body=b'')
                    self.lock_acquired = True
                    print("✅ Lock acquired.")
                    return True
                else:
                    print(f"🔒 Lock file exists. Attempt {attempt + 1}/{max_retries}. Waiting...")
            except Exception as e:
                print(f"❌ Failed to acquire lock: {e}")
            # No point waiting after the final attempt.
            if attempt < max_retries - 1:
                time.sleep(1)
        print(f"❌ Failed to acquire lock after {max_retries} attempts.")
        return False

    def release_lock(self):
        """Delete ``lock.txt`` if this instance holds the lock; errors are printed, not raised."""
        lock_path = "lock.txt"
        if not self.lock_acquired:
            return
        try:
            self.s3_client.delete_object(Bucket=self.BUCKET_NAME, Key=lock_path)
            print("✅ Lock released.")
            self.lock_acquired = False
        except Exception as e:
            print(f"❌ Failed to release lock: {e}")

    # ------------------------------------------------------------------
    # Trade lifecycle
    # ------------------------------------------------------------------
    async def close_trade_async(self, trade_to_close, close_price):
        """Close a trade: compute PnL, archive it, update summary, portfolio and open list.

        Args:
            trade_to_close: trade dict; must contain ``entry_price`` and
                ``position_size_usd``. It is mutated in place with the closing
                fields (``status``, ``close_price``, ``pnl_usd``, ...).
            close_price: price at which the position is closed.

        Raises:
            Exception: any failure is printed with a traceback and re-raised.
        """
        try:
            # Read required fields before mutating so a malformed trade is
            # left untouched.
            entry_price = trade_to_close['entry_price']
            position_size = trade_to_close['position_size_usd']
            # The key may be present with a None value, so `or` is used
            # instead of a .get() default.
            trade_type = str(trade_to_close.get('trade_type') or 'LONG').upper()
            strategy = trade_to_close.get('strategy', 'unknown')

            trade_to_close['status'] = 'CLOSED'
            trade_to_close['close_price'] = close_price
            trade_to_close['close_timestamp'] = datetime.now().isoformat()
            trade_to_close['is_monitored'] = False

            pnl = 0.0
            pnl_percent = 0.0
            if entry_price and entry_price > 0 and close_price and close_price > 0:
                try:
                    pnl, pnl_percent = self._compute_pnl(
                        entry_price, close_price, position_size, trade_type
                    )
                    print(f"💰 PnL Calculation: Entry=${entry_price:.6f}, Close=${close_price:.6f}, "
                          f"Position=${position_size:.2f}, Type={trade_type}, "
                          f"PnL=${pnl:.4f} ({pnl_percent:+.4f}%)")
                except (TypeError, ZeroDivisionError) as calc_error:
                    print(f"⚠️ PnL calculation error: {calc_error}")
                    pnl = 0.0
                    pnl_percent = 0.0
            else:
                print(f"⚠️ Invalid prices for PnL calculation: Entry={entry_price}, Close={close_price}")

            trade_to_close['pnl_usd'] = pnl
            trade_to_close['pnl_percent'] = pnl_percent

            await self._archive_closed_trade_async(trade_to_close)
            await self._update_trade_summary_async(trade_to_close)

            # Return the position plus its PnL to the available capital.
            portfolio_state = await self.get_portfolio_state_async()
            current_capital = portfolio_state.get("current_capital_usd", 0)
            new_capital = current_capital + position_size + pnl
            portfolio_state["current_capital_usd"] = new_capital
            portfolio_state["invested_capital_usd"] = 0.0
            if pnl > 0:
                portfolio_state["winning_trades"] = portfolio_state.get("winning_trades", 0) + 1
                portfolio_state["total_profit_usd"] = portfolio_state.get("total_profit_usd", 0.0) + pnl
            elif pnl < 0:
                portfolio_state["total_loss_usd"] = portfolio_state.get("total_loss_usd", 0.0) + abs(pnl)
            await self.save_portfolio_state_async(portfolio_state)
            print(f"📈 Trade PnL: ${pnl:.4f} ({pnl_percent:+.4f}%). "
                  f"New available capital: ${new_capital:.4f}. Strategy: {strategy}")

            # Drop the closed trade from the open list.
            open_trades = await self.get_open_trades_async()
            closed_id = trade_to_close.get('id')
            trades_to_keep = [t for t in open_trades if t.get('id') != closed_id]
            await self.save_open_trades_async(trades_to_keep)
            print(f"✅ Trade for {trade_to_close.get('symbol')} closed and archived successfully. Strategy: {strategy}")

            await self.save_system_logs_async({
                "trade_closed": True,
                "symbol": trade_to_close.get('symbol'),
                "entry_price": entry_price,
                "close_price": close_price,
                "pnl_usd": pnl,
                "pnl_percent": pnl_percent,
                "new_capital": new_capital,
                "strategy": strategy,
                "position_size": position_size,
                "trade_type": trade_type
            })
        except Exception as e:
            print(f"❌ Failed to close trade: {e}")
            traceback.print_exc()
            raise

    # ------------------------------------------------------------------
    # Best-effort writers (failures are printed, never raised)
    # ------------------------------------------------------------------
    async def save_candidates_data_async(self, candidates_data, reanalysis_data):
        """Overwrite ``candidates_data.json`` with the top candidates and re-analysis data."""
        try:
            data = {
                "timestamp": datetime.now().isoformat(),
                "top_candidates": candidates_data,
                "reanalysis_data": reanalysis_data
            }
            self._store_json("candidates_data.json", data, ensure_ascii=False)
            print(f"✅ Candidates data saved to R2: {len(candidates_data) if candidates_data else 0} candidates")
        except Exception as e:
            print(f"❌ Failed to save candidates data: {e}")

    async def save_llm_responses_async(self, symbol, prompt, full_response, parsed_decision):
        """Append one model response to ``llm_responses.json``, keeping the newest 1000.

        The stored prompt is truncated to 2000 characters plus ``"..."``.
        """
        try:
            key = "llm_responses.json"
            found, existing_data = self._load_json(key)
            if not found:
                existing_data = {"responses": []}
            new_response = {
                "timestamp": datetime.now().isoformat(),
                "symbol": symbol,
                "prompt": prompt[:2000] + "..." if len(prompt) > 2000 else prompt,
                "full_response": full_response,
                "parsed_decision": parsed_decision
            }
            responses = existing_data.setdefault("responses", [])
            responses.append(new_response)
            if len(responses) > 1000:
                existing_data["responses"] = responses[-1000:]
            self._store_json(key, existing_data, ensure_ascii=False)
            print(f"✅ LLM response saved for {symbol}")
        except Exception as e:
            print(f"❌ Failed to save LLM response: {e}")

    async def save_system_logs_async(self, log_data):
        """Append a timestamped entry to ``system_logs.json``, keeping the newest 2000."""
        try:
            key = "system_logs.json"
            found, existing_logs = self._load_json(key)
            if not found:
                existing_logs = {"logs": []}
            log_entry = {
                "timestamp": datetime.now().isoformat(),
                **log_data
            }
            logs = existing_logs.setdefault("logs", [])
            logs.append(log_entry)
            if len(logs) > 2000:
                existing_logs["logs"] = logs[-2000:]
            self._store_json(key, existing_logs, ensure_ascii=False)
            print(f"✅ System log saved: {log_data.get('cycle_started', log_data.get('cycle_completed', 'event'))}")
        except Exception as e:
            print(f"❌ Failed to save system logs: {e}")

    async def save_learning_data_async(self, learning_data):
        """Overwrite ``learning_data.json`` with the given learning data and a timestamp."""
        try:
            data = {
                "timestamp": datetime.now().isoformat(),
                "learning_data": learning_data
            }
            self._store_json("learning_data.json", data, ensure_ascii=False)
            print("✅ Learning data saved to R2")
        except Exception as e:
            print(f"❌ Failed to save learning data: {e}")

    async def load_learning_data_async(self):
        """Return the parsed ``learning_data.json``, or ``{}`` when the object does not exist.

        Raises:
            ClientError: for storage errors other than ``NoSuchKey``.
        """
        found, data = self._load_json("learning_data.json")
        if not found:
            print("⚠️ No learning data found. Starting fresh.")
            return {}
        print("✅ Learning data loaded from R2")
        return data

    # ------------------------------------------------------------------
    # Portfolio state
    # ------------------------------------------------------------------
    async def get_portfolio_state_async(self):
        """Return the portfolio state, creating and saving an initial one if none exists.

        Raises:
            ClientError: for storage errors other than ``NoSuchKey``.
        """
        found, state = self._load_json("portfolio_state.json")
        if found:
            # Re-arm the warning for the next time the file goes missing.
            self._portfolio_warning_printed = False
            print(f"💰 Portfolio state loaded: Current Capital ${state.get('current_capital_usd', 0):.2f}")
            return state

        if not getattr(self, '_portfolio_warning_printed', False):
            print(f"⚠️ No portfolio state file found. Initializing with ${INITIAL_CAPITAL:.2f}")
            self._portfolio_warning_printed = True
        initial_state = {
            "current_capital_usd": INITIAL_CAPITAL,
            "invested_capital_usd": 0.0,
            "initial_capital_usd": INITIAL_CAPITAL,
            "total_trades": 0,
            "winning_trades": 0,
            "total_profit_usd": 0.0,
            "total_loss_usd": 0.0
        }
        await self.save_portfolio_state_async(initial_state)
        return initial_state

    async def save_portfolio_state_async(self, state):
        """Write the portfolio state to ``portfolio_state.json``; failures are printed and re-raised."""
        try:
            self._store_json("portfolio_state.json", state)
            print(f"💾 Portfolio state saved: Current Capital ${state.get('current_capital_usd', 0):.2f}")
        except Exception as e:
            print(f"❌ Failed to save portfolio state: {e}")
            raise

    # ------------------------------------------------------------------
    # Open trades
    # ------------------------------------------------------------------
    async def get_open_trades_async(self):
        """Return the list stored in ``open_trades.json``, or ``[]`` when it does not exist.

        The "file not found" notice is printed once until the file is next
        read successfully.

        Raises:
            ClientError: for storage errors other than ``NoSuchKey``.
        """
        found, trades = self._load_json("open_trades.json")
        if found:
            self._open_trades_warning_printed = False
            return trades
        if not getattr(self, '_open_trades_warning_printed', False):
            print("⚠️ No open trades file found. Starting with an empty list.")
            print("💡 This is normal for first-time runs or when all trades are closed.")
            self._open_trades_warning_printed = True
        return []

    async def save_open_trades_async(self, trades):
        """Write the open-trades list to ``open_trades.json``; failures are printed and re-raised."""
        try:
            self._store_json("open_trades.json", trades)
            print(f"✅ Open trades saved to R2. Total open trades: {len(trades)}")
        except Exception as e:
            print(f"❌ Failed to save open trades: {e}")
            raise

    async def save_new_trade_async(self, symbol, decision, current_price):
        """Open a new trade using all available capital and persist it.

        Args:
            symbol: instrument symbol of the trade.
            decision: decision dict; ``stop_loss``, ``take_profit``,
                ``trade_type``, ``strategy`` and ``expected_target_minutes``
                are read from it.
            current_price: price recorded as the entry price.

        Returns None without opening a trade when available capital is below 1.

        Raises:
            Exception: any failure is printed and re-raised.
        """
        try:
            portfolio_state = await self.get_portfolio_state_async()
            available_capital = portfolio_state.get("current_capital_usd", 0)
            if available_capital < 1:
                print(f"❌ Insufficient capital (${available_capital:.2f}) to open a new trade.")
                return

            # The decision may carry None or a string here; fall back to the
            # 15-minute default rather than failing on the comparison below.
            requested_minutes = self._to_number(decision.get('expected_target_minutes', 15))
            if requested_minutes is None:
                requested_minutes = 15
            expected_target_minutes = max(5, min(requested_minutes, 45))
            expected_target_time = (datetime.now() + timedelta(minutes=expected_target_minutes)).isoformat()

            # Make sure a usable strategy name is recorded.
            strategy = decision.get('strategy')
            if not strategy or strategy == 'unknown':
                strategy = 'GENERIC'
                print(f"⚠️ Strategy was missing or unknown. Setting to GENERIC for {symbol}")

            # Store an explicit direction so closing never sees a None type.
            trade_type = decision.get("trade_type") or "LONG"

            trades = await self.get_open_trades_async()
            new_trade = {
                "id": str(int(time.time())),
                "symbol": symbol,
                "entry_price": current_price,
                "entry_timestamp": datetime.now().isoformat(),
                "decision_data": decision,
                "status": "OPEN",
                "stop_loss": decision.get("stop_loss"),
                "take_profit": decision.get("take_profit"),
                "trade_type": trade_type,
                "position_size_usd": available_capital,
                "expected_target_minutes": expected_target_minutes,
                "expected_target_time": expected_target_time,
                "is_monitored": True,
                "strategy": strategy
            }
            trades.append(new_trade)
            await self.save_open_trades_async(trades)

            if requested_minutes > 45:
                print(f"⚠️ LLM wanted {requested_minutes} minutes, CAPPED to 45 minutes for strategy consistency")
            print(f"✅ New trade for {symbol} saved with position size ${available_capital:.2f}. Strategy: {strategy}. Expected results in {expected_target_minutes} minutes.")

            # All available capital is now invested.
            portfolio_state["invested_capital_usd"] = available_capital
            portfolio_state["current_capital_usd"] = 0.0
            portfolio_state["total_trades"] = portfolio_state.get("total_trades", 0) + 1
            await self.save_portfolio_state_async(portfolio_state)

            await self.save_system_logs_async({
                "new_trade_opened": True,
                "symbol": symbol,
                "position_size": available_capital,
                "expected_minutes": expected_target_minutes,
                "trade_type": trade_type,
                "strategy": strategy
            })
        except Exception as e:
            print(f"❌ Failed to save new trade: {e}")
            raise

    async def update_trade_async(self, trade_to_update, re_analysis_decision):
        """Apply re-analysis results to an open trade and persist the change.

        ``new_stop_loss``, ``new_take_profit`` and ``new_expected_minutes`` are
        applied when present and truthy. The trade's existing strategy is kept
        unless it is missing or ``'unknown'``.

        Raises:
            Exception: any failure is printed and re-raised.
        """
        try:
            if re_analysis_decision.get('new_stop_loss'):
                trade_to_update['stop_loss'] = re_analysis_decision['new_stop_loss']
            if re_analysis_decision.get('new_take_profit'):
                trade_to_update['take_profit'] = re_analysis_decision['new_take_profit']

            new_expected_minutes = re_analysis_decision.get('new_expected_minutes')
            if new_expected_minutes:
                parsed_minutes = self._to_number(new_expected_minutes)
                if parsed_minutes is None:
                    print(f"⚠️ Ignoring non-numeric new_expected_minutes: {new_expected_minutes!r}")
                    new_expected_minutes = None
                else:
                    new_expected_minutes = max(5, min(parsed_minutes, 45))
                    trade_to_update['expected_target_minutes'] = new_expected_minutes
                    trade_to_update['expected_target_time'] = (datetime.now() + timedelta(minutes=new_expected_minutes)).isoformat()
                    print(f"⏰ Trade time expectation updated to {new_expected_minutes} minutes.")

            # Keep the strategy the trade was opened with when it has one.
            original_strategy = trade_to_update.get('strategy')
            if not original_strategy or original_strategy == 'unknown':
                original_strategy = re_analysis_decision.get('strategy', 'GENERIC')
            trade_to_update['strategy'] = original_strategy
            trade_to_update['decision_data'] = re_analysis_decision
            trade_to_update['is_monitored'] = True

            open_trades = await self.get_open_trades_async()
            target_id = trade_to_update.get('id')
            for i, trade in enumerate(open_trades):
                if trade.get('id') == target_id:
                    open_trades[i] = trade_to_update
                    break
            else:
                # Nothing to persist: do not claim success for a missing trade.
                print(f"⚠️ Trade {trade_to_update.get('symbol')} not found in open trades; update not saved")
                return
            await self.save_open_trades_async(open_trades)
            print(f"✅ Trade for {trade_to_update.get('symbol')} updated successfully. Strategy: {original_strategy}")

            await self.save_system_logs_async({
                "trade_updated": True,
                "symbol": trade_to_update.get('symbol'),
                "new_expected_minutes": new_expected_minutes,
                "action": "UPDATE_TRADE",
                "strategy": original_strategy
            })
        except Exception as e:
            print(f"❌ Failed to update trade: {e}")
            raise

    async def immediate_close_trade_async(self, symbol, close_price, reason="Real-time monitoring"):
        """Close the first OPEN trade for ``symbol`` at ``close_price``.

        Returns:
            True when a trade was found and closed; False when none was found
            or any error occurred (the error is printed, not raised).
        """
        try:
            open_trades = await self.get_open_trades_async()
            trade_to_close = next(
                (t for t in open_trades
                 if t.get('symbol') == symbol and t.get('status') == 'OPEN'),
                None,
            )
            if not trade_to_close:
                print(f"❌ No open trade found for {symbol}")
                return False

            await self.close_trade_async(trade_to_close, close_price)
            print(f"🚨 IMMEDIATE CLOSE: {symbol} at {close_price} - {reason}")
            await self.save_system_logs_async({
                "immediate_close": True,
                "symbol": symbol,
                "close_price": close_price,
                "reason": reason,
                "strategy": trade_to_close.get('strategy', 'unknown')
            })
            return True
        except Exception as e:
            print(f"❌ Failed to immediately close trade {symbol}: {e}")
            return False

    # ------------------------------------------------------------------
    # History and summary
    # ------------------------------------------------------------------
    async def _archive_closed_trade_async(self, closed_trade):
        """Append ``closed_trade`` to the list in ``closed_trades_history.json``."""
        key = "closed_trades_history.json"
        found, history = self._load_json(key)
        if not found:
            history = []
        history.append(closed_trade)
        self._store_json(key, history)
        print(f"📚 Trade archived. Total archived trades: {len(history)}")

    async def _update_trade_summary_async(self, closed_trade):
        """Update ``trade_summary.json`` with the PnL of ``closed_trade``.

        Raises:
            Exception: any failure is printed and re-raised.
        """
        key = "trade_summary.json"
        try:
            found, summary = self._load_json(key)
            if not found:
                summary = {
                    "total_trades": 0, "winning_trades": 0, "losing_trades": 0,
                    "total_profit_usd": 0.0, "total_loss_usd": 0.0, "win_percentage": 0.0,
                    "avg_profit_per_trade": 0.0, "avg_loss_per_trade": 0.0,
                    "largest_win": 0.0, "largest_loss": 0.0
                }
            pnl = closed_trade.get('pnl_usd', 0.0)
            self._apply_pnl_to_summary(summary, pnl)
            self._store_json(key, summary)
            print(f"📊 Trade summary updated. Win Rate: {summary['win_percentage']:.2f}%")
        except Exception as e:
            print(f"❌ Failed to update trade summary: {e}")
            raise

    # ------------------------------------------------------------------
    # Contracts database
    # ------------------------------------------------------------------
    async def load_contracts_db_async(self):
        """Return the parsed ``contracts.json``, or ``{}`` when it does not exist.

        Raises:
            ClientError: for storage errors other than ``NoSuchKey``.
        """
        found, contracts_db = self._load_json("contracts.json")
        if found:
            self._contracts_warning_printed = False
            print(f"💾 Contracts database loaded from R2. Total entries: {len(contracts_db)}")
            return contracts_db
        if not getattr(self, '_contracts_warning_printed', False):
            print("⚠️ No existing contracts database found. Initializing new one.")
            self._contracts_warning_printed = True
        return {}

    async def save_contracts_db_async(self, data):
        """Write the contracts database to ``contracts.json``; failures are printed and re-raised."""
        try:
            self._store_json("contracts.json", data)
            print(f"✅ Contracts database saved to R2 successfully. Total entries: {len(data)}")
        except Exception as e:
            print(f"❌ Failed to save contracts database to R2: {e}")
            raise

    # ------------------------------------------------------------------
    # Lookups and monitoring flags
    # ------------------------------------------------------------------
    async def get_trade_by_symbol_async(self, symbol):
        """Return the first OPEN trade for ``symbol``, or None (also on any error)."""
        try:
            open_trades = await self.get_open_trades_async()
            return next(
                (t for t in open_trades
                 if t.get('symbol') == symbol and t.get('status') == 'OPEN'),
                None,
            )
        except Exception as e:
            print(f"❌ Failed to get trade by symbol {symbol}: {e}")
            return None

    async def update_trade_monitoring_status_async(self, symbol, is_monitored):
        """Set ``is_monitored`` on the first open trade for ``symbol`` and persist it.

        Returns:
            True when a trade was found and saved, False otherwise (errors are
            printed, not raised).
        """
        try:
            open_trades = await self.get_open_trades_async()
            target = next((t for t in open_trades if t.get('symbol') == symbol), None)
            if target is None:
                print(f"⚠️ Trade {symbol} not found for monitoring status update")
                return False
            target['is_monitored'] = is_monitored
            await self.save_open_trades_async(open_trades)
            status = "ENABLED" if is_monitored else "DISABLED"
            print(f"✅ Real-time monitoring {status} for {symbol}")
            return True
        except Exception as e:
            print(f"❌ Failed to update monitoring status for {symbol}: {e}")
            return False

    async def get_monitored_trades_async(self):
        """Return the open trades whose ``is_monitored`` flag is truthy (``[]`` on any error)."""
        try:
            open_trades = await self.get_open_trades_async()
            return [trade for trade in open_trades if trade.get('is_monitored', False)]
        except Exception as e:
            print(f"❌ Failed to get monitored trades: {e}")
            return []
# Banner printed when the module's top-level code runs.
print(
    "✅ Enhanced R2 Service Loaded - "
    "Comprehensive Logging System with Learning Support"
)