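"""Test-suite reduction (TSR) driver.

Selects a minimal subset of "plus" tests per task via greedy set cover over
three signals (coverage, mutation, and sample information collected by the
tools.tsr.*_init helpers), merges the three subsets, and then either reports
the reduced suite's average test count and pass@1 for one model, or, with
--model ALL, dumps the reduced dataset as HumanEvalPlus-Mini.jsonl.

Illustrative invocation (directory names are placeholders, not repo defaults):
    python <this_script>.py --model ALL --dataset humaneval \
        --report_dir <report_dir> --sample_eval_dir <sample_eval_dir> \
        --mini_path <output_dir>
"""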
import argparse
import json
import os
import pickle
from concurrent.futures import ProcessPoolExecutor, as_completed
from copy import deepcopy
from typing import Any, Dict, List, Optional, Tuple

from rich.progress import track

from evalplus.data import write_jsonl
from tools.tsr.coverage_init import collect_coverage_info
from tools.tsr.mutation_init import collect_mutation_info
from tools.tsr.sample_init import collect_sample_info
from tools.tsr.utils import get_problems, get_task_ids, to_path


def global_util_init(dataset: str):
    # Module-level state shared by the helpers below: the problem dict,
    # the ordered task ids, and the problem count used for averaging.
    global problems
    global task_ids
    global problem_count
    problems = get_problems(dataset)
    task_ids = get_task_ids(dataset)
    problem_count = len(problems)


def merge_set_cover(*args) -> Dict[str, List[str]]:
    """Union several per-task set covers, de-duplicating test names."""
    merged_set_cover = {task_id: [] for task_id in task_ids}
    for set_cover_dict in args:
        for task_id, plus_tests in set_cover_dict.items():
            for plus_test in plus_tests:
                if plus_test not in merged_set_cover[task_id]:
                    merged_set_cover[task_id].append(plus_test)
    return merged_set_cover


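# greedy_cover consumes, per task, a mapping of the form
#   {test_name: [(model_path, i_code), ...], ...}
# where each (model_path, i_code) pair is one coverage/kill target attributed
# to that test. The literal below is a hypothetical illustration only (names
# and values are made up, not repo data):
#   tests = {
#       "plus_0": [("modelA_temp_0.0", 1), ("modelB_temp_0.0", 2)],
#       "plus_1": [("modelA_temp_0.0", 1)],
#   }
#   greedy_cover("HumanEval/0", tests, exclude_model="modelB")
#   # -> ("HumanEval/0", ["plus_0"])  # "plus_0" alone covers every kept target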
def greedy_cover(
    task_id: str, tests: Dict[str, List[Any]], exclude_model: str
) -> Tuple[str, List[str]]:
    """Greedily pick a small set of tests covering all (model, item) targets."""
    # Build the candidate list q = [(test_name, cover_set)] and the universe U,
    # skipping targets that belong to the excluded model.
    q, U = [], set()
    for test_name, test_cover in tests.items():
        cover_set = set()
        for model_path, i_code in test_cover:
            if exclude_model not in model_path:
                cover_set.add((model_path, i_code))
        q.append((test_name, cover_set))
        U = U.union(cover_set)

    min_cover = []
    while len(U) > 0:
        # Pick the test covering the most still-uncovered targets.
        max_uncover_set, max_test_name = set(), ""
        for test_name, cover_set in q:
            if len(cover_set) > len(max_uncover_set):
                max_uncover_set = cover_set
                max_test_name = test_name
        min_cover.append(max_test_name)
        U = U - max_uncover_set
        # Keep only candidates that still cover something uncovered.
        qq = []
        for test_name, cover_set in q:
            new_cover_set = U.intersection(cover_set)
            if len(new_cover_set) != 0:
                qq.append((test_name, new_cover_set))
        q = qq
    return task_id, min_cover


def parallel_greedy_cover(
    info_dict: Optional[Dict[str, Dict[str, List[Any]]]],
    exclude_model: str,
    type: str,
    **kwargs,
) -> Dict[str, List[str]]:
    """Run greedy_cover for every task in parallel; `type` selects the signal."""
    plus_tests = {task_id: [] for task_id in task_ids}

    with ProcessPoolExecutor(max_workers=32) as executor:
        futures = []
        for task_id in task_ids:
            if type == "sample":
                # Sample info is cached per task as a pickle under sample_dir
                # rather than passed in via info_dict.
                path_task_id = to_path(task_id)
                sample_dir = kwargs["sample_dir"]
                with open(os.path.join(sample_dir, f"{path_task_id}.pkl"), "rb") as f:
                    td = pickle.load(f)
                args = (task_id, td, exclude_model)
            else:
                args = (task_id, info_dict[task_id], exclude_model)
            futures.append(executor.submit(greedy_cover, *args))
        for future in track(as_completed(futures), f"min set cover :: {type}"):
            task_id, min_cover = future.result()
            plus_tests[task_id] = min_cover

    return plus_tests


def get_coverage_set_cover(
    coverage_dir: str, exclude_model: str, dataset: str
) -> Dict[str, List[str]]:
    coverage_info_dict = collect_coverage_info(coverage_dir, dataset)
    return parallel_greedy_cover(coverage_info_dict, exclude_model, "coverage")


def get_mutation_set_cover(
    mutation_dir: str, exclude_model: str, dataset: str
) -> Dict[str, List[str]]:
    mutation_info_dict = collect_mutation_info(
        os.path.join(mutation_dir, "eval_results.json"), dataset
    )
    return parallel_greedy_cover(mutation_info_dict, exclude_model, "mutation")


def get_sample_set_cover(
    sample_dir: str, sample_eval_dir: str, exclude_model: str, dataset: str
) -> Dict[str, List[str]]:
    # collect_sample_info caches per-task pickles under sample_dir, which
    # parallel_greedy_cover reads back instead of an in-memory info_dict.
    collect_sample_info(sample_dir, sample_eval_dir, dataset)
    return parallel_greedy_cover(None, exclude_model, "sample", sample_dir=sample_dir)


def compute_avg_test(set_cover_info: Dict[str, List[str]]) -> float:
    """Average tests per task: all base tests plus the reduced plus tests."""
    sum_tests = sum(
        len(problems[task_id]["base_input"]) + len(set_cover_info[task_id])
        for task_id in task_ids
    )
    return sum_tests / problem_count


def gen_report(set_cover_info: Dict[str, List[str]], sample_eval_dir: str, model: str):
    """Report the reduced suite's average test count and the model's pass@1 on it."""
    tsr_dict = {"ntests": compute_avg_test(set_cover_info), "pass@1": 0}
    model_path = os.path.join(sample_eval_dir, f"{model}_temp_0.0", "eval_results.json")
    with open(model_path, "r") as f:
        mdict = json.load(f)
    correct_cnt = 0
    for task_id in task_ids:
        legacy_task_id = task_id
        if legacy_task_id not in mdict["eval"]:
            # Older eval results key tasks with "_" instead of "/" in the id.
            legacy_task_id = legacy_task_id.replace("/", "_")
        if mdict["eval"][legacy_task_id]["base"][0][0] != "success":
            continue
        correct = True
        for plus_id in set_cover_info[task_id]:
            index = int(plus_id.split("_")[-1])
            if mdict["eval"][legacy_task_id]["plus"][0][1][index] == False:
                correct = False
                break
        if correct:
            correct_cnt += 1
    tsr_dict["pass@1"] = correct_cnt / problem_count
    return tsr_dict


def dump_humaneval_plus_mini(set_cover_info: Dict[str, List[str]], mini_path: str):
    """Write the reduced dataset, keeping only the set-cover-selected plus inputs."""
    new_problems = []
    for task_id in task_ids:
        otask = problems[task_id]
        task = {
            "task_id": task_id,
            "prompt": otask["prompt"],
            "contract": otask["contract"],
            "canonical_solution": otask["canonical_solution"],
            "entry_point": otask["entry_point"],
            "base_input": otask["base_input"],
            "plus_input": [],
            "atol": otask["atol"],
        }
        for plus_test in set_cover_info[task_id]:
            index = int(plus_test.split("_")[-1])
            task["plus_input"].append(otask["plus_input"][index])
        new_problems.append(deepcopy(task))
    write_jsonl(os.path.join(mini_path, "HumanEvalPlus-Mini.jsonl"), new_problems)


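# With a specific --model, main() writes report_<model>.json with the shape
# below (values elided here; nothing in this sketch is a real result):
#   {
#       "coverage": {"ntests": ..., "pass@1": ...},
#       "mutation": {"ntests": ..., "pass@1": ...},
#       "sample":   {"ntests": ..., "pass@1": ...},
#       "full":     {"ntests": ..., "pass@1": ...}
#   }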
def main(flags):
    coverage_dir = os.path.join(flags.report_dir, "coverage_cache")
    mutation_dir = os.path.join(flags.report_dir, "mutation_cache")
    sample_dir = os.path.join(flags.report_dir, "sample_cache")
    os.makedirs(flags.report_dir, exist_ok=True)

    # Drop a trailing parameter-size suffix (e.g. "-16b") from the model name so
    # that every size variant matches the substring-based exclusion in greedy_cover.
    exclude_model: str = flags.model
    if exclude_model.endswith("b"):
        exclude_model = "".join(exclude_model.split("-")[:-1])

    coverage_set_cover = get_coverage_set_cover(
        coverage_dir, exclude_model, flags.dataset
    )
    mutation_set_cover = get_mutation_set_cover(
        mutation_dir, exclude_model, flags.dataset
    )
    sample_set_cover = get_sample_set_cover(
        sample_dir, flags.sample_eval_dir, exclude_model, flags.dataset
    )
    merged_set_cover = merge_set_cover(
        coverage_set_cover, mutation_set_cover, sample_set_cover
    )

    if flags.model != "ALL":
        # Per-model report on each signal's reduced suite and on the merged one.
        final_report = dict()
        final_report["coverage"] = gen_report(
            coverage_set_cover, flags.sample_eval_dir, flags.model
        )
        final_report["mutation"] = gen_report(
            mutation_set_cover, flags.sample_eval_dir, flags.model
        )
        final_report["sample"] = gen_report(
            sample_set_cover, flags.sample_eval_dir, flags.model
        )
        final_report["full"] = gen_report(
            merged_set_cover, flags.sample_eval_dir, flags.model
        )
        with open(
            os.path.join(flags.report_dir, f"report_{flags.model}.json"), "w"
        ) as f:
            json.dump(final_report, f, indent=4)
    else:
        dump_humaneval_plus_mini(merged_set_cover, flags.mini_path)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--model",
        required=True,
        type=str,
        help="Model under test, or ALL to dump the reduced mini dataset",
    )
    parser.add_argument("--dataset", type=str, choices=["humaneval", "mbpp"])
    parser.add_argument(
        "--report_dir", type=str, help="Path to JSON report and cache files"
    )
    parser.add_argument(
        "--sample_eval_dir", type=str, help="Path to sample evaluation files"
    )
    parser.add_argument(
        "--mini_path", type=str, help="Output directory for HumanEvalPlus-Mini.jsonl"
    )
    args = parser.parse_args()

    global_util_init(args.dataset)
    main(args)