Study / app.py
Riy777's picture
Update app.py
b4f9a00 verified
raw
history blame
45.2 kB
import os
import logging
import asyncio
import re
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, CallbackQueryHandler, MessageHandler, filters, ContextTypes, ConversationHandler
from huggingface_hub import HfApi, hf_hub_download, list_repo_files
from openai import OpenAI
import pickle
import json
from datetime import datetime
import PyPDF2
import fitz # PyMuPDF
from PIL import Image
import io
import requests
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import HTMLResponse
import uvicorn
import random
import docx # لإضافة دعم .docx
# ========== (الحل) إضافة المكتبات اللازمة لحل مشكلة DNS ==========
import httpx
import httpx_dnspython
# ==========================================================
# ========== تكوين السجلات ==========
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
# ========== التوكنات ومتغيرات البيئة ==========
TELEGRAM_BOT_TOKEN = os.environ.get('TELEGRAM_BOT_TOKEN')
NVAPI_API_KEY = os.environ.get('NVAPI_API_KEY')
SPACE_URL = os.environ.get('SPACE_URL', '')
# ========== تكوين عميل NVIDIA ==========
nvidia_client = OpenAI(
base_url="https://integrate.api.nvidia.com/v1",
api_key=NVAPI_API_KEY
)
# ========== مستودع Hugging Face للمواد ==========
REPO_ID = "Riy777/Study"
# ========== حالات المحادثة ==========
SELECTING_SUBJECT, SELECTING_ACTION, WAITING_FOR_QUESTION = range(3)
# ========== تطبيق FastAPI ==========
# تعديل: استخدام lifespan بدلاً من on_event
@asyncio.contextmanager
async def lifespan(app: FastAPI):
# الكود الذي كان في on_startup
logger.info("🚀 بدء تشغيل FastAPI... بدء مهمة تهيئة البوت في الخلفية.")
asyncio.create_task(bot.initialize_application())
logger.info("✅ خادم FastAPI يعمل. التهيئة (الاتصال بـ Telegram) جارية في الخلفية.")
yield
# كود إيقاف التشغيل (on_shutdown) (إن وجد)
logger.info("🛑 إيقاف تشغيل خادم FastAPI.")
app = FastAPI(title="Medical Lab Bot", version="1.0.0", lifespan=lifespan)
# ========== فئة البوت الرئيسية ==========
class MedicalLabBot:
def __init__(self):
self.conversation_memory = {}
self.available_materials = {}
self.file_cache = {}
self.application = None
self.is_initialized = False
self.initialization_status = "pending" # تتبع حالة التهيئة
self.load_all_materials()
async def initialize_application(self):
"""تهيئة تطبيق التليجرام بشكل غير متزامن مع حل DNS المخصص"""
try:
if self.is_initialized:
return True
logger.info("🔄 جاري تهيئة تطبيق التليجرام...")
# ========== !! الحل الجذري (DNS) !! ==========
logger.info("🔧 إعداد عميل HTTP مخصص مع DNS (1.1.1.1)...")
transport = httpx_dnspython.AsyncDNSTransport(
resolver=httpx_dnspython.SyncResolver(nameservers=["1.1.1.1", "8.8.8.8"])
)
# نستخدم httpx.AsyncClient مع الناقل المخصص
custom_client = httpx.AsyncClient(transport=transport)
# ============================================
self.application = (
Application.builder()
.token(TELEGRAM_BOT_TOKEN)
.http_client(custom_client) # <-- تمرير العميل المخصص هنا
.build()
)
await self.setup_handlers()
max_retries = 3 # تقليل المحاولات لأن المشكلة يفترض أن تُحل
retry_delay = 5 # تقليل التأخير
for attempt in range(max_retries):
try:
logger.info(f"🚀 محاولة تهيئة الاتصال بـ Telegram (محاولة {attempt + 1}/{max_retries})...")
# 1. الاتصال بـ Telegram (getMe)
await self.application.initialize()
logger.info("✅ تم تهيئة الاتصال بـ Telegram بنجاح.")
# 2. ضبط الـ Webhook (فقط بعد نجاح الاتصال)
if SPACE_URL:
webhook_url = f"{SPACE_URL.rstrip('/')}/telegram"
logger.info(f"ℹ️ جاري إعداد الويب هوك على: {webhook_url}")
await self.application.bot.set_webhook(
url=webhook_url,
allowed_updates=Update.ALL_TYPES,
drop_pending_updates=True
)
logger.info(f"✅ Webhook set to: {webhook_url}")
else:
logger.warning("⚠️ SPACE_URL not set. Webhook cannot be set.")
# 3. ضبط الحالة على "ناجح"
self.is_initialized = True
self.initialization_status = "success"
logger.info("✅✅✅ التطبيق جاهز لاستقبال الطلبات.")
return True # نجح، اخرج من الدالة
except Exception as e:
logger.warning(f"⚠️ فشلت محاولة التهيئة {attempt + 1}: {e}")
if attempt < max_retries - 1:
logger.info(f"⏳ الانتظار {retry_delay} ثواني قبل إعادة المحاولة...")
await asyncio.sleep(retry_delay)
else:
logger.error(f"❌ فشل تهيئة التطبيق نهائياً بعد {max_retries} محاولات (حتى مع DNS المخصص).")
# فشل بعد كل المحاولات
self.is_initialized = False
self.initialization_status = "failed"
return False
except Exception as e:
logger.error(f"❌ خطأ فادح في تهيئة التطبيق: {e}")
self.is_initialized = False
self.initialization_status = "failed"
return False
async def setup_handlers(self):
"""إعداد معالجات التليجرام"""
# إعداد Conversation Handler مع إعدادات محسنة
conv_handler = ConversationHandler(
entry_points=[CommandHandler('start', self.start)],
states={
SELECTING_SUBJECT: [
CallbackQueryHandler(self.handle_subject_selection, pattern='^(subject_|general_help|refresh_materials)')
],
SELECTING_ACTION: [
CallbackQueryHandler(self.handle_action_selection, pattern='^(explain_lecture|browse_files|generate_questions|summarize_content|explain_concept|main_menu)$'),
# إضافة معالج العودة هنا أيضاً
CallbackQueryHandler(self.handle_back_actions, pattern='^back_to_actions$')
],
WAITING_FOR_QUESTION: [
MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message),
CallbackQueryHandler(self.handle_back_actions, pattern='^back_to_actions$')
]
},
fallbacks=[
CommandHandler('start', self.start),
CallbackQueryHandler(self.handle_main_menu, pattern='^main_menu$')
],
name="medical_lab_conversation",
persistent=False,
per_message=False
)
# إضافة الـ handlers
self.application.add_handler(conv_handler)
# إضافة handlers منفصلة للاستعلامات العامة
self.application.add_handler(CallbackQueryHandler(self.handle_more_questions, pattern='^more_questions$'))
self.application.add_handler(CallbackQueryHandler(self.handle_change_subject, pattern='^change_subject$'))
logger.info("✅ تم إعداد معالجات التليجرام")
def load_all_materials(self):
"""تحميل جميع المواد والملفات من Hugging Face"""
try:
logger.info("جاري تحميل قائمة المواد من Hugging Face...")
all_files = list_repo_files(repo_id=REPO_ID, repo_type="dataset")
materials = {}
for file_path in all_files:
try:
path_parts = file_path.split('/')
if len(path_parts) >= 2:
subject = path_parts[0]
file_name = path_parts[-1]
if subject not in materials:
materials[subject] = {
'files': [],
'file_details': {}
}
file_info = self.extract_file_info(file_name, file_path)
materials[subject]['files'].append(file_info)
materials[subject]['file_details'][file_name] = file_info
else:
if 'general' not in materials:
materials['general'] = {
'files': [],
'file_details': {}
}
file_info = self.extract_file_info(file_path, file_path)
materials['general']['files'].append(file_info)
materials['general']['file_details'][file_path] = file_info
except Exception as e:
logger.error(f"خطأ في معالجة الملف {file_path}: {e}")
continue
self.available_materials = materials
logger.info(f"✅ تم تحميل {len(materials)} مادة بنجاح")
except Exception as e:
logger.error(f"❌ خطأ في تحميل المواد: {e}")
self.available_materials = {'Biochemistry': {'files': [], 'file_details': {}}} # مادة افتراضية عند الفشل
# ========== (الدوال التي تم إكمالها) ==========
def get_user_memory(self, user_id):
"""الحصول على ذاكرة المستخدم أو إنشاؤها"""
if user_id not in self.conversation_memory:
self.conversation_memory[user_id] = {'history': [], 'last_subject': None}
return self.conversation_memory[user_id]
async def handle_general_help(self, query, context):
"""عرض رسالة المساعدة"""
help_text = """
❓ **مساعدة** ❓
هذا البوت مصمم لمساعدتك في دراسة مواد المختبرات الطبية.
1. **ابدأ** باختيار مادة من القائمة الرئيسية.
2. **اختر الخدمة:**
* **شرح محاضرة:** يعطيك ملخص للملف الذي تختاره.
* **استعراض الملفات:** يعرض لك كل الملفات المتاحة.
* **أسئلة عن المادة:** يولد أسئلة من ملفات عشوائية.
* **ملخص المادة:** يلخص لك ملف مهم من المادة.
* **تفسير مفهوم:** اطرح أي سؤال أو مصطلح (مثل "ما هو CBC") وسأشرحه لك.
3. **تحديث المواد:** اضغط (🔄) لتحديث قائمة المواد من المصدر.
"""
keyboard = [[InlineKeyboardButton("🔙 العودة للقائمة الرئيسية", callback_data="main_menu")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(help_text, reply_markup=reply_markup)
return SELECTING_SUBJECT
def extract_file_info(self, file_name, file_path):
"""استخراج معلومات الملف (النوع، رقم المحاضرة) من الاسم"""
name_lower = file_name.lower()
lecture_num = None
file_type = 'unknown'
# استخراج رقم المحاضرة
match = re.search(r'(lecture|lec|محاضرة)\s*(\d+)', name_lower)
if match:
lecture_num = int(match.group(2))
# تحديد نوع الملف
if 'lab' in name_lower or 'عملي' in name_lower:
file_type = 'lab'
elif 'exam' in name_lower or 'امتحان' in name_lower:
file_type = 'exam'
elif 'summary' in name_lower or 'ملخص' in name_lower:
file_type = 'summary'
elif 'lecture' in name_lower or 'محاضرة' in name_lower:
file_type = 'lecture'
return {
'name': file_name,
'path': file_path,
'lecture_number': lecture_num,
'type': file_type
}
async def _call_nvidia_api(self, messages, max_tokens=1500):
"""دالة مساعدة لاستدعاء NVIDIA API"""
try:
completion = nvidia_client.chat.completions.create(
model="meta/llama3-70b-instruct", # استخدام موديل قوي
messages=messages,
temperature=0.5,
top_p=1,
max_tokens=max_tokens
)
return completion.choices[0].message.content
except Exception as e:
logger.error(f"❌ Error calling NVIDIA API: {e}")
return f"❌ حدث خطأ أثناء التواصل مع الذكاء الاصطناعي: {e}"
def _find_file_by_query(self, query, subject):
"""البحث عن ملف بناءً على استعلام المستخدم (رقم، اسم، الخ)"""
files = self.available_materials[subject]['files']
query_lower = query.lower().strip()
# محاولة البحث بالرقم (مثل "1", "2")
try:
# استخراج الرقم فقط
match_num = re.findall(r'\d+', query_lower)
if match_num:
index = int(match_num[0])
if 1 <= index <= len(files):
return files[index - 1]
except (IndexError, ValueError):
pass # ليس استعلام رقمي
# محاولة البحث برقم المحاضرة (مثل "محاضرة 3")
match = re.search(r'(\d+)', query_lower)
if match:
num = int(match.group(1))
for file_info in files:
if file_info['lecture_number'] == num:
return file_info
# محاولة البحث بالاسم
for file_info in files:
if query_lower in file_info['name'].lower():
return file_info
return None # لم يتم العثور
async def download_and_extract_content(self, file_path, subject):
"""تحميل الملف من HF واستخراج النص منه"""
if file_path in self.file_cache:
logger.info(f"💾 Using cached content for {file_path}")
return self.file_cache[file_path]
logger.info(f"⏳ Downloading {file_path} from Hugging Face...")
try:
local_path = hf_hub_download(
repo_id=REPO_ID,
filename=file_path,
repo_type="dataset"
)
text_content = ""
logger.info(f"📄 Extracting content from {local_path}")
if local_path.lower().endswith('.pdf'):
# استخدام PyMuPDF (fitz) لاستخراج أدق
with fitz.open(local_path) as doc:
for page in doc:
text_content += page.get_text()
elif local_path.lower().endswith('.txt'):
with open(local_path, 'r', encoding='utf-8') as f:
text_content = f.read()
elif local_path.lower().endswith('.docx'):
doc_obj = docx.Document(local_path)
for para in doc_obj.paragraphs:
text_content += para.text + "\n"
else:
logger.warning(f"Unsupported file type: {local_path}")
return f"Error: Unsupported file type ({file_path})."
# تنظيف أساسي
text_content = re.sub(r'\s+', ' ', text_content).strip()
if len(text_content) < 50:
logger.warning(f"File {file_path} content is very short or empty.")
# قد لا يكون هذا خطأ، ربما الملف قصير فعلاً
self.file_cache[file_path] = text_content
return text_content
except Exception as e:
logger.error(f"❌ Error downloading/extracting {file_path}: {e}")
return f"Error: Could not retrieve file {file_path}."
# ========== (الدوال الرئيسية التي كانت ناقصة) ==========
async def explain_lecture(self, user_query, subject, user_id):
"""شرح محاضرة بناءً على استعلام المستخدم"""
file_info = self._find_file_by_query(user_query, subject)
if not file_info:
return "❌ لم أتمكن من العثور على الملف. يرجى تحديد رقم أو اسم الملف بوضوح."
file_path = file_info['path']
await bot.application.bot.send_chat_action(chat_id=user_id, action="typing")
content = await self.download_and_extract_content(file_path, subject)
if content.startswith("Error:"):
return f"❌ خطأ في معالجة الملف: {file_path}\n{content}"
if not content:
return f"❌ المحتوى فارغ للملف: {file_path}"
memory = self.get_user_memory(user_id)
memory['history'].append({"role": "user", "content": f"اشرح لي النقاط الأساسية في هذه المحاضرة: {file_info['name']}"})
system_prompt = f"أنت مساعد مختبرات طبية متخصص. اشرح النقاط الأساسية في المحتوى التالي من مادة {subject} (ملف: {file_info['name']}). ركز على المفاهيم الرئيسية والنتائج المهمة. استخدم تنسيق Markdown ونقاط واضحة."
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"المحتوى:\n```\n{content[:8000]}\n```\n\nاشرح النقاط الأساسية."} # قص المحتوى ليلائم نافذة السياق
]
response = await self._call_nvidia_api(messages, 1500)
memory['history'].append({"role": "assistant", "content": response})
return f"📝 **شرح لأهم نقاط ملف: {file_info['name']}**\n\n{response}"
async def explain_concept(self, user_query, subject, user_id):
"""شرح مفهوم أو مصطلح طبي"""
memory = self.get_user_memory(user_id)
memory['history'].append({"role": "user", "content": user_query})
system_prompt = f"أنت خبير في المختبرات الطبية. اشرح المفهوم التالي بوضوح وبشكل مبسط، مع التركيز على أهميته في مادة {subject} إذا كان ذا صلة."
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_query}
]
response = await self._call_nvidia_api(messages)
memory['history'].append({"role": "assistant", "content": response})
return f"🧪 **شرح مفهوم: {user_query}**\n\n{response}"
async def process_general_query(self, user_message, subject, user_id):
"""معالجة استعلام عام من المستخدم"""
memory = self.get_user_memory(user_id)
history = memory.get('history', [])
system_prompt = f"أنت مساعد ذكي متخصص في المختبرات الطبية. المادة الحالية التي يركز عليها الطالب هي {subject}. أجب على سؤال الطالب بناءً على سياق المحادثة إن وجد."
messages = [{"role": "system", "content": system_prompt}]
messages.extend(history[-5:]) # إضافة آخر 5 تفاعلات للسياق
messages.append({"role": "user", "content": user_message})
response = await self._call_nvidia_api(messages)
memory['history'].append({"role": "user", "content": user_message})
memory['history'].append({"role": "assistant", "content": response})
return response
async def generate_questions_for_subject(self, subject, user_id):
"""توليد أسئلة من ملف عشوائي في المادة"""
files = self.available_materials[subject]['files']
if not files:
return "❌ لا توجد ملفات في هذه المادة لتوليد أسئلة منها."
# اختيار ملف عشوائي
file_info = random.choice(files)
file_path = file_info['path']
await bot.application.bot.send_chat_action(chat_id=user_id, action="typing")
content = await self.download_and_extract_content(file_path, subject)
if content.startswith("Error:") or not content:
return f"❌ لم أتمكن من قراءة ملف ({file_info['name']}) لتوليد أسئلة."
system_prompt = f"أنت خبير في مادة {subject}. قم بإنشاء 5 أسئلة متنوعة (MCQ أو أسئلة قصيرة) بناءً على المحتوى التالي من ملف {file_info['name']}."
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"المحتوى:\n```\n{content[:8000]}\n```\n\nقم بإنشاء 5 أسئلة متنوعة وواضحة."}
]
response = await self._call_nvidia_api(messages)
return f"❓ **أسئلة مقترحة من ملف: {file_info['name']}**\n\n{response}"
async def generate_summary(self, subject, user_id):
"""تلخيص ملف مهم من المادة"""
files = self.available_materials[subject]['files']
if not files:
return "❌ لا توجد ملفات في هذه المادة لتلخيصها."
# محاولة العثور على ملف ملخص
summary_file = next((f for f in files if f['type'] == 'summary'), None)
if not summary_file:
# إذا لم يوجد ملخص، اختر أول محاضرة
summary_file = next((f for f in files if f['type'] == 'lecture'), files[0])
file_path = summary_file['path']
await bot.application.bot.send_chat_action(chat_id=user_id, action="typing")
content = await self.download_and_extract_content(file_path, subject)
if content.startswith("Error:") or not content:
return f"❌ لم أتمكن من قراءة ملف ({summary_file['name']}) للتلخيص."
system_prompt = f"أنت خبير في مادة {subject}. قم بتلخيص المحتوى التالي من ملف {summary_file['name']} في 5 نقاط رئيسية وواضحة."
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"المحتوى:\n```\n{content[:8000]}\n```\n\nلخص المحتوى في 5 نقاط رئيسية."}
]
response = await self._call_nvidia_api(messages)
return f"📋 **ملخص ملف: {summary_file['name']}**\n\n{response}"
# ========== (باقي دوال المعالجة الأصلية) ==========
async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""بدء المحادثة وعرض القائمة الرئيسية"""
user_id = update.effective_user.id
welcome_text = """
🏥 **مرحباً بك في بوت المختبرات الطبية الذكي** 🔬
📚 **المواد المتاحة حالياً:**
"""
if not self.available_materials:
welcome_text += "\n\n❌ عذراً، لم أتمكن من تحميل أي مواد دراسية حالياً. حاول تحديث القائمة."
else:
for subject in self.available_materials.keys():
file_count = len(self.available_materials[subject]['files'])
welcome_text += f"\n• {subject} ({file_count} ملف)"
welcome_text += "\n\nاختر المادة التي تريد البدء بها:"
keyboard = self.create_subjects_keyboard()
reply_markup = InlineKeyboardMarkup(keyboard)
if update.callback_query:
await update.callback_query.edit_message_text(welcome_text, reply_markup=reply_markup)
else:
await update.message.reply_text(welcome_text, reply_markup=reply_markup)
return SELECTING_SUBJECT
def create_subjects_keyboard(self):
"""إنشاء لوحة مفاتيح للمواد المتاحة"""
keyboard = []
for subject in self.available_materials.keys():
file_count = len(self.available_materials[subject]['files'])
display_name = f"{subject} ({file_count})"
keyboard.append([InlineKeyboardButton(display_name, callback_data=f"subject_{subject}")])
keyboard.append([InlineKeyboardButton("🔄 تحديث قائمة المواد", callback_data="refresh_materials")])
keyboard.append([InlineKeyboardButton("❓ مساعدة", callback_Ddata="general_help")])
return keyboard
async def handle_subject_selection(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""معالجة اختيار المادة"""
query = update.callback_query
await query.answer()
user_id = query.from_user.id
callback_data = query.data
if callback_data == "general_help":
return await self.handle_general_help(query, context)
elif callback_data == "refresh_materials":
await query.edit_message_text("🔄 جاري تحديث قائمة المواد...")
self.load_all_materials()
keyboard = self.create_subjects_keyboard()
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text("✅ تم تحديث قائمة المواد\nاختر المادة:", reply_markup=reply_markup)
return SELECTING_SUBJECT
subject = callback_data.replace("subject_", "")
context.user_data['current_subject'] = subject
memory = self.get_user_memory(user_id)
memory['last_subject'] = subject
# التأكد من أن المادة موجودة (احتياطي)
if subject not in self.available_materials:
await query.edit_message_text("❌ خطأ: المادة غير موجودة. ربما تحتاج لتحديث القائمة؟")
return await self.start(update, context)
subject_files = self.available_materials[subject]['files']
subject_name = subject.replace('_', ' ').title()
keyboard = [
[InlineKeyboardButton("📖 شرح محاضرة محددة", callback_data="explain_lecture")],
[InlineKeyboardButton("🔍 استعراض جميع الملفات", callback_data="browse_files")],
[InlineKeyboardButton("❓ أسئلة عن المادة", callback_data="generate_questions")],
[InlineKeyboardButton("📝 ملخص المادة", callback_data="summarize_content")],
[InlineKeyboardButton("🧪 تفسير مفهوم", callback_data="explain_concept")],
[InlineKeyboardButton("🏠 القائمة الرئيسية", callback_data="main_menu")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
f"📚 **{subject_name}**\n\n"
f"عدد الملفات المتاحة: {len(subject_files)}\n"
f"اختر الخدمة التي تريدها:",
reply_markup=reply_markup,
parse_mode='Markdown'
)
return SELECTING_ACTION
async def handle_back_actions(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""معالجة العودة إلى قائمة الإجراءات"""
query = update.callback_query
await query.answer()
subject = context.user_data.get('current_subject')
if not subject:
return await self.start(update, context) # إذا ضاع السياق، عد للبداية
subject_files = self.available_materials[subject]['files']
subject_name = subject.replace('_', ' ').title()
keyboard = [
[InlineKeyboardButton("📖 شرح محاضرة محددة", callback_data="explain_lecture")],
[InlineKeyboardButton("🔍 استعراض جميع الملفات", callback_data="browse_files")],
[InlineKeyboardButton("❓ أسئلة عن المادة", callback_data="generate_questions")],
[InlineKeyboardButton("📝 ملخص المادة", callback_data="summarize_content")],
[InlineKeyboardButton("🧪 تفسير مفهوم", callback_data="explain_concept")],
[InlineKeyboardButton("🏠 القائمة الرئيسية", callback_data="main_menu")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
f"📚 **{subject_name}**\n\n"
f"اختر الخدمة التي تريدها:",
reply_markup=reply_markup,
parse_mode='Markdown'
)
return SELECTING_ACTION
async def handle_main_menu(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""معالجة العودة للقائمة الرئيسية"""
query = update.callback_query
await query.answer()
context.user_data.clear() # تنظيف السياق عند العودة للرئيسية
return await self.start(update, context)
async def handle_more_questions(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""معالجة طلب المزيد من الأسئلة"""
query = update.callback_query
await query.answer()
subject = context.user_data.get('current_subject', 'عام')
keyboard = [
[InlineKeyboardButton("🔙 العودة للإجراءات", callback_data="back_to_actions")],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(f"💬 اكتب سؤالك أو طلبك (في سياق مادة {subject}):", reply_markup=reply_markup)
context.user_data['waiting_for'] = 'general'
return WAITING_FOR_QUESTION
async def handle_change_subject(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""معالجة تغيير المادة"""
query = update.callback_query
await query.answer()
keyboard = self.create_subjects_keyboard()
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text("اختر المادة:", reply_markup=reply_markup)
return SELECTING_SUBJECT
async def handle_action_selection(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""معالجة اختيار الإجراء"""
query = update.callback_query
await query.answer()
action = query.data
user_id = query.from_user.id
subject = context.user_data.get('current_subject')
if not subject:
await query.edit_message_text("❌ حدث خطأ في السياق. يرجى البدء من جديد.")
return await self.start(update, context)
if action == "main_menu":
return await self.start(update, context)
elif action == "browse_files":
return await self.browse_available_files(query, context)
elif action == "explain_lecture":
files_list = await self.get_files_list_text(subject)
keyboard = [[InlineKeyboardButton("🔙 العودة", callback_data="back_to_actions")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
f"📖 **شرح محاضرة**\n\n"
f"{files_list}\n"
f"اكتب رقم المحاضرة أو اسمها:\n"
f"مثال: '1' أو 'محاضرة 3' أو 'الملف الثاني'",
reply_markup=reply_markup
)
context.user_data['waiting_for'] = 'lecture_explanation'
return WAITING_FOR_QUESTION
elif action == "generate_questions":
await query.edit_message_text("⏳ جاري توليد الأسئلة...")
questions = await self.generate_questions_for_subject(subject, user_id)
keyboard = [[InlineKeyboardButton("🔙 العودة", callback_data="back_to_actions")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(questions, reply_markup=reply_markup)
return SELECTING_ACTION
elif action == "explain_concept":
keyboard = [[InlineKeyboardButton("🔙 العودة", callback_data="back_to_actions")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"🧪 **تفسير مفهوم**\n\n"
"ما هو المفهوم أو المصطلح الذي تريد شرحه؟\n"
"مثال: 'ما هو تحليل الإنزيمات' أو 'اشرح لي تحليل البول'",
reply_markup=reply_markup
)
context.user_data['waiting_for'] = 'concept_explanation'
return WAITING_FOR_QUESTION
elif action == "summarize_content":
await query.edit_message_text("⏳ جاري تلخيص المادة...")
summary = await self.generate_summary(subject, user_id)
keyboard = [[InlineKeyboardButton("🔙 العودة", callback_data="back_to_actions")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(summary, reply_markup=reply_markup)
return SELECTING_ACTION
async def browse_available_files(self, query, context):
"""عرض الملفات المتاحة للمادة"""
subject = context.user_data.get('current_subject')
files_text = await self.get_files_list_text(subject)
keyboard = [
[InlineKeyboardButton("📖 طلب شرح ملف", callback_data="explain_lecture")],
[InlineKeyboardButton("🔙 العودة", callback_data="back_to_actions")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(files_text, reply_markup=reply_markup)
return SELECTING_ACTION
async def get_files_list_text(self, subject):
"""إنشاء نص لقائمة الملفات"""
if subject not in self.available_materials:
return "❌ خطأ: لم يتم العثور على المادة المحددة."
files = self.available_materials[subject]['files']
if not files:
return "❌ لا توجد ملفات متاحة لهذه المادة."
files_text = "📁 **الملفات المتاحة:**\n\n"
for i, file_info in enumerate(files[:20], 1): # زيادة الحد إلى 20
file_name = file_info['name']
lecture_num = file_info['lecture_number']
file_type = file_info['type']
type_emoji = {
'lecture': '📖',
'lab': '🧪',
'exam': '📝',
'summary': '📋',
'unknown': '📄'
}.get(file_type, '📄')
num_text = f" - (محاضرة {lecture_num})" if lecture_num else ""
files_text += f"{i}. {type_emoji} {file_name}{num_text}\n"
if len(files) > 20:
files_text += f"\n... وغيرها {len(files) - 20} ملف آخر"
return files_text
async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""معالجة الرسائل النصية من المستخدم"""
user_message = update.message.text
user_id = update.effective_user.id
waiting_for = context.user_data.get('waiting_for')
subject = context.user_data.get('current_subject', 'general')
await update.message.chat.send_action(action="typing")
try:
if waiting_for == 'lecture_explanation':
response = await self.explain_lecture(user_message, subject, user_id)
elif waiting_for == 'concept_explanation':
response = await self.explain_concept(user_message, subject, user_id)
else: # يتضمن 'general' أو أي شيء آخر
response = await self.process_general_query(user_message, subject, user_id)
await update.message.reply_text(response, parse_mode='Markdown')
keyboard = [
[InlineKeyboardButton("🔄 أسئلة أخرى", callback_data="more_questions")],
[InlineKeyboardButton("📚 تغيير المادة", callback_data="change_subject")],
[InlineKeyboardButton("🏠 القائمة الرئيسية", callback_data="main_menu")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text("ماذا تريد أن تفعل بعد؟", reply_markup=reply_markup)
context.user_data['waiting_for'] = None # إنهاء حالة الانتظار
return SELECTING_ACTION # العودة إلى قائمة الإجراءات
except Exception as e:
logger.error(f"❌ Error processing message: {e}")
await update.message.reply_text("❌ حدث خطأ أثناء معالجة طلبك. يرجى المحاولة مرة أخرى.")
return SELECTING_ACTION
# ========== إنشاء كائن البوت ==========
bot = MedicalLabBot()
# ========== دوال FastAPI ==========
# تم نقل @app.on_event("startup") إلى تعريف "lifespan"
# @app.on_event("startup")
# async def on_startup():
# """بدء تشغيل التطبيق وبدء تهيئة البوت في الخلفية"""
# logger.info("🚀 بدء تشغيل FastAPI... بدء مهمة تهيئة البوت في الخلفية.")
# # لا ننتظر (no await) - سيبدأ الخادم فوراً، وستعمل هذه المهمة في الخلفية
# asyncio.create_task(bot.initialize_application())
# logger.info("✅ خادم FastAPI يعمل. التهيئة (الاتصال بـ Telegram) جارية في الخلفية.")
@app.get("/", response_class=HTMLResponse)
async def root():
"""الصفحة الرئيسية"""
materials_count = len(bot.available_materials)
total_files = sum(len(material['files']) for material in bot.available_materials.values())
# تحديد رسالة الحالة بناءً على حالة التهيئة
status_message = "⏳ جاري التهيئة (الاتصال بـ Telegram)..."
status_color = "#ffc107" # أصفر
if bot.initialization_status == "success":
status_message = "✅ نشط ومهيأ"
status_color = "#28a745" # أخضر
elif bot.initialization_status == "failed":
status_message = "❌ فشل التهيئة (خطأ في الاتصال بـ Telegram)"
status_color = "#dc3545" # أحمر
return f"""
<html>
<head>
<title>Medical Lab Bot</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 40px; background: #f5f5f5; }}
.container {{ max-width: 800px; margin: 0 auto; background: white; padding: 30px; border-radius: 15px; box-shadow: 0 4px 6px rgba(0,0,0,0.1); }}
.status {{ padding: 20px; background: #f0f8ff; border-radius: 10px; border-left: 5px solid {status_color}; }}
h1 {{ color: #2c3e50; text-align: center; }}
ul {{ line-height: 1.8; }}
li {{ margin: 10px 0; }}
</style>
</head>
<body>
<div class="container">
<h1>🏥 بوت المختبرات الطبية الذكي</h1>
<div class="status">
<h2>حالة البوت: {status_message}</h2>
<p><strong>المواد المحملة:</strong> {materials_count} مادة</p>
<p><strong>إجمالي الملفات:</strong> {total_files} ملف</p>
</div>
<h3>🎯 المميزات:</h3>
<ul>
<li>📚 شرح المواد الدراسية من ملفات PDF وWord</li>
<li>❓ توليد أسئلة متنوعة للمراجعة</li>
<li>📖 تلخيص المحتوى الدراسي</li>
<li>🧪 تفسير المفاهيم العلمية</li>
<li>💾 ذاكرة محادثة لكل مستخدم</li>
</ul>
<div class="status" style="margin-top: 20px;">
<p><strong>ℹ️ معلومات:</strong> هذا البوت يستخدم الذكاء الاصطناعي من NVIDIA وملفات Hugging Face</p>
</div>
</div>
</body>
</html>
"""
@app.post("/telegram")
async def handle_telegram_update(request: Request):
"""معالجة تحديثات Telegram"""
try:
# التحقق من أن التطبيق مهيأ
if not bot.is_initialized or not bot.application:
logger.error("❌ التطبيق غير مهيأ، لا يمكن معالجة التحديث (ربما لا يزال قيد التهيئة).")
raise HTTPException(status_code=503, detail="Application not initialized or still initializing")
update_data = await request.json()
update = Update.de_json(update_data, bot.application.bot)
# معالجة التحديث
await bot.application.process_update(update)
return {"status": "ok"}
except Exception as e:
logger.error(f"❌ Error processing update: {e}")
raise HTTPException(status_code=400, detail="Invalid update")
@app.get("/health")
async def health_check():
"""فحص صحة الخدمة"""
materials_count = len(bot.available_materials)
total_files = sum(len(material['files']) for material in bot.available_materials.values())
status = "unhealthy"
if bot.initialization_status == "pending":
status = "initializing"
elif bot.initialization_status == "success":
status = "healthy"
elif bot.initialization_status == "failed":
status = "unhealthy_failed_init"
return {
"status": status,
"service": "medical-lab-bot",
"initialization_status": bot.initialization_status,
"materials_loaded": materials_count,
"total_files": total_files,
"timestamp": datetime.now().isoformat()
}
# ========== التشغيل الرئيسي ==========
if __name__ == "__main__":
port = int(os.environ.get("PORT", 7860))
logger.info(f"🚀 Starting Medical Lab Bot on port {port}")
uvicorn.run(app, host="0.0.0.0", port=port)