From cf8f72658d443c73c5c7784dbc3d95a865ac5ce4 Mon Sep 17 00:00:00 2001 From: Daniel Hegberg Date: Sat, 23 Nov 2024 19:11:03 -0800 Subject: [PATCH 1/3] Add csv loading benchmarks. --- .gitignore | 3 + datafusion/core/Cargo.toml | 4 ++ datafusion/core/benches/csv_load.rs | 91 ++++++++++++++++++++++++++++ datafusion/core/src/test_util/csv.rs | 69 +++++++++++++++++++++ datafusion/core/src/test_util/mod.rs | 2 + test-utils/src/data_gen.rs | 32 +++++++--- 6 files changed, 194 insertions(+), 7 deletions(-) create mode 100644 datafusion/core/benches/csv_load.rs create mode 100644 datafusion/core/src/test_util/csv.rs diff --git a/.gitignore b/.gitignore index 8195760513f7c..1fa79249ff8e0 100644 --- a/.gitignore +++ b/.gitignore @@ -67,6 +67,9 @@ datafusion/sqllogictest/test_files/scratch* # temp file for core datafusion/core/*.parquet +# Generated core benchmark data +datafusion/core/benches/data/* + # rat filtered_rat.txt rat.txt diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 6c5a31e3624af..f3ec4f73f5cf2 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -159,6 +159,10 @@ nix = { version = "0.29.0", features = ["fs"] } harness = false name = "aggregate_query_sql" +[[bench]] +harness = false +name = "csv_load" + [[bench]] harness = false name = "distinct_query_sql" diff --git a/datafusion/core/benches/csv_load.rs b/datafusion/core/benches/csv_load.rs new file mode 100644 index 0000000000000..1c2414ebe7a2e --- /dev/null +++ b/datafusion/core/benches/csv_load.rs @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#[macro_use] +extern crate criterion; +extern crate arrow; +extern crate datafusion; + +mod data_utils; +use crate::criterion::Criterion; +use datafusion::error::Result; +use datafusion::execution::context::SessionContext; +use datafusion::prelude::CsvReadOptions; +use datafusion::test_util::csv::TestCsvFile; +use parking_lot::Mutex; +use test_utils::AccessLogGenerator; +use std::sync::Arc; +use std::time::Duration; +use tokio::runtime::Runtime; + +fn load_csv(ctx: Arc>, path: &str, options: CsvReadOptions) { + let rt = Runtime::new().unwrap(); + let df = rt.block_on( + ctx.lock() + .read_csv( + path, + options, + ) + ).unwrap(); + criterion::black_box(rt.block_on(df.collect()).unwrap()); +} + +fn create_context() -> Result>> { + let ctx = SessionContext::new(); + Ok(Arc::new(Mutex::new(ctx))) +} + +fn generate_test_file() -> TestCsvFile { + let write_location = std::env::current_dir() + .unwrap() + .join("benches") + .join("data"); + + // Make sure the write directory exists. + std::fs::create_dir_all(&write_location).unwrap(); + let file_path = write_location.join("logs.csv"); + + let generator = AccessLogGenerator::new().with_include_nulls(true); + let num_batches = 2; + let test_file = TestCsvFile::try_new( + file_path.clone(), + generator.take(num_batches as usize) + ).expect("Failed to create the test file."); + + test_file +} + +fn criterion_benchmark(c: &mut Criterion) { + let ctx = create_context().unwrap(); + let test_file = generate_test_file(); + + let mut group = c.benchmark_group("load csv testing"); + group.measurement_time(Duration::from_secs(20)); + + group.bench_function("default csv read options", |b| { + b.iter(|| { + load_csv( + ctx.clone(), + test_file.path().to_str().unwrap(), + CsvReadOptions::default(), + ) + }) + }); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/datafusion/core/src/test_util/csv.rs b/datafusion/core/src/test_util/csv.rs new file mode 100644 index 0000000000000..94c7efb954022 --- /dev/null +++ b/datafusion/core/src/test_util/csv.rs @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Helpers for writing csv files and reading them back + +use std::fs::File; +use std::path::PathBuf; +use std::sync::Arc; + +use crate::arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; +use crate::error::Result; + +use arrow::csv::WriterBuilder; + +/// a CSV file that has been created for testing. +pub struct TestCsvFile { + path: PathBuf, + schema: SchemaRef, +} + +impl TestCsvFile { + /// Creates a new csv file at the specified location + pub fn try_new( + path: PathBuf, + batches: impl IntoIterator, + ) -> Result { + let file = File::create(&path).unwrap(); + let builder = WriterBuilder::new().with_header(true); + let mut writer = builder.build(file); + + let mut batches = batches.into_iter(); + let first_batch = batches.next().expect("need at least one record batch"); + let schema = first_batch.schema(); + + let mut num_rows = 0; + for batch in batches { + writer.write(&batch)?; + num_rows += batch.num_rows(); + } + + println!("Generated test dataset with {num_rows} rows"); + + Ok(Self { path, schema }) + } + + /// The schema of this csv file + pub fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + /// The path to the csv file + pub fn path(&self) -> &std::path::Path { + self.path.as_path() + } +} diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index c4c84d667a068..09608887c0f1d 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -20,6 +20,8 @@ #[cfg(feature = "parquet")] pub mod parquet; +pub mod csv; + use std::any::Any; use std::collections::HashMap; use std::fs::File; diff --git a/test-utils/src/data_gen.rs b/test-utils/src/data_gen.rs index 45ad51bb44d66..4227f2d9a737b 100644 --- a/test-utils/src/data_gen.rs +++ b/test-utils/src/data_gen.rs @@ -33,6 +33,7 @@ struct GeneratorOptions { pods_per_host: Range, containers_per_pod: Range, entries_per_container: Range, + include_nulls: bool, } impl Default for GeneratorOptions { @@ -42,6 +43,7 @@ impl Default for GeneratorOptions { pods_per_host: 1..15, containers_per_pod: 1..3, entries_per_container: 1024..8192, + include_nulls: false, } } } @@ -149,13 +151,23 @@ impl BatchBuilder { self.image.append(image).unwrap(); self.time.append_value(time); - self.client_addr.append_value(format!( - "{}.{}.{}.{}", - rng.gen::(), - rng.gen::(), - rng.gen::(), - rng.gen::() - )); + if self.options.include_nulls { + // Append a null value if the option is set + // Use both "NULL" as a string and a null value + if rng.gen_bool(0.5) { + self.client_addr.append_null(); + } else { + self.client_addr.append_value("NULL"); + } + } else { + self.client_addr.append_value(format!( + "{}.{}.{}.{}", + rng.gen::(), + rng.gen::(), + rng.gen::(), + rng.gen::() + )); + } self.request_duration.append_value(rng.gen()); self.request_user_agent .append_value(random_string(rng, 20..100)); @@ -317,6 +329,12 @@ impl AccessLogGenerator { self.options.entries_per_container = range; self } + + // Set the condition for null values in the generated data + pub fn with_include_nulls(mut self, include_nulls: bool) -> Self { + self.options.include_nulls = include_nulls; + self + } } impl Iterator for AccessLogGenerator { From c0499cf10a3b0c4ec5843a16da07ab0de5b49d64 Mon Sep 17 00:00:00 2001 From: Daniel Hegberg Date: Mon, 2 Dec 2024 21:43:57 -0800 Subject: [PATCH 2/3] Fix fmt. --- datafusion/core/benches/csv_load.rs | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/datafusion/core/benches/csv_load.rs b/datafusion/core/benches/csv_load.rs index 1c2414ebe7a2e..804369d751958 100644 --- a/datafusion/core/benches/csv_load.rs +++ b/datafusion/core/benches/csv_load.rs @@ -27,20 +27,14 @@ use datafusion::execution::context::SessionContext; use datafusion::prelude::CsvReadOptions; use datafusion::test_util::csv::TestCsvFile; use parking_lot::Mutex; -use test_utils::AccessLogGenerator; use std::sync::Arc; use std::time::Duration; +use test_utils::AccessLogGenerator; use tokio::runtime::Runtime; -fn load_csv(ctx: Arc>, path: &str, options: CsvReadOptions) { +fn load_csv(ctx: Arc>, path: &str, options: CsvReadOptions) { let rt = Runtime::new().unwrap(); - let df = rt.block_on( - ctx.lock() - .read_csv( - path, - options, - ) - ).unwrap(); + let df = rt.block_on(ctx.lock().read_csv(path, options)).unwrap(); criterion::black_box(rt.block_on(df.collect()).unwrap()); } @@ -61,10 +55,9 @@ fn generate_test_file() -> TestCsvFile { let generator = AccessLogGenerator::new().with_include_nulls(true); let num_batches = 2; - let test_file = TestCsvFile::try_new( - file_path.clone(), - generator.take(num_batches as usize) - ).expect("Failed to create the test file."); + let test_file = + TestCsvFile::try_new(file_path.clone(), generator.take(num_batches as usize)) + .expect("Failed to create the test file."); test_file } From 823ce0f8214150d51367c00aec1791146208c7d8 Mon Sep 17 00:00:00 2001 From: Daniel Hegberg Date: Mon, 2 Dec 2024 23:20:21 -0800 Subject: [PATCH 3/3] Fix clippy. --- datafusion/core/benches/csv_load.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/datafusion/core/benches/csv_load.rs b/datafusion/core/benches/csv_load.rs index 804369d751958..5f707b31a6a93 100644 --- a/datafusion/core/benches/csv_load.rs +++ b/datafusion/core/benches/csv_load.rs @@ -55,11 +55,8 @@ fn generate_test_file() -> TestCsvFile { let generator = AccessLogGenerator::new().with_include_nulls(true); let num_batches = 2; - let test_file = - TestCsvFile::try_new(file_path.clone(), generator.take(num_batches as usize)) - .expect("Failed to create the test file."); - - test_file + TestCsvFile::try_new(file_path.clone(), generator.take(num_batches as usize)) + .expect("Failed to create test file.") } fn criterion_benchmark(c: &mut Criterion) {