# (HuggingFace Spaces badge residue from page extraction: "Spaces: Running on Zero")
| import contextlib | |
| import functools | |
| import importlib | |
| import math | |
| import os | |
| import shlex | |
| import shutil | |
| import socket | |
| import subprocess | |
| import sys | |
| import uuid | |
| from collections.abc import Callable, Sequence | |
| from enum import Enum | |
| from pathlib import Path | |
| from typing import TypeVar | |
| import folder_paths | |
| import numpy as np | |
| import numpy.typing as npt | |
| import requests | |
| import torch | |
| from PIL import Image | |
| from .install import pip_map | |
# Resolve a logger in three tiers: package-relative import, bare import
# (when this file is executed outside the package), then a stdlib fallback.
try:
    from .log import log
except ImportError:
    try:
        from log import log

        log.warn("Imported log without relative path")
    except ImportError:
        import logging

        log = logging.getLogger("comfy mtb utils")
        log.warn("[comfy mtb] You probably called the file outside a module.")
| # region SANITY_CHECK Utilities | |
def make_report():
    """Placeholder for the sanity-check report; currently a no-op."""
    return None
| # endregion | |
| # region NFOV | |
class numpy_NFOV:
    """Gnomonic ("normal field of view") projection of an equirectangular frame.

    Precomputes a grid of normalized screen points once; `to_nfov` then maps
    a full equirectangular frame to a perspective crop centered on a point.
    `fov` is a per-axis fraction of the sphere (default [0.45, 0.45]);
    `height`/`width` set the output resolution.
    """

    def __init__(self, fov=None, height: int = 400, width: int = 800):
        self.field_of_view = fov or [0.45, 0.45]
        # Cached constants: pi, pi/2 and 2*pi.
        self.PI = np.pi
        self.PI_2 = np.pi * 0.5
        self.PI2 = np.pi * 2.0
        self.height = height
        self.width = width
        # (height * width, 2) grid of normalized [0, 1] screen coordinates.
        self.screen_points = self._get_screen_img()

    def _get_coord_rad(self, is_center_point, center_point=None):
        # Map [0, 1] coords to radians: lon in [-pi, pi], lat in [-pi/2, pi/2].
        # Screen points are additionally scaled by the field of view.
        if is_center_point:
            center_point = np.array(center_point)
            return (center_point * 2 - 1) * np.array([self.PI, self.PI_2])
        else:
            return (
                (self.screen_points * 2 - 1)
                * np.array([self.PI, self.PI_2])
                * (np.ones(self.screen_points.shape) * self.field_of_view)
            )

    def _get_screen_img(self):
        # Row-major grid of (x, y) pairs covering the output image in [0, 1].
        xx, yy = np.meshgrid(
            np.linspace(0, 1, self.width), np.linspace(0, 1, self.height)
        )
        return np.array([xx.ravel(), yy.ravel()]).T

    def _calc_spherical_to_gnomonic(self, converted_screen_coord):
        # Inverse gnomonic projection: screen-plane coords -> (lon, lat) on
        # the sphere, centered on self.cp.
        x = converted_screen_coord.T[0]
        y = converted_screen_coord.T[1]

        rou = np.sqrt(x**2 + y**2)
        c = np.arctan(rou)
        sin_c = np.sin(c)
        cos_c = np.cos(c)

        # NOTE(review): divides by `rou`, which is 0 at the exact view
        # center, producing a NaN for that element — confirm the screen
        # grid never contains it or that it is harmless downstream.
        lat = np.arcsin(
            cos_c * np.sin(self.cp[1]) + (y * sin_c * np.cos(self.cp[1])) / rou
        )
        lon = self.cp[0] + np.arctan2(
            x * sin_c,
            rou * np.cos(self.cp[1]) * cos_c - y * np.sin(self.cp[1]) * sin_c,
        )

        # Normalize back to [0, 1] for sampling the source frame.
        lat = (lat / self.PI_2 + 1.0) * 0.5
        lon = (lon / self.PI + 1.0) * 0.5

        return np.array([lon, lat]).T

    def _bilinear_interpolation(self, screen_coord):
        # Sample self.frame at fractional (lon, lat) coords with wrap-around
        # (np.mod) and 2x2 bilinear weighting.
        uf = np.mod(screen_coord.T[0], 1) * self.frame_width  # long - width
        vf = np.mod(screen_coord.T[1], 1) * self.frame_height  # lat - height

        x0 = np.floor(uf).astype(int)  # coord of pixel to bottom left
        y0 = np.floor(vf).astype(int)
        x2 = np.add(
            x0, np.ones(uf.shape).astype(int)
        )  # coords of pixel to top right
        y2 = np.add(y0, np.ones(vf.shape).astype(int))

        # Flat indices of the four neighbors in the raveled frame.
        base_y0 = np.multiply(y0, self.frame_width)
        base_y2 = np.multiply(y2, self.frame_width)

        A_idx = np.add(base_y0, x0)
        B_idx = np.add(base_y2, x0)
        C_idx = np.add(base_y0, x2)
        D_idx = np.add(base_y2, x2)

        flat_img = np.reshape(self.frame, [-1, self.frame_channel])

        A = np.take(flat_img, A_idx, axis=0)
        B = np.take(flat_img, B_idx, axis=0)
        C = np.take(flat_img, C_idx, axis=0)
        D = np.take(flat_img, D_idx, axis=0)

        # Bilinear weights from the fractional offsets.
        wa = np.multiply(x2 - uf, y2 - vf)
        wb = np.multiply(x2 - uf, vf - y0)
        wc = np.multiply(uf - x0, y2 - vf)
        wd = np.multiply(uf - x0, vf - y0)

        # interpolate
        # NOTE(review): weights are stacked 3-wide and the result reshaped
        # to 3 channels, so the output is always RGB even when
        # self.frame_channel differs — confirm inputs are 3-channel.
        AA = np.multiply(A, np.array([wa, wa, wa]).T)
        BB = np.multiply(B, np.array([wb, wb, wb]).T)
        CC = np.multiply(C, np.array([wc, wc, wc]).T)
        DD = np.multiply(D, np.array([wd, wd, wd]).T)

        nfov = np.reshape(
            np.round(AA + BB + CC + DD).astype(np.uint8),
            [self.height, self.width, 3],
        )
        return nfov

    def to_nfov(self, frame, center_point):
        """Project `frame` (H, W, C equirectangular) to the NFOV crop.

        `center_point` is the view center in normalized [0, 1] (lon, lat).
        Caches frame metadata on self, then samples with bilinear
        interpolation; returns a (height, width, 3) uint8 image.
        """
        self.frame = frame
        self.frame_height = frame.shape[0]
        self.frame_width = frame.shape[1]
        self.frame_channel = frame.shape[2]

        self.cp = self._get_coord_rad(
            center_point=center_point, is_center_point=True
        )
        converted_screen_coord = self._get_coord_rad(is_center_point=False)
        return self._bilinear_interpolation(
            self._calc_spherical_to_gnomonic(converted_screen_coord)
        )
| # endregion | |
| # region SERVER Utilities | |
class IPChecker:
    """Discovers local IPv4 addresses and probes them against a URL template."""

    def __init__(self):
        self.ips = list(self.get_local_ips())
        log.debug(f"Found {len(self.ips)} local ips")
        self.checked_ips: set[str] = set()

    def get_working_ip(self, test_url_template: str):
        """Return the first untested local IP whose formatted URL answers 200.

        Returns None when every candidate has been tried or fails the probe.
        """
        for ip in self.ips:
            if ip not in self.checked_ips:
                self.checked_ips.add(ip)
                test_url = test_url_template.format(ip)
                if self._test_url(test_url):
                    return ip
        return None

    # BUGFIX: this method was missing `self`, so the bound call in __init__
    # passed the instance as `prefix` and startswith() raised a TypeError.
    def get_local_ips(self, prefix: str = "192.168."):
        """Yield IPv4 addresses of this host that start with `prefix`."""
        hostname = socket.gethostname()
        log.debug(f"Getting local ips for {hostname}")
        for info in socket.getaddrinfo(hostname, None):
            # Filter out IPv6 addresses if you only want IPv4
            log.debug(info)
            # if info[1] == socket.SOCK_STREAM and
            if info[0] == socket.AF_INET and info[4][0].startswith(prefix):
                yield info[4][0]

    def _test_url(self, url: str):
        # Best-effort probe: any network failure counts as "not working".
        try:
            response = requests.get(url, timeout=10)
            return response.status_code == 200
        except Exception:
            return False
def get_server_info():
    """Return (host, port) of the running ComfyUI server.

    When the server listens on 0.0.0.0, probe the local IPs to find one
    that actually answers the /history endpoint.
    """
    from comfy.cli_args import args

    checker = IPChecker()
    host: str = args.listen
    if host == "0.0.0.0":
        log.debug("Server set to 0.0.0.0, we will try to resolve the host IP")
        host = checker.get_working_ip(f"http://{{}}:{args.port}/history")
        log.debug(f"Setting ip to {host}")
    return (host, args.port)
| # endregion | |
| # region MISC Utilities | |
| # TODO: use mtb.core directly instead of copying parts here | |
| T = TypeVar("T", bound="StringConvertibleEnum") | |
| class StringConvertibleEnum(Enum): | |
| """Base class for enums with utility methods for string conversion and member listing.""" | |
| def from_str(cls: type[T], label: str | T) -> T: | |
| """ | |
| Convert a string to the corresponding enum value (case sensitive). | |
| Args: | |
| label (Union[str, T]): The string or enum value to convert. | |
| Returns | |
| ------- | |
| T: The corresponding enum value. | |
| Raises | |
| ------ | |
| ValueError: If the label does not correspond to any enum member. | |
| """ | |
| if isinstance(label, cls): | |
| return label | |
| if isinstance(label, str): | |
| # from key | |
| if label in cls.__members__: | |
| return cls[label] | |
| for member in cls: | |
| if member.value == label: | |
| return member | |
| raise ValueError( | |
| f"Unknown label: '{label}'. Valid members: {list(cls.__members__.keys())}, " | |
| f"valid values: {cls.list_members()}" | |
| ) | |
| def to_str(cls: type[T], enum_value: T) -> str: | |
| """ | |
| Convert an enum value to its string representation. | |
| Args: | |
| enum_value (T): The enum value to convert. | |
| Returns | |
| ------- | |
| str: The string representation of the enum value. | |
| Raises | |
| ------ | |
| ValueError: If the enum value is invalid. | |
| """ | |
| if isinstance(enum_value, cls): | |
| return enum_value.value | |
| raise ValueError(f"Invalid Enum: {enum_value}") | |
| def list_members(cls: type[T]) -> list[str]: | |
| """ | |
| Return a list of string representations of all enum members. | |
| Returns | |
| ------- | |
| List[str]: List of all enum member values. | |
| """ | |
| return [enum.value for enum in cls] | |
| def __str__(self) -> str: | |
| """ | |
| Returns the string representation of the enum value. | |
| Returns | |
| ------- | |
| str: The string representation of the enum value. | |
| """ | |
| return self.value | |
class Precision(StringConvertibleEnum):
    """Supported numeric precisions, convertible to torch dtypes."""

    FULL = "full"
    FP32 = "fp32"
    FP16 = "fp16"
    BF16 = "bf16"
    FP8 = "fp8"

    def to_dtype(self):
        """Return the torch dtype corresponding to this precision."""
        # FULL and FP32 are synonyms for 32-bit floats.
        dtype_table = {
            Precision.FULL: torch.float32,
            Precision.FP32: torch.float32,
            Precision.FP16: torch.float16,
            Precision.BF16: torch.bfloat16,
            Precision.FP8: torch.float8_e4m3fn,
        }
        return dtype_table[self]
class Operation(StringConvertibleEnum):
    """Supported file operations (string-convertible enum)."""

    COPY = "copy"
    CONVERT = "convert"
    DELETE = "delete"
def backup_file(
    fp: Path,
    target: Path | None = None,
    backup_dir: str = ".bak",
    suffix: str | None = None,
    prefix: str | None = None,
) -> Path:
    """Copy `fp` into a backup directory and return the backup path.

    Args:
        fp: File to back up; must exist.
        target: Explicit backup directory; defaults to `fp.parent / backup_dir`.
        backup_dir: Name of the sibling backup directory used when `target`
            is None.
        suffix: Optional stem suffix for the backup file name.
        prefix: Optional stem prefix for the backup file name.

    Returns:
        Path of the copied backup file. (Previously returned None; the
        return value is new and backward compatible.)

    Raises:
        FileNotFoundError: If `fp` does not exist.
    """
    if not fp.exists():
        raise FileNotFoundError(f"No file found at {fp}")

    backup_directory = target or fp.parent / backup_dir
    backup_directory.mkdir(parents=True, exist_ok=True)

    stem = fp.stem
    if suffix or prefix:
        new_stem = f"{prefix or ''}{stem}{suffix or ''}"
    else:
        # No explicit decoration: append a UUID so repeated backups never
        # collide.
        new_stem = f"{stem}_{uuid.uuid4()}"

    backup_file_path = backup_directory / f"{new_stem}{fp.suffix}"

    # Perform the backup
    shutil.copy(fp, backup_file_path)
    log.debug(f"File backed up to {backup_file_path}")
    return backup_file_path
def hex_to_rgb(hex_color):
    """Convert '#RRGGBB' (or 'RRGGBB') to an (r, g, b) tuple of ints.

    Falls back to black (0, 0, 0) on malformed input.
    """
    try:
        digits = hex_color.lstrip("#")
        return tuple(int(digits[pos : pos + 2], 16) for pos in range(0, 6, 2))
    except ValueError:
        log.error(f"Invalid hex color: {digits}")
        return (0, 0, 0)
def add_path(path, prepend=False):
    """Add `path` (str, Path, or a list of either) to sys.path if absent."""
    if isinstance(path, list):
        for entry in path:
            add_path(entry, prepend)
        return

    if isinstance(path, Path):
        path = path.resolve().as_posix()

    if path in sys.path:
        return

    if prepend:
        sys.path.insert(0, path)
    else:
        sys.path.append(path)
def run_command(cmd, ignored_lines_start=None):
    """Run a shell command given as a string or an argument list.

    List arguments are shell-quoted (Paths become posix strings) and joined.
    Failures are reported on stderr instead of propagating; stdout lines
    starting with any prefix in `ignored_lines_start` are suppressed.

    Raises:
        ValueError: If `cmd` is neither a string nor a list.
    """
    if ignored_lines_start is None:
        ignored_lines_start = []

    if isinstance(cmd, str):
        shell_cmd = cmd
    elif isinstance(cmd, list):
        quoted = [
            arg.as_posix() if isinstance(arg, Path) else shlex.quote(str(arg))
            for arg in cmd
        ]
        shell_cmd = " ".join(quoted)
    else:
        raise ValueError(
            "Invalid 'cmd' argument. It must be a string or a list of arguments."
        )

    try:
        _run_command(shell_cmd, ignored_lines_start)
    except subprocess.CalledProcessError as e:
        print(
            f"Command failed with return code: {e.returncode}", file=sys.stderr
        )
        print(e.stderr.strip(), file=sys.stderr)
    except KeyboardInterrupt:
        print("Command execution interrupted.")
def _run_command(shell_cmd, ignored_lines_start):
    """Execute `shell_cmd` through the shell and echo its output.

    Stdout lines starting with any prefix in `ignored_lines_start` are
    skipped; stderr is forwarded verbatim. Raises
    subprocess.CalledProcessError on a non-zero exit code (check=True).
    """
    log.debug(f"Running {shell_cmd}")
    completed = subprocess.run(
        shell_cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        shell=True,
        check=True,
    )

    out_lines = completed.stdout.strip().split("\n")
    err_lines = completed.stderr.strip().split("\n")

    # Print stdout, skipping ignored lines
    for line in out_lines:
        skip = any(line.startswith(prefix) for prefix in ignored_lines_start)
        if not skip:
            print(line)

    # Print stderr
    for line in err_lines:
        print(line, file=sys.stderr)

    print("Command executed successfully!")
def import_install(package_name):
    """Import `package_name`, pip-installing it first when the import fails.

    The pip spec is looked up in `reqs_map` (falling back to the import
    name itself), installed with the current interpreter's pip, then the
    import is retried.
    """
    package_spec = reqs_map.get(package_name, package_name)
    try:
        importlib.import_module(package_name)
    except Exception:  # (ImportError, ModuleNotFoundError):
        pip_install = [
            Path(sys.executable).as_posix(),
            "-m",
            "pip",
            "install",
            package_spec,
        ]
        run_command(pip_install)
        importlib.import_module(package_name)
| # endregion | |
# region GLOBAL VARIABLES
# - detect mode
# How ComfyUI is being run: "colab", "embeded" (portable build), "venv",
# or None when undetermined.
comfy_mode = None
if os.environ.get("COLAB_GPU"):
    comfy_mode = "colab"
elif "python_embeded" in sys.executable:
    comfy_mode = "embeded"
elif ".venv" in sys.executable:
    comfy_mode = "venv"

# - Get the absolute path of the parent directory of the current script
here = Path(__file__).parent.absolute()

# - Construct the absolute path to the ComfyUI directory
comfy_dir = Path(folder_paths.base_path)
models_dir = Path(folder_paths.models_dir)
output_dir = Path(folder_paths.output_directory)
input_dir = Path(folder_paths.input_directory)
styles_dir = comfy_dir / "styles"

# Unique identifier for this process/session.
session_id = str(uuid.uuid4())

# - Construct the path to the font file
font_path = here / "data" / "font.ttf"

# - Add extern folder to path
extern_root = here / "extern"
add_path(extern_root)
for pth in extern_root.iterdir():
    if pth.is_dir():
        add_path(pth)

# - Add the ComfyUI directory and custom nodes path to the sys.path list
add_path(comfy_dir)
add_path(comfy_dir / "custom_nodes")

# TODO: use the requirements library
# Inverse of install.pip_map: import name -> pip package spec.
reqs_map = {value: key for key, value in pip_map.items()}

# NOTE: store already logged warnings to only alert once.
warned_messages: set[str] = set()

# Lowercase filter name -> PIL resampling filter.
PIL_FILTER_MAP = {
    "nearest": Image.Resampling.NEAREST,
    "box": Image.Resampling.BOX,
    "bilinear": Image.Resampling.BILINEAR,
    "hamming": Image.Resampling.HAMMING,
    "bicubic": Image.Resampling.BICUBIC,
    "lanczos": Image.Resampling.LANCZOS,
}
# endregion
| # region TENSOR Utilities | |
def to_numpy(image: torch.Tensor) -> npt.NDArray[np.uint8]:
    """Converts a tensor to a ndarray with proper scaling and type conversion."""
    log.debug(f"Converting tensor to numpy array with shape {image.shape}")
    scaled = 255.0 * image.cpu().numpy()
    result = np.clip(scaled, 0, 255).astype(np.uint8)
    log.debug(f"Numpy array shape after conversion: {result.shape}")
    return result
def handle_batch(
    tensor: torch.Tensor,
    func: Callable[[torch.Tensor], Image.Image | npt.NDArray[np.uint8]],
) -> list[Image.Image] | list[npt.NDArray[np.uint8]]:
    """Apply `func` to every item along the batch (first) dimension."""
    batch_size = tensor.shape[0]
    return [func(tensor[idx]) for idx in range(batch_size)]
def tensor2pil(tensor: torch.Tensor) -> list[Image.Image]:
    """Converts a batch of tensors to a list of PIL Images."""

    def convert_one(t: torch.Tensor) -> Image.Image:
        arr = to_numpy(t)
        if arr.ndim == 2:
            # (H, W): grayscale mask.
            return Image.fromarray(arr, mode="L")
        if arr.ndim == 3:
            # (H, W, C): RGB or RGBA depending on channel count.
            channels = arr.shape[2]
            if channels == 3:
                return Image.fromarray(arr, mode="RGB")
            if channels == 4:
                return Image.fromarray(arr, mode="RGBA")
        raise ValueError(f"Invalid tensor shape: {t.shape}")

    return handle_batch(tensor, convert_one)
def pil2tensor(images: Image.Image | list[Image.Image]) -> torch.Tensor:
    """Converts a PIL Image or a list of PIL Images to a tensor."""

    def convert_one(image: Image.Image) -> torch.Tensor:
        arr = np.array(image).astype(np.float32) / 255.0
        # Grayscale becomes (1, H, W); color stays channel-last (1, H, W, C).
        return torch.from_numpy(arr).unsqueeze(0)

    if isinstance(images, Image.Image):
        return convert_one(images)
    return torch.cat([convert_one(img) for img in images], dim=0)
| def np2tensor( | |
| np_array: npt.NDArray[np.float32] | Sequence[npt.NDArray[np.float32]], | |
| ) -> torch.Tensor: | |
| """Converts a NumPy array or a list of NumPy arrays to a tensor.""" | |
| def single_np2tensor(array: npt.NDArray[np.float32]) -> torch.Tensor: | |
| if array.ndim == 2: # (H, W) for masks | |
| return torch.from_numpy( | |
| array.astype(np.float32) / 255.0 | |
| ).unsqueeze(0) # (1, H, W) | |
| elif array.ndim == 3: # (H, W, C) for RGB/RGBA | |
| return torch.from_numpy( | |
| array.astype(np.float32) / 255.0 | |
| ).unsqueeze(0) # (1, H, W, C) | |
| raise ValueError(f"Invalid array shape: {array.shape}") | |
| if isinstance(np_array, np.ndarray): | |
| return single_np2tensor(np_array) | |
| else: | |
| return torch.cat([single_np2tensor(arr) for arr in np_array], dim=0) | |
def tensor2np(tensor: torch.Tensor) -> list[npt.NDArray[np.uint8]]:
    """Converts a batch of tensors to a list of NumPy arrays."""

    def convert_one(t: torch.Tensor) -> npt.NDArray[np.uint8]:
        t = t.squeeze()  # drop singleton dimensions first
        if t.ndim == 2:
            # (H, W) mask
            return to_numpy(t)
        if t.ndim == 3:
            if t.shape[0] in (1, 3, 4):
                # Channel-first (C, H, W) -> channel-last (H, W, C).
                t = t.permute(1, 2, 0)
            return to_numpy(t)
        raise ValueError(f"Invalid tensor shape: {t.shape}")

    return handle_batch(tensor, convert_one)
def pad(img, left, right, top, bottom):
    """Wrap-pad a (C, H, W) image: H by (top, bottom), W by (left, right).

    The channel axis is never padded. `mode="wrap"` takes padded pixels
    from the opposite edge. (Leftover debugging `print` removed.)
    """
    pad_width = ((0, 0), (top, bottom), (left, right))
    return np.pad(img, pad_width, mode="wrap")
def tiles_infer(tiles, ort_session, progress_callback=None):
    """Infer each tile with the given model.

    Args:
        tiles: array of shape (N, C, H, W).
        ort_session: session object exposing `.run(outputs, feeds)`
            (ONNX-runtime style), fed one tile at a time as "input".
        progress_callback: optional callable invoked as
            (current_tile_idx, total_tiles) before each tile (used to show
            progress on cursor in Blender).

    Returns:
        Array of shape (N, 3, H, W) with the predicted tiles.
    """
    out_channels = 3  # normal map RGB channels
    tiles_nb = tiles.shape[0]
    pred_tiles = np.empty(
        (tiles_nb, out_channels, tiles.shape[2], tiles.shape[3])
    )
    for i in range(tiles_nb):
        # `is not None` instead of `!= None` (identity check, PEP 8).
        if progress_callback is not None:
            progress_callback(i + 1, tiles_nb)
        pred_tiles[i] = ort_session.run(
            None, {"input": tiles[i : i + 1].astype(np.float32)}
        )[0]
    return pred_tiles
def generate_mask(tile_size, stride_size):
    """Generates a pyramidal-like mask. Used for mixing overlapping predicted tiles.

    The mask is 1 in the tile center and ramps linearly to 0 across the
    overlap band (tile - stride wide) on each side, so summed overlapping
    tiles blend smoothly.
    """
    tile_h, tile_w = tile_size
    stride_h, stride_w = stride_size
    # Width of the overlap band between adjacent tiles.
    ramp_h = tile_h - stride_h
    ramp_w = tile_w - stride_w
    mask = np.ones((tile_h, tile_w))
    # ramps in width direction
    mask[ramp_h:-ramp_h, :ramp_w] = np.linspace(0, 1, num=ramp_w)
    mask[ramp_h:-ramp_h, -ramp_w:] = np.linspace(1, 0, num=ramp_w)
    # ramps in height direction
    mask[:ramp_h, ramp_w:-ramp_w] = np.transpose(
        np.linspace(0, 1, num=ramp_h)[None], (1, 0)
    )
    mask[-ramp_h:, ramp_w:-ramp_w] = np.transpose(
        np.linspace(1, 0, num=ramp_h)[None], (1, 0)
    )
    # Assume tiles are squared
    assert ramp_h == ramp_w
    # top left corner
    corner = np.rot90(corner_mask(ramp_h), 2)
    mask[:ramp_h, :ramp_w] = corner
    # top right corner
    corner = np.flip(corner, 1)
    mask[:ramp_h, -ramp_w:] = corner
    # bottom right corner
    corner = np.flip(corner, 0)
    mask[-ramp_h:, -ramp_w:] = corner
    # bottom left corner (flip back horizontally)
    corner = np.flip(corner, 1)
    mask[-ramp_h:, :ramp_w] = corner
    return mask
def corner_mask(side_length):
    """Generates the corner part of the pyramidal-like mask.

    Currently, only for square shapes.
    """
    last = side_length - 1
    corner = np.zeros([side_length, side_length])
    for row in range(side_length):
        for col in range(side_length):
            # On-or-below the diagonal: fade with the row position; the
            # diagonal itself takes the second (column-based) assignment.
            if row >= col:
                corner[row, col] = 1 - row / last
            if row <= col:
                corner[row, col] = 1 - col / last
    return corner - 0.25 * scaling_mask(side_length)


def scaling_mask(side_length):
    """Correction term subtracted (scaled) from the corner mask."""
    last = side_length - 1
    scaling = np.zeros([side_length, side_length])
    for row in range(side_length):
        for col in range(side_length):
            frac_row = row / last
            frac_col = col / last
            anti = side_length - col
            # Four diagonal quadrants; later conditions overwrite earlier
            # ones on the boundaries, matching the original ordering.
            if row >= col and row <= anti:
                scaling[row, col] = frac_col
            if row <= col and row <= anti:
                scaling[row, col] = frac_row
            if row >= col and row >= anti:
                scaling[row, col] = 1 - frac_row
            if row <= col and row >= anti:
                scaling[row, col] = 1 - frac_col
    return 2 * scaling
def tiles_merge(tiles, stride_size, img_size, paddings):
    """Merges the list of tiles into one image. img_size is the original size, before
    padding.

    Args:
        tiles: sequence of (C, tile_h, tile_w) arrays in row-major order,
            as produced by `tiles_split`.
        stride_size: (stride_h, stride_w) used when splitting.
        img_size: (C, H, W) of the original, un-padded image.
        paddings: (pad_left, pad_right, pad_top, pad_bottom) from `tiles_split`.

    Returns:
        The merged (C, H, W) image with the padding cropped away.
    """
    _, tile_h, tile_w = tiles[0].shape
    pad_left, pad_right, pad_top, pad_bottom = paddings
    height = img_size[1] + pad_top + pad_bottom
    width = img_size[2] + pad_left + pad_right
    stride_h, stride_w = stride_size
    # stride must be even
    assert (stride_h % 2 == 0) and (stride_w % 2 == 0)
    # stride must be greater or equal than half tile
    assert (stride_h >= tile_h / 2) and (stride_w >= tile_w / 2)
    # stride must be smaller or equal tile size
    assert (stride_h <= tile_h) and (stride_w <= tile_w)
    merged = np.zeros((img_size[0], height, width))
    # Pyramid mask makes overlapping tile contributions sum smoothly.
    mask = generate_mask((tile_h, tile_w), stride_size)
    h_range = ((height - tile_h) // stride_h) + 1
    w_range = ((width - tile_w) // stride_w) + 1
    idx = 0
    for h in range(0, h_range):
        for w in range(0, w_range):
            h_from, h_to = h * stride_h, h * stride_h + tile_h
            w_from, w_to = w * stride_w, w * stride_w + tile_w
            merged[:, h_from:h_to, w_from:w_to] += tiles[idx] * mask
            idx += 1
    # NOTE(review): negative-stop slicing assumes all paddings are > 0;
    # this holds for `tiles_split` output (pads always include a stride).
    return merged[:, pad_top:-pad_bottom, pad_left:-pad_right]
def tiles_split(img, tile_size, stride_size):
    """Returns list of tiles from the given image and the padding used to fit the tiles
    in it. Input image must have dimension C,H,W.

    Args:
        img: (C, H, W) array.
        tile_size: (tile_h, tile_w).
        stride_size: (stride_h, stride_w); must be even and within
            [tile / 2, tile].

    Returns:
        (tiles, (pad_left, pad_right, pad_top, pad_bottom)) where tiles
        has shape (n_tiles, C, tile_h, tile_w).
    """
    log.debug(f"Splitting img: tile {tile_size}, stride {stride_size} ")
    tile_h, tile_w = tile_size
    stride_h, stride_w = stride_size
    # BUGFIX: img is C,H,W, so height/width live on axes 1 and 2. The old
    # code read shape[0]/shape[1] (channels/height), which produced wrong
    # padding; the post-pad read below already used shape[1]/shape[2].
    img_h, img_w = img.shape[1], img.shape[2]
    # stride must be even
    assert (stride_h % 2 == 0) and (stride_w % 2 == 0)
    # stride must be greater or equal than half tile
    assert (stride_h >= tile_h / 2) and (stride_w >= tile_w / 2)
    # stride must be smaller or equal tile size
    assert (stride_h <= tile_h) and (stride_w <= tile_w)
    # find total height & width padding sizes
    pad_h, pad_w = 0, 0
    remainer_h = (img_h - tile_h) % stride_h
    remainer_w = (img_w - tile_w) % stride_w
    if remainer_h != 0:
        pad_h = stride_h - remainer_h
    if remainer_w != 0:
        pad_w = stride_w - remainer_w
    # if tile bigger than image, pad image to tile size
    if tile_h > img_h:
        pad_h = tile_h - img_h
    if tile_w > img_w:
        pad_w = tile_w - img_w
    # pad image, add extra stride to padding to avoid pyramid
    # weighting leaking onto the valid part of the picture
    pad_left = pad_w // 2 + stride_w
    pad_right = pad_left if pad_w % 2 == 0 else pad_left + 1
    pad_top = pad_h // 2 + stride_h
    pad_bottom = pad_top if pad_h % 2 == 0 else pad_top + 1
    img = pad(img, pad_left, pad_right, pad_top, pad_bottom)
    img_h, img_w = img.shape[1], img.shape[2]
    # extract tiles
    h_range = ((img_h - tile_h) // stride_h) + 1
    w_range = ((img_w - tile_w) // stride_w) + 1
    tiles = np.empty([h_range * w_range, img.shape[0], tile_h, tile_w])
    idx = 0
    for h in range(0, h_range):
        for w in range(0, w_range):
            h_from, h_to = h * stride_h, h * stride_h + tile_h
            w_from, w_to = w * stride_w, w * stride_w + tile_w
            tiles[idx] = img[:, h_from:h_to, w_from:w_to]
            idx += 1
    return tiles, (pad_left, pad_right, pad_top, pad_bottom)
| # endregion | |
| # region MODEL Utilities | |
def download_antelopev2():
    """Download and extract the antelopev2 insightface model if missing.

    Fetches the archive from Google Drive via gdown into the insightface
    model folder, unzips it, and re-raises after logging on any failure.
    """
    antelopev2_url = (
        "https://drive.google.com/uc?id=18wEUfMNohBJ4K3Ly5wpTejPfDzp-8fI8"
    )

    try:
        # Imported lazily: gdown is only needed for this download.
        import gdown

        log.debug("Loading antelopev2 model")

        dest = get_model_path("insightface")
        archive = dest / "antelopev2.zip"
        final_path = dest / "models" / "antelopev2"
        if not final_path.exists():
            log.info(f"antelopev2 not found, downloading to {dest}")
            gdown.download(
                antelopev2_url,
                archive.as_posix(),
                resume=True,
            )

            log.info(f"Unzipping antelopev2 to {final_path}")

            if archive.exists():
                # we unzip it
                import zipfile

                with zipfile.ZipFile(archive.as_posix(), "r") as zip_ref:
                    zip_ref.extractall(final_path.parent.as_posix())

    except Exception as e:
        log.error(
            f"Could not load or download antelopev2 model, download it manually from {antelopev2_url}"
        )
        raise e
def get_model_path(fam, model=None):
    """Resolve a model folder (or file, when `model` is given) path.

    Tries ComfyUI's `folder_paths` registry first; falls back to
    `models_dir / fam [/ model]` when the family is unknown there.
    """
    log.debug(f"Requesting {fam} with model {model}")
    res = None
    if model:
        res = folder_paths.get_full_path(fam, model)
    else:
        # this one can raise errors...
        with contextlib.suppress(KeyError):
            res = folder_paths.get_folder_paths(fam)
    if res:
        if isinstance(res, list):
            if len(res) > 1:
                # Warn only once per distinct message (see warned_messages).
                warn_msg = f"Found multiple match, we will pick the last {res[-1]}\n{res}"
                if warn_msg not in warned_messages:
                    log.info(warn_msg)
                    warned_messages.add(warn_msg)
            res = res[-1]
        res = Path(res)
        log.debug(f"Resolved model path from folder_paths: {res}")
    else:
        # Fallback: conventional location under the models directory.
        res = models_dir / fam
        if model:
            res /= model
    return res
| # endregion | |
| # region UV Utilities | |
def create_uv_map_tensor(width=512, height=512):
    """Build a (1, height, width, 3) UV gradient tensor.

    Channel 0 ramps 0 -> 1 left to right (U), channel 1 ramps 0 -> 1 top
    to bottom (V), channel 2 stays zero.
    """
    u = torch.linspace(0.0, 1.0, steps=width)
    v = torch.linspace(0.0, 1.0, steps=height)
    # Explicit "ij" matches the historical torch.meshgrid default and
    # silences the indexing deprecation warning (no behavior change).
    U, V = torch.meshgrid(u, v, indexing="ij")

    uv_map = torch.zeros(height, width, 3, dtype=torch.float32)
    # Transpose the (width, height) grids into the (height, width) map.
    uv_map[:, :, 0] = U.t()
    uv_map[:, :, 1] = V.t()
    return uv_map.unsqueeze(0)
| # endregion | |
| # region ANIMATION Utilities | |
# Names of all easing curves accepted by `apply_easing`.
EASINGS = [
    "Linear",
    "Sine In",
    "Sine Out",
    "Sine In/Out",
    "Quart In",
    "Quart Out",
    "Quart In/Out",
    "Cubic In",
    "Cubic Out",
    "Cubic In/Out",
    "Circ In",
    "Circ Out",
    "Circ In/Out",
    "Back In",
    "Back Out",
    "Back In/Out",
    "Elastic In",
    "Elastic Out",
    "Elastic In/Out",
    "Bounce In",
    "Bounce Out",
    "Bounce In/Out",
]
def apply_easing(value, easing_type):
    """Apply the easing curve named `easing_type` to `value` (normally in [0, 1]).

    "Linear" returns the value unchanged; every other name is looked up in
    the table of Penner-style easing functions built below (see EASINGS).

    Raises:
        ValueError: If `easing_type` is not a known easing name.
    """
    if easing_type == "Linear":
        return value

    # Back easing functions
    def easeInBack(t):
        s = 1.70158
        return t * t * ((s + 1) * t - s)

    def easeOutBack(t):
        # BUGFIX: the previous form used `t` where the reflected `(t - 1)`
        # belongs, so easeOutBack(0) evaluated to 1 instead of 0.
        s = 1.70158
        t -= 1
        return t * t * ((s + 1) * t + s) + 1

    def easeInOutBack(t):
        # BUGFIX: restored the canonical piecewise form — the previous
        # version neither started at 0, ended at 1, nor met at t = 0.5.
        s = 1.70158 * 1.525
        t *= 2
        if t < 1:
            return 0.5 * (t * t * ((s + 1) * t - s))
        t -= 2
        return 0.5 * (t * t * ((s + 1) * t + s) + 2)

    # Elastic easing functions
    def easeInElastic(t):
        if t == 0:
            return 0
        if t == 1:
            return 1
        p = 0.3
        s = p / 4
        return -(
            math.pow(2, 10 * (t - 1))
            * math.sin((t - 1 - s) * (2 * math.pi) / p)
        )

    def easeOutElastic(t):
        if t == 0:
            return 0
        if t == 1:
            return 1
        p = 0.3
        s = p / 4
        return math.pow(2, -10 * t) * math.sin((t - s) * (2 * math.pi) / p) + 1

    def easeInOutElastic(t):
        if t == 0:
            return 0
        if t == 1:
            return 1
        p = 0.3 * 1.5
        s = p / 4
        t = t * 2
        if t < 1:
            return -0.5 * (
                math.pow(2, 10 * (t - 1))
                * math.sin((t - 1 - s) * (2 * math.pi) / p)
            )
        return (
            0.5
            * math.pow(2, -10 * (t - 1))
            * math.sin((t - 1 - s) * (2 * math.pi) / p)
            + 1
        )

    # Bounce easing functions
    def easeInBounce(t):
        return 1 - easeOutBounce(1 - t)

    def easeOutBounce(t):
        if t < (1 / 2.75):
            return 7.5625 * t * t
        elif t < (2 / 2.75):
            t -= 1.5 / 2.75
            return 7.5625 * t * t + 0.75
        elif t < (2.5 / 2.75):
            t -= 2.25 / 2.75
            return 7.5625 * t * t + 0.9375
        else:
            t -= 2.625 / 2.75
            return 7.5625 * t * t + 0.984375

    def easeInOutBounce(t):
        if t < 0.5:
            return easeInBounce(t * 2) * 0.5
        return easeOutBounce(t * 2 - 1) * 0.5 + 0.5

    # Quart easing functions
    def easeInQuart(t):
        return t * t * t * t

    def easeOutQuart(t):
        t -= 1
        return -(t**2 * t * t - 1)

    def easeInOutQuart(t):
        t *= 2
        if t < 1:
            return 0.5 * t * t * t * t
        t -= 2
        return -0.5 * (t**2 * t * t - 2)

    # Cubic easing functions
    def easeInCubic(t):
        return t * t * t

    def easeOutCubic(t):
        t -= 1
        return t**2 * t + 1

    def easeInOutCubic(t):
        t *= 2
        if t < 1:
            return 0.5 * t * t * t
        t -= 2
        return 0.5 * (t**2 * t + 2)

    # Circ easing functions
    def easeInCirc(t):
        return -(math.sqrt(1 - t * t) - 1)

    def easeOutCirc(t):
        t -= 1
        return math.sqrt(1 - t**2)

    def easeInOutCirc(t):
        t *= 2
        if t < 1:
            return -0.5 * (math.sqrt(1 - t**2) - 1)
        t -= 2
        return 0.5 * (math.sqrt(1 - t**2) + 1)

    # Sine easing functions
    def easeInSine(t):
        return -math.cos(t * (math.pi / 2)) + 1

    def easeOutSine(t):
        return math.sin(t * (math.pi / 2))

    def easeInOutSine(t):
        return -0.5 * (math.cos(math.pi * t) - 1)

    easing_functions = {
        "Sine In": easeInSine,
        "Sine Out": easeOutSine,
        "Sine In/Out": easeInOutSine,
        "Quart In": easeInQuart,
        "Quart Out": easeOutQuart,
        "Quart In/Out": easeInOutQuart,
        "Cubic In": easeInCubic,
        "Cubic Out": easeOutCubic,
        "Cubic In/Out": easeInOutCubic,
        "Circ In": easeInCirc,
        "Circ Out": easeOutCirc,
        "Circ In/Out": easeInOutCirc,
        "Back In": easeInBack,
        "Back Out": easeOutBack,
        "Back In/Out": easeInOutBack,
        "Elastic In": easeInElastic,
        "Elastic Out": easeOutElastic,
        "Elastic In/Out": easeInOutElastic,
        "Bounce In": easeInBounce,
        "Bounce Out": easeOutBounce,
        "Bounce In/Out": easeInOutBounce,
    }

    function_ease = easing_functions.get(easing_type)
    if function_ease:
        return function_ease(value)

    log.error(f"Unknown easing type: {easing_type}")
    log.error(f"Available easing types: {list(easing_functions.keys())}")
    raise ValueError(f"Unknown easing type: {easing_type}")
| # endregion | |