Spaces:
Runtime error
Runtime error
| # Copyright (c) 2025 NVIDIA CORPORATION. | |
| # Licensed under the MIT license. | |
| # Adapted from https://github.com/NVlabs/VILA/tree/main under the Apache 2.0 license. | |
| # LICENSE is in incl_licenses directory. | |
| # Copyright 2024 NVIDIA CORPORATION & AFFILIATES | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| # | |
| # SPDX-License-Identifier: Apache-2.0 | |
| import transformers | |
| from transformers.image_transforms import ( | |
| ChannelDimension, | |
| Iterable, | |
| Optional, | |
| Union, | |
| get_channel_dimension_axis, | |
| infer_channel_dimension_format, | |
| np, | |
| to_channel_dimension_format, | |
| ) | |
| def patched_normalize( | |
| image: np.ndarray, | |
| mean: Union[float, Iterable[float]], | |
| std: Union[float, Iterable[float]], | |
| data_format: Optional[ChannelDimension] = None, | |
| input_data_format: Optional[Union[str, ChannelDimension]] = None, | |
| ) -> np.ndarray: | |
| """ | |
| Normalizes `image` using the mean and standard deviation specified by `mean` and `std`. | |
| image = (image - mean) / std | |
| Args: | |
| image (`np.ndarray`): | |
| The image to normalize. | |
| mean (`float` or `Iterable[float]`): | |
| The mean to use for normalization. | |
| std (`float` or `Iterable[float]`): | |
| The standard deviation to use for normalization. | |
| data_format (`ChannelDimension`, *optional*): | |
| The channel dimension format of the output image. If unset, will use the inferred format from the input. | |
| """ | |
| if not isinstance(image, np.ndarray): | |
| raise ValueError("image must be a numpy array") | |
| input_data_format = infer_channel_dimension_format(image) | |
| channel_axis = get_channel_dimension_axis(image) | |
| num_channels = image.shape[channel_axis] | |
| if isinstance(mean, Iterable): | |
| if len(mean) != num_channels: | |
| if num_channels == 1: | |
| num_channels = 3 | |
| image = np.concatenate([image, image, image], axis=channel_axis) | |
| else: | |
| raise ValueError(f"mean must have {num_channels} elements if it is an iterable, got {len(mean)}") | |
| else: | |
| mean = [mean] * num_channels | |
| mean = np.array(mean, dtype=image.dtype) | |
| if isinstance(std, Iterable): | |
| if len(std) != num_channels: | |
| raise ValueError(f"std must have {num_channels} elements if it is an iterable, got {len(std)}") | |
| else: | |
| std = [std] * num_channels | |
| std = np.array(std, dtype=image.dtype) | |
| if input_data_format == ChannelDimension.LAST: | |
| image = (image - mean) / std | |
| else: | |
| image = ((image.T - mean) / std).T | |
| image = to_channel_dimension_format(image, data_format) if data_format is not None else image | |
| return image | |
| def patch_normalize_preprocess(): | |
| transformers.image_transforms.normalize = patched_normalize | |
| import os | |
| import torch | |
| from transformers.trainer_utils import PREFIX_CHECKPOINT_DIR | |
| from transformers.utils import logging | |
| TRAINER_STATE_NAME = "trainer_state.json" | |
| logger = logging.get_logger(__name__) | |
| def _save_checkpoint(self, model, trial, metrics=None): | |
| # In all cases, including ddp/dp/deepspeed, self.model is always a reference to the model we | |
| # want to save except FullyShardedDDP. | |
| # assert unwrap_model(model) is self.model, "internal model should be a reference to self.model" | |
| # Save model checkpoint | |
| checkpoint_folder = f"{PREFIX_CHECKPOINT_DIR}-{self.state.global_step}" | |
| if self.hp_search_backend is None and trial is None: | |
| self.store_flos() | |
| run_dir = self._get_output_dir(trial=trial) | |
| output_dir = os.path.join(run_dir, checkpoint_folder) | |
| if os.path.exists(output_dir) and len(os.listdir(output_dir)) > 0: | |
| logger.warning( | |
| f"Checkpoint destination directory {output_dir} already exists and is non-empty." | |
| "Saving will proceed but saved results may be invalid." | |
| ) | |
| staging_output_dir = output_dir | |
| else: | |
| staging_output_dir = os.path.join(run_dir, f"tmp-{checkpoint_folder}") | |
| self.save_model(staging_output_dir, _internal_call=True) | |
| if not self.args.save_only_model: | |
| # Save optimizer and scheduler | |
| self._save_optimizer_and_scheduler(staging_output_dir) | |
| # Save RNG state | |
| self._save_rng_state(staging_output_dir) | |
| # Determine the new best metric / best model checkpoint | |
| if metrics is not None and self.args.metric_for_best_model is not None: | |
| metric_to_check = self.args.metric_for_best_model | |
| if not metric_to_check.startswith("eval_"): | |
| metric_to_check = f"eval_{metric_to_check}" | |
| metric_value = metrics[metric_to_check] | |
| operator = np.greater if self.args.greater_is_better else np.less | |
| if ( | |
| self.state.best_metric is None | |
| or self.state.best_model_checkpoint is None | |
| or operator(metric_value, self.state.best_metric) | |
| ): | |
| self.state.best_metric = metric_value | |
| self.state.best_model_checkpoint = staging_output_dir | |
| # Save the Trainer state | |
| if self.args.should_save: | |
| self.state.save_to_json(os.path.join(staging_output_dir, TRAINER_STATE_NAME)) | |
| if self.args.push_to_hub: | |
| self._push_from_checkpoint(staging_output_dir) | |
| torch.distributed.barrier() | |
| if staging_output_dir != output_dir: | |
| with self.args.main_process_first( | |
| desc="Renaming model checkpoint folder to true location", local=self.args.save_on_each_node | |
| ): | |
| if os.path.exists(staging_output_dir): | |
| os.rename(staging_output_dir, output_dir) | |
| # Maybe delete some older checkpoints. | |
| if self.args.should_save: | |
| # Solely rely on numerical checkpoint id for rotation. | |
| # mtime is not reliable especially on some fuse fs in cloud environments. | |
| self._rotate_checkpoints(use_mtime=False, output_dir=run_dir) | |
| from typing import Any, Dict, Union | |
| from torch import nn | |
| from transformers.training_args import OptimizerNames | |
| from transformers.utils import ( | |
| is_sagemaker_mp_enabled, | |
| is_torch_mlu_available, | |
| is_torch_mps_available, | |
| is_torch_musa_available, | |
| is_torch_npu_available, | |
| is_torch_xpu_available, | |
| ) | |
| def training_step( | |
| self, model: nn.Module, inputs: Dict[str, Union[torch.Tensor, Any]], num_items_in_batch=None | |
| ) -> torch.Tensor: | |
| """ | |
| Perform a training step on a batch of inputs. | |
| Subclass and override to inject custom behavior. | |
| Args: | |
| model (`nn.Module`): | |
| The model to train. | |
| inputs (`Dict[str, Union[torch.Tensor, Any]]`): | |
| The inputs and targets of the model. | |
| The dictionary will be unpacked before being fed to the model. Most models expect the targets under the | |
| argument `labels`. Check your model's documentation for all accepted arguments. | |
| Return: | |
| `torch.Tensor`: The tensor with training loss on this batch. | |
| """ | |
| model.train() | |
| if hasattr(self.optimizer, "train") and callable(self.optimizer.train): | |
| self.optimizer.train() | |
| inputs = self._prepare_inputs(inputs) | |
| if is_sagemaker_mp_enabled(): | |
| loss_mb = smp_forward_backward(model, inputs, self.args.gradient_accumulation_steps) | |
| return loss_mb.reduce_mean().detach().to(self.args.device) | |
| with self.compute_loss_context_manager(): | |
| loss = self.compute_loss(model, inputs, num_items_in_batch=num_items_in_batch) | |
| del inputs | |
| if ( | |
| self.args.torch_empty_cache_steps is not None | |
| and self.state.global_step % self.args.torch_empty_cache_steps == 0 | |
| ): | |
| if is_torch_xpu_available(): | |
| torch.xpu.empty_cache() | |
| elif is_torch_mlu_available(): | |
| torch.mlu.empty_cache() | |
| elif is_torch_musa_available(): | |
| torch.musa.empty_cache() | |
| elif is_torch_npu_available(): | |
| torch.npu.empty_cache() | |
| elif is_torch_mps_available(min_version="2.0"): | |
| torch.mps.empty_cache() | |
| else: | |
| torch.cuda.empty_cache() | |
| kwargs = {} | |
| # For LOMO optimizers you need to explicitly use the learnign rate | |
| if self.args.optim in [OptimizerNames.LOMO, OptimizerNames.ADALOMO]: | |
| kwargs["learning_rate"] = self._get_learning_rate() | |
| if self.args.n_gpu > 1: | |
| loss = loss.mean() # mean() to average on multi-gpu parallel training | |
| if self.use_apex: | |
| with amp.scale_loss(loss, self.optimizer) as scaled_loss: | |
| scaled_loss.backward() | |
| else: | |
| if num_items_in_batch is not None: | |
| if self.compute_loss_func or self.model_accepts_loss_kwargs: | |
| loss *= self.args.gradient_accumulation_steps | |
| # Average tokens across devices is orthogonal to gradient accumulation | |
| loss *= self.args.world_size | |
| self.accelerator.backward(loss, **kwargs) | |
| return loss.detach() / self.args.gradient_accumulation_steps | |
| def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None): | |
| """ | |
| How the loss is computed by Trainer. By default, all models return the loss in the first element. | |
| Subclass and override for custom behavior. | |
| """ | |
| if (self.label_smoother is not None or self.compute_loss_func is not None) and "labels" in inputs: | |
| labels = inputs.pop("labels") | |
| else: | |
| labels = None | |
| if num_items_in_batch is not None: | |
| num_items_in_batch_tensor = torch.tensor(num_items_in_batch, device=self.args.device) | |
| num_items_in_batch = int(self.accelerator.gather(num_items_in_batch_tensor).sum().cpu()) | |
| if self.model_accepts_loss_kwargs: | |
| loss_kwargs = {} | |
| if num_items_in_batch is not None: | |
| loss_kwargs["num_items_in_batch"] = num_items_in_batch | |
| inputs = {**inputs, **loss_kwargs} | |
| outputs = model(**inputs) | |
| # Save past state if it exists | |
| # TODO: this needs to be fixed and made cleaner later. | |
| if self.args.past_index >= 0: | |
| self._past = outputs[self.args.past_index] | |
| if labels is not None: | |
| unwrapped_model = self.accelerator.unwrap_model(model) | |
| if _is_peft_model(unwrapped_model): | |
| model_name = unwrapped_model.base_model.model._get_name() | |
| else: | |
| model_name = unwrapped_model._get_name() | |
| # User-defined compute_loss function | |
| if self.compute_loss_func is not None: | |
| loss = self.compute_loss_func(outputs, labels, num_items_in_batch=num_items_in_batch) | |
| elif model_name in MODEL_FOR_CAUSAL_LM_MAPPING_NAMES.values(): | |
| loss = self.label_smoother(outputs, labels, shift_labels=True) | |
| else: | |
| loss = self.label_smoother(outputs, labels) | |
| else: | |
| if isinstance(outputs, dict) and "loss" not in outputs: | |
| raise ValueError( | |
| "The model did not return a loss from the inputs, only the following keys: " | |
| f"{','.join(outputs.keys())}. For reference, the inputs it received are {','.join(inputs.keys())}." | |
| ) | |
| # We don't use .loss here since the model may return tuples instead of ModelOutput. | |
| loss = outputs["loss"] if isinstance(outputs, dict) else outputs[0] | |
| return (loss, outputs) if return_outputs else loss | |