import os
import json
import zipfile
from pathlib import Path

import numpy as np
import pandas as pd
import requests
import torch
from datasets import Dataset
from sklearn.metrics import accuracy_score, classification_report, f1_score, precision_score, recall_score
from sklearn.preprocessing import MultiLabelBinarizer
from torch.utils.data import DataLoader

from .config import TAG_NAMES, DEVICE, SPACE_URL, EVAL_LIMIT
from .globals import global_model, global_tokenizer

# Module-level version print (handy when debugging the Space's environment)
print(np.__version__)
def _load_data(test_data_path):
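    """Load problem records from the zip archive at ``test_data_path`` (resolved
    relative to this module) into a DataFrame of description/spec columns plus tags.
    The number of files read is capped by the EVAL_LIM env var, else config EVAL_LIMIT."""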
    eval_limit = os.getenv("EVAL_LIM") or EVAL_LIMIT
    test_data_path = Path(__file__).parent / test_data_path
    # Check file existence
    if not os.path.exists(test_data_path):
        raise FileNotFoundError(
            f"ZIP file not found at {test_data_path}. "
            f"Current directory: {os.listdir(Path(__file__).parent)}"
        )
    if not zipfile.is_zipfile(test_data_path):
        raise zipfile.BadZipFile(f"File is not a valid zip archive: {test_data_path}")
    data = []
    features = ["prob_desc_description", "prob_desc_input_spec", "prob_desc_output_spec"]
    cols = features + ["tags"]
    try:
        with zipfile.ZipFile(test_data_path, 'r') as zip_file:
            # Verify zip contents
            names = zip_file.namelist()
            if not names:
                raise ValueError("Empty zip archive - no files found")
            # Process files with limit, skipping the first entry (typically the archive's root directory)
            for name in names[1:1 + int(eval_limit)]:
                try:
                    with zip_file.open(name) as f:
                        content = f.read()
                        d = json.loads(content)
                        # Validate required fields
                        if not all(col in d for col in cols):
                            missing = [col for col in cols if col not in d]
                            raise KeyError(f"Missing required fields in {name}: {missing}")
                        data.append([d[c] for c in cols])
                except json.JSONDecodeError as e:
                    raise ValueError(f"Invalid JSON in file {name}: {e}")
                except Exception as e:
                    raise RuntimeError(f"Error processing {name}: {e}")
    except zipfile.BadZipFile as e:
        raise zipfile.BadZipFile(f"Corrupted zip file: {e}")
    except Exception as e:
        raise RuntimeError(f"Unexpected error loading data: {e}")
    if not data:
        raise ValueError("No valid data files found in zip archive")
    return pd.DataFrame(data, columns=cols)
def _preprocessing(df):
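    """Restrict tags to the supported set (unmatched rows become ['other']),
    one-hot encode them, and return a DataFrame with 'text' and multi-hot 'labels'."""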
    mlb = MultiLabelBinarizer(classes=TAG_NAMES)
    tags_to_encode = ['math', 'graphs', 'strings', 'number theory', 'trees', 'geometry', 'games', 'probabilities']
    # Filter tags down to the supported set; rows left empty fall back to 'other'
    df['tags_filtered'] = [[tag for tag in tags if tag in tags_to_encode] for tags in df["tags"]]
    no_tags = df['tags_filtered'].apply(len) == 0
    df.loc[no_tags, 'tags_filtered'] = df.loc[no_tags, 'tags_filtered'].apply(lambda x: ['other'])
    # One-hot encode the filtered tags
    encoded_tags = mlb.fit_transform(df['tags_filtered'])
    # Create a new DataFrame with the one-hot encoded columns
    encoded_df = pd.DataFrame(encoded_tags, columns=mlb.classes_)
    # Concatenate the encoded tags with the original DataFrame
    df = pd.concat([df, encoded_df], axis=1)
    texts = df["prob_desc_description"].values.tolist()
    labels = df[TAG_NAMES].values.tolist()
    # Resulting data:
    # texts = ["text1", "text2", ...]  # list of texts
    # labels = [[0,1,0,0,1,0,1,1,0], [0,1,1,0,0,0,0,0,0], ...]  # list of multi-hot label vectors
    return pd.DataFrame({'text': texts, 'labels': labels})
def evaluate_batch(file_path, hf_repo, backend="local", hf_token=None):
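    """Dispatch evaluation to the local model ("local") or the hosted Space API ("hf")."""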
    if backend == "local":
        return _evaluate_local(file_path, hf_repo)
    elif backend == "hf":
        return _evaluate_hf_api(file_path, hf_token)
    else:
        raise ValueError(f"Unknown backend: {backend}")
def _evaluate_local(test_data_path, hf_repo):
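    """Evaluate the classifier locally over the zipped test set and return
    aggregate and per-class multi-label metrics as integer percentages."""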
    global global_model, global_tokenizer
    # Lazy-load the model and tokenizer to avoid slow startup
    if global_model is None:
        from .model import QwenClassifier
        from transformers import AutoTokenizer
        global_model = QwenClassifier.from_pretrained(hf_repo).eval()
        global_tokenizer = AutoTokenizer.from_pretrained(hf_repo)

    df = _load_data(test_data_path)
    df = _preprocessing(df)
    hf_dataset = Dataset.from_pandas(df)

    # Tokenize the problem descriptions
    def tokenize_function(examples):
        return global_tokenizer(examples["text"], padding="max_length", truncation=True, max_length=512)

    dataset = hf_dataset.map(tokenize_function, batched=True)
    dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])
    # No shuffling needed for evaluation
    dataloader = DataLoader(dataset, batch_size=16, shuffle=False)

    # Move the model to the target device once, before the loop
    global_model = global_model.to(DEVICE)
    global_model.eval()
    print(f"EVALUATION RUNNING ON {global_model.device}")

    all_preds = []
    all_labels = []
    with torch.no_grad():
        for batch in dataloader:
            batch = {k: v.to(DEVICE) for k, v in batch.items()}
            labels = batch["labels"].type(torch.float32)
            logits = global_model(batch["input_ids"], batch["attention_mask"])
            # Threshold sigmoid probabilities at 0.5 for multi-label predictions
            preds = torch.sigmoid(logits).cpu().numpy() > 0.5
            all_preds.extend(preds)
            all_labels.extend(labels.cpu().numpy())

    val_acc = accuracy_score(all_labels, all_preds)
    val_prec = precision_score(all_labels, all_preds, average='macro', zero_division=0)
    val_rec = recall_score(all_labels, all_preds, average='macro', zero_division=0)
    val_f1 = f1_score(all_labels, all_preds, average='macro', zero_division=0)
    val_prec_per_class = precision_score(all_labels, all_preds, average=None, zero_division=0)
    val_rec_per_class = recall_score(all_labels, all_preds, average=None, zero_division=0)
    val_f1_per_class = f1_score(all_labels, all_preds, average=None, zero_division=0)
    metrics = {
        'Accuracy': int(100 * val_acc),
        'Precision': int(100 * val_prec),
        'Recall': int(100 * val_rec),
        'F1': int(100 * val_f1),
        # .tolist() keeps the per-class arrays JSON-serializable
        'Precision_per_class': (100 * val_prec_per_class).astype(int).tolist(),
        'Recall_per_class': (100 * val_rec_per_class).astype(int).tolist(),
        'F1_per_class': (100 * val_f1_per_class).astype(int).tolist(),
    }
    # report = classification_report(all_labels, all_preds, target_names=TAG_NAMES, zero_division=0)
    return metrics
def _evaluate_hf_api(file_path, hf_token=None):
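    """POST the file path to the Space's /evaluate endpoint and return its JSON metrics."""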
    try:
        response = requests.post(
            f"{SPACE_URL}/evaluate",
            json={"file_path": file_path},  # Matches the endpoint's Pydantic model
            headers={
                "Authorization": f"Bearer {hf_token}",
                "Content-Type": "application/json"
            } if hf_token else {"Content-Type": "application/json"},
            timeout=180
        )
        response.raise_for_status()  # Raise on HTTP error status codes
        return response.json()
    except requests.exceptions.RequestException as e:
        # e.response may be missing or None (e.g. on connection errors)
        body = e.response.text if getattr(e, 'response', None) is not None else ''
        raise ValueError(f"API Error: {e}\nResponse: {body}") from e