Spaces:
Paused
Paused
| import torch | |
| from einops import rearrange | |
class CrossFrameAttnProcessor:
    r"""
    Attention processor that makes self-attention attend to the first frame.

    For self-attention calls (no ``encoder_hidden_states``) the keys and values
    of every frame are replaced by those of the first frame of the same chunk,
    so each frame's queries are matched against frame 0. Cross-attention calls
    go through the ordinary attention path unchanged.

    Args:
        unet_chunk_size: Divisor applied to the batch dimension to obtain the
            number of frames per chunk (``batch // unet_chunk_size``).
            NOTE(review): presumably 2 for the unconditional/conditional halves
            under classifier-free guidance -- confirm against the caller.
    """

    def __init__(self, unet_chunk_size=2):
        self.unet_chunk_size = unet_chunk_size

    @staticmethod
    def _broadcast_first_frame(tensor, video_length):
        """Overwrite every frame of each ``video_length``-sized chunk with the chunk's first frame."""
        total_frames, tokens, channels = tensor.shape
        chunks = tensor.reshape(total_frames // video_length, video_length, tokens, channels)
        # Keep the frame axis (size 1) so it can be expanded back to video_length.
        first_frame = chunks[:, :1].expand(-1, video_length, -1, -1)
        return first_frame.reshape(total_frames, tokens, channels)

    def __call__(
        self,
        attn,
        hidden_states,
        encoder_hidden_states=None,
        attention_mask=None,
        **kwargs,
    ):
        """
        Run attention for ``attn`` with cross-frame keys/values on self-attention.

        Args:
            attn: Attention module supplying the projections (``to_q``, ``to_k``,
                ``to_v``, ``to_out``) and the head/batch reshaping helpers.
            hidden_states: 3-D tensor; its first dimension is the batch of frames.
            encoder_hidden_states: Optional conditioning tensor. When given, the
                call is treated as cross-attention and no frame substitution
                takes place.
            attention_mask: Optional mask, passed through
                ``attn.prepare_attention_mask``.
            **kwargs: Accepted for call-site compatibility and ignored.

        Returns:
            The attended hidden states after the output projection and dropout.

        Raises:
            ValueError: On self-attention, if the batch cannot be split into
                whole chunks of ``batch // unet_chunk_size`` frames.
        """
        batch_size, sequence_length, _ = hidden_states.shape
        attention_mask = attn.prepare_attention_mask(attention_mask, sequence_length, batch_size)
        query = attn.to_q(hidden_states)

        is_cross_attention = encoder_hidden_states is not None
        if encoder_hidden_states is None:
            encoder_hidden_states = hidden_states
        elif attn.norm_cross:
            encoder_hidden_states = attn.norm_encoder_hidden_states(encoder_hidden_states)

        key = attn.to_k(encoder_hidden_states)
        value = attn.to_v(encoder_hidden_states)

        # Cross-frame attention: only self-attention gets its keys/values swapped.
        if not is_cross_attention:
            total_frames = key.size(0)
            video_length = total_frames // self.unet_chunk_size
            # Fail early with a readable message instead of an opaque reshape error.
            if video_length < 1 or total_frames % video_length != 0:
                raise ValueError(
                    f"Batch size {total_frames} is incompatible with "
                    f"unet_chunk_size={self.unet_chunk_size} for cross-frame attention."
                )
            key = self._broadcast_first_frame(key, video_length)
            value = self._broadcast_first_frame(value, video_length)

        query = attn.head_to_batch_dim(query)
        key = attn.head_to_batch_dim(key)
        value = attn.head_to_batch_dim(value)

        attention_probs = attn.get_attention_scores(query, key, attention_mask)
        hidden_states = torch.bmm(attention_probs, value)
        hidden_states = attn.batch_to_head_dim(hidden_states)

        # linear proj
        hidden_states = attn.to_out[0](hidden_states)
        # dropout
        hidden_states = attn.to_out[1](hidden_states)
        return hidden_states
class AttnProcessorX:
    r"""
    Baseline attention processor.

    Projects queries, keys and values through the attention module, applies the
    attention weights with a batched matrix product, and runs the module's
    output projection and dropout.
    """

    def __call__(
        self,
        attn,
        hidden_states,
        encoder_hidden_states=None,
        attention_mask=None,
        temb=None,
        scale=1.0,
    ):
        """
        Compute attention for ``attn`` over ``hidden_states``.

        Args:
            attn: Attention module providing the projections, normalisation
                layers and head/batch reshaping helpers used below.
            hidden_states: Input tensor, either 3-D or 4-D. A 4-D input is
                flattened over its last two dimensions and restored on return.
            encoder_hidden_states: Optional context for keys and values; when
                omitted the (normalised) ``hidden_states`` are used instead.
            attention_mask: Optional mask, passed through
                ``attn.prepare_attention_mask``.
            temb: Forwarded to ``attn.spatial_norm`` when that layer is set.
            scale: Forwarded as ``scale=`` to the q/k/v and output projections.

        Returns:
            The attended tensor, with the residual added when
            ``attn.residual_connection`` is set, divided by
            ``attn.rescale_output_factor``.
        """
        shortcut = hidden_states

        spatial_norm = attn.spatial_norm
        if spatial_norm is not None:
            hidden_states = spatial_norm(hidden_states, temb)

        # Remember the layout so a 4-D input can be restored at the end.
        was_4d = hidden_states.ndim == 4
        if was_4d:
            batch_size, channel, height, width = hidden_states.shape
            hidden_states = hidden_states.view(batch_size, channel, height * width).transpose(1, 2)

        has_context = encoder_hidden_states is not None
        shape_source = encoder_hidden_states if has_context else hidden_states
        batch_size, sequence_length, _ = shape_source.shape
        attention_mask = attn.prepare_attention_mask(attention_mask, sequence_length, batch_size)

        if attn.group_norm is not None:
            hidden_states = attn.group_norm(hidden_states.transpose(1, 2)).transpose(1, 2)

        query = attn.to_q(hidden_states, scale=scale)

        # Keys and values come from the context when one is supplied.
        if not has_context:
            context = hidden_states
        elif attn.norm_cross:
            context = attn.norm_encoder_hidden_states(encoder_hidden_states)
        else:
            context = encoder_hidden_states

        key = attn.to_k(context, scale=scale)
        value = attn.to_v(context, scale=scale)

        query, key, value = (attn.head_to_batch_dim(t) for t in (query, key, value))

        weights = attn.get_attention_scores(query, key, attention_mask)
        hidden_states = attn.batch_to_head_dim(torch.bmm(weights, value))

        # Output projection followed by dropout.
        hidden_states = attn.to_out[0](hidden_states, scale=scale)
        hidden_states = attn.to_out[1](hidden_states)

        if was_4d:
            hidden_states = hidden_states.transpose(-1, -2).reshape(batch_size, channel, height, width)

        if attn.residual_connection:
            hidden_states = hidden_states + shortcut

        return hidden_states / attn.rescale_output_factor