#!/usr/bin/env python3 """ Callbacks for BackgroundFX Pro UI --------------------------------- All functions here are *thin* wrappers wired to the Gradio interface. • key_color_mode parameter already added (matches ui_components.py) • PREVIEW FUNCTIONS NOW IMPLEMENTED: - cb_video_changed → returns first frame • SIMPLIFIED: Background handled by gr.Image component directly """ from __future__ import annotations import os, cv2 from typing import Any, Dict, Tuple import numpy as np from PIL import Image # ---- Core pipeline wrappers (from core/app.py) ---- from core.app import ( load_models_with_validation, process_video_fixed, get_model_status, get_cache_status, PROCESS_CANCELLED, ) # ---- Optional utilities (background generator & previews) ---- _try_bg_gen = None try: from utils.bg_generator import generate_ai_background as _try_bg_gen # type: ignore except Exception: pass # ------------------------------------------------------------------ # LIGHTWEIGHT BG GENERATOR (inline fallback) # ------------------------------------------------------------------ def _generate_ai_background( prompt_text: str, width: int, height: int, bokeh: float, vignette: float, contrast: float, ): """ If utils.bg_generator.generate_ai_background exists, use it. Otherwise fall back to a tiny procedural background made with PIL & NumPy. 
""" if _try_bg_gen is not None: return _try_bg_gen( prompt_text, width=width, height=height, bokeh=bokeh, vignette=vignette, contrast=contrast, ) # -------- Tiny fallback (PIL only) -------- from pathlib import Path import time, random, numpy as np from PIL import Image, ImageFilter, ImageOps TMP_DIR = Path("/tmp/bgfx") TMP_DIR.mkdir(parents=True, exist_ok=True) palettes = { "office": [(240, 245, 250), (210, 220, 230), (180, 190, 200)], "studio": [(18, 18, 20), (32, 32, 36), (58, 60, 64)], "sunset": [(255,183,77), (255,138,101), (244,143,177)], "forest": [(46,125,50), (102,187,106), (165,214,167)], "ocean": [(33,150,243), (3,169,244), (0,188,212)], "minimal": [(245,246,248), (230,232,236), (214,218,224)], "warm": [(255,224,178), (255,204,128), (255,171,145)], "cool": [(197,202,233), (179,229,252), (178,235,242)], "royal": [(63,81,181), (121,134,203), (159,168,218)], } p = (prompt_text or "").lower() palette = next((pal for k, pal in palettes.items() if k in p), None) if palette is None: random.seed(hash(p) & 0xFFFFFFFF) palette = [tuple(random.randint(90, 200) for _ in range(3)) for _ in range(3)] def _noise(h, w, octaves=4): acc = np.zeros((h, w), np.float32) for o in range(octaves): s = 2**o small = np.random.rand(h // s + 1, w // s + 1).astype(np.float32) acc += cv2.resize(small, (w, h), interpolation=cv2.INTER_LINEAR) / (o + 1) acc /= max(1e-6, acc.max()) return acc def _blend(n, pal): h, w = n.shape thr = [0.33, 0.66] img = np.zeros((h, w, 3), np.float32) c0, c1, c2 = [np.array(c, np.float32) for c in pal] img[n < thr[0]] = c0 mid = (n >= thr[0]) & (n < thr[1]) img[mid] = c1 img[n >= thr[1]] = c2 return Image.fromarray(np.clip(img, 0, 255).astype(np.uint8)) n = _noise(height, width, 4) out = _blend(n, palette) if bokeh > 0: out = out.filter(ImageFilter.GaussianBlur(radius=min(50, max(0, bokeh)))) if vignette > 0: y, x = np.ogrid[:height, :width] cx, cy = width / 2, height / 2 r = np.sqrt((x - cx) ** 2 + (y - cy) ** 2) mask = 1 - np.clip(r / (max(width, height) 
/ 1.2), 0, 1) mask = (mask**2).astype(np.float32) base = np.array(out).astype(np.float32) / 255.0 out = Image.fromarray(np.clip(base * (mask[..., None] * (1 - vignette) + vignette) * 255, 0, 255).astype(np.uint8)) if contrast != 1.0: out = ImageOps.autocontrast(out, cutoff=1) arr = np.array(out).astype(np.float32) mean = arr.mean(axis=(0, 1), keepdims=True) arr = (arr - mean) * float(contrast) + mean out = Image.fromarray(np.clip(arr, 0, 255).astype(np.uint8)) ts = int(time.time() * 1000) path = str((TMP_DIR / f"ai_bg_{ts}.png").resolve()) out.save(path) return out, path # ------------------------------------------------------------------ # MODEL MANAGEMENT # ------------------------------------------------------------------ def cb_load_models() -> str: """Load SAM2 + MatAnyOne and return human-readable status.""" return load_models_with_validation() # ------------------------------------------------------------------ # MAIN video-processing callback # ------------------------------------------------------------------ def cb_process_video( vid: str, style: str, custom_bg_path: str | None, # Now directly a filepath from gr.Image use_two: bool, chroma: str, key_color_mode: str, prev_mask: bool, prev_green: bool, ): """ Runs the two-stage (or single-stage) pipeline and returns: (processed_video_path | None, status_message:str) """ # Reset any prior cancel flag when user clicks Run if PROCESS_CANCELLED.is_set(): PROCESS_CANCELLED.clear() # custom_bg_path is now directly a filepath string from gr.Image # No need to extract from dict # Fire the core function return process_video_fixed( video_path=vid, background_choice=style, custom_background_path=custom_bg_path, # Direct path progress_callback=None, use_two_stage=use_two, chroma_preset=chroma, key_color_mode=key_color_mode, preview_mask=prev_mask, preview_greenscreen=prev_green, ) # ------------------------------------------------------------------ # CANCEL / STATUS / CLEAR # 
# ------------------------------------------------------------------
def cb_cancel() -> str:
    """Set the shared cancel flag and return a status message for the UI."""
    try:
        PROCESS_CANCELLED.set()
    except Exception as e:
        return f"Cancel failed: {e}"
    return "Cancellation requested."


def cb_status() -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """
    Return (model_status, cache_status).

    If either lookup raises, both slots carry the same ``{"error": ...}``
    dict instead of propagating the exception.
    """
    try:
        return get_model_status(), get_cache_status()
    except Exception as e:
        message = str(e)
        return {"error": message}, {"error": message}


def cb_clear():
    """Clear all outputs"""
    # Return blanks for (out_video, status, gen_preview, gen_path, custom_bg)
    return None, "", None, "", None


# ------------------------------------------------------------------
# AI BACKGROUND
# ------------------------------------------------------------------
def cb_generate_bg(prompt_text: str, w: int, h: int, b: float, v: float, c: float):
    """
    Generate AI background.

    ``w`` and ``h`` are coerced to int before being forwarded; ``b``, ``v``
    and ``c`` are passed through as the bokeh, vignette and contrast
    arguments. Returns the (image, path) pair from the generator.
    """
    img, path = _generate_ai_background(prompt_text, int(w), int(h), b, v, c)
    return img, path


def cb_use_gen_bg(gen_path: str):
    """
    Use generated background as custom.

    Returns the path for gr.Image to display, or None when the path is
    empty or does not point at an existing regular file.
    """
    # isfile (not exists): a directory is not something an image component
    # can load, so it is rejected here as well.
    if gen_path and os.path.isfile(gen_path):
        return gen_path  # gr.Image can display from path
    return None


# ------------------------------------------------------------------
# PREVIEWS
# ------------------------------------------------------------------
def cb_video_changed(vid_path: str):
    """
    Extract first frame of the uploaded video for a quick preview.

    Returns a numpy RGB array (Gradio will display it), or None when no
    path is given, the video cannot be opened or read, or any error occurs.
    """
    if not vid_path:
        return None
    try:
        cap = cv2.VideoCapture(vid_path)
        try:
            if not cap.isOpened():
                return None
            ok, frame = cap.read()
        finally:
            # Always release the capture handle, even if read() raises.
            cap.release()
        if not ok or frame is None:
            return None
        # Convert BGR→RGB for correct colours in the browser
        return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    except Exception:
        return None


def cb_preset_bg_preview(style: str):
    """
    Generate and display preview for preset backgrounds.

    Returns image for gr.Image component to display, or None if the helper
    cannot be imported or raises.
    """
    try:
        from utils.cv_processing import create_professional_background

        # Create a preview-sized version
        return create_professional_background(style, 640, 360)
    except Exception:
        return None