Commit aa79ca3 (1 parent: 63eb410)

fixed auto retry for fake streaming

Files changed:
- app/api_helpers.py (+47 -27)
- app/routes/chat_api.py (+10 -4)
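For context, "fake streaming" in this service means making a single non-streaming Gemini call and then replaying the completed text to the client as OpenAI-style SSE chunks (see fake_stream_generator in the diff below). A minimal sketch of that replay step, assuming an already-fetched full_text; the helper name and chunk size are illustrative and not taken from the repository:

import asyncio
import json
import time

async def replay_as_sse(full_text: str, model: str, chunk_size: int = 40):
    # Turn an already-complete response into OpenAI-style streaming chunks.
    response_id = f"chatcmpl-{int(time.time())}"
    for i in range(0, len(full_text), chunk_size):
        chunk = {
            "id": response_id,
            "object": "chat.completion.chunk",
            "model": model,
            "choices": [{"index": 0,
                         "delta": {"content": full_text[i:i + chunk_size]},
                         "finish_reason": None}],
        }
        yield f"data: {json.dumps(chunk)}\n\n"
        await asyncio.sleep(0)  # let the event loop flush each chunk
    yield "data: [DONE]\n\n"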
app/api_helpers.py (CHANGED)

@@ -66,21 +66,13 @@ def is_response_valid(response):
                 # print(f"DEBUG: Response valid due to part.text in candidate's content part")
                 return True
 
-    #
-    #
-    #
-
-    # Check if there's any block reason, which might be interesting to log or handle
-    if hasattr(response.prompt_feedback, 'block_reason') and response.prompt_feedback.block_reason:
-        print(f"DEBUG: Response has prompt_feedback with block_reason: {response.prompt_feedback.block_reason}, considering it valid for processing.")
-    else:
-        print("DEBUG: Response has prompt_feedback (no block_reason), considering it valid for processing.")
-    return True
-
-    print("DEBUG: Response is invalid, no usable text content or prompt_feedback found.")
+    # Removed prompt_feedback as a sole criterion for validity.
+    # It should only be valid if actual text content is found.
+    # Block reasons will be checked explicitly by callers if they need to treat it as an error for retries.
+    print("DEBUG: Response is invalid, no usable text content found by is_response_valid.")
     return False
 
-async def fake_stream_generator(client_instance, model_name: str, prompt: Union[types.Content, List[types.Content]], current_gen_config: Dict[str, Any], request_obj: OpenAIRequest):
+async def fake_stream_generator(client_instance, model_name: str, prompt: Union[types.Content, List[types.Content]], current_gen_config: Dict[str, Any], request_obj: OpenAIRequest, is_auto_attempt: bool):
     response_id = f"chatcmpl-{int(time.time())}"
     async def fake_stream_inner():
         print(f"FAKE STREAMING: Making non-streaming request to Gemini API (Model: {model_name})")

@@ -98,8 +90,20 @@ async def fake_stream_generator(client_instance, model_name: str, prompt: Union[
             await asyncio.sleep(app_config.FAKE_STREAMING_INTERVAL_SECONDS)
         try:
             response = api_call_task.result()
-
-
+
+            # Check for safety blocks first, as this should trigger a retry in auto-mode
+            if hasattr(response, 'prompt_feedback') and \
+               hasattr(response.prompt_feedback, 'block_reason') and \
+               response.prompt_feedback.block_reason:
+                block_message = f"Response blocked by safety filter: {response.prompt_feedback.block_reason}"
+                if hasattr(response.prompt_feedback, 'block_reason_message') and response.prompt_feedback.block_reason_message:
+                    block_message = f"Response blocked by safety filter: {response.prompt_feedback.block_reason_message} (Reason: {response.prompt_feedback.block_reason})"
+                print(f"DEBUG: {block_message} (in fake_stream_generator)") # Log this specific condition
+                raise ValueError(block_message) # This will be caught by the except Exception as e below it
+
+            if not is_response_valid(response): # is_response_valid now only checks for actual text
+                raise ValueError(f"Invalid/empty response in fake stream (no text content): {str(response)[:200]}")
+
             full_text = ""
             if hasattr(response, 'text'):
                 full_text = response.text or "" # Coalesce None to empty string

@@ -136,10 +140,12 @@ async def fake_stream_generator(client_instance, model_name: str, prompt: Union[
             # It's good practice to log the JSON payload here too for consistency,
             # though the main concern was the true streaming path.
             json_payload_for_fake_stream_error = json.dumps(err_resp)
-
-
-
-
+            # Log the error JSON that WOULD have been sent if not in auto-mode or if this was the final error handler.
+            print(f"DEBUG: Internal error in fake_stream_generator. JSON error for handler: {json_payload_for_fake_stream_error}")
+            if not is_auto_attempt:
+                yield f"data: {json_payload_for_fake_stream_error}\n\n"
+                yield "data: [DONE]\n\n"
+            raise e # Re-raise the original exception e
     return fake_stream_inner()
 
 async def execute_gemini_call(

@@ -147,14 +153,15 @@ async def execute_gemini_call(
     model_to_call: str,
     prompt_func: Callable[[List[OpenAIMessage]], Union[types.Content, List[types.Content]]],
     gen_config_for_call: Dict[str, Any],
-    request_obj: OpenAIRequest # Pass the whole request object
+    request_obj: OpenAIRequest, # Pass the whole request object
+    is_auto_attempt: bool = False
 ):
     actual_prompt_for_call = prompt_func(request_obj.messages)
 
     if request_obj.stream:
         if app_config.FAKE_STREAMING_ENABLED:
             return StreamingResponse(
-                await fake_stream_generator(current_client, model_to_call, actual_prompt_for_call, gen_config_for_call, request_obj),
+                await fake_stream_generator(current_client, model_to_call, actual_prompt_for_call, gen_config_for_call, request_obj, is_auto_attempt=is_auto_attempt),
                 media_type="text/event-stream"
             )
 

@@ -180,15 +187,28 @@ async def execute_gemini_call(
 
             err_resp_content_call = create_openai_error_response(500, error_message_str, "server_error")
             json_payload_for_error = json.dumps(err_resp_content_call)
-
-
-
-
+            # Log the error JSON that WOULD have been sent if not in auto-mode or if this was the final error handler.
+            print(f"DEBUG: Internal error in _stream_generator_inner_for_execute. JSON error for handler: {json_payload_for_error}")
+            if not is_auto_attempt: # is_auto_attempt is from execute_gemini_call's scope
+                yield f"data: {json_payload_for_error}\n\n"
+                yield "data: [DONE]\n\n"
+            raise e_stream_call # Re-raise the original exception
         return StreamingResponse(_stream_generator_inner_for_execute(), media_type="text/event-stream")
     else:
         response_obj_call = await current_client.aio.models.generate_content(
             model=model_to_call, contents=actual_prompt_for_call, config=gen_config_for_call
         )
-
-
+
+        # Check for safety blocks first for non-streaming calls
+        if hasattr(response_obj_call, 'prompt_feedback') and \
+           hasattr(response_obj_call.prompt_feedback, 'block_reason') and \
+           response_obj_call.prompt_feedback.block_reason:
+            block_message = f"Response blocked by safety filter: {response_obj_call.prompt_feedback.block_reason}"
+            if hasattr(response_obj_call.prompt_feedback, 'block_reason_message') and response_obj_call.prompt_feedback.block_reason_message:
+                block_message = f"Response blocked by safety filter: {response_obj_call.prompt_feedback.block_reason_message} (Reason: {response_obj_call.prompt_feedback.block_reason})"
+            print(f"DEBUG: {block_message} (in execute_gemini_call non-streaming)") # Log this specific condition
+            raise ValueError(block_message)
+
+        if not is_response_valid(response_obj_call): # is_response_valid now only checks for actual text
+            raise ValueError("Invalid/empty response from non-streaming Gemini call (no text content).")
         return JSONResponse(content=convert_to_openai_format(response_obj_call, request_obj.model))
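The two safety-filter checks added above (one in fake_stream_generator, one in the non-streaming branch of execute_gemini_call) follow the same pattern: if prompt_feedback carries a block_reason, build a descriptive message and raise ValueError so auto-mode can catch it and retry. As a sketch only, the shared logic could be factored into a small helper; this helper does not exist in the commit, which inlines the check at both call sites:

from typing import Optional

def get_block_message(response) -> Optional[str]:
    # Return a human-readable message if the response was blocked by a safety
    # filter, or None if no block_reason is present.
    feedback = getattr(response, "prompt_feedback", None)
    reason = getattr(feedback, "block_reason", None) if feedback else None
    if not reason:
        return None
    detail = getattr(feedback, "block_reason_message", None)
    if detail:
        return f"Response blocked by safety filter: {detail} (Reason: {reason})"
    return f"Response blocked by safety filter: {reason}"

# Both call sites would then reduce to:
#     block_message = get_block_message(response)
#     if block_message:
#         raise ValueError(block_message)  # caught by auto-mode, which retries the next attempt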
app/routes/chat_api.py (CHANGED)

@@ -240,7 +240,8 @@ async def chat_completions(fastapi_request: Request, request: OpenAIRequest, api
             print(f"Auto-mode attempting: '{attempt['name']}' for model {attempt['model']}")
             current_gen_config = attempt["config_modifier"](generation_config.copy())
             try:
-
+                # Pass is_auto_attempt=True for auto-mode calls
+                return await execute_gemini_call(client_to_use, attempt["model"], attempt["prompt_func"], current_gen_config, request, is_auto_attempt=True)
             except Exception as e_auto:
                 last_err = e_auto
                 print(f"Auto-attempt '{attempt['name']}' for model {attempt['model']} failed: {e_auto}")

@@ -251,11 +252,15 @@ async def chat_completions(fastapi_request: Request, request: OpenAIRequest, api
         if not request.stream and last_err:
             return JSONResponse(status_code=500, content=create_openai_error_response(500, err_msg, "server_error"))
         elif request.stream:
-
+            # This is the final error handling for auto-mode if all attempts fail AND it was a streaming request
+            async def final_auto_error_stream():
                 err_content = create_openai_error_response(500, err_msg, "server_error")
-
+                json_payload_final_auto_error = json.dumps(err_content)
+                # Log the final error being sent to client after all auto-retries failed
+                print(f"DEBUG: Auto-mode all attempts failed. Yielding final error JSON: {json_payload_final_auto_error}")
+                yield f"data: {json_payload_final_auto_error}\n\n"
                 yield "data: [DONE]\n\n"
-            return StreamingResponse(
+            return StreamingResponse(final_auto_error_stream(), media_type="text/event-stream")
         return JSONResponse(status_code=500, content=create_openai_error_response(500, "All auto-mode attempts failed without specific error.", "server_error"))
 
     else: # Not an auto model

@@ -284,6 +289,7 @@ async def chat_completions(fastapi_request: Request, request: OpenAIRequest, api
             # This means if `request.model` was "gemini-1.5-pro-search", `base_model_name` becomes "gemini-1.5-pro"
             # but the API call might need the full "gemini-1.5-pro-search".
             # Let's use `request.model` for the API call here, and `base_model_name` for checks like Express eligibility.
+            # For non-auto mode, is_auto_attempt defaults to False in execute_gemini_call
             return await execute_gemini_call(client_to_use, base_model_name, current_prompt_func, generation_config, request)
 
     except Exception as e:
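Taken together with the api_helpers.py changes, the streaming path in auto mode now behaves as a real retry loop: each attempt raises on a safety block or empty response instead of emitting its own error chunk (is_auto_attempt=True suppresses the per-attempt SSE error), and only after every attempt fails does the route send a single SSE error stream. A rough, self-contained sketch of that control flow; the attempt names, the backend stub, and the payload shapes are illustrative rather than the project's real identifiers:

import asyncio
import json

async def attempt_call(name: str) -> str:
    # Stand-in for execute_gemini_call(..., is_auto_attempt=True): raises on
    # blocked or empty responses rather than yielding an error chunk.
    raise ValueError(f"Response blocked by safety filter (attempt: {name})")

async def auto_mode_stream(attempt_names):
    last_err = None
    for name in attempt_names:
        try:
            text = await attempt_call(name)
            yield f"data: {json.dumps({'choices': [{'delta': {'content': text}}]})}\n\n"
            yield "data: [DONE]\n\n"
            return
        except Exception as e_auto:
            last_err = e_auto
            print(f"Auto-attempt '{name}' failed: {e_auto}")
    # All attempts failed: emit exactly one final error event, then [DONE].
    err = {"error": {"message": str(last_err), "type": "server_error", "code": 500}}
    yield f"data: {json.dumps(err)}\n\n"
    yield "data: [DONE]\n\n"

if __name__ == "__main__":
    async def _demo():
        async for line in auto_mode_stream(["attempt-1", "attempt-2"]):
            print(line, end="")
    asyncio.run(_demo())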