import gradio as gr import cv2 import numpy as np import torch import os import tempfile import subprocess import torch model = torch.hub.load('ultralytics/yolov5', 'custom', path='watermark-detection-73.pt', force_reload=True) model.conf = 0.25 model.iou = 0.45 model.max_det = 1 # ------------------------------ # Helper Functions # ------------------------------ def extract_first_frame(video_path): cap = cv2.VideoCapture(video_path) ret, frame = cap.read() cap.release() if ret: return frame return None def detect_watermark_coordinates(frame): results = model(frame) detections = results.xyxy[0].cpu().numpy() if len(detections) == 0: return None x1, y1, x2, y2, _, _ = detections[0] return int(x1), int(y1), int(x2 - x1), int(y2 - y1) def generate_mask_from_coords(frame_shape, x, y, w, h): mask = np.zeros(frame_shape[:2], dtype=np.uint8) mask[int(y):int(y+h), int(x):int(x+w)] = 255 return mask def apply_inpaint_to_video(video_path, x, y, w, h): temp_dir = tempfile.mkdtemp() cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) output_video_path = os.path.join(temp_dir, "output.mp4") fourcc = cv2.VideoWriter_fourcc(*'mp4v') writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height)) for _ in range(frame_count): ret, frame = cap.read() if not ret: break mask = generate_mask_from_coords(frame.shape, x, y, w, h) inpainted = cv2.inpaint(frame, mask, 3, cv2.INPAINT_TELEA) writer.write(inpainted) cap.release() writer.release() # Combine processed video with original audio temp_with_audio = os.path.splitext(video_path)[0] + "_no_watermark.mp4" cmd_audio = f'ffmpeg -y -i "{output_video_path}" -i "{video_path}" -c:v copy -c:a aac -map 0:v:0 -map 1:a:0 -shortest "{temp_with_audio}"' subprocess.call(cmd_audio, shell=True) # Re-encode to ensure browser compatibility final_output_path = os.path.splitext(video_path)[0] + "_no_watermark_fixed.mp4" cmd_fix = f'ffmpeg -y -i "{temp_with_audio}" -vf "scale=trunc(iw/2)*2:trunc(ih/2)*2" -c:v libx264 -preset fast -crf 23 -c:a aac -b:a 128k -movflags +faststart "{final_output_path}"' subprocess.call(cmd_fix, shell=True) return final_output_path def apply_inpaint_to_image(image, x, y, w, h): mask = generate_mask_from_coords(image.shape, x, y, w, h) inpainted = cv2.inpaint(image, mask, 3, cv2.INPAINT_TELEA) return inpainted def overlay_box_on_image(image, x, y, w, h): image_with_box = image.copy() cv2.rectangle(image_with_box, (x, y), (x + w, y + h), (0, 255, 0), 2) return image_with_box def get_coords_for_image(image): coords = detect_watermark_coordinates(image) h, w, _ = image.shape if coords: x, y, w_box, h_box = coords return overlay_box_on_image(image, x, y, w_box, h_box), x, y, w_box, h_box, "✅ Auto watermark detected." else: w_box, h_box = int(w * 0.25), int(h * 0.1) x, y = (w - w_box) // 2, (h - h_box) // 2 return overlay_box_on_image(image, x, y, w_box, h_box), x, y, w_box, h_box, "⚠️ No watermark detected. Default box placed." def update_image_live(image, x, y, w, h): return overlay_box_on_image(image, int(x), int(y), int(w), int(h)) def process_uploaded_video(video, x, y, w, h): try: output_path = apply_inpaint_to_video(video, int(x), int(y), int(w), int(h)) return output_path, "✅ Watermark removed from video." except Exception as e: return None, f"❌ Error: {str(e)}" def process_uploaded_image(image, x, y, w, h): try: result = apply_inpaint_to_image(image, int(x), int(y), int(w), int(h)) return result, "✅ Watermark removed from image." except Exception as e: return None, f"❌ Error: {str(e)}" # ------------------------------ # Gradio UI (Merged with Theme) # ------------------------------ with gr.Blocks(theme=gr.themes.Soft(), title="Watermark Remover") as demo: gr.Markdown("

Remove watermarks from both videos and images using AI detection or manual box selection.

") with gr.Tab("📹 Video Watermark Remover"): with gr.Row(): with gr.Column(scale=1): video_input = gr.Video(label="🎞️ Upload Video") auto_btn_v = gr.Button("🔍 Auto Detect Watermark", variant="primary") run_btn_v = gr.Button("🧹 Remove Watermark", variant="secondary") status_v = gr.Textbox(label="Status", interactive=False) output_file_v = gr.File(label="⬇️ Download Cleaned Video") with gr.Column(scale=1): video_frame = gr.Image(label="📍 Watermark Preview", interactive=False) frame_original = gr.State() with gr.Accordion("🔧 Manual Box Adjustment", open=False): x_v = gr.Slider(minimum=0, maximum=2000, label="X Coordinate", step=1) y_v = gr.Slider(minimum=0, maximum=2000, label="Y Coordinate", step=1) w_v = gr.Slider(minimum=10, maximum=2000, label="Width", step=1) h_v = gr.Slider(minimum=10, maximum=2000, label="Height", step=1) auto_btn_v.click( fn=lambda video: get_coords_for_image(extract_first_frame(video)), inputs=video_input, outputs=[video_frame, x_v, y_v, w_v, h_v, status_v], ).then( fn=lambda video: extract_first_frame(video), inputs=video_input, outputs=frame_original, ) for slider in [x_v, y_v, w_v, h_v]: slider.change( fn=update_image_live, inputs=[frame_original, x_v, y_v, w_v, h_v], outputs=video_frame ) run_btn_v.click( fn=process_uploaded_video, inputs=[video_input, x_v, y_v, w_v, h_v], outputs=[output_file_v, status_v] ) with gr.Tab("🖼️ Image Watermark Remover"): with gr.Row(): with gr.Column(scale=1): image_input = gr.Image(label="🖼️ Upload Image") auto_btn_i = gr.Button("🔍 Auto Detect Watermark", variant="primary") run_btn_i = gr.Button("🧹 Remove Watermark", variant="secondary") status_i = gr.Textbox(label="Status", interactive=False) output_image = gr.Image(label="🧼 Cleaned Image") with gr.Column(scale=1): image_display = gr.Image(label="📍 Watermark Preview", interactive=False) image_original = gr.State() with gr.Accordion("🔧 Manual Box Adjustment", open=False): x_i = gr.Slider(minimum=0, maximum=2000, label="X Coordinate", step=1) y_i = gr.Slider(minimum=0, maximum=2000, label="Y Coordinate", step=1) w_i = gr.Slider(minimum=10, maximum=2000, label="Width", step=1) h_i = gr.Slider(minimum=10, maximum=2000, label="Height", step=1) auto_btn_i.click( fn=get_coords_for_image, inputs=image_input, outputs=[image_display, x_i, y_i, w_i, h_i, status_i], ).then( fn=lambda img: img, inputs=image_input, outputs=image_original, ) for slider in [x_i, y_i, w_i, h_i]: slider.change( fn=update_image_live, inputs=[image_original, x_i, y_i, w_i, h_i], outputs=image_display ) run_btn_i.click( fn=process_uploaded_image, inputs=[image_input, x_i, y_i, w_i, h_i], outputs=[output_image, status_i] ) if __name__ == '__main__': demo.launch()