Spaces:
Restarting
Restarting
| # app.py (Fully updated to Learning Hub architecture) | |
| import os | |
| import traceback | |
| import signal | |
| import sys | |
| import uvicorn | |
| import asyncio | |
| import json | |
| import time | |
| from contextlib import asynccontextmanager | |
| from fastapi import FastAPI, HTTPException | |
| from datetime import datetime | |
| # ุงุณุชูุฑุงุฏ ุงูุฎุฏู ุงุช | |
| try: | |
| from r2 import R2Service | |
| from LLM import LLMService | |
| from data_manager import DataManager | |
| from ml_engine.processor import MLProcessor | |
| # ๐ด --- START OF CHANGE --- ๐ด | |
| # (Import the new hub manager instead of the old engine) | |
| from learning_hub.hub_manager import LearningHubManager | |
| # ๐ด --- END OF CHANGE --- ๐ด | |
| from sentiment_news import SentimentAnalyzer | |
| from trade_manager import TradeManager | |
| import state # (This is state.py) | |
| from helpers import safe_float_conversion, validate_candidate_data_enhanced | |
| except ImportError as e: | |
| print(f"โ ุฎุทุฃ ูู ุงุณุชูุฑุงุฏ ุงููุญุฏุงุช: {e}") | |
| sys.exit(1) | |
# Global service singletons (translated: "global variables").
# All of these are None at import time and are populated exactly once by
# initialize_services(); the rest of the module reads them directly.
r2_service_global = None            # R2Service: object storage / locking / logs
data_manager_global = None          # DataManager: market data + screening
llm_service_global = None           # LLMService: trading-decision LLM wrapper
# (Changed from learning_engine_global to the new Learning Hub architecture.)
learning_hub_global = None
trade_manager_global = None         # TradeManager: open/close/update trades
sentiment_analyzer_global = None    # SentimentAnalyzer: market sentiment feed
symbol_whale_monitor_global = None  # EnhancedWhaleMonitor, or None if init failed
class StateManager:
    """Tracks bootstrap progress of every service and owns the shared
    asyncio locks used by the market- and trade-analysis coroutines.

    A single instance is created at module level and mutated by
    initialize_services(); background tasks call wait_for_initialization()
    before touching any service.
    """

    def __init__(self):
        # Locks serializing the two long-running analysis paths.
        self.market_analysis_lock = asyncio.Lock()
        self.trade_analysis_lock = asyncio.Lock()
        self.initialization_complete = False
        self.initialization_error = None
        # One readiness flag per managed service; all start False and are
        # flipped individually via set_service_initialized().
        self.services_initialized = dict.fromkeys(
            (
                'r2_service', 'data_manager', 'llm_service',
                'learning_hub',
                'trade_manager', 'sentiment_analyzer',
                'symbol_whale_monitor',
            ),
            False,
        )

    async def wait_for_initialization(self, timeout=60):
        """Poll (every 2s) until initialization completes.

        Raises if an initialization error was recorded while waiting, or if
        the timeout elapses without completion. Returns True on success.
        """
        deadline = time.time() + timeout
        while not self.initialization_complete and time.time() < deadline:
            if self.initialization_error:
                raise Exception(f"ูุดู ุงูุชููุฆุฉ: {self.initialization_error}")
            await asyncio.sleep(2)
        if not self.initialization_complete:
            raise Exception(f"ุงูุชูุช ู ููุฉ ุงูุชููุฆุฉ ({timeout} ุซุงููุฉ)")
        return self.initialization_complete

    def set_service_initialized(self, service_name):
        """Mark one service ready; flip the global flag once all are ready."""
        self.services_initialized[service_name] = True
        if not all(self.services_initialized.values()):
            return
        self.initialization_complete = True
        print("๐ฏ ุฌู ูุน ุงูุฎุฏู ุงุช ู ููุฃุฉ ุจุงููุงู ู")

    def set_initialization_error(self, error):
        """Record a fatal bootstrap error (observed by waiters)."""
        self.initialization_error = error
        print(f"โ ุฎุทุฃ ูู ุงูุชููุฆุฉ: {error}")
| state_manager = StateManager() | |
async def initialize_services():
    """Initialize all services sequentially (original docstring in Arabic).

    Order matters: R2 storage first (it loads the contracts DB), then the
    whale monitor (best-effort), DataManager, LLM service, sentiment
    analyzer, the Learning Hub, and finally the TradeManager. Each step
    reports readiness to state_manager; returns True on success, False
    (after recording the error) on any failure.
    """
    global r2_service_global, data_manager_global, llm_service_global
    # (Changed from learning_engine_global to the Learning Hub.)
    global learning_hub_global
    global trade_manager_global, sentiment_analyzer_global
    global symbol_whale_monitor_global
    try:
        print("๐ ุจุฏุก ุชููุฆุฉ ุงูุฎุฏู ุงุช...")
        print(" ๐ ุชููุฆุฉ R2Service..."); r2_service_global = R2Service(); state_manager.set_service_initialized('r2_service'); print(" โ R2Service ู ููุฃุฉ")
        print(" ๐ ุฌูุจ ูุงุนุฏุฉ ุจูุงูุงุช ุงูุนููุฏ..."); contracts_database = await r2_service_global.load_contracts_db_async(); print(f" โ ุชู ุชุญู ูู {len(contracts_database)} ุนูุฏ")
        print(" ๐ ุชููุฆุฉ ู ุฑุงูุจ ุงูุญูุชุงู...");
        # Whale monitor is optional: a failure here downgrades to None
        # instead of aborting the whole bootstrap.
        try:
            from whale_news_data import EnhancedWhaleMonitor
            symbol_whale_monitor_global = EnhancedWhaleMonitor(contracts_database, r2_service_global)
            state_manager.set_service_initialized('symbol_whale_monitor'); print(" โ ู ุฑุงูุจ ุงูุญูุชุงู ู ููุฃ")
        except Exception as e: print(f" โ ๏ธ ูุดู ุชููุฆุฉ ู ุฑุงูุจ ุงูุญูุชุงู: {e}"); symbol_whale_monitor_global = None
        print(" ๐ ุชููุฆุฉ DataManager..."); data_manager_global = DataManager(contracts_database, symbol_whale_monitor_global); await data_manager_global.initialize(); state_manager.set_service_initialized('data_manager'); print(" โ DataManager ู ููุฃุฉ")
        print(" ๐ ุชููุฆุฉ LLMService...");
        llm_service_global = LLMService();
        llm_service_global.r2_service = r2_service_global;
        print(" ๐ ุชููุฆุฉ ู ุญูู ุงูู ุดุงุนุฑ...");
        sentiment_analyzer_global = SentimentAnalyzer(data_manager_global);
        state_manager.set_service_initialized('sentiment_analyzer');
        print(" โ ู ุญูู ุงูู ุดุงุนุฑ ู ููุฃ")
        # Initialize the new Learning Hub Manager (replaces the old engine).
        print(" ๐ ุชููุฆุฉ ู ุญูุฑ ุงูุชุนูู (Learning Hub)...");
        learning_hub_global = LearningHubManager(
            r2_service=r2_service_global,
            llm_service=llm_service_global,
            data_manager=data_manager_global
        )
        await learning_hub_global.initialize()
        state_manager.set_service_initialized('learning_hub');
        print(" โ ู ุญูุฑ ุงูุชุนูู (Hub) ู ููุฃ")
        # Wire the hub into the LLM service before declaring the LLM ready.
        llm_service_global.learning_hub = learning_hub_global
        state_manager.set_service_initialized('llm_service');
        print(" โ LLMService ู ููุฃุฉ (ูู ุฑุจูุทุฉ ุจู ุญูุฑ ุงูุชุนูู )")
        print(" ๐ ุชููุฆุฉ ู ุฏูุฑ ุงูุตููุงุช...");
        # TradeManager receives the Learning Hub so closed trades feed back
        # into the learning loop.
        trade_manager_global = TradeManager(
            r2_service_global,
            learning_hub_global,
            data_manager_global,
            state_manager
        )
        state_manager.set_service_initialized('trade_manager');
        print(" โ ู ุฏูุฑ ุงูุตููุงุช ู ููุฃ (ูู ุฑุจูุท ุจู ุญูุฑ ุงูุชุนูู )")
        print("๐ฏ ุงูุชู ูุช ุชููุฆุฉ ุฌู ูุน ุงูุฎุฏู ุงุช ุจูุฌุงุญ"); return True
    except Exception as e: error_msg = f"ูุดู ุชููุฆุฉ ุงูุฎุฏู ุงุช: {str(e)}"; print(f"โ {error_msg}"); state_manager.set_initialization_error(error_msg); return False
async def monitor_market_async():
    """Background loop: refresh market sentiment every 60s and gate trading.

    Sets state.MARKET_STATE_OK to False (a halt) only when BTC sentiment is
    BEARISH *and* the fear & greed index is below 30; any error or missing
    context fails open (trading allowed).
    """
    global data_manager_global, sentiment_analyzer_global
    try:
        if not await state_manager.wait_for_initialization(): print("โ ูุดู ุชููุฆุฉ ุงูุฎุฏู ุงุช - ุฅููุงู ู ุฑุงูุจุฉ ุงูุณูู"); return
        while True:
            try:
                async with state_manager.market_analysis_lock:
                    market_context = await sentiment_analyzer_global.get_market_sentiment()
                    # No context -> fail open and retry next minute.
                    if not market_context: state.MARKET_STATE_OK = True; await asyncio.sleep(60); continue
                    bitcoin_sentiment = market_context.get('btc_sentiment')
                    fear_greed_index = market_context.get('fear_and_greed_index')
                    should_halt_trading, halt_reason = False, ""
                    if bitcoin_sentiment == 'BEARISH' and (fear_greed_index is not None and fear_greed_index < 30): should_halt_trading, halt_reason = True, "ุธุฑูู ุณูู ูุงุจุทุฉ"
                    if should_halt_trading: state.MARKET_STATE_OK = False; await r2_service_global.save_system_logs_async({"market_halt": True, "reason": halt_reason})
                    else:
                        # Log recovery only on the False -> True transition.
                        if not state.MARKET_STATE_OK: print("โ ุชุญุณูุช ุธุฑูู ุงูุณูู. ุงุณุชุฆูุงู ุงูุนู ููุงุช ุงูุนุงุฏูุฉ.")
                        state.MARKET_STATE_OK = True
                # Sleep outside the lock so other analysis work can proceed.
                await asyncio.sleep(60)
            except Exception as error: print(f"โ ุฎุทุฃ ุฃุซูุงุก ู ุฑุงูุจุฉ ุงูุณูู: {error}"); state.MARKET_STATE_OK = True; await asyncio.sleep(60)
    except Exception as e: print(f"โ ูุดู ุชุดุบูู ู ุฑุงูุจุฉ ุงูุณูู: {e}")
| # ๐ด --- START OF CHANGE --- ๐ด | |
| # (New background task for periodic distillation - Point 6) | |
async def run_periodic_distillation():
    """Background task: run the Learning Hub's distillation ("Curator")
    check every 6 hours, starting 5 minutes after application startup.

    On any error the task logs, backs off for 1 hour, and keeps running.
    """
    print("background task: Periodic Distillation (Curator) scheduled.")
    await asyncio.sleep(300)  # grace period: wait 5 minutes after startup
    while True:
        try:
            if not await state_manager.wait_for_initialization():
                await asyncio.sleep(60)
                continue
            print("๐ [Scheduler] Running periodic distillation check...")
            await learning_hub_global.run_distillation_check()
            # Run every 6 hours.
            await asyncio.sleep(6 * 60 * 60)
        except Exception as e:
            print(f"โ [Scheduler] Error in periodic distillation task: {e}")
            traceback.print_exc()
            await asyncio.sleep(60 * 60)  # back off 1 hour on error
| # ๐ด --- END OF CHANGE --- ๐ด | |
async def process_batch_parallel(batch, ml_processor, batch_num, total_batches, preloaded_whale_data):
    """Score one batch of symbols concurrently through the ML processor.

    Each symbol is submitted as its own single-element task so failures are
    isolated. Results are bucketed into:
      'success'   - dict results with enhanced_final_score > 0.4
      'low_score' - dict results at or below the 0.4 threshold
      'failures'  - exceptions, empty results, or non-dict results

    Returns the three-bucket dict; on a fatal error returns empty buckets.
    """
    try:
        tasks = [
            asyncio.create_task(
                ml_processor.process_multiple_symbols_parallel([entry], preloaded_whale_data)
            )
            for entry in batch
        ]
        outcomes = await asyncio.gather(*tasks, return_exceptions=True)
        buckets = {'success': [], 'low_score': [], 'failures': []}
        for entry, outcome in zip(batch, outcomes):
            symbol = entry.get('symbol', 'unknown')
            if isinstance(outcome, Exception):
                buckets['failures'].append(
                    {"symbol": symbol, "error": f"Task Execution Error: {str(outcome)}"}
                )
                continue
            if not outcome:
                buckets['failures'].append(
                    {"symbol": symbol, "error": "ML processing returned None or empty list"}
                )
                continue
            scored = outcome[0]
            if not isinstance(scored, dict):
                buckets['failures'].append(
                    {"symbol": symbol, "error": f"ML processor returned invalid type: {type(scored)}"}
                )
                continue
            bucket = 'success' if scored.get('enhanced_final_score', 0) > 0.4 else 'low_score'
            buckets[bucket].append(scored)
        return buckets
    except Exception as error:
        print(f"โ [Consumer] Error processing batch {batch_num}: {error}")
        return {'success': [], 'low_score': [], 'failures': []}
async def run_3_layer_analysis():
    """Run the full 3-layer pipeline and return the single best opportunity.

    Layer 1: rapid screening (DataManager) -> candidate symbols.
    Layer 1.5: concurrent whale-data prefetch (semaphore-limited to 3).
    Layer 2: producer/consumer OHLCV streaming into ML scoring batches
             (MLProcessor, wired to the Learning Hub); keep top 10 by
             enhanced_final_score.
    Layer 2.5: advanced Monte Carlo (GARCH+LGBM) refinement per candidate.
    Layer 3: LLM decision per candidate; only 'BUY' actions become
             opportunities, each carrying its full candidate snapshot as
             'decision_context_data' for the Reflector.

    Returns the highest-ranked opportunity dict, or None.
    """
    layer1_candidates = []
    layer2_candidates = []
    all_low_score_candidates = []
    all_failed_candidates = []
    final_layer2_candidates = []
    final_opportunities = []
    preloaded_whale_data_dict = {}
    try:
        print("๐ฏ Starting 3-Layer Analysis (with Learning Hub integration)...")
        if not await state_manager.wait_for_initialization(): print("โ Services not fully initialized"); return None
        # --- Layer 1: rapid screening ---
        print("\n๐ Layer 1: Rapid Screening (data_manager)...")
        layer1_candidates = await data_manager_global.layer1_rapid_screening()
        if not layer1_candidates: print("โ No candidates found in Layer 1"); return None
        print(f"โ Selected {len(layer1_candidates)} symbols for Layer 2")
        layer1_symbols = [c['symbol'] for c in layer1_candidates]
        # --- Layer 1.5: fire-and-forget whale-data prefetch ---
        start_whale_fetch = time.time()
        print(f"\n๐ Layer 1.5: Fetching whale data for {len(layer1_symbols)} symbols (async)...")
        async def fetch_whale_data_task(symbols, results_dict):
            # Limit concurrent whale-API calls to 3; failures are recorded
            # per-symbol rather than aborting the prefetch.
            WHALE_FETCH_CONCURRENCY = 3
            semaphore = asyncio.Semaphore(WHALE_FETCH_CONCURRENCY)
            tasks = []
            async def get_data_with_semaphore(symbol):
                async with semaphore:
                    try:
                        data = await data_manager_global.get_whale_data_for_symbol(symbol)
                        if data: results_dict[symbol] = data
                    except Exception as e:
                        results_dict[symbol] = {'data_available': False, 'error': str(e)}
            for symbol in symbols: tasks.append(asyncio.create_task(get_data_with_semaphore(symbol)))
            await asyncio.gather(*tasks)
        whale_fetcher_task = asyncio.create_task(fetch_whale_data_task(layer1_symbols, preloaded_whale_data_dict))
        # --- Layer 2 setup: bounded queue keeps memory flat while OHLCV
        # batches stream from the producer into the ML consumer. ---
        DATA_QUEUE_MAX_SIZE = 2
        ohlcv_data_queue = asyncio.Queue(maxsize=DATA_QUEUE_MAX_SIZE)
        ml_results_list = []
        market_context = await data_manager_global.get_market_context_async()
        # Pass the global Learning Hub to the MLProcessor.
        ml_processor = MLProcessor(market_context, data_manager_global, learning_hub_global)
        batch_size = 15
        total_batches = (len(layer1_candidates) + batch_size - 1) // batch_size
        async def ml_consumer_task(queue: asyncio.Queue, results_list: list, whale_data_store: dict):
            # Drains OHLCV batches until the None sentinel is received.
            batch_num = 0
            while True:
                try:
                    batch_data = await queue.get()
                    if batch_data is None: queue.task_done(); break
                    batch_num += 1
                    batch_results_dict = await process_batch_parallel(
                        batch_data, ml_processor, batch_num, total_batches, whale_data_store
                    )
                    results_list.append(batch_results_dict)
                    queue.task_done()
                except Exception as e: print(f"โ [ML Consumer] Fatal Error: {e}"); traceback.print_exc(); queue.task_done()
        # Run producer and consumer; the None sentinel ends the consumer.
        consumer_task = asyncio.create_task(ml_consumer_task(ohlcv_data_queue, ml_results_list, preloaded_whale_data_dict))
        producer_task = asyncio.create_task(data_manager_global.stream_ohlcv_data(layer1_symbols, ohlcv_data_queue))
        await producer_task;
        await ohlcv_data_queue.put(None)
        await ohlcv_data_queue.join()
        await consumer_task;
        # Give the whale prefetch a bounded grace period; missing data is
        # tolerated downstream.
        WHALE_FETCH_TIMEOUT_SECONDS = 180
        try:
            await asyncio.wait_for(whale_fetcher_task, timeout=WHALE_FETCH_TIMEOUT_SECONDS)
        except asyncio.TimeoutError:
            print(f" โ ๏ธ Whale fetch timeout ({WHALE_FETCH_TIMEOUT_SECONDS}s)!")
        except Exception as whale_task_err:
            print(f" โ Whale fetch task error: {whale_task_err}")
        # --- Aggregate: merge Layer-1 metadata and whale data into each
        # successful ML result. ---
        print("๐ Aggregating all results...")
        for batch_result in ml_results_list:
            for success_item in batch_result['success']:
                symbol = success_item['symbol']
                l1_data = next((c for c in layer1_candidates if c['symbol'] == symbol), None)
                if l1_data:
                    success_item['reasons_for_candidacy'] = l1_data.get('reasons', [])
                    success_item['layer1_score'] = l1_data.get('layer1_score', 0)
                if symbol in preloaded_whale_data_dict: success_item['whale_data'] = preloaded_whale_data_dict[symbol]
                elif 'whale_data' not in success_item: success_item['whale_data'] = {'data_available': False, 'reason': 'Fetch timed out or failed'}
                layer2_candidates.append(success_item)
            all_low_score_candidates.extend(batch_result['low_score'])
            all_failed_candidates.extend(batch_result['failures'])
        if not layer2_candidates: print("โ No candidates found in Layer 2")
        # Keep the 10 best candidates by score.
        layer2_candidates.sort(key=lambda x: x.get('enhanced_final_score', 0), reverse=True)
        target_count = min(10, len(layer2_candidates))
        final_layer2_candidates = layer2_candidates[:target_count]
        # --- Layer 2.5: advanced Monte Carlo; only accept results from the
        # Phase2_GARCH_LGBM model, otherwise keep Phase-1 numbers. ---
        print(f"\n๐ฌ Layer 2.5: Running Advanced MC (GARCH+LGBM) on top {len(final_layer2_candidates)} candidates...")
        advanced_mc_analyzer = ml_processor.monte_carlo_analyzer
        updated_candidates_for_llm = []
        for candidate in final_layer2_candidates:
            symbol = candidate.get('symbol', 'UNKNOWN')
            try:
                advanced_mc_results = await advanced_mc_analyzer.generate_1h_distribution_advanced(
                    candidate.get('ohlcv')
                )
                if advanced_mc_results and advanced_mc_results.get('simulation_model') == 'Phase2_GARCH_LGBM':
                    candidate['monte_carlo_distribution'] = advanced_mc_results
                    candidate['monte_carlo_probability'] = advanced_mc_results.get('probability_of_gain', 0)
                    candidate['advanced_mc_run'] = True
                else:
                    candidate['advanced_mc_run'] = False
                updated_candidates_for_llm.append(candidate)
            except Exception as e:
                print(f" โ [Advanced MC] {symbol} - Error: {e}. Using Phase 1 results.")
                candidate['advanced_mc_run'] = False
                updated_candidates_for_llm.append(candidate)
        final_layer2_candidates = updated_candidates_for_llm
        await r2_service_global.save_candidates_async(final_layer2_candidates)
        # --- Layer 3: LLM decision per candidate ---
        print("\n๐ง Layer 3: LLM Service Analysis...")
        for candidate in final_layer2_candidates:
            try:
                symbol = candidate['symbol']
                ohlcv_data = candidate.get('ohlcv');
                if not ohlcv_data: continue
                candidate['raw_ohlcv'] = ohlcv_data
                # Skip symbols with too little history for the LLM to judge.
                total_candles = sum(len(data) for data in ohlcv_data.values()) if ohlcv_data else 0
                if total_candles < 30: continue
                # Refresh market context so the LLM sees the latest state.
                candidate['sentiment_data'] = await data_manager_global.get_market_context_async()
                llm_analysis = await llm_service_global.get_trading_decision(candidate)
                if llm_analysis and llm_analysis.get('action') in ['BUY']:
                    # Keep the whole candidate as the decision snapshot so the
                    # Reflector can later reconstruct why the trade was taken.
                    opportunity={
                        'symbol': symbol,
                        'current_price': candidate.get('current_price', 0),
                        'decision': llm_analysis,
                        'enhanced_score': candidate.get('enhanced_final_score', 0),
                        'llm_confidence': llm_analysis.get('confidence_level', 0),
                        'strategy': llm_analysis.get('strategy', 'GENERIC'),
                        'analysis_timestamp': datetime.now().isoformat(),
                        'decision_context_data': candidate
                    }
                    final_opportunities.append(opportunity)
                    print(f" โ {symbol}: {llm_analysis.get('action')} - Conf: {llm_analysis.get('confidence_level', 0):.2f}")
                else:
                    action = llm_analysis.get('action', 'NO_DECISION') if llm_analysis else 'NO_RESPONSE'; print(f" โ ๏ธ {symbol}: No trade decision from LLM ({action})")
            except Exception as e: print(f"โ Error in LLM analysis for {candidate.get('symbol')}: {e}"); traceback.print_exc(); continue
        # Rank by the mean of LLM confidence and ML score; return the best.
        if final_opportunities:
            final_opportunities.sort(key=lambda x: (x['llm_confidence'] + x['enhanced_score']) / 2, reverse=True)
        if not final_opportunities: print("โ No suitable trading opportunities found"); return None
        return final_opportunities[0] if final_opportunities else None
    except Exception as error:
        print(f"โ Fatal error in 3-layer system: {error}"); traceback.print_exc()
        return None
async def re_analyze_open_trade_async(trade_data):
    """Re-run the full analysis stack for one open trade.

    Serialized by state_manager.trade_analysis_lock. Fetches fresh OHLCV
    via a temporary queue, rescoring with a Learning-Hub-aware MLProcessor
    plus advanced Monte Carlo, then asks the LLM for a HOLD/UPDATE/CLOSE
    decision. Returns {'symbol', 'decision', 'current_price'} or None.
    """
    symbol = trade_data.get('symbol')
    try:
        async with state_manager.trade_analysis_lock:
            print(f"๐ [Re-Analyze] Starting strategic analysis for {symbol}...")
            market_context = await data_manager_global.get_market_context_async()
            ohlcv_data_list = []
            # Reuse the streaming producer with a private queue; drain it
            # until the None sentinel or a 1s idle timeout.
            temp_queue = asyncio.Queue()
            await data_manager_global.stream_ohlcv_data([symbol], temp_queue)
            while True:
                try:
                    batch = await asyncio.wait_for(temp_queue.get(), timeout=1.0)
                    if batch is None: temp_queue.task_done(); break
                    ohlcv_data_list.extend(batch)
                    temp_queue.task_done()
                except asyncio.TimeoutError:
                    if temp_queue.empty(): break
                except Exception: break
            if not ohlcv_data_list: print(f"โ ๏ธ Failed to get re-analysis data for {symbol}"); return None
            ohlcv_data = ohlcv_data_list[0]
            # Enrich with Layer-1 style detail so scoring sees the same
            # shape as a fresh candidate.
            l1_data = await data_manager_global._get_detailed_symbol_data(symbol)
            if l1_data: ohlcv_data.update(l1_data); ohlcv_data['reasons_for_candidacy'] = ['re-analysis']
            re_analysis_whale_data = await data_manager_global.get_whale_data_for_symbol(symbol)
            # Pass the global Learning Hub to the MLProcessor.
            ml_processor = MLProcessor(market_context, data_manager_global, learning_hub_global)
            print(f"๐ [Re-Analyze] Using Advanced MC (Phase 2+3) for {symbol}...")
            advanced_mc_results = await ml_processor.monte_carlo_analyzer.generate_1h_distribution_advanced(
                ohlcv_data.get('ohlcv')
            )
            processed_data = await ml_processor.process_and_score_symbol_enhanced(ohlcv_data, {symbol: re_analysis_whale_data} if re_analysis_whale_data else {})
            if not processed_data: return None
            # Prefer the advanced MC distribution when it was produced.
            if advanced_mc_results:
                processed_data['monte_carlo_distribution'] = advanced_mc_results
                processed_data['monte_carlo_probability'] = advanced_mc_results.get('probability_of_gain', 0)
            processed_data['raw_ohlcv'] = ohlcv_data.get('raw_ohlcv') or ohlcv_data.get('ohlcv')
            processed_data['ohlcv'] = processed_data['raw_ohlcv']
            processed_data['sentiment_data'] = market_context
            re_analysis_decision = await llm_service_global.re_analyze_trade_async(trade_data, processed_data)
            if re_analysis_decision:
                await r2_service_global.save_system_logs_async({ "trade_reanalyzed": True, "symbol": symbol, "action": re_analysis_decision.get('action'), 'strategy': re_analysis_decision.get('strategy', 'GENERIC') })
                print(f"โ [Re-Analyze] Strategic analysis complete for {symbol}. Decision: {re_analysis_decision.get('action')}")
                return {"symbol": symbol, "decision": re_analysis_decision, "current_price": processed_data.get('current_price')}
            else: return None
    except Exception as error: await r2_service_global.save_system_logs_async({ "reanalysis_error": True, "symbol": symbol, "error": str(error) }); print(f"โ Error in re_analyze_open_trade_async for {symbol}: {error}"); traceback.print_exc(); return None
async def run_bot_cycle_async():
    """One full trading cycle, guarded by the R2 distributed lock.

    1. Re-analyze open trades whose expected_target_time has passed and
       apply CLOSE/UPDATE/HOLD decisions.
    2. If no trade remains open and capital > $1, run the 3-layer analysis
       and open the best opportunity, passing its decision-context snapshot
       to the TradeManager for the Reflector.
    The lock is always released in the finally block.
    """
    try:
        if not await state_manager.wait_for_initialization(): print("โ Services not fully initialized - skipping cycle"); return
        print("๐ Starting trading cycle..."); await r2_service_global.save_system_logs_async({"cycle_started": True})
        # Distributed lock prevents two replicas from trading concurrently.
        if not r2_service_global.acquire_lock(): print("โ Failed to acquire lock - skipping cycle"); return
        open_trades = []
        try:
            open_trades = await trade_manager_global.get_open_trades(); print(f"๐ Open trades: {len(open_trades)}")
            if open_trades:
                now = datetime.now()
                # Only re-analyze trades past their expected target time;
                # a missing timestamp defaults to "due now".
                trades_to_reanalyze = [t for t in open_trades if now >= datetime.fromisoformat(t.get('expected_target_time', now.isoformat()))]
                if trades_to_reanalyze:
                    print(f"๐ Re-analyzing {len(trades_to_reanalyze)} trades (using Advanced MC)")
                    reanalysis_results = await asyncio.gather(*[re_analyze_open_trade_async(trade) for trade in trades_to_reanalyze], return_exceptions=True)
                    for i, result in enumerate(reanalysis_results):
                        trade = trades_to_reanalyze[i]
                        if isinstance(result, Exception): print(f" โ Re-analysis failed for {trade.get('symbol')}: {result}")
                        elif result and result['decision'].get('action') == "CLOSE_TRADE": print(f" โ Closing {trade.get('symbol')} based on re-analysis."); await trade_manager_global.close_trade(trade, result['current_price'], 'CLOSED_BY_REANALYSIS');
                        elif result and result['decision'].get('action') == "UPDATE_TRADE": print(f" โ Updating {trade.get('symbol')} based on re-analysis."); await trade_manager_global.update_trade(trade, result['decision'])
                        elif result: print(f" โน๏ธ Holding {trade.get('symbol')} based on re-analysis.")
                        else: print(f" โ ๏ธ Re-analysis for {trade.get('symbol')} yielded no decision.")
            # Re-count after possible closes; only search when flat.
            current_open_trades_count = len(await trade_manager_global.get_open_trades())
            should_look_for_new_trade = current_open_trades_count == 0
            if should_look_for_new_trade:
                portfolio_state = await r2_service_global.get_portfolio_state_async(); current_capital = portfolio_state.get("current_capital_usd", 0)
                if current_capital > 1:
                    print("๐ฏ Looking for new trading opportunities (Dual-Phase MC)...")
                    best_opportunity = await run_3_layer_analysis()
                    if best_opportunity:
                        print(f"โ Opening new trade: {best_opportunity['symbol']}")
                        symbol = best_opportunity['symbol']
                        decision = best_opportunity['decision']
                        price = best_opportunity['current_price']
                        # Extract the snapshot saved by run_3_layer_analysis
                        # and condense it for the Reflector.
                        context_data = best_opportunity.get('decision_context_data', {})
                        decision_context_snapshot = {
                            "market": context_data.get('sentiment_data', {}),
                            "indicators": context_data.get('advanced_indicators', {}),
                            "pattern": context_data.get('pattern_analysis', {}),
                            "monte_carlo": context_data.get('monte_carlo_distribution', {}),
                            "whale": context_data.get('whale_data', {}),
                            "strategy_scores": context_data.get('base_strategy_scores', {})
                        }
                        await trade_manager_global.open_trade(
                            symbol,
                            decision,
                            price,
                            decision_context=decision_context_snapshot
                        )
                    else: print("โ No suitable trading opportunities found")
                else: print("โ Insufficient capital to open new trades")
            else: print("โน๏ธ A trade is already open, skipping new trade search.")
        finally:
            # Always release the distributed lock, even on error.
            if r2_service_global.lock_acquired: r2_service_global.release_lock()
        await r2_service_global.save_system_logs_async({ "cycle_completed": True, "open_trades": len(open_trades)})
        print("โ Trading cycle complete")
    except Exception as error:
        print(f"โ Unhandled error in main cycle: {error}"); traceback.print_exc()
        await r2_service_global.save_system_logs_async({ "cycle_error": True, "error": str(error) });
        if r2_service_global and r2_service_global.lock_acquired: r2_service_global.release_lock()
@asynccontextmanager
async def lifespan(application: FastAPI):
    """Application lifecycle management.

    Fix: FastAPI/Starlette expect the ``lifespan`` argument to be an async
    context manager factory; a bare async generator function is only
    accepted through a deprecated compatibility path. The
    ``asynccontextmanager`` decorator (already imported at the top of this
    file but previously unused) is now applied.

    Startup: initialize all services, then spawn the market monitor, trade
    monitor, and periodic distillation background tasks. Shutdown: the
    finally block always runs cleanup_on_shutdown().
    """
    print("๐ Starting application initialization...")
    try:
        success = await initialize_services()
        if not success:
            # Still yield so FastAPI can start and report the failure state.
            print("โ Application initialization failed - shutting down...")
            yield
            return
        asyncio.create_task(monitor_market_async())
        asyncio.create_task(trade_manager_global.start_trade_monitoring())
        # Start the periodic distillation (Curator) task.
        asyncio.create_task(run_periodic_distillation())
        await r2_service_global.save_system_logs_async({"application_started": True})
        print("๐ฏ Application ready - 3-Layer System (Learning Hub Enabled) is active")
        print(" -> ๐ Tactical Monitor (Dynamic Exit) is active")
        print(" -> ๐ง Periodic Distillation (Curator) is scheduled")
        yield
    except Exception as error:
        print(f"โ Application startup failed: {error}")
        traceback.print_exc()
        if r2_service_global:
            await r2_service_global.save_system_logs_async({ "application_startup_failed": True, "error": str(error) })
        raise
    finally:
        # Runs on every shutdown path, including startup failure.
        await cleanup_on_shutdown()
| application = FastAPI(lifespan=lifespan, title="AI Trading Bot", description="Intelligent Trading System with 3-Layer Analysis and full Operational Learning Hub (Reflexion Architecture)", version="4.0.0") | |
| # (Endpoints remain unchanged) | |
| async def root(): return {"message": "Welcome to the AI Trading System", "system": "3-Layer Analysis (Learning Hub Enabled)", "status": "running" if state_manager.initialization_complete else "initializing", "timestamp": datetime.now().isoformat()} | |
async def run_cycle_api():
    """Trigger one bot cycle in the background (fire-and-forget).

    Returns 503 until all services are initialized.
    NOTE(review): route decorator appears missing from this chunk.
    """
    if not state_manager.initialization_complete: raise HTTPException(status_code=503, detail="Services not fully initialized")
    asyncio.create_task(run_bot_cycle_async())
    return {"message": "Bot cycle initiated (Learning Hub Enabled)", "system": "3-Layer Analysis"}
| async def health_check(): return {"status": "healthy" if state_manager.initialization_complete else "initializing", "initialization_complete": state_manager.initialization_complete, "services_initialized": state_manager.services_initialized, "initialization_error": state_manager.initialization_error, "timestamp": datetime.now().isoformat(), "system_architecture": "3-Layer Analysis (Learning Hub V4.0)"} | |
async def analyze_market_api():
    """Run the 3-layer analysis synchronously and report the best result.

    NOTE(review): route decorator appears missing from this chunk.
    """
    if not state_manager.initialization_complete: raise HTTPException(status_code=503, detail="Services not fully initialized")
    result = await run_3_layer_analysis()
    if result: return {"opportunity_found": True, "symbol": result['symbol'], "action": result['decision'].get('action'), "confidence": result['llm_confidence'], "strategy": result['strategy'], "exit_profile": result['decision'].get('exit_profile')}
    else: return {"opportunity_found": False, "message": "No suitable opportunities found"}
async def get_portfolio_api():
    """Return the current portfolio state and open trades.

    NOTE(review): route decorator appears missing from this chunk.
    """
    if not state_manager.initialization_complete: raise HTTPException(status_code=503, detail="Services not fully initialized")
    try: portfolio_state = await r2_service_global.get_portfolio_state_async(); open_trades = await trade_manager_global.get_open_trades(); return {"portfolio": portfolio_state, "open_trades": open_trades, "timestamp": datetime.now().isoformat()}
    except Exception as e: raise HTTPException(status_code=500, detail=f"Error getting portfolio: {str(e)}")
| async def get_system_status(): monitoring_status = trade_manager_global.get_monitoring_status() if trade_manager_global else {}; return {"initialization_complete": state_manager.initialization_complete, "services_initialized": state_manager.services_initialized, "initialization_error": state_manager.initialization_error, "market_state_ok": state.MARKET_STATE_OK, "monitoring_status": monitoring_status, "timestamp": datetime.now().isoformat()} | |
async def cleanup_on_shutdown():
    """Best-effort ordered teardown: stop monitoring, persist the Learning
    Hub, close the data manager, write a shutdown log, release the R2 lock.

    Each step is independently guarded so a failure in one does not skip
    the rest.
    """
    global r2_service_global, data_manager_global, trade_manager_global, learning_hub_global
    print("๐ Shutdown signal received. Cleaning up...")
    if trade_manager_global: trade_manager_global.stop_monitoring(); print("โ Trade monitoring stopped")
    # Persist the Learning Hub's state before storage goes away.
    if learning_hub_global and learning_hub_global.initialized:
        try:
            await learning_hub_global.shutdown()
            print("โ Learning hub data saved")
        except Exception as e: print(f"โ Failed to save learning hub data: {e}")
    if data_manager_global: await data_manager_global.close(); print("โ Data manager closed")
    if r2_service_global:
        try: await r2_service_global.save_system_logs_async({"application_shutdown": True}); print("โ Shutdown log saved")
        except Exception as e: print(f"โ Failed to save shutdown log: {e}")
        # Release the distributed lock last so no other replica trades
        # while we are still writing.
        if r2_service_global.lock_acquired: r2_service_global.release_lock(); print("โ R2 lock released")
| def signal_handler(signum, frame): print(f"๐ Received signal {signum}. Initiating shutdown..."); asyncio.create_task(cleanup_on_shutdown()); sys.exit(0) | |
| signal.signal(signal.SIGINT, signal_handler); signal.signal(signal.SIGTERM, signal_handler) | |
if __name__ == "__main__":
    # Entry point: serve the FastAPI app on all interfaces, port 7860
    # (the conventional Hugging Face Spaces port).
    print("๐ Starting AI Trading Bot with 3-Layer Analysis System (Learning Hub V4.0)...")
    uvicorn.run( application, host="0.0.0.0", port=7860, log_level="info", access_log=True )