Spaces:
Runtime error
Runtime error
compositional_test
/
transformers
/examples
/research_projects
/zero-shot-distillation
/distill_classifier.py
| import logging | |
| import os | |
| import sys | |
| from dataclasses import dataclass, field | |
| from typing import List, Optional | |
| import torch | |
| from datasets import Dataset | |
| from torch import nn | |
| from tqdm.auto import tqdm | |
| from transformers import ( | |
| AutoModelForSequenceClassification, | |
| AutoTokenizer, | |
| HfArgumentParser, | |
| Trainer, | |
| TrainingArguments, | |
| set_seed, | |
| utils, | |
| ) | |
| from transformers.trainer_utils import get_last_checkpoint, is_main_process | |
| DESCRIPTION = """ | |
| Distills an NLI-based zero-shot classifier to a smaller, more efficient model with a fixed set of candidate class | |
| names. Useful for speeding up zero-shot classification in cases where labeled training data is not available, but | |
| when only a single fixed set of classes is needed. Takes a teacher NLI model, student classifier model, unlabeled | |
| dataset, and set of K possible class names. Yields a single classifier with K outputs corresponding to the provided | |
| class names. | |
| """ | |
| logger = logging.getLogger(__name__) | |
| class TeacherModelArguments: | |
| teacher_name_or_path: Optional[str] = field( | |
| default="roberta-large-mnli", metadata={"help": "The NLI/zero-shot teacher model to be distilled."} | |
| ) | |
| hypothesis_template: Optional[str] = field( | |
| default="This example is {}.", | |
| metadata={ | |
| "help": ( | |
| "Template used to turn class names into mock hypotheses for teacher NLI model. Must include {{}}" | |
| "where class name is inserted." | |
| ) | |
| }, | |
| ) | |
| teacher_batch_size: Optional[int] = field( | |
| default=32, metadata={"help": "Batch size for generating teacher predictions."} | |
| ) | |
| multi_label: Optional[bool] = field( | |
| default=False, | |
| metadata={ | |
| "help": ( | |
| "Allow multiple classes to be true rather than forcing them to sum to 1 (sometimes called" | |
| "multi-class multi-label classification)." | |
| ) | |
| }, | |
| ) | |
| temperature: Optional[float] = field( | |
| default=1.0, metadata={"help": "Temperature applied to teacher softmax for distillation."} | |
| ) | |
| class StudentModelArguments: | |
| student_name_or_path: Optional[str] = field( | |
| default="distilbert-base-uncased", metadata={"help": "The NLI/zero-shot teacher model to be distilled."} | |
| ) | |
| class DataTrainingArguments: | |
| data_file: str = field(metadata={"help": "Text file with one unlabeled instance per line."}) | |
| class_names_file: str = field(metadata={"help": "Text file with one class name per line."}) | |
| use_fast_tokenizer: bool = field( | |
| default=True, | |
| metadata={"help": "Whether to use one of the fast tokenizer (backed by the Rust tokenizers library) or not."}, | |
| ) | |
| class DistillTrainingArguments(TrainingArguments): | |
| output_dir: Optional[str] = field( | |
| default=None, | |
| metadata={"help": "The output directory where the model predictions and checkpoints will be written."}, | |
| ) | |
| per_device_train_batch_size: int = field( | |
| default=32, metadata={"help": "Batch size per GPU/TPU core/CPU for training."} | |
| ) | |
| per_device_eval_batch_size: int = field( | |
| default=128, metadata={"help": "Batch size per GPU/TPU core/CPU for evaluation."} | |
| ) | |
| num_train_epochs: float = field(default=1.0, metadata={"help": "Total number of training epochs to perform."}) | |
| do_train: bool = field(default=True, metadata={"help": "Whether to run training of student model."}) | |
| do_eval: bool = field( | |
| default=True, | |
| metadata={ | |
| "help": ( | |
| "Whether to evaluate the agreement of the final student predictions and the teacher predictions" | |
| "after training." | |
| ) | |
| }, | |
| ) | |
| save_total_limit: Optional[int] = field( | |
| default=0, | |
| metadata={ | |
| "help": ( | |
| "Limit the total amount of checkpoints." | |
| "Deletes the older checkpoints in the output_dir. Default is 0 (no checkpoints)." | |
| ) | |
| }, | |
| ) | |
| class DistillationTrainer(Trainer): | |
| def compute_loss(self, model, inputs, return_outputs=False): | |
| target_p = inputs["labels"] | |
| outputs = model(inputs["input_ids"], attention_mask=inputs["attention_mask"]) | |
| logits = outputs[0] | |
| loss = -torch.sum(target_p * logits.log_softmax(dim=-1), axis=-1).mean() | |
| if return_outputs: | |
| return loss, outputs | |
| return loss | |
| def read_lines(path): | |
| lines = [] | |
| with open(path, "r") as f: | |
| for line in f: | |
| line = line.strip() | |
| if len(line) > 0: | |
| lines.append(line) | |
| return lines | |
| def get_premise_hypothesis_pairs(examples, class_names, hypothesis_template): | |
| premises = [] | |
| hypotheses = [] | |
| for example in examples: | |
| for name in class_names: | |
| premises.append(example) | |
| hypotheses.append(hypothesis_template.format(name)) | |
| return premises, hypotheses | |
| def get_entailment_id(config): | |
| for label, ind in config.label2id.items(): | |
| if label.lower().startswith("entail"): | |
| return ind | |
| logger.warning("Could not identify entailment dimension from teacher config label2id. Setting to -1.") | |
| return -1 | |
| def get_teacher_predictions( | |
| model_path: str, | |
| examples: List[str], | |
| class_names: List[str], | |
| hypothesis_template: str, | |
| batch_size: int, | |
| temperature: float, | |
| multi_label: bool, | |
| use_fast_tokenizer: bool, | |
| no_cuda: bool, | |
| fp16: bool, | |
| ): | |
| """ | |
| Gets predictions by the same method as the zero-shot pipeline but with DataParallel & more efficient batching | |
| """ | |
| model = AutoModelForSequenceClassification.from_pretrained(model_path) | |
| model_config = model.config | |
| if not no_cuda and torch.cuda.is_available(): | |
| model = nn.DataParallel(model.cuda()) | |
| batch_size *= len(model.device_ids) | |
| tokenizer = AutoTokenizer.from_pretrained(model_path, use_fast=use_fast_tokenizer) | |
| premises, hypotheses = get_premise_hypothesis_pairs(examples, class_names, hypothesis_template) | |
| logits = [] | |
| for i in tqdm(range(0, len(premises), batch_size)): | |
| batch_premises = premises[i : i + batch_size] | |
| batch_hypotheses = hypotheses[i : i + batch_size] | |
| encodings = tokenizer( | |
| batch_premises, | |
| batch_hypotheses, | |
| padding=True, | |
| truncation="only_first", | |
| return_tensors="pt", | |
| ) | |
| with torch.cuda.amp.autocast(enabled=fp16): | |
| with torch.no_grad(): | |
| outputs = model(**encodings) | |
| logits.append(outputs.logits.detach().cpu().float()) | |
| entail_id = get_entailment_id(model_config) | |
| contr_id = -1 if entail_id == 0 else 0 | |
| logits = torch.cat(logits, dim=0) # N*K x 3 | |
| nli_logits = logits.reshape(len(examples), len(class_names), -1)[..., [contr_id, entail_id]] # N x K x 2 | |
| if multi_label: | |
| # softmax over (contr, entail) logits for each class independently | |
| nli_prob = (nli_logits / temperature).softmax(-1) | |
| else: | |
| # softmax over entail logits across classes s.t. class probabilities sum to 1. | |
| nli_prob = (nli_logits / temperature).softmax(1) | |
| return nli_prob[..., 1] # N x K | |
| def main(): | |
| parser = HfArgumentParser( | |
| (DataTrainingArguments, TeacherModelArguments, StudentModelArguments, DistillTrainingArguments), | |
| description=DESCRIPTION, | |
| ) | |
| if len(sys.argv) == 2 and sys.argv[1].endswith(".json"): | |
| # If we pass only one argument to the script and it's the path to a json file, | |
| # let's parse it to get our arguments. | |
| data_args, teacher_args, student_args, training_args = parser.parse_json_file( | |
| json_file=os.path.abspath(sys.argv[1]) | |
| ) | |
| else: | |
| data_args, teacher_args, student_args, training_args = parser.parse_args_into_dataclasses() | |
| # Detecting last checkpoint. | |
| last_checkpoint = None | |
| if os.path.isdir(training_args.output_dir) and training_args.do_train and not training_args.overwrite_output_dir: | |
| last_checkpoint = get_last_checkpoint(training_args.output_dir) | |
| if last_checkpoint is None and len(os.listdir(training_args.output_dir)) > 0: | |
| raise ValueError( | |
| f"Output directory ({training_args.output_dir}) already exists and is not empty. " | |
| "Use --overwrite_output_dir to overcome." | |
| ) | |
| elif last_checkpoint is not None: | |
| logger.info( | |
| f"Checkpoint detected, resuming training at {last_checkpoint}. To avoid this behavior, change " | |
| "the `--output_dir` or add `--overwrite_output_dir` to train from scratch." | |
| ) | |
| # Setup logging | |
| logging.basicConfig( | |
| format="%(asctime)s - %(levelname)s - %(name)s - %(message)s", | |
| datefmt="%m/%d/%Y %H:%M:%S", | |
| handlers=[logging.StreamHandler(sys.stdout)], | |
| ) | |
| logger.setLevel(logging.INFO if is_main_process(training_args.local_rank) else logging.WARN) | |
| # Log on each process the small summary: | |
| logger.warning( | |
| f"Process rank: {training_args.local_rank}, device: {training_args.device}, n_gpu: {training_args.n_gpu}" | |
| + f"distributed training: {bool(training_args.local_rank != -1)}, 16-bits training: {training_args.fp16}" | |
| ) | |
| # Set the verbosity to info of the Transformers logger (on main process only): | |
| if is_main_process(training_args.local_rank): | |
| utils.logging.set_verbosity_info() | |
| utils.logging.enable_default_handler() | |
| utils.logging.enable_explicit_format() | |
| if training_args.local_rank != -1: | |
| raise ValueError("Distributed training is not currently supported.") | |
| if training_args.tpu_num_cores is not None: | |
| raise ValueError("TPU acceleration is not currently supported.") | |
| logger.info(f"Training/evaluation parameters {training_args}") | |
| # Set seed before initializing model. | |
| set_seed(training_args.seed) | |
| # 1. read in data | |
| examples = read_lines(data_args.data_file) | |
| class_names = read_lines(data_args.class_names_file) | |
| # 2. get teacher predictions and load into dataset | |
| logger.info("Generating predictions from zero-shot teacher model") | |
| teacher_soft_preds = get_teacher_predictions( | |
| teacher_args.teacher_name_or_path, | |
| examples, | |
| class_names, | |
| teacher_args.hypothesis_template, | |
| teacher_args.teacher_batch_size, | |
| teacher_args.temperature, | |
| teacher_args.multi_label, | |
| data_args.use_fast_tokenizer, | |
| training_args.no_cuda, | |
| training_args.fp16, | |
| ) | |
| dataset = Dataset.from_dict( | |
| { | |
| "text": examples, | |
| "labels": teacher_soft_preds, | |
| } | |
| ) | |
| # 3. create student | |
| logger.info("Initializing student model") | |
| model = AutoModelForSequenceClassification.from_pretrained( | |
| student_args.student_name_or_path, num_labels=len(class_names) | |
| ) | |
| tokenizer = AutoTokenizer.from_pretrained(student_args.student_name_or_path, use_fast=data_args.use_fast_tokenizer) | |
| model.config.id2label = dict(enumerate(class_names)) | |
| model.config.label2id = {label: i for i, label in enumerate(class_names)} | |
| # 4. train student on teacher predictions | |
| dataset = dataset.map(tokenizer, input_columns="text") | |
| dataset.set_format("torch") | |
| def compute_metrics(p, return_outputs=False): | |
| preds = p.predictions.argmax(-1) | |
| proxy_labels = p.label_ids.argmax(-1) # "label_ids" are actually distributions | |
| return {"agreement": (preds == proxy_labels).mean().item()} | |
| trainer = DistillationTrainer( | |
| model=model, | |
| tokenizer=tokenizer, | |
| args=training_args, | |
| train_dataset=dataset, | |
| compute_metrics=compute_metrics, | |
| ) | |
| if training_args.do_train: | |
| logger.info("Training student model on teacher predictions") | |
| trainer.train() | |
| if training_args.do_eval: | |
| agreement = trainer.evaluate(eval_dataset=dataset)["eval_agreement"] | |
| logger.info(f"Agreement of student and teacher predictions: {agreement * 100:0.2f}%") | |
| trainer.save_model() | |
| if __name__ == "__main__": | |
| main() | |