up6 / app.py
HIRO12121212's picture
Upload 3 files
538e75e verified
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.responses import JSONResponse
import requests
import hashlib
import uuid
import json
import asyncio
from typing import Dict, Any, Optional, Tuple, List
import mimetypes
app = FastAPI(title="10-Image Upscaler API")
# --- Configuration ---
API_BASE_URL = "https://api.grid.plus"
APP_ID = "808645"
PLATFORM = "h5"
APP_VERSION = "8.9.7"
SIGNATURE_SALT = "Pg@photo_photogrid#20250225"
SIGNATURE_PREFIX = "XX"
COMMON_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"
MAX_IMAGES = 10 # Maximum number of images allowed per request
# --- Helper Functions ---
def generate_uuid() -> str:
return str(uuid.uuid4())
def hash_md5(data: str) -> str:
return hashlib.md5(data.encode('utf-8')).hexdigest()
def hash_sha256(data: str) -> str:
return hashlib.sha256(data.encode('utf-8')).hexdigest()
async def fetch_current_ip(session: requests.Session) -> str:
placeholder_ip = f"98.76.54.{generate_uuid()[:3].replace('-', '')}"
return placeholder_ip
async def generate_ghost_id_once(session: requests.Session, device_id: str) -> str:
ip_address = await fetch_current_ip(session)
ghost_id = hash_md5(device_id + ip_address)
return ghost_id
def build_headers(token: str, uid: str, device_id: str, mcc: str = "en-US") -> Dict[str, str]:
return {
"X-AppID": APP_ID,
"X-Platform": PLATFORM,
"X-Version": APP_VERSION,
"X-SessionToken": token,
"X-UniqueID": uid,
"X-DeviceID": device_id,
"X-MCC": mcc,
"User-Agent": COMMON_USER_AGENT,
}
def _value_to_string_for_signature(value: Any) -> str:
if isinstance(value, bool):
return "true" if value else "false"
if value is None:
return "null"
return str(value)
async def create_signature(data_obj: Dict[str, Any], step_name: str) -> str:
sorted_keys = sorted(data_obj.keys())
concatenated_string = "".join(key + _value_to_string_for_signature(data_obj[key]) for key in sorted_keys)
string_to_hash = concatenated_string + SIGNATURE_SALT
signature = SIGNATURE_PREFIX + hash_sha256(string_to_hash)
return signature
async def http_request(
session: requests.Session,
endpoint: str,
data: Optional[Dict[str, Any]],
method: str,
content_type_key: str,
cookies: Dict[str, str],
precomputed_ghost_id: str,
step_name: str,
is_s3_upload: bool = False
) -> Tuple[Optional[Dict[str, Any]], int, Optional[requests.Response]]:
url = API_BASE_URL + endpoint if not is_s3_upload else endpoint
if is_s3_upload:
s3_headers = {"Content-Type": data.get("Content-Type", "application/octet-stream")}
request_kwargs = {"method": "PUT", "url": url, "headers": s3_headers, "data": data.get('file_content'), "timeout": 60}
else:
default_headers = build_headers(
token=cookies.get("t", ""),
uid=cookies.get("u", ""),
device_id=cookies.get("did", "")
)
ghost_id = precomputed_ghost_id
x_headers_for_sig = {k: v for k, v in default_headers.items() if k.startswith("X-")}
signable_payload_parts = {k: v for k, v in data.items() if k != 'file'} if content_type_key == "MULTIPART" else data or {}
data_to_sign = {**x_headers_for_sig, "X-GhostID": ghost_id, **signable_payload_parts}
signature = await create_signature(data_to_sign, step_name)
final_headers = {**default_headers, "sig": signature, "X-GhostID": ghost_id}
if content_type_key != "MULTIPART":
content_type_map = {"JSON": "application/json", "FORM": "application/x-www-form-urlencoded"}
final_headers["Content-Type"] = content_type_map.get(content_type_key, "application/json")
request_kwargs = {"method": method.upper(), "url": url, "headers": final_headers, "cookies": cookies, "timeout": 30}
if data:
if content_type_key == "MULTIPART":
request_kwargs["files"] = {'file': data.get('file')} if data.get('file') else None
request_kwargs["data"] = {k: str(v) for k, v in data.items() if k != 'file'}
elif content_type_key == "JSON":
request_kwargs["json"] = data
else:
request_kwargs["data"] = data
try:
response = await asyncio.to_thread(session.request, **request_kwargs)
if is_s3_upload:
return ({"s3_status": "success"} if response.ok else {"s3_status": "failed", "raw_text": response.text}, response.status_code, response)
try:
return response.json(), response.status_code, response
except json.JSONDecodeError:
return {"raw_text": response.text, "code": 0 if response.ok else response.status_code, "_non_json_response": True}, response.status_code, response
except requests.RequestException as e:
status_code = e.response.status_code if e.response else 0
response_text = e.response.text if e.response else ""
return {"error_message": str(e), "raw_text": response_text, "code": -1, "_request_exception": True}, status_code, e.response
async def upscale_image(image_bytes: bytes, file_extension: str, filename: str) -> Dict[str, Any]:
async with asyncio.Semaphore(2): # Allow up to 2 concurrent requests to the external API
with requests.Session() as session:
device_id = generate_uuid()
cookies = {"did": device_id, "t": "", "u": "", "_vid": generate_uuid()}
flow_ghost_id = await generate_ghost_id_once(session, device_id)
if file_extension == "jpg": file_extension = "jpeg"
get_upload_url_payload = {'ext': file_extension, 'method': 'wn_superresolution'}
s3_url_response_data, status, _ = await http_request(
session, "/v1/ai/web/nologin/getuploadurl", get_upload_url_payload, "POST", "MULTIPART", cookies, flow_ghost_id, "GetUploadURL"
)
if not s3_url_response_data or s3_url_response_data.get("code") != 0:
return {"filename": filename, "error": "Failed to get S3 upload URL", "status_code": status}
s3_upload_url = s3_url_response_data.get("data", {}).get("upload_url")
s3_img_url = s3_url_response_data.get("data", {}).get("img_url")
if not s3_upload_url or not s3_img_url:
return {"filename": filename, "error": "Missing S3 upload URL or image URL", "status_code": 500}
content_type_for_s3 = mimetypes.guess_type(f"file.{file_extension}")[0] or 'application/octet-stream'
s3_payload = {'file_content': image_bytes, 'Content-Type': content_type_for_s3}
max_retries = 3
for attempt in range(max_retries):
s3_upload_response_data, s3_status, _ = await http_request(
session, s3_upload_url, s3_payload, "PUT", "", cookies, flow_ghost_id, "S3Upload", is_s3_upload=True
)
if s3_status in [200, 201, 204]:
break
if attempt == max_retries - 1:
return {"filename": filename, "error": "Failed to upload image to S3", "status_code": s3_status}
await asyncio.sleep(2)
trigger_upscale_payload = {'url': s3_img_url, 'method': 'wn_superresolution'}
trigger_response_data, status, _ = await http_request(
session, "/v1/ai/web/super_resolution/nologinupload", trigger_upscale_payload, "POST", "MULTIPART", cookies, flow_ghost_id, "TriggerUpscale"
)
if not trigger_response_data or trigger_response_data.get("code") != 0:
return {"filename": filename, "error": "Failed to trigger upscale", "status_code": status}
task_id = trigger_response_data.get("task_id") or trigger_response_data.get("data", {}).get("task_id")
if not task_id:
return {"filename": filename, "error": "Task ID not found", "status_code": 500}
max_retries, poll_interval = 20, 7
for i in range(max_retries):
await asyncio.sleep(poll_interval)
poll_payload = {"task_ids": [task_id]}
poll_response_data, status, _ = await http_request(
session, "/v1/ai/web/super_resolution/nologinbatchresult", poll_payload, "POST", "JSON", cookies, flow_ghost_id, "PollResult"
)
if not poll_response_data or poll_response_data.get("code") != 0:
continue
results_list = poll_response_data.get("data", [])
if results_list and results_list[0].get("status") in [0, 2]:
upscaled_url_list = results_list[0].get("result_image_url") or results_list[0].get("image_url")
if isinstance(upscaled_url_list, list) and upscaled_url_list:
return {"filename": filename, "upscaled_url": upscaled_url_list[0], "status_code": 200}
return {"filename": filename, "error": "Upscaled URL list empty", "status_code": 500}
elif results_list and results_list[0].get("status") != 1:
error_msg = results_list[0].get('fail_msg', results_list[0].get('errmsg', 'Unknown error'))
return {"filename": filename, "error": f"Task failed: {error_msg}", "status_code": 500}
return {"filename": filename, "error": "Max retries reached for polling", "status_code": 500}
@app.post("/upscale")
async def upscale_single(file: UploadFile = File(...)):
try:
file_extension = file.filename.split('.')[-1].lower()
if file_extension not in ['jpg', 'jpeg', 'png']:
raise HTTPException(status_code=400, detail="Unsupported file format. Use JPG, JPEG, or PNG.")
image_bytes = await file.read()
result = await upscale_image(image_bytes, file_extension, file.filename)
if "error" in result:
raise HTTPException(status_code=result["status_code"], detail=result["error"])
return JSONResponse(content={"filename": file.filename, "upscaled_url": result["upscaled_url"]})
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error processing image: {str(e)}")
@app.post("/upscale-multiple")
async def upscale_multiple(files: List[UploadFile] = File(...)):
try:
if len(files) > MAX_IMAGES:
raise HTTPException(status_code=400, detail=f"Maximum {MAX_IMAGES} images allowed per request.")
results = []
tasks = []
for file in files:
file_extension = file.filename.split('.')[-1].lower()
if file_extension not in ['jpg', 'jpeg', 'png']:
results.append({"filename": file.filename, "error": "Unsupported file format. Use JPG, JPEG, or PNG.", "status_code": 400})
continue
image_bytes = await file.read()
tasks.append(upscale_image(image_bytes, file_extension, file.filename))
if tasks:
upscale_results = await asyncio.gather(*tasks, return_exceptions=True)
for result in upscale_results:
if isinstance(result, Exception):
results.append({"filename": "unknown", "error": str(result), "status_code": 500})
else:
results.append(result)
return JSONResponse(content={"results": results})
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error processing images: {str(e)}")
@app.get("/")
async def root():
return {"message": f"Visit /upscale for single image or /upscale-multiple for up to {MAX_IMAGES} images"}