Converting an Existing Training Loop

The following instructions assume you have a training function that can already be run on a single worker.

Updating your training function

Start by updating your training function to support distributed training.

Ray Train sets up the distributed process group for you and provides utility methods that automatically prepare your model and data for distributed training.

Note

Ray Train will still work even if you don’t use the ray.train.torch.prepare_model() and ray.train.torch.prepare_data_loader() utilities below, and instead handle the logic directly inside your training function.

First, use the prepare_model() function to automatically move your model to the right device and wrap it in DistributedDataParallel:

 import torch
 from torch.nn.parallel import DistributedDataParallel
+from ray import train
+import ray.train.torch


 def train_func():
-    device = torch.device(f"cuda:{train.get_context().get_local_rank()}" if
-        torch.cuda.is_available() else "cpu")
-    torch.cuda.set_device(device)

     # Create model.
     model = NeuralNetwork()

-    model = model.to(device)
-    model = DistributedDataParallel(model,
-        device_ids=[train.get_context().get_local_rank()] if torch.cuda.is_available() else None)

+    model = train.torch.prepare_model(model)

     ...

Then, use the prepare_data_loader() function to automatically add a DistributedSampler to your DataLoader and move the batches to the right device. This step is not necessary if you pass Ray Data datasets to your Trainer (see Data Loading and Preprocessing):

 import torch
 from torch.utils.data import DataLoader, DistributedSampler
+from ray import train
+import ray.train.torch


 def train_func():
-    device = torch.device(f"cuda:{train.get_context().get_local_rank()}" if
-        torch.cuda.is_available() else "cpu")
-    torch.cuda.set_device(device)

     ...

-    data_loader = DataLoader(my_dataset, batch_size=worker_batch_size, sampler=DistributedSampler(my_dataset))

+    data_loader = DataLoader(my_dataset, batch_size=worker_batch_size)
+    data_loader = train.torch.prepare_data_loader(data_loader)

     for X, y in data_loader:
-        X = X.to(device)
-        y = y.to(device)
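
As a rough illustration of the sharding that a DistributedSampler performs, the sketch below partitions dataset indices across workers in pure Python. It is a simplified stand-in and not the PyTorch implementation: the function name shard_indices is hypothetical, shuffling is omitted, and it assumes the dataset has at least as many samples as there are workers.

```python
import math


def shard_indices(dataset_len, world_size, rank):
    """Return the dataset indices that one worker would iterate over.

    Hypothetical helper for illustration only; assumes
    dataset_len >= world_size and no shuffling.
    """
    per_worker = math.ceil(dataset_len / world_size)
    total = per_worker * world_size
    indices = list(range(dataset_len))
    # Pad by wrapping around so every worker gets the same number of samples.
    indices += indices[: total - dataset_len]
    # Each worker takes every world_size-th index, starting at its own rank.
    return indices[rank:total:world_size]


shards = [shard_indices(10, 4, rank) for rank in range(4)]
for rank, shard in enumerate(shards):
    print(f"rank {rank}: {shard}")
```

With 10 samples and 4 workers, every worker receives 3 indices, and two indices are repeated so that the shards have equal length.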

Tip

Keep in mind that the batch_size passed to DataLoader is the batch size for each worker. The global batch size can be calculated from the worker batch size (and vice versa) with the following equation:

global_batch_size = worker_batch_size * train.get_context().get_world_size()
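
Applying the equation above in reverse, the following sketch derives the per-worker batch size from a target global batch size. The helper name worker_batch_size_for is hypothetical, and world_size is hard-coded here as a stand-in for the value that train.get_context().get_world_size() would return inside a training function.

```python
def worker_batch_size_for(global_batch_size, world_size):
    """Split a global batch size evenly across workers (hypothetical helper)."""
    if global_batch_size % world_size != 0:
        raise ValueError(
            f"global_batch_size={global_batch_size} is not divisible by "
            f"world_size={world_size}"
        )
    return global_batch_size // world_size


# Stand-in for train.get_context().get_world_size().
world_size = 4
worker_batch_size = worker_batch_size_for(512, world_size)
print(worker_batch_size)
print(worker_batch_size * world_size)
```

Raising an error on an uneven split is one possible design choice; rounding down would also work, but it silently changes the effective global batch size.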

Creating a TorchTrainer

Trainers are the primary Ray Train classes used to manage state and execute training. For distributed PyTorch, use a TorchTrainer, which you can set up like this:

from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

# For GPU training, set `use_gpu` to True.
use_gpu = False
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(use_gpu=use_gpu, num_workers=2)
)

To customize the backend setup, you can pass a TorchConfig:

from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer, TorchConfig

trainer = TorchTrainer(
    train_func,
    torch_config=TorchConfig(...),
    scaling_config=ScalingConfig(num_workers=2),
)

For more configuration options, refer to the DataParallelTrainer API.

Running your training function

With a distributed training function and a Ray Train Trainer, you are now ready to start training!

trainer.fit()

Configuring Training

With Ray Train, you can execute a training function (train_func) in a distributed manner by calling Trainer.fit. To pass arguments into the training function, you can expose a single config dictionary parameter:

-def train_func():
+def train_func(config):

Then, pass the config dictionary to the Trainer through the train_loop_config argument:

+config = {} # This should be populated.
 trainer = TorchTrainer(
     train_func,
+    train_loop_config=config,
     scaling_config=ScalingConfig(num_workers=2)
 )

Putting this all together, you can run your training function with different configurations. As an example:

from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def train_func(config):
    for i in range(config["num_epochs"]):
        train.report({"epoch": i})

trainer = TorchTrainer(
    train_func,
    train_loop_config={"num_epochs": 2},
    scaling_config=ScalingConfig(num_workers=2)
)
result = trainer.fit()
# The training function reports the key "epoch"; the last reported value is 1.
print(result.metrics["epoch"])
# 1

A primary use-case for config is to try different hyperparameters. To perform hyperparameter tuning with Ray Train, please refer to the Ray Tune integration.
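
Because the config is a plain dictionary, one common pattern is to merge it with default values at the top of the training function. The sketch below shows one way to do this without Ray. The helper resolve_config and the keys lr and batch_size are hypothetical and chosen only for illustration.

```python
# Hypothetical defaults; only "num_epochs" appears in the example above.
DEFAULTS = {"num_epochs": 1, "lr": 1e-3, "batch_size": 32}


def resolve_config(config):
    """Merge user-supplied values over the defaults and reject unknown keys."""
    unknown = set(config) - set(DEFAULTS)
    if unknown:
        raise KeyError(f"Unknown config keys: {sorted(unknown)}")
    return {**DEFAULTS, **config}


# Inside train_func(config), this would be the first statement.
resolved = resolve_config({"num_epochs": 2})
print(resolved)
```

Rejecting unknown keys catches typos such as "num_epoch" early, before any workers start training.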

Accessing Training Results

Trainer.fit returns a Result object that contains information about the training run. You can use it to access saved checkpoints, metrics, and other relevant data.

For example, you can:

  • Print the metrics for the last training iteration:

from pprint import pprint

pprint(result.metrics)
# {'_time_this_iter_s': 0.001016855239868164,
#  '_timestamp': 1657829125,
#  '_training_iteration': 2,
#  'config': {},
#  'date': '2022-07-14_20-05-25',
#  'done': True,
#  'episodes_total': None,
#  'epoch': 1,
#  'experiment_id': '5a3f8b9bf875437881a8ddc7e4dd3340',
#  'experiment_tag': '0',
#  'hostname': 'ip-172-31-43-110',
#  'iterations_since_restore': 2,
#  'node_ip': '172.31.43.110',
#  'pid': 654068,
#  'time_since_restore': 3.4353830814361572,
#  'time_this_iter_s': 0.00809168815612793,
#  'time_total_s': 3.4353830814361572,
#  'timestamp': 1657829125,
#  'timesteps_since_restore': 0,
#  'timesteps_total': None,
#  'training_iteration': 2,
#  'trial_id': '4913f_00000',
#  'warmup_time': 0.003167867660522461}

  • View the dataframe containing the metrics from all iterations:

print(result.metrics_dataframe)

  • Obtain the checkpoints (used for resuming training, prediction, and serving) and any error raised during training:

result.checkpoint  # last saved checkpoint
result.best_checkpoints  # N best saved checkpoints, as configured in run_config
result.error  # returns the Exception if training failed.

See the Result docstring for more details.

Hugging Face

TransformersTrainer

TransformersTrainer extends TorchTrainer and is built for interoperability with the Hugging Face Transformers library.

Users are required to provide a trainer_init_per_worker function which returns a transformers.Trainer object. The trainer_init_per_worker function will have access to preprocessed train and evaluation datasets.

Upon calling TransformersTrainer.fit(), multiple workers (Ray actors) are spawned, and each worker creates its own copy of a transformers.Trainer.

Each worker then invokes transformers.Trainer.train(), which performs distributed training via PyTorch DDP.

Code example

# Based on
# huggingface/notebooks/examples/language_modeling_from_scratch.ipynb

# Hugging Face imports
from datasets import load_dataset
import transformers
from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer

import ray
from ray.train.huggingface import TransformersTrainer
from ray.train import ScalingConfig


# If using GPUs, set this to True.
use_gpu = False

model_checkpoint = "gpt2"
tokenizer_checkpoint = "sgugger/gpt2-like-tokenizer"
block_size = 128

datasets = load_dataset("wikitext", "wikitext-2-raw-v1")
tokenizer = AutoTokenizer.from_pretrained(tokenizer_checkpoint)


def tokenize_function(examples):
    return tokenizer(examples["text"])


tokenized_datasets = datasets.map(
    tokenize_function, batched=True, num_proc=1, remove_columns=["text"]
)


def group_texts(examples):
    # Concatenate all texts.
    concatenated_examples = {k: sum(examples[k], []) for k in examples.keys()}
    total_length = len(concatenated_examples[list(examples.keys())[0]])
    # Drop the small remainder. We could pad instead if the model
    # supported it; customize this part to your needs.
    total_length = (total_length // block_size) * block_size
    # Split by chunks of max_len.
    result = {
        k: [t[i : i + block_size] for i in range(0, total_length, block_size)]
        for k, t in concatenated_examples.items()
    }
    result["labels"] = result["input_ids"].copy()
    return result


lm_datasets = tokenized_datasets.map(
    group_texts,
    batched=True,
    batch_size=1000,
    num_proc=1,
)
ray_train_ds = ray.data.from_huggingface(lm_datasets["train"])
ray_evaluation_ds = ray.data.from_huggingface(lm_datasets["validation"])


def trainer_init_per_worker(train_dataset, eval_dataset, **config):
    model_config = AutoConfig.from_pretrained(model_checkpoint)
    model = AutoModelForCausalLM.from_config(model_config)
    args = transformers.TrainingArguments(
        output_dir=f"{model_checkpoint}-wikitext2",
        evaluation_strategy="epoch",
        save_strategy="epoch",
        logging_strategy="epoch",
        learning_rate=2e-5,
        weight_decay=0.01,
        no_cuda=(not use_gpu),
    )
    return transformers.Trainer(
        model=model,
        args=args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
    )


scaling_config = ScalingConfig(num_workers=3, use_gpu=use_gpu)
trainer = TransformersTrainer(
    trainer_init_per_worker=trainer_init_per_worker,
    scaling_config=scaling_config,
    datasets={"train": ray_train_ds, "evaluation": ray_evaluation_ds},
)
result = trainer.fit()
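
To make the chunking in group_texts concrete, the following sketch runs the same logic on a tiny hand-written batch, without loading a tokenizer or dataset. The block_size of 4 and the input token IDs are made up for illustration; the example above uses a block_size of 128.

```python
# Small value for illustration only; the example above uses 128.
block_size = 4


def group_texts(examples):
    # Concatenate the token lists of all examples in the batch.
    concatenated = {k: sum(examples[k], []) for k in examples.keys()}
    total_length = len(concatenated[list(examples.keys())[0]])
    # Drop the remainder that does not fill a whole block.
    total_length = (total_length // block_size) * block_size
    # Split into consecutive chunks of block_size tokens.
    result = {
        k: [t[i : i + block_size] for i in range(0, total_length, block_size)]
        for k, t in concatenated.items()
    }
    # For causal language modeling, the labels are a copy of the inputs.
    result["labels"] = result["input_ids"].copy()
    return result


# Made-up token IDs standing in for three tokenized texts.
batch = {"input_ids": [[1, 2, 3], [4, 5, 6, 7], [8, 9, 10]]}
grouped = group_texts(batch)
print(grouped["input_ids"])
print(grouped["labels"])
```

The ten input tokens yield two blocks of four tokens each; the last two tokens are dropped because they do not fill a block.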

AccelerateTrainer

If you prefer a more fine-grained Hugging Face API than what Transformers provides, you can use AccelerateTrainer to run training functions making use of Hugging Face Accelerate. Similarly to TransformersTrainer, AccelerateTrainer is also an extension of TorchTrainer.

AccelerateTrainer allows you to pass an Accelerate configuration file generated with accelerate config to be applied on all training workers. This ensures that the worker environments are set up correctly for Accelerate, allowing you to take advantage of Accelerate APIs and integrations such as DeepSpeed and FSDP just as you would if you were running Accelerate without Ray.

Note

AccelerateTrainer will override some settings set with accelerate config, mainly related to the topology and networking. See the AccelerateTrainer API reference for more details.

Aside from Accelerate support, the usage is identical to TorchTrainer: you define your own training function and use the report() API to report metrics and save checkpoints.

Code example

import torch
import torch.nn as nn

from accelerate import Accelerator

import ray
from ray import train
from ray.train import Checkpoint, ScalingConfig
from ray.train.huggingface import AccelerateTrainer


# If using GPUs, set this to True.
use_gpu = False


input_size = 1
layer_size = 15
output_size = 1
num_epochs = 3


class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.layer1 = nn.Linear(input_size, layer_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(layer_size, output_size)

    def forward(self, input):
        return self.layer2(self.relu(self.layer1(input)))


def train_loop_per_worker():
    accelerator = Accelerator()
    dataset_shard = train.get_dataset_shard("train")
    model = NeuralNetwork()
    loss_fn = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

    model, optimizer = accelerator.prepare(model, optimizer)

    for epoch in range(num_epochs):
        for batches in dataset_shard.iter_torch_batches(
            batch_size=32, dtypes=torch.float
        ):
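            # Add a feature dimension to both tensors so that the labels match
            # the model output shape (batch_size, 1) and MSELoss does not
            # broadcast them against each other.
            inputs = torch.unsqueeze(batches["x"], 1)
            labels = torch.unsqueeze(batches["y"], 1)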
            output = model(inputs)
            loss = loss_fn(output, labels)
            optimizer.zero_grad()
            accelerator.backward(loss)
            optimizer.step()
            print(f"epoch: {epoch}, loss: {loss.item()}")

        train.report(
            {},
            checkpoint=Checkpoint.from_dict(
                dict(epoch=epoch, model=accelerator.unwrap_model(model).state_dict())
            ),
        )


train_dataset = ray.data.from_items([{"x": x, "y": 2 * x + 1} for x in range(200)])
scaling_config = ScalingConfig(num_workers=3, use_gpu=use_gpu)
trainer = AccelerateTrainer(
    train_loop_per_worker=train_loop_per_worker,
    # Instead of using a dict, you can run ``accelerate config``.
    # The default value of None will then load that configuration
    # file.
    accelerate_config={},
    scaling_config=scaling_config,
    datasets={"train": train_dataset},
)
result = trainer.fit()