Spaces:
Running
Running
| # -*- coding: utf-8 -*- | |
| # @Time : 2021/8/19 10:54 上午 | |
| # @Author : JianingWang | |
| # @File : classification.py | |
| import torch | |
| from torch import nn | |
| from torch.nn import CrossEntropyLoss, MSELoss, BCEWithLogitsLoss | |
| from transformers import RobertaModel | |
| from transformers.activations import ACT2FN | |
| from transformers.models.electra import ElectraModel | |
| from transformers.models.roformer import RoFormerModel | |
| from transformers.models.albert import AlbertModel | |
| from transformers.models.bert import BertModel, BertPreTrainedModel | |
| from transformers.models.deberta_v2 import DebertaV2Model, DebertaV2PreTrainedModel | |
| from transformers.modeling_outputs import SequenceClassifierOutput | |
| from transformers.models.roberta import RobertaPreTrainedModel | |
| from transformers.models.bert.modeling_bert import BertForSequenceClassification | |
| from transformers.models.megatron_bert import MegatronBertPreTrainedModel, MegatronBertModel | |
| PRETRAINED_MODEL_MAP = { | |
| "bert": BertPreTrainedModel, | |
| "deberta-v2": DebertaV2PreTrainedModel, | |
| "roberta": RobertaPreTrainedModel, | |
| "erlangshen": MegatronBertPreTrainedModel | |
| } | |
| class BertPooler(nn.Module): | |
| def __init__(self, hidden_size, hidden_act, hidden_dropout_prob): | |
| super().__init__() | |
| self.dense = nn.Linear(hidden_size, hidden_size) | |
| # self.activation = nn.Tanh() | |
| self.activation = ACT2FN[hidden_act] | |
| # self.dropout = nn.Dropout(hidden_dropout_prob) | |
| def forward(self, features): | |
| x = features[:, 0, :] # take <s> token (equiv. to [CLS]) | |
| # x = self.dropout(x) | |
| x = self.dense(x) | |
| x = self.activation(x) | |
| return x | |
| def build_cls_model(config): | |
| BaseClass = PRETRAINED_MODEL_MAP[config.model_type] | |
| class BertForClassification(BaseClass): | |
| def __init__(self, config): | |
| super().__init__(config) | |
| self.num_labels = config.num_labels | |
| self.config = config | |
| self.model_type = config.model_type | |
| self.problem_type = config.problem_type | |
| if self.model_type == "bert": | |
| self.bert = BertModel(config) | |
| elif self.model_type == "albert": | |
| self.albert = AlbertModel(config) | |
| # elif self.model_type == "chinesebert": | |
| # self.bert = ChineseBertModel(config) | |
| elif self.model_type == "roformer": | |
| self.roformer = RoFormerModel(config) | |
| elif self.model_type == "electra": | |
| self.electra = ElectraModel(config) | |
| elif self.model_type == "deberta-v2": | |
| self.deberta = DebertaV2Model(config) | |
| elif self.model_type == "roberta": | |
| self.roberta = RobertaModel(config) | |
| elif self.model_type == "erlangshen": | |
| self.bert = MegatronBertModel(config) | |
| self.pooler = BertPooler(config.hidden_size, config.hidden_act, config.hidden_dropout_prob) | |
| if hasattr(config, "cls_dropout_rate"): | |
| cls_dropout_rate = config.cls_dropout_rate | |
| else: | |
| cls_dropout_rate = config.hidden_dropout_prob | |
| self.dropout = nn.Dropout(cls_dropout_rate) | |
| add_feature_dims = config.additional_feature_dims if hasattr(config, "additional_feature_dims") else 0 | |
| # self.LayerNorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps) | |
| cls_hidden = config.hidden_size + add_feature_dims | |
| if hasattr(config, "is_relation_task"): | |
| cls_hidden = config.hidden_size * 2 | |
| self.classifier = nn.Linear(cls_hidden, config.num_labels) | |
| self.init_weights() | |
| def forward( | |
| self, | |
| input_ids=None, | |
| attention_mask=None, | |
| token_type_ids=None, | |
| position_ids=None, | |
| head_mask=None, | |
| inputs_embeds=None, | |
| labels=None, | |
| output_attentions=None, | |
| output_hidden_states=None, | |
| return_dict=None, | |
| pseudo_label=None, | |
| pinyin_ids=None, | |
| additional_features=None | |
| ): | |
| return_dict = return_dict if return_dict is not None else self.config.use_return_dict | |
| logits, outputs = None, None | |
| inputs = {"input_ids": input_ids, "attention_mask": attention_mask, "token_type_ids": token_type_ids, "position_ids": position_ids, | |
| "head_mask": head_mask, "inputs_embeds": inputs_embeds, "output_attentions": output_attentions, | |
| "output_hidden_states": output_hidden_states, "return_dict": return_dict, "pinyin_ids": pinyin_ids} | |
| inputs = {k: v for k, v in inputs.items() if v is not None} | |
| if self.model_type == "chinesebert": | |
| outputs = self.bert(**inputs) | |
| elif self.model_type == "bert": | |
| outputs = self.bert(**inputs) | |
| elif self.model_type == "albert": | |
| outputs = self.albert(**inputs) | |
| elif self.model_type == "electra": | |
| outputs = self.electra(**inputs) | |
| elif self.model_type == "roformer": | |
| outputs = self.roformer(**inputs) | |
| elif self.model_type == "deberta-v2": | |
| outputs = self.deberta(**inputs) | |
| elif self.model_type == "roberta": | |
| outputs = self.roberta(**inputs) | |
| elif self.model_type == "erlangshen": | |
| outputs = self.bert(**inputs) | |
| if hasattr(self.config, "is_relation_task"): | |
| w = torch.logical_and(input_ids >= min(self.config.start_token_ids), input_ids <= max(self.config.start_token_ids)) | |
| start_index = w.nonzero()[:, 1].view(-1, 2) | |
| # <start_entity> + <end_entity> 进分类 | |
| pooler_output = torch.cat([torch.cat([x[y[0], :], x[y[1], :]]).unsqueeze(0) for x, y in zip(outputs.last_hidden_state, start_index)]) | |
| # [CLS] + <start_entity> + <end_entity> 进分类 | |
| # pooler_output = torch.cat([torch.cat([z, x[y[0], :], x[y[1], :]]).unsqueeze(0) for x, y, z in zip(outputs.last_hidden_state, start_index, outputs.last_hidden_state[:, 0])]) | |
| elif "pooler_output" in outputs: | |
| pooler_output = outputs.pooler_output | |
| else: | |
| pooler_output = self.pooler(outputs[0]) | |
| pooler_output = self.dropout(pooler_output) | |
| # pooler_output = self.LayerNorm(pooler_output) | |
| if additional_features is not None: | |
| pooler_output = torch.cat((pooler_output, additional_features), dim=1) | |
| logits = self.classifier(pooler_output) | |
| loss = None | |
| if labels is not None: | |
| if self.problem_type == "regression": | |
| loss_fct = MSELoss() | |
| if self.num_labels == 1: | |
| loss = loss_fct(logits.squeeze(), labels.squeeze()) | |
| else: | |
| loss = loss_fct(logits, labels) | |
| elif self.problem_type == "multi_label_classification": | |
| loss_fct = BCEWithLogitsLoss() | |
| loss = loss_fct(logits.view(-1, self.num_labels), labels.float().view(-1, self.num_labels)) | |
| # elif self.problem_type in ["single_label_classification"] or hasattr(self.config, "is_relation_task"): | |
| else: | |
| # loss_fct = FocalLoss() | |
| loss_fct = CrossEntropyLoss() | |
| # 伪标签 | |
| if pseudo_label is not None: | |
| train_logits, pseudo_logits = logits[pseudo_label > 0.9], logits[pseudo_label < 0.1] | |
| train_labels, pseudo_labels = labels[pseudo_label > 0.9], labels[pseudo_label < 0.1] | |
| train_loss = loss_fct(train_logits.view(-1, self.num_labels), train_labels.view(-1)) if train_labels.nelement() else 0 | |
| pseudo_loss = loss_fct(pseudo_logits.view(-1, self.num_labels), pseudo_labels.view(-1)) if pseudo_labels.nelement() else 0 | |
| loss = 0.9 * train_loss + 0.1 * pseudo_loss | |
| else: | |
| loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1)) | |
| return SequenceClassifierOutput( | |
| loss=loss, | |
| logits=logits, | |
| hidden_states=outputs.hidden_states, | |
| attentions=outputs.attentions, | |
| ) | |
| return BertForClassification | |