Riy777 commited on
Commit
f42108a
·
1 Parent(s): 4696baa

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +43 -22
app.py CHANGED
@@ -118,7 +118,7 @@ async def initialize_services():
118
  llm_service_global = LLMService()
119
  llm_service_global.r2_service = r2_service_global
120
  state_manager.set_service_initialized('llm_service')
121
- print(" ✅ LLMService مهيأة")
122
 
123
  # 6. تهيئة محلل المشاعر
124
  print(" 🔄 تهيئة محلل المشاعر...")
@@ -190,22 +190,44 @@ async def monitor_market_async():
190
  except Exception as e:
191
  print(f"❌ فشل تشغيل مراقبة السوق: {e}")
192
 
193
- #
194
- # 🔴 هذه الدالة (المستهلك) لم تتغير، لأنها بالفعل تعالج الدفعات
195
- #
 
196
  async def process_batch_parallel(batch, ml_processor, batch_num, total_batches):
197
- """معالجة دفعة من الرموز بشكل متوازي وإرجاع نتائج مفصلة"""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
198
  try:
199
- print(f" 🔄 [المستهلك] بدء معالجة الدفعة {batch_num}/{total_batches} ({len(batch)} عملة)...")
200
 
201
- # إنشاء مهام للدفعة الحالية
202
  batch_tasks = []
203
  for symbol_data in batch:
204
- task = asyncio.create_task(ml_processor.process_and_score_symbol_enhanced(symbol_data))
 
205
  batch_tasks.append(task)
206
 
207
  # انتظار انتهاء جميع مهام الدفعة الحالية
208
- batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
209
 
210
  successful_results = []
211
  low_score_results = []
@@ -235,10 +257,9 @@ async def process_batch_parallel(batch, ml_processor, batch_num, total_batches):
235
  except Exception as error:
236
  print(f"❌ [المستهلك] خطأ في معالجة الدفعة {batch_num}: {error}")
237
  return {'success': [], 'low_score': [], 'failures': []}
 
238
 
239
 
240
- # 🔴 --- بدء التعديل الجوهري --- 🔴
241
- # تم إعادة هيكلة هذه الدالة بالكامل لاستخدام نموذج المنتج/المستهلك
242
  async def run_3_layer_analysis():
243
  """
244
  (معدلة) تشغيل النظام الطبقي (منتج/مستهلك)
@@ -274,8 +295,6 @@ async def run_3_layer_analysis():
274
  # 🔴 --- إعداد نموذج المنتج والمستهلك --- 🔴
275
 
276
  # 1. إنشاء الطابور (Queue)
277
- # maxsize=2 يعني أن المنتج (جلب البيانات) سيتوقف مؤقتاً إذا كان متقدماً
278
- # على المستهلك (تحليل ML) بأكثر من دفعتين. هذا يمنع استهلاك ذاكرة مفرط.
279
  DATA_QUEUE_MAX_SIZE = 2
280
  ohlcv_data_queue = asyncio.Queue(maxsize=DATA_QUEUE_MAX_SIZE)
281
 
@@ -309,7 +328,7 @@ async def run_3_layer_analysis():
309
  batch_num += 1
310
  print(f" 📬 [المستهلك] استلم الدفعة {batch_num}/{total_batches} ({len(batch_data)} عملة)")
311
 
312
- # تشغيل المعالجة المتوازية للدفعة (هذا ما كان يحدث سابقاً)
313
  batch_results_dict = await process_batch_parallel(
314
  batch_data, ml_processor, batch_num, total_batches
315
  )
@@ -330,8 +349,6 @@ async def run_3_layer_analysis():
330
 
331
  # 5. تشغيل مهمة المنتج (Producer)
332
  layer1_symbols = [candidate['symbol'] for candidate in layer1_candidates]
333
- # دمج بيانات الطبقة 1 مع بيانات OHLCV
334
- # 🔴 (تعديل بسيط): دمج بيانات الطبقة 1 *بعد* جلب OHLCV لتبسيط التدفق
335
  print(" ▶️ [المنتج] بدء تشغيل مهمة المنتج (تدفق بيانات OHLCV)...")
336
  producer_task = asyncio.create_task(
337
  data_manager_global.stream_ohlcv_data(layer1_symbols, ohlcv_data_queue)
@@ -548,23 +565,29 @@ async def re_analyze_open_trade_async(trade_data):
548
  async with state_manager.trade_analysis_lock:
549
  # جلب البيانات الحالية
550
  market_context = await data_manager_global.get_market_context_async()
551
- # 🔴 ملاحظة: هذه الدالة معدلة الآن، لكن استدعاؤها لرمز واحد سيعمل كما كان
 
552
  ohlcv_data_list = []
553
  temp_queue = asyncio.Queue()
 
554
  await data_manager_global.stream_ohlcv_data([symbol], temp_queue)
 
555
  while not temp_queue.empty():
556
- ohlcv_data_list.extend(await temp_queue.get())
 
 
557
 
558
  if not ohlcv_data_list:
 
559
  return None
560
 
561
  ohlcv_data = ohlcv_data_list[0]
562
 
563
- # 🔴 دمج بيانات الطبقة 1 الوهمية لإعادة التحليل
564
  l1_data = await data_manager_global._get_detailed_symbol_data(symbol)
565
  if l1_data:
566
  ohlcv_data['reasons_for_candidacy'] = l1_data.get('reasons', ['re-analysis'])
567
- ohlcv_data['layer1_score'] = l1_data.get('layer1_score', 0.5) # افتراض درجة متوسطة
568
 
569
  # استخدام ML للتحليل
570
  ml_processor = MLProcessor(market_context, data_manager_global, learning_engine_global)
@@ -573,14 +596,12 @@ async def re_analyze_open_trade_async(trade_data):
573
  if not processed_data:
574
  return None
575
 
576
- # ✅ التأكد من تمرير بيانات الشموع بشكل صحيح
577
  processed_data['raw_ohlcv'] = ohlcv_data.get('raw_ohlcv') or ohlcv_data.get('ohlcv')
578
  processed_data['ohlcv'] = processed_data['raw_ohlcv']
579
 
580
  # استخدام LLM لإعادة التحليل
581
  re_analysis_decision = await llm_service_global.re_analyze_trade_async(trade_data, processed_data)
582
 
583
- # ✅ التحقق من وجود قرار صالح من النموذج
584
  if re_analysis_decision:
585
  await r2_service_global.save_system_logs_async({
586
  "trade_reanalyzed": True,
 
118
  llm_service_global = LLMService()
119
  llm_service_global.r2_service = r2_service_global
120
  state_manager.set_service_initialized('llm_service')
121
+ print(" ✅ LLMService مهيأ")
122
 
123
  # 6. تهيئة محلل المشاعر
124
  print(" 🔄 تهيئة محلل المشاعر...")
 
190
  except Exception as e:
191
  print(f"❌ فشل تشغيل مراقبة السوق: {e}")
192
 
193
+
194
+ # 🔴 --- بدء التعديل الجوهري --- 🔴
195
+ # تم تعديل هذه الدالة لإضافة Semaphore داخلي
196
+ # هذا للحد من عدد العمليات المتزامنة *داخل* الدفعة الواحدة
197
  async def process_batch_parallel(batch, ml_processor, batch_num, total_batches):
198
+ """
199
+ (معدلة) معالجة دفعة من الرموز بشكل متوازي وإرجاع نتائج مفصلة
200
+ مع إضافة منظم سرعة (Semaphore) للتحكم في الضغط على الشبكة
201
+ """
202
+
203
+ # 🔴 تحديد 5 عمليات متزامنة كحد أقصى (من الـ 15 في الدفعة)
204
+ # هذا هو مفتاح الحل لمنع انهيار الشبكة
205
+ CONCURRENT_ML_TASKS = 5
206
+ semaphore = asyncio.Semaphore(CONCURRENT_ML_TASKS)
207
+
208
+ tasks_results = []
209
+
210
+ async def process_symbol_with_semaphore(symbol_data):
211
+ """دالة داخلية لاستهدافها بالـ Semaphore"""
212
+ async with semaphore:
213
+ try:
214
+ # هذه هي المهمة التي تستهلك الشبكة (جلب بيانات الحيتان)
215
+ return await ml_processor.process_and_score_symbol_enhanced(symbol_data)
216
+ except Exception as e:
217
+ # إرجاع الخطأ ليتم تسجيله
218
+ return e
219
+
220
  try:
221
+ print(f" 🔄 [المستهلك] بدء معالجة الدفعة {batch_num}/{total_batches} ({len(batch)} عملة)... (بحد أقصى {CONCURRENT_ML_TASKS} معاً)")
222
 
 
223
  batch_tasks = []
224
  for symbol_data in batch:
225
+ # 🔴 تشغيل المهام عبر الدالة الداخلية المنظمة
226
+ task = asyncio.create_task(process_symbol_with_semaphore(symbol_data))
227
  batch_tasks.append(task)
228
 
229
  # انتظار انتهاء جميع مهام الدفعة الحالية
230
+ batch_results = await asyncio.gather(*batch_tasks, return_exceptions=False) # return_exceptions=False لأننا نعالجها يدوياً
231
 
232
  successful_results = []
233
  low_score_results = []
 
257
  except Exception as error:
258
  print(f"❌ [المستهلك] خطأ في معالجة الدفعة {batch_num}: {error}")
259
  return {'success': [], 'low_score': [], 'failures': []}
260
+ # 🔴 --- نهاية التعديل الجوهري --- 🔴
261
 
262
 
 
 
263
  async def run_3_layer_analysis():
264
  """
265
  (معدلة) تشغيل النظام الطبقي (منتج/مستهلك)
 
295
  # 🔴 --- إعداد نموذج المنتج والمستهلك --- 🔴
296
 
297
  # 1. إنشاء الطابور (Queue)
 
 
298
  DATA_QUEUE_MAX_SIZE = 2
299
  ohlcv_data_queue = asyncio.Queue(maxsize=DATA_QUEUE_MAX_SIZE)
300
 
 
328
  batch_num += 1
329
  print(f" 📬 [المستهلك] استلم الدفعة {batch_num}/{total_batches} ({len(batch_data)} عملة)")
330
 
331
+ # 🔴 تشغيل المعالجة المتوازية (المعدلة) للدفعة
332
  batch_results_dict = await process_batch_parallel(
333
  batch_data, ml_processor, batch_num, total_batches
334
  )
 
349
 
350
  # 5. تشغيل مهمة المنتج (Producer)
351
  layer1_symbols = [candidate['symbol'] for candidate in layer1_candidates]
 
 
352
  print(" ▶️ [المنتج] بدء تشغيل مهمة المنتج (تدفق بيانات OHLCV)...")
353
  producer_task = asyncio.create_task(
354
  data_manager_global.stream_ohlcv_data(layer1_symbols, ohlcv_data_queue)
 
565
  async with state_manager.trade_analysis_lock:
566
  # جلب البيانات الحالية
567
  market_context = await data_manager_global.get_market_context_async()
568
+
569
+ # 🔴 استخدام نموذج الطابور لجلب بيانات الرمز الواحد
570
  ohlcv_data_list = []
571
  temp_queue = asyncio.Queue()
572
+ # استدعاء المنتج لرمز واحد
573
  await data_manager_global.stream_ohlcv_data([symbol], temp_queue)
574
+ # استهلاك البيانات من الطابور
575
  while not temp_queue.empty():
576
+ batch = await temp_queue.get()
577
+ if batch:
578
+ ohlcv_data_list.extend(batch)
579
 
580
  if not ohlcv_data_list:
581
+ print(f"⚠️ فشل جلب بيانات إعادة التحليل لـ {symbol}")
582
  return None
583
 
584
  ohlcv_data = ohlcv_data_list[0]
585
 
586
+ # دمج بيانات الطبقة 1 الوهمية لإعادة التحليل
587
  l1_data = await data_manager_global._get_detailed_symbol_data(symbol)
588
  if l1_data:
589
  ohlcv_data['reasons_for_candidacy'] = l1_data.get('reasons', ['re-analysis'])
590
+ ohlcv_data['layer1_score'] = l1_data.get('layer1_score', 0.5)
591
 
592
  # استخدام ML للتحليل
593
  ml_processor = MLProcessor(market_context, data_manager_global, learning_engine_global)
 
596
  if not processed_data:
597
  return None
598
 
 
599
  processed_data['raw_ohlcv'] = ohlcv_data.get('raw_ohlcv') or ohlcv_data.get('ohlcv')
600
  processed_data['ohlcv'] = processed_data['raw_ohlcv']
601
 
602
  # استخدام LLM لإعادة التحليل
603
  re_analysis_decision = await llm_service_global.re_analyze_trade_async(trade_data, processed_data)
604
 
 
605
  if re_analysis_decision:
606
  await r2_service_global.save_system_logs_async({
607
  "trade_reanalyzed": True,