import numpy as np
from scipy import special
import torch
from transformers import AutoTokenizer

from .hashing import get_seed_rng

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


class WmDetector:

    def __init__(
        self,
        tokenizer: AutoTokenizer,
        ngram: int = 1,
        seed: int = 0,
    ):
        # model config
        self.tokenizer = tokenizer
        self.vocab_size = self.tokenizer.vocab_size
        # watermark config
        self.ngram = ngram
        self.seed = seed
        self.rng = torch.Generator()
        self.rng.manual_seed(self.seed)

    def aggregate_scores(
        self,
        scores: list[np.ndarray],
        aggregation: str = 'mean'
    ) -> float:
        """Aggregate score increments along a text."""
        scores = np.asarray(scores)  # stack the per-token scores into one array
        if aggregation == 'sum':
            return scores.sum(axis=0)
        elif aggregation == 'mean':
            return scores.mean(axis=0)
        elif aggregation == 'max':
            return scores.max(axis=0)
        else:
            raise ValueError(f'Aggregation {aggregation} not supported.')
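
    # Example with illustrative numbers (not from the source): for per-token
    # increments [0.0, 1.0, 1.0], 'sum' gives 2.0, 'mean' gives ~0.667 and
    # 'max' gives 1.0.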

    def get_details(
        self,
        text: str,
        scoring_method: str = "v2",
        ntoks_max: int | None = None,
    ) -> list[dict]:
        """
        Get the score increment for each token in the text.
        Args:
            text: input text
            scoring_method:
                'none': score all ngrams
                'v1': only score tokens for which the wm window is unique
                'v2': only score tokens for which {wm window + tok} is unique
            ntoks_max: maximum number of tokens to score
        Output:
            token_details: list of dicts containing token info and scores
        """
        tokens_id = self.tokenizer.encode(text, add_special_tokens=False)
        if ntoks_max is not None:
            tokens_id = tokens_id[:ntoks_max]
        total_len = len(tokens_id)
        token_details = []
        seen_grams = set()
        # add initial tokens that can't be scored (not enough context)
        num_start = min(self.ngram, total_len)
        for i in range(num_start):
            token_details.append({
                'token_id': tokens_id[i],
                'is_scored': False,
                'score': float('nan'),
                'token_text': self.tokenizer.decode([tokens_id[i]])
            })
        # score the remaining tokens
        for cur_pos in range(self.ngram, total_len):
            ngram_tokens = tokens_id[cur_pos-self.ngram:cur_pos]
            is_scored = True
            if scoring_method == 'v1':
                tup_for_unique = tuple(ngram_tokens)
                is_scored = tup_for_unique not in seen_grams
                if is_scored:
                    seen_grams.add(tup_for_unique)
            elif scoring_method == 'v2':
                tup_for_unique = tuple(ngram_tokens + [tokens_id[cur_pos]])
                is_scored = tup_for_unique not in seen_grams
                if is_scored:
                    seen_grams.add(tup_for_unique)
            score = float('nan')
            if is_scored:
                score = self.score_tok(ngram_tokens, tokens_id[cur_pos])
                score = float(score)
            token_details.append({
                'token_id': tokens_id[cur_pos],
                'is_scored': is_scored,
                'score': score,
                'token_text': self.tokenizer.decode([tokens_id[cur_pos]])
            })
        return token_details

    def get_pvalues_by_tok(
        self,
        token_details: list[dict]
    ) -> tuple[list[float], dict]:
        """
        Get the p-value for each token so far.
        Args:
            token_details: list of dicts with token info and scores, from get_details()
        Returns:
            tuple containing:
                - list of p-values, with nan for unscored tokens
                - dict with auxiliary information:
                    - final_score: final running score
                    - ntoks_scored: final number of scored tokens
                    - final_pvalue: last non-nan p-value (0.5 if none available)
        """
        pvalues = []
        running_score = 0
        ntoks_scored = 0
        eps = 1e-10  # small constant to avoid numerical issues
        last_valid_pvalue = 0.5  # default value if no tokens are scored
        for token in token_details:
            if token['is_scored']:
                running_score += token['score']
                ntoks_scored += 1
                pvalue = self.get_pvalue(running_score, ntoks_scored, eps)
                last_valid_pvalue = pvalue
                pvalues.append(pvalue)
            else:
                pvalues.append(float('nan'))
        aux_info = {
            'final_score': running_score,
            'ntoks_scored': ntoks_scored,
            'final_pvalue': last_valid_pvalue
        }
        return pvalues, aux_info
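
    # Putting the two methods together (sketch; `detector` stands for an
    # instance of a concrete subclass such as MarylandDetector below):
    #   token_details = detector.get_details(text, scoring_method="v2")
    #   pvalues, aux = detector.get_pvalues_by_tok(token_details)
    #   aux['final_pvalue']  # p-value after the last scored token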

    def score_tok(self, ngram_tokens: list[int], token_id: int):
        """For each token in the text, compute the score increment."""
        raise NotImplementedError

    def get_pvalue(self, score: float, ntoks: int, eps: float):
        """Compute the p-value for a given score and number of scored tokens."""
        raise NotImplementedError


class MarylandDetector(WmDetector):

    def __init__(self,
            tokenizer: AutoTokenizer,
            ngram: int = 1,
            seed: int = 0,
            gamma: float = 0.5,
            delta: float = 1.0,
            **kwargs):
        super().__init__(tokenizer, ngram, seed, **kwargs)
        self.gamma = gamma
        self.delta = delta

    def score_tok(self, ngram_tokens, token_id):
        """
        score_t = 1 if token_id is in the greenlist else 0
        """
        seed = get_seed_rng(self.seed, ngram_tokens)
        self.rng.manual_seed(seed)
        scores = torch.zeros(self.vocab_size)
        vocab_permutation = torch.randperm(self.vocab_size, generator=self.rng)
        greenlist = vocab_permutation[:int(self.gamma * self.vocab_size)]  # first gamma * vocab_size tokens form the greenlist
        scores[greenlist] = 1
        return scores[token_id]
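
    # The binomial tail used in get_pvalue below can be written with the
    # regularized incomplete beta function I_p(a, b) = special.betainc(a, b, p):
    #   P[Bin(n, p) >= s] = I_p(s, n - s + 1)
    # Under H0 each scored token is green with probability gamma, so the
    # running score is Binomial(ntoks, gamma) and this identity gives an
    # exact one-sided p-value (for s >= 1).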

    def get_pvalue(self, score: int, ntoks: int, eps: float):
        """From the cdf of a binomial distribution."""
        pvalue = special.betainc(score, 1 + ntoks - score, self.gamma)
        return max(pvalue, eps)


class MarylandDetectorZ(WmDetector):

    def __init__(self,
            tokenizer: AutoTokenizer,
            ngram: int = 1,
            seed: int = 0,
            gamma: float = 0.5,
            delta: float = 1.0,
            **kwargs):
        super().__init__(tokenizer, ngram, seed, **kwargs)
        self.gamma = gamma
        self.delta = delta

    def score_tok(self, ngram_tokens, token_id):
        """Same as MarylandDetector but using a z-score."""
        seed = get_seed_rng(self.seed, ngram_tokens)
        self.rng.manual_seed(seed)
        scores = torch.zeros(self.vocab_size)
        vocab_permutation = torch.randperm(self.vocab_size, generator=self.rng)
        greenlist = vocab_permutation[:int(self.gamma * self.vocab_size)]  # first gamma * vocab_size tokens form the greenlist
        scores[greenlist] = 1
        return scores[token_id]
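
    # get_pvalue below replaces the exact binomial tail with its normal
    # approximation: under H0, score ~ Bin(ntoks, gamma), so
    #   z = (score - gamma * ntoks) / sqrt(gamma * (1 - gamma) * ntoks)
    # is approximately N(0, 1) and the one-sided p-value is
    # 0.5 * erfc(z / sqrt(2)). Cheaper than betainc, but approximate for
    # small ntoks.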

    def get_pvalue(self, score: int, ntoks: int, eps: float):
        """From the cdf of a normal distribution."""
        zscore = (score - self.gamma * ntoks) / np.sqrt(self.gamma * (1 - self.gamma) * ntoks)
        pvalue = 0.5 * special.erfc(zscore / np.sqrt(2))
        return max(pvalue, eps)


class OpenaiDetector(WmDetector):

    def __init__(self,
            tokenizer: AutoTokenizer,
            ngram: int = 1,
            seed: int = 0,
            **kwargs):
        super().__init__(tokenizer, ngram, seed, **kwargs)

    def score_tok(self, ngram_tokens, token_id):
        """
        score_t = -log(1 - r_t[token_id])
        """
        seed = get_seed_rng(self.seed, ngram_tokens)
        self.rng.manual_seed(seed)
        rs = torch.rand(self.vocab_size, generator=self.rng)  # one uniform draw per vocab entry
        scores = -(1 - rs).log()
        return scores[token_id]
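
    # get_pvalue below: under H0 each r_t[token_id] is uniform on [0, 1], so
    # each increment -log(1 - r) is Exp(1). The sum of ntoks i.i.d. Exp(1)
    # draws is Gamma(ntoks, 1), whose survival function is the regularized
    # upper incomplete gamma function:
    #   P[score_sum >= s] = special.gammaincc(ntoks, s)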

    def get_pvalue(self, score: float, ntoks: int, eps: float):
        """From the cdf of a gamma distribution."""
        pvalue = special.gammaincc(ntoks, score)
        return max(pvalue, eps)


class OpenaiDetectorZ(WmDetector):

    def __init__(self,
            tokenizer: AutoTokenizer,
            ngram: int = 1,
            seed: int = 0,
            **kwargs):
        super().__init__(tokenizer, ngram, seed, **kwargs)

    def score_tok(self, ngram_tokens, token_id):
        """Same as OpenaiDetector but using a z-score."""
        seed = get_seed_rng(self.seed, ngram_tokens)
        self.rng.manual_seed(seed)
        rs = torch.rand(self.vocab_size, generator=self.rng)  # one uniform draw per vocab entry
        scores = -(1 - rs).log()
        return scores[token_id]

    def get_pvalue(self, score: float, ntoks: int, eps: float):
        """From the cdf of a normal distribution."""
        mu0 = 1
        sigma0 = np.pi / np.sqrt(6)
        zscore = (score / ntoks - mu0) / (sigma0 / np.sqrt(ntoks))
        pvalue = 0.5 * special.erfc(zscore / np.sqrt(2))
        return max(pvalue, eps)
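

# Minimal usage sketch (not part of the original file). It assumes this module
# lives in a package next to hashing.py (see the relative import above), so it
# should be run as e.g. `python -m mypackage.detector`; the "gpt2" checkpoint
# is an arbitrary choice for illustration.
if __name__ == "__main__":
    tokenizer = AutoTokenizer.from_pretrained("gpt2")
    detector = MarylandDetector(tokenizer, ngram=2, seed=0, gamma=0.5)
    token_details = detector.get_details("Some text to test for a watermark.")
    pvalues, aux = detector.get_pvalues_by_tok(token_details)
    print(f"scored {aux['ntoks_scored']} tokens, final p-value {aux['final_pvalue']:.3f}")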