https://github.com/apache/incubator-mxnet/pull/10696
MXNet's native distributed training mechanism, the parameter server, provides efficient communication for ASGD and fault tolerance, especially in cloud environments. However, we found that at small scale (8-64 nodes), MPI allreduce can achieve scaling efficiency close to linear without deploying any extra server nodes. We therefore suggest adding mpi-allreduce as an alternative option for MXNet multi-node distributed training.
The following performance data was collected for MXNet multi-node training on a 10GbE network.
Machine: SKX6148, Network: 10GbE, Topology: VGG16, Local Batch Size: 64, KVStore Type: dist_sync. Parameter Server
worker num | server num | Per Node FPS (pic/s) | Scaling Efficiency |
---|---|---|---|
8 | 8(worker and server share node) | 19.87 | 67.81% |
8 | 8 | 27.3 | 93.17% |
8 | 4 | 22.7 | 77.47% |
8 | 2 | 11.11 | 37.90% |
Command line: python tools/launch.py -n 8 -s <server_num> --launcher ssh -H hosts python example/image-classification/train_vgg16.py --kv-store dist_sync
The following result is for MXNet multi-node training with MPI allreduce support, measured with our proof of concept (ready):
Node Num | Per Node FPS(pic/s) | Scaling Efficiency |
---|---|---|
8 | 27.76 | 94.74% |
Command line: mpirun -n 8 -ppn 1 -machinefile hosts python example/image-classification/train_vgg16.py --kv-store dist_sync_mpi
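The scaling-efficiency columns above appear to be per-node throughput divided by single-node throughput. The single-node baseline is not stated in this proposal; the value of about 29.3 pic/s used below is back-computed from the published rows and should be treated as an assumption. A minimal sketch of the arithmetic:

```python
# Assumed single-node VGG16 throughput in pic/s. This number is NOT given in
# the proposal; it is inferred from the tables (e.g. 27.3 / 0.9317).
ASSUMED_SINGLE_NODE_FPS = 29.3


def scaling_efficiency(per_node_fps, single_node_fps=ASSUMED_SINGLE_NODE_FPS):
    """Return scaling efficiency as a percentage of ideal linear scaling."""
    return 100.0 * per_node_fps / single_node_fps


# (label, per-node FPS, efficiency reported in the tables above)
rows = [
    ("PS, 8 servers (shared nodes)", 19.87, 67.81),
    ("PS, 8 servers", 27.30, 93.17),
    ("PS, 4 servers", 22.70, 77.47),
    ("PS, 2 servers", 11.11, 37.90),
    ("MPI allreduce", 27.76, 94.74),
]

for label, fps, reported in rows:
    computed = scaling_efficiency(fps)
    print(f"{label}: computed {computed:.2f}%, reported {reported:.2f}%")
```

With this assumed baseline, the computed values agree with the reported ones to within rounding of the FPS figures.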
MPI Allreduce's good scalability comes from a large reduction in communication time compared with a parameter server that has not been allocated enough servers.
The following allreduce benchmark (400M payload, 8 workers) is provided for reference:
Method | server num | worker num | Time(s) |
---|---|---|---|
ParameterServer (push+pull) | 1 | 8 | 6.5 |
ParameterServer (push+pull) | 2 | 8 | 3.4 |
ParameterServer (push+pull) | 4 | 8 | 2.0 |
ParameterServer (push+pull) | 8 | 8 | 1.2 |
MPI.Allreduce | N/A | 8 | 1.0 |
From the performance data, we can draw the following conclusions:
a) MPI Allreduce doesn't need extra server nodes, and it matches the best performance that the parameter server can reach in synchronous SGD multi-node training. With the parameter server, by contrast, an insufficient number of servers makes them hot spots for network bandwidth if the setup is not well configured.
b) Unlike the parameter server, MPI Allreduce is easy to deploy on hardware. With the parameter server, the server:worker ratio needs to be calculated meticulously and is not fixed (it depends on topology and network bandwidth). Moreover, if the parameter servers run on low-end machines, MXNet has to be compiled for the low-end machine configuration for the final deployment (e.g. if the servers don't support AVX2 instructions while the workers do, MXNet has to be compiled without AVX2 support).
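Conclusion a) can be illustrated with a back-of-the-envelope bandwidth model. Everything here is an assumption made for illustration: the payload is taken as 400 MB (the proposal says only "400M"), every link runs at the full 10 Gb/s, servers sit on their own nodes, and allreduce is modelled as a ring algorithm, which the proposal does not specify. The model ignores latency and software overhead, so it shows the trend rather than reproducing the measured times.

```python
LINK_MB_PER_S = 10_000 / 8  # 10 Gb/s expressed in MB/s (assumed full line rate)


def ps_push_pull_seconds(payload_mb, workers, servers, bw=LINK_MB_PER_S):
    """Lower bound for one push + pull through a sharded parameter server.

    Each server owns payload/servers of the parameters and exchanges that
    shard with every worker, so its link carries workers * payload / servers
    per direction. Each worker's link carries the full payload per direction.
    The busier of the two links bounds each phase.
    """
    per_server = workers * payload_mb / servers
    per_phase = max(per_server, payload_mb) / bw
    return 2 * per_phase  # push phase + pull phase


def ring_allreduce_seconds(payload_mb, workers, bw=LINK_MB_PER_S):
    """Lower bound for a ring allreduce: 2 * (N - 1) / N payloads per link."""
    return 2 * (workers - 1) / workers * payload_mb / bw


for servers in (1, 2, 4, 8):
    t = ps_push_pull_seconds(400, workers=8, servers=servers)
    print(f"parameter server, {servers} server(s): >= {t:.2f} s")
print(f"ring allreduce, 8 workers: >= {ring_allreduce_seconds(400, 8):.2f} s")
```

Under these assumptions the parameter-server bound halves each time the server count doubles, which roughly follows the shape of the measured 6.5 s to 1.2 s series, while the allreduce bound does not depend on any server count.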
The following gives an architectural overview of mpi-allreduce and the parameter server:
AllReduce-based distributed training
PS-based distributed training (ref here)
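To make the allreduce side of this comparison concrete, the following is a minimal single-process simulation of a ring allreduce, one common way to implement the collective. The proposal does not say which algorithm the underlying MPI library uses, so the ring scheme, the function name `ring_allreduce`, and the list-based buffers are all illustrative assumptions rather than part of the POC.

```python
def ring_allreduce(buffers):
    """Simulate a sum ring allreduce over equally sized per-worker lists.

    buffers[rank] is the local gradient of worker `rank`. Returns a new list
    in which every worker holds the element-wise sum of all inputs.
    """
    n = len(buffers)
    length = len(buffers[0])
    # Split each buffer into n contiguous chunks: chunk c covers [lo, hi).
    bounds = [(c * length // n, (c + 1) * length // n) for c in range(n)]
    data = [list(b) for b in buffers]

    # Phase 1, reduce-scatter: after n - 1 steps, worker r holds the fully
    # reduced chunk (r + 1) % n.
    for step in range(n - 1):
        # Snapshot outgoing chunks first so all sends in a step are concurrent.
        sends = []
        for rank in range(n):
            c = (rank - step) % n
            lo, hi = bounds[c]
            sends.append((c, data[rank][lo:hi]))
        for rank, (c, payload) in enumerate(sends):
            dst = (rank + 1) % n
            lo, _hi = bounds[c]
            for k, value in enumerate(payload):
                data[dst][lo + k] += value

    # Phase 2, allgather: circulate the reduced chunks around the ring.
    for step in range(n - 1):
        sends = []
        for rank in range(n):
            c = (rank + 1 - step) % n
            lo, hi = bounds[c]
            sends.append((c, data[rank][lo:hi]))
        for rank, (c, payload) in enumerate(sends):
            dst = (rank + 1) % n
            lo, hi = bounds[c]
            data[dst][lo:hi] = payload

    return data


grads = [[1, 2, 3, 4, 5], [10, 20, 30, 40, 50], [100, 200, 300, 400, 500]]
for rank, result in enumerate(ring_allreduce(grads)):
    print(rank, result)
```

Every worker only ever talks to its ring neighbour, and no node plays the role of a server, which is the structural difference from the PS-based layout.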
It is no coincidence that in 2016-2017 both Baidu and Uber proposed mpi-allreduce-based distributed training frameworks for TensorFlow (tensorflow-allreduce and horovod), motivated by the drawbacks described above in TensorFlow's native parameter-server-based distributed training.
http://research.baidu.com/bringing-hpc-techniques-deep-learning
https://github.com/uber/horovod
https://github.com/baidu-research/tensorflow-allreduce/tree/master/tensorflow/contrib/mpi
Moreover, we noticed that most mainstream deep learning frameworks have an mpi-allreduce-based distributed training mechanism:
Framework | Distributed Communication Mechanism |
---|---|
Tensorflow | PS + mpi-allreduce(baidu allreduce) |
MXNet | PS |
Caffe | mpi-allreduce |
Torch + PyTorch | mpi-allreduce |
Chainer | mpi-allreduce(mpi4py) |
In addition to the existing parameter server mechanism in MXNet, we suggest adding mpi-allreduce as an alternative distributed training mechanism, which can significantly improve multi-node scaling efficiency for synchronous SGD distributed training at minimal cost.
Our POC already implements a Python package named mpi-collectives as an MXNet submodule. The initial experiments show good performance, as the data above shows. For allreduce-based MXNet multi-node training, most of the interfaces exposed in mpi_collectives.kvstore are the same as in the default kvstore. The following are examples of using them:
Note: The following examples assume that mpi.kvstore (dist_sync_mpi) is implemented in a Python package. If mpi.kvstore is implemented in CPP, there's no need to import the mpi_collectives package.
Typical example of image classification:
import mxnet as mx
import mpi_collectives as mpi

def fit(args, network, data_loader, **kwargs):
    """
    train a model
    args : argparse returns
    network : the symbol definition of the neural network
    data_loader : function that returns the train and val data iterators
    """
    # kvstore
    kv = mpi.kvstore.create('dist_sync_mpi')
    # data iterators
    (train, val) = data_loader(args, kv)
    ...
    # run
    model.fit(train,
              begin_epoch=args.load_epoch if args.load_epoch else 0,
              num_epoch=args.num_epochs,
              eval_data=val,
              eval_metric=eval_metrics,
              kvstore=kv,
              optimizer=args.optimizer,
              optimizer_params=optimizer_params,
              initializer=initializer,
              arg_params=arg_params,
              aux_params=aux_params,
              batch_end_callback=batch_end_callbacks,
              epoch_end_callback=checkpoint,
              allow_missing=True,
              monitor=monitor)
There are two types of node in an MPI-parallel distributed training architecture: the coordinator and normal worker nodes. Each node has a dedicated CPU thread to send and receive MPI messages.
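The dedicated communication thread can be pictured with a small sketch. This is not the POC's code: `CommThread` and its methods are hypothetical names, the allreduce call is a plain Python stand-in for the real MPI operation, and the coordinator's role is not modelled at all.

```python
import queue
import threading
from concurrent.futures import Future


class CommThread:
    """Hypothetical helper: one background thread owns all communication."""

    def __init__(self, allreduce_fn):
        self._allreduce = allreduce_fn  # stand-in for the real MPI allreduce
        self._requests = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def submit(self, tensor):
        """Queue a tensor for reduction and return a Future for the result."""
        future = Future()
        self._requests.put((tensor, future))
        return future

    def shutdown(self):
        """Ask the thread to exit after draining earlier requests."""
        self._requests.put(None)
        self._thread.join()

    def _run(self):
        # Only this thread ever calls the communication function, so compute
        # threads never block on the network unless they wait on a Future.
        while True:
            item = self._requests.get()
            if item is None:
                break
            tensor, future = item
            try:
                future.set_result(self._allreduce(tensor))
            except Exception as exc:  # surface failures to the caller
                future.set_exception(exc)


def fake_allreduce(tensor):
    """Pretend a second worker holds an identical tensor and sum the two."""
    return [2 * x for x in tensor]


comm = CommThread(fake_allreduce)
pending = comm.submit([1.0, 2.0, 3.0])
reduced = pending.result(timeout=5)
comm.shutdown()
print(reduced)
```

Funnelling every request through one thread keeps the ordering of communication calls in a single place, which is one plausible reason for giving each node a dedicated thread.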
To add MPI allReduce distributed training to MXNet and make it cooperate with existing components such as the dependency engine, execution engine, operators, and NDArray, a new KVStore-MPI is implemented to extend the functionality of the existing KVStore in MXNet. In this section, we introduce the design and APIs of KVStore-MPI.
1) CPP
To support more frontend languages and improve the efficiency of KVStore-MPI, implementing KVStore-MPI in CPP at the same level as the original KVStore is our preferred option, since the KVStore factory pattern is originally implemented at the CPP level.
2) Python
Python is widely used by data scientists and is undoubtedly the dominant language in the deep learning and machine learning fields. A Python-oriented KVStore-MPI can be implemented easily to support most MXNet distributed training workloads. In addition, very little code in MXNet's core components needs to change for a Python-oriented KVStore-MPI interface.
To support MPI allReduce distributed training in MXNet, a new option is added to the original job launching script. The new launching script is backward compatible.
python launch.py -n 8 -H hosts --launcher ssh --sync-dst-dir /tmp/mxnet_job/ --mode mpi python image_classification.py --dataset cifar10 --model vgg11 --num-epochs 1 --kvstore dist_sync
Options:
Currently, our allReduce-based distributed training solution is designed and demonstrated on multi-node CPU systems without GPU cards, but it can be extended easily to multiple nodes with multiple cards each. In that scenario, tensors on each GPU card are first gathered to the host node with the existing methods used for the parameter server, such as KVStore-local-device and KVStore-NCCL. Our new allReduce method then handles the tensors among all CPU nodes.
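The two-level scheme for multi-card nodes can be sketched as follows. All function names are hypothetical, plain Python lists stand in for tensors, the inter-node allreduce is simulated in a single process, and the final copy of the result back to every device is an assumed step that the proposal does not spell out.

```python
def local_reduce(device_tensors):
    """Sum the per-device copies held on one node (element-wise)."""
    return [sum(values) for values in zip(*device_tensors)]


def allreduce_across_nodes(host_tensors):
    """Stand-in for the inter-node allreduce: every node gets the global sum."""
    total = [sum(values) for values in zip(*host_tensors)]
    return [list(total) for _ in host_tensors]


def hierarchical_allreduce(cluster):
    """cluster[node][device] is that device's gradient as a list of numbers."""
    # Step 1: gather and reduce the cards of each node onto its host.
    host_tensors = [local_reduce(devices) for devices in cluster]
    # Step 2: allreduce the host-side tensors among all nodes.
    reduced = allreduce_across_nodes(host_tensors)
    # Step 3 (assumed): hand the global result back to every device.
    return [
        [list(node_result) for _ in devices]
        for node_result, devices in zip(reduced, cluster)
    ]


# Two nodes with two devices each, and a two-element gradient per device.
cluster = [
    [[1, 2], [3, 4]],
    [[10, 20], [30, 40]],
]
result = hierarchical_allreduce(cluster)
print(result)
```

Only one tensor per node crosses the network in step 2, regardless of how many cards the node has.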
a) If the allReduce-based solution is implemented at the CPP level
To build MXNet with MPI support, specify the USE_MPI_DIST_KVSTORE flag in make/config.mk:
USE_MPI_DIST_KVSTORE = 1
You also need to specify the MPI root directory:
MPI_ROOT=/usr/mpi
b) If the allReduce-based solution is implemented in a Python package as an MXNet submodule
As above, you need to specify USE_MPI_DIST_KVSTORE and MPI_ROOT. In addition, this submodule will be built after libmxnet has been built.
The following MPI KVStore interfaces will be exposed and used in MXNet