Computer Vision#

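This guide explains how to perform common computer vision tasks: reading image data, transforming images, training vision models, creating checkpoints, batch predicting images, and serving vision models.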

Reading image data#

Datasets like ImageNet store files like this:

root/dog/xxx.png
root/dog/xxy.png
root/dog/[...]/xxz.png

root/cat/123.png
root/cat/nsdf3.png
root/cat/[...]/asd932_.png

To load images stored in this layout, read the raw images and include the class names.

import ray
from ray.data.datasource.partitioning import Partitioning

root = "s3://[email protected]/cifar-10/images"
partitioning = Partitioning("dir", field_names=["class"], base_dir=root)
dataset = ray.data.read_images(root, partitioning=partitioning)
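To make the partitioning step more concrete, the sketch below shows how a directory-style scheme can turn a file path into a `class` value. It is a pure-Python illustration rather than Ray's implementation; `class_from_path` and the example paths are hypothetical.

```python
from pathlib import PurePosixPath


def class_from_path(path: str, base_dir: str) -> str:
    """Return the first directory below ``base_dir`` as the class name."""
    relative = PurePosixPath(path).relative_to(PurePosixPath(base_dir))
    # The first path component under the base directory names the class.
    return relative.parts[0]


# Hypothetical paths that follow the layout shown earlier.
paths = ["root/dog/xxx.png", "root/cat/123.png", "root/cat/nsdf3.png"]
classes = [class_from_path(p, "root") for p in paths]
print(classes)
```

Each image row then carries its class name alongside the pixel data, which is what the label-encoding step relies on.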

Then, apply a user-defined function to encode the class names as integer targets.

from typing import Dict

import numpy as np

CLASS_TO_LABEL = {
    "airplane": 0,
    "automobile": 1,
    "bird": 2,
    "cat": 3,
    "deer": 4,
    "dog": 5,
    "frog": 6,
    "horse": 7,
    "ship": 8,
    "truck": 9,
}

def add_label_column(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    labels = []
    for name in batch["class"]:
        label = CLASS_TO_LABEL[name]
        labels.append(label)
    batch["label"] = np.array(labels)
    return batch

def remove_class_column(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    del batch["class"]
    return batch

dataset = dataset.map_batches(add_label_column).map_batches(remove_class_column)

Tip

You can also use LabelEncoder to encode labels.
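Label encoding amounts to assigning each distinct class name an integer. As a rough sketch, assuming labels are assigned in sorted order (which may differ from what LabelEncoder does internally), the mapping can be derived from the class names themselves. `build_label_map` and `encode` are hypothetical helpers.

```python
from typing import Dict, Iterable, List


def build_label_map(names: Iterable[str]) -> Dict[str, int]:
    """Map each distinct name to an integer, in sorted order (an assumption)."""
    return {name: index for index, name in enumerate(sorted(set(names)))}


def encode(names: List[str], label_map: Dict[str, int]) -> List[int]:
    """Replace each class name with its integer label."""
    return [label_map[name] for name in names]


label_map = build_label_map(["dog", "cat", "bird", "cat", "dog"])
labels = encode(["cat", "dog", "bird"], label_map)
print(label_map, labels)
```

Deriving the mapping from the data avoids maintaining a hand-written dictionary, at the cost of depending on which classes actually appear.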

To load NumPy arrays into a Dataset, separately read the image and label arrays.

import ray

images = ray.data.read_numpy("s3://[email protected]/cifar-10/images.npy")
labels = ray.data.read_numpy("s3://[email protected]/cifar-10/labels.npy")

Then, combine the datasets and rename the columns.

dataset = images.zip(labels)
dataset = dataset.map_batches(
    lambda batch: batch.rename(columns={"data": "image", "data_1": "label"}),
    batch_format="pandas",
)
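The rename is needed because the zipped dataset exposes the two arrays as `data` and `data_1`. The sketch below mimics that collision handling on plain dictionaries. `zip_columns` is a hypothetical helper, and the suffix rule is an assumption inferred from the column names used above.

```python
from typing import Dict, List


def zip_columns(left: Dict[str, List], right: Dict[str, List]) -> Dict[str, List]:
    """Merge two column dicts, suffixing duplicate names from ``right``."""
    merged = dict(left)
    for name, values in right.items():
        new_name = name
        suffix = 1
        # Add a numeric suffix until the name no longer collides.
        while new_name in merged:
            new_name = f"{name}_{suffix}"
            suffix += 1
        merged[new_name] = values
    return merged


zipped = zip_columns({"data": ["img0", "img1"]}, {"data": [3, 5]})
rename = {"data": "image", "data_1": "label"}
renamed = {rename[name]: values for name, values in zipped.items()}
print(renamed)
```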

Image datasets often contain tf.train.Example messages that look like this:

features {
    feature {
        key: "image"
        value {
            bytes_list {
                value: ...  # Raw image bytes
            }
        }
    }
    feature {
        key: "label"
        value {
            int64_list {
                value: 3
            }
        }
    }
}

To load examples stored in this format, read the TFRecords into a Dataset.

import ray

dataset = ray.data.read_tfrecords(
    "s3://[email protected]/cifar-10/tfrecords"
)

Then, apply a user-defined function to decode the raw image bytes.

import io
from typing import Dict

import numpy as np
from PIL import Image

def decode_bytes(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    images = []
    for data in batch["image"]:
        image = Image.open(io.BytesIO(data))
        images.append(np.array(image))
    batch["image"] = np.array(images)
    return batch

dataset = dataset.map_batches(decode_bytes, batch_format="numpy")

To load image data stored in Parquet files, call ray.data.read_parquet().

import ray

dataset = ray.data.read_parquet("s3://[email protected]/cifar-10/parquet")

For more information on creating datasets, see Loading Data.

Transforming images#

To transform images, create a Preprocessor. Preprocessors are the standard way to preprocess data with Ray.

To apply TorchVision transforms, create a TorchVisionPreprocessor.

Create two TorchVisionPreprocessors – one to normalize images, and another to augment images. Later, you’ll pass the preprocessors to Trainers and Predictors.

from torchvision import transforms

from ray.data.preprocessors import TorchVisionPreprocessor

transform = transforms.Compose([transforms.ToTensor(), transforms.CenterCrop(224)])
preprocessor = TorchVisionPreprocessor(columns=["image"], transform=transform)

per_epoch_transform = transforms.RandomHorizontalFlip(p=0.5)
per_epoch_preprocessor = TorchVisionPreprocessor(
    columns=["image"], transform=per_epoch_transform
)
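For intuition about what the first transform does to each image, here is a NumPy-only sketch of a tensor conversion followed by a center crop. It assumes the input is at least 224 pixels on each side and is not how TorchVision implements these transforms; `to_chw_float` and `center_crop` are hypothetical names.

```python
import numpy as np


def to_chw_float(image: np.ndarray) -> np.ndarray:
    """Convert an HWC uint8 image to CHW float32 scaled to [0, 1]."""
    return image.transpose(2, 0, 1).astype(np.float32) / 255.0


def center_crop(image: np.ndarray, size: int) -> np.ndarray:
    """Crop the central ``size`` x ``size`` window from a CHW image."""
    _, height, width = image.shape
    top = (height - size) // 2
    left = (width - size) // 2
    return image[:, top : top + size, left : left + size]


# A made-up 256x320 RGB image filled with the maximum pixel value.
image = np.full((256, 320, 3), 255, dtype=np.uint8)
cropped = center_crop(to_chw_float(image), 224)
print(cropped.shape, cropped.dtype, cropped.max())
```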

To apply TensorFlow transforms, create a BatchMapper.

Create two BatchMappers – one to normalize images, and another to augment images. Later, you’ll pass the preprocessors to Trainers and Predictors.

from typing import Dict

import numpy as np
import tensorflow as tf
from tensorflow.keras.applications import imagenet_utils

from ray.data.preprocessors import BatchMapper

def preprocess(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    batch["image"] = imagenet_utils.preprocess_input(batch["image"])
    batch["image"] = tf.image.resize(batch["image"], (224, 224)).numpy()
    return batch

preprocessor = BatchMapper(preprocess, batch_format="numpy")

def augment(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    batch["image"] = tf.image.random_flip_left_right(batch["image"]).numpy()
    return batch

per_epoch_preprocessor = BatchMapper(augment, batch_format="numpy")
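To illustrate the kind of augmentation the second BatchMapper applies, the following NumPy sketch flips each image in a batch left to right with probability 0.5. It assumes NHWC layout and an independent coin flip per image; it is an illustration, not TensorFlow's implementation, and the function below is a hypothetical stand-in.

```python
import numpy as np


def random_flip_left_right(images: np.ndarray, rng: np.random.Generator) -> np.ndarray:
    """Flip each NHWC image along its width axis with probability 0.5."""
    flip = rng.random(len(images)) < 0.5
    flipped = images.copy()
    # Reverse the width axis (axis 2 in NHWC) for the selected images only.
    flipped[flip] = images[flip][:, :, ::-1, :]
    return flipped


rng = np.random.default_rng(0)
batch = np.arange(4 * 2 * 3 * 1).reshape(4, 2, 3, 1)
augmented = random_flip_left_right(batch, rng)
print(augmented[..., 0])
```

Because the flip is random, it has to be re-applied on every pass over the data, which is why it lives in the per-epoch preprocessor rather than the one-off one.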

For more information on transforming data, see Using Preprocessors and Transforming Data.

Training vision models#

Trainers let you train models in parallel.

To train a Torch vision model, define the training loop per worker.

import torch.nn as nn
import torch.optim as optim
from torchvision import models

from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchCheckpoint, TorchTrainer

def train_one_epoch(model, *, criterion, optimizer, batch_size, epoch):
    dataset_shard = train.get_dataset_shard("train")

    running_loss = 0
    for i, batch in enumerate(
        dataset_shard.iter_torch_batches(
            batch_size=batch_size, local_shuffle_buffer_size=256
        )
    ):
        inputs, labels = batch["image"], batch["label"]

        outputs = model(inputs)
        loss = criterion(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        if i % 2000 == 1999:
            train.report(
                metrics={
                    "epoch": epoch,
                    "batch": i,
                    "running_loss": running_loss / 2000,
                },
                checkpoint=TorchCheckpoint.from_model(model),
            )
            running_loss = 0

def train_loop_per_worker(config):
    model = train.torch.prepare_model(models.resnet50())
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=config["lr"])

    for epoch in range(config["epochs"]):
        train_one_epoch(
            model,
            criterion=criterion,
            optimizer=optimizer,
            batch_size=config["batch_size"],
            epoch=epoch,
        )
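The condition `i % 2000 == 1999` in `train_one_epoch` fires on every 2000th batch, so each report averages the loss over the preceding 2000 batches. The standalone sketch below reproduces that pattern with a smaller interval and made-up loss values; `windowed_mean_losses` is a hypothetical helper.

```python
from typing import Iterable, List, Tuple


def windowed_mean_losses(losses: Iterable[float], interval: int) -> List[Tuple[int, float]]:
    """Return (batch index, mean loss) once every ``interval`` batches."""
    reports = []
    running_loss = 0.0
    for i, loss in enumerate(losses):
        running_loss += loss
        if i % interval == interval - 1:
            reports.append((i, running_loss / interval))
            running_loss = 0.0  # Start a fresh window after each report.
    return reports


reports = windowed_mean_losses([4.0, 2.0, 3.0, 1.0, 5.0], interval=2)
print(reports)
```

Note that batches after the last full window are never reported, so an epoch shorter than the interval produces no reports at all.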

Then, create a TorchTrainer and call fit().

dataset = per_epoch_preprocessor.transform(dataset)
trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config={"batch_size": 32, "lr": 0.02, "epochs": 1},
    datasets={"train": dataset},
    scaling_config=ScalingConfig(num_workers=2),
    preprocessor=preprocessor,
)
results = trainer.fit()

For more in-depth examples, see the Ray Train documentation.

To train a TensorFlow vision model, define the training loop per worker.

import tensorflow as tf

from ray import train
from ray.air.integrations.keras import ReportCheckpointCallback

def train_loop_per_worker(config):
    strategy = tf.distribute.MultiWorkerMirroredStrategy()

    train_shard = train.get_dataset_shard("train")
    train_dataset = train_shard.to_tf(
        "image",
        "label",
        batch_size=config["batch_size"],
        local_shuffle_buffer_size=256,
    )

    with strategy.scope():
        model = tf.keras.applications.resnet50.ResNet50(weights=None)
        optimizer = tf.keras.optimizers.Adam(config["lr"])
        model.compile(
            optimizer=optimizer,
            loss="sparse_categorical_crossentropy",
            metrics=["accuracy"],
        )

    model.fit(
        train_dataset,
        epochs=config["epochs"],
        callbacks=[ReportCheckpointCallback()],
    )
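Setting `local_shuffle_buffer_size=256` asks for shuffling within a bounded buffer rather than across the whole dataset. One common way to implement such a buffer is sketched below. It illustrates the general technique, not Ray's internal implementation, and `shuffle_with_buffer` is a hypothetical name.

```python
import random
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def shuffle_with_buffer(items: Iterable[T], buffer_size: int, seed: int = 0) -> Iterator[T]:
    """Yield items in a locally shuffled order using a bounded buffer."""
    rng = random.Random(seed)
    buffer = []
    for item in items:
        buffer.append(item)
        if len(buffer) >= buffer_size:
            # Swap a random element to the end, then emit it.
            index = rng.randrange(len(buffer))
            buffer[index], buffer[-1] = buffer[-1], buffer[index]
            yield buffer.pop()
    # Drain whatever is left once the input is exhausted.
    rng.shuffle(buffer)
    yield from buffer


shuffled = list(shuffle_with_buffer(range(10), buffer_size=4))
print(shuffled)
```

A larger buffer gives an order closer to a full shuffle but holds more rows in memory at once.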

Then, create a TensorflowTrainer and call fit().

from ray.train import ScalingConfig
from ray.train.tensorflow import TensorflowTrainer

# The following transform operation is lazy.
# It will be re-run every epoch.
dataset = per_epoch_preprocessor.transform(dataset)
trainer = TensorflowTrainer(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config={"batch_size": 32, "lr": 0.02, "epochs": 1},
    datasets={"train": dataset},
    scaling_config=ScalingConfig(num_workers=2),
    preprocessor=preprocessor,
)
results = trainer.fit()

For more information, check out the Ray Train documentation.

Creating checkpoints#

Checkpoints are required for batch inference and model serving. They contain model state and optionally a preprocessor.

If you’re going from training to prediction, don’t create a new checkpoint. Trainer.fit() returns a Result object. Use Result.checkpoint instead.

To create a TorchCheckpoint, pass a Torch model and the Preprocessor you created in Transforming images to TorchCheckpoint.from_model().

from torchvision import models

from ray.train.torch import TorchCheckpoint

model = models.resnet50(pretrained=True)
checkpoint = TorchCheckpoint.from_model(model, preprocessor=preprocessor)

To create a TensorflowCheckpoint, pass a TensorFlow model and the Preprocessor you created in Transforming images to TensorflowCheckpoint.from_model().

import tensorflow as tf

from ray.train.tensorflow import TensorflowCheckpoint

model = tf.keras.applications.resnet50.ResNet50()
checkpoint = TensorflowCheckpoint.from_model(model, preprocessor=preprocessor)

Batch predicting images#

BatchPredictor lets you perform inference on large image datasets.

To create a BatchPredictor for a Torch model, call BatchPredictor.from_checkpoint and pass the checkpoint you created in Creating checkpoints.

from ray.train.batch_predictor import BatchPredictor
from ray.train.torch import TorchPredictor

predictor = BatchPredictor.from_checkpoint(checkpoint, TorchPredictor)
predictor.predict(dataset, feature_columns=["image"], keep_columns=["label"])

To create a BatchPredictor for a TensorFlow model, call BatchPredictor.from_checkpoint and pass the checkpoint you created in Creating checkpoints, along with the model definition.

import tensorflow as tf

from ray.train.batch_predictor import BatchPredictor
from ray.train.tensorflow import TensorflowPredictor

predictor = BatchPredictor.from_checkpoint(
    checkpoint,
    TensorflowPredictor,
    model_definition=tf.keras.applications.resnet50.ResNet50,
)
predictor.predict(dataset, feature_columns=["image"], keep_columns=["label"])

Serving vision models#

A Ray Serve Deployment lets you deploy a model to an endpoint and make predictions over the Internet.

Deployments use HTTP adapters to define how HTTP messages are converted to model inputs. For example, json_to_ndarray() converts HTTP messages like this:

{"array": [[1, 2], [3, 4]]}

into NumPy ndarrays like this:

array([[1., 2.],
       [3., 4.]])
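The conversion itself is small. A minimal sketch, assuming the request body is a JSON object with an `array` field and that values are converted to floats as in the output above, might look like this. `payload_to_ndarray` is a hypothetical helper, not the adapter's actual code.

```python
import json

import numpy as np


def payload_to_ndarray(body: str) -> np.ndarray:
    """Parse a JSON body with an "array" field into a float ndarray."""
    return np.array(json.loads(body)["array"], dtype=np.float64)


array = payload_to_ndarray('{"array": [[1, 2], [3, 4]]}')
print(repr(array))
```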

To deploy a Torch model to an endpoint, create a predictor from the checkpoint you created in Creating checkpoints and serve via a Ray Serve deployment.

from io import BytesIO
import numpy as np
from PIL import Image
from ray import serve
from ray.train.torch import TorchPredictor

@serve.deployment
class TorchDeployment:
    def __init__(self, checkpoint):
        self.predictor = TorchPredictor.from_checkpoint(checkpoint)

    async def __call__(self, request):
        image = Image.open(BytesIO(await request.body()))
        return self.predictor.predict(np.array(image)[np.newaxis])

serve.run(TorchDeployment.bind(checkpoint))
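In `__call__`, indexing with `np.newaxis` turns a single decoded image into a batch of one, since predictors operate on batches. The sketch below shows the shape change using a zero-filled stand-in array instead of a real decoded image; the 300x200 size is an assumption chosen for illustration.

```python
import numpy as np

# Stand-in for np.array(image): one 300x200 RGB image in HWC layout.
image = np.zeros((300, 200, 3), dtype=np.uint8)

# Indexing with np.newaxis prepends a batch axis of length 1.
batch = image[np.newaxis]
print(image.shape, batch.shape)
```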

Then, make a request to classify an image.

import requests

response = requests.get("http://placekitten.com/200/300")
response = requests.post("http://localhost:8000/", data=response.content)
predictions = response.json()

For more in-depth examples, read about Ray Serve.

To deploy a TensorFlow model to an endpoint, use the checkpoint you created in Creating checkpoints to create a Ray Serve deployment serving the model.

from io import BytesIO
import numpy as np
from PIL import Image
import tensorflow as tf

from ray import serve
from ray.train.tensorflow import TensorflowPredictor

@serve.deployment
class TensorflowDeployment:
    def __init__(self, checkpoint):
        self.predictor = TensorflowPredictor.from_checkpoint(
            checkpoint,
            model_definition=tf.keras.applications.resnet50.ResNet50,
        )

    async def __call__(self, request):
        image = Image.open(BytesIO(await request.body()))
        return self.predictor.predict({"image": np.array(image)[np.newaxis]})

serve.run(TensorflowDeployment.bind(checkpoint))

Then, make a request to classify an image.

import requests

response = requests.get("http://placekitten.com/200/300")
response = requests.post("http://localhost:8000/", data=response.content)
predictions = response.json()

For more information, see Ray Serve.