MogensR's picture
Rename app.py to app_old.py
28090e4
raw
history blame
21.4 kB
#!/usr/bin/env python3
"""
BackgroundFX Pro - CSP-Safe Application Entry Point
Now with: live background preview + sources: Preset / Upload / Gradient / AI Generate
(uses utils.cv_processing to avoid circular imports)
"""
import early_env # <<< must be FIRST
import os, time
from typing import Optional, Dict, Any, Callable, Tuple
# 1) CSP-safe Gradio env
os.environ['GRADIO_ALLOW_FLAGGING'] = 'never'
os.environ['GRADIO_ANALYTICS_ENABLED'] = 'False'
os.environ['GRADIO_SERVER_NAME'] = '0.0.0.0'
os.environ['GRADIO_SERVER_PORT'] = '7860'
# 2) Gradio schema patch
try:
import gradio_client.utils as gc_utils
_orig_get_type = gc_utils.get_type
def _patched_get_type(schema):
if not isinstance(schema, dict):
if isinstance(schema, bool): return "boolean"
if isinstance(schema, str): return "string"
if isinstance(schema, (int, float)): return "number"
return "string"
return _orig_get_type(schema)
gc_utils.get_type = _patched_get_type
except Exception:
pass
# 3) Logging early
from utils.logging_setup import setup_logging, make_logger
setup_logging(app_name="backgroundfx")
logger = make_logger("entrypoint")
logger.info("Entrypoint starting…")
# 4) Imports
from config.app_config import get_config
from utils.hardware.device_manager import DeviceManager
from utils.system.memory_manager import MemoryManager
from models.loaders.model_loader import ModelLoader
from processing.video.video_processor import CoreVideoProcessor, ProcessorConfig
from processing.audio.audio_processor import AudioProcessor
# ⛑️ Bring helpers from the slim, self-contained cv_processing (no circular imports)
from utils.cv_processing import (
PROFESSIONAL_BACKGROUNDS, # dict of presets
validate_video_file, # returns (ok, reason)
create_professional_background, # used for preview defaults
)
# 5) CSP-safe fallbacks for models
class CSPSafeSAM2:
def set_image(self, image):
self.shape = getattr(image, 'shape', (512, 512, 3))
def predict(self, point_coords=None, point_labels=None, box=None, multimask_output=True, **kwargs):
import numpy as np
h, w = self.shape[:2] if hasattr(self, 'shape') else (512, 512)
n = 3 if multimask_output else 1
return np.ones((n, h, w), dtype=bool), np.array([0.9, 0.8, 0.7][:n]), np.ones((n, h, w), dtype=np.float32)
class CSPSafeMatAnyone:
def step(self, image_tensor, mask_tensor=None, objects=None, first_frame_pred=False, **kwargs):
import torch
if hasattr(image_tensor, "shape"):
if len(image_tensor.shape) == 3:
_, H, W = image_tensor.shape
elif len(image_tensor.shape) == 4:
_, _, H, W = image_tensor.shape
else:
H, W = 256, 256
else:
H, W = 256, 256
return torch.ones((1, 1, H, W))
def output_prob_to_mask(self, output_prob):
return (output_prob > 0.5).float()
def process(self, image, mask, **kwargs):
return mask
# ---------- helpers for UI ----------
import numpy as np
import cv2
from PIL import Image
PREVIEW_W, PREVIEW_H = 640, 360 # 16:9
def _hex_to_rgb(x: str) -> Tuple[int, int, int]:
x = (x or "").strip()
if x.startswith("#") and len(x) == 7:
return tuple(int(x[i:i+2], 16) for i in (1, 3, 5))
return (255, 255, 255)
def _np_to_pil(arr: np.ndarray) -> Image.Image:
if arr.dtype != np.uint8:
arr = arr.clip(0, 255).astype(np.uint8)
return Image.fromarray(arr)
def _create_gradient_preview(spec: Dict[str, Any], width: int, height: int) -> np.ndarray:
"""Lightweight linear gradient (with rotation) for previews."""
def _to_rgb(c):
if isinstance(c, (list, tuple)) and len(c) == 3:
return tuple(int(x) for x in c)
if isinstance(c, str) and c.startswith("#") and len(c) == 7:
return tuple(int(c[i:i+2], 16) for i in (1,3,5))
return (255, 255, 255)
start = _to_rgb(spec.get("start", "#222222"))
end = _to_rgb(spec.get("end", "#888888"))
angle = float(spec.get("angle_deg", 0))
bg = np.zeros((height, width, 3), np.uint8)
for y in range(height):
t = y / max(1, height - 1)
r = int(start[0] * (1 - t) + end[0] * t)
g = int(start[1] * (1 - t) + end[1] * t)
b = int(start[2] * (1 - t) + end[2] * t)
bg[y, :] = (r, g, b)
if abs(angle) % 360 < 1e-6:
return bg
center = (width / 2, height / 2)
rot = cv2.getRotationMatrix2D(center, angle, 1.0)
return cv2.warpAffine(bg, rot, (width, height), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REFLECT_101)
# ---------- main app ----------
class VideoBackgroundApp:
def __init__(self):
self.config = get_config()
self.device_mgr = DeviceManager()
self.memory_mgr = MemoryManager(self.device_mgr.get_optimal_device())
self.model_loader = ModelLoader(self.device_mgr, self.memory_mgr)
self.audio_proc = AudioProcessor()
self.models_loaded = False
self.core_processor: Optional[CoreVideoProcessor] = None
logger.info("VideoBackgroundApp initialized (device=%s)", self.device_mgr.get_optimal_device())
def _build_processor_config_safely(self) -> ProcessorConfig:
"""
Build ProcessorConfig including stability knobs if supported by your installed CoreVideoProcessor.
If your version doesn't have those fields, we auto-filter them out to avoid TypeError.
"""
# Desired config (includes stability + encoding)
desired: Dict[str, Any] = dict(
background_preset="office",
write_fps=None,
max_model_size=1280,
# --- stability knobs (only used if supported in your CoreVideoProcessor) ---
temporal_ema_alpha=0.75, # 0.6–0.85 typical
min_iou_to_accept=0.05, # reject sudden mask jumps
dilate_px=6, # pad edges for hair/ears
edge_blur_px=2, # calm shimmering edges
# --- encoding (NVENC + fallbacks used inside the processor you installed) ---
use_nvenc=True,
nvenc_codec="h264",
nvenc_preset="p5",
nvenc_cq=18,
nvenc_tune_hq=True,
nvenc_pix_fmt="yuv420p",
)
# Filter against dataclass fields if present
fields = getattr(ProcessorConfig, "__dataclass_fields__", None)
if isinstance(fields, dict):
filtered = {k: v for k, v in desired.items() if k in fields}
else:
# very old ProcessorConfig: just pass the common ones
filtered = {
"background_preset": desired["background_preset"],
"write_fps": desired["write_fps"],
"max_model_size": desired["max_model_size"],
"use_nvenc": desired["use_nvenc"],
"nvenc_codec": desired["nvenc_codec"],
"nvenc_preset": desired["nvenc_preset"],
"nvenc_cq": desired["nvenc_cq"],
"nvenc_tune_hq": desired["nvenc_tune_hq"],
"nvenc_pix_fmt": desired["nvenc_pix_fmt"],
}
try:
return ProcessorConfig(**filtered)
except TypeError:
# final safety: pass minimal args
return ProcessorConfig(
background_preset="office",
write_fps=None,
max_model_size=1280,
use_nvenc=True,
nvenc_codec="h264",
nvenc_preset="p5",
nvenc_cq=18,
nvenc_tune_hq=True,
nvenc_pix_fmt="yuv420p",
)
def load_models(self, progress_callback: Optional[Callable] = None) -> str:
logger.info("Loading models (CSP-safe)…")
try:
sam2, matanyone = self.model_loader.load_all_models(progress_callback=progress_callback)
except Exception as e:
logger.warning("Model loading failed (%s) - Using CSP-safe fallbacks", e)
sam2, matanyone = None, None
sam2_model = getattr(sam2, "model", sam2) if sam2 else CSPSafeSAM2()
matanyone_model = getattr(matanyone, "model", matanyone) if matanyone else CSPSafeMatAnyone()
cfg = self._build_processor_config_safely()
self.core_processor = CoreVideoProcessor(config=cfg, models=None)
self.core_processor.models = type('FakeModelManager', (), {
'get_sam2': lambda self_: sam2_model,
'get_matanyone': lambda self_: matanyone_model
})()
self.models_loaded = True
logger.info("Models ready (SAM2=%s, MatAnyOne=%s)",
type(sam2_model).__name__, type(matanyone_model).__name__)
return "Models loaded (CSP-safe; fallbacks in use if actual AI models failed)."
# ---- PREVIEWS ----
def preview_preset(self, preset_key: str) -> Image.Image:
key = preset_key if preset_key in PROFESSIONAL_BACKGROUNDS else "office"
bg = create_professional_background(key, PREVIEW_W, PREVIEW_H) # RGB
return _np_to_pil(bg)
def preview_upload(self, file) -> Optional[Image.Image]:
if file is None:
return None
try:
img = Image.open(file.name).convert("RGB")
img = img.resize((PREVIEW_W, PREVIEW_H), Image.LANCZOS)
return img
except Exception as e:
logger.warning("Upload preview failed: %s", e)
return None
def preview_gradient(self, gtype: str, color1: str, color2: str, angle: int) -> Image.Image:
spec = {
"type": (gtype or "linear").lower(), # "linear" or "radial" (preview uses linear with rotation)
"start": _hex_to_rgb(color1 or "#222222"),
"end": _hex_to_rgb(color2 or "#888888"),
"angle_deg": float(angle or 0),
}
bg = _create_gradient_preview(spec, PREVIEW_W, PREVIEW_H)
return _np_to_pil(bg)
def ai_generate_background(self, prompt: str, seed: int, width: int, height: int) -> Tuple[Optional[Image.Image], Optional[str], str]:
"""
Try generating a background with diffusers; save to /tmp and return (img, path, status).
"""
try:
from diffusers import StableDiffusionPipeline
import torch
model_id = os.environ.get("BGFX_T2I_MODEL", "stabilityai/stable-diffusion-2-1")
dtype = torch.float16 if torch.cuda.is_available() else torch.float32
device = "cuda" if torch.cuda.is_available() else "cpu"
pipe = StableDiffusionPipeline.from_pretrained(model_id, torch_dtype=dtype).to(device)
g = torch.Generator(device=device).manual_seed(int(seed)) if seed is not None else None
if device == "cuda":
with torch.autocast("cuda"):
img = pipe(prompt, height=height, width=width, guidance_scale=7.0, num_inference_steps=25, generator=g).images[0]
else:
img = pipe(prompt, height=height, width=width, guidance_scale=7.0, num_inference_steps=25, generator=g).images[0]
tmp_path = f"/tmp/ai_bg_{int(time.time())}.png"
img.save(tmp_path)
return img.resize((PREVIEW_W, PREVIEW_H), Image.LANCZOS), tmp_path, f"AI background generated ✓ ({os.path.basename(tmp_path)})"
except Exception as e:
logger.warning("AI generation unavailable: %s", e)
return None, None, f"AI generation unavailable: {e}"
# ---- PROCESS VIDEO ----
def process_video(
self,
video: str,
bg_source: str,
preset_key: str,
custom_bg_file,
grad_type: str,
grad_color1: str,
grad_color2: str,
grad_angle: int,
ai_bg_path: Optional[str],
):
if not self.models_loaded:
return None, "Models not loaded yet"
if not video:
return None, "Please upload a video first."
logger.info("process_video called (video=%s, source=%s, preset=%s, file=%s, grad=%s, ai=%s)",
video, bg_source, preset_key, getattr(custom_bg_file, "name", None) if custom_bg_file else None,
{"type": grad_type, "c1": grad_color1, "c2": grad_color2, "angle": grad_angle},
ai_bg_path)
output_path = f"/tmp/output_{int(time.time())}.mp4"
# ✅ Validate input video (tuple: ok, reason)
ok, reason = validate_video_file(video)
if not ok:
logger.warning("Invalid/unreadable video: %s (%s)", video, reason)
return None, f"Invalid or unreadable video file: {reason}"
# Build bg_config based on source
src = (bg_source or "Preset").lower()
if src == "upload" and custom_bg_file is not None:
bg_cfg: Dict[str, Any] = {"custom_path": custom_bg_file.name}
elif src == "gradient":
bg_cfg = {
"gradient": {
"type": (grad_type or "linear").lower(),
"start": _hex_to_rgb(grad_color1 or "#222222"),
"end": _hex_to_rgb(grad_color2 or "#888888"),
"angle_deg": float(grad_angle or 0),
}
}
elif src == "ai generate" and ai_bg_path:
bg_cfg = {"custom_path": ai_bg_path}
else:
key = preset_key if preset_key in PROFESSIONAL_BACKGROUNDS else "office"
bg_cfg = {"background_choice": key}
try:
result = self.core_processor.process_video(
input_path=video,
output_path=output_path,
bg_config=bg_cfg
)
logger.info("Core processing done → %s", output_path)
output_with_audio = self.audio_proc.add_audio_to_video(video, output_path)
logger.info("Audio merged → %s", output_with_audio)
frames = (result.get('frames') if isinstance(result, dict) else None) or "n/a"
return output_with_audio, f"Processing complete ({frames} frames, background={bg_source})"
except Exception as e:
logger.exception("Processing failed")
return None, f"Processing failed: {e}"
# 7) Gradio UI
def create_csp_safe_gradio():
import gradio as gr
app = VideoBackgroundApp()
with gr.Blocks(
title="BackgroundFX Pro - CSP Safe",
analytics_enabled=False,
css="""
.gradio-container { max-width: 1100px; margin: auto; }
"""
) as demo:
gr.Markdown("# 🎬 BackgroundFX Pro (CSP-Safe)")
gr.Markdown("Replace your video background with cinema-quality AI matting. Now with live background preview.")
with gr.Row():
with gr.Column(scale=1):
video = gr.Video(label="Upload Video")
bg_source = gr.Radio(
["Preset", "Upload", "Gradient", "AI Generate"],
value="Preset",
label="Background Source",
interactive=True,
)
# PRESET
preset_choices = list(PROFESSIONAL_BACKGROUNDS.keys())
default_preset = "office" if "office" in preset_choices else (preset_choices[0] if preset_choices else "office")
preset_key = gr.Dropdown(choices=preset_choices, value=default_preset, label="Preset")
# UPLOAD
custom_bg = gr.File(label="Custom Background (Image)", file_types=["image"], visible=False)
# GRADIENT
grad_type = gr.Dropdown(choices=["Linear", "Radial"], value="Linear", label="Gradient Type", visible=False)
grad_color1 = gr.ColorPicker(value="#222222", label="Start Color", visible=False)
grad_color2 = gr.ColorPicker(value="#888888", label="End Color", visible=False)
grad_angle = gr.Slider(0, 360, value=0, step=1, label="Angle (degrees)", visible=False)
# AI
ai_prompt = gr.Textbox(label="AI Prompt", placeholder="e.g., sunlit modern office, soft bokeh, neutral palette", visible=False)
ai_seed = gr.Slider(0, 2**31-1, step=1, value=42, label="Seed", visible=False)
ai_size = gr.Dropdown(choices=["640x360","960x540","1280x720"], value="640x360", label="AI Image Size", visible=False)
ai_go = gr.Button("✨ Generate Background", visible=False, variant="secondary")
ai_status = gr.Markdown(visible=False)
ai_bg_path_state = gr.State(value=None) # store /tmp path
btn_load = gr.Button("🔄 Load Models", variant="secondary")
btn_run = gr.Button("🎬 Process Video", variant="primary")
with gr.Column(scale=1):
status = gr.Textbox(label="Status", lines=4)
bg_preview = gr.Image(label="Background Preview", width=PREVIEW_W, height=PREVIEW_H, interactive=False)
out_video = gr.Video(label="Processed Video")
# ---------- UI wiring ----------
# background source → show/hide controls
def on_source_toggle(src):
src = (src or "Preset").lower()
return (
gr.update(visible=(src == "preset")),
gr.update(visible=(src == "upload")),
gr.update(visible=(src == "gradient")),
gr.update(visible=(src == "gradient")),
gr.update(visible=(src == "gradient")),
gr.update(visible=(src == "gradient")),
gr.update(visible=(src == "ai generate")),
gr.update(visible=(src == "ai generate")),
gr.update(visible=(src == "ai generate")),
gr.update(visible=(src == "ai generate")),
gr.update(visible=(src == "ai generate")),
)
bg_source.change(
fn=on_source_toggle,
inputs=[bg_source],
outputs=[preset_key, custom_bg, grad_type, grad_color1, grad_color2, grad_angle, ai_prompt, ai_seed, ai_size, ai_go, ai_status],
)
# ✅ Clear any previous AI image path when switching source (avoids stale AI background)
def _clear_ai_state(_):
return None
bg_source.change(fn=_clear_ai_state, inputs=[bg_source], outputs=[ai_bg_path_state])
# When source changes, also refresh preview based on visible controls
def on_source_preview(src, pkey, gt, c1, c2, ang):
src_l = (src or "Preset").lower()
if src_l == "preset":
return app.preview_preset(pkey)
elif src_l == "gradient":
return app.preview_gradient(gt, c1, c2, ang)
# For upload/AI we keep whatever the component change handler sets (don’t overwrite)
return gr.update() # no-op
bg_source.change(
fn=on_source_preview,
inputs=[bg_source, preset_key, grad_type, grad_color1, grad_color2, grad_angle],
outputs=[bg_preview]
)
# live previews
preset_key.change(fn=lambda k: app.preview_preset(k), inputs=[preset_key], outputs=[bg_preview])
custom_bg.change(fn=lambda f: app.preview_upload(f), inputs=[custom_bg], outputs=[bg_preview])
for comp in (grad_type, grad_color1, grad_color2, grad_angle):
comp.change(
fn=lambda gt, c1, c2, ang: app.preview_gradient(gt, c1, c2, ang),
inputs=[grad_type, grad_color1, grad_color2, grad_angle],
outputs=[bg_preview],
)
# AI generate
def ai_generate(prompt, seed, size):
try:
w, h = map(int, size.split("x"))
except Exception:
w, h = PREVIEW_W, PREVIEW_H
img, path, msg = app.ai_generate_background(
prompt or "professional modern office background, neutral colors, depth of field",
int(seed), w, h
)
return img, (path or None), msg
ai_go.click(fn=ai_generate, inputs=[ai_prompt, ai_seed, ai_size], outputs=[bg_preview, ai_bg_path_state, ai_status])
# model load / run
def safe_load():
msg = app.load_models()
logger.info("UI: models loaded")
# Set initial preview (preset default)
default_key = preset_key.value if hasattr(preset_key, "value") else "office"
return msg, app.preview_preset(default_key)
btn_load.click(fn=safe_load, outputs=[status, bg_preview])
def safe_process(vid, src, pkey, file, gtype, c1, c2, ang, ai_path):
return app.process_video(vid, src, pkey, file, gtype, c1, c2, ang, ai_path)
btn_run.click(
fn=safe_process,
inputs=[video, bg_source, preset_key, custom_bg, grad_type, grad_color1, grad_color2, grad_angle, ai_bg_path_state],
outputs=[out_video, status]
)
return demo
# 8) Launch
if __name__ == "__main__":
logger.info("Launching CSP-safe Gradio interface for Hugging Face Spaces")
demo = create_csp_safe_gradio()
demo.queue().launch(
server_name="0.0.0.0",
server_port=7860,
show_error=True,
debug=False,
inbrowser=False
)