import os import numpy as np from sklearn.metrics import classification_report from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score from sklearn.preprocessing import MultiLabelBinarizer import zipfile import json import pandas as pd import torch from datasets import Dataset from torch.utils.data import DataLoader import requests from pathlib import Path from .config import TAG_NAMES, DEVICE, SPACE_URL, EVAL_LIMIT from .globals import global_model, global_tokenizer print(np.__version__) def _load_data(test_data_path): eval_limit = os.getenv("EVAL_LIM") if not eval_limit: eval_limit = EVAL_LIMIT test_data_path = Path(__file__).parent / test_data_path # Check file existence if not os.path.exists(test_data_path): raise FileNotFoundError( f"ZIP file not found at {test_data_path}. " f"Current directory: {os.listdir(Path(__file__).parent)}" ) if not zipfile.is_zipfile(test_data_path): raise zipfile.BadZipFile(f"File is not a valid zip archive: {test_data_path}") data = [] features = ["prob_desc_description", "prob_desc_input_spec", "prob_desc_output_spec"] cols = features + ["tags"] try: with zipfile.ZipFile(test_data_path, 'r') as zip_file: # Verify zip contents names = zip_file.namelist() if not names: raise ValueError("Empty zip archive - no files found") # Process files with limit for name in names[1:1+int(eval_limit)]: try: with zip_file.open(name) as f: content = f.read() d = json.loads(content) # 4. Validate required fields if not all(col in d for col in cols): missing = [col for col in cols if col not in d] raise KeyError(f"Missing required fields in {name}: {missing}") row = [d[c] for c in cols] data.append(row) except json.JSONDecodeError as e: raise ValueError(f"Invalid JSON in file {name}: {str(e)}") except Exception as e: raise RuntimeError(f"Error processing {name}: {str(e)}") except zipfile.BadZipFile as e: raise zipfile.BadZipFile(f"Corrupted zip file: {str(e)}") except Exception as e: raise RuntimeError(f"Unexpected error loading data: {str(e)}") if not data: raise ValueError("No valid data files found in zip archive") return pd.DataFrame(data, columns=cols) def _preprocessing(df): mlb = MultiLabelBinarizer(classes = TAG_NAMES) tags_to_encode = ['math', 'graphs', 'strings', 'number theory', 'trees', 'geometry', 'games', 'probabilities'] # Filter tags and one-hot encode df['tags_filtered'] = [[tag for tag in tags if tag in tags_to_encode] for tags in df["tags"]] df.loc[df['tags_filtered'].apply(len) == 0, 'tags_filtered'] = df.loc[df['tags_filtered'].apply(len) == 0, 'tags_filtered'].apply(lambda x: ['other']) encoded_tags = mlb.fit_transform(df['tags_filtered']) # Create a new DataFrame with one-hot encoded columns encoded_df = pd.DataFrame(encoded_tags, columns=mlb.classes_) # Concatenate the encoded tags with the original DataFrame df = pd.concat([df, encoded_df], axis=1) texts = df["prob_desc_description"].values.tolist() labels = df[TAG_NAMES].values.tolist() # data: # texts = ["text1", "text2", ...] # list of texts # labels = [[0,1,0,0,1,0,1,1,0], [0,1,1,0,0,0,0,0,0],, ...] # list of labels df = pd.DataFrame({'text':texts, 'labels': labels}) return df def evaluate_batch(file_path, hf_repo, backend="local", hf_token=None): if backend == "local": return _evaluate_local(file_path, hf_repo) elif backend == "hf": return _evaluate_hf_api(file_path, hf_token) else: raise ValueError(f"Unknown backend: {backend}") def _evaluate_local(test_data_path, hf_repo): global global_model, global_tokenizer # Lazy-loading to avoid slow startup if global_model is None: from .model import QwenClassifier from transformers import AutoTokenizer global_model = QwenClassifier.from_pretrained(hf_repo).eval() global_tokenizer = AutoTokenizer.from_pretrained(hf_repo) df = _load_data(test_data_path) df = _preprocessing(df) hf_dataset = Dataset.from_pandas(df) # Then apply tokenization def tokenize_function(examples): return global_tokenizer(examples["text"], padding="max_length", truncation=True, max_length=512) dataset = hf_dataset.map(tokenize_function, batched=True) dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels']) dataloader = DataLoader(dataset, batch_size=16, shuffle=True) global_model.eval() all_preds = [] all_labels = [] with torch.no_grad(): for batch in dataloader: print(f"EVALUATION RUNNING ON {global_model.device}") global_model = global_model.to(DEVICE) batch = {k: v.to(DEVICE) for k, v in batch.items()} labels = batch["labels"].type(torch.float32) logits = global_model(batch["input_ids"], batch["attention_mask"]) preds = torch.sigmoid(logits).cpu().numpy() > 0.5 labels = labels.cpu().numpy() all_preds.extend(preds) all_labels.extend(labels) val_acc = accuracy_score(all_labels, all_preds) val_prec = precision_score(all_labels, all_preds, average='macro', zero_division=0) val_rec = recall_score(all_labels, all_preds, average='macro') val_f1 = f1_score(all_labels, all_preds, average='macro') val_prec_per_class = precision_score(all_labels, all_preds, average=None, zero_division=0) val_rec_per_class = recall_score(all_labels, all_preds, average=None) val_f1_per_class = f1_score(all_labels, all_preds, average=None) metrics = { 'Accuracy':int(100*val_acc), 'Precision':int(100*val_prec), 'Recall':int(100*val_rec), 'F1':int(100*val_f1), 'Precision_per_class':(100*val_prec_per_class).astype(int), 'Recall_per_class':(100*val_rec_per_class).astype(int), 'F1_per_class':(100*val_f1_per_class).astype(int), } # report = classification_report(all_labels, all_preds, target_names=TAG_NAMES, zero_division=0) return metrics def _evaluate_hf_api(file_path, hf_token=None): try: response = requests.post( f"{SPACE_URL}/evaluate", json={"file_path": file_path}, # This matches the Pydantic model headers={ "Authorization": f"Bearer {hf_token}", "Content-Type": "application/json" } if hf_token else {"Content-Type": "application/json"}, timeout=180 ) response.raise_for_status() # Raise HTTP errors return response.json() except requests.exceptions.RequestException as e: raise ValueError(f"API Error: {str(e)}\nResponse: {e.response.text if hasattr(e, 'response') else ''}")