Spaces:
Running
on
Zero
Running
on
Zero
| # Source: https://github.com/iSEE-Laboratory/LLMDet/blob/main/hf_model/modeling_grounding_dino.py | |
| # Read details: https://github.com/iSEE-Laboratory/LLMDet/tree/main/hf_model | |
| # | |
| # coding=utf-8 | |
| # Copyright 2024 IDEA Research and The HuggingFace Inc. team. All rights reserved. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| """PyTorch Grounding DINO model.""" | |
| import copy | |
| import math | |
| import os | |
| import warnings | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple, Union | |
| import torch | |
| import torch.nn.functional as F | |
| from torch import Tensor, nn | |
| from torch.autograd import Function | |
| from torch.autograd.function import once_differentiable | |
| from transformers.activations import ACT2FN | |
| from transformers.file_utils import ( | |
| ModelOutput, | |
| add_start_docstrings, | |
| add_start_docstrings_to_model_forward, | |
| is_scipy_available, | |
| is_timm_available, | |
| is_torch_cuda_available, | |
| is_vision_available, | |
| replace_return_docstrings, | |
| requires_backends, | |
| ) | |
| from transformers.modeling_utils import PreTrainedModel | |
| from transformers.pytorch_utils import meshgrid | |
| from transformers.utils import is_accelerate_available, is_ninja_available, logging | |
| from transformers.utils.backbone_utils import load_backbone | |
| from transformers.models.auto import AutoModel | |
| from transformers.models.grounding_dino.configuration_grounding_dino import GroundingDinoConfig | |
| if is_vision_available(): | |
| from transformers.image_transforms import center_to_corners_format | |
| if is_accelerate_available(): | |
| from accelerate import PartialState | |
| from accelerate.utils import reduce | |
| if is_scipy_available(): | |
| from scipy.optimize import linear_sum_assignment | |
| if is_timm_available(): | |
| from timm import create_model | |
| logger = logging.get_logger(__name__) | |
| MultiScaleDeformableAttention = None | |
| # Copied from models.deformable_detr.load_cuda_kernels | |
| def load_cuda_kernels(): | |
| from torch.utils.cpp_extension import load | |
| global MultiScaleDeformableAttention | |
| import transformers | |
| root = Path(os.path.dirname(transformers.__file__)) / "kernels" / "deformable_detr" | |
| src_files = [ | |
| root / filename | |
| for filename in [ | |
| "vision.cpp", | |
| os.path.join("cpu", "ms_deform_attn_cpu.cpp"), | |
| os.path.join("cuda", "ms_deform_attn_cuda.cu"), | |
| ] | |
| ] | |
| MultiScaleDeformableAttention = load( | |
| "MultiScaleDeformableAttention", | |
| src_files, | |
| with_cuda=True, | |
| extra_include_paths=[str(root)], | |
| extra_cflags=["-DWITH_CUDA=1"], | |
| extra_cuda_cflags=[ | |
| "-DCUDA_HAS_FP16=1", | |
| "-D__CUDA_NO_HALF_OPERATORS__", | |
| "-D__CUDA_NO_HALF_CONVERSIONS__", | |
| "-D__CUDA_NO_HALF2_OPERATORS__", | |
| ], | |
| ) | |
| # Copied from transformers.models.deformable_detr.modeling_deformable_detr.MultiScaleDeformableAttentionFunction | |
| class MultiScaleDeformableAttentionFunction(Function): | |
| def forward( | |
| context, | |
| value, | |
| value_spatial_shapes, | |
| value_level_start_index, | |
| sampling_locations, | |
| attention_weights, | |
| im2col_step, | |
| ): | |
| context.im2col_step = im2col_step | |
| output = MultiScaleDeformableAttention.ms_deform_attn_forward( | |
| value, | |
| value_spatial_shapes, | |
| value_level_start_index, | |
| sampling_locations, | |
| attention_weights, | |
| context.im2col_step, | |
| ) | |
| context.save_for_backward( | |
| value, value_spatial_shapes, value_level_start_index, sampling_locations, attention_weights | |
| ) | |
| return output | |
| def backward(context, grad_output): | |
| ( | |
| value, | |
| value_spatial_shapes, | |
| value_level_start_index, | |
| sampling_locations, | |
| attention_weights, | |
| ) = context.saved_tensors | |
| grad_value, grad_sampling_loc, grad_attn_weight = MultiScaleDeformableAttention.ms_deform_attn_backward( | |
| value, | |
| value_spatial_shapes, | |
| value_level_start_index, | |
| sampling_locations, | |
| attention_weights, | |
| grad_output, | |
| context.im2col_step, | |
| ) | |
| return grad_value, None, None, grad_sampling_loc, grad_attn_weight, None | |
| logger = logging.get_logger(__name__) | |
| _CONFIG_FOR_DOC = "GroundingDinoConfig" | |
| _CHECKPOINT_FOR_DOC = "IDEA-Research/grounding-dino-tiny" | |
| class GroundingDinoDecoderOutput(ModelOutput): | |
| """ | |
| Base class for outputs of the GroundingDinoDecoder. This class adds two attributes to | |
| BaseModelOutputWithCrossAttentions, namely: | |
| - a stacked tensor of intermediate decoder hidden states (i.e. the output of each decoder layer) | |
| - a stacked tensor of intermediate reference points. | |
| Args: | |
| last_hidden_state (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`): | |
| Sequence of hidden-states at the output of the last layer of the model. | |
| intermediate_hidden_states (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, hidden_size)`): | |
| Stacked intermediate hidden states (output of each layer of the decoder). | |
| intermediate_reference_points (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, sequence_length, hidden_size)`): | |
| Stacked intermediate reference points (reference points of each layer of the decoder). | |
| hidden_states (`tuple(torch.FloatTensor)`, *optional*, returned when `output_hidden_states=True` is passed or when `config.output_hidden_states=True`): | |
| Tuple of `torch.FloatTensor` (one for the output of the embeddings + one for the output of each layer) of | |
| shape `(batch_size, sequence_length, hidden_size)`. Hidden-states of the model at the output of each layer | |
| plus the initial embedding outputs. | |
| attentions (`tuple(tuple(torch.FloatTensor))`, *optional*, returned when `output_attentions=True` is passed or when `config.output_attentions=True`): | |
| Tuple of tuples of `torch.FloatTensor` (one for attention for each layer) of shape `(batch_size, num_heads, | |
| sequence_length, sequence_length)`. Attentions weights after the attention softmax, used to compute the | |
| weighted average in the self-attention, cross-attention and multi-scale deformable attention heads. | |
| """ | |
| last_hidden_state: torch.FloatTensor = None | |
| intermediate_hidden_states: torch.FloatTensor = None | |
| intermediate_reference_points: torch.FloatTensor = None | |
| hidden_states: Optional[Tuple[torch.FloatTensor]] = None | |
| attentions: Optional[Tuple[Tuple[torch.FloatTensor]]] = None | |
| class GroundingDinoEncoderOutput(ModelOutput): | |
| """ | |
| Base class for outputs of the GroundingDinoEncoder. This class extends BaseModelOutput, due to: | |
| - vision and text last hidden states | |
| - vision and text intermediate hidden states | |
| Args: | |
| last_hidden_state_vision (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`): | |
| Sequence of hidden-states at the output of the last layer of the vision encoder. | |
| last_hidden_state_text (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`): | |
| Sequence of hidden-states at the output of the last layer of the text encoder. | |
| vision_hidden_states (`tuple(torch.FloatTensor)`, *optional*, returned when `output_hidden_states=True` is passed or when `config.output_hidden_states=True`): | |
| Tuple of `torch.FloatTensor` (one for the output of the vision embeddings + one for the output of each | |
| layer) of shape `(batch_size, sequence_length, hidden_size)`. Hidden-states of the vision encoder at the | |
| output of each layer plus the initial embedding outputs. | |
| text_hidden_states (`tuple(torch.FloatTensor)`, *optional*, returned when `output_hidden_states=True` is passed or when `config.output_hidden_states=True`): | |
| Tuple of `torch.FloatTensor` (one for the output of the text embeddings + one for the output of each layer) | |
| of shape `(batch_size, sequence_length, hidden_size)`. Hidden-states of the text encoder at the output of | |
| each layer plus the initial embedding outputs. | |
| attentions (`tuple(tuple(torch.FloatTensor))`, *optional*, returned when `output_attentions=True` is passed or when `config.output_attentions=True`): | |
| Tuple of tuples of `torch.FloatTensor` (one for attention for each layer) of shape `(batch_size, num_heads, | |
| sequence_length, sequence_length)`. Attentions weights after the attention softmax, used to compute the | |
| weighted average in the text-vision attention, vision-text attention, text-enhancer (self-attention) and | |
| multi-scale deformable attention heads. | |
| """ | |
| last_hidden_state_vision: torch.FloatTensor = None | |
| last_hidden_state_text: torch.FloatTensor = None | |
| vision_hidden_states: Optional[Tuple[torch.FloatTensor]] = None | |
| text_hidden_states: Optional[Tuple[torch.FloatTensor]] = None | |
| attentions: Optional[Tuple[Tuple[torch.FloatTensor]]] = None | |
| class GroundingDinoModelOutput(ModelOutput): | |
| """ | |
| Base class for outputs of the Grounding DINO encoder-decoder model. | |
| Args: | |
| last_hidden_state (`torch.FloatTensor` of shape `(batch_size, num_queries, hidden_size)`): | |
| Sequence of hidden-states at the output of the last layer of the decoder of the model. | |
| init_reference_points (`torch.FloatTensor` of shape `(batch_size, num_queries, 4)`): | |
| Initial reference points sent through the Transformer decoder. | |
| intermediate_hidden_states (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, hidden_size)`): | |
| Stacked intermediate hidden states (output of each layer of the decoder). | |
| intermediate_reference_points (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, 4)`): | |
| Stacked intermediate reference points (reference points of each layer of the decoder). | |
| decoder_hidden_states (`tuple(torch.FloatTensor)`, *optional*, returned when `output_hidden_states=True` is passed or when `config.output_hidden_states=True`): | |
| Tuple of `torch.FloatTensor` (one for the output of the embeddings + one for the output of each layer) of | |
| shape `(batch_size, num_queries, hidden_size)`. Hidden-states of the decoder at the output of each layer | |
| plus the initial embedding outputs. | |
| decoder_attentions (`tuple(tuple(torch.FloatTensor))`, *optional*, returned when `output_attentions=True` is passed or when `config.output_attentions=True`): | |
| Tuple of tuples of `torch.FloatTensor` (one for attention for each layer) of shape `(batch_size, num_heads, | |
| sequence_length, sequence_length)`. Attentions weights after the attention softmax, used to compute the | |
| weighted average in the self-attention, cross-attention and multi-scale deformable attention heads. | |
| encoder_last_hidden_state_vision (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`, *optional*): | |
| Sequence of hidden-states at the output of the last layer of the encoder of the model. | |
| encoder_last_hidden_state_text (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`, *optional*): | |
| Sequence of hidden-states at the output of the last layer of the encoder of the model. | |
| encoder_vision_hidden_states (`tuple(torch.FloatTensor)`, *optional*, returned when `output_hidden_states=True` is passed or when `config.output_hidden_states=True`): | |
| Tuple of `torch.FloatTensor` (one for the output of the vision embeddings + one for the output of each | |
| layer) of shape `(batch_size, sequence_length, hidden_size)`. Hidden-states of the vision encoder at the | |
| output of each layer plus the initial embedding outputs. | |
| encoder_text_hidden_states (`tuple(torch.FloatTensor)`, *optional*, returned when `output_hidden_states=True` is passed or when `config.output_hidden_states=True`): | |
| Tuple of `torch.FloatTensor` (one for the output of the text embeddings + one for the output of each layer) | |
| of shape `(batch_size, sequence_length, hidden_size)`. Hidden-states of the text encoder at the output of | |
| each layer plus the initial embedding outputs. | |
| encoder_attentions (`tuple(tuple(torch.FloatTensor))`, *optional*, returned when `output_attentions=True` is passed or when `config.output_attentions=True`): | |
| Tuple of tuples of `torch.FloatTensor` (one for attention for each layer) of shape `(batch_size, num_heads, | |
| sequence_length, sequence_length)`. Attentions weights after the attention softmax, used to compute the | |
| weighted average in the text-vision attention, vision-text attention, text-enhancer (self-attention) and | |
| multi-scale deformable attention heads. attention softmax, used to compute the weighted average in the | |
| bi-attention heads. | |
| enc_outputs_class (`torch.FloatTensor` of shape `(batch_size, sequence_length, config.num_labels)`, *optional*, returned when `config.two_stage=True`): | |
| Predicted bounding boxes scores where the top `config.num_queries` scoring bounding boxes are picked as | |
| region proposals in the first stage. Output of bounding box binary classification (i.e. foreground and | |
| background). | |
| enc_outputs_coord_logits (`torch.FloatTensor` of shape `(batch_size, sequence_length, 4)`, *optional*, returned when `config.two_stage=True`): | |
| Logits of predicted bounding boxes coordinates in the first stage. | |
| """ | |
| last_hidden_state: torch.FloatTensor = None | |
| init_reference_points: torch.FloatTensor = None | |
| intermediate_hidden_states: torch.FloatTensor = None | |
| intermediate_reference_points: torch.FloatTensor = None | |
| decoder_hidden_states: Optional[Tuple[torch.FloatTensor]] = None | |
| decoder_attentions: Optional[Tuple[Tuple[torch.FloatTensor]]] = None | |
| encoder_last_hidden_state_vision: Optional[torch.FloatTensor] = None | |
| encoder_last_hidden_state_text: Optional[torch.FloatTensor] = None | |
| encoder_vision_hidden_states: Optional[Tuple[torch.FloatTensor]] = None | |
| encoder_text_hidden_states: Optional[Tuple[torch.FloatTensor]] = None | |
| encoder_attentions: Optional[Tuple[Tuple[torch.FloatTensor]]] = None | |
| enc_outputs_class: Optional[torch.FloatTensor] = None | |
| enc_outputs_coord_logits: Optional[torch.FloatTensor] = None | |
| class GroundingDinoObjectDetectionOutput(ModelOutput): | |
| """ | |
| Output type of [`GroundingDinoForObjectDetection`]. | |
| Args: | |
| loss (`torch.FloatTensor` of shape `(1,)`, *optional*, returned when `labels` are provided)): | |
| Total loss as a linear combination of a negative log-likehood (cross-entropy) for class prediction and a | |
| bounding box loss. The latter is defined as a linear combination of the L1 loss and the generalized | |
| scale-invariant IoU loss. | |
| loss_dict (`Dict`, *optional*): | |
| A dictionary containing the individual losses. Useful for logging. | |
| logits (`torch.FloatTensor` of shape `(batch_size, num_queries, num_classes + 1)`): | |
| Classification logits (including no-object) for all queries. | |
| pred_boxes (`torch.FloatTensor` of shape `(batch_size, num_queries, 4)`): | |
| Normalized boxes coordinates for all queries, represented as (center_x, center_y, width, height). These | |
| values are normalized in [0, 1], relative to the size of each individual image in the batch (disregarding | |
| possible padding). You can use [`~GroundingDinoProcessor.post_process_object_detection`] to retrieve the | |
| unnormalized bounding boxes. | |
| auxiliary_outputs (`List[Dict]`, *optional*): | |
| Optional, only returned when auxilary losses are activated (i.e. `config.auxiliary_loss` is set to `True`) | |
| and labels are provided. It is a list of dictionaries containing the two above keys (`logits` and | |
| `pred_boxes`) for each decoder layer. | |
| last_hidden_state (`torch.FloatTensor` of shape `(batch_size, num_queries, hidden_size)`, *optional*): | |
| Sequence of hidden-states at the output of the last layer of the decoder of the model. | |
| decoder_hidden_states (`tuple(torch.FloatTensor)`, *optional*, returned when `output_hidden_states=True` is passed or when `config.output_hidden_states=True`): | |
| Tuple of `torch.FloatTensor` (one for the output of the embeddings + one for the output of each layer) of | |
| shape `(batch_size, num_queries, hidden_size)`. Hidden-states of the decoder at the output of each layer | |
| plus the initial embedding outputs. | |
| decoder_attentions (`tuple(tuple(torch.FloatTensor))`, *optional*, returned when `output_attentions=True` is passed or when `config.output_attentions=True`): | |
| Tuple of tuples of `torch.FloatTensor` (one for attention for each layer) of shape `(batch_size, num_heads, | |
| sequence_length, sequence_length)`. Attentions weights after the attention softmax, used to compute the | |
| weighted average in the self-attention, cross-attention and multi-scale deformable attention heads. | |
| encoder_last_hidden_state_vision (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`, *optional*): | |
| Sequence of hidden-states at the output of the last layer of the encoder of the model. | |
| encoder_last_hidden_state_text (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`, *optional*): | |
| Sequence of hidden-states at the output of the last layer of the encoder of the model. | |
| encoder_vision_hidden_states (`tuple(torch.FloatTensor)`, *optional*, returned when `output_hidden_states=True` is passed or when `config.output_hidden_states=True`): | |
| Tuple of `torch.FloatTensor` (one for the output of the vision embeddings + one for the output of each | |
| layer) of shape `(batch_size, sequence_length, hidden_size)`. Hidden-states of the vision encoder at the | |
| output of each layer plus the initial embedding outputs. | |
| encoder_text_hidden_states (`tuple(torch.FloatTensor)`, *optional*, returned when `output_hidden_states=True` is passed or when `config.output_hidden_states=True`): | |
| Tuple of `torch.FloatTensor` (one for the output of the text embeddings + one for the output of each layer) | |
| of shape `(batch_size, sequence_length, hidden_size)`. Hidden-states of the text encoder at the output of | |
| each layer plus the initial embedding outputs. | |
| encoder_attentions (`tuple(tuple(torch.FloatTensor))`, *optional*, returned when `output_attentions=True` is passed or when `config.output_attentions=True`): | |
| Tuple of tuples of `torch.FloatTensor` (one for attention for each layer) of shape `(batch_size, num_heads, | |
| sequence_length, sequence_length)`. Attentions weights after the attention softmax, used to compute the | |
| weighted average in the text-vision attention, vision-text attention, text-enhancer (self-attention) and | |
| multi-scale deformable attention heads. | |
| intermediate_hidden_states (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, hidden_size)`): | |
| Stacked intermediate hidden states (output of each layer of the decoder). | |
| intermediate_reference_points (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, 4)`): | |
| Stacked intermediate reference points (reference points of each layer of the decoder). | |
| init_reference_points (`torch.FloatTensor` of shape `(batch_size, num_queries, 4)`): | |
| Initial reference points sent through the Transformer decoder. | |
| enc_outputs_class (`torch.FloatTensor` of shape `(batch_size, sequence_length, config.num_labels)`, *optional*, returned when `config.two_stage=True`): | |
| Predicted bounding boxes scores where the top `config.num_queries` scoring bounding boxes are picked as | |
| region proposals in the first stage. Output of bounding box binary classification (i.e. foreground and | |
| background). | |
| enc_outputs_coord_logits (`torch.FloatTensor` of shape `(batch_size, sequence_length, 4)`, *optional*, returned when `config.two_stage=True`): | |
| Logits of predicted bounding boxes coordinates in the first stage. | |
| """ | |
| loss: Optional[torch.FloatTensor] = None | |
| loss_dict: Optional[Dict] = None | |
| logits: torch.FloatTensor = None | |
| pred_boxes: torch.FloatTensor = None | |
| auxiliary_outputs: Optional[List[Dict]] = None | |
| last_hidden_state: Optional[torch.FloatTensor] = None | |
| init_reference_points: Optional[torch.FloatTensor] = None | |
| intermediate_hidden_states: Optional[torch.FloatTensor] = None | |
| intermediate_reference_points: Optional[torch.FloatTensor] = None | |
| decoder_hidden_states: Optional[Tuple[torch.FloatTensor]] = None | |
| decoder_attentions: Optional[Tuple[Tuple[torch.FloatTensor]]] = None | |
| encoder_last_hidden_state_vision: Optional[torch.FloatTensor] = None | |
| encoder_last_hidden_state_text: Optional[torch.FloatTensor] = None | |
| encoder_vision_hidden_states: Optional[Tuple[torch.FloatTensor]] = None | |
| encoder_text_hidden_states: Optional[Tuple[torch.FloatTensor]] = None | |
| encoder_attentions: Optional[Tuple[Tuple[torch.FloatTensor]]] = None | |
| enc_outputs_class: Optional[torch.FloatTensor] = None | |
| enc_outputs_coord_logits: Optional[torch.FloatTensor] = None | |
| # Copied from transformers.models.detr.modeling_detr.DetrFrozenBatchNorm2d with Detr->GroundingDino | |
| class GroundingDinoFrozenBatchNorm2d(nn.Module): | |
| """ | |
| BatchNorm2d where the batch statistics and the affine parameters are fixed. | |
| Copy-paste from torchvision.misc.ops with added eps before rqsrt, without which any other models than | |
| torchvision.models.resnet[18,34,50,101] produce nans. | |
| """ | |
| def __init__(self, n): | |
| super().__init__() | |
| self.register_buffer("weight", torch.ones(n)) | |
| self.register_buffer("bias", torch.zeros(n)) | |
| self.register_buffer("running_mean", torch.zeros(n)) | |
| self.register_buffer("running_var", torch.ones(n)) | |
| def _load_from_state_dict( | |
| self, state_dict, prefix, local_metadata, strict, missing_keys, unexpected_keys, error_msgs | |
| ): | |
| num_batches_tracked_key = prefix + "num_batches_tracked" | |
| if num_batches_tracked_key in state_dict: | |
| del state_dict[num_batches_tracked_key] | |
| super()._load_from_state_dict( | |
| state_dict, prefix, local_metadata, strict, missing_keys, unexpected_keys, error_msgs | |
| ) | |
| def forward(self, x): | |
| # move reshapes to the beginning | |
| # to make it user-friendly | |
| weight = self.weight.reshape(1, -1, 1, 1) | |
| bias = self.bias.reshape(1, -1, 1, 1) | |
| running_var = self.running_var.reshape(1, -1, 1, 1) | |
| running_mean = self.running_mean.reshape(1, -1, 1, 1) | |
| epsilon = 1e-5 | |
| scale = weight * (running_var + epsilon).rsqrt() | |
| bias = bias - running_mean * scale | |
| return x * scale + bias | |
| # Copied from transformers.models.detr.modeling_detr.replace_batch_norm with Detr->GroundingDino | |
| def replace_batch_norm(model): | |
| r""" | |
| Recursively replace all `torch.nn.BatchNorm2d` with `GroundingDinoFrozenBatchNorm2d`. | |
| Args: | |
| model (torch.nn.Module): | |
| input model | |
| """ | |
| for name, module in model.named_children(): | |
| if isinstance(module, nn.BatchNorm2d): | |
| new_module = GroundingDinoFrozenBatchNorm2d(module.num_features) | |
| if not module.weight.device == torch.device("meta"): | |
| new_module.weight.data.copy_(module.weight) | |
| new_module.bias.data.copy_(module.bias) | |
| new_module.running_mean.data.copy_(module.running_mean) | |
| new_module.running_var.data.copy_(module.running_var) | |
| model._modules[name] = new_module | |
| if len(list(module.children())) > 0: | |
| replace_batch_norm(module) | |
| class GroundingDinoConvEncoder(nn.Module): | |
| """ | |
| Convolutional backbone, using either the AutoBackbone API or one from the timm library. | |
| nn.BatchNorm2d layers are replaced by GroundingDinoFrozenBatchNorm2d as defined above. | |
| """ | |
| def __init__(self, config): | |
| super().__init__() | |
| self.config = config | |
| if config.use_timm_backbone: | |
| requires_backends(self, ["timm"]) | |
| backbone = create_model( | |
| config.backbone, | |
| pretrained=config.use_pretrained_backbone, | |
| features_only=True, | |
| **config.backbone_kwargs, | |
| ) | |
| else: | |
| backbone = load_backbone(config) | |
| # replace batch norm by frozen batch norm | |
| with torch.no_grad(): | |
| replace_batch_norm(backbone) | |
| self.model = backbone | |
| self.intermediate_channel_sizes = ( | |
| self.model.feature_info.channels() if config.use_timm_backbone else self.model.channels | |
| ) | |
| backbone_model_type = None | |
| if config.backbone is not None: | |
| backbone_model_type = config.backbone | |
| elif config.backbone_config is not None: | |
| backbone_model_type = config.backbone_config.model_type | |
| else: | |
| raise ValueError("Either `backbone` or `backbone_config` should be provided in the config") | |
| if "resnet" in backbone_model_type: | |
| for name, parameter in self.model.named_parameters(): | |
| if config.use_timm_backbone: | |
| if "layer2" not in name and "layer3" not in name and "layer4" not in name: | |
| parameter.requires_grad_(False) | |
| else: | |
| if "stage.1" not in name and "stage.2" not in name and "stage.3" not in name: | |
| parameter.requires_grad_(False) | |
| # Copied from transformers.models.detr.modeling_detr.DetrConvEncoder.forward with Detr->GroundingDino | |
| def forward(self, pixel_values: torch.Tensor, pixel_mask: torch.Tensor): | |
| # send pixel_values through the model to get list of feature maps | |
| features = self.model(pixel_values) if self.config.use_timm_backbone else self.model(pixel_values).feature_maps | |
| out = [] | |
| for feature_map in features: | |
| # downsample pixel_mask to match shape of corresponding feature_map | |
| mask = nn.functional.interpolate(pixel_mask[None].float(), size=feature_map.shape[-2:]).to(torch.bool)[0] | |
| out.append((feature_map, mask)) | |
| return out | |
| # Copied from transformers.models.detr.modeling_detr.DetrConvModel with Detr->GroundingDino | |
| class GroundingDinoConvModel(nn.Module): | |
| """ | |
| This module adds 2D position embeddings to all intermediate feature maps of the convolutional encoder. | |
| """ | |
| def __init__(self, conv_encoder, position_embedding): | |
| super().__init__() | |
| self.conv_encoder = conv_encoder | |
| self.position_embedding = position_embedding | |
| def forward(self, pixel_values, pixel_mask): | |
| # send pixel_values and pixel_mask through backbone to get list of (feature_map, pixel_mask) tuples | |
| out = self.conv_encoder(pixel_values, pixel_mask) | |
| pos = [] | |
| for feature_map, mask in out: | |
| # position encoding | |
| pos.append(self.position_embedding(feature_map, mask).to(feature_map.dtype)) | |
| return out, pos | |
| class GroundingDinoSinePositionEmbedding(nn.Module): | |
| """ | |
| This is a more standard version of the position embedding, very similar to the one used by the Attention is all you | |
| need paper, generalized to work on images. | |
| """ | |
| def __init__(self, config): | |
| super().__init__() | |
| self.embedding_dim = config.d_model // 2 | |
| self.temperature = config.positional_embedding_temperature | |
| self.scale = 2 * math.pi | |
| def forward(self, pixel_values, pixel_mask): | |
| y_embed = pixel_mask.cumsum(1, dtype=torch.float32) | |
| x_embed = pixel_mask.cumsum(2, dtype=torch.float32) | |
| eps = 1e-6 | |
| y_embed = y_embed / (y_embed[:, -1:, :] + eps) * self.scale | |
| x_embed = x_embed / (x_embed[:, :, -1:] + eps) * self.scale | |
| dim_t = torch.arange(self.embedding_dim, dtype=torch.float32, device=pixel_values.device) | |
| dim_t = self.temperature ** (2 * torch.div(dim_t, 2, rounding_mode="floor") / self.embedding_dim) | |
| pos_x = x_embed[:, :, :, None] / dim_t | |
| pos_y = y_embed[:, :, :, None] / dim_t | |
| pos_x = torch.stack((pos_x[:, :, :, 0::2].sin(), pos_x[:, :, :, 1::2].cos()), dim=4).flatten(3) | |
| pos_y = torch.stack((pos_y[:, :, :, 0::2].sin(), pos_y[:, :, :, 1::2].cos()), dim=4).flatten(3) | |
| pos = torch.cat((pos_y, pos_x), dim=3).permute(0, 3, 1, 2) | |
| return pos | |
| class GroundingDinoLearnedPositionEmbedding(nn.Module): | |
| """ | |
| This module learns positional embeddings up to a fixed maximum size. | |
| """ | |
| def __init__(self, config): | |
| super().__init__() | |
| embedding_dim = config.d_model // 2 | |
| self.row_embeddings = nn.Embedding(50, embedding_dim) | |
| self.column_embeddings = nn.Embedding(50, embedding_dim) | |
| def forward(self, pixel_values, pixel_mask=None): | |
| height, width = pixel_values.shape[-2:] | |
| width_values = torch.arange(width, device=pixel_values.device) | |
| height_values = torch.arange(height, device=pixel_values.device) | |
| x_emb = self.column_embeddings(width_values) | |
| y_emb = self.row_embeddings(height_values) | |
| pos = torch.cat([x_emb.unsqueeze(0).repeat(height, 1, 1), y_emb.unsqueeze(1).repeat(1, width, 1)], dim=-1) | |
| pos = pos.permute(2, 0, 1) | |
| pos = pos.unsqueeze(0) | |
| pos = pos.repeat(pixel_values.shape[0], 1, 1, 1) | |
| return pos | |
| def build_position_encoding(config): | |
| if config.position_embedding_type == "sine": | |
| position_embedding = GroundingDinoSinePositionEmbedding(config) | |
| elif config.position_embedding_type == "learned": | |
| position_embedding = GroundingDinoLearnedPositionEmbedding(config) | |
| else: | |
| raise ValueError(f"Not supported {config.position_embedding_type}") | |
| return position_embedding | |
| # Copied from transformers.models.deformable_detr.modeling_deformable_detr.multi_scale_deformable_attention | |
| def multi_scale_deformable_attention( | |
| value: Tensor, value_spatial_shapes: Tensor, sampling_locations: Tensor, attention_weights: Tensor | |
| ) -> Tensor: | |
| batch_size, _, num_heads, hidden_dim = value.shape | |
| _, num_queries, num_heads, num_levels, num_points, _ = sampling_locations.shape | |
| value_list = value.split([height.item() * width.item() for height, width in value_spatial_shapes], dim=1) | |
| sampling_grids = 2 * sampling_locations - 1 | |
| sampling_value_list = [] | |
| for level_id, (height, width) in enumerate(value_spatial_shapes): | |
| # batch_size, height*width, num_heads, hidden_dim | |
| # -> batch_size, height*width, num_heads*hidden_dim | |
| # -> batch_size, num_heads*hidden_dim, height*width | |
| # -> batch_size*num_heads, hidden_dim, height, width | |
| value_l_ = ( | |
| value_list[level_id].flatten(2).transpose(1, 2).reshape(batch_size * num_heads, hidden_dim, height, width) | |
| ) | |
| # batch_size, num_queries, num_heads, num_points, 2 | |
| # -> batch_size, num_heads, num_queries, num_points, 2 | |
| # -> batch_size*num_heads, num_queries, num_points, 2 | |
| sampling_grid_l_ = sampling_grids[:, :, :, level_id].transpose(1, 2).flatten(0, 1) | |
| # batch_size*num_heads, hidden_dim, num_queries, num_points | |
| sampling_value_l_ = nn.functional.grid_sample( | |
| value_l_, sampling_grid_l_, mode="bilinear", padding_mode="zeros", align_corners=False | |
| ) | |
| sampling_value_list.append(sampling_value_l_) | |
| # (batch_size, num_queries, num_heads, num_levels, num_points) | |
| # -> (batch_size, num_heads, num_queries, num_levels, num_points) | |
| # -> (batch_size, num_heads, 1, num_queries, num_levels*num_points) | |
| attention_weights = attention_weights.transpose(1, 2).reshape( | |
| batch_size * num_heads, 1, num_queries, num_levels * num_points | |
| ) | |
| output = ( | |
| (torch.stack(sampling_value_list, dim=-2).flatten(-2) * attention_weights) | |
| .sum(-1) | |
| .view(batch_size, num_heads * hidden_dim, num_queries) | |
| ) | |
| return output.transpose(1, 2).contiguous() | |
| # Copied from transformers.models.deformable_detr.modeling_deformable_detr.DeformableDetrMultiscaleDeformableAttention with DeformableDetr->GroundingDino, Deformable DETR->Grounding DINO | |
| class GroundingDinoMultiscaleDeformableAttention(nn.Module): | |
| """ | |
| Multiscale deformable attention as proposed in Deformable DETR. | |
| """ | |
| def __init__(self, config: GroundingDinoConfig, num_heads: int, n_points: int): | |
| super().__init__() | |
| kernel_loaded = MultiScaleDeformableAttention is not None | |
| if is_torch_cuda_available() and is_ninja_available() and not kernel_loaded: | |
| try: | |
| load_cuda_kernels() | |
| except Exception as e: | |
| logger.warning(f"Could not load the custom kernel for multi-scale deformable attention: {e}") | |
| if config.d_model % num_heads != 0: | |
| raise ValueError( | |
| f"embed_dim (d_model) must be divisible by num_heads, but got {config.d_model} and {num_heads}" | |
| ) | |
| dim_per_head = config.d_model // num_heads | |
| # check if dim_per_head is power of 2 | |
| if not ((dim_per_head & (dim_per_head - 1) == 0) and dim_per_head != 0): | |
| warnings.warn( | |
| "You'd better set embed_dim (d_model) in GroundingDinoMultiscaleDeformableAttention to make the" | |
| " dimension of each attention head a power of 2 which is more efficient in the authors' CUDA" | |
| " implementation." | |
| ) | |
| self.im2col_step = 64 | |
| self.d_model = config.d_model | |
| self.n_levels = config.num_feature_levels | |
| self.n_heads = num_heads | |
| self.n_points = n_points | |
| self.sampling_offsets = nn.Linear(config.d_model, num_heads * self.n_levels * n_points * 2) | |
| self.attention_weights = nn.Linear(config.d_model, num_heads * self.n_levels * n_points) | |
| self.value_proj = nn.Linear(config.d_model, config.d_model) | |
| self.output_proj = nn.Linear(config.d_model, config.d_model) | |
| self.disable_custom_kernels = config.disable_custom_kernels | |
| self._reset_parameters() | |
| def _reset_parameters(self): | |
| nn.init.constant_(self.sampling_offsets.weight.data, 0.0) | |
| default_dtype = torch.get_default_dtype() | |
| thetas = torch.arange(self.n_heads, dtype=torch.int64).to(default_dtype) * (2.0 * math.pi / self.n_heads) | |
| grid_init = torch.stack([thetas.cos(), thetas.sin()], -1) | |
| grid_init = ( | |
| (grid_init / grid_init.abs().max(-1, keepdim=True)[0]) | |
| .view(self.n_heads, 1, 1, 2) | |
| .repeat(1, self.n_levels, self.n_points, 1) | |
| ) | |
| for i in range(self.n_points): | |
| grid_init[:, :, i, :] *= i + 1 | |
| with torch.no_grad(): | |
| self.sampling_offsets.bias = nn.Parameter(grid_init.view(-1)) | |
| nn.init.constant_(self.attention_weights.weight.data, 0.0) | |
| nn.init.constant_(self.attention_weights.bias.data, 0.0) | |
| nn.init.xavier_uniform_(self.value_proj.weight.data) | |
| nn.init.constant_(self.value_proj.bias.data, 0.0) | |
| nn.init.xavier_uniform_(self.output_proj.weight.data) | |
| nn.init.constant_(self.output_proj.bias.data, 0.0) | |
| def with_pos_embed(self, tensor: torch.Tensor, position_embeddings: Optional[Tensor]): | |
| return tensor if position_embeddings is None else tensor + position_embeddings | |
| def forward( | |
| self, | |
| hidden_states: torch.Tensor, | |
| attention_mask: Optional[torch.Tensor] = None, | |
| encoder_hidden_states=None, | |
| encoder_attention_mask=None, | |
| position_embeddings: Optional[torch.Tensor] = None, | |
| reference_points=None, | |
| spatial_shapes=None, | |
| level_start_index=None, | |
| output_attentions: bool = False, | |
| ): | |
| # add position embeddings to the hidden states before projecting to queries and keys | |
| if position_embeddings is not None: | |
| hidden_states = self.with_pos_embed(hidden_states, position_embeddings) | |
| batch_size, num_queries, _ = hidden_states.shape | |
| batch_size, sequence_length, _ = encoder_hidden_states.shape | |
| if (spatial_shapes[:, 0] * spatial_shapes[:, 1]).sum() != sequence_length: | |
| raise ValueError( | |
| "Make sure to align the spatial shapes with the sequence length of the encoder hidden states" | |
| ) | |
| value = self.value_proj(encoder_hidden_states) | |
| if attention_mask is not None: | |
| # we invert the attention_mask | |
| value = value.masked_fill(~attention_mask[..., None], float(0)) | |
| value = value.view(batch_size, sequence_length, self.n_heads, self.d_model // self.n_heads) | |
| sampling_offsets = self.sampling_offsets(hidden_states).view( | |
| batch_size, num_queries, self.n_heads, self.n_levels, self.n_points, 2 | |
| ) | |
| attention_weights = self.attention_weights(hidden_states).view( | |
| batch_size, num_queries, self.n_heads, self.n_levels * self.n_points | |
| ) | |
| attention_weights = F.softmax(attention_weights, -1).view( | |
| batch_size, num_queries, self.n_heads, self.n_levels, self.n_points | |
| ) | |
| # batch_size, num_queries, n_heads, n_levels, n_points, 2 | |
| num_coordinates = reference_points.shape[-1] | |
| if num_coordinates == 2: | |
| offset_normalizer = torch.stack([spatial_shapes[..., 1], spatial_shapes[..., 0]], -1) | |
| sampling_locations = ( | |
| reference_points[:, :, None, :, None, :] | |
| + sampling_offsets / offset_normalizer[None, None, None, :, None, :] | |
| ) | |
| elif num_coordinates == 4: | |
| sampling_locations = ( | |
| reference_points[:, :, None, :, None, :2] | |
| + sampling_offsets / self.n_points * reference_points[:, :, None, :, None, 2:] * 0.5 | |
| ) | |
| else: | |
| raise ValueError(f"Last dim of reference_points must be 2 or 4, but got {reference_points.shape[-1]}") | |
| if self.disable_custom_kernels: | |
| # PyTorch implementation | |
| output = multi_scale_deformable_attention(value, spatial_shapes, sampling_locations, attention_weights) | |
| else: | |
| try: | |
| # custom kernel | |
| output = MultiScaleDeformableAttentionFunction.apply( | |
| value, | |
| spatial_shapes, | |
| level_start_index, | |
| sampling_locations, | |
| attention_weights, | |
| self.im2col_step, | |
| ) | |
| except Exception: | |
| # PyTorch implementation | |
| output = multi_scale_deformable_attention(value, spatial_shapes, sampling_locations, attention_weights) | |
| output = self.output_proj(output) | |
| return output, attention_weights | |
| class GroundingDinoTextEnhancerLayer(nn.Module): | |
| """Vanilla Transformer with text embeddings as input""" | |
| def __init__(self, config): | |
| super().__init__() | |
| self.self_attn = GroundingDinoMultiheadAttention( | |
| config, num_attention_heads=config.encoder_attention_heads // 2 | |
| ) | |
| # Implementation of Feedforward model | |
| self.fc1 = nn.Linear(config.d_model, config.encoder_ffn_dim // 2) | |
| self.fc2 = nn.Linear(config.encoder_ffn_dim // 2, config.d_model) | |
| self.layer_norm_before = nn.LayerNorm(config.d_model, config.layer_norm_eps) | |
| self.layer_norm_after = nn.LayerNorm(config.d_model, config.layer_norm_eps) | |
| self.activation = ACT2FN[config.activation_function] | |
| self.num_heads = config.encoder_attention_heads // 2 | |
| self.dropout = config.text_enhancer_dropout | |
| def with_pos_embed(self, hidden_state: Tensor, position_embeddings: Optional[Tensor]): | |
| return hidden_state if position_embeddings is None else hidden_state + position_embeddings | |
| def forward( | |
| self, | |
| hidden_states: torch.FloatTensor, | |
| attention_masks: Optional[torch.BoolTensor] = None, | |
| position_embeddings: Optional[torch.FloatTensor] = None, | |
| ) -> Tuple[torch.FloatTensor, torch.FloatTensor]: | |
| """Text self-attention to enhance projection of text features generated by | |
| the text encoder (AutoModel based on text_config) within GroundingDinoEncoderLayer | |
| Args: | |
| hidden_states (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_dim)`): | |
| Text features generated by the text encoder. | |
| attention_masks (`torch.BoolTensor`, *optional*): | |
| Attention mask for text self-attention. False for real tokens and True for padding tokens. | |
| position_embeddings (`torch.FloatTensor`, *optional*): | |
| Position embeddings to be added to the hidden states. | |
| Returns: | |
| `tuple(torch.FloatTensor)` comprising two elements: | |
| - **hidden_states** (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`) -- | |
| Output of the text self-attention layer. | |
| - **attention_weights** (`torch.FloatTensor` of shape `(batch_size, num_heads, sequence_length, | |
| sequence_length)`) -- | |
| Attention weights of the text self-attention layer. | |
| """ | |
| # repeat attn mask | |
| if attention_masks.dim() == 3 and attention_masks.shape[0] == hidden_states.shape[0]: | |
| # batch_size, num_queries, num_keys | |
| attention_masks = attention_masks[:, None, :, :] | |
| attention_masks = attention_masks.repeat(1, self.num_heads, 1, 1) | |
| dtype = hidden_states.dtype | |
| attention_masks = attention_masks.to(dtype=dtype) # fp16 compatibility | |
| attention_masks = (1.0 - attention_masks) * torch.finfo(dtype).min | |
| queries = keys = self.with_pos_embed(hidden_states, position_embeddings) | |
| attention_output, attention_weights = self.self_attn( | |
| queries=queries, | |
| keys=keys, | |
| values=hidden_states, | |
| attention_mask=attention_masks, | |
| output_attentions=True, | |
| ) | |
| attention_output = nn.functional.dropout(attention_output, p=self.dropout, training=self.training) | |
| hidden_states = hidden_states + attention_output | |
| hidden_states = self.layer_norm_before(hidden_states) | |
| residual = hidden_states | |
| hidden_states = self.activation(self.fc1(hidden_states)) | |
| hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training) | |
| hidden_states = self.fc2(hidden_states) | |
| hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training) | |
| hidden_states = hidden_states + residual | |
| hidden_states = self.layer_norm_after(hidden_states) | |
| return hidden_states, attention_weights | |
| class GroundingDinoBiMultiHeadAttention(nn.Module): | |
| def __init__(self, config): | |
| super().__init__() | |
| vision_dim = text_dim = config.d_model | |
| embed_dim = config.encoder_ffn_dim // 2 | |
| num_heads = config.encoder_attention_heads // 2 | |
| dropout = config.fusion_dropout | |
| self.embed_dim = embed_dim | |
| self.num_heads = num_heads | |
| self.head_dim = embed_dim // num_heads | |
| self.vision_dim = vision_dim | |
| self.text_dim = text_dim | |
| if self.head_dim * self.num_heads != self.embed_dim: | |
| raise ValueError( | |
| f"`embed_dim` must be divisible by `num_heads` (got `embed_dim`: {self.embed_dim} and `num_heads`: {self.num_heads})." | |
| ) | |
| self.scale = self.head_dim ** (-0.5) | |
| self.dropout = dropout | |
| self.vision_proj = nn.Linear(self.vision_dim, self.embed_dim) | |
| self.text_proj = nn.Linear(self.text_dim, self.embed_dim) | |
| self.values_vision_proj = nn.Linear(self.vision_dim, self.embed_dim) | |
| self.values_text_proj = nn.Linear(self.text_dim, self.embed_dim) | |
| self.out_vision_proj = nn.Linear(self.embed_dim, self.vision_dim) | |
| self.out_text_proj = nn.Linear(self.embed_dim, self.text_dim) | |
| def _reshape(self, tensor: torch.Tensor, seq_len: int, batch_size: int): | |
| return tensor.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2).contiguous() | |
| def forward( | |
| self, | |
| vision_features: torch.FloatTensor, | |
| text_features: torch.FloatTensor, | |
| vision_attention_mask: Optional[torch.BoolTensor] = None, | |
| text_attention_mask: Optional[torch.BoolTensor] = None, | |
| ) -> Tuple[Tuple[torch.FloatTensor, torch.FloatTensor], Tuple[torch.FloatTensor, torch.FloatTensor]]: | |
| """Image-to-text and text-to-image cross-attention | |
| Args: | |
| vision_features (`torch.FloatTensor` of shape `(batch_size, vision_sequence_length, hidden_dim)`): | |
| Projected flattened image features generated by the vision backbone. | |
| text_features (`torch.FloatTensor` of shape `(batch_size, text_sequence_length, hidden_dim)`): | |
| Projected text features generated by the text encoder. | |
| vision_attention_mask (`torch.BoolTensor`, **optional**): | |
| Attention mask for image-to-text cross-attention. False for real tokens and True for padding tokens. | |
| text_attention_mask (`torch.BoolTensor`, **optional**): | |
| Attention mask for text-to-image cross-attention. False for real tokens and True for padding tokens. | |
| Returns: | |
| `tuple(tuple(torch.FloatTensor), tuple(torch.FloatTensor))` where each inner tuple comprises an attention | |
| output and weights: | |
| - **vision_attn_output** (`torch.FloatTensor` of shape `(batch_size, vision_sequence_length, hidden_din)`) | |
| -- | |
| Output of the image-to-text cross-attention layer. | |
| - **vision_attn_weights** (`torch.FloatTensor` of shape `(batch_size, num_heads, vision_sequence_length, | |
| vision_sequence_length)`) -- | |
| Attention weights of the image-to-text cross-attention layer. | |
| - **text_attn_output** (`torch.FloatTensor` of shape `(batch_size, text_sequence_length, hidden_dim)`) -- | |
| Output of the text-to-image cross-attention layer. | |
| - **text_attn_weights** (`torch.FloatTensor` of shape `(batch_size, num_heads, text_sequence_length, | |
| text_sequence_length)`) -- | |
| Attention weights of the text-to-image cross-attention layer. | |
| """ | |
| batch_size, tgt_len, _ = vision_features.size() | |
| vision_query_states = self.vision_proj(vision_features) * self.scale | |
| vision_query_states = self._reshape(vision_query_states, tgt_len, batch_size) | |
| text_key_states = self.text_proj(text_features) | |
| text_key_states = self._reshape(text_key_states, -1, batch_size) | |
| vision_value_states = self.values_vision_proj(vision_features) | |
| vision_value_states = self._reshape(vision_value_states, -1, batch_size) | |
| text_value_states = self.values_text_proj(text_features) | |
| text_value_states = self._reshape(text_value_states, -1, batch_size) | |
| proj_shape = (batch_size * self.num_heads, -1, self.head_dim) | |
| vision_query_states = vision_query_states.view(*proj_shape) | |
| text_key_states = text_key_states.view(*proj_shape) | |
| vision_value_states = vision_value_states.view(*proj_shape) | |
| text_value_states = text_value_states.view(*proj_shape) | |
| src_len = text_key_states.size(1) | |
| attn_weights = torch.bmm(vision_query_states, text_key_states.transpose(1, 2)) # bs*nhead, nimg, ntxt | |
| if attn_weights.size() != (batch_size * self.num_heads, tgt_len, src_len): | |
| raise ValueError( | |
| f"Attention weights should be of size {(batch_size * self.num_heads, tgt_len, src_len)}, but is {attn_weights.size()}" | |
| ) | |
| attn_weights = attn_weights - attn_weights.max() | |
| # Do not increase -50000/50000, data type half has quite limited range | |
| attn_weights = torch.clamp(attn_weights, min=-50000, max=50000) | |
| attn_weights_transposed = attn_weights.transpose(1, 2) | |
| text_attn_weights = attn_weights_transposed - torch.max(attn_weights_transposed, dim=-1, keepdim=True)[0] | |
| # Do not increase -50000/50000, data type half has quite limited range | |
| text_attn_weights = torch.clamp(text_attn_weights, min=-50000, max=50000) | |
| # mask vision for language | |
| if vision_attention_mask is not None: | |
| vision_attention_mask = ( | |
| vision_attention_mask[:, None, None, :].repeat(1, self.num_heads, 1, 1).flatten(0, 1) | |
| ) | |
| text_attn_weights.masked_fill_(vision_attention_mask, float("-inf")) | |
| text_attn_weights = text_attn_weights.softmax(dim=-1) | |
| # mask language for vision | |
| if text_attention_mask is not None: | |
| text_attention_mask = text_attention_mask[:, None, None, :].repeat(1, self.num_heads, 1, 1).flatten(0, 1) | |
| attn_weights.masked_fill_(text_attention_mask, float("-inf")) | |
| vision_attn_weights = attn_weights.softmax(dim=-1) | |
| vision_attn_probs = F.dropout(vision_attn_weights, p=self.dropout, training=self.training) | |
| text_attn_probs = F.dropout(text_attn_weights, p=self.dropout, training=self.training) | |
| vision_attn_output = torch.bmm(vision_attn_probs, text_value_states) | |
| text_attn_output = torch.bmm(text_attn_probs, vision_value_states) | |
| if vision_attn_output.size() != (batch_size * self.num_heads, tgt_len, self.head_dim): | |
| raise ValueError( | |
| f"`vision_attn_output` should be of size {(batch_size, self.num_heads, tgt_len, self.head_dim)}, but is {vision_attn_output.size()}" | |
| ) | |
| if text_attn_output.size() != (batch_size * self.num_heads, src_len, self.head_dim): | |
| raise ValueError( | |
| f"`text_attn_output` should be of size {(batch_size, self.num_heads, src_len, self.head_dim)}, but is {text_attn_output.size()}" | |
| ) | |
| vision_attn_output = vision_attn_output.view(batch_size, self.num_heads, tgt_len, self.head_dim) | |
| vision_attn_output = vision_attn_output.transpose(1, 2) | |
| vision_attn_output = vision_attn_output.reshape(batch_size, tgt_len, self.embed_dim) | |
| text_attn_output = text_attn_output.view(batch_size, self.num_heads, src_len, self.head_dim) | |
| text_attn_output = text_attn_output.transpose(1, 2) | |
| text_attn_output = text_attn_output.reshape(batch_size, src_len, self.embed_dim) | |
| vision_attn_output = self.out_vision_proj(vision_attn_output) | |
| text_attn_output = self.out_text_proj(text_attn_output) | |
| return (vision_attn_output, vision_attn_weights), (text_attn_output, text_attn_weights) | |
| # Copied from transformers.models.beit.modeling_beit.drop_path | |
| def drop_path(input: torch.Tensor, drop_prob: float = 0.0, training: bool = False) -> torch.Tensor: | |
| """ | |
| Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks). | |
| Comment by Ross Wightman: This is the same as the DropConnect impl I created for EfficientNet, etc networks, | |
| however, the original name is misleading as 'Drop Connect' is a different form of dropout in a separate paper... | |
| See discussion: https://github.com/tensorflow/tpu/issues/494#issuecomment-532968956 ... I've opted for changing the | |
| layer and argument names to 'drop path' rather than mix DropConnect as a layer name and use 'survival rate' as the | |
| argument. | |
| """ | |
| if drop_prob == 0.0 or not training: | |
| return input | |
| keep_prob = 1 - drop_prob | |
| shape = (input.shape[0],) + (1,) * (input.ndim - 1) # work with diff dim tensors, not just 2D ConvNets | |
| random_tensor = keep_prob + torch.rand(shape, dtype=input.dtype, device=input.device) | |
| random_tensor.floor_() # binarize | |
| output = input.div(keep_prob) * random_tensor | |
| return output | |
| # Copied from transformers.models.beit.modeling_beit.BeitDropPath with Beit->GroundingDino | |
| class GroundingDinoDropPath(nn.Module): | |
| """Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks).""" | |
| def __init__(self, drop_prob: Optional[float] = None) -> None: | |
| super().__init__() | |
| self.drop_prob = drop_prob | |
| def forward(self, hidden_states: torch.Tensor) -> torch.Tensor: | |
| return drop_path(hidden_states, self.drop_prob, self.training) | |
| def extra_repr(self) -> str: | |
| return "p={}".format(self.drop_prob) | |
| class GroundingDinoFusionLayer(nn.Module): | |
| def __init__(self, config): | |
| super().__init__() | |
| drop_path = config.fusion_droppath | |
| # pre layer norm | |
| self.layer_norm_vision = nn.LayerNorm(config.d_model, config.layer_norm_eps) | |
| self.layer_norm_text = nn.LayerNorm(config.d_model, config.layer_norm_eps) | |
| self.attn = GroundingDinoBiMultiHeadAttention(config) | |
| # add layer scale for training stability | |
| self.drop_path = GroundingDinoDropPath(drop_path) if drop_path > 0.0 else nn.Identity() | |
| init_values = 1e-4 | |
| self.vision_param = nn.Parameter(init_values * torch.ones((config.d_model)), requires_grad=True) | |
| self.text_param = nn.Parameter(init_values * torch.ones((config.d_model)), requires_grad=True) | |
| def forward( | |
| self, | |
| vision_features: torch.FloatTensor, | |
| text_features: torch.FloatTensor, | |
| attention_mask_vision: Optional[torch.BoolTensor] = None, | |
| attention_mask_text: Optional[torch.BoolTensor] = None, | |
| ) -> Tuple[Tuple[torch.FloatTensor, torch.FloatTensor], Tuple[torch.FloatTensor, torch.FloatTensor]]: | |
| """Image and text features fusion | |
| Args: | |
| vision_features (`torch.FloatTensor` of shape `(batch_size, vision_sequence_length, hidden_dim)`): | |
| Projected flattened image features generated by the vision backbone. | |
| text_features (`torch.FloatTensor` of shape `(batch_size, text_sequence_length, hidden_dim)`): | |
| Projected text features generated by the text encoder. | |
| attention_mask_vision (`torch.BoolTensor`, **optional**): | |
| Attention mask for image-to-text cross-attention. False for real tokens and True for padding tokens. | |
| attention_mask_text (`torch.BoolTensor`, **optional**): | |
| Attention mask for text-to-image cross-attention. False for real tokens and True for padding tokens. | |
| Returns: | |
| `tuple(tuple(torch.FloatTensor), tuple(torch.FloatTensor))` where each inner tuple comprises an enhanced | |
| feature and attention output and weights: | |
| - **vision_features** (`torch.FloatTensor` of shape `(batch_size, vision_sequence_length, vision_dim)`) -- | |
| Updated vision features with attention output from image-to-text cross-attention layer. | |
| - **vision_attn_weights** (`torch.FloatTensor` of shape `(batch_size, num_heads, vision_sequence_length, | |
| vision_sequence_length)`) -- | |
| Attention weights of the image-to-text cross-attention layer. | |
| - **text_features** (`torch.FloatTensor` of shape `(batch_size, text_sequence_length, text_dim)`) -- | |
| Updated text features with attention output from text-to-image cross-attention layer. | |
| - **text_attn_weights** (`torch.FloatTensor` of shape `(batch_size, num_heads, text_sequence_length, | |
| text_sequence_length)`) -- | |
| Attention weights of the text-to-image cross-attention layer. | |
| """ | |
| vision_features = self.layer_norm_vision(vision_features) | |
| text_features = self.layer_norm_text(text_features) | |
| (delta_v, vision_attn), (delta_t, text_attn) = self.attn( | |
| vision_features, | |
| text_features, | |
| vision_attention_mask=attention_mask_vision, | |
| text_attention_mask=attention_mask_text, | |
| ) | |
| vision_features = vision_features + self.drop_path(self.vision_param * delta_v) | |
| text_features = text_features + self.drop_path(self.text_param * delta_t) | |
| return (vision_features, vision_attn), (text_features, text_attn) | |
| class GroundingDinoDeformableLayer(nn.Module): | |
| def __init__(self, config: GroundingDinoConfig): | |
| super().__init__() | |
| self.embed_dim = config.d_model | |
| self.self_attn = GroundingDinoMultiscaleDeformableAttention( | |
| config, num_heads=config.encoder_attention_heads, n_points=config.encoder_n_points | |
| ) | |
| self.self_attn_layer_norm = nn.LayerNorm(self.embed_dim, config.layer_norm_eps) | |
| self.dropout = config.dropout | |
| self.activation_fn = ACT2FN[config.activation_function] | |
| self.activation_dropout = config.activation_dropout | |
| self.fc1 = nn.Linear(self.embed_dim, config.encoder_ffn_dim) | |
| self.fc2 = nn.Linear(config.encoder_ffn_dim, self.embed_dim) | |
| self.final_layer_norm = nn.LayerNorm(self.embed_dim, config.layer_norm_eps) | |
| def forward( | |
| self, | |
| hidden_states: torch.Tensor, | |
| attention_mask: torch.Tensor, | |
| position_embeddings: torch.Tensor = None, | |
| reference_points=None, | |
| spatial_shapes=None, | |
| level_start_index=None, | |
| output_attentions: bool = False, | |
| ): | |
| """ | |
| Args: | |
| hidden_states (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`): | |
| Input to the layer. | |
| attention_mask (`torch.FloatTensor` of shape `(batch_size, sequence_length)`): | |
| Attention mask. | |
| position_embeddings (`torch.FloatTensor`, *optional*): | |
| Position embeddings, to be added to `hidden_states`. | |
| reference_points (`torch.FloatTensor`, *optional*): | |
| Reference points. | |
| spatial_shapes (`torch.LongTensor`, *optional*): | |
| Spatial shapes of the backbone feature maps. | |
| level_start_index (`torch.LongTensor`, *optional*): | |
| Level start index. | |
| output_attentions (`bool`, *optional*): | |
| Whether or not to return the attentions tensors of all attention layers. See `attentions` under | |
| returned tensors for more detail. | |
| """ | |
| residual = hidden_states | |
| # Apply Multi-scale Deformable Attention Module on the multi-scale feature maps. | |
| hidden_states, attn_weights = self.self_attn( | |
| hidden_states=hidden_states, | |
| attention_mask=attention_mask, | |
| encoder_hidden_states=hidden_states, | |
| encoder_attention_mask=attention_mask, | |
| position_embeddings=position_embeddings, | |
| reference_points=reference_points, | |
| spatial_shapes=spatial_shapes, | |
| level_start_index=level_start_index, | |
| output_attentions=output_attentions, | |
| ) | |
| hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training) | |
| hidden_states = residual + hidden_states | |
| hidden_states = self.self_attn_layer_norm(hidden_states) | |
| residual = hidden_states | |
| hidden_states = self.activation_fn(self.fc1(hidden_states)) | |
| hidden_states = nn.functional.dropout(hidden_states, p=self.activation_dropout, training=self.training) | |
| hidden_states = self.fc2(hidden_states) | |
| hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training) | |
| hidden_states = residual + hidden_states | |
| hidden_states = self.final_layer_norm(hidden_states) | |
| if self.training: | |
| if torch.isinf(hidden_states).any() or torch.isnan(hidden_states).any(): | |
| clamp_value = torch.finfo(hidden_states.dtype).max - 1000 | |
| hidden_states = torch.clamp(hidden_states, min=-clamp_value, max=clamp_value) | |
| return hidden_states, attn_weights | |
| # Based on https://github.com/IDEA-Research/GroundingDINO/blob/2b62f419c292ca9c518daae55512fabc3fead4a4/groundingdino/models/GroundingDINO/utils.py#L24 | |
| def get_sine_pos_embed( | |
| pos_tensor: torch.Tensor, num_pos_feats: int = 128, temperature: int = 10000, exchange_xy: bool = True | |
| ) -> Tensor: | |
| """ | |
| Generate sine position embeddings from a position tensor. | |
| Args: | |
| pos_tensor (torch.Tensor): | |
| Tensor containing positions. Shape: [..., n]. | |
| num_pos_feats (`int`, *optional*, defaults to 128): | |
| Projected shape for each float in the tensor. | |
| temperature (`int`, *optional*, defaults to 10000): | |
| Temperature in the sine/cosine function. | |
| exchange_xy (`bool`, *optional*, defaults to `True`): | |
| Exchange pos x and pos y. For example, input tensor is [x,y], the results will be [pos(y), pos(x)]. | |
| Returns: | |
| position_embeddings (torch.Tensor): shape: [..., n * hidden_size]. | |
| """ | |
| scale = 2 * math.pi | |
| dim_t = torch.arange(num_pos_feats, dtype=torch.float32, device=pos_tensor.device) | |
| dim_t = temperature ** (2 * torch.div(dim_t, 2, rounding_mode="floor") / num_pos_feats) | |
| def sine_func(x: torch.Tensor): | |
| sin_x = x * scale / dim_t | |
| sin_x = torch.stack((sin_x[..., 0::2].sin(), sin_x[..., 1::2].cos()), dim=3).flatten(2) | |
| return sin_x | |
| pos_tensor = pos_tensor.split([1] * pos_tensor.shape[-1], dim=-1) | |
| position_embeddings = [sine_func(x) for x in pos_tensor] | |
| if exchange_xy: | |
| position_embeddings[0], position_embeddings[1] = position_embeddings[1], position_embeddings[0] | |
| position_embeddings = torch.cat(position_embeddings, dim=-1) | |
| return position_embeddings | |
| class GroundingDinoEncoderLayer(nn.Module): | |
| def __init__(self, config) -> None: | |
| super().__init__() | |
| self.d_model = config.d_model | |
| self.text_enhancer_layer = GroundingDinoTextEnhancerLayer(config) | |
| self.fusion_layer = GroundingDinoFusionLayer(config) | |
| self.deformable_layer = GroundingDinoDeformableLayer(config) | |
| def get_text_position_embeddings( | |
| self, | |
| text_features: Tensor, | |
| text_position_embedding: Optional[torch.Tensor], | |
| text_position_ids: Optional[torch.Tensor], | |
| ) -> Tensor: | |
| batch_size, seq_length, _ = text_features.shape | |
| if text_position_embedding is None and text_position_ids is None: | |
| text_position_embedding = torch.arange(seq_length, device=text_features.device) | |
| text_position_embedding = text_position_embedding.float() | |
| text_position_embedding = text_position_embedding.unsqueeze(0).unsqueeze(-1) | |
| text_position_embedding = text_position_embedding.repeat(batch_size, 1, 1) | |
| text_position_embedding = get_sine_pos_embed( | |
| text_position_embedding, num_pos_feats=self.d_model, exchange_xy=False | |
| ) | |
| if text_position_ids is not None: | |
| text_position_embedding = get_sine_pos_embed( | |
| text_position_ids[..., None], num_pos_feats=self.d_model, exchange_xy=False | |
| ) | |
| return text_position_embedding | |
| def forward( | |
| self, | |
| vision_features: Tensor, | |
| vision_position_embedding: Tensor, | |
| spatial_shapes: Tensor, | |
| level_start_index: Tensor, | |
| key_padding_mask: Tensor, | |
| reference_points: Tensor, | |
| text_features: Optional[Tensor] = None, | |
| text_attention_mask: Optional[Tensor] = None, | |
| text_position_embedding: Optional[Tensor] = None, | |
| text_self_attention_masks: Optional[Tensor] = None, | |
| text_position_ids: Optional[Tensor] = None, | |
| ): | |
| text_position_embedding = self.get_text_position_embeddings( | |
| text_features, text_position_embedding, text_position_ids | |
| ) | |
| (vision_features, vision_fused_attn), (text_features, text_fused_attn) = self.fusion_layer( | |
| vision_features=vision_features, | |
| text_features=text_features, | |
| attention_mask_vision=key_padding_mask, | |
| attention_mask_text=text_attention_mask, | |
| ) | |
| (text_features, text_enhanced_attn) = self.text_enhancer_layer( | |
| hidden_states=text_features, | |
| attention_masks=~text_self_attention_masks, # note we use ~ for mask here | |
| position_embeddings=(text_position_embedding if text_position_embedding is not None else None), | |
| ) | |
| (vision_features, vision_deformable_attn) = self.deformable_layer( | |
| hidden_states=vision_features, | |
| attention_mask=~key_padding_mask, | |
| position_embeddings=vision_position_embedding, | |
| reference_points=reference_points, | |
| spatial_shapes=spatial_shapes, | |
| level_start_index=level_start_index, | |
| ) | |
| return ( | |
| (vision_features, text_features), | |
| (vision_fused_attn, text_fused_attn, text_enhanced_attn, vision_deformable_attn), | |
| ) | |
| class GroundingDinoMultiheadAttention(nn.Module): | |
| """Equivalent implementation of nn.MultiheadAttention with `batch_first=True`.""" | |
| def __init__(self, config, num_attention_heads=None): | |
| super().__init__() | |
| if config.hidden_size % num_attention_heads != 0 and not hasattr(config, "embedding_size"): | |
| raise ValueError( | |
| f"The hidden size ({config.hidden_size}) is not a multiple of the number of attention " | |
| f"heads ({num_attention_heads})" | |
| ) | |
| self.num_attention_heads = num_attention_heads | |
| self.attention_head_size = int(config.hidden_size / num_attention_heads) | |
| self.all_head_size = self.num_attention_heads * self.attention_head_size | |
| self.query = nn.Linear(config.hidden_size, self.all_head_size) | |
| self.key = nn.Linear(config.hidden_size, self.all_head_size) | |
| self.value = nn.Linear(config.hidden_size, self.all_head_size) | |
| self.out_proj = nn.Linear(config.hidden_size, config.hidden_size) | |
| self.dropout = nn.Dropout(config.attention_dropout) | |
| def transpose_for_scores(self, x: torch.Tensor) -> torch.Tensor: | |
| new_x_shape = x.size()[:-1] + (self.num_attention_heads, self.attention_head_size) | |
| x = x.view(new_x_shape) | |
| return x.permute(0, 2, 1, 3) | |
| def forward( | |
| self, | |
| queries: torch.Tensor, | |
| keys: torch.Tensor, | |
| values: torch.Tensor, | |
| attention_mask: Optional[torch.FloatTensor] = None, | |
| output_attentions: Optional[bool] = False, | |
| ) -> Tuple[torch.Tensor]: | |
| query_layer = self.transpose_for_scores(self.query(queries)) | |
| key_layer = self.transpose_for_scores(self.key(keys)) | |
| value_layer = self.transpose_for_scores(self.value(values)) | |
| # Take the dot product between "query" and "key" to get the raw attention scores. | |
| attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2)) | |
| attention_scores = attention_scores / math.sqrt(self.attention_head_size) | |
| if attention_mask is not None: | |
| # Apply the attention mask is (precomputed for all layers in GroundingDinoModel forward() function) | |
| attention_scores = attention_scores + attention_mask | |
| # Normalize the attention scores to probabilities. | |
| attention_probs = nn.functional.softmax(attention_scores, dim=-1) | |
| # This is actually dropping out entire tokens to attend to, which might | |
| # seem a bit unusual, but is taken from the original Transformer paper. | |
| attention_probs = self.dropout(attention_probs) | |
| context_layer = torch.matmul(attention_probs, value_layer) | |
| context_layer = context_layer.permute(0, 2, 1, 3).contiguous() | |
| new_context_layer_shape = context_layer.size()[:-2] + (self.all_head_size,) | |
| context_layer = context_layer.view(new_context_layer_shape) | |
| context_layer = self.out_proj(context_layer) | |
| outputs = (context_layer, attention_probs) if output_attentions else (context_layer,) | |
| return outputs | |
| class GroundingDinoDecoderLayer(nn.Module): | |
| def __init__(self, config: GroundingDinoConfig): | |
| super().__init__() | |
| self.embed_dim = config.d_model | |
| # self-attention | |
| self.self_attn = GroundingDinoMultiheadAttention(config, num_attention_heads=config.decoder_attention_heads) | |
| self.dropout = config.dropout | |
| self.activation_fn = ACT2FN[config.activation_function] | |
| self.activation_dropout = config.activation_dropout | |
| self.self_attn_layer_norm = nn.LayerNorm(self.embed_dim, config.layer_norm_eps) | |
| # cross-attention text | |
| self.encoder_attn_text = GroundingDinoMultiheadAttention( | |
| config, num_attention_heads=config.decoder_attention_heads | |
| ) | |
| self.encoder_attn_text_layer_norm = nn.LayerNorm(self.embed_dim, config.layer_norm_eps) | |
| # cross-attention | |
| self.encoder_attn = GroundingDinoMultiscaleDeformableAttention( | |
| config, | |
| num_heads=config.decoder_attention_heads, | |
| n_points=config.decoder_n_points, | |
| ) | |
| self.encoder_attn_layer_norm = nn.LayerNorm(self.embed_dim, config.layer_norm_eps) | |
| # feedforward neural networks | |
| self.fc1 = nn.Linear(self.embed_dim, config.decoder_ffn_dim) | |
| self.fc2 = nn.Linear(config.decoder_ffn_dim, self.embed_dim) | |
| self.final_layer_norm = nn.LayerNorm(self.embed_dim, config.layer_norm_eps) | |
| def with_pos_embed(self, tensor: torch.Tensor, position_embeddings: Optional[Tensor]): | |
| return tensor if position_embeddings is None else tensor + position_embeddings | |
| def forward( | |
| self, | |
| hidden_states: torch.Tensor, | |
| position_embeddings: Optional[torch.Tensor] = None, | |
| reference_points=None, | |
| spatial_shapes=None, | |
| level_start_index=None, | |
| vision_encoder_hidden_states: Optional[torch.Tensor] = None, | |
| vision_encoder_attention_mask: Optional[torch.Tensor] = None, | |
| text_encoder_hidden_states: Optional[torch.Tensor] = None, | |
| text_encoder_attention_mask: Optional[torch.Tensor] = None, | |
| self_attn_mask: Optional[torch.Tensor] = None, | |
| output_attentions: Optional[bool] = False, | |
| ): | |
| residual = hidden_states | |
| # Self Attention | |
| queries = keys = self.with_pos_embed(hidden_states, position_embeddings) | |
| hidden_states, self_attn_weights = self.self_attn( | |
| queries=queries, | |
| keys=keys, | |
| values=hidden_states, | |
| attention_mask=self_attn_mask, | |
| output_attentions=True, | |
| ) | |
| hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training) | |
| hidden_states = residual + hidden_states | |
| hidden_states = self.self_attn_layer_norm(hidden_states) | |
| second_residual = hidden_states | |
| # Cross-Attention Text | |
| queries = self.with_pos_embed(hidden_states, position_embeddings) | |
| hidden_states, text_cross_attn_weights = self.encoder_attn_text( | |
| queries=queries, | |
| keys=text_encoder_hidden_states, | |
| values=text_encoder_hidden_states, | |
| attention_mask=text_encoder_attention_mask, | |
| output_attentions=True, | |
| ) | |
| hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training) | |
| hidden_states = second_residual + hidden_states | |
| hidden_states = self.encoder_attn_text_layer_norm(hidden_states) | |
| third_residual = hidden_states | |
| # Cross-Attention | |
| cross_attn_weights = None | |
| hidden_states, cross_attn_weights = self.encoder_attn( | |
| hidden_states=hidden_states, | |
| attention_mask=vision_encoder_attention_mask, | |
| encoder_hidden_states=vision_encoder_hidden_states, | |
| encoder_attention_mask=vision_encoder_attention_mask, | |
| position_embeddings=position_embeddings, | |
| reference_points=reference_points, | |
| spatial_shapes=spatial_shapes, | |
| level_start_index=level_start_index, | |
| output_attentions=output_attentions, | |
| ) | |
| hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training) | |
| hidden_states = third_residual + hidden_states | |
| hidden_states = self.encoder_attn_layer_norm(hidden_states) | |
| # Fully Connected | |
| residual = hidden_states | |
| hidden_states = self.activation_fn(self.fc1(hidden_states)) | |
| hidden_states = nn.functional.dropout(hidden_states, p=self.activation_dropout, training=self.training) | |
| hidden_states = self.fc2(hidden_states) | |
| hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training) | |
| hidden_states = residual + hidden_states | |
| hidden_states = self.final_layer_norm(hidden_states) | |
| outputs = (hidden_states,) | |
| if output_attentions: | |
| outputs += (self_attn_weights, text_cross_attn_weights, cross_attn_weights) | |
| return outputs | |
| # class GroundingDinoContrastiveEmbedding(nn.Module): | |
| # def __init__(self, config): | |
| # super().__init__() | |
| # self.max_text_len = config.max_text_len | |
| # bias_value = -math.log((1 - 0.01) / 0.01) | |
| # self.bias = nn.Parameter(torch.Tensor([bias_value])) | |
| # def forward( | |
| # self, | |
| # vision_hidden_state: torch.FloatTensor, | |
| # text_hidden_state: torch.FloatTensor, | |
| # text_token_mask: torch.BoolTensor, | |
| # ) -> torch.FloatTensor: | |
| # output = vision_hidden_state @ text_hidden_state.transpose(-1, -2) | |
| # output = output + self.bias | |
| # output = output.masked_fill(~text_token_mask[:, None, :], float("-inf")) | |
| # # padding to max_text_len | |
| # new_output = torch.full((*output.shape[:-1], self.max_text_len), float("-inf"), device=output.device) | |
| # new_output[..., : output.shape[-1]] = output | |
| # return new_output | |
| class GroundingDinoContrastiveEmbedding(nn.Module): | |
| """text visual ContrastiveEmbed layer. | |
| """ | |
| def __init__(self, config): | |
| super().__init__() | |
| self.max_text_len = config.max_text_len | |
| self.log_scale = 'auto' | |
| self.bias = None | |
| if True: | |
| bias_value = -math.log((1 - 0.01) / 0.01) | |
| self.bias = nn.Parameter( | |
| torch.Tensor([bias_value]), requires_grad=True) | |
| def forward(self, | |
| vision_hidden_state: torch.FloatTensor, | |
| text_hidden_state: torch.FloatTensor, | |
| text_token_mask: torch.BoolTensor,) -> Tensor: | |
| """Forward function. | |
| Args: | |
| visual_feat (Tensor): Visual features. | |
| text_feat (Tensor): Text features. | |
| text_token_mask (Tensor): A mask used for text feats. | |
| Returns: | |
| Tensor: Classification score. | |
| """ | |
| y = text_hidden_state | |
| text_token_mask = text_token_mask | |
| res = vision_hidden_state @ y.transpose(-1, -2) | |
| if isinstance(self.log_scale, nn.Parameter): | |
| res = res * self.log_scale.exp() | |
| elif self.log_scale == 'auto': | |
| # NOTE: similar to the normalizer in self-attention | |
| res = res / math.sqrt(vision_hidden_state.shape[-1]) | |
| if self.bias is not None: | |
| res = res + self.bias | |
| res.masked_fill_(~text_token_mask[:, None, :], float('-inf')) | |
| new_res = torch.full((*res.shape[:-1], self.max_text_len), | |
| float('-inf'), | |
| device=res.device) | |
| new_res[..., :res.shape[-1]] = res | |
| return new_res | |
| class GroundingDinoPreTrainedModel(PreTrainedModel): | |
| config_class = GroundingDinoConfig | |
| base_model_prefix = "model" | |
| main_input_name = "pixel_values" | |
| def _init_weights(self, module): | |
| std = self.config.init_std | |
| if isinstance(module, GroundingDinoLearnedPositionEmbedding): | |
| nn.init.uniform_(module.row_embeddings.weight) | |
| nn.init.uniform_(module.column_embeddings.weight) | |
| elif isinstance(module, GroundingDinoMultiscaleDeformableAttention): | |
| module._reset_parameters() | |
| elif isinstance(module, GroundingDinoBiMultiHeadAttention): | |
| nn.init.xavier_uniform_(module.vision_proj.weight) | |
| module.vision_proj.bias.data.fill_(0) | |
| nn.init.xavier_uniform_(module.text_proj.weight) | |
| module.text_proj.bias.data.fill_(0) | |
| nn.init.xavier_uniform_(module.values_vision_proj.weight) | |
| module.values_vision_proj.bias.data.fill_(0) | |
| nn.init.xavier_uniform_(module.values_text_proj.weight) | |
| module.values_text_proj.bias.data.fill_(0) | |
| nn.init.xavier_uniform_(module.out_vision_proj.weight) | |
| module.out_vision_proj.bias.data.fill_(0) | |
| nn.init.xavier_uniform_(module.out_text_proj.weight) | |
| module.out_text_proj.bias.data.fill_(0) | |
| elif isinstance(module, (GroundingDinoEncoderLayer, GroundingDinoDecoderLayer)): | |
| for p in module.parameters(): | |
| if p.dim() > 1: | |
| nn.init.normal_(p, mean=0.0, std=std) | |
| elif isinstance(module, (nn.Linear, nn.Conv2d, nn.BatchNorm2d)): | |
| # Slightly different from the TF version which uses truncated_normal for initialization | |
| # cf https://github.com/pytorch/pytorch/pull/5617 | |
| module.weight.data.normal_(mean=0.0, std=std) | |
| if module.bias is not None: | |
| module.bias.data.zero_() | |
| elif isinstance(module, nn.Embedding): | |
| module.weight.data.normal_(mean=0.0, std=std) | |
| if module.padding_idx is not None: | |
| module.weight.data[module.padding_idx].zero_() | |
| elif isinstance(module, GroundingDinoMLPPredictionHead): | |
| nn.init.constant_(module.layers[-1].weight.data, 0) | |
| nn.init.constant_(module.layers[-1].bias.data, 0) | |
| if hasattr(module, "reference_points") and not self.config.two_stage: | |
| nn.init.xavier_uniform_(module.reference_points.weight.data, gain=1.0) | |
| nn.init.constant_(module.reference_points.bias.data, 0.0) | |
| if hasattr(module, "level_embed"): | |
| nn.init.normal_(module.level_embed) | |
| def _set_gradient_checkpointing(self, module, value=False): | |
| if isinstance(module, GroundingDinoDecoder): | |
| module.gradient_checkpointing = value | |
| GROUNDING_DINO_START_DOCSTRING = r""" | |
| This model inherits from [`PreTrainedModel`]. Check the superclass documentation for the generic methods the | |
| library implements for all its model (such as downloading or saving, resizing the input embeddings, pruning heads | |
| etc.) | |
| This model is also a PyTorch [torch.nn.Module](https://pytorch.org/docs/stable/nn.html#torch.nn.Module) subclass. | |
| Use it as a regular PyTorch Module and refer to the PyTorch documentation for all matter related to general usage | |
| and behavior. | |
| Parameters: | |
| config ([`GroundingDinoConfig`]): | |
| Model configuration class with all the parameters of the model. Initializing with a config file does not | |
| load the weights associated with the model, only the configuration. Check out the | |
| [`~PreTrainedModel.from_pretrained`] method to load the model weights. | |
| """ | |
| GROUNDING_DINO_INPUTS_DOCSTRING = r""" | |
| Args: | |
| pixel_values (`torch.FloatTensor` of shape `(batch_size, num_channels, height, width)`): | |
| Pixel values. Padding will be ignored by default should you provide it. | |
| Pixel values can be obtained using [`AutoImageProcessor`]. See [`GroundingDinoImageProcessor.__call__`] for | |
| details. | |
| input_ids (`torch.LongTensor` of shape `(batch_size, text_sequence_length)`): | |
| Indices of input sequence tokens in the vocabulary. Padding will be ignored by default should you provide | |
| it. | |
| Indices can be obtained using [`AutoTokenizer`]. See [`GroundingDinoTokenizer.__call__`] for details. | |
| token_type_ids (`torch.LongTensor` of shape `(batch_size, text_sequence_length)`, *optional*): | |
| Segment token indices to indicate first and second portions of the inputs. Indices are selected in `[0, | |
| 1]`: 0 corresponds to a `sentence A` token, 1 corresponds to a `sentence B` token | |
| [What are token type IDs?](../glossary#token-type-ids) | |
| attention_mask (`torch.LongTensor` of shape `(batch_size, text_sequence_length)`, *optional*): | |
| Mask to avoid performing attention on padding token indices. Mask values selected in `[0, 1]`: | |
| - 1 for tokens that are real (i.e. **not masked**), | |
| - 0 for tokens that are padding (i.e. **masked**). | |
| [What are attention masks?](../glossary#attention-mask) | |
| pixel_mask (`torch.LongTensor` of shape `(batch_size, height, width)`, *optional*): | |
| Mask to avoid performing attention on padding pixel values. Mask values selected in `[0, 1]`: | |
| - 1 for pixels that are real (i.e. **not masked**), | |
| - 0 for pixels that are padding (i.e. **masked**). | |
| [What are attention masks?](../glossary#attention-mask) | |
| encoder_outputs (`tuple(tuple(torch.FloatTensor)`, *optional*): | |
| Tuple consists of (`last_hidden_state_vision`, *optional*: `last_hidden_state_text`, *optional*: | |
| `vision_hidden_states`, *optional*: `text_hidden_states`, *optional*: `attentions`) | |
| `last_hidden_state_vision` of shape `(batch_size, sequence_length, hidden_size)`, *optional*) is a sequence | |
| of hidden-states at the output of the last layer of the encoder. Used in the cross-attention of the | |
| decoder. | |
| output_attentions (`bool`, *optional*): | |
| Whether or not to return the attentions tensors of all attention layers. See `attentions` under returned | |
| tensors for more detail. | |
| output_hidden_states (`bool`, *optional*): | |
| Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors for | |
| more detail. | |
| return_dict (`bool`, *optional*): | |
| Whether or not to return a [`~file_utils.ModelOutput`] instead of a plain tuple. | |
| """ | |
| class GroundingDinoEncoder(GroundingDinoPreTrainedModel): | |
| """ | |
| Transformer encoder consisting of *config.encoder_layers* deformable attention layers. Each layer is a | |
| [`GroundingDinoEncoderLayer`]. | |
| The encoder updates the flattened multi-scale feature maps through multiple deformable attention layers. | |
| Args: | |
| config: GroundingDinoConfig | |
| """ | |
| def __init__(self, config: GroundingDinoConfig): | |
| super().__init__(config) | |
| self.dropout = config.dropout | |
| self.layers = nn.ModuleList([GroundingDinoEncoderLayer(config) for _ in range(config.encoder_layers)]) | |
| # Initialize weights and apply final processing | |
| self.post_init() | |
| def get_reference_points(spatial_shapes, valid_ratios, device): | |
| """ | |
| Get reference points for each feature map. | |
| Args: | |
| spatial_shapes (`torch.LongTensor` of shape `(num_feature_levels, 2)`): | |
| Spatial shapes of each feature map. | |
| valid_ratios (`torch.FloatTensor` of shape `(batch_size, num_feature_levels, 2)`): | |
| Valid ratios of each feature map. | |
| device (`torch.device`): | |
| Device on which to create the tensors. | |
| Returns: | |
| `torch.FloatTensor` of shape `(batch_size, num_queries, num_feature_levels, 2)` | |
| """ | |
| reference_points_list = [] | |
| for level, (height, width) in enumerate(spatial_shapes): | |
| ref_y, ref_x = meshgrid( | |
| torch.linspace(0.5, height - 0.5, height, dtype=torch.float32, device=device), | |
| torch.linspace(0.5, width - 0.5, width, dtype=torch.float32, device=device), | |
| indexing="ij", | |
| ) | |
| # TODO: valid_ratios could be useless here. check https://github.com/fundamentalvision/Deformable-DETR/issues/36 | |
| ref_y = ref_y.reshape(-1)[None] / (valid_ratios[:, None, level, 1] * height) | |
| ref_x = ref_x.reshape(-1)[None] / (valid_ratios[:, None, level, 0] * width) | |
| ref = torch.stack((ref_x, ref_y), -1) | |
| reference_points_list.append(ref) | |
| reference_points = torch.cat(reference_points_list, 1) | |
| reference_points = reference_points[:, :, None] * valid_ratios[:, None] | |
| return reference_points | |
| def forward( | |
| self, | |
| vision_features: Tensor, | |
| vision_attention_mask: Tensor, | |
| vision_position_embedding: Tensor, | |
| spatial_shapes: Tensor, | |
| level_start_index: Tensor, | |
| valid_ratios=None, | |
| text_features: Optional[Tensor] = None, | |
| text_attention_mask: Optional[Tensor] = None, | |
| text_position_embedding: Optional[Tensor] = None, | |
| text_self_attention_masks: Optional[Tensor] = None, | |
| text_position_ids: Optional[Tensor] = None, | |
| output_attentions=None, | |
| output_hidden_states=None, | |
| return_dict=None, | |
| ): | |
| r""" | |
| Args: | |
| vision_features (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`): | |
| Flattened feature map (output of the backbone + projection layer) that is passed to the encoder. | |
| vision_attention_mask (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*): | |
| Mask to avoid performing attention on padding pixel features. Mask values selected in `[0, 1]`: | |
| - 0 for pixel features that are real (i.e. **not masked**), | |
| - 1 for pixel features that are padding (i.e. **masked**). | |
| [What are attention masks?](../glossary#attention-mask) | |
| vision_position_embedding (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`): | |
| Position embeddings that are added to the queries and keys in each self-attention layer. | |
| spatial_shapes (`torch.LongTensor` of shape `(num_feature_levels, 2)`): | |
| Spatial shapes of each feature map. | |
| level_start_index (`torch.LongTensor` of shape `(num_feature_levels)`): | |
| Starting index of each feature map. | |
| valid_ratios (`torch.FloatTensor` of shape `(batch_size, num_feature_levels, 2)`): | |
| Ratio of valid area in each feature level. | |
| text_features (`torch.FloatTensor` of shape `(batch_size, text_seq_len, hidden_size)`): | |
| Flattened text features that are passed to the encoder. | |
| text_attention_mask (`torch.Tensor` of shape `(batch_size, text_seq_len)`, *optional*): | |
| Mask to avoid performing attention on padding text features. Mask values selected in `[0, 1]`: | |
| - 0 for text features that are real (i.e. **not masked**), | |
| - 1 for text features that are padding (i.e. **masked**). | |
| [What are attention masks?](../glossary#attention-mask) | |
| text_position_embedding (`torch.FloatTensor` of shape `(batch_size, text_seq_len)`): | |
| Position embeddings that are added to the queries and keys in each self-attention layer. | |
| text_self_attention_masks (`torch.BoolTensor` of shape `(batch_size, text_seq_len, text_seq_len)`): | |
| Masks to avoid performing attention between padding text features. Mask values selected in `[0, 1]`: | |
| - 1 for text features that are real (i.e. **not masked**), | |
| - 0 for text features that are padding (i.e. **masked**). | |
| text_position_ids (`torch.LongTensor` of shape `(batch_size, num_queries)`): | |
| Position ids for text features. | |
| output_attentions (`bool`, *optional*): | |
| Whether or not to return the attentions tensors of all attention layers. See `attentions` under | |
| returned tensors for more detail. | |
| output_hidden_states (`bool`, *optional*): | |
| Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors | |
| for more detail. | |
| return_dict (`bool`, *optional*): | |
| Whether or not to return a [`~file_utils.ModelOutput`] instead of a plain tuple. | |
| """ | |
| output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions | |
| output_hidden_states = ( | |
| output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states | |
| ) | |
| return_dict = return_dict if return_dict is not None else self.config.use_return_dict | |
| reference_points = self.get_reference_points(spatial_shapes, valid_ratios, device=vision_features.device) | |
| encoder_vision_states = () if output_hidden_states else None | |
| encoder_text_states = () if output_hidden_states else None | |
| all_attns = () if output_attentions else None | |
| all_attn_fused_text = () if output_attentions else None | |
| all_attn_fused_vision = () if output_attentions else None | |
| all_attn_enhanced_text = () if output_attentions else None | |
| all_attn_deformable = () if output_attentions else None | |
| for i, encoder_layer in enumerate(self.layers): | |
| if output_hidden_states: | |
| encoder_vision_states += (vision_features,) | |
| encoder_text_states += (text_features,) | |
| (vision_features, text_features), attentions = encoder_layer( | |
| vision_features=vision_features, | |
| vision_position_embedding=vision_position_embedding, | |
| spatial_shapes=spatial_shapes, | |
| level_start_index=level_start_index, | |
| key_padding_mask=vision_attention_mask, | |
| reference_points=reference_points, | |
| text_features=text_features, | |
| text_attention_mask=text_attention_mask, | |
| text_position_embedding=text_position_embedding, | |
| text_self_attention_masks=text_self_attention_masks, | |
| text_position_ids=text_position_ids, | |
| ) | |
| if output_attentions: | |
| all_attn_fused_vision += (attentions[0],) | |
| all_attn_fused_text += (attentions[1],) | |
| all_attn_enhanced_text += (attentions[2],) | |
| all_attn_deformable += (attentions[3],) | |
| if output_hidden_states: | |
| encoder_vision_states += (vision_features,) | |
| encoder_text_states += (text_features,) | |
| if output_attentions: | |
| all_attns = (all_attn_fused_vision, all_attn_fused_text, all_attn_enhanced_text, all_attn_deformable) | |
| if not return_dict: | |
| enc_outputs = [vision_features, text_features, encoder_vision_states, encoder_text_states, all_attns] | |
| return tuple(v for v in enc_outputs if v is not None) | |
| return GroundingDinoEncoderOutput( | |
| last_hidden_state_vision=vision_features, | |
| last_hidden_state_text=text_features, | |
| vision_hidden_states=encoder_vision_states, | |
| text_hidden_states=encoder_text_states, | |
| attentions=all_attns, | |
| ) | |
| class GroundingDinoDecoder(GroundingDinoPreTrainedModel): | |
| """ | |
| Transformer decoder consisting of *config.decoder_layers* layers. Each layer is a [`GroundingDinoDecoderLayer`]. | |
| The decoder updates the query embeddings through multiple self-attention and cross-attention layers. | |
| Some tweaks for Grounding DINO: | |
| - `position_embeddings`, `reference_points`, `spatial_shapes` and `valid_ratios` are added to the forward pass. | |
| - it also returns a stack of intermediate outputs and reference points from all decoding layers. | |
| Args: | |
| config: GroundingDinoConfig | |
| """ | |
| def __init__(self, config: GroundingDinoConfig): | |
| super().__init__(config) | |
| self.dropout = config.dropout | |
| self.layer_norm = nn.LayerNorm(config.d_model, config.layer_norm_eps) | |
| self.layers = nn.ModuleList([GroundingDinoDecoderLayer(config) for _ in range(config.decoder_layers)]) | |
| self.reference_points_head = GroundingDinoMLPPredictionHead( | |
| config.query_dim // 2 * config.d_model, config.d_model, config.d_model, 2 | |
| ) | |
| self.gradient_checkpointing = False | |
| # hack implementation for iterative bounding box refinement as in two-stage Deformable DETR | |
| self.bbox_embed = None | |
| self.class_embed = None | |
| self.query_scale = None | |
| # Initialize weights and apply final processing | |
| self.post_init() | |
| def forward( | |
| self, | |
| inputs_embeds, | |
| vision_encoder_hidden_states, | |
| vision_encoder_attention_mask=None, | |
| text_encoder_hidden_states=None, | |
| text_encoder_attention_mask=None, | |
| reference_points=None, | |
| spatial_shapes=None, | |
| level_start_index=None, | |
| valid_ratios=None, | |
| self_attn_mask=None, | |
| output_attentions=None, | |
| output_hidden_states=None, | |
| return_dict=None, | |
| ): | |
| r""" | |
| Args: | |
| inputs_embeds (`torch.FloatTensor` of shape `(batch_size, num_queries, hidden_size)`): | |
| The query embeddings that are passed into the decoder. | |
| vision_encoder_hidden_states (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`): | |
| Last hidden state from encoder related to vision feature map. | |
| vision_encoder_attention_mask (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*): | |
| Mask to avoid performing attention on padding pixel features. Mask values selected in `[0, 1]`: | |
| - 1 for pixel features that are real (i.e. **not masked**), | |
| - 0 for pixel features that are padding (i.e. **masked**). | |
| text_encoder_hidden_states (`torch.FloatTensor` of shape `(batch_size, text_seq_len, hidden_size)`): | |
| Last hidden state from encoder related to text features. | |
| text_encoder_attention_mask (`torch.Tensor` of shape `(batch_size, text_seq_len)`, *optional*): | |
| Mask to avoid performing attention on padding text features. Mask values selected in `[0, 1]`: | |
| - 0 for text features that are real (i.e. **not masked**), | |
| - 1 for text features that are padding (i.e. **masked**). | |
| reference_points (`torch.FloatTensor` of shape `(batch_size, num_queries, 4)` is `as_two_stage` else `(batch_size, num_queries, 2)` or , *optional*): | |
| Reference point in range `[0, 1]`, top-left (0,0), bottom-right (1, 1), including padding area. | |
| spatial_shapes (`torch.FloatTensor` of shape `(num_feature_levels, 2)`): | |
| Spatial shapes of the feature maps. | |
| level_start_index (`torch.LongTensor` of shape `(num_feature_levels)`, *optional*): | |
| Indexes for the start of each feature level. In range `[0, sequence_length]`. | |
| valid_ratios (`torch.FloatTensor` of shape `(batch_size, num_feature_levels, 2)`, *optional*): | |
| Ratio of valid area in each feature level. | |
| self_attn_mask (`torch.BoolTensor` of shape `(batch_size, text_seq_len)`): | |
| Masks to avoid performing self-attention between vision hidden state. Mask values selected in `[0, 1]`: | |
| - 1 for queries that are real (i.e. **not masked**), | |
| - 0 for queries that are padding (i.e. **masked**). | |
| output_attentions (`bool`, *optional*): | |
| Whether or not to return the attentions tensors of all attention layers. See `attentions` under | |
| returned tensors for more detail. | |
| output_hidden_states (`bool`, *optional*): | |
| Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors | |
| for more detail. | |
| return_dict (`bool`, *optional*): | |
| Whether or not to return a [`~file_utils.ModelOutput`] instead of a plain tuple. | |
| """ | |
| output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions | |
| output_hidden_states = ( | |
| output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states | |
| ) | |
| return_dict = return_dict if return_dict is not None else self.config.use_return_dict | |
| if inputs_embeds is not None: | |
| hidden_states = inputs_embeds | |
| # decoder layers | |
| all_hidden_states = () if output_hidden_states else None | |
| all_self_attns = () if output_attentions else None | |
| all_attns = () if output_attentions else None | |
| all_cross_attns_vision = () if (output_attentions and vision_encoder_hidden_states is not None) else None | |
| all_cross_attns_text = () if (output_attentions and text_encoder_hidden_states is not None) else None | |
| intermediate = () | |
| intermediate_reference_points = () | |
| if text_encoder_attention_mask is not None: | |
| dtype = text_encoder_hidden_states.dtype | |
| text_encoder_attention_mask = text_encoder_attention_mask[:, None, None, :] | |
| text_encoder_attention_mask = text_encoder_attention_mask.repeat( | |
| 1, self.config.decoder_attention_heads, self.config.num_queries, 1 | |
| ) | |
| text_encoder_attention_mask = text_encoder_attention_mask.to(dtype=dtype) | |
| text_encoder_attention_mask = text_encoder_attention_mask * torch.finfo(dtype).min | |
| for idx, decoder_layer in enumerate(self.layers): | |
| num_coordinates = reference_points.shape[-1] | |
| if num_coordinates == 4: | |
| reference_points_input = ( | |
| reference_points[:, :, None] * torch.cat([valid_ratios, valid_ratios], -1)[:, None] | |
| ) | |
| elif num_coordinates == 2: | |
| reference_points_input = reference_points[:, :, None] * valid_ratios[:, None] | |
| else: | |
| raise ValueError("Last dim of reference_points must be 2 or 4, but got {reference_points.shape[-1]}") | |
| query_pos = get_sine_pos_embed(reference_points_input[:, :, 0, :], num_pos_feats=self.config.d_model // 2) | |
| query_pos = self.reference_points_head(query_pos) | |
| # In original implementation they apply layer norm before outputting intermediate hidden states | |
| # Though that's not through between layers so the layers use as input the output of the previous layer | |
| # withtout layer norm | |
| if output_hidden_states: | |
| all_hidden_states += (self.layer_norm(hidden_states),) | |
| if self.gradient_checkpointing and self.training: | |
| def create_custom_forward(module): | |
| def custom_forward(*inputs): | |
| return module(*inputs, output_attentions) | |
| return custom_forward | |
| layer_outputs = torch.utils.checkpoint.checkpoint( | |
| create_custom_forward(decoder_layer), | |
| hidden_states, | |
| query_pos, | |
| reference_points_input, | |
| spatial_shapes, | |
| level_start_index, | |
| vision_encoder_hidden_states, | |
| vision_encoder_attention_mask, | |
| text_encoder_hidden_states, | |
| text_encoder_attention_mask, | |
| self_attn_mask, | |
| None, | |
| ) | |
| else: | |
| layer_outputs = decoder_layer( | |
| hidden_states=hidden_states, | |
| position_embeddings=query_pos, | |
| reference_points=reference_points_input, | |
| spatial_shapes=spatial_shapes, | |
| level_start_index=level_start_index, | |
| vision_encoder_hidden_states=vision_encoder_hidden_states, | |
| vision_encoder_attention_mask=vision_encoder_attention_mask, | |
| text_encoder_hidden_states=text_encoder_hidden_states, | |
| text_encoder_attention_mask=text_encoder_attention_mask, | |
| self_attn_mask=self_attn_mask, | |
| output_attentions=output_attentions, | |
| ) | |
| hidden_states = layer_outputs[0] | |
| # hack implementation for iterative bounding box refinement | |
| if self.bbox_embed is not None: | |
| tmp = self.bbox_embed[idx](hidden_states) | |
| num_coordinates = reference_points.shape[-1] | |
| if num_coordinates == 4: | |
| new_reference_points = tmp + torch.special.logit(reference_points, eps=1e-5) | |
| new_reference_points = new_reference_points.sigmoid() | |
| elif num_coordinates == 2: | |
| new_reference_points = tmp | |
| new_reference_points[..., :2] = tmp[..., :2] + torch.special.logit(reference_points, eps=1e-5) | |
| new_reference_points = new_reference_points.sigmoid() | |
| else: | |
| raise ValueError( | |
| f"Last dim of reference_points must be 2 or 4, but got {reference_points.shape[-1]}" | |
| ) | |
| reference_points = new_reference_points.detach() | |
| intermediate += (self.layer_norm(hidden_states),) | |
| intermediate_reference_points += (reference_points,) | |
| if output_attentions: | |
| all_self_attns += (layer_outputs[1],) | |
| if text_encoder_hidden_states is not None: | |
| all_cross_attns_text += (layer_outputs[2],) | |
| if vision_encoder_hidden_states is not None: | |
| all_cross_attns_vision += (layer_outputs[3],) | |
| # Keep batch_size as first dimension | |
| intermediate = torch.stack(intermediate, dim=1) | |
| intermediate_reference_points = torch.stack(intermediate_reference_points, dim=1) | |
| hidden_states = self.layer_norm(hidden_states) | |
| # add hidden states from the last decoder layer | |
| if output_hidden_states: | |
| all_hidden_states += (hidden_states,) | |
| if output_attentions: | |
| all_attns += (all_self_attns, all_cross_attns_text, all_cross_attns_vision) | |
| if not return_dict: | |
| return tuple( | |
| v | |
| for v in [ | |
| hidden_states, | |
| intermediate, | |
| intermediate_reference_points, | |
| all_hidden_states, | |
| all_attns, | |
| ] | |
| if v is not None | |
| ) | |
| return GroundingDinoDecoderOutput( | |
| last_hidden_state=hidden_states, | |
| intermediate_hidden_states=intermediate, | |
| intermediate_reference_points=intermediate_reference_points, | |
| hidden_states=all_hidden_states, | |
| attentions=all_attns, | |
| ) | |
| # these correspond to [CLS], [SEP], . and ? | |
| SPECIAL_TOKENS = [101, 102, 1012, 1029] | |
| def generate_masks_with_special_tokens_and_transfer_map(input_ids: torch.LongTensor) -> Tuple[Tensor, Tensor]: | |
| """Generate attention mask between each pair of special tokens and positional ids. | |
| Args: | |
| input_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`): | |
| Indices of input sequence tokens in the vocabulary. | |
| Returns: | |
| `tuple(torch.Tensor)` comprising attention mask between each special tokens and position_ids: | |
| - **attention_mask** (`torch.BoolTensor` of shape `(batch_size, sequence_length, sequence_length)`) | |
| - **position_ids** (`torch.LongTensor` of shape `(batch_size, sequence_length)`) | |
| """ | |
| batch_size, num_token = input_ids.shape | |
| # special_tokens_mask: batch_size, num_token. 1 for special tokens. 0 for normal tokens | |
| special_tokens_mask = torch.zeros((batch_size, num_token), device=input_ids.device).bool() | |
| for special_token in SPECIAL_TOKENS: | |
| special_tokens_mask |= input_ids == special_token | |
| # idxs: each row is a list of indices of special tokens | |
| idxs = torch.nonzero(special_tokens_mask) | |
| # generate attention mask and positional ids | |
| attention_mask = torch.eye(num_token, device=input_ids.device).bool().unsqueeze(0).repeat(batch_size, 1, 1) | |
| position_ids = torch.zeros((batch_size, num_token), device=input_ids.device) | |
| previous_col = 0 | |
| for i in range(idxs.shape[0]): | |
| row, col = idxs[i] | |
| if (col == 0) or (col == num_token - 1): | |
| attention_mask[row, col, col] = True | |
| position_ids[row, col] = 0 | |
| else: | |
| attention_mask[row, previous_col + 1 : col + 1, previous_col + 1 : col + 1] = True | |
| position_ids[row, previous_col + 1 : col + 1] = torch.arange( | |
| 0, col - previous_col, device=input_ids.device | |
| ) | |
| previous_col = col | |
| return attention_mask, position_ids.to(torch.long) | |
| class GroundingDinoModel(GroundingDinoPreTrainedModel): | |
| def __init__(self, config: GroundingDinoConfig): | |
| super().__init__(config) | |
| # Create backbone + positional encoding | |
| backbone = GroundingDinoConvEncoder(config) | |
| position_embeddings = build_position_encoding(config) | |
| self.backbone = GroundingDinoConvModel(backbone, position_embeddings) | |
| # Create input projection layers | |
| if config.num_feature_levels > 1: | |
| num_backbone_outs = len(backbone.intermediate_channel_sizes) | |
| input_proj_list = [] | |
| for i in range(num_backbone_outs): | |
| in_channels = backbone.intermediate_channel_sizes[i] | |
| input_proj_list.append( | |
| nn.Sequential( | |
| nn.Conv2d(in_channels, config.d_model, kernel_size=1), | |
| nn.GroupNorm(32, config.d_model), | |
| ) | |
| ) | |
| for _ in range(config.num_feature_levels - num_backbone_outs): | |
| input_proj_list.append( | |
| nn.Sequential( | |
| nn.Conv2d(in_channels, config.d_model, kernel_size=3, stride=2, padding=1), | |
| nn.GroupNorm(32, config.d_model), | |
| ) | |
| ) | |
| in_channels = config.d_model | |
| self.input_proj_vision = nn.ModuleList(input_proj_list) | |
| else: | |
| self.input_proj_vision = nn.ModuleList( | |
| [ | |
| nn.Sequential( | |
| nn.Conv2d(backbone.intermediate_channel_sizes[-1], config.d_model, kernel_size=1), | |
| nn.GroupNorm(32, config.d_model), | |
| ) | |
| ] | |
| ) | |
| # Create text backbone | |
| self.text_backbone = AutoModel.from_config( | |
| config.text_config, add_pooling_layer=False, attn_implementation=config._attn_implementation | |
| ) | |
| self.text_projection = nn.Linear(config.text_config.hidden_size, config.d_model) | |
| if config.embedding_init_target or not config.two_stage: | |
| self.query_position_embeddings = nn.Embedding(config.num_queries, config.d_model) | |
| self.encoder = GroundingDinoEncoder(config) | |
| self.decoder = GroundingDinoDecoder(config) | |
| self.level_embed = nn.Parameter(torch.Tensor(config.num_feature_levels, config.d_model)) | |
| if config.two_stage: | |
| self.enc_output = nn.Linear(config.d_model, config.d_model) | |
| self.enc_output_norm = nn.LayerNorm(config.d_model, config.layer_norm_eps) | |
| if ( | |
| config.two_stage_bbox_embed_share | |
| and config.decoder_bbox_embed_share | |
| and self.decoder.bbox_embed is not None | |
| ): | |
| self.encoder_output_bbox_embed = self.decoder.bbox_embed | |
| else: | |
| self.encoder_output_bbox_embed = GroundingDinoMLPPredictionHead( | |
| input_dim=config.d_model, hidden_dim=config.d_model, output_dim=4, num_layers=3 | |
| ) | |
| self.encoder_output_class_embed = GroundingDinoContrastiveEmbedding(config) | |
| else: | |
| self.reference_points = nn.Embedding(config.num_queries, 4) | |
| self.post_init() | |
| def get_encoder(self): | |
| return self.encoder | |
| def get_decoder(self): | |
| return self.decoder | |
| def freeze_backbone(self): | |
| for name, param in self.backbone.conv_encoder.model.named_parameters(): | |
| param.requires_grad_(False) | |
| def unfreeze_backbone(self): | |
| for name, param in self.backbone.conv_encoder.model.named_parameters(): | |
| param.requires_grad_(True) | |
| def get_valid_ratio(self, mask): | |
| """Get the valid ratio of all feature maps.""" | |
| _, height, width = mask.shape | |
| valid_height = torch.sum(mask[:, :, 0], 1) | |
| valid_width = torch.sum(mask[:, 0, :], 1) | |
| valid_ratio_heigth = valid_height.float() / height | |
| valid_ratio_width = valid_width.float() / width | |
| valid_ratio = torch.stack([valid_ratio_width, valid_ratio_heigth], -1) | |
| return valid_ratio | |
| def generate_encoder_output_proposals(self, enc_output, padding_mask, spatial_shapes): | |
| """Generate the encoder output proposals from encoded enc_output. | |
| Args: | |
| enc_output (`torch.Tensor[batch_size, sequence_length, hidden_size]`): Output of the encoder. | |
| padding_mask (`torch.Tensor[batch_size, sequence_length]`): Padding mask for `enc_output`. | |
| spatial_shapes (`torch.Tensor[num_feature_levels, 2]`): Spatial shapes of the feature maps. | |
| Returns: | |
| `tuple(torch.FloatTensor)`: A tuple of feature map and bbox prediction. | |
| - object_query (Tensor[batch_size, sequence_length, hidden_size]): Object query features. Later used to | |
| directly predict a bounding box. (without the need of a decoder) | |
| - output_proposals (Tensor[batch_size, sequence_length, 4]): Normalized proposals, after an inverse | |
| sigmoid. | |
| """ | |
| batch_size = enc_output.shape[0] | |
| proposals = [] | |
| current_position = 0 | |
| for level, (height, width) in enumerate(spatial_shapes): | |
| mask_flatten_ = padding_mask[:, current_position : (current_position + height * width)] | |
| mask_flatten_ = mask_flatten_.view(batch_size, height, width, 1) | |
| valid_height = torch.sum(~mask_flatten_[:, :, 0, 0], 1) | |
| valid_width = torch.sum(~mask_flatten_[:, 0, :, 0], 1) | |
| grid_y, grid_x = meshgrid( | |
| torch.linspace(0, height - 1, height, dtype=torch.float32, device=enc_output.device), | |
| torch.linspace(0, width - 1, width, dtype=torch.float32, device=enc_output.device), | |
| indexing="ij", | |
| ) | |
| grid = torch.cat([grid_x.unsqueeze(-1), grid_y.unsqueeze(-1)], -1) | |
| scale = torch.cat([valid_width.unsqueeze(-1), valid_height.unsqueeze(-1)], 1).view(batch_size, 1, 1, 2) | |
| grid = (grid.unsqueeze(0).expand(batch_size, -1, -1, -1) + 0.5) / scale | |
| width_heigth = torch.ones_like(grid) * 0.05 * (2.0**level) | |
| proposal = torch.cat((grid, width_heigth), -1).view(batch_size, -1, 4) | |
| proposals.append(proposal) | |
| current_position += height * width | |
| output_proposals = torch.cat(proposals, 1) | |
| output_proposals_valid = ((output_proposals > 0.01) & (output_proposals < 0.99)).all(-1, keepdim=True) | |
| output_proposals = torch.log(output_proposals / (1 - output_proposals)) # inverse sigmoid | |
| output_proposals = output_proposals.masked_fill(padding_mask.unsqueeze(-1), float("inf")) | |
| output_proposals = output_proposals.masked_fill(~output_proposals_valid, float("inf")) | |
| # assign each pixel as an object query | |
| object_query = enc_output | |
| object_query = object_query.masked_fill(padding_mask.unsqueeze(-1), float(0)) | |
| object_query = object_query.masked_fill(~output_proposals_valid, float(0)) | |
| object_query = self.enc_output_norm(self.enc_output(object_query)) | |
| return object_query, output_proposals | |
| def forward( | |
| self, | |
| pixel_values: Tensor, | |
| input_ids: Tensor, | |
| token_type_ids: Optional[Tensor] = None, | |
| attention_mask: Optional[Tensor] = None, | |
| pixel_mask: Optional[Tensor] = None, | |
| encoder_outputs=None, | |
| output_attentions=None, | |
| output_hidden_states=None, | |
| return_dict=None, | |
| ): | |
| r""" | |
| Returns: | |
| Examples: | |
| ```python | |
| >>> from transformers import AutoProcessor, AutoModel | |
| >>> from PIL import Image | |
| >>> import requests | |
| >>> url = "http://images.cocodataset.org/val2017/000000039769.jpg" | |
| >>> image = Image.open(requests.get(url, stream=True).raw) | |
| >>> text = "a cat." | |
| >>> processor = AutoProcessor.from_pretrained("IDEA-Research/grounding-dino-tiny") | |
| >>> model = AutoModel.from_pretrained("IDEA-Research/grounding-dino-tiny") | |
| >>> inputs = processor(images=image, text=text, return_tensors="pt") | |
| >>> outputs = model(**inputs) | |
| >>> last_hidden_states = outputs.last_hidden_state | |
| >>> list(last_hidden_states.shape) | |
| [1, 900, 256] | |
| ```""" | |
| output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions | |
| output_hidden_states = ( | |
| output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states | |
| ) | |
| return_dict = return_dict if return_dict is not None else self.config.use_return_dict | |
| text_self_attention_masks, position_ids = generate_masks_with_special_tokens_and_transfer_map(input_ids) | |
| if attention_mask is None: | |
| attention_mask = torch.ones_like(input_ids) | |
| if token_type_ids is None: | |
| token_type_ids = torch.zeros_like(input_ids) | |
| text_token_mask = attention_mask.bool() # just to avoid renaming everywhere | |
| max_text_len = self.config.max_text_len | |
| if text_self_attention_masks.shape[1] > max_text_len: | |
| text_self_attention_masks = text_self_attention_masks[:, :max_text_len, :max_text_len] | |
| position_ids = position_ids[:, :max_text_len] | |
| input_ids = input_ids[:, :max_text_len] | |
| token_type_ids = token_type_ids[:, :max_text_len] | |
| text_token_mask = text_token_mask[:, :max_text_len] | |
| # Extract text features from text backbone | |
| text_outputs = self.text_backbone( | |
| input_ids, text_self_attention_masks, token_type_ids, position_ids, return_dict=return_dict | |
| ) | |
| text_features = text_outputs.last_hidden_state if return_dict else text_outputs[0] | |
| text_features = self.text_projection(text_features) | |
| batch_size, num_channels, height, width = pixel_values.shape | |
| device = pixel_values.device | |
| if pixel_mask is None: | |
| pixel_mask = torch.ones(((batch_size, height, width)), dtype=torch.long, device=device) | |
| # Extract multi-scale feature maps of same resolution `config.d_model` (cf Figure 4 in paper) | |
| # First, sent pixel_values + pixel_mask through Backbone to obtain the features | |
| # which is a list of tuples | |
| vision_features, position_embeddings_list = self.backbone(pixel_values, pixel_mask) | |
| # Then, apply 1x1 convolution to reduce the channel dimension to d_model (256 by default) | |
| feature_maps = [] | |
| masks = [] | |
| for level, (source, mask) in enumerate(vision_features): | |
| feature_maps.append(self.input_proj_vision[level](source)) | |
| masks.append(mask) | |
| # Lowest resolution feature maps are obtained via 3x3 stride 2 convolutions on the final stage | |
| if self.config.num_feature_levels > len(feature_maps): | |
| _len_sources = len(feature_maps) | |
| for level in range(_len_sources, self.config.num_feature_levels): | |
| if level == _len_sources: | |
| source = self.input_proj_vision[level](vision_features[-1][0]) | |
| else: | |
| source = self.input_proj_vision[level](feature_maps[-1]) | |
| mask = nn.functional.interpolate(pixel_mask[None].float(), size=source.shape[-2:]).to(torch.bool)[0] | |
| pos_l = self.backbone.position_embedding(source, mask).to(source.dtype) | |
| feature_maps.append(source) | |
| masks.append(mask) | |
| position_embeddings_list.append(pos_l) | |
| # Create queries | |
| query_embeds = None | |
| if self.config.embedding_init_target or self.config.two_stage: | |
| query_embeds = self.query_position_embeddings.weight | |
| # Prepare encoder inputs (by flattening) | |
| source_flatten = [] | |
| mask_flatten = [] | |
| lvl_pos_embed_flatten = [] | |
| spatial_shapes = [] | |
| for level, (source, mask, pos_embed) in enumerate(zip(feature_maps, masks, position_embeddings_list)): | |
| batch_size, num_channels, height, width = source.shape | |
| spatial_shape = (height, width) | |
| spatial_shapes.append(spatial_shape) | |
| source = source.flatten(2).transpose(1, 2) | |
| mask = mask.flatten(1) | |
| pos_embed = pos_embed.flatten(2).transpose(1, 2) | |
| lvl_pos_embed = pos_embed + self.level_embed[level].view(1, 1, -1) | |
| lvl_pos_embed_flatten.append(lvl_pos_embed) | |
| source_flatten.append(source) | |
| mask_flatten.append(mask) | |
| source_flatten = torch.cat(source_flatten, 1) | |
| mask_flatten = torch.cat(mask_flatten, 1) | |
| lvl_pos_embed_flatten = torch.cat(lvl_pos_embed_flatten, 1) | |
| spatial_shapes = torch.as_tensor(spatial_shapes, dtype=torch.long, device=source_flatten.device) | |
| level_start_index = torch.cat((spatial_shapes.new_zeros((1,)), spatial_shapes.prod(1).cumsum(0)[:-1])) | |
| valid_ratios = torch.stack([self.get_valid_ratio(m) for m in masks], 1) | |
| valid_ratios = valid_ratios.float() | |
| # Fourth, sent source_flatten + mask_flatten + lvl_pos_embed_flatten (backbone + proj layer output) through encoder | |
| # Also provide spatial_shapes, level_start_index and valid_ratios | |
| if encoder_outputs is None: | |
| encoder_outputs = self.encoder( | |
| vision_features=source_flatten, | |
| vision_attention_mask=~mask_flatten, | |
| vision_position_embedding=lvl_pos_embed_flatten, | |
| spatial_shapes=spatial_shapes, | |
| level_start_index=level_start_index, | |
| valid_ratios=valid_ratios, | |
| text_features=text_features, | |
| text_attention_mask=~text_token_mask, | |
| text_position_embedding=None, | |
| text_self_attention_masks=~text_self_attention_masks, | |
| text_position_ids=position_ids, | |
| output_attentions=output_attentions, | |
| output_hidden_states=output_hidden_states, | |
| return_dict=return_dict, | |
| ) | |
| # If the user passed a tuple for encoder_outputs, we wrap it in a GroundingDinoEncoderOutput when return_dict=True | |
| elif return_dict and not isinstance(encoder_outputs, GroundingDinoEncoderOutput): | |
| encoder_outputs = GroundingDinoEncoderOutput( | |
| last_hidden_state_vision=encoder_outputs[0], | |
| last_hidden_state_text=encoder_outputs[1], | |
| vision_hidden_states=encoder_outputs[2] if output_hidden_states else None, | |
| text_hidden_states=encoder_outputs[3] if output_hidden_states else None, | |
| attentions=encoder_outputs[-1] if output_attentions else None, | |
| ) | |
| # Fifth, prepare decoder inputs | |
| enc_outputs_class = None | |
| enc_outputs_coord_logits = None | |
| if self.config.two_stage: | |
| object_query_embedding, output_proposals = self.generate_encoder_output_proposals( | |
| encoder_outputs[0], ~mask_flatten, spatial_shapes | |
| ) | |
| # hack implementation as in two-stage Deformable DETR | |
| # apply a detection head to each pixel (A.4 in paper) | |
| # linear projection for bounding box binary classification (i.e. foreground and background) | |
| enc_outputs_class = self.encoder_output_class_embed( | |
| object_query_embedding, encoder_outputs[1], text_token_mask | |
| ) | |
| # 3-layer FFN to predict bounding boxes coordinates (bbox regression branch) | |
| delta_bbox = self.encoder_output_bbox_embed(object_query_embedding) | |
| enc_outputs_coord_logits = delta_bbox + output_proposals | |
| # only keep top scoring `config.num_queries` proposals | |
| topk = self.config.num_queries | |
| topk_logits = enc_outputs_class.max(-1)[0] | |
| topk_proposals = torch.topk(topk_logits, topk, dim=1)[1] | |
| topk_coords_logits = torch.gather( | |
| enc_outputs_coord_logits, 1, topk_proposals.unsqueeze(-1).repeat(1, 1, 4) | |
| ) | |
| topk_coords_logits = topk_coords_logits.detach() | |
| reference_points = topk_coords_logits.sigmoid() | |
| init_reference_points = reference_points | |
| if query_embeds is not None: | |
| target = query_embeds.unsqueeze(0).repeat(batch_size, 1, 1) | |
| else: | |
| target = torch.gather( | |
| object_query_embedding, 1, topk_proposals.unsqueeze(-1).repeat(1, 1, self.d_model) | |
| ).detach() | |
| else: | |
| target = query_embeds.unsqueeze(0).repeat(batch_size, 1, 1) | |
| reference_points = self.reference_points.weight.unsqueeze(0).repeat(batch_size, 1, 1).sigmoid() | |
| init_reference_points = reference_points | |
| decoder_outputs = self.decoder( | |
| inputs_embeds=target, | |
| vision_encoder_hidden_states=encoder_outputs[0], | |
| vision_encoder_attention_mask=mask_flatten, | |
| text_encoder_hidden_states=encoder_outputs[1], | |
| text_encoder_attention_mask=~text_token_mask, | |
| reference_points=reference_points, | |
| spatial_shapes=spatial_shapes, | |
| level_start_index=level_start_index, | |
| valid_ratios=valid_ratios, | |
| self_attn_mask=None, | |
| output_attentions=output_attentions, | |
| output_hidden_states=output_hidden_states, | |
| return_dict=return_dict, | |
| ) | |
| if not return_dict: | |
| enc_outputs = tuple(value for value in [enc_outputs_class, enc_outputs_coord_logits] if value is not None) | |
| tuple_outputs = ( | |
| (decoder_outputs[0], init_reference_points) + decoder_outputs[1:] + encoder_outputs + enc_outputs | |
| ) | |
| return tuple_outputs | |
| return GroundingDinoModelOutput( | |
| last_hidden_state=decoder_outputs.last_hidden_state, | |
| init_reference_points=init_reference_points, | |
| intermediate_hidden_states=decoder_outputs.intermediate_hidden_states, | |
| intermediate_reference_points=decoder_outputs.intermediate_reference_points, | |
| decoder_hidden_states=decoder_outputs.hidden_states, | |
| decoder_attentions=decoder_outputs.attentions, | |
| encoder_last_hidden_state_vision=encoder_outputs.last_hidden_state_vision, | |
| encoder_last_hidden_state_text=encoder_outputs.last_hidden_state_text, | |
| encoder_vision_hidden_states=encoder_outputs.vision_hidden_states, | |
| encoder_text_hidden_states=encoder_outputs.text_hidden_states, | |
| encoder_attentions=encoder_outputs.attentions, | |
| enc_outputs_class=enc_outputs_class, | |
| enc_outputs_coord_logits=enc_outputs_coord_logits, | |
| ) | |
| # Copied from transformers.models.detr.modeling_detr.DetrMLPPredictionHead | |
| class GroundingDinoMLPPredictionHead(nn.Module): | |
| """ | |
| Very simple multi-layer perceptron (MLP, also called FFN), used to predict the normalized center coordinates, | |
| height and width of a bounding box w.r.t. an image. | |
| Copied from https://github.com/facebookresearch/detr/blob/master/models/detr.py | |
| """ | |
| def __init__(self, input_dim, hidden_dim, output_dim, num_layers): | |
| super().__init__() | |
| self.num_layers = num_layers | |
| h = [hidden_dim] * (num_layers - 1) | |
| self.layers = nn.ModuleList(nn.Linear(n, k) for n, k in zip([input_dim] + h, h + [output_dim])) | |
| def forward(self, x): | |
| for i, layer in enumerate(self.layers): | |
| x = nn.functional.relu(layer(x)) if i < self.num_layers - 1 else layer(x) | |
| return x | |
| # Copied from transformers.models.detr.modeling_detr._upcast | |
| def _upcast(t: Tensor) -> Tensor: | |
| # Protects from numerical overflows in multiplications by upcasting to the equivalent higher type | |
| if t.is_floating_point(): | |
| return t if t.dtype in (torch.float32, torch.float64) else t.float() | |
| else: | |
| return t if t.dtype in (torch.int32, torch.int64) else t.int() | |
| # Copied from transformers.models.detr.modeling_detr.box_area | |
| def box_area(boxes: Tensor) -> Tensor: | |
| """ | |
| Computes the area of a set of bounding boxes, which are specified by its (x1, y1, x2, y2) coordinates. | |
| Args: | |
| boxes (`torch.FloatTensor` of shape `(number_of_boxes, 4)`): | |
| Boxes for which the area will be computed. They are expected to be in (x1, y1, x2, y2) format with `0 <= x1 | |
| < x2` and `0 <= y1 < y2`. | |
| Returns: | |
| `torch.FloatTensor`: a tensor containing the area for each box. | |
| """ | |
| boxes = _upcast(boxes) | |
| return (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1]) | |
| # Copied from transformers.models.detr.modeling_detr.box_iou | |
| def box_iou(boxes1, boxes2): | |
| area1 = box_area(boxes1) | |
| area2 = box_area(boxes2) | |
| left_top = torch.max(boxes1[:, None, :2], boxes2[:, :2]) # [N,M,2] | |
| right_bottom = torch.min(boxes1[:, None, 2:], boxes2[:, 2:]) # [N,M,2] | |
| width_height = (right_bottom - left_top).clamp(min=0) # [N,M,2] | |
| inter = width_height[:, :, 0] * width_height[:, :, 1] # [N,M] | |
| union = area1[:, None] + area2 - inter | |
| iou = inter / union | |
| return iou, union | |
| # Copied from transformers.models.detr.modeling_detr.generalized_box_iou | |
| def generalized_box_iou(boxes1, boxes2): | |
| """ | |
| Generalized IoU from https://giou.stanford.edu/. The boxes should be in [x0, y0, x1, y1] (corner) format. | |
| Returns: | |
| `torch.FloatTensor`: a [N, M] pairwise matrix, where N = len(boxes1) and M = len(boxes2) | |
| """ | |
| # degenerate boxes gives inf / nan results | |
| # so do an early check | |
| if not (boxes1[:, 2:] >= boxes1[:, :2]).all(): | |
| raise ValueError(f"boxes1 must be in [x0, y0, x1, y1] (corner) format, but got {boxes1}") | |
| if not (boxes2[:, 2:] >= boxes2[:, :2]).all(): | |
| raise ValueError(f"boxes2 must be in [x0, y0, x1, y1] (corner) format, but got {boxes2}") | |
| iou, union = box_iou(boxes1, boxes2) | |
| top_left = torch.min(boxes1[:, None, :2], boxes2[:, :2]) | |
| bottom_right = torch.max(boxes1[:, None, 2:], boxes2[:, 2:]) | |
| width_height = (bottom_right - top_left).clamp(min=0) # [N,M,2] | |
| area = width_height[:, :, 0] * width_height[:, :, 1] | |
| return iou - (area - union) / area | |
| # Copied from transformers.models.detr.modeling_detr._max_by_axis | |
| def _max_by_axis(the_list): | |
| # type: (List[List[int]]) -> List[int] | |
| maxes = the_list[0] | |
| for sublist in the_list[1:]: | |
| for index, item in enumerate(sublist): | |
| maxes[index] = max(maxes[index], item) | |
| return maxes | |
| # Copied from transformers.models.detr.modeling_detr.dice_loss | |
| def dice_loss(inputs, targets, num_boxes): | |
| """ | |
| Compute the DICE loss, similar to generalized IOU for masks | |
| Args: | |
| inputs: A float tensor of arbitrary shape. | |
| The predictions for each example. | |
| targets: A float tensor with the same shape as inputs. Stores the binary | |
| classification label for each element in inputs (0 for the negative class and 1 for the positive | |
| class). | |
| """ | |
| inputs = inputs.sigmoid() | |
| inputs = inputs.flatten(1) | |
| numerator = 2 * (inputs * targets).sum(1) | |
| denominator = inputs.sum(-1) + targets.sum(-1) | |
| loss = 1 - (numerator + 1) / (denominator + 1) | |
| return loss.sum() / num_boxes | |
| # Copied from transformers.models.detr.modeling_detr.sigmoid_focal_loss | |
| def sigmoid_focal_loss(inputs, targets, num_boxes, alpha: float = 0.25, gamma: float = 2): | |
| """ | |
| Loss used in RetinaNet for dense detection: https://arxiv.org/abs/1708.02002. | |
| Args: | |
| inputs (`torch.FloatTensor` of arbitrary shape): | |
| The predictions for each example. | |
| targets (`torch.FloatTensor` with the same shape as `inputs`) | |
| A tensor storing the binary classification label for each element in the `inputs` (0 for the negative class | |
| and 1 for the positive class). | |
| alpha (`float`, *optional*, defaults to `0.25`): | |
| Optional weighting factor in the range (0,1) to balance positive vs. negative examples. | |
| gamma (`int`, *optional*, defaults to `2`): | |
| Exponent of the modulating factor (1 - p_t) to balance easy vs hard examples. | |
| Returns: | |
| Loss tensor | |
| """ | |
| prob = inputs.sigmoid() | |
| ce_loss = nn.functional.binary_cross_entropy_with_logits(inputs, targets, reduction="none") | |
| # add modulating factor | |
| p_t = prob * targets + (1 - prob) * (1 - targets) | |
| loss = ce_loss * ((1 - p_t) ** gamma) | |
| if alpha >= 0: | |
| alpha_t = alpha * targets + (1 - alpha) * (1 - targets) | |
| loss = alpha_t * loss | |
| return loss.mean(1).sum() / num_boxes | |
| # Copied from transformers.models.detr.modeling_detr.NestedTensor | |
| class NestedTensor(object): | |
| def __init__(self, tensors, mask: Optional[Tensor]): | |
| self.tensors = tensors | |
| self.mask = mask | |
| def to(self, device): | |
| cast_tensor = self.tensors.to(device) | |
| mask = self.mask | |
| if mask is not None: | |
| cast_mask = mask.to(device) | |
| else: | |
| cast_mask = None | |
| return NestedTensor(cast_tensor, cast_mask) | |
| def decompose(self): | |
| return self.tensors, self.mask | |
| def __repr__(self): | |
| return str(self.tensors) | |
| # Copied from transformers.models.detr.modeling_detr.nested_tensor_from_tensor_list | |
| def nested_tensor_from_tensor_list(tensor_list: List[Tensor]): | |
| if tensor_list[0].ndim == 3: | |
| max_size = _max_by_axis([list(img.shape) for img in tensor_list]) | |
| batch_shape = [len(tensor_list)] + max_size | |
| batch_size, num_channels, height, width = batch_shape | |
| dtype = tensor_list[0].dtype | |
| device = tensor_list[0].device | |
| tensor = torch.zeros(batch_shape, dtype=dtype, device=device) | |
| mask = torch.ones((batch_size, height, width), dtype=torch.bool, device=device) | |
| for img, pad_img, m in zip(tensor_list, tensor, mask): | |
| pad_img[: img.shape[0], : img.shape[1], : img.shape[2]].copy_(img) | |
| m[: img.shape[1], : img.shape[2]] = False | |
| else: | |
| raise ValueError("Only 3-dimensional tensors are supported") | |
| return NestedTensor(tensor, mask) | |
| # Copied from transformers.models.deformable_detr.modeling_deformable_detr.DeformableDetrHungarianMatcher with DeformableDetr->GroundingDino | |
| class GroundingDinoHungarianMatcher(nn.Module): | |
| """ | |
| This class computes an assignment between the targets and the predictions of the network. | |
| For efficiency reasons, the targets don't include the no_object. Because of this, in general, there are more | |
| predictions than targets. In this case, we do a 1-to-1 matching of the best predictions, while the others are | |
| un-matched (and thus treated as non-objects). | |
| Args: | |
| class_cost: | |
| The relative weight of the classification error in the matching cost. | |
| bbox_cost: | |
| The relative weight of the L1 error of the bounding box coordinates in the matching cost. | |
| giou_cost: | |
| The relative weight of the giou loss of the bounding box in the matching cost. | |
| """ | |
| def __init__(self, class_cost: float = 1, bbox_cost: float = 1, giou_cost: float = 1): | |
| super().__init__() | |
| requires_backends(self, ["scipy"]) | |
| self.class_cost = class_cost | |
| self.bbox_cost = bbox_cost | |
| self.giou_cost = giou_cost | |
| if class_cost == 0 and bbox_cost == 0 and giou_cost == 0: | |
| raise ValueError("All costs of the Matcher can't be 0") | |
| def forward(self, outputs, targets): | |
| """ | |
| Args: | |
| outputs (`dict`): | |
| A dictionary that contains at least these entries: | |
| * "logits": Tensor of dim [batch_size, num_queries, num_classes] with the classification logits | |
| * "pred_boxes": Tensor of dim [batch_size, num_queries, 4] with the predicted box coordinates. | |
| targets (`List[dict]`): | |
| A list of targets (len(targets) = batch_size), where each target is a dict containing: | |
| * "class_labels": Tensor of dim [num_target_boxes] (where num_target_boxes is the number of | |
| ground-truth | |
| objects in the target) containing the class labels | |
| * "boxes": Tensor of dim [num_target_boxes, 4] containing the target box coordinates. | |
| Returns: | |
| `List[Tuple]`: A list of size `batch_size`, containing tuples of (index_i, index_j) where: | |
| - index_i is the indices of the selected predictions (in order) | |
| - index_j is the indices of the corresponding selected targets (in order) | |
| For each batch element, it holds: len(index_i) = len(index_j) = min(num_queries, num_target_boxes) | |
| """ | |
| batch_size, num_queries = outputs["logits"].shape[:2] | |
| # We flatten to compute the cost matrices in a batch | |
| out_prob = outputs["logits"].flatten(0, 1).sigmoid() # [batch_size * num_queries, num_classes] | |
| out_bbox = outputs["pred_boxes"].flatten(0, 1) # [batch_size * num_queries, 4] | |
| # Also concat the target labels and boxes | |
| target_ids = torch.cat([v["class_labels"] for v in targets]) | |
| target_bbox = torch.cat([v["boxes"] for v in targets]) | |
| # Compute the classification cost. | |
| alpha = 0.25 | |
| gamma = 2.0 | |
| neg_cost_class = (1 - alpha) * (out_prob**gamma) * (-(1 - out_prob + 1e-8).log()) | |
| pos_cost_class = alpha * ((1 - out_prob) ** gamma) * (-(out_prob + 1e-8).log()) | |
| class_cost = pos_cost_class[:, target_ids] - neg_cost_class[:, target_ids] | |
| # Compute the L1 cost between boxes | |
| bbox_cost = torch.cdist(out_bbox, target_bbox, p=1) | |
| # Compute the giou cost between boxes | |
| giou_cost = -generalized_box_iou(center_to_corners_format(out_bbox), center_to_corners_format(target_bbox)) | |
| # Final cost matrix | |
| cost_matrix = self.bbox_cost * bbox_cost + self.class_cost * class_cost + self.giou_cost * giou_cost | |
| cost_matrix = cost_matrix.view(batch_size, num_queries, -1).cpu() | |
| sizes = [len(v["boxes"]) for v in targets] | |
| indices = [linear_sum_assignment(c[i]) for i, c in enumerate(cost_matrix.split(sizes, -1))] | |
| return [(torch.as_tensor(i, dtype=torch.int64), torch.as_tensor(j, dtype=torch.int64)) for i, j in indices] | |
| # Copied from transformers.models.deformable_detr.modeling_deformable_detr.DeformableDetrLoss with DeformableDetr->GroundingDino | |
| class GroundingDinoLoss(nn.Module): | |
| """ | |
| This class computes the losses for `GroundingDinoForObjectDetection`. The process happens in two steps: 1) we | |
| compute hungarian assignment between ground truth boxes and the outputs of the model 2) we supervise each pair of | |
| matched ground-truth / prediction (supervise class and box). | |
| Args: | |
| matcher (`GroundingDinoHungarianMatcher`): | |
| Module able to compute a matching between targets and proposals. | |
| num_classes (`int`): | |
| Number of object categories, omitting the special no-object category. | |
| focal_alpha (`float`): | |
| Alpha parameter in focal loss. | |
| losses (`List[str]`): | |
| List of all the losses to be applied. See `get_loss` for a list of all available losses. | |
| """ | |
| def __init__(self, matcher, num_classes, focal_alpha, losses): | |
| super().__init__() | |
| self.matcher = matcher | |
| self.num_classes = num_classes | |
| self.focal_alpha = focal_alpha | |
| self.losses = losses | |
| # removed logging parameter, which was part of the original implementation | |
| def loss_labels(self, outputs, targets, indices, num_boxes): | |
| """ | |
| Classification loss (Binary focal loss) targets dicts must contain the key "class_labels" containing a tensor | |
| of dim [nb_target_boxes] | |
| """ | |
| if "logits" not in outputs: | |
| raise KeyError("No logits were found in the outputs") | |
| source_logits = outputs["logits"] | |
| idx = self._get_source_permutation_idx(indices) | |
| target_classes_o = torch.cat([t["class_labels"][J] for t, (_, J) in zip(targets, indices)]) | |
| target_classes = torch.full( | |
| source_logits.shape[:2], self.num_classes, dtype=torch.int64, device=source_logits.device | |
| ) | |
| target_classes[idx] = target_classes_o | |
| target_classes_onehot = torch.zeros( | |
| [source_logits.shape[0], source_logits.shape[1], source_logits.shape[2] + 1], | |
| dtype=source_logits.dtype, | |
| layout=source_logits.layout, | |
| device=source_logits.device, | |
| ) | |
| target_classes_onehot.scatter_(2, target_classes.unsqueeze(-1), 1) | |
| target_classes_onehot = target_classes_onehot[:, :, :-1] | |
| loss_ce = ( | |
| sigmoid_focal_loss(source_logits, target_classes_onehot, num_boxes, alpha=self.focal_alpha, gamma=2) | |
| * source_logits.shape[1] | |
| ) | |
| losses = {"loss_ce": loss_ce} | |
| return losses | |
| # Copied from transformers.models.detr.modeling_detr.DetrLoss.loss_cardinality | |
| def loss_cardinality(self, outputs, targets, indices, num_boxes): | |
| """ | |
| Compute the cardinality error, i.e. the absolute error in the number of predicted non-empty boxes. | |
| This is not really a loss, it is intended for logging purposes only. It doesn't propagate gradients. | |
| """ | |
| logits = outputs["logits"] | |
| device = logits.device | |
| target_lengths = torch.as_tensor([len(v["class_labels"]) for v in targets], device=device) | |
| # Count the number of predictions that are NOT "no-object" (which is the last class) | |
| card_pred = (logits.argmax(-1) != logits.shape[-1] - 1).sum(1) | |
| card_err = nn.functional.l1_loss(card_pred.float(), target_lengths.float()) | |
| losses = {"cardinality_error": card_err} | |
| return losses | |
| # Copied from transformers.models.detr.modeling_detr.DetrLoss.loss_boxes | |
| def loss_boxes(self, outputs, targets, indices, num_boxes): | |
| """ | |
| Compute the losses related to the bounding boxes, the L1 regression loss and the GIoU loss. | |
| Targets dicts must contain the key "boxes" containing a tensor of dim [nb_target_boxes, 4]. The target boxes | |
| are expected in format (center_x, center_y, w, h), normalized by the image size. | |
| """ | |
| if "pred_boxes" not in outputs: | |
| raise KeyError("No predicted boxes found in outputs") | |
| idx = self._get_source_permutation_idx(indices) | |
| source_boxes = outputs["pred_boxes"][idx] | |
| target_boxes = torch.cat([t["boxes"][i] for t, (_, i) in zip(targets, indices)], dim=0) | |
| loss_bbox = nn.functional.l1_loss(source_boxes, target_boxes, reduction="none") | |
| losses = {} | |
| losses["loss_bbox"] = loss_bbox.sum() / num_boxes | |
| loss_giou = 1 - torch.diag( | |
| generalized_box_iou(center_to_corners_format(source_boxes), center_to_corners_format(target_boxes)) | |
| ) | |
| losses["loss_giou"] = loss_giou.sum() / num_boxes | |
| return losses | |
| # Copied from transformers.models.detr.modeling_detr.DetrLoss._get_source_permutation_idx | |
| def _get_source_permutation_idx(self, indices): | |
| # permute predictions following indices | |
| batch_idx = torch.cat([torch.full_like(source, i) for i, (source, _) in enumerate(indices)]) | |
| source_idx = torch.cat([source for (source, _) in indices]) | |
| return batch_idx, source_idx | |
| # Copied from transformers.models.detr.modeling_detr.DetrLoss._get_target_permutation_idx | |
| def _get_target_permutation_idx(self, indices): | |
| # permute targets following indices | |
| batch_idx = torch.cat([torch.full_like(target, i) for i, (_, target) in enumerate(indices)]) | |
| target_idx = torch.cat([target for (_, target) in indices]) | |
| return batch_idx, target_idx | |
| def get_loss(self, loss, outputs, targets, indices, num_boxes): | |
| loss_map = { | |
| "labels": self.loss_labels, | |
| "cardinality": self.loss_cardinality, | |
| "boxes": self.loss_boxes, | |
| } | |
| if loss not in loss_map: | |
| raise ValueError(f"Loss {loss} not supported") | |
| return loss_map[loss](outputs, targets, indices, num_boxes) | |
| def forward(self, outputs, targets): | |
| """ | |
| This performs the loss computation. | |
| Args: | |
| outputs (`dict`, *optional*): | |
| Dictionary of tensors, see the output specification of the model for the format. | |
| targets (`List[dict]`, *optional*): | |
| List of dicts, such that `len(targets) == batch_size`. The expected keys in each dict depends on the | |
| losses applied, see each loss' doc. | |
| """ | |
| outputs_without_aux = {k: v for k, v in outputs.items() if k != "auxiliary_outputs" and k != "enc_outputs"} | |
| # Retrieve the matching between the outputs of the last layer and the targets | |
| indices = self.matcher(outputs_without_aux, targets) | |
| # Compute the average number of target boxes accross all nodes, for normalization purposes | |
| num_boxes = sum(len(t["class_labels"]) for t in targets) | |
| num_boxes = torch.as_tensor([num_boxes], dtype=torch.float, device=next(iter(outputs.values())).device) | |
| world_size = 1 | |
| if is_accelerate_available(): | |
| if PartialState._shared_state != {}: | |
| num_boxes = reduce(num_boxes) | |
| world_size = PartialState().num_processes | |
| num_boxes = torch.clamp(num_boxes / world_size, min=1).item() | |
| # Compute all the requested losses | |
| losses = {} | |
| for loss in self.losses: | |
| losses.update(self.get_loss(loss, outputs, targets, indices, num_boxes)) | |
| # In case of auxiliary losses, we repeat this process with the output of each intermediate layer. | |
| if "auxiliary_outputs" in outputs: | |
| for i, auxiliary_outputs in enumerate(outputs["auxiliary_outputs"]): | |
| indices = self.matcher(auxiliary_outputs, targets) | |
| for loss in self.losses: | |
| l_dict = self.get_loss(loss, auxiliary_outputs, targets, indices, num_boxes) | |
| l_dict = {k + f"_{i}": v for k, v in l_dict.items()} | |
| losses.update(l_dict) | |
| if "enc_outputs" in outputs: | |
| enc_outputs = outputs["enc_outputs"] | |
| bin_targets = copy.deepcopy(targets) | |
| for bt in bin_targets: | |
| bt["class_labels"] = torch.zeros_like(bt["class_labels"]) | |
| indices = self.matcher(enc_outputs, bin_targets) | |
| for loss in self.losses: | |
| l_dict = self.get_loss(loss, enc_outputs, bin_targets, indices, num_boxes) | |
| l_dict = {k + "_enc": v for k, v in l_dict.items()} | |
| losses.update(l_dict) | |
| return losses | |
| class GroundingDinoForObjectDetection(GroundingDinoPreTrainedModel): | |
| # When using clones, all layers > 0 will be clones, but layer 0 *is* required | |
| # the bbox_embed in the decoder are all clones though | |
| _tied_weights_keys = [r"bbox_embed\.[1-9]\d*", r"model\.decoder\.bbox_embed\.[0-9]\d*"] | |
| def __init__(self, config: GroundingDinoConfig): | |
| super().__init__(config) | |
| self.model = GroundingDinoModel(config) | |
| _class_embed = GroundingDinoContrastiveEmbedding(config) | |
| if config.decoder_bbox_embed_share: | |
| _bbox_embed = GroundingDinoMLPPredictionHead( | |
| input_dim=config.d_model, hidden_dim=config.d_model, output_dim=4, num_layers=3 | |
| ) | |
| self.bbox_embed = nn.ModuleList([_bbox_embed for _ in range(config.decoder_layers)]) | |
| else: | |
| model_list = [] | |
| for _ in range(config.decoder_layers): | |
| _bbox_embed = GroundingDinoMLPPredictionHead( | |
| input_dim=config.d_model, hidden_dim=config.d_model, output_dim=4, num_layers=3 | |
| ) | |
| model_list.append(_bbox_embed) | |
| self.bbox_embed = nn.ModuleList(model_list) | |
| self.class_embed = nn.ModuleList([_class_embed for _ in range(config.decoder_layers)]) | |
| # hack for box-refinement | |
| self.model.decoder.bbox_embed = self.bbox_embed | |
| # hack implementation for two-stage | |
| self.model.decoder.class_embed = self.class_embed | |
| # Initialize weights and apply final processing | |
| self.post_init() | |
| # taken from https://github.com/facebookresearch/detr/blob/master/models/detr.py | |
| def _set_aux_loss(self, outputs_class, outputs_coord): | |
| # this is a workaround to make torchscript happy, as torchscript | |
| # doesn't support dictionary with non-homogeneous values, such | |
| # as a dict having both a Tensor and a list. | |
| return [{"logits": a, "pred_boxes": b} for a, b in zip(outputs_class[:-1], outputs_coord[:-1])] | |
| def forward( | |
| self, | |
| pixel_values: torch.FloatTensor, | |
| input_ids: torch.LongTensor, | |
| token_type_ids: torch.LongTensor = None, | |
| attention_mask: torch.LongTensor = None, | |
| pixel_mask: Optional[torch.BoolTensor] = None, | |
| encoder_outputs: Optional[Union[GroundingDinoEncoderOutput, Tuple]] = None, | |
| output_attentions: Optional[bool] = None, | |
| output_hidden_states: Optional[bool] = None, | |
| return_dict: Optional[bool] = None, | |
| labels: List[Dict[str, Union[torch.LongTensor, torch.FloatTensor]]] = None, | |
| ): | |
| r""" | |
| labels (`List[Dict]` of len `(batch_size,)`, *optional*): | |
| Labels for computing the bipartite matching loss. List of dicts, each dictionary containing at least the | |
| following 2 keys: 'class_labels' and 'boxes' (the class labels and bounding boxes of an image in the batch | |
| respectively). The class labels themselves should be a `torch.LongTensor` of len `(number of bounding boxes | |
| in the image,)` and the boxes a `torch.FloatTensor` of shape `(number of bounding boxes in the image, 4)`. | |
| Returns: | |
| Examples: | |
| ```python | |
| >>> from transformers import AutoProcessor, GroundingDinoForObjectDetection | |
| >>> from PIL import Image | |
| >>> import requests | |
| >>> url = "http://images.cocodataset.org/val2017/000000039769.jpg" | |
| >>> image = Image.open(requests.get(url, stream=True).raw) | |
| >>> text = "a cat." | |
| >>> processor = AutoProcessor.from_pretrained("IDEA-Research/grounding-dino-tiny") | |
| >>> model = GroundingDinoForObjectDetection.from_pretrained("IDEA-Research/grounding-dino-tiny") | |
| >>> inputs = processor(images=image, text=text, return_tensors="pt") | |
| >>> outputs = model(**inputs) | |
| >>> # convert outputs (bounding boxes and class logits) to COCO API | |
| >>> target_sizes = torch.tensor([image.size[::-1]]) | |
| >>> results = processor.image_processor.post_process_object_detection( | |
| ... outputs, threshold=0.35, target_sizes=target_sizes | |
| ... )[0] | |
| >>> for score, label, box in zip(results["scores"], results["labels"], results["boxes"]): | |
| ... box = [round(i, 1) for i in box.tolist()] | |
| ... print(f"Detected {label.item()} with confidence " f"{round(score.item(), 2)} at location {box}") | |
| Detected 1 with confidence 0.45 at location [344.8, 23.2, 637.4, 373.8] | |
| Detected 1 with confidence 0.41 at location [11.9, 51.6, 316.6, 472.9] | |
| ```""" | |
| return_dict = return_dict if return_dict is not None else self.config.use_return_dict | |
| if attention_mask is None: | |
| attention_mask = torch.ones_like(input_ids) | |
| # First, sent images through Grounding DINO base model to obtain encoder + decoder outputs | |
| outputs = self.model( | |
| pixel_values=pixel_values, | |
| input_ids=input_ids, | |
| token_type_ids=token_type_ids, | |
| attention_mask=attention_mask, | |
| pixel_mask=pixel_mask, | |
| encoder_outputs=encoder_outputs, | |
| output_attentions=output_attentions, | |
| output_hidden_states=output_hidden_states, | |
| return_dict=return_dict, | |
| ) | |
| idx = 5 + (1 if output_attentions else 0) + (1 if output_hidden_states else 0) | |
| enc_text_hidden_state = outputs.encoder_last_hidden_state_text if return_dict else outputs[idx] | |
| hidden_states = outputs.intermediate_hidden_states if return_dict else outputs[2] | |
| init_reference_points = outputs.init_reference_points if return_dict else outputs[1] | |
| inter_references_points = outputs.intermediate_reference_points if return_dict else outputs[3] | |
| # class logits + predicted bounding boxes | |
| outputs_classes = [] | |
| outputs_coords = [] | |
| # hidden_states are of shape (batch_size, num_stages, height, width) | |
| # predict class and bounding box deltas for each stage | |
| num_levels = hidden_states.shape[1] | |
| for level in range(num_levels): | |
| if level == 0: | |
| reference = init_reference_points | |
| else: | |
| reference = inter_references_points[:, level - 1] | |
| reference = torch.special.logit(reference, eps=1e-5) | |
| outputs_class = self.class_embed[level]( | |
| vision_hidden_state=hidden_states[:, level], | |
| text_hidden_state=enc_text_hidden_state, | |
| text_token_mask=attention_mask.bool(), | |
| ) | |
| delta_bbox = self.bbox_embed[level](hidden_states[:, level]) | |
| reference_coordinates = reference.shape[-1] | |
| if reference_coordinates == 4: | |
| outputs_coord_logits = delta_bbox + reference | |
| elif reference_coordinates == 2: | |
| delta_bbox[..., :2] += reference | |
| outputs_coord_logits = delta_bbox | |
| else: | |
| raise ValueError(f"reference.shape[-1] should be 4 or 2, but got {reference.shape[-1]}") | |
| outputs_coord = outputs_coord_logits.sigmoid() | |
| outputs_classes.append(outputs_class) | |
| outputs_coords.append(outputs_coord) | |
| outputs_class = torch.stack(outputs_classes) | |
| outputs_coord = torch.stack(outputs_coords) | |
| logits = outputs_class[-1] | |
| pred_boxes = outputs_coord[-1] | |
| loss, loss_dict, auxiliary_outputs = None, None, None | |
| if labels is not None: | |
| # First: create the matcher | |
| matcher = GroundingDinoHungarianMatcher( | |
| class_cost=self.config.class_cost, bbox_cost=self.config.bbox_cost, giou_cost=self.config.giou_cost | |
| ) | |
| # Second: create the criterion | |
| losses = ["labels", "boxes", "cardinality"] | |
| criterion = GroundingDinoLoss( | |
| matcher=matcher, | |
| num_classes=self.config.num_labels, | |
| focal_alpha=self.config.focal_alpha, | |
| losses=losses, | |
| ) | |
| criterion.to(self.device) | |
| # Third: compute the losses, based on outputs and labels | |
| outputs_loss = {} | |
| outputs_loss["logits"] = logits | |
| outputs_loss["pred_boxes"] = pred_boxes | |
| if self.config.auxiliary_loss: | |
| auxiliary_outputs = self._set_aux_loss(outputs_class, outputs_coord) | |
| outputs_loss["auxiliary_outputs"] = auxiliary_outputs | |
| if self.config.two_stage: | |
| enc_outputs_coord = outputs[-1].sigmoid() | |
| outputs_loss["enc_outputs"] = {"logits": outputs[-2], "pred_boxes": enc_outputs_coord} | |
| loss_dict = criterion(outputs_loss, labels) | |
| # Fourth: compute total loss, as a weighted sum of the various losses | |
| weight_dict = {"loss_ce": 1, "loss_bbox": self.config.bbox_loss_coefficient} | |
| weight_dict["loss_giou"] = self.config.giou_loss_coefficient | |
| if self.config.auxiliary_loss: | |
| aux_weight_dict = {} | |
| for i in range(self.config.decoder_layers - 1): | |
| aux_weight_dict.update({k + f"_{i}": v for k, v in weight_dict.items()}) | |
| weight_dict.update(aux_weight_dict) | |
| loss = sum(loss_dict[k] * weight_dict[k] for k in loss_dict.keys() if k in weight_dict) | |
| if not return_dict: | |
| if auxiliary_outputs is not None: | |
| output = (logits, pred_boxes) + auxiliary_outputs + outputs | |
| else: | |
| output = (logits, pred_boxes) + outputs | |
| tuple_outputs = ((loss, loss_dict) + output) if loss is not None else output | |
| return tuple_outputs | |
| dict_outputs = GroundingDinoObjectDetectionOutput( | |
| loss=loss, | |
| loss_dict=loss_dict, | |
| logits=logits, | |
| pred_boxes=pred_boxes, | |
| last_hidden_state=outputs.last_hidden_state, | |
| auxiliary_outputs=auxiliary_outputs, | |
| decoder_hidden_states=outputs.decoder_hidden_states, | |
| decoder_attentions=outputs.decoder_attentions, | |
| encoder_last_hidden_state_vision=outputs.encoder_last_hidden_state_vision, | |
| encoder_last_hidden_state_text=outputs.encoder_last_hidden_state_text, | |
| encoder_vision_hidden_states=outputs.encoder_vision_hidden_states, | |
| encoder_text_hidden_states=outputs.encoder_text_hidden_states, | |
| encoder_attentions=outputs.encoder_attentions, | |
| intermediate_hidden_states=outputs.intermediate_hidden_states, | |
| intermediate_reference_points=outputs.intermediate_reference_points, | |
| init_reference_points=outputs.init_reference_points, | |
| enc_outputs_class=outputs.enc_outputs_class, | |
| enc_outputs_coord_logits=outputs.enc_outputs_coord_logits, | |
| ) | |
| return dict_outputs |