# Spaces: Sleeping
import asyncio
import html
import json
import os
import re
import threading
import time
from contextlib import asynccontextmanager
from typing import List, Dict, Optional

import gradio as gr
import requests
import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

# Import your modules
from agent import agent_executor
from valetax_rag import ValetaxRAG
# Global variables
# RAG backend instance; assigned inside lifespan() when the API starts.
valetax_rag = None
# FastAPI application; assigned by start_fastapi_server().
app = None
# Background thread running the API server; assigned by main().
api_server_thread = None
# Base URL the Gradio side uses to reach the API; must agree with the
# host/port passed to uvicorn.run() in start_fastapi_server().
API_BASE_URL = "http://127.0.0.1:8000"
# FastAPI Models
class QueryRequest(BaseModel):
    """Request body for the ask endpoint."""

    # The user's question, forwarded to the agent as its "input".
    question: str
    # Sent by the Gradio client; the ask handler in this file does not read it.
    max_sources: int = 6
class QueryResponse(BaseModel):
    """Response body of the ask endpoint."""

    # Text taken from the agent result's "output" key.
    answer: str
# Initialize RAG system
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: build the RAG system on startup, log on shutdown.

    Passed to ``FastAPI(lifespan=...)``, which expects an async context
    manager, hence the ``@asynccontextmanager`` decorator.

    Args:
        app: The FastAPI application being started (unused here).

    Raises:
        FileNotFoundError: If ``documents.json`` does not exist.
        Exception: Any error raised while constructing ``ValetaxRAG`` is
            logged and re-raised so the server does not start half-initialized.
    """
    global valetax_rag
    data_path = "documents.json"  # Adjust path as needed

    # Startup phase only: keep the yield outside this try so that errors
    # raised later (while serving / shutting down) are not misreported as
    # initialization failures.
    try:
        if not os.path.exists(data_path):
            print(f"❌ Data file not found: {data_path}")
            raise FileNotFoundError(f"Data file not found: {data_path}")
        valetax_rag = ValetaxRAG(data_path)
        print("✅ Valetax RAG system initialized successfully")
    except Exception as e:
        print(f"❌ Failed to initialize RAG system: {e}")
        raise

    try:
        yield
    finally:
        print("Shutting down RAG system...")
# Create FastAPI app
def create_fastapi_app():
    """Build the FastAPI application and register its three endpoints.

    Routes:
        POST /ask     -- run the agent on a question and return its answer.
        GET  /health  -- liveness plus whether the RAG system is initialized.
        GET  /        -- welcome message and endpoint index.

    Returns:
        The configured ``FastAPI`` instance.
    """
    app = FastAPI(
        title="Valetax Customer Service API",
        description="Arabic customer service chatbot for Valetax",
        version="1.0.0",
        lifespan=lifespan
    )

    # Without these route decorators the handlers are plain local functions
    # and every path (including /health, which the launcher polls) is a 404.
    @app.post("/ask", response_model=QueryResponse)
    async def ask_agent(request: QueryRequest):
        """Forward the question to the agent and wrap its output."""
        if not valetax_rag:
            raise HTTPException(
                status_code=503,
                detail="RAG system not initialized"
            )
        try:
            # NOTE(review): request.max_sources is accepted but not passed on.
            response = await agent_executor.ainvoke({"input": request.question})
            answer = response.get("output", "No response received")
            return QueryResponse(answer=answer)
        except Exception as e:
            raise HTTPException(
                status_code=500,
                detail=f"Error processing request: {str(e)}"
            ) from e

    @app.get("/health")
    async def health_check():
        """Report service health and RAG initialization state."""
        return {
            "status": "healthy",
            "rag_initialized": valetax_rag is not None
        }

    @app.get("/")
    async def root():
        """Return a welcome message and the list of available endpoints."""
        return {
            "message": "مرحبًا بك في خدمة عملاء Valetax",
            "endpoints": {
                "ask": "/ask",
                "health": "/health",
                "docs": "/docs"
            }
        }

    return app
# Start FastAPI server in background thread
def start_fastapi_server():
    """Create the FastAPI app, publish it in the module global, and serve it.

    Blocks inside ``uvicorn.run`` until the server stops, so it is meant to
    be used as a thread target.
    """
    global app
    app = create_fastapi_app()
    server_options = {
        "host": "127.0.0.1",
        "port": 8000,
        "log_level": "info",
    }
    uvicorn.run(app, **server_options)
def wait_for_api_server(max_retries=30, delay=1):
    """Poll the API health endpoint until it answers 200 or retries run out.

    Args:
        max_retries: Maximum number of polling attempts.
        delay: Seconds to sleep after each unsuccessful attempt.

    Returns:
        True as soon as the health endpoint responds with HTTP 200,
        False if every attempt failed.
    """
    for attempt in range(1, max_retries + 1):
        try:
            probe = requests.get(f"{API_BASE_URL}/health", timeout=5)
        except requests.exceptions.RequestException:
            # Server not accepting connections yet; treat as "not ready".
            probe = None
        if probe is not None and probe.status_code == 200:
            print("✅ API server is ready")
            return True
        print(f"⏳ Waiting for API server... ({attempt}/{max_retries})")
        time.sleep(delay)

    print("❌ API server failed to start")
    return False
# Gradio functions
def call_api(query: str, max_sources: int = 3):
    """POST a question to the API's /ask endpoint.

    Args:
        query: The user's question.
        max_sources: Value sent as ``max_sources`` in the request body.

    Returns:
        The ``answer`` field of the JSON reply on HTTP 200; otherwise a
        string starting with "❌" describing the API or connection error.
    """
    payload = {"question": query, "max_sources": max_sources}
    try:
        reply = requests.post(
            f"{API_BASE_URL}/ask",
            json=payload,
            timeout=30
        )
        if reply.status_code != 200:
            return f"❌ API Error: {reply.status_code} - {reply.text}"
        return reply.json()["answer"]
    except requests.exceptions.RequestException as err:
        return f"❌ Connection Error: {str(err)}"
| def respond( | |
| message: str, | |
| history: List[List[str]], | |
| max_sources: int = 3 | |
| ): | |
| """ | |
| Response function for Gradio using FastAPI | |
| """ | |
| if not message.strip(): | |
| yield "من فضلك اكتب سؤالك" | |
| return | |
| try: | |
| # Check if API is available | |
| health_response = requests.get(f"{API_BASE_URL}/health", timeout=5) | |
| if health_response.status_code != 200: | |
| yield "❌ API server is not available" | |
| return | |
| # Get response from API | |
| response = call_api(message, max_sources) | |
| # Simulate streaming response | |
| if response and not response.startswith("❌"): | |
| words = response.split() | |
| partial_response = "" | |
| for i, word in enumerate(words): | |
| partial_response += word + " " | |
| if i % 5 == 0: # Update every few words | |
| yield partial_response | |
| yield response | |
| else: | |
| yield response or "❌ لم يتم الحصول على رد" | |
| except Exception as e: | |
| yield f"❌ حدث خطأ: {str(e)}" | |
def test_system():
    """Query the API health endpoint and describe the result as text.

    Returns:
        A multi-line status string when the endpoint answers HTTP 200
        (including whether the RAG system reports as initialized), or a
        "❌"-prefixed error string for a non-200 status or any exception.
    """
    try:
        # Test API health
        health = requests.get(f"{API_BASE_URL}/health", timeout=5)
        if health.status_code != 200:
            return f"❌ API Server Error: {health.status_code}"
        payload = health.json()
        if payload.get("rag_initialized"):
            rag_status = "✅ مفعل"
        else:
            rag_status = "❌ غير مفعل"
        return f"✅ النظام يعمل بشكل صحيح\n✅ API Server: متاح\n✅ RAG System: {rag_status}"
    except Exception as err:
        return f"❌ خطأ في الاتصال: {str(err)}"
# Create the Gradio ChatInterface
# Extra input shown under the chat box; its value is passed to respond()
# as the max_sources argument.
_max_sources_slider = gr.Slider(
    minimum=1,
    maximum=10,
    value=3,
    step=1,
    label="Max Sources",
    info="أقصى عدد مصادر من قاعدة المعرفة"
)

# Clickable sample questions (Arabic and English).
_example_questions = [
    ["ما هو فاليتاكس؟"],
    ["كيف يمكنني التسجيل؟"],
    ["ما هي خدماتكم؟"],
    ["عندي مشكلة في النظام"],
    ["What is Valetax?"],
    ["How can I register?"]
]

chatbot = gr.ChatInterface(
    respond,
    type="messages",
    title="🏛️ Valetax Customer Service (API Mode)",
    description="خدمة عملاء فاليتاكس - Arabic customer service chatbot with FastAPI backend",
    additional_inputs=[_max_sources_slider],
    examples=_example_questions,
    cache_examples=False
)
# Custom CSS
# Passed to gr.Blocks(css=...) below. The .status-* classes are the ones used
# by the system-status HTML panel; .message.* restyle the chat bubbles.
css = """
.gradio-container {
max-width: 1200px !important;
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
}
.message.user {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important;
color: white !important;
}
.message.bot {
background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%) !important;
color: white !important;
}
.status-indicator {
border-radius: 10px;
padding: 10px;
margin: 5px 0;
}
.status-success {
background-color: #d4edda;
color: #155724;
border: 1px solid #c3e6cb;
}
.status-error {
background-color: #f8d7da;
color: #721c24;
border: 1px solid #f5c6cb;
}
"""
# Create the main Gradio interface
# Layout: header, then a row with the chat (wide column) and a side panel
# holding the system-status check, API information and usage instructions.
with gr.Blocks(css=css, theme=gr.themes.Soft(), title="Valetax Customer Service") as demo:
    gr.HTML("""
<div style="text-align: center; margin-bottom: 20px;">
<h1>🏛️ Valetax Customer Service</h1>
<h2 style="color: #666; margin-top: 10px;">خدمة عملاء فاليتاكس (API Mode)</h2>
<p style="color: #888;">اسأل عن خدمات فاليتاكس بالعربية أو الإنجليزية</p>
<p style="color: #666; font-size: 14px;">🔗 API Backend: FastAPI | Frontend: Gradio</p>
</div>
""")
    with gr.Row():
        with gr.Column(scale=4):
            chatbot.render()
        with gr.Column(scale=1):
            gr.Markdown("### ⚙️ حالة النظام")
            # System status
            with gr.Group():
                gr.Markdown("**حالة النظام**")
                test_btn = gr.Button("🔍 فحص النظام", variant="secondary")
                system_status = gr.HTML(value="<div class='status-indicator'>⏳ جاري التحقق...</div>")

                def update_system_status():
                    """Run test_system() and render its result as a status <div>."""
                    result = test_system()
                    status_class = "status-success" if "✅" in result else "status-error"
                    # The result is plain text (it may include exception
                    # messages and "\n" separators): escape it and turn line
                    # breaks into <br> so it renders as written.
                    safe_text = html.escape(result).replace("\n", "<br>")
                    return f"<div class='status-indicator {status_class}'>{safe_text}</div>"

                test_btn.click(update_system_status, outputs=system_status)
                # The placeholder above says "checking...": actually run the
                # check once when the page loads instead of waiting for a click.
                demo.load(update_system_status, outputs=system_status)
            # API Information
            with gr.Group():
                gr.Markdown("**معلومات API**")
                gr.Markdown(f"""
- **API URL**: `{API_BASE_URL}`
- **Endpoints**:
- `POST /ask` - سؤال الـ Agent
- `GET /health` - حالة النظام
- `GET /docs` - API Documentation
- **Frontend**: Gradio Interface
""")
            # Instructions
            with gr.Group():
                gr.Markdown("**التعليمات**")
                gr.Markdown("""
1. **API Backend**: FastAPI يعمل في الخلفية
2. **للاستفسارات**: اسأل عن أي خدمة
3. **للمشاكل**: صف المشكلة مع الإيميل
4. **الدعم**: آدم سيساعدك بكل ود
**API Benefits**:
- أداء أفضل وأكثر استقرار
- إمكانية التكامل مع تطبيقات أخرى
- مراقبة وإدارة أفضل للأخطاء
""")
def main():
    """Main function to launch FastAPI + Gradio.

    Starts the FastAPI server in a daemon thread, waits until its health
    endpoint responds, logs one health check, then launches the Gradio UI
    (blocking). Returns early, without launching Gradio, if the API server
    never becomes ready.
    """
    print("🚀 Starting Valetax Customer Service with FastAPI + Gradio...")
    # Check if documents file exists
    if os.path.exists("documents.json"):
        print("✅ Found documents.json")
    else:
        print("❌ Warning: documents.json not found")
    # Start FastAPI server in background thread
    print("🔥 Starting FastAPI server...")
    global api_server_thread
    # daemon=True so the API thread does not keep the process alive on exit.
    api_server_thread = threading.Thread(target=start_fastapi_server, daemon=True)
    api_server_thread.start()
    # Wait for API server to be ready
    if not wait_for_api_server():
        print("❌ Failed to start API server. Exiting...")
        return
    # Test initial connection
    try:
        # Bounded like every other health probe in this file, so a stalled
        # server cannot hang startup indefinitely.
        response = requests.get(f"{API_BASE_URL}/health", timeout=5)
        print(f"✅ API Health Check: {response.json()}")
    except Exception as e:
        # Best-effort diagnostic only; the UI is still launched.
        print(f"⚠️ API Health Check Warning: {e}")
    # Launch Gradio interface
    print("🎨 Starting Gradio interface...")
    demo.launch(
        share=True,
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True
    )
# Script entry point: only start the servers when run directly, not on import.
if __name__ == "__main__":
    main()