rkihacker commited on
Commit
7ee09b9
·
verified ·
1 Parent(s): 73f517b

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +70 -49
main.py CHANGED
@@ -5,19 +5,37 @@ from starlette.background import BackgroundTask
5
  import os
6
  import random
7
  import logging
 
8
  from contextlib import asynccontextmanager
9
 
10
- # --- Logging Configuration ---
11
- # Configure logging to display INFO level messages.
12
- logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
13
 
14
- # --- Configuration ---
15
- # The target URL is configurable via an environment variable.
 
 
 
 
 
 
16
  TARGET_URL = os.getenv("TARGET_URL", "https://api.gmi-serving.com/v1/chat")
17
- # Number of retries for specific error codes.
18
- MAX_RETRIES = 5
19
- # HTTP status codes that will trigger a retry.
20
- RETRY_STATUS_CODES = {429, 502, 503, 504}
 
 
 
 
 
 
 
 
 
 
 
 
21
 
22
  # --- Helper Function ---
23
  def generate_random_ip():
@@ -27,61 +45,47 @@ def generate_random_ip():
27
  # --- HTTPX Client Lifecycle Management ---
28
  @asynccontextmanager
29
  async def lifespan(app: FastAPI):
30
- """
31
- Manages the lifecycle of the HTTPX client.
32
- The client is created on startup and gracefully closed on shutdown.
33
- NOTE: The client has no timeout, which is a deliberate choice for a proxy
34
- that should not impose its own timeout limits on long-running requests.
35
- """
36
  async with httpx.AsyncClient(base_url=TARGET_URL, timeout=None) as client:
37
  app.state.http_client = client
38
  yield
39
 
40
- # Initialize the FastAPI app with the lifespan manager and disable default docs
41
  app = FastAPI(docs_url=None, redoc_url=None, lifespan=lifespan)
42
 
43
  # --- Reverse Proxy Logic ---
44
  async def _reverse_proxy(request: Request):
45
  """
46
- Forwards a request to the target URL with retry logic and spoofed IP headers.
47
- It allows for a user-provided Authorization header and logs the spoofed IP.
48
  """
 
 
 
49
  client: httpx.AsyncClient = request.app.state.http_client
50
-
51
- # Construct the URL for the outgoing request using the incoming path and query.
52
  url = httpx.URL(path=request.url.path, query=request.url.query.encode("utf-8"))
53
 
54
  # --- Header Processing ---
55
  request_headers = dict(request.headers)
56
- request_headers.pop("host", None) # The 'host' header is managed by httpx.
57
 
58
- authorization_header = request.headers.get("authorization")
59
  random_ip = generate_random_ip()
60
-
61
- logging.info(f"Client '{request.client.host}' is being proxied with spoofed IP: {random_ip}")
62
 
63
- # Set the specific, required headers for the target API.
64
  specific_headers = {
65
  "accept": "application/json, text/plain, */*",
66
  "accept-language": "en-US,en;q=0.9,ru;q=0.8",
67
  "content-type": "application/json",
68
  "origin": "https://console.gmicloud.ai",
69
- "priority": "u=1, i",
70
- "referer": "https://console.gmicloud.ai/playground/llm/qwen3-next-80b-a3b-thinking/a5c879a4-be0a-4621-95d3-42575238d9af?tab=playground",
71
- "sec-ch-ua": '"Chromium";v="140", "Not=A?Brand";v="24", "Google Chrome";v="140"',
72
- "sec-ch-ua-mobile": "?0",
73
- "sec-ch-ua-platform": '"Windows"',
74
- "sec-fetch-dest": "empty",
75
- "sec-fetch-mode": "cors",
76
- "sec-fetch-site": "same-origin",
77
  "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36",
78
  "x-forwarded-for": random_ip,
79
  "x-real-ip": random_ip,
80
  }
81
  request_headers.update(specific_headers)
82
 
83
- if authorization_header:
84
- request_headers["authorization"] = authorization_header
85
 
86
  body = await request.body()
87
 
@@ -90,14 +94,16 @@ async def _reverse_proxy(request: Request):
90
  for attempt in range(MAX_RETRIES):
91
  try:
92
  rp_req = client.build_request(
93
- method=request.method,
94
- url=url,
95
- headers=request_headers,
96
- content=body,
97
  )
98
  rp_resp = await client.send(rp_req, stream=True)
99
 
100
- if rp_resp.status_code not in RETRY_STATUS_CODES or attempt == MAX_RETRIES - 1:
 
 
 
 
 
101
  return StreamingResponse(
102
  rp_resp.aiter_raw(),
103
  status_code=rp_resp.status_code,
@@ -105,29 +111,44 @@ async def _reverse_proxy(request: Request):
105
  background=BackgroundTask(rp_resp.aclose),
106
  )
107
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
108
  await rp_resp.aclose()
109
 
110
  except httpx.ConnectError as e:
111
  last_exception = e
112
  logging.warning(f"Attempt {attempt + 1}/{MAX_RETRIES} failed with connection error: {e}")
113
 
114
- # If all retry attempts failed, raise a final exception.
 
 
 
115
  raise HTTPException(
116
  status_code=502,
117
- detail=f"Bad Gateway: Cannot connect to target service after {MAX_RETRIES} attempts. Last error: {last_exception}"
118
  )
119
 
120
-
121
- # --- API Endpoint ---
122
  @app.api_route(
123
- "/completions",
124
  methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"]
125
  )
126
  async def chat_proxy_handler(request: Request):
127
- """
128
- This endpoint captures requests specifically for the "/completions" path
129
- and forwards them through the reverse proxy.
130
- """
131
  return await _reverse_proxy(request)
132
 
133
  @app.get("/")
 
5
  import os
6
  import random
7
  import logging
8
+ import time
9
  from contextlib import asynccontextmanager
10
 
11
+ # --- Production-Ready Configuration ---
12
+ # All key settings are now configurable via environment variables.
 
13
 
14
+ # 1. Logging Configuration
15
+ LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
16
+ logging.basicConfig(
17
+ level=LOG_LEVEL,
18
+ format='%(asctime)s - %(levelname)s - %(message)s'
19
+ )
20
+
21
+ # 2. Target URL
22
  TARGET_URL = os.getenv("TARGET_URL", "https://api.gmi-serving.com/v1/chat")
23
+
24
+ # 3. Retry Logic Configuration
25
+ # Default to 7 retries as requested.
26
+ MAX_RETRIES = int(os.getenv("MAX_RETRIES", "7"))
27
+
28
+ # Default retry codes now include 500. Configurable via a comma-separated string.
29
+ DEFAULT_RETRY_CODES = "429,500,502,503,504"
30
+ RETRY_CODES_STR = os.getenv("RETRY_CODES", DEFAULT_RETRY_CODES)
31
+ try:
32
+ # Parse the comma-separated string into a set of integers.
33
+ RETRY_STATUS_CODES = {int(code.strip()) for code in RETRY_CODES_STR.split(',')}
34
+ logging.info(f"Will retry on the following status codes: {RETRY_STATUS_CODES}")
35
+ except ValueError:
36
+ logging.error(f"Invalid RETRY_CODES format: '{RETRY_CODES_STR}'. Falling back to default: {DEFAULT_RETRY_CODES}")
37
+ RETRY_STATUS_CODES = {int(code.strip()) for code in DEFAULT_RETRY_CODES.split(',')}
38
+
39
 
40
  # --- Helper Function ---
41
  def generate_random_ip():
 
45
  # --- HTTPX Client Lifecycle Management ---
46
  @asynccontextmanager
47
  async def lifespan(app: FastAPI):
48
+ """Manages the lifecycle of the HTTPX client."""
49
+ # Using a longer timeout for the client itself, but no timeout per-request.
 
 
 
 
50
  async with httpx.AsyncClient(base_url=TARGET_URL, timeout=None) as client:
51
  app.state.http_client = client
52
  yield
53
 
54
+ # Initialize the FastAPI app with the lifespan manager and disabled docs
55
  app = FastAPI(docs_url=None, redoc_url=None, lifespan=lifespan)
56
 
57
  # --- Reverse Proxy Logic ---
58
  async def _reverse_proxy(request: Request):
59
  """
60
+ Forwards a request to the target URL with enhanced retry logic and latency logging.
 
61
  """
62
+ # Start timer for latency tracking. time.monotonic is used for reliable duration measurement.
63
+ start_time = time.monotonic()
64
+
65
  client: httpx.AsyncClient = request.app.state.http_client
 
 
66
  url = httpx.URL(path=request.url.path, query=request.url.query.encode("utf-8"))
67
 
68
  # --- Header Processing ---
69
  request_headers = dict(request.headers)
70
+ request_headers.pop("host", None)
71
 
 
72
  random_ip = generate_random_ip()
73
+ logging.info(f"Client '{request.client.host}' proxied with spoofed IP: {random_ip}")
 
74
 
 
75
  specific_headers = {
76
  "accept": "application/json, text/plain, */*",
77
  "accept-language": "en-US,en;q=0.9,ru;q=0.8",
78
  "content-type": "application/json",
79
  "origin": "https://console.gmicloud.ai",
80
+ "referer": "https://console.gmicloud.ai/",
 
 
 
 
 
 
 
81
  "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36",
82
  "x-forwarded-for": random_ip,
83
  "x-real-ip": random_ip,
84
  }
85
  request_headers.update(specific_headers)
86
 
87
+ if "authorization" in request.headers:
88
+ request_headers["authorization"] = request.headers["authorization"]
89
 
90
  body = await request.body()
91
 
 
94
  for attempt in range(MAX_RETRIES):
95
  try:
96
  rp_req = client.build_request(
97
+ method=request.method, url=url, headers=request_headers, content=body
 
 
 
98
  )
99
  rp_resp = await client.send(rp_req, stream=True)
100
 
101
+ # If status is successful or not in our retry list, we are done.
102
+ if rp_resp.status_code not in RETRY_STATUS_CODES:
103
+ # Log latency and success before returning
104
+ duration_ms = (time.monotonic() - start_time) * 1000
105
+ logging.info(f"Request finished: {request.method} {request.url.path} status_code={rp_resp.status_code} latency={duration_ms:.2f}ms")
106
+
107
  return StreamingResponse(
108
  rp_resp.aiter_raw(),
109
  status_code=rp_resp.status_code,
 
111
  background=BackgroundTask(rp_resp.aclose),
112
  )
113
 
114
+ # If we are on the last attempt, return the error response without retrying further.
115
+ if attempt == MAX_RETRIES - 1:
116
+ duration_ms = (time.monotonic() - start_time) * 1000
117
+ logging.error(f"Request failed after max retries: {request.method} {request.url.path} status_code={rp_resp.status_code} latency={duration_ms:.2f}ms")
118
+
119
+ return StreamingResponse(
120
+ rp_resp.aiter_raw(),
121
+ status_code=rp_resp.status_code,
122
+ headers=rp_resp.headers,
123
+ background=BackgroundTask(rp_resp.aclose),
124
+ )
125
+
126
+ # Log the retry attempt before closing the response and looping again.
127
+ logging.warning(
128
+ f"Attempt {attempt + 1}/{MAX_RETRIES} failed with status {rp_resp.status_code}. Retrying..."
129
+ )
130
  await rp_resp.aclose()
131
 
132
  except httpx.ConnectError as e:
133
  last_exception = e
134
  logging.warning(f"Attempt {attempt + 1}/{MAX_RETRIES} failed with connection error: {e}")
135
 
136
+ # This block is reached if all attempts fail with a connection error.
137
+ duration_ms = (time.monotonic() - start_time) * 1000
138
+ logging.critical(f"Request failed, cannot connect to target: {request.method} {request.url.path} status_code=502 latency={duration_ms:.2f}ms")
139
+
140
  raise HTTPException(
141
  status_code=502,
142
+ detail=f"Bad Gateway: Cannot connect to target service after {MAX_RETRIES} attempts. {last_exception}"
143
  )
144
 
145
+ # --- API Endpoints ---
 
146
  @app.api_route(
147
+ "/completions/{full_path:path}",
148
  methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"]
149
  )
150
  async def chat_proxy_handler(request: Request):
151
+ """Captures all requests under /completions/ and forwards them."""
 
 
 
152
  return await _reverse_proxy(request)
153
 
154
  @app.get("/")