rkihacker committed on
Commit
9e5e128
·
verified ·
1 Parent(s): 5e05fcc

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +50 -19
main.py CHANGED
@@ -7,6 +7,7 @@ import random
7
  import logging
8
  import time
9
  from contextlib import asynccontextmanager
 
10
 
11
  # --- Production-Ready Configuration ---
12
  LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
@@ -45,20 +46,17 @@ app = FastAPI(docs_url=None, redoc_url=None, lifespan=lifespan)
45
  # --- API Endpoints ---
46
 
47
  # 1. Health Check Route (Defined FIRST)
48
- # This specific route will be matched before the catch-all proxy route.
49
  @app.get("/")
50
  async def health_check():
51
  """Provides a basic health check endpoint."""
52
  return JSONResponse({"status": "ok", "target": TARGET_URL})
53
 
54
  # 2. Catch-All Reverse Proxy Route (Defined SECOND)
55
- # This will capture ALL other requests (e.g., /completions, /v1/models, etc.)
56
- # and forward them. This eliminates any redirect issues.
57
  @app.api_route("/{full_path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"])
58
  async def reverse_proxy_handler(request: Request):
59
  """
60
  A catch-all reverse proxy that forwards requests to the target URL with
61
- enhanced retry logic and latency logging.
62
  """
63
  start_time = time.monotonic()
64
 
@@ -84,6 +82,7 @@ async def reverse_proxy_handler(request: Request):
84
  body = await request.body()
85
 
86
  last_exception = None
 
87
  for attempt in range(MAX_RETRIES):
88
  try:
89
  rp_req = client.build_request(
@@ -92,30 +91,62 @@ async def reverse_proxy_handler(request: Request):
92
  rp_resp = await client.send(rp_req, stream=True)
93
 
94
  if rp_resp.status_code not in RETRY_STATUS_CODES or attempt == MAX_RETRIES - 1:
95
- duration_ms = (time.monotonic() - start_time) * 1000
96
- log_func = logging.info if rp_resp.is_success else logging.warning
97
- log_func(f"Request finished: {request.method} {request.url.path} status_code={rp_resp.status_code} latency={duration_ms:.2f}ms")
98
-
99
- return StreamingResponse(
100
- rp_resp.aiter_raw(),
101
- status_code=rp_resp.status_code,
102
- headers=rp_resp.headers,
103
- background=BackgroundTask(rp_resp.aclose),
104
- )
105
 
106
  logging.warning(
107
  f"Attempt {attempt + 1}/{MAX_RETRIES} for {url.path} failed with status {rp_resp.status_code}. Retrying..."
108
  )
109
  await rp_resp.aclose()
 
110
 
111
  except httpx.ConnectError as e:
112
  last_exception = e
113
  logging.warning(f"Attempt {attempt + 1}/{MAX_RETRIES} for {url.path} failed with connection error: {e}")
114
 
 
 
 
 
 
 
 
 
115
  duration_ms = (time.monotonic() - start_time) * 1000
116
- logging.critical(f"Request failed, cannot connect to target: {request.method} {request.url.path} status_code=502 latency={duration_ms:.2f}ms")
117
-
118
- raise HTTPException(
119
- status_code=502,
120
- detail=f"Bad Gateway: Cannot connect to target service after {MAX_RETRIES} attempts. {last_exception}"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
121
  )
 
7
  import logging
8
  import time
9
  from contextlib import asynccontextmanager
10
+ import asyncio
11
 
12
  # --- Production-Ready Configuration ---
13
  LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
 
46
  # --- API Endpoints ---
47
 
48
  # 1. Health Check Route (Defined FIRST)
 
49
  @app.get("/")
50
  async def health_check():
51
  """Provides a basic health check endpoint."""
52
  return JSONResponse({"status": "ok", "target": TARGET_URL})
53
 
54
  # 2. Catch-All Reverse Proxy Route (Defined SECOND)
 
 
55
  @app.api_route("/{full_path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"])
56
  async def reverse_proxy_handler(request: Request):
57
  """
58
  A catch-all reverse proxy that forwards requests to the target URL with
59
+ enhanced retry logic, latency logging, and an initial processing message on delay.
60
  """
61
  start_time = time.monotonic()
62
 
 
82
  body = await request.body()
83
 
84
  last_exception = None
85
+ rp_resp = None
86
  for attempt in range(MAX_RETRIES):
87
  try:
88
  rp_req = client.build_request(
 
91
  rp_resp = await client.send(rp_req, stream=True)
92
 
93
  if rp_resp.status_code not in RETRY_STATUS_CODES or attempt == MAX_RETRIES - 1:
94
+ break # Exit loop on success or last retry
 
 
 
 
 
 
 
 
 
95
 
96
  logging.warning(
97
  f"Attempt {attempt + 1}/{MAX_RETRIES} for {url.path} failed with status {rp_resp.status_code}. Retrying..."
98
  )
99
  await rp_resp.aclose()
100
+ rp_resp = None # Ensure response is not carried over
101
 
102
  except httpx.ConnectError as e:
103
  last_exception = e
104
  logging.warning(f"Attempt {attempt + 1}/{MAX_RETRIES} for {url.path} failed with connection error: {e}")
105
 
106
+ if rp_resp is None:
107
+ duration_ms = (time.monotonic() - start_time) * 1000
108
+ logging.critical(f"Request failed, cannot connect to target: {request.method} {request.url.path} status_code=502 latency={duration_ms:.2f}ms")
109
+ raise HTTPException(
110
+ status_code=502,
111
+ detail=f"Bad Gateway: Cannot connect to target service after {MAX_RETRIES} attempts. {last_exception}"
112
+ )
113
+
114
  duration_ms = (time.monotonic() - start_time) * 1000
115
+ log_func = logging.info if rp_resp.is_success else logging.warning
116
+ log_func(f"Request headers received: {request.method} {request.url.path} status_code={rp_resp.status_code} latency={duration_ms:.2f}ms")
117
+
118
+ async def body_generator(response: httpx.Response):
119
+ """
120
+ Streams the response body. If the first chunk takes too long,
121
+ it sends a processing message first.
122
+ """
123
+ response_iterator = response.aiter_raw()
124
+ first_chunk = None
125
+ try:
126
+ # Wait for the first chunk of the body with a timeout
127
+ first_chunk = await asyncio.wait_for(response_iterator.__anext__(), timeout=1.5)
128
+ except asyncio.TimeoutError:
129
+ # If timeout occurs, send the processing message
130
+ logging.warning(f"Response from target timed out. Sending processing message for {url.path}")
131
+ processing_message = ':NiansuhAI Proccessing:\n\n'
132
+ yield processing_message.encode('utf-8')
133
+ except StopAsyncIteration:
134
+ # The response body is empty
135
+ pass
136
+
137
+ if first_chunk is not None:
138
+ yield first_chunk
139
+
140
+ # Yield the rest of the body
141
+ async for chunk in response_iterator:
142
+ yield chunk
143
+
144
+ final_duration_ms = (time.monotonic() - start_time) * 1000
145
+ logging.info(f"Request finished streaming: {request.method} {request.url.path} status_code={response.status_code} total_latency={final_duration_ms:.2f}ms")
146
+
147
+ return StreamingResponse(
148
+ body_generator(rp_resp),
149
+ status_code=rp_resp.status_code,
150
+ headers=rp_resp.headers,
151
+ background=BackgroundTask(rp_resp.aclose),
152
  )