Source code for mmtrack.utils.benchmark
# Copyright (c) OpenMMLab. All rights reserved.
import copy
import time
from functools import partial
from typing import List, Optional, Union
import torch
import torch.nn as nn
from mmcv.cnn import fuse_conv_bn
from mmengine import MMLogger
from mmengine.config import Config
from mmengine.device import get_max_cuda_memory
from mmengine.dist import get_world_size
from mmengine.runner import Runner, load_checkpoint
from mmengine.utils.dl_utils import set_multi_processing
from torch.nn.parallel import DistributedDataParallel
from mmtrack.datasets import BaseSOTDataset
from mmtrack.registry import DATASETS, MODELS
try:
import psutil
except ImportError:
psutil = None
def custom_round(value: Union[int, float],
factor: Union[int, float],
precision: int = 2) -> float:
"""Custom round function."""
return round(value / factor, precision)
gb_round = partial(custom_round, factor=1024**3)
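
# A quick illustration of ``custom_round`` / ``gb_round`` (values are arbitrary):
#
#   >>> custom_round(3 * 1024**3, 1024**3)
#   3.0
#   >>> gb_round(1536 * 1024**2)  # 1536 MiB expressed in GB
#   1.5
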
def print_log(msg: str, logger: Optional[MMLogger] = None) -> None:
"""Print a log message."""
if logger is None:
print(msg, flush=True)
else:
logger.info(msg)
def print_process_memory(p: 'psutil.Process',
logger: Optional[MMLogger] = None) -> None:
"""print process memory info."""
mem_used = gb_round(psutil.virtual_memory().used)
memory_full_info = p.memory_full_info()
uss_mem = gb_round(memory_full_info.uss)
pss_mem = gb_round(memory_full_info.pss)
    for child in p.children():
        child_mem_info = child.memory_full_info()
        uss_mem += gb_round(child_mem_info.uss)
        pss_mem += gb_round(child_mem_info.pss)
    process_count = 1 + len(p.children())
print_log(
f'(GB) mem_used: {mem_used:.2f} | uss: {uss_mem:.2f} | '
f'pss: {pss_mem:.2f} | total_proc: {process_count}', logger)
class BaseBenchmark:
"""The benchmark base class.
The ``run`` method is an external calling interface, and it will
call the ``run_once`` method ``repeat_num`` times for benchmarking.
Finally, call the ``average_multiple_runs`` method to further process
the results of multiple runs.
Args:
max_iter (int): maximum iterations of benchmark.
log_interval (int): interval of logging.
num_warmup (int): Number of Warmup.
logger (MMLogger, optional): Formatted logger used to record messages.
"""
def __init__(self,
max_iter: int,
log_interval: int,
num_warmup: int,
logger: Optional[MMLogger] = None):
self.max_iter = max_iter
self.log_interval = log_interval
self.num_warmup = num_warmup
self.logger = logger
def run(self, repeat_num: int = 1) -> dict:
"""benchmark entry method.
Args:
repeat_num (int): Number of repeat benchmark.
Defaults to 1.
"""
assert repeat_num >= 1
results = []
for _ in range(repeat_num):
results.append(self.run_once())
results = self.average_multiple_runs(results)
return results
def run_once(self) -> dict:
"""Executes the benchmark once."""
raise NotImplementedError()
def average_multiple_runs(self, results: List[dict]) -> dict:
"""Average the results of multiple runs."""
raise NotImplementedError()
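
# A minimal subclass sketch of the ``run`` / ``run_once`` /
# ``average_multiple_runs`` contract described in ``BaseBenchmark``
# (``DummyBenchmark`` is hypothetical; the classes below are the real
# implementations):
#
#   class DummyBenchmark(BaseBenchmark):
#
#       def run_once(self):
#           return {'fps': 100.0}
#
#       def average_multiple_runs(self, results):
#           return {'avg_fps': sum(r['fps'] for r in results) / len(results)}
#
#   DummyBenchmark(max_iter=10, log_interval=5, num_warmup=2).run(repeat_num=3)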

class InferenceBenchmark(BaseBenchmark):
    """The inference benchmark class. It collects statistics on inference
    FPS, CUDA memory and CPU memory.

    Args:
        cfg (mmengine.Config): Config.
        checkpoint (str): Accepts a local filepath, URL, ``torchvision://xxx``
            or ``open-mmlab://xxx``.
        distributed (bool): Distributed testing flag.
        is_fuse_conv_bn (bool): Whether to fuse conv and bn, which can
            slightly increase the inference speed.
        max_iter (int): Maximum iterations of the benchmark. Defaults to 2000.
        log_interval (int): Interval of logging. Defaults to 50.
        num_warmup (int): Number of warmup iterations. Defaults to 5.
        logger (MMLogger, optional): Formatted logger used to record messages.
    """
def __init__(self,
cfg: Config,
checkpoint: str,
distributed: bool,
is_fuse_conv_bn: bool,
max_iter: int = 2000,
log_interval: int = 50,
num_warmup: int = 5,
logger: Optional[MMLogger] = None):
super().__init__(max_iter, log_interval, num_warmup, logger)
assert get_world_size(
) == 1, 'Inference benchmark does not allow distributed multi-GPU'
self.cfg = copy.deepcopy(cfg)
self.distributed = distributed
if psutil is None:
raise ImportError('psutil is not installed, please install it by: '
'pip install psutil')
self._process = psutil.Process()
        env_cfg = self.cfg.get('env_cfg', {})
        if env_cfg.get('cudnn_benchmark'):
            torch.backends.cudnn.benchmark = True
mp_cfg: dict = env_cfg.get('mp_cfg', {})
set_multi_processing(**mp_cfg, distributed=self.distributed)
print_log('before build: ', self.logger)
print_process_memory(self._process, self.logger)
self.model = self._init_model(checkpoint, is_fuse_conv_bn)
        # Because extra dataloader worker processes occupy additional CPU
        # resources, FPS statistics are less stable when num_workers is not 0,
        # so num_workers is forced to 0 here.
        dataloader_cfg = self.cfg.test_dataloader
        dataloader_cfg['num_workers'] = 0
dataloader_cfg['batch_size'] = 1
dataloader_cfg['persistent_workers'] = False
self.data_loader = Runner.build_dataloader(dataloader_cfg)
print_log('after build: ', self.logger)
print_process_memory(self._process, self.logger)
def _init_model(self, checkpoint: str, is_fuse_conv_bn: bool) -> nn.Module:
"""Initialize the model."""
model = MODELS.build(self.cfg.model)
if checkpoint is not None:
load_checkpoint(model, checkpoint, map_location='cpu')
if is_fuse_conv_bn:
model = fuse_conv_bn(model)
model = model.cuda()
if self.distributed:
model = DistributedDataParallel(
model,
device_ids=[torch.cuda.current_device()],
broadcast_buffers=False,
find_unused_parameters=False)
model.eval()
return model

    def run_once(self) -> dict:
"""Executes the benchmark once."""
pure_inf_time = 0
fps = 0
for i, data in enumerate(self.data_loader):
if (i + 1) % self.log_interval == 0:
print_log('==================================', self.logger)
torch.cuda.synchronize()
start_time = time.perf_counter()
with torch.no_grad():
self.model.test_step(data)
torch.cuda.synchronize()
elapsed = time.perf_counter() - start_time
if i >= self.num_warmup:
pure_inf_time += elapsed
if (i + 1) % self.log_interval == 0:
fps = (i + 1 - self.num_warmup) / pure_inf_time
cuda_memory = get_max_cuda_memory()
print_log(
f'Done image [{i + 1:<3}/{self.max_iter}], '
f'fps: {fps:.1f} img/s, '
f'times per image: {1000 / fps:.1f} ms/img, '
f'cuda memory: {cuda_memory} MB', self.logger)
print_process_memory(self._process, self.logger)
if (i + 1) == self.max_iter:
fps = (i + 1 - self.num_warmup) / pure_inf_time
break
return {'fps': fps}
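
    # Illustrative arithmetic for the FPS numbers logged above: with
    # ``num_warmup=5``, if 105 iterations have been processed and the measured
    # pure inference time is 10 s, then fps = (105 - 5) / 10 = 10.0 img/s and
    # the reported latency is 1000 / 10.0 = 100 ms/img.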

    def average_multiple_runs(self, results: List[dict]) -> dict:
"""Average the results of multiple runs."""
print_log('============== Done ==================', self.logger)
fps_list_ = [round(result['fps'], 1) for result in results]
avg_fps_ = sum(fps_list_) / len(fps_list_)
outputs = {'avg_fps': avg_fps_, 'fps_list': fps_list_}
if len(fps_list_) > 1:
            times_per_image_list_ = [
                round(1000 / result['fps'], 1) for result in results
            ]
            avg_times_per_image_ = sum(times_per_image_list_) / len(
                times_per_image_list_)
            print_log(
                f'Overall fps: {fps_list_}[{avg_fps_:.1f}] img/s, '
                'times per image: '
                f'{times_per_image_list_}[{avg_times_per_image_:.1f}] '
                'ms/img', self.logger)
else:
print_log(
f'Overall fps: {fps_list_[0]:.1f} img/s, '
f'times per image: {1000 / fps_list_[0]:.1f} ms/img',
self.logger)
print_log(f'cuda memory: {get_max_cuda_memory()} MB', self.logger)
print_process_memory(self._process, self.logger)
return outputs
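
# A typical call pattern for ``InferenceBenchmark`` (the config and checkpoint
# paths below are placeholders, not files shipped with mmtrack):
#
#   cfg = Config.fromfile('path/to/config.py')
#   benchmark = InferenceBenchmark(
#       cfg, 'path/to/checkpoint.pth', distributed=False, is_fuse_conv_bn=False)
#   results = benchmark.run(repeat_num=1)  # {'avg_fps': ..., 'fps_list': [...]}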

class DataLoaderBenchmark(BaseBenchmark):
    """The dataloader benchmark class. It collects statistics on dataloader
    FPS and CPU memory.

    Args:
        cfg (mmengine.Config): Config.
        distributed (bool): Distributed testing flag.
        dataset_type (str): Benchmark data type, only supports ``train``,
            ``val`` and ``test``.
        max_iter (int): Maximum iterations of the benchmark. Defaults to 2000.
        log_interval (int): Interval of logging. Defaults to 50.
        num_warmup (int): Number of warmup iterations. Defaults to 5.
        logger (MMLogger, optional): Formatted logger used to record messages.
    """
def __init__(self,
cfg: Config,
distributed: bool,
dataset_type: str,
max_iter: int = 2000,
log_interval: int = 50,
num_warmup: int = 5,
logger: Optional[MMLogger] = None):
super().__init__(max_iter, log_interval, num_warmup, logger)
assert dataset_type in ['train', 'val', 'test'], \
'dataset_type only supports train,' \
f' val and test, but got {dataset_type}'
assert get_world_size(
) == 1, 'Dataloader benchmark does not allow distributed multi-GPU'
self.cfg = copy.deepcopy(cfg)
self.distributed = distributed
if psutil is None:
raise ImportError('psutil is not installed, please install it by: '
'pip install psutil')
self._process = psutil.Process()
mp_cfg = self.cfg.get('env_cfg', {}).get('mp_cfg')
if mp_cfg is not None:
set_multi_processing(distributed=self.distributed, **mp_cfg)
else:
set_multi_processing(distributed=self.distributed)
print_log('before build: ', self.logger)
print_process_memory(self._process, self.logger)
if dataset_type == 'train':
self.data_loader = Runner.build_dataloader(cfg.train_dataloader)
elif dataset_type == 'test':
self.data_loader = Runner.build_dataloader(cfg.test_dataloader)
else:
self.data_loader = Runner.build_dataloader(cfg.val_dataloader)
self.batch_size = self.data_loader.batch_size
self.num_workers = self.data_loader.num_workers
print_log('after build: ', self.logger)
print_process_memory(self._process, self.logger)

    def run_once(self) -> dict:
"""Executes the benchmark once."""
pure_inf_time = 0
fps = 0
        # benchmark with at most ``max_iter`` batches and take the average
start_time = time.perf_counter()
for i, data in enumerate(self.data_loader):
elapsed = time.perf_counter() - start_time
if (i + 1) % self.log_interval == 0:
print_log('==================================', self.logger)
if i >= self.num_warmup:
pure_inf_time += elapsed
if (i + 1) % self.log_interval == 0:
fps = (i + 1 - self.num_warmup) / pure_inf_time
print_log(
f'Done batch [{i + 1:<3}/{self.max_iter}], '
f'fps: {fps:.1f} batch/s, '
f'times per batch: {1000 / fps:.1f} ms/batch, '
f'batch size: {self.batch_size}, num_workers: '
f'{self.num_workers}', self.logger)
print_process_memory(self._process, self.logger)
if (i + 1) == self.max_iter:
fps = (i + 1 - self.num_warmup) / pure_inf_time
break
start_time = time.perf_counter()
return {'fps': fps}

    def average_multiple_runs(self, results: List[dict]) -> dict:
"""Average the results of multiple runs."""
print_log('============== Done ==================', self.logger)
fps_list_ = [round(result['fps'], 1) for result in results]
avg_fps_ = sum(fps_list_) / len(fps_list_)
outputs = {'avg_fps': avg_fps_, 'fps_list': fps_list_}
if len(fps_list_) > 1:
            times_per_batch_list_ = [
                round(1000 / result['fps'], 1) for result in results
            ]
            avg_times_per_batch_ = sum(times_per_batch_list_) / len(
                times_per_batch_list_)
            print_log(
                f'Overall fps: {fps_list_}[{avg_fps_:.1f}] batch/s, '
                'times per batch: '
                f'{times_per_batch_list_}[{avg_times_per_batch_:.1f}] '
                f'ms/batch, batch size: {self.batch_size}, num_workers: '
                f'{self.num_workers}', self.logger)
else:
print_log(
f'Overall fps: {fps_list_[0]:.1f} batch/s, '
f'times per batch: {1000 / fps_list_[0]:.1f} ms/batch, '
f'batch size: {self.batch_size}, num_workers: '
f'{self.num_workers}', self.logger)
print_process_memory(self._process, self.logger)
return outputs
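
# ``DataLoaderBenchmark`` is driven the same way; only the constructor differs
# (the config path is again a placeholder):
#
#   cfg = Config.fromfile('path/to/config.py')
#   DataLoaderBenchmark(cfg, distributed=False, dataset_type='train').run(3)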

class DatasetBenchmark(BaseBenchmark):
    """The dataset benchmark class. It collects statistics on dataset FPS,
    FPS per transform and CPU memory.

    Args:
        cfg (mmengine.Config): Config.
        dataset_type (str): Benchmark data type, only supports ``train``,
            ``val`` and ``test``.
        max_iter (int): Maximum iterations of the benchmark. Defaults to 2000.
        log_interval (int): Interval of logging. Defaults to 50.
        num_warmup (int): Number of warmup iterations. Defaults to 5.
        logger (MMLogger, optional): Formatted logger used to record messages.
    """
def __init__(self,
cfg: Config,
dataset_type: str,
max_iter: int = 2000,
log_interval: int = 50,
num_warmup: int = 5,
logger: Optional[MMLogger] = None):
super().__init__(max_iter, log_interval, num_warmup, logger)
assert dataset_type in ['train', 'val', 'test'], \
'dataset_type only supports train,' \
f' val and test, but got {dataset_type}'
assert get_world_size(
) == 1, 'Dataset benchmark does not allow distributed multi-GPU'
self.cfg = copy.deepcopy(cfg)
if dataset_type == 'train':
dataloader_cfg = copy.deepcopy(cfg.train_dataloader)
elif dataset_type == 'test':
dataloader_cfg = copy.deepcopy(cfg.test_dataloader)
else:
dataloader_cfg = copy.deepcopy(cfg.val_dataloader)
dataset_cfg = dataloader_cfg.pop('dataset')
dataset = DATASETS.build(dataset_cfg)
if hasattr(dataset, 'full_init'):
dataset.full_init()
self.dataset = dataset
self.dataset_type = dataset_type

    def run_once(self) -> dict:
"""Executes the benchmark once."""
pure_inf_time = 0
fps = 0
if self.dataset_type == 'test' and isinstance(self.dataset,
BaseSOTDataset):
total_index = []
for video_ind in range(self.dataset.num_videos):
total_index.extend([
(video_ind, frame_ind) for frame_ind in range(
self.dataset.get_len_per_video(video_ind))
])
else:
total_index = list(range(len(self.dataset)))
start_time = time.perf_counter()
for i, idx in enumerate(total_index):
if (i + 1) % self.log_interval == 0:
print_log('==================================', self.logger)
get_data_info_start_time = time.perf_counter()
valid_idx = idx[0] if isinstance(idx, tuple) else idx
self.dataset.get_data_info(valid_idx)
get_data_info_elapsed = time.perf_counter(
) - get_data_info_start_time
if (i + 1) % self.log_interval == 0:
                print_log(
                    f'get_data_info - {get_data_info_elapsed * 1000:.2f} ms',
                    self.logger)
self.dataset[idx]
elapsed = time.perf_counter() - start_time - get_data_info_elapsed
if i >= self.num_warmup:
# print_log(f'{elapsed}', self.logger)
pure_inf_time += elapsed
if (i + 1) % self.log_interval == 0:
fps = (i + 1 - self.num_warmup) / pure_inf_time
print_log(
f'Done img [{i + 1:<3}/{self.max_iter}], '
f'fps: {fps:.1f} img/s, '
f'times per img: {1000 / fps:.1f} ms/img', self.logger)
if (i + 1) == self.max_iter:
fps = (i + 1 - self.num_warmup) / pure_inf_time
break
start_time = time.perf_counter()
return {'fps': fps}

    def average_multiple_runs(self, results: List[dict]) -> dict:
"""Average the results of multiple runs."""
print_log('============== Done ==================', self.logger)
fps_list_ = [round(result['fps'], 1) for result in results]
avg_fps_ = sum(fps_list_) / len(fps_list_)
outputs = {'avg_fps': avg_fps_, 'fps_list': fps_list_}
if len(fps_list_) > 1:
            times_per_image_list_ = [
                round(1000 / result['fps'], 1) for result in results
            ]
            avg_times_per_image_ = sum(times_per_image_list_) / len(
                times_per_image_list_)
            print_log(
                f'Overall fps: {fps_list_}[{avg_fps_:.1f}] img/s, '
                'times per img: '
                f'{times_per_image_list_}[{avg_times_per_image_:.1f}] '
                'ms/img', self.logger)
else:
print_log(
f'Overall fps: {fps_list_[0]:.1f} img/s, '
f'times per img: {1000 / fps_list_[0]:.1f} ms/img',
self.logger)
return outputs
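
# ``DatasetBenchmark`` iterates the dataset directly, without a dataloader
# (the config path is a placeholder):
#
#   cfg = Config.fromfile('path/to/config.py')
#   DatasetBenchmark(cfg, dataset_type='train').run(repeat_num=2)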