# Source code for mmtrack.models.mot.base
# Copyright (c) OpenMMLab. All rights reserved.
from abc import ABCMeta, abstractmethod
from collections import OrderedDict
import mmcv
import torch
import torch.distributed as dist
from mmcv.runner import BaseModule, auto_fp16
from mmtrack.core import imshow_tracks, results2outs
from mmtrack.utils import get_root_logger
class BaseMultiObjectTracker(BaseModule, metaclass=ABCMeta):
    """Base class for multiple object tracking.

    Subclasses must implement :meth:`forward_train` and
    :meth:`simple_test`. Optional components are looked up by attribute
    name (``detector``, ``reid``, ``motion``, ``track_head``, ``tracker``)
    through the ``with_*`` properties.

    Args:
        init_cfg (dict | None, optional): Initialization config passed
            through to ``BaseModule``. Defaults to None.
    """

    def __init__(self, init_cfg=None):
        super().__init__(init_cfg)
        self.logger = get_root_logger()
        # Flag stored on the instance; disabled by default.
        self.fp16_enabled = False

    def freeze_module(self, module):
        """Freeze one or more sub-modules during training.

        Every named attribute of ``self`` is switched to eval mode and all
        of its parameters get ``requires_grad = False``.

        Args:
            module (str | list[str] | tuple[str]): Attribute name(s) of the
                sub-module(s) to freeze.

        Raises:
            TypeError: If ``module`` is not a str, list or tuple.
        """
        if isinstance(module, str):
            names = [module]
        elif isinstance(module, (list, tuple)):
            names = module
        else:
            raise TypeError('module must be a str or a list.')

        for name in names:
            submodule = getattr(self, name)
            submodule.eval()
            for param in submodule.parameters():
                param.requires_grad = False

    @property
    def with_detector(self):
        """bool: whether the framework has a detector."""
        return hasattr(self, 'detector') and self.detector is not None

    @property
    def with_reid(self):
        """bool: whether the framework has a reid model."""
        return hasattr(self, 'reid') and self.reid is not None

    @property
    def with_motion(self):
        """bool: whether the framework has a motion model."""
        return hasattr(self, 'motion') and self.motion is not None

    @property
    def with_track_head(self):
        """bool: whether the framework has a track_head."""
        return hasattr(self, 'track_head') and self.track_head is not None

    @property
    def with_tracker(self):
        """bool: whether the framework has a tracker."""
        return hasattr(self, 'tracker') and self.tracker is not None

    @abstractmethod
    def forward_train(self, imgs, img_metas, **kwargs):
        """Forward function during training.

        Args:
            imgs (list[Tensor]): List of tensors of shape (1, C, H, W).
                Typically these should be mean centered and std scaled.
            img_metas (list[dict]): List of image info dict where each dict
                has: 'img_shape', 'scale_factor', 'flip', and may also
                contain 'filename', 'ori_shape', 'pad_shape', and
                'img_norm_cfg'. For details on the values of these keys,
                see :class:`mmdet.datasets.pipelines.Collect`.
            kwargs (keyword arguments): Specific to concrete
                implementation.
        """
        pass

    @abstractmethod
    def simple_test(self, img, img_metas, **kwargs):
        """Test function with a single scale."""
        pass

    def aug_test(self, imgs, img_metas, **kwargs):
        """Test function with test time augmentation.

        The base implementation does nothing and returns ``None``;
        subclasses that support test-time augmentation override it.
        """
        pass

    def forward_test(self, imgs, img_metas, **kwargs):
        """Dispatch to :meth:`simple_test` or :meth:`aug_test`.

        Args:
            imgs (List[Tensor]): the outer list indicates test-time
                augmentations and inner Tensor should have a shape
                NxCxHxW, which contains all images in the batch.
            img_metas (List[List[dict]]): the outer list indicates
                test-time augs (multiscale, flip, etc.) and the inner list
                indicates images in a batch.
            kwargs (keyword arguments): Forwarded to the selected test
                function. A ``proposals`` entry is only accepted when
                there is a single augmentation.

        Returns:
            The result of :meth:`simple_test` when there is exactly one
            augmentation, otherwise the result of :meth:`aug_test`.

        Raises:
            TypeError: If ``imgs`` or ``img_metas`` is not a list.
            ValueError: If ``imgs`` and ``img_metas`` differ in length.
        """
        for var, name in [(imgs, 'imgs'), (img_metas, 'img_metas')]:
            if not isinstance(var, list):
                raise TypeError(f'{name} must be a list, but got {type(var)}')

        num_augs = len(imgs)
        if num_augs != len(img_metas):
            raise ValueError(f'num of augmentations ({len(imgs)}) '
                             f'!= num of image meta ({len(img_metas)})')

        if num_augs == 1:
            # proposals (List[List[Tensor]]): the outer list indicates
            # test-time augs (multiscale, flip, etc.) and the inner list
            # indicates images in a batch.
            # The Tensor should have a shape Px4, where P is the number of
            # proposals.
            if 'proposals' in kwargs:
                kwargs['proposals'] = kwargs['proposals'][0]
            return self.simple_test(imgs[0], img_metas[0], **kwargs)

        assert imgs[0].size(0) == 1, 'aug test does not support ' \
                                     'inference with batch size ' \
                                     f'{imgs[0].size(0)}'
        # TODO: support test augmentation for predefined proposals
        assert 'proposals' not in kwargs
        return self.aug_test(imgs, img_metas, **kwargs)

    @auto_fp16(apply_to=('img', ))
    def forward(self, img, img_metas, return_loss=True, **kwargs):
        """Calls either :func:`forward_train` or :func:`forward_test` depending
        on whether ``return_loss`` is ``True``.

        Note this setting will change the expected inputs. When
        ``return_loss=True``, img and img_meta are single-nested (i.e. Tensor
        and List[dict]), and when ``return_loss=False``, img and img_meta
        should be double nested (i.e. List[Tensor], List[List[dict]]), with
        the outer list indicating test time augmentations.
        """
        if return_loss:
            return self.forward_train(img, img_metas, **kwargs)
        return self.forward_test(img, img_metas, **kwargs)

    def _parse_losses(self, losses):
        """Parse the raw outputs (losses) of the network.

        Args:
            losses (dict): Raw output of the network, which usually contain
                losses and other necessary information. Every value must
                be a Tensor or a list of Tensors.

        Returns:
            tuple[Tensor, dict]: (loss, log_vars), loss is the sum of all
            entries whose key contains ``'loss'``, log_vars contains all
            the variables to be sent to the logger, converted to Python
            scalars.

        Raises:
            TypeError: If a value is neither a Tensor nor a list.
        """
        log_vars = OrderedDict()
        for loss_name, loss_value in losses.items():
            if isinstance(loss_value, torch.Tensor):
                log_vars[loss_name] = loss_value.mean()
            elif isinstance(loss_value, list):
                log_vars[loss_name] = sum(_loss.mean() for _loss in loss_value)
            else:
                raise TypeError(
                    f'{loss_name} is not a tensor or list of tensors')

        # Only entries whose name contains 'loss' contribute to the total;
        # other entries are logged but not optimized.
        loss = sum(_value for _key, _value in log_vars.items()
                   if 'loss' in _key)
        log_vars['loss'] = loss

        # The process-group state cannot change inside the loop, so query
        # it once instead of once per logged variable.
        distributed = dist.is_available() and dist.is_initialized()
        world_size = dist.get_world_size() if distributed else 1

        for loss_name, loss_value in log_vars.items():
            if distributed:
                # Reduce a detached copy so the returned ``loss`` tensor
                # is left untouched.
                loss_value = loss_value.data.clone()
                dist.all_reduce(loss_value.div_(world_size))
            log_vars[loss_name] = loss_value.item()

        return loss, log_vars

    def _run_step(self, data):
        """Run a forward pass on ``data`` and package the parsed losses.

        Shared implementation of :meth:`train_step` and :meth:`val_step`.
        """
        losses = self(**data)
        loss, log_vars = self._parse_losses(losses)
        return dict(
            loss=loss, log_vars=log_vars, num_samples=len(data['img_metas']))

    def train_step(self, data, optimizer):
        """The iteration step during training.

        This method defines an iteration step during training, except for the
        back propagation and optimizer updating, which are done in an optimizer
        hook. Note that in some complicated cases or models, the whole process
        including back propagation and optimizer updating is also defined in
        this method, such as GAN.

        Args:
            data (dict): The output of dataloader.
            optimizer (:obj:`torch.optim.Optimizer` | dict): The optimizer of
                runner is passed to ``train_step()``. This argument is unused
                and reserved.

        Returns:
            dict: It should contain at least 3 keys: ``loss``, ``log_vars``,
            ``num_samples``.

            - ``loss`` is a tensor for back propagation, which can be a
              weighted sum of multiple losses.
            - ``log_vars`` contains all the variables to be sent to the
              logger.
            - ``num_samples`` indicates the batch size (when the model is
              DDP, it means the batch size on each GPU), which is used for
              averaging the logs.
        """
        return self._run_step(data)

    def val_step(self, data, optimizer):
        """The iteration step during validation.

        This method shares the same signature as :func:`train_step`, but used
        during val epochs. Note that the evaluation after training epochs is
        not implemented with this method, but an evaluation hook.
        """
        return self._run_step(data)

    def show_result(self,
                    img,
                    result,
                    score_thr=0.0,
                    thickness=1,
                    font_scale=0.5,
                    show=False,
                    out_file=None,
                    wait_time=0,
                    backend='cv2',
                    **kwargs):
        """Visualize tracking results.

        Args:
            img (str | ndarray): Filename or loaded image. A str is read
                with ``mmcv.imread``.
            result (dict): Tracking result.

                - The value of key 'track_bboxes' is list with length
                  num_classes, and each element in list is ndarray with
                  shape(n, 6) in [id, tl_x, tl_y, br_x, br_y, score] format.
                - The value of key 'det_bboxes' is list with length
                  num_classes, and each element in list is ndarray with
                  shape(n, 5) in [tl_x, tl_y, br_x, br_y, score] format.
                - The optional key 'track_masks' is forwarded to
                  ``results2outs`` as ``mask_results``.
            score_thr (float, optional): Score threshold forwarded to
                ``imshow_tracks``. Defaults to 0.0.
            thickness (int, optional): Thickness of lines. Defaults to 1.
            font_scale (float, optional): Font scales of texts. Defaults
                to 0.5.
            show (bool, optional): Whether show the visualizations on the
                fly. Defaults to False.
            out_file (str | None, optional): Output filename. Defaults to None.
            wait_time (int, optional): Forwarded to ``imshow_tracks``.
                Defaults to 0.
            backend (str, optional): Backend to draw the bounding boxes,
                options are `cv2` and `plt`. Defaults to 'cv2'.

        Returns:
            ndarray: Visualized image.
        """
        assert isinstance(result, dict)
        track_bboxes = result.get('track_bboxes', None)
        track_masks = result.get('track_masks', None)
        if isinstance(img, str):
            img = mmcv.imread(img)

        outs_track = results2outs(
            bbox_results=track_bboxes,
            mask_results=track_masks,
            mask_shape=img.shape[:2])
        img = imshow_tracks(
            img,
            outs_track.get('bboxes', None),
            outs_track.get('labels', None),
            outs_track.get('ids', None),
            outs_track.get('masks', None),
            classes=self.CLASSES,
            score_thr=score_thr,
            thickness=thickness,
            font_scale=font_scale,
            show=show,
            out_file=out_file,
            wait_time=wait_time,
            backend=backend)
        return img