|
|
import os |
|
|
import logging |
|
|
import asyncio |
|
|
import re |
|
|
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup |
|
|
from telegram.ext import Application, CommandHandler, CallbackQueryHandler, MessageHandler, filters, ContextTypes, ConversationHandler |
|
|
from huggingface_hub import HfApi, hf_hub_download, list_repo_files |
|
|
from openai import OpenAI |
|
|
import pickle |
|
|
import json |
|
|
from datetime import datetime |
|
|
import PyPDF2 |
|
|
import fitz |
|
|
from PIL import Image |
|
|
import io |
|
|
import requests |
|
|
from fastapi import FastAPI, Request, HTTPException |
|
|
from fastapi.responses import HTMLResponse |
|
|
import uvicorn |
|
|
import random |
|
|
import docx |
|
|
|
|
|
|
|
|
import httpx |
|
|
import dns.asyncresolver |
|
|
from httpx import AsyncClient, AsyncHTTPTransport, Request, Response |
|
|
import contextlib |
|
|
from typing import Dict |
|
|
|
|
|
|
|
|
|
|
|
logging.basicConfig( |
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', |
|
|
level=logging.INFO |
|
|
) |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
TELEGRAM_BOT_TOKEN = os.environ.get('TELEGRAM_BOT_TOKEN') |
|
|
NVAPI_API_KEY = os.environ.get('NVAPI_API_KEY') |
|
|
SPACE_URL = os.environ.get('SPACE_URL', '') |
|
|
|
|
|
|
|
|
nvidia_client = OpenAI( |
|
|
base_url="https://integrate.api.nvidia.com/v1", |
|
|
api_key=NVAPI_API_KEY |
|
|
) |
|
|
|
|
|
|
|
|
REPO_ID = "Riy777/Study" |
|
|
|
|
|
|
|
|
SELECTING_SUBJECT, SELECTING_ACTION, WAITING_FOR_QUESTION = range(3) |
|
|
|
|
|
|
|
|
|
|
|
class CustomDNSTransport(AsyncHTTPTransport): |
|
|
def __init__(self, *args, **kwargs): |
|
|
super().__init__(*args, **kwargs) |
|
|
self.resolver = dns.asyncresolver.Resolver() |
|
|
self.resolver.nameservers = ['1.1.1.1', '8.8.8.8'] |
|
|
logger.info("🔧 CustomDNSTransport initialized with 1.1.1.1 and 8.8.8.8") |
|
|
|
|
|
async def handle_async_request(self, request: Request) -> Response: |
|
|
try: |
|
|
host = request.url.host |
|
|
|
|
|
|
|
|
is_ip = True |
|
|
try: |
|
|
|
|
|
parts = host.split('.') |
|
|
if len(parts) != 4 or not all(p.isdigit() and 0 <= int(p) <= 255 for p in parts): |
|
|
is_ip = False |
|
|
|
|
|
except Exception: |
|
|
is_ip = False |
|
|
|
|
|
if is_ip: |
|
|
|
|
|
pass |
|
|
else: |
|
|
logger.info(f"🔧 Resolving host: {host}") |
|
|
result = await self.resolver.resolve(host, 'A') |
|
|
ip = result[0].address |
|
|
logger.info(f"✅ Resolved {host} to {ip}") |
|
|
|
|
|
|
|
|
request.extensions["sni_hostname"] = host |
|
|
|
|
|
request.url = request.url.copy_with(host=ip) |
|
|
|
|
|
except dns.resolver.NoAnswer: |
|
|
logger.error(f"❌ DNS NoAnswer for {host}. Trying request with original host...") |
|
|
except dns.resolver.NXDOMAIN: |
|
|
logger.error(f"❌ DNS NXDOMAIN for {host}. Trying request with original host...") |
|
|
except Exception as e: |
|
|
logger.error(f"❌ DNS resolution failed for {host}: {e}. Trying request with original host...") |
|
|
|
|
|
pass |
|
|
|
|
|
|
|
|
return await super().handle_async_request(request) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@contextlib.asynccontextmanager |
|
|
async def lifespan(app: FastAPI): |
|
|
logger.info("🚀 بدء تشغيل FastAPI... بدء مهمة تهيئة البوت في الخلفية.") |
|
|
asyncio.create_task(bot.initialize_application()) |
|
|
logger.info("✅ خادم FastAPI يعمل. التهيئة (الاتصال بـ Telegram) جارية في الخلفية.") |
|
|
yield |
|
|
logger.info("🛑 إيقاف تشغيل خادم FastAPI.") |
|
|
|
|
|
app = FastAPI(title="Medical Lab Bot", version="1.0.0", lifespan=lifespan) |
|
|
|
|
|
|
|
|
|
|
|
class MedicalLabBot: |
|
|
def __init__(self): |
|
|
self.conversation_memory = {} |
|
|
self.available_materials = {} |
|
|
self.file_cache = {} |
|
|
self.application = None |
|
|
self.is_initialized = False |
|
|
self.initialization_status = "pending" |
|
|
self.load_all_materials() |
|
|
|
|
|
async def initialize_application(self): |
|
|
"""تهيئة تطبيق التليجرام بشكل غير متزامن مع حل DNS المخصص""" |
|
|
try: |
|
|
if self.is_initialized: |
|
|
return True |
|
|
|
|
|
logger.info("🔄 جاري تهيئة تطبيق التليجرام...") |
|
|
|
|
|
|
|
|
logger.info("🔧 إعداد عميل HTTP مخصص مع CustomDNSTransport...") |
|
|
custom_transport = CustomDNSTransport() |
|
|
custom_client = httpx.AsyncClient(transport=custom_transport) |
|
|
|
|
|
|
|
|
self.application = ( |
|
|
Application.builder() |
|
|
.token(TELEGRAM_BOT_TOKEN) |
|
|
.http_client(custom_client) |
|
|
.build() |
|
|
) |
|
|
await self.setup_handlers() |
|
|
|
|
|
max_retries = 3 |
|
|
retry_delay = 5 |
|
|
|
|
|
for attempt in range(max_retries): |
|
|
try: |
|
|
logger.info(f"🚀 محاولة تهيئة الاتصال بـ Telegram (محاولة {attempt + 1}/{max_retries})...") |
|
|
|
|
|
await self.application.initialize() |
|
|
logger.info("✅ تم تهيئة الاتصال بـ Telegram بنجاح.") |
|
|
|
|
|
|
|
|
if SPACE_URL: |
|
|
webhook_url = f"{SPACE_URL.rstrip('/')}/telegram" |
|
|
logger.info(f"ℹ️ جاري إعداد الويب هوك على: {webhook_url}") |
|
|
await self.application.bot.set_webhook( |
|
|
url=webhook_url, |
|
|
allowed_updates=Update.ALL_TYPES, |
|
|
drop_pending_updates=True |
|
|
) |
|
|
logger.info(f"✅ Webhook set to: {webhook_url}") |
|
|
else: |
|
|
logger.warning("⚠️ SPACE_URL not set. Webhook cannot be set.") |
|
|
|
|
|
|
|
|
self.is_initialized = True |
|
|
self.initialization_status = "success" |
|
|
logger.info("✅✅✅ التطبيق جاهز لاستقبال الطلبات.") |
|
|
return True |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"⚠️ فشلت محاولة التهيئة {attempt + 1}: {e}") |
|
|
if attempt < max_retries - 1: |
|
|
logger.info(f"⏳ الانتظار {retry_delay} ثواني قبل إعادة المحاولة...") |
|
|
await asyncio.sleep(retry_delay) |
|
|
else: |
|
|
logger.error(f"❌ فشل تهيئة التطبيق نهائياً بعد {max_retries} محاولات (حتى مع DNS المخصص).") |
|
|
|
|
|
|
|
|
self.is_initialized = False |
|
|
self.initialization_status = "failed" |
|
|
return False |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"❌ خطأ فادح في تهيئة التطبيق: {e}") |
|
|
self.is_initialized = False |
|
|
self.initialization_status = "failed" |
|
|
return False |
|
|
|
|
|
async def setup_handlers(self): |
|
|
"""إعداد معالجات التليجرام""" |
|
|
|
|
|
conv_handler = ConversationHandler( |
|
|
entry_points=[CommandHandler('start', self.start)], |
|
|
states={ |
|
|
SELECTING_SUBJECT: [ |
|
|
CallbackQueryHandler(self.handle_subject_selection, pattern='^(subject_|general_help|refresh_materials)') |
|
|
], |
|
|
SELECTING_ACTION: [ |
|
|
CallbackQueryHandler(self.handle_action_selection, pattern='^(explain_lecture|browse_files|generate_questions|summarize_content|explain_concept|main_menu)$'), |
|
|
|
|
|
CallbackQueryHandler(self.handle_back_actions, pattern='^back_to_actions$') |
|
|
], |
|
|
WAITING_FOR_QUESTION: [ |
|
|
MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message), |
|
|
CallbackQueryHandler(self.handle_back_actions, pattern='^back_to_actions$') |
|
|
] |
|
|
}, |
|
|
fallbacks=[ |
|
|
CommandHandler('start', self.start), |
|
|
CallbackQueryHandler(self.handle_main_menu, pattern='^main_menu$') |
|
|
], |
|
|
name="medical_lab_conversation", |
|
|
persistent=False, |
|
|
per_message=False |
|
|
) |
|
|
|
|
|
|
|
|
self.application.add_handler(conv_handler) |
|
|
|
|
|
|
|
|
self.application.add_handler(CallbackQueryHandler(self.handle_more_questions, pattern='^more_questions$')) |
|
|
self.application.add_handler(CallbackQueryHandler(self.handle_change_subject, pattern='^change_subject$')) |
|
|
|
|
|
logger.info("✅ تم إعداد معالجات التليجرام") |
|
|
|
|
|
def load_all_materials(self): |
|
|
"""تحميل جميع المواد والملفات من Hugging Face""" |
|
|
try: |
|
|
logger.info("جاري تحميل قائمة المواد من Hugging Face...") |
|
|
all_files = list_repo_files(repo_id=REPO_ID, repo_type="dataset") |
|
|
|
|
|
materials = {} |
|
|
|
|
|
for file_path in all_files: |
|
|
try: |
|
|
path_parts = file_path.split('/') |
|
|
|
|
|
if len(path_parts) >= 2: |
|
|
subject = path_parts[0] |
|
|
file_name = path_parts[-1] |
|
|
|
|
|
if subject not in materials: |
|
|
materials[subject] = { |
|
|
'files': [], |
|
|
'file_details': {} |
|
|
} |
|
|
|
|
|
file_info = self.extract_file_info(file_name, file_path) |
|
|
materials[subject]['files'].append(file_info) |
|
|
materials[subject]['file_details'][file_name] = file_info |
|
|
|
|
|
else: |
|
|
|
|
|
if 'general' not in materials: |
|
|
materials['general'] = { |
|
|
'files': [], |
|
|
'file_details': {} |
|
|
} |
|
|
file_info = self.extract_file_info(file_path, file_path) |
|
|
materials['general']['files'].append(file_info) |
|
|
materials['general']['file_details'][file_path] = file_info |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"خطأ في معالجة الملف {file_path}: {e}") |
|
|
continue |
|
|
|
|
|
|
|
|
for subject in materials: |
|
|
materials[subject]['files'].sort(key=lambda x: (x['lecture_number'] if x['lecture_number'] is not None else float('inf'), x['name'])) |
|
|
|
|
|
|
|
|
self.available_materials = materials |
|
|
logger.info(f"✅ تم تحميل {len(materials)} مادة بنجاح") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"❌ خطأ في تحميل المواد: {e}") |
|
|
self.available_materials = {'Biochemistry': {'files': [], 'file_details': {}}} |
|
|
|
|
|
|
|
|
|
|
|
def get_user_memory(self, user_id): |
|
|
"""الحصول على ذاكرة المستخدم أو إنشاؤها""" |
|
|
if user_id not in self.conversation_memory: |
|
|
self.conversation_memory[user_id] = {'history': [], 'last_subject': None} |
|
|
return self.conversation_memory[user_id] |
|
|
|
|
|
async def handle_general_help(self, query, context): |
|
|
"""عرض رسالة المساعدة""" |
|
|
help_text = """ |
|
|
❓ **مساعدة** ❓ |
|
|
|
|
|
هذا البوت مصمم لمساعدتك في دراسة مواد المختبرات الطبية. |
|
|
|
|
|
1. **ابدأ** باختيار مادة من القائمة الرئيسية. |
|
|
2. **اختر الخدمة:** |
|
|
* **شرح محاضرة:** يعطيك ملخص للملف الذي تختاره. |
|
|
* **استعراض الملفات:** يعرض لك كل الملفات المتاحة. |
|
|
* **أسئلة عن المادة:** يولد أسئلة من ملفات عشوائية. |
|
|
* **ملخص المادة:** يلخص لك ملف مهم من المادة. |
|
|
* **تفسير مفهوم:** اطرح أي سؤال أو مصطلح (مثل "ما هو CBC") وسأشرحه لك. |
|
|
3. **تحديث المواد:** اضغط (🔄) لتحديث قائمة المواد من المصدر. |
|
|
""" |
|
|
keyboard = [[InlineKeyboardButton("🔙 العودة للقائمة الرئيسية", callback_data="main_menu")]] |
|
|
reply_markup = InlineKeyboardMarkup(keyboard) |
|
|
|
|
|
await query.edit_message_text(help_text, reply_markup=reply_markup) |
|
|
return SELECTING_SUBJECT |
|
|
|
|
|
def extract_file_info(self, file_name, file_path): |
|
|
"""استخراج معلومات الملف (النوع، رقم المحاضرة) من الاسم""" |
|
|
name_lower = file_name.lower() |
|
|
lecture_num = None |
|
|
file_type = 'unknown' |
|
|
|
|
|
|
|
|
|
|
|
match = re.search(r'(?:lecture|lec|محاضرة)\s*(\d+)', name_lower) |
|
|
if not match: |
|
|
|
|
|
match = re.search(r'^(\d+)\s*-|[\s_-](\d+)$', name_lower) |
|
|
if match: |
|
|
|
|
|
lecture_num_str = match.group(2) or match.group(1) |
|
|
lecture_num = int(lecture_num_str) if lecture_num_str else None |
|
|
else: |
|
|
lecture_num = int(match.group(1)) |
|
|
|
|
|
|
|
|
|
|
|
if 'lab' in name_lower or 'عملي' in name_lower: |
|
|
file_type = 'lab' |
|
|
elif 'exam' in name_lower or 'امتحان' in name_lower or 'اسئلة' in name_lower: |
|
|
file_type = 'exam' |
|
|
elif 'summary' in name_lower or 'ملخص' in name_lower: |
|
|
file_type = 'summary' |
|
|
elif 'lecture' in name_lower or 'محاضرة' in name_lower: |
|
|
file_type = 'lecture' |
|
|
|
|
|
elif lecture_num is not None: |
|
|
file_type = 'lecture' |
|
|
|
|
|
|
|
|
return { |
|
|
'name': file_name, |
|
|
'path': file_path, |
|
|
'lecture_number': lecture_num, |
|
|
'type': file_type |
|
|
} |
|
|
|
|
|
async def _call_nvidia_api(self, messages, max_tokens=1500): |
|
|
"""دالة مساعدة لاستدعاء NVIDIA API""" |
|
|
try: |
|
|
completion = await asyncio.to_thread( |
|
|
nvidia_client.chat.completions.create, |
|
|
model="meta/llama3-70b-instruct", |
|
|
messages=messages, |
|
|
temperature=0.5, |
|
|
top_p=1, |
|
|
max_tokens=max_tokens |
|
|
) |
|
|
return completion.choices[0].message.content |
|
|
except Exception as e: |
|
|
logger.error(f"❌ Error calling NVIDIA API: {e}") |
|
|
return f"❌ حدث خطأ أثناء التواصل مع الذكاء الاصطناعي: {e}" |
|
|
|
|
|
def _find_file_by_query(self, query, subject): |
|
|
"""البحث عن ملف بناءً على استعلام المستخدم (رقم، اسم، الخ)""" |
|
|
files = self.available_materials[subject]['files'] |
|
|
query_lower = query.lower().strip() |
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
match_num = re.findall(r'^\d+$', query_lower) |
|
|
if match_num: |
|
|
index = int(match_num[0]) |
|
|
if 1 <= index <= len(files): |
|
|
logger.info(f"Found file by index {index}") |
|
|
return files[index - 1] |
|
|
except (IndexError, ValueError): |
|
|
pass |
|
|
|
|
|
|
|
|
match_lec_num = re.search(r'(?:lecture|lec|محاضرة)\s*(\d+)', query_lower) |
|
|
if match_lec_num: |
|
|
num = int(match_lec_num.group(1)) |
|
|
for file_info in files: |
|
|
if file_info['lecture_number'] == num: |
|
|
logger.info(f"Found file by exact lecture number {num}") |
|
|
return file_info |
|
|
|
|
|
|
|
|
best_match = None |
|
|
highest_score = 0 |
|
|
for file_info in files: |
|
|
name_lower = file_info['name'].lower() |
|
|
|
|
|
query_words = set(query_lower.split()) |
|
|
name_words = set(re.findall(r'\w+', name_lower)) |
|
|
common_words = query_words.intersection(name_words) |
|
|
score = len(common_words) |
|
|
|
|
|
if score > highest_score: |
|
|
highest_score = score |
|
|
best_match = file_info |
|
|
|
|
|
elif highest_score > 0 and file_info['lecture_number'] is not None: |
|
|
num_in_query = re.findall(r'\d+', query_lower) |
|
|
if num_in_query and file_info['lecture_number'] == int(num_in_query[0]): |
|
|
logger.info(f"Found file by partial name match with lecture number heuristic: {file_info['name']}") |
|
|
return file_info |
|
|
|
|
|
|
|
|
if best_match and highest_score > 0: |
|
|
logger.info(f"Found file by best partial name match: {best_match['name']} (score: {highest_score})") |
|
|
return best_match |
|
|
|
|
|
|
|
|
logger.warning(f"Could not find file matching query: '{query}' in subject: {subject}") |
|
|
return None |
|
|
|
|
|
async def download_and_extract_content(self, file_path, subject): |
|
|
"""تحميل الملف من HF واستخراج النص منه""" |
|
|
if file_path in self.file_cache: |
|
|
|
|
|
return self.file_cache[file_path] |
|
|
|
|
|
logger.info(f"⏳ Downloading {file_path} from Hugging Face...") |
|
|
try: |
|
|
|
|
|
local_path = await asyncio.to_thread( |
|
|
hf_hub_download, |
|
|
repo_id=REPO_ID, |
|
|
filename=file_path, |
|
|
repo_type="dataset" |
|
|
) |
|
|
|
|
|
text_content = "" |
|
|
logger.info(f"📄 Extracting content from {local_path}") |
|
|
|
|
|
if local_path.lower().endswith('.pdf'): |
|
|
|
|
|
with fitz.open(local_path) as doc: |
|
|
for page in doc: |
|
|
text_content += page.get_text("text", sort=True) |
|
|
text_content += "\n\n--- Page Break ---\n\n" |
|
|
|
|
|
elif local_path.lower().endswith('.txt'): |
|
|
with open(local_path, 'r', encoding='utf-8', errors='ignore') as f: |
|
|
text_content = f.read() |
|
|
|
|
|
elif local_path.lower().endswith('.docx'): |
|
|
doc_obj = docx.Document(local_path) |
|
|
full_text = [] |
|
|
for para in doc_obj.paragraphs: |
|
|
full_text.append(para.text) |
|
|
text_content = '\n'.join(full_text) |
|
|
|
|
|
else: |
|
|
logger.warning(f"Unsupported file type: {local_path}") |
|
|
return f"Error: Unsupported file type ({os.path.basename(file_path)})." |
|
|
|
|
|
|
|
|
text_content = re.sub(r'\s+\n', '\n', text_content) |
|
|
text_content = re.sub(r'\n{3,}', '\n\n', text_content) |
|
|
text_content = re.sub(r' +', ' ', text_content).strip() |
|
|
|
|
|
if len(text_content) < 50: |
|
|
logger.warning(f"File {file_path} content is very short or empty after extraction.") |
|
|
|
|
|
self.file_cache[file_path] = text_content |
|
|
|
|
|
return text_content |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"❌ Error downloading/extracting {file_path}: {e}", exc_info=True) |
|
|
return f"Error: Could not retrieve or process file {os.path.basename(file_path)}." |
|
|
|
|
|
|
|
|
|
|
|
async def explain_lecture(self, user_query, subject, user_id): |
|
|
"""شرح محاضرة بناءً على استعلام المستخدم""" |
|
|
file_info = self._find_file_by_query(user_query, subject) |
|
|
|
|
|
if not file_info: |
|
|
files_list_text = await self.get_files_list_text(subject) |
|
|
return f"❌ لم أتمكن من العثور على الملف المطلوب.\n\n{files_list_text}\n\nيرجى تحديد رقم الملف من القائمة أو كتابة جزء واضح من اسمه." |
|
|
|
|
|
file_path = file_info['path'] |
|
|
file_name = file_info['name'] |
|
|
await bot.application.bot.send_chat_action(chat_id=user_id, action="typing") |
|
|
content = await self.download_and_extract_content(file_path, subject) |
|
|
|
|
|
if content.startswith("Error:"): |
|
|
return f"❌ خطأ في معالجة الملف: {file_name}\n{content}" |
|
|
|
|
|
if not content.strip(): |
|
|
return f"❌ المحتوى فارغ للملف: {file_name}" |
|
|
|
|
|
memory = self.get_user_memory(user_id) |
|
|
memory['history'].append({"role": "user", "content": f"اشرح لي النقاط الأساسية في هذه المحاضرة: {file_name}"}) |
|
|
|
|
|
|
|
|
max_content_chars = 7000 |
|
|
if len(content) > max_content_chars: |
|
|
content_snippet = content[:max_content_chars] + "\n\n[... المحتوى مقطوع ...]" |
|
|
logger.warning(f"Content for {file_name} truncated to {max_content_chars} chars.") |
|
|
else: |
|
|
content_snippet = content |
|
|
|
|
|
|
|
|
system_prompt = f"أنت مساعد أكاديمي متخصص في مادة {subject}. مهمتك هي شرح النقاط الأساسية في محتوى المحاضرة المقدم لك. ركز على المفاهيم الجوهرية، التعريفات الهامة، النتائج الرئيسية، وأي معلومات ضرورية لفهم الموضوع. قدم الشرح بطريقة منظمة وواضحة باستخدام نقاط Markdown. تجنب التفاصيل الثانوية غير الضرورية." |
|
|
|
|
|
messages = [ |
|
|
{"role": "system", "content": system_prompt}, |
|
|
{"role": "user", "content": f"اسم الملف: {file_name}\n\nالمحتوى:\n```\n{content_snippet}\n```\n\nيرجى شرح النقاط الأساسية في هذا المحتوى."} |
|
|
] |
|
|
|
|
|
response = await self._call_nvidia_api(messages, 1500) |
|
|
memory['history'].append({"role": "assistant", "content": response}) |
|
|
return f"📝 **شرح لأهم نقاط ملف: {file_name}**\n\n{response}" |
|
|
|
|
|
async def explain_concept(self, user_query, subject, user_id): |
|
|
"""شرح مفهوم أو مصطلح طبي""" |
|
|
memory = self.get_user_memory(user_id) |
|
|
memory['history'].append({"role": "user", "content": user_query}) |
|
|
|
|
|
system_prompt = f"أنت خبير أكاديمي في مجال المختبرات الطبية، متخصص حالياً في مادة {subject}. اشرح المفهوم أو المصطلح التالي ({user_query}) بوضوح ودقة. ابدأ بتعريف أساسي، ثم وضح أهميته وتطبيقاته العملية في المختبر، واربطه بمادة {subject} إن أمكن. استخدم لغة سهلة ومباشرة." |
|
|
|
|
|
messages = [ |
|
|
{"role": "system", "content": system_prompt}, |
|
|
|
|
|
*memory['history'][-3:-1], |
|
|
{"role": "user", "content": f"اشرح لي المفهوم التالي: {user_query}"} |
|
|
] |
|
|
|
|
|
response = await self._call_nvidia_api(messages, 1000) |
|
|
memory['history'].append({"role": "assistant", "content": response}) |
|
|
return f"🧪 **شرح مفهوم: {user_query}**\n\n{response}" |
|
|
|
|
|
async def process_general_query(self, user_message, subject, user_id): |
|
|
"""معالجة استعلام عام من المستخدم""" |
|
|
memory = self.get_user_memory(user_id) |
|
|
history = memory.get('history', []) |
|
|
|
|
|
system_prompt = f"أنت مساعد ذكي متخصص في المختبرات الطبية. المادة الدراسية الحالية التي يركز عليها الطالب هي '{subject}'. أجب على سؤال الطالب ({user_message}) إجابة واضحة ومباشرة. استخدم سياق المحادثة السابق إذا كان ضرورياً لفهم السؤال. إذا كان السؤال خارج نطاق المادة أو التخصص، اعتذر بلطف." |
|
|
|
|
|
messages = [{"role": "system", "content": system_prompt}] |
|
|
|
|
|
messages.extend(history[-4:]) |
|
|
messages.append({"role": "user", "content": user_message}) |
|
|
|
|
|
response = await self._call_nvidia_api(messages) |
|
|
memory['history'].append({"role": "user", "content": user_message}) |
|
|
memory['history'].append({"role": "assistant", "content": response}) |
|
|
return response |
|
|
|
|
|
async def generate_questions_for_subject(self, subject, user_id): |
|
|
"""توليد أسئلة من ملف عشوائي في المادة""" |
|
|
if subject not in self.available_materials or not self.available_materials[subject]['files']: |
|
|
return "❌ لا توجد ملفات في هذه المادة لتوليد أسئلة منها." |
|
|
|
|
|
files = self.available_materials[subject]['files'] |
|
|
|
|
|
preferred_files = [f for f in files if f['type'] in ('lecture', 'summary')] |
|
|
if preferred_files: |
|
|
file_info = random.choice(preferred_files) |
|
|
else: |
|
|
file_info = random.choice(files) |
|
|
|
|
|
file_path = file_info['path'] |
|
|
file_name = file_info['name'] |
|
|
|
|
|
await bot.application.bot.send_chat_action(chat_id=user_id, action="typing") |
|
|
content = await self.download_and_extract_content(file_path, subject) |
|
|
|
|
|
if content.startswith("Error:") or not content.strip(): |
|
|
logger.error(f"Failed to get content for question generation from {file_name}. Content: {content[:100]}") |
|
|
|
|
|
if len(files) > 1: |
|
|
logger.info("Retrying with another file for question generation...") |
|
|
remaining_files = [f for f in files if f['path'] != file_path] |
|
|
if remaining_files: |
|
|
file_info = random.choice(remaining_files) |
|
|
file_path = file_info['path'] |
|
|
file_name = file_info['name'] |
|
|
content = await self.download_and_extract_content(file_path, subject) |
|
|
if content.startswith("Error:") or not content.strip(): |
|
|
logger.error(f"Retry failed for question generation from {file_name}.") |
|
|
return f"❌ لم أتمكن من قراءة محتوى صالح لتوليد أسئلة من ملفات المادة." |
|
|
else: |
|
|
return f"❌ لم أتمكن من قراءة محتوى صالح لتوليد أسئلة من ملفات المادة." |
|
|
else: |
|
|
return f"❌ لم أتمكن من قراءة محتوى صالح لتوليد أسئلة من ملفات المادة." |
|
|
|
|
|
|
|
|
max_content_chars = 7000 |
|
|
content_snippet = content[:max_content_chars] if len(content) > max_content_chars else content |
|
|
|
|
|
system_prompt = f"أنت خبير في وضع الأسئلة لمادة {subject}. بناءً على المحتوى التالي من ملف '{file_name}'، قم بإنشاء 5 أسئلة متنوعة لاختبار الفهم. يجب أن تشمل الأسئلة (إذا أمكن): سؤال اختيار من متعدد (MCQ) واحد على الأقل، سؤال إجابة قصيرة واحد على الأقل، وسؤال يتطلب تفكيراً أعمق قليلاً. اجعل الأسئلة واضحة ومباشرة." |
|
|
|
|
|
messages = [ |
|
|
{"role": "system", "content": system_prompt}, |
|
|
{"role": "user", "content": f"المحتوى:\n```\n{content_snippet}\n```\n\nقم بإنشاء 5 أسئلة متنوعة بناءً على هذا المحتوى."} |
|
|
] |
|
|
|
|
|
response = await self._call_nvidia_api(messages) |
|
|
return f"❓ **أسئلة مقترحة من ملف: {file_name}**\n\n{response}" |
|
|
|
|
|
async def generate_summary(self, subject, user_id): |
|
|
"""تلخيص ملف مهم من المادة""" |
|
|
if subject not in self.available_materials or not self.available_materials[subject]['files']: |
|
|
return "❌ لا توجد ملفات في هذه المادة لتلخيصها." |
|
|
|
|
|
files = self.available_materials[subject]['files'] |
|
|
|
|
|
|
|
|
summary_file = next((f for f in files if f['type'] == 'summary'), None) |
|
|
|
|
|
if not summary_file: |
|
|
summary_file = next((f for f in files if f['type'] == 'lecture'), None) |
|
|
|
|
|
if not summary_file: |
|
|
summary_file = files[0] |
|
|
|
|
|
file_path = summary_file['path'] |
|
|
file_name = summary_file['name'] |
|
|
|
|
|
await bot.application.bot.send_chat_action(chat_id=user_id, action="typing") |
|
|
content = await self.download_and_extract_content(file_path, subject) |
|
|
|
|
|
if content.startswith("Error:") or not content.strip(): |
|
|
logger.error(f"Failed to get content for summary from {file_name}. Content: {content[:100]}") |
|
|
return f"❌ لم أتمكن من قراءة ملف ({file_name}) للتلخيص." |
|
|
|
|
|
max_content_chars = 7000 |
|
|
content_snippet = content[:max_content_chars] if len(content) > max_content_chars else content |
|
|
|
|
|
system_prompt = f"أنت خبير في تلخيص المواد العلمية لمادة {subject}. قم بتلخيص المحتوى التالي من ملف '{file_name}' في 5 نقاط رئيسية وموجزة. يجب أن تغطي النقاط أهم الأفكار والمفاهيم الواردة في النص." |
|
|
|
|
|
messages = [ |
|
|
{"role": "system", "content": system_prompt}, |
|
|
{"role": "user", "content": f"المحتوى:\n```\n{content_snippet}\n```\n\nلخص هذا المحتوى في 5 نقاط رئيسية."} |
|
|
] |
|
|
|
|
|
response = await self._call_nvidia_api(messages) |
|
|
return f"📋 **ملخص ملف: {file_name}**\n\n{response}" |
|
|
|
|
|
|
|
|
|
|
|
async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
|
|
"""بدء المحادثة وعرض القائمة الرئيسية""" |
|
|
user_id = update.effective_user.id |
|
|
logger.info(f"User {user_id} started the bot.") |
|
|
|
|
|
welcome_text = """ |
|
|
🏥 **مرحباً بك في بوت المختبرات الطبية الذكي** 🔬 |
|
|
|
|
|
أنا هنا لمساعدتك في دراسة موادك! 📚 |
|
|
|
|
|
**المواد المتاحة حالياً:** |
|
|
""" |
|
|
|
|
|
if not self.available_materials: |
|
|
welcome_text += "\n\n⚠️ عذراً، لم أتمكن من تحميل أي مواد دراسية حالياً. حاول تحديث القائمة بالضغط على (🔄)." |
|
|
else: |
|
|
for subject in sorted(self.available_materials.keys()): |
|
|
file_count = len(self.available_materials[subject]['files']) |
|
|
welcome_text += f"\n• {subject} ({file_count} ملف)" |
|
|
|
|
|
welcome_text += "\n\n👇 اختر المادة التي تريد البدء بها:" |
|
|
|
|
|
keyboard = self.create_subjects_keyboard() |
|
|
reply_markup = InlineKeyboardMarkup(keyboard) |
|
|
|
|
|
|
|
|
if update.callback_query: |
|
|
try: |
|
|
await update.callback_query.edit_message_text(welcome_text, reply_markup=reply_markup) |
|
|
except Exception as e: |
|
|
logger.error(f"Error editing message in start: {e}") |
|
|
|
|
|
await update.effective_message.reply_text(welcome_text, reply_markup=reply_markup) |
|
|
else: |
|
|
await update.message.reply_text(welcome_text, reply_markup=reply_markup) |
|
|
|
|
|
return SELECTING_SUBJECT |
|
|
|
|
|
def create_subjects_keyboard(self): |
|
|
"""إنشاء لوحة مفاتيح للمواد المتاحة""" |
|
|
keyboard = [] |
|
|
|
|
|
subjects = sorted(self.available_materials.keys()) |
|
|
row = [] |
|
|
max_cols = 2 if len(subjects) > 4 else 1 |
|
|
for i, subject in enumerate(subjects): |
|
|
file_count = len(self.available_materials[subject]['files']) |
|
|
display_name = f"{subject} ({file_count})" |
|
|
row.append(InlineKeyboardButton(display_name, callback_data=f"subject_{subject}")) |
|
|
if len(row) == max_cols or i == len(subjects) - 1: |
|
|
keyboard.append(row) |
|
|
row = [] |
|
|
|
|
|
|
|
|
keyboard.append([InlineKeyboardButton("🔄 تحديث قائمة المواد", callback_data="refresh_materials")]) |
|
|
keyboard.append([InlineKeyboardButton("❓ مساعدة", callback_data="general_help")]) |
|
|
return keyboard |
|
|
|
|
|
async def handle_subject_selection(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
|
|
"""معالجة اختيار المادة""" |
|
|
query = update.callback_query |
|
|
await query.answer() |
|
|
|
|
|
user_id = query.from_user.id |
|
|
callback_data = query.data |
|
|
logger.info(f"User {user_id} selected: {callback_data}") |
|
|
|
|
|
if callback_data == "general_help": |
|
|
return await self.handle_general_help(query, context) |
|
|
elif callback_data == "refresh_materials": |
|
|
await query.edit_message_text("🔄 جاري تحديث قائمة المواد...") |
|
|
self.load_all_materials() |
|
|
keyboard = self.create_subjects_keyboard() |
|
|
reply_markup = InlineKeyboardMarkup(keyboard) |
|
|
await query.edit_message_text("✅ تم تحديث قائمة المواد.\nاختر المادة:", reply_markup=reply_markup) |
|
|
return SELECTING_SUBJECT |
|
|
|
|
|
subject = callback_data.replace("subject_", "") |
|
|
context.user_data['current_subject'] = subject |
|
|
|
|
|
memory = self.get_user_memory(user_id) |
|
|
memory['last_subject'] = subject |
|
|
|
|
|
|
|
|
if subject not in self.available_materials: |
|
|
logger.error(f"Selected subject '{subject}' not found in available materials.") |
|
|
await query.edit_message_text("❌ خطأ: المادة المحددة غير موجودة. ربما تحتاج لتحديث القائمة؟") |
|
|
return await self.start(update, context) |
|
|
|
|
|
subject_files = self.available_materials[subject]['files'] |
|
|
subject_name = subject.replace('_', ' ').title() |
|
|
|
|
|
keyboard = [ |
|
|
[InlineKeyboardButton("📖 شرح محاضرة", callback_data="explain_lecture"), InlineKeyboardButton("🔍 استعراض الملفات", callback_data="browse_files")], |
|
|
[InlineKeyboardButton("❓ توليد أسئلة", callback_data="generate_questions"), InlineKeyboardButton("📝 تلخيص ملف", callback_data="summarize_content")], |
|
|
[InlineKeyboardButton("🧪 تفسير مفهوم", callback_data="explain_concept")], |
|
|
[InlineKeyboardButton("🏠 العودة للقائمة الرئيسية", callback_data="main_menu")] |
|
|
] |
|
|
reply_markup = InlineKeyboardMarkup(keyboard) |
|
|
|
|
|
await query.edit_message_text( |
|
|
f"📚 **{subject_name}**\n\n" |
|
|
f"عدد الملفات المتاحة: {len(subject_files)}\n\n" |
|
|
f"ماذا تريد أن تفعل؟", |
|
|
reply_markup=reply_markup, |
|
|
parse_mode='Markdown' |
|
|
) |
|
|
return SELECTING_ACTION |
|
|
|
|
|
async def handle_back_actions(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
|
|
"""معالجة العودة إلى قائمة الإجراءات""" |
|
|
query = update.callback_query |
|
|
await query.answer() |
|
|
logger.info(f"User {query.from_user.id} requested back to actions.") |
|
|
|
|
|
subject = context.user_data.get('current_subject') |
|
|
if not subject or subject not in self.available_materials: |
|
|
logger.warning("Subject context lost or invalid in handle_back_actions. Returning to main menu.") |
|
|
return await self.start(update, context) |
|
|
|
|
|
subject_files = self.available_materials[subject]['files'] |
|
|
subject_name = subject.replace('_', ' ').title() |
|
|
|
|
|
keyboard = [ |
|
|
[InlineKeyboardButton("📖 شرح محاضرة", callback_data="explain_lecture"), InlineKeyboardButton("🔍 استعراض الملفات", callback_data="browse_files")], |
|
|
[InlineKeyboardButton("❓ توليد أسئلة", callback_data="generate_questions"), InlineKeyboardButton("📝 تلخيص ملف", callback_data="summarize_content")], |
|
|
[InlineKeyboardButton("🧪 تفسير مفهوم", callback_data="explain_concept")], |
|
|
[InlineKeyboardButton("🏠 العودة للقائمة الرئيسية", callback_data="main_menu")] |
|
|
] |
|
|
reply_markup = InlineKeyboardMarkup(keyboard) |
|
|
|
|
|
await query.edit_message_text( |
|
|
f"📚 **{subject_name}**\n\n" |
|
|
f"ماذا تريد أن تفعل؟", |
|
|
reply_markup=reply_markup, |
|
|
parse_mode='Markdown' |
|
|
) |
|
|
return SELECTING_ACTION |
|
|
|
|
|
async def handle_main_menu(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
|
|
"""معالجة العودة للقائمة الرئيسية""" |
|
|
query = update.callback_query |
|
|
await query.answer() |
|
|
logger.info(f"User {query.from_user.id} requested main menu.") |
|
|
context.user_data.clear() |
|
|
return await self.start(update, context) |
|
|
|
|
|
async def handle_more_questions(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
|
|
"""معالجة طلب المزيد من الأسئلة أو العودة""" |
|
|
query = update.callback_query |
|
|
await query.answer() |
|
|
logger.info(f"User {query.from_user.id} requested more questions / interaction.") |
|
|
|
|
|
subject = context.user_data.get('current_subject', 'عام') |
|
|
|
|
|
keyboard = [ |
|
|
[InlineKeyboardButton("🔙 العودة لقائمة الخيارات", callback_data="back_to_actions")], |
|
|
] |
|
|
reply_markup = InlineKeyboardMarkup(keyboard) |
|
|
|
|
|
await query.edit_message_text(f"💬 تفضل، اكتب سؤالك أو طلبك الجديد المتعلق بمادة '{subject}':", reply_markup=reply_markup) |
|
|
context.user_data['waiting_for'] = 'general' |
|
|
return WAITING_FOR_QUESTION |
|
|
|
|
|
async def handle_change_subject(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
|
|
"""معالجة تغيير المادة""" |
|
|
query = update.callback_query |
|
|
await query.answer() |
|
|
logger.info(f"User {query.from_user.id} requested change subject.") |
|
|
keyboard = self.create_subjects_keyboard() |
|
|
reply_markup = InlineKeyboardMarkup(keyboard) |
|
|
await query.edit_message_text("🔄 اختر المادة الجديدة:", reply_markup=reply_markup) |
|
|
return SELECTING_SUBJECT |
|
|
|
|
|
async def handle_action_selection(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
|
|
"""معالجة اختيار الإجراء""" |
|
|
query = update.callback_query |
|
|
await query.answer() |
|
|
|
|
|
action = query.data |
|
|
user_id = query.from_user.id |
|
|
subject = context.user_data.get('current_subject') |
|
|
logger.info(f"User {user_id} selected action: {action} for subject: {subject}") |
|
|
|
|
|
|
|
|
if not subject or subject not in self.available_materials: |
|
|
logger.error("Subject context lost or invalid in handle_action_selection.") |
|
|
await query.edit_message_text("❌ حدث خطأ في السياق. يرجى البدء من جديد.") |
|
|
return await self.start(update, context) |
|
|
|
|
|
|
|
|
back_button = InlineKeyboardButton("🔙 رجوع", callback_data="back_to_actions") |
|
|
keyboard_with_back = [[back_button]] |
|
|
reply_markup_back = InlineKeyboardMarkup(keyboard_with_back) |
|
|
|
|
|
|
|
|
if action == "main_menu": |
|
|
return await self.start(update, context) |
|
|
|
|
|
elif action == "browse_files": |
|
|
files_text = await self.get_files_list_text(subject) |
|
|
keyboard = [ |
|
|
|
|
|
[InlineKeyboardButton("📖 طلب شرح ملف محدد", callback_data="explain_lecture")], |
|
|
[back_button] |
|
|
] |
|
|
reply_markup = InlineKeyboardMarkup(keyboard) |
|
|
await query.edit_message_text(files_text, reply_markup=reply_markup, parse_mode='Markdown') |
|
|
|
|
|
return SELECTING_ACTION |
|
|
|
|
|
|
|
|
elif action == "explain_lecture": |
|
|
files_list = await self.get_files_list_text(subject) |
|
|
if files_list.startswith("❌"): |
|
|
await query.edit_message_text(files_list, reply_markup=reply_markup_back) |
|
|
return SELECTING_ACTION |
|
|
|
|
|
await query.edit_message_text( |
|
|
f"📖 **شرح محاضرة**\n\n" |
|
|
f"{files_list}\n\n" |
|
|
f"📝 اكتب رقم الملف من القائمة أعلاه أو جزءاً من اسمه:", |
|
|
reply_markup=reply_markup_back, |
|
|
parse_mode='Markdown' |
|
|
) |
|
|
context.user_data['waiting_for'] = 'lecture_explanation' |
|
|
return WAITING_FOR_QUESTION |
|
|
|
|
|
elif action == "generate_questions": |
|
|
await query.edit_message_text("⏳ حسناً، جاري توليد بعض الأسئلة للمراجعة...", reply_markup=reply_markup_back) |
|
|
questions = await self.generate_questions_for_subject(subject, user_id) |
|
|
|
|
|
await query.message.reply_text(questions, reply_markup=reply_markup_back, parse_mode='Markdown') |
|
|
return SELECTING_ACTION |
|
|
|
|
|
elif action == "explain_concept": |
|
|
await query.edit_message_text( |
|
|
"🧪 **تفسير مفهوم**\n\n" |
|
|
"ما هو المفهوم أو المصطلح الطبي الذي تود شرحه؟\n" |
|
|
"مثال: 'ما هو تحليل CBC؟' أو 'اشرح لي دورة كريبس'", |
|
|
reply_markup=reply_markup_back |
|
|
) |
|
|
context.user_data['waiting_for'] = 'concept_explanation' |
|
|
return WAITING_FOR_QUESTION |
|
|
|
|
|
elif action == "summarize_content": |
|
|
await query.edit_message_text("⏳ تمام، جاري تلخيص ملف مهم من المادة...", reply_markup=reply_markup_back) |
|
|
summary = await self.generate_summary(subject, user_id) |
|
|
|
|
|
await query.message.reply_text(summary, reply_markup=reply_markup_back, parse_mode='Markdown') |
|
|
return SELECTING_ACTION |
|
|
|
|
|
|
|
|
logger.warning(f"Unknown action selected: {action}") |
|
|
await query.message.reply_text("عذراً، لم أتعرف على هذا الخيار.") |
|
|
return SELECTING_ACTION |
|
|
|
|
|
|
|
|
async def browse_available_files(self, query, context): |
|
|
"""عرض الملفات المتاحة للمادة - تم دمجه في handle_action_selection""" |
|
|
|
|
|
pass |
|
|
|
|
|
|
|
|
async def get_files_list_text(self, subject): |
|
|
"""إنشاء نص لقائمة الملفات""" |
|
|
if subject not in self.available_materials: |
|
|
logger.error(f"Subject '{subject}' not found when trying to list files.") |
|
|
return "❌ خطأ: لم يتم العثور على المادة المحددة." |
|
|
|
|
|
files = self.available_materials[subject]['files'] |
|
|
|
|
|
if not files: |
|
|
return "❌ لا توجد ملفات متاحة لهذه المادة بعد." |
|
|
|
|
|
files_text = f"📁 **الملفات المتاحة لمادة {subject}:**\n\n" |
|
|
|
|
|
max_files_to_show = 25 |
|
|
for i, file_info in enumerate(files[:max_files_to_show], 1): |
|
|
file_name = file_info['name'] |
|
|
lecture_num = file_info['lecture_number'] |
|
|
file_type = file_info['type'] |
|
|
|
|
|
type_emoji = { |
|
|
'lecture': '📖', |
|
|
'lab': '🧪', |
|
|
'exam': '📝', |
|
|
'summary': '📋', |
|
|
'unknown': '📄' |
|
|
}.get(file_type, '📄') |
|
|
|
|
|
num_text = f" (محاضرة {lecture_num})" if lecture_num else "" |
|
|
|
|
|
|
|
|
display_name = file_name.replace("_", " ") |
|
|
files_text += f"{i}. {type_emoji} `{display_name}`{num_text}\n" |
|
|
|
|
|
if len(files) > max_files_to_show: |
|
|
files_text += f"\n... و {len(files) - max_files_to_show} ملفات أخرى." |
|
|
|
|
|
return files_text |
|
|
|
|
|
|
|
|
async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
|
|
"""معالجة الرسائل النصية من المستخدم""" |
|
|
if not update.message or not update.message.text: |
|
|
return |
|
|
|
|
|
user_message = update.message.text |
|
|
user_id = update.effective_user.id |
|
|
waiting_for = context.user_data.get('waiting_for') |
|
|
subject = context.user_data.get('current_subject', 'general') |
|
|
logger.info(f"User {user_id} sent message: '{user_message}' | waiting_for: {waiting_for} | subject: {subject}") |
|
|
|
|
|
|
|
|
await update.message.chat.send_action(action="typing") |
|
|
|
|
|
response = "عذراً، لم أفهم طلبك. هل يمكنك توضيحه؟" |
|
|
next_state = WAITING_FOR_QUESTION |
|
|
|
|
|
try: |
|
|
if waiting_for == 'lecture_explanation': |
|
|
response = await self.explain_lecture(user_message, subject, user_id) |
|
|
next_state = SELECTING_ACTION |
|
|
elif waiting_for == 'concept_explanation': |
|
|
response = await self.explain_concept(user_message, subject, user_id) |
|
|
next_state = SELECTING_ACTION |
|
|
else: |
|
|
response = await self.process_general_query(user_message, subject, user_id) |
|
|
next_state = SELECTING_ACTION |
|
|
|
|
|
|
|
|
await update.message.reply_text(response, parse_mode='Markdown') |
|
|
|
|
|
|
|
|
if next_state == SELECTING_ACTION: |
|
|
context.user_data['waiting_for'] = None |
|
|
keyboard = [ |
|
|
[InlineKeyboardButton("🔄 طرح سؤال آخر", callback_data="more_questions")], |
|
|
[InlineKeyboardButton("📚 تغيير المادة", callback_data="change_subject")], |
|
|
[InlineKeyboardButton("🏠 القائمة الرئيسية", callback_data="main_menu")] |
|
|
] |
|
|
reply_markup = InlineKeyboardMarkup(keyboard) |
|
|
await update.message.reply_text("هل تحتاج مساعدة أخرى؟", reply_markup=reply_markup) |
|
|
|
|
|
return next_state |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"❌ Error processing message from user {user_id}: {e}", exc_info=True) |
|
|
await update.message.reply_text("❌ حدث خطأ غير متوقع أثناء معالجة طلبك. لقد تم تسجيل الخطأ. يرجى المحاولة مرة أخرى أو اختيار خيار آخر.") |
|
|
context.user_data['waiting_for'] = None |
|
|
return SELECTING_ACTION |
|
|
|
|
|
|
|
|
|
|
|
bot = MedicalLabBot() |
|
|
|
|
|
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse) |
|
|
async def root(): |
|
|
"""الصفحة الرئيسية""" |
|
|
materials_count = len(bot.available_materials) |
|
|
total_files = sum(len(material['files']) for material in bot.available_materials.values()) |
|
|
|
|
|
|
|
|
status_message = "⏳ جاري التهيئة (الاتصال بـ Telegram)..." |
|
|
status_color = "#ffc107" |
|
|
init_details = "" |
|
|
if bot.initialization_status == "success": |
|
|
status_message = "✅ نشط ومهيأ" |
|
|
status_color = "#28a745" |
|
|
init_details = "تم الاتصال بـ Telegram وضبط الويب هوك بنجاح." |
|
|
elif bot.initialization_status == "failed": |
|
|
status_message = "❌ فشل التهيئة" |
|
|
status_color = "#dc3545" |
|
|
init_details = "فشل الاتصال بـ Telegram أو ضبط الويب هوك بعد عدة محاولات." |
|
|
|
|
|
|
|
|
return f""" |
|
|
<html> |
|
|
<head> |
|
|
<title>Medical Lab Bot Status</title> |
|
|
<style> |
|
|
body {{ font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Oxygen-Sans, Ubuntu, Cantarell, "Helvetica Neue", sans-serif; margin: 40px; background-color: #f8f9fa; color: #343a40; line-height: 1.6; }} |
|
|
.container {{ max-width: 800px; margin: 20px auto; background-color: #ffffff; padding: 30px; border-radius: 12px; box-shadow: 0 4px 12px rgba(0,0,0,0.08); }} |
|
|
.status-box {{ padding: 25px; margin-bottom: 25px; border-radius: 8px; border-left: 6px solid {status_color}; background-color: #e9ecef; }} |
|
|
.status-box h2 {{ margin-top: 0; color: #212529; font-size: 1.6em; }} |
|
|
.status-box p {{ margin-bottom: 8px; font-size: 1.1em; }} |
|
|
h1 {{ color: #0056b3; text-align: center; margin-bottom: 30px; font-weight: 600; }} |
|
|
h3 {{ color: #0056b3; border-bottom: 2px solid #dee2e6; padding-bottom: 8px; margin-top: 30px; }} |
|
|
ul {{ list-style: none; padding-left: 0; }} |
|
|
li {{ margin-bottom: 12px; padding-left: 25px; position: relative; }} |
|
|
li::before {{ content: '✓'; color: {status_color}; position: absolute; left: 0; font-weight: bold; }} |
|
|
.info-box {{ padding: 20px; background-color: #f1f3f5; border-radius: 8px; margin-top: 30px; font-size: 0.95em; color: #495057; }} |
|
|
strong {{ color: #212529; }} |
|
|
</style> |
|
|
</head> |
|
|
<body> |
|
|
<div class="container"> |
|
|
<h1>🏥 حالة بوت المختبرات الطبية الذكي</h1> |
|
|
<div class="status-box"> |
|
|
<h2>{status_message}</h2> |
|
|
<p><strong>تفاصيل التهيئة:</strong> {init_details}</p> |
|
|
<p><strong>المواد المحملة:</strong> {materials_count} مادة</p> |
|
|
<p><strong>إجمالي الملفات:</strong> {total_files} ملف</p> |
|
|
</div> |
|
|
<h3>🎯 المميزات الرئيسية:</h3> |
|
|
<ul> |
|
|
<li>شرح المواد الدراسية من ملفات PDF وWord وTXT.</li> |
|
|
<li>توليد أسئلة متنوعة للمراجعة واختبار الفهم.</li> |
|
|
<li>تلخيص المحتوى الدراسي في نقاط رئيسية.</li> |
|
|
<li>تفسير المفاهيم والمصطلحات العلمية والطبية.</li> |
|
|
<li>استعراض الملفات المتاحة لكل مادة.</li> |
|
|
<li>ذاكرة محادثة لكل مستخدم للحفاظ على السياق.</li> |
|
|
</ul> |
|
|
<div class="info-box"> |
|
|
<strong>ℹ️ معلومات تقنية:</strong> هذا البوت يستخدم FastAPI للواجهة، Uvicorn كخادم، مكتبة python-telegram-bot للتفاعل مع Telegram، Hugging Face Datasets لتخزين المواد الدراسية، وواجهة برمجة تطبيقات NVIDIA للقدرات الذكية. |
|
|
</div> |
|
|
</div> |
|
|
</body> |
|
|
</html> |
|
|
""" |
|
|
|
|
|
|
|
|
@app.post("/telegram", response_model=Dict[str, str]) |
|
|
async def handle_telegram_update(request: Request): |
|
|
"""معالجة تحديثات Telegram""" |
|
|
try: |
|
|
|
|
|
if not bot.is_initialized or not bot.application: |
|
|
logger.error("❌ التطبيق غير مهيأ، لا يمكن معالجة التحديث (ربما لا يزال قيد التهيئة).") |
|
|
|
|
|
raise HTTPException(status_code=503, detail="Application not initialized or still initializing") |
|
|
|
|
|
update_data = await request.json() |
|
|
|
|
|
update = Update.de_json(update_data, bot.application.bot) |
|
|
|
|
|
|
|
|
asyncio.create_task(bot.application.process_update(update)) |
|
|
|
|
|
|
|
|
return {"status": "ok"} |
|
|
|
|
|
except json.JSONDecodeError: |
|
|
logger.error("❌ Received invalid JSON data.") |
|
|
raise HTTPException(status_code=400, detail="Invalid JSON data") |
|
|
except HTTPException as http_exc: |
|
|
|
|
|
raise http_exc |
|
|
except Exception as e: |
|
|
logger.error(f"❌ Error processing update: {e}", exc_info=True) |
|
|
|
|
|
raise HTTPException(status_code=500, detail="Internal server error while processing update") |
|
|
|
|
|
|
|
|
@app.get("/health") |
|
|
async def health_check(): |
|
|
"""فحص صحة الخدمة""" |
|
|
materials_count = len(bot.available_materials) |
|
|
total_files = sum(len(material['files']) for material in bot.available_materials.values()) |
|
|
|
|
|
status_code = 503 |
|
|
service_status = "unhealthy" |
|
|
if bot.initialization_status == "pending": |
|
|
service_status = "initializing" |
|
|
|
|
|
|
|
|
elif bot.initialization_status == "success": |
|
|
service_status = "healthy" |
|
|
status_code = 200 |
|
|
elif bot.initialization_status == "failed": |
|
|
service_status = "unhealthy_failed_init" |
|
|
status_code = 500 |
|
|
|
|
|
|
|
|
response_payload = { |
|
|
"status": service_status, |
|
|
"service": "medical-lab-bot", |
|
|
"initialization_status": bot.initialization_status, |
|
|
"materials_loaded": materials_count, |
|
|
"total_files": total_files, |
|
|
"timestamp": datetime.now().isoformat() |
|
|
} |
|
|
|
|
|
|
|
|
from fastapi.responses import JSONResponse |
|
|
return JSONResponse(content=response_payload, status_code=status_code) |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
port = int(os.environ.get("PORT", 7860)) |
|
|
|
|
|
log_level = os.environ.get("LOG_LEVEL", "info").lower() |
|
|
logger.info(f"🚀 Starting Medical Lab Bot on port {port} with log level {log_level}") |
|
|
uvicorn.run( |
|
|
"app:app", |
|
|
host="0.0.0.0", |
|
|
port=port, |
|
|
log_level=log_level, |
|
|
reload=False |
|
|
) |