# Copyright 2025 The HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional, Tuple, Union

import torch
from torch.nn.attention.flex_attention import BlockMask, flex_attention
from torch.nn.attention.flex_attention import (
    create_block_mask as create_block_causal_mask_flex,
)


class WrappedFlexAttention:
    """
    We use a singleton class so that flex attention is compiled only once, the first time it is called.
    """

    _instance = None
    _is_flex_compiled = False
    _compiled_flex_attention = None

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            # Create a new instance if one doesn't already exist
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, training):
        """
        Initialize or update the singleton instance.
        """
        if not self._is_flex_compiled or training != self.training:
            # In PyTorch 2.6.0, there is a known issue with flex attention compilation during
            # training which may cause errors. The suggested fix is to compile with
            # "max-autotune-no-cudagraphs"; see https://github.com/pytorch/pytorch/issues/146260
            self.training = training
            if torch.__version__.split("+")[0] == "2.6.0" and training:
                self._compiled_flex_attention = torch.compile(
                    flex_attention, dynamic=False, mode="max-autotune-no-cudagraphs"
                )
            else:
                self._compiled_flex_attention = torch.compile(flex_attention)
            self._is_flex_compiled = True

    def __call__(self):
        return self._compiled_flex_attention
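

# Minimal usage sketch (illustrative only, not part of the original module). It shows that the
# singleton hands back the same compiled kernel on repeated calls; `_demo_wrapped_flex_attention`
# is a hypothetical helper name.
def _demo_wrapped_flex_attention() -> bool:
    first = WrappedFlexAttention(training=False)()
    second = WrappedFlexAttention(training=False)()
    # Both calls return the identical compiled flex_attention callable, so compilation
    # state is shared across the process.
    return first is second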


Offset = Union[torch.Tensor, int]


def make_flex_block_causal_mask(
    attention_mask_2d: torch.Tensor,
    attention_chunk_size: Optional[int] = None,
    query_length=None,
    key_length=None,
    offsets: Optional[Tuple[Offset, Offset]] = None,
) -> "BlockMask":
    """
    Create a block causal document mask for a batch of sequences, both packed and unpacked.
    The block causal logic is defined as a ``mask_mod`` and passed to
    :func:`torch.nn.attention.flex_attention.create_block_mask`. The resulting BlockMask is a
    compressed representation of the full block causal mask. BlockMask is essential for
    performant computation of flex attention.
    See: https://pytorch.org/blog/flexattention/

    Args:
        attention_mask_2d (torch.Tensor): Attention mask for packed and padded sequences
            of shape (batch_size, total_seq_len). e.g.

            For unpacked sequence:
            [[1, 1, 1, 1, 0, 0, 0],
             [1, 1, 1, 1, 1, 0, 0]]

            For packed sequence:
            [[1, 1, 1, 2, 2, 2, 0],
             [1, 1, 2, 2, 2, 3, 3]]

    Returns:
        BlockMask
    """
    batch_size, total_seq_len = attention_mask_2d.shape
    if not key_length:
        key_length = total_seq_len
    if not query_length:
        query_length = total_seq_len
    attention_mask_2d = torch.nn.functional.pad(attention_mask_2d, value=0, pad=(0, key_length))
    device = attention_mask_2d.device
    document_ids = attention_mask_2d.clone()

    if attention_chunk_size is not None:
        # We create an arange, then floor-divide by the chunk size to get chunk ids,
        # e.g. [0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3] for a chunk size of 3.
        document_ids = (document_ids.fill_(1).cumsum(-1) - 1) // (attention_chunk_size)

    # Instead of passing a tensor mask, flex attention requires a mask_mod function
    # that determines which elements of QK^T should be included in the attention
    # computation prior to the softmax. For sample packing, we need the logic for
    # both the causal mask and the document mask. See PyTorch's official
    # blog post for more details: https://pytorch.org/blog/flexattention/#mask-mods
    def causal_mask_mod(batch_idx, head_idx, q_idx, kv_idx):
        """
        Defines the logic of a block causal mask by combining both a standard causal mask
        and a block diagonal document mask.

        See :func:`~torchtune.modules.attention_utils.create_block_causal_mask`
        for an illustration.
        """
        causal_mask = q_idx >= kv_idx  # not valid when decoding
        document_mask = document_ids[batch_idx, q_idx] == document_ids[batch_idx, kv_idx]
        padding_mask = attention_mask_2d[batch_idx, q_idx] > 0
        final_mask = causal_mask & padding_mask & document_mask
        return final_mask

    if offsets is not None:
        q_offset = offsets[0]
        kv_offset = offsets[1]

        def mask_mod(batch_idx, head_idx, q_idx, kv_idx):
            offset_q = q_idx + q_offset
            offset_kv = kv_idx + kv_offset
            return causal_mask_mod(batch_idx, head_idx, offset_q, offset_kv)
    else:
        mask_mod = causal_mask_mod

    return create_block_causal_mask_flex(
        mask_mod=mask_mod,
        B=batch_size,
        H=None,  # attention head
        Q_LEN=query_length,
        KV_LEN=key_length,
        device=device,
        _compile=True,
    )
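

# Usage sketch (illustrative only, not part of the original module): builds a BlockMask for a
# packed batch where the first three tokens belong to document 1, the next two to document 2,
# and the last position is padding. `_demo_block_causal_mask` is a hypothetical helper name.
def _demo_block_causal_mask() -> "BlockMask":
    packed_mask = torch.tensor([[1, 1, 1, 2, 2, 0]])
    # Tokens attend causally within their own document; the padding position attends to nothing.
    return make_flex_block_causal_mask(packed_mask)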


def compile_friendly_flex_attention(
    query: torch.Tensor,
    key: torch.Tensor,
    value: torch.Tensor,
    training=False,
    **kwargs,
) -> torch.Tensor:
    # The first call initializes (or reuses) the singleton wrapper object; calling the returned
    # object then yields the compiled flex attention kernel.
    flex_attention_compiled = WrappedFlexAttention(training)()
    return flex_attention_compiled(
        query,
        key,
        value,
        **kwargs,
    )


def repeat_kv(hidden_states: torch.Tensor, n_rep: int) -> torch.Tensor:
    """
    This is the equivalent of torch.repeat_interleave(x, dim=1, repeats=n_rep). The hidden states go from
    (batch, num_key_value_heads, seqlen, head_dim) to (batch, num_attention_heads, seqlen, head_dim).
    """
    batch, num_key_value_heads, slen, head_dim = hidden_states.shape
    if n_rep == 1:
        return hidden_states
    hidden_states = hidden_states[:, :, None, :, :].expand(batch, num_key_value_heads, n_rep, slen, head_dim)
    return hidden_states.reshape(batch, num_key_value_heads * n_rep, slen, head_dim)
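

# Shape sketch (illustrative only, not part of the original module): with 2 key/value heads and
# n_rep=4, the key/value tensor is expanded to match 8 query heads. `_demo_repeat_kv` is a
# hypothetical helper name.
def _demo_repeat_kv() -> torch.Size:
    key_states = torch.randn(1, 2, 16, 64)  # (batch, num_key_value_heads, seqlen, head_dim)
    return repeat_kv(key_states, n_rep=4).shape  # torch.Size([1, 8, 16, 64])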


def flex_attention_forward(
    query: torch.Tensor,
    key: torch.Tensor,
    value: torch.Tensor,
    attention_mask: Union[torch.Tensor, "BlockMask"],
    training: bool = True,
    scaling: Optional[float] = None,
    softcap: Optional[float] = None,
    head_mask: Optional[torch.Tensor] = None,
    **kwargs,
) -> Tuple[torch.Tensor, torch.Tensor]:
    block_mask = None
    causal_mask = None
    if isinstance(attention_mask, BlockMask):
        block_mask = attention_mask
    else:
        causal_mask = attention_mask

    if causal_mask is not None:
        causal_mask = causal_mask[:, :, :, : key.shape[-2]]

    def score_mod(score, batch_idx, head_idx, q_idx, kv_idx):
        if softcap is not None:
            score = softcap * torch.tanh(score / softcap)
        if causal_mask is not None:
            score = score + causal_mask[batch_idx][0][q_idx][kv_idx]
        if head_mask is not None:
            score = score + head_mask[batch_idx][head_idx][0][0]
        return score

    enable_gqa = True
    num_local_query_heads = query.shape[1]

    # When running tensor parallelism, the number of local query heads may not be a power of two;
    # in that case we avoid flex attention's native GQA path and repeat the KV heads explicitly.
    if not ((num_local_query_heads & (num_local_query_heads - 1)) == 0):
        key = repeat_kv(key, query.shape[1] // key.shape[1])
        value = repeat_kv(value, query.shape[1] // value.shape[1])
        enable_gqa = False

    kernel_options = kwargs.get("kernel_options", None)
    attn_output, attention_weights = compile_friendly_flex_attention(
        query,
        key,
        value,
        score_mod=score_mod,
        block_mask=block_mask,
        enable_gqa=enable_gqa,
        scale=scaling,
        kernel_options=kernel_options,
        # Last time checked on PyTorch == 2.5.1: Flex Attention always computes the lse regardless.
        # For simplification, we thus always return it as no additional computations are introduced.
        return_lse=True,
        training=training,
    )
    # lse is returned in float32
    attention_weights = attention_weights.to(value.dtype)
    attn_output = attn_output.transpose(1, 2).contiguous()

    return attn_output, attention_weights
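

# End-to-end sketch (illustrative only, not part of the original module): runs flex attention
# over a single unpadded sequence with grouped-query attention (8 query heads sharing 2 KV
# heads). Depending on the PyTorch build, the compiled kernel may require a GPU.
# `_demo_flex_attention_forward` is a hypothetical helper name.
def _demo_flex_attention_forward() -> Tuple[torch.Tensor, torch.Tensor]:
    batch, num_heads, num_kv_heads, seq_len, head_dim = 1, 8, 2, 16, 64
    query = torch.randn(batch, num_heads, seq_len, head_dim)
    key = torch.randn(batch, num_kv_heads, seq_len, head_dim)
    value = torch.randn(batch, num_kv_heads, seq_len, head_dim)
    # All positions are valid: no padding, a single document per row.
    block_mask = make_flex_block_causal_mask(torch.ones(batch, seq_len, dtype=torch.long))
    # The returned attn_output has shape (batch, seq_len, num_heads, head_dim) after the final
    # transpose; the second element is the log-sum-exp cast to the value dtype.
    return flex_attention_forward(query, key, value, attention_mask=block_mask, training=False)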