Distributed Training Based on the AllReduce Architecture
Overview
In the AllReduce architecture, all devices used for training form a logical ring, as shown in Figure 4-6. There is no central node to aggregate the calculated gradients. Each device receives data from the upstream device and transmits data to the downstream device to fully utilize the bandwidth.
The Ascend platform provides the NPUDistributedOptimizer high-level distributed API to implement gradient aggregation in the AllReduce architecture. NPUDistributedOptimizer wraps a single-server training optimizer as an NPU-based distributed training optimizer and supports both single-server multi-device and multi-server multi-device scenarios. Gradients are calculated on each device before aggregation. When NPUDistributedOptimizer is used, AllReduce operators are inserted between the gradient computation and gradient update operators in the generated training graph.
Distributed Training with Estimator
To perform distributed training by using the Estimator method, modify the training script as follows:
- TensorFlow uses train_distribute in RunConfig to specify a distributed training strategy. The Ascend platform does not support train_distribute, so delete the related code.
- During model training, use the NPUDistributedOptimizer class to wrap the single-server training optimizer as an NPU distributed training optimizer. For example:
from npu_bridge.estimator.npu.npu_optimizer import NPUDistributedOptimizer

def cnn_model_fn(features, labels, mode):
    # Construct the network.
    xxx
    # Calculate the loss.
    xxx
    # Configure the training op (for TRAIN mode).
    if mode == tf.estimator.ModeKeys.TRAIN:
        optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.001)  # Use the SGD optimizer.
        distributedOptimizer = NPUDistributedOptimizer(optimizer)  # Use NPU-based distributed computing to update gradients.
        train_op = distributedOptimizer.minimize(loss=loss, global_step=tf.train.get_global_step())  # Minimize the loss.
        return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op)
- If the original script uses a TensorFlow API to calculate the gradients directly, for example grads = tf.gradients(loss, tvars), replace that call with the compute_gradients and apply_gradients methods of NPUDistributedOptimizer after the optimizer is constructed, for example:
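A minimal sketch of this replacement, assuming loss and tvars are already defined in the original script and that NPUDistributedOptimizer exposes the standard tf.train.Optimizer interface as described above:

import tensorflow as tf
from npu_bridge.estimator.npu.npu_optimizer import NPUDistributedOptimizer

optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.001)
distributedOptimizer = NPUDistributedOptimizer(optimizer)

# Instead of grads = tf.gradients(loss, tvars):
grads_and_vars = distributedOptimizer.compute_gradients(loss, var_list=tvars)
train_op = distributedOptimizer.apply_gradients(grads_and_vars, global_step=tf.train.get_global_step())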
- In Estimator mode, when NPUDistributedOptimizer is used to perform AllReduce gradient aggregation, NPUBroadcastGlobalVariablesHook is automatically added to NPUEstimator, so you do not need to broadcast the initial variables manually.
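For reference, a minimal sketch of plugging the model function above into NPUEstimator (the broadcast hook mentioned above is added automatically); the NPURunConfig arguments, model_dir path, and train_input_fn are illustrative assumptions, not required values:

import tensorflow as tf
from npu_bridge.estimator.npu.npu_config import NPURunConfig
from npu_bridge.estimator.npu.npu_estimator import NPUEstimator

# Illustrative configuration values; adjust to your own training job.
run_config = NPURunConfig(model_dir="/tmp/cnn_model", save_checkpoints_steps=1000)
classifier = NPUEstimator(model_fn=cnn_model_fn, config=run_config)  # cnn_model_fn from the example above.
classifier.train(input_fn=train_input_fn, max_steps=10000)  # train_input_fn: the user's own input pipeline (assumed).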
Distributed Training with sess.run
To perform distributed training by using the sess.run method, modify the training script as follows:
- When creating a session, you need to manually add the GradFusionOptimizer optimizer. For example:
from npu_bridge.estimator import npu_ops
from tensorflow.core.protobuf.rewriter_config_pb2 import RewriterConfig

# Create a session.
config = tf.ConfigProto()
custom_op = config.graph_options.rewrite_options.custom_optimizers.add()
custom_op.name = "NpuOptimizer"
custom_op.parameter_map["use_off_line"].b = True
config.graph_options.rewrite_options.remapping = RewriterConfig.OFF
config.graph_options.rewrite_options.optimizers.extend(["GradFusionOptimizer"])  # Add for distributed training.
sess = tf.Session(config=config)
- After the variables are initialized and before training starts, broadcast the variables through the collective communication API broadcast so that all devices start from the same initial values. For example:
from npu_bridge.hccl import hccl_ops

def broadcast_global_variables(root_rank, index):
    """Broadcasts all global variables from root rank to all other processes.

    Arguments:
        root_rank: rank of the process from which global variables will be
            broadcast to all other processes.
        index: rank_id
    """
    op_list = []
    for var in tf.global_variables():
        # The input and output tensors of the HCOMBroadcast interface are lists.
        if "float" in var.dtype.name:
            inputs = [var]
            outputs = hccl_ops.broadcast(tensor=inputs, root_rank=root_rank)
            if outputs is not None:
                op_list.append(outputs[0].op)
                op_list.append(tf.assign(var, outputs[0]))
    return tf.group(op_list)

...
bcast_op = broadcast_global_variables(root_rank, index)
sess = tf.Session()
...
sess.run(bcast_op)
- During training, after each device computes its gradients, aggregate the gradient data through the collective communication API allreduce. For example:
from npu_bridge.hccl import hccl_ops
grads = [hccl_ops.allreduce(grad, "sum") for grad in grads]
Alternatively, use the NPUDistributedOptimizer distributed training optimizer to aggregate gradient data.
from npu_bridge.estimator.npu.npu_optimizer import NPUDistributedOptimizer
optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.001)  # Use the SGD optimizer.
distributedOptimizer = NPUDistributedOptimizer(optimizer)  # Use NPU-based distributed computing to update gradients.
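For reference, a minimal end-to-end sketch of how the sess.run steps above (session configuration, broadcast, and allreduce-based aggregation) might fit together; loss, config, bcast_op, and num_steps are assumptions carried over from the earlier snippets and the user's own script:

import tensorflow as tf
from npu_bridge.hccl import hccl_ops

tvars = tf.trainable_variables()
grads = tf.gradients(loss, tvars)                            # Per-device gradients.
grads = [hccl_ops.allreduce(grad, "sum") for grad in grads]  # Aggregate gradients across devices.
optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.001)
train_op = optimizer.apply_gradients(list(zip(grads, tvars)))

with tf.Session(config=config) as sess:                      # config: the session config with GradFusionOptimizer.
    sess.run(tf.global_variables_initializer())
    sess.run(bcast_op)                                       # Broadcast the initial variables (see above).
    for _ in range(num_steps):
        sess.run(train_op)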
Distributed Training with Keras
To perform distributed training by using the Keras method, modify the training script as follows:
- Modify the optimizer when building the Keras model. Use a TensorFlow single-server training optimizer (do not use a Keras optimizer) and wrap it with the NPUDistributedOptimizer class. For example:
from npu_bridge.estimator.npu.npu_optimizer import NPUDistributedOptimizer
opt = tf.compat.v1.train.AdamOptimizer(learning_rate=0.1)
opt = NPUDistributedOptimizer(opt)
keras_model.compile(optimizer=opt, loss='sparse_categorical_crossentropy')
In the distributed scenario, a dynamic learning rate cannot be set in a callback function.
- (Optional) If a session is created explicitly, you need to manually add the GradFusionOptimizer optimizer. For example:
import tensorflow as tf
import tensorflow.python.keras as keras
from tensorflow.python.keras import backend as K
from tensorflow.core.protobuf.rewriter_config_pb2 import RewriterConfig
from npu_bridge.estimator import npu_ops

sess_config = tf.ConfigProto()
custom_op = sess_config.graph_options.rewrite_options.custom_optimizers.add()
custom_op.name = "NpuOptimizer"
custom_op.parameter_map["use_off_line"].b = True
sess_config.graph_options.rewrite_options.remapping = RewriterConfig.OFF
sess_config.graph_options.rewrite_options.optimizers.extend(["GradFusionOptimizer"])  # Add for distributed training.
sess = tf.Session(config=sess_config)
K.set_session(sess)

# Preprocess the data...
# Construct a model...
# Build the model...
# Train the model...

sess.close()
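With the session registered through K.set_session, training then uses the standard Keras APIs. A minimal sketch in which keras_model, x_train, and y_train are hypothetical placeholders for the user's own model and data, and opt is the NPUDistributedOptimizer-wrapped optimizer from the previous step:

# keras_model, x_train, y_train: placeholders for the user's own model and dataset.
keras_model.compile(optimizer=opt, loss='sparse_categorical_crossentropy')
keras_model.fit(x_train, y_train, batch_size=32, epochs=5)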