Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, File, UploadFile, HTTPException | |
| from fastapi.responses import JSONResponse | |
| import requests | |
| import hashlib | |
| import uuid | |
| import json | |
| import asyncio | |
| from typing import Dict, Any, Optional, Tuple, List | |
| import mimetypes | |
| app = FastAPI(title="10-Image Upscaler API") | |
| # --- Configuration --- | |
| API_BASE_URL = "https://api.grid.plus" | |
| APP_ID = "808645" | |
| PLATFORM = "h5" | |
| APP_VERSION = "8.9.7" | |
| SIGNATURE_SALT = "Pg@photo_photogrid#20250225" | |
| SIGNATURE_PREFIX = "XX" | |
| COMMON_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36" | |
| MAX_IMAGES = 10 # Maximum number of images allowed per request | |
| # --- Helper Functions --- | |
| def generate_uuid() -> str: | |
| return str(uuid.uuid4()) | |
| def hash_md5(data: str) -> str: | |
| return hashlib.md5(data.encode('utf-8')).hexdigest() | |
| def hash_sha256(data: str) -> str: | |
| return hashlib.sha256(data.encode('utf-8')).hexdigest() | |
| async def fetch_current_ip(session: requests.Session) -> str: | |
| placeholder_ip = f"98.76.54.{generate_uuid()[:3].replace('-', '')}" | |
| return placeholder_ip | |
| async def generate_ghost_id_once(session: requests.Session, device_id: str) -> str: | |
| ip_address = await fetch_current_ip(session) | |
| ghost_id = hash_md5(device_id + ip_address) | |
| return ghost_id | |
| def build_headers(token: str, uid: str, device_id: str, mcc: str = "en-US") -> Dict[str, str]: | |
| return { | |
| "X-AppID": APP_ID, | |
| "X-Platform": PLATFORM, | |
| "X-Version": APP_VERSION, | |
| "X-SessionToken": token, | |
| "X-UniqueID": uid, | |
| "X-DeviceID": device_id, | |
| "X-MCC": mcc, | |
| "User-Agent": COMMON_USER_AGENT, | |
| } | |
| def _value_to_string_for_signature(value: Any) -> str: | |
| if isinstance(value, bool): | |
| return "true" if value else "false" | |
| if value is None: | |
| return "null" | |
| return str(value) | |
| async def create_signature(data_obj: Dict[str, Any], step_name: str) -> str: | |
| sorted_keys = sorted(data_obj.keys()) | |
| concatenated_string = "".join(key + _value_to_string_for_signature(data_obj[key]) for key in sorted_keys) | |
| string_to_hash = concatenated_string + SIGNATURE_SALT | |
| signature = SIGNATURE_PREFIX + hash_sha256(string_to_hash) | |
| return signature | |
| async def http_request( | |
| session: requests.Session, | |
| endpoint: str, | |
| data: Optional[Dict[str, Any]], | |
| method: str, | |
| content_type_key: str, | |
| cookies: Dict[str, str], | |
| precomputed_ghost_id: str, | |
| step_name: str, | |
| is_s3_upload: bool = False | |
| ) -> Tuple[Optional[Dict[str, Any]], int, Optional[requests.Response]]: | |
| url = API_BASE_URL + endpoint if not is_s3_upload else endpoint | |
| if is_s3_upload: | |
| s3_headers = {"Content-Type": data.get("Content-Type", "application/octet-stream")} | |
| request_kwargs = {"method": "PUT", "url": url, "headers": s3_headers, "data": data.get('file_content'), "timeout": 60} | |
| else: | |
| default_headers = build_headers( | |
| token=cookies.get("t", ""), | |
| uid=cookies.get("u", ""), | |
| device_id=cookies.get("did", "") | |
| ) | |
| ghost_id = precomputed_ghost_id | |
| x_headers_for_sig = {k: v for k, v in default_headers.items() if k.startswith("X-")} | |
| signable_payload_parts = {k: v for k, v in data.items() if k != 'file'} if content_type_key == "MULTIPART" else data or {} | |
| data_to_sign = {**x_headers_for_sig, "X-GhostID": ghost_id, **signable_payload_parts} | |
| signature = await create_signature(data_to_sign, step_name) | |
| final_headers = {**default_headers, "sig": signature, "X-GhostID": ghost_id} | |
| if content_type_key != "MULTIPART": | |
| content_type_map = {"JSON": "application/json", "FORM": "application/x-www-form-urlencoded"} | |
| final_headers["Content-Type"] = content_type_map.get(content_type_key, "application/json") | |
| request_kwargs = {"method": method.upper(), "url": url, "headers": final_headers, "cookies": cookies, "timeout": 30} | |
| if data: | |
| if content_type_key == "MULTIPART": | |
| request_kwargs["files"] = {'file': data.get('file')} if data.get('file') else None | |
| request_kwargs["data"] = {k: str(v) for k, v in data.items() if k != 'file'} | |
| elif content_type_key == "JSON": | |
| request_kwargs["json"] = data | |
| else: | |
| request_kwargs["data"] = data | |
| try: | |
| response = await asyncio.to_thread(session.request, **request_kwargs) | |
| if is_s3_upload: | |
| return ({"s3_status": "success"} if response.ok else {"s3_status": "failed", "raw_text": response.text}, response.status_code, response) | |
| try: | |
| return response.json(), response.status_code, response | |
| except json.JSONDecodeError: | |
| return {"raw_text": response.text, "code": 0 if response.ok else response.status_code, "_non_json_response": True}, response.status_code, response | |
| except requests.RequestException as e: | |
| status_code = e.response.status_code if e.response else 0 | |
| response_text = e.response.text if e.response else "" | |
| return {"error_message": str(e), "raw_text": response_text, "code": -1, "_request_exception": True}, status_code, e.response | |
| async def upscale_image(image_bytes: bytes, file_extension: str, filename: str) -> Dict[str, Any]: | |
| async with asyncio.Semaphore(2): # Allow up to 2 concurrent requests to the external API | |
| with requests.Session() as session: | |
| device_id = generate_uuid() | |
| cookies = {"did": device_id, "t": "", "u": "", "_vid": generate_uuid()} | |
| flow_ghost_id = await generate_ghost_id_once(session, device_id) | |
| if file_extension == "jpg": file_extension = "jpeg" | |
| get_upload_url_payload = {'ext': file_extension, 'method': 'wn_superresolution'} | |
| s3_url_response_data, status, _ = await http_request( | |
| session, "/v1/ai/web/nologin/getuploadurl", get_upload_url_payload, "POST", "MULTIPART", cookies, flow_ghost_id, "GetUploadURL" | |
| ) | |
| if not s3_url_response_data or s3_url_response_data.get("code") != 0: | |
| return {"filename": filename, "error": "Failed to get S3 upload URL", "status_code": status} | |
| s3_upload_url = s3_url_response_data.get("data", {}).get("upload_url") | |
| s3_img_url = s3_url_response_data.get("data", {}).get("img_url") | |
| if not s3_upload_url or not s3_img_url: | |
| return {"filename": filename, "error": "Missing S3 upload URL or image URL", "status_code": 500} | |
| content_type_for_s3 = mimetypes.guess_type(f"file.{file_extension}")[0] or 'application/octet-stream' | |
| s3_payload = {'file_content': image_bytes, 'Content-Type': content_type_for_s3} | |
| max_retries = 3 | |
| for attempt in range(max_retries): | |
| s3_upload_response_data, s3_status, _ = await http_request( | |
| session, s3_upload_url, s3_payload, "PUT", "", cookies, flow_ghost_id, "S3Upload", is_s3_upload=True | |
| ) | |
| if s3_status in [200, 201, 204]: | |
| break | |
| if attempt == max_retries - 1: | |
| return {"filename": filename, "error": "Failed to upload image to S3", "status_code": s3_status} | |
| await asyncio.sleep(2) | |
| trigger_upscale_payload = {'url': s3_img_url, 'method': 'wn_superresolution'} | |
| trigger_response_data, status, _ = await http_request( | |
| session, "/v1/ai/web/super_resolution/nologinupload", trigger_upscale_payload, "POST", "MULTIPART", cookies, flow_ghost_id, "TriggerUpscale" | |
| ) | |
| if not trigger_response_data or trigger_response_data.get("code") != 0: | |
| return {"filename": filename, "error": "Failed to trigger upscale", "status_code": status} | |
| task_id = trigger_response_data.get("task_id") or trigger_response_data.get("data", {}).get("task_id") | |
| if not task_id: | |
| return {"filename": filename, "error": "Task ID not found", "status_code": 500} | |
| max_retries, poll_interval = 20, 7 | |
| for i in range(max_retries): | |
| await asyncio.sleep(poll_interval) | |
| poll_payload = {"task_ids": [task_id]} | |
| poll_response_data, status, _ = await http_request( | |
| session, "/v1/ai/web/super_resolution/nologinbatchresult", poll_payload, "POST", "JSON", cookies, flow_ghost_id, "PollResult" | |
| ) | |
| if not poll_response_data or poll_response_data.get("code") != 0: | |
| continue | |
| results_list = poll_response_data.get("data", []) | |
| if results_list and results_list[0].get("status") in [0, 2]: | |
| upscaled_url_list = results_list[0].get("result_image_url") or results_list[0].get("image_url") | |
| if isinstance(upscaled_url_list, list) and upscaled_url_list: | |
| return {"filename": filename, "upscaled_url": upscaled_url_list[0], "status_code": 200} | |
| return {"filename": filename, "error": "Upscaled URL list empty", "status_code": 500} | |
| elif results_list and results_list[0].get("status") != 1: | |
| error_msg = results_list[0].get('fail_msg', results_list[0].get('errmsg', 'Unknown error')) | |
| return {"filename": filename, "error": f"Task failed: {error_msg}", "status_code": 500} | |
| return {"filename": filename, "error": "Max retries reached for polling", "status_code": 500} | |
| async def upscale_single(file: UploadFile = File(...)): | |
| try: | |
| file_extension = file.filename.split('.')[-1].lower() | |
| if file_extension not in ['jpg', 'jpeg', 'png']: | |
| raise HTTPException(status_code=400, detail="Unsupported file format. Use JPG, JPEG, or PNG.") | |
| image_bytes = await file.read() | |
| result = await upscale_image(image_bytes, file_extension, file.filename) | |
| if "error" in result: | |
| raise HTTPException(status_code=result["status_code"], detail=result["error"]) | |
| return JSONResponse(content={"filename": file.filename, "upscaled_url": result["upscaled_url"]}) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Error processing image: {str(e)}") | |
| async def upscale_multiple(files: List[UploadFile] = File(...)): | |
| try: | |
| if len(files) > MAX_IMAGES: | |
| raise HTTPException(status_code=400, detail=f"Maximum {MAX_IMAGES} images allowed per request.") | |
| results = [] | |
| tasks = [] | |
| for file in files: | |
| file_extension = file.filename.split('.')[-1].lower() | |
| if file_extension not in ['jpg', 'jpeg', 'png']: | |
| results.append({"filename": file.filename, "error": "Unsupported file format. Use JPG, JPEG, or PNG.", "status_code": 400}) | |
| continue | |
| image_bytes = await file.read() | |
| tasks.append(upscale_image(image_bytes, file_extension, file.filename)) | |
| if tasks: | |
| upscale_results = await asyncio.gather(*tasks, return_exceptions=True) | |
| for result in upscale_results: | |
| if isinstance(result, Exception): | |
| results.append({"filename": "unknown", "error": str(result), "status_code": 500}) | |
| else: | |
| results.append(result) | |
| return JSONResponse(content={"results": results}) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Error processing images: {str(e)}") | |
| async def root(): | |
| return {"message": f"Visit /upscale for single image or /upscale-multiple for up to {MAX_IMAGES} images"} |