Spaces:
Running
on
Zero
Running
on
Zero
| # Copyright 2025 ASLP Lab and Xiaomi Inc. All Rights Reserved. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| from transformers import LlamaConfig | |
| import torch | |
| import torch.nn as nn | |
| from typing import Optional, Tuple | |
| import math | |
| from transformers.models.llama.modeling_llama import LlamaDecoderLayer | |
| from .llama_attention import LLAMA_ATTENTION_CLASSES | |
| # sinusoidal positional encoding | |
| class SinusoidalPosEmb(nn.Module): | |
| def __init__(self, dim): | |
| super().__init__() | |
| self.dim = dim | |
| def forward(self, x): | |
| device = x.device | |
| half_dim = self.dim // 2 | |
| emb = math.log(10000) / (half_dim - 1) | |
| emb = torch.exp(torch.arange(half_dim, device=device) * -emb) | |
| emb = x[:, None] * emb[None, :] * 1.0 | |
| emb = torch.cat((emb.sin(), emb.cos()), dim=-1) | |
| return emb | |
| class LlamaAdaptiveRMSNorm(nn.Module): | |
| def __init__(self, hidden_size=1024, eps=1e-6, dim_cond=1024): | |
| super().__init__() | |
| self.to_weight = nn.Linear(dim_cond, hidden_size) | |
| nn.init.zeros_(self.to_weight.weight) | |
| nn.init.ones_(self.to_weight.bias) | |
| self.variance_epsilon = eps | |
| self._is_hf_initialized = True # disable automatic init | |
| def forward(self, hidden_states, cond_embedding): | |
| input_dtype = hidden_states.dtype | |
| variance = hidden_states.to(torch.float32).pow(2).mean(-1, keepdim=True) | |
| hidden_states = hidden_states * torch.rsqrt(variance + self.variance_epsilon) | |
| weight = self.to_weight(cond_embedding) | |
| if len(weight.shape) == 2: | |
| weight = weight.unsqueeze(1) | |
| return (weight * hidden_states).to(input_dtype) | |
| class LlamaNARDecoderLayer(LlamaDecoderLayer): | |
| def __init__(self, config: LlamaConfig, layer_idx: int, use_flex_attn: bool=False): | |
| """Override to adaptive layer norm""" | |
| super().__init__(config, layer_idx) # init attention, mlp, etc. | |
| _attn_implementation = config._attn_implementation | |
| if use_flex_attn: | |
| _attn_implementation = "flex_attention" | |
| # _attn_implementation = "flash_attention_2" | |
| self.self_attn = LLAMA_ATTENTION_CLASSES[_attn_implementation](config=config, layer_idx=layer_idx) | |
| # self.input_layernorm = LlamaAdaptiveRMSNorm( | |
| # config.hidden_size, eps=config.rms_norm_eps, dim_cond=config.hidden_size | |
| # ) | |
| # self.post_attention_layernorm = LlamaAdaptiveRMSNorm( | |
| # config.hidden_size, eps=config.rms_norm_eps, dim_cond=config.hidden_size | |
| # ) | |
| # add `cond` in forward function | |
| def forward( | |
| self, | |
| hidden_states: torch.Tensor, | |
| attention_mask: Optional[torch.Tensor] = None, | |
| position_embeddings: Optional[torch.LongTensor] = None, | |
| past_key_value: Optional[Tuple[torch.Tensor]] = None, | |
| output_attentions: Optional[bool] = False, | |
| use_cache: Optional[bool] = False, | |
| ) -> Tuple[ | |
| torch.FloatTensor, Optional[Tuple[torch.FloatTensor, torch.FloatTensor]] | |
| ]: | |
| """ | |
| Args: | |
| hidden_states (`torch.FloatTensor`): input to the layer of shape `(batch, seq_len, embed_dim)` | |
| attention_mask (`torch.FloatTensor`, *optional*): attention mask of size | |
| `(batch, 1, tgt_len, src_len)` where padding elements are indicated by very large negative values. | |
| output_attentions (`bool`, *optional*): | |
| Whether or not to return the attentions tensors of all attention layers. See `attentions` under | |
| returned tensors for more detail. | |
| use_cache (`bool`, *optional*): | |
| If set to `True`, `past_key_values` key value states are returned and can be used to speed up decoding | |
| (see `past_key_values`). | |
| past_key_value (`Tuple(torch.FloatTensor)`, *optional*): cached past key and value projection states | |
| """ | |
| residual = hidden_states | |
| # print(-1, hidden_states.isnan().sum(), hidden_states.isinf().sum()) | |
| hidden_states = self.input_layernorm( | |
| hidden_states | |
| ) | |
| # print(0, hidden_states.isnan().sum(), hidden_states.isinf().sum()) | |
| # Self Attention | |
| hidden_states, self_attn_weights, present_key_value = self.self_attn( | |
| hidden_states=hidden_states, | |
| attention_mask=attention_mask, | |
| position_embeddings=position_embeddings, | |
| past_key_value=past_key_value, | |
| output_attentions=output_attentions, | |
| use_cache=use_cache, | |
| ) | |
| # print(1, hidden_states.isnan().sum(), hidden_states.isinf().sum()) | |
| hidden_states = residual + hidden_states | |
| # print(2, hidden_states.isnan().sum(), hidden_states.isinf().sum()) | |
| # Fully Connected | |
| residual = hidden_states | |
| hidden_states = self.post_attention_layernorm( | |
| hidden_states | |
| ) | |
| # print(3, hidden_states.isnan().sum(), hidden_states.isinf().sum()) | |
| hidden_states = self.mlp(hidden_states) | |
| hidden_states = residual + hidden_states | |
| # print(4, hidden_states.isnan().sum(), hidden_states.isinf().sum()) | |
| outputs = [hidden_states,] | |
| if output_attentions: | |
| outputs += [self_attn_weights,] | |
| if use_cache: | |
| outputs += [present_key_value,] | |
| return outputs | |