Spaces:
Running
Running
| """ | |
| This module defines a self-contained StarVector model with support for remote code loading. | |
| """ | |
| import os | |
| import torch | |
| import torch.nn as nn | |
| from transformers import PreTrainedModel, PretrainedConfig | |
| from typing import Optional, Union, List | |
| from abc import ABC, abstractmethod | |
| # Import components - these will be included in the HF repo | |
| from .starvector.image_encoder import ImageEncoder | |
| from .starvector.adapter import Adapter | |
| # === Model Configuration === | |
class StarVectorConfig(PretrainedConfig):
    """Configuration container for the StarVector model.

    Every constructor argument is stored verbatim as an attribute of the
    same name. Any additional keyword arguments are handed to
    ``PretrainedConfig.__init__`` before the StarVector-specific fields are
    assigned.

    Within this module, ``starcoder_model_name`` is inspected to choose the
    backbone variant, ``image_encoder_type``, ``adapter_norm`` and
    ``hidden_size`` are read when the adapter is built, ``max_length_train``
    feeds the model's maximum sequence length, and ``torch_dtype`` is the
    default model precision. The remaining fields are only stored here.
    """

    model_type = "starvector"

    def __init__(
        self,
        starcoder_model_name: str = "bigcode/starcoderbase-1b",
        image_encoder_type: str = "clip",
        adapter_norm: str = "layer_norm",
        image_size: int = 224,
        max_length: int = 8192,
        max_length_train: int = 8192,
        use_flash_attn: bool = True,
        use_cache: bool = True,
        num_attention_heads: int = 16,
        num_hidden_layers: int = 24,
        vocab_size: int = 49152,
        hidden_size: int = 2048,
        num_kv_heads: int = 4,
        torch_dtype: str = "bfloat16",
        **kwargs,
    ):
        # The parent initializer runs first so that the fields below are
        # written on top of whatever defaults it established.
        super().__init__(**kwargs)

        # Insertion order mirrors the constructor signature, so attributes
        # are assigned in the same sequence as the parameters are declared.
        starvector_fields = {
            "starcoder_model_name": starcoder_model_name,
            "image_encoder_type": image_encoder_type,
            "adapter_norm": adapter_norm,
            "image_size": image_size,
            "max_length": max_length,
            "max_length_train": max_length_train,
            "use_flash_attn": use_flash_attn,
            "use_cache": use_cache,
            "num_attention_heads": num_attention_heads,
            "num_hidden_layers": num_hidden_layers,
            "vocab_size": vocab_size,
            "hidden_size": hidden_size,
            "num_kv_heads": num_kv_heads,
            "torch_dtype": torch_dtype,
        }
        for field_name, field_value in starvector_fields.items():
            setattr(self, field_name, field_value)
| # === Base Model Classes === | |
class StarVectorBase(nn.Module, ABC):
    """Common StarVector logic shared by the StarCoder-backed variants.

    The class combines an SVG language-model wrapper (created by the subclass
    through ``_get_svg_transformer``) with, when the task is ``'im2svg'``, an
    ``ImageEncoder`` and an ``Adapter`` projection. Subclasses must implement
    ``_get_svg_transformer``, ``_get_embeddings`` and ``_get_svg_text``.
    """

    def __init__(self, config, **kwargs):
        """Build the SVG transformer and, for ``'im2svg'``, the image stack.

        Args:
            config: Configuration object. This class reads ``torch_dtype``,
                ``max_length_train``, ``image_encoder_type``, ``hidden_size``
                and ``adapter_norm`` from it.
            **kwargs: Optional settings read here: ``task`` (default
                ``'im2svg'``), ``model_precision`` (default
                ``config.torch_dtype``), ``train_image_encoder`` and
                ``train_LLM`` (both default ``False``) and ``dropout``
                (default ``0.1``). The full mapping is also forwarded to
                ``_get_svg_transformer``, ``ImageEncoder`` and
                ``get_adapter``.

        Raises:
            ValueError: If the model precision cannot be mapped to a
                ``torch.dtype`` while building the image projection.
        """
        super().__init__()
        self.task = kwargs.get('task', 'im2svg')
        self.model_precision = kwargs.get('model_precision', config.torch_dtype)

        # Instantiate the SVG transformer using the abstract method.
        self.svg_transformer = self._get_svg_transformer(config, **kwargs)

        if self.use_image_encoder():
            self.image_encoder = ImageEncoder(config, **kwargs)
            adapter = self.get_adapter(config, **kwargs)
            # The configured precision may be a plain string such as
            # "bfloat16"; ``Module.to`` needs an actual ``torch.dtype``.
            self.image_projection = adapter.to(
                dtype=self._resolve_dtype(self.model_precision)
            )
        else:
            self.query_length = 0

        # NOTE(review): the constant 4 is carried over unchanged; presumably
        # it reserves room for special tokens -- confirm.
        self.max_length = config.max_length_train - getattr(self, "query_length", 0) - 4

        self.train_image_encoder = kwargs.get('train_image_encoder', False)
        self.train_LLM = kwargs.get('train_LLM', False)
        self._freeze_parameters(self.train_image_encoder, self.train_LLM)

    @staticmethod
    def _resolve_dtype(precision):
        """Map a precision given as a string or dtype to a ``torch.dtype``.

        Args:
            precision: ``None``, a ``torch.dtype``, or a dtype name such as
                ``"bfloat16"`` or ``"torch.bfloat16"``.

        Returns:
            The matching ``torch.dtype``; ``None`` is passed through.

        Raises:
            ValueError: If ``precision`` does not name a torch dtype.
        """
        if precision is None or isinstance(precision, torch.dtype):
            return precision
        if isinstance(precision, str):
            # Accept both "bfloat16" and the qualified "torch.bfloat16".
            candidate = getattr(torch, precision.rsplit(".", 1)[-1], None)
            if isinstance(candidate, torch.dtype):
                return candidate
        raise ValueError(f"Unsupported model precision: {precision!r}")

    @abstractmethod
    def _get_svg_transformer(self, config, **kwargs):
        """Get SVG transformer model - implementation differs between versions"""

    def _freeze_parameters(self, train_image_encoder, train_LLM):
        """Set ``requires_grad`` on the image stack and the language model.

        Args:
            train_image_encoder: Value applied to every parameter of the
                image encoder and of the image projection (only when the
                image encoder is in use).
            train_LLM: Value applied to every parameter of
                ``self.svg_transformer.transformer``.
        """
        if self.use_image_encoder():
            for param in self.image_encoder.parameters():
                param.requires_grad = train_image_encoder
            for param in self.image_projection.parameters():
                param.requires_grad = train_image_encoder
        for param in self.svg_transformer.transformer.parameters():
            param.requires_grad = train_LLM

    def use_image_encoder(self):
        """Return ``True`` when the configured task is ``'im2svg'``."""
        return self.task == 'im2svg'

    def get_adapter(self, config, **kwargs):
        """Create the ``Adapter`` that projects image features to the LLM width.

        As a side effect this sets ``self.query_length`` according to
        ``config.image_encoder_type``.

        Args:
            config: Configuration providing ``image_encoder_type``,
                ``hidden_size`` and ``adapter_norm``.
            **kwargs: ``dropout`` (default ``0.1``) is used as the adapter's
                dropout probability.

        Returns:
            The constructed ``Adapter`` instance.
        """
        # Determine hidden size and query length based on the image encoder type.
        if config.image_encoder_type == 'clip':
            hidden_size = self.image_encoder.num_features
            self.query_length = 257
        elif config.image_encoder_type == 'vqgan':
            hidden_size = 256
            self.query_length = 196
        else:
            hidden_size = 256  # default fallback
            self.query_length = 200

        llm_hidden_size = config.hidden_size  # assuming the transformer hidden size
        return Adapter(
            hidden_size,
            llm_hidden_size,
            adapter_norm=config.adapter_norm,
            query_length=self.query_length,
            dropout_prob=kwargs.get('dropout', 0.1),
        )

    def forward(self, batch):
        """Run the placeholder forward pass.

        Args:
            batch: Mapping with an ``"image"`` entry whose first dimension is
                the batch size.

        Returns:
            Embeddings of randomly drawn token ids with
            ``(batch_size, self.max_length)`` ids looked up in the
            transformer's ``wte`` embedding. This is a dummy output.
        """
        # Simplified forward pass where we assume batch has an "image" key.
        image = batch["image"]
        if self.use_image_encoder():
            # The projected image features are still computed so the image
            # path is exercised, but this placeholder does not consume them.
            self.image_projection(self.image_encoder(image))

        # NOTE(review): the subclasses' ``_get_embeddings`` reach the
        # embedding layer through different attribute paths; confirm that
        # ``transformer.wte`` exists on the wrapped model.
        token_embedding = self.svg_transformer.transformer.wte
        # For demo purposes, we generate dummy input ids (replace with your
        # logic). They are created on the embedding's own device so the
        # lookup does not fail when the model is not on the CPU.
        dummy_ids = torch.randint(
            0,
            token_embedding.num_embeddings,
            (image.size(0), self.max_length),
            device=token_embedding.weight.device,
        )
        return token_embedding(dummy_ids)  # Dummy return

    def generate_im2svg(self, batch, **kwargs):
        """Generate SVG text conditioned on the images in ``batch``.

        Args:
            batch: Mapping with an ``"image"`` entry.
            **kwargs: ``max_length`` (default ``30``) is forwarded to the
                transformer's ``generate`` call.

        Returns:
            The result of the tokenizer's ``batch_decode`` on the generated
            ids, with special tokens skipped.
        """
        # Prepare generation inputs (dummy implementation)
        image = batch["image"]
        if self.use_image_encoder():
            embedded_image = self.image_encoder(image)
            conditioning_embeds = self.image_projection(embedded_image)
        else:
            # NOTE(review): placeholder conditioning with a trailing
            # dimension of 1; confirm the shape the transformer expects.
            conditioning_embeds = torch.zeros((image.size(0), 10, 1), device=image.device)

        generation_output = self.svg_transformer.transformer.generate(
            inputs_embeds=conditioning_embeds,
            max_length=kwargs.get('max_length', 30),
        )
        raw_svg = self.svg_transformer.tokenizer.batch_decode(
            generation_output, skip_special_tokens=True
        )
        return raw_svg

    @abstractmethod
    def _get_embeddings(self, input_ids):
        """Get embeddings from input ids - implementation differs between v1 and v2"""

    @abstractmethod
    def _get_svg_text(self, svg_list):
        """Get SVG text with appropriate end tokens - implementation differs between v1 and v2"""
| # V1 implementation: Delegates transformer creation to the external LLM file. | |
class StarVectorStarCoder(StarVectorBase):
    """V1 StarVector variant built on the external ``starcoder`` LLM wrapper.

    Construction is inherited unchanged from ``StarVectorBase``; this class
    only supplies the V1-specific hooks.
    """

    def _get_svg_transformer(self, config, **kwargs):
        """Instantiate the V1 ``StarCoderModel`` from ``config`` and ``kwargs``.

        The import happens at call time, so the external ``starvector``
        package is only required once a V1 model is actually built.
        """
        # V1: use StarCoderModel from external file
        from starvector.model.llm.starcoder import StarCoderModel as v1_model_cls
        return v1_model_cls(config, **kwargs)

    def _get_embeddings(self, input_ids):
        """Look up ``input_ids`` in the V1 model's ``wte`` embedding layer."""
        # This follows the implementation in starvector/model/models/starvector_v1.py.
        embedding_layer = self.svg_transformer.transformer.transformer.wte
        return embedding_layer(input_ids)

    def _get_svg_text(self, svg_list):
        """Return each SVG string with the tokenizer's EOS token appended."""
        eos_token = self.svg_transformer.tokenizer.eos_token
        return [svg_text + eos_token for svg_text in svg_list]
| # V2 implementation: Delegates transformer creation to the external V2 LLM file. | |
class StarVectorStarCoder2(StarVectorBase):
    """V2 StarVector variant built on the external ``starcoder2`` LLM wrapper.

    Construction is inherited unchanged from ``StarVectorBase``; this class
    supplies the V2-specific hooks and generation keyword arguments.
    """

    def _get_svg_transformer(self, config, **kwargs):
        """Instantiate the V2 ``StarCoderModel`` from ``config`` and ``kwargs``.

        The import happens at call time, so the external ``starvector``
        package is only required once a V2 model is actually built.
        """
        # V2: use external StarCoderModel from starcoder2.py
        from starvector.model.llm.starcoder2 import StarCoderModel as v2_model_cls
        return v2_model_cls(config, **kwargs)

    def _get_embeddings(self, input_ids):
        """Look up ``input_ids`` in the V2 model's ``embed_tokens`` layer."""
        embedding_layer = self.svg_transformer.transformer.model.embed_tokens
        return embedding_layer(input_ids)

    def _get_svg_text(self, svg_list):
        """Return each SVG string followed by the SVG end token and EOS token."""
        wrapper = self.svg_transformer
        closing = wrapper.svg_end_token + wrapper.tokenizer.eos_token
        return [svg_text + closing for svg_text in svg_list]

    def _get_im2svg_specific_kwargs(self, kwargs):
        """Return generation kwargs for im2svg: stop at the SVG end token id.

        The ``kwargs`` argument is accepted but not read.
        """
        stop_id = self.svg_transformer.svg_end_token_id
        return {'eos_token_id': stop_id}

    def _get_text2svg_specific_kwargs(self, kwargs):
        """Return generation kwargs for text2svg: stop at the tokenizer EOS id.

        The ``kwargs`` argument is accepted but not read.
        """
        stop_id = self.svg_transformer.tokenizer.eos_token_id
        return {'eos_token_id': stop_id}
| # === Main Model Class for Hugging Face === | |
class StarVectorForCausalLM(PreTrainedModel):
    """Hugging Face entry point wrapping a StarVector backbone.

    The backbone is stored as ``self.model`` and every public method simply
    delegates to it.
    """

    config_class = StarVectorConfig
    _no_split_modules = []

    def __init__(self, config, **kwargs):
        """Create the wrapper and the backbone selected by the model name.

        Args:
            config: A ``StarVectorConfig``; ``starcoder_model_name`` decides
                which backbone class is used.
            **kwargs: Forwarded unchanged to the backbone constructor.
        """
        super().__init__(config)
        # Choose V2 if the model name indicates starcoder2; otherwise use V1.
        wants_v2 = "starcoder2" in config.starcoder_model_name.lower()
        backbone_cls = StarVectorStarCoder2 if wants_v2 else StarVectorStarCoder
        self.model = backbone_cls(config=config, **kwargs)

    def forward(self, batch):
        """Delegate the forward pass to the wrapped backbone."""
        backbone = self.model
        return backbone(batch)

    def generate_im2svg(self, batch, **kwargs):
        """Delegate image-to-SVG generation to the wrapped backbone."""
        backbone = self.model
        return backbone.generate_im2svg(batch, **kwargs)

    def generate_im2text(self, batch, **kwargs):
        """Delegate image-to-text generation to the wrapped backbone.

        NOTE(review): ``StarVectorBase`` as defined in this file does not
        declare ``generate_im2text``; confirm the backbone provides it.
        """
        backbone = self.model
        return backbone.generate_im2text(batch, **kwargs)

    def process_images(self, images):
        """Delegate image preprocessing to the backbone's image encoder."""
        encoder = self.model.image_encoder
        return encoder.process_images(images)
# === Registration with the transformers auto classes ===
# Both calls run at import time. The config is registered without an explicit
# auto-class argument; the model is registered under "AutoModelForCausalLM".
StarVectorConfig.register_for_auto_class()
StarVectorForCausalLM.register_for_auto_class("AutoModelForCausalLM")