import numpy as np
from scipy import special
import torch
from transformers import AutoTokenizer

from .hashing import get_seed_rng

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


class WmDetector():

    def __init__(
        self,
        tokenizer: AutoTokenizer,
        ngram: int = 1,
        seed: int = 0
    ):
        # model config
        self.tokenizer = tokenizer
        self.vocab_size = self.tokenizer.vocab_size
        # watermark config
        self.ngram = ngram
        self.seed = seed
        self.rng = torch.Generator()
        self.rng.manual_seed(self.seed)

    def aggregate_scores(
        self,
        scores: np.ndarray,
        aggregation: str = 'mean'
    ) -> float:
        """Aggregate scores along a text."""
        if aggregation == 'sum':
            return scores.sum(axis=0)
        elif aggregation == 'mean':
            return scores.mean(axis=0)
        elif aggregation == 'max':
            return scores.max(axis=0)
        else:
            raise ValueError(f'Aggregation {aggregation} not supported.')

    def get_details(
        self,
        text: str,
        scoring_method: str = "v2",
        ntoks_max: int = None,
    ) -> list[dict]:
        """
        Get the score increment for each token in the text.

        Args:
            text: input text
            scoring_method:
                'none': score all ngrams
                'v1': only score a token if its wm window (the preceding ngram) is unique
                'v2': only score a token if the {wm window + token} pair is unique
            ntoks_max: maximum number of tokens to score

        Output:
            token_details: list of dicts containing token info and scores
        """
        tokens_id = self.tokenizer.encode(text, add_special_tokens=False)
        if ntoks_max is not None:
            tokens_id = tokens_id[:ntoks_max]

        total_len = len(tokens_id)
        token_details = []
        seen_grams = set()

        # Add initial tokens that can't be scored (not enough context)
        num_start = min(self.ngram, total_len)
        for i in range(num_start):
            token_details.append({
                'token_id': tokens_id[i],
                'is_scored': False,
                'score': float('nan'),
                'token_text': self.tokenizer.decode([tokens_id[i]])
            })

        # Score remaining tokens
        for cur_pos in range(self.ngram, total_len):
            ngram_tokens = tokens_id[cur_pos - self.ngram:cur_pos]
            is_scored = True

            if scoring_method == 'v1':
                tup_for_unique = tuple(ngram_tokens)
                is_scored = tup_for_unique not in seen_grams
                if is_scored:
                    seen_grams.add(tup_for_unique)
            elif scoring_method == 'v2':
                tup_for_unique = tuple(ngram_tokens + [tokens_id[cur_pos]])
                is_scored = tup_for_unique not in seen_grams
                if is_scored:
                    seen_grams.add(tup_for_unique)

            score = float('nan')
            if is_scored:
                score = self.score_tok(ngram_tokens, tokens_id[cur_pos])
                score = float(score)

            token_details.append({
                'token_id': tokens_id[cur_pos],
                'is_scored': is_scored,
                'score': score,
                'token_text': self.tokenizer.decode([tokens_id[cur_pos]])
            })

        return token_details

    def get_pvalues_by_tok(
        self,
        token_details: list[dict]
    ) -> tuple[list[float], dict]:
        """
        Get the p-value for each token so far.

        Args:
            token_details: list of dicts containing token info and scores from get_details()

        Returns:
            tuple containing:
                - list of p-values, with nan for unscored tokens
                - dict with auxiliary information:
                    - final_score: final running score
                    - ntoks_scored: final number of scored tokens
                    - final_pvalue: last non-nan p-value (0.5 if none available)
        """
        pvalues = []
        running_score = 0
        ntoks_scored = 0
        eps = 1e-10  # small constant to avoid numerical issues
        last_valid_pvalue = 0.5  # default value if no tokens are scored

        for token in token_details:
            if token['is_scored']:
                running_score += token['score']
                ntoks_scored += 1
                pvalue = self.get_pvalue(running_score, ntoks_scored, eps)
                last_valid_pvalue = pvalue
                pvalues.append(pvalue)
            else:
                pvalues.append(float('nan'))

        aux_info = {
            'final_score': running_score,
            'ntoks_scored': ntoks_scored,
            'final_pvalue': last_valid_pvalue
        }
        return pvalues, aux_info

    def score_tok(self, ngram_tokens: list[int], token_id: int):
        """ For each token in the text, compute the score increment. """
        raise NotImplementedError

    def get_pvalue(self, score: float, ntoks: int, eps: float):
        """ Compute the p-value for a given score and number of scored tokens. """
        raise NotImplementedError


class MarylandDetector(WmDetector):

    def __init__(
        self,
        tokenizer: AutoTokenizer,
        ngram: int = 1,
        seed: int = 0,
        gamma: float = 0.5,
        delta: float = 1.0,
        **kwargs
    ):
        super().__init__(tokenizer, ngram, seed, **kwargs)
        self.gamma = gamma
        self.delta = delta

    def score_tok(self, ngram_tokens, token_id):
        """ score_t = 1 if token_id is in the greenlist else 0 """
        seed = get_seed_rng(self.seed, ngram_tokens)
        self.rng.manual_seed(seed)
        scores = torch.zeros(self.vocab_size)
        vocab_permutation = torch.randperm(self.vocab_size, generator=self.rng)
        greenlist = vocab_permutation[:int(self.gamma * self.vocab_size)]  # gamma * n toks in the greenlist
        scores[greenlist] = 1
        return scores[token_id]

    def get_pvalue(self, score: int, ntoks: int, eps: float):
        """ From the cdf of a binomial distribution. """
        pvalue = special.betainc(score, 1 + ntoks - score, self.gamma)
        return max(pvalue, eps)


class MarylandDetectorZ(WmDetector):

    def __init__(
        self,
        tokenizer: AutoTokenizer,
        ngram: int = 1,
        seed: int = 0,
        gamma: float = 0.5,
        delta: float = 1.0,
        **kwargs
    ):
        super().__init__(tokenizer, ngram, seed, **kwargs)
        self.gamma = gamma
        self.delta = delta

    def score_tok(self, ngram_tokens, token_id):
        """ Same as MarylandDetector, but the score is used in a z-test. """
        seed = get_seed_rng(self.seed, ngram_tokens)
        self.rng.manual_seed(seed)
        scores = torch.zeros(self.vocab_size)
        vocab_permutation = torch.randperm(self.vocab_size, generator=self.rng)
        greenlist = vocab_permutation[:int(self.gamma * self.vocab_size)]  # gamma * n
        scores[greenlist] = 1
        return scores[token_id]

    def get_pvalue(self, score: int, ntoks: int, eps: float):
        """ From the cdf of a normal distribution. """
        zscore = (score - self.gamma * ntoks) / np.sqrt(self.gamma * (1 - self.gamma) * ntoks)
        pvalue = 0.5 * special.erfc(zscore / np.sqrt(2))
        return max(pvalue, eps)


class OpenaiDetector(WmDetector):

    def __init__(
        self,
        tokenizer: AutoTokenizer,
        ngram: int = 1,
        seed: int = 0,
        **kwargs
    ):
        super().__init__(tokenizer, ngram, seed, **kwargs)

    def score_tok(self, ngram_tokens, token_id):
        """ score_t = -log(1 - rt[token_id]) """
        seed = get_seed_rng(self.seed, ngram_tokens)
        self.rng.manual_seed(seed)
        rs = torch.rand(self.vocab_size, generator=self.rng)  # n
        scores = -(1 - rs).log()
        return scores[token_id]

    def get_pvalue(self, score: float, ntoks: int, eps: float):
        """ From the cdf of a gamma distribution. """
        pvalue = special.gammaincc(ntoks, score)
        return max(pvalue, eps)


class OpenaiDetectorZ(WmDetector):

    def __init__(
        self,
        tokenizer: AutoTokenizer,
        ngram: int = 1,
        seed: int = 0,
        **kwargs
    ):
        super().__init__(tokenizer, ngram, seed, **kwargs)

    def score_tok(self, ngram_tokens, token_id):
        """ Same as OpenaiDetector, but the score is used in a z-test. """
        seed = get_seed_rng(self.seed, ngram_tokens)
        self.rng.manual_seed(seed)
        rs = torch.rand(self.vocab_size, generator=self.rng)  # n
        scores = -(1 - rs).log()
        return scores[token_id]

    def get_pvalue(self, score: float, ntoks: int, eps: float):
        """ From the cdf of a normal distribution. """
        mu0 = 1
        sigma0 = np.pi / np.sqrt(6)
        zscore = (score / ntoks - mu0) / (sigma0 / np.sqrt(ntoks))
        pvalue = 0.5 * special.erfc(zscore / np.sqrt(2))
        return max(pvalue, eps)
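# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the detection logic). It shows
# how a detector is typically driven end to end: tokenize, score per token,
# then turn the running score into a p-value. The tokenizer name "gpt2" and
# the sample text are assumptions chosen for demonstration; any AutoTokenizer
# works. Because of the relative import of `.hashing`, this module must be run
# as part of its package (e.g. `python -m <package>.<module>`), not directly.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    tokenizer = AutoTokenizer.from_pretrained("gpt2")  # assumed tokenizer, for illustration
    detector = MarylandDetector(tokenizer, ngram=2, seed=0, gamma=0.25, delta=2.0)

    sample_text = "Some possibly watermarked text to score."  # placeholder input
    token_details = detector.get_details(sample_text, scoring_method="v2")
    pvalues, aux = detector.get_pvalues_by_tok(token_details)

    print(f"scored {aux['ntoks_scored']} tokens, "
          f"final score {aux['final_score']:.2f}, "
          f"final p-value {aux['final_pvalue']:.3g}")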