feat(bench): add new benchmarking script, harness, and profiling guide (#3840)

# Description

This redoes the merge-based benchmark in crates/benchmark, replacing it
with `divan` as a real harness combined with adding a script that can be
used for profiling.

# Related Issue(s)

Closes #3839 

# Documentation

Documentation is included in the updated README

---------

Signed-off-by: Abhi Agarwal <abhiaagarwal01@gmail.com>
This commit is contained in:
Abhi Agarwal
2025-10-13 06:02:35 -04:00
committed by GitHub
parent 4acc60b202
commit b351f77e39
14 changed files with 422 additions and 718 deletions
-1
View File
@@ -22,7 +22,6 @@ __blobstorage__
.githubchangeloggenerator.cache.log
.githubchangeloggenerator.cache/
.githubchangeloggenerator*
data
.zed/
# Add all Cargo.lock files except for those in binary crates
+9
View File
@@ -66,6 +66,7 @@ regex = { version = "1" }
thiserror = { version = "2" }
url = { version = "2" }
percent-encoding-rfc3986 = { version = "0.1.3" }
tempfile = { version = "3" }
uuid = { version = "1" }
# runtime / async
@@ -101,3 +102,11 @@ Arro3 = "Arro3"
AKS = "AKS"
# to avoid using 'type' as a field name.
tpe = "tpe"
# for better flamegraphs when benchmarking
[profile.bench]
debug = true
[profile.profiling]
inherits = "release"
debug = true
+12 -13
View File
@@ -7,23 +7,22 @@ license = "Apache-2.0"
keywords = ["deltalake", "delta", "datalake"]
description = "Delta-rs Benchmarks"
edition = "2021"
publish = false
[dependencies]
clap = { version = "4", features = [ "derive" ] }
chrono = { version = "0.4.31", default-features = false, features = ["clock"] }
tokio = { version = "1", features = ["fs", "macros", "rt", "io-util"] }
# arrow
arrow = { workspace = true }
arrow-array = { workspace = true }
# serde
serde_json = { workspace = true }
# datafusion
datafusion = { workspace = true }
clap = { version = "4", features = ["derive"] }
tokio = { workspace = true, features = ["fs", "macros", "rt", "io-util"] }
url = { workspace = true }
tempfile = { workspace = true }
[dependencies.deltalake-core]
path = "../core"
version = "0"
features = ["datafusion"]
[dev-dependencies]
divan = "0.1"
[[bench]]
name = "merge"
harness = false
+47 -33
View File
@@ -4,52 +4,66 @@ The merge benchmarks are similar to the ones used by [Delta Spark](https://githu
## Dataset
Databricks maintains a public S3 bucket of the TPC-DS dataset with various factor where requesters must pay to download this dataset. Below is an example of how to list the 1gb scale factor
To generate the database, `duckdb` can be used. Install `duckdb` by following [these instructions](https://duckdb.org/#quickinstall).
```
aws s3api list-objects --bucket devrel-delta-datasets --request-payer requester --prefix tpcds-2.13/tpcds_sf1_parquet/web_returns/
Run the following commands:
```bash
duckdb
D CALL dsdgen(sf = 1);
100% ▕██████████████████████████████████████▏ (00:00:05.76 elapsed)
┌─────────┐
│ Success │
│ boolean │
├─────────┤
0 rows │
└─────────┘
D EXPORT DATABASE 'tpcds_parquet' (FORMAT PARQUET);
```
You can generate the TPC-DS dataset yourself by downloading and compiling [the generator](https://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp)
You may need to update the CFLAGS to include `-fcommon` to compile on newer versions of GCC.
This will generate a folder called `tpcds_parquet` containing many parquet files. Place it at `crates/benchmarks/data/tpcds_parquet` (or set `TPCDS_PARQUET_DIR`). Credits to [Xuanwo's Blog](https://xuanwo.io/links/2025/02/duckdb-is-the-best-tpc-data-generator/).
## Commands
These commands can be executed from the root of the benchmark crate. Some commands depend on the existence of the TPC-DS Dataset existing.
## Running benchmarks
### Convert
Converts a TPC-DS web_returns csv into a Delta table
Assumes the dataset is pipe delimited and records do not have a trailing delimiter
Benchmarks use Divan and time only the merge operation. A temporary Delta table is created per iteration from `web_returns.parquet` and removed afterwards.
Environment variables:
- `TPCDS_PARQUET_DIR` (optional): directory containing `web_returns.parquet`. Default: `crates/benchmarks/data/tpcds_parquet`.
From the repo root:
```
cargo run --release --bin merge -- convert data/tpcds/web_returns.dat data/web_returns
cargo bench -p delta-benchmarks --bench merge
```
### Standard
Execute the standard merge bench suite.
Results can be saved to a delta table for further analysis.
This table has the following schema:
group_id: Used to group all tests that executed as a part of this call. Default value is the timestamp of execution
name: The benchmark name that was executed
sample: The iteration number for a given benchmark name
duration_ms: How long the benchmark took in ms
data: Free field to pack any additional data
Filter a specific suite:
```
cargo run --release --bin merge -- standard data/web_returns 1 data/merge_results
cargo bench -p delta-benchmarks --bench merge -- delete_only
cargo bench -p delta-benchmarks --bench merge -- multiple_insert_only
cargo bench -p delta-benchmarks --bench merge -- upsert_file_matched
```
### Compare
Compare the results of two different runs.
The a Delta table paths and the `group_id` of each run and obtain the speedup for each test case
## Profiling script
```
cargo run --release --bin merge -- compare data/benchmarks/ 1698636172801 data/benchmarks/ 1699759539902
A simple CLI is available to run a single merge with configurable parameters (useful for profiling or ad-hoc runs). It creates a fresh temporary Delta table per sample from `web_returns.parquet`, times only the merge, and prints duration and metrics.
Run (from repo root):
```bash
cargo run --profile profiling -p delta-benchmarks -- upsert --matched 0.01 --not-matched 0.10
```
### Show
Show all benchmarks results from a delta table
Options:
- `upsert | delete | insert`: operation to benchmark
- `--matched <fraction>`: fraction of rows that match existing keys (default 0.01)
- `--not-matched <fraction>`: fraction of rows that do not match (default 0.10)
```
cargo run --release --bin merge -- show data/benchmark
```
### Flamegraphs using `samply`
Using `samply`, you can generate flamegraphs from the profile script.
To start,
```bash
cargo install samply --locked
cargo build --profile profiling -p delta-benchmarks
samply record ./target/profiling/delta-benchmarks upsert
```
+96
View File
@@ -0,0 +1,96 @@
use std::path::PathBuf;
use delta_benchmarks::{
merge_delete, merge_insert, merge_upsert, prepare_source_and_table, MergeOp, MergePerfParams,
};
use divan::{AllocProfiler, Bencher};
fn main() {
divan::main();
}
#[global_allocator]
static ALLOC: AllocProfiler = AllocProfiler::system();
fn bench_merge(bencher: Bencher, op: MergeOp, params: &MergePerfParams) {
let rt = tokio::runtime::Runtime::new().unwrap();
bencher
.with_inputs(|| {
let tmp_dir = tempfile::tempdir().unwrap();
let parquet_dir = PathBuf::from(
std::env::var("TPCDS_PARQUET_DIR")
.unwrap_or_else(|_| "data/tpcds_parquet".to_string()),
);
rt.block_on(async move {
let (source, table) = prepare_source_and_table(params, &tmp_dir, &parquet_dir)
.await
.unwrap();
(source, table, tmp_dir)
})
})
.bench_local_values(|(source, table, tmp_dir)| {
rt.block_on(async move {
let _ = divan::black_box(op(source, table).unwrap().await.unwrap());
});
drop(tmp_dir);
});
}
#[divan::bench(args = [
MergePerfParams {
sample_matched_rows: 0.05,
sample_not_matched_rows: 0.0,
}
])]
fn delete_only(bencher: Bencher, params: &MergePerfParams) {
bench_merge(bencher, merge_delete, params);
}
#[divan::bench(args = [
MergePerfParams {
sample_matched_rows: 0.00,
sample_not_matched_rows: 0.05,
},
MergePerfParams {
sample_matched_rows: 0.00,
sample_not_matched_rows: 0.50,
},
MergePerfParams {
sample_matched_rows: 0.00,
sample_not_matched_rows: 1.0,
},
])]
fn multiple_insert_only(bencher: Bencher, params: &MergePerfParams) {
bench_merge(bencher, merge_insert, params);
}
#[divan::bench(args = [
MergePerfParams {
sample_matched_rows: 0.01,
sample_not_matched_rows: 0.1,
},
MergePerfParams {
sample_matched_rows: 0.1,
sample_not_matched_rows: 0.0,
},
MergePerfParams {
sample_matched_rows: 0.1,
sample_not_matched_rows: 0.01,
},
MergePerfParams {
sample_matched_rows: 0.5,
sample_not_matched_rows: 0.001,
},
MergePerfParams {
sample_matched_rows: 0.99,
sample_not_matched_rows: 0.001,
},
MergePerfParams {
sample_matched_rows: 0.001,
sample_not_matched_rows: 0.001,
},
])]
fn upsert_file_matched(bencher: Bencher, params: &MergePerfParams) {
bench_merge(bencher, merge_upsert, params);
}
+2
View File
@@ -0,0 +1,2 @@
*
!.gitignore
-665
View File
@@ -1,665 +0,0 @@
use std::{
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use arrow::datatypes::Schema as ArrowSchema;
use arrow_array::{RecordBatch, StringArray, UInt32Array};
use chrono::Duration;
use clap::{command, Args, Parser, Subcommand};
use datafusion::common::DataFusionError;
use datafusion::functions::expr_fn::random;
use datafusion::logical_expr::{cast, col, lit};
use datafusion::{datasource::MemTable, prelude::DataFrame};
use deltalake_core::protocol::SaveMode;
use deltalake_core::{
arrow::{
self,
datatypes::{DataType, Field},
},
datafusion::prelude::{CsvReadOptions, SessionContext},
delta_datafusion::{DeltaScanConfig, DeltaTableProvider},
ensure_table_uri,
operations::merge::{MergeBuilder, MergeMetrics},
DeltaOps, DeltaTable, DeltaTableBuilder, DeltaTableError, ObjectStore, Path,
};
use serde_json::json;
use tokio::time::Instant;
/* Convert web_returns dataset from TPC DS's datagen utility into a Delta table
This table will be partitioned on `wr_returned_date_sk`
*/
pub async fn convert_tpcds_web_returns(input_path: String, table_path: String) -> Result<(), ()> {
let ctx = SessionContext::new();
let schema = ArrowSchema::new(vec![
Field::new("wr_returned_date_sk", DataType::Int64, true),
Field::new("wr_returned_time_sk", DataType::Int64, true),
Field::new("wr_item_sk", DataType::Int64, false),
Field::new("wr_refunded_customer_sk", DataType::Int64, true),
Field::new("wr_refunded_cdemo_sk", DataType::Int64, true),
Field::new("wr_refunded_hdemo_sk", DataType::Int64, true),
Field::new("wr_refunded_addr_sk", DataType::Int64, true),
Field::new("wr_returning_customer_sk", DataType::Int64, true),
Field::new("wr_returning_cdemo_sk", DataType::Int64, true),
Field::new("wr_returning_hdemo_sk", DataType::Int64, true),
Field::new("wr_returning_addr_sk", DataType::Int64, true),
Field::new("wr_web_page_sk", DataType::Int64, true),
Field::new("wr_reason_sk", DataType::Int64, true),
Field::new("wr_order_number", DataType::Int64, false),
Field::new("wr_return_quantity", DataType::Int32, true),
Field::new("wr_return_amt", DataType::Decimal128(7, 2), true),
Field::new("wr_return_tax", DataType::Decimal128(7, 2), true),
Field::new("wr_return_amt_inc_tax", DataType::Decimal128(7, 2), true),
Field::new("wr_fee", DataType::Decimal128(7, 2), true),
Field::new("wr_return_ship_cost", DataType::Decimal128(7, 2), true),
Field::new("wr_refunded_cash", DataType::Decimal128(7, 2), true),
Field::new("wr_reversed_charge", DataType::Decimal128(7, 2), true),
Field::new("wr_account_credit", DataType::Decimal128(7, 2), true),
Field::new("wr_net_loss", DataType::Decimal128(7, 2), true),
]);
let table = ctx
.read_csv(
input_path,
CsvReadOptions {
has_header: false,
delimiter: b'|',
file_extension: ".dat",
schema: Some(&schema),
..Default::default()
},
)
.await
.unwrap();
let table_url = ensure_table_uri(&table_path).unwrap();
DeltaOps::try_from_uri(table_url)
.await
.unwrap()
.write(table.collect().await.unwrap())
.with_partition_columns(vec!["wr_returned_date_sk"])
.await
.unwrap();
Ok(())
}
fn merge_upsert(source: DataFrame, table: DeltaTable) -> Result<MergeBuilder, DeltaTableError> {
DeltaOps(table)
.merge(source, "source.wr_item_sk = target.wr_item_sk and source.wr_order_number = target.wr_order_number")
.with_source_alias("source")
.with_target_alias("target")
.when_matched_update(|update| {
update
.update("wr_returned_date_sk", "source.wr_returned_date_sk")
.update("wr_returned_time_sk", "source.wr_returned_time_sk")
.update("wr_item_sk", "source.wr_item_sk")
.update("wr_refunded_customer_sk", "source.wr_refunded_customer_sk")
.update("wr_refunded_cdemo_sk", "source.wr_refunded_cdemo_sk")
.update("wr_refunded_hdemo_sk", "source.wr_refunded_hdemo_sk")
.update("wr_refunded_addr_sk", "source.wr_refunded_addr_sk")
.update("wr_returning_customer_sk", "source.wr_returning_customer_sk")
.update("wr_returning_cdemo_sk", "source.wr_returning_cdemo_sk")
.update("wr_returning_hdemo_sk", "source.wr_returning_hdemo_sk")
.update("wr_returning_addr_sk", "source.wr_returning_addr_sk")
.update("wr_web_page_sk", "source.wr_web_page_sk")
.update("wr_reason_sk", "source.wr_reason_sk")
.update("wr_order_number", "source.wr_order_number")
.update("wr_return_quantity", "source.wr_return_quantity")
.update("wr_return_amt", "source.wr_return_amt")
.update("wr_return_tax", "source.wr_return_tax")
.update("wr_return_amt_inc_tax", "source.wr_return_amt_inc_tax")
.update("wr_fee", "source.wr_fee")
.update("wr_return_ship_cost", "source.wr_return_ship_cost")
.update("wr_refunded_cash", "source.wr_refunded_cash")
.update("wr_reversed_charge", "source.wr_reversed_charge")
.update("wr_account_credit", "source.wr_account_credit")
.update("wr_net_loss", "source.wr_net_loss")
})?
.when_not_matched_insert(|insert| {
insert
.set("wr_returned_date_sk", "source.wr_returned_date_sk")
.set("wr_returned_time_sk", "source.wr_returned_time_sk")
.set("wr_item_sk", "source.wr_item_sk")
.set("wr_refunded_customer_sk", "source.wr_refunded_customer_sk")
.set("wr_refunded_cdemo_sk", "source.wr_refunded_cdemo_sk")
.set("wr_refunded_hdemo_sk", "source.wr_refunded_hdemo_sk")
.set("wr_refunded_addr_sk", "source.wr_refunded_addr_sk")
.set("wr_returning_customer_sk", "source.wr_returning_customer_sk")
.set("wr_returning_cdemo_sk", "source.wr_returning_cdemo_sk")
.set("wr_returning_hdemo_sk", "source.wr_returning_hdemo_sk")
.set("wr_returning_addr_sk", "source.wr_returning_addr_sk")
.set("wr_web_page_sk", "source.wr_web_page_sk")
.set("wr_reason_sk", "source.wr_reason_sk")
.set("wr_order_number", "source.wr_order_number")
.set("wr_return_quantity", "source.wr_return_quantity")
.set("wr_return_amt", "source.wr_return_amt")
.set("wr_return_tax", "source.wr_return_tax")
.set("wr_return_amt_inc_tax", "source.wr_return_amt_inc_tax")
.set("wr_fee", "source.wr_fee")
.set("wr_return_ship_cost", "source.wr_return_ship_cost")
.set("wr_refunded_cash", "source.wr_refunded_cash")
.set("wr_reversed_charge", "source.wr_reversed_charge")
.set("wr_account_credit", "source.wr_account_credit")
.set("wr_net_loss", "source.wr_net_loss")
})
}
fn merge_insert(source: DataFrame, table: DeltaTable) -> Result<MergeBuilder, DeltaTableError> {
DeltaOps(table)
.merge(source, "source.wr_item_sk = target.wr_item_sk and source.wr_order_number = target.wr_order_number")
.with_source_alias("source")
.with_target_alias("target")
.when_not_matched_insert(|insert| {
insert
.set("wr_returned_date_sk", "source.wr_returned_date_sk")
.set("wr_returned_time_sk", "source.wr_returned_time_sk")
.set("wr_item_sk", "source.wr_item_sk")
.set("wr_refunded_customer_sk", "source.wr_refunded_customer_sk")
.set("wr_refunded_cdemo_sk", "source.wr_refunded_cdemo_sk")
.set("wr_refunded_hdemo_sk", "source.wr_refunded_hdemo_sk")
.set("wr_refunded_addr_sk", "source.wr_refunded_addr_sk")
.set("wr_returning_customer_sk", "source.wr_returning_customer_sk")
.set("wr_returning_cdemo_sk", "source.wr_returning_cdemo_sk")
.set("wr_returning_hdemo_sk", "source.wr_returning_hdemo_sk")
.set("wr_returning_addr_sk", "source.wr_returning_addr_sk")
.set("wr_web_page_sk", "source.wr_web_page_sk")
.set("wr_reason_sk", "source.wr_reason_sk")
.set("wr_order_number", "source.wr_order_number")
.set("wr_return_quantity", "source.wr_return_quantity")
.set("wr_return_amt", "source.wr_return_amt")
.set("wr_return_tax", "source.wr_return_tax")
.set("wr_return_amt_inc_tax", "source.wr_return_amt_inc_tax")
.set("wr_fee", "source.wr_fee")
.set("wr_return_ship_cost", "source.wr_return_ship_cost")
.set("wr_refunded_cash", "source.wr_refunded_cash")
.set("wr_reversed_charge", "source.wr_reversed_charge")
.set("wr_account_credit", "source.wr_account_credit")
.set("wr_net_loss", "source.wr_net_loss")
})
}
fn merge_delete(source: DataFrame, table: DeltaTable) -> Result<MergeBuilder, DeltaTableError> {
DeltaOps(table)
.merge(source, "source.wr_item_sk = target.wr_item_sk and source.wr_order_number = target.wr_order_number")
.with_source_alias("source")
.with_target_alias("target")
.when_matched_delete(|delete| {
delete
})
}
async fn benchmark_merge_tpcds(
path: String,
parameters: MergePerfParams,
merge: fn(DataFrame, DeltaTable) -> Result<MergeBuilder, DeltaTableError>,
) -> Result<(core::time::Duration, MergeMetrics), DataFusionError> {
let table_url = ensure_table_uri(&path)?;
let table = DeltaTableBuilder::from_uri(table_url)?.load().await?;
let provider = DeltaTableProvider::try_new(
table.snapshot()?.snapshot().clone(),
table.log_store(),
DeltaScanConfig {
file_column_name: Some("file_path".to_string()),
..Default::default()
},
)
.unwrap();
let ctx = SessionContext::new();
ctx.register_table("t1", Arc::new(provider))?;
let files = ctx
.sql("select file_path as file from t1 group by file")
.await?
.with_column("r", random())?
.filter(col("r").lt_eq(lit(parameters.sample_files)))?;
let file_sample = files.collect_partitioned().await?;
let schema = file_sample.first().unwrap().first().unwrap().schema();
let mem_table = Arc::new(MemTable::try_new(schema, file_sample)?);
ctx.register_table("file_sample", mem_table)?;
let file_sample_count = ctx.table("file_sample").await?.count().await?;
let row_sample = ctx.table("t1").await?.join(
ctx.table("file_sample").await?,
datafusion::common::JoinType::Inner,
&["file_path"],
&["file"],
None,
)?;
let matched = row_sample
.clone()
.filter(random().lt_eq(lit(parameters.sample_matched_rows)))?;
let rand = cast(random() * lit(u32::MAX), DataType::Int64);
let not_matched = row_sample
.filter(random().lt_eq(lit(parameters.sample_not_matched_rows)))?
.with_column("wr_item_sk", rand.clone())?
.with_column("wr_order_number", rand)?;
let source = matched.union(not_matched)?;
let start = Instant::now();
let (table, metrics) = merge(source, table)?.await?;
let end = Instant::now();
let duration = end.duration_since(start);
println!("File sample count: {file_sample_count}");
println!("{metrics:?}");
println!("Seconds: {}", duration.as_secs_f32());
// Clean up and restore to original state.
let (table, _) = DeltaOps(table).restore().with_version_to_restore(0).await?;
let (table, _) = DeltaOps(table)
.vacuum()
.with_retention_period(Duration::seconds(0))
.with_enforce_retention_duration(false)
.await?;
table
.object_store()
.delete(&Path::parse("_delta_log/00000000000000000001.json")?)
.await?;
table
.object_store()
.delete(&Path::parse("_delta_log/00000000000000000002.json")?)
.await?;
table
.object_store()
.delete(&Path::parse("_delta_log/00000000000000000003.json")?)
.await?;
let _ = table
.object_store()
.delete(&Path::parse("_delta_log/00000000000000000004.json")?)
.await;
Ok((duration, metrics))
}
#[derive(Subcommand, Debug)]
enum Command {
Convert(Convert),
Bench(BenchArg),
Standard(Standard),
Compare(Compare),
Show(Show),
}
#[derive(Debug, Args)]
struct Convert {
tpcds_path: String,
delta_path: String,
}
#[derive(Debug, Args)]
struct Standard {
delta_path: String,
samples: Option<u32>,
output_path: Option<String>,
group_id: Option<String>,
}
#[derive(Debug, Args)]
struct Compare {
before_path: String,
before_group_id: String,
after_path: String,
after_group_id: String,
}
#[derive(Debug, Args)]
struct Show {
path: String,
}
#[derive(Debug, Args)]
struct BenchArg {
table_path: String,
#[command(subcommand)]
name: MergeBench,
}
struct Bench {
name: String,
op: fn(DataFrame, DeltaTable) -> Result<MergeBuilder, DeltaTableError>,
params: MergePerfParams,
}
impl Bench {
fn new<S: ToString>(
name: S,
op: fn(DataFrame, DeltaTable) -> Result<MergeBuilder, DeltaTableError>,
params: MergePerfParams,
) -> Self {
Bench {
name: name.to_string(),
op,
params,
}
}
}
#[derive(Debug, Args, Clone)]
struct MergePerfParams {
pub sample_files: f32,
pub sample_matched_rows: f32,
pub sample_not_matched_rows: f32,
}
#[derive(Debug, Clone, Subcommand)]
enum MergeBench {
Upsert(MergePerfParams),
Delete(MergePerfParams),
Insert(MergePerfParams),
}
#[derive(Parser, Debug)]
#[command(about)]
struct MergePrefArgs {
#[command(subcommand)]
command: Command,
}
#[tokio::main]
async fn main() {
type MergeOp = fn(DataFrame, DeltaTable) -> Result<MergeBuilder, DeltaTableError>;
match MergePrefArgs::parse().command {
Command::Convert(Convert {
tpcds_path,
delta_path,
}) => {
convert_tpcds_web_returns(tpcds_path, delta_path)
.await
.unwrap();
}
Command::Bench(BenchArg { table_path, name }) => {
let (merge_op, params): (MergeOp, MergePerfParams) = match name {
MergeBench::Upsert(params) => (merge_upsert, params),
MergeBench::Delete(params) => (merge_delete, params),
MergeBench::Insert(params) => (merge_insert, params),
};
benchmark_merge_tpcds(table_path, params, merge_op)
.await
.unwrap();
}
Command::Standard(Standard {
delta_path,
samples,
output_path,
group_id,
}) => {
let benches = vec![Bench::new(
"delete_only_fileMatchedFraction_0.05_rowMatchedFraction_0.05",
merge_delete,
MergePerfParams {
sample_files: 0.05,
sample_matched_rows: 0.05,
sample_not_matched_rows: 0.0,
},
),
Bench::new(
"multiple_insert_only_fileMatchedFraction_0.05_rowNotMatchedFraction_0.05",
merge_insert,
MergePerfParams {
sample_files: 0.05,
sample_matched_rows: 0.00,
sample_not_matched_rows: 0.05,
},
),
Bench::new(
"multiple_insert_only_fileMatchedFraction_0.05_rowNotMatchedFraction_0.50",
merge_insert,
MergePerfParams {
sample_files: 0.05,
sample_matched_rows: 0.00,
sample_not_matched_rows: 0.50,
},
),
Bench::new(
"multiple_insert_only_fileMatchedFraction_0.05_rowNotMatchedFraction_1.0",
merge_insert,
MergePerfParams {
sample_files: 0.05,
sample_matched_rows: 0.00,
sample_not_matched_rows: 1.0,
},
),
Bench::new(
"upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.01_rowNotMatchedFraction_0.1",
merge_upsert,
MergePerfParams {
sample_files: 0.05,
sample_matched_rows: 0.01,
sample_not_matched_rows: 0.1,
},
),
Bench::new(
"upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.0_rowNotMatchedFraction_0.1",
merge_upsert,
MergePerfParams {
sample_files: 0.05,
sample_matched_rows: 0.00,
sample_not_matched_rows: 0.1,
},
),
Bench::new(
"upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.1_rowNotMatchedFraction_0.0",
merge_upsert,
MergePerfParams {
sample_files: 0.05,
sample_matched_rows: 0.1,
sample_not_matched_rows: 0.0,
},
),
Bench::new(
"upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.1_rowNotMatchedFraction_0.01",
merge_upsert,
MergePerfParams {
sample_files: 0.05,
sample_matched_rows: 0.1,
sample_not_matched_rows: 0.01,
},
),
Bench::new(
"upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.5_rowNotMatchedFraction_0.001",
merge_upsert,
MergePerfParams {
sample_files: 0.05,
sample_matched_rows: 0.5,
sample_not_matched_rows: 0.001,
},
),
Bench::new(
"upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.99_rowNotMatchedFraction_0.001",
merge_upsert,
MergePerfParams {
sample_files: 0.05,
sample_matched_rows: 0.99,
sample_not_matched_rows: 0.001,
},
),
Bench::new(
"upsert_fileMatchedFraction_0.05_rowMatchedFraction_1.0_rowNotMatchedFraction_0.001",
merge_upsert,
MergePerfParams {
sample_files: 0.05,
sample_matched_rows: 1.0,
sample_not_matched_rows: 0.001,
},
),
Bench::new(
"upsert_fileMatchedFraction_0.5_rowMatchedFraction_0.001_rowNotMatchedFraction_0.001",
merge_upsert,
MergePerfParams {
sample_files: 0.5,
sample_matched_rows: 0.001,
sample_not_matched_rows: 0.001,
},
),
Bench::new(
"upsert_fileMatchedFraction_1.0_rowMatchedFraction_0.001_rowNotMatchedFraction_0.001",
merge_upsert,
MergePerfParams {
sample_files: 1.0,
sample_matched_rows: 0.001,
sample_not_matched_rows: 0.001,
},
)
];
let num_samples = samples.unwrap_or(1);
let group_id = group_id.unwrap_or(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis()
.to_string(),
);
let output = output_path.unwrap_or("data/benchmarks".into());
let mut group_ids = vec![];
let mut name = vec![];
let mut samples = vec![];
let mut duration_ms = vec![];
let mut data = vec![];
for bench in benches {
for sample in 0..num_samples {
println!("Test: {} Sample: {sample}", bench.name);
let res =
benchmark_merge_tpcds(delta_path.clone(), bench.params.clone(), bench.op)
.await
.unwrap();
group_ids.push(group_id.clone());
name.push(bench.name.clone());
samples.push(sample);
duration_ms.push(res.0.as_millis() as u32);
data.push(json!(res.1).to_string());
}
}
let schema = Arc::new(ArrowSchema::new(vec![
Field::new("group_id", DataType::Utf8, false),
Field::new("name", DataType::Utf8, false),
Field::new("sample", DataType::UInt32, false),
Field::new("duration_ms", DataType::UInt32, false),
Field::new("data", DataType::Utf8, true),
]));
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(group_ids)),
Arc::new(StringArray::from(name)),
Arc::new(UInt32Array::from(samples)),
Arc::new(UInt32Array::from(duration_ms)),
Arc::new(StringArray::from(data)),
],
)
.unwrap();
let output_url = ensure_table_uri(&output).unwrap();
DeltaOps::try_from_uri(output_url)
.await
.unwrap()
.write(vec![batch])
.with_save_mode(SaveMode::Append)
.await
.unwrap();
}
Command::Compare(Compare {
before_path,
before_group_id,
after_path,
after_group_id,
}) => {
let before_url = ensure_table_uri(&before_path).unwrap();
let before_table = DeltaTableBuilder::from_uri(before_url)
.unwrap()
.load()
.await
.unwrap();
let after_url = ensure_table_uri(&after_path).unwrap();
let after_table = DeltaTableBuilder::from_uri(after_url)
.unwrap()
.load()
.await
.unwrap();
let ctx = SessionContext::new();
ctx.register_table("before", Arc::new(before_table))
.unwrap();
ctx.register_table("after", Arc::new(after_table)).unwrap();
let before_stats = ctx
.sql(&format!(
"
select name as before_name,
avg(cast(duration_ms as float)) as before_duration_avg
from before where group_id = {before_group_id}
group by name
",
))
.await
.unwrap();
let after_stats = ctx
.sql(&format!(
"
select name as after_name,
avg(cast(duration_ms as float)) as after_duration_avg
from after where group_id = {after_group_id}
group by name
",
))
.await
.unwrap();
before_stats
.join(
after_stats,
datafusion::common::JoinType::Inner,
&["before_name"],
&["after_name"],
None,
)
.unwrap()
.select(vec![
col("before_name").alias("name"),
col("before_duration_avg"),
col("after_duration_avg"),
(col("before_duration_avg") / (col("after_duration_avg"))),
])
.unwrap()
.sort(vec![col("name").sort(true, true)])
.unwrap()
.show()
.await
.unwrap();
}
Command::Show(Show { path }) => {
let table_url = ensure_table_uri(&path).unwrap();
let stats = DeltaTableBuilder::from_uri(table_url)
.unwrap()
.load()
.await
.unwrap();
let ctx = SessionContext::new();
ctx.register_table("stats", Arc::new(stats)).unwrap();
ctx.sql("select * from stats")
.await
.unwrap()
.show()
.await
.unwrap();
}
}
}
+184
View File
@@ -0,0 +1,184 @@
use std::path::Path;
use deltalake_core::datafusion::functions::expr_fn;
use deltalake_core::kernel::engine::arrow_conversion::TryIntoKernel;
use deltalake_core::kernel::{StructField, StructType};
use deltalake_core::operations::merge::MergeBuilder;
use deltalake_core::{arrow, DeltaResult};
use deltalake_core::{
datafusion::{
logical_expr::{cast, lit},
prelude::{DataFrame, ParquetReadOptions, SessionContext},
},
DeltaOps, DeltaTable, DeltaTableError,
};
use tempfile::TempDir;
use url::Url;
pub type MergeOp = fn(DataFrame, DeltaTable) -> Result<MergeBuilder, DeltaTableError>;
#[derive(Debug, Clone)]
pub struct MergePerfParams {
pub sample_matched_rows: f32,
pub sample_not_matched_rows: f32,
}
pub fn merge_upsert(source: DataFrame, table: DeltaTable) -> Result<MergeBuilder, DeltaTableError> {
deltalake_core::DeltaOps(table)
.merge(source, "source.wr_item_sk = target.wr_item_sk and source.wr_order_number = target.wr_order_number")
.with_source_alias("source")
.with_target_alias("target")
.when_matched_update(|update| {
update
.update("wr_returned_date_sk", "source.wr_returned_date_sk")
.update("wr_returned_time_sk", "source.wr_returned_time_sk")
.update("wr_item_sk", "source.wr_item_sk")
.update("wr_refunded_customer_sk", "source.wr_refunded_customer_sk")
.update("wr_refunded_cdemo_sk", "source.wr_refunded_cdemo_sk")
.update("wr_refunded_hdemo_sk", "source.wr_refunded_hdemo_sk")
.update("wr_refunded_addr_sk", "source.wr_refunded_addr_sk")
.update("wr_returning_customer_sk", "source.wr_returning_customer_sk")
.update("wr_returning_cdemo_sk", "source.wr_returning_cdemo_sk")
.update("wr_returning_hdemo_sk", "source.wr_returning_hdemo_sk")
.update("wr_returning_addr_sk", "source.wr_returning_addr_sk")
.update("wr_web_page_sk", "source.wr_web_page_sk")
.update("wr_reason_sk", "source.wr_reason_sk")
.update("wr_order_number", "source.wr_order_number")
.update("wr_return_quantity", "source.wr_return_quantity")
.update("wr_return_amt", "source.wr_return_amt")
.update("wr_return_tax", "source.wr_return_tax")
.update("wr_return_amt_inc_tax", "source.wr_return_amt_inc_tax")
.update("wr_fee", "source.wr_fee")
.update("wr_return_ship_cost", "source.wr_return_ship_cost")
.update("wr_refunded_cash", "source.wr_refunded_cash")
.update("wr_reversed_charge", "source.wr_reversed_charge")
.update("wr_account_credit", "source.wr_account_credit")
.update("wr_net_loss", "source.wr_net_loss")
})?
.when_not_matched_insert(|insert| {
insert
.set("wr_returned_date_sk", "source.wr_returned_date_sk")
.set("wr_returned_time_sk", "source.wr_returned_time_sk")
.set("wr_item_sk", "source.wr_item_sk")
.set("wr_refunded_customer_sk", "source.wr_refunded_customer_sk")
.set("wr_refunded_cdemo_sk", "source.wr_refunded_cdemo_sk")
.set("wr_refunded_hdemo_sk", "source.wr_refunded_hdemo_sk")
.set("wr_refunded_addr_sk", "source.wr_refunded_addr_sk")
.set("wr_returning_customer_sk", "source.wr_returning_customer_sk")
.set("wr_returning_cdemo_sk", "source.wr_returning_cdemo_sk")
.set("wr_returning_hdemo_sk", "source.wr_returning_hdemo_sk")
.set("wr_returning_addr_sk", "source.wr_returning_addr_sk")
.set("wr_web_page_sk", "source.wr_web_page_sk")
.set("wr_reason_sk", "source.wr_reason_sk")
.set("wr_order_number", "source.wr_order_number")
.set("wr_return_quantity", "source.wr_return_quantity")
.set("wr_return_amt", "source.wr_return_amt")
.set("wr_return_tax", "source.wr_return_tax")
.set("wr_return_amt_inc_tax", "source.wr_return_amt_inc_tax")
.set("wr_fee", "source.wr_fee")
.set("wr_return_ship_cost", "source.wr_return_ship_cost")
.set("wr_refunded_cash", "source.wr_refunded_cash")
.set("wr_reversed_charge", "source.wr_reversed_charge")
.set("wr_account_credit", "source.wr_account_credit")
.set("wr_net_loss", "source.wr_net_loss")
})
}
pub fn merge_insert(source: DataFrame, table: DeltaTable) -> Result<MergeBuilder, DeltaTableError> {
deltalake_core::DeltaOps(table)
.merge(source, "source.wr_item_sk = target.wr_item_sk and source.wr_order_number = target.wr_order_number")
.with_source_alias("source")
.with_target_alias("target")
.when_not_matched_insert(|insert| {
insert
.set("wr_returned_date_sk", "source.wr_returned_date_sk")
.set("wr_returned_time_sk", "source.wr_returned_time_sk")
.set("wr_item_sk", "source.wr_item_sk")
.set("wr_refunded_customer_sk", "source.wr_refunded_customer_sk")
.set("wr_refunded_cdemo_sk", "source.wr_refunded_cdemo_sk")
.set("wr_refunded_hdemo_sk", "source.wr_refunded_hdemo_sk")
.set("wr_refunded_addr_sk", "source.wr_refunded_addr_sk")
.set("wr_returning_customer_sk", "source.wr_returning_customer_sk")
.set("wr_returning_cdemo_sk", "source.wr_returning_cdemo_sk")
.set("wr_returning_hdemo_sk", "source.wr_returning_hdemo_sk")
.set("wr_returning_addr_sk", "source.wr_returning_addr_sk")
.set("wr_web_page_sk", "source.wr_web_page_sk")
.set("wr_reason_sk", "source.wr_reason_sk")
.set("wr_order_number", "source.wr_order_number")
.set("wr_return_quantity", "source.wr_return_quantity")
.set("wr_return_amt", "source.wr_return_amt")
.set("wr_return_tax", "source.wr_return_tax")
.set("wr_return_amt_inc_tax", "source.wr_return_amt_inc_tax")
.set("wr_fee", "source.wr_fee")
.set("wr_return_ship_cost", "source.wr_return_ship_cost")
.set("wr_refunded_cash", "source.wr_refunded_cash")
.set("wr_reversed_charge", "source.wr_reversed_charge")
.set("wr_account_credit", "source.wr_account_credit")
.set("wr_net_loss", "source.wr_net_loss")
})
}
pub fn merge_delete(source: DataFrame, table: DeltaTable) -> Result<MergeBuilder, DeltaTableError> {
deltalake_core::DeltaOps(table)
.merge(source, "source.wr_item_sk = target.wr_item_sk and source.wr_order_number = target.wr_order_number")
.with_source_alias("source")
.with_target_alias("target")
.when_matched_delete(|delete| delete)
}
/// Prepare source DataFrame and target Delta table from DuckDB-generated TPC-DS parquet.
/// Creates a temporary Delta table from web_returns.parquet as the target.
/// Returns (source_df, target_table) for benchmarking.
pub async fn prepare_source_and_table(
params: &MergePerfParams,
tmp_dir: &TempDir,
parquet_dir: &Path,
) -> DeltaResult<(DataFrame, DeltaTable)> {
let ctx = SessionContext::new();
let parquet_path = parquet_dir
.join("web_returns.parquet")
.to_str()
.unwrap()
.to_owned();
let parquet_df = ctx
.read_parquet(&parquet_path, ParquetReadOptions::default())
.await?;
let temp_table_url = Url::from_directory_path(tmp_dir).unwrap();
let schema = parquet_df.schema();
let delta_schema: StructType = schema.as_arrow().try_into_kernel().unwrap();
let batches = parquet_df.collect().await?;
let fields: Vec<StructField> = delta_schema.fields().cloned().collect();
let table = DeltaOps::try_from_uri(temp_table_url)
.await?
.create()
.with_columns(fields)
.await?;
let table = DeltaOps(table).write(batches).await?;
// Now prepare source DataFrame with sampling
let source = ctx
.read_parquet(&parquet_path, ParquetReadOptions::default())
.await?;
// Split matched and not-matched portions
let matched = source
.clone()
.filter(expr_fn::random().lt_eq(lit(params.sample_matched_rows)))?;
let rand = cast(
expr_fn::random() * lit(u32::MAX),
arrow::datatypes::DataType::Int64,
);
let not_matched = source
.filter(expr_fn::random().lt_eq(lit(params.sample_not_matched_rows)))?
.with_column("wr_item_sk", rand.clone())?
.with_column("wr_order_number", rand)?;
let source = matched.union(not_matched)?;
Ok((source, table))
}
+66
View File
@@ -0,0 +1,66 @@
use std::{path::PathBuf, time::Instant};
use clap::{Parser, ValueEnum};
use delta_benchmarks::{
merge_delete, merge_insert, merge_upsert, prepare_source_and_table, MergeOp, MergePerfParams,
};
#[derive(Copy, Clone, Debug, ValueEnum)]
enum OpKind {
Upsert,
Delete,
Insert,
}
#[derive(Parser, Debug)]
#[command(about = "Run a merge benchmark with configurable parameters")]
struct Cli {
/// Operation to benchmark
#[arg(value_enum)]
op: OpKind,
/// Fraction of rows that match an existing key (0.0-1.0)
#[arg(long, default_value_t = 0.01)]
matched: f32,
/// Fraction of rows that do not match (0.0-1.0)
#[arg(long, default_value_t = 0.10)]
not_matched: f32,
}
#[tokio::main]
async fn main() {
let cli = Cli::parse();
let op_fn: MergeOp = match cli.op {
OpKind::Upsert => merge_upsert,
OpKind::Delete => merge_delete,
OpKind::Insert => merge_insert,
};
let params = MergePerfParams {
sample_matched_rows: cli.matched,
sample_not_matched_rows: cli.not_matched,
};
let tmp_dir = tempfile::tempdir().expect("create tmp dir");
let parquet_dir = PathBuf::from(
std::env::var("TPCDS_PARQUET_DIR")
.unwrap_or_else(|_| "crates/benchmarks/data/tpcds_parquet".to_string()),
);
let (source, table) = prepare_source_and_table(&params, &tmp_dir, &parquet_dir)
.await
.expect("prepare inputs");
let start = Instant::now();
let (_table, metrics) = op_fn(source, table)
.expect("build merge")
.await
.expect("execute merge");
let elapsed = start.elapsed();
println!("duration_ms={} metrics={:?}", elapsed.as_millis(), metrics)
}
+1 -1
View File
@@ -34,7 +34,7 @@ moka = { version = "0.12", optional = true, features = ["future"] }
[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tempfile = "3"
tempfile = { workspace = true }
httpmock = { version = "0.8.0-alpha.1" }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
+2 -2
View File
@@ -69,7 +69,7 @@ tokio = { workspace = true, features = [
# caching
foyer = { version = "0.20.0", optional = true, features = ["serde"] }
tempfile = { version = "3.19.1", optional = true }
tempfile = { workspace = true, optional = true }
# other deps (these should be organized and pulled into workspace.dependencies as necessary)
cfg-if = "1"
@@ -98,7 +98,7 @@ pretty_assertions = "1.2.1"
pretty_env_logger = "0.5.0"
rstest = { version = "0.26.1" }
serial_test = "3"
tempfile = "3"
tempfile = { workspace = true }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
+1 -1
View File
@@ -31,7 +31,7 @@ deltalake-test = { path = "../test" }
pretty_env_logger = "0.5.0"
rand = "0.8"
serde_json = { workspace = true }
tempfile = "3"
tempfile = { workspace = true }
[features]
integration_test = []
+1 -1
View File
@@ -31,7 +31,7 @@ deltalake-test = { path = "../test" }
pretty_env_logger = "0.5.0"
rand = "0.8"
serde_json = { workspace = true }
tempfile = "3"
tempfile = { workspace = true }
fs_extra = "1.3.0"
[features]
+1 -1
View File
@@ -28,7 +28,7 @@ url = { workspace = true }
dotenvy = "0"
fs_extra = "1.3.0"
futures = { version = "0.3" }
tempfile = "3"
tempfile = { workspace = true }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
[features]