Add sqlite test files, progress bar, and automatic postgres container management into sqllogictests (#13936)

* Fix md5 return_type to only return Utf8 as per current code impl.

* Add support for sqlite test files to sqllogictest

* Force version 0.24.0 of sqllogictest dependency until issue with labels is fixed.

* Removed workaround for bug that was fixed.

* Git submodule update ... err update, link to sqlite tests.

* Git submodule update

* Readd submodule

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
This commit is contained in:
Bruce Ritchie
2025-01-01 07:51:54 -05:00
committed by GitHub
parent 9eca7d165c
commit aafec07e08
8 changed files with 615 additions and 105 deletions
+4
View File
@@ -4,3 +4,7 @@
[submodule "testing"]
path = testing
url = https://github.com/apache/arrow-testing
[submodule "datafusion-testing"]
path = datafusion-testing
url = https://github.com/apache/datafusion-testing.git
branch = main
Submodule datafusion-testing added at e2e320c947
+8 -1
View File
@@ -45,9 +45,11 @@ datafusion-common = { workspace = true, default-features = true }
datafusion-common-runtime = { workspace = true, default-features = true }
futures = { workspace = true }
half = { workspace = true, default-features = true }
indicatif = "0.17"
itertools = { workspace = true }
log = { workspace = true }
object_store = { workspace = true }
once_cell = { version = "1.20", optional = true }
postgres-protocol = { version = "0.6.7", optional = true }
postgres-types = { version = "0.2.8", features = ["derive", "with-chrono-0_4"], optional = true }
rust_decimal = { version = "1.36.0", features = ["tokio-pg"] }
@@ -56,6 +58,8 @@ rust_decimal = { version = "1.36.0", features = ["tokio-pg"] }
sqllogictest = "=0.24.0"
sqlparser = { workspace = true }
tempfile = { workspace = true }
testcontainers = { version = "0.23", features = ["default"], optional = true }
testcontainers-modules = { version = "0.11", features = ["postgres"], optional = true }
thiserror = "2.0.0"
tokio = { workspace = true }
tokio-postgres = { version = "0.7.12", optional = true }
@@ -65,9 +69,12 @@ avro = ["datafusion/avro"]
postgres = [
"bytes",
"chrono",
"tokio-postgres",
"once_cell",
"postgres-types",
"postgres-protocol",
"testcontainers",
"testcontainers-modules",
"tokio-postgres",
]
[dev-dependencies]
+42 -4
View File
@@ -28,13 +28,14 @@ This crate is a submodule of DataFusion that contains an implementation of [sqll
## Overview
This crate uses [sqllogictest-rs](https://github.com/risinglightdb/sqllogictest-rs) to parse and run `.slt` files in the
[`test_files`](test_files) directory of this crate.
[`test_files`](test_files) directory of this crate or the [`data/sqlite`](sqlite)
directory of the datafusion-testing crate.
## Testing setup
1. `rustup update stable` DataFusion uses the latest stable release of rust
2. `git submodule init`
3. `git submodule update`
3. `git submodule update --init --remote --recursive`
## Running tests: TLDR Examples
@@ -160,7 +161,7 @@ cargo test --test sqllogictests -- information
Test files that start with prefix `pg_compat_` verify compatibility
with Postgres by running the same script files both with DataFusion and with Postgres
In order to run the sqllogictests running against a previously running Postgres instance, do:
In order to have the sqllogictest run against an existing running Postgres instance, do:
```shell
PG_COMPAT=true PG_URI="postgresql://postgres@127.0.0.1/postgres" cargo test --features=postgres --test sqllogictests
@@ -172,7 +173,7 @@ The environment variables:
2. `PG_URI` contains a `libpq` style connection string, whose format is described in
[the docs](https://docs.rs/tokio-postgres/latest/tokio_postgres/config/struct.Config.html#url)
One way to create a suitable a posgres container in docker is to use
One way to create a suitable a postgres container in docker is to use
the [Official Image](https://hub.docker.com/_/postgres) with a command
such as the following. Note the collation **must** be set to `C` otherwise
`ORDER BY` will not match DataFusion and the tests will diff.
@@ -185,6 +186,15 @@ docker run \
postgres
```
If you do not want to create a new postgres database and you have docker
installed you can skip providing a PG_URI env variable and the sqllogictest
runner will automatically create a temporary postgres docker container.
For example:
```shell
PG_COMPAT=true cargo test --features=postgres --test sqllogictests
```
## Running Tests: `tpch`
Test files in `tpch` directory runs against the `TPCH` data set (SF =
@@ -205,6 +215,34 @@ Then you need to add `INCLUDE_TPCH=true` to run tpch tests:
INCLUDE_TPCH=true cargo test --test sqllogictests
```
## Running Tests: `sqlite`
Test files in `data/sqlite` directory of the datafusion-testing crate were
sourced from the [sqlite test suite](https://www.sqlite.org/sqllogictest/dir?ci=tip) and have been cleansed and updated to
run within DataFusion's sqllogictest runner.
To run the sqlite tests you need to increase the rust stack size and add
`INCLUDE_SQLITE=true` to run the sqlite tests:
```shell
export RUST_MIN_STACK=30485760;
INCLUDE_SQLITE=true cargo test --test sqllogictests
```
Note that there are well over 5 million queries in these tests and running the
sqlite tests will take a long time. You may wish to run them in release-nonlto mode:
```shell
INCLUDE_SQLITE=true cargo test --profile release-nonlto --test sqllogictests
```
The sqlite tests can also be run with the postgres runner to verify compatibility:
```shell
export RUST_MIN_STACK=30485760;
PG_COMPAT=true INCLUDE_SQLITE=true cargo test --features=postgres --test sqllogictests
```
## Updating tests: Completion Mode
In test script completion mode, `sqllogictests` reads a prototype script and runs the statements and queries against the
+460 -70
View File
@@ -16,57 +16,129 @@
// under the License.
use clap::Parser;
use datafusion_common::instant::Instant;
use datafusion_common::utils::get_available_parallelism;
use datafusion_sqllogictest::{DataFusion, TestContext};
use futures::stream::StreamExt;
use itertools::Itertools;
use log::info;
use sqllogictest::{strict_column_validator, Normalizer};
use std::ffi::OsStr;
use std::fs;
use std::path::{Path, PathBuf};
use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError, Result};
use datafusion_common_runtime::SpawnedTask;
use datafusion_sqllogictest::{DataFusion, TestContext};
use futures::stream::StreamExt;
use indicatif::{
HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle,
};
use itertools::Itertools;
use log::Level::{Info, Warn};
use log::{info, log_enabled, warn};
#[cfg(feature = "postgres")]
use once_cell::sync::Lazy;
use sqllogictest::{
parse_file, strict_column_validator, AsyncDB, Condition, Normalizer, Record,
Validator,
};
#[cfg(feature = "postgres")]
use std::env::set_var;
use std::ffi::OsStr;
use std::fs;
#[cfg(feature = "postgres")]
use std::future::Future;
use std::path::{Path, PathBuf};
#[cfg(feature = "postgres")]
use std::{env, thread};
#[cfg(feature = "postgres")]
use testcontainers::core::IntoContainerPort;
#[cfg(feature = "postgres")]
use testcontainers::runners::AsyncRunner;
#[cfg(feature = "postgres")]
use testcontainers::ImageExt;
#[cfg(feature = "postgres")]
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
#[cfg(feature = "postgres")]
use tokio::sync::{mpsc, Mutex};
#[cfg(feature = "postgres")]
use ContainerCommands::{FetchHost, FetchPort};
const TEST_DIRECTORY: &str = "test_files/";
const DATAFUSION_TESTING_TEST_DIRECTORY: &str = "../../datafusion-testing/data/";
const PG_COMPAT_FILE_PREFIX: &str = "pg_compat_";
const SQLITE_PREFIX: &str = "sqlite";
pub fn main() -> Result<()> {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.build()?
.block_on(run_tests())
}
// Trailing whitespace from lines in SLT will typically be removed, but do not fail if it is not
// If particular test wants to cover trailing whitespace on a value,
// it should project additional non-whitespace column on the right.
#[allow(clippy::ptr_arg)]
fn normalizer(s: &String) -> String {
// Trailing whitespace from lines in SLT will typically be removed, but do not fail if it is not
// If particular test wants to cover trailing whitespace on a value,
// it should project additional non-whitespace column on the right.
s.trim_end().to_owned()
fn value_normalizer(s: &String) -> String {
s.trim_end().to_string()
}
fn value_validator(
fn sqlite_value_validator(
normalizer: Normalizer,
actual: &[Vec<String>],
expected: &[String],
) -> bool {
let expected = expected.iter().map(normalizer).collect::<Vec<_>>();
let actual = actual
let normalized_expected = expected.iter().map(normalizer).collect::<Vec<_>>();
let normalized_actual = actual
.iter()
.map(|strs| strs.iter().map(normalizer).join(" "))
.collect_vec();
if log_enabled!(Info) && normalized_actual != normalized_expected {
info!("sqlite validation failed. actual vs expected:");
for i in 0..normalized_actual.len() {
info!("[{i}] {}<eol>", normalized_actual[i]);
info!(
"[{i}] {}<eol>",
if normalized_expected.len() >= i {
&normalized_expected[i]
} else {
"No more results"
}
);
}
}
normalized_actual == normalized_expected
}
fn df_value_validator(
normalizer: Normalizer,
actual: &[Vec<String>],
expected: &[String],
) -> bool {
let normalized_expected = expected.iter().map(normalizer).collect::<Vec<_>>();
let normalized_actual = actual
.iter()
.map(|strs| strs.iter().join(" "))
// Editors do not preserve trailing whitespace, so expected may or may not lack it included
.map(|str| normalizer(&str))
.collect::<Vec<_>>();
actual == expected
.map(|str| str.trim_end().to_string())
.collect_vec();
if log_enabled!(Warn) && normalized_actual != normalized_expected {
warn!("df validation failed. actual vs expected:");
for i in 0..normalized_actual.len() {
warn!("[{i}] {}<eol>", normalized_actual[i]);
warn!(
"[{i}] {}<eol>",
if normalized_expected.len() >= i {
&normalized_expected[i]
} else {
"No more results"
}
);
}
}
normalized_actual == normalized_expected
}
/// Sets up an empty directory at test_files/scratch/<name>
/// creating it if needed and clearing any file contents if it exists
/// This allows tests for inserting to external tables or copy to
/// to persist data to disk and have consistent state when running
/// persist data to disk and have consistent state when running
/// a new test
fn setup_scratch_dir(name: &Path) -> Result<()> {
// go from copy.slt --> copy
@@ -97,23 +169,89 @@ async fn run_tests() -> Result<()> {
}
options.warn_on_ignored();
#[cfg(feature = "postgres")]
let start_pg_database = options.postgres_runner && !is_pg_uri_set();
#[cfg(feature = "postgres")]
if start_pg_database {
info!("Starting postgres db ...");
thread::spawn(|| {
execute_blocking(start_postgres(
&POSTGRES_IN,
&POSTGRES_HOST,
&POSTGRES_PORT,
&POSTGRES_STOPPED,
))
});
POSTGRES_IN.tx.send(FetchHost).unwrap();
let db_host = POSTGRES_HOST.rx.lock().await.recv().await.unwrap();
POSTGRES_IN.tx.send(FetchPort).unwrap();
let db_port = POSTGRES_PORT.rx.lock().await.recv().await.unwrap();
let pg_uri = format!("postgresql://postgres:postgres@{db_host}:{db_port}/test");
info!("Postgres uri is {pg_uri}");
set_var("PG_URI", pg_uri);
}
// Run all tests in parallel, reporting failures at the end
//
// Doing so is safe because each slt file runs with its own
// `SessionContext` and should not have side effects (like
// modifying shared state like `/tmp/`)
let m = MultiProgress::with_draw_target(ProgressDrawTarget::stderr_with_hz(1));
let m_style = ProgressStyle::with_template(
"[{elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}",
)
.unwrap()
.progress_chars("##-");
let start = Instant::now();
let errors: Vec<_> = futures::stream::iter(read_test_files(&options)?)
.map(|test_file| {
let validator = if options.include_sqlite
&& test_file.relative_path.starts_with(SQLITE_PREFIX)
{
sqlite_value_validator
} else {
df_value_validator
};
let m_clone = m.clone();
let m_style_clone = m_style.clone();
SpawnedTask::spawn(async move {
let file_path = test_file.relative_path.clone();
let start = datafusion::common::instant::Instant::now();
match (options.postgres_runner, options.complete) {
(false, false) => run_test_file(test_file).await?,
(false, true) => run_complete_file(test_file).await?,
(true, false) => run_test_file_with_postgres(test_file).await?,
(true, true) => run_complete_file_with_postgres(test_file).await?,
(false, false) => {
run_test_file(test_file, validator, m_clone, m_style_clone)
.await?
}
(false, true) => {
run_complete_file(test_file, validator, m_clone, m_style_clone)
.await?
}
(true, false) => {
run_test_file_with_postgres(
test_file,
validator,
m_clone,
m_style_clone,
)
.await?
}
(true, true) => {
run_complete_file_with_postgres(
test_file,
validator,
m_clone,
m_style_clone,
)
.await?
}
}
println!("Executed {:?}. Took {:?}", file_path, start.elapsed());
Ok(()) as Result<()>
})
.join()
@@ -136,6 +274,15 @@ async fn run_tests() -> Result<()> {
.collect()
.await;
m.println(format!("Completed in {}", HumanDuration(start.elapsed())))?;
#[cfg(feature = "postgres")]
if start_pg_database {
println!("Stopping postgres db ...");
POSTGRES_IN.tx.send(ContainerCommands::Stop).unwrap_or(());
POSTGRES_STOPPED.rx.lock().await.recv().await;
}
// report on any errors
if !errors.is_empty() {
for e in &errors {
@@ -147,60 +294,148 @@ async fn run_tests() -> Result<()> {
}
}
async fn run_test_file(test_file: TestFile) -> Result<()> {
#[cfg(feature = "postgres")]
fn is_pg_uri_set() -> bool {
match env::var("PG_URI") {
Ok(_) => true,
Err(_) => false,
}
}
async fn run_test_file(
test_file: TestFile,
validator: Validator,
mp: MultiProgress,
mp_style: ProgressStyle,
) -> Result<()> {
let TestFile {
path,
relative_path,
} = test_file;
info!("Running with DataFusion runner: {}", path.display());
let Some(test_ctx) = TestContext::try_new_for_test_file(&relative_path).await else {
info!("Skipping: {}", path.display());
return Ok(());
};
setup_scratch_dir(&relative_path)?;
let count: u64 = get_record_count(&path, "Datafusion".to_string());
let pb = mp.add(ProgressBar::new(count));
pb.set_style(mp_style);
pb.set_message(format!("{:?}", &relative_path));
let mut runner = sqllogictest::Runner::new(|| async {
Ok(DataFusion::new(
test_ctx.session_ctx().clone(),
relative_path.clone(),
pb.clone(),
))
});
runner.add_label("Datafusion");
runner.with_column_validator(strict_column_validator);
runner.with_normalizer(normalizer);
runner.with_validator(value_validator);
runner
runner.with_normalizer(value_normalizer);
runner.with_validator(validator);
let res = runner
.run_file_async(path)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))
.map_err(|e| DataFusionError::External(Box::new(e)));
pb.finish_and_clear();
res
}
fn get_record_count(path: &PathBuf, label: String) -> u64 {
let records: Vec<Record<<DataFusion as AsyncDB>::ColumnType>> =
parse_file(path).unwrap();
let mut count: u64 = 0;
records.iter().for_each(|rec| match rec {
Record::Query { conditions, .. } => {
if conditions.is_empty()
|| !conditions.contains(&Condition::SkipIf {
label: label.clone(),
})
|| conditions.contains(&Condition::OnlyIf {
label: label.clone(),
})
{
count += 1;
}
}
Record::Statement { conditions, .. } => {
if conditions.is_empty()
|| !conditions.contains(&Condition::SkipIf {
label: label.clone(),
})
|| conditions.contains(&Condition::OnlyIf {
label: label.clone(),
})
{
count += 1;
}
}
_ => {}
});
count
}
#[cfg(feature = "postgres")]
async fn run_test_file_with_postgres(test_file: TestFile) -> Result<()> {
async fn run_test_file_with_postgres(
test_file: TestFile,
validator: Validator,
mp: MultiProgress,
mp_style: ProgressStyle,
) -> Result<()> {
use datafusion_sqllogictest::Postgres;
let TestFile {
path,
relative_path,
} = test_file;
info!("Running with Postgres runner: {}", path.display());
setup_scratch_dir(&relative_path)?;
let mut runner =
sqllogictest::Runner::new(|| Postgres::connect(relative_path.clone()));
let count: u64 = get_record_count(&path, "postgresql".to_string());
let pb = mp.add(ProgressBar::new(count));
pb.set_style(mp_style);
pb.set_message(format!("{:?}", &relative_path));
let mut runner = sqllogictest::Runner::new(|| {
Postgres::connect(relative_path.clone(), pb.clone())
});
runner.add_label("postgres");
runner.with_column_validator(strict_column_validator);
runner.with_normalizer(normalizer);
runner.with_validator(value_validator);
runner.with_normalizer(value_normalizer);
runner.with_validator(validator);
runner
.run_file_async(path)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
pb.finish_and_clear();
Ok(())
}
#[cfg(not(feature = "postgres"))]
async fn run_test_file_with_postgres(_test_file: TestFile) -> Result<()> {
async fn run_test_file_with_postgres(
_test_file: TestFile,
_validator: Validator,
_mp: MultiProgress,
_mp_style: ProgressStyle,
) -> Result<()> {
use datafusion_common::plan_err;
plan_err!("Can not run with postgres as postgres feature is not enabled")
}
async fn run_complete_file(test_file: TestFile) -> Result<()> {
async fn run_complete_file(
test_file: TestFile,
validator: Validator,
mp: MultiProgress,
mp_style: ProgressStyle,
) -> Result<()> {
let TestFile {
path,
relative_path,
@@ -213,30 +448,48 @@ async fn run_complete_file(test_file: TestFile) -> Result<()> {
return Ok(());
};
setup_scratch_dir(&relative_path)?;
let count: u64 = get_record_count(&path, "Datafusion".to_string());
let pb = mp.add(ProgressBar::new(count));
pb.set_style(mp_style);
pb.set_message(format!("{:?}", &relative_path));
let mut runner = sqllogictest::Runner::new(|| async {
Ok(DataFusion::new(
test_ctx.session_ctx().clone(),
relative_path.clone(),
pb.clone(),
))
});
let col_separator = " ";
runner
let res = runner
.update_test_file(
path,
col_separator,
value_validator,
normalizer,
validator,
value_normalizer,
strict_column_validator,
)
.await
// Can't use e directly because it isn't marked Send, so turn it into a string.
.map_err(|e| {
DataFusionError::Execution(format!("Error completing {relative_path:?}: {e}"))
})
});
pb.finish_and_clear();
res
}
#[cfg(feature = "postgres")]
async fn run_complete_file_with_postgres(test_file: TestFile) -> Result<()> {
async fn run_complete_file_with_postgres(
test_file: TestFile,
validator: Validator,
mp: MultiProgress,
mp_style: ProgressStyle,
) -> Result<()> {
use datafusion_sqllogictest::Postgres;
let TestFile {
path,
@@ -247,26 +500,48 @@ async fn run_complete_file_with_postgres(test_file: TestFile) -> Result<()> {
path.display()
);
setup_scratch_dir(&relative_path)?;
let mut runner =
sqllogictest::Runner::new(|| Postgres::connect(relative_path.clone()));
let count: u64 = get_record_count(&path, "postgresql".to_string());
let pb = mp.add(ProgressBar::new(count));
pb.set_style(mp_style);
pb.set_message(format!("{:?}", &relative_path));
let mut runner = sqllogictest::Runner::new(|| {
Postgres::connect(relative_path.clone(), pb.clone())
});
runner.add_label("postgres");
runner.with_column_validator(strict_column_validator);
runner.with_normalizer(value_normalizer);
runner.with_validator(validator);
let col_separator = " ";
runner
let res = runner
.update_test_file(
path,
col_separator,
value_validator,
normalizer,
validator,
value_normalizer,
strict_column_validator,
)
.await
// Can't use e directly because it isn't marked Send, so turn it into a string.
.map_err(|e| {
DataFusionError::Execution(format!("Error completing {relative_path:?}: {e}"))
})
});
pb.finish_and_clear();
res
}
#[cfg(not(feature = "postgres"))]
async fn run_complete_file_with_postgres(_test_file: TestFile) -> Result<()> {
async fn run_complete_file_with_postgres(
_test_file: TestFile,
_validator: Validator,
_mp: MultiProgress,
_mp_style: ProgressStyle,
) -> Result<()> {
use datafusion_common::plan_err;
plan_err!("Can not run with postgres as postgres feature is not enabled")
}
@@ -282,11 +557,14 @@ struct TestFile {
impl TestFile {
fn new(path: PathBuf) -> Self {
let relative_path = PathBuf::from(
path.to_string_lossy()
.strip_prefix(TEST_DIRECTORY)
.unwrap_or(""),
);
let p = path.to_string_lossy();
let relative_path = PathBuf::from(if p.starts_with(TEST_DIRECTORY) {
p.strip_prefix(TEST_DIRECTORY).unwrap()
} else if p.starts_with(DATAFUSION_TESTING_TEST_DIRECTORY) {
p.strip_prefix(DATAFUSION_TESTING_TEST_DIRECTORY).unwrap()
} else {
""
});
Self {
path,
@@ -298,6 +576,14 @@ impl TestFile {
self.path.extension() == Some(OsStr::new("slt"))
}
fn check_sqlite(&self, options: &Options) -> bool {
if !self.relative_path.starts_with(SQLITE_PREFIX) {
return true;
}
options.include_sqlite
}
fn check_tpch(&self, options: &Options) -> bool {
if !self.relative_path.starts_with("tpch") {
return true;
@@ -310,15 +596,29 @@ impl TestFile {
fn read_test_files<'a>(
options: &'a Options,
) -> Result<Box<dyn Iterator<Item = TestFile> + 'a>> {
Ok(Box::new(
read_dir_recursive(TEST_DIRECTORY)?
let mut paths = read_dir_recursive(TEST_DIRECTORY)?
.into_iter()
.map(TestFile::new)
.filter(|f| options.check_test_file(&f.relative_path))
.filter(|f| f.is_slt_file())
.filter(|f| f.check_tpch(options))
.filter(|f| f.check_sqlite(options))
.filter(|f| options.check_pg_compat_file(f.path.as_path()))
.collect::<Vec<_>>();
if options.include_sqlite {
let mut sqlite_paths = read_dir_recursive(DATAFUSION_TESTING_TEST_DIRECTORY)?
.into_iter()
.map(TestFile::new)
.filter(|f| options.check_test_file(&f.relative_path))
.filter(|f| f.is_slt_file())
.filter(|f| f.check_tpch(options))
.filter(|f| options.check_pg_compat_file(f.path.as_path())),
))
.filter(|f| f.check_sqlite(options))
.filter(|f| options.check_pg_compat_file(f.path.as_path()))
.collect::<Vec<_>>();
paths.append(&mut sqlite_paths)
}
Ok(Box::new(paths.into_iter()))
}
fn read_dir_recursive<P: AsRef<Path>>(path: P) -> Result<Vec<PathBuf>> {
@@ -350,7 +650,7 @@ fn read_dir_recursive_impl(dst: &mut Vec<PathBuf>, path: &Path) -> Result<()> {
/// Parsed command line options
///
/// This structure attempts to mimic the command line options of the built in rust test runner
/// This structure attempts to mimic the command line options of the built-in rust test runner
/// accepted by IDEs such as CLion that pass arguments
///
/// See <https://github.com/apache/datafusion/issues/8287> for more details
@@ -367,6 +667,9 @@ struct Options {
)]
postgres_runner: bool,
#[clap(long, env = "INCLUDE_SQLITE", help = "Include sqlite files")]
include_sqlite: bool,
#[clap(long, env = "INCLUDE_TPCH", help = "Include tpch files")]
include_tpch: bool,
@@ -431,10 +734,13 @@ impl Options {
.any(|filter| relative_path.to_string_lossy().contains(filter))
}
/// Postgres runner executes only tests in files with specific names
/// Postgres runner executes only tests in files with specific names or in
/// specific folders
fn check_pg_compat_file(&self, path: &Path) -> bool {
let file_name = path.file_name().unwrap().to_str().unwrap().to_string();
!self.postgres_runner || file_name.starts_with(PG_COMPAT_FILE_PREFIX)
!self.postgres_runner
|| file_name.starts_with(PG_COMPAT_FILE_PREFIX)
|| (self.include_sqlite && path.to_string_lossy().contains(SQLITE_PREFIX))
}
/// Logs warning messages to stdout if any ignored options are passed
@@ -452,3 +758,87 @@ impl Options {
}
}
}
#[cfg(feature = "postgres")]
pub async fn start_postgres(
in_channel: &Channel<ContainerCommands>,
host_channel: &Channel<String>,
port_channel: &Channel<u16>,
stopped_channel: &Channel<()>,
) {
info!("Starting postgres test container with user postgres/postgres and db test");
let container = testcontainers_modules::postgres::Postgres::default()
.with_user("postgres")
.with_password("postgres")
.with_db_name("test")
.with_mapped_port(16432, 5432.tcp())
.with_tag("17-alpine")
.start()
.await
.unwrap();
// uncomment this if you are running docker in docker
// let host = "host.docker.internal".to_string();
let host = container.get_host().await.unwrap().to_string();
let port = container.get_host_port_ipv4(5432).await.unwrap();
let mut rx = in_channel.rx.lock().await;
while let Some(command) = rx.recv().await {
match command {
FetchHost => host_channel.tx.send(host.clone()).unwrap(),
FetchPort => port_channel.tx.send(port).unwrap(),
ContainerCommands::Stop => {
container.stop().await.unwrap();
stopped_channel.tx.send(()).unwrap();
rx.close();
}
}
}
}
#[cfg(feature = "postgres")]
#[derive(Debug)]
pub enum ContainerCommands {
FetchHost,
FetchPort,
Stop,
}
#[cfg(feature = "postgres")]
pub struct Channel<T> {
pub tx: UnboundedSender<T>,
pub rx: Mutex<UnboundedReceiver<T>>,
}
#[cfg(feature = "postgres")]
pub fn channel<T>() -> Channel<T> {
let (tx, rx) = mpsc::unbounded_channel();
Channel {
tx,
rx: Mutex::new(rx),
}
}
#[cfg(feature = "postgres")]
pub fn execute_blocking<F: Future>(f: F) {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(f);
}
#[cfg(feature = "postgres")]
pub struct HostPort {
pub host: String,
pub port: u16,
}
#[cfg(feature = "postgres")]
static POSTGRES_IN: Lazy<Channel<ContainerCommands>> = Lazy::new(channel);
#[cfg(feature = "postgres")]
static POSTGRES_HOST: Lazy<Channel<String>> = Lazy::new(channel);
#[cfg(feature = "postgres")]
static POSTGRES_PORT: Lazy<Channel<u16>> = Lazy::new(channel);
#[cfg(feature = "postgres")]
static POSTGRES_STOPPED: Lazy<Channel<()>> = Lazy::new(channel);
@@ -18,26 +18,49 @@
use std::sync::Arc;
use std::{path::PathBuf, time::Duration};
use super::{error::Result, normalize, DFSqlLogicTestError};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion::physical_plan::common::collect;
use datafusion::physical_plan::execute_stream;
use datafusion::prelude::SessionContext;
use log::info;
use indicatif::ProgressBar;
use log::Level::{Debug, Info};
use log::{debug, log_enabled, warn};
use sqllogictest::DBOutput;
use super::{error::Result, normalize, DFSqlLogicTestError};
use tokio::time::Instant;
use crate::engines::output::{DFColumnType, DFOutput};
pub struct DataFusion {
ctx: SessionContext,
relative_path: PathBuf,
pb: ProgressBar,
}
impl DataFusion {
pub fn new(ctx: SessionContext, relative_path: PathBuf) -> Self {
Self { ctx, relative_path }
pub fn new(ctx: SessionContext, relative_path: PathBuf, pb: ProgressBar) -> Self {
Self {
ctx,
relative_path,
pb,
}
}
fn update_slow_count(&self) {
let msg = self.pb.message();
let split: Vec<&str> = msg.split(" ").collect();
let mut current_count = 0;
if split.len() > 2 {
// third match will be current slow count
current_count = split[2].parse::<i32>().unwrap();
}
current_count += 1;
self.pb
.set_message(format!("{} - {} took > 500 ms", split[0], current_count));
}
}
@@ -47,12 +70,32 @@ impl sqllogictest::AsyncDB for DataFusion {
type ColumnType = DFColumnType;
async fn run(&mut self, sql: &str) -> Result<DFOutput> {
info!(
"[{}] Running query: \"{}\"",
self.relative_path.display(),
sql
);
run_query(&self.ctx, sql).await
if log_enabled!(Debug) {
debug!(
"[{}] Running query: \"{}\"",
self.relative_path.display(),
sql
);
}
let start = Instant::now();
let result = run_query(&self.ctx, sql).await;
let duration = start.elapsed();
if duration.gt(&Duration::from_millis(500)) {
self.update_slow_count();
}
self.pb.inc(1);
if log_enabled!(Info) && duration.gt(&Duration::from_secs(2)) {
warn!(
"[{}] Running query took more than 2 sec ({duration:?}): \"{sql}\"",
self.relative_path.display()
);
}
result
}
/// Engine name of current database.
@@ -15,22 +15,24 @@
// specific language governing permissions and limitations
// under the License.
/// Postgres engine implementation for sqllogictest.
use std::path::{Path, PathBuf};
use std::str::FromStr;
use async_trait::async_trait;
use bytes::Bytes;
use futures::{SinkExt, StreamExt};
use log::debug;
use log::{debug, info};
use sqllogictest::DBOutput;
/// Postgres engine implementation for sqllogictest.
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::time::Duration;
use tokio::task::JoinHandle;
use super::conversion::*;
use crate::engines::output::{DFColumnType, DFOutput};
use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
use indicatif::ProgressBar;
use postgres_types::Type;
use rust_decimal::Decimal;
use tokio::time::Instant;
use tokio_postgres::{Column, Row};
use types::PgRegtype;
@@ -55,6 +57,7 @@ pub struct Postgres {
join_handle: JoinHandle<()>,
/// Relative test file path
relative_path: PathBuf,
pb: ProgressBar,
}
impl Postgres {
@@ -71,11 +74,11 @@ impl Postgres {
/// ```
///
/// See https://docs.rs/tokio-postgres/latest/tokio_postgres/config/struct.Config.html#url for format
pub async fn connect(relative_path: PathBuf) -> Result<Self> {
pub async fn connect(relative_path: PathBuf, pb: ProgressBar) -> Result<Self> {
let uri =
std::env::var("PG_URI").map_or(PG_URI.to_string(), std::convert::identity);
debug!("Using postgres connection string: {uri}");
info!("Using postgres connection string: {uri}");
let config = tokio_postgres::Config::from_str(&uri)?;
@@ -113,6 +116,7 @@ impl Postgres {
client,
join_handle,
relative_path,
pb,
})
}
@@ -181,6 +185,22 @@ impl Postgres {
tx.commit().await?;
Ok(DBOutput::StatementComplete(0))
}
fn update_slow_count(&self) {
let msg = self.pb.message();
let split: Vec<&str> = msg.split(" ").collect();
let mut current_count = 0;
if split.len() > 2 {
// second match will be current slow count
current_count += split[2].parse::<i32>().unwrap();
}
current_count += 1;
self.pb
.set_message(format!("{} - {} took > 500 ms", split[0], current_count));
}
}
/// remove single quotes from the start and end of the string
@@ -194,16 +214,13 @@ fn no_quotes(t: &str) -> &str {
/// return a schema name
fn schema_name(relative_path: &Path) -> String {
relative_path
.file_name()
.map(|name| {
name.to_string_lossy()
.chars()
.filter(|ch| ch.is_ascii_alphabetic())
.collect::<String>()
.trim_start_matches("pg_")
.to_string()
})
.unwrap_or_else(|| "default_schema".to_string())
.to_string_lossy()
.to_string()
.chars()
.filter(|ch| ch.is_ascii_alphanumeric())
.collect::<String>()
.trim_start_matches("pg_")
.to_string()
}
impl Drop for Postgres {
@@ -221,7 +238,7 @@ impl sqllogictest::AsyncDB for Postgres {
&mut self,
sql: &str,
) -> Result<DBOutput<Self::ColumnType>, Self::Error> {
println!(
debug!(
"[{}] Running query: \"{}\"",
self.relative_path.display(),
sql
@@ -242,14 +259,24 @@ impl sqllogictest::AsyncDB for Postgres {
};
if lower_sql.starts_with("copy") {
self.pb.inc(1);
return self.run_copy_command(sql).await;
}
if !is_query_sql {
self.client.execute(sql, &[]).await?;
self.pb.inc(1);
return Ok(DBOutput::StatementComplete(0));
}
let start = Instant::now();
let rows = self.client.query(sql, &[]).await?;
let duration = start.elapsed();
if duration.gt(&Duration::from_millis(500)) {
self.update_slow_count();
}
self.pb.inc(1);
let types: Vec<Type> = if rows.is_empty() {
self.client
@@ -74,7 +74,7 @@ Testing setup:
- `rustup update stable` DataFusion uses the latest stable release of rust
- `git submodule init`
- `git submodule update`
- `git submodule update --init --remote --recursive`
Formatting instructions: