Trad / learning_engine.py
Riy777's picture
Update learning_engine.py
f5a7217
raw
history blame
22.5 kB
import os, json, asyncio
from datetime import datetime
from helpers import normalize_weights, calculate_market_volatility, should_update_weights
class LearningEngine:
def __init__(self, r2_service, data_manager):
self.r2_service = r2_service
self.data_manager = data_manager
self.weights = {}
self.performance_history = []
self.strategy_effectiveness = {}
self.market_patterns = {}
self.risk_profiles = {}
self.initialized = False
self.initialization_lock = asyncio.Lock()
async def initialize(self):
async with self.initialization_lock:
if self.initialized: return
print("Initializing learning system...")
try:
await self.load_weights_from_r2()
await self.load_performance_history()
self.initialized = True
print("Learning system ready")
except Exception as e:
print(f"Weights loading failed: {e}")
await self.initialize_default_weights()
self.initialized = True
async def initialize_enhanced(self):
async with self.initialization_lock:
if self.initialized: return
print("Enhanced learning system initialization...")
try:
await self.load_weights_from_r2()
await self.load_performance_history()
await self.fix_weights_structure()
if not self.performance_history:
print("Starting learning from scratch")
await self.initialize_default_weights()
self.initialized = True
except Exception as e:
print(f"Enhanced initialization failed: {e}")
await self.initialize_default_weights()
self.initialized = True
async def fix_weights_structure(self):
try:
key = "learning_engine_weights.json"
response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key)
current_data = json.loads(response['Body'].read())
if 'strategy_weights' in current_data and 'last_updated' not in current_data:
fixed_data = {
"weights": current_data,
"last_updated": datetime.now().isoformat(),
"version": "2.0",
"performance_metrics": await self.calculate_performance_metrics()
}
data_json = json.dumps(fixed_data, indent=2, ensure_ascii=False).encode('utf-8')
self.r2_service.s3_client.put_object(
Bucket="trading", Key=key, Body=data_json, ContentType="application/json"
)
print("Weights structure fixed")
except Exception as e:
print(f"Weights structure fix failed: {e}")
async def initialize_default_weights(self):
self.weights = {
"strategy_weights": {
"trend_following": 0.18, "mean_reversion": 0.15, "breakout_momentum": 0.22,
"volume_spike": 0.12, "whale_tracking": 0.15, "pattern_recognition": 0.10,
"hybrid_ai": 0.08
},
"technical_weights": {
"rsi": 0.15, "macd": 0.18, "ema_cross": 0.12, "bollinger_bands": 0.10,
"volume_analysis": 0.15, "support_resistance": 0.12, "market_sentiment": 0.18
},
"risk_parameters": {
"max_position_size": 0.1, "max_daily_loss": 0.02, "stop_loss_base": 0.02,
"risk_reward_ratio": 2.0, "volatility_adjustment": 1.0
},
"market_condition_weights": {
"bull_market": {"trend_following": 0.25, "breakout_momentum": 0.20, "whale_tracking": 0.15},
"bear_market": {"mean_reversion": 0.25, "pattern_recognition": 0.20, "hybrid_ai": 0.15},
"sideways_market": {"mean_reversion": 0.30, "volume_spike": 0.20, "pattern_recognition": 0.15}
}
}
async def load_weights_from_r2(self):
try:
key = "learning_engine_weights.json"
response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key)
weights_data = json.loads(response['Body'].read())
if isinstance(weights_data, dict):
if 'weights' in weights_data:
self.weights = weights_data['weights']
else:
self.weights = weights_data
print(f"Weights loaded from R2")
else:
raise ValueError("Invalid weights structure")
except Exception as e:
print(f"Weights loading failed: {e}")
await self.initialize_default_weights()
await self.save_weights_to_r2()
async def save_weights_to_r2(self):
try:
key = "learning_engine_weights.json"
weights_data = {
"weights": self.weights,
"last_updated": datetime.now().isoformat(),
"version": "2.0",
"performance_metrics": await self.calculate_performance_metrics()
}
data_json = json.dumps(weights_data, indent=2, ensure_ascii=False).encode('utf-8')
self.r2_service.s3_client.put_object(
Bucket="trading", Key=key, Body=data_json, ContentType="application/json"
)
print("Weights saved to R2")
except Exception as e:
print(f"Weights saving failed: {e}")
async def load_performance_history(self):
try:
key = "learning_performance_history.json"
response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key)
history_data = json.loads(response['Body'].read())
self.performance_history = history_data.get("history", [])
print(f"Performance history loaded - {len(self.performance_history)} records")
except Exception as e:
print(f"Performance history loading failed: {e}")
self.performance_history = []
async def save_performance_history(self):
try:
key = "learning_performance_history.json"
history_data = {
"history": self.performance_history[-1000:],
"last_updated": datetime.now().isoformat()
}
data_json = json.dumps(history_data, indent=2, ensure_ascii=False).encode('utf-8')
self.r2_service.s3_client.put_object(
Bucket="trading", Key=key, Body=data_json, ContentType="application/json"
)
except Exception as e:
print(f"Performance history saving failed: {e}")
async def analyze_trade_outcome(self, trade_data, outcome):
if not self.initialized: await self.initialize()
try:
strategy = trade_data.get('strategy', 'unknown')
if strategy == 'unknown':
decision_data = trade_data.get('decision_data', {})
strategy = decision_data.get('strategy', 'unknown')
market_context = await self.get_current_market_conditions()
analysis_entry = {
"timestamp": datetime.now().isoformat(),
"trade_data": trade_data,
"outcome": outcome,
"market_conditions": market_context,
"strategy_used": strategy,
"symbol": trade_data.get('symbol', 'unknown'),
"pnl_usd": trade_data.get('pnl_usd', 0),
"pnl_percent": trade_data.get('pnl_percent', 0)
}
self.performance_history.append(analysis_entry)
await self.update_strategy_effectiveness(analysis_entry)
await self.update_market_patterns(analysis_entry)
if should_update_weights(len(self.performance_history)):
await self.adapt_weights_based_on_performance()
await self.save_weights_to_r2()
await self.save_performance_history()
print(f"Trade analyzed {trade_data.get('symbol')} - Strategy: {strategy} - Outcome: {outcome}")
except Exception as e:
print(f"Trade outcome analysis failed: {e}")
async def update_strategy_effectiveness(self, analysis_entry):
strategy = analysis_entry['strategy_used']
outcome = analysis_entry['outcome']
market_condition = analysis_entry['market_conditions']['current_trend']
pnl_percent = analysis_entry.get('pnl_percent', 0)
if strategy not in self.strategy_effectiveness:
self.strategy_effectiveness[strategy] = {
"total_trades": 0, "successful_trades": 0, "total_profit": 0,
"total_pnl_percent": 0, "market_conditions": {}
}
self.strategy_effectiveness[strategy]["total_trades"] += 1
self.strategy_effectiveness[strategy]["total_pnl_percent"] += pnl_percent
is_success = outcome in ["SUCCESS", "CLOSED_BY_REANALYSIS", "CLOSED_BY_MONITOR"] and pnl_percent > 0
if is_success: self.strategy_effectiveness[strategy]["successful_trades"] += 1
if market_condition not in self.strategy_effectiveness[strategy]["market_conditions"]:
self.strategy_effectiveness[strategy]["market_conditions"][market_condition] = {
"trades": 0, "successes": 0, "total_pnl": 0
}
self.strategy_effectiveness[strategy]["market_conditions"][market_condition]["trades"] += 1
self.strategy_effectiveness[strategy]["market_conditions"][market_condition]["total_pnl"] += pnl_percent
if is_success: self.strategy_effectiveness[strategy]["market_conditions"][market_condition]["successes"] += 1
async def update_market_patterns(self, analysis_entry):
market_condition = analysis_entry['market_conditions']['current_trend']
symbol = analysis_entry['symbol']
outcome = analysis_entry['outcome']
pnl_percent = analysis_entry.get('pnl_percent', 0)
if market_condition not in self.market_patterns:
self.market_patterns[market_condition] = {
"total_trades": 0, "successful_trades": 0, "total_pnl_percent": 0,
"best_performing_strategies": {}, "best_performing_symbols": {}
}
self.market_patterns[market_condition]["total_trades"] += 1
self.market_patterns[market_condition]["total_pnl_percent"] += pnl_percent
is_success = outcome in ["SUCCESS", "CLOSED_BY_REANALYSIS", "CLOSED_BY_MONITOR"] and pnl_percent > 0
if is_success: self.market_patterns[market_condition]["successful_trades"] += 1
strategy = analysis_entry['strategy_used']
if strategy not in self.market_patterns[market_condition]["best_performing_strategies"]:
self.market_patterns[market_condition]["best_performing_strategies"][strategy] = {
"count": 0, "total_pnl": 0
}
self.market_patterns[market_condition]["best_performing_strategies"][strategy]["count"] += 1
self.market_patterns[market_condition]["best_performing_strategies"][strategy]["total_pnl"] += pnl_percent
if symbol not in self.market_patterns[market_condition]["best_performing_symbols"]:
self.market_patterns[market_condition]["best_performing_symbols"][symbol] = {
"count": 0, "total_pnl": 0
}
self.market_patterns[market_condition]["best_performing_symbols"][symbol]["count"] += 1
self.market_patterns[market_condition]["best_performing_symbols"][symbol]["total_pnl"] += pnl_percent
async def adapt_weights_based_on_performance(self):
print("Updating weights based on performance...")
try:
if not self.strategy_effectiveness:
print("Insufficient performance data, using gradual adjustment")
await self.gradual_weights_adjustment()
return
total_performance = 0
strategy_performance = {}
for strategy, data in self.strategy_effectiveness.items():
if data["total_trades"] > 0:
success_rate = data["successful_trades"] / data["total_trades"]
avg_pnl = data["total_pnl_percent"] / data["total_trades"]
composite_performance = (success_rate * 0.7) + (min(avg_pnl, 10) / 10 * 0.3)
strategy_performance[strategy] = composite_performance
total_performance += composite_performance
if total_performance > 0 and strategy_performance:
for strategy, performance in strategy_performance.items():
current_weight = self.weights["strategy_weights"].get(strategy, 0.1)
new_weight = current_weight * 0.7 + (performance * 0.3)
self.weights["strategy_weights"][strategy] = new_weight
normalize_weights(self.weights["strategy_weights"])
print("Weights updated based on real performance")
else:
await self.gradual_weights_adjustment()
except Exception as e:
print(f"Weights update failed: {e}")
await self.gradual_weights_adjustment()
async def gradual_weights_adjustment(self):
print("Gradual weights adjustment...")
if self.market_patterns:
for market_condition, data in self.market_patterns.items():
if data.get("total_trades", 0) > 0:
best_strategy = max(data["best_performing_strategies"].items(),
key=lambda x: x[1]["total_pnl"])[0] if data["best_performing_strategies"] else None
if best_strategy:
current_weight = self.weights["strategy_weights"].get(best_strategy, 0.1)
self.weights["strategy_weights"][best_strategy] = min(current_weight * 1.1, 0.3)
normalize_weights(self.weights["strategy_weights"])
print("Gradual weights adjustment completed")
async def get_current_market_conditions(self):
try:
if not self.data_manager: raise ValueError("DataManager unavailable")
market_context = await self.data_manager.get_market_context_async()
if not market_context: raise ValueError("Market context fetch failed")
return {
"current_trend": market_context.get('market_trend', 'sideways_market'),
"volatility": calculate_market_volatility(market_context),
"market_sentiment": market_context.get('btc_sentiment', 'NEUTRAL'),
"whale_activity": market_context.get('general_whale_activity', {}).get('sentiment', 'NEUTRAL'),
"fear_greed_index": market_context.get('fear_and_greed_index', 50)
}
except Exception as e:
print(f"Market conditions fetch failed: {e}")
return {
"current_trend": "sideways_market", "volatility": "medium",
"market_sentiment": "neutral", "whale_activity": "low", "fear_greed_index": 50
}
async def calculate_performance_metrics(self):
if not self.performance_history: return {"status": "No performance data yet"}
recent_trades = self.performance_history[-50:]
total_trades = len(recent_trades)
successful_trades = sum(1 for trade in recent_trades
if trade['outcome'] in ["SUCCESS", "CLOSED_BY_REANALYSIS", "CLOSED_BY_MONITOR"] and trade.get('pnl_percent', 0) > 0)
success_rate = successful_trades / total_trades if total_trades > 0 else 0
total_pnl = sum(trade.get('pnl_percent', 0) for trade in recent_trades)
avg_pnl = total_pnl / total_trades if total_trades > 0 else 0
strategy_performance = {}
for strategy, data in self.strategy_effectiveness.items():
if data["total_trades"] > 0:
strategy_success_rate = data["successful_trades"] / data["total_trades"]
strategy_avg_pnl = data["total_pnl_percent"] / data["total_trades"]
strategy_performance[strategy] = {
"success_rate": strategy_success_rate, "avg_pnl_percent": strategy_avg_pnl,
"total_trades": data["total_trades"], "successful_trades": data["successful_trades"]
}
market_performance = {}
for condition, data in self.market_patterns.items():
if data["total_trades"] > 0:
market_success_rate = data["successful_trades"] / data["total_trades"]
market_avg_pnl = data["total_pnl_percent"] / data["total_trades"]
market_performance[condition] = {
"success_rate": market_success_rate, "avg_pnl_percent": market_avg_pnl,
"total_trades": data["total_trades"]
}
return {
"overall_success_rate": success_rate, "overall_avg_pnl_percent": avg_pnl,
"total_analyzed_trades": len(self.performance_history), "recent_trades_analyzed": total_trades,
"strategy_performance": strategy_performance, "market_performance": market_performance,
"last_updated": datetime.now().isoformat()
}
async def get_optimized_strategy_weights(self, market_condition):
try:
if not self.initialized: return await self.get_default_strategy_weights()
if (not self.weights or "strategy_weights" not in self.weights or not self.weights["strategy_weights"]):
return await self.get_default_strategy_weights()
base_weights = self.weights["strategy_weights"].copy()
if not any(weight > 0 for weight in base_weights.values()):
return await self.get_default_strategy_weights()
print(f"Using learned weights: {base_weights}")
return base_weights
except Exception as e:
print(f"Optimized weights calculation failed: {e}")
return await self.get_default_strategy_weights()
async def get_default_strategy_weights(self):
return {
"trend_following": 0.18, "mean_reversion": 0.15, "breakout_momentum": 0.22,
"volume_spike": 0.12, "whale_tracking": 0.15, "pattern_recognition": 0.10,
"hybrid_ai": 0.08
}
async def get_risk_parameters(self, symbol_volatility):
if not self.weights or "risk_parameters" not in self.weights: await self.initialize_default_weights()
risk_params = self.weights.get("risk_parameters", {}).copy()
if symbol_volatility == "HIGH":
risk_params["stop_loss_base"] *= 1.5
risk_params["max_position_size"] *= 0.7
risk_params["risk_reward_ratio"] = 1.5
elif symbol_volatility == "LOW":
risk_params["stop_loss_base"] *= 0.7
risk_params["max_position_size"] *= 1.2
risk_params["risk_reward_ratio"] = 2.5
return risk_params
async def suggest_improvements(self):
improvements = []
if not self.performance_history:
improvements.append("Start collecting performance data from first trades")
return improvements
for strategy, data in self.strategy_effectiveness.items():
if data["total_trades"] >= 3:
success_rate = data["successful_trades"] / data["total_trades"]
avg_pnl = data["total_pnl_percent"] / data["total_trades"]
if success_rate < 0.3 and avg_pnl < 0:
improvements.append(f"Strategy {strategy} poor performance ({success_rate:.1%} success, {avg_pnl:+.1f}% average) - suggest reducing usage")
elif success_rate > 0.6 and avg_pnl > 2:
improvements.append(f"Strategy {strategy} excellent performance ({success_rate:.1%} success, {avg_pnl:+.1f}% average) - suggest increasing usage")
elif success_rate > 0.7:
improvements.append(f"Strategy {strategy} high success ({success_rate:.1%}) - focus on trade quality")
for market_condition, data in self.market_patterns.items():
if data["total_trades"] >= 5:
success_rate = data["successful_trades"] / data["total_trades"]
avg_pnl = data["total_pnl_percent"] / data["total_trades"]
if success_rate < 0.4:
improvements.append(f"Poor performance in {market_condition} market ({success_rate:.1%} success) - needs strategy review")
best_strategy = None
best_performance = -100
for strategy, stats in data["best_performing_strategies"].items():
if stats["count"] >= 2:
strategy_avg_pnl = stats["total_pnl"] / stats["count"]
if strategy_avg_pnl > best_performance:
best_performance = strategy_avg_pnl
best_strategy = strategy
if best_strategy and best_performance > 1:
improvements.append(f"Best strategy in {market_condition}: {best_strategy} ({best_performance:+.1f}% average profit)")
if not improvements: improvements.append("No suggested improvements currently - continue data collection")
return improvements
async def force_strategy_learning(self):
print("Forcing strategy update from current data...")
if not self.performance_history:
print("No performance data to learn from")
return
for entry in self.performance_history:
await self.update_strategy_effectiveness(entry)
await self.update_market_patterns(entry)
await self.adapt_weights_based_on_performance()
await self.save_weights_to_r2()
print("Strategy update forced successfully")
print("Enhanced self-learning system loaded - ready for continuous learning and adaptation")