# Trad / r2.py
# Riy777's picture
# Update r2.py
# 480cde2 verified
# r2.py (Updated V10.3 - Typing Fix)
import os, traceback, json, time
from datetime import datetime, timedelta
import asyncio
import boto3
from botocore.exceptions import NoCredentialsError, ClientError
from typing import List, Dict, Any, Optional # <-- ๐Ÿ”ด ุงู„ุณุทุฑ ุงู„ู…ุถุงู ู„ุฅุตู„ุงุญ ุงู„ุฎุทุฃ
# Cloudflare R2 credentials, read from the environment when the module is
# imported (os.getenv yields None for a variable that is not set).
R2_ACCOUNT_ID = os.getenv("R2_ACCOUNT_ID")
R2_ACCESS_KEY_ID = os.getenv("R2_ACCESS_KEY_ID")
R2_SECRET_ACCESS_KEY = os.getenv("R2_SECRET_ACCESS_KEY")
# Bucket that holds every object read or written by R2Service.
BUCKET_NAME = "trading"
# Capital (USD) written into a newly initialised portfolio state.
INITIAL_CAPITAL = 10.0
# --- (New in V10.2) object keys of the whale-learning files ---
WHALE_LEARNING_PENDING_KEY = "learning_whale_pending_records.json"
WHALE_LEARNING_COMPLETED_KEY = "learning_whale_completed_records.json"
WHALE_LEARNING_CONFIG_KEY = "learning_whale_optimal_config.json"
class R2Service:
    """JSON persistence for the trading bot, backed by one Cloudflare R2 bucket.

    Each public method reads or writes a single JSON object through the
    S3-compatible boto3 client created in ``__init__``.

    NOTE: the ``*_async`` coroutines call the synchronous boto3 client
    directly; they never await the request itself, so the event loop is
    blocked for the duration of every call.
    """

    # Marks "no default supplied" so a fresh list can be created per call
    # instead of sharing one mutable default between calls.
    _MISSING = object()
    # Object whose presence in the bucket means "locked".
    _LOCK_KEY = "lock.txt"

    def __init__(self):
        """Create the boto3 S3 client pointed at the account's R2 endpoint.

        Raises:
            RuntimeError: if the client cannot be constructed.
        """
        try:
            endpoint_url = f"https://{R2_ACCOUNT_ID}.r2.cloudflarestorage.com"
            self.s3_client = boto3.client(
                's3',
                endpoint_url=endpoint_url,
                aws_access_key_id=R2_ACCESS_KEY_ID,
                aws_secret_access_key=R2_SECRET_ACCESS_KEY,
            )
            self.lock_acquired = False
            self.BUCKET_NAME = BUCKET_NAME
            # True once the matching "file not found" notice has been shown;
            # reset to False after the file is loaded successfully.
            self._open_trades_warning_printed = False
            self._portfolio_warning_printed = False
            self._contracts_warning_printed = False
        except Exception as e:
            raise RuntimeError(f"Failed to initialize S3 client: {e}") from e

    # ------------------------------------------------------------------
    # Low-level helpers (raise on failure; callers decide how to react)
    # ------------------------------------------------------------------
    @staticmethod
    def _is_missing_key(error) -> bool:
        """Return True when a ClientError reports that the object does not exist."""
        return error.response['Error']['Code'] == 'NoSuchKey'

    def _get_json(self, key: str) -> Any:
        """Download ``key`` from the bucket and return the decoded JSON value."""
        response = self.s3_client.get_object(Bucket=self.BUCKET_NAME, Key=key)
        return json.loads(response['Body'].read())

    def _put_json(self, key: str, payload: Any, *, ensure_ascii: bool = False) -> None:
        """Serialise ``payload`` as indented JSON and upload it under ``key``."""
        body = json.dumps(payload, indent=2, ensure_ascii=ensure_ascii).encode('utf-8')
        self.s3_client.put_object(
            Bucket=self.BUCKET_NAME, Key=key, Body=body, ContentType="application/json"
        )

    def _append_to_capped_list(self, key: str, field: str, entry: Any, limit: int) -> None:
        """Append ``entry`` to the list stored under ``field`` in object ``key``.

        A missing object starts as ``{field: []}``; only the newest ``limit``
        entries are kept before the object is written back.
        """
        try:
            document = self._get_json(key)
        except ClientError as e:
            if not self._is_missing_key(e):
                raise
            document = {field: []}
        document[field].append(entry)
        document[field] = document[field][-limit:]
        self._put_json(key, document)

    # ------------------------------------------------------------------
    # Bucket-level lock
    # ------------------------------------------------------------------
    def acquire_lock(self, max_retries=3):
        """Try to create the lock object, retrying while it already exists.

        Waits one second between attempts. Returns True when the lock object
        was created by this call, False after ``max_retries`` failed attempts.

        NOTE(review): the existence check and the upload are two separate
        requests, so two processes can both believe they hold the lock.
        """
        for attempt in range(max_retries):
            try:
                try:
                    self.s3_client.head_object(Bucket=self.BUCKET_NAME, Key=self._LOCK_KEY)
                except ClientError as e:
                    # head_object reports a missing object as code '404'.
                    if e.response['Error']['Code'] != '404':
                        raise
                    self.s3_client.put_object(Bucket=self.BUCKET_NAME, Key=self._LOCK_KEY, Body=b'')
                    self.lock_acquired = True
                    print("โœ… Lock acquired.")
                    return True
                print(f"๐Ÿ”’ Lock file exists. Attempt {attempt + 1}/{max_retries}. Waiting...")
            except Exception as e:
                print(f"โŒ Failed to acquire lock: {e}")
            # Waiting after the final attempt would only delay the failure.
            if attempt < max_retries - 1:
                time.sleep(1)
        print(f"โŒ Failed to acquire lock after {max_retries} attempts.")
        return False

    def release_lock(self):
        """Delete the lock object if this instance acquired it; errors are logged."""
        if not self.lock_acquired:
            return
        try:
            self.s3_client.delete_object(Bucket=self.BUCKET_NAME, Key=self._LOCK_KEY)
            print("โœ… Lock released.")
            self.lock_acquired = False
        except Exception as e:
            print(f"โŒ Failed to release lock: {e}")

    # ------------------------------------------------------------------
    # Candidates
    # ------------------------------------------------------------------
    async def save_candidates_async(self, candidates):
        """Save the candidate list to its own object in R2 and print a summary.

        Failures are logged, not raised.
        """
        try:
            snapshot = {
                "timestamp": datetime.now().isoformat(),
                "total_candidates": len(candidates),
                "candidates": candidates
            }
            self._put_json("Candidates.json", snapshot)
            print(f"โœ… ุชู… ุญูุธ {len(candidates)} ู…ุฑุดุญ ููŠ ู…ู„ู Candidates ููŠ R2")
            # Show the candidates that were just saved.
            print("๐Ÿ“Š ุงู„ู…ุฑุดุญูˆู† ุงู„ู…ุญููˆุธูˆู†:")
            for rank, candidate in enumerate(candidates, start=1):
                symbol = candidate.get('symbol', 'Unknown')
                score = candidate.get('enhanced_final_score', 0)
                strategy = candidate.get('target_strategy', 'GENERIC')
                print(f" {rank}. {symbol}: {score:.3f} - {strategy}")
        except Exception as e:
            print(f"โŒ ูุดู„ ุญูุธ ุงู„ู…ุฑุดุญูŠู† ููŠ R2: {e}")

    async def load_candidates_async(self):
        """Load the saved candidate list from R2.

        Returns an empty list when the object does not exist; any other
        ClientError is re-raised.
        """
        try:
            data = self._get_json("Candidates.json")
        except ClientError as e:
            if self._is_missing_key(e):
                print("โš ๏ธ ู„ุง ูŠูˆุฌุฏ ู…ู„ู ู…ุฑุดุญูŠู† ุณุงุจู‚")
                return []
            raise
        candidates = data.get('candidates', [])
        print(f"โœ… ุชู… ุชุญู…ูŠู„ {len(candidates)} ู…ุฑุดุญ ู…ู† R2")
        return candidates

    # ------------------------------------------------------------------
    # Prompt / system / learning logs
    # ------------------------------------------------------------------
    async def save_llm_prompts_async(self, symbol, prompt_type, prompt_content, analysis_data=None):
        """Append a prompt sent to the LLM to the prompt log (newest 2000 kept).

        Failures are logged, not raised.
        """
        try:
            new_prompt = {
                "timestamp": datetime.now().isoformat(),
                "symbol": symbol,
                "prompt_type": prompt_type,  # 'trading_decision' or 'trade_reanalysis'
                "prompt_content": prompt_content,
                "analysis_data": analysis_data
            }
            self._append_to_capped_list("llm_prompts.json", "prompts", new_prompt, 2000)
            print(f"โœ… ุชู… ุญูุธ prompt ู„ู€ {symbol} ({prompt_type}) ููŠ R2")
        except Exception as e:
            print(f"โŒ ูุดู„ ุญูุธ prompt ู„ู€ {symbol}: {e}")

    async def save_system_logs_async(self, log_data):
        """Append ``log_data`` (a mapping) to the system log (newest 2000 kept).

        A ``timestamp`` key is added first, so a ``timestamp`` already present
        in ``log_data`` takes precedence. Failures are logged, not raised.
        """
        try:
            log_entry = {
                "timestamp": datetime.now().isoformat(),
                **log_data
            }
            self._append_to_capped_list("system_logs.json", "logs", log_entry, 2000)
            print(f"โœ… System log saved: {log_data.get('cycle_started', log_data.get('cycle_completed', 'event'))}")
        except Exception as e:
            print(f"โŒ Failed to save system logs: {e}")

    async def save_learning_data_async(self, learning_data):
        """Overwrite the learning-data object with a timestamped snapshot.

        Failures are logged, not raised.
        """
        try:
            snapshot = {
                "timestamp": datetime.now().isoformat(),
                "learning_data": learning_data
            }
            self._put_json("learning_data.json", snapshot)
            print("โœ… Learning data saved to R2")
        except Exception as e:
            print(f"โŒ Failed to save learning data: {e}")

    async def load_learning_data_async(self):
        """Return the stored learning-data document, or ``{}`` when none exists.

        Any ClientError other than a missing object is re-raised.
        """
        try:
            data = self._get_json("learning_data.json")
        except ClientError as e:
            if self._is_missing_key(e):
                print("โš ๏ธ No learning data found. Starting fresh.")
                return {}
            raise
        print("โœ… Learning data loaded from R2")
        return data

    # ------------------------------------------------------------------
    # Portfolio state
    # ------------------------------------------------------------------
    async def get_portfolio_state_async(self):
        """Return the portfolio state, creating and saving an initial one if absent.

        Any ClientError other than a missing object is re-raised, as is a
        failure to save the initial state.
        """
        try:
            state = self._get_json("portfolio_state.json")
        except ClientError as e:
            if not self._is_missing_key(e):
                raise
            if not getattr(self, '_portfolio_warning_printed', False):
                print(f"โš ๏ธ No portfolio state file found. Initializing with ${INITIAL_CAPITAL:.2f}")
                self._portfolio_warning_printed = True
            initial_state = {
                "current_capital_usd": INITIAL_CAPITAL,
                "invested_capital_usd": 0.0,
                "initial_capital_usd": INITIAL_CAPITAL,
                "total_trades": 0,
                "winning_trades": 0,
                "total_profit_usd": 0.0,
                "total_loss_usd": 0.0
            }
            await self.save_portfolio_state_async(initial_state)
            return initial_state
        # The file exists again, so a later miss should be announced anew.
        self._portfolio_warning_printed = False
        print(f"๐Ÿ’ฐ Portfolio state loaded: Current Capital ${state.get('current_capital_usd', 0):.2f}")
        return state

    async def save_portfolio_state_async(self, state):
        """Write the portfolio state; logs and re-raises on failure."""
        try:
            self._put_json("portfolio_state.json", state, ensure_ascii=True)
            print(f"๐Ÿ’พ Portfolio state saved: Current Capital ${state.get('current_capital_usd', 0):.2f}")
        except Exception as e:
            print(f"โŒ Failed to save portfolio state: {e}")
            raise

    # ------------------------------------------------------------------
    # Open trades
    # ------------------------------------------------------------------
    async def get_open_trades_async(self):
        """Return the stored open trades, or ``[]`` when the object is missing.

        Any ClientError other than a missing object is re-raised.
        """
        try:
            trades = self._get_json("open_trades.json")
        except ClientError as e:
            if not self._is_missing_key(e):
                raise
            if not getattr(self, '_open_trades_warning_printed', False):
                print("โš ๏ธ No open trades file found. Starting with an empty list.")
                print("๐Ÿ’ก This is normal for first-time runs or when all trades are closed.")
                self._open_trades_warning_printed = True
            return []
        self._open_trades_warning_printed = False
        return trades

    async def save_open_trades_async(self, trades):
        """Write the open-trades list; logs and re-raises on failure."""
        try:
            self._put_json("open_trades.json", trades, ensure_ascii=True)
            print(f"โœ… Open trades saved to R2. Total open trades: {len(trades)}")
        except Exception as e:
            print(f"โŒ Failed to save open trades: {e}")
            raise

    # ------------------------------------------------------------------
    # Contracts database
    # ------------------------------------------------------------------
    async def load_contracts_db_async(self):
        """Return the contracts database, or ``{}`` when the object is missing.

        Any ClientError other than a missing object is re-raised.
        """
        try:
            contracts_db = self._get_json("contracts.json")
        except ClientError as e:
            if not self._is_missing_key(e):
                raise
            if not getattr(self, '_contracts_warning_printed', False):
                print("โš ๏ธ No existing contracts database found. Initializing new one.")
                self._contracts_warning_printed = True
            return {}
        self._contracts_warning_printed = False
        print(f"๐Ÿ’พ Contracts database loaded from R2. Total entries: {len(contracts_db)}")
        return contracts_db

    async def save_contracts_db_async(self, data):
        """Write the contracts database; logs and re-raises on failure."""
        try:
            self._put_json("contracts.json", data, ensure_ascii=True)
            print(f"โœ… Contracts database saved to R2 successfully. Total entries: {len(data)}")
        except Exception as e:
            print(f"โŒ Failed to save contracts database to R2: {e}")
            raise

    # ------------------------------------------------------------------
    # Trade lookups and monitoring flags
    # ------------------------------------------------------------------
    async def get_trade_by_symbol_async(self, symbol):
        """Return the first trade for ``symbol`` whose status is 'OPEN', else None.

        Trades lacking a 'symbol' or 'status' key are skipped. Errors are
        logged and reported as None.
        """
        try:
            open_trades = await self.get_open_trades_async()
            for trade in open_trades:
                if trade.get('symbol') == symbol and trade.get('status') == 'OPEN':
                    return trade
            return None
        except Exception as e:
            print(f"โŒ Failed to get trade by symbol {symbol}: {e}")
            return None

    async def update_trade_monitoring_status_async(self, symbol, is_monitored):
        """Set ``is_monitored`` on the first trade for ``symbol`` and save the list.

        Returns True when a trade was updated and saved, False when no trade
        matched or an error occurred.
        """
        try:
            open_trades = await self.get_open_trades_async()
            target = next((t for t in open_trades if t.get('symbol') == symbol), None)
            if target is None:
                print(f"โš ๏ธ Trade {symbol} not found for monitoring status update")
                return False
            target['is_monitored'] = is_monitored
            await self.save_open_trades_async(open_trades)
            status = "ENABLED" if is_monitored else "DISABLED"
            print(f"โœ… Real-time monitoring {status} for {symbol}")
            return True
        except Exception as e:
            print(f"โŒ Failed to update monitoring status for {symbol}: {e}")
            return False

    async def get_monitored_trades_async(self):
        """Return the open trades whose ``is_monitored`` flag is truthy ([] on error)."""
        try:
            open_trades = await self.get_open_trades_async()
            return [trade for trade in open_trades if trade.get('is_monitored', False)]
        except Exception as e:
            print(f"โŒ Failed to get monitored trades: {e}")
            return []

    # ------------------------------------------------------------------
    # Analysis audit log
    # ------------------------------------------------------------------
    async def save_analysis_audit_log_async(self, audit_data):
        """Append one analysis-cycle audit record, keeping only the last 50 cycles.

        A stored value that is not a list is discarded and the log restarts.
        Failures are logged, not raised.
        """
        key = "analysis_audit_log.json"
        try:
            # 1. Fetch the current log, if any.
            try:
                history = self._get_json(key)
            except ClientError as e:
                if not self._is_missing_key(e):
                    raise
                history = []  # new file
            if not isinstance(history, list):
                history = []  # start over when the stored format is invalid
            # 2. Add the current cycle and keep the newest 50 records.
            history.append(audit_data)
            history = history[-50:]
            # 3. Save the updated file.
            self._put_json(key, history)
            print(f"๐Ÿ“Š ุชู… ุญูุธ ุณุฌู„ ุชุฏู‚ูŠู‚ ุงู„ุชุญู„ูŠู„ ุจู†ุฌุงุญ ููŠ R2 (ุฅุฌู…ุงู„ูŠ {len(history)} ุณุฌู„ุงุช)")
        except Exception as e:
            print(f"โŒ ูุดู„ ุญูุธ ุณุฌู„ ุชุฏู‚ูŠู‚ ุงู„ุชุญู„ูŠู„ ููŠ R2: {e}")

    # ------------------------------------------------------------------
    # (V10.2) Whale-learning storage
    # ------------------------------------------------------------------
    async def _load_json_file_from_r2(self, key: str, default: Any = _MISSING) -> Any:
        """Load a JSON object, falling back to ``default`` when it cannot be read.

        ``default`` (a new empty list when omitted) is returned when the
        object is missing or when a non-ClientError exception occurs. Any
        other ClientError is logged and re-raised.
        """
        if default is self._MISSING:
            default = []
        try:
            return self._get_json(key)
        except ClientError as e:
            if self._is_missing_key(e):
                print(f"โ„น๏ธ [R2Service] ู„ู… ูŠุชู… ุงู„ุนุซูˆุฑ ุนู„ู‰ ู…ู„ู '{key}'. ุณูŠุชู… ุงุณุชุฎุฏุงู… ุงู„ู‚ูŠู…ุฉ ุงู„ุงูุชุฑุงุถูŠุฉ.")
                return default
            print(f"โŒ [R2Service] ุฎุทุฃ ClientError ุฃุซู†ุงุก ุชุญู…ูŠู„ '{key}': {e}")
            raise
        except Exception as e:
            print(f"โŒ [R2Service] ุฎุทุฃ ุนุงู… ุฃุซู†ุงุก ุชุญู…ูŠู„ '{key}': {e}")
            return default

    async def _save_json_file_to_r2(self, key: str, data: Any) -> bool:
        """Save a JSON object without raising.

        Returns True when the upload succeeded and False when it failed (the
        error and its traceback are printed), so callers can avoid acting on
        a write that never happened.
        """
        try:
            self._put_json(key, data)
            return True
        except Exception as e:
            print(f"โŒ [R2Service] ูุดู„ ุญูุธ ุงู„ู…ู„ู '{key}' ุฅู„ู‰ R2: {e}")
            traceback.print_exc()
            return False

    async def save_whale_learning_record_async(self, record: Dict[str, Any]):
        """(New V10.2) Append a new "pending" record to the PENDING file.

        Failures are logged, not raised.
        """
        try:
            pending_records = await self._load_json_file_from_r2(WHALE_LEARNING_PENDING_KEY, default=[])
            pending_records.append(record)
            if await self._save_json_file_to_r2(WHALE_LEARNING_PENDING_KEY, pending_records):
                print(f"โœ… [R2Service] ุชู… ุญูุธ ุณุฌู„ ุชุนู„ู… ุงู„ุญูŠุชุงู† (PENDING) ู„ู€ {record.get('symbol', 'Unknown')}.")
        except Exception as e:
            print(f"โŒ [R2Service] ูุดู„ ููŠ save_whale_learning_record_async: {e}")

    async def get_pending_whale_learning_records_async(self) -> List[Dict[str, Any]]:
        """(New V10.2) Return every record in the PENDING file ([] on error)."""
        try:
            return await self._load_json_file_from_r2(WHALE_LEARNING_PENDING_KEY, default=[])
        except Exception as e:
            print(f"โŒ [R2Service] ูุดู„ ููŠ get_pending_whale_learning_records_async: {e}")
            return []

    async def update_completed_whale_learning_record_async(self, completed_record: Dict[str, Any]):
        """(New V10.2) Move a record from the PENDING file to the COMPLETED file.

        1. Appends ``completed_record`` to the COMPLETED file (newest 5000 kept).
        2. Removes the record with the same ``record_id`` from the PENDING file.

        Step 2 is skipped when step 1 fails, so the record is never dropped
        from both files. Records without a ``record_id`` are rejected.
        Failures are logged, not raised.
        """
        try:
            record_id = completed_record.get("record_id")
            if not record_id:
                print("โŒ [R2Service] ู„ุง ูŠู…ูƒู† ุชุญุฏูŠุซ ุณุฌู„ ู…ูƒุชู…ู„ ุจุฏูˆู† record_id.")
                return
            # 1. Save into the completed file.
            completed_records = await self._load_json_file_from_r2(WHALE_LEARNING_COMPLETED_KEY, default=[])
            completed_records.append(completed_record)
            completed_records = completed_records[-5000:]
            if not await self._save_json_file_to_r2(WHALE_LEARNING_COMPLETED_KEY, completed_records):
                print(f"[R2Service] Completed record {record_id} was not saved; leaving it in the pending file.")
                return
            # 2. Remove from the pending file.
            pending_records = await self._load_json_file_from_r2(WHALE_LEARNING_PENDING_KEY, default=[])
            updated_pending_records = [
                rec for rec in pending_records if rec.get("record_id") != record_id
            ]
            if await self._save_json_file_to_r2(WHALE_LEARNING_PENDING_KEY, updated_pending_records):
                print(f"โœ… [R2Service] ุชู… ู†ู‚ู„ ุณุฌู„ ุชุนู„ู… ุงู„ุญูŠุชุงู† (COMPLETED) ู„ู€ {completed_record.get('symbol', 'Unknown')} (ID: {record_id}).")
        except Exception as e:
            print(f"โŒ [R2Service] ูุดู„ ููŠ update_completed_whale_learning_record_async: {e}")

    async def get_all_completed_whale_records_async(self) -> List[Dict[str, Any]]:
        """(New V10.2) Return *all* completed records ([] on error)."""
        try:
            return await self._load_json_file_from_r2(WHALE_LEARNING_COMPLETED_KEY, default=[])
        except Exception as e:
            print(f"โŒ [R2Service] ูุดู„ ููŠ get_all_completed_whale_records_async: {e}")
            return []

    async def save_whale_learning_config_async(self, config: Dict[str, Any]):
        """(New V10.2) Save the configuration (weights) produced by learning."""
        if await self._save_json_file_to_r2(WHALE_LEARNING_CONFIG_KEY, config):
            print(f"โœ… [R2Service] ุชู… ุญูุธ ุฅุนุฏุงุฏุงุช ุชุนู„ู… ุงู„ุญูŠุชุงู† ุงู„ู…ุซู„ู‰.")

    async def load_whale_learning_config_async(self) -> Dict[str, Any]:
        """(New V10.2) Load the configuration (weights) produced by learning.

        Returns ``{}`` when the object is missing or unreadable.
        """
        return await self._load_json_file_from_r2(WHALE_LEARNING_CONFIG_KEY, default={})
# Import-time banner: this statement sits at module level, so it runs once
# whenever the module is loaded.
print("โœ… Enhanced R2 Service Loaded - Comprehensive Logging System with Candidates Support")