Trad / app.py
Riy777's picture
Update app.py
0905088
raw
history blame
40.4 kB
# app.py (محدث)
import os
import traceback
import signal
import sys
import uvicorn
import asyncio
import json
import time
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from datetime import datetime
# استيراد الخدمات
try:
from r2 import R2Service
from LLM import LLMService
from data_manager import DataManager
from ml_engine.processor import MLProcessor # 🔴 تم تعديل هذا السطر
from learning_engine import LearningEngine
from sentiment_news import SentimentAnalyzer
from trade_manager import TradeManager
import state
from helpers import safe_float_conversion, validate_candidate_data_enhanced
except ImportError as e:
print(f"❌ خطأ في استيراد الوحدات: {e}")
sys.exit(1)
# المتغيرات العالمية
r2_service_global = None
data_manager_global = None
llm_service_global = None
learning_engine_global = None
trade_manager_global = None
sentiment_analyzer_global = None
symbol_whale_monitor_global = None
class StateManager:
def __init__(self):
self.market_analysis_lock = asyncio.Lock()
self.trade_analysis_lock = asyncio.Lock()
self.initialization_complete = False
self.initialization_error = None
self.services_initialized = {
'r2_service': False,
'data_manager': False,
'llm_service': False,
'learning_engine': False,
'trade_manager': False,
'sentiment_analyzer': False,
'symbol_whale_monitor': False
}
async def wait_for_initialization(self, timeout=60):
start_time = time.time()
while not self.initialization_complete and (time.time() - start_time) < timeout:
if self.initialization_error:
raise Exception(f"فشل التهيئة: {self.initialization_error}")
await asyncio.sleep(2)
if not self.initialization_complete:
raise Exception(f"انتهت مهلة التهيئة ({timeout} ثانية)")
return self.initialization_complete
def set_service_initialized(self, service_name):
self.services_initialized[service_name] = True
if all(self.services_initialized.values()):
self.initialization_complete = True
print("🎯 جميع الخدمات مهيأة بالكامل")
def set_initialization_error(self, error):
self.initialization_error = error
print(f"❌ خطأ في التهيئة: {error}")
state_manager = StateManager()
async def initialize_services():
"""تهيئة جميع الخدمات بشكل منفصل"""
global r2_service_global, data_manager_global, llm_service_global
global learning_engine_global, trade_manager_global, sentiment_analyzer_global
global symbol_whale_monitor_global
try:
print("🚀 بدء تهيئة الخدمات...")
# 1. تهيئة R2Service أولاً
print(" 🔄 تهيئة R2Service...")
r2_service_global = R2Service()
state_manager.set_service_initialized('r2_service')
print(" ✅ R2Service مهيأة")
# 2. تحميل قاعدة بيانات العقود
print(" 🔄 جلب قاعدة بيانات العقود...")
contracts_database = await r2_service_global.load_contracts_db_async()
print(f" ✅ تم تحميل {len(contracts_database)} عقد")
# 3. تهيئة مراقب الحيتان
print(" 🔄 تهيئة مراقب الحيتان...")
try:
from whale_news_data import EnhancedWhaleMonitor
symbol_whale_monitor_global = EnhancedWhaleMonitor(contracts_database, r2_service_global)
state_manager.set_service_initialized('symbol_whale_monitor')
print(" ✅ مراقب الحيتان مهيأ")
except Exception as e:
print(f" ⚠️ فشل تهيئة مراقب الحيتان: {e}")
symbol_whale_monitor_global = None
# 4. تهيئة DataManager
print(" 🔄 تهيئة DataManager...")
data_manager_global = DataManager(contracts_database, symbol_whale_monitor_global)
await data_manager_global.initialize()
state_manager.set_service_initialized('data_manager')
print(" ✅ DataManager مهيأ")
# 5. تهيئة LLMService
print(" 🔄 تهيئة LLMService...")
llm_service_global = LLMService()
llm_service_global.r2_service = r2_service_global
state_manager.set_service_initialized('llm_service')
print(" ✅ LLMService مهيأ")
# 6. تهيئة محلل المشاعر
print(" 🔄 تهيئة محلل المشاعر...")
sentiment_analyzer_global = SentimentAnalyzer(data_manager_global)
state_manager.set_service_initialized('sentiment_analyzer')
print(" ✅ محلل المشاعر مهيأ")
# 7. تهيئة محرك التعلم
print(" 🔄 تهيئة محرك التعلم...")
learning_engine_global = LearningEngine(r2_service_global, data_manager_global)
await learning_engine_global.initialize_enhanced()
state_manager.set_service_initialized('learning_engine')
print(" ✅ محرك التعلم مهيأ")
# 8. تهيئة مدير الصفقات
print(" 🔄 تهيئة مدير الصفقات...")
trade_manager_global = TradeManager(r2_service_global, learning_engine_global, data_manager_global)
state_manager.set_service_initialized('trade_manager')
print(" ✅ مدير الصفقات مهيأ")
print("🎯 اكتملت تهيئة جميع الخدمات بنجاح")
return True
except Exception as e:
error_msg = f"فشل تهيئة الخدمات: {str(e)}"
print(f"❌ {error_msg}")
state_manager.set_initialization_error(error_msg)
return False
async def monitor_market_async():
"""مراقبة السوق"""
global data_manager_global, sentiment_analyzer_global
try:
if not await state_manager.wait_for_initialization():
print("❌ فشل تهيئة الخدمات - إيقاف مراقبة السوق")
return
while True:
try:
async with state_manager.market_analysis_lock:
market_context = await sentiment_analyzer_global.get_market_sentiment()
if not market_context:
state.MARKET_STATE_OK = True
await asyncio.sleep(60)
continue
bitcoin_sentiment = market_context.get('btc_sentiment')
fear_greed_index = market_context.get('fear_and_greed_index')
should_halt_trading, halt_reason = False, ""
if bitcoin_sentiment == 'BEARISH' and (fear_greed_index is not None and fear_greed_index < 30):
should_halt_trading, halt_reason = True, "ظروف سوق هابطة"
if should_halt_trading:
state.MARKET_STATE_OK = False
await r2_service_global.save_system_logs_async({"market_halt": True, "reason": halt_reason})
else:
if not state.MARKET_STATE_OK:
print("✅ تحسنت ظروف السوق. استئناف العمليات العادية.")
state.MARKET_STATE_OK = True
await asyncio.sleep(60)
except Exception as error:
print(f"❌ خطأ أثناء مراقبة السوق: {error}")
state.MARKET_STATE_OK = True
await asyncio.sleep(60)
except Exception as e:
print(f"❌ فشل تشغيل مراقبة السوق: {e}")
#
# 🔴 تم التعديل: الدالة الآن ترجع قاموساً مفصلاً بدلاً من قائمة واحدة
#
async def process_batch_parallel(batch, ml_processor, batch_num, total_batches):
"""معالجة دفعة من الرموز بشكل متوازي وإرجاع نتائج مفصلة"""
try:
print(f" 🔄 معالجة الدفعة {batch_num}/{total_batches} ({len(batch)} عملة)...")
# إنشاء مهام للدفعة الحالية
batch_tasks = []
for symbol_data in batch:
task = asyncio.create_task(ml_processor.process_and_score_symbol_enhanced(symbol_data))
batch_tasks.append(task)
# انتظار انتهاء جميع مهام الدفعة الحالية
batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
#
# 🔴 تم التعديل: تصفية النتائج إلى ثلاث فئات
#
successful_results = []
low_score_results = []
failed_results = []
for i, result in enumerate(batch_results):
symbol = batch[i].get('symbol', 'unknown') # جلب الرمز من بيانات الدفعة الأصلية
if isinstance(result, Exception):
# فشل على مستوى المهمة (مثل Timeout)
failed_results.append({"symbol": symbol, "error": f"Task Execution Error: {str(result)}"})
elif result is None:
# فشل المعالجة داخل ML.py (سيرجع None)
failed_results.append({"symbol": symbol, "error": "ML.py processing returned None (Check logs for internal error)"})
elif result.get('enhanced_final_score', 0) > 0.4:
# نجاح - درجة عالية
successful_results.append(result)
else:
# نجاح - درجة منخفضة
low_score_results.append(result)
print(f" ✅ اكتملت الدفعة {batch_num}: {len(successful_results)} نجاح | {len(low_score_results)} منخفض | {len(failed_results)} فشل")
# إرجاع قاموس مفصل
return {
'success': successful_results,
'low_score': low_score_results,
'failures': failed_results
}
except Exception as error:
print(f"❌ خطأ في معالجة الدفعة {batch_num}: {error}")
# إرجاع هيكل فارغ في حالة فشل الدفعة بالكامل
return {'success': [], 'low_score': [], 'failures': []}
async def run_3_layer_analysis():
"""
تشغيل النظام الطبقي المكون من 3 طبقات:
الطبقة 1: data_manager - الفحص السريع
الطبقة 2: MLProcessor - التحليل المتقدم
الطبقة 3: LLMService - النموذج الضخم
"""
# 🔴 تعريف متغيرات السجل في بداية الدالة
layer1_candidates = []
layer2_candidates = []
all_low_score_candidates = []
all_failed_candidates = []
final_layer2_candidates = []
final_opportunities = []
try:
print("🎯 بدء النظام الطبقي المكون من 3 طبقات...")
if not await state_manager.wait_for_initialization():
print("❌ الخدمات غير مهيأة بالكامل")
return None
# الطبقة 1: الفحص السريع لجميع العملات
print("\n🔍 الطبقة 1: الفحص السريع (data_manager)...")
layer1_candidates = await data_manager_global.layer1_rapid_screening()
if not layer1_candidates:
print("❌ لم يتم العثور على مرشحين في الطبقة 1")
return None
print(f"✅ تم اختيار {len(layer1_candidates)} عملة للطبقة 2")
# جلب بيانات OHLCV كاملة للمرشحين
layer1_symbols = [candidate['symbol'] for candidate in layer1_candidates]
ohlcv_data_list = await data_manager_global.get_ohlcv_data_for_symbols(layer1_symbols)
if not ohlcv_data_list:
print("❌ فشل جلب بيانات OHLCV للمرشحين")
return None
print(f"📊 تم جلب بيانات OHLCV لـ {len(ohlcv_data_list)} عملة بنجاح")
# الطبقة 2: التحليل المتقدم بشكل متوازي حقيقي
print(f"\n📈 الطبقة 2: التحليل المتقدم (MLProcessor) بشكل متوازي لـ {len(ohlcv_data_list)} عملة...")
market_context = await data_manager_global.get_market_context_async()
# إنشاء معالج ML
ml_processor = MLProcessor(market_context, data_manager_global, learning_engine_global)
# تجهيز البيانات للطبقة 2
layer2_data = []
for ohlcv_data in ohlcv_data_list:
try:
# إضافة أسباب الترشيح من الطبقة 1
symbol = ohlcv_data['symbol']
layer1_candidate = next((c for c in layer1_candidates if c['symbol'] == symbol), None)
if layer1_candidate:
ohlcv_data['reasons_for_candidacy'] = layer1_candidate.get('reasons', [])
ohlcv_data['layer1_score'] = layer1_candidate.get('layer1_score', 0)
ohlcv_data['successful_timeframes'] = ohlcv_data.get('successful_timeframes', 0)
layer2_data.append(ohlcv_data)
except Exception as e:
continue
if not layer2_data:
print("❌ فشل إعداد بيانات الطبقة 2")
return None
# تقسيم العمل إلى دفعات للمعالجة المتوازية
batch_size = 15
batches = [layer2_data[i:i + batch_size] for i in range(0, len(layer2_data), batch_size)]
total_batches = len(batches)
print(f" 🚀 تقسيم العمل إلى {total_batches} دفعة ({batch_size} عملة لكل دفعة)...")
# معالجة جميع الدفعات بشكل متوازي
batch_tasks = []
for i, batch in enumerate(batches):
task = asyncio.create_task(process_batch_parallel(batch, ml_processor, i+1, total_batches))
batch_tasks.append(task)
#
# 🔴 تم التعديل: تجميع النتائج المفصلة
#
batch_results_list = await asyncio.gather(*batch_tasks)
# دمج جميع النتائج
layer2_candidates = []
all_low_score_candidates = []
all_failed_candidates = []
for batch_result in batch_results_list:
layer2_candidates.extend(batch_result['success'])
all_low_score_candidates.extend(batch_result['low_score'])
all_failed_candidates.extend(batch_result['failures'])
print(f"✅ اكتمل التحليل المتقدم: {len(layer2_candidates)} نجاح (عالي) | {len(all_low_score_candidates)} نجاح (منخفض) | {len(all_failed_candidates)} فشل")
if not layer2_candidates:
print("❌ لم يتم العثور على مرشحين في الطبقة 2")
# 🔴 استمرار لتسجيل السجل
# ترتيب المرشحين (الناجحين فقط) حسب الدرجة المحسنة وأخذ أقوى 10
layer2_candidates.sort(key=lambda x: x.get('enhanced_final_score', 0), reverse=True)
target_count = min(10, len(layer2_candidates))
final_layer2_candidates = layer2_candidates[:target_count]
print(f"🎯 تم اختيار {len(final_layer2_candidates)} عملة للطبقة 3 (الأقوى فقط)")
# ✅ حفظ المرشحين العشرة في ملف Candidates في R2
await r2_service_global.save_candidates_async(final_layer2_candidates)
# عرض أفضل 10 عملات من الطبقة 2
print("\n🏆 أفضل 10 عملات من الطبقة 2:")
for i, candidate in enumerate(final_layer2_candidates):
score = candidate.get('enhanced_final_score', 0)
strategy = candidate.get('target_strategy', 'GENERIC')
mc_score = candidate.get('monte_carlo_probability', 0)
pattern = candidate.get('pattern_analysis', {}).get('pattern_detected', 'no_pattern')
timeframes = candidate.get('successful_timeframes', 0)
print(f" {i+1}. {candidate['symbol']}:")
print(f" 📊 النهائي: {score:.3f} | الأطر: {timeframes}/6")
if mc_score > 0:
print(f" 🎯 مونت كارلو: {mc_score:.3f}")
print(f" 🎯 استراتيجية: {strategy} | نمط: {pattern}")
# الطبقة 3: التحليل بالنموذج الضخم
print("\n🧠 الطبقة 3: التحليل بالنموذج الضخم (LLMService)...")
final_opportunities = []
for candidate in final_layer2_candidates:
try:
symbol = candidate['symbol']
print(f" 🤔 تحليل {symbol} بالنموذج الضخم...")
# ✅ الإصلاح الرئيسي: التأكد من وجود بيانات الشموع في candidate
ohlcv_data = candidate.get('ohlcv') # التغيير هنا: استخدام 'ohlcv' بدلاً من 'raw_ohlcv'
if not ohlcv_data:
print(f" ⚠️ لا توجد بيانات شموع لـ {symbol}")
# محاولة الحصول على البيانات من المصدر الأصلي
symbol_ohlcv_list = await data_manager_global.get_ohlcv_data_for_symbols([symbol])
if symbol_ohlcv_list and len(symbol_ohlcv_list) > 0:
ohlcv_data = symbol_ohlcv_list[0].get('ohlcv')
candidate['ohlcv'] = ohlcv_data
candidate['raw_ohlcv'] = ohlcv_data
if not ohlcv_data:
print(f" ⚠️ فشل جلب بيانات شموع لـ {symbol}")
continue
# ✅ التأكد من تمرير البيانات الخام للنموذج - الإصلاح الرئيسي
candidate['raw_ohlcv'] = ohlcv_data
candidate['ohlcv'] = ohlcv_data
# ✅ التحقق من جودة البيانات قبل الإرسال للنموذج
timeframes_count = candidate.get('successful_timeframes', 0)
total_candles = sum(len(data) for data in ohlcv_data.values()) if ohlcv_data else 0
if total_candles < 30: # تخفيف الشرط من 50 إلى 30 شمعة
print(f" ⚠️ بيانات شموع غير كافية لـ {symbol}: {total_candles} شمعة فقط")
continue
print(f" 📊 إرسال {symbol} للنموذج: {total_candles} شمعة في {timeframes_count} إطار زمني")
# ✅ إرسال كل عملة للنموذج الضخم على حدة
llm_analysis = await llm_service_global.get_trading_decision(candidate)
# ✅ التحقق من وجود قرار صالح من النموذج
if llm_analysis and llm_analysis.get('action') in ['BUY', 'SELL']:
opportunity = {
'symbol': symbol,
'current_price': candidate.get('current_price', 0),
'decision': llm_analysis,
'enhanced_score': candidate.get('enhanced_final_score', 0),
'llm_confidence': llm_analysis.get('confidence_level', 0),
'strategy': llm_analysis.get('strategy', 'GENERIC'),
'analysis_timestamp': datetime.now().isoformat(),
'timeframes_count': timeframes_count,
'total_candles': total_candles
}
final_opportunities.append(opportunity)
print(f" ✅ {symbol}: {llm_analysis.get('action')} - ثقة: {llm_analysis.get('confidence_level', 0):.2f}")
else:
action = llm_analysis.get('action', 'NO_DECISION') if llm_analysis else 'NO_RESPONSE'
print(f" ⚠️ {symbol}: لا يوجد قرار تداول من النموذج الضخم ({action})")
except Exception as e:
print(f"❌ خطأ في تحليل النموذج الضخم لـ {candidate.get('symbol')}: {e}")
continue
if final_opportunities:
# ترتيب الفرص النهائية حسب الثقة والدرجة
final_opportunities.sort(key=lambda x: (x['llm_confidence'] + x['enhanced_score']) / 2, reverse=True)
print(f"\n🏆 النظام الطبقي اكتمل: {len(final_opportunities)} فرصة تداول")
for i, opportunity in enumerate(final_opportunities[:5]):
print(f" {i+1}. {opportunity['symbol']}: {opportunity['decision'].get('action')} - ثقة: {opportunity['llm_confidence']:.2f} - أطر: {opportunity['timeframes_count']}")
#
# 🔴 --- بدء سجل تدقيق التحليل ---
#
try:
# 1. ملخص الـ 10 الأوائل (لـ LLM)
top_10_detailed_summary = []
for c in final_layer2_candidates: # هذه هي قائمة الـ 10 الأوائل
whale_summary = "Not Available"
whale_data = c.get('whale_data')
if whale_data and whale_data.get('data_available'):
signal = whale_data.get('trading_signal', {})
action = signal.get('action', 'HOLD')
confidence = signal.get('confidence', 0)
reason_preview = signal.get('reason', 'N/A')[:75] + "..." if signal.get('reason') else 'N/A'
whale_summary = f"Action: {action}, Conf: {confidence:.2f}, Alert: {signal.get('critical_alert', False)}, Reason: {reason_preview}"
top_10_detailed_summary.append({
"symbol": c.get('symbol'),
"score": c.get('enhanced_final_score', 0),
"timeframes": f"{c.get('successful_timeframes', 'N/A')}/6",
"whale_data_summary": whale_summary,
"strategy": c.get('target_strategy', 'N/A'),
"pattern": c.get('pattern_analysis', {}).get('pattern_detected', 'N/A'),
})
# 2. ملخص باقي الناجحين (الذين لم يتم إرسالهم للنموذج)
other_successful_candidates = layer2_candidates[target_count:]
other_success_summary = [
{
"symbol": c['symbol'],
"score": c.get('enhanced_final_score', 0),
"timeframes": f"{c.get('successful_timeframes', 'N/A')}/6",
"whale_data": "Available" if c.get('whale_data', {}).get('data_available') else "Not Available"
}
for c in other_successful_candidates
]
# 3. ملخص الدرجات المنخفضة (نجاح < 0.4)
low_score_summary = [
{
"symbol": c['symbol'],
"score": c.get('enhanced_final_score', 0),
"timeframes": f"{c.get('successful_timeframes', 'N/A')}/6",
"whale_data": "Available" if c.get('whale_data', {}).get('data_available') else "Not Available"
}
for c in all_low_score_candidates
]
# 4. تجميع السجل النهائي
audit_data = {
"timestamp": datetime.now().isoformat(),
"total_layer1_candidates": len(layer1_candidates),
"total_processed_in_layer2": len(layer2_candidates) + len(all_low_score_candidates) + len(all_failed_candidates),
"counts": {
"sent_to_llm": len(final_layer2_candidates),
"success_not_top_10": len(other_successful_candidates),
"success_low_score": len(all_low_score_candidates),
"failures": len(all_failed_candidates)
},
"top_candidates_for_llm": top_10_detailed_summary,
"other_successful_candidates": other_success_summary,
"low_score_candidates": low_score_summary,
"failed_candidates": all_failed_candidates, # {"symbol": ..., "error": ...}
}
# 5. حفظ السجل
await r2_service_global.save_analysis_audit_log_async(audit_data)
print(f"✅ تم حفظ سجل تدقيق التحليل في R2.")
except Exception as audit_error:
print(f"❌ فشل حفظ سجل تدقيق التحليل: {audit_error}")
traceback.print_exc()
#
# 🔴 --- نهاية سجل تدقيق التحليل ---
#
if not final_opportunities:
print("❌ لم يتم العثور على فرص تداول مناسبة")
return None
return final_opportunities[0] if final_opportunities else None
except Exception as error:
print(f"❌ خطأ في النظام الطبقي: {error}")
traceback.print_exc()
# 🔴 تسجيل السجل حتى في حالة الفشل
try:
audit_data = {
"timestamp": datetime.now().isoformat(),
"status": "FAILED",
"error": str(error),
"traceback": traceback.format_exc(),
"total_layer1_candidates": len(layer1_candidates),
"counts": {
"sent_to_llm": 0,
"success_not_top_10": len(layer2_candidates[target_count:]) if 'target_count' in locals() else 0,
"success_low_score": len(all_low_score_candidates),
"failures": len(all_failed_candidates)
},
"failed_candidates": all_failed_candidates
}
await r2_service_global.save_analysis_audit_log_async(audit_data)
print("⚠️ تم حفظ سجل تدقيق جزئي بعد الفشل.")
except Exception as audit_fail_error:
print(f"❌ فشل حفظ سجل التدقيق أثناء معالجة خطأ آخر: {audit_fail_error}")
return None
async def re_analyze_open_trade_async(trade_data):
"""إعادة تحليل الصفقة المفتوحة"""
symbol = trade_data.get('symbol')
try:
async with state_manager.trade_analysis_lock:
# جلب البيانات الحالية
market_context = await data_manager_global.get_market_context_async()
ohlcv_data_list = await data_manager_global.get_ohlcv_data_for_symbols([symbol])
if not ohlcv_data_list:
return None
ohlcv_data = ohlcv_data_list[0]
ohlcv_data['reasons_for_candidacy'] = ['re-analysis']
# استخدام ML للتحليل
ml_processor = MLProcessor(market_context, data_manager_global, learning_engine_global)
processed_data = await ml_processor.process_and_score_symbol_enhanced(ohlcv_data)
if not processed_data:
return None
# ✅ التأكد من تمرير بيانات الشموع بشكل صحيح
processed_data['raw_ohlcv'] = ohlcv_data.get('raw_ohlcv') or ohlcv_data.get('ohlcv')
processed_data['ohlcv'] = processed_data['raw_ohlcv']
# استخدام LLM لإعادة التحليل
re_analysis_decision = await llm_service_global.re_analyze_trade_async(trade_data, processed_data)
# ✅ التحقق من وجود قرار صالح من النموذج
if re_analysis_decision:
await r2_service_global.save_system_logs_async({
"trade_reanalyzed": True,
"symbol": symbol,
"action": re_analysis_decision.get('action'),
'strategy': re_analysis_decision.get('strategy', 'GENERIC')
})
return {
"symbol": symbol,
"decision": re_analysis_decision,
"current_price": processed_data.get('current_price')
}
else:
return None
except Exception as error:
await r2_service_global.save_system_logs_async({
"reanalysis_error": True,
"symbol": symbol,
"error": str(error)
})
return None
async def run_bot_cycle_async():
"""دورة التداول الرئيسية"""
try:
if not await state_manager.wait_for_initialization():
print("❌ الخدمات غير مهيأة بالكامل - تخطي الدورة")
return
print("🔄 بدء دورة التداول...")
await r2_service_global.save_system_logs_async({"cycle_started": True})
if not r2_service_global.acquire_lock():
print("❌ فشل الحصول على القفل - تخطي الدورة")
return
try:
open_trades = await trade_manager_global.get_open_trades()
print(f"📋 الصفقات المفتوحة: {len(open_trades)}")
should_look_for_new_trade = len(open_trades) == 0
# إعادة تحليل الصفقات المفتوحة
if open_trades:
now = datetime.now()
trades_to_reanalyze = [
trade for trade in open_trades
if now >= datetime.fromisoformat(trade.get('expected_target_time', now.isoformat()))
]
if trades_to_reanalyze:
print(f"🔄 إعادة تحليل {len(trades_to_reanalyze)} صفقة")
for trade in trades_to_reanalyze:
result = await re_analyze_open_trade_async(trade)
if result and result['decision'].get('action') == "CLOSE_TRADE":
await trade_manager_global.close_trade(trade, result['current_price'], 'CLOSED_BY_REANALYSIS')
should_look_for_new_trade = True
elif result and result['decision'].get('action') == "UPDATE_TRADE":
await trade_manager_global.update_trade(trade, result['decision'])
# البحث عن صفقات جديدة إذا لزم الأمر
if should_look_for_new_trade:
portfolio_state = await r2_service_global.get_portfolio_state_async()
current_capital = portfolio_state.get("current_capital_usd", 0)
if current_capital > 1:
print("🎯 البحث عن فرص تداول جديدة...")
best_opportunity = await run_3_layer_analysis()
if best_opportunity:
print(f"✅ فتح صفقة جديدة: {best_opportunity['symbol']}")
await trade_manager_global.open_trade(
best_opportunity['symbol'],
best_opportunity['decision'],
best_opportunity['current_price']
)
else:
print("❌ لم يتم العثور على فرص تداول مناسبة")
else:
print("❌ رأس المال غير كافي لفتح صفقات جديدة")
finally:
r2_service_global.release_lock()
await r2_service_global.save_system_logs_async({
"cycle_completed": True,
"open_trades": len(open_trades) if 'open_trades' in locals() else 0
})
print("✅ اكتملت دورة التداول")
except Exception as error:
print(f"❌ Unhandled error in main cycle: {error}")
await r2_service_global.save_system_logs_async({
"cycle_error": True,
"error": str(error)
})
if r2_service_global.lock_acquired:
r2_service_global.release_lock()
@asynccontextmanager
async def lifespan(application: FastAPI):
"""إدارة دورة حياة التطبيق"""
print("🚀 بدء تهيئة التطبيق...")
try:
# تهيئة الخدمات
success = await initialize_services()
if not success:
print("❌ فشل تهيئة التطبيق - إغلاق...")
yield
return
# بدء المهام الخلفية
asyncio.create_task(monitor_market_async())
asyncio.create_task(trade_manager_global.start_trade_monitoring())
await r2_service_global.save_system_logs_async({"application_started": True})
print("🎯 التطبيق جاهز للعمل - نظام الطبقات 3 فعال")
yield
except Exception as error:
print(f"❌ Application startup failed: {error}")
traceback.print_exc()
if r2_service_global:
await r2_service_global.save_system_logs_async({
"application_startup_failed": True,
"error": str(error)
})
raise
finally:
await cleanup_on_shutdown()
application = FastAPI(
lifespan=lifespan,
title="AI Trading Bot",
description="نظام تداول ذكي بثلاث طبقات تحليلية",
version="3.0.0"
)
@application.get("/")
async def root():
"""الصفحة الرئيسية"""
return {
"message": "مرحباً بك في نظام التداول الذكي",
"system": "3-Layer Analysis System",
"status": "running" if state_manager.initialization_complete else "initializing",
"timestamp": datetime.now().isoformat()
}
@application.get("/run-cycle")
async def run_cycle_api():
"""تشغيل دورة التداول"""
if not state_manager.initialization_complete:
raise HTTPException(status_code=503, detail="الخدمات غير مهيأة بالكامل")
asyncio.create_task(run_bot_cycle_async())
return {"message": "Bot cycle initiated", "system": "3-Layer Analysis"}
@application.get("/health")
async def health_check():
"""فحص صحة النظام"""
services_status = {
"status": "healthy" if state_manager.initialization_complete else "initializing",
"initialization_complete": state_manager.initialization_complete,
"services_initialized": state_manager.services_initialized,
"initialization_error": state_manager.initialization_error,
"timestamp": datetime.now().isoformat(),
"system_architecture": "3-Layer Analysis System",
"layers": {
"layer1": "Data Manager - Rapid Screening",
"layer2": "ML Processor - Advanced Analysis",
"layer3": "LLM Service - Deep Analysis"
}
}
return services_status
@application.get("/analyze-market")
async def analyze_market_api():
"""تشغيل التحليل الطبقي فقط"""
if not state_manager.initialization_complete:
raise HTTPException(status_code=503, detail="الخدمات غير مهيأة بالكامل")
result = await run_3_layer_analysis()
if result:
return {
"opportunity_found": True,
"symbol": result['symbol'],
"action": result['decision'].get('action'),
"confidence": result['llm_confidence'],
"strategy": result['strategy']
}
else:
return {"opportunity_found": False, "message": "No suitable opportunities found"}
@application.get("/portfolio")
async def get_portfolio_api():
"""الحصول على حالة المحفظة"""
if not state_manager.initialization_complete:
raise HTTPException(status_code=503, detail="الخدمات غير مهيأة بالكامل")
try:
portfolio_state = await r2_service_global.get_portfolio_state_async()
open_trades = await trade_manager_global.get_open_trades()
return {
"portfolio": portfolio_state,
"open_trades": open_trades,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"خطأ في جلب بيانات المحفظة: {str(e)}")
@application.get("/system-status")
async def get_system_status():
"""الحصول على حالة النظام التفصيلية"""
monitoring_status = trade_manager_global.get_monitoring_status() if trade_manager_global else {}
return {
"initialization_complete": state_manager.initialization_complete,
"services_initialized": state_manager.services_initialized,
"initialization_error": state_manager.initialization_error,
"market_state_ok": state.MARKET_STATE_OK,
"monitoring_status": monitoring_status,
"timestamp": datetime.now().isoformat()
}
async def cleanup_on_shutdown():
"""تنظيف الموارد عند الإغلاق"""
global r2_service_global, data_manager_global, trade_manager_global, learning_engine_global
print("🛑 Shutdown signal received. Cleaning up...")
if trade_manager_global:
trade_manager_global.stop_monitoring()
print("✅ Trade monitoring stopped")
if learning_engine_global and learning_engine_global.initialized:
try:
await learning_engine_global.save_weights_to_r2()
await learning_engine_global.save_performance_history()
print("✅ Learning engine data saved")
except Exception as e:
print(f"❌ Failed to save learning engine data: {e}")
if data_manager_global:
await data_manager_global.close()
print("✅ Data manager closed")
if r2_service_global:
try:
await r2_service_global.save_system_logs_async({"application_shutdown": True})
print("✅ Shutdown log saved")
except Exception as e:
print(f"❌ Failed to save shutdown log: {e}")
if r2_service_global.lock_acquired:
r2_service_global.release_lock()
print("✅ R2 lock released")
def signal_handler(signum, frame):
"""معالج إشارات الإغلاق"""
print(f"🛑 Received signal {signum}. Initiating shutdown...")
asyncio.create_task(cleanup_on_shutdown())
sys.exit(0)
# تسجيل معالجات الإشارات
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
if __name__ == "__main__":
print("🚀 Starting AI Trading Bot with 3-Layer Analysis System...")
uvicorn.run(
application,
host="0.0.0.0",
port=7860,
log_level="info",
access_log=True
)