Trad / r2.py
Riy777's picture
Rename r2 (29).py to r2.py
45d0144
raw
history blame
29.3 kB
# r2 (23).py - الإصلاحات
import os
import traceback
import json
from datetime import datetime, timedelta
import asyncio
import time
import boto3
from botocore.exceptions import NoCredentialsError, ClientError
# --- R2 Service Configuration ---
R2_ACCOUNT_ID = os.getenv("R2_ACCOUNT_ID")
R2_ACCESS_KEY_ID = os.getenv("R2_ACCESS_KEY_ID")
R2_SECRET_ACCESS_KEY = os.getenv("R2_SECRET_ACCESS_KEY")
BUCKET_NAME = "trading"
INITIAL_CAPITAL = 10.0
class R2Service:
def __init__(self):
try:
endpoint_url = f"https://{R2_ACCOUNT_ID}.r2.cloudflarestorage.com"
self.s3_client = boto3.client(
's3',
endpoint_url=endpoint_url,
aws_access_key_id=R2_ACCESS_KEY_ID,
aws_secret_access_key=R2_SECRET_ACCESS_KEY,
)
self.lock_acquired = False
self.BUCKET_NAME = BUCKET_NAME
self._open_trades_warning_printed = False
self._portfolio_warning_printed = False
self._contracts_warning_printed = False
except Exception as e:
raise RuntimeError(f"Failed to initialize S3 client: {e}")
def acquire_lock(self, max_retries=3):
"""Acquires a lock file in R2 with retry logic."""
lock_path = "lock.txt"
for attempt in range(max_retries):
try:
try:
self.s3_client.head_object(Bucket=BUCKET_NAME, Key=lock_path)
print(f"🔒 Lock file exists. Attempt {attempt + 1}/{max_retries}. Waiting...")
time.sleep(1)
except ClientError as e:
if e.response['Error']['Code'] == '404':
self.s3_client.put_object(Bucket=BUCKET_NAME, Key=lock_path, Body=b'')
self.lock_acquired = True
print("✅ Lock acquired.")
return True
else:
raise
except Exception as e:
print(f"❌ Failed to acquire lock: {e}")
time.sleep(1)
print(f"❌ Failed to acquire lock after {max_retries} attempts.")
return False
def release_lock(self):
"""Releases the lock file from R2."""
lock_path = "lock.txt"
if self.lock_acquired:
try:
self.s3_client.delete_object(Bucket=BUCKET_NAME, Key=lock_path)
print("✅ Lock released.")
self.lock_acquired = False
except Exception as e:
print(f"❌ Failed to release lock: {e}")
async def close_trade_async(self, trade_to_close, close_price):
"""Closes a trade, archives it, updates summary, and updates portfolio capital."""
try:
trade_to_close['status'] = 'CLOSED'
trade_to_close['close_price'] = close_price
trade_to_close['close_timestamp'] = datetime.now().isoformat()
trade_to_close['is_monitored'] = False
entry_price = trade_to_close['entry_price']
position_size = trade_to_close['position_size_usd']
trade_type = trade_to_close.get('trade_type', 'LONG')
strategy = trade_to_close.get('strategy', 'unknown')
# ✅ الإصلاح: حساب PnL بشكل صحيح
pnl = 0.0
pnl_percent = 0.0
if entry_price and entry_price > 0 and close_price and close_price > 0:
try:
if trade_type == 'LONG':
pnl_percent = ((close_price - entry_price) / entry_price) * 100
pnl = position_size * (pnl_percent / 100)
elif trade_type == 'SHORT':
pnl_percent = ((entry_price - close_price) / entry_price) * 100
pnl = position_size * (pnl_percent / 100)
print(f"💰 PnL Calculation: Entry=${entry_price:.6f}, Close=${close_price:.6f}, "
f"Position=${position_size:.2f}, Type={trade_type}, "
f"PnL=${pnl:.4f} ({pnl_percent:+.4f}%)")
except (TypeError, ZeroDivisionError) as calc_error:
print(f"⚠️ PnL calculation error: {calc_error}")
pnl = 0.0
pnl_percent = 0.0
else:
print(f"⚠️ Invalid prices for PnL calculation: Entry={entry_price}, Close={close_price}")
trade_to_close['pnl_usd'] = pnl
trade_to_close['pnl_percent'] = pnl_percent
await self._archive_closed_trade_async(trade_to_close)
await self._update_trade_summary_async(trade_to_close)
# ✅ الإصلاح: تحديث رأس المال بشكل صحيح
portfolio_state = await self.get_portfolio_state_async()
current_capital = portfolio_state.get("current_capital_usd", 0)
invested_capital = portfolio_state.get("invested_capital_usd", 0)
# حساب رأس المال الجديد
new_capital = current_capital + position_size + pnl
portfolio_state["current_capital_usd"] = new_capital
portfolio_state["invested_capital_usd"] = 0.0
if pnl > 0:
portfolio_state["winning_trades"] = portfolio_state.get("winning_trades", 0) + 1
portfolio_state["total_profit_usd"] = portfolio_state.get("total_profit_usd", 0.0) + pnl
elif pnl < 0:
portfolio_state["total_loss_usd"] = portfolio_state.get("total_loss_usd", 0.0) + abs(pnl)
await self.save_portfolio_state_async(portfolio_state)
print(f"📈 Trade PnL: ${pnl:.4f} ({pnl_percent:+.4f}%). "
f"New available capital: ${new_capital:.4f}. Strategy: {strategy}")
open_trades = await self.get_open_trades_async()
trades_to_keep = [t for t in open_trades if t.get('id') != trade_to_close.get('id')]
await self.save_open_trades_async(trades_to_keep)
print(f"✅ Trade for {trade_to_close.get('symbol')} closed and archived successfully. Strategy: {strategy}")
await self.save_system_logs_async({
"trade_closed": True,
"symbol": trade_to_close.get('symbol'),
"entry_price": entry_price,
"close_price": close_price,
"pnl_usd": pnl,
"pnl_percent": pnl_percent,
"new_capital": new_capital,
"strategy": strategy,
"position_size": position_size,
"trade_type": trade_type
})
except Exception as e:
print(f"❌ Failed to close trade: {e}")
traceback.print_exc()
raise
# باقي الدوال تبقى كما هي بدون تغيير
async def save_candidates_data_async(self, candidates_data, reanalysis_data):
"""حفظ بيانات المرشحين العشرة وبيانات إعادة التحليل"""
try:
key = "candidates_data.json"
data = {
"timestamp": datetime.now().isoformat(),
"top_candidates": candidates_data,
"reanalysis_data": reanalysis_data
}
data_json = json.dumps(data, indent=2, ensure_ascii=False).encode('utf-8')
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json"
)
print(f"✅ Candidates data saved to R2: {len(candidates_data) if candidates_data else 0} candidates")
except Exception as e:
print(f"❌ Failed to save candidates data: {e}")
async def save_llm_responses_async(self, symbol, prompt, full_response, parsed_decision):
"""حفظ إجابات النموذج الكاملة"""
try:
key = "llm_responses.json"
try:
response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key)
existing_data = json.loads(response['Body'].read())
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
existing_data = {"responses": []}
else:
raise
new_response = {
"timestamp": datetime.now().isoformat(),
"symbol": symbol,
"prompt": prompt[:2000] + "..." if len(prompt) > 2000 else prompt,
"full_response": full_response,
"parsed_decision": parsed_decision
}
existing_data["responses"].append(new_response)
if len(existing_data["responses"]) > 1000:
existing_data["responses"] = existing_data["responses"][-1000:]
data_json = json.dumps(existing_data, indent=2, ensure_ascii=False).encode('utf-8')
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json"
)
print(f"✅ LLM response saved for {symbol}")
except Exception as e:
print(f"❌ Failed to save LLM response: {e}")
async def save_system_logs_async(self, log_data):
"""حفظ سجلات النظام"""
try:
key = "system_logs.json"
try:
response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key)
existing_logs = json.loads(response['Body'].read())
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
existing_logs = {"logs": []}
else:
raise
log_entry = {
"timestamp": datetime.now().isoformat(),
**log_data
}
existing_logs["logs"].append(log_entry)
if len(existing_logs["logs"]) > 2000:
existing_logs["logs"] = existing_logs["logs"][-2000:]
data_json = json.dumps(existing_logs, indent=2, ensure_ascii=False).encode('utf-8')
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json"
)
print(f"✅ System log saved: {log_data.get('cycle_started', log_data.get('cycle_completed', 'event'))}")
except Exception as e:
print(f"❌ Failed to save system logs: {e}")
async def save_learning_data_async(self, learning_data):
"""حفظ بيانات التعلم"""
try:
key = "learning_data.json"
data = {
"timestamp": datetime.now().isoformat(),
"learning_data": learning_data
}
data_json = json.dumps(data, indent=2, ensure_ascii=False).encode('utf-8')
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json"
)
print("✅ Learning data saved to R2")
except Exception as e:
print(f"❌ Failed to save learning data: {e}")
async def load_learning_data_async(self):
"""تحميل بيانات التعلم"""
try:
key = "learning_data.json"
response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key)
data = json.loads(response['Body'].read())
print("✅ Learning data loaded from R2")
return data
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
print("⚠️ No learning data found. Starting fresh.")
return {}
else:
raise
async def get_portfolio_state_async(self):
"""Fetches the current portfolio state from R2, or initializes it."""
key = "portfolio_state.json"
try:
response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key)
state = json.loads(response['Body'].read())
if hasattr(self, '_portfolio_warning_printed'):
delattr(self, '_portfolio_warning_printed')
print(f"💰 Portfolio state loaded: Current Capital ${state.get('current_capital_usd', 0):.2f}")
return state
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
if not hasattr(self, '_portfolio_warning_printed'):
print(f"⚠️ No portfolio state file found. Initializing with ${INITIAL_CAPITAL:.2f}")
self._portfolio_warning_printed = True
initial_state = {
"current_capital_usd": INITIAL_CAPITAL,
"invested_capital_usd": 0.0,
"initial_capital_usd": INITIAL_CAPITAL,
"total_trades": 0,
"winning_trades": 0,
"total_profit_usd": 0.0,
"total_loss_usd": 0.0
}
await self.save_portfolio_state_async(initial_state)
return initial_state
else:
raise
async def save_portfolio_state_async(self, state):
"""Saves the portfolio state to R2."""
key = "portfolio_state.json"
try:
data_json = json.dumps(state, indent=2).encode('utf-8')
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json"
)
print(f"💾 Portfolio state saved: Current Capital ${state.get('current_capital_usd', 0):.2f}")
except Exception as e:
print(f"❌ Failed to save portfolio state: {e}")
raise
async def get_open_trades_async(self):
"""Fetches all open trades from R2 with reduced noise."""
try:
response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key="open_trades.json")
trades = json.loads(response['Body'].read())
if hasattr(self, '_open_trades_warning_printed'):
delattr(self, '_open_trades_warning_printed')
return trades
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
if not hasattr(self, '_open_trades_warning_printed'):
print("⚠️ No open trades file found. Starting with an empty list.")
print("💡 This is normal for first-time runs or when all trades are closed.")
self._open_trades_warning_printed = True
return []
else:
raise
async def save_open_trades_async(self, trades):
"""Saves the list of open trades to R2."""
try:
data_json = json.dumps(trades, indent=2).encode('utf-8')
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key="open_trades.json", Body=data_json, ContentType="application/json"
)
print(f"✅ Open trades saved to R2. Total open trades: {len(trades)}")
except Exception as e:
print(f"❌ Failed to save open trades: {e}")
raise
async def save_new_trade_async(self, symbol, decision, current_price):
"""Creates a new trade using the full available capital and saves it."""
try:
portfolio_state = await self.get_portfolio_state_async()
available_capital = portfolio_state.get("current_capital_usd", 0)
if available_capital < 1:
print(f"❌ Insufficient capital (${available_capital:.2f}) to open a new trade.")
return
expected_target_minutes = decision.get('expected_target_minutes', 15)
expected_target_minutes = max(5, min(expected_target_minutes, 45))
expected_target_time = (datetime.now() + timedelta(minutes=expected_target_minutes)).isoformat()
# ✅ الإصلاح النهائي: التأكد من وجود استراتيجية صالحة
strategy = decision.get('strategy')
if not strategy or strategy == 'unknown':
strategy = 'GENERIC'
print(f"⚠️ Strategy was missing or unknown. Setting to GENERIC for {symbol}")
trades = await self.get_open_trades_async()
new_trade = {
"id": str(int(time.time())),
"symbol": symbol,
"entry_price": current_price,
"entry_timestamp": datetime.now().isoformat(),
"decision_data": decision,
"status": "OPEN",
"stop_loss": decision.get("stop_loss"),
"take_profit": decision.get("take_profit"),
"trade_type": decision.get("trade_type"),
"position_size_usd": available_capital,
"expected_target_minutes": expected_target_minutes,
"expected_target_time": expected_target_time,
"is_monitored": True,
"strategy": strategy # ✅ استخدام الاستراتيجية المؤكدة
}
trades.append(new_trade)
await self.save_open_trades_async(trades)
original_expected = decision.get('expected_target_minutes', 15)
if original_expected > 45:
print(f"⚠️ LLM wanted {original_expected} minutes, CAPPED to 45 minutes for strategy consistency")
print(f"✅ New trade for {symbol} saved with position size ${available_capital:.2f}. Strategy: {strategy}. Expected results in {expected_target_minutes} minutes.")
portfolio_state["invested_capital_usd"] = available_capital
portfolio_state["current_capital_usd"] = 0.0
portfolio_state["total_trades"] = portfolio_state.get("total_trades", 0) + 1
await self.save_portfolio_state_async(portfolio_state)
await self.save_system_logs_async({
"new_trade_opened": True,
"symbol": symbol,
"position_size": available_capital,
"expected_minutes": expected_target_minutes,
"trade_type": decision.get("trade_type", "LONG"),
"strategy": strategy # ✅ استخدام الاستراتيجية المؤكدة
})
except Exception as e:
print(f"❌ Failed to save new trade: {e}")
raise
async def update_trade_async(self, trade_to_update, re_analysis_decision):
"""Updates an existing trade with new parameters from re-analysis."""
try:
if re_analysis_decision.get('new_stop_loss'):
trade_to_update['stop_loss'] = re_analysis_decision['new_stop_loss']
if re_analysis_decision.get('new_take_profit'):
trade_to_update['take_profit'] = re_analysis_decision['new_take_profit']
new_expected_minutes = re_analysis_decision.get('new_expected_minutes')
if new_expected_minutes:
new_expected_minutes = max(5, min(new_expected_minutes, 45))
trade_to_update['expected_target_minutes'] = new_expected_minutes
trade_to_update['expected_target_time'] = (datetime.now() + timedelta(minutes=new_expected_minutes)).isoformat()
print(f"⏰ Trade time expectation updated to {new_expected_minutes} minutes.")
# ✅ الإصلاح: الحفاظ على الاستراتيجية الأصلية
original_strategy = trade_to_update.get('strategy')
if not original_strategy or original_strategy == 'unknown':
original_strategy = re_analysis_decision.get('strategy', 'GENERIC')
trade_to_update['strategy'] = original_strategy
trade_to_update['decision_data'] = re_analysis_decision
trade_to_update['is_monitored'] = True
open_trades = await self.get_open_trades_async()
for i, trade in enumerate(open_trades):
if trade.get('id') == trade_to_update.get('id'):
open_trades[i] = trade_to_update
break
await self.save_open_trades_async(open_trades)
print(f"✅ Trade for {trade_to_update.get('symbol')} updated successfully. Strategy: {original_strategy}")
await self.save_system_logs_async({
"trade_updated": True,
"symbol": trade_to_update.get('symbol'),
"new_expected_minutes": new_expected_minutes,
"action": "UPDATE_TRADE",
"strategy": original_strategy
})
except Exception as e:
print(f"❌ Failed to update trade: {e}")
raise
async def immediate_close_trade_async(self, symbol, close_price, reason="Real-time monitoring"):
"""Closes a trade immediately without full re-analysis."""
try:
open_trades = await self.get_open_trades_async()
trade_to_close = None
for trade in open_trades:
if trade['symbol'] == symbol and trade['status'] == 'OPEN':
trade_to_close = trade
break
if not trade_to_close:
print(f"❌ No open trade found for {symbol}")
return False
await self.close_trade_async(trade_to_close, close_price)
print(f"🚨 IMMEDIATE CLOSE: {symbol} at {close_price} - {reason}")
await self.save_system_logs_async({
"immediate_close": True,
"symbol": symbol,
"close_price": close_price,
"reason": reason,
"strategy": trade_to_close.get('strategy', 'unknown')
})
return True
except Exception as e:
print(f"❌ Failed to immediately close trade {symbol}: {e}")
return False
async def _archive_closed_trade_async(self, closed_trade):
"""Appends a closed trade to the history file."""
key = "closed_trades_history.json"
try:
response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key)
history = json.loads(response['Body'].read())
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
history = []
else:
raise
history.append(closed_trade)
data_json = json.dumps(history, indent=2).encode('utf-8')
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json"
)
print(f"📚 Trade archived. Total archived trades: {len(history)}")
async def _update_trade_summary_async(self, closed_trade):
"""Updates the trade summary statistics file."""
key = "trade_summary.json"
try:
try:
response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key)
summary = json.loads(response['Body'].read())
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
summary = {
"total_trades": 0, "winning_trades": 0, "losing_trades": 0,
"total_profit_usd": 0.0, "total_loss_usd": 0.0, "win_percentage": 0.0,
"avg_profit_per_trade": 0.0, "avg_loss_per_trade": 0.0,
"largest_win": 0.0, "largest_loss": 0.0
}
else:
raise
pnl = closed_trade.get('pnl_usd', 0.0)
summary['total_trades'] += 1
if pnl >= 0:
summary['winning_trades'] += 1
summary['total_profit_usd'] += pnl
if pnl > summary.get('largest_win', 0):
summary['largest_win'] = pnl
else:
summary['losing_trades'] += 1
summary['total_loss_usd'] += abs(pnl)
if abs(pnl) > summary.get('largest_loss', 0):
summary['largest_loss'] = abs(pnl)
if summary['total_trades'] > 0:
summary['win_percentage'] = (summary['winning_trades'] / summary['total_trades']) * 100
if summary['winning_trades'] > 0:
summary['avg_profit_per_trade'] = summary['total_profit_usd'] / summary['winning_trades']
if summary['losing_trades'] > 0:
summary['avg_loss_per_trade'] = summary['total_loss_usd'] / summary['losing_trades']
data_json = json.dumps(summary, indent=2).encode('utf-8')
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json"
)
print(f"📊 Trade summary updated. Win Rate: {summary['win_percentage']:.2f}%")
except Exception as e:
print(f"❌ Failed to update trade summary: {e}")
raise
async def load_contracts_db_async(self):
"""Loads the contracts database from R2 with reduced noise."""
key = "contracts.json"
try:
response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key)
contracts_db = json.loads(response['Body'].read())
if hasattr(self, '_contracts_warning_printed'):
delattr(self, '_contracts_warning_printed')
print(f"💾 Contracts database loaded from R2. Total entries: {len(contracts_db)}")
return contracts_db
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
if not hasattr(self, '_contracts_warning_printed'):
print("⚠️ No existing contracts database found. Initializing new one.")
self._contracts_warning_printed = True
return {}
else:
raise
async def save_contracts_db_async(self, data):
"""Saves the contracts database to R2."""
key = "contracts.json"
try:
data_json = json.dumps(data, indent=2).encode('utf-8')
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json"
)
print(f"✅ Contracts database saved to R2 successfully. Total entries: {len(data)}")
except Exception as e:
print(f"❌ Failed to save contracts database to R2: {e}")
raise
async def get_trade_by_symbol_async(self, symbol):
"""Fetches a specific trade by symbol."""
try:
open_trades = await self.get_open_trades_async()
for trade in open_trades:
if trade['symbol'] == symbol and trade['status'] == 'OPEN':
return trade
return None
except Exception as e:
print(f"❌ Failed to get trade by symbol {symbol}: {e}")
return None
async def update_trade_monitoring_status_async(self, symbol, is_monitored):
"""Updates the monitoring status of a trade."""
try:
open_trades = await self.get_open_trades_async()
updated = False
for trade in open_trades:
if trade['symbol'] == symbol:
trade['is_monitored'] = is_monitored
updated = True
break
if updated:
await self.save_open_trades_async(open_trades)
status = "ENABLED" if is_monitored else "DISABLED"
print(f"✅ Real-time monitoring {status} for {symbol}")
else:
print(f"⚠️ Trade {symbol} not found for monitoring status update")
return updated
except Exception as e:
print(f"❌ Failed to update monitoring status for {symbol}: {e}")
return False
async def get_monitored_trades_async(self):
"""Fetches all trades that are currently being monitored."""
try:
open_trades = await self.get_open_trades_async()
monitored_trades = [trade for trade in open_trades if trade.get('is_monitored', False)]
return monitored_trades
except Exception as e:
print(f"❌ Failed to get monitored trades: {e}")
return []
print("✅ Enhanced R2 Service Loaded - Comprehensive Logging System with Learning Support")