bassommma's picture
Update app.py
2ce4f16 verified
import gradio as gr
import json
from typing import List, Dict, Optional
import os
import asyncio
import threading
import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from contextlib import asynccontextmanager
import requests
import time
# Import your modules
from agent import agent_executor
from valetax_rag import ValetaxRAG
# Global variables
valetax_rag = None
app = None
api_server_thread = None
API_BASE_URL = "http://127.0.0.1:8000"
# FastAPI Models
class QueryRequest(BaseModel):
question: str
max_sources: int = 6
class QueryResponse(BaseModel):
answer: str
# Initialize RAG system
@asynccontextmanager
async def lifespan(app: FastAPI):
global valetax_rag
try:
data_path = "documents.json" # Adjust path as needed
if os.path.exists(data_path):
valetax_rag = ValetaxRAG(data_path)
print("✅ Valetax RAG system initialized successfully")
else:
print(f"❌ Data file not found: {data_path}")
raise FileNotFoundError(f"Data file not found: {data_path}")
yield
except Exception as e:
print(f"❌ Failed to initialize RAG system: {e}")
raise
finally:
print("Shutting down RAG system...")
# Create FastAPI app
def create_fastapi_app():
app = FastAPI(
title="Valetax Customer Service API",
description="Arabic customer service chatbot for Valetax",
version="1.0.0",
lifespan=lifespan
)
@app.post("/ask", response_model=QueryResponse)
async def ask_agent(request: QueryRequest):
if not valetax_rag:
raise HTTPException(
status_code=503,
detail="RAG system not initialized"
)
try:
response = await agent_executor.ainvoke({"input": request.question})
answer = response.get("output", "No response received")
return QueryResponse(answer=answer)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Error processing request: {str(e)}"
)
@app.get("/health")
async def health_check():
return {
"status": "healthy",
"rag_initialized": valetax_rag is not None
}
@app.get("/")
async def root():
return {
"message": "مرحبًا بك في خدمة عملاء Valetax",
"endpoints": {
"ask": "/ask",
"health": "/health",
"docs": "/docs"
}
}
return app
# Start FastAPI server in background thread
def start_fastapi_server():
global app
app = create_fastapi_app()
uvicorn.run(
app,
host="127.0.0.1",
port=8000,
log_level="info"
)
def wait_for_api_server(max_retries=30, delay=1):
"""Wait for API server to be ready"""
for i in range(max_retries):
try:
response = requests.get(f"{API_BASE_URL}/health", timeout=5)
if response.status_code == 200:
print("✅ API server is ready")
return True
except requests.exceptions.RequestException:
pass
print(f"⏳ Waiting for API server... ({i+1}/{max_retries})")
time.sleep(delay)
print("❌ API server failed to start")
return False
# Gradio functions
def call_api(query: str, max_sources: int = 3):
"""Call FastAPI endpoint"""
try:
response = requests.post(
f"{API_BASE_URL}/ask",
json={"question": query, "max_sources": max_sources},
timeout=30
)
if response.status_code == 200:
data = response.json()
return data["answer"]
else:
return f"❌ API Error: {response.status_code} - {response.text}"
except requests.exceptions.RequestException as e:
return f"❌ Connection Error: {str(e)}"
def respond(
message: str,
history: List[List[str]],
max_sources: int = 3
):
"""
Response function for Gradio using FastAPI
"""
if not message.strip():
yield "من فضلك اكتب سؤالك"
return
try:
# Check if API is available
health_response = requests.get(f"{API_BASE_URL}/health", timeout=5)
if health_response.status_code != 200:
yield "❌ API server is not available"
return
# Get response from API
response = call_api(message, max_sources)
# Simulate streaming response
if response and not response.startswith("❌"):
words = response.split()
partial_response = ""
for i, word in enumerate(words):
partial_response += word + " "
if i % 5 == 0: # Update every few words
yield partial_response
yield response
else:
yield response or "❌ لم يتم الحصول على رد"
except Exception as e:
yield f"❌ حدث خطأ: {str(e)}"
def test_system():
"""Test system status"""
try:
# Test API health
response = requests.get(f"{API_BASE_URL}/health", timeout=5)
if response.status_code == 200:
health_data = response.json()
rag_status = "✅ مفعل" if health_data.get("rag_initialized") else "❌ غير مفعل"
return f"✅ النظام يعمل بشكل صحيح\n✅ API Server: متاح\n✅ RAG System: {rag_status}"
else:
return f"❌ API Server Error: {response.status_code}"
except Exception as e:
return f"❌ خطأ في الاتصال: {str(e)}"
# Create the Gradio ChatInterface
chatbot = gr.ChatInterface(
respond,
type="messages",
title="🏛️ Valetax Customer Service (API Mode)",
description="خدمة عملاء فاليتاكس - Arabic customer service chatbot with FastAPI backend",
additional_inputs=[
gr.Slider(
minimum=1,
maximum=10,
value=3,
step=1,
label="Max Sources",
info="أقصى عدد مصادر من قاعدة المعرفة"
)
],
examples=[
["ما هو فاليتاكس؟"],
["كيف يمكنني التسجيل؟"],
["ما هي خدماتكم؟"],
["عندي مشكلة في النظام"],
["What is Valetax?"],
["How can I register?"]
],
cache_examples=False
)
# Custom CSS
css = """
.gradio-container {
max-width: 1200px !important;
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
}
.message.user {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important;
color: white !important;
}
.message.bot {
background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%) !important;
color: white !important;
}
.status-indicator {
border-radius: 10px;
padding: 10px;
margin: 5px 0;
}
.status-success {
background-color: #d4edda;
color: #155724;
border: 1px solid #c3e6cb;
}
.status-error {
background-color: #f8d7da;
color: #721c24;
border: 1px solid #f5c6cb;
}
"""
# Create the main Gradio interface
with gr.Blocks(css=css, theme=gr.themes.Soft(), title="Valetax Customer Service") as demo:
gr.HTML("""
<div style="text-align: center; margin-bottom: 20px;">
<h1>🏛️ Valetax Customer Service</h1>
<h2 style="color: #666; margin-top: 10px;">خدمة عملاء فاليتاكس (API Mode)</h2>
<p style="color: #888;">اسأل عن خدمات فاليتاكس بالعربية أو الإنجليزية</p>
<p style="color: #666; font-size: 14px;">🔗 API Backend: FastAPI | Frontend: Gradio</p>
</div>
""")
with gr.Row():
with gr.Column(scale=4):
chatbot.render()
with gr.Column(scale=1):
gr.Markdown("### ⚙️ حالة النظام")
# System status
with gr.Group():
gr.Markdown("**حالة النظام**")
test_btn = gr.Button("🔍 فحص النظام", variant="secondary")
system_status = gr.HTML(value="<div class='status-indicator'>⏳ جاري التحقق...</div>")
def update_system_status():
result = test_system()
if "✅" in result:
return f"<div class='status-indicator status-success'>{result}</div>"
else:
return f"<div class='status-indicator status-error'>{result}</div>"
test_btn.click(update_system_status, outputs=system_status)
# API Information
with gr.Group():
gr.Markdown("**معلومات API**")
gr.Markdown(f"""
- **API URL**: `{API_BASE_URL}`
- **Endpoints**:
- `POST /ask` - سؤال الـ Agent
- `GET /health` - حالة النظام
- `GET /docs` - API Documentation
- **Frontend**: Gradio Interface
""")
# Instructions
with gr.Group():
gr.Markdown("**التعليمات**")
gr.Markdown("""
1. **API Backend**: FastAPI يعمل في الخلفية
2. **للاستفسارات**: اسأل عن أي خدمة
3. **للمشاكل**: صف المشكلة مع الإيميل
4. **الدعم**: آدم سيساعدك بكل ود
**API Benefits**:
- أداء أفضل وأكثر استقرار
- إمكانية التكامل مع تطبيقات أخرى
- مراقبة وإدارة أفضل للأخطاء
""")
def main():
"""Main function to launch FastAPI + Gradio"""
print("🚀 Starting Valetax Customer Service with FastAPI + Gradio...")
# Check if documents file exists
if os.path.exists("documents.json"):
print("✅ Found documents.json")
else:
print("❌ Warning: documents.json not found")
# Start FastAPI server in background thread
print("🔥 Starting FastAPI server...")
global api_server_thread
api_server_thread = threading.Thread(target=start_fastapi_server, daemon=True)
api_server_thread.start()
# Wait for API server to be ready
if not wait_for_api_server():
print("❌ Failed to start API server. Exiting...")
return
# Test initial connection
try:
response = requests.get(f"{API_BASE_URL}/health")
print(f"✅ API Health Check: {response.json()}")
except Exception as e:
print(f"⚠️ API Health Check Warning: {e}")
# Launch Gradio interface
print("🎨 Starting Gradio interface...")
demo.launch(
share=True,
server_name="0.0.0.0",
server_port=7860,
show_error=True
)
if __name__ == "__main__":
main()