|
|
|
|
|
""" |
|
|
Callbacks for BackgroundFX Pro UI |
|
|
--------------------------------- |
|
|
All functions here are *thin* wrappers wired to the Gradio interface. |
|
|
NO IMPORTS FROM core.app AT MODULE LEVEL to avoid circular imports |
|
|
NO HEAVY IMPORTS (cv2, numpy) AT MODULE LEVEL to avoid CSP issues |
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
import os |
|
|
import time |
|
|
from typing import Any, Dict, Tuple |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Optional AI background generator, resolved once at import time.
# When utils.bg_generator is unavailable this stays None and callers
# fall back to the built-in procedural generator.
_try_bg_gen = None
try:
    from utils.bg_generator import generate_ai_background as _try_bg_gen
except Exception:
    # Broad catch is deliberate: a missing or broken optional dependency
    # must never prevent this module from importing.
    pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _generate_ai_background(
    prompt_text: str,
    width: int,
    height: int,
    bokeh: float,
    vignette: float,
    contrast: float,
):
    """
    Generate an AI background image.

    Delegates to ``utils.bg_generator.generate_ai_background`` when that
    module is installed; otherwise falls back to a small procedural
    generator (multi-octave value noise posterized into a palette) built
    with NumPy, OpenCV and PIL.

    Args:
        prompt_text: Free-text prompt. A known keyword ("office", "studio",
            "sunset", ...) selects a fixed palette; any other prompt derives
            a deterministic pseudo-random palette from the prompt text.
        width: Output width in pixels.
        height: Output height in pixels.
        bokeh: Gaussian-blur radius; 0 disables, clamped to [0, 50].
        vignette: Edge-darkening strength in [0, 1]; 0 disables.
        contrast: Contrast multiplier; 1.0 disables the adjustment.

    Returns:
        ``(PIL.Image, saved_png_path)`` from the fallback path; whatever
        the real generator returns when it is available.
    """
    if _try_bg_gen is not None:
        return _try_bg_gen(
            prompt_text,
            width=width,
            height=height,
            bokeh=bokeh,
            vignette=vignette,
            contrast=contrast,
        )

    # ---- Procedural fallback (heavy imports deferred for CSP/startup) ----
    from pathlib import Path
    import random
    import zlib

    import cv2
    import numpy as np
    from PIL import Image, ImageFilter, ImageOps

    tmp_dir = Path("/tmp/bgfx")
    tmp_dir.mkdir(parents=True, exist_ok=True)

    palettes = {
        "office": [(240, 245, 250), (210, 220, 230), (180, 190, 200)],
        "studio": [(18, 18, 20), (32, 32, 36), (58, 60, 64)],
        "sunset": [(255, 183, 77), (255, 138, 101), (244, 143, 177)],
        "forest": [(46, 125, 50), (102, 187, 106), (165, 214, 167)],
        "ocean": [(33, 150, 243), (3, 169, 244), (0, 188, 212)],
        "minimal": [(245, 246, 248), (230, 232, 236), (214, 218, 224)],
        "warm": [(255, 224, 178), (255, 204, 128), (255, 171, 145)],
        "cool": [(197, 202, 233), (179, 229, 252), (178, 235, 242)],
        "royal": [(63, 81, 181), (121, 134, 203), (159, 168, 218)],
    }

    p = (prompt_text or "").lower()
    palette = next((pal for k, pal in palettes.items() if k in p), None)
    if palette is None:
        # BUGFIX: the previous seed used hash(p), which is salted per
        # process (PYTHONHASHSEED), so the "deterministic" palette changed
        # on every run. crc32 is stable across runs, and a local Random
        # instance avoids reseeding the global random module.
        rng = random.Random(zlib.crc32(p.encode("utf-8")))
        palette = [tuple(rng.randint(90, 200) for _ in range(3)) for _ in range(3)]

    def _noise(h, w, octaves=4):
        # Cheap multi-octave value noise: sum of upsampled random grids,
        # higher octaves weighted down, normalized to [0, 1].
        acc = np.zeros((h, w), np.float32)
        for o in range(octaves):
            s = 2 ** o
            small = np.random.rand(h // s + 1, w // s + 1).astype(np.float32)
            acc += cv2.resize(small, (w, h), interpolation=cv2.INTER_LINEAR) / (o + 1)
        acc /= max(1e-6, acc.max())
        return acc

    def _blend(n, pal):
        # Posterize the noise field into the three palette colours.
        h, w = n.shape
        thr = [0.33, 0.66]
        img = np.zeros((h, w, 3), np.float32)
        c0, c1, c2 = [np.array(c, np.float32) for c in pal]
        img[n < thr[0]] = c0
        mid = (n >= thr[0]) & (n < thr[1])
        img[mid] = c1
        img[n >= thr[1]] = c2
        return Image.fromarray(np.clip(img, 0, 255).astype(np.uint8))

    n = _noise(height, width, 4)
    out = _blend(n, palette)

    if bokeh > 0:
        out = out.filter(ImageFilter.GaussianBlur(radius=min(50, max(0, bokeh))))
    if vignette > 0:
        y, x = np.ogrid[:height, :width]
        cx, cy = width / 2, height / 2
        r = np.sqrt((x - cx) ** 2 + (y - cy) ** 2)
        mask = 1 - np.clip(r / (max(width, height) / 1.2), 0, 1)
        mask = (mask ** 2).astype(np.float32)  # 1.0 at centre, falls to 0.0 at edges
        base = np.array(out).astype(np.float32) / 255.0
        # BUGFIX: the old blend `mask*(1-v)+v` *weakened* the vignette as v
        # grew (v=1 meant no darkening at all). Interpolate the other way:
        # centre stays at gain 1.0, edges darken towards (1 - vignette).
        gain = mask[..., None] * vignette + (1.0 - vignette)
        out = Image.fromarray(np.clip(base * gain * 255, 0, 255).astype(np.uint8))
    if contrast != 1.0:
        out = ImageOps.autocontrast(out, cutoff=1)
        arr = np.array(out).astype(np.float32)
        mean = arr.mean(axis=(0, 1), keepdims=True)
        arr = (arr - mean) * float(contrast) + mean  # scale around per-channel mean
        out = Image.fromarray(np.clip(arr, 0, 255).astype(np.uint8))

    # Millisecond timestamp keeps successive generations from overwriting
    # each other (uses the module-level `time` import).
    ts = int(time.time() * 1000)
    path = str((tmp_dir / f"ai_bg_{ts}.png").resolve())
    out.save(path)
    return out, path
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def cb_load_models() -> str:
    """Load SAM2 + MatAnyOne and report a human-readable status string.

    core.app is imported lazily to avoid circular imports at module load
    time. Any failure (import or load) is rendered as an error string
    rather than raised, so the UI always receives something to display.
    """
    try:
        from core.app import load_models_with_validation

        return load_models_with_validation()
    except Exception as exc:
        return f"❌ Error loading models: {str(exc)}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def cb_process_video(
    vid: str,
    style: str,
    custom_bg_path: str | None,
    use_two: bool,
    chroma: str,
    key_color_mode: str,
    prev_mask: bool,
    prev_green: bool,
):
    """
    Run the two-stage (or single-stage) background-replacement pipeline.

    Returns:
        (processed_video_path | None, status_message: str)
    """
    # Deferred import: core.app must not be imported at module level.
    from core.app import process_video_fixed, PROCESS_CANCELLED

    # A stale cancel request from a previous run must not abort this one.
    if PROCESS_CANCELLED.is_set():
        PROCESS_CANCELLED.clear()

    pipeline_kwargs = dict(
        video_path=vid,
        background_choice=style,
        custom_background_path=custom_bg_path,
        progress_callback=None,
        use_two_stage=use_two,
        chroma_preset=chroma,
        key_color_mode=key_color_mode,
        preview_mask=prev_mask,
        preview_greenscreen=prev_green,
    )
    return process_video_fixed(**pipeline_kwargs)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def cb_cancel() -> str:
    """Ask the running pipeline to cancel; report the outcome as a string."""
    try:
        from core.app import PROCESS_CANCELLED

        PROCESS_CANCELLED.set()
        return "Cancellation requested."
    except Exception as exc:
        return f"Cancel failed: {exc}"
|
|
|
|
|
def cb_status() -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Return fresh (model_status, cache_status) dicts — never cached.

    Starts from safe defaults, then overlays live data from core.app when
    it is importable. A live model-status dict is only trusted if its
    timestamp is less than 5 seconds old; any core.app failure silently
    keeps the defaults.
    """
    try:
        model_status: Dict[str, Any] = {
            "models_loaded": False,
            "sam2_loaded": False,
            "matanyone_loaded": False,
            "timestamp": time.time(),
        }
        cache_status: Dict[str, Any] = {
            "cache_disabled": True,
            "timestamp": time.time(),
        }

        try:
            from core.app import get_model_status, get_cache_status

            live_models = get_model_status()
            live_cache = get_cache_status()

            # Only accept a model-status report produced in the last 5 s.
            if (
                isinstance(live_models, dict)
                and live_models.get("timestamp", 0) > time.time() - 5
            ):
                model_status = live_models

            if isinstance(live_cache, dict):
                cache_status = live_cache
        except Exception:
            pass  # core.app unavailable: keep the defaults above

        return model_status, cache_status

    except Exception as e:
        return (
            {"error": str(e), "timestamp": time.time()},
            {"error": str(e), "timestamp": time.time()},
        )
|
|
|
|
|
def cb_clear():
    """Reset every output component to its empty value."""
    cleared = (None, "", None, "", None)
    return cleared
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def cb_generate_bg(prompt_text: str, w: int, h: int, b: float, v: float, c: float):
    """Generate an AI background; returns (image, saved_path).

    Width/height arrive from Gradio sliders and may be floats, so they are
    coerced to int before delegating.
    """
    return _generate_ai_background(prompt_text, int(w), int(h), b, v, c)
|
|
|
|
|
def cb_use_gen_bg(gen_path: str):
    """
    Promote a previously generated background to the custom-background slot.

    Returns the path for gr.Image to display, or None when the path is
    empty or no longer exists on disk.
    """
    if not gen_path:
        return None
    return gen_path if os.path.exists(gen_path) else None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def cb_video_changed(vid_path: str):
    """
    Extract the first frame of the uploaded video for a quick preview.

    Returns an RGB numpy array (Gradio displays it directly), or None on
    any failure: empty path, unreadable video, or cv2 unavailable.
    """
    try:
        if not vid_path:
            return None

        import cv2  # deferred heavy import (CSP / startup cost)

        capture = cv2.VideoCapture(vid_path)
        success, bgr_frame = capture.read()
        capture.release()

        if not success:
            return None
        return cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
    except Exception:
        return None
|
|
|
|
|
def cb_preset_bg_preview(style: str):
    """
    Build a 640x360 preview image for a preset background style.

    Returns the image for a gr.Image component to display, or None when
    the preview helper is unavailable or fails.
    """
    try:
        from utils.cv_processing import create_professional_background

        return create_professional_background(style, 640, 360)
    except Exception:
        return None