minhvtt committed · verified
Commit f376a38 · 1 Parent(s): 20cf93e

Update hybrid_chat_stream.py

Files changed (1):
  1. hybrid_chat_stream.py +241 -207
hybrid_chat_stream.py CHANGED
@@ -1,207 +1,241 @@
- """
- Hybrid Chat Streaming Endpoint
- Real-time SSE streaming for scenarios + RAG
- """
- from typing import AsyncGenerator
- import asyncio
- from datetime import datetime
-
- from stream_utils import (
-     format_sse, stream_text_slowly,
-     EVENT_STATUS, EVENT_TOKEN, EVENT_DONE, EVENT_ERROR, EVENT_METADATA
- )
-
-
- async def hybrid_chat_stream(
-     request,
-     conversation_service,
-     intent_classifier,
-     scenario_engine,
-     advanced_rag,
-     embedding_service,
-     qdrant_service,
-     hf_token,
-     lead_storage
- ) -> AsyncGenerator[str, None]:
-     """
-     Stream chat responses in real-time (SSE format)
-
-     Yields SSE events:
-     - status: "Đang suy nghĩ...", "Đang tìm kiếm..."
-     - token: Individual text chunks
-     - metadata: Context, session info
-     - done: Completion signal
-     - error: Error messages
-     """
-     try:
-         # === SESSION MANAGEMENT ===
-         session_id = request.session_id
-         if not session_id:
-             session_id = conversation_service.create_session(
-                 metadata={"user_agent": "api", "created_via": "stream"},
-                 user_id=request.user_id
-             )
-         yield format_sse(EVENT_METADATA, {"session_id": session_id})
-
-         # === INTENT CLASSIFICATION ===
-         yield format_sse(EVENT_STATUS, "Đang phân tích câu hỏi...")
-
-         scenario_state = conversation_service.get_scenario_state(session_id) or {}
-         intent = intent_classifier.classify(request.message, scenario_state)
-
-         # === ROUTING ===
-         if intent.startswith("scenario:"):
-             # Scenario flow with simulated streaming
-             async for sse_event in handle_scenario_stream(
-                 intent, request.message, session_id,
-                 scenario_state, scenario_engine, conversation_service, lead_storage
-             ):
-                 yield sse_event
-
-         elif intent == "rag:with_resume":
-             # Quick RAG answer + resume scenario
-             yield format_sse(EVENT_STATUS, "Đang tra cứu...")
-             async for sse_event in handle_rag_stream(
-                 request, advanced_rag, embedding_service, qdrant_service
-             ):
-                 yield sse_event
-
-             # Resume hint
-             async for chunk in stream_text_slowly(
-                 "\n\n---\nVậy nha! Quay lại câu hỏi trước nhé ^^",
-                 chars_per_chunk=5,
-                 delay_ms=15
-             ):
-                 yield chunk
-
-         else:  # Pure RAG
-             yield format_sse(EVENT_STATUS, "Đang tìm kiếm trong tài liệu...")
-             async for sse_event in handle_rag_stream(
-                 request, advanced_rag, embedding_service, qdrant_service
-             ):
-                 yield sse_event
-
-         # === SAVE HISTORY ===
-         # Note: We'll save the full response after streaming completes
-         # This requires buffering on the server side
-
-         # === DONE ===
-         yield format_sse(EVENT_DONE, {
-             "session_id": session_id,
-             "timestamp": datetime.utcnow().isoformat()
-         })
-
-     except Exception as e:
-         yield format_sse(EVENT_ERROR, str(e))
-
-
- async def handle_scenario_stream(
-     intent, user_message, session_id,
-     scenario_state, scenario_engine, conversation_service, lead_storage
- ) -> AsyncGenerator[str, None]:
-     """
-     Handle scenario with simulated typing effect
-     """
-     # Get scenario response (sync)
-     if intent == "scenario:continue":
-         result = scenario_engine.next_step(
-             scenario_id=scenario_state["active_scenario"],
-             current_step=scenario_state["scenario_step"],
-             user_input=user_message,
-             scenario_data=scenario_state.get("scenario_data", {})
-         )
-     else:
-         scenario_type = intent.split(":", 1)[1]
-         result = scenario_engine.start_scenario(scenario_type)
-
-     # Update state
-     if result.get("end_scenario"):
-         conversation_service.clear_scenario(session_id)
-     elif result.get("new_state"):
-         conversation_service.set_scenario_state(session_id, result["new_state"])
-
-     # Execute actions
-     if result.get("action") and lead_storage:
-         action = result['action']
-         scenario_data = result.get('new_state', {}).get('scenario_data', {})
-
-         if action == "send_pdf_email":
-             lead_storage.save_lead(
-                 event_name=scenario_data.get('step_1_input', 'Unknown'),
-                 email=scenario_data.get('step_5_input'),
-                 interests={"group": scenario_data.get('group_size'), "wants_pdf": True},
-                 session_id=session_id
-             )
-         elif action == "save_lead_phone":
-             lead_storage.save_lead(
-                 event_name=scenario_data.get('step_1_input', 'Unknown'),
-                 email=scenario_data.get('step_5_input'),
-                 phone=scenario_data.get('step_8_input'),
-                 interests={"group": scenario_data.get('group_size'), "wants_reminder": True},
-                 session_id=session_id
-             )
-
-     # Stream response with typing effect
-     response_text = result["message"]
-     async for chunk in stream_text_slowly(
-         response_text,
-         chars_per_chunk=4,  # Faster for scenarios
-         delay_ms=15
-     ):
-         yield chunk
-
-     yield format_sse(EVENT_METADATA, {
-         "mode": "scenario",
-         "scenario_active": not result.get("end_scenario")
-     })
-
-
- async def handle_rag_stream(
-     request, advanced_rag, embedding_service, qdrant_service
- ) -> AsyncGenerator[str, None]:
-     """
-     Handle RAG with real LLM streaming
-     """
-     # RAG search (sync part)
-     context_used = []
-     if request.use_rag:
-         query_embedding = embedding_service.encode_text(request.message)
-         results = qdrant_service.search(
-             query_embedding=query_embedding,
-             limit=request.top_k,
-             score_threshold=request.score_threshold,
-             ef=256
-         )
-         context_used = results
-
-     # Build context
-     if context_used:
-         context_str = "\n\n".join([
-             f"[{i+1}] {r['metadata'].get('text', '')[:500]}"
-             for i, r in enumerate(context_used[:3])
-         ])
-     else:
-         context_str = "Không tìm thấy thông tin liên quan."
-
-     # Simple response (for now - can integrate with real LLM streaming later)
-     if context_used:
-         response_text = f"Dựa trên tài liệu, {context_used[0]['metadata'].get('text', '')[:300]}..."
-     else:
-         response_text = "Xin lỗi, tôi không tìm thấy thông tin về câu hỏi này."
-
-     # Simulate streaming (will be replaced with real HF streaming)
-     async for chunk in stream_text_slowly(
-         response_text,
-         chars_per_chunk=3,
-         delay_ms=20
-     ):
-         yield chunk
-
-     yield format_sse(EVENT_METADATA, {
-         "mode": "rag",
-         "context_count": len(context_used)
-     })
-
-
- # TODO: Implement real HF InferenceClient streaming
- # This requires updating advanced_rag.py to support stream=True
+ """
+ Hybrid Chat Streaming Endpoint
+ Real-time SSE streaming for scenarios + RAG
+ """
+ from typing import AsyncGenerator
+ import asyncio
+ from datetime import datetime
+
+ from stream_utils import (
+     format_sse, stream_text_slowly,
+     EVENT_STATUS, EVENT_TOKEN, EVENT_DONE, EVENT_ERROR, EVENT_METADATA
+ )
+
+ # Import scenario handlers
+ from scenario_handlers.price_inquiry import PriceInquiryHandler
+ from scenario_handlers.event_recommendation import EventRecommendationHandler
+ from scenario_handlers.post_event_feedback import PostEventFeedbackHandler
+ from scenario_handlers.exit_intent_rescue import ExitIntentRescueHandler
+
+
+ async def hybrid_chat_stream(
+     request,
+     conversation_service,
+     intent_classifier,
+     embedding_service,  # For handlers
+     qdrant_service,  # For handlers
+     advanced_rag,
+     hf_token,
+     lead_storage
+ ) -> AsyncGenerator[str, None]:
+     """
+     Stream chat responses in real-time (SSE format)
+
+     Yields SSE events:
+     - status: "Đang suy nghĩ...", "Đang tìm kiếm..."
+     - token: Individual text chunks
+     - metadata: Context, session info
+     - done: Completion signal
+     - error: Error messages
+     """
+     try:
+         # === SESSION MANAGEMENT ===
+         session_id = request.session_id
+         if not session_id:
+             session_id = conversation_service.create_session(
+                 metadata={"user_agent": "api", "created_via": "stream"},
+                 user_id=request.user_id
+             )
+         yield format_sse(EVENT_METADATA, {"session_id": session_id})
+
+         # === INTENT CLASSIFICATION ===
+         yield format_sse(EVENT_STATUS, "Đang phân tích câu hỏi...")
+
+         scenario_state = conversation_service.get_scenario_state(session_id) or {}
+         intent = intent_classifier.classify(request.message, scenario_state)
+
+         # === ROUTING ===
+         if intent.startswith("scenario:"):
+             # Scenario flow with simulated streaming using handlers
+             async for sse_event in handle_scenario_stream(
+                 intent, request.message, session_id,
+                 scenario_state, embedding_service, qdrant_service,
+                 conversation_service, lead_storage
+             ):
+                 yield sse_event
+
+         elif intent == "rag:with_resume":
+             # Quick RAG answer + resume scenario
+             yield format_sse(EVENT_STATUS, "Đang tra cứu...")
+             async for sse_event in handle_rag_stream(
+                 request, advanced_rag, embedding_service, qdrant_service
+             ):
+                 yield sse_event
+
+             # Resume hint
+             async for chunk in stream_text_slowly(
+                 "\n\n---\nVậy nha! Quay lại câu hỏi trước nhé ^^",
+                 chars_per_chunk=5,
+                 delay_ms=15
+             ):
+                 yield chunk
+
+         else:  # Pure RAG
+             yield format_sse(EVENT_STATUS, "Đang tìm kiếm trong tài liệu...")
+             async for sse_event in handle_rag_stream(
+                 request, advanced_rag, embedding_service, qdrant_service
+             ):
+                 yield sse_event
+
+         # === SAVE HISTORY ===
+         # Note: We'll save the full response after streaming completes
+         # This requires buffering on the server side
+
+         # === DONE ===
+         yield format_sse(EVENT_DONE, {
+             "session_id": session_id,
+             "timestamp": datetime.utcnow().isoformat()
+         })
+
+     except Exception as e:
+         yield format_sse(EVENT_ERROR, str(e))
+
+
+ async def handle_scenario_stream(
+     intent, user_message, session_id,
+     scenario_state, embedding_service, qdrant_service,
+     conversation_service, lead_storage
+ ) -> AsyncGenerator[str, None]:
+     """
+     Handle scenario with simulated typing effect using dedicated handlers
+     """
+     # Initialize all scenario handlers
+     handlers = {
+         'price_inquiry': PriceInquiryHandler(embedding_service, qdrant_service, lead_storage),
+         'event_recommendation': EventRecommendationHandler(embedding_service, qdrant_service, lead_storage),
+         'post_event_feedback': PostEventFeedbackHandler(embedding_service, qdrant_service, lead_storage),
+         'exit_intent_rescue': ExitIntentRescueHandler(embedding_service, qdrant_service, lead_storage)
+     }
+
+     # Get scenario response using handlers
+     if intent == "scenario:continue":
+         scenario_id = scenario_state.get("active_scenario")
+
+         if scenario_id not in handlers:
+             yield format_sse(EVENT_ERROR, f"Scenario '{scenario_id}' không tồn tại")
+             return
+
+         handler = handlers[scenario_id]
+         result = handler.next_step(
+             current_step=scenario_state.get("scenario_step", 1),
+             user_input=user_message,
+             scenario_data=scenario_state.get("scenario_data", {})
+         )
+     else:
+         scenario_type = intent.split(":", 1)[1]
+
+         if scenario_type not in handlers:
+             yield format_sse(EVENT_ERROR, f"Scenario '{scenario_type}' không tồn tại")
+             return
+
+         handler = handlers[scenario_type]
+         initial_data = scenario_state.get("scenario_data", {})
+         result = handler.start(initial_data=initial_data)
+
+     # Show loading message if RAG is being performed
+     if result.get("loading_message"):
+         yield format_sse(EVENT_STATUS, result["loading_message"])
+         # Small delay to let UI show loading
+         await asyncio.sleep(0.1)
+
+     # Update state
+     if result.get("end_scenario"):
+         conversation_service.clear_scenario(session_id)
+     elif result.get("new_state"):
+         conversation_service.set_scenario_state(session_id, result["new_state"])
+
+     # Execute actions
+     if result.get("action") and lead_storage:
+         action = result['action']
+         scenario_data = result.get('new_state', {}).get('scenario_data', {})
+
+         if action == "send_pdf_email":
+             lead_storage.save_lead(
+                 event_name=scenario_data.get('step_1_input', 'Unknown'),
+                 email=scenario_data.get('step_5_input'),
+                 interests={"group": scenario_data.get('group_size'), "wants_pdf": True},
+                 session_id=session_id
+             )
+         elif action == "save_lead_phone":
+             lead_storage.save_lead(
+                 event_name=scenario_data.get('step_1_input', 'Unknown'),
+                 email=scenario_data.get('step_5_input'),
+                 phone=scenario_data.get('step_8_input'),
+                 interests={"group": scenario_data.get('group_size'), "wants_reminder": True},
+                 session_id=session_id
+             )
+
+     # Stream response with typing effect
+     response_text = result["message"]
+     async for chunk in stream_text_slowly(
+         response_text,
+         chars_per_chunk=4,  # Faster for scenarios
+         delay_ms=15
+     ):
+         yield chunk
+
+     yield format_sse(EVENT_METADATA, {
+         "mode": "scenario",
+         "scenario_active": not result.get("end_scenario")
+     })
+
+
+ async def handle_rag_stream(
+     request, advanced_rag, embedding_service, qdrant_service
+ ) -> AsyncGenerator[str, None]:
+     """
+     Handle RAG with real LLM streaming
+     """
+     # RAG search (sync part)
+     context_used = []
+     if request.use_rag:
+         query_embedding = embedding_service.encode_text(request.message)
+         results = qdrant_service.search(
+             query_embedding=query_embedding,
+             limit=request.top_k,
+             score_threshold=request.score_threshold,
+             ef=256
+         )
+         context_used = results
+
+     # Build context
+     if context_used:
+         context_str = "\n\n".join([
+             f"[{i+1}] {r['metadata'].get('text', '')[:500]}"
+             for i, r in enumerate(context_used[:3])
+         ])
+     else:
+         context_str = "Không tìm thấy thông tin liên quan."
+
+     # Simple response (for now - can integrate with real LLM streaming later)
+     if context_used:
+         response_text = f"Dựa trên tài liệu, {context_used[0]['metadata'].get('text', '')[:300]}..."
+     else:
+         response_text = "Xin lỗi, tôi không tìm thấy thông tin về câu hỏi này."
+
+     # Simulate streaming (will be replaced with real HF streaming)
+     async for chunk in stream_text_slowly(
+         response_text,
+         chars_per_chunk=3,
+         delay_ms=20
+     ):
+         yield chunk
+
+     yield format_sse(EVENT_METADATA, {
+         "mode": "rag",
+         "context_count": len(context_used)
+     })
+
+
+ # TODO: Implement real HF InferenceClient streaming
+ # This requires updating advanced_rag.py to support stream=True
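
Note: stream_utils is not part of this commit, but the diff only makes sense given its shapes. Below is a minimal sketch of what this module appears to assume: the five event-name constants, format_sse wrapping a payload as one SSE frame, and stream_text_slowly yielding pre-formatted token frames (the endpoint yields its chunks directly as SSE strings alongside format_sse output). The bodies are assumptions inferred from the call sites, not the real stream_utils.

    # Hypothetical sketch of stream_utils (not in this commit); shapes are
    # inferred from how hybrid_chat_stream.py uses these names.
    import asyncio
    import json
    from typing import AsyncGenerator

    EVENT_STATUS = "status"
    EVENT_TOKEN = "token"
    EVENT_METADATA = "metadata"
    EVENT_DONE = "done"
    EVENT_ERROR = "error"


    def format_sse(event: str, data) -> str:
        """Serialize one Server-Sent Events frame: event name plus JSON payload."""
        return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"


    async def stream_text_slowly(
        text: str, chars_per_chunk: int = 3, delay_ms: int = 20
    ) -> AsyncGenerator[str, None]:
        """Yield `token` frames a few characters at a time to simulate typing."""
        for i in range(0, len(text), chars_per_chunk):
            yield format_sse(EVENT_TOKEN, text[i:i + chars_per_chunk])
            await asyncio.sleep(delay_ms / 1000)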
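The commit's main change is swapping the monolithic scenario_engine for per-scenario handler classes. Their module is not shown; from the call sites in handle_scenario_stream, each handler is constructed with (embedding_service, qdrant_service, lead_storage) and exposes start(initial_data=...) and next_step(current_step=..., user_input=..., scenario_data=...), both returning a dict whose keys the endpoint reads: message, plus optional new_state, end_scenario, action, and loading_message. A hypothetical base class capturing that contract:

    # Hypothetical base class for the handlers this commit imports; the
    # signatures and result keys are inferred from handle_scenario_stream(),
    # the bodies are placeholders.
    from typing import Any, Dict, Optional


    class ScenarioHandler:
        def __init__(self, embedding_service, qdrant_service, lead_storage):
            self.embedding_service = embedding_service
            self.qdrant_service = qdrant_service
            self.lead_storage = lead_storage

        def start(self, initial_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
            """Begin the flow; return the first prompt plus state to persist."""
            return {
                "message": "...",  # text streamed to the user
                "new_state": {     # persisted via conversation_service
                    "active_scenario": "price_inquiry",  # e.g. one of the handler keys
                    "scenario_step": 1,
                    "scenario_data": initial_data or {},
                },
            }

        def next_step(self, current_step: int, user_input: str,
                      scenario_data: Dict[str, Any]) -> Dict[str, Any]:
            """Advance one step; may also set end_scenario, action, loading_message."""
            raise NotImplementedError

Instantiating all four handlers on every scenario turn is cheap only if their constructors do no I/O; if they embed or query Qdrant at construction time, building the dict lazily would be the safer choice.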
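For completeness, a minimal client sketch for consuming the stream. The endpoint path (/chat/stream here) and the exact request schema are assumptions; only the event names and the request attributes read by the module (message, session_id, use_rag, top_k, score_threshold) come from the code above.

    # Hypothetical SSE client; URL and payload fields are assumptions.
    import asyncio
    import httpx


    async def main() -> None:
        payload = {"message": "Giá vé bao nhiêu?", "session_id": None, "use_rag": True}
        event = ""
        async with httpx.AsyncClient(timeout=None) as client:
            async with client.stream(
                "POST", "http://localhost:8000/chat/stream", json=payload
            ) as resp:
                # Parse the two-line SSE frames emitted by format_sse.
                async for line in resp.aiter_lines():
                    if line.startswith("event: "):
                        event = line.removeprefix("event: ")
                    elif line.startswith("data: "):
                        print(f"[{event}] {line.removeprefix('data: ')}")


    asyncio.run(main())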