Update main.py
main.py (CHANGED)
@@ -7,7 +7,6 @@ import random
 import logging
 import time
 from contextlib import asynccontextmanager
-import asyncio
 
 # --- Production-Ready Configuration ---
 LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
@@ -46,17 +45,20 @@ app = FastAPI(docs_url=None, redoc_url=None, lifespan=lifespan)
 # --- API Endpoints ---
 
 # 1. Health Check Route (Defined FIRST)
+# This specific route will be matched before the catch-all proxy route.
 @app.get("/")
 async def health_check():
     """Provides a basic health check endpoint."""
     return JSONResponse({"status": "ok", "target": TARGET_URL})
 
 # 2. Catch-All Reverse Proxy Route (Defined SECOND)
+# This will capture ALL other requests (e.g., /completions, /v1/models, etc.)
+# and forward them. This eliminates any redirect issues.
 @app.api_route("/{full_path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"])
 async def reverse_proxy_handler(request: Request):
     """
     A catch-all reverse proxy that forwards requests to the target URL with
-    enhanced retry logic
+    enhanced retry logic and latency logging.
     """
     start_time = time.monotonic()
 
@@ -82,7 +84,6 @@ async def reverse_proxy_handler(request: Request):
     body = await request.body()
 
     last_exception = None
-    rp_resp = None
    for attempt in range(MAX_RETRIES):
         try:
             rp_req = client.build_request(
@@ -91,62 +92,30 @@ async def reverse_proxy_handler(request: Request):
             rp_resp = await client.send(rp_req, stream=True)
 
             if rp_resp.status_code not in RETRY_STATUS_CODES or attempt == MAX_RETRIES - 1:
-                break
+                duration_ms = (time.monotonic() - start_time) * 1000
+                log_func = logging.info if rp_resp.is_success else logging.warning
+                log_func(f"Request finished: {request.method} {request.url.path} status_code={rp_resp.status_code} latency={duration_ms:.2f}ms")
+
+                return StreamingResponse(
+                    rp_resp.aiter_raw(),
+                    status_code=rp_resp.status_code,
+                    headers=rp_resp.headers,
+                    background=BackgroundTask(rp_resp.aclose),
+                )
 
             logging.warning(
                 f"Attempt {attempt + 1}/{MAX_RETRIES} for {url.path} failed with status {rp_resp.status_code}. Retrying..."
             )
             await rp_resp.aclose()
-            rp_resp = None  # Ensure response is not carried over
 
         except httpx.ConnectError as e:
             last_exception = e
             logging.warning(f"Attempt {attempt + 1}/{MAX_RETRIES} for {url.path} failed with connection error: {e}")
 
-    if rp_resp is None:
-        duration_ms = (time.monotonic() - start_time) * 1000
-        logging.critical(f"Request failed, cannot connect to target: {request.method} {request.url.path} status_code=502 latency={duration_ms:.2f}ms")
-        raise HTTPException(
-            status_code=502,
-            detail=f"Bad Gateway: Cannot connect to target service after {MAX_RETRIES} attempts. {last_exception}"
-        )
-
     duration_ms = (time.monotonic() - start_time) * 1000
-
-
-
-    async def body_generator(response):
-        """
-        Streams the response body. If the first chunk takes too long,
-        it sends a processing message first.
-        """
-        response_iterator = response.aiter_raw()
-        first_chunk = None
-        try:
-            # Wait for the first chunk of the body with a timeout
-            first_chunk = await asyncio.wait_for(response_iterator.__anext__(), timeout=1.5)
-        except asyncio.TimeoutError:
-            # If timeout occurs, send the processing message
-            logging.warning(f"Response from target timed out. Sending processing message for {url.path}")
-            processing_message = ':NiansuhAI Proccessing:\n\n'
-            yield processing_message.encode('utf-8')
-        except StopAsyncIteration:
-            # The response body is empty
-            pass
-
-        if first_chunk is not None:
-            yield first_chunk
-
-        # Yield the rest of the body
-        async for chunk in response_iterator:
-            yield chunk
-
-        final_duration_ms = (time.monotonic() - start_time) * 1000
-        logging.info(f"Request finished streaming: {request.method} {request.url.path} status_code={response.status_code} total_latency={final_duration_ms:.2f}ms")
-
-    return StreamingResponse(
-        body_generator(rp_resp),
-        status_code=rp_resp.status_code,
-        headers=rp_resp.headers,
-        background=BackgroundTask(rp_resp.aclose),
+    logging.critical(f"Request failed, cannot connect to target: {request.method} {request.url.path} status_code=502 latency={duration_ms:.2f}ms")
+
+    raise HTTPException(
+        status_code=502,
+        detail=f"Bad Gateway: Cannot connect to target service after {MAX_RETRIES} attempts. {last_exception}"
     )
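
Note: the substance of this change is in the last hunk. Previously the handler broke out of the retry loop, raised 502 if rp_resp was still None, and otherwise streamed through a nested body_generator that used asyncio.wait_for to emit a ':NiansuhAI Proccessing:' keep-alive chunk when the first body chunk was slow; that generator was the only user of the now-removed import asyncio. The new code returns the StreamingResponse directly from inside the loop, so code after the loop is reachable only when every attempt failed with a connection error and can raise 502 unconditionally. A minimal self-contained sketch of the resulting retry-then-stream pattern (the TARGET_URL, MAX_RETRIES, and RETRY_STATUS_CODES values here are illustrative assumptions, not the ones configured in main.py):

import logging
import time

import httpx
from fastapi import FastAPI, HTTPException, Request
from starlette.background import BackgroundTask
from starlette.responses import StreamingResponse

TARGET_URL = "http://127.0.0.1:9000"        # assumed placeholder
MAX_RETRIES = 3                             # assumed value
RETRY_STATUS_CODES = {429, 502, 503, 504}   # assumed value

app = FastAPI()
client = httpx.AsyncClient(base_url=TARGET_URL)

@app.api_route("/{full_path:path}", methods=["GET", "POST"])
async def proxy(request: Request, full_path: str):
    start_time = time.monotonic()
    body = await request.body()
    last_exception = None

    for attempt in range(MAX_RETRIES):
        try:
            rp_req = client.build_request(request.method, "/" + full_path, content=body)
            rp_resp = await client.send(rp_req, stream=True)

            # Success, a non-retryable status, or the final attempt:
            # hand the still-open stream back to the caller and close it
            # only after the response body has been fully sent.
            if rp_resp.status_code not in RETRY_STATUS_CODES or attempt == MAX_RETRIES - 1:
                return StreamingResponse(
                    rp_resp.aiter_raw(),
                    status_code=rp_resp.status_code,
                    headers=rp_resp.headers,
                    background=BackgroundTask(rp_resp.aclose),
                )

            # Retryable status: close this response and try again.
            await rp_resp.aclose()
        except httpx.ConnectError as e:
            last_exception = e
            logging.warning("Attempt %d/%d failed: %s", attempt + 1, MAX_RETRIES, e)

    # Reachable only when every attempt raised ConnectError.
    duration_ms = (time.monotonic() - start_time) * 1000
    logging.critical("Cannot connect to target, giving up after %.2fms", duration_ms)
    raise HTTPException(status_code=502, detail=f"Bad Gateway: {last_exception}")

Returning from inside the loop is what lets the commit drop both the rp_resp = None sentinel and the post-loop if rp_resp is None: check.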
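Note: the comments added in the second hunk rely on FastAPI (Starlette) matching routes in registration order, which is why the specific "/" health check must be defined before the "/{full_path:path}" catch-all. A self-contained sketch of that ordering rule, using hypothetical toy routes rather than the ones in main.py:

from fastapi import FastAPI
from fastapi.testclient import TestClient

app = FastAPI()

# Specific route registered first, as in this commit.
@app.get("/")
async def health_check():
    return {"status": "ok"}

# Catch-all registered second; it receives everything else.
@app.api_route("/{full_path:path}", methods=["GET"])
async def catch_all(full_path: str):
    return {"proxied_path": full_path}

client = TestClient(app)
assert client.get("/").json() == {"status": "ok"}                        # health route wins
assert client.get("/v1/models").json() == {"proxied_path": "v1/models"}  # everything else is caught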