# Hugging Face Space (status: Running)
import os
import subprocess
import tempfile

import cv2
import gradio as gr
import numpy as np
import torch
# Load the custom watermark detector through torch.hub.  ``force_reload=True``
# asks torch.hub to bypass its cached copy of the repository on every start.
model = torch.hub.load(
    'ultralytics/yolov5',
    'custom',
    path='watermark-detection-73.pt',
    force_reload=True,
)
# Inference settings on the loaded model.
# NOTE(review): presumably confidence threshold, NMS IoU threshold and the
# maximum number of detections per image -- confirm against the YOLOv5 docs.
model.conf = 0.25
model.iou = 0.45
model.max_det = 1
# ------------------------------
# Helper Functions
# ------------------------------
def extract_first_frame(video_path):
    """Return the first frame of a video, or ``None`` if it is unavailable.

    Args:
        video_path: Path of the video file to open.  A falsy value (``None``
            or an empty string, e.g. when nothing has been uploaded yet) is
            treated as "no video".

    Returns:
        The frame produced by ``cv2.VideoCapture.read``, or ``None`` when no
        path was given, the file could not be opened, or no frame could be
        read.
    """
    # Guard first so a missing upload never reaches OpenCV.
    if not video_path:
        return None

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            return None
        ret, frame = cap.read()
    finally:
        # Always free the capture handle, even if reading raised.
        cap.release()
    return frame if ret else None
def detect_watermark_coordinates(frame):
    """Run the module-level ``model`` on ``frame`` and return its first box.

    Returns:
        ``(x, y, width, height)`` as integers, derived from the first row of
        ``results.xyxy[0]``, or ``None`` when the model reports no detection.
    """
    boxes = model(frame).xyxy[0].cpu().numpy()
    if len(boxes):
        # Each row carries the two box corners followed by two extra values
        # that are not needed here.
        left, top, right, bottom, _, _ = boxes[0]
        return int(left), int(top), int(right - left), int(bottom - top)
    return None
def generate_mask_from_coords(frame_shape, x, y, w, h):
    """Build a binary mask marking the rectangle ``(x, y, w, h)``.

    Args:
        frame_shape: Shape of the target frame; only the first two entries
            (rows, columns) are used.
        x: Left edge of the rectangle, in pixels.
        y: Top edge of the rectangle, in pixels.
        w: Rectangle width, in pixels.
        h: Rectangle height, in pixels.

    Returns:
        A ``uint8`` array of shape ``frame_shape[:2]`` that is 255 inside the
        rectangle and 0 elsewhere.  The rectangle is clipped to the frame, so
        a box that lies partly (or fully) outside yields a partly filled (or
        empty) mask.
    """
    rows, cols = frame_shape[:2]

    def _clamp(value, upper):
        # Negative slice bounds would otherwise wrap around to the far edge
        # of the array instead of being clipped at 0.
        return min(max(int(value), 0), upper)

    top, bottom = _clamp(y, rows), _clamp(y + h, rows)
    left, right = _clamp(x, cols), _clamp(x + w, cols)

    mask = np.zeros((rows, cols), dtype=np.uint8)
    mask[top:bottom, left:right] = 255
    return mask
def apply_inpaint_to_video(video_path, x, y, w, h):
    """Inpaint a fixed rectangle on every frame of a video and re-encode it.

    The frames are inpainted with OpenCV into a silent intermediate file,
    the source's audio (if any) is muxed back in with ffmpeg, and the result
    is re-encoded to H.264/AAC for browser playback.

    Args:
        video_path: Path of the source video.
        x: Left edge of the rectangle to inpaint, in pixels.
        y: Top edge of the rectangle to inpaint, in pixels.
        w: Rectangle width, in pixels.
        h: Rectangle height, in pixels.

    Returns:
        Path of the final video: the source path without its extension plus
        ``"_no_watermark_fixed.mp4"``.

    Raises:
        RuntimeError: If the source cannot be opened or the intermediate
            video cannot be created.
        subprocess.CalledProcessError: If an ffmpeg step exits with an error.
    """
    base_path = os.path.splitext(video_path)[0]
    temp_with_audio = base_path + "_no_watermark.mp4"
    final_output_path = base_path + "_no_watermark_fixed.mp4"

    # The silent intermediate lives in a directory that is removed on exit.
    with tempfile.TemporaryDirectory() as temp_dir:
        output_video_path = os.path.join(temp_dir, "output.mp4")

        cap = cv2.VideoCapture(video_path)
        writer = None
        try:
            if not cap.isOpened():
                raise RuntimeError(f"Could not open video: {video_path}")

            fps = cap.get(cv2.CAP_PROP_FPS)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
            if not writer.isOpened():
                raise RuntimeError("Could not create the intermediate video file")

            mask = None
            # Read until the decoder runs dry instead of trusting the
            # container's reported frame count, which can be wrong or zero.
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                # The mask only depends on the frame size, so build it once
                # and rebuild only if the frame size ever changes.
                if mask is None or mask.shape != frame.shape[:2]:
                    mask = generate_mask_from_coords(frame.shape, x, y, w, h)
                writer.write(cv2.inpaint(frame, mask, 3, cv2.INPAINT_TELEA))
        finally:
            cap.release()
            if writer is not None:
                writer.release()

        # Combine processed video with original audio.  Arguments are passed
        # as a list (no shell) so unusual characters in the path cannot be
        # interpreted as shell syntax.  The trailing "?" makes the audio
        # mapping optional, so sources without an audio track still succeed.
        subprocess.run(
            [
                "ffmpeg", "-y",
                "-i", output_video_path,
                "-i", video_path,
                "-c:v", "copy",
                "-c:a", "aac",
                "-map", "0:v:0",
                "-map", "1:a:0?",
                "-shortest",
                temp_with_audio,
            ],
            check=True,
        )

    # Re-encode to ensure browser compatibility (even dimensions, H.264,
    # moov atom moved to the front of the file).
    subprocess.run(
        [
            "ffmpeg", "-y",
            "-i", temp_with_audio,
            "-vf", "scale=trunc(iw/2)*2:trunc(ih/2)*2",
            "-c:v", "libx264",
            "-preset", "fast",
            "-crf", "23",
            "-c:a", "aac",
            "-b:a", "128k",
            "-movflags", "+faststart",
            final_output_path,
        ],
        check=True,
    )
    return final_output_path
def apply_inpaint_to_image(image, x, y, w, h):
    """Inpaint the rectangle ``(x, y, w, h)`` of ``image`` and return the result.

    The rectangle is turned into a mask with ``generate_mask_from_coords``
    and filled in by ``cv2.inpaint`` using the ``cv2.INPAINT_TELEA`` flag.
    """
    region_mask = generate_mask_from_coords(image.shape, x, y, w, h)
    return cv2.inpaint(image, region_mask, 3, cv2.INPAINT_TELEA)
def overlay_box_on_image(image, x, y, w, h):
    """Return a copy of ``image`` with the box ``(x, y, w, h)`` outlined.

    The input array is left untouched; the rectangle is drawn on a copy with
    colour ``(0, 255, 0)`` and a line thickness of 2.
    """
    top_left = (x, y)
    bottom_right = (x + w, y + h)
    annotated = image.copy()
    cv2.rectangle(annotated, top_left, bottom_right, (0, 255, 0), 2)
    return annotated
def get_coords_for_image(image):
    """Detect the watermark box for ``image`` and prepare the UI values.

    Args:
        image: Image array to analyse, or ``None`` when no image is
            available (nothing uploaded, or the video frame could not be
            read).

    Returns:
        A 6-tuple ``(preview, x, y, w, h, status)``: the image with the box
        drawn on it, the box position and size, and a status message.  When
        the detector finds nothing, a default box (25% of the width, 10% of
        the height, centred) is used.  When ``image`` is ``None`` the preview
        is ``None`` and a minimal placeholder box ``(0, 0, 10, 10)`` is
        returned.
    """
    # Without this guard a missing upload would crash inside the detector.
    if image is None:
        return None, 0, 0, 10, 10, "No image provided. Please upload a file first."

    coords = detect_watermark_coordinates(image)
    if coords is not None:
        x, y, w_box, h_box = coords
        status = "β Auto watermark detected."
    else:
        # Only the first two dimensions are needed, which also keeps 2-D
        # (single-channel) arrays working.
        h, w = image.shape[:2]
        w_box, h_box = int(w * 0.25), int(h * 0.1)
        x, y = (w - w_box) // 2, (h - h_box) // 2
        status = "β οΈ No watermark detected. Default box placed."

    return overlay_box_on_image(image, x, y, w_box, h_box), x, y, w_box, h_box, status
def update_image_live(image, x, y, w, h):
    """Redraw the preview box on ``image`` after a slider change.

    Args:
        image: The unmodified image the box is drawn on, or ``None`` when no
            image has been stored yet.
        x, y, w, h: Current slider values for the box position and size.

    Returns:
        A copy of ``image`` with the box drawn on it, or ``None`` when there
        is no image to draw on.
    """
    # Sliders can fire before any image has been stored (e.g. the user moves
    # one before running auto-detect); leave the preview empty in that case.
    if image is None:
        return None
    return overlay_box_on_image(image, int(x), int(y), int(w), int(h))
def process_uploaded_video(video, x, y, w, h):
    """Remove the boxed watermark from ``video`` for the UI.

    Returns:
        ``(output_path, status)`` on success, or ``(None, error_message)``
        if anything raised while processing.
    """
    try:
        box = (int(x), int(y), int(w), int(h))
        cleaned_path = apply_inpaint_to_video(video, *box)
    except Exception as err:
        # UI boundary: report the failure in the status box instead of raising.
        return None, f"β Error: {err}"
    return cleaned_path, "β Watermark removed from video."
def process_uploaded_image(image, x, y, w, h):
    """Remove the boxed watermark from ``image`` for the UI.

    Returns:
        ``(inpainted_image, status)`` on success, or ``(None, error_message)``
        if anything raised while processing.
    """
    try:
        box = (int(x), int(y), int(w), int(h))
        cleaned = apply_inpaint_to_image(image, *box)
    except Exception as err:
        # UI boundary: report the failure in the status box instead of raising.
        return None, f"β Error: {err}"
    return cleaned, "β Watermark removed from image."
# ------------------------------
# Gradio UI (Merged with Theme)
# ------------------------------
def _auto_detect_on_video(video):
    """Detect the watermark on a video's first frame.

    Returns the six values of ``get_coords_for_image`` followed by the frame
    itself, so the preview, the sliders, the status and the stored original
    frame are all updated by one event.
    """
    frame = extract_first_frame(video)
    if frame is not None:
        # OpenCV decodes frames in BGR channel order, while the image tab
        # (and the gr.Image preview) work with RGB arrays; convert so both
        # tabs feed the detector and the preview the same way.
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    return (*get_coords_for_image(frame), frame)


def _auto_detect_on_image(image):
    """Detect the watermark on an uploaded image.

    Returns the six values of ``get_coords_for_image`` followed by the
    untouched image, which is stored as the base for live box redraws.
    """
    return (*get_coords_for_image(image), image)


# NOTE(review): the nesting below was reconstructed from a source whose
# indentation was lost; the accordions are assumed to sit inside the
# right-hand column of each tab -- confirm against the deployed layout.
with gr.Blocks(theme=gr.themes.Soft(), title="Watermark Remover") as demo:
    gr.Markdown("<p style='text-align: center;'>Remove watermarks from both videos and images using AI detection or manual box selection.</p>")

    with gr.Tab("πΉ Video Watermark Remover"):
        with gr.Row():
            with gr.Column(scale=1):
                video_input = gr.Video(label="ποΈ Upload Video")
                auto_btn_v = gr.Button("π Auto Detect Watermark", variant="primary")
                run_btn_v = gr.Button("π§Ή Remove Watermark", variant="secondary")
                status_v = gr.Textbox(label="Status", interactive=False)
                output_file_v = gr.File(label="β¬οΈ Download Cleaned Video")
            with gr.Column(scale=1):
                video_frame = gr.Image(label="π Watermark Preview", interactive=False)
                # Unmodified first frame; the live preview is redrawn from it.
                frame_original = gr.State()
                with gr.Accordion("π§ Manual Box Adjustment", open=False):
                    x_v = gr.Slider(minimum=0, maximum=2000, label="X Coordinate", step=1)
                    y_v = gr.Slider(minimum=0, maximum=2000, label="Y Coordinate", step=1)
                    w_v = gr.Slider(minimum=10, maximum=2000, label="Width", step=1)
                    h_v = gr.Slider(minimum=10, maximum=2000, label="Height", step=1)

        # The original frame is stored by the same event that moves the
        # sliders, so the slider change handlers never run against a stale
        # or missing frame, and the video is only decoded once.
        auto_btn_v.click(
            fn=_auto_detect_on_video,
            inputs=video_input,
            outputs=[video_frame, x_v, y_v, w_v, h_v, status_v, frame_original],
        )

        for slider in [x_v, y_v, w_v, h_v]:
            slider.change(
                fn=update_image_live,
                inputs=[frame_original, x_v, y_v, w_v, h_v],
                outputs=video_frame,
            )

        run_btn_v.click(
            fn=process_uploaded_video,
            inputs=[video_input, x_v, y_v, w_v, h_v],
            outputs=[output_file_v, status_v],
        )

    with gr.Tab("πΌοΈ Image Watermark Remover"):
        with gr.Row():
            with gr.Column(scale=1):
                image_input = gr.Image(label="πΌοΈ Upload Image")
                auto_btn_i = gr.Button("π Auto Detect Watermark", variant="primary")
                run_btn_i = gr.Button("π§Ή Remove Watermark", variant="secondary")
                status_i = gr.Textbox(label="Status", interactive=False)
                output_image = gr.Image(label="π§Ό Cleaned Image")
            with gr.Column(scale=1):
                image_display = gr.Image(label="π Watermark Preview", interactive=False)
                # Unmodified upload; the live preview is redrawn from it.
                image_original = gr.State()
                with gr.Accordion("π§ Manual Box Adjustment", open=False):
                    x_i = gr.Slider(minimum=0, maximum=2000, label="X Coordinate", step=1)
                    y_i = gr.Slider(minimum=0, maximum=2000, label="Y Coordinate", step=1)
                    w_i = gr.Slider(minimum=10, maximum=2000, label="Width", step=1)
                    h_i = gr.Slider(minimum=10, maximum=2000, label="Height", step=1)

        # Same single-event pattern as the video tab.
        auto_btn_i.click(
            fn=_auto_detect_on_image,
            inputs=image_input,
            outputs=[image_display, x_i, y_i, w_i, h_i, status_i, image_original],
        )

        for slider in [x_i, y_i, w_i, h_i]:
            slider.change(
                fn=update_image_live,
                inputs=[image_original, x_i, y_i, w_i, h_i],
                outputs=image_display,
            )

        run_btn_i.click(
            fn=process_uploaded_image,
            inputs=[image_input, x_i, y_i, w_i, h_i],
            outputs=[output_image, status_i],
        )

if __name__ == '__main__':
    demo.launch()