|
""" |
|
This file contains the code to watermark given sentences using PECCAVI |
|
""" |
|
import os |
|
import sys |
|
import time |
|
import random |
|
import torch |
|
from utils.paraphraser import Paraphraser |
|
from utils.entailment import EntailmentAnalyzer |
|
from utils.sampling import SamplingProcessor |
|
|
|
from utils.non_melting_point import NgramProcessor |
|
from utils.masking_methods import MaskingProcessor |
|
from tqdm import tqdm |
|
|
|
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) |
|
from renderers.highlighter import highlight_common_words,reparaphrased_sentences_html |
|
from renderers.tree import generate_subplot1, generate_subplot2 |
|
from renderers.plot_3d import gen_three_D_plot |
|
|
|
|
|
|
|
from transformers import pipeline, AutoTokenizer, AutoModelForMaskedLM |
|
from transformers import BertTokenizer, BertForMaskedLM |
|
from pathlib import Path |
|
|
|
|
|
from utils.config import load_config |
|
import logging |
|
|
|
# Repository root: two levels above this file. The path is resolved first so
# the result is absolute even when __file__ is relative (e.g. the script is
# started as `python <name>.py` on interpreters that keep __file__ relative);
# without that, `.parent.parent` of a bare file name collapses to ".".
project_root = Path(__file__).resolve().parent.parent
config_path = project_root / "utils" / "config.yaml"

# Root-logger configuration requested at import time: WARNING and above only,
# so INFO-level messages are not shown unless the level is lowered elsewhere.
logging.basicConfig(level=logging.WARNING, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
|
|
|
class Watermarker:
    """Drive the PECCAVI text-watermarking pipeline for one prompt.

    The stages are meant to be called in this order, each one storing its
    output on the instance for the next stage to read:

        Paraphrase -> Masking -> Sampling -> re_paraphrasing
    """

    # Checkpoint used for both the tokenizer and the masked-LM model.
    MLM_MODEL_NAME = "bert-large-cased-whole-word-masking"

    def __init__(self, config):
        """Load the masked-LM and build the helper object for every stage.

        Args:
            config: Mapping that provides at least the ``'Paraphrase'`` and
                ``'Entailment'`` entries; they are handed to ``Paraphraser``
                and ``EntailmentAnalyzer`` respectively.
        """
        self.config = config
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        tqdm.write(f"[Watermarker] Initializing on device: {self.device}")

        # Outputs of the Paraphrase stage.
        self.user_prompt = None
        self.paraphrased_sentences = None
        self.analyzed_paraphrased_sentences = None
        self.selected_sentences = None
        self.discarded_sentences = None
        self.selected_sentences_list = None
        self.discarded_sentences_list = None
        # Selected + discarded paraphrases; None until Paraphrase succeeds.
        self.full_list = None
        self.common_grams = None

        # Placeholders that no method of this class assigns after __init__.
        self.common_grams_position = None
        self.masked_sentences = None
        self.masked_words = None
        self.masked_logits = None
        self.sampled_sentences = None
        self.reparaphrased_sentences = None
        self.distortion_list = None
        self.detectability_list = None
        self.euclidean_dist_list = None

        self.masking_strategies = ['random', 'pseudorandom', 'entropy']
        self.sampling_strategies = ['inverse_transform', 'exponential_minimum', 'temperature', 'greedy']
        self.masking_results = dict()
        self.sampling_results = dict()
        self.reparaphrasing_results = dict()

        self.tokenizer = BertTokenizer.from_pretrained(self.MLM_MODEL_NAME)
        self.model = BertForMaskedLM.from_pretrained(self.MLM_MODEL_NAME).to(self.device)

        self.paraphraser = Paraphraser(self.config['Paraphrase'])
        self.entailment_analyzer = EntailmentAnalyzer(self.config['Entailment'])
        self.ngram_processor = NgramProcessor()
        self.masker = MaskingProcessor(self.tokenizer, self.model)
        self.sampler = SamplingProcessor(self.tokenizer)

    def Paraphrase(self, prompt: str, threshold: float = 0.7):
        """Paraphrase ``prompt`` and collect the n-grams common to all variants.

        On success the following attributes are set: ``paraphrased_sentences``,
        ``analyzed_paraphrased_sentences``, ``selected_sentences``,
        ``discarded_sentences``, ``selected_sentences_list``,
        ``discarded_sentences_list``, ``full_list`` (selected followed by
        discarded paraphrases, without the prompt) and ``common_grams``
        (computed over ``full_list`` plus the prompt).

        Args:
            prompt: The sentence to paraphrase.
            threshold: Forwarded unchanged to
                ``EntailmentAnalyzer.analyze_entailment``.
                NOTE(review): presumably a minimum entailment score - confirm.

        Returns:
            None. Results are stored on the instance. If the paraphraser
            returns None, an error is printed and every output of this stage
            is left as None so later stages cannot run on stale data.
        """
        start_time = time.time()
        self.user_prompt = prompt

        # Drop results of any earlier prompt before starting this one.
        self.analyzed_paraphrased_sentences = None
        self.selected_sentences = None
        self.discarded_sentences = None
        self.selected_sentences_list = None
        self.discarded_sentences_list = None
        self.full_list = None
        self.common_grams = None

        self.paraphrased_sentences = self.paraphraser.paraphrase(self.user_prompt)
        if self.paraphrased_sentences is None:
            print("Error in generating paraphrases", "Error: Could not complete step")
            return None

        (
            self.analyzed_paraphrased_sentences,
            self.selected_sentences,
            self.discarded_sentences,
        ) = self.entailment_analyzer.analyze_entailment(
            self.user_prompt, self.paraphrased_sentences, threshold
        )

        self.selected_sentences_list = list(self.selected_sentences)
        self.discarded_sentences_list = list(self.discarded_sentences)
        self.full_list = self.selected_sentences_list + self.discarded_sentences_list

        # The prompt takes part in the n-gram search only. Passing a temporary
        # list (instead of appending and removing) keeps full_list in its
        # original order even when a paraphrase is identical to the prompt.
        self.common_grams = self.ngram_processor.find_filtered_ngrams(
            self.full_list + [self.user_prompt]
        )
        print(f"Common grams: {self.common_grams}")

        execution_time = time.time() - start_time
        logging.getLogger(__name__).info("Step 1 completed in %.2f seconds", execution_time)
        return None

    def Masking(self):
        """Mask every sentence of ``self.full_list`` with each masking strategy.

        For each name in ``self.masking_strategies`` the result of
        ``self.masker.process_sentences(self.full_list, self.common_grams,
        strategy)`` is stored under that name in ``self.masking_results``.

        Return structure (as produced by the masking processor):
        {
            "<masking_strategy1>": {
                "Original sentence 1": {
                    "masked_sentence": "The sentence with appropriate [MASK] tokens",
                    "mask_logits": {
                        3: {  # Example: mask index 3
                            "tokens": ["word1", "word2", ...],  # Top predicted tokens
                            "logits": [score1, score2, ...]     # Corresponding predicted scores
                        },
                        7: {
                            "tokens": ["wordA", "wordB", ...],
                            "logits": [scoreA, scoreB, ...]
                        },
                        # ... possibly additional mask positions
                    }
                },
                "Original sentence 2": {
                    "masked_sentence": "Another masked sentence",
                    "mask_logits": { ... }
                },
                # ... more sentences processed for this strategy
            },
            "<masking_strategy2>": {
                # Similar structure for each original sentence
            },
            # ... additional masking strategies if defined in self.masking_strategies
        }

        Returns:
            ``self.masking_results``.

        Raises:
            RuntimeError: If ``Paraphrase`` has not completed successfully.
        """
        if self.full_list is None:
            raise RuntimeError(
                "Masking() requires a successful Paraphrase() call first."
            )

        tqdm.write("[Watermarker] Starting Masking process.")
        for strategy in self.masking_strategies:
            tqdm.write(f"[Watermarker] Processing masking strategy: {strategy}")
            self.masking_results[strategy] = self.masker.process_sentences(
                self.full_list, self.common_grams, strategy
            )
        tqdm.write("[Watermarker] Masking process completed.")
        return self.masking_results

    def Sampling(self, temperature: float = 1.0):
        """Fill the masks of every masking result with each sampling strategy.

        For every name in ``self.sampling_strategies`` and every name in
        ``self.masking_strategies`` the corresponding masking result is passed
        to ``self.sampler.process_masked_sentences``.

        Return structure:
        {
            "inverse_transform (SAMPLING STRATEGY)": {
                "random (MASKING STRATEGY)": {
                    "Original sentence 1": {
                        "masked_sentence": "Masked version of sentence 1",
                        "sampled_sentence": "Sampled version of sentence 1"
                    },
                    "Original sentence 2": {
                        "masked_sentence": "Masked version of sentence 2",
                        "sampled_sentence": "Sampled version of sentence 2"
                    },
                    # ... additional original sentences
                },
                "pseudorandom": {
                    # Similar structure for each original sentence
                },
                "entropy": {
                    # Similar structure for each original sentence
                },
            },
            "exponential_minimum": { ... },
            "temperature": { ... },
            "greedy": { ... }
        }

        Args:
            temperature: Value passed as ``temperature`` to the sampler for
                every strategy. Defaults to 1.0, the value that was previously
                hard-coded.

        Returns:
            ``self.sampling_results``.

        Raises:
            RuntimeError: If ``Masking`` has not produced a result for every
                entry of ``self.masking_strategies``.
        """
        missing = [name for name in self.masking_strategies if name not in self.masking_results]
        if missing:
            raise RuntimeError(
                f"Sampling() requires Masking() results for: {missing}"
            )

        tqdm.write("[Watermarker] Starting Sampling process.")
        for strategy in self.sampling_strategies:
            tqdm.write(f"[Watermarker] Processing sampling strategy: {strategy}")
            per_mask = {}
            self.sampling_results[strategy] = per_mask
            for mask_strategy in self.masking_strategies:
                per_mask[mask_strategy] = self.sampler.process_masked_sentences(
                    self.masking_results[mask_strategy],
                    sampling_technique=strategy,
                    temperature=temperature,
                )
        tqdm.write("[Watermarker] Sampling process completed.")
        return self.sampling_results

    def re_paraphrasing(self, num_return_sequences: int = 10, num_beams: int = 10):
        """Paraphrase every sampled sentence stored in ``self.sampling_results``.

        Args:
            num_return_sequences: Passed to ``Paraphraser.paraphrase`` under
                the same keyword. Defaults to the previously hard-coded 10.
            num_beams: Passed to ``Paraphraser.paraphrase`` under the same
                keyword. Defaults to the previously hard-coded 10.

        Returns:
            ``self.reparaphrasing_results``, shaped as
            ``{sampling_strategy: {masking_strategy: {original_sentence: {
            "masking_strategy", "sampling_strategy", "sampled_sentence",
            "re_paraphrased_sentences"}}}}``. ``"re_paraphrased_sentences"``
            is always a sequence: it is ``[]`` when there is no sampled
            sentence or when the paraphraser returns nothing.
        """
        tqdm.write("[Watermarker] Starting re-paraphrasing process.")
        self.reparaphrasing_results = {}
        for sampling_strategy, mask_dict in tqdm(self.sampling_results.items(), desc="Sampling Strategies", leave=True):
            by_mask = {}
            self.reparaphrasing_results[sampling_strategy] = by_mask
            for mask_strategy, sentences_data in tqdm(mask_dict.items(), desc="Masking Strategies", leave=False):
                by_sentence = {}
                by_mask[mask_strategy] = by_sentence
                for original_sentence, result in tqdm(sentences_data.items(), desc="Sentences", leave=False):
                    sampled_sentence = result.get("sampled_sentence")
                    new_paraphrases = []
                    if sampled_sentence:
                        # The paraphraser may return None (Paraphrase checks
                        # for that too); normalise so callers can slice.
                        new_paraphrases = self.paraphraser.paraphrase(
                            sampled_sentence,
                            num_return_sequences=num_return_sequences,
                            num_beams=num_beams,
                        ) or []
                    by_sentence[original_sentence] = {
                        "masking_strategy": mask_strategy,
                        "sampling_strategy": sampling_strategy,
                        "sampled_sentence": sampled_sentence,
                        "re_paraphrased_sentences": new_paraphrases,
                    }
        tqdm.write("[Watermarker] Re-paraphrasing process completed.")
        return self.reparaphrasing_results

    def calculate_distortion(self):
        """Placeholder: no distortion metric is computed yet; returns None."""
        return None
|
|
|
if __name__ == "__main__":
    # Demo run: push one hard-coded sentence through every pipeline stage and
    # write a plain-text report of each stage's output.
    config = load_config(config_path)['PECCAVI_TEXT']
    watermarker = Watermarker(config)

    logger.info("Starting main Watermarker process.")
    print("==> Paraphrasing:")
    watermarker.Paraphrase("The quick brown fox jumps over small cat the lazy dog everyday again and again.")
    if watermarker.paraphrased_sentences is None:
        # Paraphrase() already printed the error; the later stages have no
        # input to work on, so stop here instead of crashing inside Masking().
        sys.exit("Paraphrasing failed; no results were written.")
    logger.info("Paraphrasing completed.")

    # Report fragments, joined by writelines() at the end.
    results_str = []
    results_str.append("========== WATERMARKING RESULTS ==========\n\n")

    results_str.append("==> Common N-grams:\n")
    if watermarker.common_grams:
        for ngram, positions in watermarker.common_grams.items():
            results_str.append(f" {ngram}: {positions}\n")
    else:
        results_str.append(" No common n-grams found.\n")

    results_str.append("\n==> Selected Sentences:\n")
    if watermarker.selected_sentences:
        for sentence in watermarker.selected_sentences:
            results_str.append(f" {sentence}\n")
    else:
        results_str.append(" No selected sentences available.\n")

    results_str.append("\n==> Masking Results:\n")
    masking_results = watermarker.Masking()
    for masking_strategy, results_dict in masking_results.items():
        results_str.append(f"\n-- Masking Strategy: {masking_strategy} --\n")
        for original_sentence, data in results_dict.items():
            masked_sentence = data.get("masked_sentence", "")
            results_str.append("Original:\n")
            results_str.append(f" {original_sentence}\n")
            results_str.append("Masked:\n")
            results_str.append(f" {masked_sentence}\n")
            results_str.append("-----\n")

    results_str.append("\n==> Sampling Results:\n")
    sampling_results = watermarker.Sampling()
    for sampling_strategy, mask_strategy_dict in sampling_results.items():
        results_str.append(f"\n-- Sampling Strategy: {sampling_strategy} --\n")
        for mask_strategy, sentences in mask_strategy_dict.items():
            results_str.append(f"\n Masking Strategy: {mask_strategy}\n")
            for original_sentence, res in sentences.items():
                masked_sentence = res.get("masked_sentence", "")
                sampled_sentence = res.get("sampled_sentence", "")
                results_str.append(" Original:\n")
                results_str.append(f" {original_sentence}\n")
                results_str.append(" Masked:\n")
                results_str.append(f" {masked_sentence}\n")
                results_str.append(" Sampled:\n")
                results_str.append(f" {sampled_sentence}\n")
                results_str.append(" -----\n")

    results_str.append("\n==> Re-paraphrasing Results:\n")
    reparaphrasing_results = watermarker.re_paraphrasing()
    for sampling_strategy, mask_dict in reparaphrasing_results.items():
        results_str.append(f"\n-- Sampling Strategy: {sampling_strategy} --\n")
        for mask_strategy, orig_sentence_dict in mask_dict.items():
            results_str.append(f"\n Masking Strategy: {mask_strategy}\n")
            for original_sentence, data in orig_sentence_dict.items():
                sampled_sentence = data.get("sampled_sentence", "")
                # The stored value can be None when the paraphraser produced
                # nothing; fall back to an empty list so slicing is safe.
                re_paraphrases = data.get("re_paraphrased_sentences") or []
                results_str.append(" Original:\n")
                results_str.append(f" {original_sentence}\n")
                results_str.append(" Sampled:\n")
                results_str.append(f" {sampled_sentence}\n")
                results_str.append(" Re-paraphrased (first 3 examples):\n")
                for idx, rp in enumerate(re_paraphrases[:3]):
                    results_str.append(f" {idx+1}. {rp}\n")
                results_str.append(" -----\n")

    output_file = "watermarking_results.txt"
    # Announce the write before it happens so the log order matches reality.
    logger.info("Writing results to output file.")
    with open(output_file, "w", encoding="utf-8") as f:
        f.writelines(results_str)

    print("\nResults have been written to", output_file)