rkihacker committed on
Commit
5bad7a1
·
verified ·
1 Parent(s): 7ee09b9

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +24 -55
main.py CHANGED
@@ -1,6 +1,6 @@
1
  import httpx
2
  from fastapi import FastAPI, Request, HTTPException
3
- from starlette.responses import StreamingResponse
4
  from starlette.background import BackgroundTask
5
  import os
6
  import random
@@ -9,34 +9,23 @@ import time
9
  from contextlib import asynccontextmanager
10
 
11
  # --- Production-Ready Configuration ---
12
- # All key settings are now configurable via environment variables.
13
-
14
- # 1. Logging Configuration
15
  LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
16
  logging.basicConfig(
17
  level=LOG_LEVEL,
18
  format='%(asctime)s - %(levelname)s - %(message)s'
19
  )
20
 
21
- # 2. Target URL
22
  TARGET_URL = os.getenv("TARGET_URL", "https://api.gmi-serving.com/v1/chat")
23
-
24
- # 3. Retry Logic Configuration
25
- # Default to 7 retries as requested.
26
  MAX_RETRIES = int(os.getenv("MAX_RETRIES", "7"))
27
-
28
- # Default retry codes now include 500. Configurable via a comma-separated string.
29
  DEFAULT_RETRY_CODES = "429,500,502,503,504"
30
  RETRY_CODES_STR = os.getenv("RETRY_CODES", DEFAULT_RETRY_CODES)
31
  try:
32
- # Parse the comma-separated string into a set of integers.
33
  RETRY_STATUS_CODES = {int(code.strip()) for code in RETRY_CODES_STR.split(',')}
34
  logging.info(f"Will retry on the following status codes: {RETRY_STATUS_CODES}")
35
  except ValueError:
36
  logging.error(f"Invalid RETRY_CODES format: '{RETRY_CODES_STR}'. Falling back to default: {DEFAULT_RETRY_CODES}")
37
  RETRY_STATUS_CODES = {int(code.strip()) for code in DEFAULT_RETRY_CODES.split(',')}
38
 
39
-
40
  # --- Helper Function ---
41
  def generate_random_ip():
42
  """Generates a random, valid-looking IPv4 address."""
@@ -46,7 +35,6 @@ def generate_random_ip():
46
  @asynccontextmanager
47
  async def lifespan(app: FastAPI):
48
  """Manages the lifecycle of the HTTPX client."""
49
- # Using a longer timeout for the client itself, but no timeout per-request.
50
  async with httpx.AsyncClient(base_url=TARGET_URL, timeout=None) as client:
51
  app.state.http_client = client
52
  yield
@@ -54,23 +42,34 @@ async def lifespan(app: FastAPI):
54
  # Initialize the FastAPI app with the lifespan manager and disabled docs
55
  app = FastAPI(docs_url=None, redoc_url=None, lifespan=lifespan)
56
 
57
- # --- Reverse Proxy Logic ---
58
- async def _reverse_proxy(request: Request):
 
 
 
 
 
 
 
 
 
 
 
 
59
  """
60
- Forwards a request to the target URL with enhanced retry logic and latency logging.
 
61
  """
62
- # Start timer for latency tracking. time.monotonic is used for reliable duration measurement.
63
  start_time = time.monotonic()
64
 
65
  client: httpx.AsyncClient = request.app.state.http_client
66
  url = httpx.URL(path=request.url.path, query=request.url.query.encode("utf-8"))
67
 
68
- # --- Header Processing ---
69
  request_headers = dict(request.headers)
70
  request_headers.pop("host", None)
71
 
72
  random_ip = generate_random_ip()
73
- logging.info(f"Client '{request.client.host}' proxied with spoofed IP: {random_ip}")
74
 
75
  specific_headers = {
76
  "accept": "application/json, text/plain, */*",
@@ -89,7 +88,6 @@ async def _reverse_proxy(request: Request):
89
 
90
  body = await request.body()
91
 
92
- # --- Retry Logic ---
93
  last_exception = None
94
  for attempt in range(MAX_RETRIES):
95
  try:
@@ -98,23 +96,10 @@ async def _reverse_proxy(request: Request):
98
  )
99
  rp_resp = await client.send(rp_req, stream=True)
100
 
101
- # If status is successful or not in our retry list, we are done.
102
- if rp_resp.status_code not in RETRY_STATUS_CODES:
103
- # Log latency and success before returning
104
  duration_ms = (time.monotonic() - start_time) * 1000
105
- logging.info(f"Request finished: {request.method} {request.url.path} status_code={rp_resp.status_code} latency={duration_ms:.2f}ms")
106
-
107
- return StreamingResponse(
108
- rp_resp.aiter_raw(),
109
- status_code=rp_resp.status_code,
110
- headers=rp_resp.headers,
111
- background=BackgroundTask(rp_resp.aclose),
112
- )
113
-
114
- # If we are on the last attempt, return the error response without retrying further.
115
- if attempt == MAX_RETRIES - 1:
116
- duration_ms = (time.monotonic() - start_time) * 1000
117
- logging.error(f"Request failed after max retries: {request.method} {request.url.path} status_code={rp_resp.status_code} latency={duration_ms:.2f}ms")
118
 
119
  return StreamingResponse(
120
  rp_resp.aiter_raw(),
@@ -123,35 +108,19 @@ async def _reverse_proxy(request: Request):
123
  background=BackgroundTask(rp_resp.aclose),
124
  )
125
 
126
- # Log the retry attempt before closing the response and looping again.
127
  logging.warning(
128
- f"Attempt {attempt + 1}/{MAX_RETRIES} failed with status {rp_resp.status_code}. Retrying..."
129
  )
130
  await rp_resp.aclose()
131
 
132
  except httpx.ConnectError as e:
133
  last_exception = e
134
- logging.warning(f"Attempt {attempt + 1}/{MAX_RETRIES} failed with connection error: {e}")
135
 
136
- # This block is reached if all attempts fail with a connection error.
137
  duration_ms = (time.monotonic() - start_time) * 1000
138
  logging.critical(f"Request failed, cannot connect to target: {request.method} {request.url.path} status_code=502 latency={duration_ms:.2f}ms")
139
 
140
  raise HTTPException(
141
  status_code=502,
142
  detail=f"Bad Gateway: Cannot connect to target service after {MAX_RETRIES} attempts. {last_exception}"
143
- )
144
-
145
- # --- API Endpoints ---
146
- @app.api_route(
147
- "/completions/{full_path:path}",
148
- methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"]
149
- )
150
- async def chat_proxy_handler(request: Request):
151
- """Captures all requests under /completions/ and forwards them."""
152
- return await _reverse_proxy(request)
153
-
154
- @app.get("/")
155
- async def health_check():
156
- """Provides a basic health check endpoint."""
157
- return {"status": "ok", "proxying_endpoint": "/completions", "target": "TypeGPT"}
 
1
  import httpx
2
  from fastapi import FastAPI, Request, HTTPException
3
+ from starlette.responses import StreamingResponse, JSONResponse
4
  from starlette.background import BackgroundTask
5
  import os
6
  import random
 
9
  from contextlib import asynccontextmanager
10
 
11
# --- Production-Ready Configuration ---
# All settings are configurable via environment variables.

# Logging: level comes from LOG_LEVEL (default INFO).
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
    level=LOG_LEVEL,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Upstream service that all proxied requests are forwarded to.
TARGET_URL = os.getenv("TARGET_URL", "https://api.gmi-serving.com/v1/chat")

# Number of attempts per request. A malformed MAX_RETRIES must not crash
# startup (mirroring the RETRY_CODES fallback below), and a value below 1
# would make the retry loop never run, so clamp to a floor of 1.
try:
    MAX_RETRIES = max(1, int(os.getenv("MAX_RETRIES", "7")))
except ValueError:
    logging.error("Invalid MAX_RETRIES value %r. Falling back to 7.", os.getenv("MAX_RETRIES"))
    MAX_RETRIES = 7

# Comma-separated HTTP status codes that trigger a retry of the upstream call.
DEFAULT_RETRY_CODES = "429,500,502,503,504"
RETRY_CODES_STR = os.getenv("RETRY_CODES", DEFAULT_RETRY_CODES)
try:
    RETRY_STATUS_CODES = {int(code.strip()) for code in RETRY_CODES_STR.split(',')}
    logging.info(f"Will retry on the following status codes: {RETRY_STATUS_CODES}")
except ValueError:
    # Bad user input: log it and fall back to the known-good default set.
    logging.error(f"Invalid RETRY_CODES format: '{RETRY_CODES_STR}'. Falling back to default: {DEFAULT_RETRY_CODES}")
    RETRY_STATUS_CODES = {int(code.strip()) for code in DEFAULT_RETRY_CODES.split(',')}
28
 
 
29
  # --- Helper Function ---
30
  def generate_random_ip():
31
  """Generates a random, valid-looking IPv4 address."""
 
35
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open one shared HTTPX client for the app's lifetime; close it on shutdown.

    The client is stored on ``app.state.http_client`` so request handlers can
    reuse its connection pool. ``timeout=None`` disables per-request timeouts.
    """
    shared_client = httpx.AsyncClient(base_url=TARGET_URL, timeout=None)
    async with shared_client:
        app.state.http_client = shared_client
        yield
 
42
# FastAPI application: interactive docs disabled, HTTPX client lifecycle
# handled by the `lifespan` context manager above.
app = FastAPI(docs_url=None, redoc_url=None, lifespan=lifespan)
44
 
45
# --- API Endpoints ---

# Registered before the catch-all proxy route so that GET / is answered
# locally instead of being forwarded upstream.
@app.get("/")
async def health_check():
    """Report service liveness and the configured upstream target."""
    payload = {"status": "ok", "target": TARGET_URL}
    return JSONResponse(payload)
53
+
54
+ # 2. Catch-All Reverse Proxy Route (Defined SECOND)
55
+ # This will capture ALL other requests (e.g., /completions, /v1/models, etc.)
56
+ # and forward them. This eliminates any redirect issues.
57
+ @app.api_route("/{full_path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"])
58
+ async def reverse_proxy_handler(request: Request):
59
  """
60
+ A catch-all reverse proxy that forwards requests to the target URL with
61
+ enhanced retry logic and latency logging.
62
  """
 
63
  start_time = time.monotonic()
64
 
65
  client: httpx.AsyncClient = request.app.state.http_client
66
  url = httpx.URL(path=request.url.path, query=request.url.query.encode("utf-8"))
67
 
 
68
  request_headers = dict(request.headers)
69
  request_headers.pop("host", None)
70
 
71
  random_ip = generate_random_ip()
72
+ logging.info(f"Client '{request.client.host}' proxied with spoofed IP: {random_ip} for path: {url.path}")
73
 
74
  specific_headers = {
75
  "accept": "application/json, text/plain, */*",
 
88
 
89
  body = await request.body()
90
 
 
91
  last_exception = None
92
  for attempt in range(MAX_RETRIES):
93
  try:
 
96
  )
97
  rp_resp = await client.send(rp_req, stream=True)
98
 
99
+ if rp_resp.status_code not in RETRY_STATUS_CODES or attempt == MAX_RETRIES - 1:
 
 
100
  duration_ms = (time.monotonic() - start_time) * 1000
101
+ log_func = logging.info if rp_resp.is_success else logging.warning
102
+ log_func(f"Request finished: {request.method} {request.url.path} status_code={rp_resp.status_code} latency={duration_ms:.2f}ms")
 
 
 
 
 
 
 
 
 
 
 
103
 
104
  return StreamingResponse(
105
  rp_resp.aiter_raw(),
 
108
  background=BackgroundTask(rp_resp.aclose),
109
  )
110
 
 
111
  logging.warning(
112
+ f"Attempt {attempt + 1}/{MAX_RETRIES} for {url.path} failed with status {rp_resp.status_code}. Retrying..."
113
  )
114
  await rp_resp.aclose()
115
 
116
  except httpx.ConnectError as e:
117
  last_exception = e
118
+ logging.warning(f"Attempt {attempt + 1}/{MAX_RETRIES} for {url.path} failed with connection error: {e}")
119
 
 
120
  duration_ms = (time.monotonic() - start_time) * 1000
121
  logging.critical(f"Request failed, cannot connect to target: {request.method} {request.url.path} status_code=502 latency={duration_ms:.2f}ms")
122
 
123
  raise HTTPException(
124
  status_code=502,
125
  detail=f"Bad Gateway: Cannot connect to target service after {MAX_RETRIES} attempts. {last_exception}"
126
+ )