Spaces:
Runtime error
Runtime error
| import os, uuid | |
| import tempfile | |
| import requests | |
| from mimetypes import guess_extension | |
| from PIL import Image | |
| from io import BytesIO | |
| import subprocess | |
| from fastapi import ( | |
| FastAPI, | |
| UploadFile, | |
| File, | |
| HTTPException, | |
| Response, | |
| Request, | |
| BackgroundTasks, | |
| ) | |
| from typing import List, Optional | |
| import asyncio, aiofiles | |
| from fastapi.responses import StreamingResponse, FileResponse | |
| app = FastAPI() | |
| def download_image(image_url: str): | |
| # Create a temporary directory | |
| temp_dir = tempfile.mkdtemp() | |
| # Get the image content | |
| response = requests.get(image_url) | |
| response.raise_for_status() | |
| # Detect the image type | |
| image = Image.open(BytesIO(response.content)) | |
| image_format = image.format.lower() | |
| image_extension = guess_extension(f"image/{image_format}") | |
| if image_extension is None: | |
| raise ValueError("Cannot detect image file type.") | |
| # Define the image file path | |
| image_path = os.path.join(temp_dir, f"image{image_extension}") | |
| # Save the image file | |
| with open(image_path, "wb") as image_file: | |
| image_file.write(response.content) | |
| # Return the image path and dimensions | |
| return image_path, image.size | |
| def make_effect( | |
| image_link: str, | |
| filename: str, | |
| frame_rate: int, | |
| duration: int, | |
| quality: int, | |
| ssaa: float, | |
| raw: bool, | |
| ): | |
| # Download the image and get its dimensions | |
| image_path, (width, height) = download_image(image_url=image_link) | |
| print(f"Image path: {image_path}, Width: {width}, Height: {height}", "#" * 100) | |
| # Define the output video file path | |
| destination = os.path.join("/tmp/Video", filename) | |
| # Create the destination directory if it doesn't exist | |
| os.makedirs(os.path.dirname(destination), exist_ok=True) | |
| # Build the depthflow command | |
| command = [ | |
| "depthflow", | |
| "input", | |
| "-i", | |
| image_path, | |
| "main", | |
| "-f", | |
| str(frame_rate), | |
| "-t", | |
| str(duration), | |
| "--width", | |
| str(width), | |
| "--height", | |
| str(height), | |
| "--quality", | |
| str(quality), | |
| "--ssaa", | |
| str(ssaa), | |
| "--benchmark", | |
| ] | |
| if raw: | |
| command.append("--raw") | |
| command.extend(["--output", destination]) | |
| # Execute the depthflow command | |
| subprocess.run(command, check=True) | |
| return destination | |
| async def generate_video( | |
| background_task: BackgroundTasks, | |
| image_link: str = None, | |
| frame_rate: int = 30, | |
| duration: int = 3, | |
| quality: int = 10, | |
| ssaa: float = 0.75, | |
| raw: bool = True, | |
| ): | |
| filename = f"{str(uuid.uuid4())}.mp4" | |
| try: | |
| background_task.add_task( | |
| make_effect, image_link, filename, frame_rate, duration, quality, ssaa, raw | |
| ) | |
| return {"output_file": filename} | |
| except Exception as e: | |
| raise HTTPException(status_code=400, detail=str(e)) | |
| async def download_video(filename: str, request: Request): | |
| video_directory = "/tmp/Video" | |
| video_path = os.path.join(video_directory, filename) | |
| if not os.path.isfile(video_path): | |
| raise HTTPException(status_code=404, detail="Video not found") | |
| range_header = request.headers.get("Range", None) | |
| video_size = os.path.getsize(video_path) | |
| if range_header: | |
| start, end = range_header.strip().split("=")[1].split("-") | |
| start = int(start) | |
| end = video_size if end == "" else int(end) | |
| headers = { | |
| "Content-Range": f"bytes {start}-{end}/{video_size}", | |
| "Accept-Ranges": "bytes", | |
| } | |
| content = read_file_range(video_path, start, end) | |
| return StreamingResponse(content, media_type="video/mp4", headers=headers) | |
| return FileResponse(video_path, media_type="video/mp4") | |
| async def read_file_range(path, start, end): | |
| async with aiofiles.open(path, "rb") as file: | |
| await file.seek(start) | |
| while True: | |
| data = await file.read(1024 * 1024) # read in chunks of 1MB | |
| if not data or await file.tell() > end: | |
| break | |
| yield data | |