|
|
|
import copy |
|
from os.path import dirname, exists, join |
|
|
|
import numpy as np |
|
import torch |
|
from mmengine.config import Config |
|
from mmengine.dataset import pseudo_collate |
|
from mmengine.structures import InstanceData, PixelData |
|
|
|
from ..registry import TASK_UTILS |
|
from ..structures import DetDataSample |
|
from ..structures.bbox import HorizontalBoxes |
|
|
|
|
|
def _get_config_directory():
    """Locate the ``configs`` directory that sits beside the package.

    Returns:
        str: Path of the ``configs`` directory.

    Raises:
        Exception: If the resolved ``configs`` path does not exist.
    """
    try:
        # Walk three directory levels up from this source file.
        root_dir = dirname(dirname(dirname(__file__)))
    except NameError:
        # ``__file__`` is not defined in this context; resolve the root from
        # the importable ``mmdet`` package instead.
        import mmdet
        root_dir = dirname(dirname(mmdet.__file__))

    configs_dir = join(root_dir, 'configs')
    if exists(configs_dir):
        return configs_dir
    raise Exception('Cannot find config path')
|
|
|
|
|
def _get_config_module(fname):
    """Parse a config file that lives under the config directory.

    Args:
        fname (str): Config file path, relative to the directory returned by
            ``_get_config_directory``.

    Returns:
        The object produced by ``Config.fromfile`` for the resolved path.
    """
    return Config.fromfile(join(_get_config_directory(), fname))
|
|
|
|
|
def get_detector_cfg(fname):
    """Return the ``model`` section of a config, ready to build a detector.

    The section is deep copied, so callers may edit the returned object
    without affecting the loaded config.

    Args:
        fname (str): Config file path relative to the config directory.

    Returns:
        A deep copy of ``config.model``.
    """
    return copy.deepcopy(_get_config_module(fname).model)
|
|
|
|
|
def get_roi_head_cfg(fname):
    """Return the ``roi_head`` section of a config, ready to build a roi_head.

    The model section is deep copied first, so callers may edit the returned
    object without affecting the loaded config. The ``rcnn`` parts of the
    model-level ``train_cfg`` / ``test_cfg`` are merged into the roi_head
    config (``None`` is kept when the model-level entry is ``None``).

    Args:
        fname (str): Config file path relative to the config directory.

    Returns:
        The ``roi_head`` config updated with ``train_cfg`` and ``test_cfg``.
    """
    model_cfg = copy.deepcopy(_get_config_module(fname).model)
    head_cfg = model_cfg.roi_head

    rcnn_train = model_cfg.train_cfg
    if rcnn_train is not None:
        rcnn_train = rcnn_train.rcnn

    rcnn_test = model_cfg.test_cfg
    if rcnn_test is not None:
        rcnn_test = rcnn_test.rcnn

    head_cfg.update({'train_cfg': rcnn_train, 'test_cfg': rcnn_test})
    return head_cfg
|
|
|
|
|
def _rand_bboxes(rng, num_boxes, w, h):
    """Sample random boxes clipped to a ``w`` x ``h`` canvas.

    Four uniform draws per box are read as a normalized centre (cx, cy) and a
    normalized size (bw, bh); the resulting corners are clipped to the canvas.

    Args:
        rng: Random source exposing ``rand`` (callers in this file pass a
            ``np.random.RandomState``).
        num_boxes (int): Number of boxes to sample.
        w (int): Canvas width, the upper clip bound for x coordinates.
        h (int): Canvas height, the upper clip bound for y coordinates.

    Returns:
        np.ndarray: Array of shape (num_boxes, 4) holding
        (tl_x, tl_y, br_x, br_y) per row.
    """
    draws = rng.rand(num_boxes, 4)
    cx, cy, bw, bh = draws.T

    center_x = cx * w
    center_y = cy * h
    half_w = w * bw / 2
    half_h = h * bh / 2

    corners = [
        (center_x - half_w).clip(0, w),
        (center_y - half_h).clip(0, h),
        (center_x + half_w).clip(0, w),
        (center_y + half_h).clip(0, h),
    ]
    return np.vstack(corners).T
|
|
|
|
|
def _rand_masks(rng, num_boxes, bboxes, img_w, img_h):
    """Build one random binary mask per box, non-zero only inside the box.

    Args:
        rng: Random source exposing ``rand``.
        num_boxes (int): Number of mask planes to allocate.
        bboxes: Iterable of arrays read as (x1, y1, x2, y2); coordinates are
            truncated to ``int32`` before being used as slice bounds.
        img_w (int): Mask width.
        img_h (int): Mask height.

    Returns:
        BitmapMasks: Wrapper around the (num_boxes, img_h, img_w) mask array.
    """
    from mmdet.structures.mask import BitmapMasks

    canvas = np.zeros((num_boxes, img_h, img_w))
    for idx, box in enumerate(bboxes):
        coords = box.astype(np.int32)
        x1, y1, x2, y2 = coords[0], coords[1], coords[2], coords[3]
        # A pixel inside the box is set when its uniform draw exceeds 0.3.
        patch = (rng.rand(1, y2 - y1, x2 - x1) > 0.3).astype(np.int64)
        canvas[idx:idx + 1, y1:y2, x1:x2] = patch
    return BitmapMasks(canvas, height=img_h, width=img_w)
|
|
|
|
|
def demo_mm_inputs(batch_size=2,
                   image_shapes=(3, 128, 128),
                   num_items=None,
                   num_classes=10,
                   sem_seg_output_strides=1,
                   with_mask=False,
                   with_semantic=False,
                   use_box_type=False,
                   device='cpu'):
    """Create a superset of inputs needed to run test or train batches.

    Every random quantity is drawn from one ``np.random.RandomState(0)``, so
    repeated calls with the same arguments produce the same data.

    Args:
        batch_size (int): batch size. Defaults to 2.
        image_shapes (List[tuple], Optional): image shape, given as
            (C, H, W). A single tuple is repeated for every batch item; a
            list must contain ``batch_size`` entries.
            Defaults to (3, 128, 128)
        num_items (None | List[int]): specifies the number
            of boxes in each batch item. When None, a random count in
            [1, 10) is drawn per item. Default to None.
        num_classes (int): number of different labels a
            box might have. Defaults to 10.
        sem_seg_output_strides (int): Divisor applied to the image height
            and width to get the semantic map size. Defaults to 1.
        with_mask (bool): Whether to return mask annotation.
            Defaults to False.
        with_semantic (bool): whether to return semantic.
            Defaults to False.
        use_box_type (bool): Whether to wrap boxes in ``HorizontalBoxes``
            instead of storing a plain float tensor. Defaults to False.
        device (str): Destination device type. Defaults to cpu.

    Returns:
        The result of ``pseudo_collate`` applied to the per-image dicts,
        each of which holds the keys ``'inputs'`` and ``'data_samples'``.
    """
    rng = np.random.RandomState(0)

    if isinstance(image_shapes, list):
        assert len(image_shapes) == batch_size
    else:
        image_shapes = [image_shapes] * batch_size

    if isinstance(num_items, list):
        assert len(num_items) == batch_size

    packed_inputs = []
    for idx in range(batch_size):
        image_shape = image_shapes[idx]
        _, h, w = image_shape

        image = rng.randint(0, 255, size=image_shape, dtype=np.uint8)

        mm_inputs = dict()
        mm_inputs['inputs'] = torch.from_numpy(image).to(device)

        img_meta = {
            'img_id': idx,
            'img_shape': image_shape[1:],
            'ori_shape': image_shape[1:],
            'filename': '<demo>.png',
            'scale_factor': np.array([1.1, 1.2]),
            'flip': False,
            'flip_direction': None,
            'border': [1, 1, 1, 1]
        }

        data_sample = DetDataSample()
        data_sample.set_metainfo(img_meta)

        # Ground-truth instances.
        gt_instances = InstanceData()
        if num_items is None:
            num_boxes = rng.randint(1, 10)
        else:
            num_boxes = num_items[idx]

        bboxes = _rand_bboxes(rng, num_boxes, w, h)
        # Labels are drawn from [1, num_classes); label 0 is never sampled.
        labels = rng.randint(1, num_classes, size=num_boxes)

        if use_box_type:
            gt_instances.bboxes = HorizontalBoxes(bboxes, dtype=torch.float32)
        else:
            gt_instances.bboxes = torch.FloatTensor(bboxes)
        gt_instances.labels = torch.LongTensor(labels)

        if with_mask:
            masks = _rand_masks(rng, num_boxes, bboxes, w, h)
            gt_instances.masks = masks

        data_sample.gt_instances = gt_instances

        # Ignored instances: a fresh set of boxes, same count as the GT.
        ignore_instances = InstanceData()
        ignore_bboxes = _rand_bboxes(rng, num_boxes, w, h)
        if use_box_type:
            ignore_instances.bboxes = HorizontalBoxes(
                ignore_bboxes, dtype=torch.float32)
        else:
            ignore_instances.bboxes = torch.FloatTensor(ignore_bboxes)
        data_sample.ignored_instances = ignore_instances

        if with_semantic:
            # Draw from the seeded generator (not the global ``np.random``)
            # so the semantic map is reproducible like everything else.
            gt_semantic_seg = torch.from_numpy(
                rng.randint(
                    0,
                    num_classes, (1, h // sem_seg_output_strides,
                                  w // sem_seg_output_strides),
                    dtype=np.uint8))
            gt_sem_seg_data = dict(sem_seg=gt_semantic_seg)
            data_sample.gt_sem_seg = PixelData(**gt_sem_seg_data)

        mm_inputs['data_samples'] = data_sample.to(device)

        packed_inputs.append(mm_inputs)

    data = pseudo_collate(packed_inputs)
    return data
|
|
|
|
|
def demo_mm_proposals(image_shapes, num_proposals, device='cpu'):
    """Create a list of fake proposals.

    Args:
        image_shapes (list[tuple[int]]): Batch image shapes, each given as
            (C, H, W) - the layout ``demo_mm_inputs`` uses.
        num_proposals (int): The number of fake proposals per image.
        device (str): Destination device type. Defaults to cpu.

    Returns:
        list[InstanceData]: One entry per image shape, carrying ``bboxes``
        (float), ``scores`` (float) and all-zero ``labels`` (long), moved to
        ``device``.
    """
    rng = np.random.RandomState(0)

    results = []
    for img_shape in image_shapes:
        result = InstanceData()
        # The trailing two entries of a (C, H, W) shape are height then
        # width; unpacking them as (w, h) would clip boxes to swapped bounds
        # on non-square images.
        h, w = img_shape[1:]
        proposals = _rand_bboxes(rng, num_proposals, w, h)
        result.bboxes = torch.from_numpy(proposals).float()
        result.scores = torch.from_numpy(rng.rand(num_proposals)).float()
        result.labels = torch.zeros(num_proposals).long()
        results.append(result.to(device))
    return results
|
|
|
|
|
def demo_mm_sampling_results(proposals_list,
                             batch_gt_instances,
                             batch_gt_instances_ignore=None,
                             assigner_cfg=None,
                             sampler_cfg=None,
                             feats=None):
    """Create sample results that can be passed to BBoxHead.get_targets.

    Args:
        proposals_list (list): Per-image proposals. NOTE: each entry is
            modified in place - its ``bboxes`` field is popped and stored
            back under ``priors`` before assignment.
        batch_gt_instances (list): Per-image ground truth, passed to the
            assigner and the sampler. Must have the same length as
            ``proposals_list``.
        batch_gt_instances_ignore (list, optional): Per-image ignored
            instances passed to the assigner. When None, ``None`` is used
            for every image. Defaults to None.
        assigner_cfg (dict, optional): Config built through ``TASK_UTILS``.
            When None, a ``MaxIoUAssigner`` with all IoU thresholds at 0.5
            is used. Defaults to None.
        sampler_cfg (dict, optional): Config built through ``TASK_UTILS``.
            When None, a ``RandomSampler`` (num=512, pos_fraction=0.25,
            add_gt_as_proposals=True) is used. Defaults to None.
        feats (optional): Iterable of per-level features; each level is
            indexed by image position along its first axis (presumably
            batched feature maps - confirm against callers).
            Defaults to None.

    Returns:
        list: One sampler output per image, in input order.
    """
    assert len(proposals_list) == len(batch_gt_instances)
    if batch_gt_instances_ignore is None:
        batch_gt_instances_ignore = [None for _ in batch_gt_instances]
    else:
        assert len(batch_gt_instances_ignore) == len(batch_gt_instances)

    if assigner_cfg is None:
        assigner_cfg = dict(
            type='MaxIoUAssigner',
            pos_iou_thr=0.5,
            neg_iou_thr=0.5,
            min_pos_iou=0.5,
            ignore_iof_thr=-1)
    if sampler_cfg is None:
        sampler_cfg = dict(
            type='RandomSampler',
            num=512,
            pos_fraction=0.25,
            neg_pos_ub=-1,
            add_gt_as_proposals=True)
    bbox_assigner = TASK_UTILS.build(assigner_cfg)
    bbox_sampler = TASK_UTILS.build(sampler_cfg)

    sampling_results = []
    per_image = zip(proposals_list, batch_gt_instances,
                    batch_gt_instances_ignore)
    for i, (proposals, gt_instances, gt_instances_ignore) in enumerate(
            per_image):
        # Slice into a separate local: rebinding ``feats`` itself would make
        # every later iteration index the already-sliced per-image features
        # instead of the full input.
        img_feats = None
        if feats is not None:
            img_feats = [lvl_feat[i][None] for lvl_feat in feats]

        # Move the boxes under the ``priors`` key before assignment.
        proposals.priors = proposals.pop('bboxes')

        assign_result = bbox_assigner.assign(proposals, gt_instances,
                                             gt_instances_ignore)
        sampling_result = bbox_sampler.sample(
            assign_result, proposals, gt_instances, feats=img_feats)
        sampling_results.append(sampling_result)

    return sampling_results
|
|
|
|
|
|
|
def replace_to_ceph(cfg):
    """Attach ceph (petrel) ``backend_args`` to a config in place.

    Only the first two transforms of each dataset pipeline are inspected:
    the first is updated when it is ``LoadImageFromFile``, the second when
    it is ``LoadAnnotations`` or ``LoadPanopticAnnotations``. Evaluators of
    type ``CocoPanopticMetric`` are updated as well. The same
    ``backend_args`` dict object is shared by every updated entry.

    Args:
        cfg: Config exposing ``train_dataloader``, ``val_dataloader``,
            ``test_dataloader``, ``val_evaluator``, ``test_evaluator`` and
            ``filename``. It is modified in place; nothing is returned.
    """
    ceph_paths = {
        './data/': 's3://openmmlab/datasets/detection/',
        'data/': 's3://openmmlab/datasets/detection/',
    }
    backend_args = {'backend': 'petrel', 'path_mapping': ceph_paths}

    ann_loader_types = ('LoadAnnotations', 'LoadPanopticAnnotations')

    def _patch_loaders(pipeline):
        # Position 0: image loader.
        img_step = pipeline[0]
        if img_step['type'] == 'LoadImageFromFile':
            img_step['backend_args'] = backend_args
        # Position 1: annotation loader.
        ann_step = pipeline[1]
        if ann_step['type'] in ann_loader_types:
            ann_step['backend_args'] = backend_args

    # ``name`` is accepted but not used by either helper.
    def _process_pipeline(dataset, name):
        if 'pipeline' not in dataset:
            # No pipeline of its own: patch the wrapped dataset instead.
            _patch_loaders(dataset.dataset.pipeline)
            return
        _patch_loaders(dataset.pipeline)
        if 'dataset' in dataset:
            # A pipeline plus a wrapped dataset: patch the inner one too.
            _patch_loaders(dataset.dataset.pipeline)

    def _process_evaluator(evaluator, name):
        if evaluator['type'] == 'CocoPanopticMetric':
            evaluator['backend_args'] = backend_args

    for loader_key in ('train_dataloader', 'val_dataloader',
                       'test_dataloader'):
        _process_pipeline(getattr(cfg, loader_key).dataset, cfg.filename)
    for evaluator_key in ('val_evaluator', 'test_evaluator'):
        _process_evaluator(getattr(cfg, evaluator_key), cfg.filename)
|
|