import httpx from fastapi import FastAPI, Request, HTTPException from starlette.responses import StreamingResponse from starlette.background import BackgroundTask import os import random import logging from contextlib import asynccontextmanager # --- Logging Configuration --- # Configure logging to display INFO level messages. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # --- Configuration --- # The target URL is configurable via an environment variable. TARGET_URL = os.getenv("TARGET_URL", "https://api.gmi-serving.com/v1/chat") # Number of retries for specific error codes. MAX_RETRIES = 5 # HTTP status codes that will trigger a retry. RETRY_STATUS_CODES = {429, 502, 503, 504} # --- Helper Function --- def generate_random_ip(): """Generates a random, valid-looking IPv4 address.""" return ".".join(str(random.randint(1, 254)) for _ in range(4)) # --- HTTPX Client Lifecycle Management --- @asynccontextmanager async def lifespan(app: FastAPI): """ Manages the lifecycle of the HTTPX client. The client is created on startup and gracefully closed on shutdown. WARNING: This client has no timeout and no explicit connection pool limits. """ # timeout=None disables all client-side timeouts. # The absence of a `limits` parameter means we rely on system defaults. async with httpx.AsyncClient(base_url=TARGET_URL, timeout=None) as client: app.state.http_client = client yield # Initialize the FastAPI app with the lifespan manager and disable default docs app = FastAPI(docs_url=None, redoc_url=None, lifespan=lifespan) # --- Reverse Proxy Logic --- async def _reverse_proxy(request: Request): """ Forwards a request to the target URL with retry logic and spoofed IP headers. It allows for a user-provided Authorization header and logs the spoofed IP. """ client: httpx.AsyncClient = request.app.state.http_client # Construct the URL for the outgoing request using the incoming path and query. url = httpx.URL(path=request.url.path, query=request.url.query.encode("utf-8")) # --- Header Processing --- # Start with headers from the incoming request. request_headers = dict(request.headers) # 1. CRITICAL: Remove host header. # The 'host' header is managed by httpx. request_headers.pop("host", None) # 2. Get the user's authorization key from the incoming request. authorization_header = request.headers.get("authorization") # 3. Generate a random IP for spoofing headers. random_ip = generate_random_ip() # --- ADDED LOGGING --- # Log the original client IP and the spoofed IP being used for the request. logging.info(f"Client '{request.client.host}' is being proxied with spoofed IP: {random_ip}") # 4. Set the specific, required headers for the target API. # This will overwrite any conflicting headers from the original request. specific_headers = { "accept": "application/json, text/plain, */*", "accept-language": "en-US,en;q=0.9,ru;q=0.8", "content-type": "application/json", "origin": "https://console.gmicloud.ai", "priority": "u=1, i", "referer": "https://console.gmicloud.ai/playground/llm/qwen3-next-80b-a3b-thinking/a5c879a4-be0a-4621-95d3-42575238d9af?tab=playground", "sec-ch-ua": '"Chromium";v="140", "Not=A?Brand";v="24", "Google Chrome";v="140"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"Windows"', "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "same-origin", "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36", "x-forwarded-for": random_ip, "x-real-ip": random_ip, } request_headers.update(specific_headers) # 5. Add the user's authorization key to the headers if it exists. if authorization_header: request_headers["authorization"] = authorization_header # Read the request body once, as it will be reused in case of retries. body = await request.body() # --- Retry Logic --- last_exception = None for attempt in range(MAX_RETRIES): try: # Build the request for each attempt to ensure the content stream is fresh. rp_req = client.build_request( method=request.method, url=url, headers=request_headers, content=body, ) # Send the request and get a streaming response. rp_resp = await client.send(rp_req, stream=True) # If the status code is not in our retry list, we can return the response. if rp_resp.status_code not in RETRY_STATUS_CODES: return StreamingResponse( rp_resp.aiter_raw(), status_code=rp_resp.status_code, headers=rp_resp.headers, background=BackgroundTask(rp_resp.aclose), ) # If it's a retryable error but this is the last attempt, return the error response. if attempt == MAX_RETRIES - 1: return StreamingResponse( rp_resp.aiter_raw(), status_code=rp_resp.status_code, headers=rp_resp.headers, background=BackgroundTask(rp_resp.aclose), ) # Otherwise, close the unsuccessful response before trying again. await rp_resp.aclose() except httpx.ConnectError as e: last_exception = e # Continue to the next attempt if a connection error occurs. # If all retry attempts failed with a connection error, raise a final exception. raise HTTPException( status_code=502, detail=f"Bad Gateway: Cannot connect to target service after {MAX_RETRIES} attempts. {last_exception}" ) # --- API Endpoint --- @app.api_route( "/completions", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"] ) async def chat_proxy_handler(request: Request): """ This endpoint captures requests specifically for the "/completions" path and forwards them through the reverse proxy. """ return await _reverse_proxy(request) # A simple root endpoint for health checks. @app.get("/") async def health_check(): """Provides a basic health check endpoint.""" return {"status": "ok", "proxying_endpoint": "/completions", "target": "TypeGPT"} # Any request to a path other than "/completions" or "/" will result in a 404 Not Found.