mirror of
https://github.com/langchain-ai/datafusion.git
synced 2026-06-30 21:27:59 -04:00
Support newlines_in_values CSV option (#11533)
* feat!: support `newlines_in_values` CSV option This significantly simplifies the UX when dealing with large CSV files that must support newlines in (quoted) values. By default, large CSV files will be repartitioned into multiple parallel range scans. This is great for performance in the common case but when large CSVs contain newlines in values the parallel scan will fail due to splitting on newlines within quotes rather than actual line terminators. With the current implementation, this behaviour can be controlled by the session-level `datafusion.optimizer.repartition_file_scans` and `datafusion.optimizer.repartition_file_min_size` settings. This commit introduces a `newlines_in_values` option to `CsvOptions` and plumbs it through to `CsvExec`, which includes it in the test for whether parallel execution is supported. This provides a convenient and searchable way to disable file scan repartitioning on a per-CSV basis. BREAKING CHANGE: This adds new public fields to types with all public fields, which is a breaking change. * docs: normalise `newlines_in_values` documentation * test: add/fix sqllogictests for `newlines_in_values` * docs: document `datafusion.catalog.newlines_in_values` * fix: typo in config.md * chore: suppress lint on too many arguments for `CsvExec::new` * fix: always checkout `*.slt` with LF line endings This is a bit of a stab in the dark, but it might fix multiline tests on Windows. * fix: always checkout `newlines_in_values.csv` with `LF` line endings The default git behaviour of converting line endings for checked out files causes the `csv_files.slt` test to fail when testing `newlines_in_values`. This appears to be due to the quoted newlines being converted to CRLF, which are not then normalised when the CSV is read. Assuming that the sqllogictests do normalise line endings in the expected output, this could then lead to a "spurious" diff from the actual output. --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
.github/ export-ignore
|
||||
datafusion/core/tests/data/newlines_in_values.csv text eol=lf
|
||||
datafusion/proto/src/generated/prost.rs linguist-generated
|
||||
datafusion/proto/src/generated/pbjson.rs linguist-generated
|
||||
|
||||
@@ -184,6 +184,16 @@ config_namespace! {
|
||||
/// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
|
||||
/// if not specified explicitly in the statement.
|
||||
pub has_header: bool, default = false
|
||||
|
||||
/// Specifies whether newlines in (quoted) CSV values are supported.
|
||||
///
|
||||
/// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE`
|
||||
/// if not specified explicitly in the statement.
|
||||
///
|
||||
/// Parsing newlines in quoted values may be affected by execution behaviour such as
|
||||
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
|
||||
/// parsed successfully, which may reduce performance.
|
||||
pub newlines_in_values: bool, default = false
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1593,6 +1603,14 @@ config_namespace! {
|
||||
pub quote: u8, default = b'"'
|
||||
pub escape: Option<u8>, default = None
|
||||
pub double_quote: Option<bool>, default = None
|
||||
/// Specifies whether newlines in (quoted) values are supported.
|
||||
///
|
||||
/// Parsing newlines in quoted values may be affected by execution behaviour such as
|
||||
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
|
||||
/// parsed successfully, which may reduce performance.
|
||||
///
|
||||
/// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
|
||||
pub newlines_in_values: Option<bool>, default = None
|
||||
pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
|
||||
pub schema_infer_max_rec: usize, default = 100
|
||||
pub date_format: Option<String>, default = None
|
||||
@@ -1665,6 +1683,18 @@ impl CsvOptions {
|
||||
self
|
||||
}
|
||||
|
||||
/// Specifies whether newlines in (quoted) values are supported.
|
||||
///
|
||||
/// Parsing newlines in quoted values may be affected by execution behaviour such as
|
||||
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
|
||||
/// parsed successfully, which may reduce performance.
|
||||
///
|
||||
/// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
|
||||
pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
|
||||
self.newlines_in_values = Some(newlines_in_values);
|
||||
self
|
||||
}
|
||||
|
||||
/// Set a `CompressionTypeVariant` of CSV
|
||||
/// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
|
||||
pub fn with_file_compression_type(
|
||||
|
||||
@@ -233,6 +233,18 @@ impl CsvFormat {
|
||||
self
|
||||
}
|
||||
|
||||
/// Specifies whether newlines in (quoted) values are supported.
|
||||
///
|
||||
/// Parsing newlines in quoted values may be affected by execution behaviour such as
|
||||
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
|
||||
/// parsed successfully, which may reduce performance.
|
||||
///
|
||||
/// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
|
||||
pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
|
||||
self.options.newlines_in_values = Some(newlines_in_values);
|
||||
self
|
||||
}
|
||||
|
||||
/// Set a `FileCompressionType` of CSV
|
||||
/// - defaults to `FileCompressionType::UNCOMPRESSED`
|
||||
pub fn with_file_compression_type(
|
||||
@@ -330,6 +342,9 @@ impl FileFormat for CsvFormat {
|
||||
self.options.quote,
|
||||
self.options.escape,
|
||||
self.options.comment,
|
||||
self.options
|
||||
.newlines_in_values
|
||||
.unwrap_or(state.config_options().catalog.newlines_in_values),
|
||||
self.options.compression.into(),
|
||||
);
|
||||
Ok(Arc::new(exec))
|
||||
@@ -1052,6 +1067,41 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[rstest(n_partitions, case(1), case(2), case(3), case(4))]
|
||||
#[tokio::test]
|
||||
async fn test_csv_parallel_newlines_in_values(n_partitions: usize) -> Result<()> {
|
||||
let config = SessionConfig::new()
|
||||
.with_repartition_file_scans(true)
|
||||
.with_repartition_file_min_size(0)
|
||||
.with_target_partitions(n_partitions);
|
||||
let csv_options = CsvReadOptions::default()
|
||||
.has_header(true)
|
||||
.newlines_in_values(true);
|
||||
let ctx = SessionContext::new_with_config(config);
|
||||
let testdata = arrow_test_data();
|
||||
ctx.register_csv(
|
||||
"aggr",
|
||||
&format!("{testdata}/csv/aggregate_test_100.csv"),
|
||||
csv_options,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let query = "select sum(c3) from aggr;";
|
||||
let query_result = ctx.sql(query).await?.collect().await?;
|
||||
let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
|
||||
|
||||
#[rustfmt::skip]
|
||||
let expected = ["+--------------+",
|
||||
"| sum(aggr.c3) |",
|
||||
"+--------------+",
|
||||
"| 781 |",
|
||||
"+--------------+"];
|
||||
assert_batches_eq!(expected, &query_result);
|
||||
assert_eq!(1, actual_partitions); // csv won't be scanned in parallel when newlines_in_values is set
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Read a single empty csv file in parallel
|
||||
///
|
||||
/// empty_0_byte.csv:
|
||||
|
||||
@@ -63,6 +63,14 @@ pub struct CsvReadOptions<'a> {
|
||||
pub escape: Option<u8>,
|
||||
/// If enabled, lines beginning with this byte are ignored.
|
||||
pub comment: Option<u8>,
|
||||
/// Specifies whether newlines in (quoted) values are supported.
|
||||
///
|
||||
/// Parsing newlines in quoted values may be affected by execution behaviour such as
|
||||
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
|
||||
/// parsed successfully, which may reduce performance.
|
||||
///
|
||||
/// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
|
||||
pub newlines_in_values: bool,
|
||||
/// An optional schema representing the CSV files. If None, CSV reader will try to infer it
|
||||
/// based on data in file.
|
||||
pub schema: Option<&'a Schema>,
|
||||
@@ -95,6 +103,7 @@ impl<'a> CsvReadOptions<'a> {
|
||||
delimiter: b',',
|
||||
quote: b'"',
|
||||
escape: None,
|
||||
newlines_in_values: false,
|
||||
file_extension: DEFAULT_CSV_EXTENSION,
|
||||
table_partition_cols: vec![],
|
||||
file_compression_type: FileCompressionType::UNCOMPRESSED,
|
||||
@@ -133,6 +142,18 @@ impl<'a> CsvReadOptions<'a> {
|
||||
self
|
||||
}
|
||||
|
||||
/// Specifies whether newlines in (quoted) values are supported.
|
||||
///
|
||||
/// Parsing newlines in quoted values may be affected by execution behaviour such as
|
||||
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
|
||||
/// parsed successfully, which may reduce performance.
|
||||
///
|
||||
/// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
|
||||
pub fn newlines_in_values(mut self, newlines_in_values: bool) -> Self {
|
||||
self.newlines_in_values = newlines_in_values;
|
||||
self
|
||||
}
|
||||
|
||||
/// Specify the file extension for CSV file selection
|
||||
pub fn file_extension(mut self, file_extension: &'a str) -> Self {
|
||||
self.file_extension = file_extension;
|
||||
@@ -490,6 +511,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
|
||||
.with_delimiter(self.delimiter)
|
||||
.with_quote(self.quote)
|
||||
.with_escape(self.escape)
|
||||
.with_newlines_in_values(self.newlines_in_values)
|
||||
.with_schema_infer_max_rec(self.schema_infer_max_records)
|
||||
.with_file_compression_type(self.file_compression_type.to_owned());
|
||||
|
||||
|
||||
@@ -59,6 +59,7 @@ pub struct CsvExec {
|
||||
quote: u8,
|
||||
escape: Option<u8>,
|
||||
comment: Option<u8>,
|
||||
newlines_in_values: bool,
|
||||
/// Execution metrics
|
||||
metrics: ExecutionPlanMetricsSet,
|
||||
/// Compression type of the file associated with CsvExec
|
||||
@@ -68,6 +69,7 @@ pub struct CsvExec {
|
||||
|
||||
impl CsvExec {
|
||||
/// Create a new CSV reader execution plan provided base and specific configurations
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
base_config: FileScanConfig,
|
||||
has_header: bool,
|
||||
@@ -75,6 +77,7 @@ impl CsvExec {
|
||||
quote: u8,
|
||||
escape: Option<u8>,
|
||||
comment: Option<u8>,
|
||||
newlines_in_values: bool,
|
||||
file_compression_type: FileCompressionType,
|
||||
) -> Self {
|
||||
let (projected_schema, projected_statistics, projected_output_ordering) =
|
||||
@@ -91,6 +94,7 @@ impl CsvExec {
|
||||
delimiter,
|
||||
quote,
|
||||
escape,
|
||||
newlines_in_values,
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
file_compression_type,
|
||||
cache,
|
||||
@@ -126,6 +130,17 @@ impl CsvExec {
|
||||
self.escape
|
||||
}
|
||||
|
||||
/// Specifies whether newlines in (quoted) values are supported.
|
||||
///
|
||||
/// Parsing newlines in quoted values may be affected by execution behaviour such as
|
||||
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
|
||||
/// parsed successfully, which may reduce performance.
|
||||
///
|
||||
/// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
|
||||
pub fn newlines_in_values(&self) -> bool {
|
||||
self.newlines_in_values
|
||||
}
|
||||
|
||||
fn output_partitioning_helper(file_scan_config: &FileScanConfig) -> Partitioning {
|
||||
Partitioning::UnknownPartitioning(file_scan_config.file_groups.len())
|
||||
}
|
||||
@@ -196,15 +211,15 @@ impl ExecutionPlan for CsvExec {
|
||||
/// Redistribute files across partitions according to their size
|
||||
/// See comments on [`FileGroupPartitioner`] for more detail.
|
||||
///
|
||||
/// Return `None` if can't get repartitioned(empty/compressed file).
|
||||
/// Return `None` if can't get repartitioned (empty, compressed file, or `newlines_in_values` set).
|
||||
fn repartitioned(
|
||||
&self,
|
||||
target_partitions: usize,
|
||||
config: &ConfigOptions,
|
||||
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
|
||||
let repartition_file_min_size = config.optimizer.repartition_file_min_size;
|
||||
// Parallel execution on compressed CSV file is not supported yet.
|
||||
if self.file_compression_type.is_compressed() {
|
||||
// Parallel execution on compressed CSV files or files that must support newlines in values is not supported yet.
|
||||
if self.file_compression_type.is_compressed() || self.newlines_in_values {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
@@ -589,6 +604,7 @@ mod tests {
|
||||
b'"',
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
file_compression_type.to_owned(),
|
||||
);
|
||||
assert_eq!(13, csv.base_config.file_schema.fields().len());
|
||||
@@ -658,6 +674,7 @@ mod tests {
|
||||
b'"',
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
file_compression_type.to_owned(),
|
||||
);
|
||||
assert_eq!(13, csv.base_config.file_schema.fields().len());
|
||||
@@ -727,6 +744,7 @@ mod tests {
|
||||
b'"',
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
file_compression_type.to_owned(),
|
||||
);
|
||||
assert_eq!(13, csv.base_config.file_schema.fields().len());
|
||||
@@ -793,6 +811,7 @@ mod tests {
|
||||
b'"',
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
file_compression_type.to_owned(),
|
||||
);
|
||||
assert_eq!(14, csv.base_config.file_schema.fields().len());
|
||||
@@ -858,6 +877,7 @@ mod tests {
|
||||
b'"',
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
file_compression_type.to_owned(),
|
||||
);
|
||||
assert_eq!(13, csv.base_config.file_schema.fields().len());
|
||||
@@ -953,6 +973,7 @@ mod tests {
|
||||
b'"',
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
file_compression_type.to_owned(),
|
||||
);
|
||||
|
||||
|
||||
@@ -1472,6 +1472,7 @@ pub(crate) mod tests {
|
||||
b'"',
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
FileCompressionType::UNCOMPRESSED,
|
||||
))
|
||||
}
|
||||
@@ -1496,6 +1497,7 @@ pub(crate) mod tests {
|
||||
b'"',
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
FileCompressionType::UNCOMPRESSED,
|
||||
))
|
||||
}
|
||||
@@ -3770,6 +3772,7 @@ pub(crate) mod tests {
|
||||
b'"',
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
compression_type,
|
||||
)),
|
||||
vec![("a".to_string(), "a".to_string())],
|
||||
|
||||
@@ -186,6 +186,7 @@ fn try_swapping_with_csv(
|
||||
csv.quote(),
|
||||
csv.escape(),
|
||||
csv.comment(),
|
||||
csv.newlines_in_values(),
|
||||
csv.file_compression_type,
|
||||
)) as _
|
||||
})
|
||||
@@ -1700,6 +1701,7 @@ mod tests {
|
||||
0,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
FileCompressionType::UNCOMPRESSED,
|
||||
))
|
||||
}
|
||||
@@ -1723,6 +1725,7 @@ mod tests {
|
||||
0,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
FileCompressionType::UNCOMPRESSED,
|
||||
))
|
||||
}
|
||||
|
||||
@@ -1503,6 +1503,7 @@ mod tests {
|
||||
b'"',
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
FileCompressionType::UNCOMPRESSED,
|
||||
))
|
||||
}
|
||||
|
||||
@@ -99,6 +99,7 @@ pub fn scan_partitioned_csv(partitions: usize, work_dir: &Path) -> Result<Arc<Cs
|
||||
b'"',
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
FileCompressionType::UNCOMPRESSED,
|
||||
)))
|
||||
}
|
||||
@@ -283,6 +284,7 @@ pub fn csv_exec_sorted(
|
||||
0,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
FileCompressionType::UNCOMPRESSED,
|
||||
))
|
||||
}
|
||||
@@ -339,6 +341,7 @@ pub fn csv_exec_ordered(
|
||||
b'"',
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
FileCompressionType::UNCOMPRESSED,
|
||||
))
|
||||
}
|
||||
|
||||
@@ -0,0 +1,13 @@
|
||||
id,message
|
||||
1,"hello
|
||||
world"
|
||||
2,"something
|
||||
else"
|
||||
3,"
|
||||
many
|
||||
lines
|
||||
make
|
||||
good test
|
||||
"
|
||||
4,unquoted
|
||||
value,end
|
||||
|
@@ -410,6 +410,7 @@ message CsvOptions {
|
||||
string null_value = 12; // Optional representation of null value
|
||||
bytes comment = 13; // Optional comment character as a byte
|
||||
bytes double_quote = 14; // Indicates if quotes are doubled
|
||||
bytes newlines_in_values = 15; // Indicates if newlines are supported in values
|
||||
}
|
||||
|
||||
// Options controlling CSV format
|
||||
|
||||
@@ -860,6 +860,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
|
||||
quote: proto_opts.quote[0],
|
||||
escape: proto_opts.escape.first().copied(),
|
||||
double_quote: proto_opts.has_header.first().map(|h| *h != 0),
|
||||
newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0),
|
||||
compression: proto_opts.compression().into(),
|
||||
schema_infer_max_rec: proto_opts.schema_infer_max_rec as usize,
|
||||
date_format: (!proto_opts.date_format.is_empty())
|
||||
|
||||
@@ -1884,6 +1884,9 @@ impl serde::Serialize for CsvOptions {
|
||||
if !self.double_quote.is_empty() {
|
||||
len += 1;
|
||||
}
|
||||
if !self.newlines_in_values.is_empty() {
|
||||
len += 1;
|
||||
}
|
||||
let mut struct_ser = serializer.serialize_struct("datafusion_common.CsvOptions", len)?;
|
||||
if !self.has_header.is_empty() {
|
||||
#[allow(clippy::needless_borrow)]
|
||||
@@ -1936,6 +1939,10 @@ impl serde::Serialize for CsvOptions {
|
||||
#[allow(clippy::needless_borrow)]
|
||||
struct_ser.serialize_field("doubleQuote", pbjson::private::base64::encode(&self.double_quote).as_str())?;
|
||||
}
|
||||
if !self.newlines_in_values.is_empty() {
|
||||
#[allow(clippy::needless_borrow)]
|
||||
struct_ser.serialize_field("newlinesInValues", pbjson::private::base64::encode(&self.newlines_in_values).as_str())?;
|
||||
}
|
||||
struct_ser.end()
|
||||
}
|
||||
}
|
||||
@@ -1969,6 +1976,8 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
|
||||
"comment",
|
||||
"double_quote",
|
||||
"doubleQuote",
|
||||
"newlines_in_values",
|
||||
"newlinesInValues",
|
||||
];
|
||||
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
@@ -1987,6 +1996,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
|
||||
NullValue,
|
||||
Comment,
|
||||
DoubleQuote,
|
||||
NewlinesInValues,
|
||||
}
|
||||
impl<'de> serde::Deserialize<'de> for GeneratedField {
|
||||
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
|
||||
@@ -2022,6 +2032,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
|
||||
"nullValue" | "null_value" => Ok(GeneratedField::NullValue),
|
||||
"comment" => Ok(GeneratedField::Comment),
|
||||
"doubleQuote" | "double_quote" => Ok(GeneratedField::DoubleQuote),
|
||||
"newlinesInValues" | "newlines_in_values" => Ok(GeneratedField::NewlinesInValues),
|
||||
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
|
||||
}
|
||||
}
|
||||
@@ -2055,6 +2066,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
|
||||
let mut null_value__ = None;
|
||||
let mut comment__ = None;
|
||||
let mut double_quote__ = None;
|
||||
let mut newlines_in_values__ = None;
|
||||
while let Some(k) = map_.next_key()? {
|
||||
match k {
|
||||
GeneratedField::HasHeader => {
|
||||
@@ -2155,6 +2167,14 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
|
||||
Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0)
|
||||
;
|
||||
}
|
||||
GeneratedField::NewlinesInValues => {
|
||||
if newlines_in_values__.is_some() {
|
||||
return Err(serde::de::Error::duplicate_field("newlinesInValues"));
|
||||
}
|
||||
newlines_in_values__ =
|
||||
Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0)
|
||||
;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(CsvOptions {
|
||||
@@ -2172,6 +2192,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
|
||||
null_value: null_value__.unwrap_or_default(),
|
||||
comment: comment__.unwrap_or_default(),
|
||||
double_quote: double_quote__.unwrap_or_default(),
|
||||
newlines_in_values: newlines_in_values__.unwrap_or_default(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -633,6 +633,9 @@ pub struct CsvOptions {
|
||||
/// Indicates if quotes are doubled
|
||||
#[prost(bytes = "vec", tag = "14")]
|
||||
pub double_quote: ::prost::alloc::vec::Vec<u8>,
|
||||
/// Indicates if newlines are supported in values
|
||||
#[prost(bytes = "vec", tag = "15")]
|
||||
pub newlines_in_values: ::prost::alloc::vec::Vec<u8>,
|
||||
}
|
||||
/// Options controlling CSV format
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
|
||||
@@ -900,6 +900,9 @@ impl TryFrom<&CsvOptions> for protobuf::CsvOptions {
|
||||
quote: vec![opts.quote],
|
||||
escape: opts.escape.map_or_else(Vec::new, |e| vec![e]),
|
||||
double_quote: opts.double_quote.map_or_else(Vec::new, |h| vec![h as u8]),
|
||||
newlines_in_values: opts
|
||||
.newlines_in_values
|
||||
.map_or_else(Vec::new, |h| vec![h as u8]),
|
||||
compression: compression.into(),
|
||||
schema_infer_max_rec: opts.schema_infer_max_rec as u64,
|
||||
date_format: opts.date_format.clone().unwrap_or_default(),
|
||||
|
||||
@@ -1007,6 +1007,7 @@ message CsvScanExecNode {
|
||||
oneof optional_comment {
|
||||
string comment = 6;
|
||||
}
|
||||
bool newlines_in_values = 7;
|
||||
}
|
||||
|
||||
message AvroScanExecNode {
|
||||
|
||||
@@ -633,6 +633,9 @@ pub struct CsvOptions {
|
||||
/// Indicates if quotes are doubled
|
||||
#[prost(bytes = "vec", tag = "14")]
|
||||
pub double_quote: ::prost::alloc::vec::Vec<u8>,
|
||||
/// Indicates if newlines are supported in values
|
||||
#[prost(bytes = "vec", tag = "15")]
|
||||
pub newlines_in_values: ::prost::alloc::vec::Vec<u8>,
|
||||
}
|
||||
/// Options controlling CSV format
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
|
||||
Generated
+18
@@ -3605,6 +3605,9 @@ impl serde::Serialize for CsvScanExecNode {
|
||||
if !self.quote.is_empty() {
|
||||
len += 1;
|
||||
}
|
||||
if self.newlines_in_values {
|
||||
len += 1;
|
||||
}
|
||||
if self.optional_escape.is_some() {
|
||||
len += 1;
|
||||
}
|
||||
@@ -3624,6 +3627,9 @@ impl serde::Serialize for CsvScanExecNode {
|
||||
if !self.quote.is_empty() {
|
||||
struct_ser.serialize_field("quote", &self.quote)?;
|
||||
}
|
||||
if self.newlines_in_values {
|
||||
struct_ser.serialize_field("newlinesInValues", &self.newlines_in_values)?;
|
||||
}
|
||||
if let Some(v) = self.optional_escape.as_ref() {
|
||||
match v {
|
||||
csv_scan_exec_node::OptionalEscape::Escape(v) => {
|
||||
@@ -3654,6 +3660,8 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
|
||||
"hasHeader",
|
||||
"delimiter",
|
||||
"quote",
|
||||
"newlines_in_values",
|
||||
"newlinesInValues",
|
||||
"escape",
|
||||
"comment",
|
||||
];
|
||||
@@ -3664,6 +3672,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
|
||||
HasHeader,
|
||||
Delimiter,
|
||||
Quote,
|
||||
NewlinesInValues,
|
||||
Escape,
|
||||
Comment,
|
||||
}
|
||||
@@ -3691,6 +3700,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
|
||||
"hasHeader" | "has_header" => Ok(GeneratedField::HasHeader),
|
||||
"delimiter" => Ok(GeneratedField::Delimiter),
|
||||
"quote" => Ok(GeneratedField::Quote),
|
||||
"newlinesInValues" | "newlines_in_values" => Ok(GeneratedField::NewlinesInValues),
|
||||
"escape" => Ok(GeneratedField::Escape),
|
||||
"comment" => Ok(GeneratedField::Comment),
|
||||
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
|
||||
@@ -3716,6 +3726,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
|
||||
let mut has_header__ = None;
|
||||
let mut delimiter__ = None;
|
||||
let mut quote__ = None;
|
||||
let mut newlines_in_values__ = None;
|
||||
let mut optional_escape__ = None;
|
||||
let mut optional_comment__ = None;
|
||||
while let Some(k) = map_.next_key()? {
|
||||
@@ -3744,6 +3755,12 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
|
||||
}
|
||||
quote__ = Some(map_.next_value()?);
|
||||
}
|
||||
GeneratedField::NewlinesInValues => {
|
||||
if newlines_in_values__.is_some() {
|
||||
return Err(serde::de::Error::duplicate_field("newlinesInValues"));
|
||||
}
|
||||
newlines_in_values__ = Some(map_.next_value()?);
|
||||
}
|
||||
GeneratedField::Escape => {
|
||||
if optional_escape__.is_some() {
|
||||
return Err(serde::de::Error::duplicate_field("escape"));
|
||||
@@ -3763,6 +3780,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
|
||||
has_header: has_header__.unwrap_or_default(),
|
||||
delimiter: delimiter__.unwrap_or_default(),
|
||||
quote: quote__.unwrap_or_default(),
|
||||
newlines_in_values: newlines_in_values__.unwrap_or_default(),
|
||||
optional_escape: optional_escape__,
|
||||
optional_comment: optional_comment__,
|
||||
})
|
||||
|
||||
Generated
+2
@@ -1542,6 +1542,8 @@ pub struct CsvScanExecNode {
|
||||
pub delimiter: ::prost::alloc::string::String,
|
||||
#[prost(string, tag = "4")]
|
||||
pub quote: ::prost::alloc::string::String,
|
||||
#[prost(bool, tag = "7")]
|
||||
pub newlines_in_values: bool,
|
||||
#[prost(oneof = "csv_scan_exec_node::OptionalEscape", tags = "5")]
|
||||
pub optional_escape: ::core::option::Option<csv_scan_exec_node::OptionalEscape>,
|
||||
#[prost(oneof = "csv_scan_exec_node::OptionalComment", tags = "6")]
|
||||
|
||||
@@ -211,6 +211,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
|
||||
} else {
|
||||
None
|
||||
},
|
||||
scan.newlines_in_values,
|
||||
FileCompressionType::UNCOMPRESSED,
|
||||
))),
|
||||
#[cfg(feature = "parquet")]
|
||||
@@ -1579,6 +1580,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
|
||||
} else {
|
||||
None
|
||||
},
|
||||
newlines_in_values: exec.newlines_in_values(),
|
||||
},
|
||||
)),
|
||||
});
|
||||
|
||||
@@ -293,3 +293,45 @@ id0 "value0"
|
||||
id1 "value1"
|
||||
id2 "value2"
|
||||
id3 "value3"
|
||||
|
||||
# Handling of newlines in values
|
||||
|
||||
statement ok
|
||||
SET datafusion.optimizer.repartition_file_min_size = 1;
|
||||
|
||||
statement ok
|
||||
CREATE EXTERNAL TABLE stored_table_with_newlines_in_values_unsafe (
|
||||
col1 TEXT,
|
||||
col2 TEXT
|
||||
) STORED AS CSV
|
||||
LOCATION '../core/tests/data/newlines_in_values.csv';
|
||||
|
||||
statement error incorrect number of fields
|
||||
select * from stored_table_with_newlines_in_values_unsafe;
|
||||
|
||||
statement ok
|
||||
CREATE EXTERNAL TABLE stored_table_with_newlines_in_values_safe (
|
||||
col1 TEXT,
|
||||
col2 TEXT
|
||||
) STORED AS CSV
|
||||
LOCATION '../core/tests/data/newlines_in_values.csv'
|
||||
OPTIONS ('format.newlines_in_values' 'true');
|
||||
|
||||
query TT
|
||||
select * from stored_table_with_newlines_in_values_safe;
|
||||
----
|
||||
id message
|
||||
1
|
||||
01)hello
|
||||
02)world
|
||||
2
|
||||
01)something
|
||||
02)else
|
||||
3
|
||||
01)
|
||||
02)many
|
||||
03)lines
|
||||
04)make
|
||||
05)good test
|
||||
4 unquoted
|
||||
value end
|
||||
|
||||
@@ -168,6 +168,7 @@ datafusion.catalog.format NULL
|
||||
datafusion.catalog.has_header false
|
||||
datafusion.catalog.information_schema true
|
||||
datafusion.catalog.location NULL
|
||||
datafusion.catalog.newlines_in_values false
|
||||
datafusion.execution.aggregate.scalar_update_factor 10
|
||||
datafusion.execution.batch_size 8192
|
||||
datafusion.execution.coalesce_batches true
|
||||
@@ -252,6 +253,7 @@ datafusion.catalog.format NULL Type of `TableProvider` to use when loading `defa
|
||||
datafusion.catalog.has_header false Default value for `format.has_header` for `CREATE EXTERNAL TABLE` if not specified explicitly in the statement.
|
||||
datafusion.catalog.information_schema true Should DataFusion provide access to `information_schema` virtual tables for displaying schema information
|
||||
datafusion.catalog.location NULL Location scanned to load tables for `default` schema
|
||||
datafusion.catalog.newlines_in_values false Specifies whether newlines in (quoted) CSV values are supported. This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE` if not specified explicitly in the statement. Parsing newlines in quoted values may be affected by execution behaviour such as parallel file scanning. Setting this to `true` ensures that newlines in values are parsed successfully, which may reduce performance.
|
||||
datafusion.execution.aggregate.scalar_update_factor 10 Specifies the threshold for using `ScalarValue`s to update accumulators during high-cardinality aggregations for each input batch. The aggregation is considered high-cardinality if the number of affected groups is greater than or equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are utilized for updating accumulators, rather than the default batch-slice approach. This can lead to performance improvements. By adjusting the `scalar_update_factor`, you can balance the trade-off between more efficient accumulator updates and the number of groups affected.
|
||||
datafusion.execution.batch_size 8192 Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption
|
||||
datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting
|
||||
|
||||
@@ -44,6 +44,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
|
||||
| datafusion.catalog.location | NULL | Location scanned to load tables for `default` schema |
|
||||
| datafusion.catalog.format | NULL | Type of `TableProvider` to use when loading `default` schema |
|
||||
| datafusion.catalog.has_header | false | Default value for `format.has_header` for `CREATE EXTERNAL TABLE` if not specified explicitly in the statement. |
|
||||
| datafusion.catalog.newlines_in_values | false | Specifies whether newlines in (quoted) CSV values are supported. This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE` if not specified explicitly in the statement. Parsing newlines in quoted values may be affected by execution behaviour such as parallel file scanning. Setting this to `true` ensures that newlines in values are parsed successfully, which may reduce performance. |
|
||||
| datafusion.execution.batch_size | 8192 | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption |
|
||||
| datafusion.execution.coalesce_batches | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting |
|
||||
| datafusion.execution.collect_statistics | false | Should DataFusion collect statistics after listing files |
|
||||
|
||||
Reference in New Issue
Block a user