# Spaces: Runtime error (hosting-page status banner captured with this file; not part of the module)
# adversarial_framework.py
from typing import Literal, Dict, List, Tuple | |
from difflib import SequenceMatcher | |
from sentence_transformers import SentenceTransformer, util | |
from numpy.polynomial.polynomial import Polynomial | |
import nlpaug.augmenter.word as naw | |
import nltk | |
import numpy as np | |
import pandas as pd | |
import base64 | |
from datetime import datetime | |
from io import BytesIO | |
import matplotlib.pyplot as plt | |
# Fetch the NLTK resource 'averaged_perceptron_tagger_eng' when this module is imported.
# NOTE(review): presumably required by the nlpaug word augmenters used below — confirm.
nltk.download('averaged_perceptron_tagger_eng')
class StatisticalEvaluator:
    """
    Computes statistical insights over response similarity scores.
    Useful for summarizing adversarial robustness.

    Every statistic is rounded to two decimals and returned as a built-in
    ``float``. Scores are stored as a float array so that integer input
    cannot leak NumPy integer scalars out of ``min_score``/``max_score``.
    """

    def __init__(self, scores: List[float]):
        """Store *scores* as a NumPy float array in ``self.scores``."""
        # dtype=float keeps the declared ``-> float`` contract for int input.
        self.scores = np.array(scores, dtype=float)

    @staticmethod
    def _as_float(value) -> float:
        """Round a NumPy scalar to 2 decimals and return a built-in float."""
        return float(round(value, 2))

    def mean(self) -> float:
        """Return the arithmetic mean of the scores."""
        return self._as_float(np.mean(self.scores))

    def median(self) -> float:
        """Return the median of the scores."""
        return self._as_float(np.median(self.scores))

    def variance(self) -> float:
        """Return the variance of the scores (``np.var`` defaults)."""
        return self._as_float(np.var(self.scores))

    def std_dev(self) -> float:
        """Return the standard deviation of the scores (``np.std`` defaults)."""
        return self._as_float(np.std(self.scores))

    def min_score(self) -> float:
        """Return the smallest score; NumPy raises ValueError if there are none."""
        return self._as_float(np.min(self.scores))

    def max_score(self) -> float:
        """Return the largest score; NumPy raises ValueError if there are none."""
        return self._as_float(np.max(self.scores))

    def summary(self) -> Dict[str, float]:
        """Return all statistics keyed by name, in a fixed order."""
        return {
            "mean": self.mean(),
            "median": self.median(),
            "std_dev": self.std_dev(),
            "variance": self.variance(),
            "min": self.min_score(),
            "max": self.max_score(),
        }
class SimilarityCalculator:
    """
    Scores how similar two texts are, as percentages in [0, 100].

    ``cosine_similarity`` compares sentence embeddings produced by the
    configured SentenceTransformer model; ``sequence_similarity`` compares
    the raw character sequences with ``difflib.SequenceMatcher``.
    """

    def __init__(self, model_name: str = "sentence-transformers/paraphrase-MiniLM-L3-v2"):
        """Load the SentenceTransformer model named *model_name*."""
        self.model = SentenceTransformer(model_name)

    def cosine_similarity(self, original: str, perturbed: str) -> float:
        """
        Return the embedding cosine similarity of the two texts as a percentage.

        The raw cosine is clamped to [0, 1] before scaling, so negative
        similarities are reported as 0.0. The result is rounded to 2 decimals.
        """
        emb1 = self.model.encode(original, convert_to_tensor=True)
        emb2 = self.model.encode(perturbed, convert_to_tensor=True)
        raw_score = util.pytorch_cos_sim(emb1, emb2).item()
        clamped_score = max(0.0, min(raw_score, 1.0))
        return round(clamped_score * 100, 2)

    def sequence_similarity(self, original: str, perturbed: str) -> float:
        """
        Return the character-level SequenceMatcher ratio as a percentage.

        ``autojunk`` is disabled: with the default heuristic, any character
        that is frequent in a second text of 200+ characters is ignored as a
        match anchor, which can drive the ratio of near-identical long texts
        down to 0. Disabling it costs more time on long inputs but keeps the
        score meaningful. The result is rounded to 2 decimals.
        """
        matcher = SequenceMatcher(None, original, perturbed, autojunk=False)
        return round(matcher.ratio() * 100, 2)
class AdversarialRiskCalculator:
    """
    Combines query and response similarity into an Adversarial Risk Index.

    ``alpha`` is the exponent applied to the response dissimilarity and
    ``beta`` the exponent applied to the query-dissimilarity term.
    """

    def __init__(self, alpha: float = 2, beta: float = 1.5):
        self.alpha, self.beta = alpha, beta

    def compute_ari(self, query_sim: float, response_sim: float) -> float:
        """
        Return the ARI for a pair of similarity percentages.

        Both inputs are divided by 100 first. The index is
        ``(1 - r) ** alpha * (1 + (1 - q)) ** beta`` scaled by 100 and
        rounded to 2 decimals, where ``q`` and ``r`` are the scaled query
        and response similarities.
        """
        query_fraction = query_sim / 100
        response_fraction = response_sim / 100
        response_term = (1 - response_fraction) ** self.alpha
        query_term = (1 + (1 - query_fraction)) ** self.beta
        risk = response_term * query_term
        return round(risk * 100, 2)
class PSCAnalyzer:
    """
    Builds a Perturbation Sensitivity Curve (PSC) from (x, y) samples.

    Samples are reduced to one representative point per bin along x, a
    polynomial is fitted through those points, and the area under the
    fitted values is computed with the trapezoidal rule.

    ``degree`` is the requested polynomial degree and ``r`` the number of
    equal-width bins.
    """

    def __init__(self, degree: int = 5, r: int = 10):
        self.r = r
        self.degree = degree

    def _bin_data(self, x: np.ndarray, y: np.ndarray, mode='max') -> Tuple[np.ndarray, np.ndarray]:
        """
        Keep one (x, y) sample per non-empty bin.

        The x range is split into ``self.r`` equal-width bins. In each
        non-empty bin the sample with the largest (``mode='max'``) or
        smallest (``mode='min'``) y is kept.

        Raises:
            ValueError: if *mode* is not 'max' or 'min', or if *x* is empty.
        """
        if mode not in ('max', 'min'):
            raise ValueError("mode must be 'max' or 'min'")
        x = np.asarray(x)
        y = np.asarray(y)
        if x.size == 0:
            raise ValueError("x must contain at least one value")
        pick = np.argmax if mode == 'max' else np.argmin
        bins = np.linspace(x.min(), x.max(), self.r + 1)
        best_x, best_y = [], []
        for i in range(self.r):
            lower, upper = bins[i], bins[i + 1]
            if i == self.r - 1:
                # The last bin is closed on the right; a half-open bin would
                # silently drop the sample(s) located at max(x).
                mask = (x >= lower) & (x <= upper)
            else:
                mask = (x >= lower) & (x < upper)
            sub_x, sub_y = x[mask], y[mask]
            if len(sub_x) > 0:
                idx = pick(sub_y)
                best_x.append(sub_x[idx])
                best_y.append(sub_y[idx])
        return np.array(best_x), np.array(best_y)

    def fit_and_auc(self, x, y):
        """
        Fit a polynomial to (x, y) and integrate the fitted values over x.

        The degree is capped at ``len(x) - 1`` so the fit is never
        rank-deficient when few points are available. The integral is taken
        over the points in the order given, so x should be increasing.

        Returns:
            ``(auc, fitted_y)``: the area rounded to 4 decimals as a float,
            and the fitted values at each x.
        """
        x = np.array(x)
        y = np.array(y)
        degree = max(0, min(self.degree, x.size - 1))
        coeffs = np.polyfit(x, y, degree)
        poly_fn = np.poly1d(coeffs)
        fitted_y = poly_fn(x)
        # NumPy 2 renamed trapz to trapezoid; support both spellings.
        integrate = getattr(np, "trapezoid", None) or np.trapz
        auc_val = float(round(integrate(fitted_y, x), 4))
        return auc_val, fitted_y

    def plot_curve(self, x: np.ndarray, y: np.ndarray, fitted: np.ndarray, title: str, label: str, save_path=None):
        """
        Plot the sampled points and the fitted curve.

        The figure is written to *save_path* when one is given, shown, and
        then closed so repeated calls do not accumulate open figures.
        """
        fig = plt.figure(figsize=(8, 5))
        plt.plot(x, y, 'o', label='Sampled Points')
        plt.plot(x, fitted, '--', label='Fitted Curve')
        plt.xlabel('Perturbation / Queries')
        plt.ylabel(label)
        plt.title(title)
        plt.legend()
        plt.grid(True)
        if save_path:
            plt.savefig(save_path)
        plt.show()
        plt.close(fig)

    def evaluate(self, x_vals: List[float], y_vals: List[float], mode: str = 'max',
                 label: str = 'ASR Curve', plot: bool = True) -> float:
        """
        Bin the samples, fit the curve and return its AUC.

        When *plot* is true (the default) the binned points and the fit are
        also drawn with ``plot_curve``, titled ``PSC-<label>``.
        """
        x, y = self._bin_data(np.array(x_vals), np.array(y_vals), mode=mode)
        auc_val, fitted = self.fit_and_auc(x, y)
        if plot:
            self.plot_curve(x, y, fitted, title=f"PSC-{label}", label=label)
        return auc_val

    def run_psc_analysis(self, x_vals: List[float], y_vals: List[float], save_csv: str ="psc_export.csv", plot : bool = True):
        """
        Compute the PSC AUC, export the raw samples to CSV and report both.

        *plot* controls whether the curve is drawn. The CSV written to
        *save_csv* has the columns ``perturbation_level`` and
        ``response_similarity``. Returns the AUC.
        """
        auc = self.evaluate(x_vals, y_vals, mode="max", label="Semantic Similarity", plot=plot)
        df = pd.DataFrame({"perturbation_level": x_vals, "response_similarity": y_vals})
        df.to_csv(save_csv, index=False)
        print(f"π PSC-AUC: {auc} | π CSV saved to: {save_csv}")
        return auc
class TextPerturber:
    """
    Applies a word-level nlpaug augmentation to a piece of text.

    ``self.methods`` maps a method name ("synonym", "delete", "contextual")
    to the augmenter instance used for it.
    """

    def __init__(self):
        """Create one augmenter per supported method name."""
        self.methods = {
            "synonym": naw.SynonymAug(aug_src='wordnet'),
            "delete": naw.RandomWordAug(action="delete"),
            "contextual": naw.ContextualWordEmbsAug()
        }

    def perturb(self, input_text: str, aug_method: Literal["synonym", "delete", "contextual"] = "synonym") -> str:
        """
        Return *input_text* perturbed with the augmenter named *aug_method*.

        When the augmenter returns a list, its first element is used. If that
        list is empty, the input text is returned unchanged instead of
        failing with IndexError.
        NOTE(review): nlpaug appears to return an empty list for empty
        input — confirm against the installed version.

        Raises:
            ValueError: if *aug_method* is not a key of ``self.methods``.
        """
        if aug_method not in self.methods:
            raise ValueError(f"Invalid method '{aug_method}'. Choose from {list(self.methods.keys())}.")
        result = self.methods[aug_method].augment(input_text)
        if isinstance(result, list):
            # Nothing was produced: fall back to the unperturbed text.
            return result[0] if result else input_text
        return result
class AdversarialAttackPipeline:
    """
    Runs a query and a perturbed copy of it through an answer generator and
    measures how far the responses drift apart.

    ``answer_generator`` is called as ``answer_generator(query, top_k)`` and
    its return value is treated as the response text.
    """

    def __init__(self, answer_generator):
        self.similarity = SimilarityCalculator()
        self.risk_calculator = AdversarialRiskCalculator()
        self.perturber = TextPerturber()
        self.answer_generator = answer_generator

    def run(self, query: str, top_k: int = 3, perturb_method: str = "synonym") -> Dict:
        """
        Execute one attack round and return all texts and metrics.

        Steps: answer the original query, perturb the query, perturb the
        normal response directly, then answer the perturbed query. Cosine
        and sequence similarities are computed for each pair, the ARI is
        derived from the query and response cosine similarities, and a
        report is printed.

        Returns:
            A dict with the keys ``normal_query``, ``normal_response``,
            ``perturbed_query``, ``perturbed_response``,
            ``adversarial_response``, ``cos_sim``, ``seq_match`` and ``ari``.
        """
        normal_response = self.answer_generator(query, top_k)
        perturbed_query = self.perturber.perturb(query, perturb_method)
        adversarial_response = self.perturber.perturb(normal_response, perturb_method)
        perturbed_response = self.answer_generator(perturbed_query, top_k)
        cos_metrics = {
            "query_sim": self.similarity.cosine_similarity(query, perturbed_query),
            "adversarial_sim": self.similarity.cosine_similarity(normal_response, adversarial_response),
            "response_sim": self.similarity.cosine_similarity(normal_response, perturbed_response),
        }
        seq_metrics = {
            "query_seq_match": self.similarity.sequence_similarity(query, perturbed_query),
            "adv_seq_match": self.similarity.sequence_similarity(normal_response, adversarial_response),
            "resp_seq_match": self.similarity.sequence_similarity(normal_response, perturbed_response),
        }
        ari = self.risk_calculator.compute_ari(cos_metrics['query_sim'], cos_metrics['response_sim'])
        self._print_report(query, normal_response, perturbed_query, perturbed_response, adversarial_response, cos_metrics, seq_metrics, ari)
        return {
            "normal_query": query,
            "normal_response": normal_response,
            "perturbed_query": perturbed_query,
            "perturbed_response": perturbed_response,
            "adversarial_response": adversarial_response,
            "cos_sim": cos_metrics,
            "seq_match": seq_metrics,
            "ari": ari,
        }

    def _print_report(self, query, normal, pert_q, pert_r, adv_r, cos, seq, ari):
        """Print the texts and metrics of one attack round to stdout."""
        print("π΅ Original Query:", query)
        print("\nπ’ Normal Response:", normal)
        print("\nπ΄ Direct Perturbation of Generated Response:", adv_r)
        print("\nπ Perturbed Query:", pert_q)
        print("\nπ΄ Perturbed Response:", pert_r)
        print(f"\nπ Cosine Sim β Perturbed Query: {cos['query_sim']}%, Adversarial: {cos['adversarial_sim']}%, Perturbed Response: {cos['response_sim']}%")
        print(f"\nπ Seq Match β Perturbed Query: {seq['query_seq_match']}%, Adversarial: {seq['adv_seq_match']}%, Perturbed Response: {seq['resp_seq_match']}%")
        print(f"\nπΊ ARI (Adversarial Risk Index): {ari}")

    def plot_to_base64(self, fig):
        """
        Converts a Matplotlib figure to a base64-encoded image string.
        Useful for sending plots in web apps or saving as embeddable outputs.

        The figure is rendered as PNG into an in-memory buffer and returned
        as a ``data:image/png;base64,...`` URI.
        """
        # The context manager releases the buffer even if savefig raises.
        with BytesIO() as buf:
            fig.savefig(buf, format='png')
            image_base64 = base64.b64encode(buf.getvalue()).decode('utf-8')
        return f"data:image/png;base64,{image_base64}"

    def evaluate_adversarial_robustness(self, query, method, k, psc_degree: int = 4,
                                        ep_min: float = 0.1, ep_max: float = 4.1,
                                        ep_gap: float = 0.2,
                                        csv_path: str = "gradio_output.csv"):
        """
        Evaluate semantic robustness of the pipeline over increasing perturbation intensities.
        Added as instance method so pipeline context (e.g. answer generator) does not need to be recreated.

        One attack round is executed per value of
        ``np.arange(ep_min, ep_max, ep_gap)``. The per-round response
        similarity and ARI are written to *csv_path* together with summary
        statistics, and a figure of the similarity curve is built.

        Returns:
            ``(stats_text, auc, fig)``: the summary statistics as
            ``name: value`` lines, the PSC AUC, and the Matplotlib figure.
            The figure is left open for the caller to render or close.

        Raises:
            ValueError: if the epsilon range produces no values.
        """
        epsilons = np.arange(ep_min, ep_max, ep_gap)
        if len(epsilons) == 0:
            raise ValueError("ep_min, ep_max and ep_gap must produce at least one epsilon value")
        x_vals, y_vals, ari_vals = [], [], []
        for epsilon in epsilons:
            # NOTE(review): epsilon only labels the x axis; it is not passed
            # to run(), so each round uses the same perturbation settings.
            result = self.run(query=query, top_k=k, perturb_method=method)
            x_vals.append(round(epsilon, 2))
            y_vals.append(result['cos_sim']['response_sim'])
            ari_vals.append(result['ari'])
        auc = PSCAnalyzer(degree=psc_degree, r=10).evaluate(x_vals, y_vals, mode="max", label="Semantic Similarity")
        stats = StatisticalEvaluator(y_vals).summary()
        stats_text = "\n".join(f"{name}: {value}" for name, value in stats.items())
        # Save CSV with full results and stats
        df = pd.DataFrame({
            "Perturbation_Level": x_vals,
            "Response_Similarity": y_vals,
            "ARI": ari_vals
        })
        summary_df = pd.DataFrame.from_dict(stats, orient='index', columns=["Response_Similarity_Stats"])
        summary_df.reset_index(inplace=True)
        summary_df.rename(columns={"index": "Metric"}, inplace=True)
        # The single empty row separates the per-round data from the summary.
        export_df = pd.concat([df, pd.DataFrame([{}]), summary_df], ignore_index=True)
        export_df.to_csv(csv_path, index=False)
        # Cap the degree so a short epsilon grid cannot make the fit rank-deficient.
        fit_degree = min(psc_degree, len(x_vals) - 1)
        coeffs = np.polyfit(x_vals, y_vals, fit_degree)
        poly_fn = np.poly1d(coeffs)
        fitted = poly_fn(x_vals)
        fig, ax = plt.subplots()
        ax.plot(x_vals, y_vals, 'o', label='Sampled Points')
        ax.plot(x_vals, fitted, '--', label='Fitted Curve')
        ax.set_xlabel('Perturbation Level (Epsilon)')
        ax.set_ylabel('Semantic Similarity')
        ax.set_title('Perturbation Sensitivity Curve (PSC)')
        ax.legend()
        ax.grid(True)
        return stats_text, auc, fig