ray.train.huggingface.AccelerateTrainer
- class ray.train.huggingface.AccelerateTrainer(*args, **kwargs)
Bases: ray.train.torch.torch_trainer.TorchTrainer

A Trainer for data parallel HuggingFace Accelerate training with PyTorch.
This Trainer is a wrapper around the TorchTrainer that provides the following extra functionality:

1. Loading and parsing Accelerate configuration files (created by the accelerate config CLI command).
2. Applying the configuration on all workers, making sure the environment is set up correctly.

This Trainer runs the function train_loop_per_worker on multiple Ray Actors. These actors already have the necessary torch process group configured for distributed PyTorch training, as well as all environment variables required by Accelerate, as defined in the configuration file. This allows you to use Accelerate APIs (such as Accelerator) inside train_loop_per_worker as you would without Ray.

Inside the train_loop_per_worker function, in addition to Accelerate APIs, you can use any of the Ray Train functions shown below. See the full example code below.

```python
from ray import train


def train_loop_per_worker():
    # Report intermediate results for callbacks or logging, and
    # optionally a checkpoint.
    train.report({"loss": 0.0})  # placeholder metrics dict

    # Get the last saved checkpoint (None if there is none).
    train.get_checkpoint()

    # Get the Dataset shard for the given key.
    train.get_dataset_shard("my_dataset")

    # Get the total number of workers executing training.
    train.get_context().get_world_size()

    # Get the rank of this worker.
    train.get_context().get_world_rank()

    # Get the rank of the worker on the current node.
    train.get_context().get_local_rank()
```
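To make the "apply the configuration on all workers" step concrete, here is a minimal sketch, assuming the Accelerate configuration is already a plain dict, of how user settings could be combined with values Ray derives from its own cluster view. The helper resolve_accelerate_config and the set RAY_MANAGED_KEYS are hypothetical illustrations, not AccelerateTrainer's actual implementation; the key names follow the list of ignored configuration options on this page.

```python
from typing import Any, Dict, Optional

# Hypothetical: Accelerate config keys that Ray Train overrides, taken from
# the list of ignored options on this page (not read from Ray's source).
RAY_MANAGED_KEYS = {
    "num_machines",
    "num_processes",
    "machine_rank",
    "gpu_ids",
    "num_cpu_threads_per_process",
    "main_process_ip",
    "main_process_port",
    "same_network",
    "cpu",
    "use_cpu",
    "rdzv_backend",
    "main_training_function",
}


def resolve_accelerate_config(
    user_config: Optional[Dict[str, Any]],
    world_size: int,
    num_nodes: int,
    node_rank: int,
    use_gpu: bool,
) -> Dict[str, Any]:
    """Hypothetical helper: drop Ray-managed keys, then fill them from Ray's settings."""
    # Keep only the options the user is allowed to control (e.g. mixed_precision).
    config = {
        key: value
        for key, value in (user_config or {}).items()
        if key not in RAY_MANAGED_KEYS
    }
    # Values that, per this page, come from Ray configs such as ScalingConfig.
    config.update(
        num_machines=num_nodes,
        num_processes=world_size,
        machine_rank=node_rank,
        use_cpu=not use_gpu,
    )
    return config


cfg = resolve_accelerate_config(
    {"mixed_precision": "fp16", "num_processes": 8},
    world_size=3,
    num_nodes=1,
    node_rank=0,
    use_gpu=True,
)
print(cfg["num_processes"])  # 3: the user's value of 8 is overridden
```

The design point is that user-controlled options such as mixed_precision pass through unchanged, while anything tied to cluster topology is always taken from Ray.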
For more information, see the documentation of TorchTrainer.

Note
You need to use ray.train.report() to communicate results and checkpoints back to Ray Train.

Accelerate integrations with DeepSpeed, FSDP, MegatronLM, etc. are fully supported. If the Accelerate configuration contains a path to a DeepSpeed config file (deepspeed_config_file), that file is also loaded and applied on the workers.

The following Accelerate configuration options are ignored and set automatically by the Trainer according to the Ray Train configs (e.g. ScalingConfig):

- Number of machines (num_machines)
- Number of processes (num_processes)
- Rank of the current machine (machine_rank)
- Local rank of the current machine
- GPU IDs (gpu_ids)
- Number of CPU threads per process (num_cpu_threads_per_process)
- IP of the head process (main_process_ip)
- Port of the head process (main_process_port)
- Whether all machines are on the same network (same_network)
- Whether to force a CPU-only mode (cpu/use_cpu)
- Rendezvous backend (rdzv_backend)
- Main training function (main_training_function)
- Type of launcher

This Trainer requires the accelerate>=0.17.0 package.

Example
```python
import os
import tempfile

import torch
import torch.nn as nn
from accelerate import Accelerator

import ray
from ray import train
from ray.train import Checkpoint, CheckpointConfig, RunConfig, ScalingConfig
from ray.train.huggingface import AccelerateTrainer

# If using GPUs, set this to True.
use_gpu = False

# Define the NN layer architecture, epochs, and number of workers.
input_size = 1
layer_size = 32
output_size = 1
num_epochs = 30
num_workers = 3


# Define your network structure.
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer1 = nn.Linear(input_size, layer_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(layer_size, output_size)

    def forward(self, input):
        return self.layer2(self.relu(self.layer1(input)))


# Define your train worker loop.
def train_loop_per_worker():
    torch.manual_seed(42)

    # Initialize the Accelerator.
    accelerator = Accelerator()

    # Fetch this worker's shard of the training set.
    dataset_shard = train.get_dataset_shard("train")
    model = NeuralNetwork()

    # Loss function, optimizer, and model preparation for training.
    # accelerator.prepare() moves the model to the right device and
    # wraps it for distributed execution.
    loss_fn = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=0.01)
    model, optimizer = accelerator.prepare(model, optimizer)

    # Iterate over epochs and batches.
    for epoch in range(num_epochs):
        for batches in dataset_shard.iter_torch_batches(
            batch_size=32, dtypes=torch.float
        ):
            # Add a feature dimension so inputs have shape [batch_size, 1].
            inputs, labels = torch.unsqueeze(batches["x"], 1), batches["y"]
            output = model(inputs)
            # Squeeze the output to the same shape as the labels.
            loss = loss_fn(output.squeeze(), labels)

            # Zero out grads, do backward, and update optimizer.
            optimizer.zero_grad()
            accelerator.backward(loss)
            optimizer.step()

        # Print the loss every 20 epochs.
        if epoch % 20 == 0:
            print(f"epoch: {epoch}/{num_epochs}, loss: {loss.item():.3f}")

        # Report metrics and checkpoint the model at the end of each epoch.
        with tempfile.TemporaryDirectory() as tmpdir:
            torch.save(
                {"epoch": epoch, "model": accelerator.unwrap_model(model).state_dict()},
                os.path.join(tmpdir, "checkpoint.pt"),
            )
            train.report(
                {"loss": loss.item(), "epoch": epoch},
                checkpoint=Checkpoint.from_directory(tmpdir),
            )


train_dataset = ray.data.from_items(
    [{"x": x, "y": 2 * x + 1} for x in range(2000)]
)

# Define scaling and run configs.
scaling_config = ScalingConfig(num_workers=num_workers, use_gpu=use_gpu)
run_config = RunConfig(checkpoint_config=CheckpointConfig(num_to_keep=1))

trainer = AccelerateTrainer(
    train_loop_per_worker=train_loop_per_worker,
    # Instead of using a dict, you can run `accelerate config`; with the
    # default value of None, the Trainer then loads that configuration file.
    accelerate_config={},
    scaling_config=scaling_config,
    run_config=run_config,
    datasets={"train": train_dataset},
)

result = trainer.fit()

# result.metrics holds the metrics from the last train.report() call.
final_loss = result.metrics["loss"]
# Assert the final loss is at most 0.09.
assert final_loss <= 0.09
```
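As a rough sanity check on the example above, the arithmetic below estimates how much data each of the 3 workers sees. It is a sketch under an assumption: the 2000 rows are split as evenly as possible across workers, which Ray Data does not strictly guarantee, so real shard sizes may differ slightly. The helpers even_shard_sizes and batches_per_epoch are hypothetical names used only here.

```python
import math
from typing import List


def even_shard_sizes(num_rows: int, num_workers: int) -> List[int]:
    # Assumption: an even split, where the first `remainder` workers
    # each receive one extra row.
    base, remainder = divmod(num_rows, num_workers)
    return [base + (1 if i < remainder else 0) for i in range(num_workers)]


def batches_per_epoch(shard_size: int, batch_size: int) -> int:
    # The final partial batch is kept, matching iter_torch_batches'
    # default of not dropping the last batch.
    return math.ceil(shard_size / batch_size)


shards = even_shard_sizes(2000, 3)
steps = [batches_per_epoch(s, 32) for s in shards]
print(shards, steps)  # [667, 667, 666] [21, 21, 21]
```

Under this assumption each worker runs 21 optimizer steps per epoch, or 630 steps over the 30 epochs. Equal step counts across workers also matter in practice, because distributed gradient synchronization expects every worker to call backward the same number of times.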
- Parameters
train_loop_per_worker – The training function to execute. This can either take in no arguments or a config dict.
train_loop_config – Configurations to pass into train_loop_per_worker if it accepts an argument.
accelerate_config – Accelerate configuration to be applied on every worker. This can be a path to a file generated with accelerate config, a configuration dict, or None, in which case the configuration file is loaded from the default location as defined by Accelerate.
torch_config – Configuration for setting up the PyTorch backend. If set to None, the default configuration is used. This replaces the backend_config arg of DataParallelTrainer.
scaling_config – Configuration for how to scale data parallel training.
dataset_config – Configuration for dataset ingest.
run_config – Configuration for the execution of the training run.
datasets – Any Datasets to use for training. Use the key "train" to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it is fit on the training dataset. All datasets are transformed by the preprocessor if one is provided.
preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.
resume_from_checkpoint – A checkpoint to resume training from.
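The train_loop_per_worker and train_loop_config parameters work together: the config dict is passed only when the function accepts an argument. A minimal sketch of that dispatch rule follows; call_train_loop is a hypothetical helper, not Ray's internal code, and falling back to an empty dict when no config is given is an assumption made here for illustration.

```python
import inspect
from typing import Any, Callable, Dict, Optional


def call_train_loop(
    fn: Callable[..., Any], train_loop_config: Optional[Dict[str, Any]] = None
) -> Any:
    """Hypothetical helper: pass the config only if `fn` accepts an argument."""
    num_params = len(inspect.signature(fn).parameters)
    if num_params == 0:
        # Zero-argument training functions are called as-is.
        return fn()
    if num_params == 1:
        # Assumption: a missing config is replaced by an empty dict.
        return fn(train_loop_config or {})
    raise ValueError(
        "train_loop_per_worker must take no arguments or a single config dict"
    )


def train_func(config):
    return config["lr"]


print(call_train_loop(train_func, {"lr": 0.01}))  # 0.01
```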
Methods
can_restore(path) – Checks whether a given directory contains a restorable Train experiment.
fit() – Runs training.
get_dataset_config() – Returns a copy of this Trainer's final dataset configs.
restore(path[, train_loop_per_worker, ...]) – Restores a DataParallelTrainer from a previously interrupted/failed run.
setup() – Called during fit() to perform initial setup on the Trainer.