FEIR-viz-tool / utils /metrics.py
NanLi2021's picture
init
c3279e7
import sys
import torch
import torch.nn.functional as F
import numpy as np
from collections import defaultdict
np.set_printoptions(precision=4)
from scipy.stats import rankdata
"""Information Retrieval metrics
Useful Resources:
http://www.cs.utexas.edu/~mooney/ir-course/slides/Evaluation.ppt
http://www.nii.ac.jp/TechReports/05-014E.pdf
http://www.stanford.edu/class/cs276/handouts/EvaluationNew-handout-6-per.pdf
http://hal.archives-ouvertes.fr/docs/00/72/67/60/PDF/07-busa-fekete.pdf
Learning to Rank for Information Retrieval (Tie-Yan Liu)
"""
def mean_reciprocal_rank(rs):
"""Score is reciprocal of the rank of the first relevant item
First element is 'rank 1'. Relevance is binary (nonzero is relevant).
Example from http://en.wikipedia.org/wiki/Mean_reciprocal_rank
>>> rs = [[0, 0, 1], [0, 1, 0], [1, 0, 0]]
>>> mean_reciprocal_rank(rs)
0.61111111111111105
>>> rs = np.array([[0, 0, 0], [0, 1, 0], [1, 0, 0]])
>>> mean_reciprocal_rank(rs)
0.5
>>> rs = [[0, 0, 0, 1], [1, 0, 0], [1, 0, 0]]
>>> mean_reciprocal_rank(rs)
0.75
Args:
rs: Iterator of relevance scores (list or numpy) in rank order
(first element is the first item)
Returns:
Mean reciprocal rank
"""
rs = (np.asarray(r).nonzero()[0] for r in rs)
return np.mean([1. / (r[0] + 1) if r.size else 0. for r in rs])
def r_precision(r):
"""Score is precision after all relevant documents have been retrieved
Relevance is binary (nonzero is relevant).
>>> r = [0, 0, 1]
>>> r_precision(r)
0.33333333333333331
>>> r = [0, 1, 0]
>>> r_precision(r)
0.5
>>> r = [1, 0, 0]
>>> r_precision(r)
1.0
Args:
r: Relevance scores (list or numpy) in rank order
(first element is the first item)
Returns:
R Precision
"""
r = np.asarray(r) != 0
z = r.nonzero()[0]
if not z.size:
return 0.
return np.mean(r[:z[-1] + 1])
def precision_at_k(r, k):
"""Score is precision @ k
Relevance is binary (nonzero is relevant).
>>> r = [0, 0, 1]
>>> precision_at_k(r, 1)
0.0
>>> precision_at_k(r, 2)
0.0
>>> precision_at_k(r, 3)
0.33333333333333331
>>> precision_at_k(r, 4)
Traceback (most recent call last):
File "<stdin>", line 1, in ?
ValueError: Relevance score length < k
Args:
r: Relevance scores (list or numpy) in rank order
(first element is the first item)
Returns:
Precision @ k
Raises:
ValueError: len(r) must be >= k
"""
assert k >= 1
r = np.asarray(r)[:k] != 0
if r.size != k:
raise ValueError('Relevance score length < k')
return np.mean(r)
def average_precision(r):
"""Score is average precision (area under PR curve)
Relevance is binary (nonzero is relevant).
>>> r = [1, 1, 0, 1, 0, 1, 0, 0, 0, 1]
>>> delta_r = 1. / sum(r)
>>> sum([sum(r[:x + 1]) / (x + 1.) * delta_r for x, y in enumerate(r) if y])
0.7833333333333333
>>> average_precision(r)
0.78333333333333333
Args:
r: Relevance scores (list or numpy) in rank order
(first element is the first item)
Returns:
Average precision
"""
r = np.asarray(r) != 0
out = [precision_at_k(r, k + 1) for k in range(r.size) if r[k]]
if not out:
return 0.
return np.mean(out)
def mean_average_precision(rs):
"""Score is mean average precision
Relevance is binary (nonzero is relevant).
>>> rs = [[1, 1, 0, 1, 0, 1, 0, 0, 0, 1]]
>>> mean_average_precision(rs)
0.78333333333333333
>>> rs = [[1, 1, 0, 1, 0, 1, 0, 0, 0, 1], [0]]
>>> mean_average_precision(rs)
0.39166666666666666
Args:
rs: Iterator of relevance scores (list or numpy) in rank order
(first element is the first item)
Returns:
Mean average precision
"""
return np.mean([average_precision(r) for r in rs])
def dcg_at_k(r, k, method=0):
"""Score is discounted cumulative gain (dcg)
Relevance is positive real values. Can use binary
as the previous methods.
Example from
http://www.stanford.edu/class/cs276/handouts/EvaluationNew-handout-6-per.pdf
>>> r = [3, 2, 3, 0, 0, 1, 2, 2, 3, 0]
>>> dcg_at_k(r, 1)
3.0
>>> dcg_at_k(r, 1, method=1)
3.0
>>> dcg_at_k(r, 2)
5.0
>>> dcg_at_k(r, 2, method=1)
4.2618595071429155
>>> dcg_at_k(r, 10)
9.6051177391888114
>>> dcg_at_k(r, 11)
9.6051177391888114
Args:
r: Relevance scores (list or numpy) in rank order
(first element is the first item)
k: Number of results to consider
method: If 0 then weights are [1.0, 1.0, 0.6309, 0.5, 0.4307, ...]
If 1 then weights are [1.0, 0.6309, 0.5, 0.4307, ...]
Returns:
Discounted cumulative gain
"""
r = np.asfarray(r)[:k]
if r.size:
if method == 0:
return r[0] + np.sum(r[1:] / np.log2(np.arange(2, r.size + 1)))
elif method == 1:
return np.sum(r / np.log2(np.arange(2, r.size + 2)))
else:
raise ValueError('method must be 0 or 1.')
return 0.
def ndcg_at_k(r, k, method=0):
"""Score is normalized discounted cumulative gain (ndcg)
Relevance is positive real values. Can use binary
as the previous methods.
Example from
http://www.stanford.edu/class/cs276/handouts/EvaluationNew-handout-6-per.pdf
>>> r = [3, 2, 3, 0, 0, 1, 2, 2, 3, 0]
>>> ndcg_at_k(r, 1)
1.0
>>> r = [2, 1, 2, 0]
>>> ndcg_at_k(r, 4)
0.9203032077642922
>>> ndcg_at_k(r, 4, method=1)
0.96519546960144276
>>> ndcg_at_k([0], 1)
0.0
>>> ndcg_at_k([1], 2)
1.0
Args:
r: Relevance scores (list or numpy) in rank order
(first element is the first item)
k: Number of results to consider
method: If 0 then weights are [1.0, 1.0, 0.6309, 0.5, 0.4307, ...]
If 1 then weights are [1.0, 0.6309, 0.5, 0.4307, ...]
Returns:
Normalized discounted cumulative gain
"""
dcg_max = dcg_at_k(sorted(r, reverse=True), k, method)
if not dcg_max:
return 0.
return dcg_at_k(r, k, method) / dcg_max
"""
Wealth inequality
"""
def gini(arr):
## Gini = \frac{2\sum_i^n i\times y_i}{n\sum_i^n y_i} - \frac{n+1}{n}
sorted_arr = arr.copy()
sorted_arr.sort()
n = arr.size
coef_ = 2. / n
const_ = (n + 1.) / n
weighted_sum = sum([(i + 1) * yi for i, yi in enumerate(sorted_arr)])
return coef_ * weighted_sum / (sorted_arr.sum()) - const_
"""
Expected envy and inferiority under probabilistic recommendation as weighted sampling with replacement
"""
def expected_utility_u(Ru, ps, k):
return Ru @ ps * k
def expected_utility(R, Pi, k):
U = (R * Pi * k).sum(axis=1)
# if not agg:
return U
def expected_envy_u_v(Ru, pus, pvs, k):
return Ru @ (pvs - pus) * k
def prob_in(ps, k):
return 1 - (1 - ps) ** k
def prob_in_approx(ps, k):
return k * ps
def expected_inferiority_u_v(Ru, Rv, pus, pvs, k, compensate=False, approx=False):
differ = Rv - Ru
if not compensate:
differ = np.clip(differ, a_min=0, a_max=None)
if not approx:
return differ @ (prob_in(pus, k) * prob_in(pvs, k))
else:
return differ @ (prob_in_approx(pus, k) * prob_in_approx(pvs, k))
def expected_envy(R, Pi, k):
"""
Measure expected envy for k-sized recommendation according to rec strategy Pi with respect to relevancy scores R
:param R: m x n real-valued matrix
:param Pi: m x n Markov matrix
:return: E: m x n envy matrix where Euv = envy from u to v if not agg, sum of E if agg
"""
assert np.all(np.isclose(Pi.sum(axis=1), 1.)) or np.array_equal(Pi,
Pi.astype(bool)) # binary matrix for discrete rec
m, n = len(R), len(R[0])
E = np.zeros((m, m))
for u in range(m):
for v in range(m):
if v == u:
continue
E[u, v] = expected_envy_u_v(R[u], Pi[u], Pi[v], k=k)
E = np.clip(E, a_min=0., a_max=None)
# if not agg:
return E
def expected_inferiority(R, Pi, k, compensate=True, approx=False):
"""
Measure expected inferiority for k-sized recommendation according to rec strategy Pi with respect to relevancy scores R
:param R:
:param Pi:
:param k:
:param agg:
:return: I: m x n
"""
assert np.all(np.isclose(Pi.sum(axis=1), 1.)) or np.array_equal(Pi,
Pi.astype(bool)) # binary matrix for discrete rec
m, n = len(R), len(R[0])
I = np.zeros((m, m))
for u in range(m):
for v in range(m):
if v == u:
continue
I[u, v] = expected_inferiority_u_v(R[u], R[v], Pi[u], Pi[v], k=k, approx=approx, compensate=compensate)
I = np.clip(I, a_min=0., a_max=None)
# if not agg:
return I
def expected_envy_torch(R, Pi, k):
m, n = len(R), len(R[0])
E = torch.zeros(m, m)
for u in range(m):
for v in range(m):
if v == u:
continue
E[u, v] = expected_envy_u_v(R[u], Pi[u], Pi[v], k=k)
E = torch.clamp(E, min=0.)
return E
def expected_envy_torch_vec(R, P, k):
res = R @ P.transpose(0, 1)
envy_mat = (res - torch.diagonal(res, 0).reshape(-1, 1))
return k * (torch.clamp(envy_mat, min=0.))
def expected_inferiority_torch(R, Pi, k, compensate=False, approx=False):
m, n = R.shape
I = torch.zeros((m, m))
for u in range(m):
for v in range(m):
if v == u:
continue
if not approx:
joint_prob = prob_in(Pi[v], k) * prob_in(Pi[u], k)
else:
joint_prob = prob_in_approx(Pi[v], k) * prob_in_approx(Pi[u], k)
if not compensate:
I[u, v] = torch.clamp(R[v] - R[u], min=0., max=None) @ joint_prob
else:
I[u, v] = (R[v] - R[u]) @ joint_prob
return torch.clamp(I, min=0.)
def expected_inferiority_torch_vec(R, P, k, compensate=False, approx=False):
m, n = R.shape
I = torch.zeros((m, m))
P_pow_k = 1 - (1 - P).pow(k) if not approx else P * k
for i in range(m):
first_term = torch.clamp(R - R[i], min=0.) if not compensate else R - R[i]
I[i] = (first_term * (P_pow_k[i] * P_pow_k)).sum(1)
return I
def slow_onehot(idx, P):
m = P.shape[0]
res = torch.zeros_like(P)
for i in range(m):
res[i, idx[i]] = 1.
return res
def eiu_cut_off(R, Pi, k, agg=True):
"""
Evaluate envy, inferiority, utility based on top-k cut-off recommendation
:param R:
:param Pi:
:return: envy, inferiority, utility
"""
# print('Start evaluation!')
m, n = R.shape
# _, rec = torch.topk(Pi, k, dim=1)
# rec_onehot = F.one_hot(rec, num_classes=n).sum(1).float()
rec_onehot = slow_onehot(torch.topk(Pi, k, dim=1)[1], Pi)
envy = expected_envy_torch_vec(R, rec_onehot, k=1)
inferiority = expected_inferiority_torch_vec(R, rec_onehot, k=1, compensate=False, approx=False)
utility = expected_utility(R, rec_onehot, k=1)
if agg:
envy = envy.sum(-1).mean()
inferiority = inferiority.sum(-1).mean()
utility = utility.mean()
return envy, inferiority, utility
def eiu_cut_off2(R, Pi, k, agg=True):
"""
Evaluate envy, inferiority, utility based on top-k cut-off recommendation
:param R:
:param Pi:
:return: envy, inferiority, utility
"""
# print('Start evaluation!')
S, U = R
if not isinstance(S, torch.Tensor):
S = torch.tensor(S)
if not isinstance(U, torch.Tensor):
U = torch.tensor(U)
if not isinstance(Pi, torch.Tensor):
Pi = torch.tensor(Pi)
m, n = U.shape
# _, rec = torch.topk(Pi, k, dim=1)
# rec_onehot = F.one_hot(rec, num_classes=n).sum(1).float()
rec_onehot = slow_onehot(torch.topk(Pi, k, dim=1)[1], Pi)
envy = expected_envy_torch_vec(U, rec_onehot, k=1)
inferiority = expected_inferiority_torch_vec(S, rec_onehot, k=1, compensate=False, approx=False)
utility = expected_utility(U, rec_onehot, k=1)
if agg:
envy = envy.sum(-1).mean()
inferiority = inferiority.sum(-1).mean()
utility = utility.mean()
return envy, inferiority, utility
"""
Global congestion metrics
"""
def get_competitors(rec_per_job, rec):
m = rec.shape[0]
competitors = []
for i in range(m):
if len(rec[i]) == 1:
competitors.append([rec_per_job[rec[i]]])
else:
competitors.append(rec_per_job[rec[i]])
return np.array(competitors)
def get_better_competitor_scores(rec, R):
m, n = R.shape
_, k = rec.shape
user_ids_per_job = defaultdict(list)
for i, r in enumerate(rec):
for j in r:
user_ids_per_job[j.item()].append(i)
mean_competitor_scores_per_job = np.zeros((m, k))
for i in range(m):
my_rec_jobs = rec[i].numpy()
my_mean_competitors = np.zeros(k)
for j_, j in enumerate(my_rec_jobs):
my_score = R[i, j]
all_ids = user_ids_per_job[j].copy()
all_ids.remove(i)
other_scores = R[all_ids, j]
if not all_ids:
other_scores = np.zeros(1) # TODO if no competition, then it is the negative of my own score
my_mean_competitors[j_] = other_scores.mean() - my_score
# my_mean_competitors[my_mean_competitors < 0] = 0. # TODO only keep the better competitors
mean_competitor_scores_per_job[i] = my_mean_competitors
return mean_competitor_scores_per_job
def get_num_better_competitors(rec, R):
m, n = R.shape
_, k = rec.shape
user_ids_per_job = defaultdict(list)
for i, r in enumerate(rec):
for j in r:
user_ids_per_job[j.item()].append(i)
num_better_competitors = np.zeros((m, k))
for i in range(m):
my_rec_jobs = rec[i].numpy()
better_competitors = np.zeros(k)
for j_, j in enumerate(my_rec_jobs):
my_score = R[i, j]
all_ids = user_ids_per_job[j].copy()
all_ids.remove(i)
other_scores = R[all_ids, j]
better_competitors[j_] = ((other_scores - my_score) > 0).sum()
num_better_competitors[i] = better_competitors
return num_better_competitors
def get_scores_ids_per_job(rec, R):
scores_per_job = defaultdict(list)
ids_per_job = defaultdict(list)
for i in range(len(rec)):
u = rec[i]
for jb in u:
jb = jb.item()
ids_per_job[jb].append(i)
scores_per_job[jb].append(R[i, jb].item())
return scores_per_job, ids_per_job
def get_rank(a, method='ordinal', axis=None, descending=False):
if descending:
a = np.array(a) * -1
return stats.rankdata(a, method=method, axis=axis)
def get_ranks_per_job(scores_rec):
ranks_per_job = defaultdict(list)
for jb in scores_rec:
ranks_per_job[jb] = get_rank(scores_rec[jb], descending=True)
return ranks_per_job
def get_ranks_per_user(ranks_per_job, ids_per_job):
for k, v in ranks_per_job.items():
ranks_per_job[k] = [i - 1 for i in v]
ranks_per_user = defaultdict(list)
for k, v in ids_per_job.items():
rks = ranks_per_job[k]
for i, u in enumerate(v):
ranks_per_user[u].append(rks[i])
return ranks_per_user
def calculate_global_metrics(res, R, k=10):
# get rec
m, n = res.shape
if not torch.is_tensor(res):
res = torch.from_numpy(res)
_, rec = torch.topk(res, k, dim=1)
rec_onehot = slow_onehot(rec, res)
# rec_onehot = F.one_hot(rec, num_classes=n).sum(1).float()
try:
rec_per_job = rec_onehot.sum(axis=0).numpy()
except:
rec_per_job = rec_onehot.sum(axis=0).cpu().numpy()
rec = rec.cpu()
R = R.cpu()
opt_competitors = get_competitors(rec_per_job, rec)
# mean competitors per person
mean_competitors = opt_competitors.mean()
# mean better competitors per person
mean_better_competitors = get_num_better_competitors(rec, R).mean()
# mean competitor scores - my score
mean_diff_scores = get_better_competitor_scores(rec, R)
mean_diff_scores[mean_diff_scores < 0] = 0.
mean_diff_scores = mean_diff_scores.mean()
# mean rank
# scores_opt, ids_opt = get_scores_ids_per_job(rec, R)
# ranks_opt = get_ranks_per_job(scores_opt)
# ranks_per_user_opt = get_ranks_per_user(ranks_opt, ids_opt)
# mean_rank = np.array(list(ranks_per_user_opt.values())).mean()
# gini
gini_index = gini(rec_per_job)
return {'mean_competitors': mean_competitors, 'mean_better_competitors': mean_better_competitors, \
'mean_scores_diff': mean_diff_scores, 'mean_rank': mean_better_competitors, 'gini_index': gini_index}
def calculate_global_metrics2(res, R, k=10):
# get rec
S, U = R
m, n = res.shape
if not torch.is_tensor(res):
res = torch.from_numpy(res)
_, rec = torch.topk(res, k, dim=1)
rec_onehot = slow_onehot(rec, res)
# rec_onehot = F.one_hot(rec, num_classes=n).sum(1).float()
try:
rec_per_job = rec_onehot.sum(axis=0).numpy()
except:
rec_per_job = rec_onehot.sum(axis=0).cpu().numpy()
rec = rec.cpu()
S = S.cpu()
U = U.cpu()
opt_competitors = get_competitors(rec_per_job, rec)
# mean competitors per person
mean_competitors = opt_competitors.mean()
# mean better competitors per person
mean_better_competitors = get_num_better_competitors(rec, S).mean()
# mean competitor scores - my score
mean_diff_scores = get_better_competitor_scores(rec, S)
mean_diff_scores[mean_diff_scores < 0] = 0.
mean_diff_scores = mean_diff_scores.mean()
# mean rank
scores_opt, ids_opt = get_scores_ids_per_job(rec, S)
ranks_opt = get_ranks_per_job(scores_opt)
ranks_per_user_opt = get_ranks_per_user(ranks_opt, ids_opt)
mean_rank = np.array(list(ranks_per_user_opt.values())).mean()
# gini
gini_index = gini(rec_per_job)
return {'mean_competitors': mean_competitors, 'mean_better_competitors': mean_better_competitors, \
'mean_scores_diff': mean_diff_scores, 'mean_rank': mean_rank, 'gini_index': gini_index}
def get_scores_per_job(rec, S):
scores_per_job = defaultdict(list)
for i in range(len(rec)):
u = rec[i]
for jb in u:
jb = jb.item()
scores_per_job[jb].append(S[i, jb].item())
return scores_per_job