"""
BackgroundFX Pro - CSP-Safe Application Entry Point

Now with: live background preview + sources: Preset / Upload / Gradient / AI Generate
(uses utils.cv_processing to avoid circular imports)
"""

import early_env

import os
import time
from typing import Optional, Dict, Any, Callable, Tuple
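
# Configure Gradio via environment before it is imported: disable flagging and
# analytics, and bind to all interfaces on port 7860 (what Hugging Face Spaces
# expects).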
os.environ['GRADIO_ALLOW_FLAGGING'] = 'never'
os.environ['GRADIO_ANALYTICS_ENABLED'] = 'False'
os.environ['GRADIO_SERVER_NAME'] = '0.0.0.0'
os.environ['GRADIO_SERVER_PORT'] = '7860'
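
# gradio_client walks JSON schemas to build API type info; some schemas arrive
# as plain booleans/strings rather than dicts, which can crash get_type().
# Patch it to map non-dict schemas to sensible type names.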
try:
    import gradio_client.utils as gc_utils

    _orig_get_type = gc_utils.get_type

    def _patched_get_type(schema):
        if not isinstance(schema, dict):
            # Check bool before int/float: bool is a subclass of int.
            if isinstance(schema, bool):
                return "boolean"
            if isinstance(schema, str):
                return "string"
            if isinstance(schema, (int, float)):
                return "number"
            return "string"
        return _orig_get_type(schema)

    gc_utils.get_type = _patched_get_type
except Exception:
    pass


from utils.logging_setup import setup_logging, make_logger

setup_logging(app_name="backgroundfx")
logger = make_logger("entrypoint")
logger.info("Entrypoint starting…")

from config.app_config import get_config
from utils.hardware.device_manager import DeviceManager
from utils.system.memory_manager import MemoryManager
from models.loaders.model_loader import ModelLoader
from processing.video.video_processor import CoreVideoProcessor, ProcessorConfig
from processing.audio.audio_processor import AudioProcessor

from utils.cv_processing import (
    PROFESSIONAL_BACKGROUNDS,
    validate_video_file,
    create_professional_background,
)
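

# ---------------------------------------------------------------------------
# CSP-safe fallback models. If the real SAM2 / MatAnyone weights cannot be
# loaded, these lightweight stand-ins keep the pipeline runnable: they mimic
# the call signatures used downstream and return trivial full-frame masks.
# ---------------------------------------------------------------------------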
class CSPSafeSAM2:
    def set_image(self, image):
        self.shape = getattr(image, 'shape', (512, 512, 3))

    def predict(self, point_coords=None, point_labels=None, box=None, multimask_output=True, **kwargs):
        import numpy as np
        h, w = self.shape[:2] if hasattr(self, 'shape') else (512, 512)
        n = 3 if multimask_output else 1
        # Return (masks, scores, logits) shaped like the real SAM2 predictor output.
        return (np.ones((n, h, w), dtype=bool),
                np.array([0.9, 0.8, 0.7][:n]),
                np.ones((n, h, w), dtype=np.float32))


class CSPSafeMatAnyone:
    def step(self, image_tensor, mask_tensor=None, objects=None, first_frame_pred=False, **kwargs):
        import torch
        # Recover H, W from a CHW or NCHW tensor; fall back to a safe default.
        if hasattr(image_tensor, "shape"):
            if len(image_tensor.shape) == 3:
                _, H, W = image_tensor.shape
            elif len(image_tensor.shape) == 4:
                _, _, H, W = image_tensor.shape
            else:
                H, W = 256, 256
        else:
            H, W = 256, 256
        return torch.ones((1, 1, H, W))

    def output_prob_to_mask(self, output_prob):
        return (output_prob > 0.5).float()

    def process(self, image, mask, **kwargs):
        return mask
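

# ---------------------------------------------------------------------------
# Background preview helpers.
# ---------------------------------------------------------------------------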
import numpy as np
import cv2
from PIL import Image

PREVIEW_W, PREVIEW_H = 640, 360


def _hex_to_rgb(x: str) -> Tuple[int, int, int]:
    """Parse '#rrggbb' into an (r, g, b) tuple; falls back to white on bad input."""
    x = (x or "").strip()
    if x.startswith("#") and len(x) == 7:
        return tuple(int(x[i:i + 2], 16) for i in (1, 3, 5))
    return (255, 255, 255)


def _np_to_pil(arr: np.ndarray) -> Image.Image:
    if arr.dtype != np.uint8:
        arr = arr.clip(0, 255).astype(np.uint8)
    return Image.fromarray(arr)


def _create_gradient_preview(spec: Dict[str, Any], width: int, height: int) -> np.ndarray:
    """Lightweight linear gradient (with rotation) for previews.

    Note: spec["type"] is currently ignored, so "radial" falls back to a
    rotated linear gradient.
    """
    def _to_rgb(c):
        if isinstance(c, (list, tuple)) and len(c) == 3:
            return tuple(int(x) for x in c)
        if isinstance(c, str) and c.startswith("#") and len(c) == 7:
            return tuple(int(c[i:i + 2], 16) for i in (1, 3, 5))
        return (255, 255, 255)

    start = _to_rgb(spec.get("start", "#222222"))
    end = _to_rgb(spec.get("end", "#888888"))
    angle = float(spec.get("angle_deg", 0))

    # Interpolate top-to-bottom, then rotate the whole image to the requested angle.
    bg = np.zeros((height, width, 3), np.uint8)
    for y in range(height):
        t = y / max(1, height - 1)
        r = int(start[0] * (1 - t) + end[0] * t)
        g = int(start[1] * (1 - t) + end[1] * t)
        b = int(start[2] * (1 - t) + end[2] * t)
        bg[y, :] = (r, g, b)
    if abs(angle) % 360 < 1e-6:
        return bg
    center = (width / 2, height / 2)
    rot = cv2.getRotationMatrix2D(center, angle, 1.0)
    return cv2.warpAffine(bg, rot, (width, height), flags=cv2.INTER_LINEAR,
                          borderMode=cv2.BORDER_REFLECT_101)
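
# Example spec, matching what VideoBackgroundApp.preview_gradient() builds below:
#   {"type": "linear", "start": (34, 34, 34), "end": (136, 136, 136), "angle_deg": 45.0}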


class VideoBackgroundApp:
    def __init__(self):
        self.config = get_config()
        self.device_mgr = DeviceManager()
        self.memory_mgr = MemoryManager(self.device_mgr.get_optimal_device())
        self.model_loader = ModelLoader(self.device_mgr, self.memory_mgr)
        self.audio_proc = AudioProcessor()
        self.models_loaded = False
        self.core_processor: Optional[CoreVideoProcessor] = None
        logger.info("VideoBackgroundApp initialized (device=%s)", self.device_mgr.get_optimal_device())

    def _build_processor_config_safely(self) -> ProcessorConfig:
        """
        Build a ProcessorConfig that includes the stability knobs when the
        installed CoreVideoProcessor supports them; unknown fields are filtered
        against the dataclass definition to avoid a TypeError.
        """
        desired: Dict[str, Any] = dict(
            background_preset="office",
            write_fps=None,
            max_model_size=1280,

            # Mask-stability knobs (only newer ProcessorConfig versions have these).
            temporal_ema_alpha=0.75,
            min_iou_to_accept=0.05,
            dilate_px=6,
            edge_blur_px=2,

            # NVENC hardware-encoding settings.
            use_nvenc=True,
            nvenc_codec="h264",
            nvenc_preset="p5",
            nvenc_cq=18,
            nvenc_tune_hq=True,
            nvenc_pix_fmt="yuv420p",
        )

        # Keep only the keys the installed dataclass actually declares.
        fields = getattr(ProcessorConfig, "__dataclass_fields__", None)
        if isinstance(fields, dict):
            filtered = {k: v for k, v in desired.items() if k in fields}
        else:
            # Not a dataclass (or introspection failed): fall back to the
            # baseline keys every supported version accepts.
            filtered = {
                "background_preset": desired["background_preset"],
                "write_fps": desired["write_fps"],
                "max_model_size": desired["max_model_size"],
                "use_nvenc": desired["use_nvenc"],
                "nvenc_codec": desired["nvenc_codec"],
                "nvenc_preset": desired["nvenc_preset"],
                "nvenc_cq": desired["nvenc_cq"],
                "nvenc_tune_hq": desired["nvenc_tune_hq"],
                "nvenc_pix_fmt": desired["nvenc_pix_fmt"],
            }

        try:
            return ProcessorConfig(**filtered)
        except TypeError:
            # Last resort: construct with the baseline arguments only.
            return ProcessorConfig(
                background_preset="office",
                write_fps=None,
                max_model_size=1280,
                use_nvenc=True,
                nvenc_codec="h264",
                nvenc_preset="p5",
                nvenc_cq=18,
                nvenc_tune_hq=True,
                nvenc_pix_fmt="yuv420p",
            )

    def load_models(self, progress_callback: Optional[Callable] = None) -> str:
        logger.info("Loading models (CSP-safe)…")
        try:
            sam2, matanyone = self.model_loader.load_all_models(progress_callback=progress_callback)
        except Exception as e:
            logger.warning("Model loading failed (%s) - using CSP-safe fallbacks", e)
            sam2, matanyone = None, None

        # Unwrap loader containers if present; otherwise fall back to the stubs.
        sam2_model = getattr(sam2, "model", sam2) if sam2 else CSPSafeSAM2()
        matanyone_model = getattr(matanyone, "model", matanyone) if matanyone else CSPSafeMatAnyone()

        cfg = self._build_processor_config_safely()

        # Hand the processor a minimal duck-typed "model manager" exposing the
        # get_sam2()/get_matanyone() accessors it expects.
        self.core_processor = CoreVideoProcessor(config=cfg, models=None)
        self.core_processor.models = type('FakeModelManager', (), {
            'get_sam2': lambda self_: sam2_model,
            'get_matanyone': lambda self_: matanyone_model
        })()

        self.models_loaded = True
        logger.info("Models ready (SAM2=%s, MatAnyOne=%s)",
                    type(sam2_model).__name__, type(matanyone_model).__name__)
        return "Models loaded (CSP-safe; fallbacks in use if actual AI models failed)."

    def preview_preset(self, preset_key: str) -> Image.Image:
        key = preset_key if preset_key in PROFESSIONAL_BACKGROUNDS else "office"
        bg = create_professional_background(key, PREVIEW_W, PREVIEW_H)
        return _np_to_pil(bg)

    def preview_upload(self, file) -> Optional[Image.Image]:
        if file is None:
            return None
        try:
            img = Image.open(file.name).convert("RGB")
            img = img.resize((PREVIEW_W, PREVIEW_H), Image.LANCZOS)
            return img
        except Exception as e:
            logger.warning("Upload preview failed: %s", e)
            return None

    def preview_gradient(self, gtype: str, color1: str, color2: str, angle: int) -> Image.Image:
        spec = {
            "type": (gtype or "linear").lower(),
            "start": _hex_to_rgb(color1 or "#222222"),
            "end": _hex_to_rgb(color2 or "#888888"),
            "angle_deg": float(angle or 0),
        }
        bg = _create_gradient_preview(spec, PREVIEW_W, PREVIEW_H)
        return _np_to_pil(bg)

    def ai_generate_background(self, prompt: str, seed: int, width: int, height: int) -> Tuple[Optional[Image.Image], Optional[str], str]:
        """
        Try to generate a background with diffusers; save it to /tmp and
        return (preview_image, saved_path, status_message).
        """
        try:
            from diffusers import StableDiffusionPipeline
            import torch

            model_id = os.environ.get("BGFX_T2I_MODEL", "stabilityai/stable-diffusion-2-1")
            dtype = torch.float16 if torch.cuda.is_available() else torch.float32
            device = "cuda" if torch.cuda.is_available() else "cpu"
            # The pipeline is loaded per call; diffusers caches the weights on disk.
            pipe = StableDiffusionPipeline.from_pretrained(model_id, torch_dtype=dtype).to(device)
            g = torch.Generator(device=device).manual_seed(int(seed)) if seed is not None else None
            if device == "cuda":
                with torch.autocast("cuda"):
                    img = pipe(prompt, height=height, width=width, guidance_scale=7.0, num_inference_steps=25, generator=g).images[0]
            else:
                img = pipe(prompt, height=height, width=width, guidance_scale=7.0, num_inference_steps=25, generator=g).images[0]
            tmp_path = f"/tmp/ai_bg_{int(time.time())}.png"
            img.save(tmp_path)
            return img.resize((PREVIEW_W, PREVIEW_H), Image.LANCZOS), tmp_path, f"AI background generated ✓ ({os.path.basename(tmp_path)})"
        except Exception as e:
            logger.warning("AI generation unavailable: %s", e)
            return None, None, f"AI generation unavailable: {e}"

    def process_video(
        self,
        video: str,
        bg_source: str,
        preset_key: str,
        custom_bg_file,
        grad_type: str,
        grad_color1: str,
        grad_color2: str,
        grad_angle: int,
        ai_bg_path: Optional[str],
    ):
        if not self.models_loaded:
            return None, "Models not loaded yet."
        if not video:
            return None, "Please upload a video first."

        logger.info("process_video called (video=%s, source=%s, preset=%s, file=%s, grad=%s, ai=%s)",
                    video, bg_source, preset_key,
                    getattr(custom_bg_file, "name", None) if custom_bg_file else None,
                    {"type": grad_type, "c1": grad_color1, "c2": grad_color2, "angle": grad_angle},
                    ai_bg_path)

        output_path = f"/tmp/output_{int(time.time())}.mp4"

        ok, reason = validate_video_file(video)
        if not ok:
            logger.warning("Invalid/unreadable video: %s (%s)", video, reason)
            return None, f"Invalid or unreadable video file: {reason}"

        # Map the UI's background selection onto the bg_config dict consumed by
        # CoreVideoProcessor, falling back to the "office" preset.
        src = (bg_source or "Preset").lower()
        if src == "upload" and custom_bg_file is not None:
            bg_cfg: Dict[str, Any] = {"custom_path": custom_bg_file.name}
        elif src == "gradient":
            bg_cfg = {
                "gradient": {
                    "type": (grad_type or "linear").lower(),
                    "start": _hex_to_rgb(grad_color1 or "#222222"),
                    "end": _hex_to_rgb(grad_color2 or "#888888"),
                    "angle_deg": float(grad_angle or 0),
                }
            }
        elif src == "ai generate" and ai_bg_path:
            bg_cfg = {"custom_path": ai_bg_path}
        else:
            key = preset_key if preset_key in PROFESSIONAL_BACKGROUNDS else "office"
            bg_cfg = {"background_choice": key}

        try:
            result = self.core_processor.process_video(
                input_path=video,
                output_path=output_path,
                bg_config=bg_cfg
            )
            logger.info("Core processing done → %s", output_path)

            # Re-mux the original audio track into the processed video.
            output_with_audio = self.audio_proc.add_audio_to_video(video, output_path)
            logger.info("Audio merged → %s", output_with_audio)

            frames = (result.get('frames') if isinstance(result, dict) else None) or "n/a"
            return output_with_audio, f"Processing complete ({frames} frames, background={bg_source})"
        except Exception as e:
            logger.exception("Processing failed")
            return None, f"Processing failed: {e}"
def create_csp_safe_gradio():
    import gradio as gr
    app = VideoBackgroundApp()

    with gr.Blocks(
        title="BackgroundFX Pro - CSP Safe",
        analytics_enabled=False,
        css="""
        .gradio-container { max-width: 1100px; margin: auto; }
        """
    ) as demo:
        gr.Markdown("# 🎬 BackgroundFX Pro (CSP-Safe)")
        gr.Markdown("Replace your video background with cinema-quality AI matting. Now with live background preview.")

        with gr.Row():
            with gr.Column(scale=1):
                video = gr.Video(label="Upload Video")
                bg_source = gr.Radio(
                    ["Preset", "Upload", "Gradient", "AI Generate"],
                    value="Preset",
                    label="Background Source",
                    interactive=True,
                )

                # Preset source (visible by default).
                preset_choices = list(PROFESSIONAL_BACKGROUNDS.keys())
                default_preset = "office" if "office" in preset_choices else (preset_choices[0] if preset_choices else "office")
                preset_key = gr.Dropdown(choices=preset_choices, value=default_preset, label="Preset")

                # Upload source.
                custom_bg = gr.File(label="Custom Background (Image)", file_types=["image"], visible=False)

                # Gradient source.
                grad_type = gr.Dropdown(choices=["Linear", "Radial"], value="Linear", label="Gradient Type", visible=False)
                grad_color1 = gr.ColorPicker(value="#222222", label="Start Color", visible=False)
                grad_color2 = gr.ColorPicker(value="#888888", label="End Color", visible=False)
                grad_angle = gr.Slider(0, 360, value=0, step=1, label="Angle (degrees)", visible=False)

                # AI Generate source.
                ai_prompt = gr.Textbox(label="AI Prompt", placeholder="e.g., sunlit modern office, soft bokeh, neutral palette", visible=False)
                ai_seed = gr.Slider(0, 2**31 - 1, step=1, value=42, label="Seed", visible=False)
                ai_size = gr.Dropdown(choices=["640x360", "960x540", "1280x720"], value="640x360", label="AI Image Size", visible=False)
                ai_go = gr.Button("✨ Generate Background", visible=False, variant="secondary")
                ai_status = gr.Markdown(visible=False)
                ai_bg_path_state = gr.State(value=None)

                btn_load = gr.Button("🔄 Load Models", variant="secondary")
                btn_run = gr.Button("🎬 Process Video", variant="primary")

            with gr.Column(scale=1):
                status = gr.Textbox(label="Status", lines=4)
                bg_preview = gr.Image(label="Background Preview", width=PREVIEW_W, height=PREVIEW_H, interactive=False)
                out_video = gr.Video(label="Processed Video")
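
        # ---- Event wiring ----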

        def on_source_toggle(src):
            src = (src or "Preset").lower()
            # One gr.update per output component, in this order:
            # preset_key, custom_bg, grad_type, grad_color1, grad_color2,
            # grad_angle, ai_prompt, ai_seed, ai_size, ai_go, ai_status.
            return (
                gr.update(visible=(src == "preset")),
                gr.update(visible=(src == "upload")),
                gr.update(visible=(src == "gradient")),
                gr.update(visible=(src == "gradient")),
                gr.update(visible=(src == "gradient")),
                gr.update(visible=(src == "gradient")),
                gr.update(visible=(src == "ai generate")),
                gr.update(visible=(src == "ai generate")),
                gr.update(visible=(src == "ai generate")),
                gr.update(visible=(src == "ai generate")),
                gr.update(visible=(src == "ai generate")),
            )

        bg_source.change(
            fn=on_source_toggle,
            inputs=[bg_source],
            outputs=[preset_key, custom_bg, grad_type, grad_color1, grad_color2, grad_angle, ai_prompt, ai_seed, ai_size, ai_go, ai_status],
        )

        # Switching sources invalidates any previously generated AI background.
        def _clear_ai_state(_):
            return None

        bg_source.change(fn=_clear_ai_state, inputs=[bg_source], outputs=[ai_bg_path_state])

        def on_source_preview(src, pkey, gt, c1, c2, ang):
            src_l = (src or "Preset").lower()
            if src_l == "preset":
                return app.preview_preset(pkey)
            elif src_l == "gradient":
                return app.preview_gradient(gt, c1, c2, ang)
            # Upload / AI Generate: leave the current preview unchanged.
            return gr.update()

        bg_source.change(
            fn=on_source_preview,
            inputs=[bg_source, preset_key, grad_type, grad_color1, grad_color2, grad_angle],
            outputs=[bg_preview]
        )

        # Live preview updates for each source's controls.
        preset_key.change(fn=lambda k: app.preview_preset(k), inputs=[preset_key], outputs=[bg_preview])
        custom_bg.change(fn=lambda f: app.preview_upload(f), inputs=[custom_bg], outputs=[bg_preview])
        for comp in (grad_type, grad_color1, grad_color2, grad_angle):
            comp.change(
                fn=lambda gt, c1, c2, ang: app.preview_gradient(gt, c1, c2, ang),
                inputs=[grad_type, grad_color1, grad_color2, grad_angle],
                outputs=[bg_preview],
            )

        def ai_generate(prompt, seed, size):
            try:
                w, h = map(int, size.split("x"))
            except Exception:
                w, h = PREVIEW_W, PREVIEW_H
            img, path, msg = app.ai_generate_background(
                prompt or "professional modern office background, neutral colors, depth of field",
                int(seed), w, h
            )
            return img, (path or None), msg

        ai_go.click(fn=ai_generate, inputs=[ai_prompt, ai_seed, ai_size], outputs=[bg_preview, ai_bg_path_state, ai_status])

        def safe_load():
            msg = app.load_models()
            logger.info("UI: models loaded")
            # Seed the preview with the currently selected preset.
            default_key = preset_key.value if hasattr(preset_key, "value") else "office"
            return msg, app.preview_preset(default_key)

        btn_load.click(fn=safe_load, outputs=[status, bg_preview])

        def safe_process(vid, src, pkey, file, gtype, c1, c2, ang, ai_path):
            return app.process_video(vid, src, pkey, file, gtype, c1, c2, ang, ai_path)

        btn_run.click(
            fn=safe_process,
            inputs=[video, bg_source, preset_key, custom_bg, grad_type, grad_color1, grad_color2, grad_angle, ai_bg_path_state],
            outputs=[out_video, status]
        )

    return demo


if __name__ == "__main__":
    logger.info("Launching CSP-safe Gradio interface for Hugging Face Spaces")
    demo = create_csp_safe_gradio()
    demo.queue().launch(
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True,
        debug=False,
        inbrowser=False
    )