Spaces:
Paused
Paused
| import torch | |
| class FlowMatchScheduler: | |
| def __init__( | |
| self, | |
| num_inference_steps=100, | |
| num_train_timesteps=1000, | |
| shift=3.0, | |
| sigma_max=1.0, | |
| sigma_min=0.003 / 1.002, | |
| inverse_timesteps=False, | |
| extra_one_step=False, | |
| reverse_sigmas=False, | |
| ): | |
| self.num_train_timesteps = num_train_timesteps | |
| self.shift = shift | |
| self.sigma_max = sigma_max | |
| self.sigma_min = sigma_min | |
| self.inverse_timesteps = inverse_timesteps | |
| self.extra_one_step = extra_one_step | |
| self.reverse_sigmas = reverse_sigmas | |
| self.set_timesteps(num_inference_steps) | |
| def set_timesteps( | |
| self, | |
| num_inference_steps=100, | |
| denoising_strength=1.0, | |
| training=False, | |
| shift=None, | |
| ): | |
| if shift is not None: | |
| self.shift = shift | |
| sigma_start = ( | |
| self.sigma_min + (self.sigma_max - self.sigma_min) * denoising_strength | |
| ) | |
| if self.extra_one_step: | |
| self.sigmas = torch.linspace( | |
| sigma_start, self.sigma_min, num_inference_steps + 1 | |
| )[:-1] | |
| else: | |
| self.sigmas = torch.linspace( | |
| sigma_start, self.sigma_min, num_inference_steps | |
| ) | |
| if self.inverse_timesteps: | |
| self.sigmas = torch.flip(self.sigmas, dims=[0]) | |
| self.sigmas = self.shift * self.sigmas / (1 + (self.shift - 1) * self.sigmas) | |
| if self.reverse_sigmas: | |
| self.sigmas = 1 - self.sigmas | |
| self.timesteps = self.sigmas * self.num_train_timesteps | |
| if training: | |
| x = self.timesteps | |
| y = torch.exp( | |
| -2 * ((x - num_inference_steps / 2) / num_inference_steps) ** 2 | |
| ) | |
| y_shifted = y - y.min() | |
| bsmntw_weighing = y_shifted * (num_inference_steps / y_shifted.sum()) | |
| self.linear_timesteps_weights = bsmntw_weighing | |
| self.training = True | |
| else: | |
| self.training = False | |
| def step(self, model_output, timestep, sample, to_final=False, **kwargs): | |
| if isinstance(timestep, torch.Tensor): | |
| timestep = timestep.cpu() | |
| timestep_id = torch.argmin((self.timesteps - timestep).abs()) | |
| sigma = self.sigmas[timestep_id] | |
| if to_final or timestep_id + 1 >= len(self.timesteps): | |
| sigma_ = 1 if (self.inverse_timesteps or self.reverse_sigmas) else 0 | |
| else: | |
| sigma_ = self.sigmas[timestep_id + 1] | |
| prev_sample = sample + model_output * (sigma_ - sigma) | |
| return prev_sample | |
| def return_to_timestep(self, timestep, sample, sample_stablized): | |
| if isinstance(timestep, torch.Tensor): | |
| timestep = timestep.cpu() | |
| timestep_id = torch.argmin((self.timesteps - timestep).abs()) | |
| sigma = self.sigmas[timestep_id] | |
| model_output = (sample - sample_stablized) / sigma | |
| return model_output | |
| def add_noise(self, original_samples, noise, timestep): | |
| if isinstance(timestep, torch.Tensor): | |
| timestep = timestep.cpu() | |
| timestep_id = torch.argmin((self.timesteps - timestep).abs()) | |
| sigma = self.sigmas[timestep_id] | |
| sample = (1 - sigma) * original_samples + sigma * noise | |
| return sample | |
| def training_target(self, sample, noise, timestep): | |
| target = noise - sample | |
| return target | |
| def training_weight(self, timestep): | |
| timestep_id = torch.argmin( | |
| (self.timesteps - timestep.to(self.timesteps.device)).abs() | |
| ) | |
| weights = self.linear_timesteps_weights[timestep_id] | |
| return weights | |