mirror of
https://github.com/langchain-ai/datafusion.git
synced 2026-07-01 21:24:06 -04:00
Add deterministic per-file timing summary to sqllogictest runner (#20569)
## Which issue does this PR close? * Part of #20524. ## Rationale for this change The sqllogictest runner executes files in parallel, but it was hard to pinpoint which test files dominate wall-clock time. This change adds **deterministic per-file elapsed timing observability** so we can identify long-tail files and prioritize follow-up optimization work, while keeping default output usable for both local development (TTY) and CI (non-TTY). ## What changes are included in this PR? * Collect per-file elapsed durations in the sqllogictest runner and aggregate them at end-of-run. * Print a **deterministic timing summary** (stable sort: elapsed desc, path asc; stable formatting) via `MultiProgress` to avoid interleaved progress-bar noise. * Add CLI flags and environment variables to control output: * `--timing-summary auto|off|top|full` (also `SLT_TIMING_SUMMARY`) * `--timing-top-n <N>` (also `SLT_TIMING_TOP_N`, must be `>= 1`) * Default behavior: * `auto` maps to `off` for local TTY runs and `top` for CI/non-TTY runs. * Add optional debug logging for slow files (over 30s) behind `SLT_TIMING_DEBUG_SLOW_FILES=1`. * Update `datafusion/sqllogictest/README.md` with usage examples. ## Are these changes tested? * Covered by existing `sqllogictests` integration test execution; no new unit tests were added. * Manual validation plan (ran locally / in CI as applicable): * `cargo test --test sqllogictests -- push_down_filter_ --test-threads 16` * `cargo test --test sqllogictests -- --test-threads 16` * `cargo test --test sqllogictests -- --timing-summary top --timing-top-n 10` * `cargo test --test sqllogictests -- --timing-summary full` * Verified output properties: * Summary ordering is deterministic across repeated runs (elapsed desc, path asc). * `auto` mode is quiet on TTY but prints a top-N summary on non-TTY/CI. * Pass/fail behavior and error reporting are unchanged. ## Are there any user-facing changes? Yes (test-runner UX only): * New optional timing summary output for `sqllogictests`. * New CLI flags / env vars documented in `datafusion/sqllogictest/README.md`: * `--timing-summary auto|off|top|full` / `SLT_TIMING_SUMMARY` * `--timing-top-n <N>` / `SLT_TIMING_TOP_N` * `SLT_TIMING_DEBUG_SLOW_FILES=1` (optional debug logging for slow files >30s) No public DataFusion APIs are changed. ## LLM-generated code disclosure This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and tested.
This commit is contained in:
@@ -70,6 +70,36 @@ cargo test --test sqllogictests -- ddl --complete
|
||||
RUST_LOG=debug cargo test --test sqllogictests -- ddl
|
||||
```
|
||||
|
||||
### Per-file timing summary
|
||||
|
||||
The sqllogictest runner can emit deterministic per-file elapsed timings to help
|
||||
identify slow test files.
|
||||
|
||||
By default (`--timing-summary auto`), timing summary output is disabled in local
|
||||
TTY runs and shows a top-slowest summary in non-TTY/CI runs.
|
||||
|
||||
`--timing-top-n` / `SLT_TIMING_TOP_N` must be a positive integer (`>= 1`).
|
||||
|
||||
```shell
|
||||
# Show top 10 slowest files (good for CI)
|
||||
cargo test --test sqllogictests -- --timing-summary top --timing-top-n 10
|
||||
```
|
||||
|
||||
```shell
|
||||
# Show full per-file timing table
|
||||
cargo test --test sqllogictests -- --timing-summary full
|
||||
```
|
||||
|
||||
```shell
|
||||
# Same controls via environment variables
|
||||
SLT_TIMING_SUMMARY=top SLT_TIMING_TOP_N=15 cargo test --test sqllogictests
|
||||
```
|
||||
|
||||
```shell
|
||||
# Optional debug logging for per-task slow files (>30s), disabled by default
|
||||
SLT_TIMING_DEBUG_SLOW_FILES=1 cargo test --test sqllogictests
|
||||
```
|
||||
|
||||
## Cookbook: Adding Tests
|
||||
|
||||
1. Add queries
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
use clap::{ColorChoice, Parser};
|
||||
use clap::{ColorChoice, Parser, ValueEnum};
|
||||
use datafusion::common::instant::Instant;
|
||||
use datafusion::common::utils::get_available_parallelism;
|
||||
use datafusion::common::{DataFusionError, Result, exec_datafusion_err, exec_err};
|
||||
@@ -49,6 +49,7 @@ use std::path::{Path, PathBuf};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::time::Duration;
|
||||
|
||||
#[cfg(feature = "postgres")]
|
||||
mod postgres_container;
|
||||
@@ -58,6 +59,21 @@ const DATAFUSION_TESTING_TEST_DIRECTORY: &str = "../../datafusion-testing/data/"
|
||||
const PG_COMPAT_FILE_PREFIX: &str = "pg_compat_";
|
||||
const SQLITE_PREFIX: &str = "sqlite";
|
||||
const ERRS_PER_FILE_LIMIT: usize = 10;
|
||||
const TIMING_DEBUG_SLOW_FILES_ENV: &str = "SLT_TIMING_DEBUG_SLOW_FILES";
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq, ValueEnum)]
|
||||
enum TimingSummaryMode {
|
||||
Auto,
|
||||
Off,
|
||||
Top,
|
||||
Full,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct FileTiming {
|
||||
relative_path: PathBuf,
|
||||
elapsed: Duration,
|
||||
}
|
||||
|
||||
pub fn main() -> Result<()> {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
@@ -100,6 +116,7 @@ async fn run_tests() -> Result<()> {
|
||||
env_logger::init();
|
||||
|
||||
let options: Options = Parser::parse();
|
||||
let timing_debug_slow_files = is_env_truthy(TIMING_DEBUG_SLOW_FILES_ENV);
|
||||
if options.list {
|
||||
// nextest parses stdout, so print messages to stderr
|
||||
eprintln!("NOTICE: --list option unsupported, quitting");
|
||||
@@ -160,7 +177,7 @@ async fn run_tests() -> Result<()> {
|
||||
let is_ci = !stderr().is_terminal();
|
||||
let completed_count = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
let errors: Vec<_> = futures::stream::iter(test_files)
|
||||
let file_results: Vec<_> = futures::stream::iter(test_files)
|
||||
.map(|test_file| {
|
||||
let validator = if options.include_sqlite
|
||||
&& test_file.relative_path.starts_with(SQLITE_PREFIX)
|
||||
@@ -182,7 +199,7 @@ async fn run_tests() -> Result<()> {
|
||||
currently_running_sql_tracker.clone();
|
||||
let file_start = Instant::now();
|
||||
SpawnedTask::spawn(async move {
|
||||
match (
|
||||
let result = match (
|
||||
options.postgres_runner,
|
||||
options.complete,
|
||||
options.substrait_round_trip,
|
||||
@@ -197,7 +214,7 @@ async fn run_tests() -> Result<()> {
|
||||
currently_running_sql_tracker_clone,
|
||||
colored_output,
|
||||
)
|
||||
.await?
|
||||
.await
|
||||
}
|
||||
(false, false, _) => {
|
||||
run_test_file(
|
||||
@@ -209,7 +226,7 @@ async fn run_tests() -> Result<()> {
|
||||
currently_running_sql_tracker_clone,
|
||||
colored_output,
|
||||
)
|
||||
.await?
|
||||
.await
|
||||
}
|
||||
(false, true, _) => {
|
||||
run_complete_file(
|
||||
@@ -219,7 +236,7 @@ async fn run_tests() -> Result<()> {
|
||||
m_style_clone,
|
||||
currently_running_sql_tracker_clone,
|
||||
)
|
||||
.await?
|
||||
.await
|
||||
}
|
||||
(true, false, _) => {
|
||||
run_test_file_with_postgres(
|
||||
@@ -230,7 +247,7 @@ async fn run_tests() -> Result<()> {
|
||||
filters.as_ref(),
|
||||
currently_running_sql_tracker_clone,
|
||||
)
|
||||
.await?
|
||||
.await
|
||||
}
|
||||
(true, true, _) => {
|
||||
run_complete_file_with_postgres(
|
||||
@@ -240,22 +257,35 @@ async fn run_tests() -> Result<()> {
|
||||
m_style_clone,
|
||||
currently_running_sql_tracker_clone,
|
||||
)
|
||||
.await?
|
||||
.await
|
||||
}
|
||||
};
|
||||
// Log slow files (>30s) for CI debugging
|
||||
|
||||
let elapsed = file_start.elapsed();
|
||||
if elapsed.as_secs() > 30 {
|
||||
if timing_debug_slow_files && elapsed.as_secs() > 30 {
|
||||
eprintln!(
|
||||
"Slow file: {} took {:.1}s",
|
||||
relative_path_for_timing.display(),
|
||||
elapsed.as_secs_f64()
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
|
||||
(result, elapsed)
|
||||
})
|
||||
.join()
|
||||
.map(move |result| (result, relative_path, currently_running_sql_tracker))
|
||||
.map(move |result| {
|
||||
let elapsed = match &result {
|
||||
Ok((_, elapsed)) => *elapsed,
|
||||
Err(_) => Duration::ZERO,
|
||||
};
|
||||
|
||||
(
|
||||
result.map(|(thread_result, _)| thread_result),
|
||||
relative_path,
|
||||
currently_running_sql_tracker,
|
||||
elapsed,
|
||||
)
|
||||
})
|
||||
})
|
||||
// run up to num_cpus streams in parallel
|
||||
.buffer_unordered(options.test_threads)
|
||||
@@ -274,10 +304,30 @@ async fn run_tests() -> Result<()> {
|
||||
}
|
||||
}
|
||||
})
|
||||
.flat_map(|(result, test_file_path, current_sql)| {
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
let mut file_timings: Vec<FileTiming> = file_results
|
||||
.iter()
|
||||
.map(|(_, path, _, elapsed)| FileTiming {
|
||||
relative_path: path.clone(),
|
||||
elapsed: *elapsed,
|
||||
})
|
||||
.collect();
|
||||
|
||||
file_timings.sort_by(|a, b| {
|
||||
b.elapsed
|
||||
.cmp(&a.elapsed)
|
||||
.then_with(|| a.relative_path.cmp(&b.relative_path))
|
||||
});
|
||||
|
||||
print_timing_summary(&options, &m, is_ci, &file_timings)?;
|
||||
|
||||
let errors: Vec<_> = file_results
|
||||
.into_iter()
|
||||
.filter_map(|(result, test_file_path, current_sql, _)| {
|
||||
// Filter out any Ok() leaving only the DataFusionErrors
|
||||
futures::stream::iter(match result {
|
||||
// Tokio panic error
|
||||
match result {
|
||||
Err(e) => {
|
||||
let error = DataFusionError::External(Box::new(e));
|
||||
let current_sql = current_sql.get_currently_running_sqls();
|
||||
@@ -307,10 +357,9 @@ async fn run_tests() -> Result<()> {
|
||||
}
|
||||
}
|
||||
Ok(thread_result) => thread_result.err(),
|
||||
})
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
.await;
|
||||
.collect();
|
||||
|
||||
m.println(format!(
|
||||
"Completed {} test files in {}",
|
||||
@@ -332,6 +381,69 @@ async fn run_tests() -> Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
fn print_timing_summary(
|
||||
options: &Options,
|
||||
progress: &MultiProgress,
|
||||
is_ci: bool,
|
||||
file_timings: &[FileTiming],
|
||||
) -> Result<()> {
|
||||
let mode = options.timing_summary_mode(is_ci);
|
||||
if mode == TimingSummaryMode::Off || file_timings.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let top_n = options.timing_top_n;
|
||||
debug_assert!(matches!(
|
||||
mode,
|
||||
TimingSummaryMode::Top | TimingSummaryMode::Full
|
||||
));
|
||||
let count = if mode == TimingSummaryMode::Full {
|
||||
file_timings.len()
|
||||
} else {
|
||||
top_n
|
||||
};
|
||||
|
||||
progress.println("Per-file elapsed summary (deterministic):")?;
|
||||
for (idx, timing) in file_timings.iter().take(count).enumerate() {
|
||||
progress.println(format!(
|
||||
"{:>3}. {:>8.3}s {}",
|
||||
idx + 1,
|
||||
timing.elapsed.as_secs_f64(),
|
||||
timing.relative_path.display()
|
||||
))?;
|
||||
}
|
||||
|
||||
if mode != TimingSummaryMode::Full && file_timings.len() > count {
|
||||
progress.println(format!(
|
||||
"... {} more files omitted (use --timing-summary full to show all)",
|
||||
file_timings.len() - count
|
||||
))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn is_env_truthy(name: &str) -> bool {
|
||||
std::env::var_os(name)
|
||||
.and_then(|value| value.into_string().ok())
|
||||
.is_some_and(|value| {
|
||||
matches!(
|
||||
value.trim().to_ascii_lowercase().as_str(),
|
||||
"1" | "true" | "yes" | "on"
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
fn parse_timing_top_n(arg: &str) -> std::result::Result<usize, String> {
|
||||
let parsed = arg
|
||||
.parse::<usize>()
|
||||
.map_err(|error| format!("invalid value '{arg}': {error}"))?;
|
||||
if parsed == 0 {
|
||||
return Err("must be >= 1".to_string());
|
||||
}
|
||||
Ok(parsed)
|
||||
}
|
||||
|
||||
async fn run_test_file_substrait_round_trip(
|
||||
test_file: TestFile,
|
||||
validator: Validator,
|
||||
@@ -825,6 +937,24 @@ struct Options {
|
||||
)]
|
||||
test_threads: usize,
|
||||
|
||||
#[clap(
|
||||
long,
|
||||
env = "SLT_TIMING_SUMMARY",
|
||||
value_enum,
|
||||
default_value_t = TimingSummaryMode::Auto,
|
||||
help = "Per-file timing summary mode: auto|off|top|full"
|
||||
)]
|
||||
timing_summary: TimingSummaryMode,
|
||||
|
||||
#[clap(
|
||||
long,
|
||||
env = "SLT_TIMING_TOP_N",
|
||||
default_value_t = 10,
|
||||
value_parser = parse_timing_top_n,
|
||||
help = "Number of files to show when timing summary mode is auto/top (must be >= 1)"
|
||||
)]
|
||||
timing_top_n: usize,
|
||||
|
||||
#[clap(
|
||||
long,
|
||||
value_name = "MODE",
|
||||
@@ -835,6 +965,19 @@ struct Options {
|
||||
}
|
||||
|
||||
impl Options {
|
||||
fn timing_summary_mode(&self, is_ci: bool) -> TimingSummaryMode {
|
||||
match self.timing_summary {
|
||||
TimingSummaryMode::Auto => {
|
||||
if is_ci {
|
||||
TimingSummaryMode::Top
|
||||
} else {
|
||||
TimingSummaryMode::Off
|
||||
}
|
||||
}
|
||||
mode => mode,
|
||||
}
|
||||
}
|
||||
|
||||
/// Because this test can be run as a cargo test, commands like
|
||||
///
|
||||
/// ```shell
|
||||
@@ -886,7 +1029,7 @@ impl Options {
|
||||
ColorChoice::Never => false,
|
||||
ColorChoice::Auto => {
|
||||
// CARGO_TERM_COLOR takes precedence over auto-detection
|
||||
let cargo_term_color = ColorChoice::from_str(
|
||||
let cargo_term_color = <ColorChoice as FromStr>::from_str(
|
||||
&std::env::var("CARGO_TERM_COLOR")
|
||||
.unwrap_or_else(|_| "auto".to_string()),
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user