File size: 11,801 Bytes
538e75e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.responses import JSONResponse
import requests
import hashlib
import uuid
import json
import asyncio
from typing import Dict, Any, Optional, Tuple, List
import mimetypes

app = FastAPI(title="10-Image Upscaler API")

# --- Configuration ---
# Root URL of the upstream upscaling service; relative endpoints are appended
# to it in http_request (pre-signed S3 URLs are used verbatim instead).
API_BASE_URL = "https://api.grid.plus"
# Static client identity sent in the X-* headers (see build_headers).
APP_ID = "808645"
PLATFORM = "h5"
APP_VERSION = "8.9.7"
# Shared secret appended to the key/value string before SHA-256 hashing in
# create_signature; SIGNATURE_PREFIX is prepended to the resulting hex digest.
SIGNATURE_SALT = "Pg@photo_photogrid#20250225"
SIGNATURE_PREFIX = "XX"
# Browser-like User-Agent sent on every upstream request.
COMMON_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"
MAX_IMAGES = 10  # Maximum number of images allowed per request

# --- Helper Functions ---
def generate_uuid() -> str:
    """Return a fresh random UUID4 in its canonical string form."""
    return f"{uuid.uuid4()}"

def hash_md5(data: str) -> str:
    """Return the hex-encoded MD5 digest of *data* (UTF-8 encoded)."""
    digest = hashlib.md5(data.encode("utf-8"))
    return digest.hexdigest()

def hash_sha256(data: str) -> str:
    """Return the hex-encoded SHA-256 digest of *data* (UTF-8 encoded)."""
    digest = hashlib.sha256(data.encode("utf-8"))
    return digest.hexdigest()

async def fetch_current_ip(session: "requests.Session") -> str:
    """Return a placeholder client IP string for ghost-id derivation.

    No real lookup is performed — the value is only hashed together with the
    device id in generate_ghost_id_once, so any per-flow string works.

    Fix: the original built the last octet from the first three characters of
    a UUID, which are hex digits and could produce an invalid dotted-quad
    such as "98.76.54.f3a".  The last octet is now derived from the UUID but
    clamped to 0-255, so the result is always a syntactically valid IPv4
    address.  `session` is accepted for interface compatibility but unused.
    """
    last_octet = uuid.uuid4().int % 256
    return f"98.76.54.{last_octet}"

async def generate_ghost_id_once(session: requests.Session, device_id: str) -> str:
    """Derive the per-flow ghost id: MD5 of device id + placeholder IP."""
    current_ip = await fetch_current_ip(session)
    return hash_md5(f"{device_id}{current_ip}")

def build_headers(token: str, uid: str, device_id: str, mcc: str = "en-US") -> Dict[str, str]:
    """Assemble the common identification headers the upstream API expects.

    Static app identity (id / platform / version / UA) comes from module
    constants; the session token, unique id and device id vary per call.
    """
    headers: Dict[str, str] = {}
    headers["X-AppID"] = APP_ID
    headers["X-Platform"] = PLATFORM
    headers["X-Version"] = APP_VERSION
    headers["X-SessionToken"] = token
    headers["X-UniqueID"] = uid
    headers["X-DeviceID"] = device_id
    headers["X-MCC"] = mcc
    headers["User-Agent"] = COMMON_USER_AGENT
    return headers

def _value_to_string_for_signature(value: Any) -> str:
    if isinstance(value, bool):
        return "true" if value else "false"
    if value is None:
        return "null"
    return str(value)

async def create_signature(data_obj: Dict[str, Any], step_name: str) -> str:
    """Compute the request signature over *data_obj*.

    Keys are sorted, each key is concatenated with its stringified value,
    the salt is appended, and the SHA-256 hex digest is prefixed with the
    signature marker.  `step_name` is accepted for call-site symmetry but
    is not used in the computation.
    """
    parts: List[str] = []
    for key in sorted(data_obj):
        parts.append(key + _value_to_string_for_signature(data_obj[key]))
    payload = "".join(parts) + SIGNATURE_SALT
    return SIGNATURE_PREFIX + hash_sha256(payload)

async def http_request(
    session: requests.Session,
    endpoint: str,
    data: Optional[Dict[str, Any]],
    method: str,
    content_type_key: str,
    cookies: Dict[str, str],
    precomputed_ghost_id: str,
    step_name: str,
    is_s3_upload: bool = False
) -> Tuple[Optional[Dict[str, Any]], int, Optional[requests.Response]]:
    """Perform one upstream API request (or a raw pre-signed S3 PUT).

    Two modes:
      * is_s3_upload=True: `endpoint` is a complete pre-signed URL; `data`
        carries 'file_content' (bytes) and 'Content-Type'.  Returns a small
        {"s3_status": ...} dict.
      * otherwise: `endpoint` is appended to API_BASE_URL, the X-* headers
        are built, the request is signed (create_signature) and the payload
        is sent as multipart / JSON / form depending on `content_type_key`.

    Returns (parsed_body_or_error_dict, http_status_code, raw_response).
    Network failures never raise: they are reported as a dict with code=-1.
    `step_name` is forwarded to create_signature for call-site context.
    """
    url = API_BASE_URL + endpoint if not is_s3_upload else endpoint
    if is_s3_upload:
        # Pre-signed S3 PUT: no auth/signature headers, just the bytes + type.
        s3_headers = {"Content-Type": data.get("Content-Type", "application/octet-stream")}
        request_kwargs = {"method": "PUT", "url": url, "headers": s3_headers, "data": data.get('file_content'), "timeout": 60}
    else:
        default_headers = build_headers(
            token=cookies.get("t", ""),
            uid=cookies.get("u", ""),
            device_id=cookies.get("did", "")
        )
        ghost_id = precomputed_ghost_id
        # The signature covers every X-* header, the ghost id, and the
        # non-file payload fields.
        x_headers_for_sig = {k: v for k, v in default_headers.items() if k.startswith("X-")}
        signable_payload_parts = {k: v for k, v in data.items() if k != 'file'} if content_type_key == "MULTIPART" else data or {}
        data_to_sign = {**x_headers_for_sig, "X-GhostID": ghost_id, **signable_payload_parts}
        signature = await create_signature(data_to_sign, step_name)
        final_headers = {**default_headers, "sig": signature, "X-GhostID": ghost_id}
        if content_type_key != "MULTIPART":
            # For multipart, requests must generate the boundary itself, so
            # Content-Type is only forced for JSON/form bodies.
            content_type_map = {"JSON": "application/json", "FORM": "application/x-www-form-urlencoded"}
            final_headers["Content-Type"] = content_type_map.get(content_type_key, "application/json")
        request_kwargs = {"method": method.upper(), "url": url, "headers": final_headers, "cookies": cookies, "timeout": 30}
        if data:
            if content_type_key == "MULTIPART":
                # Non-file fields travel as form data; the optional 'file'
                # entry is attached as a multipart file part.
                request_kwargs["files"] = {'file': data.get('file')} if data.get('file') else None
                request_kwargs["data"] = {k: str(v) for k, v in data.items() if k != 'file'}
            elif content_type_key == "JSON":
                request_kwargs["json"] = data
            else:
                request_kwargs["data"] = data
    try:
        # requests is synchronous; run it on a worker thread so the asyncio
        # event loop stays responsive.
        response = await asyncio.to_thread(session.request, **request_kwargs)
        if is_s3_upload:
            return ({"s3_status": "success"} if response.ok else {"s3_status": "failed", "raw_text": response.text}, response.status_code, response)
        try:
            return response.json(), response.status_code, response
        except json.JSONDecodeError:
            # Non-JSON body: fabricate a dict so callers can still inspect it.
            return {"raw_text": response.text, "code": 0 if response.ok else response.status_code, "_non_json_response": True}, response.status_code, response
    except requests.RequestException as e:
        # e.response is None for pure transport errors (timeout, DNS, ...).
        status_code = e.response.status_code if e.response else 0
        response_text = e.response.text if e.response else ""
        return {"error_message": str(e), "raw_text": response_text, "code": -1, "_request_exception": True}, status_code, e.response

# Shared concurrency limiter for upstream upscale flows.  It must live at
# module scope: the original code created a brand-new Semaphore inside every
# call, so no two calls ever contended on the same lock and the intended
# "max 2 concurrent flows" limit was never enforced.
# NOTE(review): constructing asyncio primitives at import time is safe on
# Python 3.10+, where event-loop binding is deferred until first use.
_UPSCALE_SEMAPHORE = asyncio.Semaphore(2)

async def upscale_image(image_bytes: bytes, file_extension: str, filename: str) -> Dict[str, Any]:
    """Run the full upscale flow for one image against the external API.

    Steps: obtain a pre-signed S3 upload URL, PUT the bytes to S3 (with
    retries), trigger the super-resolution task, then poll for the result.

    Returns a dict containing "filename" plus either "upscaled_url" on
    success or "error"/"status_code" on failure; upstream errors are
    reported in the return value, never raised.
    """
    async with _UPSCALE_SEMAPHORE:  # at most 2 concurrent upstream flows
        with requests.Session() as session:
            device_id = generate_uuid()
            cookies = {"did": device_id, "t": "", "u": "", "_vid": generate_uuid()}
            flow_ghost_id = await generate_ghost_id_once(session, device_id)
            # The upstream service expects "jpeg", not "jpg".
            if file_extension == "jpg":
                file_extension = "jpeg"

            # Step 1: request a pre-signed S3 upload URL.
            get_upload_url_payload = {'ext': file_extension, 'method': 'wn_superresolution'}
            s3_url_response_data, status, _ = await http_request(
                session, "/v1/ai/web/nologin/getuploadurl", get_upload_url_payload, "POST", "MULTIPART", cookies, flow_ghost_id, "GetUploadURL"
            )
            if not s3_url_response_data or s3_url_response_data.get("code") != 0:
                return {"filename": filename, "error": "Failed to get S3 upload URL", "status_code": status}
            s3_upload_url = s3_url_response_data.get("data", {}).get("upload_url")
            s3_img_url = s3_url_response_data.get("data", {}).get("img_url")
            if not s3_upload_url or not s3_img_url:
                return {"filename": filename, "error": "Missing S3 upload URL or image URL", "status_code": 500}

            # Step 2: upload the raw bytes to S3, retrying up to 3 times.
            content_type_for_s3 = mimetypes.guess_type(f"file.{file_extension}")[0] or 'application/octet-stream'
            s3_payload = {'file_content': image_bytes, 'Content-Type': content_type_for_s3}
            upload_attempts = 3  # distinct name: the original reused max_retries for both loops
            for attempt in range(upload_attempts):
                _, s3_status, _ = await http_request(
                    session, s3_upload_url, s3_payload, "PUT", "", cookies, flow_ghost_id, "S3Upload", is_s3_upload=True
                )
                if s3_status in (200, 201, 204):
                    break
                if attempt == upload_attempts - 1:
                    return {"filename": filename, "error": "Failed to upload image to S3", "status_code": s3_status}
                await asyncio.sleep(2)

            # Step 3: trigger the super-resolution task on the uploaded image.
            trigger_upscale_payload = {'url': s3_img_url, 'method': 'wn_superresolution'}
            trigger_response_data, status, _ = await http_request(
                session, "/v1/ai/web/super_resolution/nologinupload", trigger_upscale_payload, "POST", "MULTIPART", cookies, flow_ghost_id, "TriggerUpscale"
            )
            if not trigger_response_data or trigger_response_data.get("code") != 0:
                return {"filename": filename, "error": "Failed to trigger upscale", "status_code": status}
            task_id = trigger_response_data.get("task_id") or trigger_response_data.get("data", {}).get("task_id")
            if not task_id:
                return {"filename": filename, "error": "Task ID not found", "status_code": 500}

            # Step 4: poll for the result (up to 20 times, 7s apart).
            poll_attempts, poll_interval = 20, 7
            for _ in range(poll_attempts):
                await asyncio.sleep(poll_interval)
                poll_payload = {"task_ids": [task_id]}
                poll_response_data, status, _ = await http_request(
                    session, "/v1/ai/web/super_resolution/nologinbatchresult", poll_payload, "POST", "JSON", cookies, flow_ghost_id, "PollResult"
                )
                if not poll_response_data or poll_response_data.get("code") != 0:
                    continue
                results_list = poll_response_data.get("data", [])
                # Status 0/2 are treated as "finished" and 1 as "in progress"
                # — inferred from this flow's usage; confirm against API docs.
                if results_list and results_list[0].get("status") in [0, 2]:
                    upscaled_url_list = results_list[0].get("result_image_url") or results_list[0].get("image_url")
                    if isinstance(upscaled_url_list, list) and upscaled_url_list:
                        return {"filename": filename, "upscaled_url": upscaled_url_list[0], "status_code": 200}
                    return {"filename": filename, "error": "Upscaled URL list empty", "status_code": 500}
                elif results_list and results_list[0].get("status") != 1:
                    error_msg = results_list[0].get('fail_msg', results_list[0].get('errmsg', 'Unknown error'))
                    return {"filename": filename, "error": f"Task failed: {error_msg}", "status_code": 500}
            return {"filename": filename, "error": "Max retries reached for polling", "status_code": 500}

@app.post("/upscale")
async def upscale_single(file: UploadFile = File(...)):
    """Upscale a single JPG/JPEG/PNG image and return its upscaled URL.

    Responds 400 for unsupported extensions, propagates the upstream error
    status for flow failures, and 500 for unexpected exceptions.
    """
    try:
        file_extension = file.filename.split('.')[-1].lower()
        if file_extension not in ('jpg', 'jpeg', 'png'):
            raise HTTPException(status_code=400, detail="Unsupported file format. Use JPG, JPEG, or PNG.")
        image_bytes = await file.read()
        result = await upscale_image(image_bytes, file_extension, file.filename)
        if "error" in result:
            # upscale_image can report status 0 (transport failure); map
            # anything outside the valid HTTP error range to 502 so the
            # response layer is not handed an invalid status code.
            status = result["status_code"]
            if not 400 <= status <= 599:
                status = 502
            raise HTTPException(status_code=status, detail=result["error"])
        return JSONResponse(content={"filename": file.filename, "upscaled_url": result["upscaled_url"]})
    except HTTPException:
        # Re-raise untouched: the original catch-all below swallowed the
        # deliberate 4xx/5xx raised above and rewrote it as a generic 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing image: {str(e)}")

@app.post("/upscale-multiple")
async def upscale_multiple(files: List[UploadFile] = File(...)):
    """Upscale up to MAX_IMAGES images concurrently.

    Returns a per-file results list: each entry holds either an
    "upscaled_url" or an "error" with a status code.  Responds 400 when
    more than MAX_IMAGES files are submitted.
    """
    try:
        if len(files) > MAX_IMAGES:
            raise HTTPException(status_code=400, detail=f"Maximum {MAX_IMAGES} images allowed per request.")

        results = []
        tasks = []
        task_filenames = []  # parallel to `tasks`, so failures can be attributed
        for file in files:
            file_extension = file.filename.split('.')[-1].lower()
            if file_extension not in ('jpg', 'jpeg', 'png'):
                results.append({"filename": file.filename, "error": "Unsupported file format. Use JPG, JPEG, or PNG.", "status_code": 400})
                continue
            image_bytes = await file.read()
            tasks.append(upscale_image(image_bytes, file_extension, file.filename))
            task_filenames.append(file.filename)

        if tasks:
            upscale_results = await asyncio.gather(*tasks, return_exceptions=True)
            for fname, result in zip(task_filenames, upscale_results):
                if isinstance(result, Exception):
                    # Attribute the failure to the file that caused it rather
                    # than reporting "unknown" as the original did.
                    results.append({"filename": fname, "error": str(result), "status_code": 500})
                else:
                    results.append(result)

        return JSONResponse(content={"results": results})
    except HTTPException:
        # Re-raise untouched: the original catch-all below swallowed the
        # deliberate 400 raised above and rewrote it as a generic 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing images: {str(e)}")

@app.get("/")
async def root():
    """Landing endpoint describing the available upscale routes."""
    message = (
        "Visit /upscale for single image or "
        f"/upscale-multiple for up to {MAX_IMAGES} images"
    )
    return {"message": message}