|
|
import os |
|
|
import logging |
|
|
import asyncio |
|
|
import re |
|
|
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup |
|
|
from telegram.ext import Application, CommandHandler, CallbackQueryHandler, MessageHandler, filters, ContextTypes, ConversationHandler |
|
|
from huggingface_hub import HfApi, hf_hub_download, list_repo_files |
|
|
from openai import OpenAI |
|
|
import pickle |
|
|
import json |
|
|
from datetime import datetime |
|
|
import PyPDF2 |
|
|
import fitz |
|
|
from PIL import Image |
|
|
import io |
|
|
import requests |
|
|
from fastapi import FastAPI, Request, HTTPException |
|
|
from fastapi.responses import HTMLResponse |
|
|
import uvicorn |
|
|
import random |
|
|
import docx |
|
|
|
|
|
|
|
|
import httpx |
|
|
import httpx_dnspython |
|
|
|
|
|
|
|
|
|
|
|
logging.basicConfig( |
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', |
|
|
level=logging.INFO |
|
|
) |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
TELEGRAM_BOT_TOKEN = os.environ.get('TELEGRAM_BOT_TOKEN') |
|
|
NVAPI_API_KEY = os.environ.get('NVAPI_API_KEY') |
|
|
SPACE_URL = os.environ.get('SPACE_URL', '') |
|
|
|
|
|
|
|
|
nvidia_client = OpenAI( |
|
|
base_url="https://integrate.api.nvidia.com/v1", |
|
|
api_key=NVAPI_API_KEY |
|
|
) |
|
|
|
|
|
|
|
|
REPO_ID = "Riy777/Study" |
|
|
|
|
|
|
|
|
SELECTING_SUBJECT, SELECTING_ACTION, WAITING_FOR_QUESTION = range(3) |
|
|
|
|
|
|
|
|
|
|
|
@asyncio.contextmanager |
|
|
async def lifespan(app: FastAPI): |
|
|
|
|
|
logger.info("🚀 بدء تشغيل FastAPI... بدء مهمة تهيئة البوت في الخلفية.") |
|
|
asyncio.create_task(bot.initialize_application()) |
|
|
logger.info("✅ خادم FastAPI يعمل. التهيئة (الاتصال بـ Telegram) جارية في الخلفية.") |
|
|
yield |
|
|
|
|
|
logger.info("🛑 إيقاف تشغيل خادم FastAPI.") |
|
|
|
|
|
app = FastAPI(title="Medical Lab Bot", version="1.0.0", lifespan=lifespan) |
|
|
|
|
|
|
|
|
class MedicalLabBot: |
|
|
def __init__(self): |
|
|
self.conversation_memory = {} |
|
|
self.available_materials = {} |
|
|
self.file_cache = {} |
|
|
self.application = None |
|
|
self.is_initialized = False |
|
|
self.initialization_status = "pending" |
|
|
self.load_all_materials() |
|
|
|
|
|
async def initialize_application(self): |
|
|
"""تهيئة تطبيق التليجرام بشكل غير متزامن مع حل DNS المخصص""" |
|
|
try: |
|
|
if self.is_initialized: |
|
|
return True |
|
|
|
|
|
logger.info("🔄 جاري تهيئة تطبيق التليجرام...") |
|
|
|
|
|
|
|
|
logger.info("🔧 إعداد عميل HTTP مخصص مع DNS (1.1.1.1)...") |
|
|
transport = httpx_dnspython.AsyncDNSTransport( |
|
|
resolver=httpx_dnspython.SyncResolver(nameservers=["1.1.1.1", "8.8.8.8"]) |
|
|
) |
|
|
|
|
|
custom_client = httpx.AsyncClient(transport=transport) |
|
|
|
|
|
|
|
|
self.application = ( |
|
|
Application.builder() |
|
|
.token(TELEGRAM_BOT_TOKEN) |
|
|
.http_client(custom_client) |
|
|
.build() |
|
|
) |
|
|
await self.setup_handlers() |
|
|
|
|
|
max_retries = 3 |
|
|
retry_delay = 5 |
|
|
|
|
|
for attempt in range(max_retries): |
|
|
try: |
|
|
logger.info(f"🚀 محاولة تهيئة الاتصال بـ Telegram (محاولة {attempt + 1}/{max_retries})...") |
|
|
|
|
|
await self.application.initialize() |
|
|
logger.info("✅ تم تهيئة الاتصال بـ Telegram بنجاح.") |
|
|
|
|
|
|
|
|
if SPACE_URL: |
|
|
webhook_url = f"{SPACE_URL.rstrip('/')}/telegram" |
|
|
logger.info(f"ℹ️ جاري إعداد الويب هوك على: {webhook_url}") |
|
|
await self.application.bot.set_webhook( |
|
|
url=webhook_url, |
|
|
allowed_updates=Update.ALL_TYPES, |
|
|
drop_pending_updates=True |
|
|
) |
|
|
logger.info(f"✅ Webhook set to: {webhook_url}") |
|
|
else: |
|
|
logger.warning("⚠️ SPACE_URL not set. Webhook cannot be set.") |
|
|
|
|
|
|
|
|
self.is_initialized = True |
|
|
self.initialization_status = "success" |
|
|
logger.info("✅✅✅ التطبيق جاهز لاستقبال الطلبات.") |
|
|
return True |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"⚠️ فشلت محاولة التهيئة {attempt + 1}: {e}") |
|
|
if attempt < max_retries - 1: |
|
|
logger.info(f"⏳ الانتظار {retry_delay} ثواني قبل إعادة المحاولة...") |
|
|
await asyncio.sleep(retry_delay) |
|
|
else: |
|
|
logger.error(f"❌ فشل تهيئة التطبيق نهائياً بعد {max_retries} محاولات (حتى مع DNS المخصص).") |
|
|
|
|
|
|
|
|
self.is_initialized = False |
|
|
self.initialization_status = "failed" |
|
|
return False |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"❌ خطأ فادح في تهيئة التطبيق: {e}") |
|
|
self.is_initialized = False |
|
|
self.initialization_status = "failed" |
|
|
return False |
|
|
|
|
|
async def setup_handlers(self): |
|
|
"""إعداد معالجات التليجرام""" |
|
|
|
|
|
conv_handler = ConversationHandler( |
|
|
entry_points=[CommandHandler('start', self.start)], |
|
|
states={ |
|
|
SELECTING_SUBJECT: [ |
|
|
CallbackQueryHandler(self.handle_subject_selection, pattern='^(subject_|general_help|refresh_materials)') |
|
|
], |
|
|
SELECTING_ACTION: [ |
|
|
CallbackQueryHandler(self.handle_action_selection, pattern='^(explain_lecture|browse_files|generate_questions|summarize_content|explain_concept|main_menu)$'), |
|
|
|
|
|
CallbackQueryHandler(self.handle_back_actions, pattern='^back_to_actions$') |
|
|
], |
|
|
WAITING_FOR_QUESTION: [ |
|
|
MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message), |
|
|
CallbackQueryHandler(self.handle_back_actions, pattern='^back_to_actions$') |
|
|
] |
|
|
}, |
|
|
fallbacks=[ |
|
|
CommandHandler('start', self.start), |
|
|
CallbackQueryHandler(self.handle_main_menu, pattern='^main_menu$') |
|
|
], |
|
|
name="medical_lab_conversation", |
|
|
persistent=False, |
|
|
per_message=False |
|
|
) |
|
|
|
|
|
|
|
|
self.application.add_handler(conv_handler) |
|
|
|
|
|
|
|
|
self.application.add_handler(CallbackQueryHandler(self.handle_more_questions, pattern='^more_questions$')) |
|
|
self.application.add_handler(CallbackQueryHandler(self.handle_change_subject, pattern='^change_subject$')) |
|
|
|
|
|
logger.info("✅ تم إعداد معالجات التليجرام") |
|
|
|
|
|
def load_all_materials(self): |
|
|
"""تحميل جميع المواد والملفات من Hugging Face""" |
|
|
try: |
|
|
logger.info("جاري تحميل قائمة المواد من Hugging Face...") |
|
|
all_files = list_repo_files(repo_id=REPO_ID, repo_type="dataset") |
|
|
|
|
|
materials = {} |
|
|
|
|
|
for file_path in all_files: |
|
|
try: |
|
|
path_parts = file_path.split('/') |
|
|
|
|
|
if len(path_parts) >= 2: |
|
|
subject = path_parts[0] |
|
|
file_name = path_parts[-1] |
|
|
|
|
|
if subject not in materials: |
|
|
materials[subject] = { |
|
|
'files': [], |
|
|
'file_details': {} |
|
|
} |
|
|
|
|
|
file_info = self.extract_file_info(file_name, file_path) |
|
|
materials[subject]['files'].append(file_info) |
|
|
materials[subject]['file_details'][file_name] = file_info |
|
|
|
|
|
else: |
|
|
if 'general' not in materials: |
|
|
materials['general'] = { |
|
|
'files': [], |
|
|
'file_details': {} |
|
|
} |
|
|
file_info = self.extract_file_info(file_path, file_path) |
|
|
materials['general']['files'].append(file_info) |
|
|
materials['general']['file_details'][file_path] = file_info |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"خطأ في معالجة الملف {file_path}: {e}") |
|
|
continue |
|
|
|
|
|
self.available_materials = materials |
|
|
logger.info(f"✅ تم تحميل {len(materials)} مادة بنجاح") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"❌ خطأ في تحميل المواد: {e}") |
|
|
self.available_materials = {'Biochemistry': {'files': [], 'file_details': {}}} |
|
|
|
|
|
|
|
|
|
|
|
def get_user_memory(self, user_id): |
|
|
"""الحصول على ذاكرة المستخدم أو إنشاؤها""" |
|
|
if user_id not in self.conversation_memory: |
|
|
self.conversation_memory[user_id] = {'history': [], 'last_subject': None} |
|
|
return self.conversation_memory[user_id] |
|
|
|
|
|
async def handle_general_help(self, query, context): |
|
|
"""عرض رسالة المساعدة""" |
|
|
help_text = """ |
|
|
❓ **مساعدة** ❓ |
|
|
|
|
|
هذا البوت مصمم لمساعدتك في دراسة مواد المختبرات الطبية. |
|
|
|
|
|
1. **ابدأ** باختيار مادة من القائمة الرئيسية. |
|
|
2. **اختر الخدمة:** |
|
|
* **شرح محاضرة:** يعطيك ملخص للملف الذي تختاره. |
|
|
* **استعراض الملفات:** يعرض لك كل الملفات المتاحة. |
|
|
* **أسئلة عن المادة:** يولد أسئلة من ملفات عشوائية. |
|
|
* **ملخص المادة:** يلخص لك ملف مهم من المادة. |
|
|
* **تفسير مفهوم:** اطرح أي سؤال أو مصطلح (مثل "ما هو CBC") وسأشرحه لك. |
|
|
3. **تحديث المواد:** اضغط (🔄) لتحديث قائمة المواد من المصدر. |
|
|
""" |
|
|
keyboard = [[InlineKeyboardButton("🔙 العودة للقائمة الرئيسية", callback_data="main_menu")]] |
|
|
reply_markup = InlineKeyboardMarkup(keyboard) |
|
|
|
|
|
await query.edit_message_text(help_text, reply_markup=reply_markup) |
|
|
return SELECTING_SUBJECT |
|
|
|
|
|
def extract_file_info(self, file_name, file_path): |
|
|
"""استخراج معلومات الملف (النوع، رقم المحاضرة) من الاسم""" |
|
|
name_lower = file_name.lower() |
|
|
lecture_num = None |
|
|
file_type = 'unknown' |
|
|
|
|
|
|
|
|
match = re.search(r'(lecture|lec|محاضرة)\s*(\d+)', name_lower) |
|
|
if match: |
|
|
lecture_num = int(match.group(2)) |
|
|
|
|
|
|
|
|
if 'lab' in name_lower or 'عملي' in name_lower: |
|
|
file_type = 'lab' |
|
|
elif 'exam' in name_lower or 'امتحان' in name_lower: |
|
|
file_type = 'exam' |
|
|
elif 'summary' in name_lower or 'ملخص' in name_lower: |
|
|
file_type = 'summary' |
|
|
elif 'lecture' in name_lower or 'محاضرة' in name_lower: |
|
|
file_type = 'lecture' |
|
|
|
|
|
return { |
|
|
'name': file_name, |
|
|
'path': file_path, |
|
|
'lecture_number': lecture_num, |
|
|
'type': file_type |
|
|
} |
|
|
|
|
|
async def _call_nvidia_api(self, messages, max_tokens=1500): |
|
|
"""دالة مساعدة لاستدعاء NVIDIA API""" |
|
|
try: |
|
|
completion = nvidia_client.chat.completions.create( |
|
|
model="meta/llama3-70b-instruct", |
|
|
messages=messages, |
|
|
temperature=0.5, |
|
|
top_p=1, |
|
|
max_tokens=max_tokens |
|
|
) |
|
|
return completion.choices[0].message.content |
|
|
except Exception as e: |
|
|
logger.error(f"❌ Error calling NVIDIA API: {e}") |
|
|
return f"❌ حدث خطأ أثناء التواصل مع الذكاء الاصطناعي: {e}" |
|
|
|
|
|
def _find_file_by_query(self, query, subject): |
|
|
"""البحث عن ملف بناءً على استعلام المستخدم (رقم، اسم، الخ)""" |
|
|
files = self.available_materials[subject]['files'] |
|
|
query_lower = query.lower().strip() |
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
match_num = re.findall(r'\d+', query_lower) |
|
|
if match_num: |
|
|
index = int(match_num[0]) |
|
|
if 1 <= index <= len(files): |
|
|
return files[index - 1] |
|
|
except (IndexError, ValueError): |
|
|
pass |
|
|
|
|
|
|
|
|
match = re.search(r'(\d+)', query_lower) |
|
|
if match: |
|
|
num = int(match.group(1)) |
|
|
for file_info in files: |
|
|
if file_info['lecture_number'] == num: |
|
|
return file_info |
|
|
|
|
|
|
|
|
for file_info in files: |
|
|
if query_lower in file_info['name'].lower(): |
|
|
return file_info |
|
|
|
|
|
return None |
|
|
|
|
|
async def download_and_extract_content(self, file_path, subject): |
|
|
"""تحميل الملف من HF واستخراج النص منه""" |
|
|
if file_path in self.file_cache: |
|
|
logger.info(f"💾 Using cached content for {file_path}") |
|
|
return self.file_cache[file_path] |
|
|
|
|
|
logger.info(f"⏳ Downloading {file_path} from Hugging Face...") |
|
|
try: |
|
|
local_path = hf_hub_download( |
|
|
repo_id=REPO_ID, |
|
|
filename=file_path, |
|
|
repo_type="dataset" |
|
|
) |
|
|
|
|
|
text_content = "" |
|
|
logger.info(f"📄 Extracting content from {local_path}") |
|
|
|
|
|
if local_path.lower().endswith('.pdf'): |
|
|
|
|
|
with fitz.open(local_path) as doc: |
|
|
for page in doc: |
|
|
text_content += page.get_text() |
|
|
|
|
|
elif local_path.lower().endswith('.txt'): |
|
|
with open(local_path, 'r', encoding='utf-8') as f: |
|
|
text_content = f.read() |
|
|
|
|
|
elif local_path.lower().endswith('.docx'): |
|
|
doc_obj = docx.Document(local_path) |
|
|
for para in doc_obj.paragraphs: |
|
|
text_content += para.text + "\n" |
|
|
|
|
|
else: |
|
|
logger.warning(f"Unsupported file type: {local_path}") |
|
|
return f"Error: Unsupported file type ({file_path})." |
|
|
|
|
|
|
|
|
text_content = re.sub(r'\s+', ' ', text_content).strip() |
|
|
|
|
|
if len(text_content) < 50: |
|
|
logger.warning(f"File {file_path} content is very short or empty.") |
|
|
|
|
|
|
|
|
self.file_cache[file_path] = text_content |
|
|
return text_content |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"❌ Error downloading/extracting {file_path}: {e}") |
|
|
return f"Error: Could not retrieve file {file_path}." |
|
|
|
|
|
|
|
|
|
|
|
async def explain_lecture(self, user_query, subject, user_id): |
|
|
"""شرح محاضرة بناءً على استعلام المستخدم""" |
|
|
file_info = self._find_file_by_query(user_query, subject) |
|
|
|
|
|
if not file_info: |
|
|
return "❌ لم أتمكن من العثور على الملف. يرجى تحديد رقم أو اسم الملف بوضوح." |
|
|
|
|
|
file_path = file_info['path'] |
|
|
await bot.application.bot.send_chat_action(chat_id=user_id, action="typing") |
|
|
content = await self.download_and_extract_content(file_path, subject) |
|
|
|
|
|
if content.startswith("Error:"): |
|
|
return f"❌ خطأ في معالجة الملف: {file_path}\n{content}" |
|
|
|
|
|
if not content: |
|
|
return f"❌ المحتوى فارغ للملف: {file_path}" |
|
|
|
|
|
memory = self.get_user_memory(user_id) |
|
|
memory['history'].append({"role": "user", "content": f"اشرح لي النقاط الأساسية في هذه المحاضرة: {file_info['name']}"}) |
|
|
|
|
|
system_prompt = f"أنت مساعد مختبرات طبية متخصص. اشرح النقاط الأساسية في المحتوى التالي من مادة {subject} (ملف: {file_info['name']}). ركز على المفاهيم الرئيسية والنتائج المهمة. استخدم تنسيق Markdown ونقاط واضحة." |
|
|
|
|
|
messages = [ |
|
|
{"role": "system", "content": system_prompt}, |
|
|
{"role": "user", "content": f"المحتوى:\n```\n{content[:8000]}\n```\n\nاشرح النقاط الأساسية."} |
|
|
] |
|
|
|
|
|
response = await self._call_nvidia_api(messages, 1500) |
|
|
memory['history'].append({"role": "assistant", "content": response}) |
|
|
return f"📝 **شرح لأهم نقاط ملف: {file_info['name']}**\n\n{response}" |
|
|
|
|
|
async def explain_concept(self, user_query, subject, user_id): |
|
|
"""شرح مفهوم أو مصطلح طبي""" |
|
|
memory = self.get_user_memory(user_id) |
|
|
memory['history'].append({"role": "user", "content": user_query}) |
|
|
|
|
|
system_prompt = f"أنت خبير في المختبرات الطبية. اشرح المفهوم التالي بوضوح وبشكل مبسط، مع التركيز على أهميته في مادة {subject} إذا كان ذا صلة." |
|
|
|
|
|
messages = [ |
|
|
{"role": "system", "content": system_prompt}, |
|
|
{"role": "user", "content": user_query} |
|
|
] |
|
|
|
|
|
response = await self._call_nvidia_api(messages) |
|
|
memory['history'].append({"role": "assistant", "content": response}) |
|
|
return f"🧪 **شرح مفهوم: {user_query}**\n\n{response}" |
|
|
|
|
|
async def process_general_query(self, user_message, subject, user_id): |
|
|
"""معالجة استعلام عام من المستخدم""" |
|
|
memory = self.get_user_memory(user_id) |
|
|
history = memory.get('history', []) |
|
|
|
|
|
system_prompt = f"أنت مساعد ذكي متخصص في المختبرات الطبية. المادة الحالية التي يركز عليها الطالب هي {subject}. أجب على سؤال الطالب بناءً على سياق المحادثة إن وجد." |
|
|
|
|
|
messages = [{"role": "system", "content": system_prompt}] |
|
|
messages.extend(history[-5:]) |
|
|
messages.append({"role": "user", "content": user_message}) |
|
|
|
|
|
response = await self._call_nvidia_api(messages) |
|
|
memory['history'].append({"role": "user", "content": user_message}) |
|
|
memory['history'].append({"role": "assistant", "content": response}) |
|
|
return response |
|
|
|
|
|
async def generate_questions_for_subject(self, subject, user_id): |
|
|
"""توليد أسئلة من ملف عشوائي في المادة""" |
|
|
files = self.available_materials[subject]['files'] |
|
|
if not files: |
|
|
return "❌ لا توجد ملفات في هذه المادة لتوليد أسئلة منها." |
|
|
|
|
|
|
|
|
file_info = random.choice(files) |
|
|
file_path = file_info['path'] |
|
|
|
|
|
await bot.application.bot.send_chat_action(chat_id=user_id, action="typing") |
|
|
content = await self.download_and_extract_content(file_path, subject) |
|
|
|
|
|
if content.startswith("Error:") or not content: |
|
|
return f"❌ لم أتمكن من قراءة ملف ({file_info['name']}) لتوليد أسئلة." |
|
|
|
|
|
system_prompt = f"أنت خبير في مادة {subject}. قم بإنشاء 5 أسئلة متنوعة (MCQ أو أسئلة قصيرة) بناءً على المحتوى التالي من ملف {file_info['name']}." |
|
|
|
|
|
messages = [ |
|
|
{"role": "system", "content": system_prompt}, |
|
|
{"role": "user", "content": f"المحتوى:\n```\n{content[:8000]}\n```\n\nقم بإنشاء 5 أسئلة متنوعة وواضحة."} |
|
|
] |
|
|
|
|
|
response = await self._call_nvidia_api(messages) |
|
|
return f"❓ **أسئلة مقترحة من ملف: {file_info['name']}**\n\n{response}" |
|
|
|
|
|
async def generate_summary(self, subject, user_id): |
|
|
"""تلخيص ملف مهم من المادة""" |
|
|
files = self.available_materials[subject]['files'] |
|
|
if not files: |
|
|
return "❌ لا توجد ملفات في هذه المادة لتلخيصها." |
|
|
|
|
|
|
|
|
summary_file = next((f for f in files if f['type'] == 'summary'), None) |
|
|
if not summary_file: |
|
|
|
|
|
summary_file = next((f for f in files if f['type'] == 'lecture'), files[0]) |
|
|
|
|
|
file_path = summary_file['path'] |
|
|
await bot.application.bot.send_chat_action(chat_id=user_id, action="typing") |
|
|
content = await self.download_and_extract_content(file_path, subject) |
|
|
|
|
|
if content.startswith("Error:") or not content: |
|
|
return f"❌ لم أتمكن من قراءة ملف ({summary_file['name']}) للتلخيص." |
|
|
|
|
|
system_prompt = f"أنت خبير في مادة {subject}. قم بتلخيص المحتوى التالي من ملف {summary_file['name']} في 5 نقاط رئيسية وواضحة." |
|
|
|
|
|
messages = [ |
|
|
{"role": "system", "content": system_prompt}, |
|
|
{"role": "user", "content": f"المحتوى:\n```\n{content[:8000]}\n```\n\nلخص المحتوى في 5 نقاط رئيسية."} |
|
|
] |
|
|
|
|
|
response = await self._call_nvidia_api(messages) |
|
|
return f"📋 **ملخص ملف: {summary_file['name']}**\n\n{response}" |
|
|
|
|
|
|
|
|
|
|
|
async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
|
|
"""بدء المحادثة وعرض القائمة الرئيسية""" |
|
|
user_id = update.effective_user.id |
|
|
|
|
|
welcome_text = """ |
|
|
🏥 **مرحباً بك في بوت المختبرات الطبية الذكي** 🔬 |
|
|
|
|
|
📚 **المواد المتاحة حالياً:** |
|
|
""" |
|
|
|
|
|
if not self.available_materials: |
|
|
welcome_text += "\n\n❌ عذراً، لم أتمكن من تحميل أي مواد دراسية حالياً. حاول تحديث القائمة." |
|
|
else: |
|
|
for subject in self.available_materials.keys(): |
|
|
file_count = len(self.available_materials[subject]['files']) |
|
|
welcome_text += f"\n• {subject} ({file_count} ملف)" |
|
|
|
|
|
welcome_text += "\n\nاختر المادة التي تريد البدء بها:" |
|
|
|
|
|
keyboard = self.create_subjects_keyboard() |
|
|
reply_markup = InlineKeyboardMarkup(keyboard) |
|
|
|
|
|
if update.callback_query: |
|
|
await update.callback_query.edit_message_text(welcome_text, reply_markup=reply_markup) |
|
|
else: |
|
|
await update.message.reply_text(welcome_text, reply_markup=reply_markup) |
|
|
|
|
|
return SELECTING_SUBJECT |
|
|
|
|
|
def create_subjects_keyboard(self): |
|
|
"""إنشاء لوحة مفاتيح للمواد المتاحة""" |
|
|
keyboard = [] |
|
|
for subject in self.available_materials.keys(): |
|
|
file_count = len(self.available_materials[subject]['files']) |
|
|
display_name = f"{subject} ({file_count})" |
|
|
keyboard.append([InlineKeyboardButton(display_name, callback_data=f"subject_{subject}")]) |
|
|
|
|
|
keyboard.append([InlineKeyboardButton("🔄 تحديث قائمة المواد", callback_data="refresh_materials")]) |
|
|
keyboard.append([InlineKeyboardButton("❓ مساعدة", callback_Ddata="general_help")]) |
|
|
return keyboard |
|
|
|
|
|
async def handle_subject_selection(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
|
|
"""معالجة اختيار المادة""" |
|
|
query = update.callback_query |
|
|
await query.answer() |
|
|
|
|
|
user_id = query.from_user.id |
|
|
callback_data = query.data |
|
|
|
|
|
if callback_data == "general_help": |
|
|
return await self.handle_general_help(query, context) |
|
|
elif callback_data == "refresh_materials": |
|
|
await query.edit_message_text("🔄 جاري تحديث قائمة المواد...") |
|
|
self.load_all_materials() |
|
|
keyboard = self.create_subjects_keyboard() |
|
|
reply_markup = InlineKeyboardMarkup(keyboard) |
|
|
await query.edit_message_text("✅ تم تحديث قائمة المواد\nاختر المادة:", reply_markup=reply_markup) |
|
|
return SELECTING_SUBJECT |
|
|
|
|
|
subject = callback_data.replace("subject_", "") |
|
|
context.user_data['current_subject'] = subject |
|
|
|
|
|
memory = self.get_user_memory(user_id) |
|
|
memory['last_subject'] = subject |
|
|
|
|
|
|
|
|
if subject not in self.available_materials: |
|
|
await query.edit_message_text("❌ خطأ: المادة غير موجودة. ربما تحتاج لتحديث القائمة؟") |
|
|
return await self.start(update, context) |
|
|
|
|
|
subject_files = self.available_materials[subject]['files'] |
|
|
subject_name = subject.replace('_', ' ').title() |
|
|
|
|
|
keyboard = [ |
|
|
[InlineKeyboardButton("📖 شرح محاضرة محددة", callback_data="explain_lecture")], |
|
|
[InlineKeyboardButton("🔍 استعراض جميع الملفات", callback_data="browse_files")], |
|
|
[InlineKeyboardButton("❓ أسئلة عن المادة", callback_data="generate_questions")], |
|
|
[InlineKeyboardButton("📝 ملخص المادة", callback_data="summarize_content")], |
|
|
[InlineKeyboardButton("🧪 تفسير مفهوم", callback_data="explain_concept")], |
|
|
[InlineKeyboardButton("🏠 القائمة الرئيسية", callback_data="main_menu")] |
|
|
] |
|
|
reply_markup = InlineKeyboardMarkup(keyboard) |
|
|
|
|
|
await query.edit_message_text( |
|
|
f"📚 **{subject_name}**\n\n" |
|
|
f"عدد الملفات المتاحة: {len(subject_files)}\n" |
|
|
f"اختر الخدمة التي تريدها:", |
|
|
reply_markup=reply_markup, |
|
|
parse_mode='Markdown' |
|
|
) |
|
|
return SELECTING_ACTION |
|
|
|
|
|
async def handle_back_actions(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
|
|
"""معالجة العودة إلى قائمة الإجراءات""" |
|
|
query = update.callback_query |
|
|
await query.answer() |
|
|
|
|
|
subject = context.user_data.get('current_subject') |
|
|
if not subject: |
|
|
return await self.start(update, context) |
|
|
|
|
|
subject_files = self.available_materials[subject]['files'] |
|
|
subject_name = subject.replace('_', ' ').title() |
|
|
|
|
|
keyboard = [ |
|
|
[InlineKeyboardButton("📖 شرح محاضرة محددة", callback_data="explain_lecture")], |
|
|
[InlineKeyboardButton("🔍 استعراض جميع الملفات", callback_data="browse_files")], |
|
|
[InlineKeyboardButton("❓ أسئلة عن المادة", callback_data="generate_questions")], |
|
|
[InlineKeyboardButton("📝 ملخص المادة", callback_data="summarize_content")], |
|
|
[InlineKeyboardButton("🧪 تفسير مفهوم", callback_data="explain_concept")], |
|
|
[InlineKeyboardButton("🏠 القائمة الرئيسية", callback_data="main_menu")] |
|
|
] |
|
|
reply_markup = InlineKeyboardMarkup(keyboard) |
|
|
|
|
|
await query.edit_message_text( |
|
|
f"📚 **{subject_name}**\n\n" |
|
|
f"اختر الخدمة التي تريدها:", |
|
|
reply_markup=reply_markup, |
|
|
parse_mode='Markdown' |
|
|
) |
|
|
return SELECTING_ACTION |
|
|
|
|
|
async def handle_main_menu(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
|
|
"""معالجة العودة للقائمة الرئيسية""" |
|
|
query = update.callback_query |
|
|
await query.answer() |
|
|
context.user_data.clear() |
|
|
return await self.start(update, context) |
|
|
|
|
|
async def handle_more_questions(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
|
|
"""معالجة طلب المزيد من الأسئلة""" |
|
|
query = update.callback_query |
|
|
await query.answer() |
|
|
|
|
|
subject = context.user_data.get('current_subject', 'عام') |
|
|
|
|
|
keyboard = [ |
|
|
[InlineKeyboardButton("🔙 العودة للإجراءات", callback_data="back_to_actions")], |
|
|
] |
|
|
reply_markup = InlineKeyboardMarkup(keyboard) |
|
|
|
|
|
await query.edit_message_text(f"💬 اكتب سؤالك أو طلبك (في سياق مادة {subject}):", reply_markup=reply_markup) |
|
|
context.user_data['waiting_for'] = 'general' |
|
|
return WAITING_FOR_QUESTION |
|
|
|
|
|
async def handle_change_subject(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
|
|
"""معالجة تغيير المادة""" |
|
|
query = update.callback_query |
|
|
await query.answer() |
|
|
keyboard = self.create_subjects_keyboard() |
|
|
reply_markup = InlineKeyboardMarkup(keyboard) |
|
|
await query.edit_message_text("اختر المادة:", reply_markup=reply_markup) |
|
|
return SELECTING_SUBJECT |
|
|
|
|
|
async def handle_action_selection(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
|
|
"""معالجة اختيار الإجراء""" |
|
|
query = update.callback_query |
|
|
await query.answer() |
|
|
|
|
|
action = query.data |
|
|
user_id = query.from_user.id |
|
|
subject = context.user_data.get('current_subject') |
|
|
|
|
|
if not subject: |
|
|
await query.edit_message_text("❌ حدث خطأ في السياق. يرجى البدء من جديد.") |
|
|
return await self.start(update, context) |
|
|
|
|
|
if action == "main_menu": |
|
|
return await self.start(update, context) |
|
|
|
|
|
elif action == "browse_files": |
|
|
return await self.browse_available_files(query, context) |
|
|
|
|
|
elif action == "explain_lecture": |
|
|
files_list = await self.get_files_list_text(subject) |
|
|
keyboard = [[InlineKeyboardButton("🔙 العودة", callback_data="back_to_actions")]] |
|
|
reply_markup = InlineKeyboardMarkup(keyboard) |
|
|
await query.edit_message_text( |
|
|
f"📖 **شرح محاضرة**\n\n" |
|
|
f"{files_list}\n" |
|
|
f"اكتب رقم المحاضرة أو اسمها:\n" |
|
|
f"مثال: '1' أو 'محاضرة 3' أو 'الملف الثاني'", |
|
|
reply_markup=reply_markup |
|
|
) |
|
|
context.user_data['waiting_for'] = 'lecture_explanation' |
|
|
return WAITING_FOR_QUESTION |
|
|
|
|
|
elif action == "generate_questions": |
|
|
await query.edit_message_text("⏳ جاري توليد الأسئلة...") |
|
|
questions = await self.generate_questions_for_subject(subject, user_id) |
|
|
keyboard = [[InlineKeyboardButton("🔙 العودة", callback_data="back_to_actions")]] |
|
|
reply_markup = InlineKeyboardMarkup(keyboard) |
|
|
await query.edit_message_text(questions, reply_markup=reply_markup) |
|
|
return SELECTING_ACTION |
|
|
|
|
|
elif action == "explain_concept": |
|
|
keyboard = [[InlineKeyboardButton("🔙 العودة", callback_data="back_to_actions")]] |
|
|
reply_markup = InlineKeyboardMarkup(keyboard) |
|
|
await query.edit_message_text( |
|
|
"🧪 **تفسير مفهوم**\n\n" |
|
|
"ما هو المفهوم أو المصطلح الذي تريد شرحه؟\n" |
|
|
"مثال: 'ما هو تحليل الإنزيمات' أو 'اشرح لي تحليل البول'", |
|
|
reply_markup=reply_markup |
|
|
) |
|
|
context.user_data['waiting_for'] = 'concept_explanation' |
|
|
return WAITING_FOR_QUESTION |
|
|
|
|
|
elif action == "summarize_content": |
|
|
await query.edit_message_text("⏳ جاري تلخيص المادة...") |
|
|
summary = await self.generate_summary(subject, user_id) |
|
|
keyboard = [[InlineKeyboardButton("🔙 العودة", callback_data="back_to_actions")]] |
|
|
reply_markup = InlineKeyboardMarkup(keyboard) |
|
|
await query.edit_message_text(summary, reply_markup=reply_markup) |
|
|
return SELECTING_ACTION |
|
|
|
|
|
async def browse_available_files(self, query, context): |
|
|
"""عرض الملفات المتاحة للمادة""" |
|
|
subject = context.user_data.get('current_subject') |
|
|
files_text = await self.get_files_list_text(subject) |
|
|
|
|
|
keyboard = [ |
|
|
[InlineKeyboardButton("📖 طلب شرح ملف", callback_data="explain_lecture")], |
|
|
[InlineKeyboardButton("🔙 العودة", callback_data="back_to_actions")] |
|
|
] |
|
|
reply_markup = InlineKeyboardMarkup(keyboard) |
|
|
|
|
|
await query.edit_message_text(files_text, reply_markup=reply_markup) |
|
|
return SELECTING_ACTION |
|
|
|
|
|
async def get_files_list_text(self, subject): |
|
|
"""إنشاء نص لقائمة الملفات""" |
|
|
if subject not in self.available_materials: |
|
|
return "❌ خطأ: لم يتم العثور على المادة المحددة." |
|
|
|
|
|
files = self.available_materials[subject]['files'] |
|
|
|
|
|
if not files: |
|
|
return "❌ لا توجد ملفات متاحة لهذه المادة." |
|
|
|
|
|
files_text = "📁 **الملفات المتاحة:**\n\n" |
|
|
for i, file_info in enumerate(files[:20], 1): |
|
|
file_name = file_info['name'] |
|
|
lecture_num = file_info['lecture_number'] |
|
|
file_type = file_info['type'] |
|
|
|
|
|
type_emoji = { |
|
|
'lecture': '📖', |
|
|
'lab': '🧪', |
|
|
'exam': '📝', |
|
|
'summary': '📋', |
|
|
'unknown': '📄' |
|
|
}.get(file_type, '📄') |
|
|
|
|
|
num_text = f" - (محاضرة {lecture_num})" if lecture_num else "" |
|
|
files_text += f"{i}. {type_emoji} {file_name}{num_text}\n" |
|
|
|
|
|
if len(files) > 20: |
|
|
files_text += f"\n... وغيرها {len(files) - 20} ملف آخر" |
|
|
|
|
|
return files_text |
|
|
|
|
|
async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
|
|
"""معالجة الرسائل النصية من المستخدم""" |
|
|
user_message = update.message.text |
|
|
user_id = update.effective_user.id |
|
|
waiting_for = context.user_data.get('waiting_for') |
|
|
subject = context.user_data.get('current_subject', 'general') |
|
|
|
|
|
await update.message.chat.send_action(action="typing") |
|
|
|
|
|
try: |
|
|
if waiting_for == 'lecture_explanation': |
|
|
response = await self.explain_lecture(user_message, subject, user_id) |
|
|
elif waiting_for == 'concept_explanation': |
|
|
response = await self.explain_concept(user_message, subject, user_id) |
|
|
else: |
|
|
response = await self.process_general_query(user_message, subject, user_id) |
|
|
|
|
|
await update.message.reply_text(response, parse_mode='Markdown') |
|
|
|
|
|
keyboard = [ |
|
|
[InlineKeyboardButton("🔄 أسئلة أخرى", callback_data="more_questions")], |
|
|
[InlineKeyboardButton("📚 تغيير المادة", callback_data="change_subject")], |
|
|
[InlineKeyboardButton("🏠 القائمة الرئيسية", callback_data="main_menu")] |
|
|
] |
|
|
reply_markup = InlineKeyboardMarkup(keyboard) |
|
|
|
|
|
await update.message.reply_text("ماذا تريد أن تفعل بعد؟", reply_markup=reply_markup) |
|
|
|
|
|
context.user_data['waiting_for'] = None |
|
|
return SELECTING_ACTION |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"❌ Error processing message: {e}") |
|
|
await update.message.reply_text("❌ حدث خطأ أثناء معالجة طلبك. يرجى المحاولة مرة أخرى.") |
|
|
return SELECTING_ACTION |
|
|
|
|
|
|
|
|
bot = MedicalLabBot() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse) |
|
|
async def root(): |
|
|
"""الصفحة الرئيسية""" |
|
|
materials_count = len(bot.available_materials) |
|
|
total_files = sum(len(material['files']) for material in bot.available_materials.values()) |
|
|
|
|
|
|
|
|
status_message = "⏳ جاري التهيئة (الاتصال بـ Telegram)..." |
|
|
status_color = "#ffc107" |
|
|
if bot.initialization_status == "success": |
|
|
status_message = "✅ نشط ومهيأ" |
|
|
status_color = "#28a745" |
|
|
elif bot.initialization_status == "failed": |
|
|
status_message = "❌ فشل التهيئة (خطأ في الاتصال بـ Telegram)" |
|
|
status_color = "#dc3545" |
|
|
|
|
|
return f""" |
|
|
<html> |
|
|
<head> |
|
|
<title>Medical Lab Bot</title> |
|
|
<style> |
|
|
body {{ font-family: Arial, sans-serif; margin: 40px; background: #f5f5f5; }} |
|
|
.container {{ max-width: 800px; margin: 0 auto; background: white; padding: 30px; border-radius: 15px; box-shadow: 0 4px 6px rgba(0,0,0,0.1); }} |
|
|
.status {{ padding: 20px; background: #f0f8ff; border-radius: 10px; border-left: 5px solid {status_color}; }} |
|
|
h1 {{ color: #2c3e50; text-align: center; }} |
|
|
ul {{ line-height: 1.8; }} |
|
|
li {{ margin: 10px 0; }} |
|
|
</style> |
|
|
</head> |
|
|
<body> |
|
|
<div class="container"> |
|
|
<h1>🏥 بوت المختبرات الطبية الذكي</h1> |
|
|
<div class="status"> |
|
|
<h2>حالة البوت: {status_message}</h2> |
|
|
<p><strong>المواد المحملة:</strong> {materials_count} مادة</p> |
|
|
<p><strong>إجمالي الملفات:</strong> {total_files} ملف</p> |
|
|
</div> |
|
|
<h3>🎯 المميزات:</h3> |
|
|
<ul> |
|
|
<li>📚 شرح المواد الدراسية من ملفات PDF وWord</li> |
|
|
<li>❓ توليد أسئلة متنوعة للمراجعة</li> |
|
|
<li>📖 تلخيص المحتوى الدراسي</li> |
|
|
<li>🧪 تفسير المفاهيم العلمية</li> |
|
|
<li>💾 ذاكرة محادثة لكل مستخدم</li> |
|
|
</ul> |
|
|
<div class="status" style="margin-top: 20px;"> |
|
|
<p><strong>ℹ️ معلومات:</strong> هذا البوت يستخدم الذكاء الاصطناعي من NVIDIA وملفات Hugging Face</p> |
|
|
</div> |
|
|
</div> |
|
|
</body> |
|
|
</html> |
|
|
""" |
|
|
|
|
|
@app.post("/telegram") |
|
|
async def handle_telegram_update(request: Request): |
|
|
"""معالجة تحديثات Telegram""" |
|
|
try: |
|
|
|
|
|
if not bot.is_initialized or not bot.application: |
|
|
logger.error("❌ التطبيق غير مهيأ، لا يمكن معالجة التحديث (ربما لا يزال قيد التهيئة).") |
|
|
raise HTTPException(status_code=503, detail="Application not initialized or still initializing") |
|
|
|
|
|
update_data = await request.json() |
|
|
update = Update.de_json(update_data, bot.application.bot) |
|
|
|
|
|
|
|
|
await bot.application.process_update(update) |
|
|
return {"status": "ok"} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"❌ Error processing update: {e}") |
|
|
raise HTTPException(status_code=400, detail="Invalid update") |
|
|
|
|
|
@app.get("/health") |
|
|
async def health_check(): |
|
|
"""فحص صحة الخدمة""" |
|
|
materials_count = len(bot.available_materials) |
|
|
total_files = sum(len(material['files']) for material in bot.available_materials.values()) |
|
|
|
|
|
status = "unhealthy" |
|
|
if bot.initialization_status == "pending": |
|
|
status = "initializing" |
|
|
elif bot.initialization_status == "success": |
|
|
status = "healthy" |
|
|
elif bot.initialization_status == "failed": |
|
|
status = "unhealthy_failed_init" |
|
|
|
|
|
return { |
|
|
"status": status, |
|
|
"service": "medical-lab-bot", |
|
|
"initialization_status": bot.initialization_status, |
|
|
"materials_loaded": materials_count, |
|
|
"total_files": total_files, |
|
|
"timestamp": datetime.now().isoformat() |
|
|
} |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
port = int(os.environ.get("PORT", 7860)) |
|
|
logger.info(f"🚀 Starting Medical Lab Bot on port {port}") |
|
|
uvicorn.run(app, host="0.0.0.0", port=port) |