import copy
from collections import OrderedDict
from typing import List, Optional, Sequence, Union

import numpy as np
from mmengine.evaluator import BaseMetric
from mmengine.logging import MMLogger, print_log

from mmdet.registry import METRICS
from ..functional import eval_map


@METRICS.register_module()
class OpenImagesMetric(BaseMetric):
    """OpenImages evaluation metric.

    Evaluate detection mAP for OpenImages. Please refer to
    https://storage.googleapis.com/openimages/web/evaluation.html for more
    details.

    Args:
        iou_thrs (float or List[float]): IoU threshold. Defaults to 0.5.
        ioa_thrs (float or List[float], optional): IoA threshold. May be
            None only when ``use_group_of`` is False. Defaults to 0.5.
        scale_ranges (List[tuple], optional): Scale ranges for evaluating
            mAP. If not specified, all bounding boxes would be included in
            evaluation. Defaults to None.
        use_group_of (bool): Whether to consider group-of ground truth
            bboxes during evaluation. Defaults to True.
        get_supercategory (bool): Whether to also evaluate the parent
            (super-category) classes of each annotated class.
            Defaults to True.
        filter_labels (bool): Whether to filter out predictions of classes
            that are not annotated on an image. Defaults to True.
        collect_device (str): Device name used for collecting results from
            different ranks during distributed training. Must be 'cpu' or
            'gpu'. Defaults to 'cpu'.
        prefix (str, optional): The prefix that will be added in the metric
            names to disambiguate homonymous metrics of different evaluators.
            If prefix is not provided in the argument, self.default_prefix
            will be used instead. Defaults to None.
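
    Examples:
        A minimal evaluator config sketch (illustrative only; it assumes an
        OpenImages-style dataset whose metainfo provides the class hierarchy
        as ``RELATION_MATRIX``)::

            val_evaluator = dict(
                type='OpenImagesMetric',
                iou_thrs=0.5,
                ioa_thrs=0.5,
                use_group_of=True,
                get_supercategory=True)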
    """
    default_prefix: Optional[str] = 'openimages'

    def __init__(self,
                 iou_thrs: Union[float, List[float]] = 0.5,
                 ioa_thrs: Optional[Union[float, List[float]]] = 0.5,
                 scale_ranges: Optional[List[tuple]] = None,
                 use_group_of: bool = True,
                 get_supercategory: bool = True,
                 filter_labels: bool = True,
                 collect_device: str = 'cpu',
                 prefix: Optional[str] = None) -> None:
        super().__init__(collect_device=collect_device, prefix=prefix)
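        # Normalize scalar thresholds to lists so that IoU and IoA thresholds
        # can be iterated over as matched pairs.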
        self.iou_thrs = [iou_thrs] if isinstance(iou_thrs, float) else iou_thrs
        self.ioa_thrs = [ioa_thrs] if (isinstance(ioa_thrs, float)
                                       or ioa_thrs is None) else ioa_thrs
        assert isinstance(self.iou_thrs, list) and isinstance(
            self.ioa_thrs, list)
        assert len(self.iou_thrs) == len(self.ioa_thrs)

        self.scale_ranges = scale_ranges
        self.use_group_of = use_group_of
        self.get_supercategory = get_supercategory
        self.filter_labels = filter_labels

    def _get_supercategory_ann(self, instances: List[dict]) -> List[dict]:
        """Get the parent (super-category) annotations for each instance.

        Args:
            instances (List[dict]): A list of annotations of the instances.

        Returns:
            List[dict]: Annotations of the parent (super-category) classes.
        """
        supercat_instances = []
        relation_matrix = self.dataset_meta['RELATION_MATRIX']
        for instance in instances:
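            # ``relation_matrix[c]`` has nonzero entries for class ``c``
            # itself and for each of its ancestor (super-category) classes.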
            labels = np.where(relation_matrix[instance['bbox_label']])[0]
            for label in labels:
                if label == instance['bbox_label']:
                    continue
                new_instance = copy.deepcopy(instance)
                new_instance['bbox_label'] = label
                supercat_instances.append(new_instance)
        return supercat_instances

    def _process_predictions(self, pred_bboxes: np.ndarray,
                             pred_scores: np.ndarray, pred_labels: np.ndarray,
                             gt_instances: list,
                             image_level_labels: np.ndarray) -> tuple:
        """Process the detected bboxes of each predicted class.

        Note: Depending on the configuration, this method performs two kinds
        of processing:

        1. Add predictions for the parent (super-category) classes of each
           predicted class.

        2. Discard predictions of classes that are not annotated on the
           image.

        Args:
            pred_bboxes (np.ndarray): Bboxes predicted by the model.
            pred_scores (np.ndarray): Scores predicted by the model.
            pred_labels (np.ndarray): Labels predicted by the model.
            gt_instances (list): Ground truth annotations.
            image_level_labels (np.ndarray): Human-verified image-level
                labels.

        Returns:
            tuple: Processed bboxes, scores, and labels.
        """
        processed_bboxes = copy.deepcopy(pred_bboxes)
        processed_scores = copy.deepcopy(pred_scores)
        processed_labels = copy.deepcopy(pred_labels)
        gt_labels = np.array([ins['bbox_label'] for ins in gt_instances],
                             dtype=np.int64)
        if image_level_labels is not None:
            allowed_classes = np.unique(
                np.append(gt_labels, image_level_labels))
        else:
            allowed_classes = np.unique(gt_labels)
        relation_matrix = self.dataset_meta['RELATION_MATRIX']
        pred_classes = np.unique(pred_labels)
        for pred_class in pred_classes:
            classes = np.where(relation_matrix[pred_class])[0]
            for cls in classes:
                if (cls in allowed_classes and cls != pred_class
                        and self.get_supercategory):
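                    # Duplicate these predictions under the parent class so
                    # that they are also evaluated for the super-category.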
                    index = np.where(pred_labels == pred_class)[0]
                    processed_scores = np.concatenate(
                        [processed_scores, pred_scores[index]])
                    processed_bboxes = np.concatenate(
                        [processed_bboxes, pred_bboxes[index]])
                    extend_labels = np.full(index.shape, cls, dtype=np.int64)
                    processed_labels = np.concatenate(
                        [processed_labels, extend_labels])
                elif cls not in allowed_classes and self.filter_labels:
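                    # Drop predictions of classes that are neither annotated
                    # with boxes nor verified at the image level.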
                    index = np.where(processed_labels != cls)[0]
                    processed_scores = processed_scores[index]
                    processed_bboxes = processed_bboxes[index]
                    processed_labels = processed_labels[index]
        return processed_bboxes, processed_scores, processed_labels

    def process(self, data_batch: dict, data_samples: Sequence[dict]) -> None:
        """Process one batch of data samples and predictions. The processed
        results should be stored in ``self.results``, which will be used to
        compute the metrics when all batches have been processed.

        Args:
            data_batch (dict): A batch of data from the dataloader.
            data_samples (Sequence[dict]): A batch of data samples that
                contain annotations and predictions.
        """
        for data_sample in data_samples:
            gt = copy.deepcopy(data_sample)
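            # Extend the ground truth with super-category instances so that
            # parent classes are evaluated as well.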
            instances = gt['instances']
            if self.get_supercategory:
                supercat_instances = self._get_supercategory_ann(instances)
                instances.extend(supercat_instances)
            gt_labels = []
            gt_bboxes = []
            is_group_ofs = []
            for ins in instances:
                gt_labels.append(ins['bbox_label'])
                gt_bboxes.append(ins['bbox'])
                is_group_ofs.append(ins['is_group_of'])
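            # Pack the ground truth into the format expected by ``eval_map``.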
            ann = dict(
                labels=np.array(gt_labels, dtype=np.int64),
                bboxes=np.array(gt_bboxes, dtype=np.float32).reshape((-1, 4)),
                gt_is_group_ofs=np.array(is_group_ofs, dtype=bool))

            image_level_labels = gt.get('image_level_labels', None)
            pred = data_sample['pred_instances']
            pred_bboxes = pred['bboxes'].cpu().numpy()
            pred_scores = pred['scores'].cpu().numpy()
            pred_labels = pred['labels'].cpu().numpy()

            pred_bboxes, pred_scores, pred_labels = self._process_predictions(
                pred_bboxes, pred_scores, pred_labels, instances,
                image_level_labels)
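
            # Split predictions by class: ``dets[label]`` is an (n, 5) array
            # of ``[x1, y1, x2, y2, score]`` rows for that class.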
            dets = []
            for label in range(len(self.dataset_meta['classes'])):
                index = np.where(pred_labels == label)[0]
                pred_bbox_scores = np.hstack(
                    [pred_bboxes[index], pred_scores[index].reshape((-1, 1))])
                dets.append(pred_bbox_scores)
            self.results.append((ann, dets))

    def compute_metrics(self, results: list) -> dict:
        """Compute the metrics from processed results.

        Args:
            results (list): The processed results of each batch.

        Returns:
            dict: The computed metrics. The keys are the names of the metrics,
            and the values are corresponding results.
        """
        logger = MMLogger.get_current_instance()
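        # Each entry in ``results`` is an ``(ann, dets)`` pair collected in
        # ``process``.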
        gts, preds = zip(*results)
        eval_results = OrderedDict()

        dataset_type = self.dataset_meta.get('dataset_type')
        if dataset_type not in ['oid_challenge', 'oid_v6']:
            dataset_type = 'oid_v6'
            print_log(
                'Cannot infer dataset type from `dataset_meta`. '
                'Set `oid_v6` as the dataset type.',
                logger='current')
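        # Evaluate mAP once for every (IoU, IoA) threshold pair and report
        # the mean over all pairs as the final ``mAP``.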
        mean_aps = []
        for iou_thr, ioa_thr in zip(self.iou_thrs, self.ioa_thrs):
            if self.use_group_of:
                assert ioa_thr is not None, 'ioa_thr must have value when' \
                    ' using group_of in evaluation.'
            print_log(f'\n{"-" * 15}iou_thr, ioa_thr: {iou_thr}, {ioa_thr}'
                      f'{"-" * 15}')
            mean_ap, _ = eval_map(
                preds,
                gts,
                scale_ranges=self.scale_ranges,
                iou_thr=iou_thr,
                ioa_thr=ioa_thr,
                dataset=dataset_type,
                logger=logger,
                use_group_of=self.use_group_of)

            mean_aps.append(mean_ap)
            eval_results[f'AP{int(iou_thr * 100):02d}'] = round(mean_ap, 3)
        eval_results['mAP'] = sum(mean_aps) / len(mean_aps)
        return eval_results