# -------------------------------------------------------- # NVIDIA # Copyright (c) 2025 NVIDIA # Licensed under The MIT License [see LICENSE for details] # -------------------------------------------------------- import warnings import inspect from typing import Any, List, Optional, Tuple, Union import torch from torch import nn import torch.distributed as dist from torch.nn import CrossEntropyLoss import torch.nn.functional as F from transformers.models.qwen2.modeling_qwen2 import Qwen2ForCausalLM from transformers.models.llama.modeling_llama import LlamaForCausalLM import torch.utils.checkpoint as cp from transformers.models.siglip.modeling_siglip import SiglipVisionModel from peft import LoraConfig, get_peft_model from transformers.generation import GenerationMixin from transformers import GenerationConfig from transformers.modeling_outputs import CausalLMOutputWithPast from transformers.modeling_utils import PreTrainedModel from transformers.utils import ModelOutput, logging from .configuration_eagle2_5_vl import Eagle2_5_VLConfig from transformers.utils import add_start_docstrings, add_start_docstrings_to_model_forward, logging, replace_return_docstrings logger = logging.get_logger(__name__) # copy from https://github.com/huggingface/transformers/blob/main/src/transformers/models/llava_onevision/modeling_llava_onevision.py#L241C1-L280C1 EAGLE2_5_VL_START_DOCSTRING = r""" This model inherits from [`PreTrainedModel`]. Check the superclass documentation for the generic methods the library implements for all its model (such as downloading or saving, resizing the input embeddings, pruning heads etc.) This model is also a PyTorch [torch.nn.Module](https://pytorch.org/docs/stable/nn.html#torch.nn.Module) subclass. Use it as a regular PyTorch Module and refer to the PyTorch documentation for all matter related to general usage and behavior. Parameters: config ([`Eagle2_5_VLConfig`]): Model configuration class with all the parameters of the model. Initializing with a config file does not load the weights associated with the model, only the configuration. Check out the [`~PreTrainedModel.from_pretrained`] method to load the model weights. """ @add_start_docstrings( "The bare Eagle2_5_VL Model outputting raw hidden-states without any specific head on top.", EAGLE2_5_VL_START_DOCSTRING, ) class Eagle2_5_VLPreTrainedModel(PreTrainedModel): config_class = Eagle2_5_VLConfig base_model_prefix = "model" main_input_name = 'input_ids' supports_gradient_checkpointing = True _no_split_modules = ["Qwen2DecoderLayer", "LlamaDecoderLayer" ,"Siglip2EncoderLayer", "SiglipEncoderLayer"] _skip_keys_device_placement = "past_key_values" _supports_flash_attn_2 = True _supports_cache_class = True _supports_static_cache = True _supports_quantized_cache = True _supports_sdpa = True def _init_weights(self, module): std = self.config.initializer_range if isinstance(module, (nn.Linear, nn.Conv2d)): module.weight.data.normal_(mean=0.0, std=std) if module.bias is not None: module.bias.data.zero_() elif isinstance(module, nn.Embedding): module.weight.data.normal_(mean=0.0, std=std) if module.padding_idx is not None: module.weight.data[module.padding_idx].zero_() class Eagle2_5_VLForConditionalGeneration(Eagle2_5_VLPreTrainedModel, GenerationMixin): config_class = Eagle2_5_VLConfig def __init__(self, config: Eagle2_5_VLConfig, vision_model=None, language_model=None): super().__init__(config) image_size = config.force_image_size or config.vision_config.image_size patch_size = config.vision_config.patch_size self.patch_size = patch_size self.num_image_token = int((image_size // patch_size) ** 2 * (config.downsample_ratio ** 2)) self.select_layer = config.select_layer self.downsample_ratio = config.downsample_ratio self.loss_version = config.loss_version self.mlp_checkpoint = config.mlp_checkpoint logger.info(f'num_image_token: {self.num_image_token}') logger.info(f'mlp_checkpoint: {self.mlp_checkpoint}') if vision_model is not None: self.vision_model = vision_model else: if config.vision_config.model_type == 'siglip_vision_model': config.vision_config._attn_implementation = 'flash_attention_2' self.vision_model = SiglipVisionModel(config.vision_config) else: raise NotImplementedError(f'{config.vision_config.model_type} is not implemented.') if language_model is not None: self.language_model = language_model else: if config.text_config.architectures[0] == 'LlamaForCausalLM': self.language_model = LlamaForCausalLM(config.text_config) elif config.text_config.architectures[0] == 'Qwen2ForCausalLM': # assert config.text_config._attn_implementation == 'flash_attention_2', f"Qwen2 must use flash_attention_2 but got {config.text_config._attn_implementation}" self.language_model = Qwen2ForCausalLM(config.text_config) else: raise NotImplementedError(f'{config.text_config.architectures[0]} is not implemented.') vit_hidden_size = config.vision_config.hidden_size llm_hidden_size = config.text_config.hidden_size self.mlp1 = nn.Sequential( nn.LayerNorm(vit_hidden_size * int(1 / self.downsample_ratio) ** 2), nn.Linear(vit_hidden_size * int(1 / self.downsample_ratio) ** 2, llm_hidden_size), nn.GELU(), nn.Linear(llm_hidden_size, llm_hidden_size) ) self.image_token_index = config.image_token_index self.neftune_alpha = None if config.use_backbone_lora: self.wrap_backbone_lora(r=config.use_backbone_lora, lora_alpha=2 * config.use_backbone_lora) self.use_llm_lora = config.use_llm_lora if config.use_llm_lora: self.wrap_llm_lora(r=config.use_llm_lora, lora_alpha=2 * config.use_llm_lora) self.check_forward_kwargs() def check_forward_kwargs(self): # We intentionally avoid using **kwargs in forward because Hugging Face Transformers # has special handling for functions with **kwargs parameters that would affect # how our model is processed during training and inference. forward_params = inspect.signature(self.forward).parameters assert not any(k.kind == inspect.Parameter.VAR_KEYWORD for k in forward_params.values()) def wrap_backbone_lora(self, r=128, lora_alpha=256, lora_dropout=0.05): lora_config = LoraConfig( r=r, target_modules=['self_attn.q_proj', 'self_attn.k_proj', 'self_attn.v_proj', 'self_attn.out_proj', 'mlp.fc1', 'mlp.fc2'], lora_alpha=lora_alpha, lora_dropout=lora_dropout, ) self.vision_model = get_peft_model(self.vision_model, lora_config) self.vision_model.print_trainable_parameters() def wrap_llm_lora(self, r=128, lora_alpha=256, lora_dropout=0.05): lora_config = LoraConfig( r=r, target_modules=['self_attn.q_proj', 'self_attn.k_proj', 'self_attn.v_proj', 'self_attn.o_proj', 'mlp.gate_proj', 'mlp.down_proj', 'mlp.up_proj'], lora_alpha=lora_alpha, lora_dropout=lora_dropout, task_type='CAUSAL_LM' ) self.language_model = get_peft_model(self.language_model, lora_config) self.language_model.enable_input_require_grads() self.language_model.print_trainable_parameters() self.use_llm_lora = True def forward( self, pixel_values: torch.FloatTensor, input_ids: torch.LongTensor = None, attention_mask: Optional[torch.Tensor] = None, position_ids: Optional[torch.LongTensor] = None, image_flags: Optional[torch.LongTensor] = None, past_key_values: Optional[List[torch.FloatTensor]] = None, labels: Optional[torch.LongTensor] = None, use_cache: Optional[bool] = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, num_tiles_list: Optional[List[torch.Tensor]] = None, ) -> Union[Tuple, CausalLMOutputWithPast]: return_dict = return_dict if return_dict is not None else self.config.use_return_dict image_flags = image_flags.squeeze(-1) input_embeds = self.language_model.get_input_embeddings()(input_ids) vit_embeds = self.extract_feature(pixel_values) if not isinstance(image_flags, list): image_flags = image_flags.squeeze(-1) vit_embeds = vit_embeds[image_flags == 1] vit_batch_size = pixel_values.shape[0] B, N, C = input_embeds.shape input_embeds = input_embeds.reshape(B * N, C) if torch.distributed.get_rank() == 0: print(f'dynamic ViT batch size: {vit_batch_size}, images per sample: {vit_batch_size / B}, dynamic token length: {N}') input_ids = input_ids.reshape(B * N) selected = (input_ids == self.image_token_index) try: input_embeds[selected] = input_embeds[selected] * 0.0 + vit_embeds.reshape(-1, C) except Exception as e: vit_embeds = vit_embeds.reshape(-1, C) print(f'warning: {e}, input_embeds[selected].shape={input_embeds[selected].shape}, ' f'vit_embeds.shape={vit_embeds.shape}') n_token = selected.sum() input_embeds[selected] = input_embeds[selected] * 0.0 + vit_embeds[:n_token] input_embeds = input_embeds.reshape(B, N, C) outputs = self.language_model( inputs_embeds=input_embeds, attention_mask=attention_mask, position_ids=position_ids, past_key_values=past_key_values, use_cache=use_cache, output_attentions=output_attentions, output_hidden_states=output_hidden_states, ) logits = outputs.logits loss = None if labels is not None: # Shift so that tokens < n predict n shift_logits = logits[..., :-1, :].contiguous() shift_labels = labels[..., 1:].contiguous() # Flatten the tokens loss_fct = CrossEntropyLoss() shift_logits = shift_logits.view(-1, self.language_model.config.vocab_size) shift_labels = shift_labels.view(-1) # Enable model parallelism shift_labels = shift_labels.to(shift_logits.device) loss = loss_fct(shift_logits, shift_labels) if not return_dict: output = (logits,) + outputs[1:] return (loss,) + output if loss is not None else output return CausalLMOutputWithPast( loss=loss, logits=logits, past_key_values=outputs.past_key_values, hidden_states=outputs.hidden_states, attentions=outputs.attentions, ) def pixel_shuffle(self, x, scale_factor=0.5): n, w, h, c = x.size() # N, W, H, C --> N, W, H * scale, C // scale x = x.view(n, w, int(h * scale_factor), int(c / scale_factor)) # N, W, H * scale, C // scale --> N, H * scale, W, C // scale x = x.permute(0, 2, 1, 3).contiguous() # N, H * scale, W, C // scale --> N, H * scale, W * scale, C // (scale ** 2) x = x.view(n, int(h * scale_factor), int(w * scale_factor), int(c / (scale_factor * scale_factor))) x = x.permute(0, 2, 1, 3).contiguous() return x def extract_feature(self, pixel_values): if self.select_layer == -1: vit_embeds = self.vision_model( pixel_values=pixel_values, output_hidden_states=False, return_dict=True) if hasattr(vit_embeds, 'last_hidden_state'): vit_embeds = vit_embeds.last_hidden_state else: vit_embeds = self.vision_model( pixel_values=pixel_values, output_hidden_states=True, return_dict=True).hidden_states[self.select_layer] h = w = int(vit_embeds.shape[1] ** 0.5) vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], h, w, -1) vit_embeds = self.pixel_shuffle(vit_embeds, scale_factor=self.downsample_ratio) # torch.Size([B, 1024, 1024]) -> torch.Size([B, 16, 16, 4096]) vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], -1, vit_embeds.shape[-1]) # torch.Size([B, 16, 16, 4096]) -> torch.Size([B, 256, 4096]) if self.mlp_checkpoint and vit_embeds.requires_grad: vit_embeds = cp.checkpoint(self.mlp1, vit_embeds) else: vit_embeds = self.mlp1(vit_embeds) return vit_embeds @torch.no_grad() def generate( self, pixel_values: Optional[torch.FloatTensor] = None, input_ids: Optional[torch.FloatTensor] = None, attention_mask: Optional[torch.LongTensor] = None, visual_features: Optional[torch.FloatTensor] = None, generation_config: Optional[GenerationConfig] = None, output_hidden_states: Optional[bool] = None, image_sizes: Optional[List[Tuple[int, int]]] = None, **generate_kwargs, ) -> torch.LongTensor: if pixel_values is not None: if visual_features is not None: vit_embeds = visual_features else: vit_embeds = self.extract_feature(pixel_values) input_embeds = self.language_model.get_input_embeddings()(input_ids) B, N, C = input_embeds.shape input_embeds = input_embeds.reshape(B * N, C) input_ids = input_ids.reshape(B * N) selected = (input_ids == self.config.image_token_index) assert selected.sum() != 0 input_embeds[selected] = vit_embeds.reshape(-1, C).to(input_embeds.device) input_embeds = input_embeds.reshape(B, N, C) else: input_embeds = self.language_model.get_input_embeddings()(input_ids) outputs = self.language_model.generate( inputs_embeds=input_embeds, attention_mask=attention_mask, generation_config=generation_config, output_hidden_states=output_hidden_states, use_cache=True, **generate_kwargs, ) return outputs # Copied from transformers.models.llava_next.modeling_llava_next.LlavaNextForConditionalGeneration.get_input_embeddings def get_input_embeddings(self): return self.language_model.get_input_embeddings() # Copied from transformers.models.llava_next.modeling_llava_next.LlavaNextForConditionalGeneration.set_input_embeddings def set_input_embeddings(self, value): self.language_model.set_input_embeddings(value) # Copied from transformers.models.llava_next.modeling_llava_next.LlavaNextForConditionalGeneration.get_output_embeddings def get_output_embeddings(self): return self.language_model.get_output_embeddings() # Copied from transformers.models.llava_next.modeling_llava_next.LlavaNextForConditionalGeneration.set_output_embeddings def set_output_embeddings(self, new_embeddings): self.language_model.set_output_embeddings(new_embeddings) # Copied from transformers.models.llava_next.modeling_llava_next.LlavaNextForConditionalGeneration.set_decoder def set_decoder(self, decoder): self.language_model.set_decoder(decoder) # Copied from transformers.models.llava_next.modeling_llava_next.LlavaNextForConditionalGeneration.get_decoder def get_decoder(self): return self.language_model.get_decoder()