import math

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import Tensor


class EmbeddingLayer(nn.Module):
    def __init__(self, vocab_size: int, d_model: int = 768):
        super().__init__()
        self.d_model = d_model
        self.lut = nn.Embedding(
            num_embeddings=vocab_size, embedding_dim=d_model
        )  # (vocab_size, d_model)

    def forward(self, x):
        # x shape: (batch_size, seq_len)
        return self.lut(x) * math.sqrt(self.d_model)  # (batch_size, seq_len, d_model)


class PositionalEncoding(nn.Module):
    def __init__(self, d_model: int = 768, dropout: float = 0.1, max_length: int = 128):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_length, d_model)  # (max_length, d_model)
        # Create the position column
        k = torch.arange(0, max_length).unsqueeze(dim=1)  # (max_length, 1)
        # Compute the frequency term 1 / 10000^(2i / d_model) in log space for numerical stability
        div_term = torch.exp(
            torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model)
        )  # (d_model / 2)
        # Use sine for the even indices and cosine for the odd indices
        pe[:, 0::2] = torch.sin(k * div_term)
        pe[:, 1::2] = torch.cos(k * div_term)
        pe = pe.unsqueeze(dim=0)  # Add the batch dimension: (1, max_length, d_model)
        # Use a buffer because the positional encoding is fixed, not a model parameter
        # that should be updated during backpropagation.
        self.register_buffer(
            "pe", pe
        )  # Buffers are saved with the model state and moved to the correct device

    def forward(self, x):
        # x shape: (batch_size, seq_length, d_model)
        x = x + self.pe[:, : x.size(1)]
        return self.dropout(x)


class MultiHeadAttention(nn.Module):
    def __init__(self, d_model: int = 768, n_heads: int = 8, dropout: float = 0.1):
        super().__init__()
        assert d_model % n_heads == 0
        self.d_model = d_model
        self.n_heads = n_heads
        self.d_key = d_model // n_heads
        self.Wq = nn.Linear(in_features=d_model, out_features=d_model)
        self.Wk = nn.Linear(in_features=d_model, out_features=d_model)
        self.Wv = nn.Linear(in_features=d_model, out_features=d_model)
        self.Wo = nn.Linear(in_features=d_model, out_features=d_model)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, query: Tensor, key: Tensor, value: Tensor, mask: Tensor = None):
        # input shape: (batch_size, seq_len, d_model)
        batch_size = key.size(0)

        Q = self.Wq(query)
        K = self.Wk(key)
        V = self.Wv(value)

        # Split d_model into n_heads heads of size d_key
        Q = Q.view(batch_size, -1, self.n_heads, self.d_key).permute(
            0, 2, 1, 3
        )  # (batch_size, n_heads, q_length, d_key)
        K = K.view(batch_size, -1, self.n_heads, self.d_key).permute(
            0, 2, 1, 3
        )  # (batch_size, n_heads, k_length, d_key)
        V = V.view(batch_size, -1, self.n_heads, self.d_key).permute(
            0, 2, 1, 3
        )  # (batch_size, n_heads, v_length, d_key)

        scaled_dot_product = torch.matmul(Q, K.permute(0, 1, 3, 2)) / math.sqrt(
            self.d_key
        )  # (batch_size, n_heads, q_length, k_length)

        if mask is not None:
            scaled_dot_product = scaled_dot_product.masked_fill(
                mask == 0, float("-inf")
            )

        attention_probs = torch.softmax(scaled_dot_product, dim=-1)
        A = torch.matmul(
            self.dropout(attention_probs), V
        )  # (batch_size, n_heads, q_length, d_key)

        # Recombine the heads into a single d_model-sized representation
        A = A.permute(0, 2, 1, 3)  # (batch_size, q_length, n_heads, d_key)
        A = A.contiguous().view(
            batch_size, -1, self.n_heads * self.d_key
        )  # (batch_size, q_length, d_model)

        output = self.Wo(A)  # (batch_size, q_length, d_model)
        return output, attention_probs
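
# A minimal sanity-check sketch (not part of the original model code): it assumes the
# default d_model=768 / n_heads=8 configuration and simply verifies that self-attention
# preserves the (batch_size, seq_len, d_model) shape and that each row of the returned
# attention probabilities sums to 1 over the key positions.
def _mha_shape_check(batch_size: int = 2, seq_len: int = 16, d_model: int = 768) -> None:
    mha = MultiHeadAttention(d_model=d_model, n_heads=8).eval()
    x = torch.randn(batch_size, seq_len, d_model)  # (batch_size, seq_len, d_model)
    out, probs = mha(query=x, key=x, value=x)      # self-attention, no mask
    assert out.shape == (batch_size, seq_len, d_model)
    assert probs.shape == (batch_size, 8, seq_len, seq_len)
    assert torch.allclose(probs.sum(dim=-1), torch.ones(batch_size, 8, seq_len))
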
class PositionwiseFeedForward(nn.Module):
    def __init__(self, d_model: int = 768, dropout: float = 0.1):
        super().__init__()
        self.ffn = nn.Sequential(
            nn.Linear(in_features=d_model, out_features=(d_model * 4)),
            nn.ReLU(),
            nn.Linear(in_features=(d_model * 4), out_features=d_model),
            nn.Dropout(p=dropout),
        )

    def forward(self, x):
        # x shape: (batch_size, q_length, d_model)
        return self.ffn(x)  # (batch_size, q_length, d_model)


class EncoderLayer(nn.Module):
    def __init__(self, d_model: int = 768, n_heads: int = 8, dropout: float = 0.1):
        super().__init__()
        self.attention = MultiHeadAttention(
            d_model=d_model, n_heads=n_heads, dropout=dropout
        )
        self.attention_layer_norm = nn.LayerNorm(d_model)
        self.position_wise_ffn = PositionwiseFeedForward(
            d_model=d_model, dropout=dropout
        )
        self.ffn_layer_norm = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, src: Tensor, src_mask: Tensor):
        # Self-attention sub-layer with a residual connection and layer norm
        _src, attention_probs = self.attention(
            query=src, key=src, value=src, mask=src_mask
        )
        src = self.attention_layer_norm(src + self.dropout(_src))

        # Position-wise feed-forward sub-layer with a residual connection and layer norm
        _src = self.position_wise_ffn(src)
        src = self.ffn_layer_norm(src + self.dropout(_src))
        return src, attention_probs


class Encoder(nn.Module):
    def __init__(
        self,
        d_model: int = 768,
        n_layers: int = 3,
        n_heads: int = 8,
        dropout: float = 0.1,
    ):
        super().__init__()
        self.layers = nn.ModuleList(
            [
                EncoderLayer(d_model=d_model, n_heads=n_heads, dropout=dropout)
                for _ in range(n_layers)
            ]
        )
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, src: Tensor, src_mask: Tensor):
        for layer in self.layers:
            src, attention_probs = layer(src, src_mask)
        # Keep the attention probabilities of the final layer (e.g. for visualization)
        self.attention_probs = attention_probs
        return src


class Transformer(nn.Module):
    def __init__(
        self,
        encoder: Encoder,
        src_embed: EmbeddingLayer,
        src_pad_idx: int,
        device,
        d_model: int = 768,
        num_labels: int = 5,
    ):
        super().__init__()
        self.encoder = encoder
        self.src_embed = src_embed
        self.device = device
        self.src_pad_idx = src_pad_idx
        self.dropout = nn.Dropout(p=0.1)
        self.classifier = nn.Linear(in_features=d_model, out_features=num_labels)

    def make_src_mask(self, src: Tensor):
        # Assign 1 to tokens that need to be attended to and 0 to padding tokens,
        # then add two dimensions so the mask broadcasts over heads and query positions
        src_mask = (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)
        return src_mask

    def forward(self, src: Tensor):
        src_mask = self.make_src_mask(src)  # (batch_size, 1, 1, src_seq_length)
        output = self.encoder(
            self.src_embed(src), src_mask
        )  # (batch_size, src_seq_length, d_model)
        output = output[
            :, 0, :
        ]  # Take the start-of-sequence token's representation (works like the CLS token in ViT); shape: (batch_size, d_model)
        logits = self.classifier(self.dropout(output))
        return logits
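
# A small illustrative sketch (not part of the original model code) of the padding mask
# produced by Transformer.make_src_mask: with a pad index of 0, the batch [[5, 7, 9, 0, 0]]
# becomes a (1, 1, 1, 5) boolean mask [[[[True, True, True, False, False]]]] that broadcasts
# against the (batch_size, n_heads, q_length, k_length) attention scores, so padding columns
# are filled with -inf before the softmax.
def _src_mask_example(pad_idx: int = 0) -> Tensor:
    src = torch.tensor([[5, 7, 9, pad_idx, pad_idx]])  # (batch_size=1, seq_len=5)
    return (src != pad_idx).unsqueeze(1).unsqueeze(2)  # (1, 1, 1, 5)
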
def make_model(
    device,
    tokenizer,
    n_layers: int = 3,
    d_model: int = 768,
    num_labels: int = 5,
    n_heads: int = 8,
    dropout: float = 0.1,
    max_length: int = 128,
):
    encoder = Encoder(
        d_model=d_model, n_layers=n_layers, n_heads=n_heads, dropout=dropout
    )
    src_embed = EmbeddingLayer(vocab_size=tokenizer.vocab_size, d_model=d_model)
    pos_enc = PositionalEncoding(
        d_model=d_model, dropout=dropout, max_length=max_length
    )
    model = Transformer(
        encoder=encoder,
        src_embed=nn.Sequential(src_embed, pos_enc),
        src_pad_idx=tokenizer.pad_token_id,
        device=device,
        d_model=d_model,
        num_labels=num_labels,
    )

    # Initialize parameters with Xavier/Glorot initialization.
    # This maintains a consistent variance of activations throughout the network
    # and helps avoid issues like vanishing or exploding gradients.
    for p in model.parameters():
        if p.dim() > 1:
            nn.init.xavier_uniform_(p)

    return model


def get_sentiment(text, model, tokenizer, device, max_length: int = 32):
    model.eval()
    encoded = tokenizer(
        text,
        truncation=True,
        max_length=max_length,
        padding="max_length",
        return_tensors="pt",
    )
    src_tensor = encoded["input_ids"].to(device)

    with torch.inference_mode():
        logits = model(src_tensor)  # shape: (batch_size, num_labels)
    pred_index = torch.argmax(logits, dim=1).item()

    sentiment_map = {
        0: "Very Negative",
        1: "Negative",
        2: "Neutral",
        3: "Positive",
        4: "Very Positive",
    }
    return sentiment_map.get(pred_index, "Unknown")
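

# Example usage (a sketch under assumptions, not part of the original code): it assumes
# the Hugging Face `transformers` package and the "bert-base-uncased" tokenizer purely
# for illustration. The model is randomly initialised here, so the predicted sentiment
# is meaningless until the classifier has been trained.
if __name__ == "__main__":
    from transformers import AutoTokenizer

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
    model = make_model(device=device, tokenizer=tokenizer).to(device)
    print(get_sentiment("The movie was surprisingly good!", model, tokenizer, device))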