Study / app.py
Riy777's picture
Update app.py
eb11b26 verified
raw
history blame
62.3 kB
import os
import logging
import asyncio
import re
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, CallbackQueryHandler, MessageHandler, filters, ContextTypes, ConversationHandler
from huggingface_hub import HfApi, hf_hub_download, list_repo_files
from openai import OpenAI
import pickle
import json
from datetime import datetime
import PyPDF2
import fitz # PyMuPDF
from PIL import Image
import io
import requests
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import HTMLResponse
import uvicorn
import random
import docx # لإضافة دعم .docx
# ========== إضافة المكتبات الصحيحة لحل مشكلة DNS + Lifespan + Typing ==========
import httpx
import dns.asyncresolver
from httpx import AsyncClient, AsyncHTTPTransport, Request, Response
import contextlib # <-- إضافة المكتبة الصحيحة
from typing import Dict # <-- إضافة Dict لتحديد نوع الاستجابة
# ==========================================================
# ========== تكوين السجلات ==========
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
# ========== التوكنات ومتغيرات البيئة ==========
TELEGRAM_BOT_TOKEN = os.environ.get('TELEGRAM_BOT_TOKEN')
NVAPI_API_KEY = os.environ.get('NVAPI_API_KEY')
SPACE_URL = os.environ.get('SPACE_URL', '')
# ========== تكوين عميل NVIDIA ==========
nvidia_client = OpenAI(
base_url="https://integrate.api.nvidia.com/v1",
api_key=NVAPI_API_KEY
)
# ========== مستودع Hugging Face للمواد ==========
REPO_ID = "Riy777/Study"
# ========== حالات المحادثة ==========
SELECTING_SUBJECT, SELECTING_ACTION, WAITING_FOR_QUESTION = range(3)
# ========== فئة الناقل المخصص للـ DNS ==========
class CustomDNSTransport(AsyncHTTPTransport):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.resolver = dns.asyncresolver.Resolver()
self.resolver.nameservers = ['1.1.1.1', '8.8.8.8']
logger.info("🔧 CustomDNSTransport initialized with 1.1.1.1 and 8.8.8.8")
async def handle_async_request(self, request: Request) -> Response:
try:
host = request.url.host
# التحقق إذا كان Host هو أصلاً IP
is_ip = True
try:
# تحقق مبسط لـ IPv4
parts = host.split('.')
if len(parts) != 4 or not all(p.isdigit() and 0 <= int(p) <= 255 for p in parts):
is_ip = False
# يمكن إضافة تحقق لـ IPv6 إذا لزم الأمر
except Exception:
is_ip = False
if is_ip:
# logger.info(f"Host {host} is already an IP. Skipping resolution.") # يمكن إلغاء التعليق عند الحاجة للتتبع
pass
else:
logger.info(f"🔧 Resolving host: {host}")
result = await self.resolver.resolve(host, 'A')
ip = result[0].address
logger.info(f"✅ Resolved {host} to {ip}")
# تعيين SNI (مهم جداً لـ SSL/HTTPS)
request.extensions["sni_hostname"] = host
# تحديث الـ URL لاستخدام الـ IP
request.url = request.url.copy_with(host=ip)
except dns.resolver.NoAnswer:
logger.error(f"❌ DNS NoAnswer for {host}. Trying request with original host...")
except dns.resolver.NXDOMAIN:
logger.error(f"❌ DNS NXDOMAIN for {host}. Trying request with original host...")
except Exception as e:
logger.error(f"❌ DNS resolution failed for {host}: {e}. Trying request with original host...")
# إذا فشل الحل لأي سبب، اسمح للطلب بالاستمرار مع اسم المضيف الأصلي
pass
# استدعاء المعالج الأصلي (الأب) لإكمال الطلب
return await super().handle_async_request(request)
# ==========================================================
# ========== تطبيق FastAPI ==========
@contextlib.asynccontextmanager
async def lifespan(app: FastAPI):
logger.info("🚀 بدء تشغيل FastAPI... بدء مهمة تهيئة البوت في الخلفية.")
asyncio.create_task(bot.initialize_application())
logger.info("✅ خادم FastAPI يعمل. التهيئة (الاتصال بـ Telegram) جارية في الخلفية.")
yield
logger.info("🛑 إيقاف تشغيل خادم FastAPI.")
app = FastAPI(title="Medical Lab Bot", version="1.0.0", lifespan=lifespan)
# ========== فئة البوت الرئيسية ==========
class MedicalLabBot:
def __init__(self):
self.conversation_memory = {}
self.available_materials = {}
self.file_cache = {}
self.application = None
self.is_initialized = False
self.initialization_status = "pending" # تتبع حالة التهيئة
self.load_all_materials()
async def initialize_application(self):
"""تهيئة تطبيق التليجرام بشكل غير متزامن مع حل DNS المخصص"""
try:
if self.is_initialized:
return True
logger.info("🔄 جاري تهيئة تطبيق التليجرام...")
# ========== الحل الجذري (DNS) ==========
logger.info("🔧 إعداد عميل HTTP مخصص مع CustomDNSTransport...")
custom_transport = CustomDNSTransport()
custom_client = httpx.AsyncClient(transport=custom_transport)
# ============================================
self.application = (
Application.builder()
.token(TELEGRAM_BOT_TOKEN)
.http_client(custom_client) # <-- تمرير العميل المخصص هنا
.build()
)
await self.setup_handlers()
max_retries = 3
retry_delay = 5
for attempt in range(max_retries):
try:
logger.info(f"🚀 محاولة تهيئة الاتصال بـ Telegram (محاولة {attempt + 1}/{max_retries})...")
# 1. الاتصال بـ Telegram (getMe)
await self.application.initialize()
logger.info("✅ تم تهيئة الاتصال بـ Telegram بنجاح.")
# 2. ضبط الـ Webhook (فقط بعد نجاح الاتصال)
if SPACE_URL:
webhook_url = f"{SPACE_URL.rstrip('/')}/telegram"
logger.info(f"ℹ️ جاري إعداد الويب هوك على: {webhook_url}")
await self.application.bot.set_webhook(
url=webhook_url,
allowed_updates=Update.ALL_TYPES,
drop_pending_updates=True
)
logger.info(f"✅ Webhook set to: {webhook_url}")
else:
logger.warning("⚠️ SPACE_URL not set. Webhook cannot be set.")
# 3. ضبط الحالة على "ناجح"
self.is_initialized = True
self.initialization_status = "success"
logger.info("✅✅✅ التطبيق جاهز لاستقبال الطلبات.")
return True # نجح، اخرج من الدالة
except Exception as e:
logger.warning(f"⚠️ فشلت محاولة التهيئة {attempt + 1}: {e}")
if attempt < max_retries - 1:
logger.info(f"⏳ الانتظار {retry_delay} ثواني قبل إعادة المحاولة...")
await asyncio.sleep(retry_delay)
else:
logger.error(f"❌ فشل تهيئة التطبيق نهائياً بعد {max_retries} محاولات (حتى مع DNS المخصص).")
# فشل بعد كل المحاولات
self.is_initialized = False
self.initialization_status = "failed"
return False
except Exception as e:
logger.error(f"❌ خطأ فادح في تهيئة التطبيق: {e}")
self.is_initialized = False
self.initialization_status = "failed"
return False
async def setup_handlers(self):
"""إعداد معالجات التليجرام"""
# إعداد Conversation Handler مع إعدادات محسنة
conv_handler = ConversationHandler(
entry_points=[CommandHandler('start', self.start)],
states={
SELECTING_SUBJECT: [
CallbackQueryHandler(self.handle_subject_selection, pattern='^(subject_|general_help|refresh_materials)')
],
SELECTING_ACTION: [
CallbackQueryHandler(self.handle_action_selection, pattern='^(explain_lecture|browse_files|generate_questions|summarize_content|explain_concept|main_menu)$'),
# إضافة معالج العودة هنا أيضاً
CallbackQueryHandler(self.handle_back_actions, pattern='^back_to_actions$')
],
WAITING_FOR_QUESTION: [
MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message),
CallbackQueryHandler(self.handle_back_actions, pattern='^back_to_actions$')
]
},
fallbacks=[
CommandHandler('start', self.start),
CallbackQueryHandler(self.handle_main_menu, pattern='^main_menu$')
],
name="medical_lab_conversation",
persistent=False,
per_message=False
)
# إضافة الـ handlers
self.application.add_handler(conv_handler)
# إضافة handlers منفصلة للاستعلامات العامة
self.application.add_handler(CallbackQueryHandler(self.handle_more_questions, pattern='^more_questions$'))
self.application.add_handler(CallbackQueryHandler(self.handle_change_subject, pattern='^change_subject$'))
logger.info("✅ تم إعداد معالجات التليجرام")
def load_all_materials(self):
"""تحميل جميع المواد والملفات من Hugging Face"""
try:
logger.info("جاري تحميل قائمة المواد من Hugging Face...")
all_files = list_repo_files(repo_id=REPO_ID, repo_type="dataset")
materials = {}
for file_path in all_files:
try:
path_parts = file_path.split('/')
if len(path_parts) >= 2:
subject = path_parts[0]
file_name = path_parts[-1]
if subject not in materials:
materials[subject] = {
'files': [],
'file_details': {}
}
file_info = self.extract_file_info(file_name, file_path)
materials[subject]['files'].append(file_info)
materials[subject]['file_details'][file_name] = file_info
else:
# ملفات في الجذر الرئيسي للمستودع (إن وجدت)
if 'general' not in materials:
materials['general'] = {
'files': [],
'file_details': {}
}
file_info = self.extract_file_info(file_path, file_path)
materials['general']['files'].append(file_info)
materials['general']['file_details'][file_path] = file_info
except Exception as e:
logger.error(f"خطأ في معالجة الملف {file_path}: {e}")
continue
# فرز الملفات داخل كل مادة (اختياري، للترتيب)
for subject in materials:
materials[subject]['files'].sort(key=lambda x: (x['lecture_number'] if x['lecture_number'] is not None else float('inf'), x['name']))
self.available_materials = materials
logger.info(f"✅ تم تحميل {len(materials)} مادة بنجاح")
except Exception as e:
logger.error(f"❌ خطأ في تحميل المواد: {e}")
self.available_materials = {'Biochemistry': {'files': [], 'file_details': {}}} # مادة افتراضية عند الفشل
# ========== (الدوال التي تم إكمالها) ==========
def get_user_memory(self, user_id):
"""الحصول على ذاكرة المستخدم أو إنشاؤها"""
if user_id not in self.conversation_memory:
self.conversation_memory[user_id] = {'history': [], 'last_subject': None}
return self.conversation_memory[user_id]
async def handle_general_help(self, query, context):
"""عرض رسالة المساعدة"""
help_text = """
❓ **مساعدة** ❓
هذا البوت مصمم لمساعدتك في دراسة مواد المختبرات الطبية.
1. **ابدأ** باختيار مادة من القائمة الرئيسية.
2. **اختر الخدمة:**
* **شرح محاضرة:** يعطيك ملخص للملف الذي تختاره.
* **استعراض الملفات:** يعرض لك كل الملفات المتاحة.
* **أسئلة عن المادة:** يولد أسئلة من ملفات عشوائية.
* **ملخص المادة:** يلخص لك ملف مهم من المادة.
* **تفسير مفهوم:** اطرح أي سؤال أو مصطلح (مثل "ما هو CBC") وسأشرحه لك.
3. **تحديث المواد:** اضغط (🔄) لتحديث قائمة المواد من المصدر.
"""
keyboard = [[InlineKeyboardButton("🔙 العودة للقائمة الرئيسية", callback_data="main_menu")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(help_text, reply_markup=reply_markup)
return SELECTING_SUBJECT
def extract_file_info(self, file_name, file_path):
"""استخراج معلومات الملف (النوع، رقم المحاضرة) من الاسم"""
name_lower = file_name.lower()
lecture_num = None
file_type = 'unknown'
# استخراج رقم المحاضرة (محاولة أكثر مرونة)
# يبحث عن "lec" أو "lecture" أو "محاضرة" متبوعة برقم
match = re.search(r'(?:lecture|lec|محاضرة)\s*(\d+)', name_lower)
if not match:
# إذا لم يجد الصيغة السابقة، يبحث عن رقم لوحده في بداية أو نهاية الاسم
match = re.search(r'^(\d+)\s*-|[\s_-](\d+)$', name_lower)
if match:
# يأخذ الرقم الثاني إذا وجد (المجموعة الثانية)، وإلا الأول
lecture_num_str = match.group(2) or match.group(1)
lecture_num = int(lecture_num_str) if lecture_num_str else None
else:
lecture_num = int(match.group(1))
# تحديد نوع الملف
if 'lab' in name_lower or 'عملي' in name_lower:
file_type = 'lab'
elif 'exam' in name_lower or 'امتحان' in name_lower or 'اسئلة' in name_lower:
file_type = 'exam'
elif 'summary' in name_lower or 'ملخص' in name_lower:
file_type = 'summary'
elif 'lecture' in name_lower or 'محاضرة' in name_lower:
file_type = 'lecture'
# إذا لم يكن أي مما سبق وكان هناك رقم محاضرة، افترضه محاضرة
elif lecture_num is not None:
file_type = 'lecture'
return {
'name': file_name,
'path': file_path,
'lecture_number': lecture_num,
'type': file_type
}
async def _call_nvidia_api(self, messages, max_tokens=1500):
"""دالة مساعدة لاستدعاء NVIDIA API"""
try:
completion = await asyncio.to_thread(
nvidia_client.chat.completions.create,
model="meta/llama3-70b-instruct", # استخدام موديل قوي
messages=messages,
temperature=0.5,
top_p=1,
max_tokens=max_tokens
)
return completion.choices[0].message.content
except Exception as e:
logger.error(f"❌ Error calling NVIDIA API: {e}")
return f"❌ حدث خطأ أثناء التواصل مع الذكاء الاصطناعي: {e}"
def _find_file_by_query(self, query, subject):
"""البحث عن ملف بناءً على استعلام المستخدم (رقم، اسم، الخ)"""
files = self.available_materials[subject]['files']
query_lower = query.lower().strip()
# 1. محاولة البحث بالرقم التسلسلي في القائمة (مثل "1", "2")
try:
# استخراج الرقم فقط
match_num = re.findall(r'^\d+$', query_lower) # يبحث عن رقم فقط في الاستعلام
if match_num:
index = int(match_num[0])
if 1 <= index <= len(files):
logger.info(f"Found file by index {index}")
return files[index - 1]
except (IndexError, ValueError):
pass # ليس استعلام رقمي
# 2. محاولة البحث برقم المحاضرة الدقيق (مثل "محاضرة 3", "lec 5")
match_lec_num = re.search(r'(?:lecture|lec|محاضرة)\s*(\d+)', query_lower)
if match_lec_num:
num = int(match_lec_num.group(1))
for file_info in files:
if file_info['lecture_number'] == num:
logger.info(f"Found file by exact lecture number {num}")
return file_info
# 3. محاولة البحث بجزء من الاسم
best_match = None
highest_score = 0
for file_info in files:
name_lower = file_info['name'].lower()
# حساب درجة التشابه (بسيط) - عدد الكلمات المشتركة
query_words = set(query_lower.split())
name_words = set(re.findall(r'\w+', name_lower)) # استخراج الكلمات من اسم الملف
common_words = query_words.intersection(name_words)
score = len(common_words)
if score > highest_score:
highest_score = score
best_match = file_info
# إذا تطابق الرقم المستخرج من الاستعلام مع رقم المحاضرة، نعتبره تطابق جيد
elif highest_score > 0 and file_info['lecture_number'] is not None:
num_in_query = re.findall(r'\d+', query_lower)
if num_in_query and file_info['lecture_number'] == int(num_in_query[0]):
logger.info(f"Found file by partial name match with lecture number heuristic: {file_info['name']}")
return file_info
if best_match and highest_score > 0:
logger.info(f"Found file by best partial name match: {best_match['name']} (score: {highest_score})")
return best_match
logger.warning(f"Could not find file matching query: '{query}' in subject: {subject}")
return None # لم يتم العثور
async def download_and_extract_content(self, file_path, subject):
"""تحميل الملف من HF واستخراج النص منه"""
if file_path in self.file_cache:
# logger.info(f"💾 Using cached content for {file_path}") # إلغاء التعليق للتتبع
return self.file_cache[file_path]
logger.info(f"⏳ Downloading {file_path} from Hugging Face...")
try:
# استخدام asyncio.to_thread لتشغيل الدالة المتزامنة في thread منفصل
local_path = await asyncio.to_thread(
hf_hub_download,
repo_id=REPO_ID,
filename=file_path,
repo_type="dataset"
)
text_content = ""
logger.info(f"📄 Extracting content from {local_path}")
if local_path.lower().endswith('.pdf'):
# استخدام PyMuPDF (fitz) لاستخراج أدق
with fitz.open(local_path) as doc:
for page in doc:
text_content += page.get_text("text", sort=True) # محاولة فرز النص
text_content += "\n\n--- Page Break ---\n\n" # إضافة فاصل صفحات
elif local_path.lower().endswith('.txt'):
with open(local_path, 'r', encoding='utf-8', errors='ignore') as f: # تجاهل أخطاء الترميز
text_content = f.read()
elif local_path.lower().endswith('.docx'):
doc_obj = docx.Document(local_path)
full_text = []
for para in doc_obj.paragraphs:
full_text.append(para.text)
text_content = '\n'.join(full_text)
else:
logger.warning(f"Unsupported file type: {local_path}")
return f"Error: Unsupported file type ({os.path.basename(file_path)})."
# تنظيف أساسي ومتقدم
text_content = re.sub(r'\s+\n', '\n', text_content) # إزالة المسافات الزائدة قبل سطر جديد
text_content = re.sub(r'\n{3,}', '\n\n', text_content) # تقليل الأسطر الفارغة المتتالية
text_content = re.sub(r' +', ' ', text_content).strip() # تقليل المسافات المتتالية
if len(text_content) < 50:
logger.warning(f"File {file_path} content is very short or empty after extraction.")
self.file_cache[file_path] = text_content
# logger.debug(f"Content for {file_path}: {text_content[:500]}...") # إلغاء التعليق للتتبع
return text_content
except Exception as e:
logger.error(f"❌ Error downloading/extracting {file_path}: {e}", exc_info=True)
return f"Error: Could not retrieve or process file {os.path.basename(file_path)}."
# ========== (الدوال الرئيسية) ==========
async def explain_lecture(self, user_query, subject, user_id):
"""شرح محاضرة بناءً على استعلام المستخدم"""
file_info = self._find_file_by_query(user_query, subject)
if not file_info:
files_list_text = await self.get_files_list_text(subject)
return f"❌ لم أتمكن من العثور على الملف المطلوب.\n\n{files_list_text}\n\nيرجى تحديد رقم الملف من القائمة أو كتابة جزء واضح من اسمه."
file_path = file_info['path']
file_name = file_info['name']
await bot.application.bot.send_chat_action(chat_id=user_id, action="typing")
content = await self.download_and_extract_content(file_path, subject)
if content.startswith("Error:"):
return f"❌ خطأ في معالجة الملف: {file_name}\n{content}"
if not content.strip():
return f"❌ المحتوى فارغ للملف: {file_name}"
memory = self.get_user_memory(user_id)
memory['history'].append({"role": "user", "content": f"اشرح لي النقاط الأساسية في هذه المحاضرة: {file_name}"})
# قص المحتوى ليلائم نافذة السياق (مع مراعاة التوكنات التقريبية)
max_content_chars = 7000 # تقليل لترك مساحة للبرومبت والرد
if len(content) > max_content_chars:
content_snippet = content[:max_content_chars] + "\n\n[... المحتوى مقطوع ...]"
logger.warning(f"Content for {file_name} truncated to {max_content_chars} chars.")
else:
content_snippet = content
system_prompt = f"أنت مساعد أكاديمي متخصص في مادة {subject}. مهمتك هي شرح النقاط الأساسية في محتوى المحاضرة المقدم لك. ركز على المفاهيم الجوهرية، التعريفات الهامة، النتائج الرئيسية، وأي معلومات ضرورية لفهم الموضوع. قدم الشرح بطريقة منظمة وواضحة باستخدام نقاط Markdown. تجنب التفاصيل الثانوية غير الضرورية."
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"اسم الملف: {file_name}\n\nالمحتوى:\n```\n{content_snippet}\n```\n\nيرجى شرح النقاط الأساسية في هذا المحتوى."}
]
response = await self._call_nvidia_api(messages, 1500)
memory['history'].append({"role": "assistant", "content": response})
return f"📝 **شرح لأهم نقاط ملف: {file_name}**\n\n{response}"
async def explain_concept(self, user_query, subject, user_id):
"""شرح مفهوم أو مصطلح طبي"""
memory = self.get_user_memory(user_id)
memory['history'].append({"role": "user", "content": user_query})
system_prompt = f"أنت خبير أكاديمي في مجال المختبرات الطبية، متخصص حالياً في مادة {subject}. اشرح المفهوم أو المصطلح التالي ({user_query}) بوضوح ودقة. ابدأ بتعريف أساسي، ثم وضح أهميته وتطبيقاته العملية في المختبر، واربطه بمادة {subject} إن أمكن. استخدم لغة سهلة ومباشرة."
messages = [
{"role": "system", "content": system_prompt},
# إضافة آخر تفاعلين للسياق إذا كانت ذات صلة بالمفهوم
*memory['history'][-3:-1], # يأخذ العنصرين قبل الأخير (آخر سؤال وجواب)
{"role": "user", "content": f"اشرح لي المفهوم التالي: {user_query}"}
]
response = await self._call_nvidia_api(messages, 1000) # تقليل التوكنات للردود المركزة
memory['history'].append({"role": "assistant", "content": response})
return f"🧪 **شرح مفهوم: {user_query}**\n\n{response}"
async def process_general_query(self, user_message, subject, user_id):
"""معالجة استعلام عام من المستخدم"""
memory = self.get_user_memory(user_id)
history = memory.get('history', [])
system_prompt = f"أنت مساعد ذكي متخصص في المختبرات الطبية. المادة الدراسية الحالية التي يركز عليها الطالب هي '{subject}'. أجب على سؤال الطالب ({user_message}) إجابة واضحة ومباشرة. استخدم سياق المحادثة السابق إذا كان ضرورياً لفهم السؤال. إذا كان السؤال خارج نطاق المادة أو التخصص، اعتذر بلطف."
messages = [{"role": "system", "content": system_prompt}]
# إضافة آخر 4 تفاعلات للسياق (سؤالين وجوابين)
messages.extend(history[-4:])
messages.append({"role": "user", "content": user_message})
response = await self._call_nvidia_api(messages)
memory['history'].append({"role": "user", "content": user_message})
memory['history'].append({"role": "assistant", "content": response})
return response
async def generate_questions_for_subject(self, subject, user_id):
"""توليد أسئلة من ملف عشوائي في المادة"""
if subject not in self.available_materials or not self.available_materials[subject]['files']:
return "❌ لا توجد ملفات في هذه المادة لتوليد أسئلة منها."
files = self.available_materials[subject]['files']
# تفضيل ملفات المحاضرات أو الملخصات إن وجدت
preferred_files = [f for f in files if f['type'] in ('lecture', 'summary')]
if preferred_files:
file_info = random.choice(preferred_files)
else:
file_info = random.choice(files) # اختر أي ملف إذا لم توجد المفضلة
file_path = file_info['path']
file_name = file_info['name']
await bot.application.bot.send_chat_action(chat_id=user_id, action="typing")
content = await self.download_and_extract_content(file_path, subject)
if content.startswith("Error:") or not content.strip():
logger.error(f"Failed to get content for question generation from {file_name}. Content: {content[:100]}")
# محاولة اختيار ملف آخر مرة واحدة
if len(files) > 1:
logger.info("Retrying with another file for question generation...")
remaining_files = [f for f in files if f['path'] != file_path]
if remaining_files:
file_info = random.choice(remaining_files)
file_path = file_info['path']
file_name = file_info['name']
content = await self.download_and_extract_content(file_path, subject)
if content.startswith("Error:") or not content.strip():
logger.error(f"Retry failed for question generation from {file_name}.")
return f"❌ لم أتمكن من قراءة محتوى صالح لتوليد أسئلة من ملفات المادة."
else:
return f"❌ لم أتمكن من قراءة محتوى صالح لتوليد أسئلة من ملفات المادة."
else:
return f"❌ لم أتمكن من قراءة محتوى صالح لتوليد أسئلة من ملفات المادة."
max_content_chars = 7000
content_snippet = content[:max_content_chars] if len(content) > max_content_chars else content
system_prompt = f"أنت خبير في وضع الأسئلة لمادة {subject}. بناءً على المحتوى التالي من ملف '{file_name}'، قم بإنشاء 5 أسئلة متنوعة لاختبار الفهم. يجب أن تشمل الأسئلة (إذا أمكن): سؤال اختيار من متعدد (MCQ) واحد على الأقل، سؤال إجابة قصيرة واحد على الأقل، وسؤال يتطلب تفكيراً أعمق قليلاً. اجعل الأسئلة واضحة ومباشرة."
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"المحتوى:\n```\n{content_snippet}\n```\n\nقم بإنشاء 5 أسئلة متنوعة بناءً على هذا المحتوى."}
]
response = await self._call_nvidia_api(messages)
return f"❓ **أسئلة مقترحة من ملف: {file_name}**\n\n{response}"
async def generate_summary(self, subject, user_id):
"""تلخيص ملف مهم من المادة"""
if subject not in self.available_materials or not self.available_materials[subject]['files']:
return "❌ لا توجد ملفات في هذه المادة لتلخيصها."
files = self.available_materials[subject]['files']
# إعطاء أولوية للملفات المسماة "summary" أو "ملخص"
summary_file = next((f for f in files if f['type'] == 'summary'), None)
# إذا لم يوجد، اختر أول ملف محاضرة
if not summary_file:
summary_file = next((f for f in files if f['type'] == 'lecture'), None)
# إذا لم يوجد محاضرة أيضاً، اختر أول ملف
if not summary_file:
summary_file = files[0]
file_path = summary_file['path']
file_name = summary_file['name']
await bot.application.bot.send_chat_action(chat_id=user_id, action="typing")
content = await self.download_and_extract_content(file_path, subject)
if content.startswith("Error:") or not content.strip():
logger.error(f"Failed to get content for summary from {file_name}. Content: {content[:100]}")
return f"❌ لم أتمكن من قراءة ملف ({file_name}) للتلخيص."
max_content_chars = 7000
content_snippet = content[:max_content_chars] if len(content) > max_content_chars else content
system_prompt = f"أنت خبير في تلخيص المواد العلمية لمادة {subject}. قم بتلخيص المحتوى التالي من ملف '{file_name}' في 5 نقاط رئيسية وموجزة. يجب أن تغطي النقاط أهم الأفكار والمفاهيم الواردة في النص."
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"المحتوى:\n```\n{content_snippet}\n```\n\nلخص هذا المحتوى في 5 نقاط رئيسية."}
]
response = await self._call_nvidia_api(messages)
return f"📋 **ملخص ملف: {file_name}**\n\n{response}"
# ========== (باقي دوال المعالجة الأصلية) ==========
async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""بدء المحادثة وعرض القائمة الرئيسية"""
user_id = update.effective_user.id
logger.info(f"User {user_id} started the bot.")
welcome_text = """
🏥 **مرحباً بك في بوت المختبرات الطبية الذكي** 🔬
أنا هنا لمساعدتك في دراسة موادك! 📚
**المواد المتاحة حالياً:**
"""
if not self.available_materials:
welcome_text += "\n\n⚠️ عذراً، لم أتمكن من تحميل أي مواد دراسية حالياً. حاول تحديث القائمة بالضغط على (🔄)."
else:
for subject in sorted(self.available_materials.keys()): # فرز المواد أبجدياً
file_count = len(self.available_materials[subject]['files'])
welcome_text += f"\n• {subject} ({file_count} ملف)"
welcome_text += "\n\n👇 اختر المادة التي تريد البدء بها:"
keyboard = self.create_subjects_keyboard()
reply_markup = InlineKeyboardMarkup(keyboard)
# إذا كانت استجابة لـ callback query (مثل العودة للقائمة), عدّل الرسالة
if update.callback_query:
try:
await update.callback_query.edit_message_text(welcome_text, reply_markup=reply_markup)
except Exception as e:
logger.error(f"Error editing message in start: {e}")
# إذا فشل التعديل (ربما الرسالة قديمة), أرسل رسالة جديدة
await update.effective_message.reply_text(welcome_text, reply_markup=reply_markup)
else:
await update.message.reply_text(welcome_text, reply_markup=reply_markup)
return SELECTING_SUBJECT
def create_subjects_keyboard(self):
"""إنشاء لوحة مفاتيح للمواد المتاحة"""
keyboard = []
# عرض المواد في عمودين إذا كان عددها أكبر من 4
subjects = sorted(self.available_materials.keys())
row = []
max_cols = 2 if len(subjects) > 4 else 1
for i, subject in enumerate(subjects):
file_count = len(self.available_materials[subject]['files'])
display_name = f"{subject} ({file_count})"
row.append(InlineKeyboardButton(display_name, callback_data=f"subject_{subject}"))
if len(row) == max_cols or i == len(subjects) - 1:
keyboard.append(row)
row = []
keyboard.append([InlineKeyboardButton("🔄 تحديث قائمة المواد", callback_data="refresh_materials")])
keyboard.append([InlineKeyboardButton("❓ مساعدة", callback_data="general_help")])
return keyboard
async def handle_subject_selection(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""معالجة اختيار المادة"""
query = update.callback_query
await query.answer()
user_id = query.from_user.id
callback_data = query.data
logger.info(f"User {user_id} selected: {callback_data}")
if callback_data == "general_help":
return await self.handle_general_help(query, context)
elif callback_data == "refresh_materials":
await query.edit_message_text("🔄 جاري تحديث قائمة المواد...")
self.load_all_materials() # إعادة تحميل المواد
keyboard = self.create_subjects_keyboard()
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text("✅ تم تحديث قائمة المواد.\nاختر المادة:", reply_markup=reply_markup)
return SELECTING_SUBJECT
subject = callback_data.replace("subject_", "")
context.user_data['current_subject'] = subject
memory = self.get_user_memory(user_id)
memory['last_subject'] = subject
# التأكد من أن المادة موجودة (احتياطي)
if subject not in self.available_materials:
logger.error(f"Selected subject '{subject}' not found in available materials.")
await query.edit_message_text("❌ خطأ: المادة المحددة غير موجودة. ربما تحتاج لتحديث القائمة؟")
return await self.start(update, context) # العودة للقائمة الرئيسية
subject_files = self.available_materials[subject]['files']
subject_name = subject.replace('_', ' ').title()
keyboard = [
[InlineKeyboardButton("📖 شرح محاضرة", callback_data="explain_lecture"), InlineKeyboardButton("🔍 استعراض الملفات", callback_data="browse_files")],
[InlineKeyboardButton("❓ توليد أسئلة", callback_data="generate_questions"), InlineKeyboardButton("📝 تلخيص ملف", callback_data="summarize_content")],
[InlineKeyboardButton("🧪 تفسير مفهوم", callback_data="explain_concept")],
[InlineKeyboardButton("🏠 العودة للقائمة الرئيسية", callback_data="main_menu")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
f"📚 **{subject_name}**\n\n"
f"عدد الملفات المتاحة: {len(subject_files)}\n\n"
f"ماذا تريد أن تفعل؟",
reply_markup=reply_markup,
parse_mode='Markdown'
)
return SELECTING_ACTION
async def handle_back_actions(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""معالجة العودة إلى قائمة الإجراءات"""
query = update.callback_query
await query.answer()
logger.info(f"User {query.from_user.id} requested back to actions.")
subject = context.user_data.get('current_subject')
if not subject or subject not in self.available_materials:
logger.warning("Subject context lost or invalid in handle_back_actions. Returning to main menu.")
return await self.start(update, context) # إذا ضاع السياق، عد للبداية
subject_files = self.available_materials[subject]['files']
subject_name = subject.replace('_', ' ').title()
keyboard = [
[InlineKeyboardButton("📖 شرح محاضرة", callback_data="explain_lecture"), InlineKeyboardButton("🔍 استعراض الملفات", callback_data="browse_files")],
[InlineKeyboardButton("❓ توليد أسئلة", callback_data="generate_questions"), InlineKeyboardButton("📝 تلخيص ملف", callback_data="summarize_content")],
[InlineKeyboardButton("🧪 تفسير مفهوم", callback_data="explain_concept")],
[InlineKeyboardButton("🏠 العودة للقائمة الرئيسية", callback_data="main_menu")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
f"📚 **{subject_name}**\n\n"
f"ماذا تريد أن تفعل؟",
reply_markup=reply_markup,
parse_mode='Markdown'
)
return SELECTING_ACTION
async def handle_main_menu(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""معالجة العودة للقائمة الرئيسية"""
query = update.callback_query
await query.answer()
logger.info(f"User {query.from_user.id} requested main menu.")
context.user_data.clear() # تنظيف السياق عند العودة للرئيسية
return await self.start(update, context)
async def handle_more_questions(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""معالجة طلب المزيد من الأسئلة أو العودة"""
query = update.callback_query
await query.answer()
logger.info(f"User {query.from_user.id} requested more questions / interaction.")
subject = context.user_data.get('current_subject', 'عام')
keyboard = [
[InlineKeyboardButton("🔙 العودة لقائمة الخيارات", callback_data="back_to_actions")],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(f"💬 تفضل، اكتب سؤالك أو طلبك الجديد المتعلق بمادة '{subject}':", reply_markup=reply_markup)
context.user_data['waiting_for'] = 'general' # ضبط الحالة لانتظار استعلام عام
return WAITING_FOR_QUESTION
async def handle_change_subject(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""معالجة تغيير المادة"""
query = update.callback_query
await query.answer()
logger.info(f"User {query.from_user.id} requested change subject.")
keyboard = self.create_subjects_keyboard()
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text("🔄 اختر المادة الجديدة:", reply_markup=reply_markup)
return SELECTING_SUBJECT
async def handle_action_selection(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""معالجة اختيار الإجراء"""
query = update.callback_query
await query.answer()
action = query.data
user_id = query.from_user.id
subject = context.user_data.get('current_subject')
logger.info(f"User {user_id} selected action: {action} for subject: {subject}")
if not subject or subject not in self.available_materials:
logger.error("Subject context lost or invalid in handle_action_selection.")
await query.edit_message_text("❌ حدث خطأ في السياق. يرجى البدء من جديد.")
return await self.start(update, context)
# زر العودة للقائمة الرئيسية
back_button = InlineKeyboardButton("🔙 رجوع", callback_data="back_to_actions")
keyboard_with_back = [[back_button]]
reply_markup_back = InlineKeyboardMarkup(keyboard_with_back)
if action == "main_menu":
return await self.start(update, context)
elif action == "browse_files":
files_text = await self.get_files_list_text(subject)
keyboard = [
# يمكن إضافة زر لشرح ملف من هنا مباشرة إذا أردت
[InlineKeyboardButton("📖 طلب شرح ملف محدد", callback_data="explain_lecture")],
[back_button]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(files_text, reply_markup=reply_markup, parse_mode='Markdown') # إضافة parse_mode
# البقاء في نفس الحالة لعرض الخيارات مرة أخرى
return SELECTING_ACTION
elif action == "explain_lecture":
files_list = await self.get_files_list_text(subject)
if files_list.startswith("❌"): # إذا لم توجد ملفات
await query.edit_message_text(files_list, reply_markup=reply_markup_back)
return SELECTING_ACTION
await query.edit_message_text(
f"📖 **شرح محاضرة**\n\n"
f"{files_list}\n\n"
f"📝 اكتب رقم الملف من القائمة أعلاه أو جزءاً من اسمه:",
reply_markup=reply_markup_back,
parse_mode='Markdown' # إضافة parse_mode
)
context.user_data['waiting_for'] = 'lecture_explanation'
return WAITING_FOR_QUESTION
elif action == "generate_questions":
await query.edit_message_text("⏳ حسناً، جاري توليد بعض الأسئلة للمراجعة...", reply_markup=reply_markup_back)
questions = await self.generate_questions_for_subject(subject, user_id)
# await query.edit_message_text(questions, reply_markup=reply_markup_back) # لا تعدل الرسالة، أرسل الأسئلة كرد
await query.message.reply_text(questions, reply_markup=reply_markup_back, parse_mode='Markdown') # ارسال رد جديد + parse_mode
return SELECTING_ACTION # العودة لقائمة الخيارات
elif action == "explain_concept":
await query.edit_message_text(
"🧪 **تفسير مفهوم**\n\n"
"ما هو المفهوم أو المصطلح الطبي الذي تود شرحه؟\n"
"مثال: 'ما هو تحليل CBC؟' أو 'اشرح لي دورة كريبس'",
reply_markup=reply_markup_back
)
context.user_data['waiting_for'] = 'concept_explanation'
return WAITING_FOR_QUESTION
elif action == "summarize_content":
await query.edit_message_text("⏳ تمام، جاري تلخيص ملف مهم من المادة...", reply_markup=reply_markup_back)
summary = await self.generate_summary(subject, user_id)
# await query.edit_message_text(summary, reply_markup=reply_markup_back) # لا تعدل، أرسل كرد
await query.message.reply_text(summary, reply_markup=reply_markup_back, parse_mode='Markdown') # ارسال رد + parse_mode
return SELECTING_ACTION # العودة لقائمة الخيارات
# إذا لم يتعرف على الإجراء (احتياطي)
logger.warning(f"Unknown action selected: {action}")
await query.message.reply_text("عذراً، لم أتعرف على هذا الخيار.")
return SELECTING_ACTION
async def browse_available_files(self, query, context):
"""عرض الملفات المتاحة للمادة - تم دمجه في handle_action_selection"""
# هذه الدالة لم تعد مستخدمة بشكل مباشر، الكود موجود في handle_action_selection
pass
async def get_files_list_text(self, subject):
"""إنشاء نص لقائمة الملفات"""
if subject not in self.available_materials:
logger.error(f"Subject '{subject}' not found when trying to list files.")
return "❌ خطأ: لم يتم العثور على المادة المحددة."
files = self.available_materials[subject]['files']
if not files:
return "❌ لا توجد ملفات متاحة لهذه المادة بعد."
files_text = f"📁 **الملفات المتاحة لمادة {subject}:**\n\n"
# عرض عدد محدود من الملفات لتجنب رسالة طويلة جداً
max_files_to_show = 25
for i, file_info in enumerate(files[:max_files_to_show], 1):
file_name = file_info['name']
lecture_num = file_info['lecture_number']
file_type = file_info['type']
type_emoji = {
'lecture': '📖',
'lab': '🧪',
'exam': '📝',
'summary': '📋',
'unknown': '📄'
}.get(file_type, '📄')
num_text = f" (محاضرة {lecture_num})" if lecture_num else ""
# تنسيق السطر: الرقم. ايموجي اسم_الملف (رقم المحاضرة إن وجد) - استخدام Markdown للأسماء
# استبدال الشرطة السفلية بمسافة في اسم الملف للعرض فقط
display_name = file_name.replace("_", " ")
files_text += f"{i}. {type_emoji} `{display_name}`{num_text}\n"
if len(files) > max_files_to_show:
files_text += f"\n... و {len(files) - max_files_to_show} ملفات أخرى."
return files_text
async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""معالجة الرسائل النصية من المستخدم"""
if not update.message or not update.message.text:
return # تجاهل الرسائل غير النصية أو الفارغة
user_message = update.message.text
user_id = update.effective_user.id
waiting_for = context.user_data.get('waiting_for')
subject = context.user_data.get('current_subject', 'general')
logger.info(f"User {user_id} sent message: '{user_message}' | waiting_for: {waiting_for} | subject: {subject}")
await update.message.chat.send_action(action="typing")
response = "عذراً، لم أفهم طلبك. هل يمكنك توضيحه؟" # رد افتراضي
next_state = WAITING_FOR_QUESTION # البقاء في حالة الانتظار افتراضياً
try:
if waiting_for == 'lecture_explanation':
response = await self.explain_lecture(user_message, subject, user_id)
next_state = SELECTING_ACTION # تم الرد، العودة للخيارات
elif waiting_for == 'concept_explanation':
response = await self.explain_concept(user_message, subject, user_id)
next_state = SELECTING_ACTION # تم الرد، العودة للخيارات
else: # يتضمن 'general' أو أي شيء آخر (مثل سؤال عام بعد الضغط على "أسئلة أخرى")
response = await self.process_general_query(user_message, subject, user_id)
next_state = SELECTING_ACTION # تم الرد، العودة للخيارات
# إرسال الرد
await update.message.reply_text(response, parse_mode='Markdown')
# إذا نجح الرد، أرسل قائمة الخيارات التالية
if next_state == SELECTING_ACTION:
context.user_data['waiting_for'] = None # إنهاء حالة الانتظار
keyboard = [
[InlineKeyboardButton("🔄 طرح سؤال آخر", callback_data="more_questions")],
[InlineKeyboardButton("📚 تغيير المادة", callback_data="change_subject")],
[InlineKeyboardButton("🏠 القائمة الرئيسية", callback_data="main_menu")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text("هل تحتاج مساعدة أخرى؟", reply_markup=reply_markup)
return next_state # العودة للحالة المناسبة (إما خيارات أو انتظار سؤال آخر)
except Exception as e:
logger.error(f"❌ Error processing message from user {user_id}: {e}", exc_info=True)
await update.message.reply_text("❌ حدث خطأ غير متوقع أثناء معالجة طلبك. لقد تم تسجيل الخطأ. يرجى المحاولة مرة أخرى أو اختيار خيار آخر.")
context.user_data['waiting_for'] = None # إنهاء الانتظار عند حدوث خطأ فادح
return SELECTING_ACTION # العودة لقائمة الخيارات كحل احتياطي آمن
# ========== إنشاء كائن البوت ==========
bot = MedicalLabBot()
# ========== دوال FastAPI ==========
@app.get("/", response_class=HTMLResponse)
async def root():
"""الصفحة الرئيسية"""
materials_count = len(bot.available_materials)
total_files = sum(len(material['files']) for material in bot.available_materials.values())
# تحديد رسالة الحالة بناءً على حالة التهيئة
status_message = "⏳ جاري التهيئة (الاتصال بـ Telegram)..."
status_color = "#ffc107" # أصفر
init_details = ""
if bot.initialization_status == "success":
status_message = "✅ نشط ومهيأ"
status_color = "#28a745" # أخضر
init_details = "تم الاتصال بـ Telegram وضبط الويب هوك بنجاح."
elif bot.initialization_status == "failed":
status_message = "❌ فشل التهيئة"
status_color = "#dc3545" # أحمر
init_details = "فشل الاتصال بـ Telegram أو ضبط الويب هوك بعد عدة محاولات."
return f"""
<html>
<head>
<title>Medical Lab Bot Status</title>
<style>
body {{ font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Oxygen-Sans, Ubuntu, Cantarell, "Helvetica Neue", sans-serif; margin: 40px; background-color: #f8f9fa; color: #343a40; line-height: 1.6; }}
.container {{ max-width: 800px; margin: 20px auto; background-color: #ffffff; padding: 30px; border-radius: 12px; box-shadow: 0 4px 12px rgba(0,0,0,0.08); }}
.status-box {{ padding: 25px; margin-bottom: 25px; border-radius: 8px; border-left: 6px solid {status_color}; background-color: #e9ecef; }}
.status-box h2 {{ margin-top: 0; color: #212529; font-size: 1.6em; }}
.status-box p {{ margin-bottom: 8px; font-size: 1.1em; }}
h1 {{ color: #0056b3; text-align: center; margin-bottom: 30px; font-weight: 600; }}
h3 {{ color: #0056b3; border-bottom: 2px solid #dee2e6; padding-bottom: 8px; margin-top: 30px; }}
ul {{ list-style: none; padding-left: 0; }}
li {{ margin-bottom: 12px; padding-left: 25px; position: relative; }}
li::before {{ content: '✓'; color: {status_color}; position: absolute; left: 0; font-weight: bold; }}
.info-box {{ padding: 20px; background-color: #f1f3f5; border-radius: 8px; margin-top: 30px; font-size: 0.95em; color: #495057; }}
strong {{ color: #212529; }}
</style>
</head>
<body>
<div class="container">
<h1>🏥 حالة بوت المختبرات الطبية الذكي</h1>
<div class="status-box">
<h2>{status_message}</h2>
<p><strong>تفاصيل التهيئة:</strong> {init_details}</p>
<p><strong>المواد المحملة:</strong> {materials_count} مادة</p>
<p><strong>إجمالي الملفات:</strong> {total_files} ملف</p>
</div>
<h3>🎯 المميزات الرئيسية:</h3>
<ul>
<li>شرح المواد الدراسية من ملفات PDF وWord وTXT.</li>
<li>توليد أسئلة متنوعة للمراجعة واختبار الفهم.</li>
<li>تلخيص المحتوى الدراسي في نقاط رئيسية.</li>
<li>تفسير المفاهيم والمصطلحات العلمية والطبية.</li>
<li>استعراض الملفات المتاحة لكل مادة.</li>
<li>ذاكرة محادثة لكل مستخدم للحفاظ على السياق.</li>
</ul>
<div class="info-box">
<strong>ℹ️ معلومات تقنية:</strong> هذا البوت يستخدم FastAPI للواجهة، Uvicorn كخادم، مكتبة python-telegram-bot للتفاعل مع Telegram، Hugging Face Datasets لتخزين المواد الدراسية، وواجهة برمجة تطبيقات NVIDIA للقدرات الذكية.
</div>
</div>
</body>
</html>
"""
# ========== !! الحل البديل: تحديد نوع الاستجابة صراحة !! ==========
@app.post("/telegram", response_model=Dict[str, str])
async def handle_telegram_update(request: Request):
"""معالجة تحديثات Telegram"""
try:
# التحقق من أن التطبيق مهيأ
if not bot.is_initialized or not bot.application:
logger.error("❌ التطبيق غير مهيأ، لا يمكن معالجة التحديث (ربما لا يزال قيد التهيئة).")
# إرجاع 503 Service Unavailable لإخبار Telegram بإعادة المحاولة لاحقًا
raise HTTPException(status_code=503, detail="Application not initialized or still initializing")
update_data = await request.json()
# logger.debug(f"Received update: {update_data}") # إلغاء التعليق للتتبع المكثف
update = Update.de_json(update_data, bot.application.bot)
# معالجة التحديث في مهمة منفصلة لتجنب حظر الحلقة الرئيسية
asyncio.create_task(bot.application.process_update(update))
# إرجاع استجابة سريعة لـ Telegram
return {"status": "ok"}
except json.JSONDecodeError:
logger.error("❌ Received invalid JSON data.")
raise HTTPException(status_code=400, detail="Invalid JSON data")
except HTTPException as http_exc:
# إعادة إرسال الخطأ HTTP كما هو (مثل 503 أعلاه)
raise http_exc
except Exception as e:
logger.error(f"❌ Error processing update: {e}", exc_info=True)
# إرجاع خطأ عام 500 Internal Server Error
raise HTTPException(status_code=500, detail="Internal server error while processing update")
@app.get("/health")
async def health_check():
"""فحص صحة الخدمة"""
materials_count = len(bot.available_materials)
total_files = sum(len(material['files']) for material in bot.available_materials.values())
status_code = 503 # Service Unavailable افتراضياً
service_status = "unhealthy"
if bot.initialization_status == "pending":
service_status = "initializing"
# يمكن إرجاع 200 هنا إذا أردت أن تعتبره "صحي" أثناء التهيئة
# status_code = 200
elif bot.initialization_status == "success":
service_status = "healthy"
status_code = 200 # OK
elif bot.initialization_status == "failed":
service_status = "unhealthy_failed_init"
status_code = 500 # Internal Server Error
response_payload = {
"status": service_status,
"service": "medical-lab-bot",
"initialization_status": bot.initialization_status,
"materials_loaded": materials_count,
"total_files": total_files,
"timestamp": datetime.now().isoformat()
}
# استخدام JSONResponse لتحديد status_code بشكل صحيح
from fastapi.responses import JSONResponse
return JSONResponse(content=response_payload, status_code=status_code)
# ========== التشغيل الرئيسي ==========
if __name__ == "__main__":
port = int(os.environ.get("PORT", 7860))
# يمكن تحديد مستوى السجل من متغير بيئة أيضاً
log_level = os.environ.get("LOG_LEVEL", "info").lower()
logger.info(f"🚀 Starting Medical Lab Bot on port {port} with log level {log_level}")
uvicorn.run(
"app:app", # تغيير لاستخدام صيغة uvicorn القياسية
host="0.0.0.0",
port=port,
log_level=log_level,
reload=False # تعطيل إعادة التحميل التلقائي في الإنتاج
)