import logging
import os
from pathlib import Path
from typing import List, Optional, Tuple

import torch
from comet_ml import API
from langchain.llms import HuggingFacePipeline
from peft import LoraConfig, PeftConfig, PeftModel
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    StoppingCriteria,
    StoppingCriteriaList,
    TextIteratorStreamer,
    pipeline,
)

from financial_bot import constants
from financial_bot.utils import MockedPipeline

logger = logging.getLogger(__name__)


def download_from_model_registry(
    model_id: str, cache_dir: Optional[Path] = None
) -> Path:
    """
    Downloads a model from the Comet ML model registry.

    Args:
        model_id (str): The ID of the model to download, in the format "workspace/model_name:version".
        cache_dir (Optional[Path]): The directory to cache the downloaded model in. Defaults to the value of
            `constants.CACHE_DIR`.

    Returns:
        Path: The path to the downloaded model directory.
    """

    if cache_dir is None:
        cache_dir = constants.CACHE_DIR
    output_folder = cache_dir / "models" / model_id

    already_downloaded = output_folder.exists()
    if not already_downloaded:
        workspace, model_id = model_id.split("/")
        model_name, version = model_id.split(":")

        api = API()
        model = api.get_model(workspace=workspace, model_name=model_name)
        model.download(version=version, output_folder=output_folder, expand=True)
    else:
        logger.info(f"Model {model_id=} already downloaded to: {output_folder}")

    subdirs = [d for d in output_folder.iterdir() if d.is_dir()]
    if len(subdirs) == 1:
        model_dir = subdirs[0]
    else:
        raise RuntimeError(
            f"There should be only one directory inside the model folder. \
                Check the downloaded model at: {output_folder}"
        )

    logger.info(f"Model {model_id=} downloaded from the registry to: {model_dir}")

    return model_dir
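
# Example usage (a minimal sketch; the workspace, model name, and version below are
# placeholders, not real registry entries, and COMET_API_KEY must be set in the env):
#
#   model_dir = download_from_model_registry(
#       model_id="my-workspace/financial-assistant-lora:1.0.0",
#       cache_dir=Path("./model_cache"),
#   )
#   print(model_dir)  # -> .../model_cache/models/my-workspace/financial-assistant-lora:1.0.0/<checkpoint>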


class StopOnTokens(StoppingCriteria):
    """
    A stopping criterion that stops generation when a specific token is generated.

    Args:
        stop_ids (List[int]): A list of token ids that will trigger the stopping criterion.
    """

    def __init__(self, stop_ids: List[int]):
        super().__init__()

        self._stop_ids = stop_ids

    def __call__(
        self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs
    ) -> bool:
        """
        Check if the last generated token is in the stop_ids list.

        Args:
            input_ids (torch.LongTensor): The input token ids.
            scores (torch.FloatTensor): The scores of the generated tokens.

        Returns:
            bool: True if the last generated token is in the stop_ids list, False otherwise.
        """

        for stop_id in self._stop_ids:
            if input_ids[0][-1] == stop_id:
                return True

        return False
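
# Example usage (a minimal sketch; assumes `model` and `tokenizer` have already been
# loaded, e.g. via `build_qlora_model` below):
#
#   stop_criteria = StoppingCriteriaList([StopOnTokens(stop_ids=[tokenizer.eos_token_id])])
#   inputs = tokenizer("What is a hedge fund?", return_tensors="pt").to(model.device)
#   output_ids = model.generate(**inputs, max_new_tokens=64, stopping_criteria=stop_criteria)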


def build_huggingface_pipeline(
    llm_model_id: str,
    llm_lora_model_id: str,
    max_new_tokens: int = constants.LLM_INFERNECE_MAX_NEW_TOKENS,
    temperature: float = constants.LLM_INFERENCE_TEMPERATURE,
    gradient_checkpointing: bool = False,
    use_streamer: bool = False,
    cache_dir: Optional[Path] = None,
    debug: bool = False,
) -> Tuple[HuggingFacePipeline, Optional[TextIteratorStreamer]]:
    """
    Builds a HuggingFace pipeline for text generation using a custom LLM + finetuned checkpoint.

    Args:
        llm_model_id (str): The ID or path of the LLM model.
        llm_lora_model_id (str): The ID or path of the LLM LoRA model.
        max_new_tokens (int, optional): The maximum number of new tokens to generate. Defaults to
            `constants.LLM_INFERNECE_MAX_NEW_TOKENS`.
        temperature (float, optional): The temperature to use for sampling. Defaults to
            `constants.LLM_INFERENCE_TEMPERATURE`.
        gradient_checkpointing (bool, optional): Whether to use gradient checkpointing. Defaults to False.
        use_streamer (bool, optional): Whether to use a text iterator streamer. Defaults to False.
        cache_dir (Optional[Path], optional): The directory to use for caching. Defaults to None.
        debug (bool, optional): Whether to use a mocked pipeline for debugging. Defaults to False.

    Returns:
        Tuple[HuggingFacePipeline, Optional[TextIteratorStreamer]]: A tuple containing the HuggingFace pipeline
            and the text iterator streamer (if used).
    """

    if debug is True:
        return (
            HuggingFacePipeline(
                pipeline=MockedPipeline(f=lambda _: "You are doing great!")
            ),
            None,
        )

    model, tokenizer, _ = build_qlora_model(
        pretrained_model_name_or_path=llm_model_id,
        peft_pretrained_model_name_or_path=llm_lora_model_id,
        gradient_checkpointing=gradient_checkpointing,
        cache_dir=cache_dir,
    )
    model.eval()

    if use_streamer:
        streamer = TextIteratorStreamer(
            tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True
        )
        stop_on_tokens = StopOnTokens(stop_ids=[tokenizer.eos_token_id])
        stopping_criteria = StoppingCriteriaList([stop_on_tokens])
    else:
        streamer = None
        stopping_criteria = StoppingCriteriaList([])

    pipe = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        max_new_tokens=max_new_tokens,
        temperature=temperature,
        streamer=streamer,
        stopping_criteria=stopping_criteria,
    )
    hf = HuggingFacePipeline(pipeline=pipe)

    return hf, streamer
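
# Example usage (a minimal sketch; the LoRA registry ID is a placeholder and the
# `constants.LLM_*` defaults are assumed to be defined in `financial_bot.constants`):
#
#   llm, streamer = build_huggingface_pipeline(
#       llm_model_id="tiiuae/falcon-7b-instruct",
#       llm_lora_model_id="my-workspace/financial-assistant-lora:1.0.0",
#       use_streamer=False,
#   )
#   print(llm("What is a bull market?"))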


def build_qlora_model(
    pretrained_model_name_or_path: str = "tiiuae/falcon-7b-instruct",
    peft_pretrained_model_name_or_path: Optional[str] = None,
    gradient_checkpointing: bool = True,
    cache_dir: Optional[Path] = None,
) -> Tuple[AutoModelForCausalLM, AutoTokenizer, PeftConfig]:
    """
    Function that builds a QLoRA LLM model based on the given HuggingFace name:
        1. Create and prepare the bitsandbytes configuration for QLoRA quantization
        2. Download, load, and quantize Falcon-7B on the fly
        3. Create and prepare the LoRA configuration
        4. Load and configure Falcon-7B's tokenizer

    Args:
        pretrained_model_name_or_path (str): The name or path of the pretrained model to use.
        peft_pretrained_model_name_or_path (Optional[str]): The name or path of the PEFT pretrained model to use.
        gradient_checkpointing (bool): Whether to use gradient checkpointing or not.
        cache_dir (Optional[Path]): The directory to cache the downloaded models.

    Returns:
        Tuple[AutoModelForCausalLM, AutoTokenizer, PeftConfig]:
            A tuple containing the QLoRA LLM model, tokenizer, and PEFT config.
    """

    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
    )

    model = AutoModelForCausalLM.from_pretrained(
        pretrained_model_name_or_path,
        revision="main",
        quantization_config=bnb_config,
        load_in_4bit=True,
        device_map="auto",
        trust_remote_code=False,
        cache_dir=str(cache_dir) if cache_dir else None,
    )

    tokenizer = AutoTokenizer.from_pretrained(
        pretrained_model_name_or_path,
        trust_remote_code=False,
        truncation=True,
        cache_dir=str(cache_dir) if cache_dir else None,
    )
    if tokenizer.pad_token_id is None:
        tokenizer.add_special_tokens({"pad_token": "<|pad|>"})
        with torch.no_grad():
            model.resize_token_embeddings(len(tokenizer))
        model.config.pad_token_id = tokenizer.pad_token_id

    if peft_pretrained_model_name_or_path:
        is_model_name = not os.path.isdir(peft_pretrained_model_name_or_path)
        if is_model_name:
            logger.info(
                f"Downloading {peft_pretrained_model_name_or_path} from the Comet ML model registry:"
            )
            peft_pretrained_model_name_or_path = download_from_model_registry(
                model_id=peft_pretrained_model_name_or_path,
                cache_dir=cache_dir,
            )

        logger.info(f"Loading LoRA config from: {peft_pretrained_model_name_or_path}")
        lora_config = LoraConfig.from_pretrained(peft_pretrained_model_name_or_path)
        assert (
            lora_config.base_model_name_or_path == pretrained_model_name_or_path
        ), f"The LoRA model was trained on a different base model than the one requested: \
            {lora_config.base_model_name_or_path} != {pretrained_model_name_or_path}"

        logger.info(f"Loading PEFT model from: {peft_pretrained_model_name_or_path}")
        model = PeftModel.from_pretrained(model, peft_pretrained_model_name_or_path)
    else:
        lora_config = LoraConfig(
            lora_alpha=16,
            lora_dropout=0.1,
            r=64,
            bias="none",
            task_type="CAUSAL_LM",
            target_modules=["query_key_value"],
        )

    if gradient_checkpointing:
        model.gradient_checkpointing_enable()
        model.config.use_cache = (
            False  # Gradient checkpointing is not compatible with caching.
        )
    else:
        model.gradient_checkpointing_disable()
        model.config.use_cache = True  # It is good practice to enable caching when using the model for inference.

    return model, tokenizer, lora_config
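
# Example usage (a minimal sketch; the LoRA registry ID is a placeholder, and a
# CUDA-capable GPU plus the `bitsandbytes` package are assumed to be available):
#
#   model, tokenizer, peft_config = build_qlora_model(
#       pretrained_model_name_or_path="tiiuae/falcon-7b-instruct",
#       peft_pretrained_model_name_or_path="my-workspace/financial-assistant-lora:1.0.0",
#       gradient_checkpointing=False,
#       cache_dir=Path("./model_cache"),
#   )
#   model.eval()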