|
|
""" |
|
|
Complete utils/__init__.py with all required functions |
|
|
Provides direct implementations to avoid import recursion |
|
|
""" |
|
|
|
|
|
import cv2 |
|
|
import numpy as np |
|
|
from PIL import Image |
|
|
import torch |
|
|
import logging |
|
|
from typing import Optional, Tuple, Dict, Any, List |
|
|
import tempfile |
|
|
import os |
|
|
|
|
|
logger = logging.getLogger(__name__)

# Named background presets. Each preset holds:
#   "color":    base 3-tuple of 0-255 channel values
#   "gradient": True to render a vertical gradient derived from the base
#               color, False to fill the frame with the flat color.
# NOTE(review): the triples match the CSS colors AliceBlue / ForestGreen /
# Indigo when read as RGB - confirm the channel order against the frames
# these backgrounds are composited with.
PROFESSIONAL_BACKGROUNDS: Dict[str, Dict[str, Any]] = {
    "office": dict(color=(240, 248, 255), gradient=True),
    "studio": dict(color=(32, 32, 32), gradient=False),
    "nature": dict(color=(34, 139, 34), gradient=True),
    "abstract": dict(color=(75, 0, 130), gradient=True),
    "white": dict(color=(255, 255, 255), gradient=False),
    "black": dict(color=(0, 0, 0), gradient=False),
}
|
|
|
|
|
def validate_video_file(video_path: str) -> bool:
    """Return True when ``video_path`` exists and its first frame decodes.

    Args:
        video_path: Filesystem path of the video to probe.

    Returns:
        True if OpenCV opens the file and reads one frame successfully.
        False if the path does not exist, the file cannot be opened, the
        first read fails, or any exception is raised along the way (the
        exception is logged, never propagated).
    """
    try:
        if not os.path.exists(video_path):
            return False

        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                return False
            ret, frame = cap.read()
        finally:
            # Release the capture on every path, including the not-opened
            # early return and a read() that raises.
            cap.release()

        return bool(ret) and frame is not None

    except Exception as e:
        logger.error(f"Video validation failed: {e}")
        return False
|
|
|
|
|
# Loaded SAM2 predictors keyed by Hugging Face model id. Building a predictor
# loads the model weights, so it is done once per process instead of once per
# frame.
_SAM2_PREDICTOR_CACHE: Dict[str, Any] = {}


def _get_sam2_predictor(model_id: str = "facebook/sam2-hiera-base-plus") -> Any:
    """Return the cached SAM2 image predictor for ``model_id``, loading it on first use."""
    predictor = _SAM2_PREDICTOR_CACHE.get(model_id)
    if predictor is None:
        # Imported lazily so this module still imports when sam2 is absent.
        from sam2.sam2_image_predictor import SAM2ImagePredictor

        # Pick the device explicitly so CPU-only hosts do not try to use CUDA.
        device = "cuda" if torch.cuda.is_available() else "cpu"
        predictor = SAM2ImagePredictor.from_pretrained(model_id, device=device)
        _SAM2_PREDICTOR_CACHE[model_id] = predictor
    return predictor


def _segment_with_sam2(frame: np.ndarray) -> Optional[np.ndarray]:
    """Run SAM2 with one positive point prompt at the frame centre; None if it yields no mask."""
    predictor = _get_sam2_predictor()
    predictor.set_image(frame)

    h, w = frame.shape[:2]
    # Single foreground click in the middle of the frame (label 1 = positive).
    center_point = np.array([[w // 2, h // 2]])
    center_label = np.array([1])

    masks, _scores, _ = predictor.predict(
        point_coords=center_point,
        point_labels=center_label,
        multimask_output=False,
    )
    if len(masks) == 0:
        return None
    # float32, like the mask produced by the colour-based fallback.
    return np.asarray(masks[0], dtype=np.float32)


def segment_person_hq(frame: np.ndarray, use_sam2: bool = True) -> Optional[np.ndarray]:
    """Segment the subject of ``frame``, preferring SAM2 and degrading gracefully.

    Args:
        frame: Image to segment. It is passed to SAM2 as-is and, on the
            fallback path, converted with ``cv2.COLOR_RGB2HSV`` - so it is
            presumably an RGB uint8 array of shape (H, W, 3).
        use_sam2: When False, skip SAM2 and use the colour-based fallback.

    Returns:
        A float32 mask, or None if the fallback segmentation itself fails.
        Any SAM2 problem (package missing, model load error, inference
        error, empty result) is logged as a warning and the colour-based
        fallback is used instead.
    """
    try:
        if use_sam2:
            try:
                mask = _segment_with_sam2(frame)
                if mask is not None:
                    return mask
                logger.warning("SAM2 returned no mask, falling back to simple method")
            except Exception as e:
                logger.warning(f"SAM2 segmentation failed: {e}, falling back to simple method")

        return _simple_person_segmentation(frame)

    except Exception as e:
        logger.error(f"Person segmentation failed: {e}")
        return None
|
|
|
|
|
def _simple_person_segmentation(frame: np.ndarray) -> np.ndarray:
    """Colour-keyed fallback segmentation.

    Pixels whose HSV value falls in a green range or a white range are
    treated as background; everything else is kept as foreground. The
    result is cleaned with a 5x5 morphological close followed by an open
    and returned as float32 (the 0/255 mask divided by 255).

    NOTE(review): the conversion uses ``COLOR_RGB2HSV``, so ``frame`` is
    presumably RGB; confirm callers do not pass OpenCV-native BGR frames.
    """
    hsv_image = cv2.cvtColor(frame, cv2.COLOR_RGB2HSV)

    # (lower, upper) HSV bounds of the colours treated as background.
    background_ranges = (
        (np.array([40, 40, 40]), np.array([80, 255, 255])),  # green
        (np.array([0, 0, 200]), np.array([180, 30, 255])),  # white: low saturation, high value
    )
    green_hits, white_hits = (
        cv2.inRange(hsv_image, lower, upper) for lower, upper in background_ranges
    )

    # Foreground is whatever is neither green nor white.
    foreground = cv2.bitwise_not(cv2.bitwise_or(green_hits, white_hits))

    # Close first, then open, with the same 5x5 structuring element.
    structuring = np.ones((5, 5), np.uint8)
    for operation in (cv2.MORPH_CLOSE, cv2.MORPH_OPEN):
        foreground = cv2.morphologyEx(foreground, operation, structuring)

    return foreground.astype(np.float32) / 255.0
|
|
|
|
|
def refine_mask_hq(mask: np.ndarray, frame: np.ndarray, use_matanyone: bool = True) -> np.ndarray:
    """Refine a segmentation mask, preferring MatAnyone and degrading gracefully.

    Args:
        mask: Mask to refine. It is multiplied by 255 and cast to uint8
            before being handed to MatAnyone, so values are presumably in
            [0, 1].
        frame: Image the mask belongs to.
        use_matanyone: When False, skip MatAnyone and use the OpenCV-based
            refinement directly.

    Returns:
        The refined mask as float32. If MatAnyone fails for any reason the
        result of ``_simple_mask_refinement`` is returned; if that fails as
        well, the input ``mask`` is returned unchanged.
    """
    try:
        if not use_matanyone:
            return _simple_mask_refinement(mask, frame)

        try:
            # Imported lazily so this module still imports without matanyone.
            from matanyone import InferenceCore

            target_device = "cuda" if torch.cuda.is_available() else "cpu"
            # NOTE(review): constructor arguments and the infer() call below
            # should be checked against the installed matanyone release; any
            # mismatch ends up in the except branch and the simple path.
            engine = InferenceCore(model_name="PeiqingYang/MatAnyone-v1.0", device=target_device)

            image_rgb = Image.fromarray(frame.astype(np.uint8))
            image_mask = Image.fromarray((mask * 255).astype(np.uint8))

            matte = engine.infer(image_rgb, image_mask)

            return np.array(matte).astype(np.float32) / 255.0

        except Exception as e:
            logger.warning(f"MatAnyone refinement failed: {e}, using simple refinement")

        return _simple_mask_refinement(mask, frame)

    except Exception as e:
        logger.error(f"Mask refinement failed: {e}")
        return mask
|
|
|
|
|
def _simple_mask_refinement(mask: np.ndarray, frame: np.ndarray) -> np.ndarray:
    """Smooth a mask with a Gaussian blur followed by a bilateral filter.

    The mask is scaled by 255 and cast to uint8 for filtering, then scaled
    back to float32 by dividing by 255. ``frame`` is accepted for signature
    parity with the other refiners but is not read here.
    """
    as_bytes = (mask * 255).astype(np.uint8)

    # 5x5 Gaussian first, then a bilateral pass (d=9, sigmaColor=75, sigmaSpace=75).
    smoothed = cv2.bilateralFilter(cv2.GaussianBlur(as_bytes, (5, 5), 0), 9, 75, 75)

    return smoothed.astype(np.float32) / 255.0
|
|
|
|
|
def replace_background_hq(frame: np.ndarray, mask: np.ndarray, background: np.ndarray) -> np.ndarray:
    """Composite ``frame`` over ``background`` using ``mask`` as alpha.

    Args:
        frame: Foreground image; its height and width define the output size.
        mask: Alpha matte where 1 keeps the frame and 0 shows the background.
            Either 2-D (H, W) or already 3-D. Values are clipped to [0, 1];
            bool masks are accepted.
        background: Replacement background; resized to the frame size.

    Returns:
        The blended uint8 image. If anything fails the error is logged and
        the original ``frame`` is returned unchanged.
    """
    try:
        h, w = frame.shape[:2]
        background_resized = cv2.resize(background, (w, h))

        # Blend in float32 with alpha confined to [0, 1] so bool masks work
        # and out-of-range values cannot push the composite past 0..255.
        alpha = np.clip(np.asarray(mask, dtype=np.float32), 0.0, 1.0)
        if alpha.ndim == 2:
            alpha = np.stack([alpha] * 3, axis=-1)

        alpha = _apply_feathering(alpha)

        result = frame * alpha + background_resized * (1 - alpha)

        # Round instead of truncating: a feathered alpha just below 1.0 would
        # otherwise darken pure-foreground pixels by one level.
        return np.clip(np.rint(result), 0, 255).astype(np.uint8)

    except Exception as e:
        logger.error(f"Background replacement failed: {e}")
        return frame
|
|
|
|
|
def _apply_feathering(mask: np.ndarray, feather_amount: int = 3) -> np.ndarray:
    """Soften mask edges with a Gaussian blur for smoother blending.

    The blur kernel is square with side ``2 * feather_amount + 1``. For a
    3-D mask only channel 0 is blurred and the result is replicated into
    three channels; a 2-D mask comes back 2-D.
    """
    has_channels = mask.ndim == 3
    plane = mask[:, :, 0] if has_channels else mask

    kernel_side = feather_amount * 2 + 1
    softened = cv2.GaussianBlur(plane, (kernel_side, kernel_side), 0)

    if not has_channels:
        return softened
    return np.stack((softened,) * 3, axis=-1)
|
|
|
|
|
def create_professional_background(bg_type: str, width: int, height: int) -> np.ndarray:
    """Build a (height, width, 3) uint8 background for a named preset.

    Args:
        bg_type: Key into ``PROFESSIONAL_BACKGROUNDS``; unknown keys are
            treated as "office".
        width: Output width in pixels.
        height: Output height in pixels.

    Returns:
        A gradient or flat-colour image depending on the preset. If building
        it raises, the error is logged and a plain white image is returned.
    """
    try:
        preset_name = bg_type if bg_type in PROFESSIONAL_BACKGROUNDS else "office"
        preset = PROFESSIONAL_BACKGROUNDS[preset_name]
        fill_color, wants_gradient = preset["color"], preset["gradient"]

        if not wants_gradient:
            return np.full((height, width, 3), fill_color, dtype=np.uint8)
        return _create_gradient_background(fill_color, width, height)

    except Exception as e:
        logger.error(f"Background creation failed: {e}")
        # Last-resort fallback: solid white.
        return np.full((height, width, 3), (255, 255, 255), dtype=np.uint8)
|
|
|
|
|
def _create_gradient_background(base_color: Tuple[int, int, int], width: int, height: int) -> np.ndarray:
    """Create a vertical gradient from a darkened shade down to ``base_color``.

    Row ``y`` blends 70% of the base colour (top) with the base colour
    itself using ``blend = y / height``, so the last row approaches but
    does not exactly reach ``base_color``. Every pixel in a row has the
    same colour.

    Args:
        base_color: Three channel values, expected in 0..255.
        width: Output width in pixels.
        height: Output height in pixels.

    Returns:
        uint8 array of shape (height, width, 3).
    """
    base = np.array(base_color, dtype=np.float64)
    # Truncate toward zero, matching int() on each darkened channel.
    dark = np.trunc(base * 0.7)

    # One blend factor per row, shaped (height, 1) to broadcast over channels.
    blend = (np.arange(height) / height)[:, np.newaxis]

    # (height, 3) row colours; the uint8 cast truncates like int() did.
    row_colors = (dark * (1 - blend) + base * blend).astype(np.uint8)

    # Replicate each row colour across the full width.
    return np.repeat(row_colors[:, np.newaxis, :], width, axis=1)
|
|
|
|
|
|
|
|
# Public API of this module; the underscore-prefixed helpers stay private.
__all__ = [
    "segment_person_hq",
    "refine_mask_hq",
    "replace_background_hq",
    "create_professional_background",
    "PROFESSIONAL_BACKGROUNDS",
    "validate_video_file",
]
|
|
|