import numpy as np
import torch
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence
from sentence_transformers import SentenceTransformer, CrossEncoder
from sentence_transformers.util import pytorch_cos_sim
from transformers import AutoTokenizer, AutoModelForMaskedLM, AutoModelForCausalLM
from nltk import word_tokenize
from collections import defaultdict, Counter
from pprint import pprint
from rouge_score import rouge_scorer

ROUGE_TYPES = ["rouge1", "rouge2", "rougeL"]
rouge_scorer = rouge_scorer.RougeScorer(
    ROUGE_TYPES,
    use_stemmer=True
)

def load_rewards(args):
    """Instantiate every reward listed in args.rewards and wrap them in a RewardAggregator."""
    rewards, names = [], []
    for name, settings in args.rewards.items():
        settings["device"] = args.device
        print("Loading reward:", name)
        pprint(settings)
        print()
        reward_cls = globals()[name]
        reward_func = reward_cls(**settings)
        rewards.append(reward_func)
        names.append(name)
    return RewardAggregator(rewards, names)

class RewardAggregator:
    """Combines several reward functions into one weighted-average score per summary."""

    def __init__(self, reward_generators, reward_names):
        self.reward_generators = reward_generators
        self.reward_names = reward_names
        self.weights = [rg.weight for rg in reward_generators]
        self.n_rewards = len(reward_generators)

    def __call__(self, sources, summaries):
        name_to_scores = {}
        for rg, name in zip(self.reward_generators, self.reward_names):
            scores = rg(sources=sources, summaries=summaries)
            name_to_scores[name] = scores

        final_scores = []
        for i in range(len(summaries)):
            score = 0.
            total_weights = 0.
            for name, w in zip(self.reward_names, self.weights):
                score += name_to_scores[name][i] * w
                total_weights += w
            score /= total_weights
            final_scores.append(score)

        return final_scores, name_to_scores

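# The aggregator's combination rule, restated from the loop above: for summary i,
#     final_i = sum_k(weight_k * score_k_i) / sum_k(weight_k)
# where k ranges over the configured rewards.
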
class Fluency:
    def __init__(
        self,
        model_id="distilroberta-base",
        weight=1,
        type="masked",
        device="cuda",
        norm="max",
        max_score=40.,
        min_score=-30.,
    ):
        tokenizer = AutoTokenizer.from_pretrained(model_id)
        if type == "masked":
            pad_token_id = tokenizer.pad_token_id
            model = AutoModelForMaskedLM.from_pretrained(model_id).to(device)
        else:
            pad_token_id = tokenizer.eos_token_id
            model = AutoModelForCausalLM.from_pretrained(model_id).to(device)

        self.model = model
        self.tokenizer = tokenizer
        self.weight = weight
        self.device = device
        self.max_score = max_score
        self.min_score = min_score
        self.pad_token_id = pad_token_id
        self.norm = norm
        assert self.norm in ("max", "minmax")

    def ids_to_tokens(self, ids):
        return [self.tokenizer._convert_id_to_token(id) for id in ids]

    def __call__(self, sources=None, summaries=None, normalize_len=False):
        # the tokenizer breaks on empty strings, so substitute a single space
        summaries = [s if s != "" else " " for s in summaries]
        input_ids = [self.tokenizer.encode(text) for text in summaries]
        lens = [len(ids) for ids in input_ids]
        input_ids = [torch.tensor(ids) for ids in input_ids]
        input_ids = pad_sequence(
            input_ids,
            batch_first=True,
            padding_value=self.pad_token_id
        ).to(self.device)

        with torch.no_grad():
            output = self.model(input_ids=input_ids, labels=input_ids)

        logits = output["logits"]
        scores = []
        for i in range(logits.size(0)):
            # score each summary by the mean logit of its own tokens, stopping at padding
            i_scores = []
            for j in range(logits.size(1)):
                tok_idx = input_ids[i, j]
                if tok_idx == self.pad_token_id:
                    break
                score = logits[i, j, tok_idx].item()
                i_scores.append(score)

            i_score_max = np.mean(i_scores) / self.max_score
            i_score_minmax = (np.mean(i_scores) - self.min_score) / (self.max_score - self.min_score)
            i_score = i_score_max if self.norm == "max" else i_score_minmax
            scores.append(i_score)

        return scores

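# Note on the Fluency normalization above (restating the code, with m the mean token
# logit of a summary):
#     norm == "max":     score = m / max_score
#     norm == "minmax":  score = (m - min_score) / (max_score - min_score)
# max_score and min_score act as rough logit bounds (defaults 40. and -30.).
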
class BiEncoderSimilarity:
    def __init__(
        self,
        model_id="all-distilroberta-v1",
        device="cuda",
        weight=1
    ):
        self.model = SentenceTransformer(model_id).to(device)
        self.weight = weight

    def __call__(self, sources=None, summaries=None):
        src_embs = self.model.encode(sources)
        sum_embs = self.model.encode(summaries)
        scores = []
        for i in range(len(summaries)):
            # cosine similarity between each source/summary sentence embedding pair
            score = pytorch_cos_sim(
                src_embs[i].reshape(1, -1),
                sum_embs[i].reshape(1, -1),
            )[0, 0].item()
            scores.append(score)
        return scores

class CrossEncoderSimilarity:
    def __init__(
        self,
        model_id="all-distilroberta-v1",
        device="cuda",
        weight=1
    ):
        self.model = CrossEncoder(model_id, device=device)
        self.weight = weight

    def __call__(self, sources=None, summaries=None):
        scores = self.model.predict([
            (src, summ) for src, summ in zip(sources, summaries)
        ])
        return scores.tolist()

class SelectedTokenSimilarity:
    def __init__(
        self,
        model_id="all-distilroberta-v1",
        device="cuda",
        weight=1
    ):
        self.model = SentenceTransformer(model_id).to(device)
        self.weight = weight
        self.tokenizer = self.model.tokenizer

    def ids_to_tokens(self, ids):
        return [self.tokenizer._convert_id_to_token(id) for id in ids]

    def align_tokens(self, src, summary):
        # map each summary token position to the source positions holding the same token
        src_ids, sum_ids = self.tokenizer(
            [src, summary],
            truncation=True,
            max_length=self.model.max_seq_length,
        ).input_ids
        src_tokens = self.ids_to_tokens(src_ids)
        sum_tokens = self.ids_to_tokens(sum_ids)
        sum_to_src = defaultdict(list)
        for i, sum_tok in enumerate(sum_tokens):
            for j, src_tok in enumerate(src_tokens):
                if sum_tok == src_tok:
                    sum_to_src[i].append(j)
            if len(sum_to_src[i]) == 0:
                sum_to_src[i] = None
        return sum_to_src

    def compute_score(self, x_sum, x_src, sum_to_src):
        # for every summary token, take its best cosine match among its aligned source tokens
        S = pytorch_cos_sim(x_sum, x_src).cpu().numpy()
        scores = []
        for i, J in sum_to_src.items():
            if J is None:
                i_score = 0.
            else:
                i_scores = [S[i, j] for j in J]
                i_score = max(i_scores)
            scores.append(i_score)
        return np.mean(scores)

    def __call__(self, sources=None, summaries=None):
        src_embs = self.model.encode(sources, output_value="token_embeddings")
        sum_embs = self.model.encode(summaries, output_value="token_embeddings")
        scores = []
        for i in range(len(summaries)):
            x_src = src_embs[i]
            x_sum = sum_embs[i]
            sum_to_src = self.align_tokens(sources[i], summaries[i])
            score = self.compute_score(x_sum, x_src, sum_to_src)
            scores.append(score)
        return scores

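# Note on SelectedTokenSimilarity (restating the code): each summary token is compared
# only against source positions holding the literally identical token; its score is the
# best cosine similarity among those positions (0 if the token never occurs in the
# source), and the reward is the mean over summary tokens.
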
class NLIReward:
    def __init__(
        self,
        model_id="cross-encoder/nli-distilroberta-base",
        device="cuda",
        weight=1
    ):
        self.model = CrossEncoder(model_id, device=device)
        self.label_mapping = ['contradiction', 'entailment', 'neutral']
        self.weight = weight

    def __call__(self, sources=None, summaries=None):
        scores = self.model.predict([
            (src, summ) for src, summ in zip(sources, summaries)
        ])
        probs = torch.softmax(torch.tensor(scores), dim=1)
        labels = [
            self.label_mapping[score_max] for score_max in scores.argmax(axis=1)
        ]
        # reward is the entailment probability; empty summaries get zero
        rewards = [probs[i, 1].item() for i in range(len(summaries))]
        rewards = [
            (0 if summaries[i].strip() == "" else r)
            for i, r in enumerate(rewards)
        ]
        return rewards

class GaussianLength:
    def __init__(self, mean=11, std=0.3, max_len=100, weight=1, device=None):
        self.weight = weight
        # precompute a Gaussian reward over summary lengths, normalized to peak at 1
        lens = np.arange(0, max_len + 1)
        scores = gaussian(lens, mean, std)
        scores /= scores.max()
        self.len_to_reward = dict((l, scores[l]) for l in lens)
        self.max_len = max_len

    def __call__(self, sources=None, summaries=None):
        lens = [len(word_tokenize(s)) for s in summaries]
        scores = [
            self.len_to_reward[l] if l <= self.max_len else 0.
            for l in lens
        ]
        return scores

class GaussianCR:
    def __init__(self, mean=0.45, std=0.3, weight=1, device=None):
        self.weight = weight
        ratios = np.arange(0, 1.1, 0.01)
        scores = gaussian(ratios, mean, std)
        scores /= scores.max()
        self.ratio_to_reward = dict((round(r, 3), s) for r, s in zip(ratios, scores))

    def __call__(self, sources=None, summaries=None):
        source_lens = [len(word_tokenize(s)) for s in sources]
        summary_lens = [len(word_tokenize(s)) for s in summaries]
        ratios = [round(x / y, 2) for x, y in zip(summary_lens, source_lens)]
        ratios = [min(1., x) for x in ratios]
        return [
            self.ratio_to_reward[round(ratio, 2)]
            for ratio in ratios
        ]

class NoDaysReward:
    def __init__(self, weight=1, device=None):
        self.day_words = [
            "monday", "tuesday", "wednesday",
            "thursday", "friday", "saturday", "sunday",
            "today", "tomorrow", "yesterday", "tonight"
        ]
        self.weight = weight

    def __call__(self, sources=None, summaries=None):
        scores = []
        for s in summaries:
            s = s.lower()
            if any([w in s for w in self.day_words]):
                score = 0.
            else:
                score = 1.
            scores.append(score)
        return scores

def gaussian(x, mu, sig):
    return np.exp(-np.power(x - mu, 2.) / (2 * np.power(sig, 2.)))

class RougeReward:
    def __init__(self, rouge_type="rougeL", weight=1, device=None):
        self.rouge_type = rouge_type
        self.weight = weight
        # reference summaries must be assigned externally before the reward is called
        self.targets = None

    def __call__(self, sources=None, summaries=None):
        scores = []
        for pred, tgt in zip(summaries, self.targets):
            rouge_scores = rouge_scorer.score(tgt, pred)
            score = rouge_scores[self.rouge_type].fmeasure
            scores.append(score)
        return scores

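# A minimal, CPU-only smoke test (not part of the original module). It assumes the NLTK
# "punkt" data is available for word_tokenize; the example texts, reward choice, and
# weights are purely illustrative.
if __name__ == "__main__":
    sources = [
        "The committee met on Monday to review the budget proposal and agreed to "
        "publish the final report next week."
    ]
    summaries = ["The committee agreed to publish the budget report."]

    aggregator = RewardAggregator(
        [GaussianCR(mean=0.45, std=0.3, weight=1.0), NoDaysReward(weight=1.0)],
        ["GaussianCR", "NoDaysReward"],
    )
    final_scores, per_reward = aggregator(sources=sources, summaries=summaries)
    print("final:", final_scores)
    pprint(per_reward)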
| # | |