#!/usr/bin/env python3
"""
BackgroundFX Pro - CSP-Safe Application Entry Point

Now with: live background preview + sources: Preset / Upload / Gradient / AI Generate
(uses utils.cv_processing to avoid circular imports)
"""

import early_env  # <<< must be FIRST

import contextlib
import os
import re
import tempfile
import time
import uuid
from typing import Optional, Dict, Any, Callable, Tuple

# 1) CSP-safe Gradio env
os.environ['GRADIO_ALLOW_FLAGGING'] = 'never'
os.environ['GRADIO_ANALYTICS_ENABLED'] = 'False'
os.environ['GRADIO_SERVER_NAME'] = '0.0.0.0'
os.environ['GRADIO_SERVER_PORT'] = '7860'

# 2) Gradio schema patch
# gradio_client's get_type() is wrapped so that a non-dict schema (bool, str,
# number, anything else) maps to a type name instead of reaching the original.
try:
    import gradio_client.utils as gc_utils

    _orig_get_type = gc_utils.get_type

    def _patched_get_type(schema):
        if not isinstance(schema, dict):
            # bool is tested before int/float because bool is an int subclass.
            if isinstance(schema, bool):
                return "boolean"
            if isinstance(schema, str):
                return "string"
            if isinstance(schema, (int, float)):
                return "number"
            return "string"
        return _orig_get_type(schema)

    gc_utils.get_type = _patched_get_type
except Exception:
    # Deliberate best-effort: the patch is optional and logging is not set up
    # yet at this point, so a failure here is ignored rather than reported.
    pass

# 3) Logging early
from utils.logging_setup import setup_logging, make_logger

setup_logging(app_name="backgroundfx")
logger = make_logger("entrypoint")
logger.info("Entrypoint starting…")

# 4) Imports
from config.app_config import get_config
from utils.hardware.device_manager import DeviceManager
from utils.system.memory_manager import MemoryManager
from models.loaders.model_loader import ModelLoader
from processing.video.video_processor import CoreVideoProcessor, ProcessorConfig
from processing.audio.audio_processor import AudioProcessor

# ⛑️ Bring helpers from the slim, self-contained cv_processing (no circular imports)
from utils.cv_processing import (
    PROFESSIONAL_BACKGROUNDS,        # dict of presets
    validate_video_file,             # returns (ok, reason)
    create_professional_background,  # used for preview defaults
)


# 5) CSP-safe fallbacks for models
class CSPSafeSAM2:
    """Stand-in segmenter used when the real SAM2 model is unavailable.

    ``predict`` returns all-ones masks sized like the last image passed to
    ``set_image`` (512x512 if none was set).
    """

    def set_image(self, image):
        """Remember the image shape; objects without ``shape`` count as 512x512x3."""
        self.shape = getattr(image, 'shape', (512, 512, 3))

    def predict(self, point_coords=None, point_labels=None, box=None, multimask_output=True, **kwargs):
        """Return ``(masks, scores, logits)`` placeholders.

        ``masks`` is a bool array of ones with shape ``(n, h, w)``, ``scores``
        the first ``n`` of ``[0.9, 0.8, 0.7]`` and ``logits`` a float32 array of
        ones, where ``n`` is 3 when ``multimask_output`` is truthy and 1 otherwise.
        The prompt arguments are accepted but ignored.
        """
        import numpy as np

        h, w = self.shape[:2] if hasattr(self, 'shape') else (512, 512)
        n = 3 if multimask_output else 1
        masks = np.ones((n, h, w), dtype=bool)
        scores = np.array([0.9, 0.8, 0.7][:n])
        logits = np.ones((n, h, w), dtype=np.float32)
        return masks, scores, logits


class CSPSafeMatAnyone:
    """Stand-in matting model used when the real MatAnyone model is unavailable."""

    def step(self, image_tensor, mask_tensor=None, objects=None, first_frame_pred=False, **kwargs):
        """Return a ``(1, 1, H, W)`` tensor of ones.

        ``H`` and ``W`` are the last two dimensions of a 3-D or 4-D
        ``image_tensor``; any other input yields 256x256.
        """
        import torch

        H, W = 256, 256
        shape = getattr(image_tensor, "shape", None)
        if shape is not None and len(shape) in (3, 4):
            H, W = shape[-2], shape[-1]
        return torch.ones((1, 1, H, W))

    def output_prob_to_mask(self, output_prob):
        """Threshold probabilities at 0.5 and return the result as floats."""
        return (output_prob > 0.5).float()

    def process(self, image, mask, **kwargs):
        """Return ``mask`` unchanged."""
        return mask


# ---------- helpers for UI ----------
# These imports stay below the logging setup, as in the original layout.
import numpy as np
import cv2
from PIL import Image

PREVIEW_W, PREVIEW_H = 640, 360  # 16:9

_DEFAULT_RGB = (255, 255, 255)

# Matches the first three comma-separated numeric channels of "rgb(...)" / "rgba(...)".
_RGB_FUNC_RE = re.compile(
    r"^rgba?\(\s*(\d+(?:\.\d+)?)\s*,\s*(\d+(?:\.\d+)?)\s*,\s*(\d+(?:\.\d+)?)"
)


def _hex_to_rgb(x: str) -> Tuple[int, int, int]:
    """Convert a colour string to an ``(r, g, b)`` tuple of ints.

    Accepts ``"#rrggbb"`` and ``"rgb(r, g, b)"`` / ``"rgba(r, g, b, a)"`` with
    numeric channels (alpha is ignored, channels are capped at 255). Anything
    else, including ``None``, an empty string or a ``#`` string with non-hex
    digits, yields white instead of raising.
    """
    x = (x or "").strip()
    if x.startswith("#") and len(x) == 7:
        try:
            return tuple(int(x[i:i + 2], 16) for i in (1, 3, 5))
        except ValueError:
            # e.g. "#zzzzzz": fall back rather than crash the UI callback.
            return _DEFAULT_RGB
    # NOTE(review): some Gradio ColorPicker versions reportedly emit
    # "rgba(r, g, b, a)" strings instead of hex - confirm against the
    # installed Gradio version.
    m = _RGB_FUNC_RE.match(x.lower())
    if m:
        return tuple(min(255, int(round(float(c)))) for c in m.groups())
    return _DEFAULT_RGB


def _np_to_pil(arr: np.ndarray) -> Image.Image:
    """Wrap an array in a PIL image, clipping to 0..255 and casting to uint8 if needed."""
    if arr.dtype != np.uint8:
        arr = arr.clip(0, 255).astype(np.uint8)
    return Image.fromarray(arr)


def _uploaded_path(file_obj) -> Optional[str]:
    """Return the filesystem path behind an upload value, or ``None``.

    Handles both a plain path (``str`` / ``os.PathLike``) and an object that
    exposes the path as ``.name``.
    NOTE(review): which of the two Gradio delivers depends on the Gradio
    version and the File component's ``type`` setting - confirm.
    """
    if file_obj is None:
        return None
    if isinstance(file_obj, (str, os.PathLike)):
        return os.fspath(file_obj)
    return getattr(file_obj, "name", None)


def _create_gradient_preview(spec: Dict[str, Any], width: int, height: int) -> np.ndarray:
    """Lightweight linear gradient (with rotation) for previews.

    Args:
        spec: dict with optional keys ``"start"`` / ``"end"`` (a 3-item
            list/tuple or a colour string understood by ``_hex_to_rgb``) and
            ``"angle_deg"``. Other keys (e.g. ``"type"``) are not read here,
            so the preview is always linear.
        width: output width in pixels.
        height: output height in pixels.

    Returns:
        A ``(height, width, 3)`` uint8 array: a top-to-bottom blend from
        ``start`` to ``end``, rotated about the centre by ``angle_deg`` unless
        the angle is a multiple of 360.
    """
    def _to_rgb(c):
        if isinstance(c, (list, tuple)) and len(c) == 3:
            return tuple(int(v) for v in c)
        if isinstance(c, str):
            return _hex_to_rgb(c)
        return _DEFAULT_RGB

    start = np.asarray(_to_rgb(spec.get("start", "#222222")), dtype=np.float64)
    end = np.asarray(_to_rgb(spec.get("end", "#888888")), dtype=np.float64)
    angle = float(spec.get("angle_deg", 0))

    # One blended colour per row; max(1, ...) avoids dividing by zero for height == 1.
    t = np.arange(height, dtype=np.float64) / max(1, height - 1)
    rows = start[None, :] * (1 - t)[:, None] + end[None, :] * t[:, None]
    # Clip so out-of-range channel values cannot wrap around in uint8.
    rows = np.clip(rows, 0, 255).astype(np.uint8)
    bg = np.repeat(rows[:, None, :], width, axis=1)

    if abs(angle) % 360 < 1e-6:
        return bg

    center = (width / 2, height / 2)
    rot = cv2.getRotationMatrix2D(center, angle, 1.0)
    return cv2.warpAffine(
        bg, rot, (width, height),
        flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REFLECT_101,
    )


def _unique_tmp_path(prefix: str, suffix: str) -> str:
    """Build a temp-dir path that stays unique across requests in the same second."""
    name = f"{prefix}{int(time.time())}_{uuid.uuid4().hex[:8]}{suffix}"
    return os.path.join(tempfile.gettempdir(), name)


class FakeModelManager:
    """Minimal model holder exposing ``get_sam2()`` and ``get_matanyone()``."""

    def __init__(self, sam2_model, matanyone_model):
        self._sam2_model = sam2_model
        self._matanyone_model = matanyone_model

    def get_sam2(self):
        return self._sam2_model

    def get_matanyone(self):
        return self._matanyone_model


# ---------- main app ----------
class VideoBackgroundApp:
    """Application state behind the UI: model loading, previews and video processing."""

    # ProcessorConfig arguments used when dataclass field filtering is not possible.
    _BASE_CONFIG_KEYS = (
        "background_preset", "write_fps", "max_model_size",
        "use_nvenc", "nvenc_codec", "nvenc_preset",
        "nvenc_cq", "nvenc_tune_hq", "nvenc_pix_fmt",
    )

    def __init__(self):
        self.config = get_config()
        self.device_mgr = DeviceManager()
        self.memory_mgr = MemoryManager(self.device_mgr.get_optimal_device())
        self.model_loader = ModelLoader(self.device_mgr, self.memory_mgr)
        self.audio_proc = AudioProcessor()
        self.models_loaded = False
        self.core_processor: Optional[CoreVideoProcessor] = None
        logger.info("VideoBackgroundApp initialized (device=%s)", self.device_mgr.get_optimal_device())

    def _build_processor_config_safely(self) -> ProcessorConfig:
        """
        Build ProcessorConfig including stability knobs if supported by your installed CoreVideoProcessor.
        If your version doesn't have those fields, we auto-filter them out to avoid TypeError.
        """
        # Desired config (includes stability + encoding)
        desired: Dict[str, Any] = dict(
            background_preset="office",
            write_fps=None,
            max_model_size=1280,
            # --- stability knobs (only used if supported in your CoreVideoProcessor) ---
            temporal_ema_alpha=0.75,   # 0.6–0.85 typical
            min_iou_to_accept=0.05,    # reject sudden mask jumps
            dilate_px=6,               # pad edges for hair/ears
            edge_blur_px=2,            # calm shimmering edges
            # --- encoding (NVENC + fallbacks used inside the processor you installed) ---
            use_nvenc=True,
            nvenc_codec="h264",
            nvenc_preset="p5",
            nvenc_cq=18,
            nvenc_tune_hq=True,
            nvenc_pix_fmt="yuv420p",
        )
        minimal = {k: desired[k] for k in self._BASE_CONFIG_KEYS}

        # Filter against dataclass fields if present
        fields = getattr(ProcessorConfig, "__dataclass_fields__", None)
        if isinstance(fields, dict):
            filtered = {k: v for k, v in desired.items() if k in fields}
        else:
            # very old ProcessorConfig: just pass the common ones
            filtered = minimal

        try:
            return ProcessorConfig(**filtered)
        except TypeError:
            # final safety: pass minimal args
            return ProcessorConfig(**minimal)

    def load_models(self, progress_callback: Optional[Callable] = None) -> str:
        """Load SAM2 and MatAnyone, substituting the CSP-safe fallbacks on failure.

        Builds ``self.core_processor``, attaches a model manager to it, sets
        ``self.models_loaded`` and returns a status message for the UI.
        """
        logger.info("Loading models (CSP-safe)…")
        try:
            sam2, matanyone = self.model_loader.load_all_models(progress_callback=progress_callback)
        except Exception as e:
            logger.warning("Model loading failed (%s) - Using CSP-safe fallbacks", e)
            sam2, matanyone = None, None

        # Loaders may return a wrapper carrying the real model on ``.model``.
        sam2_model = getattr(sam2, "model", sam2) if sam2 else CSPSafeSAM2()
        matanyone_model = getattr(matanyone, "model", matanyone) if matanyone else CSPSafeMatAnyone()

        cfg = self._build_processor_config_safely()
        self.core_processor = CoreVideoProcessor(config=cfg, models=None)
        self.core_processor.models = FakeModelManager(sam2_model, matanyone_model)

        self.models_loaded = True
        logger.info("Models ready (SAM2=%s, MatAnyOne=%s)",
                    type(sam2_model).__name__, type(matanyone_model).__name__)
        return "Models loaded (CSP-safe; fallbacks in use if actual AI models failed)."

    # ---- PREVIEWS ----
    def preview_preset(self, preset_key: str) -> Image.Image:
        """Render a preset background at preview size; unknown keys fall back to "office"."""
        key = preset_key if preset_key in PROFESSIONAL_BACKGROUNDS else "office"
        bg = create_professional_background(key, PREVIEW_W, PREVIEW_H)  # RGB
        return _np_to_pil(bg)

    def preview_upload(self, file) -> Optional[Image.Image]:
        """Load an uploaded image as an RGB preview, or return ``None`` if unreadable."""
        path = _uploaded_path(file)
        if not path:
            return None
        try:
            # The context manager closes the file handle once pixels are converted.
            with Image.open(path) as src:
                img = src.convert("RGB")
            return img.resize((PREVIEW_W, PREVIEW_H), Image.LANCZOS)
        except Exception as e:
            logger.warning("Upload preview failed: %s", e)
            return None

    def preview_gradient(self, gtype: str, color1: str, color2: str, angle: int) -> Image.Image:
        """Render a gradient preview from the UI gradient controls."""
        spec = {
            "type": (gtype or "linear").lower(),  # "linear" or "radial" (preview uses linear with rotation)
            "start": _hex_to_rgb(color1 or "#222222"),
            "end": _hex_to_rgb(color2 or "#888888"),
            "angle_deg": float(angle or 0),
        }
        bg = _create_gradient_preview(spec, PREVIEW_W, PREVIEW_H)
        return _np_to_pil(bg)

    def ai_generate_background(self, prompt: str, seed: int, width: int, height: int) -> Tuple[Optional[Image.Image], Optional[str], str]:
        """
        Try generating a background with diffusers; save to the temp directory
        and return (img, path, status).

        The image is generated at the requested size rounded up to a multiple of
        8 and centre-cropped back, because the pipeline rejects other sizes (for
        example the 540-pixel height of the "960x540" UI option). On any failure
        returns ``(None, None, message)``.
        """
        try:
            from diffusers import StableDiffusionPipeline
            import torch

            model_id = os.environ.get("BGFX_T2I_MODEL", "stabilityai/stable-diffusion-2-1")
            use_cuda = torch.cuda.is_available()
            dtype = torch.float16 if use_cuda else torch.float32
            device = "cuda" if use_cuda else "cpu"

            width, height = int(width), int(height)
            gen_w = -(-width // 8) * 8   # ceil to a multiple of 8
            gen_h = -(-height // 8) * 8

            # NOTE: the pipeline is rebuilt on every call; it is not cached here.
            pipe = StableDiffusionPipeline.from_pretrained(model_id, torch_dtype=dtype).to(device)
            g = torch.Generator(device=device).manual_seed(int(seed)) if seed is not None else None

            autocast_ctx = torch.autocast("cuda") if use_cuda else contextlib.nullcontext()
            with autocast_ctx:
                img = pipe(
                    prompt, height=gen_h, width=gen_w,
                    guidance_scale=7.0, num_inference_steps=25, generator=g,
                ).images[0]

            if (gen_w, gen_h) != (width, height):
                left, top = (gen_w - width) // 2, (gen_h - height) // 2
                img = img.crop((left, top, left + width, top + height))

            tmp_path = _unique_tmp_path("ai_bg_", ".png")
            img.save(tmp_path)
            return (
                img.resize((PREVIEW_W, PREVIEW_H), Image.LANCZOS),
                tmp_path,
                f"AI background generated ✓ ({os.path.basename(tmp_path)})",
            )
        except Exception as e:
            logger.warning("AI generation unavailable: %s", e)
            return None, None, f"AI generation unavailable: {e}"

    # ---- PROCESS VIDEO ----
    def process_video(
        self,
        video: str,
        bg_source: str,
        preset_key: str,
        custom_bg_file,
        grad_type: str,
        grad_color1: str,
        grad_color2: str,
        grad_angle: int,
        ai_bg_path: Optional[str],
    ):
        """Replace the background of ``video`` and return ``(output_path, status)``.

        The background comes from ``bg_source`` ("Upload", "Gradient",
        "AI Generate", anything else meaning preset). Upload and AI sources fall
        back to the preset when no file/path is available. On failure the first
        element of the result is ``None``.
        """
        if not self.models_loaded:
            return None, "Models not loaded yet"
        if not video:
            return None, "Please upload a video first."

        custom_path = _uploaded_path(custom_bg_file)
        logger.info("process_video called (video=%s, source=%s, preset=%s, file=%s, grad=%s, ai=%s)",
                    video, bg_source, preset_key, custom_path,
                    {"type": grad_type, "c1": grad_color1, "c2": grad_color2, "angle": grad_angle},
                    ai_bg_path)

        # Not pre-created: only the name is reserved, the processor writes the file.
        output_path = _unique_tmp_path("output_", ".mp4")

        # ✅ Validate input video (tuple: ok, reason)
        ok, reason = validate_video_file(video)
        if not ok:
            logger.warning("Invalid/unreadable video: %s (%s)", video, reason)
            return None, f"Invalid or unreadable video file: {reason}"

        # Build bg_config based on source
        src = (bg_source or "Preset").lower()
        bg_cfg: Dict[str, Any]
        if src == "upload" and custom_path:
            bg_cfg = {"custom_path": custom_path}
        elif src == "gradient":
            bg_cfg = {
                "gradient": {
                    "type": (grad_type or "linear").lower(),
                    "start": _hex_to_rgb(grad_color1 or "#222222"),
                    "end": _hex_to_rgb(grad_color2 or "#888888"),
                    "angle_deg": float(grad_angle or 0),
                }
            }
        elif src == "ai generate" and ai_bg_path:
            bg_cfg = {"custom_path": ai_bg_path}
        else:
            key = preset_key if preset_key in PROFESSIONAL_BACKGROUNDS else "office"
            bg_cfg = {"background_choice": key}

        try:
            result = self.core_processor.process_video(
                input_path=video,
                output_path=output_path,
                bg_config=bg_cfg,
            )
            logger.info("Core processing done → %s", output_path)

            output_with_audio = self.audio_proc.add_audio_to_video(video, output_path)
            logger.info("Audio merged → %s", output_with_audio)

            frames = (result.get('frames') if isinstance(result, dict) else None) or "n/a"
            return output_with_audio, f"Processing complete ({frames} frames, background={bg_source})"
        except Exception as e:
            logger.exception("Processing failed")
            return None, f"Processing failed: {e}"


# 7) Gradio UI
def create_csp_safe_gradio():
    """Build the Gradio Blocks UI around a fresh ``VideoBackgroundApp`` and return it."""
    import gradio as gr

    app = VideoBackgroundApp()

    with gr.Blocks(
        title="BackgroundFX Pro - CSP Safe",
        analytics_enabled=False,
        css="""
        .gradio-container { max-width: 1100px; margin: auto; }
        """
    ) as demo:
        gr.Markdown("# 🎬 BackgroundFX Pro (CSP-Safe)")
        gr.Markdown("Replace your video background with cinema-quality AI matting. Now with live background preview.")

        with gr.Row():
            with gr.Column(scale=1):
                video = gr.Video(label="Upload Video")
                bg_source = gr.Radio(
                    ["Preset", "Upload", "Gradient", "AI Generate"],
                    value="Preset",
                    label="Background Source",
                    interactive=True,
                )

                # PRESET
                preset_choices = list(PROFESSIONAL_BACKGROUNDS.keys())
                default_preset = "office" if "office" in preset_choices else (preset_choices[0] if preset_choices else "office")
                preset_key = gr.Dropdown(choices=preset_choices, value=default_preset, label="Preset")

                # UPLOAD
                custom_bg = gr.File(label="Custom Background (Image)", file_types=["image"], visible=False)

                # GRADIENT
                grad_type = gr.Dropdown(choices=["Linear", "Radial"], value="Linear", label="Gradient Type", visible=False)
                grad_color1 = gr.ColorPicker(value="#222222", label="Start Color", visible=False)
                grad_color2 = gr.ColorPicker(value="#888888", label="End Color", visible=False)
                grad_angle = gr.Slider(0, 360, value=0, step=1, label="Angle (degrees)", visible=False)

                # AI
                ai_prompt = gr.Textbox(label="AI Prompt", placeholder="e.g., sunlit modern office, soft bokeh, neutral palette", visible=False)
                ai_seed = gr.Slider(0, 2**31-1, step=1, value=42, label="Seed", visible=False)
                ai_size = gr.Dropdown(choices=["640x360","960x540","1280x720"], value="640x360", label="AI Image Size", visible=False)
                ai_go = gr.Button("✨ Generate Background", visible=False, variant="secondary")
                ai_status = gr.Markdown(visible=False)
                ai_bg_path_state = gr.State(value=None)  # store generated image path

                btn_load = gr.Button("🔄 Load Models", variant="secondary")
                btn_run = gr.Button("🎬 Process Video", variant="primary")

            with gr.Column(scale=1):
                status = gr.Textbox(label="Status", lines=4)
                bg_preview = gr.Image(label="Background Preview", width=PREVIEW_W, height=PREVIEW_H, interactive=False)
                out_video = gr.Video(label="Processed Video")

        # ---------- UI wiring ----------
        # background source → show/hide controls
        def on_source_toggle(src):
            src = (src or "Preset").lower()

            def _visible_if(name, count):
                return [gr.update(visible=(src == name)) for _ in range(count)]

            # Order and counts must match the `outputs` list below:
            # 1 preset, 1 upload, 4 gradient and 5 AI controls.
            return tuple(
                _visible_if("preset", 1)
                + _visible_if("upload", 1)
                + _visible_if("gradient", 4)
                + _visible_if("ai generate", 5)
            )

        bg_source.change(
            fn=on_source_toggle,
            inputs=[bg_source],
            outputs=[preset_key, custom_bg, grad_type, grad_color1, grad_color2, grad_angle,
                     ai_prompt, ai_seed, ai_size, ai_go, ai_status],
        )

        # ✅ Clear any previous AI image path when switching source (avoids stale AI background)
        def _clear_ai_state(_):
            return None

        bg_source.change(fn=_clear_ai_state, inputs=[bg_source], outputs=[ai_bg_path_state])

        # When source changes, also refresh preview based on visible controls
        def on_source_preview(src, pkey, gt, c1, c2, ang):
            src_l = (src or "Preset").lower()
            if src_l == "preset":
                return app.preview_preset(pkey)
            if src_l == "gradient":
                return app.preview_gradient(gt, c1, c2, ang)
            # For upload/AI we keep whatever the component change handler sets (don’t overwrite)
            return gr.update()  # no-op

        bg_source.change(
            fn=on_source_preview,
            inputs=[bg_source, preset_key, grad_type, grad_color1, grad_color2, grad_angle],
            outputs=[bg_preview],
        )

        # live previews
        preset_key.change(fn=lambda k: app.preview_preset(k), inputs=[preset_key], outputs=[bg_preview])
        custom_bg.change(fn=lambda f: app.preview_upload(f), inputs=[custom_bg], outputs=[bg_preview])
        for comp in (grad_type, grad_color1, grad_color2, grad_angle):
            comp.change(
                fn=lambda gt, c1, c2, ang: app.preview_gradient(gt, c1, c2, ang),
                inputs=[grad_type, grad_color1, grad_color2, grad_angle],
                outputs=[bg_preview],
            )

        # AI generate
        def ai_generate(prompt, seed, size):
            try:
                w, h = map(int, size.split("x"))
            except Exception:
                w, h = PREVIEW_W, PREVIEW_H
            img, path, msg = app.ai_generate_background(
                prompt or "professional modern office background, neutral colors, depth of field",
                int(seed), w, h
            )
            return img, (path or None), msg

        ai_go.click(fn=ai_generate, inputs=[ai_prompt, ai_seed, ai_size],
                    outputs=[bg_preview, ai_bg_path_state, ai_status])

        # model load / run
        def safe_load(current_preset):
            msg = app.load_models()
            logger.info("UI: models loaded")
            # Preview the preset currently selected in the dropdown. The value
            # arrives as an event input because `preset_key.value` only holds
            # the default the component was built with.
            return msg, app.preview_preset(current_preset)

        btn_load.click(fn=safe_load, inputs=[preset_key], outputs=[status, bg_preview])

        def safe_process(vid, src, pkey, file, gtype, c1, c2, ang, ai_path):
            return app.process_video(vid, src, pkey, file, gtype, c1, c2, ang, ai_path)

        btn_run.click(
            fn=safe_process,
            inputs=[video, bg_source, preset_key, custom_bg, grad_type, grad_color1, grad_color2, grad_angle, ai_bg_path_state],
            outputs=[out_video, status],
        )

    return demo


# 8) Launch
if __name__ == "__main__":
    logger.info("Launching CSP-safe Gradio interface for Hugging Face Spaces")
    demo = create_csp_safe_gradio()
    demo.queue().launch(
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True,
        debug=False,
        inbrowser=False,
    )