|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | import logging | 
					
						
						|  | import os | 
					
						
						|  | import importlib.metadata | 
					
						
						|  |  | 
					
						
						|  | from packaging import version | 
					
						
						|  | from importlib import import_module | 
					
						
						|  | from typing import List, Callable, Union, Optional, Dict | 
					
						
						|  |  | 
					
						
						|  | import PIL.Image | 
					
						
						|  | import torch | 
					
						
						|  | from torch import Tensor | 
					
						
						|  | from torch.nn import init | 
					
						
						|  | from torch.nn.functional import softmax, gumbel_softmax, pad | 
					
						
						|  | from transformers.utils import is_flash_attn_2_available | 
					
						
						|  | from transformers import PreTrainedModel, AutoModel, AutoTokenizer, AutoModelForCausalLM, AutoImageProcessor | 
					
						
						|  | from transformers.generation.utils import GenerateOutput | 
					
						
						|  |  | 
					
						
						|  | from .configuration_ovis import BaseVisualTokenizerConfig, Aimv2VisualTokenizerConfig | 
					
						
						|  | from .configuration_ovis import OvisConfig, ConversationFormatter | 
					
						
						|  | from .configuration_ovis import IGNORE_ID, IMAGE_ATOM_ID, IMAGE_INDICATOR_IDS, IMAGE_TOKEN_ID | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | class BaseVisualTokenizer(PreTrainedModel): | 
					
						
						|  | base_model_prefix = "backbone" | 
					
						
						|  | main_input_name = None | 
					
						
						|  | _image_processor_class = None | 
					
						
						|  | _image_processor_kwargs = {} | 
					
						
						|  | _backbone_class = None | 
					
						
						|  | _backbone_name_or_path = None | 
					
						
						|  |  | 
					
						
						|  | def __init__(self, config: BaseVisualTokenizerConfig, *inputs, **kwargs): | 
					
						
						|  | super().__init__(config, *inputs, **kwargs) | 
					
						
						|  | self.image_processor = AutoImageProcessor.from_pretrained(kwargs['image_processor_name_or_path']) | 
					
						
						|  | self.backbone = AutoModel.from_config(self.config.backbone_config) | 
					
						
						|  | head_dim = self.config.vocab_size - len(IMAGE_INDICATOR_IDS) | 
					
						
						|  | self.head = torch.nn.Sequential( | 
					
						
						|  | torch.nn.Linear( | 
					
						
						|  | self.backbone.config.hidden_size * self.config.hidden_stride * self.config.hidden_stride, head_dim, | 
					
						
						|  | bias=False | 
					
						
						|  | ), | 
					
						
						|  | torch.nn.LayerNorm(head_dim) | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | assert all((self.image_processor.do_resize, | 
					
						
						|  | not getattr(self.image_processor, 'do_center_crop', False), | 
					
						
						|  | self.image_processor.do_rescale, | 
					
						
						|  | self.image_processor.do_normalize | 
					
						
						|  | )), f"image_processor `{self.image_processor}` is not supported currently" | 
					
						
						|  |  | 
					
						
						|  | def get_backbone(self): | 
					
						
						|  | return self.backbone | 
					
						
						|  |  | 
					
						
						|  | def get_image_processor(self): | 
					
						
						|  | return self.image_processor | 
					
						
						|  |  | 
					
						
						|  | def mock_input(self): | 
					
						
						|  | height, width = self.get_image_size() | 
					
						
						|  | return torch.zeros(1, 3, height, width), self.construct_image_placeholders((1, 1)) | 
					
						
						|  |  | 
					
						
						|  | def get_head(self): | 
					
						
						|  | return self.head | 
					
						
						|  |  | 
					
						
						|  | def get_image_size(self): | 
					
						
						|  | raise NotImplementedError | 
					
						
						|  |  | 
					
						
						|  | @staticmethod | 
					
						
						|  | def construct_image_placeholders(grid): | 
					
						
						|  | image_placeholders = [IMAGE_INDICATOR_IDS[0], IMAGE_ATOM_ID, IMAGE_INDICATOR_IDS[1]] | 
					
						
						|  | if grid[0] * grid[1] > 1: | 
					
						
						|  | for r in range(grid[0]): | 
					
						
						|  | for c in range(grid[1]): | 
					
						
						|  | image_placeholders.append(IMAGE_ATOM_ID) | 
					
						
						|  | if c < grid[1] - 1: | 
					
						
						|  | image_placeholders.append(IMAGE_INDICATOR_IDS[2]) | 
					
						
						|  | if r < grid[0] - 1: | 
					
						
						|  | image_placeholders.append(IMAGE_INDICATOR_IDS[3]) | 
					
						
						|  | image_placeholders.append(IMAGE_INDICATOR_IDS[4]) | 
					
						
						|  | return image_placeholders | 
					
						
						|  |  | 
					
						
						|  | def preprocess_image(self, image: PIL.Image.Image, max_partition=9, covering_threshold=0.9, convert_to_rgb=True): | 
					
						
						|  | def _preprocess(img: PIL.Image.Image, side): | 
					
						
						|  |  | 
					
						
						|  | w, h = img.size | 
					
						
						|  | if w == h: | 
					
						
						|  | new_width = new_height = side | 
					
						
						|  | elif w > h: | 
					
						
						|  | new_width = side | 
					
						
						|  | new_height = int(h / w * new_width) | 
					
						
						|  | else: | 
					
						
						|  | new_height = side | 
					
						
						|  | new_width = int(w / h * new_height) | 
					
						
						|  | new_size = dict(height=new_height, width=new_width) | 
					
						
						|  | pixel_values = self.image_processor.preprocess(img, size=new_size, return_tensors='pt')['pixel_values'] | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | square_values = torch.zeros([1, 3, side, side], dtype=pixel_values.dtype, device=pixel_values.device) | 
					
						
						|  | new_height, new_width = pixel_values.shape[2:] | 
					
						
						|  | if new_height == new_width: | 
					
						
						|  | square_values[:, :, :, :] = pixel_values | 
					
						
						|  | elif new_height > new_width: | 
					
						
						|  | from_index = (side - new_width) // 2 | 
					
						
						|  | square_values[:, :, :, from_index:from_index + new_width] = pixel_values | 
					
						
						|  | else: | 
					
						
						|  | from_index = (side - new_height) // 2 | 
					
						
						|  | square_values[:, :, from_index:from_index + new_height, :] = pixel_values | 
					
						
						|  |  | 
					
						
						|  | return square_values | 
					
						
						|  |  | 
					
						
						|  | def _partition(img, grid): | 
					
						
						|  | w, h = img.size | 
					
						
						|  | row_height = h // grid[0] | 
					
						
						|  | col_width = w // grid[1] | 
					
						
						|  |  | 
					
						
						|  | partition = [] | 
					
						
						|  | for row in range(grid[0]): | 
					
						
						|  | for col in range(grid[1]): | 
					
						
						|  | left = col * col_width | 
					
						
						|  | upper = row * row_height | 
					
						
						|  | right = w if col == grid[1] - 1 else (col + 1) * col_width | 
					
						
						|  | lower = h if row == grid[0] - 1 else (row + 1) * row_height | 
					
						
						|  | partition.append((left, upper, right, lower)) | 
					
						
						|  |  | 
					
						
						|  | return partition | 
					
						
						|  |  | 
					
						
						|  | def _covering_area(left, upper, right, lower, side): | 
					
						
						|  | w = right - left | 
					
						
						|  | h = lower - upper | 
					
						
						|  | w, h = max(w, h), min(w, h) | 
					
						
						|  | if w > side: | 
					
						
						|  | h = h / w * side | 
					
						
						|  | w = side | 
					
						
						|  | return w * h | 
					
						
						|  |  | 
					
						
						|  | def _get_best_grid(img, side): | 
					
						
						|  | img_area = img.size[0] * img.size[1] | 
					
						
						|  |  | 
					
						
						|  | candidate_grids = [] | 
					
						
						|  | for i in range(1, max_partition + 1): | 
					
						
						|  | for j in range(1, max_partition + 1): | 
					
						
						|  | if i * j <= max_partition: | 
					
						
						|  | candidate_grids.append((i, j)) | 
					
						
						|  |  | 
					
						
						|  | all_grids = [] | 
					
						
						|  | good_grids = [] | 
					
						
						|  | for grid in candidate_grids: | 
					
						
						|  | partition = _partition(img, grid) | 
					
						
						|  | covering_ratio = sum([_covering_area(*p, side) for p in partition]) / img_area | 
					
						
						|  | assert covering_ratio <= 1.0 | 
					
						
						|  | all_grids.append((grid, covering_ratio)) | 
					
						
						|  | if covering_ratio > covering_threshold: | 
					
						
						|  | good_grids.append((grid, covering_ratio)) | 
					
						
						|  |  | 
					
						
						|  | if len(good_grids) > 0: | 
					
						
						|  |  | 
					
						
						|  | return sorted(good_grids, key=lambda x: (x[0][0] * x[0][1], -x[1]))[0][0] | 
					
						
						|  | else: | 
					
						
						|  |  | 
					
						
						|  | return sorted(all_grids, key=lambda x: (-x[1], x[0][0] * x[0][1]))[0][0] | 
					
						
						|  |  | 
					
						
						|  | if convert_to_rgb and image.mode != 'RGB': | 
					
						
						|  | image = image.convert('RGB') | 
					
						
						|  |  | 
					
						
						|  | sides = self.get_image_size() | 
					
						
						|  | if sides[0] != sides[1]: | 
					
						
						|  | raise ValueError('get_image_size() returns non-square size') | 
					
						
						|  | side = sides[0] | 
					
						
						|  | grid = _get_best_grid(image, side) | 
					
						
						|  | partition = _partition(image, grid) | 
					
						
						|  | crops = [image.crop(p) for p in partition] | 
					
						
						|  | if len(crops) > 1: | 
					
						
						|  | crops.insert(0, image) | 
					
						
						|  | pixel_values = torch.cat([_preprocess(crop, side) for crop in crops], dim=0) | 
					
						
						|  | image_placeholders = self.construct_image_placeholders(grid) | 
					
						
						|  | return pixel_values, image_placeholders | 
					
						
						|  |  | 
					
						
						|  | def tokenize(self, logits): | 
					
						
						|  | def st_argmax(y_soft, dim): | 
					
						
						|  | index = y_soft.max(dim, keepdim=True)[1] | 
					
						
						|  | y_hard = torch.zeros_like(y_soft, memory_format=torch.legacy_contiguous_format).scatter_(dim, index, 1.0) | 
					
						
						|  | ret = y_hard - y_soft.detach() + y_soft | 
					
						
						|  | return ret | 
					
						
						|  |  | 
					
						
						|  | if self.config.tokenize_function == 'softmax': | 
					
						
						|  | tokens = softmax(logits, dim=-1) | 
					
						
						|  | elif self.config.tokenize_function == 'gumbel_argmax': | 
					
						
						|  | tokens = gumbel_softmax(logits, tau=self.config.tau, hard=True) | 
					
						
						|  | elif self.config.tokenize_function == 'st_argmax': | 
					
						
						|  | tokens = st_argmax(logits, dim=-1) | 
					
						
						|  | else: | 
					
						
						|  | raise ValueError( | 
					
						
						|  | f'Invalid `max_type`, expected softmax or gumbel_argmax or st_argmax, but got {self.config.tokenize_function}') | 
					
						
						|  | return tokens | 
					
						
						|  |  | 
					
						
						|  | def encode(self, pixel_values): | 
					
						
						|  | output = self.backbone(pixel_values, output_hidden_states=True, return_dict=True) | 
					
						
						|  | features = output.hidden_states[-1] | 
					
						
						|  | if self.config.drop_cls_token: | 
					
						
						|  | features = features[:, 1:, :] | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | if self.config.hidden_stride > 1: | 
					
						
						|  | n, l, d = features.shape | 
					
						
						|  | sqrt_l = int(l ** 0.5) | 
					
						
						|  | assert sqrt_l ** 2 == l, "The token sequence length should be a perfect square." | 
					
						
						|  | features = features.reshape(n, sqrt_l, sqrt_l, d) | 
					
						
						|  | pl = (self.config.hidden_stride - (sqrt_l % self.config.hidden_stride)) % self.config.hidden_stride | 
					
						
						|  | features = pad(features, (0, 0, 0, pl, 0, pl), "constant", 0) | 
					
						
						|  | sqrt_l += pl | 
					
						
						|  | features = features.reshape(n, sqrt_l // self.config.hidden_stride, self.config.hidden_stride, | 
					
						
						|  | sqrt_l // self.config.hidden_stride, self.config.hidden_stride, d) | 
					
						
						|  | features = features.permute(0, 1, 3, 2, 4, 5) | 
					
						
						|  | features = features.flatten(3) | 
					
						
						|  | features = features.reshape( | 
					
						
						|  | n, -1, self.config.hidden_stride * self.config.hidden_stride * d) | 
					
						
						|  |  | 
					
						
						|  | return features | 
					
						
						|  |  | 
					
						
						|  | def forward(self, pixel_values) -> torch.Tensor: | 
					
						
						|  | features = self.encode(pixel_values) | 
					
						
						|  | logits = self.head(features) | 
					
						
						|  | tokens = self.tokenize(logits) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | batch_size, token_len, _ = tokens.shape | 
					
						
						|  | padding_tensor = torch.zeros(size=(batch_size, token_len, len(IMAGE_INDICATOR_IDS)), | 
					
						
						|  | dtype=tokens.dtype, | 
					
						
						|  | device=tokens.device, | 
					
						
						|  | layout=tokens.layout, | 
					
						
						|  | requires_grad=False) | 
					
						
						|  | tokens = torch.cat((tokens, padding_tensor), dim=2) | 
					
						
						|  | return tokens | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | class Aimv2VisualTokenizer(BaseVisualTokenizer): | 
					
						
						|  | config_class = Aimv2VisualTokenizerConfig | 
					
						
						|  | supports_gradient_checkpointing = True | 
					
						
						|  | _no_split_modules = ["AIMv2ViTPreprocessor", "AIMv2Block"] | 
					
						
						|  | _image_processor_kwargs = dict(do_center_crop=False) | 
					
						
						|  |  | 
					
						
						|  | def get_image_size(self): | 
					
						
						|  | height = self.image_processor.crop_size["height"] | 
					
						
						|  | width = self.image_processor.crop_size["width"] | 
					
						
						|  | return height, width | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | AutoModel.register(Aimv2VisualTokenizerConfig, Aimv2VisualTokenizer) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | class VisualEmbedding(torch.nn.Embedding): | 
					
						
						|  | def forward(self, visual_tokens: Tensor) -> Tensor: | 
					
						
						|  | if visual_tokens.dtype in [torch.int8, torch.int16, torch.int32, torch.int64, torch.long]: | 
					
						
						|  | return super().forward(visual_tokens) | 
					
						
						|  | return torch.matmul(visual_tokens, self.weight) | 
					
						
						|  |  | 
					
						
						|  | def reset_parameters(self, mean=0., std=1.) -> None: | 
					
						
						|  | init.normal_(self.weight, mean=mean, std=std) | 
					
						
						|  | self._fill_padding_idx_with_zero() | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | class OvisPreTrainedModel(PreTrainedModel): | 
					
						
						|  | config_class = OvisConfig | 
					
						
						|  | base_model_prefix = "ovis" | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | class Ovis(OvisPreTrainedModel): | 
					
						
						|  |  | 
					
						
						|  | def __init__(self, config: OvisConfig, *inputs, **kwargs): | 
					
						
						|  | super().__init__(config, *inputs, **kwargs) | 
					
						
						|  | attn_kwargs = dict() | 
					
						
						|  | if self.config.llm_attn_implementation: | 
					
						
						|  | if self.config.llm_attn_implementation == "flash_attention_2": | 
					
						
						|  | assert (is_flash_attn_2_available() and | 
					
						
						|  | version.parse(importlib.metadata.version("flash_attn")) >= version.parse("2.6.3")), \ | 
					
						
						|  | "Using `flash_attention_2` requires having `flash_attn>=2.6.3` installed." | 
					
						
						|  | attn_kwargs["attn_implementation"] = self.config.llm_attn_implementation | 
					
						
						|  | self.llm = AutoModelForCausalLM.from_config(self.config.llm_config, **attn_kwargs) | 
					
						
						|  | assert self.config.hidden_size == self.llm.config.hidden_size, "hidden size mismatch" | 
					
						
						|  | self.text_tokenizer = AutoTokenizer.from_pretrained(self.config.name_or_path) | 
					
						
						|  | self.visual_tokenizer = AutoModel.from_config(self.config.visual_tokenizer_config, | 
					
						
						|  | image_processor_name_or_path=self.config.name_or_path) | 
					
						
						|  | self.vte = VisualEmbedding( | 
					
						
						|  | self.config.visual_tokenizer_config.vocab_size, | 
					
						
						|  | self.config.hidden_size, | 
					
						
						|  | device=self.visual_tokenizer.device, | 
					
						
						|  | dtype=self.visual_tokenizer.dtype | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | def _merge_modules(modules_list: tuple): | 
					
						
						|  | merged_modules = [] | 
					
						
						|  | for modules in modules_list: | 
					
						
						|  | merged_modules.extend(modules if modules else []) | 
					
						
						|  | return merged_modules | 
					
						
						|  |  | 
					
						
						|  | self._no_split_modules = _merge_modules((self.llm._no_split_modules, self.visual_tokenizer._no_split_modules)) | 
					
						
						|  | self._skip_keys_device_placement = self.llm._skip_keys_device_placement | 
					
						
						|  | self._keep_in_fp32_modules = _merge_modules( | 
					
						
						|  | (self.llm._keep_in_fp32_modules, self.visual_tokenizer._keep_in_fp32_modules)) | 
					
						
						|  | self.is_parallelizable = all((self.llm.is_parallelizable, self.visual_tokenizer.is_parallelizable)) | 
					
						
						|  | self.supports_gradient_checkpointing = True | 
					
						
						|  | self._supports_flash_attn_2 = True | 
					
						
						|  |  | 
					
						
						|  | def get_text_tokenizer(self): | 
					
						
						|  | return self.text_tokenizer | 
					
						
						|  |  | 
					
						
						|  | def get_visual_tokenizer(self): | 
					
						
						|  | return self.visual_tokenizer | 
					
						
						|  |  | 
					
						
						|  | def tie_weights(self): | 
					
						
						|  | if not self.config.disable_tie_weight: | 
					
						
						|  | self.get_llm().tie_weights() | 
					
						
						|  |  | 
					
						
						|  | def get_llm(self): | 
					
						
						|  | return self.llm | 
					
						
						|  |  | 
					
						
						|  | def get_vte(self): | 
					
						
						|  | return self.vte | 
					
						
						|  |  | 
					
						
						|  | def get_wte(self): | 
					
						
						|  | return self.llm.get_input_embeddings() | 
					
						
						|  |  | 
					
						
						|  | def get_conversation_formatter(self) -> ConversationFormatter: | 
					
						
						|  | if getattr(self, 'conversation_formatter', None) is None: | 
					
						
						|  | self.conversation_formatter = getattr(import_module(".configuration_ovis", __package__), | 
					
						
						|  | self.config.conversation_formatter_class)(self.text_tokenizer) | 
					
						
						|  | return self.conversation_formatter | 
					
						
						|  |  | 
					
						
						|  | def forward( | 
					
						
						|  | self, | 
					
						
						|  | input_ids: torch.Tensor, | 
					
						
						|  | attention_mask: torch.Tensor, | 
					
						
						|  | labels: Optional[torch.Tensor], | 
					
						
						|  | pixel_values: List[Optional[torch.Tensor]], | 
					
						
						|  | **kwargs | 
					
						
						|  | ): | 
					
						
						|  |  | 
					
						
						|  | _, inputs_embeds, labels, attention_mask = self.merge_multimodal( | 
					
						
						|  | text_input_ids=input_ids, | 
					
						
						|  | text_attention_masks=attention_mask, | 
					
						
						|  | text_labels=labels, | 
					
						
						|  | pixel_values=pixel_values | 
					
						
						|  | ) | 
					
						
						|  | return self.llm(inputs_embeds=inputs_embeds, labels=labels, attention_mask=attention_mask, **kwargs) | 
					
						
						|  |  | 
					
						
						|  | def merge_multimodal( | 
					
						
						|  | self, | 
					
						
						|  | text_input_ids: torch.Tensor, | 
					
						
						|  | text_attention_masks: torch.Tensor, | 
					
						
						|  | text_labels: Optional[torch.Tensor], | 
					
						
						|  | pixel_values: List[Optional[torch.Tensor]], | 
					
						
						|  | left_padding: bool = False | 
					
						
						|  | ): | 
					
						
						|  | input_device = text_input_ids.device | 
					
						
						|  | visual_vocab_szie = self.get_visual_tokenizer().config.vocab_size | 
					
						
						|  | visual_indicator_embeds = self.get_vte()( | 
					
						
						|  | torch.tensor( | 
					
						
						|  | list(range(visual_vocab_szie - 5, visual_vocab_szie)), | 
					
						
						|  | dtype=torch.long, | 
					
						
						|  | device=self.get_visual_tokenizer().device | 
					
						
						|  | ) | 
					
						
						|  | ).to(device=input_device) | 
					
						
						|  |  | 
					
						
						|  | if self.training: | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | num_images = [x.shape[0] for x in pixel_values] | 
					
						
						|  | visual_tokens = self.visual_tokenizer(torch.cat([x for x in pixel_values], dim=0)) | 
					
						
						|  | visual_embeds = torch.split(self.get_vte()(visual_tokens).to(dtype=self.dtype, device=input_device), | 
					
						
						|  | split_size_or_sections=num_images, dim=0) | 
					
						
						|  | visual_input_ids = torch.split(torch.argmax(visual_tokens, dim=-1).to(device=input_device), | 
					
						
						|  | split_size_or_sections=num_images, dim=0) | 
					
						
						|  | visual_labels = [torch.full(x.shape, IGNORE_ID, dtype=torch.long, device=input_device) for x in | 
					
						
						|  | visual_input_ids] | 
					
						
						|  | else: | 
					
						
						|  |  | 
					
						
						|  | num_images = [x.shape[0] if x is not None else 0 for x in pixel_values] | 
					
						
						|  | if sum(num_images) > 0: | 
					
						
						|  | visual_tokens = self.visual_tokenizer(torch.cat([x for x in pixel_values if x is not None], dim=0)) | 
					
						
						|  | visual_embeds = torch.split(self.get_vte()(visual_tokens).to(dtype=self.dtype, device=input_device), | 
					
						
						|  | split_size_or_sections=num_images, dim=0) | 
					
						
						|  | visual_input_ids = torch.split(torch.argmax(visual_tokens, dim=-1).to(device=input_device), | 
					
						
						|  | split_size_or_sections=num_images, dim=0) | 
					
						
						|  | visual_labels = [torch.full(x.shape, IGNORE_ID, dtype=torch.long, device=input_device) for x in | 
					
						
						|  | visual_input_ids] | 
					
						
						|  | else: | 
					
						
						|  |  | 
					
						
						|  | visual_embeds = [None] * len(num_images) | 
					
						
						|  | visual_input_ids = [None] * len(num_images) | 
					
						
						|  | visual_labels = [None] * len(num_images) | 
					
						
						|  |  | 
					
						
						|  | if text_labels is None: | 
					
						
						|  | text_labels = torch.full(text_input_ids.shape, IGNORE_ID, dtype=torch.long, device=input_device) | 
					
						
						|  |  | 
					
						
						|  | input_embeds = [] | 
					
						
						|  | attention_masks = [] | 
					
						
						|  | labels = [] | 
					
						
						|  | for text_input_id, text_label, text_attention_mask, visual_embed, visual_input_id, visual_label in zip( | 
					
						
						|  | text_input_ids, text_labels, text_attention_masks, visual_embeds, visual_input_ids, visual_labels | 
					
						
						|  | ): | 
					
						
						|  | placeholder_token_mask = torch.lt(text_input_id, 0) | 
					
						
						|  | text_embed = self.get_wte()(torch.masked_fill(text_input_id, placeholder_token_mask, 0)) | 
					
						
						|  | for i, indicator_id in enumerate(IMAGE_INDICATOR_IDS): | 
					
						
						|  | text_embed[text_input_id == indicator_id] = visual_indicator_embeds[i] | 
					
						
						|  | image_atom_positions = torch.where(torch.eq(text_input_id, IMAGE_ATOM_ID))[0].tolist() | 
					
						
						|  | if len(image_atom_positions) > 0: | 
					
						
						|  | input_embed_parts = [] | 
					
						
						|  | attention_mask_parts = [] | 
					
						
						|  | label_parts = [] | 
					
						
						|  | prev_image_atom_position = -1 | 
					
						
						|  | for index, image_atom_position in enumerate(image_atom_positions): | 
					
						
						|  | input_embed_parts.append( | 
					
						
						|  | text_embed[prev_image_atom_position + 1:image_atom_position, :]) | 
					
						
						|  | label_parts.append( | 
					
						
						|  | text_label[prev_image_atom_position + 1:image_atom_position]) | 
					
						
						|  | attention_mask_parts.append( | 
					
						
						|  | text_attention_mask[prev_image_atom_position + 1:image_atom_position]) | 
					
						
						|  | input_embed_parts.append(visual_embed[index]) | 
					
						
						|  | attention_mask_parts.append( | 
					
						
						|  | torch.ones_like(visual_label[index], dtype=torch.bool)) | 
					
						
						|  | label_parts.append(visual_label[index]) | 
					
						
						|  | prev_image_atom_position = image_atom_position | 
					
						
						|  | if prev_image_atom_position + 1 < text_input_id.shape[0]: | 
					
						
						|  | input_embed_parts.append( | 
					
						
						|  | text_embed[prev_image_atom_position + 1:, :]) | 
					
						
						|  | attention_mask_parts.append( | 
					
						
						|  | text_attention_mask[prev_image_atom_position + 1:]) | 
					
						
						|  | label_parts.append( | 
					
						
						|  | text_label[prev_image_atom_position + 1:]) | 
					
						
						|  | input_embed = torch.cat(input_embed_parts, dim=0) | 
					
						
						|  | attention_mask = torch.cat(attention_mask_parts, dim=0) | 
					
						
						|  | label = torch.cat(label_parts, dim=0) | 
					
						
						|  | else: | 
					
						
						|  | input_embed = text_embed | 
					
						
						|  | attention_mask = text_attention_mask | 
					
						
						|  | label = text_label | 
					
						
						|  | if self.training: | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | input_embed += torch.sum(visual_embed * 0.0) + torch.sum(visual_indicator_embeds * 0.0) | 
					
						
						|  | input_embeds.append(input_embed) | 
					
						
						|  | attention_masks.append(attention_mask) | 
					
						
						|  | labels.append(label) | 
					
						
						|  |  | 
					
						
						|  | if self.training: | 
					
						
						|  | padding_size = max(0, self.config.multimodal_max_length - len(input_embeds[0])) | 
					
						
						|  | input_embeds[0] = torch.nn.ConstantPad2d((0, 0, 0, padding_size), 0.0)(input_embeds[0]) | 
					
						
						|  | attention_masks[0] = torch.nn.ConstantPad1d((0, padding_size), False)(attention_masks[0]) | 
					
						
						|  | labels[0] = torch.nn.ConstantPad1d((0, padding_size), IGNORE_ID)(labels[0]) | 
					
						
						|  | batch_input_embeds = self.pad_truncate_sequence(input_embeds, batch_first=True, padding_value=0.0, left_padding=left_padding) | 
					
						
						|  | batch_attention_mask = self.pad_truncate_sequence(attention_masks, batch_first=True, padding_value=False, left_padding=left_padding) | 
					
						
						|  | batch_labels = self.pad_truncate_sequence(labels, batch_first=True, padding_value=IGNORE_ID, left_padding=left_padding) | 
					
						
						|  |  | 
					
						
						|  | return visual_input_ids, batch_input_embeds, batch_labels, batch_attention_mask | 
					
						
						|  |  | 
					
						
						|  | def pad_truncate_sequence(self, sequences: List[torch.Tensor], batch_first: bool = True, padding_value: float = 0.0, left_padding: bool = False) -> torch.Tensor: | 
					
						
						|  | if not left_padding: | 
					
						
						|  | pad_sequence = torch.nn.utils.rnn.pad_sequence(sequences, batch_first=batch_first, padding_value=padding_value) | 
					
						
						|  | return pad_sequence[:,:self.config.multimodal_max_length] | 
					
						
						|  | else: | 
					
						
						|  | pad_sequence = torch.nn.utils.rnn.pad_sequence([i.flip(dims=[0]) for i in sequences],batch_first=True, padding_value=padding_value).flip(dims=[1]) | 
					
						
						|  | return pad_sequence[:,-self.config.multimodal_max_length:] | 
					
						
						|  |  | 
					
						
						|  | def preprocess_inputs( | 
					
						
						|  | self, | 
					
						
						|  | text_or_conversations: Union[List[Dict], str], | 
					
						
						|  | images: Optional[List[PIL.Image.Image]], | 
					
						
						|  | max_partition=9, | 
					
						
						|  | generation_preface='', | 
					
						
						|  | return_labels=False, | 
					
						
						|  | propagate_exception=True, | 
					
						
						|  | frame_selector=None, | 
					
						
						|  | frame_selector_kwargs=None | 
					
						
						|  | ): | 
					
						
						|  |  | 
					
						
						|  | if isinstance(text_or_conversations, str): | 
					
						
						|  | conversations = [{ | 
					
						
						|  | "from": "human", | 
					
						
						|  | "value": text_or_conversations | 
					
						
						|  | }] | 
					
						
						|  | elif isinstance(text_or_conversations, list): | 
					
						
						|  | conversations = text_or_conversations | 
					
						
						|  | else: | 
					
						
						|  | raise ValueError(f'Invalid type of `text_or_conversations`, expected `List[Dict]` or `str`,' | 
					
						
						|  | f' but got {type(text_or_conversations)}') | 
					
						
						|  |  | 
					
						
						|  | if frame_selector is not None: | 
					
						
						|  | frame_selector_kwargs = frame_selector_kwargs or {} | 
					
						
						|  | conversations, images = frame_selector(conversations=conversations, frames=images, **frame_selector_kwargs) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | prompt, raw_input_ids, raw_labels = self.get_conversation_formatter().format( | 
					
						
						|  | conversations, generation_preface=generation_preface) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | input_ids = [] | 
					
						
						|  | labels = [] | 
					
						
						|  | pixel_values = [] | 
					
						
						|  | invalidate_label = False | 
					
						
						|  | image_token_indices = [i for i, v in enumerate(raw_input_ids) if v == IMAGE_TOKEN_ID] | 
					
						
						|  | last_image_token_index = -1 | 
					
						
						|  | for i in range(len(image_token_indices)): | 
					
						
						|  | head = 0 if i == 0 else image_token_indices[i - 1] + 1 | 
					
						
						|  | tail = image_token_indices[i] | 
					
						
						|  | last_image_token_index = tail | 
					
						
						|  | input_ids.extend(raw_input_ids[head:tail]) | 
					
						
						|  | labels.extend(raw_labels[head:tail]) | 
					
						
						|  | try: | 
					
						
						|  | image = images[i] | 
					
						
						|  | raw_pixel_values, image_placeholders = self.visual_tokenizer.preprocess_image( | 
					
						
						|  | image, max_partition=max_partition) | 
					
						
						|  | except Exception as e: | 
					
						
						|  | if propagate_exception: | 
					
						
						|  | raise e | 
					
						
						|  | logging.exception(e) | 
					
						
						|  | invalidate_label = True | 
					
						
						|  | raw_pixel_values, image_placeholders = self.visual_tokenizer.mock_input() | 
					
						
						|  | input_ids.extend(image_placeholders) | 
					
						
						|  | labels.extend([IGNORE_ID] * len(image_placeholders)) | 
					
						
						|  | pixel_values.append(raw_pixel_values) | 
					
						
						|  | input_ids.extend(raw_input_ids[last_image_token_index + 1:]) | 
					
						
						|  | labels.extend(raw_labels[last_image_token_index + 1:]) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | input_ids = torch.tensor(input_ids, dtype=torch.long) | 
					
						
						|  | labels = torch.tensor([IGNORE_ID] * len(labels) if invalidate_label else labels, dtype=torch.long) | 
					
						
						|  | pixel_values = torch.cat(pixel_values, dim=0) if len(pixel_values) > 0 else None | 
					
						
						|  |  | 
					
						
						|  | if return_labels: | 
					
						
						|  | return prompt, input_ids, pixel_values, labels | 
					
						
						|  | else: | 
					
						
						|  | return prompt, input_ids, pixel_values | 
					
						
						|  |  | 
					
						
						|  | def save_pretrained( | 
					
						
						|  | self, | 
					
						
						|  | save_directory: Union[str, os.PathLike], | 
					
						
						|  | is_main_process: bool = True, | 
					
						
						|  | state_dict: Optional[dict] = None, | 
					
						
						|  | save_function: Callable = torch.save, | 
					
						
						|  | push_to_hub: bool = False, | 
					
						
						|  | max_shard_size: Union[int, str] = "5GB", | 
					
						
						|  | safe_serialization: bool = True, | 
					
						
						|  | variant: Optional[str] = None, | 
					
						
						|  | token: Optional[Union[str, bool]] = None, | 
					
						
						|  | save_peft_format: bool = True, | 
					
						
						|  | **kwargs | 
					
						
						|  | ): | 
					
						
						|  | super().save_pretrained(save_directory, | 
					
						
						|  | is_main_process=is_main_process, | 
					
						
						|  | state_dict=state_dict, | 
					
						
						|  | save_function=save_function, | 
					
						
						|  | safe_serialization=safe_serialization) | 
					
						
						|  | self.get_text_tokenizer().save_pretrained(save_directory) | 
					
						
						|  | self.get_visual_tokenizer().get_image_processor().save_pretrained(save_directory) | 
					
						
						|  |  | 
					
						
						|  | def generate( | 
					
						
						|  | self, | 
					
						
						|  | inputs: Optional[torch.Tensor] = None, | 
					
						
						|  | **kwargs | 
					
						
						|  | ) -> Union[GenerateOutput, torch.LongTensor]: | 
					
						
						|  | _, inputs_embeds, labels, attention_mask = self.merge_multimodal( | 
					
						
						|  | text_input_ids=inputs, | 
					
						
						|  | text_attention_masks=kwargs.pop('attention_mask'), | 
					
						
						|  | text_labels=None, | 
					
						
						|  | pixel_values=kwargs.pop('pixel_values'), | 
					
						
						|  | left_padding=True | 
					
						
						|  | ) | 
					
						
						|  | inputs_embeds = inputs_embeds.detach() | 
					
						
						|  | torch.cuda.empty_cache() | 
					
						
						|  |  | 
					
						
						|  | return self.llm.generate(inputs=None, inputs_embeds=inputs_embeds, attention_mask=attention_mask, **kwargs) | 
					
						
						|  |  |