Spaces:
Running
Running
| # -*- coding:UTF-8 -*- | |
| #!/usr/bin/env python | |
| import numpy as np | |
| import gradio as gr | |
| import roop.globals | |
| from roop.core import ( | |
| start, | |
| decode_execution_providers, | |
| suggest_max_memory, | |
| suggest_execution_threads, | |
| ) | |
| from roop.processors.frame.core import get_frame_processors_modules | |
| from roop.utilities import normalize_output_path | |
| import os | |
| from PIL import Image | |
| from datetime import datetime | |
| from huggingface_hub import HfApi, login | |
| from datasets import load_dataset, Dataset | |
| import json | |
| import shutil | |
| from dotenv import load_dotenv | |
| import cv2 | |
| from insightface.app import FaceAnalysis | |
| # Load environment variables | |
| load_dotenv() | |
| # Hàm tính cosine similarity để mày so sánh "điểm tương đồng" của khuôn mặt | |
| def cosine_similarity(a, b): | |
| return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b) + 1e-6) | |
| # Class FaceIntegrDataset nguyên bản (cho image swap, không cần "xịn" cho video) | |
| class FaceIntegrDataset: | |
| def __init__(self, repo_id="Arrcttacsrks/face_integrData"): | |
| self.token = os.getenv('hf_token') | |
| if not self.token: | |
| raise ValueError("HF_TOKEN environment variable is not set") | |
| self.repo_id = repo_id | |
| self.api = HfApi() | |
| login(self.token) | |
| self.temp_dir = "temp_dataset" | |
| os.makedirs(self.temp_dir, exist_ok=True) | |
| def create_date_folder(self): | |
| current_date = datetime.now().strftime("%Y-%m-%d") | |
| folder_path = os.path.join(self.temp_dir, current_date) | |
| os.makedirs(folder_path, exist_ok=True) | |
| return folder_path, current_date | |
| def save_metadata(self, source_path, target_path, output_path, timestamp): | |
| metadata = { | |
| "timestamp": timestamp, | |
| "source_image": source_path, | |
| "target_image": target_path, | |
| "output_image": output_path, | |
| "date_created": datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| } | |
| return metadata | |
| def upload_to_hf(self, local_folder, date_folder): | |
| try: | |
| self.api.upload_folder( | |
| folder_path=local_folder, | |
| repo_id=self.repo_id, | |
| repo_type="dataset", | |
| path_in_repo=date_folder | |
| ) | |
| return True | |
| except Exception as e: | |
| print(f"Error uploading to Hugging Face: {str(e)}") | |
| return False | |
| # Hàm swap_face nguyên bản dành cho ghép ảnh tĩnh | |
| def swap_face(source_file, target_file, doFaceEnhancer): | |
| folder_path = None | |
| try: | |
| dataset_handler = FaceIntegrDataset() | |
| folder_path, date_folder = dataset_handler.create_date_folder() | |
| timestamp = datetime.now().strftime("%S-%M-%H-%d-%m-%Y") | |
| source_path = os.path.join(folder_path, f"source_{timestamp}.jpg") | |
| target_path = os.path.join(folder_path, f"target_{timestamp}.jpg") | |
| output_path = os.path.join(folder_path, f"OutputImage{timestamp}.jpg") | |
| if source_file is None or target_file is None: | |
| raise ValueError("Source and target images are required") | |
| Image.fromarray(source_file).save(source_path) | |
| Image.fromarray(target_file).save(target_path) | |
| print("source_path: ", source_path) | |
| print("target_path: ", target_path) | |
| roop.globals.source_path = source_path | |
| roop.globals.target_path = target_path | |
| roop.globals.output_path = normalize_output_path( | |
| roop.globals.source_path, | |
| roop.globals.target_path, | |
| output_path | |
| ) | |
| if doFaceEnhancer: | |
| roop.globals.frame_processors = ["face_swapper", "face_enhancer"] | |
| else: | |
| roop.globals.frame_processors = ["face_swapper"] | |
| roop.globals.headless = True | |
| roop.globals.keep_fps = True | |
| roop.globals.keep_audio = True | |
| roop.globals.keep_frames = False | |
| roop.globals.many_faces = False | |
| roop.globals.video_encoder = "libx264" | |
| roop.globals.video_quality = 18 | |
| roop.globals.max_memory = suggest_max_memory() | |
| roop.globals.execution_providers = decode_execution_providers(["cuda"]) | |
| roop.globals.execution_threads = suggest_execution_threads() | |
| print( | |
| "start process", | |
| roop.globals.source_path, | |
| roop.globals.target_path, | |
| roop.globals.output_path, | |
| ) | |
| for frame_processor in get_frame_processors_modules(roop.globals.frame_processors): | |
| if not frame_processor.pre_check(): | |
| return None | |
| start() | |
| metadata = dataset_handler.save_metadata( | |
| f"source_{timestamp}.jpg", | |
| f"target_{timestamp}.jpg", | |
| f"OutputImage{timestamp}.jpg", | |
| timestamp | |
| ) | |
| metadata_path = os.path.join(folder_path, f"metadata_{timestamp}.json") | |
| with open(metadata_path, 'w') as f: | |
| json.dump(metadata, f, indent=4) | |
| upload_success = dataset_handler.upload_to_hf(folder_path, date_folder) | |
| if upload_success: | |
| print(f"Successfully uploaded files to dataset {dataset_handler.repo_id}") | |
| else: | |
| print("Failed to upload files to Hugging Face dataset") | |
| if os.path.exists(output_path): | |
| output_image = Image.open(output_path) | |
| output_array = np.array(output_image) | |
| shutil.rmtree(folder_path) | |
| return output_array | |
| else: | |
| print("Output image not found") | |
| if folder_path and os.path.exists(folder_path): | |
| shutil.rmtree(folder_path) | |
| return None | |
| except Exception as e: | |
| print(f"Error in face swap process: {str(e)}") | |
| if folder_path and os.path.exists(folder_path): | |
| shutil.rmtree(folder_path) | |
| raise gr.Error(f"Face swap failed: {str(e)}") | |
| # Hàm xử lý ghép mặt cho 1 frame video bằng cách "mượn" thuật toán của roop | |
| def swap_face_frame(frame_bgr, replacement_face_rgb, doFaceEnhancer): | |
| # Chuyển frame từ BGR sang RGB vì PIL làm việc với RGB | |
| frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB) | |
| temp_dir = "temp_faceswap_frame" | |
| os.makedirs(temp_dir, exist_ok=True) | |
| timestamp = datetime.now().strftime("%S-%M-%H-%d-%m-%Y") | |
| source_path = os.path.join(temp_dir, f"source_{timestamp}.jpg") | |
| target_path = os.path.join(temp_dir, f"target_{timestamp}.jpg") | |
| output_path = os.path.join(temp_dir, f"OutputImage_{timestamp}.jpg") | |
| Image.fromarray(frame_rgb).save(source_path) | |
| Image.fromarray(replacement_face_rgb).save(target_path) | |
| roop.globals.source_path = source_path | |
| roop.globals.target_path = target_path | |
| roop.globals.output_path = normalize_output_path(source_path, target_path, output_path) | |
| if doFaceEnhancer: | |
| roop.globals.frame_processors = ["face_swapper", "face_enhancer"] | |
| else: | |
| roop.globals.frame_processors = ["face_swapper"] | |
| roop.globals.headless = True | |
| roop.globals.keep_fps = True | |
| roop.globals.keep_audio = True | |
| roop.globals.keep_frames = False | |
| roop.globals.many_faces = False | |
| roop.globals.video_encoder = "libx264" | |
| roop.globals.video_quality = 18 | |
| roop.globals.max_memory = suggest_max_memory() | |
| roop.globals.execution_providers = decode_execution_providers(["cuda"]) | |
| roop.globals.execution_threads = suggest_execution_threads() | |
| start() | |
| if os.path.exists(output_path): | |
| swapped_img = np.array(Image.open(output_path)) | |
| else: | |
| swapped_img = frame_rgb | |
| shutil.rmtree(temp_dir) | |
| return swapped_img | |
| # Hàm xử lý ghép mặt cho video frame-by-frame với insightface để so sánh khuôn mặt | |
| def swap_face_video(reference_face, replacement_face, video_input, similarity_threshold, doFaceEnhancer): | |
| """ | |
| reference_face: Ảnh tham chiếu (RGB) để khóa khuôn mặt | |
| replacement_face: Ảnh ghép (RGB) | |
| video_input: Đường dẫn file video đầu vào | |
| similarity_threshold: Ngưỡng (0.0 - 1.0) cho tỉ lệ tương đồng | |
| doFaceEnhancer: Boolean, có áp dụng cải thiện chất lượng hay không | |
| """ | |
| try: | |
| # Chuẩn bị insightface | |
| fa = FaceAnalysis() | |
| # Loại bỏ nms=0.4 vì hàm prepare() không hỗ trợ argument này | |
| fa.prepare(ctx_id=0) | |
| # Lấy embedding của khuôn mặt tham chiếu | |
| ref_detections = fa.get(reference_face) | |
| if not ref_detections: | |
| raise gr.Error("Không phát hiện khuôn mặt trong ảnh tham chiếu!") | |
| ref_embedding = ref_detections[0].embedding | |
| # Mở video đầu vào | |
| cap = cv2.VideoCapture(video_input) | |
| if not cap.isOpened(): | |
| raise gr.Error("Không mở được video đầu vào!") | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| output_video_path = "temp_faceswap_video.mp4" | |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
| out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height)) | |
| frame_index = 0 | |
| while True: | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| # Chuyển frame sang RGB để insightface xử lý | |
| frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| detections = fa.get(frame_rgb) | |
| swap_this_frame = False | |
| for det in detections: | |
| sim = cosine_similarity(det.embedding, ref_embedding) | |
| if sim >= similarity_threshold: | |
| swap_this_frame = True | |
| break | |
| if swap_this_frame: | |
| # Ghép mặt từ replacement_face vào frame | |
| swapped_frame_rgb = swap_face_frame(frame, replacement_face, doFaceEnhancer) | |
| # Chuyển ngược lại sang BGR để ghi video | |
| swapped_frame = cv2.cvtColor(swapped_frame_rgb, cv2.COLOR_RGB2BGR) | |
| else: | |
| swapped_frame = frame | |
| out.write(swapped_frame) | |
| frame_index += 1 | |
| print(f"Đã xử lý frame {frame_index}") | |
| cap.release() | |
| out.release() | |
| return output_video_path | |
| except Exception as e: | |
| print(f"Lỗi khi xử lý video: {str(e)}") | |
| raise gr.Error(f"Face swap video failed: {str(e)}") | |
| # Giao diện Gradio được xây dựng với hai tab: Image và Video | |
| def create_interface(): | |
| custom_css = """ | |
| .container { | |
| max-width: 1200px; | |
| margin: auto; | |
| padding: 20px; | |
| } | |
| .output-image { | |
| min-height: 400px; | |
| border: 1px solid #ccc; | |
| border-radius: 8px; | |
| padding: 10px; | |
| } | |
| """ | |
| title = "Face - Integrator" | |
| description = r""" | |
| Upload source and target images to perform face swap. | |
| """ | |
| article = r""" | |
| <div style="text-align: center; max-width: 650px; margin: 40px auto;"> | |
| <p> | |
| This tool performs face swapping with optional enhancement. | |
| </p> | |
| </div> | |
| """ | |
| with gr.Blocks(title=title, css=custom_css) as app: | |
| gr.Markdown(f"<h1 style='text-align: center;'>{title}</h1>") | |
| gr.Markdown(description) | |
| with gr.Tabs(): | |
| with gr.TabItem("FaceSwap Image"): | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| source_image = gr.Image( | |
| label="Source Image", | |
| type="numpy", | |
| sources=["upload"] | |
| ) | |
| with gr.Column(scale=1): | |
| target_image = gr.Image( | |
| label="Target Image", | |
| type="numpy", | |
| sources=["upload"] | |
| ) | |
| with gr.Column(scale=1): | |
| output_image = gr.Image( | |
| label="Output Image", | |
| type="numpy", | |
| interactive=False, | |
| elem_classes="output-image" | |
| ) | |
| with gr.Row(): | |
| enhance_checkbox = gr.Checkbox( | |
| label="Apply the algorithm?", | |
| info="Image Quality Improvement", | |
| value=False | |
| ) | |
| with gr.Row(): | |
| process_btn = gr.Button( | |
| "Process Face Swap", | |
| variant="primary", | |
| size="lg" | |
| ) | |
| process_btn.click( | |
| fn=swap_face, | |
| inputs=[source_image, target_image, enhance_checkbox], | |
| outputs=output_image, | |
| api_name="swap_face" | |
| ) | |
| with gr.TabItem("FaceSwap Video"): | |
| gr.Markdown("<h2 style='text-align:center;'>FaceSwap Video</h2>") | |
| with gr.Row(): | |
| ref_image = gr.Image( | |
| label="Ảnh mặt tham chiếu (khóa khuôn mặt)", | |
| type="numpy", | |
| sources=["upload"] | |
| ) | |
| swap_image = gr.Image( | |
| label="Ảnh mặt ghép", | |
| type="numpy", | |
| sources=["upload"] | |
| ) | |
| video_input = gr.Video( | |
| label="Video đầu vào" | |
| ) | |
| similarity_threshold = gr.Slider( | |
| minimum=0.0, | |
| maximum=1.0, | |
| step=0.01, | |
| value=0.7, | |
| label="Tỉ lệ tương đồng" | |
| ) | |
| enhance_checkbox_video = gr.Checkbox( | |
| label="Áp dụng cải thiện chất lượng ảnh", | |
| info="Tùy chọn cải thiện", | |
| value=False | |
| ) | |
| process_video_btn = gr.Button( | |
| "Xử lý FaceSwap Video", | |
| variant="primary", | |
| size="lg" | |
| ) | |
| video_output = gr.Video( | |
| label="Video kết quả" | |
| ) | |
| process_video_btn.click( | |
| fn=swap_face_video, | |
| inputs=[ref_image, swap_image, video_input, similarity_threshold, enhance_checkbox_video], | |
| outputs=video_output, | |
| api_name="swap_face_video" | |
| ) | |
| gr.Markdown(article) | |
| return app | |
| def main(): | |
| app = create_interface() | |
| app.launch(share=False) | |
| if __name__ == "__main__": | |
| main() | |