from typing import Optional

import torch
import torch.nn.functional as F


def apply_rotary_emb(
    x: torch.Tensor,
    freqs_cis: torch.Tensor,
    curr_pos_id: Optional[torch.Tensor] = None,
) -> torch.Tensor:
    """
    Applies rotary positional embeddings to the input tensor.

    Args:
        x (torch.Tensor): The input tensor of shape (bs, nh, seqlen, hd).
        freqs_cis (torch.Tensor): A tensor containing the precomputed rotary
            frequency components.
        curr_pos_id (Optional[torch.Tensor]): An optional tensor specifying the
            current position IDs to use for selecting a subset of `freqs_cis`.
            If None, the function uses the last `seq_len` positions.

    Returns:
        torch.Tensor: The input tensor `x` with rotary positional embeddings
            applied.
    """
    # Pair up the last dimension and view each (even, odd) pair as one complex number.
    x_ = torch.view_as_complex(x.float().reshape(*x.shape[:-1], -1, 2))
    if curr_pos_id is None:
        # Use the rotation factors of the last seq_len positions.
        freqs_cis = freqs_cis[:, -x.shape[2] :].unsqueeze(1)
    else:
        # Use the rotation factors of the explicitly requested positions.
        freqs_cis = freqs_cis[:, curr_pos_id, :].unsqueeze(1)
    # Complex multiplication rotates each pair; convert back to real and flatten the pairs.
    y = torch.view_as_real(x_ * freqs_cis).flatten(3)
    return y.type_as(x)
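

# A minimal usage sketch for apply_rotary_emb in a hypothetical helper; the shapes
# and the manual freqs_cis construction below are illustrative assumptions, not
# part of the API above.
def _example_apply_rotary_emb() -> torch.Tensor:
    bs, nh, seqlen, hd = 2, 4, 8, 16
    x = torch.randn(bs, nh, seqlen, hd)
    # Build a (bs, seqlen, hd // 2) complex tensor of unit-magnitude rotations;
    # in practice this comes from precompute_freqs_cis defined below.
    pos = torch.arange(seqlen, dtype=torch.float32)
    inv_freq = 1.0 / (10000.0 ** (torch.arange(0, hd, 2, dtype=torch.float32) / hd))
    angles = torch.outer(pos, inv_freq).unsqueeze(0).expand(bs, -1, -1)
    freqs_cis = torch.polar(torch.ones_like(angles), angles)
    y = apply_rotary_emb(x, freqs_cis)
    assert y.shape == x.shape  # the rotation preserves the input shape
    return y

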
@torch.no_grad()
def precompute_freqs_cis(
    dim: int, t: torch.Tensor, theta: float = 10000.0
) -> torch.Tensor:
    """Calculate the complex rotary embedding factors (cos + i*sin); this is useful
    when every block in the network uses the same positional embedding.

    Args:
        dim (int): dimension of a single head of the transformer block
        t (torch.Tensor): position ids [..., L]
        theta (float, optional): RoPE theta. Defaults to 10000.0.

    Returns:
        torch.Tensor: complex tensor of shape [..., L, dim // 2] holding the
            cos and sin of the rotary angles as its real and imaginary parts.
    """
    assert dim % 2 == 0, (
        "RoPE only supports embedding dimensions that are multiples of 2"
    )
    # Inverse frequencies for each channel pair: theta^(-2i / dim).
    freqs = 1.0 / (
        theta ** (torch.arange(0, dim, 2, dtype=torch.float32, device=t.device) / dim)
    )

    # Outer product of positions and inverse frequencies gives the rotation angles.
    freqs = torch.outer(t.contiguous().view(-1), freqs).reshape(*t.shape, -1)
    # polar(1, angle) = cos(angle) + i*sin(angle).
    freqs_cis = torch.polar(torch.ones_like(freqs), freqs)

    return freqs_cis
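

# A minimal usage sketch for precompute_freqs_cis in a hypothetical helper; the
# batch size, sequence length and head dimension are illustrative assumptions.
def _example_precompute_freqs_cis() -> torch.Tensor:
    bs, seqlen, head_dim = 2, 8, 16
    # One row of position ids per batch element, shape (bs, seqlen).
    t = torch.arange(seqlen).unsqueeze(0).expand(bs, -1)
    freqs_cis = precompute_freqs_cis(head_dim, t)
    # One unit-magnitude complex rotation per (position, channel pair).
    assert freqs_cis.shape == (bs, seqlen, head_dim // 2)
    assert freqs_cis.is_complex()
    return freqs_cis

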
def scaled_dot_product_attention_with_rotary_emb(
    q: torch.Tensor,
    k: torch.Tensor,
    v: torch.Tensor,
    freqs_cis: torch.Tensor,
    attn_mask: Optional[torch.Tensor] = None,
    curr_pos_id: Optional[torch.Tensor] = None,
    is_causal: bool = False,
) -> torch.Tensor:
    """
    Computes scaled dot product attention on query, key and value tensors,
    with rotary position embeddings applied to query and key.

    Without caching enabled,
        q should be (bs, nh, seqlen, hd);
        k and v should stay unchanged, (bs, nh, seqlen, hd).
    With caching enabled,
        q should be (bs, nh, 1, hd);
        k and v should stay unchanged, (bs, nh, seqlen, hd);
        is_causal must be False.
    """
    # Rotate q at its own position(s) and k at every position it covers.
    q = apply_rotary_emb(q, freqs_cis, curr_pos_id=curr_pos_id)
    k = apply_rotary_emb(k, freqs_cis, curr_pos_id=None)

    x = F.scaled_dot_product_attention(
        q,
        k,
        v,
        attn_mask=attn_mask,
        dropout_p=0.0,
        is_causal=is_causal and attn_mask is None,
    )
    return x
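

# An end-to-end sketch tying the three functions together in a hypothetical helper.
# All shapes, the prefill / decode split, and the cache handling below are
# illustrative assumptions, not a prescribed calling convention.
def _example_attention_with_rope() -> torch.Tensor:
    bs, nh, seqlen, hd = 2, 4, 8, 16
    q = torch.randn(bs, nh, seqlen, hd)
    k = torch.randn(bs, nh, seqlen, hd)
    v = torch.randn(bs, nh, seqlen, hd)

    # Prefill: one rotation factor per position, shared by q and k.
    pos = torch.arange(seqlen).unsqueeze(0).expand(bs, -1)
    freqs_cis = precompute_freqs_cis(hd, pos)
    out = scaled_dot_product_attention_with_rotary_emb(
        q, k, v, freqs_cis, is_causal=True
    )
    assert out.shape == (bs, nh, seqlen, hd)

    # Cached decode step: a single new query token while k / v hold every
    # position generated so far (including the new one).
    q_step = torch.randn(bs, nh, 1, hd)
    k_full = torch.cat([k, torch.randn(bs, nh, 1, hd)], dim=2)
    v_full = torch.cat([v, torch.randn(bs, nh, 1, hd)], dim=2)
    pos_full = torch.arange(seqlen + 1).unsqueeze(0).expand(bs, -1)
    freqs_full = precompute_freqs_cis(hd, pos_full)
    out_step = scaled_dot_product_attention_with_rotary_emb(
        q_step,
        k_full,
        v_full,
        freqs_full,
        curr_pos_id=torch.tensor([seqlen]),
        is_causal=False,
    )
    assert out_step.shape == (bs, nh, 1, hd)
    return out_step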