From ee9c3aece1f8540f7966bec764c56ccbde6a78de Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Fri, 3 Jan 2025 16:58:07 +0800 Subject: [PATCH 01/13] Add H2O.ai Database-like Ops benchmark to dfbench --- benchmarks/bench.sh | 118 +++++++++++++++++++ benchmarks/queries/h2o/groupby.sql | 10 ++ benchmarks/queries/h2o/join.sql | 5 + benchmarks/src/bin/dfbench.rs | 4 +- benchmarks/src/bin/h2o.rs | 135 --------------------- benchmarks/src/h2o.rs | 181 +++++++++++++++++++++++++++++ benchmarks/src/lib.rs | 1 + 7 files changed, 318 insertions(+), 136 deletions(-) create mode 100644 benchmarks/queries/h2o/groupby.sql create mode 100644 benchmarks/queries/h2o/join.sql delete mode 100644 benchmarks/src/bin/h2o.rs create mode 100644 benchmarks/src/h2o.rs diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index bc44e24dfe5e3..f4c0e8dc6147e 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -80,6 +80,9 @@ clickbench_1: ClickBench queries against a single parquet file clickbench_partitioned: ClickBench queries against a partitioned (100 files) parquet clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific) external_aggr: External aggregation benchmark +h2o_small: h2oai with small dataset (1e7 rows), file format is parquet +h2o_medium: h2oai with medium dataset (1e8 rows), file format is parquet +h2o_large: h2oai with large dataset (1e9 rows), file format is parquet ********** * Supported Configuration (Environment Variables) @@ -142,6 +145,9 @@ main() { all) data_tpch "1" data_tpch "10" + data_h2o "SMALL" + data_h2o "MEDIUM" + data_h2o "LARGE" data_clickbench_1 data_clickbench_partitioned data_imdb @@ -172,6 +178,18 @@ main() { imdb) data_imdb ;; + h2o_small) + data_h2o "SMALL" + ;; + h2o_medium) + data_h2o "MEDIUM" + ;; + h2o_large) + data_h2o "LARGE" + ;; + h2o_small_csv) + data_h2o "SMALL" "CSV" + ;; external_aggr) # same data as for tpch data_tpch "1" @@ -221,6 +239,9 @@ main() { run_clickbench_1 run_clickbench_partitioned run_clickbench_extended + run_h2o "SMALL" "PARQUET" "groupby" + run_h2o "MEDIUM" "PARQUET" "groupby" + run_h2o "LARGE" "PARQUET" "groupby" run_imdb run_external_aggr ;; @@ -254,6 +275,18 @@ main() { imdb) run_imdb ;; + h2o_small) + run_h2o "SMALL" "PARQUET" "groupby" + ;; + h2o_medium) + run_h2o "MEDIUM" "PARQUET" "groupby" + ;; + h2o_large) + run_h2o "LARGE" "PARQUET" "groupby" + ;; + h2o_small_csv) + run_h2o "SMALL" "CSV" "groupby" + ;; external_aggr) run_external_aggr ;; @@ -541,6 +574,91 @@ run_imdb() { $CARGO_COMMAND --bin imdb -- benchmark datafusion --iterations 5 --path "${IMDB_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format parquet -o "${RESULTS_FILE}" } +data_h2o() { + # Default values for size and data format + SIZE=${1:-"SMALL"} + DATA_FORMAT=${2:-"PARQUET"} + + # Ensure the Python version is 3.10 or higher + REQUIRED_PYTHON="python3.10" + if ! command -v $REQUIRED_PYTHON &> /dev/null + then + echo "$REQUIRED_PYTHON could not be found. Please install Python 3.10 or higher." + return 1 + fi + + # Install falsa and other dependencies + echo "Installing falsa..." + + # Set virtual environment directory + VIRTUAL_ENV="${PWD}/.venv" + + # Check if the virtual environment already exists + if [ ! -d "$VIRTUAL_ENV" ]; then + # Create a virtual environment using Python 3.10 + $REQUIRED_PYTHON -m venv "$VIRTUAL_ENV" + fi + + # Activate the virtual environment and install dependencies + source "$VIRTUAL_ENV/bin/activate" + + # Ensure 'falsa' is installed (avoid unnecessary reinstall) + pip install --quiet --upgrade falsa + + # Create directory if it doesn't exist + H2O_DIR="${DATA_DIR}/h2o" + mkdir -p "${H2O_DIR}" + + # Generate h2o test data + echo "Generating h2o test data in ${H2O_DIR} with size=${SIZE} and format=${DATA_FORMAT}" + falsa groupby --path-prefix="${H2O_DIR}" --size "${SIZE}" --data-format "${DATA_FORMAT}" + + # Deactivate virtual environment after completion + deactivate + } + +run_h2o() { + # Default values for size and data format + SIZE=${1:-"SMALL"} + DATA_FORMAT=${2:-"PARQUET"} + DATA_FORMAT=$(echo "$DATA_FORMAT" | tr '[:upper:]' '[:lower:]') + RUN_Type=${3:-"groupby"} + + # Data directory and results file path + H2O_DIR="${DATA_DIR}/h2o" + RESULTS_FILE="${RESULTS_DIR}/h2o.json" + + echo "RESULTS_FILE: ${RESULTS_FILE}" + echo "Running h2o benchmark..." + + # Set the file name based on the size + case "$SIZE" in + "SMALL") + FILE_NAME="G1_1e7_1e7_100_0.${DATA_FORMAT}" # For small dataset + ;; + "MEDIUM") + FILE_NAME="G1_1e8_1e8_100_0.${DATA_FORMAT}" # For medium dataset + ;; + "BIG") + FILE_NAME="G1_1e9_1e9_100_0.${DATA_FORMAT}" # For big dataset + ;; + *) + echo "Invalid size. Valid options are SMALL, MEDIUM, or BIG." + return 1 + ;; + esac + + # Set the query file name based on the RUN_Type + QUERY_FILE="${SCRIPT_DIR}/queries/h2o/${RUN_Type}.sql" + + # Run the benchmark using the dynamically constructed file path and query file + $CARGO_COMMAND --bin dfbench -- h2o \ + --iterations 5 \ + --path "${H2O_DIR}/${FILE_NAME}" \ + --queries-path "${QUERY_FILE}" \ + -o "${RESULTS_FILE}" +} + # Runs the external aggregation benchmark run_external_aggr() { # Use TPC-H SF1 dataset diff --git a/benchmarks/queries/h2o/groupby.sql b/benchmarks/queries/h2o/groupby.sql new file mode 100644 index 0000000000000..c2101ef8ada2d --- /dev/null +++ b/benchmarks/queries/h2o/groupby.sql @@ -0,0 +1,10 @@ +SELECT id1, SUM(v1) AS v1 FROM x GROUP BY id1; +SELECT id1, id2, SUM(v1) AS v1 FROM x GROUP BY id1, id2; +SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM x GROUP BY id3; +SELECT id4, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM x GROUP BY id4; +SELECT id6, SUM(v1) AS v1, SUM(v2) AS v2, SUM(v3) AS v3 FROM x GROUP BY id6; +SELECT id4, id5, MEDIAN(v3) AS median_v3, STDDEV(v3) AS sd_v3 FROM x GROUP BY id4, id5; +SELECT id3, MAX(v1) - MIN(v2) AS range_v1_v2 FROM x GROUP BY id3; +SELECT id6, largest2_v3 FROM (SELECT id6, v3 AS largest2_v3, ROW_NUMBER() OVER (PARTITION BY id6 ORDER BY v3 DESC) AS order_v3 FROM x WHERE v3 IS NOT NULL) sub_query WHERE order_v3 <= 2; +SELECT id2, id4, POWER(CORR(v1, v2), 2) AS r2 FROM x GROUP BY id2, id4; +SELECT id1, id2, id3, id4, id5, id6, SUM(v3) AS v3, COUNT(*) AS count FROM x GROUP BY id1, id2, id3, id4, id5, id6; diff --git a/benchmarks/queries/h2o/join.sql b/benchmarks/queries/h2o/join.sql new file mode 100644 index 0000000000000..8546b9292dbb4 --- /dev/null +++ b/benchmarks/queries/h2o/join.sql @@ -0,0 +1,5 @@ +SELECT x.id1, x.id2, x.id3, x.id4 as xid4, small.id4 as smallid4, x.id5, x.id6, x.v1, small.v2 FROM x INNER JOIN small ON x.id1 = small.id1; +SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x INNER JOIN medium ON x.id2 = medium.id2; +SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x LEFT JOIN medium ON x.id2 = medium.id2; +SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x JOIN medium ON x.id5 = medium.id5; +SELECT x.id1 as xid1, large.id1 as largeid1, x.id2 as xid2, large.id2 as largeid2, x.id3, x.id4 as xid4, large.id4 as largeid4, x.id5 as xid5, large.id5 as largeid5, x.id6 as xid6, large.id6 as largeid6, x.v1, large.v2 FROM x JOIN large ON x.id3 = large.id3; diff --git a/benchmarks/src/bin/dfbench.rs b/benchmarks/src/bin/dfbench.rs index 81aa5437dd5f5..14e0636a18c06 100644 --- a/benchmarks/src/bin/dfbench.rs +++ b/benchmarks/src/bin/dfbench.rs @@ -33,7 +33,7 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; #[global_allocator] static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; -use datafusion_benchmarks::{clickbench, imdb, parquet_filter, sort, sort_tpch, tpch}; +use datafusion_benchmarks::{clickbench, imdb, parquet_filter, sort, sort_tpch, tpch, h2o}; #[derive(Debug, StructOpt)] #[structopt(about = "benchmark command")] @@ -45,6 +45,7 @@ enum Options { Sort(sort::RunOpt), SortTpch(sort_tpch::RunOpt), Imdb(imdb::RunOpt), + H2o(h2o::RunOpt), } // Main benchmark runner entrypoint @@ -60,5 +61,6 @@ pub async fn main() -> Result<()> { Options::Sort(opt) => opt.run().await, Options::SortTpch(opt) => opt.run().await, Options::Imdb(opt) => opt.run().await, + Options::H2o(opt) => opt.run().await, } } diff --git a/benchmarks/src/bin/h2o.rs b/benchmarks/src/bin/h2o.rs deleted file mode 100644 index 328db3d85b13c..0000000000000 --- a/benchmarks/src/bin/h2o.rs +++ /dev/null @@ -1,135 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! DataFusion h2o benchmarks - -use datafusion::arrow::datatypes::{DataType, Field, Schema}; -use datafusion::config::ConfigOptions; -use datafusion::datasource::file_format::csv::CsvFormat; -use datafusion::datasource::listing::{ - ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, -}; -use datafusion::datasource::MemTable; -use datafusion::prelude::CsvReadOptions; -use datafusion::{arrow::util::pretty, error::Result, prelude::SessionContext}; -use datafusion_benchmarks::util::BenchmarkRun; -use datafusion_common::utils::get_available_parallelism; -use std::path::PathBuf; -use std::sync::Arc; -use structopt::StructOpt; -use tokio::time::Instant; - -#[derive(Debug, StructOpt)] -#[structopt(name = "datafusion-h2o", about = "DataFusion h2o benchmarks")] -enum Opt { - GroupBy(GroupBy), //TODO add Join queries -} - -#[derive(Debug, StructOpt)] -struct GroupBy { - /// Query number - #[structopt(short, long)] - query: usize, - /// Path to data file - #[structopt(parse(from_os_str), required = true, short = "p", long = "path")] - path: PathBuf, - /// Activate debug mode to see query results - #[structopt(short, long)] - debug: bool, - /// Load the data into a MemTable before executing the query - #[structopt(short = "m", long = "mem-table")] - mem_table: bool, - /// Path to machine readable output file - #[structopt(parse(from_os_str), short = "o", long = "output")] - output_path: Option, -} - -#[tokio::main] -async fn main() -> Result<()> { - let opt = Opt::from_args(); - println!("Running benchmarks with the following options: {opt:?}"); - match opt { - Opt::GroupBy(config) => group_by(&config).await, - } -} - -async fn group_by(opt: &GroupBy) -> Result<()> { - let mut rundata = BenchmarkRun::new(); - let path = opt.path.to_str().unwrap(); - let mut config = ConfigOptions::from_env()?; - config.execution.batch_size = 65535; - - let ctx = SessionContext::new_with_config(config.into()); - - let schema = Schema::new(vec![ - Field::new("id1", DataType::Utf8, false), - Field::new("id2", DataType::Utf8, false), - Field::new("id3", DataType::Utf8, false), - Field::new("id4", DataType::Int32, false), - Field::new("id5", DataType::Int32, false), - Field::new("id6", DataType::Int32, false), - Field::new("v1", DataType::Int32, false), - Field::new("v2", DataType::Int32, false), - Field::new("v3", DataType::Float64, false), - ]); - - if opt.mem_table { - let listing_config = ListingTableConfig::new(ListingTableUrl::parse(path)?) - .with_listing_options(ListingOptions::new(Arc::new(CsvFormat::default()))) - .with_schema(Arc::new(schema)); - let csv = ListingTable::try_new(listing_config)?; - let partition_size = get_available_parallelism(); - let memtable = - MemTable::load(Arc::new(csv), Some(partition_size), &ctx.state()).await?; - ctx.register_table("x", Arc::new(memtable))?; - } else { - ctx.register_csv("x", path, CsvReadOptions::default().schema(&schema)) - .await?; - } - rundata.start_new_case(&opt.query.to_string()); - let sql = match opt.query { - 1 => "select id1, sum(v1) as v1 from x group by id1", - 2 => "select id1, id2, sum(v1) as v1 from x group by id1, id2", - 3 => "select id3, sum(v1) as v1, mean(v3) as v3 from x group by id3", - 4 => "select id4, mean(v1) as v1, mean(v2) as v2, mean(v3) as v3 from x group by id4", - 5 => "select id6, sum(v1) as v1, sum(v2) as v2, sum(v3) as v3 from x group by id6", - 6 => "select id4, id5, median(v3) as median_v3, stddev(v3) as sd_v3 from x group by id4, id5", - 7 => "select id3, max(v1)-min(v2) as range_v1_v2 from x group by id3", - 8 => "select id6, largest2_v3 from (select id6, v3 as largest2_v3, row_number() over (partition by id6 order by v3 desc) as order_v3 from x where v3 is not null) sub_query where order_v3 <= 2", - 9 => "select id2, id4, pow(corr(v1, v2), 2) as r2 from x group by id2, id4", - 10 => "select id1, id2, id3, id4, id5, id6, sum(v3) as v3, count(*) as count from x group by id1, id2, id3, id4, id5, id6", - _ => unimplemented!(), - }; - - println!("Executing {sql}"); - let start = Instant::now(); - let df = ctx.sql(sql).await?; - let batches = df.collect().await?; - let elapsed = start.elapsed(); - let numrows = batches.iter().map(|b| b.num_rows()).sum::(); - if opt.debug { - pretty::print_batches(&batches)?; - } - rundata.write_iter(elapsed, numrows); - println!( - "h2o groupby query {} took {} ms", - opt.query, - elapsed.as_secs_f64() * 1000.0 - ); - rundata.maybe_write_json(opt.output_path.as_ref())?; - Ok(()) -} diff --git a/benchmarks/src/h2o.rs b/benchmarks/src/h2o.rs new file mode 100644 index 0000000000000..4bddf5ea66895 --- /dev/null +++ b/benchmarks/src/h2o.rs @@ -0,0 +1,181 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::path::{Path, PathBuf}; +use crate::util::{BenchmarkRun, CommonOpt}; +use datafusion::{error::Result, prelude::SessionContext}; +use datafusion_common::{exec_datafusion_err, instant::Instant, DataFusionError}; +use structopt::StructOpt; + + +/// Run the H2O benchmark +#[derive(Debug, StructOpt, Clone)] +#[structopt(verbatim_doc_comment)] +pub struct RunOpt { + /// Query number (between 1 and 10). If not specified, runs all queries + #[structopt(short, long)] + query: Option, + + /// Common options + #[structopt(flatten)] + common: CommonOpt, + + /// Path to queries.sql (single file) + #[structopt( + parse(from_os_str), + short = "r", + long = "queries-path", + default_value = "benchmarks/queries/h2o/groupby.sql" + )] + queries_path: PathBuf, + + /// Path to group by parquet data + #[structopt( + parse(from_os_str), + short = "p", + long = "path", + default_value = "benchmarks/data/G1_1e7_1e7_100_0.parquet" + )] + path: PathBuf, + + /// If present, write results json here + #[structopt(parse(from_os_str), short = "o", long = "output")] + output_path: Option, +} + +impl RunOpt { + pub async fn run(self) -> Result<()> { + println!("Running benchmarks with the following options: {self:?}"); + let queries = AllQueries::try_new(&self.queries_path)?; + let query_range = match self.query { + Some(query_id) => query_id..=query_id, + None => queries.min_query_id()..=queries.max_query_id(), + }; + + let config = self.common.config(); + let ctx = SessionContext::new_with_config(config); + + // Register data + self.register_data(&ctx).await?; + + let iterations = self.common.iterations; + let mut benchmark_run = BenchmarkRun::new(); + for query_id in query_range { + benchmark_run.start_new_case(&format!("Query {query_id}")); + let sql = queries.get_query(query_id)?; + println!("Q{query_id}: {sql}"); + + for i in 0..iterations { + let start = Instant::now(); + let results = ctx.sql(sql).await?.collect().await?; + let elapsed = start.elapsed(); + let ms = elapsed.as_secs_f64() * 1000.0; + let row_count: usize = results.iter().map(|b| b.num_rows()).sum(); + println!( + "Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows" + ); + benchmark_run.write_iter(elapsed, row_count); + } + if self.common.debug { + ctx.sql(sql).await?.explain(false, false)?.show().await?; + } + benchmark_run.maybe_write_json(self.output_path.as_ref())?; + } + + Ok(()) + } + + async fn register_data(&self, ctx: &SessionContext) -> Result<()> { + let csv_options = Default::default(); + let parquet_options = Default::default(); + let path = self.path.as_os_str().to_str().unwrap(); + + if self.path.extension().map(|s| s == "csv").unwrap_or(false) { + ctx.register_csv("x", path, csv_options).await.map_err(|e| { + DataFusionError::Context( + format!("Registering 'table' as {path}"), + Box::new(e), + ) + }).expect("error registering csv"); + } + + if self.path.extension().map(|s| s == "parquet").unwrap_or(false) { + ctx.register_parquet("x", path, parquet_options).await.map_err(|e| { + DataFusionError::Context( + format!("Registering 'table' as {path}"), + Box::new(e), + ) + }).expect("error registering parquet"); + } + Ok(()) + } +} + +struct AllQueries { + queries: Vec, +} + +impl AllQueries { + fn try_new(path: &Path) -> Result { + let all_queries = std::fs::read_to_string(path) + .map_err(|e| exec_datafusion_err!("Could not open {path:?}: {e}"))?; + + Ok(Self { + queries: all_queries.lines().map(|s| s.to_string()).collect(), + }) + } + + fn get_query(&self, query_id: usize) -> Result<&str> { + self.queries + .get(query_id + 1) + .ok_or_else(|| { + let min_id = self.min_query_id(); + let max_id = self.max_query_id(); + exec_datafusion_err!( + "Invalid query id {query_id}. Must be between {min_id} and {max_id}" + ) + }) + .map(|s| s.as_str()) + } + + fn min_query_id(&self) -> usize { + 1 + } + + fn max_query_id(&self) -> usize { + self.queries.len() + } +} + +#[derive(Debug, StructOpt)] +struct GroupBy { + /// Query number + #[structopt(short, long)] + query: usize, + /// Path to data file + #[structopt(parse(from_os_str), required = true, short = "p", long = "path")] + path: PathBuf, + /// Activate debug mode to see query results + #[structopt(short, long)] + debug: bool, + /// Load the data into a MemTable before executing the query + #[structopt(short = "m", long = "mem-table")] + mem_table: bool, + /// Path to machine readable output file + #[structopt(parse(from_os_str), short = "o", long = "output")] + output_path: Option, +} \ No newline at end of file diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs index 2d37d78764d78..b59e7f4404db6 100644 --- a/benchmarks/src/lib.rs +++ b/benchmarks/src/lib.rs @@ -23,3 +23,4 @@ pub mod sort; pub mod sort_tpch; pub mod tpch; pub mod util; +pub mod h2o; From 1ca57cdfbe3bab85eac61694ba2c0af05d74acc8 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Fri, 3 Jan 2025 17:21:21 +0800 Subject: [PATCH 02/13] Fix query and fmt --- benchmarks/src/bin/dfbench.rs | 4 ++- benchmarks/src/h2o.rs | 49 +++++++++++++++++++++-------------- benchmarks/src/lib.rs | 2 +- 3 files changed, 34 insertions(+), 21 deletions(-) diff --git a/benchmarks/src/bin/dfbench.rs b/benchmarks/src/bin/dfbench.rs index 14e0636a18c06..db6c29f4a46a6 100644 --- a/benchmarks/src/bin/dfbench.rs +++ b/benchmarks/src/bin/dfbench.rs @@ -33,7 +33,9 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; #[global_allocator] static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; -use datafusion_benchmarks::{clickbench, imdb, parquet_filter, sort, sort_tpch, tpch, h2o}; +use datafusion_benchmarks::{ + clickbench, h2o, imdb, parquet_filter, sort, sort_tpch, tpch, +}; #[derive(Debug, StructOpt)] #[structopt(about = "benchmark command")] diff --git a/benchmarks/src/h2o.rs b/benchmarks/src/h2o.rs index 4bddf5ea66895..dd1f599c5b22d 100644 --- a/benchmarks/src/h2o.rs +++ b/benchmarks/src/h2o.rs @@ -15,13 +15,12 @@ // specific language governing permissions and limitations // under the License. -use std::path::{Path, PathBuf}; use crate::util::{BenchmarkRun, CommonOpt}; use datafusion::{error::Result, prelude::SessionContext}; use datafusion_common::{exec_datafusion_err, instant::Instant, DataFusionError}; +use std::path::{Path, PathBuf}; use structopt::StructOpt; - /// Run the H2O benchmark #[derive(Debug, StructOpt, Clone)] #[structopt(verbatim_doc_comment)] @@ -105,21 +104,32 @@ impl RunOpt { let path = self.path.as_os_str().to_str().unwrap(); if self.path.extension().map(|s| s == "csv").unwrap_or(false) { - ctx.register_csv("x", path, csv_options).await.map_err(|e| { - DataFusionError::Context( - format!("Registering 'table' as {path}"), - Box::new(e), - ) - }).expect("error registering csv"); + ctx.register_csv("x", path, csv_options) + .await + .map_err(|e| { + DataFusionError::Context( + format!("Registering 'table' as {path}"), + Box::new(e), + ) + }) + .expect("error registering csv"); } - if self.path.extension().map(|s| s == "parquet").unwrap_or(false) { - ctx.register_parquet("x", path, parquet_options).await.map_err(|e| { - DataFusionError::Context( - format!("Registering 'table' as {path}"), - Box::new(e), - ) - }).expect("error registering parquet"); + if self + .path + .extension() + .map(|s| s == "parquet") + .unwrap_or(false) + { + ctx.register_parquet("x", path, parquet_options) + .await + .map_err(|e| { + DataFusionError::Context( + format!("Registering 'table' as {path}"), + Box::new(e), + ) + }) + .expect("error registering parquet"); } Ok(()) } @@ -139,9 +149,10 @@ impl AllQueries { }) } + /// Returns the text of query `query_id` fn get_query(&self, query_id: usize) -> Result<&str> { self.queries - .get(query_id + 1) + .get(query_id) .ok_or_else(|| { let min_id = self.min_query_id(); let max_id = self.max_query_id(); @@ -153,11 +164,11 @@ impl AllQueries { } fn min_query_id(&self) -> usize { - 1 + 0 } fn max_query_id(&self) -> usize { - self.queries.len() + self.queries.len() - 1 } } @@ -178,4 +189,4 @@ struct GroupBy { /// Path to machine readable output file #[structopt(parse(from_os_str), short = "o", long = "output")] output_path: Option, -} \ No newline at end of file +} diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs index b59e7f4404db6..858a5b9df7f86 100644 --- a/benchmarks/src/lib.rs +++ b/benchmarks/src/lib.rs @@ -17,10 +17,10 @@ //! DataFusion benchmark runner pub mod clickbench; +pub mod h2o; pub mod imdb; pub mod parquet_filter; pub mod sort; pub mod sort_tpch; pub mod tpch; pub mod util; -pub mod h2o; From 785128132b936df35909bb419c397a3b8327c7aa Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Fri, 3 Jan 2025 17:48:20 +0800 Subject: [PATCH 03/13] Change venv --- benchmarks/bench.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index f4c0e8dc6147e..881d540638f94 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -591,7 +591,7 @@ data_h2o() { echo "Installing falsa..." # Set virtual environment directory - VIRTUAL_ENV="${PWD}/.venv" + VIRTUAL_ENV="${PWD}/venv" # Check if the virtual environment already exists if [ ! -d "$VIRTUAL_ENV" ]; then From 713586ca3c61cdf9522f9dee0a34d18402d68a28 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Fri, 3 Jan 2025 17:50:45 +0800 Subject: [PATCH 04/13] Make sure venv version support falsa --- benchmarks/bench.sh | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 881d540638f94..65d396fadb12b 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -593,11 +593,8 @@ data_h2o() { # Set virtual environment directory VIRTUAL_ENV="${PWD}/venv" - # Check if the virtual environment already exists - if [ ! -d "$VIRTUAL_ENV" ]; then - # Create a virtual environment using Python 3.10 - $REQUIRED_PYTHON -m venv "$VIRTUAL_ENV" - fi + # Create a virtual environment using Python 3.10 + $REQUIRED_PYTHON -m venv "$VIRTUAL_ENV" # Activate the virtual environment and install dependencies source "$VIRTUAL_ENV/bin/activate" From 319641c7b067198d65f652b4c25e7c996145431f Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Fri, 3 Jan 2025 22:23:28 +0800 Subject: [PATCH 05/13] Fix default path --- benchmarks/src/h2o.rs | 25 +++---------------------- 1 file changed, 3 insertions(+), 22 deletions(-) diff --git a/benchmarks/src/h2o.rs b/benchmarks/src/h2o.rs index dd1f599c5b22d..e93aac99b316a 100644 --- a/benchmarks/src/h2o.rs +++ b/benchmarks/src/h2o.rs @@ -25,7 +25,6 @@ use structopt::StructOpt; #[derive(Debug, StructOpt, Clone)] #[structopt(verbatim_doc_comment)] pub struct RunOpt { - /// Query number (between 1 and 10). If not specified, runs all queries #[structopt(short, long)] query: Option, @@ -34,6 +33,7 @@ pub struct RunOpt { common: CommonOpt, /// Path to queries.sql (single file) + /// default value is the groupby.sql file in the h2o benchmark #[structopt( parse(from_os_str), short = "r", @@ -42,12 +42,12 @@ pub struct RunOpt { )] queries_path: PathBuf, - /// Path to group by parquet data + /// Path to data file (parquet or csv) #[structopt( parse(from_os_str), short = "p", long = "path", - default_value = "benchmarks/data/G1_1e7_1e7_100_0.parquet" + default_value = "benchmarks/data/h2o/G1_1e7_1e7_100_0.parquet" )] path: PathBuf, @@ -171,22 +171,3 @@ impl AllQueries { self.queries.len() - 1 } } - -#[derive(Debug, StructOpt)] -struct GroupBy { - /// Query number - #[structopt(short, long)] - query: usize, - /// Path to data file - #[structopt(parse(from_os_str), required = true, short = "p", long = "path")] - path: PathBuf, - /// Activate debug mode to see query results - #[structopt(short, long)] - debug: bool, - /// Load the data into a MemTable before executing the query - #[structopt(short = "m", long = "mem-table")] - mem_table: bool, - /// Path to machine readable output file - #[structopt(parse(from_os_str), short = "o", long = "output")] - output_path: Option, -} From 34fd4d22808ee3c43e1d056ee926e62f91d7021b Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Sun, 5 Jan 2025 11:26:19 +0800 Subject: [PATCH 06/13] Support groupby only now --- benchmarks/bench.sh | 114 +++++++++++++++++++++++++----------------- benchmarks/src/h2o.rs | 8 +-- 2 files changed, 73 insertions(+), 49 deletions(-) diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 65d396fadb12b..e0f71c5535b36 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -80,9 +80,9 @@ clickbench_1: ClickBench queries against a single parquet file clickbench_partitioned: ClickBench queries against a partitioned (100 files) parquet clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific) external_aggr: External aggregation benchmark -h2o_small: h2oai with small dataset (1e7 rows), file format is parquet -h2o_medium: h2oai with medium dataset (1e8 rows), file format is parquet -h2o_large: h2oai with large dataset (1e9 rows), file format is parquet +h2o_small: h2oai benchmark with small dataset (1e7 rows), default file format is parquet +h2o_medium: h2oai benchmark with medium dataset (1e8 rows), default file format is parquet +h2o_big: h2oai benchmark with large dataset (1e9 rows), default file format is parquet ********** * Supported Configuration (Environment Variables) @@ -184,11 +184,8 @@ main() { h2o_medium) data_h2o "MEDIUM" ;; - h2o_large) - data_h2o "LARGE" - ;; - h2o_small_csv) - data_h2o "SMALL" "CSV" + h2o_big) + data_h2o "BIG" ;; external_aggr) # same data as for tpch @@ -241,7 +238,7 @@ main() { run_clickbench_extended run_h2o "SMALL" "PARQUET" "groupby" run_h2o "MEDIUM" "PARQUET" "groupby" - run_h2o "LARGE" "PARQUET" "groupby" + run_h2o "BIG" "PARQUET" "groupby" run_imdb run_external_aggr ;; @@ -281,11 +278,8 @@ main() { h2o_medium) run_h2o "MEDIUM" "PARQUET" "groupby" ;; - h2o_large) - run_h2o "LARGE" "PARQUET" "groupby" - ;; - h2o_small_csv) - run_h2o "SMALL" "CSV" "groupby" + h2o_big) + run_h2o "BIG" "PARQUET" "groupby" ;; external_aggr) run_external_aggr @@ -575,45 +569,75 @@ run_imdb() { } data_h2o() { - # Default values for size and data format - SIZE=${1:-"SMALL"} - DATA_FORMAT=${2:-"PARQUET"} + # Default values for size and data format + SIZE=${1:-"SMALL"} + DATA_FORMAT=${2:-"PARQUET"} - # Ensure the Python version is 3.10 or higher - REQUIRED_PYTHON="python3.10" - if ! command -v $REQUIRED_PYTHON &> /dev/null - then - echo "$REQUIRED_PYTHON could not be found. Please install Python 3.10 or higher." - return 1 - fi + # Function to compare Python versions + version_ge() { + [ "$(printf '%s\n' "$1" "$2" | sort -V | head -n1)" = "$2" ] + } - # Install falsa and other dependencies - echo "Installing falsa..." + # Find the highest available Python version (3.10 or higher) + REQUIRED_VERSION="3.10" + PYTHON_CMD=$(command -v python3 || true) - # Set virtual environment directory - VIRTUAL_ENV="${PWD}/venv" + if [ -n "$PYTHON_CMD" ]; then + PYTHON_VERSION=$($PYTHON_CMD -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')") + if version_ge "$PYTHON_VERSION" "$REQUIRED_VERSION"; then + echo "Found Python version $PYTHON_VERSION, which is suitable." + else + echo "Python version $PYTHON_VERSION found, but version $REQUIRED_VERSION or higher is required." + PYTHON_CMD="" + fi + fi - # Create a virtual environment using Python 3.10 - $REQUIRED_PYTHON -m venv "$VIRTUAL_ENV" + # Fall back to checking specific Python versions if no suitable one found + if [ -z "$PYTHON_CMD" ]; then + for CMD in python3.10 python3.11 python3.12; do + if command -v "$CMD" &> /dev/null; then + PYTHON_CMD="$CMD" + break + fi + done + fi - # Activate the virtual environment and install dependencies - source "$VIRTUAL_ENV/bin/activate" + # If no suitable Python version found, exit with an error + if [ -z "$PYTHON_CMD" ]; then + echo "Python 3.10 or higher is required. Please install it." + return 1 + fi + + echo "Using Python command: $PYTHON_CMD" + + # Install falsa and other dependencies + echo "Installing falsa..." - # Ensure 'falsa' is installed (avoid unnecessary reinstall) - pip install --quiet --upgrade falsa + # Set virtual environment directory + VIRTUAL_ENV="${PWD}/venv" - # Create directory if it doesn't exist - H2O_DIR="${DATA_DIR}/h2o" - mkdir -p "${H2O_DIR}" + # Create a virtual environment using the detected Python command + $PYTHON_CMD -m venv "$VIRTUAL_ENV" - # Generate h2o test data - echo "Generating h2o test data in ${H2O_DIR} with size=${SIZE} and format=${DATA_FORMAT}" - falsa groupby --path-prefix="${H2O_DIR}" --size "${SIZE}" --data-format "${DATA_FORMAT}" + # Activate the virtual environment and install dependencies + source "$VIRTUAL_ENV/bin/activate" - # Deactivate virtual environment after completion - deactivate - } + # Ensure 'falsa' is installed (avoid unnecessary reinstall) + pip install --quiet --upgrade falsa + + # Create directory if it doesn't exist + H2O_DIR="${DATA_DIR}/h2o" + mkdir -p "${H2O_DIR}" + + # Generate h2o test data + echo "Generating h2o test data in ${H2O_DIR} with size=${SIZE} and format=${DATA_FORMAT}" + falsa groupby --path-prefix="${H2O_DIR}" --size "${SIZE}" --data-format "${DATA_FORMAT}" + + # Deactivate virtual environment after completion + deactivate +} +## todo now only support groupby, after https://github.com/mrpowers-io/falsa/issues/21 done, we can add support for join run_h2o() { # Default values for size and data format SIZE=${1:-"SMALL"} @@ -650,10 +674,10 @@ run_h2o() { # Run the benchmark using the dynamically constructed file path and query file $CARGO_COMMAND --bin dfbench -- h2o \ - --iterations 5 \ + --iterations 3 \ --path "${H2O_DIR}/${FILE_NAME}" \ --queries-path "${QUERY_FILE}" \ - -o "${RESULTS_FILE}" + -o "${RESULTS_FILE}"\ } # Runs the external aggregation benchmark diff --git a/benchmarks/src/h2o.rs b/benchmarks/src/h2o.rs index e93aac99b316a..1d41e05a8ccfc 100644 --- a/benchmarks/src/h2o.rs +++ b/benchmarks/src/h2o.rs @@ -78,7 +78,7 @@ impl RunOpt { let sql = queries.get_query(query_id)?; println!("Q{query_id}: {sql}"); - for i in 0..iterations { + for i in 1..=iterations { let start = Instant::now(); let results = ctx.sql(sql).await?.collect().await?; let elapsed = start.elapsed(); @@ -152,7 +152,7 @@ impl AllQueries { /// Returns the text of query `query_id` fn get_query(&self, query_id: usize) -> Result<&str> { self.queries - .get(query_id) + .get(query_id - 1) .ok_or_else(|| { let min_id = self.min_query_id(); let max_id = self.max_query_id(); @@ -164,10 +164,10 @@ impl AllQueries { } fn min_query_id(&self) -> usize { - 0 + 1 } fn max_query_id(&self) -> usize { - self.queries.len() - 1 + self.queries.len() } } From a3be15ea2a55dcaa4134690cba5fa29935795176 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Sun, 5 Jan 2025 11:42:53 +0800 Subject: [PATCH 07/13] fix --- benchmarks/bench.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index e0f71c5535b36..5f0f54124bfe3 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -677,7 +677,7 @@ run_h2o() { --iterations 3 \ --path "${H2O_DIR}/${FILE_NAME}" \ --queries-path "${QUERY_FILE}" \ - -o "${RESULTS_FILE}"\ + -o "${RESULTS_FILE}" } # Runs the external aggregation benchmark From 548893cbfc6ad9628a214ec10abc0e544abdd7b2 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Mon, 6 Jan 2025 11:15:39 +0800 Subject: [PATCH 08/13] Address comments --- benchmarks/bench.sh | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 5f0f54124bfe3..e5cdd26ce0d8a 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -83,6 +83,9 @@ external_aggr: External aggregation benchmark h2o_small: h2oai benchmark with small dataset (1e7 rows), default file format is parquet h2o_medium: h2oai benchmark with medium dataset (1e8 rows), default file format is parquet h2o_big: h2oai benchmark with large dataset (1e9 rows), default file format is parquet +h2o_small_csv h2oai benchmark with small dataset (1e7 rows), file format is csv +h2o_medium_csv h2oai benchmark with medium dataset (1e8 rows), file format is csv +h2o_big_csv h2oai benchmark with large dataset (1e9 rows), file format is csv ********** * Supported Configuration (Environment Variables) @@ -187,6 +190,15 @@ main() { h2o_big) data_h2o "BIG" ;; + h2o_small_csv) + data_h2o "SMALL" "CSV" + ;; + h2o_medium_csv) + data_h2o "MEDIUM" "CSV" + ;; + h2o_big_csv) + data_h2o "BIG" "CSV" + ;; external_aggr) # same data as for tpch data_tpch "1" @@ -281,6 +293,15 @@ main() { h2o_big) run_h2o "BIG" "PARQUET" "groupby" ;; + h2o_small_csv) + run_h2o "SMALL" "CSV" "groupby" + ;; + h2o_medium_csv) + run_h2o "MEDIUM" "CSV" "groupby" + ;; + h2o_big_csv) + run_h2o "BIG" "CSV" "groupby" + ;; external_aggr) run_external_aggr ;; From 5aab4a69022f8c770e42ef481c7c37c162de833f Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Mon, 6 Jan 2025 11:19:58 +0800 Subject: [PATCH 09/13] fix --- benchmarks/bench.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index e5cdd26ce0d8a..0c4e6235705ca 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -150,7 +150,7 @@ main() { data_tpch "10" data_h2o "SMALL" data_h2o "MEDIUM" - data_h2o "LARGE" + data_h2o "BIG" data_clickbench_1 data_clickbench_partitioned data_imdb From 9e737b8858a97a27ec5e031e4c2bafab1bd3ca4d Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Mon, 6 Jan 2025 11:32:16 +0800 Subject: [PATCH 10/13] support python version higher --- benchmarks/bench.sh | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 0c4e6235705ca..eb6caa91d4092 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -613,15 +613,20 @@ data_h2o() { fi fi - # Fall back to checking specific Python versions if no suitable one found - if [ -z "$PYTHON_CMD" ]; then - for CMD in python3.10 python3.11 python3.12; do - if command -v "$CMD" &> /dev/null; then - PYTHON_CMD="$CMD" - break - fi - done - fi + # Search for suitable Python versions if the default is unsuitable + if [ -z "$PYTHON_CMD" ]; then + # Loop through all available Python3 commands on the system + for CMD in $(compgen -c | grep -E '^python3(\.[0-9]+)?$'); do + if command -v "$CMD" &> /dev/null; then + PYTHON_VERSION=$($CMD -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')") + if version_ge "$PYTHON_VERSION" "$REQUIRED_VERSION"; then + PYTHON_CMD="$CMD" + echo "Found suitable Python version: $PYTHON_VERSION ($CMD)" + break + fi + fi + done + fi # If no suitable Python version found, exit with an error if [ -z "$PYTHON_CMD" ]; then From c5fbed69d318fb5c9d1e4330a10fbd9e581c891d Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Mon, 6 Jan 2025 11:41:22 +0800 Subject: [PATCH 11/13] support higer python such as python 3.13 --- benchmarks/bench.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index eb6caa91d4092..cc23f7b3de244 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -599,6 +599,8 @@ data_h2o() { [ "$(printf '%s\n' "$1" "$2" | sort -V | head -n1)" = "$2" ] } + export PYO3_USE_ABI3_FORWARD_COMPATIBILITY=1 + # Find the highest available Python version (3.10 or higher) REQUIRED_VERSION="3.10" PYTHON_CMD=$(command -v python3 || true) From 0b92321ce1246d2e55db1435de6b924d77835ec7 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Thu, 9 Jan 2025 12:21:51 +0800 Subject: [PATCH 12/13] Addressed new comments --- benchmarks/README.md | 48 +++++++++++++++++++++++++++---------------- benchmarks/bench.sh | 39 ++++++++--------------------------- benchmarks/src/h2o.rs | 4 +++- 3 files changed, 42 insertions(+), 49 deletions(-) diff --git a/benchmarks/README.md b/benchmarks/README.md index cccd7f44f5047..dc1b2d0ed410b 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -32,7 +32,7 @@ DataFusion is included in the benchmark setups for several popular benchmarks that compare performance with other engines. For example: * [ClickBench] scripts are in the [ClickBench repo](https://github.com/ClickHouse/ClickBench/tree/main/datafusion) -* [H2o.ai `db-benchmark`] scripts are in [db-benchmark](db-benchmark) directory +* [H2o.ai `db-benchmark`] scripts are in [db-benchmark](https://github.com/apache/datafusion/tree/main/benchmarks/src/h2o.rs) [ClickBench]: https://github.com/ClickHouse/ClickBench/tree/main [H2o.ai `db-benchmark`]: https://github.com/h2oai/db-benchmark @@ -405,31 +405,43 @@ cargo run --release --bin external_aggr -- benchmark -n 4 --iterations 3 -p '... ``` -# Older Benchmarks +## h2o benchmarks for groupby -## h2o benchmarks +### Generate data for h2o benchmarks +There are three options for generating data for h2o benchmarks: `small`, `medium`, and `big`. The data is generated in the `data` directory. +1. Generate small data (1e7 rows) ```bash -cargo run --release --bin h2o group-by --query 1 --path /mnt/bigdata/h2oai/N_1e7_K_1e2_single.csv --mem-table --debug +./bench.sh data h2o_small ``` -Example run: +2. Generate medium data (1e8 rows) +```bash +./bench.sh data h2o_medium ``` -Running benchmarks with the following options: GroupBy(GroupBy { query: 1, path: "/mnt/bigdata/h2oai/N_1e7_K_1e2_single.csv", debug: false }) -Executing select id1, sum(v1) as v1 from x group by id1 -+-------+--------+ -| id1 | v1 | -+-------+--------+ -| id063 | 199420 | -| id094 | 200127 | -| id044 | 198886 | -... -| id093 | 200132 | -| id003 | 199047 | -+-------+--------+ -h2o groupby query 1 took 1669 ms + +3. Generate large data (1e9 rows) +```bash +./bench.sh data h2o_big +``` + +### Run h2o benchmarks +There are three options for running h2o benchmarks: `small`, `medium`, and `big`. +1. Run small data benchmark +```bash +./bench.sh run h2o_small +``` + +2. Run medium data benchmark +```bash +./bench.sh run h2o_medium +``` + +3. Run large data benchmark +```bash +./bench.sh run h2o_big ``` [1]: http://www.tpc.org/tpch/ diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index cc23f7b3de244..20cb32722c879 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -80,12 +80,9 @@ clickbench_1: ClickBench queries against a single parquet file clickbench_partitioned: ClickBench queries against a partitioned (100 files) parquet clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific) external_aggr: External aggregation benchmark -h2o_small: h2oai benchmark with small dataset (1e7 rows), default file format is parquet -h2o_medium: h2oai benchmark with medium dataset (1e8 rows), default file format is parquet -h2o_big: h2oai benchmark with large dataset (1e9 rows), default file format is parquet -h2o_small_csv h2oai benchmark with small dataset (1e7 rows), file format is csv -h2o_medium_csv h2oai benchmark with medium dataset (1e8 rows), file format is csv -h2o_big_csv h2oai benchmark with large dataset (1e9 rows), file format is csv +h2o_small: h2oai benchmark with small dataset (1e7 rows), default file format is csv +h2o_medium: h2oai benchmark with medium dataset (1e8 rows), default file format is csv +h2o_big: h2oai benchmark with large dataset (1e9 rows), default file format is csv ********** * Supported Configuration (Environment Variables) @@ -182,21 +179,12 @@ main() { data_imdb ;; h2o_small) - data_h2o "SMALL" - ;; - h2o_medium) - data_h2o "MEDIUM" - ;; - h2o_big) - data_h2o "BIG" - ;; - h2o_small_csv) data_h2o "SMALL" "CSV" ;; - h2o_medium_csv) + h2o_medium) data_h2o "MEDIUM" "CSV" ;; - h2o_big_csv) + h2o_big) data_h2o "BIG" "CSV" ;; external_aggr) @@ -285,21 +273,12 @@ main() { run_imdb ;; h2o_small) - run_h2o "SMALL" "PARQUET" "groupby" - ;; - h2o_medium) - run_h2o "MEDIUM" "PARQUET" "groupby" - ;; - h2o_big) - run_h2o "BIG" "PARQUET" "groupby" - ;; - h2o_small_csv) run_h2o "SMALL" "CSV" "groupby" ;; - h2o_medium_csv) + h2o_medium) run_h2o "MEDIUM" "CSV" "groupby" ;; - h2o_big_csv) + h2o_big) run_h2o "BIG" "CSV" "groupby" ;; external_aggr) @@ -592,7 +571,7 @@ run_imdb() { data_h2o() { # Default values for size and data format SIZE=${1:-"SMALL"} - DATA_FORMAT=${2:-"PARQUET"} + DATA_FORMAT=${2:-"CSV"} # Function to compare Python versions version_ge() { @@ -669,7 +648,7 @@ data_h2o() { run_h2o() { # Default values for size and data format SIZE=${1:-"SMALL"} - DATA_FORMAT=${2:-"PARQUET"} + DATA_FORMAT=${2:-"CSV"} DATA_FORMAT=$(echo "$DATA_FORMAT" | tr '[:upper:]' '[:lower:]') RUN_Type=${3:-"groupby"} diff --git a/benchmarks/src/h2o.rs b/benchmarks/src/h2o.rs index 1d41e05a8ccfc..53a516ceb56d4 100644 --- a/benchmarks/src/h2o.rs +++ b/benchmarks/src/h2o.rs @@ -43,11 +43,13 @@ pub struct RunOpt { queries_path: PathBuf, /// Path to data file (parquet or csv) + /// Default value is the G1_1e7_1e7_100_0.csv file in the h2o benchmark + /// This is the small csv file with 10^7 rows #[structopt( parse(from_os_str), short = "p", long = "path", - default_value = "benchmarks/data/h2o/G1_1e7_1e7_100_0.parquet" + default_value = "benchmarks/data/h2o/G1_1e7_1e7_100_0.csv" )] path: PathBuf, From c0a972f758bfcfa58b88c09486a0992523eca996 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Thu, 9 Jan 2025 13:37:54 +0800 Subject: [PATCH 13/13] Add specific query example --- benchmarks/README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/benchmarks/README.md b/benchmarks/README.md index dc1b2d0ed410b..332cac8459d75 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -444,5 +444,12 @@ There are three options for running h2o benchmarks: `small`, `medium`, and `big` ./bench.sh run h2o_big ``` +4. Run a specific query with a specific data path + +For example, to run query 1 with the small data generated above: +```bash +cargo run --release --bin dfbench -- h2o --path ./benchmarks/data/h2o/G1_1e7_1e7_100_0.csv --query 1 +``` + [1]: http://www.tpc.org/tpch/ [2]: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page