Spaces:
Running
on
L40S
Running
on
L40S
File size: 6,048 Bytes
38e20ed |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
from packaging import version
from typing import Optional, Tuple
import numpy as np
import torch
import torch.nn.functional as F
import transformers
from transformers import Wav2Vec2Model
from transformers.modeling_outputs import BaseModelOutput
_CONFIG_FOR_DOC = 'Wav2Vec2Config'
# the implementation of Wav2Vec2Model is borrowed from
# https://huggingface.co./transformers/_modules/transformers/models/wav2vec2/modeling_wav2vec2.html#Wav2Vec2Model
# initialize our encoder with the pre-trained wav2vec 2.0 weights.
def _compute_mask_indices(shape: Tuple[int, int], mask_prob: float, mask_length: int,
attention_mask: Optional[torch.Tensor] = None, min_masks: int = 0, ) -> np.ndarray:
bsz, all_sz = shape
mask = np.full((bsz, all_sz), False)
all_num_mask = int(mask_prob * all_sz / float(mask_length) + np.random.rand())
all_num_mask = max(min_masks, all_num_mask)
mask_idcs = []
padding_mask = attention_mask.ne(1) if attention_mask is not None else None
for i in range(bsz):
if padding_mask is not None:
sz = all_sz - padding_mask[i].long().sum().item()
num_mask = int(mask_prob * sz / float(mask_length) + np.random.rand())
num_mask = max(min_masks, num_mask)
else:
sz = all_sz
num_mask = all_num_mask
lengths = np.full(num_mask, mask_length)
if sum(lengths) == 0:
lengths[0] = min(mask_length, sz - 1)
min_len = min(lengths)
if sz - min_len <= num_mask:
min_len = sz - num_mask - 1
mask_idc = np.random.choice(sz - min_len, num_mask, replace=False)
mask_idc = np.asarray([mask_idc[j] + offset for j in range(len(mask_idc)) for offset in range(lengths[j])])
mask_idcs.append(np.unique(mask_idc[mask_idc < sz]))
min_len = min([len(m) for m in mask_idcs])
for i, mask_idc in enumerate(mask_idcs):
if len(mask_idc) > min_len:
mask_idc = np.random.choice(mask_idc, min_len, replace=False)
mask[i, mask_idc] = True
return mask
# linear interpolation layer
def linear_interpolation(features, input_fps, output_fps, output_len=None):
# features: (N, C, L)
seq_len = features.shape[2] / float(input_fps)
if output_len is None:
output_len = int(seq_len * output_fps)
output_features = F.interpolate(features, size=output_len, align_corners=False, mode='linear')
return output_features
class Wav2Vec2Model(Wav2Vec2Model):
def __init__(self, config):
super().__init__(config)
self.is_old_version = version.parse(transformers.__version__) < version.parse('4.7.0')
def forward(self, input_values, output_fps=25, attention_mask=None, output_attentions=None,
output_hidden_states=None, return_dict=None, frame_num=None):
self.config.output_attentions = True
output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
output_hidden_states = (
output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states)
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
hidden_states = self.feature_extractor(input_values) # (N, C, L)
# Resample the audio feature @ 50 fps to `output_fps`.
if frame_num is not None:
hidden_states_len = round(frame_num * 50 / output_fps)
hidden_states = hidden_states[:, :, :hidden_states_len]
hidden_states = linear_interpolation(hidden_states, 50, output_fps, output_len=frame_num)
hidden_states = hidden_states.transpose(1, 2) # (N, L, C)
if attention_mask is not None:
output_lengths = self._get_feat_extract_output_lengths(attention_mask.sum(-1))
attention_mask = torch.zeros(hidden_states.shape[:2], dtype=hidden_states.dtype,
device=hidden_states.device)
attention_mask[(torch.arange(attention_mask.shape[0], device=hidden_states.device), output_lengths - 1)] = 1
attention_mask = attention_mask.flip([-1]).cumsum(-1).flip([-1]).bool()
if self.is_old_version:
hidden_states = self.feature_projection(hidden_states)
else:
hidden_states = self.feature_projection(hidden_states)[0]
if self.config.apply_spec_augment and self.training:
batch_size, sequence_length, hidden_size = hidden_states.size()
if self.config.mask_time_prob > 0:
mask_time_indices = _compute_mask_indices((batch_size, sequence_length), self.config.mask_time_prob,
self.config.mask_time_length, attention_mask=attention_mask,
min_masks=2, )
hidden_states[torch.from_numpy(mask_time_indices)] = self.masked_spec_embed.to(hidden_states.dtype)
if self.config.mask_feature_prob > 0:
mask_feature_indices = _compute_mask_indices((batch_size, hidden_size), self.config.mask_feature_prob,
self.config.mask_feature_length, )
mask_feature_indices = torch.from_numpy(mask_feature_indices).to(hidden_states.device)
hidden_states[mask_feature_indices[:, None].expand(-1, sequence_length, -1)] = 0
encoder_outputs = self.encoder(hidden_states, attention_mask=attention_mask,
output_attentions=output_attentions, output_hidden_states=output_hidden_states,
return_dict=return_dict, )
hidden_states = encoder_outputs[0]
if not return_dict:
return (hidden_states,) + encoder_outputs[1:]
return BaseModelOutput(last_hidden_state=hidden_states, hidden_states=encoder_outputs.hidden_states,
attentions=encoder_outputs.attentions, )
|