import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers.models.bert.modeling_bert import BertPreTrainedModel, BertModel
from transformers.models.roberta.modeling_roberta import RobertaPreTrainedModel, RobertaModel
from transformers.models.albert.modeling_albert import AlbertPreTrainedModel, AlbertModel
from transformers.models.megatron_bert.modeling_megatron_bert import MegatronBertPreTrainedModel, MegatronBertModel
from transformers.modeling_outputs import TokenClassifierOutput
from torch.nn import CrossEntropyLoss

from loss.focal_loss import FocalLoss
from loss.label_smoothing import LabelSmoothingCrossEntropy
from models.basic_modules.crf import CRF
from tools.model_utils.parameter_freeze import ParameterFreeze
from tools.runner_utils.log_util import logging

logger = logging.getLogger(__name__)

freezer = ParameterFreeze()
| """ | |
| BERT for token-level classification with softmax head. | |
| """ | |
| class BertSoftmaxForSequenceLabeling(BertPreTrainedModel): | |
| def __init__(self, config): | |
| super(BertSoftmaxForSequenceLabeling, self).__init__(config) | |
| self.num_labels = config.num_labels | |
| self.bert = BertModel(config) | |
| if self.config.use_freezing: | |
| self.bert = freezer.freeze_lm(self.bert) | |
| self.dropout = nn.Dropout(config.hidden_dropout_prob) | |
| self.classifier = nn.Linear(config.hidden_size, config.num_labels) | |
| self.loss_type = config.loss_type | |
| self.init_weights() | |
| def forward( | |
| self, | |
| input_ids, | |
| attention_mask=None, | |
| token_type_ids=None, | |
| position_ids=None, | |
| head_mask=None, | |
| labels=None, | |
| return_dict=False, | |
| ): | |
| outputs = self.bert(input_ids = input_ids,attention_mask=attention_mask,token_type_ids=token_type_ids) | |
| sequence_output = outputs[0] | |
| sequence_output = self.dropout(sequence_output) | |
| logits = self.classifier(sequence_output) | |
| outputs = (logits,) + outputs[2:] # add hidden states and attention if they are here | |
| if labels is not None: | |
| assert self.loss_type in ["lsr", "focal", "ce"] | |
| if self.loss_type == "lsr": | |
| loss_fct = LabelSmoothingCrossEntropy(ignore_index=0) | |
| elif self.loss_type == "focal": | |
| loss_fct = FocalLoss(ignore_index=0) | |
| else: | |
| loss_fct = CrossEntropyLoss(ignore_index=0) | |
| # Only keep active parts of the loss | |
| if attention_mask is not None: | |
| active_loss = attention_mask.view(-1) == 1 | |
| active_logits = logits.view(-1, self.num_labels)[active_loss] | |
| active_labels = labels.view(-1)[active_loss] | |
| loss = loss_fct(active_logits, active_labels) | |
| else: | |
| loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1)) | |
| if not return_dict: | |
| outputs = (loss,) + outputs | |
| return outputs # (loss), scores, (hidden_states), (attentions) | |
| return TokenClassifierOutput( | |
| loss=loss, | |
| logits=logits, | |
| hidden_states=outputs.hidden_states, | |
| attentions=outputs.attentions, | |
| ) | |
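
# Usage sketch (illustrative, not part of the original file): how the softmax-head model
# above could be driven end-to-end. The checkpoint name, label count, and example sentence
# are placeholders; `loss_type` and `use_freezing` are the extra config fields this file expects.
def _example_bert_softmax_labeling():
    from transformers import BertConfig, BertTokenizerFast

    config = BertConfig.from_pretrained("bert-base-cased", num_labels=9)
    config.loss_type = "ce"        # one of "lsr", "focal", "ce"
    config.use_freezing = False    # keep the encoder trainable
    tokenizer = BertTokenizerFast.from_pretrained("bert-base-cased")
    model = BertSoftmaxForSequenceLabeling.from_pretrained("bert-base-cased", config=config)

    enc = tokenizer("EU rejects German call", return_tensors="pt")
    labels = torch.ones_like(enc["input_ids"])   # dummy tag ids (0 is reserved as ignore_index)
    loss, logits = model(**enc, labels=labels)[:2]
    preds = logits.argmax(dim=-1)                # (batch, seq_len) predicted tag ids
    return loss, preds
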
| """ | |
| RoBERTa for token-level classification with softmax head. | |
| """ | |
| class RobertaSoftmaxForSequenceLabeling(RobertaPreTrainedModel): | |
| def __init__(self, config): | |
| super(RobertaSoftmaxForSequenceLabeling, self).__init__(config) | |
| self.num_labels = config.num_labels | |
| self.roberta = RobertaModel(config) | |
| if self.config.use_freezing: | |
| self.roberta = freezer.freeze_lm(self.roberta) | |
| self.dropout = nn.Dropout(config.hidden_dropout_prob) | |
| self.classifier = nn.Linear(config.hidden_size, config.num_labels) | |
| self.loss_type = config.loss_type | |
| self.init_weights() | |
| def forward( | |
| self, | |
| input_ids, | |
| attention_mask=None, | |
| token_type_ids=None, | |
| position_ids=None, | |
| head_mask=None, | |
| labels=None, | |
| return_dict=False, | |
| ): | |
| outputs = self.roberta(input_ids = input_ids,attention_mask=attention_mask,token_type_ids=token_type_ids) | |
| sequence_output = outputs[0] | |
| sequence_output = self.dropout(sequence_output) | |
| logits = self.classifier(sequence_output) | |
| outputs = (logits,) + outputs[2:] # add hidden states and attention if they are here | |
| if labels is not None: | |
| assert self.loss_type in ["lsr", "focal", "ce"] | |
| if self.loss_type == "lsr": | |
| loss_fct = LabelSmoothingCrossEntropy(ignore_index=0) | |
| elif self.loss_type == "focal": | |
| loss_fct = FocalLoss(ignore_index=0) | |
| else: | |
| loss_fct = CrossEntropyLoss(ignore_index=0) | |
| # Only keep active parts of the loss | |
| if attention_mask is not None: | |
| active_loss = attention_mask.view(-1) == 1 | |
| active_logits = logits.view(-1, self.num_labels)[active_loss] | |
| active_labels = labels.view(-1)[active_loss] | |
| loss = loss_fct(active_logits, active_labels) | |
| else: | |
| loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1)) | |
| if not return_dict: | |
| outputs = (loss,) + outputs | |
| return outputs # (loss), scores, (hidden_states), (attentions) | |
| return TokenClassifierOutput( | |
| loss=loss, | |
| logits=logits, | |
| hidden_states=outputs.hidden_states, | |
| attentions=outputs.attentions, | |
| ) | |
| """ | |
| ALBERT for token-level classification with softmax head. | |
| """ | |
| class AlbertSoftmaxForSequenceLabeling(AlbertPreTrainedModel): | |
| def __init__(self, config): | |
| super(AlbertSoftmaxForSequenceLabeling, self).__init__(config) | |
| self.num_labels = config.num_labels | |
| self.loss_type = config.loss_type | |
| self.bert = AlbertModel(config) | |
| if self.config.use_freezing: | |
| self.bert = freezer.freeze_lm(self.bert) | |
| self.dropout = nn.Dropout(config.hidden_dropout_prob) | |
| self.classifier = nn.Linear(config.hidden_size, config.num_labels) | |
| self.init_weights() | |
| def forward( | |
| self, | |
| input_ids, | |
| attention_mask=None, | |
| token_type_ids=None, | |
| position_ids=None, | |
| head_mask=None, | |
| labels=None, | |
| return_dict=False, | |
| ): | |
| outputs = self.bert(input_ids = input_ids,attention_mask=attention_mask,token_type_ids=token_type_ids, | |
| position_ids=position_ids,head_mask=head_mask) | |
| sequence_output = outputs[0] | |
| sequence_output = self.dropout(sequence_output) | |
| logits = self.classifier(sequence_output) | |
| outputs = (logits,) + outputs[2:] # add hidden states and attention if they are here | |
| if labels is not None: | |
| assert self.loss_type in ["lsr", "focal", "ce"] | |
| if self.loss_type =="lsr": | |
| loss_fct = LabelSmoothingCrossEntropy(ignore_index=0) | |
| elif self.loss_type == "focal": | |
| loss_fct = FocalLoss(ignore_index=0) | |
| else: | |
| loss_fct = CrossEntropyLoss(ignore_index=0) | |
| # Only keep active parts of the loss | |
| if attention_mask is not None: | |
| active_loss = attention_mask.view(-1) == 1 | |
| active_logits = logits.view(-1, self.num_labels)[active_loss] | |
| active_labels = labels.view(-1)[active_loss] | |
| loss = loss_fct(active_logits, active_labels) | |
| else: | |
| loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1)) | |
| if not return_dict: | |
| outputs = (loss,) + outputs | |
| return outputs # (loss), scores, (hidden_states), (attentions) | |
| return TokenClassifierOutput( | |
| loss=loss, | |
| logits=logits, | |
| hidden_states=outputs.hidden_states, | |
| attentions=outputs.attentions, | |
| ) | |
| """ | |
| MegatronBERT for token-level classification with softmax head. | |
| """ | |
| class MegatronBertSoftmaxForSequenceLabeling(MegatronBertPreTrainedModel): | |
| def __init__(self, config): | |
| super(MegatronBertSoftmaxForSequenceLabeling, self).__init__(config) | |
| self.num_labels = config.num_labels | |
| self.bert = MegatronBertModel(config) | |
| if self.config.use_freezing: | |
| self.bert = freezer.freeze_lm(self.bert) | |
| self.dropout = nn.Dropout(config.hidden_dropout_prob) | |
| self.classifier = nn.Linear(config.hidden_size, config.num_labels) | |
| self.loss_type = config.loss_type | |
| self.init_weights() | |
| def forward( | |
| self, | |
| input_ids, | |
| attention_mask=None, | |
| token_type_ids=None, | |
| position_ids=None, | |
| head_mask=None, | |
| labels=None, | |
| return_dict=False, | |
| ): | |
| outputs = self.bert(input_ids = input_ids,attention_mask=attention_mask,token_type_ids=token_type_ids) | |
| sequence_output = outputs[0] | |
| sequence_output = self.dropout(sequence_output) | |
| logits = self.classifier(sequence_output) | |
| outputs = (logits,) + outputs[2:] # add hidden states and attention if they are here | |
| if labels is not None: | |
| assert self.loss_type in ["lsr", "focal", "ce"] | |
| if self.loss_type == "lsr": | |
| loss_fct = LabelSmoothingCrossEntropy(ignore_index=0) | |
| elif self.loss_type == "focal": | |
| loss_fct = FocalLoss(ignore_index=0) | |
| else: | |
| loss_fct = CrossEntropyLoss(ignore_index=0) | |
| # Only keep active parts of the loss | |
| if attention_mask is not None: | |
| active_loss = attention_mask.view(-1) == 1 | |
| active_logits = logits.view(-1, self.num_labels)[active_loss] | |
| active_labels = labels.view(-1)[active_loss] | |
| loss = loss_fct(active_logits, active_labels) | |
| else: | |
| loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1)) | |
| if not return_dict: | |
| outputs = (loss,) + outputs | |
| return outputs # (loss), scores, (hidden_states), (attentions) | |
| return TokenClassifierOutput( | |
| loss=loss, | |
| logits=logits, | |
| hidden_states=outputs.hidden_states, | |
| attentions=outputs.attentions, | |
| ) | |
| """ | |
| BERT for token-level classification with CRF head. | |
| """ | |
| class BertCrfForSequenceLabeling(BertPreTrainedModel): | |
| def __init__(self, config): | |
| super(BertCrfForSequenceLabeling, self).__init__(config) | |
| self.bert = BertModel(config) | |
| if self.config.use_freezing: | |
| self.bert = freezer.freeze_lm(self.bert) | |
| self.dropout = nn.Dropout(config.hidden_dropout_prob) | |
| self.classifier = nn.Linear(config.hidden_size, config.num_labels) | |
| self.crf = CRF(num_tags=config.num_labels, batch_first=True) | |
| self.init_weights() | |
| def forward( | |
| self, | |
| input_ids, | |
| attention_mask=None, | |
| token_type_ids=None, | |
| position_ids=None, | |
| head_mask=None, | |
| labels=None, | |
| return_dict=False, | |
| ): | |
| outputs =self.bert(input_ids = input_ids,attention_mask=attention_mask,token_type_ids=token_type_ids) | |
| sequence_output = outputs[0] | |
| sequence_output = self.dropout(sequence_output) | |
| logits = self.classifier(sequence_output) | |
| outputs = (logits,) | |
| if labels is not None: | |
| loss = self.crf(emissions = logits, tags=labels, mask=attention_mask) | |
| outputs =(-1*loss,)+outputs | |
| if not return_dict: | |
| return outputs # (loss), scores, (hidden_states), (attentions) | |
| return TokenClassifierOutput( | |
| loss=loss, | |
| logits=logits, | |
| hidden_states=outputs.hidden_states, | |
| attentions=outputs.attentions, | |
| ) | |
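
# Usage sketch for the CRF variant (illustrative, not part of the original file). It assumes
# the custom CRF module follows the familiar torchcrf-style interface, i.e. calling it returns
# a log-likelihood and `crf.decode(emissions, mask)` returns the best tag paths; the checkpoint
# name and label count are placeholders.
def _example_bert_crf_labeling():
    from transformers import BertConfig, BertTokenizerFast

    config = BertConfig.from_pretrained("bert-base-cased", num_labels=9)
    config.use_freezing = False
    tokenizer = BertTokenizerFast.from_pretrained("bert-base-cased")
    model = BertCrfForSequenceLabeling.from_pretrained("bert-base-cased", config=config)

    enc = tokenizer("EU rejects German call", return_tensors="pt")
    labels = torch.ones_like(enc["input_ids"])      # dummy tag ids for the loss
    loss, logits = model(**enc, labels=labels)[:2]  # negated CRF log-likelihood, emission scores

    # Viterbi decoding of the most likely tag sequence (interface assumed, see note above).
    best_paths = model.crf.decode(logits, mask=enc["attention_mask"].bool())
    return loss, best_paths
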
| """ | |
| RoBERTa for token-level classification with CRF head. | |
| """ | |
| class RobertaCrfForSequenceLabeling(RobertaPreTrainedModel): | |
| def __init__(self, config): | |
| super(RobertaCrfForSequenceLabeling, self).__init__(config) | |
| self.roberta = RobertaModel(config) | |
| if self.config.use_freezing: | |
| self.roberta = freezer.freeze_lm(self.roberta) | |
| self.dropout = nn.Dropout(config.hidden_dropout_prob) | |
| self.classifier = nn.Linear(config.hidden_size, config.num_labels) | |
| self.crf = CRF(num_tags=config.num_labels, batch_first=True) | |
| self.init_weights() | |
| def forward( | |
| self, | |
| input_ids, | |
| attention_mask=None, | |
| token_type_ids=None, | |
| position_ids=None, | |
| head_mask=None, | |
| labels=None, | |
| return_dict=False, | |
| ): | |
| outputs =self.roberta(input_ids = input_ids,attention_mask=attention_mask,token_type_ids=token_type_ids) | |
| sequence_output = outputs[0] | |
| sequence_output = self.dropout(sequence_output) | |
| logits = self.classifier(sequence_output) | |
| outputs = (logits,) | |
| if labels is not None: | |
| loss = self.crf(emissions = logits, tags=labels, mask=attention_mask) | |
| outputs =(-1*loss,)+outputs | |
| if not return_dict: | |
| return outputs # (loss), scores, (hidden_states), (attentions) | |
| return TokenClassifierOutput( | |
| loss=loss, | |
| logits=logits, | |
| hidden_states=outputs.hidden_states, | |
| attentions=outputs.attentions, | |
| ) | |
| """ | |
| ALBERT for token-level classification with CRF head. | |
| """ | |
| class AlbertCrfForSequenceLabeling(AlbertPreTrainedModel): | |
| def __init__(self, config): | |
| super(AlbertCrfForSequenceLabeling, self).__init__(config) | |
| self.bert = AlbertModel(config) | |
| if self.config.use_freezing: | |
| self.bert = freezer.freeze_lm(self.bert) | |
| self.dropout = nn.Dropout(config.hidden_dropout_prob) | |
| self.classifier = nn.Linear(config.hidden_size, config.num_labels) | |
| self.crf = CRF(num_tags=config.num_labels, batch_first=True) | |
| self.init_weights() | |
| def forward( | |
| self, | |
| input_ids, | |
| attention_mask=None, | |
| token_type_ids=None, | |
| position_ids=None, | |
| head_mask=None, | |
| labels=None, | |
| return_dict=False, | |
| ): | |
| outputs = self.bert(input_ids = input_ids,attention_mask=attention_mask,token_type_ids=token_type_ids) | |
| sequence_output = outputs[0] | |
| sequence_output = self.dropout(sequence_output) | |
| logits = self.classifier(sequence_output) | |
| outputs = (logits,) | |
| if labels is not None: | |
| loss = self.crf(emissions = logits, tags=labels, mask=attention_mask) | |
| outputs =(-1*loss,)+outputs | |
| if not return_dict: | |
| return outputs # (loss), scores, (hidden_states), (attentions) | |
| return TokenClassifierOutput( | |
| loss=loss, | |
| logits=logits, | |
| hidden_states=outputs.hidden_states, | |
| attentions=outputs.attentions, | |
| ) | |
| """ | |
| MegatronBERT for token-level classification with CRF head. | |
| """ | |
| class MegatronBertCrfForSequenceLabeling(MegatronBertPreTrainedModel): | |
| def __init__(self, config): | |
| super(MegatronBertCrfForSequenceLabeling, self).__init__(config) | |
| self.bert = MegatronBertModel(config) | |
| if self.config.use_freezing: | |
| self.bert = freezer.freeze_lm(self.bert) | |
| self.dropout = nn.Dropout(config.hidden_dropout_prob) | |
| self.classifier = nn.Linear(config.hidden_size, config.num_labels) | |
| self.crf = CRF(num_tags=config.num_labels, batch_first=True) | |
| self.init_weights() | |
| def forward( | |
| self, | |
| input_ids, | |
| attention_mask=None, | |
| token_type_ids=None, | |
| position_ids=None, | |
| head_mask=None, | |
| labels=None, | |
| return_dict=False, | |
| ): | |
| outputs =self.bert(input_ids = input_ids,attention_mask=attention_mask,token_type_ids=token_type_ids) | |
| sequence_output = outputs[0] | |
| sequence_output = self.dropout(sequence_output) | |
| logits = self.classifier(sequence_output) | |
| outputs = (logits,) | |
| if labels is not None: | |
| loss = self.crf(emissions = logits, tags=labels, mask=attention_mask) | |
| outputs =(-1*loss,)+outputs | |
| if not return_dict: | |
| return outputs # (loss), scores, (hidden_states), (attentions) | |
| return TokenClassifierOutput( | |
| loss=loss, | |
| logits=logits, | |
| hidden_states=outputs.hidden_states, | |
| attentions=outputs.attentions, | |
| ) | |