# -*- coding: utf-8 -*-
# Copyright (c) Alibaba, Inc. and its affiliates.
import math
import warnings

import torch
import torch.nn as nn

from .pos_embed import rope_apply_multires as rope_apply

try:
    from flash_attn import flash_attn_varlen_func
    FLASHATTN_IS_AVAILABLE = True
except ImportError as e:
    FLASHATTN_IS_AVAILABLE = False
    flash_attn_varlen_func = None
    warnings.warn(f'{e}')

__all__ = [
    "drop_path",
    "modulate",
    "PatchEmbed",
    "DropPath",
    "RMSNorm",
    "Mlp",
    "TimestepEmbedder",
    "DiTACEBlock",
    "MultiHeadAttention",
    "T2IFinalLayer",
]


def drop_path(x, drop_prob: float = 0., training: bool = False):
    """Drop paths (Stochastic Depth) per sample (when applied in the main path of residual blocks).

    This is the same as the DropConnect impl I created for EfficientNet, etc. networks; however,
    the original name is misleading as 'Drop Connect' is a different form of dropout in a separate paper...
    See discussion: https://github.com/tensorflow/tpu/issues/494#issuecomment-532968956 ... I've opted for
    changing the layer and argument names to 'drop path' rather than mix DropConnect as a layer name and use
    'survival rate' as the argument.
    """
    if drop_prob == 0. or not training:
        return x
    keep_prob = 1 - drop_prob
    shape = (x.shape[0], ) + (1, ) * (
        x.ndim - 1)  # work with diff dim tensors, not just 2D ConvNets
    random_tensor = keep_prob + torch.rand(
        shape, dtype=x.dtype, device=x.device)
    random_tensor.floor_()  # binarize
    output = x.div(keep_prob) * random_tensor
    return output
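

# Usage sketch (illustrative; shapes assumed): stochastic depth zeroes entire
# samples in a residual branch and rescales the survivors by 1 / keep_prob, so
# the expected activation is unchanged.
#
#   x = torch.randn(8, 196, 768)                      # (B, N, C) token batch
#   y = drop_path(x, drop_prob=0.1, training=True)    # ~10% of samples zeroed
#   y = drop_path(x, drop_prob=0.1, training=False)   # identity at eval time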


def modulate(x, shift, scale, unsqueeze=False):
    if unsqueeze:
        return x * (1 + scale.unsqueeze(1)) + shift.unsqueeze(1)
    else:
        return x * (1 + scale) + shift
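

# Usage sketch (illustrative; shapes assumed): modulate() is the AdaLN-style
# affine transform used by the DiT blocks below. The scale enters as
# (1 + scale), so zero-initialized modulation parameters leave the normalized
# activations unchanged.
#
#   x = torch.randn(2, 16, 768)       # (B, N, C) tokens
#   shift = torch.zeros(2, 768)       # (B, C) per-sample shift
#   scale = torch.zeros(2, 768)       # (B, C) per-sample scale
#   y = modulate(x, shift, scale, unsqueeze=True)   # equals x at this init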


class PatchEmbed(nn.Module):
    """2D Image to Patch Embedding."""

    def __init__(
        self,
        patch_size=16,
        in_chans=3,
        embed_dim=768,
        norm_layer=None,
        flatten=True,
        bias=True,
    ):
        super().__init__()
        self.flatten = flatten
        self.proj = nn.Conv2d(in_chans,
                              embed_dim,
                              kernel_size=patch_size,
                              stride=patch_size,
                              bias=bias)
        self.norm = norm_layer(embed_dim) if norm_layer else nn.Identity()

    def forward(self, x):
        x = self.proj(x)
        if self.flatten:
            x = x.flatten(2).transpose(1, 2)  # BCHW -> BNC
        x = self.norm(x)
        return x
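

# Shape sketch (illustrative): with the defaults (patch_size=16, embed_dim=768),
# a 224x224 RGB image yields (224 / 16) ** 2 = 196 patch tokens.
#
#   embed = PatchEmbed(patch_size=16, in_chans=3, embed_dim=768)
#   img = torch.randn(1, 3, 224, 224)
#   tokens = embed(img)               # (1, 196, 768) after BCHW -> BNC flatten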


class DropPath(nn.Module):
    """Drop paths (Stochastic Depth) per sample (when applied in the main path of residual blocks)."""

    def __init__(self, drop_prob=None):
        super(DropPath, self).__init__()
        self.drop_prob = drop_prob

    def forward(self, x):
        return drop_path(x, self.drop_prob, self.training)


class RMSNorm(nn.Module):
    def __init__(self, dim, eps=1e-6):
        super().__init__()
        self.dim = dim
        self.eps = eps
        self.weight = nn.Parameter(torch.ones(dim))

    def forward(self, x):
        return self._norm(x.float()).type_as(x) * self.weight

    def _norm(self, x):
        return x * torch.rsqrt(x.pow(2).mean(dim=-1, keepdim=True) + self.eps)
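

# Numerical sketch (illustrative): RMSNorm divides each feature vector by its
# root mean square (computed in float32 for stability) and applies a learned
# per-channel gain; unlike LayerNorm, no mean is subtracted.
#
#   norm = RMSNorm(dim=768)
#   x = torch.randn(2, 16, 768)
#   y = norm(x)                       # y.pow(2).mean(-1) is ~1.0 at init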


class Mlp(nn.Module):
    """MLP as used in Vision Transformer, MLP-Mixer and related networks."""

    def __init__(self,
                 in_features,
                 hidden_features=None,
                 out_features=None,
                 act_layer=nn.GELU,
                 drop=0.):
        super().__init__()
        out_features = out_features or in_features
        hidden_features = hidden_features or in_features
        self.fc1 = nn.Linear(in_features, hidden_features)
        self.act = act_layer()
        self.fc2 = nn.Linear(hidden_features, out_features)
        self.drop = nn.Dropout(drop)

    def forward(self, x):
        x = self.fc1(x)
        x = self.act(x)
        x = self.drop(x)
        x = self.fc2(x)
        x = self.drop(x)
        return x


class TimestepEmbedder(nn.Module):
    """
    Embeds scalar timesteps into vector representations.
    """

    def __init__(self, hidden_size, frequency_embedding_size=256):
        super().__init__()
        self.mlp = nn.Sequential(
            nn.Linear(frequency_embedding_size, hidden_size, bias=True),
            nn.SiLU(),
            nn.Linear(hidden_size, hidden_size, bias=True),
        )
        self.frequency_embedding_size = frequency_embedding_size

    @staticmethod
    def timestep_embedding(t, dim, max_period=10000):
        """
        Create sinusoidal timestep embeddings.
        :param t: a 1-D Tensor of N indices, one per batch element.
                  These may be fractional.
        :param dim: the dimension of the output.
        :param max_period: controls the minimum frequency of the embeddings.
        :return: an (N, D) Tensor of positional embeddings.
        """
        # https://github.com/openai/glide-text2im/blob/main/glide_text2im/nn.py
        half = dim // 2
        freqs = torch.exp(
            -math.log(max_period) *
            torch.arange(start=0, end=half, dtype=torch.float32) /
            half).to(device=t.device)
        args = t[:, None].float() * freqs[None]
        embedding = torch.cat([torch.cos(args), torch.sin(args)], dim=-1)
        if dim % 2:
            embedding = torch.cat(
                [embedding, torch.zeros_like(embedding[:, :1])], dim=-1)
        return embedding

    def forward(self, t):
        t_freq = self.timestep_embedding(t, self.frequency_embedding_size)
        t_emb = self.mlp(t_freq)
        return t_emb
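

# Usage sketch (illustrative; hidden_size assumed): a batch of scalar timesteps
# is expanded into a 256-dim sinusoidal code (cos/sin over log-spaced
# frequencies), then projected by the MLP to the model width for conditioning.
#
#   embedder = TimestepEmbedder(hidden_size=1152)
#   t = torch.tensor([0., 250., 999.])   # one (possibly fractional) step per sample
#   t_emb = embedder(t)                  # (3, 1152)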


class DiTACEBlock(nn.Module):
    def __init__(self,
                 hidden_size,
                 num_heads,
                 mlp_ratio=4.0,
                 drop_path=0.,
                 window_size=0,
                 backend=None,
                 use_condition=True,
                 qk_norm=False,
                 **block_kwargs):
        super().__init__()
        self.hidden_size = hidden_size
        self.use_condition = use_condition
        self.norm1 = nn.LayerNorm(hidden_size,
                                  elementwise_affine=False,
                                  eps=1e-6)
        self.attn = MultiHeadAttention(hidden_size,
                                       num_heads=num_heads,
                                       qkv_bias=True,
                                       backend=backend,
                                       qk_norm=qk_norm,
                                       **block_kwargs)
        if self.use_condition:
            self.cross_attn = MultiHeadAttention(
                hidden_size,
                context_dim=hidden_size,
                num_heads=num_heads,
                qkv_bias=True,
                backend=backend,
                qk_norm=qk_norm,
                **block_kwargs)
        self.norm2 = nn.LayerNorm(hidden_size,
                                  elementwise_affine=False,
                                  eps=1e-6)
        # to be compatible with lower-version PyTorch
        approx_gelu = lambda: nn.GELU(approximate='tanh')
        self.mlp = Mlp(in_features=hidden_size,
                       hidden_features=int(hidden_size * mlp_ratio),
                       act_layer=approx_gelu,
                       drop=0)
        self.drop_path = DropPath(
            drop_path) if drop_path > 0. else nn.Identity()
        self.window_size = window_size
        self.scale_shift_table = nn.Parameter(
            torch.randn(6, hidden_size) / hidden_size**0.5)

    def forward(self, x, y, t, **kwargs):
        B = x.size(0)
        shift_msa, scale_msa, gate_msa, shift_mlp, scale_mlp, gate_mlp = (
            self.scale_shift_table[None] + t.reshape(B, 6, -1)).chunk(6, dim=1)
        shift_msa, scale_msa, gate_msa, shift_mlp, scale_mlp, gate_mlp = (
            shift_msa.squeeze(1), scale_msa.squeeze(1), gate_msa.squeeze(1),
            shift_mlp.squeeze(1), scale_mlp.squeeze(1), gate_mlp.squeeze(1))
        x = x + self.drop_path(gate_msa * self.attn(
            modulate(self.norm1(x), shift_msa, scale_msa, unsqueeze=False),
            **kwargs))
        if self.use_condition:
            x = x + self.cross_attn(x, context=y, **kwargs)
        x = x + self.drop_path(gate_mlp * self.mlp(
            modulate(self.norm2(x), shift_mlp, scale_mlp, unsqueeze=False)))
        return x
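

# Usage sketch (all shapes and kwargs below are assumptions; the attention
# kwargs normally come from the surrounding model): `t` must carry six
# hidden_size-wide chunks, one per shift/scale/gate slot of the modulated
# self-attention and MLP sub-layers.
#
#   block = DiTACEBlock(hidden_size=1152, num_heads=16, backend='flash_attn')
#   x = ...                            # packed image tokens
#   y = ...                            # text tokens for cross-attention
#   t = torch.randn(B, 6 * 1152)       # AdaLN conditioning, reshaped to (B, 6, -1)
#   out = block(x, y, t, x_shapes=..., freqs=..., self_x_len=...,
#               cross_x_len=..., txt_lens=...)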


class MultiHeadAttention(nn.Module):
    def __init__(self,
                 dim,
                 context_dim=None,
                 num_heads=None,
                 head_dim=None,
                 attn_drop=0.0,
                 qkv_bias=False,
                 dropout=0.0,
                 backend=None,
                 qk_norm=False,
                 eps=1e-6,
                 **block_kwargs):
        super().__init__()
        # consider head_dim first, then num_heads
        num_heads = dim // head_dim if head_dim else num_heads
        head_dim = dim // num_heads
        assert num_heads * head_dim == dim
        context_dim = context_dim or dim
        self.dim = dim
        self.context_dim = context_dim
        self.num_heads = num_heads
        self.head_dim = head_dim
        self.scale = math.pow(head_dim, -0.25)
        # layers
        self.q = nn.Linear(dim, dim, bias=qkv_bias)
        self.k = nn.Linear(context_dim, dim, bias=qkv_bias)
        self.v = nn.Linear(context_dim, dim, bias=qkv_bias)
        self.o = nn.Linear(dim, dim)
        self.norm_q = RMSNorm(dim, eps=eps) if qk_norm else nn.Identity()
        self.norm_k = RMSNorm(dim, eps=eps) if qk_norm else nn.Identity()
        self.dropout = nn.Dropout(dropout)
        self.attention_op = None
        self.attn_drop = nn.Dropout(attn_drop)
        self.backend = backend
        assert self.backend in ('flash_attn', 'xformer_attn', 'pytorch_attn',
                                None)
        if FLASHATTN_IS_AVAILABLE and self.backend in ('flash_attn', None):
            self.backend = 'flash_attn'
            self.softmax_scale = block_kwargs.get('softmax_scale', None)
            self.causal = block_kwargs.get('causal', False)
            self.window_size = block_kwargs.get('window_size', (-1, -1))
            self.deterministic = block_kwargs.get('deterministic', False)
        else:
            raise NotImplementedError

    def flash_attn(self, x, context=None, **kwargs):
        '''
        The implementation will be very slow when `mask` is not None,
        because we need to rearrange the x/context features according to the mask.
        Args:
            x:
            context:
            mask:
            **kwargs:
        Returns: x
        '''
        dtype = kwargs.get('dtype', torch.float16)

        def half(x):
            return x if x.dtype in [torch.float16, torch.bfloat16
                                    ] else x.to(dtype)

        x_shapes = kwargs['x_shapes']
        freqs = kwargs['freqs']
        self_x_len = kwargs['self_x_len']
        cross_x_len = kwargs['cross_x_len']
        txt_lens = kwargs['txt_lens']
        n, d = self.num_heads, self.head_dim
        if context is None:
            # self-attn
            q = self.norm_q(self.q(x)).view(-1, n, d)
            k = self.norm_k(self.k(x)).view(-1, n, d)
            v = self.v(x).view(-1, n, d)
            q = rope_apply(q, self_x_len, x_shapes, freqs, pad=False)
            k = rope_apply(k, self_x_len, x_shapes, freqs, pad=False)
            q_lens = k_lens = self_x_len
        else:
            # cross-attn
            q = self.norm_q(self.q(x)).view(-1, n, d)
            k = self.norm_k(self.k(context)).view(-1, n, d)
            v = self.v(context).view(-1, n, d)
            q_lens = cross_x_len
            k_lens = txt_lens
        cu_seqlens_q = torch.cat([q_lens.new_zeros([1]),
                                  q_lens]).cumsum(0, dtype=torch.int32)
        cu_seqlens_k = torch.cat([k_lens.new_zeros([1]),
                                  k_lens]).cumsum(0, dtype=torch.int32)
        max_seqlen_q = q_lens.max()
        max_seqlen_k = k_lens.max()
        out_dtype = q.dtype
        q, k, v = half(q), half(k), half(v)
        x = flash_attn_varlen_func(q,
                                   k,
                                   v,
                                   cu_seqlens_q=cu_seqlens_q,
                                   cu_seqlens_k=cu_seqlens_k,
                                   max_seqlen_q=max_seqlen_q,
                                   max_seqlen_k=max_seqlen_k,
                                   dropout_p=self.attn_drop.p,
                                   softmax_scale=self.softmax_scale,
                                   causal=self.causal,
                                   window_size=self.window_size,
                                   deterministic=self.deterministic)
        x = x.type(out_dtype)
        x = x.reshape(-1, n * d)
        x = self.o(x)
        x = self.dropout(x)
        return x

    def forward(self, x, context=None, **kwargs):
        x = getattr(self, self.backend)(x, context=context, **kwargs)
        return x
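

# Sketch of the varlen convention (illustrative): flash_attn_varlen_func
# consumes packed (total_tokens, num_heads, head_dim) tensors plus cumulative
# sequence lengths, which lets a batch mix sequences of different lengths
# without padding.
#
#   q_lens = torch.tensor([256, 100], dtype=torch.int32)  # two packed samples
#   cu_seqlens_q = torch.cat([q_lens.new_zeros([1]),
#                             q_lens]).cumsum(0, dtype=torch.int32)
#   # -> tensor([0, 256, 356]); sample i owns rows cu_seqlens_q[i]:cu_seqlens_q[i + 1]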


class T2IFinalLayer(nn.Module):
    """
    The final layer of PixArt.
    """

    def __init__(self, hidden_size, patch_size, out_channels):
        super().__init__()
        self.norm_final = nn.LayerNorm(hidden_size,
                                       elementwise_affine=False,
                                       eps=1e-6)
        self.linear = nn.Linear(hidden_size,
                                patch_size * patch_size * out_channels,
                                bias=True)
        self.scale_shift_table = nn.Parameter(
            torch.randn(2, hidden_size) / hidden_size**0.5)
        self.out_channels = out_channels

    def forward(self, x, t):
        shift, scale = (self.scale_shift_table[None] +
                        t[:, None]).chunk(2, dim=1)
        shift, scale = shift.squeeze(1), scale.squeeze(1)
        x = modulate(self.norm_final(x), shift, scale)
        x = self.linear(x)
        return x
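

# Usage sketch (illustrative; sizes assumed): the final layer maps each token
# back to a patch_size x patch_size pixel block; the caller unpatchifies the
# (B, N, patch_size**2 * out_channels) output into an image.
#
#   final = T2IFinalLayer(hidden_size=1152, patch_size=2, out_channels=8)
#   x = torch.randn(2, 256, 1152)     # (B, N, hidden_size) tokens
#   t = torch.randn(2, 1152)          # (B, hidden_size) conditioning
#   out = final(x, t)                 # (2, 256, 2 * 2 * 8) = (2, 256, 32)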