ChatbotRAG / hybrid_chat_endpoint.py
minhvtt's picture
Upload 20 files
2ecdea6 verified
raw
history blame
9.83 kB
"""
Hybrid Chat Endpoint: RAG + Scenario FSM
Routes between scripted scenarios and knowledge retrieval
"""
from fastapi import HTTPException
from datetime import datetime
from typing import Dict, Any
import json
async def hybrid_chat_endpoint(
    request,  # ChatRequest
    conversation_service,
    intent_classifier,
    scenario_engine,
    tools_service,
    advanced_rag,
    embedding_service,
    qdrant_service,
    chat_history_collection,
    hf_token,
    lead_storage,  # For saving customer leads
):
    """
    Hybrid conversational chatbot: Scenario FSM + RAG.

    Flow:
        1. Load session & scenario state
        2. Classify intent (scenario vs RAG)
        3. Route:
            - Scenario: Execute FSM flow
            - RAG+Resume: Answer question then resume scenario
            - RAG: Knowledge retrieval
        4. Save history

    Args:
        request: Chat request; this function reads ``session_id``,
            ``user_id`` and ``message`` from it and forwards the whole
            object to the RAG handlers.
        conversation_service: Session store used to create/check sessions,
            read the scenario state and append chat messages.
        intent_classifier: Object whose ``classify(message, scenario_state)``
            returns an intent string such as ``"scenario:..."``,
            ``"rag:with_resume"`` or anything else (treated as general RAG).
        scenario_engine: Forwarded to ``handle_scenario``.
        tools_service, chat_history_collection, hf_token: Forwarded to
            ``handle_pure_rag``.
        advanced_rag, embedding_service, qdrant_service: Forwarded to the
            scenario / RAG handlers.
        lead_storage: Forwarded to ``handle_scenario`` for action handling.

    Returns:
        dict with keys ``response``, ``session_id``, ``mode``,
        ``scenario_active`` and ``timestamp`` (ISO string from
        ``datetime.utcnow()``).

    Raises:
        HTTPException: 404 when a supplied ``session_id`` does not exist;
            any ``HTTPException`` raised by a handler is propagated as-is;
            every other failure is reported as a 500.
    """
    try:
        # ===== SESSION MANAGEMENT =====
        session_id = request.session_id
        if not session_id:
            session_id = conversation_service.create_session(
                metadata={"user_agent": "api", "created_via": "hybrid_chat"},
                user_id=request.user_id,
            )
            print(f"✓ Created session: {session_id} (user: {request.user_id or 'anon'})")
        elif not conversation_service.session_exists(session_id):
            raise HTTPException(404, detail=f"Session {session_id} not found")

        # ===== LOAD SCENARIO STATE =====
        scenario_state = conversation_service.get_scenario_state(session_id) or {}

        # ===== INTENT CLASSIFICATION =====
        intent = intent_classifier.classify(request.message, scenario_state)
        print(f"🎯 Intent: {intent}")

        # ===== ROUTING =====
        if intent.startswith("scenario:"):
            # Route to scenario engine
            response_data = await handle_scenario(
                intent,
                request.message,
                session_id,
                scenario_state,
                scenario_engine,
                conversation_service,
                advanced_rag,
                lead_storage,  # Passed through for action handling
            )
        elif intent == "rag:with_resume":
            # Answer question but keep scenario active
            response_data = await handle_rag_with_resume(
                request,
                session_id,
                scenario_state,
                advanced_rag,
                embedding_service,
                qdrant_service,
                conversation_service,
            )
        else:  # rag:general
            # Pure RAG query
            response_data = await handle_pure_rag(
                request,
                session_id,
                advanced_rag,
                embedding_service,
                qdrant_service,
                tools_service,
                chat_history_collection,
                hf_token,
                conversation_service,
            )

        # ===== SAVE HISTORY =====
        conversation_service.add_message(
            session_id,
            "user",
            request.message,
            metadata={"intent": intent},
        )
        # A handler may report ``context_used`` as None; normalise before
        # slicing so history saving never fails on it.
        context_preview = (response_data.get("context_used") or [])[:3]  # Limit size
        conversation_service.add_message(
            session_id,
            "assistant",
            response_data["response"],
            metadata={
                "mode": response_data.get("mode", "unknown"),
                "context_used": context_preview,
            },
        )

        return {
            "response": response_data["response"],
            "session_id": session_id,
            "mode": response_data.get("mode"),
            "scenario_active": response_data.get("scenario_active", False),
            "timestamp": datetime.utcnow().isoformat(),
        }
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 404 above) must keep their status
        # code instead of being rewrapped as a generic 500 below.
        raise
    except Exception as e:
        print(f"❌ Error in hybrid_chat: {str(e)}")
        raise HTTPException(500, detail=f"Chat error: {str(e)}") from e
def _save_lead_for_action(action, scenario_data, session_id, lead_storage):
    """Persist a customer lead for the lead-related actions; ignore any other action."""
    if action not in ("send_pdf_email", "save_lead_phone"):
        return

    # Fields shared by both lead-saving actions.
    lead_kwargs = {
        "event_name": scenario_data.get('step_1_input', 'Unknown Event'),
        "email": scenario_data.get('step_5_input'),  # Email from step 5
        "session_id": session_id,
    }
    interests = {"group": scenario_data.get('group_size')}

    if action == "send_pdf_email":
        interests["wants_pdf"] = True
        log_line = "📧 Lead saved: email sent (saved to DB)"
    else:  # save_lead_phone
        # ``phone`` is only passed for this action, as in the original call.
        lead_kwargs["phone"] = scenario_data.get('step_8_input')  # Phone from step 8
        interests["wants_reminder"] = True
        log_line = "📱 Lead saved: SMS reminder (saved to DB)"

    lead_storage.save_lead(interests=interests, **lead_kwargs)
    print(log_line)


async def handle_scenario(
    intent,
    user_message,
    session_id,
    scenario_state,
    scenario_engine,
    conversation_service,
    advanced_rag,
    lead_storage=None,
):
    """
    Handle scenario-based conversation.

    Args:
        intent: Intent string; ``"scenario:continue"`` advances the active
            scenario, any other ``"scenario:<type>"`` starts ``<type>``.
        user_message: Raw user text passed to the engine as ``user_input``.
        session_id: Session whose scenario state is updated or cleared.
        scenario_state: Current state dict; when continuing it must contain
            ``active_scenario`` and ``scenario_step``.
        scenario_engine: Provides ``next_step(...)`` and
            ``start_scenario(scenario_type)``; both are expected to return a
            dict with at least ``message``.
        conversation_service: Provides ``clear_scenario`` and
            ``set_scenario_state``.
        advanced_rag: Passed to the engine as ``rag_service``.
        lead_storage: Optional store with ``save_lead(...)``; when falsy,
            actions returned by the engine are not executed.

    Returns:
        dict with ``response`` (engine message), ``mode`` (``"scenario"``)
        and ``scenario_active``.

    Raises:
        KeyError: if ``scenario_state`` lacks the keys needed to continue,
            or the engine result lacks ``message`` / ``new_state`` where
            they are required.
    """
    if intent == "scenario:continue":
        # Continue existing scenario
        result = scenario_engine.next_step(
            scenario_id=scenario_state["active_scenario"],
            current_step=scenario_state["scenario_step"],
            user_input=user_message,
            scenario_data=scenario_state.get("scenario_data", {}),
            rag_service=advanced_rag,
        )
    else:
        # Start new scenario
        scenario_type = intent.split(":", 1)[1]
        result = scenario_engine.start_scenario(scenario_type)

    # Update scenario state
    if result.get("end_scenario"):
        conversation_service.clear_scenario(session_id)
        scenario_active = False
    else:
        conversation_service.set_scenario_state(session_id, result["new_state"])
        scenario_active = True

    # Execute action if any
    action = result.get("action")
    if action and lead_storage:
        # A terminating step may carry ``new_state=None``; treat that the
        # same as a missing key and fall back to the pre-step scenario data.
        new_state = result.get("new_state") or {}
        scenario_data = new_state.get(
            "scenario_data", scenario_state.get("scenario_data", {})
        )
        _save_lead_for_action(action, scenario_data, session_id, lead_storage)

    return {
        "response": result["message"],
        "mode": "scenario",
        "scenario_active": scenario_active,
    }
async def handle_rag_with_resume(
    request,
    session_id,
    scenario_state,
    advanced_rag,
    embedding_service,
    qdrant_service,
    conversation_service,
):
    """
    Handle RAG query mid-scenario.

    Answers the question from the top search hit, then appends a hint
    reminding the user to continue the scenario.

    Args:
        request: Chat request; reads ``use_rag``, ``message``, ``top_k``
            and ``score_threshold``.
        session_id, scenario_state, advanced_rag, conversation_service:
            Accepted for signature parity with the other handlers; not used
            by this function.
        embedding_service: Provides ``encode_text(message)``.
        qdrant_service: Provides ``search(query_embedding, limit,
            score_threshold, ef)``; each hit is indexed as
            ``hit['metadata']`` and read with ``.get('text')``.

    Returns:
        dict with ``response``, ``mode`` (``"rag_with_resume"``),
        ``scenario_active`` (always True) and ``context_used`` (always a
        list-like value, never None).
    """
    snippet_limit = 300  # Max characters of the top hit shown to the user

    # Query RAG with proper search
    context_used = []
    if request.use_rag:
        query_embedding = embedding_service.encode_text(request.message)
        results = qdrant_service.search(
            query_embedding=query_embedding,
            limit=request.top_k,
            score_threshold=request.score_threshold,
            ef=256,  # NOTE(review): presumably a search-accuracy knob - confirm
        )
        # Normalise a None result to an empty list so callers can slice it.
        context_used = results or []

    # Build the response from the retrieved context
    if context_used:
        text = context_used[0]['metadata'].get('text', '')
        if text:
            # Use the first ``snippet_limit`` characters as the snippet
            rag_response = text[:snippet_limit].strip()
            if len(text) > snippet_limit:
                rag_response += "..."
        else:
            rag_response = "Tôi tìm thấy thông tin nhưng không thể hiển thị chi tiết."
        # If multiple results, add count
        if len(context_used) > 1:
            rag_response += f"\n\n(Tìm thấy {len(context_used)} kết quả liên quan)"
    else:
        rag_response = "Xin lỗi, tôi không tìm thấy thông tin về câu hỏi này trong tài liệu."

    # Add resume hint
    resume_hint = "\n\n---\n💬 Vậy nha! Quay lại câu hỏi trước, bạn đã quyết định chưa?"
    return {
        "response": rag_response + resume_hint,
        "mode": "rag_with_resume",
        "scenario_active": True,
        "context_used": context_used,
    }
async def handle_pure_rag(
    request,
    session_id,
    advanced_rag,
    embedding_service,
    qdrant_service,
    tools_service,
    chat_history_collection,
    hf_token,
    conversation_service,
):
    """
    Answer a general (non-scenario) query by delegating to ``chat_endpoint``.

    The existing ``chat_endpoint`` coroutine does the actual work; this
    wrapper only adapts its result to the handler response shape.
    ``session_id`` is accepted for signature parity with the other handlers
    and is not used here.

    Returns:
        dict with ``response`` (taken from the delegate's result), ``mode``
        (``"rag"``) and ``context_used`` (the delegate's value, or an empty
        list when the key is absent).
    """
    # Imported at call time rather than at module load.
    # NOTE(review): presumably to avoid a circular import - confirm.
    from chat_endpoint import chat_endpoint

    # Positional order expected by the existing endpoint.
    delegate_args = (
        request,
        conversation_service,
        tools_service,
        advanced_rag,
        embedding_service,
        qdrant_service,
        chat_history_collection,
        hf_token,
    )
    rag_result = await chat_endpoint(*delegate_args)

    answer = rag_result["response"]
    sources = rag_result.get("context_used", [])
    return {"response": answer, "mode": "rag", "context_used": sources}
async def simple_rag_response(message, context, system_message):
    """
    Simple RAG response without LLM (for quick answers).

    Args:
        message: User message; not used by this function.
        context: Sequence of search hits; only the first is used, read as
            ``hit['metadata'].get('text')``.
        system_message: Not used by this function.

    Returns:
        The top hit's text as a string; a "no information" message when the
        hit has no usable text (missing, None or empty); an apology message
        when ``context`` is empty.
    """
    if not context:
        return "Xin lỗi, tôi không tìm thấy thông tin về điều này."

    # Return top context
    text = context[0]['metadata'].get('text')
    if not text:
        # A present-but-empty/None text must not be rendered as "" or "None".
        return 'Không tìm thấy thông tin.'
    return str(text)