Trad / app.py
Riy777's picture
Update app.py
816f418
raw
history blame
35.7 kB
# app.py (Fully updated to Learning Hub architecture)
import os
import traceback
import signal
import sys
import uvicorn
import asyncio
import json
import time
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from datetime import datetime
# Import the services
try:
from r2 import R2Service
from LLM import LLMService
from data_manager import DataManager
from ml_engine.processor import MLProcessor
# ๐Ÿ”ด --- START OF CHANGE --- ๐Ÿ”ด
# (Import the new hub manager instead of the old engine)
from learning_hub.hub_manager import LearningHubManager
# ๐Ÿ”ด --- END OF CHANGE --- ๐Ÿ”ด
from sentiment_news import SentimentAnalyzer
from trade_manager import TradeManager
import state # (This is state.py)
from helpers import safe_float_conversion, validate_candidate_data_enhanced
except ImportError as e:
print(f"โŒ ุฎุทุฃ ููŠ ุงุณุชูŠุฑุงุฏ ุงู„ูˆุญุฏุงุช: {e}")
sys.exit(1)
# Global variables: process-wide service handles. Each starts as None and is
# assigned inside initialize_services().
r2_service_global = None
data_manager_global = None
llm_service_global = None
# --- START OF CHANGE ---
learning_hub_global = None # (Changed from learning_engine_global)
# --- END OF CHANGE ---
trade_manager_global = None
sentiment_analyzer_global = None
symbol_whale_monitor_global = None
class StateManager:
    """Track start-up progress of the services and own the shared analysis locks.

    Attributes:
        market_analysis_lock: asyncio lock taken by the market monitor loop.
        trade_analysis_lock: asyncio lock taken while re-analysing a trade.
        initialization_complete: True once every required service reported in.
        initialization_error: Message recorded by set_initialization_error, or None.
        services_initialized: Per-service readiness flags.
    """

    # The whale monitor is allowed to fail during start-up (the caller logs a
    # warning and carries on with None), so it must not block the overall
    # "initialization complete" state.
    OPTIONAL_SERVICES = frozenset({'symbol_whale_monitor'})

    def __init__(self):
        self.market_analysis_lock = asyncio.Lock()
        self.trade_analysis_lock = asyncio.Lock()
        self.initialization_complete = False
        self.initialization_error = None
        self.services_initialized = {
            'r2_service': False, 'data_manager': False, 'llm_service': False,
            'learning_hub': False,  # (Changed from learning_engine)
            'trade_manager': False, 'sentiment_analyzer': False,
            'symbol_whale_monitor': False
        }

    async def wait_for_initialization(self, timeout=60):
        """Poll until initialization completes.

        Args:
            timeout: Maximum number of seconds to wait.

        Returns:
            True once initialization is complete.

        Raises:
            Exception: If an initialization error was recorded, or if the
                timeout elapsed before initialization completed.
        """
        # A monotonic clock keeps the timeout immune to wall-clock adjustments.
        deadline = time.monotonic() + timeout
        while not self.initialization_complete and time.monotonic() < deadline:
            if self.initialization_error:
                raise Exception(f"ูุดู„ ุงู„ุชู‡ูŠุฆุฉ: {self.initialization_error}")
            await asyncio.sleep(2)
        if not self.initialization_complete:
            raise Exception(f"ุงู†ุชู‡ุช ู…ู‡ู„ุฉ ุงู„ุชู‡ูŠุฆุฉ ({timeout} ุซุงู†ูŠุฉ)")
        return self.initialization_complete

    def set_service_initialized(self, service_name):
        """Mark one service as ready and flip the global flag when all required ones are."""
        self.services_initialized[service_name] = True
        required_ready = all(
            ready
            for name, ready in self.services_initialized.items()
            if name not in self.OPTIONAL_SERVICES
        )
        if required_ready:
            self.initialization_complete = True
            print("๐ŸŽฏ ุฌู…ูŠุน ุงู„ุฎุฏู…ุงุช ู…ู‡ูŠุฃุฉ ุจุงู„ูƒุงู…ู„")

    def set_initialization_error(self, error):
        """Record a start-up failure so that waiters fail fast instead of timing out."""
        self.initialization_error = error
        print(f"โŒ ุฎุทุฃ ููŠ ุงู„ุชู‡ูŠุฆุฉ: {error}")


state_manager = StateManager()
async def initialize_services():
    """Initialize every service separately and publish them as module globals.

    Progress is reported to ``state_manager`` after each service is built.
    A failure while building the whale monitor is only logged (the monitor is
    left as None); any other failure is recorded through
    ``state_manager.set_initialization_error``.

    Returns:
        True when the whole sequence finished, False when it raised.
    """
    global r2_service_global, data_manager_global, llm_service_global
    global learning_hub_global  # (Changed from learning_engine_global)
    global trade_manager_global, sentiment_analyzer_global
    global symbol_whale_monitor_global
    try:
        print("๐Ÿš€ ุจุฏุก ุชู‡ูŠุฆุฉ ุงู„ุฎุฏู…ุงุช...")

        # R2 storage service.
        print(" ๐Ÿ”„ ุชู‡ูŠุฆุฉ R2Service...")
        r2_service_global = R2Service()
        state_manager.set_service_initialized('r2_service')
        print(" โœ… R2Service ู…ู‡ูŠุฃุฉ")

        # Contracts database, loaded through the R2 service.
        print(" ๐Ÿ”„ ุฌู„ุจ ู‚ุงุนุฏุฉ ุจูŠุงู†ุงุช ุงู„ุนู‚ูˆุฏ...")
        contracts_db = await r2_service_global.load_contracts_db_async()
        print(f" โœ… ุชู… ุชุญู…ูŠู„ {len(contracts_db)} ุนู‚ุฏ")

        # Whale monitor: best effort, a failure here does not abort start-up.
        print(" ๐Ÿ”„ ุชู‡ูŠุฆุฉ ู…ุฑุงู‚ุจ ุงู„ุญูŠุชุงู†...")
        try:
            from whale_news_data import EnhancedWhaleMonitor
            symbol_whale_monitor_global = EnhancedWhaleMonitor(contracts_db, r2_service_global)
            state_manager.set_service_initialized('symbol_whale_monitor')
            print(" โœ… ู…ุฑุงู‚ุจ ุงู„ุญูŠุชุงู† ู…ู‡ูŠุฃ")
        except Exception as whale_err:
            print(f" โš ๏ธ ูุดู„ ุชู‡ูŠุฆุฉ ู…ุฑุงู‚ุจ ุงู„ุญูŠุชุงู†: {whale_err}")
            symbol_whale_monitor_global = None

        # Data manager.
        print(" ๐Ÿ”„ ุชู‡ูŠุฆุฉ DataManager...")
        data_manager_global = DataManager(contracts_db, symbol_whale_monitor_global)
        await data_manager_global.initialize()
        state_manager.set_service_initialized('data_manager')
        print(" โœ… DataManager ู…ู‡ูŠุฃุฉ")

        # LLM service; it is only reported as initialized after the hub is attached.
        print(" ๐Ÿ”„ ุชู‡ูŠุฆุฉ LLMService...")
        llm_service_global = LLMService()
        llm_service_global.r2_service = r2_service_global

        # Sentiment analyzer.
        print(" ๐Ÿ”„ ุชู‡ูŠุฆุฉ ู…ุญู„ู„ ุงู„ู…ุดุงุนุฑ...")
        sentiment_analyzer_global = SentimentAnalyzer(data_manager_global)
        state_manager.set_service_initialized('sentiment_analyzer')
        print(" โœ… ู…ุญู„ู„ ุงู„ู…ุดุงุนุฑ ู…ู‡ูŠุฃ")

        # Learning Hub manager.
        print(" ๐Ÿ”„ ุชู‡ูŠุฆุฉ ู…ุญูˆุฑ ุงู„ุชุนู„ู… (Learning Hub)...")
        learning_hub_global = LearningHubManager(
            r2_service=r2_service_global,
            llm_service=llm_service_global,
            data_manager=data_manager_global,
        )
        await learning_hub_global.initialize()
        state_manager.set_service_initialized('learning_hub')
        print(" โœ… ู…ุญูˆุฑ ุงู„ุชุนู„ู… (Hub) ู…ู‡ูŠุฃ")

        # Connect the hub to the LLM service.
        llm_service_global.learning_hub = learning_hub_global
        state_manager.set_service_initialized('llm_service')
        print(" โœ… LLMService ู…ู‡ูŠุฃุฉ (ูˆู…ุฑุจูˆุทุฉ ุจู…ุญูˆุฑ ุงู„ุชุนู„ู…)")

        # Trade manager, which receives the hub as its second argument.
        print(" ๐Ÿ”„ ุชู‡ูŠุฆุฉ ู…ุฏูŠุฑ ุงู„ุตูู‚ุงุช...")
        trade_manager_global = TradeManager(
            r2_service_global,
            learning_hub_global,
            data_manager_global,
            state_manager,
        )
        state_manager.set_service_initialized('trade_manager')
        print(" โœ… ู…ุฏูŠุฑ ุงู„ุตูู‚ุงุช ู…ู‡ูŠุฃ (ูˆู…ุฑุจูˆุท ุจู…ุญูˆุฑ ุงู„ุชุนู„ู…)")

        print("๐ŸŽฏ ุงูƒุชู…ู„ุช ุชู‡ูŠุฆุฉ ุฌู…ูŠุน ุงู„ุฎุฏู…ุงุช ุจู†ุฌุงุญ")
        return True
    except Exception as init_err:
        error_msg = f"ูุดู„ ุชู‡ูŠุฆุฉ ุงู„ุฎุฏู…ุงุช: {init_err!s}"
        print(f"โŒ {error_msg}")
        state_manager.set_initialization_error(error_msg)
        return False
async def monitor_market_async():
    """Background loop that toggles ``state.MARKET_STATE_OK`` once a minute.

    The market is flagged as not OK when BTC sentiment is 'BEARISH' and the
    fear-and-greed index is below 30; that halt is also written to the system
    logs. A missing sentiment result or an error inside one iteration resets
    the flag to True.
    """
    try:
        services_ready = await state_manager.wait_for_initialization()
        if not services_ready:
            print("โŒ ูุดู„ ุชู‡ูŠุฆุฉ ุงู„ุฎุฏู…ุงุช - ุฅูŠู‚ุงู ู…ุฑุงู‚ุจุฉ ุงู„ุณูˆู‚")
            return
        while True:
            try:
                async with state_manager.market_analysis_lock:
                    sentiment_snapshot = await sentiment_analyzer_global.get_market_sentiment()
                    if not sentiment_snapshot:
                        # No data: assume the market is fine and retry in a minute.
                        state.MARKET_STATE_OK = True
                        await asyncio.sleep(60)
                        continue
                    btc_mood = sentiment_snapshot.get('btc_sentiment')
                    fg_index = sentiment_snapshot.get('fear_and_greed_index')
                    must_halt = btc_mood == 'BEARISH' and (fg_index is not None and fg_index < 30)
                    if must_halt:
                        state.MARKET_STATE_OK = False
                        await r2_service_global.save_system_logs_async(
                            {"market_halt": True, "reason": "ุธุฑูˆู ุณูˆู‚ ู‡ุงุจุทุฉ"}
                        )
                    else:
                        if not state.MARKET_STATE_OK:
                            print("โœ… ุชุญุณู†ุช ุธุฑูˆู ุงู„ุณูˆู‚. ุงุณุชุฆู†ุงู ุงู„ุนู…ู„ูŠุงุช ุงู„ุนุงุฏูŠุฉ.")
                        state.MARKET_STATE_OK = True
                await asyncio.sleep(60)
            except Exception as error:
                print(f"โŒ ุฎุทุฃ ุฃุซู†ุงุก ู…ุฑุงู‚ุจุฉ ุงู„ุณูˆู‚: {error}")
                state.MARKET_STATE_OK = True
                await asyncio.sleep(60)
    except Exception as e:
        print(f"โŒ ูุดู„ ุชุดุบูŠู„ ู…ุฑุงู‚ุจุฉ ุงู„ุณูˆู‚: {e}")
# ๐Ÿ”ด --- START OF CHANGE --- ๐Ÿ”ด
# (New background task for periodic distillation - Point 6)
async def run_periodic_distillation():
    """
    Background loop that periodically asks the Learning Hub to run its
    distillation check.

    The first check happens five minutes after start-up, then every six
    hours. Any exception in an iteration is logged and followed by a
    one-hour pause.
    """
    startup_delay_s = 300
    not_ready_delay_s = 60
    interval_s = 6 * 60 * 60
    error_backoff_s = 60 * 60

    print("background task: Periodic Distillation (Curator) scheduled.")
    await asyncio.sleep(startup_delay_s)
    while True:
        try:
            ready = await state_manager.wait_for_initialization()
            if not ready:
                await asyncio.sleep(not_ready_delay_s)
                continue
            print("๐Ÿ”„ [Scheduler] Running periodic distillation check...")
            await learning_hub_global.run_distillation_check()
            await asyncio.sleep(interval_s)
        except Exception as e:
            print(f"โŒ [Scheduler] Error in periodic distillation task: {e}")
            traceback.print_exc()
            await asyncio.sleep(error_backoff_s)
# ๐Ÿ”ด --- END OF CHANGE --- ๐Ÿ”ด
async def process_batch_parallel(batch, ml_processor, batch_num, total_batches, preloaded_whale_data):
    """Run the ML processor on every symbol of a batch concurrently and sort the results.

    Each symbol is processed in its own task through
    ``ml_processor.process_multiple_symbols_parallel([symbol_data], preloaded_whale_data)``.

    Args:
        batch: List of per-symbol dicts (each should carry a 'symbol' key).
        ml_processor: Object exposing ``process_multiple_symbols_parallel``.
        batch_num: Sequence number of this batch (used in error messages only).
        total_batches: Total number of batches (unused here, kept for callers).
        preloaded_whale_data: Whale-data mapping forwarded to the processor.

    Returns:
        dict with three lists:
        'success' (results scoring above 0.4), 'low_score' (other dict
        results) and 'failures' ({'symbol', 'error'} entries).
    """
    min_success_score = 0.4
    successful_results = []
    low_score_results = []
    failed_results = []
    outcome = {'success': successful_results, 'low_score': low_score_results, 'failures': failed_results}

    try:
        batch_tasks = [
            asyncio.create_task(
                ml_processor.process_multiple_symbols_parallel([symbol_data], preloaded_whale_data)
            )
            for symbol_data in batch
        ]
        gathered = await asyncio.gather(*batch_tasks, return_exceptions=True)
    except Exception as error:
        print(f"โŒ [Consumer] Error processing batch {batch_num}: {error}")
        return outcome

    for symbol_data, result_list in zip(batch, gathered):
        symbol = symbol_data.get('symbol', 'unknown') if isinstance(symbol_data, dict) else 'unknown'
        # Classify each result in isolation: one malformed result must not
        # throw away what was already collected for the rest of the batch.
        try:
            if isinstance(result_list, Exception):
                failed_results.append({"symbol": symbol, "error": f"Task Execution Error: {str(result_list)}"})
                continue
            if not result_list:
                failed_results.append({"symbol": symbol, "error": "ML processing returned None or empty list"})
                continue
            result = result_list[0]
            if not isinstance(result, dict):
                failed_results.append({"symbol": symbol, "error": f"ML processor returned invalid type: {type(result)}"})
                continue
            # A missing or None score counts as 0, i.e. a low-score result.
            score = result.get('enhanced_final_score') or 0
            if score > min_success_score:
                successful_results.append(result)
            else:
                low_score_results.append(result)
        except Exception as error:
            failed_results.append({"symbol": symbol, "error": f"Result classification error: {error}"})

    return outcome
async def run_3_layer_analysis():
    """
    Run the layered candidate pipeline and return the best BUY opportunity.

    Stages, in order:
      1. Layer 1   - rapid screening through the data manager.
      2. Layer 1.5 - whale data fetched in the background (3 at a time).
      3. Layer 2   - OHLCV batches streamed through a queue to the ML consumer.
      4. Layer 2.5 - advanced Monte Carlo on the top 10 candidates.
      5. Layer 3   - LLM decision for each remaining candidate.

    Returns:
        The highest-ranked opportunity dict (ranked by the mean of LLM
        confidence and enhanced score), or None when nothing qualifies or
        an error occurs. Each opportunity keeps the full candidate under
        'decision_context_data' as the snapshot used when opening a trade.
    """
    layer1_candidates = []
    layer2_candidates = []
    all_low_score_candidates = []
    all_failed_candidates = []
    final_layer2_candidates = []
    final_opportunities = []
    preloaded_whale_data_dict = {}
    # Every task spawned below is tracked here so that the finally clause can
    # cancel whatever is still pending if the pipeline aborts half-way.
    spawned_tasks = []
    try:
        print("๐ŸŽฏ Starting 3-Layer Analysis (with Learning Hub integration)...")
        if not await state_manager.wait_for_initialization():
            print("โŒ Services not fully initialized")
            return None

        # (Layer 1)
        print("\n๐Ÿ” Layer 1: Rapid Screening (data_manager)...")
        layer1_candidates = await data_manager_global.layer1_rapid_screening()
        if not layer1_candidates:
            print("โŒ No candidates found in Layer 1")
            return None
        print(f"โœ… Selected {len(layer1_candidates)} symbols for Layer 2")
        layer1_symbols = [c['symbol'] for c in layer1_candidates]

        # (Layer 1.5)
        print(f"\n๐Ÿ‹ Layer 1.5: Fetching whale data for {len(layer1_symbols)} symbols (async)...")

        async def fetch_whale_data_task(symbols, results_dict):
            WHALE_FETCH_CONCURRENCY = 3
            semaphore = asyncio.Semaphore(WHALE_FETCH_CONCURRENCY)

            async def get_data_with_semaphore(symbol):
                async with semaphore:
                    try:
                        data = await data_manager_global.get_whale_data_for_symbol(symbol)
                        if data:
                            results_dict[symbol] = data
                    except Exception as e:
                        results_dict[symbol] = {'data_available': False, 'error': str(e)}

            tasks = [asyncio.create_task(get_data_with_semaphore(symbol)) for symbol in symbols]
            await asyncio.gather(*tasks)

        whale_fetcher_task = asyncio.create_task(fetch_whale_data_task(layer1_symbols, preloaded_whale_data_dict))
        spawned_tasks.append(whale_fetcher_task)

        # (Producer/Consumer setup)
        DATA_QUEUE_MAX_SIZE = 2
        ohlcv_data_queue = asyncio.Queue(maxsize=DATA_QUEUE_MAX_SIZE)
        ml_results_list = []
        market_context = await data_manager_global.get_market_context_async()
        # (Pass the global Learning Hub to the MLProcessor)
        ml_processor = MLProcessor(market_context, data_manager_global, learning_hub_global)
        batch_size = 15
        total_batches = (len(layer1_candidates) + batch_size - 1) // batch_size

        # (ML consumer task)
        async def ml_consumer_task(queue: asyncio.Queue, results_list: list, whale_data_store: dict):
            batch_num = 0
            while True:
                try:
                    batch_data = await queue.get()
                    if batch_data is None:
                        # Sentinel from the producer side: no more batches.
                        queue.task_done()
                        break
                    batch_num += 1
                    batch_results_dict = await process_batch_parallel(
                        batch_data, ml_processor, batch_num, total_batches, whale_data_store
                    )
                    results_list.append(batch_results_dict)
                    queue.task_done()
                except Exception as e:
                    print(f"โŒ [ML Consumer] Fatal Error: {e}")
                    traceback.print_exc()
                    queue.task_done()

        # (Run producer/consumer)
        consumer_task = asyncio.create_task(ml_consumer_task(ohlcv_data_queue, ml_results_list, preloaded_whale_data_dict))
        spawned_tasks.append(consumer_task)
        producer_task = asyncio.create_task(data_manager_global.stream_ohlcv_data(layer1_symbols, ohlcv_data_queue))
        spawned_tasks.append(producer_task)
        await producer_task
        await ohlcv_data_queue.put(None)
        await ohlcv_data_queue.join()
        await consumer_task

        # (Wait for the whale fetch)
        WHALE_FETCH_TIMEOUT_SECONDS = 180
        try:
            await asyncio.wait_for(whale_fetcher_task, timeout=WHALE_FETCH_TIMEOUT_SECONDS)
        except asyncio.TimeoutError:
            print(f" โš ๏ธ Whale fetch timeout ({WHALE_FETCH_TIMEOUT_SECONDS}s)!")
        except Exception as whale_task_err:
            print(f" โŒ Whale fetch task error: {whale_task_err}")

        # (Aggregate results)
        print("๐Ÿ”„ Aggregating all results...")
        # Index layer-1 candidates once instead of scanning the list for every
        # result; setdefault keeps the first entry per symbol, as the scan did.
        layer1_by_symbol = {}
        for l1_candidate in layer1_candidates:
            layer1_by_symbol.setdefault(l1_candidate['symbol'], l1_candidate)
        for batch_result in ml_results_list:
            for success_item in batch_result['success']:
                symbol = success_item['symbol']
                l1_data = layer1_by_symbol.get(symbol)
                # NOTE(review): indentation was lost in the reviewed copy; the
                # whole enrichment (including the append) is assumed to sit
                # under `if l1_data:` - confirm against the repository.
                if l1_data:
                    success_item['reasons_for_candidacy'] = l1_data.get('reasons', [])
                    success_item['layer1_score'] = l1_data.get('layer1_score', 0)
                    if symbol in preloaded_whale_data_dict:
                        success_item['whale_data'] = preloaded_whale_data_dict[symbol]
                    elif 'whale_data' not in success_item:
                        success_item['whale_data'] = {'data_available': False, 'reason': 'Fetch timed out or failed'}
                    layer2_candidates.append(success_item)
            all_low_score_candidates.extend(batch_result['low_score'])
            all_failed_candidates.extend(batch_result['failures'])
        if not layer2_candidates:
            print("โŒ No candidates found in Layer 2")

        # (Sort and filter)
        layer2_candidates.sort(key=lambda x: x.get('enhanced_final_score', 0), reverse=True)
        target_count = min(10, len(layer2_candidates))
        final_layer2_candidates = layer2_candidates[:target_count]

        # (Layer 2.5: Advanced MC)
        print(f"\n๐Ÿ”ฌ Layer 2.5: Running Advanced MC (GARCH+LGBM) on top {len(final_layer2_candidates)} candidates...")
        advanced_mc_analyzer = ml_processor.monte_carlo_analyzer
        updated_candidates_for_llm = []
        for candidate in final_layer2_candidates:
            symbol = candidate.get('symbol', 'UNKNOWN')
            try:
                advanced_mc_results = await advanced_mc_analyzer.generate_1h_distribution_advanced(
                    candidate.get('ohlcv')
                )
                if advanced_mc_results and advanced_mc_results.get('simulation_model') == 'Phase2_GARCH_LGBM':
                    candidate['monte_carlo_distribution'] = advanced_mc_results
                    candidate['monte_carlo_probability'] = advanced_mc_results.get('probability_of_gain', 0)
                    candidate['advanced_mc_run'] = True
                else:
                    candidate['advanced_mc_run'] = False
                updated_candidates_for_llm.append(candidate)
            except Exception as e:
                print(f" โŒ [Advanced MC] {symbol} - Error: {e}. Using Phase 1 results.")
                candidate['advanced_mc_run'] = False
                updated_candidates_for_llm.append(candidate)
        final_layer2_candidates = updated_candidates_for_llm
        await r2_service_global.save_candidates_async(final_layer2_candidates)
        # (Printing results omitted for brevity)

        # (Layer 3 - LLM)
        print("\n๐Ÿง  Layer 3: LLM Service Analysis...")
        for candidate in final_layer2_candidates:
            try:
                symbol = candidate['symbol']
                ohlcv_data = candidate.get('ohlcv')
                if not ohlcv_data:
                    continue
                candidate['raw_ohlcv'] = ohlcv_data
                total_candles = sum(len(data) for data in ohlcv_data.values()) if ohlcv_data else 0
                if total_candles < 30:
                    continue
                # (Add latest market context for LLM)
                candidate['sentiment_data'] = await data_manager_global.get_market_context_async()
                llm_analysis = await llm_service_global.get_trading_decision(candidate)
                if llm_analysis and llm_analysis.get('action') in ['BUY']:
                    # (Save the 'candidate' object, which is the full context for the Reflector)
                    opportunity = {
                        'symbol': symbol,
                        'current_price': candidate.get('current_price', 0),
                        'decision': llm_analysis,
                        'enhanced_score': candidate.get('enhanced_final_score', 0),
                        'llm_confidence': llm_analysis.get('confidence_level', 0),
                        'strategy': llm_analysis.get('strategy', 'GENERIC'),
                        'analysis_timestamp': datetime.now().isoformat(),
                        'decision_context_data': candidate  # (This is the snapshot)
                    }
                    final_opportunities.append(opportunity)
                    print(f" โœ… {symbol}: {llm_analysis.get('action')} - Conf: {llm_analysis.get('confidence_level', 0):.2f}")
                else:
                    action = llm_analysis.get('action', 'NO_DECISION') if llm_analysis else 'NO_RESPONSE'
                    print(f" โš ๏ธ {symbol}: No trade decision from LLM ({action})")
            except Exception as e:
                print(f"โŒ Error in LLM analysis for {candidate.get('symbol')}: {e}")
                traceback.print_exc()
                continue

        # (Audit log saving omitted for brevity - unchanged)
        if not final_opportunities:
            print("โŒ No suitable trading opportunities found")
            return None
        final_opportunities.sort(key=lambda x: (x['llm_confidence'] + x['enhanced_score']) / 2, reverse=True)
        return final_opportunities[0]
    except Exception as error:
        print(f"โŒ Fatal error in 3-layer system: {error}")
        traceback.print_exc()
        return None
    finally:
        # On the normal path every task is already finished. After an early
        # failure (e.g. the producer raised) the consumer would otherwise wait
        # on the queue forever and the whale fetch would keep running unowned.
        for leftover_task in spawned_tasks:
            if not leftover_task.done():
                leftover_task.cancel()
async def re_analyze_open_trade_async(trade_data):
    """Re-score an open trade and ask the LLM service whether to keep, update or close it.

    The whole analysis runs under ``state_manager.trade_analysis_lock``.

    Args:
        trade_data: Trade dict; its 'symbol' entry selects what is re-analysed.

    Returns:
        {'symbol', 'decision', 'current_price'} when the LLM service produced
        a decision, otherwise None (no data, no score, no decision, or error).
    """
    symbol = trade_data.get('symbol')
    try:
        async with state_manager.trade_analysis_lock:
            print(f"๐Ÿ”„ [Re-Analyze] Starting strategic analysis for {symbol}...")
            market_context = await data_manager_global.get_market_context_async()

            # Stream the symbol's OHLCV into a private queue, then drain it.
            collected = []
            drain_queue = asyncio.Queue()
            await data_manager_global.stream_ohlcv_data([symbol], drain_queue)
            while True:
                try:
                    chunk = await asyncio.wait_for(drain_queue.get(), timeout=1.0)
                    if chunk is None:
                        drain_queue.task_done()
                        break
                    collected.extend(chunk)
                    drain_queue.task_done()
                except asyncio.TimeoutError:
                    # Nothing arrived for a second: stop once the queue is empty.
                    if drain_queue.empty():
                        break
                except Exception:
                    break
            if not collected:
                print(f"โš ๏ธ Failed to get re-analysis data for {symbol}")
                return None

            ohlcv_data = collected[0]
            l1_data = await data_manager_global._get_detailed_symbol_data(symbol)
            if l1_data:
                ohlcv_data.update(l1_data)
                ohlcv_data['reasons_for_candidacy'] = ['re-analysis']
            re_analysis_whale_data = await data_manager_global.get_whale_data_for_symbol(symbol)

            # (Pass the global Learning Hub to the MLProcessor)
            ml_processor = MLProcessor(market_context, data_manager_global, learning_hub_global)
            print(f"๐Ÿ”„ [Re-Analyze] Using Advanced MC (Phase 2+3) for {symbol}...")
            advanced_mc_results = await ml_processor.monte_carlo_analyzer.generate_1h_distribution_advanced(
                ohlcv_data.get('ohlcv')
            )

            if re_analysis_whale_data:
                whale_lookup = {symbol: re_analysis_whale_data}
            else:
                whale_lookup = {}
            processed_data = await ml_processor.process_and_score_symbol_enhanced(ohlcv_data, whale_lookup)
            if not processed_data:
                return None

            if advanced_mc_results:
                processed_data['monte_carlo_distribution'] = advanced_mc_results
                processed_data['monte_carlo_probability'] = advanced_mc_results.get('probability_of_gain', 0)
            processed_data['raw_ohlcv'] = ohlcv_data.get('raw_ohlcv') or ohlcv_data.get('ohlcv')
            processed_data['ohlcv'] = processed_data['raw_ohlcv']
            processed_data['sentiment_data'] = market_context

            re_analysis_decision = await llm_service_global.re_analyze_trade_async(trade_data, processed_data)
            if not re_analysis_decision:
                return None
            await r2_service_global.save_system_logs_async({
                "trade_reanalyzed": True,
                "symbol": symbol,
                "action": re_analysis_decision.get('action'),
                'strategy': re_analysis_decision.get('strategy', 'GENERIC'),
            })
            print(f"โœ… [Re-Analyze] Strategic analysis complete for {symbol}. Decision: {re_analysis_decision.get('action')}")
            return {
                "symbol": symbol,
                "decision": re_analysis_decision,
                "current_price": processed_data.get('current_price'),
            }
    except Exception as error:
        await r2_service_global.save_system_logs_async({"reanalysis_error": True, "symbol": symbol, "error": str(error)})
        print(f"โŒ Error in re_analyze_open_trade_async for {symbol}: {error}")
        traceback.print_exc()
        return None
def _is_trade_due_for_reanalysis(trade, now):
    """Return True when the trade's 'expected_target_time' has passed or cannot be read."""
    raw_target = trade.get('expected_target_time')
    if not raw_target:
        # Missing timestamp: due immediately (the old default of "now" did the same).
        return True
    try:
        target_time = datetime.fromisoformat(raw_target)
        if target_time.tzinfo is not None:
            # Aware timestamps cannot be compared with the naive `now`.
            return datetime.now(target_time.tzinfo) >= target_time
        return now >= target_time
    except (TypeError, ValueError):
        # Unparseable timestamp: re-analyse rather than abort the whole cycle.
        return True


async def run_bot_cycle_async():
    """Run one trading cycle: re-analyse due trades, then look for a new one.

    Steps:
      1. Wait for initialization and acquire the R2 lock (skip the cycle otherwise).
      2. Re-analyse every open trade whose expected target time has passed and
         close/update it according to the decision.
      3. If no trade is open and capital exceeds 1 USD, run the 3-layer
         analysis and open the best opportunity, passing a decision-context
         snapshot to the trade manager.
      4. Release the lock and log the end of the cycle.

    Errors are logged; nothing is returned.
    """
    try:
        if not await state_manager.wait_for_initialization():
            print("โŒ Services not fully initialized - skipping cycle")
            return
        print("๐Ÿ”„ Starting trading cycle...")
        await r2_service_global.save_system_logs_async({"cycle_started": True})
        if not r2_service_global.acquire_lock():
            print("โŒ Failed to acquire lock - skipping cycle")
            return
        # Initialised before the try so the finally clause can always report a count.
        open_trades = []
        try:
            open_trades = await trade_manager_global.get_open_trades()
            print(f"๐Ÿ“‹ Open trades: {len(open_trades)}")
            if open_trades:
                now = datetime.now()
                trades_to_reanalyze = [t for t in open_trades if _is_trade_due_for_reanalysis(t, now)]
                if trades_to_reanalyze:
                    print(f"๐Ÿ”„ Re-analyzing {len(trades_to_reanalyze)} trades (using Advanced MC)")
                    reanalysis_results = await asyncio.gather(
                        *[re_analyze_open_trade_async(trade) for trade in trades_to_reanalyze],
                        return_exceptions=True
                    )
                    for trade, result in zip(trades_to_reanalyze, reanalysis_results):
                        if isinstance(result, Exception):
                            print(f" โŒ Re-analysis failed for {trade.get('symbol')}: {result}")
                        elif result and result['decision'].get('action') == "CLOSE_TRADE":
                            print(f" โœ… Closing {trade.get('symbol')} based on re-analysis.")
                            await trade_manager_global.close_trade(trade, result['current_price'], 'CLOSED_BY_REANALYSIS')
                        elif result and result['decision'].get('action') == "UPDATE_TRADE":
                            print(f" โœ… Updating {trade.get('symbol')} based on re-analysis.")
                            await trade_manager_global.update_trade(trade, result['decision'])
                        elif result:
                            print(f" โ„น๏ธ Holding {trade.get('symbol')} based on re-analysis.")
                        else:
                            print(f" โš ๏ธ Re-analysis for {trade.get('symbol')} yielded no decision.")

            # Re-read the open trades: the step above may have closed some.
            current_open_trades_count = len(await trade_manager_global.get_open_trades())
            should_look_for_new_trade = current_open_trades_count == 0
            if should_look_for_new_trade:
                portfolio_state = await r2_service_global.get_portfolio_state_async()
                current_capital = portfolio_state.get("current_capital_usd", 0)
                if current_capital > 1:
                    print("๐ŸŽฏ Looking for new trading opportunities (Dual-Phase MC)...")
                    best_opportunity = await run_3_layer_analysis()
                    # (Pass the decision_context to open_trade for the Reflector)
                    if best_opportunity:
                        print(f"โœ… Opening new trade: {best_opportunity['symbol']}")
                        symbol = best_opportunity['symbol']
                        decision = best_opportunity['decision']
                        price = best_opportunity['current_price']
                        # (Extract the snapshot saved by run_3_layer_analysis)
                        context_data = best_opportunity.get('decision_context_data', {})
                        # (Build the snapshot for the Reflector)
                        decision_context_snapshot = {
                            "market": context_data.get('sentiment_data', {}),
                            "indicators": context_data.get('advanced_indicators', {}),
                            "pattern": context_data.get('pattern_analysis', {}),
                            "monte_carlo": context_data.get('monte_carlo_distribution', {}),
                            "whale": context_data.get('whale_data', {}),
                            "strategy_scores": context_data.get('base_strategy_scores', {})
                        }
                        await trade_manager_global.open_trade(
                            symbol,
                            decision,
                            price,
                            decision_context=decision_context_snapshot
                        )
                    else:
                        print("โŒ No suitable trading opportunities found")
                else:
                    print("โŒ Insufficient capital to open new trades")
            else:
                print("โ„น๏ธ A trade is already open, skipping new trade search.")
        finally:
            if r2_service_global.lock_acquired:
                r2_service_global.release_lock()
            await r2_service_global.save_system_logs_async({"cycle_completed": True, "open_trades": len(open_trades)})
            print("โœ… Trading cycle complete")
    except Exception as error:
        print(f"โŒ Unhandled error in main cycle: {error}")
        traceback.print_exc()
        # Release the lock before any further awaiting: a failing log write
        # must never leave the lock held.
        if r2_service_global and r2_service_global.lock_acquired:
            r2_service_global.release_lock()
        if r2_service_global:
            try:
                await r2_service_global.save_system_logs_async({"cycle_error": True, "error": str(error)})
            except Exception as log_error:
                print(f"โŒ Failed to record cycle error: {log_error}")
@asynccontextmanager
async def lifespan(application: FastAPI):
    """Application lifecycle management.

    On start-up: initialize the services and launch the three background
    loops (market monitor, trade monitoring, periodic distillation).
    On shutdown, or after a failed start-up: run ``cleanup_on_shutdown``.

    Raises:
        Exception: Re-raised after logging when start-up fails with an exception.
    """
    print("๐Ÿš€ Starting application initialization...")
    try:
        success = await initialize_services()
        if not success:
            print("โŒ Application initialization failed - shutting down...")
            yield
            return
        # The event loop keeps only weak references to tasks, so they are held
        # here for as long as the application runs; otherwise a task could be
        # garbage-collected in mid-execution.
        background_tasks = [
            asyncio.create_task(monitor_market_async()),
            asyncio.create_task(trade_manager_global.start_trade_monitoring()),
            # (Start the periodic distillation task)
            asyncio.create_task(run_periodic_distillation()),
        ]
        await r2_service_global.save_system_logs_async({"application_started": True})
        print("๐ŸŽฏ Application ready - 3-Layer System (Learning Hub Enabled) is active")
        print(" -> ๐Ÿ“ˆ Tactical Monitor (Dynamic Exit) is active")
        print(" -> ๐Ÿง  Periodic Distillation (Curator) is scheduled")
        yield
    except Exception as error:
        print(f"โŒ Application startup failed: {error}")
        traceback.print_exc()
        if r2_service_global:
            await r2_service_global.save_system_logs_async({"application_startup_failed": True, "error": str(error)})
        raise
    finally:
        await cleanup_on_shutdown()
# FastAPI application object; start-up and shutdown are delegated to the
# `lifespan` context manager passed in here.
application = FastAPI(lifespan=lifespan, title="AI Trading Bot", description="Intelligent Trading System with 3-Layer Analysis and full Operational Learning Hub (Reflexion Architecture)", version="4.0.0")
# (Endpoints remain unchanged)
@application.get("/")
async def root():
    """Landing endpoint: welcome message plus a coarse running/initializing status."""
    if state_manager.initialization_complete:
        status_label = "running"
    else:
        status_label = "initializing"
    return {
        "message": "Welcome to the AI Trading System",
        "system": "3-Layer Analysis (Learning Hub Enabled)",
        "status": status_label,
        "timestamp": datetime.now().isoformat(),
    }
# Strong references to in-flight cycle tasks. The event loop only keeps weak
# references, so an unreferenced task may be garbage-collected before it finishes.
_running_cycle_tasks = set()


@application.get("/run-cycle")
async def run_cycle_api():
    """Start one bot cycle in the background and return immediately.

    Raises:
        HTTPException: 503 while the services are still initializing.
    """
    if not state_manager.initialization_complete:
        raise HTTPException(status_code=503, detail="Services not fully initialized")
    cycle_task = asyncio.create_task(run_bot_cycle_async())
    _running_cycle_tasks.add(cycle_task)
    # Drop the reference once the cycle is over so the set does not grow.
    cycle_task.add_done_callback(_running_cycle_tasks.discard)
    return {"message": "Bot cycle initiated (Learning Hub Enabled)", "system": "3-Layer Analysis"}
@application.get("/health")
async def health_check():
    """Report initialization state, per-service flags and any recorded start-up error."""
    is_ready = state_manager.initialization_complete
    return {
        "status": "healthy" if is_ready else "initializing",
        "initialization_complete": state_manager.initialization_complete,
        "services_initialized": state_manager.services_initialized,
        "initialization_error": state_manager.initialization_error,
        "timestamp": datetime.now().isoformat(),
        "system_architecture": "3-Layer Analysis (Learning Hub V4.0)",
    }
@application.get("/analyze-market")
async def analyze_market_api():
    """Run the 3-layer analysis on demand and summarise the best opportunity, if any.

    Raises:
        HTTPException: 503 while the services are still initializing.
    """
    if not state_manager.initialization_complete:
        raise HTTPException(status_code=503, detail="Services not fully initialized")
    best = await run_3_layer_analysis()
    if not best:
        return {"opportunity_found": False, "message": "No suitable opportunities found"}
    return {
        "opportunity_found": True,
        "symbol": best['symbol'],
        "action": best['decision'].get('action'),
        "confidence": best['llm_confidence'],
        "strategy": best['strategy'],
        "exit_profile": best['decision'].get('exit_profile'),
    }
@application.get("/portfolio")
async def get_portfolio_api():
    """Return the stored portfolio state together with the currently open trades.

    Raises:
        HTTPException: 503 while initializing, 500 when either lookup fails.
    """
    if not state_manager.initialization_complete:
        raise HTTPException(status_code=503, detail="Services not fully initialized")
    try:
        portfolio_state = await r2_service_global.get_portfolio_state_async()
        open_trades = await trade_manager_global.get_open_trades()
        return {
            "portfolio": portfolio_state,
            "open_trades": open_trades,
            "timestamp": datetime.now().isoformat(),
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error getting portfolio: {str(e)}")
@application.get("/system-status")
async def get_system_status():
    """Report initialization flags, the market-state flag and trade-monitoring status."""
    if trade_manager_global:
        monitoring_status = trade_manager_global.get_monitoring_status()
    else:
        # Trade manager not built yet: nothing to report.
        monitoring_status = {}
    return {
        "initialization_complete": state_manager.initialization_complete,
        "services_initialized": state_manager.services_initialized,
        "initialization_error": state_manager.initialization_error,
        "market_state_ok": state.MARKET_STATE_OK,
        "monitoring_status": monitoring_status,
        "timestamp": datetime.now().isoformat(),
    }
async def cleanup_on_shutdown():
    """Stop background work and release resources; every step is best effort.

    Steps: stop trade monitoring, shut the Learning Hub down, close the data
    manager, write the shutdown log and release the R2 lock. Each step is
    isolated so that a failure in one cannot prevent the later ones - in
    particular the lock release - from running.
    """
    global r2_service_global, data_manager_global, trade_manager_global, learning_hub_global
    print("๐Ÿ›‘ Shutdown signal received. Cleaning up...")

    if trade_manager_global:
        try:
            trade_manager_global.stop_monitoring()
            print("โœ… Trade monitoring stopped")
        except Exception as e:
            print(f"โŒ Failed to stop trade monitoring: {e}")

    # (Call the hub's shutdown method)
    if learning_hub_global and learning_hub_global.initialized:
        try:
            await learning_hub_global.shutdown()
            print("โœ… Learning hub data saved")
        except Exception as e:
            print(f"โŒ Failed to save learning hub data: {e}")

    if data_manager_global:
        try:
            await data_manager_global.close()
            print("โœ… Data manager closed")
        except Exception as e:
            print(f"โŒ Failed to close data manager: {e}")

    if r2_service_global:
        try:
            await r2_service_global.save_system_logs_async({"application_shutdown": True})
            print("โœ… Shutdown log saved")
        except Exception as e:
            print(f"โŒ Failed to save shutdown log: {e}")
        try:
            if r2_service_global.lock_acquired:
                r2_service_global.release_lock()
                print("โœ… R2 lock released")
        except Exception as e:
            print(f"โŒ Failed to release R2 lock: {e}")
def signal_handler(signum, frame):
    """Handle SIGINT/SIGTERM: run the shutdown cleanup, then exit with status 0.

    Exiting right after scheduling the cleanup would end the process before
    the cleanup coroutine ever ran, so the exit is deferred until it is done.

    Args:
        signum: Number of the received signal.
        frame: Interrupted stack frame (unused).
    """
    print(f"๐Ÿ›‘ Received signal {signum}. Initiating shutdown...")
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None

    if loop is None:
        # No event loop is running: drive the cleanup to completion here.
        try:
            asyncio.run(cleanup_on_shutdown())
        finally:
            sys.exit(0)

    def _exit_after_cleanup(_finished_task):
        # SystemExit raised from a loop callback propagates out of the loop.
        sys.exit(0)

    cleanup_task = loop.create_task(cleanup_on_shutdown())
    # Keep a strong reference so the task cannot be collected before it ends.
    signal_handler._cleanup_task = cleanup_task
    cleanup_task.add_done_callback(_exit_after_cleanup)


signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
if __name__ == "__main__":
    # Script entry point: serve the FastAPI application with uvicorn.
    print("๐Ÿš€ Starting AI Trading Bot with 3-Layer Analysis System (Learning Hub V4.0)...")
    uvicorn.run(
        application,
        host="0.0.0.0",
        port=7860,
        log_level="info",
        access_log=True,
    )