import sys
import torch
import torch.nn.functional as F
import numpy as np
from collections import defaultdict
from scipy.stats import rankdata

np.set_printoptions(precision=4)

"""Information Retrieval metrics

Useful Resources:
http://www.cs.utexas.edu/~mooney/ir-course/slides/Evaluation.ppt
http://www.nii.ac.jp/TechReports/05-014E.pdf
http://www.stanford.edu/class/cs276/handouts/EvaluationNew-handout-6-per.pdf
http://hal.archives-ouvertes.fr/docs/00/72/67/60/PDF/07-busa-fekete.pdf
Learning to Rank for Information Retrieval (Tie-Yan Liu)
"""


def mean_reciprocal_rank(rs):
    """Score is reciprocal of the rank of the first relevant item

    First element is 'rank 1'. Relevance is binary (nonzero is relevant).

    Example from http://en.wikipedia.org/wiki/Mean_reciprocal_rank
    >>> rs = [[0, 0, 1], [0, 1, 0], [1, 0, 0]]
    >>> mean_reciprocal_rank(rs)
    0.61111111111111105
    >>> rs = np.array([[0, 0, 0], [0, 1, 0], [1, 0, 0]])
    >>> mean_reciprocal_rank(rs)
    0.5
    >>> rs = [[0, 0, 0, 1], [1, 0, 0], [1, 0, 0]]
    >>> mean_reciprocal_rank(rs)
    0.75

    Args:
        rs: Iterator of relevance scores (list or numpy) in rank order
            (first element is the first item)

    Returns:
        Mean reciprocal rank
    """
    rs = (np.asarray(r).nonzero()[0] for r in rs)
    return np.mean([1. / (r[0] + 1) if r.size else 0. for r in rs])


def r_precision(r):
    """Score is precision after all relevant documents have been retrieved

    Relevance is binary (nonzero is relevant).

    >>> r = [0, 0, 1]
    >>> r_precision(r)
    0.33333333333333331
    >>> r = [0, 1, 0]
    >>> r_precision(r)
    0.5
    >>> r = [1, 0, 0]
    >>> r_precision(r)
    1.0

    Args:
        r: Relevance scores (list or numpy) in rank order
            (first element is the first item)

    Returns:
        R Precision
    """
    r = np.asarray(r) != 0
    z = r.nonzero()[0]
    if not z.size:
        return 0.
    return np.mean(r[:z[-1] + 1])


def precision_at_k(r, k):
    """Score is precision @ k

    Relevance is binary (nonzero is relevant).

    >>> r = [0, 0, 1]
    >>> precision_at_k(r, 1)
    0.0
    >>> precision_at_k(r, 2)
    0.0
    >>> precision_at_k(r, 3)
    0.33333333333333331
    >>> precision_at_k(r, 4)
    Traceback (most recent call last):
      File "", line 1, in ?
    ValueError: Relevance score length < k

    Args:
        r: Relevance scores (list or numpy) in rank order
            (first element is the first item)

    Returns:
        Precision @ k

    Raises:
        ValueError: len(r) must be >= k
    """
    assert k >= 1
    r = np.asarray(r)[:k] != 0
    if r.size != k:
        raise ValueError('Relevance score length < k')
    return np.mean(r)


def average_precision(r):
    """Score is average precision (area under PR curve)

    Relevance is binary (nonzero is relevant).

    >>> r = [1, 1, 0, 1, 0, 1, 0, 0, 0, 1]
    >>> delta_r = 1. / sum(r)
    >>> sum([sum(r[:x + 1]) / (x + 1.) * delta_r for x, y in enumerate(r) if y])
    0.7833333333333333
    >>> average_precision(r)
    0.78333333333333333

    Args:
        r: Relevance scores (list or numpy) in rank order
            (first element is the first item)

    Returns:
        Average precision
    """
    r = np.asarray(r) != 0
    out = [precision_at_k(r, k + 1) for k in range(r.size) if r[k]]
    if not out:
        return 0.
    return np.mean(out)


def mean_average_precision(rs):
    """Score is mean average precision

    Relevance is binary (nonzero is relevant).

    >>> rs = [[1, 1, 0, 1, 0, 1, 0, 0, 0, 1]]
    >>> mean_average_precision(rs)
    0.78333333333333333
    >>> rs = [[1, 1, 0, 1, 0, 1, 0, 0, 0, 1], [0]]
    >>> mean_average_precision(rs)
    0.39166666666666666

    Args:
        rs: Iterator of relevance scores (list or numpy) in rank order
            (first element is the first item)

    Returns:
        Mean average precision
    """
    return np.mean([average_precision(r) for r in rs])
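
# Illustrative usage sketch (not part of the original metric definitions): the binary
# metrics above expect relevance labels already sorted by the model's ranking. One way
# to obtain that, assuming synthetic `scores` and `labels` arrays, is to argsort the
# scores per query and reorder the labels before calling the metric functions.
def _example_binary_ranking_metrics():
    scores = np.array([[0.9, 0.2, 0.7],
                       [0.1, 0.8, 0.3]])   # model scores, one row per query
    labels = np.array([[0, 0, 1],
                       [1, 0, 0]])         # binary relevance, same shape
    order = np.argsort(-scores, axis=1)    # item indices by descending score
    rs = np.take_along_axis(labels, order, axis=1)
    return mean_reciprocal_rank(rs), mean_average_precision(rs)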
def dcg_at_k(r, k, method=0):
    """Score is discounted cumulative gain (dcg)

    Relevance is positive real values. Can use binary
    as the previous methods.

    Example from
    http://www.stanford.edu/class/cs276/handouts/EvaluationNew-handout-6-per.pdf
    >>> r = [3, 2, 3, 0, 0, 1, 2, 2, 3, 0]
    >>> dcg_at_k(r, 1)
    3.0
    >>> dcg_at_k(r, 1, method=1)
    3.0
    >>> dcg_at_k(r, 2)
    5.0
    >>> dcg_at_k(r, 2, method=1)
    4.2618595071429155
    >>> dcg_at_k(r, 10)
    9.6051177391888114
    >>> dcg_at_k(r, 11)
    9.6051177391888114

    Args:
        r: Relevance scores (list or numpy) in rank order
            (first element is the first item)
        k: Number of results to consider
        method: If 0 then weights are [1.0, 1.0, 0.6309, 0.5, 0.4307, ...]
                If 1 then weights are [1.0, 0.6309, 0.5, 0.4307, ...]

    Returns:
        Discounted cumulative gain
    """
    r = np.asfarray(r)[:k]
    if r.size:
        if method == 0:
            return r[0] + np.sum(r[1:] / np.log2(np.arange(2, r.size + 1)))
        elif method == 1:
            return np.sum(r / np.log2(np.arange(2, r.size + 2)))
        else:
            raise ValueError('method must be 0 or 1.')
    return 0.


def ndcg_at_k(r, k, method=0):
    """Score is normalized discounted cumulative gain (ndcg)

    Relevance is positive real values. Can use binary
    as the previous methods.

    Example from
    http://www.stanford.edu/class/cs276/handouts/EvaluationNew-handout-6-per.pdf
    >>> r = [3, 2, 3, 0, 0, 1, 2, 2, 3, 0]
    >>> ndcg_at_k(r, 1)
    1.0
    >>> r = [2, 1, 2, 0]
    >>> ndcg_at_k(r, 4)
    0.9203032077642922
    >>> ndcg_at_k(r, 4, method=1)
    0.96519546960144276
    >>> ndcg_at_k([0], 1)
    0.0
    >>> ndcg_at_k([1], 2)
    1.0

    Args:
        r: Relevance scores (list or numpy) in rank order
            (first element is the first item)
        k: Number of results to consider
        method: If 0 then weights are [1.0, 1.0, 0.6309, 0.5, 0.4307, ...]
                If 1 then weights are [1.0, 0.6309, 0.5, 0.4307, ...]

    Returns:
        Normalized discounted cumulative gain
    """
    dcg_max = dcg_at_k(sorted(r, reverse=True), k, method)
    if not dcg_max:
        return 0.
    return dcg_at_k(r, k, method) / dcg_max


"""
Wealth inequality
"""


def gini(arr):
    """Gini coefficient of a 1-D numpy array (e.g., recommendations per item).

    0 means perfectly equal exposure; values close to 1 mean exposure is
    concentrated on a few items.
    """
    # Gini = \frac{2\sum_i^n i\times y_i}{n\sum_i^n y_i} - \frac{n+1}{n}
    sorted_arr = arr.copy()
    sorted_arr.sort()
    n = arr.size
    coef_ = 2. / n
    const_ = (n + 1.) / n
    weighted_sum = sum([(i + 1) * yi for i, yi in enumerate(sorted_arr)])
    return coef_ * weighted_sum / (sorted_arr.sum()) - const_
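
# Illustrative sketch (not part of the original module): gini() on synthetic exposure
# vectors. Uniform exposure gives 0; exposure concentrated on a single item approaches 1.
def _example_gini():
    equal = np.ones(10)                 # every item recommended equally often
    skewed = np.zeros(10)
    skewed[0] = 10.                     # all recommendations go to one item
    return gini(equal), gini(skewed)    # approximately (0.0, 0.9)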
"""
Expected envy and inferiority under probabilistic recommendation
as weighted sampling with replacement
"""


def expected_utility_u(Ru, ps, k):
    return Ru @ ps * k


def expected_utility(R, Pi, k):
    U = (R * Pi * k).sum(axis=1)
    # if not agg:
    return U


def expected_envy_u_v(Ru, pus, pvs, k):
    return Ru @ (pvs - pus) * k


def prob_in(ps, k):
    # Probability that an item is drawn at least once in k independent draws
    # with replacement, given per-draw probabilities ps.
    return 1 - (1 - ps) ** k


def prob_in_approx(ps, k):
    # First-order approximation of prob_in for small ps.
    return k * ps


def expected_inferiority_u_v(Ru, Rv, pus, pvs, k, compensate=False, approx=False):
    differ = Rv - Ru
    if not compensate:
        differ = np.clip(differ, a_min=0, a_max=None)
    if not approx:
        return differ @ (prob_in(pus, k) * prob_in(pvs, k))
    else:
        return differ @ (prob_in_approx(pus, k) * prob_in_approx(pvs, k))


def expected_envy(R, Pi, k):
    """
    Measure expected envy for k-sized recommendation according to rec strategy Pi
    with respect to relevancy scores R
    :param R: m x n real-valued matrix
    :param Pi: m x n Markov matrix
    :return: E: m x m envy matrix where E[u, v] = envy from u to v
    """
    assert np.all(np.isclose(Pi.sum(axis=1), 1.)) or np.array_equal(Pi, Pi.astype(bool))  # binary matrix for discrete rec
    m, n = len(R), len(R[0])
    E = np.zeros((m, m))
    for u in range(m):
        for v in range(m):
            if v == u:
                continue
            E[u, v] = expected_envy_u_v(R[u], Pi[u], Pi[v], k=k)
    E = np.clip(E, a_min=0., a_max=None)
    # if not agg:
    return E


def expected_inferiority(R, Pi, k, compensate=True, approx=False):
    """
    Measure expected inferiority for k-sized recommendation according to rec strategy Pi
    with respect to relevancy scores R
    :param R: m x n real-valued matrix
    :param Pi: m x n Markov matrix
    :param k: recommendation size
    :return: I: m x m inferiority matrix where I[u, v] = inferiority of u w.r.t. v
    """
    assert np.all(np.isclose(Pi.sum(axis=1), 1.)) or np.array_equal(Pi, Pi.astype(bool))  # binary matrix for discrete rec
    m, n = len(R), len(R[0])
    I = np.zeros((m, m))
    for u in range(m):
        for v in range(m):
            if v == u:
                continue
            I[u, v] = expected_inferiority_u_v(R[u], R[v], Pi[u], Pi[v], k=k,
                                               approx=approx, compensate=compensate)
    I = np.clip(I, a_min=0., a_max=None)
    # if not agg:
    return I


def expected_envy_torch(R, Pi, k):
    m, n = len(R), len(R[0])
    E = torch.zeros(m, m)
    for u in range(m):
        for v in range(m):
            if v == u:
                continue
            E[u, v] = expected_envy_u_v(R[u], Pi[u], Pi[v], k=k)
    E = torch.clamp(E, min=0.)
    return E


def expected_envy_torch_vec(R, P, k):
    res = R @ P.transpose(0, 1)
    envy_mat = (res - torch.diagonal(res, 0).reshape(-1, 1))
    return k * (torch.clamp(envy_mat, min=0.))


def expected_inferiority_torch(R, Pi, k, compensate=False, approx=False):
    m, n = R.shape
    I = torch.zeros((m, m))
    for u in range(m):
        for v in range(m):
            if v == u:
                continue
            if not approx:
                joint_prob = prob_in(Pi[v], k) * prob_in(Pi[u], k)
            else:
                joint_prob = prob_in_approx(Pi[v], k) * prob_in_approx(Pi[u], k)
            if not compensate:
                I[u, v] = torch.clamp(R[v] - R[u], min=0., max=None) @ joint_prob
            else:
                I[u, v] = (R[v] - R[u]) @ joint_prob
    return torch.clamp(I, min=0.)


def expected_inferiority_torch_vec(R, P, k, compensate=False, approx=False):
    m, n = R.shape
    I = torch.zeros((m, m))
    P_pow_k = 1 - (1 - P).pow(k) if not approx else P * k
    for i in range(m):
        first_term = torch.clamp(R - R[i], min=0.) if not compensate else R - R[i]
        I[i] = (first_term * (P_pow_k[i] * P_pow_k)).sum(1)
    return I


def slow_onehot(idx, P):
    # Build an m x n 0/1 matrix with ones at the recommended item indices
    # (a loop-based stand-in for F.one_hot(...).sum(1)).
    m = P.shape[0]
    res = torch.zeros_like(P)
    for i in range(m):
        res[i, idx[i]] = 1.
    return res
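
# Illustrative sketch (not part of the original module): expected envy and inferiority
# on a tiny synthetic problem (2 users, 3 items), plus the vectorised torch variant of
# envy, which should agree with the numpy loop up to numerical precision.
def _example_envy_inferiority():
    R = np.array([[0.9, 0.1, 0.0],
                  [0.8, 0.2, 0.0]])          # relevance of each item to each user
    Pi = np.array([[0.7, 0.2, 0.1],
                   [0.1, 0.8, 0.1]])         # stochastic recommendation policy (rows sum to 1)
    E = expected_envy(R, Pi, k=5)            # m x m pairwise envy
    I = expected_inferiority(R, Pi, k=5, compensate=False)
    E_torch = expected_envy_torch_vec(torch.tensor(R), torch.tensor(Pi), k=5)
    return E, I, E_torch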
def eiu_cut_off(R, Pi, k, agg=True):
    """
    Evaluate envy, inferiority, utility based on top-k cut-off recommendation
    :param R: m x n relevance matrix (torch tensor)
    :param Pi: m x n recommendation scores; the top-k items per row are recommended
    :return: envy, inferiority, utility
    """
    # print('Start evaluation!')
    m, n = R.shape
    # _, rec = torch.topk(Pi, k, dim=1)
    # rec_onehot = F.one_hot(rec, num_classes=n).sum(1).float()
    rec_onehot = slow_onehot(torch.topk(Pi, k, dim=1)[1], Pi)
    envy = expected_envy_torch_vec(R, rec_onehot, k=1)
    inferiority = expected_inferiority_torch_vec(R, rec_onehot, k=1, compensate=False, approx=False)
    utility = expected_utility(R, rec_onehot, k=1)
    if agg:
        envy = envy.sum(-1).mean()
        inferiority = inferiority.sum(-1).mean()
        utility = utility.mean()
    return envy, inferiority, utility


def eiu_cut_off2(R, Pi, k, agg=True):
    """
    Evaluate envy, inferiority, utility based on top-k cut-off recommendation
    :param R: tuple (S, U) of m x n matrices; S is used for inferiority, U for envy and utility
    :param Pi: m x n recommendation scores; the top-k items per row are recommended
    :return: envy, inferiority, utility
    """
    # print('Start evaluation!')
    S, U = R
    if not isinstance(S, torch.Tensor):
        S = torch.tensor(S)
    if not isinstance(U, torch.Tensor):
        U = torch.tensor(U)
    if not isinstance(Pi, torch.Tensor):
        Pi = torch.tensor(Pi)
    m, n = U.shape
    # _, rec = torch.topk(Pi, k, dim=1)
    # rec_onehot = F.one_hot(rec, num_classes=n).sum(1).float()
    rec_onehot = slow_onehot(torch.topk(Pi, k, dim=1)[1], Pi)
    envy = expected_envy_torch_vec(U, rec_onehot, k=1)
    inferiority = expected_inferiority_torch_vec(S, rec_onehot, k=1, compensate=False, approx=False)
    utility = expected_utility(U, rec_onehot, k=1)
    if agg:
        envy = envy.sum(-1).mean()
        inferiority = inferiority.sum(-1).mean()
        utility = utility.mean()
    return envy, inferiority, utility


"""
Global congestion metrics
"""


def get_competitors(rec_per_job, rec):
    # For each user, collect how many users were recommended each of their jobs.
    m = rec.shape[0]
    competitors = []
    for i in range(m):
        if len(rec[i]) == 1:
            competitors.append([rec_per_job[rec[i]]])
        else:
            competitors.append(rec_per_job[rec[i]])
    return np.array(competitors)


def get_better_competitor_scores(rec, R):
    # For each user and each recommended job, mean competitor score minus the user's own score.
    m, n = R.shape
    _, k = rec.shape
    user_ids_per_job = defaultdict(list)
    for i, r in enumerate(rec):
        for j in r:
            user_ids_per_job[j.item()].append(i)
    mean_competitor_scores_per_job = np.zeros((m, k))
    for i in range(m):
        my_rec_jobs = rec[i].numpy()
        my_mean_competitors = np.zeros(k)
        for j_, j in enumerate(my_rec_jobs):
            my_score = R[i, j]
            all_ids = user_ids_per_job[j].copy()
            all_ids.remove(i)
            other_scores = R[all_ids, j]
            if not all_ids:
                other_scores = np.zeros(1)  # TODO if no competition, then it is the negative of my own score
            my_mean_competitors[j_] = other_scores.mean() - my_score
        # my_mean_competitors[my_mean_competitors < 0] = 0.
        # TODO only keep the better competitors
        mean_competitor_scores_per_job[i] = my_mean_competitors
    return mean_competitor_scores_per_job
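
# Illustrative sketch (not part of the original module): envy / inferiority / utility of a
# top-k cut-off recommendation derived from random scores. R and Pi below are synthetic.
def _example_eiu_cut_off():
    torch.manual_seed(0)
    m, n, k = 5, 8, 2
    R = torch.rand(m, n)                           # relevance scores
    Pi = torch.softmax(torch.rand(m, n), dim=1)    # recommendation scores (rows sum to 1)
    envy, inferiority, utility = eiu_cut_off(R, Pi, k)
    return envy.item(), inferiority.item(), utility.item()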
def get_num_better_competitors(rec, R):
    # For each user and each recommended job, count competitors with a strictly higher score.
    m, n = R.shape
    _, k = rec.shape
    user_ids_per_job = defaultdict(list)
    for i, r in enumerate(rec):
        for j in r:
            user_ids_per_job[j.item()].append(i)
    num_better_competitors = np.zeros((m, k))
    for i in range(m):
        my_rec_jobs = rec[i].numpy()
        better_competitors = np.zeros(k)
        for j_, j in enumerate(my_rec_jobs):
            my_score = R[i, j]
            all_ids = user_ids_per_job[j].copy()
            all_ids.remove(i)
            other_scores = R[all_ids, j]
            better_competitors[j_] = ((other_scores - my_score) > 0).sum()
        num_better_competitors[i] = better_competitors
    return num_better_competitors


def get_scores_ids_per_job(rec, R):
    scores_per_job = defaultdict(list)
    ids_per_job = defaultdict(list)
    for i in range(len(rec)):
        u = rec[i]
        for jb in u:
            jb = jb.item()
            ids_per_job[jb].append(i)
            scores_per_job[jb].append(R[i, jb].item())
    return scores_per_job, ids_per_job


def get_rank(a, method='ordinal', axis=None, descending=False):
    if descending:
        a = np.array(a) * -1
    return rankdata(a, method=method, axis=axis)


def get_ranks_per_job(scores_rec):
    ranks_per_job = defaultdict(list)
    for jb in scores_rec:
        ranks_per_job[jb] = get_rank(scores_rec[jb], descending=True)
    return ranks_per_job


def get_ranks_per_user(ranks_per_job, ids_per_job):
    for k, v in ranks_per_job.items():
        ranks_per_job[k] = [i - 1 for i in v]
    ranks_per_user = defaultdict(list)
    for k, v in ids_per_job.items():
        rks = ranks_per_job[k]
        for i, u in enumerate(v):
            ranks_per_user[u].append(rks[i])
    return ranks_per_user


def calculate_global_metrics(res, R, k=10):
    # get rec
    m, n = res.shape
    if not torch.is_tensor(res):
        res = torch.from_numpy(res)
    _, rec = torch.topk(res, k, dim=1)
    rec_onehot = slow_onehot(rec, res)
    # rec_onehot = F.one_hot(rec, num_classes=n).sum(1).float()
    try:
        rec_per_job = rec_onehot.sum(axis=0).numpy()
    except:
        rec_per_job = rec_onehot.sum(axis=0).cpu().numpy()
    rec = rec.cpu()
    R = R.cpu()
    opt_competitors = get_competitors(rec_per_job, rec)
    # mean competitors per person
    mean_competitors = opt_competitors.mean()
    # mean better competitors per person
    mean_better_competitors = get_num_better_competitors(rec, R).mean()
    # mean competitor scores - my score
    mean_diff_scores = get_better_competitor_scores(rec, R)
    mean_diff_scores[mean_diff_scores < 0] = 0.
    mean_diff_scores = mean_diff_scores.mean()
    # mean rank (currently disabled; the returned 'mean_rank' is a placeholder
    # that reuses mean_better_competitors)
    # scores_opt, ids_opt = get_scores_ids_per_job(rec, R)
    # ranks_opt = get_ranks_per_job(scores_opt)
    # ranks_per_user_opt = get_ranks_per_user(ranks_opt, ids_opt)
    # mean_rank = np.array(list(ranks_per_user_opt.values())).mean()
    # gini
    gini_index = gini(rec_per_job)
    return {'mean_competitors': mean_competitors, 'mean_better_competitors': mean_better_competitors,
            'mean_scores_diff': mean_diff_scores, 'mean_rank': mean_better_competitors, 'gini_index': gini_index}
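
# Illustrative sketch (not part of the original module): global congestion metrics on
# random data. `scores` stands in for the model's recommendation scores and `R` for
# the ground-truth relevance matrix; both are synthetic.
def _example_global_metrics():
    torch.manual_seed(0)
    m, n = 20, 30
    scores = torch.rand(m, n)
    R = torch.rand(m, n)
    return calculate_global_metrics(scores, R, k=5)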
def calculate_global_metrics2(res, R, k=10):
    # get rec; R is a tuple (S, U) of score matrices (the congestion metrics below use S)
    S, U = R
    m, n = res.shape
    if not torch.is_tensor(res):
        res = torch.from_numpy(res)
    _, rec = torch.topk(res, k, dim=1)
    rec_onehot = slow_onehot(rec, res)
    # rec_onehot = F.one_hot(rec, num_classes=n).sum(1).float()
    try:
        rec_per_job = rec_onehot.sum(axis=0).numpy()
    except:
        rec_per_job = rec_onehot.sum(axis=0).cpu().numpy()
    rec = rec.cpu()
    S = S.cpu()
    U = U.cpu()
    opt_competitors = get_competitors(rec_per_job, rec)
    # mean competitors per person
    mean_competitors = opt_competitors.mean()
    # mean better competitors per person
    mean_better_competitors = get_num_better_competitors(rec, S).mean()
    # mean competitor scores - my score
    mean_diff_scores = get_better_competitor_scores(rec, S)
    mean_diff_scores[mean_diff_scores < 0] = 0.
    mean_diff_scores = mean_diff_scores.mean()
    # mean rank
    scores_opt, ids_opt = get_scores_ids_per_job(rec, S)
    ranks_opt = get_ranks_per_job(scores_opt)
    ranks_per_user_opt = get_ranks_per_user(ranks_opt, ids_opt)
    mean_rank = np.array(list(ranks_per_user_opt.values())).mean()
    # gini
    gini_index = gini(rec_per_job)
    return {'mean_competitors': mean_competitors, 'mean_better_competitors': mean_better_competitors,
            'mean_scores_diff': mean_diff_scores, 'mean_rank': mean_rank, 'gini_index': gini_index}


def get_scores_per_job(rec, S):
    scores_per_job = defaultdict(list)
    for i in range(len(rec)):
        u = rec[i]
        for jb in u:
            jb = jb.item()
            scores_per_job[jb].append(S[i, jb].item())
    return scores_per_job
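
# Illustrative sketch (not part of the original module): calculate_global_metrics2 expects
# its second argument to be a tuple (S, U) of matrices; only S (the competitor score matrix)
# is used by the congestion metrics. All tensors here are synthetic.
def _example_global_metrics2():
    torch.manual_seed(0)
    m, n = 20, 30
    scores = torch.rand(m, n)   # recommendation scores used to pick the top-k
    S = torch.rand(m, n)        # score matrix used for competitor comparisons and ranks
    U = torch.rand(m, n)        # second matrix of the (S, U) pair
    return calculate_global_metrics2(scores, (S, U), k=5)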