Spaces:
Running
on
L40S
Running
on
L40S
| # Copyright (C) 2022-present Naver Corporation. All rights reserved. | |
| # Licensed under CC BY-NC-SA 4.0 (non-commercial use only). | |
| # -------------------------------------------------------- | |
| # CroCo model for downstream tasks | |
| # -------------------------------------------------------- | |
| import torch | |
| from .croco import CroCoNet | |
| def croco_args_from_ckpt(ckpt): | |
| if 'croco_kwargs' in ckpt: # CroCo v2 released models | |
| return ckpt['croco_kwargs'] | |
| elif 'args' in ckpt and hasattr(ckpt['args'], 'model'): # pretrained using the official code release | |
| s = ckpt['args'].model # eg "CroCoNet(enc_embed_dim=1024, enc_num_heads=16, enc_depth=24)" | |
| assert s.startswith('CroCoNet(') | |
| return eval('dict'+s[len('CroCoNet'):]) # transform it into the string of a dictionary and evaluate it | |
| else: # CroCo v1 released models | |
| return dict() | |
| class CroCoDownstreamMonocularEncoder(CroCoNet): | |
| def __init__(self, | |
| head, | |
| **kwargs): | |
| """ Build network for monocular downstream task, only using the encoder. | |
| It takes an extra argument head, that is called with the features | |
| and a dictionary img_info containing 'width' and 'height' keys | |
| The head is setup with the croconet arguments in this init function | |
| NOTE: It works by *calling super().__init__() but with redefined setters | |
| """ | |
| super(CroCoDownstreamMonocularEncoder, self).__init__(**kwargs) | |
| head.setup(self) | |
| self.head = head | |
| def _set_mask_generator(self, *args, **kwargs): | |
| """ No mask generator """ | |
| return | |
| def _set_mask_token(self, *args, **kwargs): | |
| """ No mask token """ | |
| self.mask_token = None | |
| return | |
| def _set_decoder(self, *args, **kwargs): | |
| """ No decoder """ | |
| return | |
| def _set_prediction_head(self, *args, **kwargs): | |
| """ No 'prediction head' for downstream tasks.""" | |
| return | |
| def forward(self, img): | |
| """ | |
| img if of size batch_size x 3 x h x w | |
| """ | |
| B, C, H, W = img.size() | |
| img_info = {'height': H, 'width': W} | |
| need_all_layers = hasattr(self.head, 'return_all_blocks') and self.head.return_all_blocks | |
| out, _, _ = self._encode_image(img, do_mask=False, return_all_blocks=need_all_layers) | |
| return self.head(out, img_info) | |
| class CroCoDownstreamBinocular(CroCoNet): | |
| def __init__(self, | |
| head, | |
| **kwargs): | |
| """ Build network for binocular downstream task | |
| It takes an extra argument head, that is called with the features | |
| and a dictionary img_info containing 'width' and 'height' keys | |
| The head is setup with the croconet arguments in this init function | |
| """ | |
| super(CroCoDownstreamBinocular, self).__init__(**kwargs) | |
| head.setup(self) | |
| self.head = head | |
| def _set_mask_generator(self, *args, **kwargs): | |
| """ No mask generator """ | |
| return | |
| def _set_mask_token(self, *args, **kwargs): | |
| """ No mask token """ | |
| self.mask_token = None | |
| return | |
| def _set_prediction_head(self, *args, **kwargs): | |
| """ No prediction head for downstream tasks, define your own head """ | |
| return | |
| def encode_image_pairs(self, img1, img2, return_all_blocks=False): | |
| """ run encoder for a pair of images | |
| it is actually ~5% faster to concatenate the images along the batch dimension | |
| than to encode them separately | |
| """ | |
| ## the two commented lines below is the naive version with separate encoding | |
| #out, pos, _ = self._encode_image(img1, do_mask=False, return_all_blocks=return_all_blocks) | |
| #out2, pos2, _ = self._encode_image(img2, do_mask=False, return_all_blocks=False) | |
| ## and now the faster version | |
| out, pos, _ = self._encode_image( torch.cat( (img1,img2), dim=0), do_mask=False, return_all_blocks=return_all_blocks ) | |
| if return_all_blocks: | |
| out,out2 = list(map(list, zip(*[o.chunk(2, dim=0) for o in out]))) | |
| out2 = out2[-1] | |
| else: | |
| out,out2 = out.chunk(2, dim=0) | |
| pos,pos2 = pos.chunk(2, dim=0) | |
| return out, out2, pos, pos2 | |
| def forward(self, img1, img2): | |
| B, C, H, W = img1.size() | |
| img_info = {'height': H, 'width': W} | |
| return_all_blocks = hasattr(self.head, 'return_all_blocks') and self.head.return_all_blocks | |
| out, out2, pos, pos2 = self.encode_image_pairs(img1, img2, return_all_blocks=return_all_blocks) | |
| if return_all_blocks: | |
| decout = self._decoder(out[-1], pos, None, out2, pos2, return_all_blocks=return_all_blocks) | |
| decout = out+decout | |
| else: | |
| decout = self._decoder(out, pos, None, out2, pos2, return_all_blocks=return_all_blocks) | |
| return self.head(decout, img_info) |