Loading Data
Ray Data loads data from various sources. This guide shows you how to:
Read files like images
Load in-memory data like pandas DataFrames
Read databases like MySQL
Reading files#
Ray Data reads files from local disk or cloud storage in a variety of file formats. To view the full list of supported file formats, see the Input/Output reference.
To read Parquet files, call read_parquet().
import ray
ds = ray.data.read_parquet("local:///tmp/iris.parquet")
print(ds.schema())
Column Type
------ ----
sepal.length double
sepal.width double
petal.length double
petal.width double
variety string
To read raw images, call read_images(). Ray Data represents
images as NumPy ndarrays.
import ray
ds = ray.data.read_images("local:///tmp/batoidea/JPEGImages/")
print(ds.schema())
Column Type
------ ----
image numpy.ndarray(shape=(32, 32, 3), dtype=uint8)
To read lines of text, call read_text().
import ray
ds = ray.data.read_text("local:///tmp/this.txt")
print(ds.schema())
Column Type
------ ----
text string
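The schema above has a single text column. The following stdlib-only sketch shows the row layout you can expect: one row per line, with the line terminator dropped. `lines_to_rows` is a hypothetical helper, not part of Ray, and it doesn't reproduce how read_text() handles encodings or blank lines.

```python
def lines_to_rows(text):
    """Split text into one {"text": ...} row per line, dropping line terminators."""
    return [{"text": line} for line in text.splitlines()]


rows = lines_to_rows("first line\nsecond line\n")
print(rows)  # [{'text': 'first line'}, {'text': 'second line'}]
```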
To read CSV files, call read_csv().
import ray
ds = ray.data.read_csv("local:///tmp/iris.csv")
print(ds.schema())
Column Type
------ ----
sepal length (cm) double
sepal width (cm) double
petal length (cm) double
petal width (cm) double
target int64
To read raw binary files, call read_binary_files().
import ray
ds = ray.data.read_binary_files("local:///tmp/file.dat")
print(ds.schema())
Column Type
------ ----
bytes binary
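read_binary_files() doesn't parse file contents; each file becomes one row whose bytes column holds the raw data. The stdlib-only sketch below mimics that layout for a single file. `file_to_row` is a hypothetical helper, and the temporary file stands in for /tmp/file.dat.

```python
import os
import tempfile


def file_to_row(path):
    """Read an entire file into one row whose "bytes" column holds the raw contents."""
    with open(path, "rb") as f:
        return {"bytes": f.read()}


# Write a small binary file so the sketch is self-contained.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "file.dat")
    with open(path, "wb") as f:
        f.write(b"\x00\x01\x02")
    row = file_to_row(path)

print(row)  # {'bytes': b'\x00\x01\x02'}
```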
To read TFRecords files, call read_tfrecords().
import ray
ds = ray.data.read_tfrecords("local:///tmp/iris.tfrecords")
print(ds.schema())
Column Type
------ ----
sepal length (cm) double
sepal width (cm) double
petal length (cm) double
petal width (cm) double
target int64
Reading files from local disk#
To read files from local disk, call a function like read_parquet() and
specify paths with the local:// scheme. Paths can point to files or directories.
To read formats other than Parquet, see the Input/Output reference.
Tip
If your files are accessible on every node, omit local:// so that Ray Data can parallelize the
read tasks across the cluster.
import ray
ds = ray.data.read_parquet("local:///tmp/iris.parquet")
print(ds.schema())
Column Type
------ ----
sepal.length double
sepal.width double
petal.length double
petal.width double
variety string
Reading files from cloud storage#
To read files in cloud storage, authenticate all nodes with your cloud service provider.
Then, call a method like read_parquet() and specify URIs with the
appropriate scheme. URIs can point to buckets, folders, or objects.
To read formats other than Parquet, see the Input/Output reference.
To read files from Amazon S3, specify URIs with the s3:// scheme.
import ray
ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")
print(ds.schema())
Column Type
------ ----
sepal.length double
sepal.width double
petal.length double
petal.width double
variety string
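To read files from Google Cloud Storage, install gcsfs, the Filesystem interface to Google Cloud Storage:
pip install gcsfs
Then, create a GCSFileSystem and specify URIs with the gcs:// scheme.
import gcsfs
import ray
# The project and bucket names are placeholders; replace them with your own.
filesystem = gcsfs.GCSFileSystem(project="my-google-project")
ds = ray.data.read_parquet(
    "gcs://my-bucket/iris.parquet",
    filesystem=filesystem,
)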
print(ds.schema())
Column Type
------ ----
sepal.length double
sepal.width double
petal.length double
petal.width double
variety string
To read files from Azure Blob Storage, install adlfs, the Filesystem interface to Azure-Datalake Gen1 and Gen2 Storage:
pip install adlfs
Then, create an AzureBlobFileSystem and specify URIs with the az:// scheme.
import adlfs
import ray
ds = ray.data.read_parquet(
    "az://ray-example-data/iris.parquet",
    # read_parquet() takes the filesystem as a keyword argument.
    filesystem=adlfs.AzureBlobFileSystem(account_name="azureopendatastorage"),
)
print(ds.schema())
Column Type
------ ----
sepal.length double
sepal.width double
petal.length double
petal.width double
variety string
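Every cloud URI in this section has the same three parts: a scheme that selects the storage service, a bucket or container, and an object path. The stdlib-only sketch below splits a URI into those parts and maps each scheme to the extra package this section installs for it (s3:// maps to None because the S3 example above needs no extra install). `describe_uri` is a hypothetical helper, and gcs://my-bucket/... is a placeholder URI.

```python
from urllib.parse import urlparse

# Extra packages that this section installs for each scheme.
EXTRA_PACKAGE = {"s3": None, "gcs": "gcsfs", "az": "adlfs"}


def describe_uri(uri):
    """Split a storage URI into scheme, bucket/container, and object path."""
    parts = urlparse(uri)
    return {
        "scheme": parts.scheme,
        "bucket": parts.hostname,  # Ignores any user@ prefix in the authority.
        "path": parts.path.lstrip("/"),
        "extra_package": EXTRA_PACKAGE.get(parts.scheme),
    }


print(describe_uri("az://ray-example-data/iris.parquet"))
print(describe_uri("gcs://my-bucket/folder/iris.parquet"))
```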
Reading files from NFS#
To read files from NFS filesystems, call a function like read_parquet()
and specify files on the mounted filesystem. Paths can point to files or directories.
To read formats other than Parquet, see the Input/Output reference.
import ray
ds = ray.data.read_parquet("/mnt/cluster_storage/iris.parquet")
print(ds.schema())
Column Type
------ ----
sepal.length double
sepal.width double
petal.length double
petal.width double
variety string
Handling compressed files#
To read a compressed file, specify the compression codec in arrow_open_stream_args.
You can use any codec that Arrow supports.
import ray
ds = ray.data.read_csv(
    "s3://anonymous@ray-example-data/iris.csv.gz",
    arrow_open_stream_args={"compression": "gzip"},
)
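The compression argument makes Arrow decompress the byte stream before the CSV reader parses it. The stdlib-only sketch below reproduces that ordering with Python's gzip and csv modules. It is an analogy, not Ray's code path: the data is a made-up two-row CSV held in memory, and csv.DictReader leaves every value as a string, whereas Ray Data infers column types.

```python
import csv
import gzip
import io

# Build a small gzip-compressed CSV in memory (a stand-in for iris.csv.gz).
raw_csv = "sepal length (cm),target\n5.1,0\n7.0,1\n"
compressed = gzip.compress(raw_csv.encode("utf-8"))

# Decompress the stream first, then parse the decompressed text as CSV.
with gzip.open(io.BytesIO(compressed), mode="rt", encoding="utf-8") as stream:
    rows = list(csv.DictReader(stream))

print(rows)
```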
Loading data from other libraries#
Loading data from single-node data libraries#
Ray Data interoperates with libraries like pandas, NumPy, and Arrow.
To create a Dataset from Python objects, call
from_items() and pass in a list of dicts. Ray Data treats
each dict as a row.
import ray
ds = ray.data.from_items([
{"food": "spam", "price": 9.34},
{"food": "ham", "price": 5.37},
{"food": "eggs", "price": 0.94}
])
print(ds)
MaterializedDataset(
num_blocks=3,
num_rows=3,
schema={food: string, price: double}
)
You can also create a Dataset from a list of regular
Python objects.
import ray
ds = ray.data.from_items([1, 2, 3, 4, 5])
print(ds)
MaterializedDataset(num_blocks=5, num_rows=5, schema={item: int64})
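The two outputs above show how from_items() lays out rows: dicts become rows as-is, while other objects land in a single column named item. The pure-Python sketch below mirrors only that row layout. `items_to_rows` is a hypothetical helper; it doesn't infer types or split rows into blocks the way Ray Data does.

```python
def items_to_rows(items):
    """Mirror the row layout shown above: dicts become rows as-is;
    any other object is wrapped in a single "item" column."""
    return [item if isinstance(item, dict) else {"item": item} for item in items]


print(items_to_rows([{"food": "spam", "price": 9.34}]))  # [{'food': 'spam', 'price': 9.34}]
print(items_to_rows([1, 2, 3]))  # [{'item': 1}, {'item': 2}, {'item': 3}]
```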
To create a Dataset from a NumPy array, call
from_numpy(). Ray Data treats the outer axis as the row
dimension.
import numpy as np
import ray
array = np.ones((3, 2, 2))
ds = ray.data.from_numpy(array)
print(ds)
MaterializedDataset(
num_blocks=1,
num_rows=3,
schema={data: numpy.ndarray(shape=(2, 2), dtype=double)}
)
To create a Dataset from a pandas DataFrame, call
from_pandas().
import pandas as pd
import ray
df = pd.DataFrame({
"food": ["spam", "ham", "eggs"],
"price": [9.34, 5.37, 0.94]
})
ds = ray.data.from_pandas(df)
print(ds)
MaterializedDataset(
num_blocks=1,
num_rows=3,
schema={food: object, price: float64}
)
To create a Dataset from an Arrow table, call
from_arrow().
import pyarrow as pa
import ray
table = pa.table({
"food": ["spam", "ham", "eggs"],
"price": [9.34, 5.37, 0.94]
})
ds = ray.data.from_arrow(table)
print(ds)
MaterializedDataset(
num_blocks=1,
num_rows=3,
schema={food: string, price: double}
)
Loading data from distributed DataFrame libraries#
Ray Data interoperates with distributed data processing frameworks like Dask, Spark, Modin, and Mars.
To create a Dataset from a Dask DataFrame, call from_dask(). This function constructs a
Dataset backed by the distributed pandas DataFrame partitions that underlie
the Dask DataFrame.
import dask.dataframe as dd
import pandas as pd
import ray
df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
ddf = dd.from_pandas(df, npartitions=4)
# Create a Dataset from a Dask DataFrame.
ds = ray.data.from_dask(ddf)
ds.show(3)
{'col1': 0, 'col2': '0'}
{'col1': 1, 'col2': '1'}
{'col1': 2, 'col2': '2'}
To create a Dataset from a Spark DataFrame,
call from_spark(). This function creates a Dataset backed by
the distributed partitions that underlie the Spark DataFrame.
import ray
import raydp
spark = raydp.init_spark(app_name="Spark -> Datasets Example",
num_executors=2,
executor_cores=2,
executor_memory="500MB")
df = spark.createDataFrame([(i, str(i)) for i in range(10000)], ["col1", "col2"])
ds = ray.data.from_spark(df)
ds.show(3)
{'col1': 0, 'col2': '0'}
{'col1': 1, 'col2': '1'}
{'col1': 2, 'col2': '2'}
To create a Dataset from a Modin DataFrame, call
from_modin(). This function constructs a Dataset backed by
the distributed pandas DataFrame partitions that underlie the Modin DataFrame.
import modin.pandas as md
import pandas as pd
import ray
df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
mdf = md.DataFrame(df)
# Create a Dataset from a Modin DataFrame.
ds = ray.data.from_modin(mdf)
ds.show(3)
{'col1': 0, 'col2': '0'}
{'col1': 1, 'col2': '1'}
{'col1': 2, 'col2': '2'}
To create a Dataset from a Mars DataFrame, call
from_mars(). This function constructs a Dataset
backed by the distributed pandas DataFrame partitions that underlie the Mars
DataFrame.
import mars
import mars.dataframe as md
import pandas as pd
import ray
cluster = mars.new_cluster_in_ray(worker_num=2, worker_cpu=1)
df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
mdf = md.DataFrame(df, num_partitions=8)
# Create a tabular Dataset from a Mars DataFrame.
ds = ray.data.from_mars(mdf)
ds.show(3)
{'col1': 0, 'col2': '0'}
{'col1': 1, 'col2': '1'}
{'col1': 2, 'col2': '2'}
Loading data from ML libraries#
Ray Data interoperates with Hugging Face and TensorFlow datasets.
To convert a 🤗 Dataset to a Ray Dataset, call
from_huggingface(). This function accesses the underlying Arrow
table and converts it to a Dataset directly.
Warning
from_huggingface doesn’t support parallel
reads. This isn’t an issue with in-memory 🤗 Datasets, but may fail with
large memory-mapped 🤗 Datasets. Also, 🤗 IterableDataset objects aren’t
supported.
import ray.data
from datasets import load_dataset
hf_ds = load_dataset("wikitext", "wikitext-2-raw-v1")
ray_ds = ray.data.from_huggingface(hf_ds["train"])
ray_ds.take(2)
[{'text': ''}, {'text': ' = Valkyria Chronicles III = \n'}]
To convert a TensorFlow dataset to a Ray Dataset, call from_tf().
Warning
from_tf doesn’t support parallel reads. Only use this
function with small datasets like MNIST or CIFAR.
import ray
import tensorflow_datasets as tfds
tf_ds, _ = tfds.load("cifar10", split=["train", "test"])
ds = ray.data.from_tf(tf_ds)
print(ds)
MaterializedDataset(
num_blocks=...,
num_rows=50000,
schema={
id: binary,
image: numpy.ndarray(shape=(32, 32, 3), dtype=uint8),
label: int64
}
)
Reading databases#
Ray Data reads from databases like MySQL, Postgres, and MongoDB.
Reading SQL databases#
Call read_sql() to read data from a database that provides a
Python DB API2-compliant connector.
To read from MySQL, install MySQL Connector/Python. It’s the first-party MySQL database connector.
pip install mysql-connector-python
Then, define your connection logic and query the database.
import mysql.connector
import ray
def create_connection():
    return mysql.connector.connect(
        user="admin",
        password=...,
        host="example-mysql-database.c2c2k1yfll7o.us-west-2.rds.amazonaws.com",
        connection_timeout=30,
        database="example",
    )
# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
dataset = ray.data.read_sql(
"SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
dataset = ray.data.read_sql(
"SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)
To read from PostgreSQL, install Psycopg 2. It’s the most popular PostgreSQL database connector.
pip install psycopg2-binary
Then, define your connection logic and query the database.
import psycopg2
import ray
def create_connection():
    return psycopg2.connect(
        user="postgres",
        password=...,
        host="example-postgres-database.c2c2k1yfll7o.us-west-2.rds.amazonaws.com",
        dbname="example",
    )
# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
dataset = ray.data.read_sql(
"SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
dataset = ray.data.read_sql(
"SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)
To read from Snowflake, install the Snowflake Connector for Python.
pip install snowflake-connector-python
Then, define your connection logic and query the database.
import snowflake.connector
import ray
def create_connection():
    return snowflake.connector.connect(
        user=...,
        password=...,
        account="ZZKXUVH-IPB52023",
        database="example",
    )
# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
dataset = ray.data.read_sql(
"SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
dataset = ray.data.read_sql(
"SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)
To read from Databricks, install the Databricks SQL Connector for Python.
pip install databricks-sql-connector
Then, define your connection logic and read from the Databricks SQL warehouse.
from databricks import sql
import ray
def create_connection():
    return sql.connect(
        server_hostname="dbc-1016e3a4-d292.cloud.databricks.com",
        http_path="/sql/1.0/warehouses/a918da1fc0b7fed0",
        access_token=...,
    )
# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
dataset = ray.data.read_sql(
"SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
dataset = ray.data.read_sql(
"SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)
To read from BigQuery, install the Python Client for Google BigQuery. This package includes a DB API2-compliant database connector.
pip install google-cloud-bigquery
Then, define your connection logic and query the dataset.
from google.cloud import bigquery
from google.cloud.bigquery import dbapi
import ray
def create_connection():
    client = bigquery.Client(...)
    return dbapi.Connection(client)
# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
dataset = ray.data.read_sql(
"SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
dataset = ray.data.read_sql(
"SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)
Reading MongoDB#
To read data from MongoDB, call read_mongo() and specify the
source URI, database, and collection. You can also specify a pipeline to
run against the collection; without one, Ray Data reads the entire collection.
import ray
# Read a local MongoDB.
ds = ray.data.read_mongo(
    uri="mongodb://localhost:27017",
    database="my_db",
    collection="my_collection",
    # $sort takes a document that maps each field to 1 (ascending) or -1 (descending).
    pipeline=[{"$match": {"col": {"$gte": 0, "$lt": 10}}}, {"$sort": {"sort_col": 1}}],
)
# Reading a remote MongoDB is the same.
ds = ray.data.read_mongo(
    uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin",
    database="my_db",
    collection="my_collection",
    pipeline=[{"$match": {"col": {"$gte": 0, "$lt": 10}}}, {"$sort": {"sort_col": 1}}],
)
# Write back to MongoDB.
ds.write_mongo(
    uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin",
    database="my_db",
    collection="my_collection",
)
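The pipeline argument runs on the MongoDB server, but its effect is easy to picture. The pure-Python sketch below applies a $match filter like the one in the read_mongo examples, plus a $sort stage in MongoDB's document form ({"sort_col": 1}), to a made-up list of documents. `matches` and `apply_pipeline` are hypothetical helpers that support only $gte, $lt, and $sort; no MongoDB is involved.

```python
def matches(value, condition):
    """Check a value against the $gte and $lt operators."""
    if "$gte" in condition and not value >= condition["$gte"]:
        return False
    if "$lt" in condition and not value < condition["$lt"]:
        return False
    return True


def apply_pipeline(documents, pipeline):
    """Apply $match and $sort stages, in order, to a list of dicts."""
    results = list(documents)
    for stage in pipeline:
        if "$match" in stage:
            for field, condition in stage["$match"].items():
                results = [doc for doc in results if matches(doc[field], condition)]
        elif "$sort" in stage:
            # Sort by the last key first; Python's stable sort then gives the
            # first key the highest priority, as MongoDB does.
            for field, direction in reversed(list(stage["$sort"].items())):
                results.sort(key=lambda doc: doc[field], reverse=(direction == -1))
    return results


documents = [
    {"col": 12, "sort_col": "a"},
    {"col": 5, "sort_col": "c"},
    {"col": 3, "sort_col": "b"},
]
pipeline = [{"$match": {"col": {"$gte": 0, "$lt": 10}}}, {"$sort": {"sort_col": 1}}]
print(apply_pipeline(documents, pipeline))
```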
Creating synthetic data#
Synthetic datasets can be useful for testing and benchmarking.
To create a synthetic Dataset from a range of integers, call
range(). Ray Data stores the integer range in a single column.
import ray
ds = ray.data.range(10000)
print(ds.schema())
Column Type
------ ----
id int64
To create a synthetic Dataset containing arrays, call
range_tensor(). Ray Data packs an integer range into ndarrays of
the provided shape: each row holds an array filled with that row's integer.
import ray
ds = ray.data.range_tensor(10, shape=(64, 64))
print(ds.schema())
Column Type
------ ----
data numpy.ndarray(shape=(64, 64), dtype=int64)
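To make the two synthetic layouts concrete, the stdlib-only sketch below builds rows with the same column names as the schemas above. It assumes that range_tensor() fills row i's array with the value i, and it uses nested lists in place of ndarrays. `range_rows` and `range_tensor_rows` are hypothetical helpers, and the second one handles only 2-D shapes.

```python
def range_rows(n):
    """Rows shaped like range(n): one integer per row in an "id" column."""
    return [{"id": i} for i in range(n)]


def range_tensor_rows(n, shape):
    """Rows shaped like range_tensor(n, shape=(rows, cols)) for a 2-D shape,
    assuming row i holds a rows x cols grid filled with i."""
    num_rows, num_cols = shape
    return [{"data": [[i] * num_cols for _ in range(num_rows)]} for i in range(n)]


print(range_rows(3))  # [{'id': 0}, {'id': 1}, {'id': 2}]
print(range_tensor_rows(2, shape=(2, 2)))  # [{'data': [[0, 0], [0, 0]]}, {'data': [[1, 1], [1, 1]]}]
```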
Loading other data sources#
If Ray Data can’t load your data, subclass
Datasource. Then, construct an instance of your custom
datasource and pass it to read_datasource().
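# Read from a custom datasource. YourCustomDatasource is a placeholder for your
# Datasource subclass; read_args and write_args stand for its arguments.
ds = ray.data.read_datasource(YourCustomDatasource(), **read_args)
# Write to a custom datasource.
ds.write_datasource(YourCustomDatasource(), **write_args)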
For an example, see Implementing a Custom Datasource.
Performance considerations#
The dataset parallelism determines the number of blocks that Ray Data splits the base data
into for parallel reads. Ray Data decides internally how many read tasks to run
concurrently to best utilize the cluster, from 1 up to parallelism tasks. In
other words, the higher the parallelism, the smaller the blocks in the Dataset, and
the more opportunity for parallel execution.
To override the default parallelism, pass the parallelism argument. For more information
on how to tune read parallelism, see the performance guide.
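The trade-off between parallelism and block size can be seen with a simple model. The sketch below splits a fixed number of rows into parallelism near-equal blocks. `block_sizes` is hypothetical: Ray's real block boundaries also depend on the input files and the read implementation, so treat this as an illustration only.

```python
def block_sizes(num_rows, parallelism):
    """Split num_rows into `parallelism` near-equal blocks (an idealized model)."""
    base, remainder = divmod(num_rows, parallelism)
    # The first `remainder` blocks get one extra row each.
    return [base + 1 if i < remainder else base for i in range(parallelism)]


for parallelism in (1, 4, 16):
    sizes = block_sizes(10_000, parallelism)
    print(parallelism, len(sizes), max(sizes))
# 1 1 10000
# 4 4 2500
# 16 16 625
```

Raising parallelism from 1 to 16 shrinks the largest block from 10,000 rows to 625, which gives the scheduler more independent pieces of work to spread across the cluster.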