# Conditional import for ZeroGPU support
import os
import functools

if os.environ.get("SPACES_ZERO_GPU") is not None:
    import spaces
else:
    # Dummy stand-in so `@spaces.GPU(...)` is a harmless no-op outside
    # Hugging Face ZeroGPU environments.
    class spaces:
        @staticmethod
        def GPU(*decorator_args, **decorator_kwargs):
            def decorator(func):
                # BUGFIX: preserve the wrapped function's metadata
                # (__name__/__doc__) — Gradio uses it for endpoint naming.
                @functools.wraps(func)
                def wrapper(*args, **kwargs):
                    return func(*args, **kwargs)
                return wrapper
            return decorator

import gradio as gr
import numpy as np
import yaml
import cv2
import zipfile
from utils import process_video, get_npy_files, get_frame_count, process_image
from infer_script import run_inference
import time
import datetime
import shutil
import imageio
from media_pipe.draw_util import FaceMeshVisualizer
from download_models import download
import torch

# Download model checkpoints if they are not already present.
download()

# Report GPU availability at startup so the Space logs show which mode we run in.
print("="*50)
print("đ GPU Status Check:")
print(f" PyTorch version: {torch.__version__}")
print(f" CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f" CUDA version: {torch.version.cuda}")
    print(f" GPU device: {torch.cuda.get_device_name(0)}")
    print(f" GPU count: {torch.cuda.device_count()}")
else:
    if os.environ.get("SPACES_ZERO_GPU"):
        print(" âšī¸ ZeroGPU mode - GPU will be allocated on-demand")
    else:
        print(" â ī¸ No CUDA GPU detected - will use CPU")
print("="*50)

PROCESSED_VIDEO_DIR = './processed_videos'   # persistent landmark caches
TEMP_DIR = './temp'                          # throw-away landmark caches
INFER_CONFIG_PATH = './configs/infer.yaml'
MODEL_PATH = './ckpt_models/ckpts'
OUTPUT_PATH = './output'


def load_config():
    """Load the inference YAML config as a plain dict."""
    with open(INFER_CONFIG_PATH, 'r') as file:
        return yaml.safe_load(file)


def save_config(config):
    """Persist *config* back to the inference YAML file.

    NOTE(review): yaml.dump re-orders keys and drops comments; acceptable
    here because the file is machine-managed.
    """
    with open(INFER_CONFIG_PATH, 'w') as file:
        yaml.dump(config, file)


config = load_config()


def get_video_fps(video_path):
    """Return the integer (truncated) FPS of *video_path*.

    cv2 returns 0.0 for an unreadable file, so this yields 0 in that case.
    """
    video = cv2.VideoCapture(video_path)
    fps = video.get(cv2.CAP_PROP_FPS)
    video.release()
    return int(fps)


def update_npy_choices():
    """Refresh the dropdown listing cached landmark (.npy) files."""
    npy_files = get_npy_files(PROCESSED_VIDEO_DIR)
    return gr.update(choices=["None"] + npy_files)


def create_gif_from_npy(npy_path, gif_path):
    """Render cached MediaPipe landmarks into an animated preview GIF.

    Each entry in the pickled .npy archive is a dict carrying 'width',
    'height' and 'lmks' (normalized landmark coordinates) — assumed from
    the keys read below; produced by utils.process_video (TODO confirm).
    Returns *gif_path*.
    """
    face_results = np.load(npy_path, allow_pickle=True)
    vis = FaceMeshVisualizer(forehead_edge=False)
    frames = []
    for face_result in face_results:
        width = face_result['width']
        height = face_result['height']
        lmks = face_result['lmks'].astype(np.float32)
        frame = vis.draw_landmarks((width, height), lmks, normed=True)
        frames.append(frame)
    # 0.2 s per frame, loop forever.
    imageio.mimsave(gif_path, frames, 'GIF', duration=0.2, loop=0)
    return gif_path
def show_gif_for_npy(npy_file, video_path):
    """Build and return preview GIF paths for the selected landmark source.

    Returns (gif_path, aligned_gif_path, status_message); the two paths are
    None on failure.
    """
    if npy_file and npy_file != "None":
        npy_path = npy_file
    elif video_path:
        video_name = os.path.splitext(os.path.basename(video_path))[0]
        # BUGFIX: the original read `input_video_save.value`, which is the
        # checkbox's *initial* value, not its live UI state (and the
        # component is defined later in the file, so this could raise
        # NameError when called early). Probe both cache directories
        # instead, mirroring crop_face().
        npy_path = os.path.join(PROCESSED_VIDEO_DIR, video_name, f"{video_name}_mppose.npy")
        if not os.path.exists(npy_path):
            npy_path = os.path.join(TEMP_DIR, video_name, f"{video_name}_mppose.npy")
    else:
        return None, None, "No NPY file or video selected"
    if not os.path.exists(npy_path):
        return None, None, "NPY file not found"
    try:
        base = os.path.splitext(os.path.basename(npy_path))[0]
        parent = os.path.dirname(npy_path)
        gif_path = os.path.join(parent, f"{base}_preview.gif")
        # NOTE(review): the aligned GIF is only *pointed at* here, never
        # rendered by this function — presumably produced elsewhere; verify.
        gif_path_align = os.path.join(parent, f"{base}_aligned.gif")
        create_gif_from_npy(npy_path, gif_path)
        return gif_path, gif_path_align, "GIF created and displayed"
    except Exception as e:
        return None, None, f"Failed to create GIF: {str(e)}"


def process_input_video(video, save_to_processed):
    """Extract MediaPipe landmarks from an uploaded reference video.

    Saves the .npy cache either persistently (PROCESSED_VIDEO_DIR) or in
    TEMP_DIR depending on *save_to_processed*. Returns a status string, the
    npy path, a slider update capped at the usable frame count, and an FPS
    label update.
    """
    if video is None:
        return "No video uploaded", None, gr.update(), gr.update()
    video_name = os.path.splitext(os.path.basename(video))[0]
    base_dir = PROCESSED_VIDEO_DIR if save_to_processed else TEMP_DIR
    save_dir = os.path.join(base_dir, video_name)
    os.makedirs(save_dir, exist_ok=True)
    npy_path, frame_count = process_video(video, save_dir)
    # The last frame is excluded — presumably unusable downstream; TODO confirm.
    frame_count = frame_count - 1
    fps = get_video_fps(video)
    return (f"Video processed. NPY file saved at {npy_path}. Original FPS: {fps}",
            npy_path,
            gr.update(maximum=frame_count, value=frame_count),
            gr.update(value=f"Reference video FPS: {fps}"))
def update_frame_count(npy_file):
    """Sync the frame-count slider with the selected landmark file."""
    if npy_file is None or npy_file == "None":
        return gr.update()
    frame_count = get_frame_count(npy_file)
    return gr.update(maximum=frame_count, value=frame_count)


def update_gif_on_video_change(video):
    """Regenerate the landmark preview GIFs when the input video changes."""
    if video:
        gif_path, gif_path_align, status = show_gif_for_npy(None, video)
        return gif_path, gif_path_align, status
    return None, None, "No video selected"


def toggle_fps_slider(use_custom):
    """Enable the FPS slider only when a custom output FPS is requested."""
    return gr.update(interactive=use_custom)


def _resolve_landmark_npy(npy_file, video_path):
    """Locate the landmark .npy for the current selection.

    Prefers the explicitly selected file; otherwise derives the path from
    the video name, checking the persistent cache before the temp cache.
    Returns None when neither source is given. (Shared by crop_face and
    preview_crop, which previously duplicated this logic.)
    """
    if npy_file and npy_file != "None":
        return npy_file
    if video_path:
        video_name = os.path.splitext(os.path.basename(video_path))[0]
        candidate = os.path.join(PROCESSED_VIDEO_DIR, video_name, f"{video_name}_mppose.npy")
        if not os.path.exists(candidate):
            candidate = os.path.join(TEMP_DIR, video_name, f"{video_name}_mppose.npy")
        return candidate
    return None


def crop_face(image_path, should_crop_face, npy_file, video_path,
              expand_x, expand_y, offset_x, offset_y):
    """Crop the reference image around the face implied by the landmarks.

    Returns (image_path, status): the cropped image path on success, the
    original path otherwise. The crop result is written next to the npy file.
    """
    if not should_crop_face:
        return image_path, "Face cropping not requested"
    npy_path = _resolve_landmark_npy(npy_file, video_path)
    if npy_path is None:
        return image_path, "No NPY file or video selected for face cropping"
    if not os.path.exists(npy_path):
        return image_path, "NPY file not found for face cropping"
    save_dir = os.path.dirname(npy_path)
    cropped_image_path, motion_path = process_image(
        image_path, npy_path, save_dir, expand_x, expand_y, offset_x, offset_y)
    if cropped_image_path:
        return cropped_image_path, "Face cropped successfully"
    return image_path, "Face cropping failed"


def preview_crop(image_path, npy_file, video_path, expand_x, expand_y, offset_x, offset_y):
    """Generate a crop preview without committing the crop.

    Unlike crop_face, the preview is written into TEMP_DIR.
    Returns (cropped_image_path, motion_path, status); Nones on failure.
    """
    if not image_path:
        return None, None, "No image uploaded"
    npy_path = _resolve_landmark_npy(npy_file, video_path)
    if npy_path is None:
        return None, None, "No NPY file or video selected for face cropping"
    if not os.path.exists(npy_path):
        return None, None, "NPY file not found for face cropping"
    save_dir = TEMP_DIR
    # Create if not exists
    os.makedirs(save_dir, exist_ok=True)
    cropped_image_path, motion_path = process_image(
        image_path, npy_path, save_dir, expand_x, expand_y, offset_x, offset_y)
    if cropped_image_path:
        return cropped_image_path, motion_path, "Crop preview generated"
    return None, None, "Failed to generate crop preview"
f"{video_name}_mppose.npy") else: return None,None, "No NPY file or video selected for face cropping" if not os.path.exists(npy_path): return None,None, "NPY file not found for face cropping" save_dir = TEMP_DIR # Create if not exists os.makedirs(save_dir, exist_ok=True) cropped_image_path, motion_path = process_image(image_path, npy_path, save_dir, expand_x, expand_y, offset_x, offset_y) if cropped_image_path: return cropped_image_path,motion_path, "Crop preview generated" else: return None,None, "Failed to generate crop preview" @spaces.GPU(duration=300) def generate_video(input_img, should_crop_face, expand_x, expand_y, offset_x, offset_y, input_video_type, input_video, input_npy_select, input_npy, input_video_frames, settings_steps, settings_cfg_scale, settings_seed, resolution_w, resolution_h, model_step, custom_output_path, use_custom_fps, output_fps, callback_steps, context_frames, context_stride, context_overlap, context_batch_size, anomaly_action,intropolate_factor): print("đ Generate Video started!") print(f" Input image: {input_img}") print(f" Video type: {input_video_type}") config['resolution_w'] = resolution_w config['resolution_h'] = resolution_h config['video_length'] = input_video_frames save_config(config) if input_video_type == "video": video_name = os.path.splitext(os.path.basename(input_video))[0] lmk_path = os.path.join(PROCESSED_VIDEO_DIR if input_video_save.value else TEMP_DIR, video_name, f"{video_name}_mppose.npy") if not use_custom_fps: output_fps = 7 else: if input_npy_select != "None": lmk_path = input_npy_select else: lmk_path = input_npy video_name = os.path.splitext(os.path.basename(lmk_path))[0] if not use_custom_fps: output_fps = 7 # default FPS timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") output_folder = f"{video_name}_{timestamp}" if custom_output_path: output_path = os.path.join(custom_output_path, output_folder) else: output_path = os.path.join(OUTPUT_PATH, output_folder) os.makedirs(output_path, exist_ok=True) 
if should_crop_face: cropped_image_path, crop_status = crop_face(input_img, should_crop_face, input_npy_select if input_video_type == "npy" else None, input_video if input_video_type == "video" else None, expand_x, expand_y, offset_x, offset_y) print(crop_status) if cropped_image_path and os.path.exists(cropped_image_path): cropped_face_in_result = os.path.join(output_path, "cropped_face.png") shutil.copy(cropped_image_path, cropped_face_in_result) print(f"Cropped face saved in result folder: {cropped_face_in_result}") input_img = cropped_image_path try: print("đ Calling run_inference...") status, oo_video_path, all_video_path = run_inference( config_path=INFER_CONFIG_PATH, model_path=MODEL_PATH, input_path=input_img, lmk_path=lmk_path, output_path=output_path, model_step=model_step, seed=settings_seed, resolution_w=resolution_w, resolution_h=resolution_h, video_length=input_video_frames, num_inference_steps=settings_steps, guidance_scale=settings_cfg_scale, output_fps=output_fps, callback_steps=callback_steps, context_frames=context_frames, context_stride=context_stride, context_overlap=context_overlap, context_batch_size=context_batch_size, anomaly_action=anomaly_action, interpolation_factor=intropolate_factor ) print(f"â Inference completed! 
Status: {status}") except Exception as e: print(f"â Error in run_inference: {str(e)}") import traceback traceback.print_exc() return f"Error: {str(e)}", None, None, None frames_archive = None frames_dir = os.path.join(output_path, f"frames") if os.path.exists(frames_dir): archive_path = os.path.join(output_path, f"frames.zip") with zipfile.ZipFile(archive_path, 'w') as zipf: for root, dirs, files in os.walk(frames_dir): for file in files: zipf.write(os.path.join(root, file), os.path.relpath(os.path.join(root, file), os.path.join(frames_dir, '..'))) frames_archive = archive_path print(f"The archive has been created: {archive_path}") else: print(f"Directory with frames not found: {frames_dir}") return status, oo_video_path, all_video_path, frames_archive with gr.Blocks() as demo: gr.Markdown("