Data Loading and Preprocessing#

This guide covers how to use Ray Data to load data for distributed training jobs. You may want to use Ray Data instead of a framework's built-in data loading utilities for a few reasons:

  1. To leverage the full Ray cluster to speed up preprocessing of your data.

  2. To make data loading agnostic of the underlying framework.

  3. To use advanced Ray Data features such as global shuffles.

Overview#

Ray Data is the recommended way to work with large datasets in Ray Train. Ray Data provides automatic loading, sharding, and streamed ingest of data across multiple Train workers. To get started, pass one or more datasets to the Trainer through the datasets keyword argument (e.g., Trainer(datasets={...})).

In a nutshell, datasets passed to your Trainer can be accessed from the training function with train.get_dataset_shard("train") like this:

import ray
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

# Datasets can be accessed in your train_func via ``get_dataset_shard``.
def train_func(config):
    train_data_shard = train.get_dataset_shard("train")
    validation_data_shard = train.get_dataset_shard("validation")
    ...

# Randomly split the dataset into 80% training data and 20% validation data.
dataset = ray.data.read_csv("...")
train_dataset, validation_dataset = dataset.train_test_split(
    test_size=0.2, shuffle=True,
)

trainer = TorchTrainer(
    train_func,
    datasets={"train": train_dataset, "validation": validation_dataset},
    scaling_config=ScalingConfig(num_workers=8),
)
trainer.fit()

Basics#

Let’s use a single Torch training workload as a running example. A very basic example of using Ray Data with TorchTrainer looks like this:

import ray
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

import numpy as np
from typing import Dict

# Load the data.
train_ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")
## Uncomment to randomize the block order each epoch.
# train_ds = train_ds.randomize_block_order()


# Define a preprocessing function.
def normalize_length(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    new_col = batch["sepal.length"] / np.max(batch["sepal.length"])
    batch["normalized.sepal.length"] = new_col
    del batch["sepal.length"]
    return batch


# Preprocess your data any way you want. This will be re-run each epoch.
# You can use Ray Data preprocessors here as well,
# e.g., preprocessor.fit_transform(train_ds)
train_ds = train_ds.map_batches(normalize_length)


def train_loop_per_worker():
    # Get an iterator to the dataset we passed in below.
    it = train.get_dataset_shard("train")

    # Train for 10 epochs over the data. We'll use a shuffle buffer size
    # of 10k elements, and prefetch up to 10 batches of size 128 each.
    for _ in range(10):
        for batch in it.iter_batches(
            local_shuffle_buffer_size=10000, batch_size=128, prefetch_batches=10
        ):
            print("Do some training on batch", batch)


my_trainer = TorchTrainer(
    train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=2),
    datasets={"train": train_ds},
)
my_trainer.fit()

In this basic example, the train_ds object is created in your Ray script before the Trainer is even instantiated. The train_ds object is passed to the Trainer via the datasets argument, and is accessible to the train_loop_per_worker function via the train.get_dataset_shard method.
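The local_shuffle_buffer_size and batch_size arguments used in the loop above can be pictured with a small plain-Python model. This is only a conceptual sketch, not Ray Data's implementation: the function iter_shuffled_batches and its parameters are hypothetical names, and the rows are plain integers standing in for records.

```python
import random
from typing import Iterable, Iterator, List


def iter_shuffled_batches(
    rows: Iterable[int],
    batch_size: int,
    shuffle_buffer_size: int,
    seed: int = 0,
) -> Iterator[List[int]]:
    """Yield batches drawn at random from a bounded buffer of upcoming rows."""
    rng = random.Random(seed)
    buffer: List[int] = []
    batch: List[int] = []

    def pop_random() -> int:
        # Swap a random element to the end so removal is O(1).
        i = rng.randrange(len(buffer))
        buffer[i], buffer[-1] = buffer[-1], buffer[i]
        return buffer.pop()

    for row in rows:
        buffer.append(row)
        # Once the buffer is full, emit one random row per incoming row.
        if len(buffer) >= shuffle_buffer_size:
            batch.append(pop_random())
            if len(batch) == batch_size:
                yield batch
                batch = []

    # Drain whatever is left once the input is exhausted.
    while buffer:
        batch.append(pop_random())
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # final, possibly smaller, batch


batches = list(
    iter_shuffled_batches(range(1000), batch_size=128, shuffle_buffer_size=100)
)
```

Because rows are only mixed with their neighbours inside the buffer, a larger buffer gives a more thorough shuffle at the cost of holding more rows in memory on each worker.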

Splitting data across workers#

By default, Train will split the "train" dataset across workers using Dataset.streaming_split. This means that each worker sees a disjoint subset of the data, instead of iterating over the entire dataset. To customize this, we can pass in a DataConfig to the Trainer constructor. For example, the following splits dataset "a" but not "b".

dataset_a = ray.data.read_text(
    "s3://anonymous@ray-example-data/sms_spam_collection_subset.txt"
)
dataset_b = ray.data.read_csv("s3://anonymous@ray-example-data/dow_jones.csv")

my_trainer = TorchTrainer(
    train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=2),
    datasets={"a": dataset_a, "b": dataset_b},
    dataset_config=ray.train.DataConfig(
        datasets_to_split=["a"],
    ),
)
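To make the difference between split and unsplit datasets concrete, here is a toy model in plain Python. It is a sketch under simplifying assumptions, not how Dataset.streaming_split works internally: the helper assign_shards is hypothetical, datasets are plain sequences of integers, and rows are dealt out round-robin purely for illustration.

```python
from typing import Dict, List, Sequence


def assign_shards(
    datasets: Dict[str, Sequence[int]],
    world_size: int,
    datasets_to_split: List[str],
) -> List[Dict[str, List[int]]]:
    """Return one {dataset_name: rows} mapping per worker."""
    per_worker: List[Dict[str, List[int]]] = [{} for _ in range(world_size)]
    for name, rows in datasets.items():
        for rank in range(world_size):
            if name in datasets_to_split:
                # Disjoint subset: every world_size-th row, starting at rank.
                per_worker[rank][name] = list(rows[rank::world_size])
            else:
                # Unsplit: every worker iterates over the whole dataset.
                per_worker[rank][name] = list(rows)
    return per_worker


shards = assign_shards(
    {"a": range(10), "b": range(3)}, world_size=2, datasets_to_split=["a"]
)
print(shards[0])  # worker 0 sees half of "a" and all of "b"
print(shards[1])  # worker 1 sees the other half of "a" and all of "b"
```

In this model the two workers' shares of "a" never overlap and together cover the whole dataset, while "b" is seen in full by both workers.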

Performance#

This section covers common options for improving ingest performance.

Materializing your dataset#

Datasets are lazy and their execution is streamed, which means that on each epoch, all preprocessing operations will be re-run. If this loading / preprocessing is expensive, you may benefit from materializing your dataset in memory. This tells Ray Data to compute all the blocks of the dataset fully and pin them in Ray object store memory. This means that when iterating over the dataset repeatedly, the preprocessing operations do not need to be re-run, greatly improving performance. However, the trade-off is that if the preprocessed data is too large to fit into Ray object store memory, this could slow things down because data needs to be spilled to disk.

# Load the data.
train_ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")

# Preprocess the data. Transformations that are applied before the materialize
# call below will only be run once.
train_ds = train_ds.map_batches(normalize_length)

# Materialize the dataset in object store memory.
train_ds = train_ds.materialize()

# Add per-epoch preprocessing. Transformations that you want to run per-epoch, such
# as data augmentation, should go after the materialize call.
train_ds = train_ds.map_batches(augment_data)
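The effect of placing a transformation before or after the materialize call can be modeled with a toy lazy dataset in plain Python. This is a conceptual sketch only: LazyDataset, normalize, and augment are hypothetical stand-ins, and the real Ray Data objects work on distributed blocks rather than Python lists.

```python
from typing import Callable, Iterator, List, Optional


class LazyDataset:
    """Toy stand-in for a lazy dataset: stored rows plus pending transforms."""

    def __init__(
        self,
        rows: List[int],
        transforms: Optional[List[Callable[[int], int]]] = None,
    ) -> None:
        self._rows = rows
        self._transforms = list(transforms or [])

    def map(self, fn: Callable[[int], int]) -> "LazyDataset":
        # Record the transform; nothing is executed yet.
        return LazyDataset(self._rows, self._transforms + [fn])

    def materialize(self) -> "LazyDataset":
        # Run all pending transforms once and keep the results.
        return LazyDataset(list(self), [])

    def __iter__(self) -> Iterator[int]:
        # Pending transforms are re-applied on every pass over the data.
        for row in self._rows:
            for fn in self._transforms:
                row = fn(row)
            yield row


calls = {"normalize": 0, "augment": 0}


def normalize(x: int) -> int:
    calls["normalize"] += 1
    return x * 2


def augment(x: int) -> int:
    calls["augment"] += 1
    return x + 1


ds = LazyDataset([1, 2, 3]).map(normalize).materialize().map(augment)
for _ in range(3):  # three epochs
    epoch_rows = list(ds)

print(calls)
```

After three epochs over three rows, normalize has run three times in total (once per row, during materialize), while augment has run nine times (once per row per epoch).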

Ray Data execution options#

Under the hood, Train configures some default Data options for ingest: limiting the data ingest memory usage to 2GB per worker, and telling it to optimize the locality of the output data for ingest. See help(DataConfig.default_ingest_options()) if you want to learn more and further customize these settings.

Common options you may want to adjust:

  • resource_limits.object_store_memory, which sets the amount of Ray object store memory to use for Data ingestion. Increasing this can improve performance, but only up to the point where it triggers disk spilling and slows things down.

  • preserve_order. This is off by default, and lets Ray Data compute blocks out of order. Setting this to True will avoid this source of nondeterminism.

You can pass in custom execution options to the data config, which will apply to all data executions for the Trainer. For example, if you want to adjust the ingest memory size to 10GB per worker:

from ray.train import DataConfig

options = DataConfig.default_ingest_options()
options.resource_limits.object_store_memory = 10e9


my_trainer = TorchTrainer(
    train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=2),
    dataset_config=ray.train.DataConfig(
        execution_options=options,
    ),
)

Other performance tips#

  • Adjust the prefetch_batches argument for DataIterator.iter_batches. This can be useful if bottlenecked on the network.

  • Use print(ds.stats()) or print(iterator.stats()) to print detailed timing information about Ray Data performance.
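The idea behind prefetching can be sketched with the standard library alone. The example below is a simplified model, not Ray Data's implementation: the prefetch function is a hypothetical helper in which a background thread loads up to prefetch_batches batches ahead of the consumer, so that loading overlaps with the training step.

```python
import queue
import threading
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel marking the end of the stream


def prefetch(batches: Iterable[T], prefetch_batches: int) -> Iterator[T]:
    """Yield batches while a background thread loads up to N batches ahead."""
    buf: queue.Queue = queue.Queue(maxsize=max(1, prefetch_batches))

    def producer() -> None:
        try:
            for b in batches:
                buf.put(b)  # blocks while the buffer is full
        finally:
            buf.put(_DONE)  # always signal completion to the consumer

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = buf.get()
        if item is _DONE:
            return
        yield item


# The generator stands in for slow batch loading, e.g. over the network.
loaded = list(prefetch((i * i for i in range(5)), prefetch_batches=2))
print(loaded)
```

A deeper buffer hides more loading latency but keeps more batches in memory at once, which is the trade-off to weigh when tuning prefetch_batches.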

Custom data config (advanced)#

For use cases not covered by the default config class, you can also fully customize how your input datasets are split. To do this, you need to define a custom DataConfig class (DeveloperAPI). The DataConfig class is responsible for the shared setup and splitting of data across nodes.

# Note that this example class is doing the same thing as the basic DataConfig
# impl included with Ray Train.
from typing import Optional, Dict, List

from ray.data import Dataset, DataIterator, NodeIdStr
from ray.actor import ActorHandle


class MyCustomDataConfig(DataConfig):
    def configure(
        self,
        datasets: Dict[str, Dataset],
        world_size: int,
        worker_handles: Optional[List[ActorHandle]],
        worker_node_ids: Optional[List[NodeIdStr]],
        **kwargs,
    ) -> List[Dict[str, DataIterator]]:
        assert len(datasets) == 1, "This example only handles the simple case"

        # Configure Ray Data for ingest.
        ctx = ray.data.DataContext.get_current()
        ctx.execution_options = DataConfig.default_ingest_options()

        # Split the stream into shards.
        iterator_shards = datasets["train"].streaming_split(
            world_size, equal=True, locality_hints=worker_node_ids
        )

        # Return the assigned iterators for each worker.
        return [{"train": it} for it in iterator_shards]


my_trainer = TorchTrainer(
    train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=2),
    datasets={"train": train_ds},
    dataset_config=MyCustomDataConfig(),
)
my_trainer.fit()

What do you need to know about this DataConfig class?

  • It must be serializable, since it will be copied from the driver script to the driving actor of the Trainer.

  • Its configure method is called on the main actor of the Trainer group to create the data iterators for each worker.

In general, you can use DataConfig for any shared setup that has to occur ahead of time before the workers start reading data. The setup will be run at the start of each Trainer run.

Migrating from the legacy DatasetConfig API#

Starting from Ray 2.6, the DatasetConfig API is deprecated, and it will be removed in a future release. If your workloads are still using it, consider migrating to the new DataConfig API as soon as possible.

The main difference is that preprocessing is no longer part of the Trainer because Dataset operations are now lazily applied. This means that you can apply any operation to your Datasets before passing them to the Trainer, and the operation will be re-executed before each epoch.

In the following example with the legacy DatasetConfig API, we pass two Datasets (“train” and “test”) to the Trainer and apply an “add_noise” preprocessor per epoch to the “train” Dataset. Also, we will split the “train” Dataset, but not the “test” Dataset.

import random
import ray

from ray.air.config import DatasetConfig
from ray.data.preprocessors.batch_mapper import BatchMapper
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

train_ds = ray.data.range_tensor(1000)
test_ds = ray.data.range_tensor(10)

# A randomized preprocessor that adds a random float to all values.
add_noise = BatchMapper(lambda df: df + random.random(), batch_format="pandas")

my_trainer = TorchTrainer(
    lambda: None,
    scaling_config=ScalingConfig(num_workers=1),
    datasets={
        "train": train_ds,
        "test": test_ds,
    },
    dataset_config={
        "train": DatasetConfig(
            split=True,
            # Apply the preprocessor for each epoch.
            per_epoch_preprocessor=add_noise,
        ),
        "test": DatasetConfig(
            split=False,
        ),
    },
)
my_trainer.fit()

To migrate this example to the new DataConfig API, we apply the “add_noise” preprocessor to the “train” Dataset prior to passing it to the Trainer. Then, we use DataConfig(datasets_to_split=["train"]) to specify which Datasets need to be split. Note that the datasets_to_split argument is optional. By default, only the “train” Dataset will be split. If you don’t want to split the “train” Dataset either, use datasets_to_split=[].

from ray.train import DataConfig

train_ds = ray.data.range_tensor(1000)
test_ds = ray.data.range_tensor(10)

# Apply the preprocessor before passing the Dataset to the Trainer.
# This operation is lazy. It will be re-executed for each epoch.
train_ds = add_noise.transform(train_ds)

my_trainer = TorchTrainer(
    lambda: None,
    scaling_config=ScalingConfig(num_workers=1),
    datasets={
        "train": train_ds,
        "test": test_ds,
    },
    # Specify which datasets to split.
    dataset_config=DataConfig(
        datasets_to_split=["train"],
    ),
)
my_trainer.fit()