flash2 / main.py
rkihacker's picture
Update main.py
d281b17 verified
raw
history blame
6.69 kB
import httpx
from fastapi import FastAPI, Request, HTTPException
from starlette.responses import StreamingResponse
from starlette.background import BackgroundTask
import os
import random
import logging
from contextlib import asynccontextmanager
# --- Logging Configuration ---
# Configure logging to display INFO level messages.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --- Configuration ---
# The target URL is configurable via an environment variable.
TARGET_URL = os.getenv("TARGET_URL", "https://api.gmi-serving.com/v1/chat")
# Number of retries for specific error codes.
MAX_RETRIES = 5
# HTTP status codes that will trigger a retry.
RETRY_STATUS_CODES = {429, 502, 503, 504}
# --- Helper Function ---
def generate_random_ip():
"""Generates a random, valid-looking IPv4 address."""
return ".".join(str(random.randint(1, 254)) for _ in range(4))
# --- HTTPX Client Lifecycle Management ---
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Manages the lifecycle of the HTTPX client.
The client is created on startup and gracefully closed on shutdown.
WARNING: This client has no timeout and no explicit connection pool limits.
"""
# timeout=None disables all client-side timeouts.
# The absence of a `limits` parameter means we rely on system defaults.
async with httpx.AsyncClient(base_url=TARGET_URL, timeout=None) as client:
app.state.http_client = client
yield
# Initialize the FastAPI app with the lifespan manager and disable default docs
app = FastAPI(docs_url=None, redoc_url=None, lifespan=lifespan)
# --- Reverse Proxy Logic ---
async def _reverse_proxy(request: Request):
"""
Forwards a request to the target URL with retry logic and spoofed IP headers.
It allows for a user-provided Authorization header and logs the spoofed IP.
"""
client: httpx.AsyncClient = request.app.state.http_client
# Construct the URL for the outgoing request using the incoming path and query.
url = httpx.URL(path=request.url.path, query=request.url.query.encode("utf-8"))
# --- Header Processing ---
# Start with headers from the incoming request.
request_headers = dict(request.headers)
# 1. CRITICAL: Remove host header.
# The 'host' header is managed by httpx.
request_headers.pop("host", None)
# 2. Get the user's authorization key from the incoming request.
authorization_header = request.headers.get("authorization")
# 3. Generate a random IP for spoofing headers.
random_ip = generate_random_ip()
# --- ADDED LOGGING ---
# Log the original client IP and the spoofed IP being used for the request.
logging.info(f"Client '{request.client.host}' is being proxied with spoofed IP: {random_ip}")
# 4. Set the specific, required headers for the target API.
# This will overwrite any conflicting headers from the original request.
specific_headers = {
"accept": "application/json, text/plain, */*",
"accept-language": "en-US,en;q=0.9,ru;q=0.8",
"content-type": "application/json",
"origin": "https://console.gmicloud.ai",
"priority": "u=1, i",
"referer": "https://console.gmicloud.ai/playground/llm/qwen3-next-80b-a3b-thinking/a5c879a4-be0a-4621-95d3-42575238d9af?tab=playground",
"sec-ch-ua": '"Chromium";v="140", "Not=A?Brand";v="24", "Google Chrome";v="140"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36",
"x-forwarded-for": random_ip,
"x-real-ip": random_ip,
}
request_headers.update(specific_headers)
# 5. Add the user's authorization key to the headers if it exists.
if authorization_header:
request_headers["authorization"] = authorization_header
# Read the request body once, as it will be reused in case of retries.
body = await request.body()
# --- Retry Logic ---
last_exception = None
for attempt in range(MAX_RETRIES):
try:
# Build the request for each attempt to ensure the content stream is fresh.
rp_req = client.build_request(
method=request.method,
url=url,
headers=request_headers,
content=body,
)
# Send the request and get a streaming response.
rp_resp = await client.send(rp_req, stream=True)
# If the status code is not in our retry list, we can return the response.
if rp_resp.status_code not in RETRY_STATUS_CODES:
return StreamingResponse(
rp_resp.aiter_raw(),
status_code=rp_resp.status_code,
headers=rp_resp.headers,
background=BackgroundTask(rp_resp.aclose),
)
# If it's a retryable error but this is the last attempt, return the error response.
if attempt == MAX_RETRIES - 1:
return StreamingResponse(
rp_resp.aiter_raw(),
status_code=rp_resp.status_code,
headers=rp_resp.headers,
background=BackgroundTask(rp_resp.aclose),
)
# Otherwise, close the unsuccessful response before trying again.
await rp_resp.aclose()
except httpx.ConnectError as e:
last_exception = e
# Continue to the next attempt if a connection error occurs.
# If all retry attempts failed with a connection error, raise a final exception.
raise HTTPException(
status_code=502,
detail=f"Bad Gateway: Cannot connect to target service after {MAX_RETRIES} attempts. {last_exception}"
)
# --- API Endpoint ---
@app.api_route(
"/completions",
methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"]
)
async def chat_proxy_handler(request: Request):
"""
This endpoint captures requests specifically for the "/completions" path
and forwards them through the reverse proxy.
"""
return await _reverse_proxy(request)
# A simple root endpoint for health checks.
@app.get("/")
async def health_check():
"""Provides a basic health check endpoint."""
return {"status": "ok", "proxying_endpoint": "/completions", "target": "TypeGPT"}
# Any request to a path other than "/completions" or "/" will result in a 404 Not Found.