"""FastAPI app that uploads files to Replicate and mirrors them under /upload/.

Routes:
    GET  /               -> serves HTML_CONTENT
    POST /upload         -> uploads a file via Replicate's upload API
    GET  /upload/{path}  -> streams the stored file back from replicate.delivery
    GET  /embed          -> serves the embed HTML snippet
"""
import asyncio
import mimetypes
import os
from typing import Dict
from urllib.parse import quote

import requests
from fastapi import FastAPI, File, UploadFile, Request
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse

app = FastAPI()

# Extensions that are uploaded under their real name; anything else is
# uploaded with a ".png" suffix appended (see handle_upload).
SUPPORTED_EXTENSIONS = ('mp4', 'png', 'jpg', 'jpeg', 'gif', 'mp3', 'pdf', 'txt')
_SUPPORTED_SUFFIXES = tuple(f".{ext}" for ext in SUPPORTED_EXTENSIONS)

# Suffix appended to unsupported file names before they are sent upstream.
_DISGUISE_SUFFIX = '.png'

REPLICATE_PAGE_URL = 'https://replicate.com/levelsio/neon-tokyo'
USER_AGENT = (
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36'
)

# (connect, read) timeouts in seconds, so a stalled upstream cannot hang a
# worker indefinitely. Timeouts surface as requests.exceptions.RequestException.
REQUEST_TIMEOUT = (10, 60)

# Upstream headers that must not be forwarded to the client: hop-by-hop
# headers, plus Content-Encoding because iter_content() hands us the body
# already decoded.
_DROPPED_PROXY_HEADERS = frozenset({
    'connection',
    'keep-alive',
    'proxy-authenticate',
    'proxy-authorization',
    'te',
    'trailers',
    'transfer-encoding',
    'upgrade',
    'content-encoding',
})

HTML_CONTENT = """ PRO Uploader

PRO Uploader

or drag and drop file here/paste image

All file types are supported
×

Upload History

×
"""


@app.get("/", response_class=HTMLResponse)
async def index():
    """Serve the uploader page."""
    return HTML_CONTENT


@app.post("/upload")
async def handle_upload(file: UploadFile = File(...)):
    """Upload *file* to Replicate and return its mirrored URL.

    Returns a JSON body with ``url`` (a path under ``/upload/``) and
    ``originalExtension`` on success, or ``{"error": ...}`` with status 400
    (no file name) or 500 (any upstream failure).
    """
    if not file.filename:
        return JSONResponse(content={"error": "No file selected."}, status_code=400)

    cookies = await get_cookies()
    if 'csrftoken' not in cookies or 'sessionid' not in cookies:
        return JSONResponse(content={"error": "Failed to get cookies from Replicate."}, status_code=500)

    original_extension = os.path.splitext(file.filename)[1][1:]
    is_supported = original_extension.lower() in SUPPORTED_EXTENSIONS
    if is_supported:
        temp_filename = file.filename
        # The client may omit the part's content type; never send "None" upstream.
        content_type = (
            file.content_type
            or mimetypes.guess_type(file.filename)[0]
            or 'application/octet-stream'
        )
    else:
        temp_filename = f"{file.filename}{_DISGUISE_SUFFIX}"
        content_type = "image/png"

    upload_result = await initiate_upload(cookies, temp_filename, content_type)
    # Both keys are read below, so both must be present.
    if not upload_result or 'upload_url' not in upload_result or 'serving_url' not in upload_result:
        return JSONResponse(content={"error": "Failed to initiate upload with Replicate."}, status_code=500)

    original_url = upload_result['serving_url']
    _, marker, served_path = original_url.partition('/pbxt/')
    if not marker:
        # Without the "/pbxt/" segment there is nothing to mirror; fail cleanly
        # before uploading instead of raising IndexError afterwards.
        return JSONResponse(content={"error": "Unexpected serving URL returned by Replicate."}, status_code=500)

    # Log the real Replicate URL
    print(f"Real Replicate URL: {original_url}")

    file_content = await file.read()
    upload_success = await retry_upload(upload_result['upload_url'], file_content, content_type)
    if not upload_success:
        return JSONResponse(content={"error": "File upload to Replicate failed after multiple attempts."}, status_code=500)

    mirrored_url = f"/upload/{served_path}"
    if not is_supported and mirrored_url.endswith(_DISGUISE_SUFFIX):
        # Remove only the suffix appended above; a ".png" elsewhere in the
        # name belongs to the user's file name and must survive.
        mirrored_url = mirrored_url[:-len(_DISGUISE_SUFFIX)]

    return JSONResponse(content={"url": mirrored_url, "originalExtension": original_extension})


def _build_proxy_headers(upstream_headers) -> Dict[str, str]:
    """Return the subset of *upstream_headers* that is safe to forward.

    Keys are lower-cased so later assignments replace an upstream header
    instead of adding a second one that differs only in case.
    """
    lowered = {name.lower(): value for name, value in upstream_headers.items()}
    body_was_decoded = 'content-encoding' in lowered
    forwarded = {
        name: value
        for name, value in lowered.items()
        if name not in _DROPPED_PROXY_HEADERS
    }
    if body_was_decoded:
        # The upstream length describes the encoded body, not what we send.
        forwarded.pop('content-length', None)
    return forwarded


@app.get("/upload/{path:path}")
async def handle_file_stream(path: str, request: Request):
    """Stream the file stored at *path* from replicate.delivery.

    A ``Range`` request header is passed upstream and the upstream status
    code (e.g. 206) is relayed. Upstream failures yield a 500 JSON error.
    """
    original_url = f'https://replicate.delivery/pbxt/{path}'
    if not path.lower().endswith(_SUPPORTED_SUFFIXES):
        # Mirrors handle_upload, which stores unsupported types with ".png" appended.
        original_url += _DISGUISE_SUFFIX

    range_header = request.headers.get('Range')
    headers = {'Range': range_header} if range_header else {}

    response = None
    try:
        # requests is blocking; run it off the event loop.
        response = await run_in_threadpool(
            requests.get, original_url, headers=headers, stream=True, timeout=REQUEST_TIMEOUT
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        if response is not None:
            response.close()
        return JSONResponse(content={"error": f"Failed to fetch file from Replicate: {e}"}, status_code=500)

    def generate():
        # Release the upstream connection when streaming ends or is aborted.
        try:
            yield from response.iter_content(chunk_size=8192)
        finally:
            response.close()

    # Content-Range (for 206 responses) is carried over with the other headers.
    response_headers = _build_proxy_headers(response.headers)
    response_headers['access-control-allow-origin'] = '*'
    response_headers['content-disposition'] = 'inline'

    original_extension = os.path.splitext(path)[1][1:]
    content_type, _ = mimetypes.guess_type(f"file.{original_extension}")
    if content_type:
        response_headers['content-type'] = content_type

    return StreamingResponse(generate(), status_code=response.status_code, headers=response_headers)


@app.get("/embed")
async def embed_video(url: str, thumbnail: str):
    """Serve the embed HTML for a video."""
    # NOTE(review): the template below contains no markup in this copy of the
    # file and does not use `url` or `thumbnail`. If they are interpolated,
    # escape them (html.escape) first -- both come straight from the query string.
    html = f''' '''
    return HTMLResponse(content=html)


async def get_cookies() -> Dict[str, str]:
    """Fetch the Replicate model page and return the cookies it sets.

    Returns an empty dict when the request fails.
    """
    try:
        response = await run_in_threadpool(
            requests.get,
            REPLICATE_PAGE_URL,
            headers={'User-Agent': USER_AGENT},
            timeout=REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return dict(response.cookies)
    except requests.exceptions.RequestException as e:
        print(f'Error fetching the page: {e}')
        return {}


async def initiate_upload(cookies: Dict[str, str], filename: str, content_type: str) -> Dict:
    """Ask Replicate's upload API for an upload slot for *filename*.

    Returns the decoded JSON response (handle_upload expects ``upload_url``
    and ``serving_url`` in it), or an empty dict on any failure.
    """
    # Escape the file name so characters such as "?", "#", "/" or spaces
    # cannot change the URL's structure; the content type goes through
    # `params` for the same reason.
    url = f"https://replicate.com/api/upload/{quote(filename, safe='')}"
    try:
        response = await run_in_threadpool(
            requests.post,
            url,
            params={'content_type': content_type},
            cookies=cookies,
            headers={
                'X-CSRFToken': cookies.get('csrftoken'),
                'User-Agent': USER_AGENT,
                'Referer': REPLICATE_PAGE_URL,
                'Origin': 'https://replicate.com',
                'Accept': '*/*',
                'Accept-Language': 'en-US,en;q=0.5',
                # NOTE(review): "br" is advertised; requests can only decode it
                # when a brotli package is installed -- confirm for the deployment.
                'Accept-Encoding': 'gzip, deflate, br',
                'Sec-Fetch-Dest': 'empty',
                'Sec-Fetch-Mode': 'cors',
                'Sec-Fetch-Site': 'same-origin',
            },
            timeout=REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose
        # JSONDecodeError is not a RequestException.
        print(f'Error initiating upload: {e}')
        return {}


async def upload_file_to_gcs(upload_url: str, file_content: bytes, content_type: str) -> bool:
    """PUT *file_content* to *upload_url*; return True on any 2xx response."""
    try:
        response = await run_in_threadpool(
            requests.put,
            upload_url,
            data=file_content,
            headers={'Content-Type': content_type},
            timeout=REQUEST_TIMEOUT,
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f'Error uploading file: {e}')
        return False
    # raise_for_status() has ruled out 4xx/5xx; `ok` accepts every other
    # status, so a 201/204 is not misreported as a failure and retried.
    return response.ok


async def retry_upload(upload_url: str, file_content: bytes, content_type: str, max_retries: int = 5, delay: int = 1) -> bool:
    """Upload with exponential backoff.

    Makes up to *max_retries* attempts, waiting *delay* seconds after the
    first failure and doubling the wait (capped at 60 s) after each further
    one. Returns True as soon as an attempt succeeds, otherwise False.
    """
    for attempt in range(1, max_retries + 1):
        is_last_attempt = attempt == max_retries
        try:
            if await upload_file_to_gcs(upload_url, file_content, content_type):
                return True
            if is_last_attempt:
                print(f"Upload attempt {attempt} failed. Giving up.")
            else:
                print(f"Upload attempt {attempt} failed. Retrying in {delay} seconds...")
        except Exception as e:
            print(f"Error during upload attempt {attempt}: {e}")

        # No point in sleeping once there is nothing left to retry.
        if not is_last_attempt:
            await asyncio.sleep(delay)
            delay = min(delay * 2, 60)
    return False