From fb60711f32452784304ae2f970e60d1c0b13f940 Mon Sep 17 00:00:00 2001 From: NGA-TRAN Date: Mon, 13 May 2024 14:31:15 -0400 Subject: [PATCH 1/4] feat: porting ProgressiveEval from InfluxDB IOx --- datafusion/common/src/config.rs | 10 + datafusion/physical-plan/src/sorts/mod.rs | 1 + .../src/sorts/progressive_eval.rs | 1676 +++++++++++++++++ 3 files changed, 1687 insertions(+) create mode 100644 datafusion/physical-plan/src/sorts/progressive_eval.rs diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 0f1d9b8f02644..62ff3a5ad318b 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -583,6 +583,16 @@ config_namespace! { /// When set to true, the optimizer will not attempt to convert Union to Interleave pub prefer_existing_union: bool, default = false + + + /// Number of input streams to prefetch for ProgressiveEvalExec + /// Since ProgressiveEvalExec only polls one stream at a time in their stream order, + /// we do not need to prefetch all streams at once to save resources. However, if the + /// streams' IO time is way more than their CPU/processing time, prefetching them will help + /// improve the performance. + /// Default is 2 which means we will prefetch one extra stream before polling the current one. + /// Increase this value if IO time to read a stream is often much more than CPU time to process its previous one. 
+ pub progressive_eval_num_prefetch_input_streams: usize, default = 2 } } diff --git a/datafusion/physical-plan/src/sorts/mod.rs b/datafusion/physical-plan/src/sorts/mod.rs index 7c084761fdc30..d8312f05f813d 100644 --- a/datafusion/physical-plan/src/sorts/mod.rs +++ b/datafusion/physical-plan/src/sorts/mod.rs @@ -22,6 +22,7 @@ mod cursor; mod index; mod merge; pub mod partial_sort; +pub mod progressive_eval; pub mod sort; pub mod sort_preserving_merge; mod stream; diff --git a/datafusion/physical-plan/src/sorts/progressive_eval.rs b/datafusion/physical-plan/src/sorts/progressive_eval.rs new file mode 100644 index 0000000000000..71c527fc54713 --- /dev/null +++ b/datafusion/physical-plan/src/sorts/progressive_eval.rs @@ -0,0 +1,1676 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Defines the progressive eval plan + +use std::any::Any; +use std::borrow::Cow::Borrowed; +use std::sync::Arc; + +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue, Statistics}; +use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; +use datafusion_physical_expr::{Distribution, Partitioning, PhysicalSortRequirement}; +use futures::{ready, Stream, StreamExt}; +use log::{debug, trace, warn}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::common::spawn_buffered; +use crate::metrics::{ + BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, + MetricsSet, +}; +use crate::{ + DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Metric, + PlanProperties, +}; + +/// ProgressiveEval return a stream of record batches in the order of its inputs. +/// It will stop when the number of output rows reach the given limit. +/// +/// This takes an input execution plan and a number n, and provided each partition of +/// the input plan is in an expected order, this operator will return top record batches that covers the top n rows +/// in the order of the input plan. +/// +/// ```text +/// ┌─────────────────────────┐ +/// │ ┌───┬───┬───┬───┐ │ +/// │ │ A │ B │ C │ D │ │──┐ +/// │ └───┴───┴───┴───┘ │ │ +/// └─────────────────────────┘ │ ┌───────────────────┐ ┌───────────────────────────────┐ +/// Stream 1 │ │ │ │ ┌───┬───╦═══╦───┬───╦═══╗ │ +/// ├─▶│ ProgressiveEval │───▶│ │ A │ B ║ C ║ D │ M ║ N ║ ... 
│ +/// │ │ │ │ └───┴─▲─╩═══╩───┴───╩═══╝ │ +/// ┌─────────────────────────┐ │ └───────────────────┘ └─┬─────┴───────────────────────┘ +/// │ ╔═══╦═══╗ │ │ +/// │ ║ M ║ N ║ │──┘ │ +/// │ ╚═══╩═══╝ │ Output only include top record batches that cover top N rows +/// └─────────────────────────┘ +/// Stream 2 +/// +/// +/// Input Streams Output stream +/// (in some order) (in same order) +/// ``` +#[derive(Debug, Clone)] +pub(crate) struct ProgressiveEvalExec { + /// Input plan + input: Arc, + + /// Corresponding value ranges of the input plan + /// None if the value ranges are not available + value_ranges: Option>, + + /// Execution metrics + metrics: ExecutionPlanMetricsSet, + + /// Optional number of rows to fetch. Stops producing rows after this fetch + fetch: Option, + + /// Cache holding plan properties like equivalences, output partitioning, output ordering etc. + cache: PlanProperties, +} + +impl ProgressiveEvalExec { + /// Create a new progressive execution plan + pub fn new( + input: Arc, + value_ranges: Option>, + fetch: Option, + ) -> Self { + let cache = Self::compute_properties(&input); + Self { + input, + value_ranges, + metrics: ExecutionPlanMetricsSet::new(), + fetch, + cache, + } + } + + /// Input schema + pub fn input(&self) -> &Arc { + &self.input + } + + /// This function creates the cache object that stores the plan properties such as equivalence properties, partitioning, ordering, etc. 
+ fn compute_properties(input: &Arc) -> PlanProperties { + // progressive eval does not change the equivalence properties of its input + let eq_properties = input.equivalence_properties().clone(); + + // This node serializes all the data to a single partition + let output_partitioning = Partitioning::UnknownPartitioning(1); + + PlanProperties::new(eq_properties, output_partitioning, input.execution_mode()) + } +} + +impl DisplayAs for ProgressiveEvalExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter<'_>, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "ProgressiveEvalExec: ")?; + if let Some(fetch) = self.fetch { + write!(f, "fetch={fetch}, ")?; + }; + if let Some(value_ranges) = &self.value_ranges { + write!(f, "input_ranges={value_ranges:?}")?; + }; + + Ok(()) + } + } + } +} + +impl ExecutionPlan for ProgressiveEvalExec { + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.input.schema() + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + + fn required_input_distribution(&self) -> Vec { + vec![Distribution::UnspecifiedDistribution] + } + + fn benefits_from_input_partitioning(&self) -> Vec { + vec![false] + } + + fn required_input_ordering(&self) -> Vec>> { + let input_ordering = self + .input() + .properties() + .output_ordering() + .map(PhysicalSortRequirement::from_sort_exprs); + + vec![input_ordering] + } + + /// ProgressiveEvalExec will only accept sorted input + /// and will maintain the input order + fn maintains_input_order(&self) -> Vec { + vec![true] + } + + fn children(&self) -> Vec> { + vec![Arc::::clone(&self.input)] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + Ok(Arc::new(Self::new( + Arc::::clone(&children[0]), + self.value_ranges.clone(), + self.fetch, + ))) + } + + fn execute( + &self, + partition: 
usize, + context: Arc, + ) -> Result { + trace!( + "Start ProgressiveEvalExec::execute for partition: {}", + partition + ); + if 0 != partition { + return internal_err!("ProgressiveEvalExec invalid partition {partition}"); + } + + let input_partitions = self + .input + .properties() + .output_partitioning() + .partition_count(); + trace!( + "Number of input partitions of ProgressiveEvalExec::execute: {}", + input_partitions + ); + let schema = self.schema(); + + // Add a metric to record the number of inputs + let num_inputs = Count::new(); + num_inputs.add( + self.input + .properties() + .output_partitioning() + .partition_count(), + ); + self.metrics.register(Arc::new(Metric::new( + MetricValue::Count { + name: Borrowed("num_inputs"), + count: num_inputs, + }, + None, + ))); + // Add a metric to record the number of inputs that are actually read which is <= num_inputs + let num_read_inputs_counter = + MetricBuilder::new(&self.metrics).global_counter("num_read_inputs"); + // Add other base line metrics + let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); + + let result = ProgressiveEvalStream::new( + Arc::clone(&self.input), + Arc::clone(&context), + schema, + baseline_metrics, + num_read_inputs_counter, + self.fetch, + )?; + + debug!("Got stream result from ProgressiveEvalStream::new_from_receivers"); + + Ok(Box::pin(result)) + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + + fn statistics(&self) -> Result { + self.input.statistics() + } +} + +/// Handle when to prefetch input streams and how to poll next record batch +struct InputStreams { + /// Input plan of the progressive eval exec + input_plan: Arc, + + /// Context of the progressive eval exec + context: Arc, + + /// Total input streams + input_stream_count: usize, + + /// Number of input streams to prefetch + num_input_streams_to_prefetch: usize, + + /// Index of current stream + current_stream_idx: usize, + + /// Input stream to poll data + 
current_input_stream: Option, + + /// Prefetched Input streams + prefetched_input_streams: Vec, + + /// Used to record number of actually read input streams + num_read_inputs_counter: Count, +} + +impl InputStreams { + fn new( + input_plan: Arc, + context: Arc, + num_input_streams_to_prefetch: usize, + num_read_inputs_counter: Count, + ) -> Result { + let input_stream_count = input_plan + .properties() + .output_partitioning() + .partition_count(); + + let current_stream_idx = 0; + let mut current_input_stream = None; + let mut capacity = 0; + if num_input_streams_to_prefetch > 1 { + capacity = num_input_streams_to_prefetch - 1; + } else { + warn!("num_input_streams_to_prefetch is {num_input_streams_to_prefetch} and not greater than 1"); + } + let mut prefetched_input_streams = Vec::with_capacity(capacity); + + for i in 0..num_input_streams_to_prefetch { + if i >= input_stream_count { + break; + } + + let input_stream = spawn_buffered( + input_plan.execute(i, Arc::::clone(&context))?, + 1, + ); + num_read_inputs_counter.add(1); + + if i == 0 { + current_input_stream = Some(input_stream); + } else { + prefetched_input_streams.push(input_stream); + } + } + + Ok(Self { + input_plan, + context, + input_stream_count, + num_input_streams_to_prefetch, + current_stream_idx, + current_input_stream, + prefetched_input_streams, + num_read_inputs_counter, + }) + } + + /// Set next available stream to current_input_stream + /// Also prefetch one more input stream if not all of them are prefetched yet + fn next_stream(&mut self) { + // No more input stream + if self.current_stream_idx >= self.input_stream_count { + // panic if we have not reached the end of all input streams + assert!( + self.prefetched_input_streams.is_empty(), + "Internal error in ProgressiveEvalStream: There should not have input streams left to read",); + + self.current_input_stream = None; + } else { + // prefetch one more input stream before setting next stream to the current input stream + if 
self.current_stream_idx + self.num_input_streams_to_prefetch + < self.input_stream_count + { + self.num_read_inputs_counter.add(1); + self.prefetched_input_streams.push(spawn_buffered( + self.input_plan + .execute( + self.current_stream_idx + self.num_input_streams_to_prefetch, + Arc::::clone(&self.context), + ) + .unwrap(), + 1, + )); + } + + self.current_stream_idx += 1; + if self.prefetched_input_streams.is_empty() { + self.current_input_stream = None; + } else { + self.current_input_stream = Some(self.prefetched_input_streams.remove(0)); + } + } + } + + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + // All input streams have been read + if self.current_input_stream.is_none() { + return Poll::Ready(None); + } + + // Get next record batch + let mut poll; + loop { + poll = self + .current_input_stream + .as_mut() + .unwrap() + .poll_next_unpin(cx); + match poll { + // This input stream no longer has data, move to next stream + Poll::Ready(None) => { + self.next_stream(); + if self.current_input_stream.is_none() { + // Have reached the end of all input streams + return Poll::Ready(None); + } + } + _ => break, + } + } + + poll + } +} + +/// Concat input streams until reaching the fetch limit +struct ProgressiveEvalStream { + /// Input streams + input_streams: InputStreams, + + /// The schema of the input and output. 
+ schema: SchemaRef, + + /// used to record execution baseline metrics + baseline_metrics: BaselineMetrics, + + /// If the stream has encountered an error + aborted: bool, + + /// Optional number of rows to fetch + fetch: Option, + + /// number of rows produced + produced: usize, +} + +impl ProgressiveEvalStream { + fn new( + input_plan: Arc, + context: Arc, + schema: SchemaRef, + baseline_metrics: BaselineMetrics, + num_read_inputs_counter: Count, + fetch: Option, + ) -> Result { + // Use config param to set number of prefetch stream + let mut num_input_streams_to_prefetch = context + .session_config() + .options() + .optimizer + .progressive_eval_num_prefetch_input_streams; + + // If there is no limit of number of rows to fetch, prefetch all input streams + if fetch.is_none() { + num_input_streams_to_prefetch = input_plan + .properties() + .output_partitioning() + .partition_count() + } + + let input_streams = InputStreams::new( + input_plan, + context, + num_input_streams_to_prefetch, + num_read_inputs_counter, + )?; + + Ok(Self { + input_streams, + schema, + baseline_metrics, + aborted: false, + fetch, + produced: 0, + }) + } +} + +impl Stream for ProgressiveEvalStream { + type Item = Result; + + // Return the next record batch until reaching the fetch limit or the end of all input streams + // Return pending if the next record batch is not ready + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + // Error in previous poll + if self.aborted { + return Poll::Ready(None); + } + + // Have reached the fetch limit + if self.produced >= self.fetch.unwrap_or(std::usize::MAX) { + return Poll::Ready(None); + } + + let poll = self.input_streams.poll_next(cx); + + let poll = match ready!(poll) { + // This input stream has data, return its next record batch + Some(Ok(batch)) => { + self.produced += batch.num_rows(); + Poll::Ready(Some(Ok(batch))) + } + // This input stream has an error, return the error and set aborted to true to stop 
polling next round + Some(Err(e)) => { + self.aborted = true; + Poll::Ready(Some(Err(e))) + } + // This input stream has no more data, return None (aka finished) + None => { + // Reaching here means data of all streams has been read + Poll::Ready(None) + } + }; + + self.baseline_metrics.record_poll(poll) + } +} + +impl RecordBatchStream for ProgressiveEvalStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} + +#[cfg(test)] +mod tests { + use std::iter::FromIterator; + + use crate::collect; + use crate::memory::MemoryExec; + use crate::metrics::Timestamp; + use crate::test::assert_is_pending; + use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; + use arrow::array::ArrayRef; + use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray}; + use arrow::datatypes::Schema; + use arrow::datatypes::{DataType, Field}; + use arrow::record_batch::RecordBatch; + use datafusion_common::assert_batches_eq; + use futures::FutureExt; + + use super::*; + + #[tokio::test] + async fn test_no_input_stream() { + let task_ctx = Arc::new(TaskContext::default()); + + // no fetch limit --> return all rows + _test_progressive_eval( + &[], + None, + None, + &["++", "++"], + 0, // 0 input stream + 0, // 0 input stream is prefetched and polled + Arc::clone(&task_ctx), + ) + .await; + + // limit = 0 means select nothing + _test_progressive_eval( + &[], + None, + Some(0), + &["++", "++"], + 0, // 0 input stream + 0, // 0 input stream is prefetched and polled + Arc::clone(&task_ctx), + ) + .await; + + // limit = 1 on no data + _test_progressive_eval( + &[], + None, + Some(1), + &["++", "++"], + 0, // 0 input stream + 0, // 0 input stream is prefetched and polled + task_ctx, + ) + .await; + } + + #[tokio::test] + async fn test_one_input_stream() { + let task_ctx = Arc::new(TaskContext::default()); + let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3])); + let b: ArrayRef = Arc::new(StringArray::from_iter(vec![ + Some("a"), + 
Some("c"), + Some("e"), + Some("g"), + Some("j"), + ])); + let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 8])); + let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); + + // return all + _test_progressive_eval( + &[vec![b1.clone()]], + None, + None, // no fetch limit --> return all rows + &[ + "+---+---+-------------------------------+", + "| a | b | c |", + "+---+---+-------------------------------+", + "| 1 | a | 1970-01-01T00:00:00.000000008 |", + "| 2 | c | 1970-01-01T00:00:00.000000007 |", + "| 7 | e | 1970-01-01T00:00:00.000000006 |", + "| 9 | g | 1970-01-01T00:00:00.000000005 |", + "| 3 | j | 1970-01-01T00:00:00.000000008 |", + "+---+---+-------------------------------+", + ], + 1, // 1 input stream + 1, // 1 input stream is prefetched and polled + Arc::clone(&task_ctx), + ) + .await; + + // fetch no rows + _test_progressive_eval( + &[vec![b1.clone()]], + None, + Some(0), + &["++", "++"], + 1, + 1, + Arc::clone(&task_ctx), + ) + .await; + + // still return all even select 3 rows becasue first record batch is returned + _test_progressive_eval( + &[vec![b1.clone()]], + None, + Some(3), + &[ + "+---+---+-------------------------------+", + "| a | b | c |", + "+---+---+-------------------------------+", + "| 1 | a | 1970-01-01T00:00:00.000000008 |", + "| 2 | c | 1970-01-01T00:00:00.000000007 |", + "| 7 | e | 1970-01-01T00:00:00.000000006 |", + "| 9 | g | 1970-01-01T00:00:00.000000005 |", + "| 3 | j | 1970-01-01T00:00:00.000000008 |", + "+---+---+-------------------------------+", + ], + 1, // 1 input stream + 1, // 1 input stream is prefetched and polled + Arc::clone(&task_ctx), + ) + .await; + + // return all because fetch limit is larger + _test_progressive_eval( + &[vec![b1.clone()]], + None, + Some(7), + &[ + "+---+---+-------------------------------+", + "| a | b | c |", + "+---+---+-------------------------------+", + "| 1 | a | 1970-01-01T00:00:00.000000008 |", + "| 2 | c | 
1970-01-01T00:00:00.000000007 |", + "| 7 | e | 1970-01-01T00:00:00.000000006 |", + "| 9 | g | 1970-01-01T00:00:00.000000005 |", + "| 3 | j | 1970-01-01T00:00:00.000000008 |", + "+---+---+-------------------------------+", + ], + 1, // 1 input stream + 1, // 1 input stream is prefetched and polled + Arc::clone(&task_ctx), + ) + .await; + } + + #[tokio::test] + async fn test_return_all() { + let task_ctx = Arc::new(TaskContext::default()); + let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3])); + let b: ArrayRef = Arc::new(StringArray::from_iter(vec![ + Some("a"), + Some("c"), + Some("e"), + Some("g"), + Some("j"), + ])); + let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 8])); + let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); + + let a: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 70, 90, 30])); + let b: ArrayRef = Arc::new(StringArray::from_iter(vec![ + Some("b"), + Some("d"), + Some("f"), + Some("h"), + Some("j"), + ])); + let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![4, 6, 2, 2, 6])); + let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); + + // [b1, b2] + // return all by not specifying fetch limit + _test_progressive_eval( + &[vec![b1.clone()], vec![b2.clone()]], + None, + None, // no fetch limit --> return all rows + &[ + "+----+---+-------------------------------+", + "| a | b | c |", + "+----+---+-------------------------------+", + "| 1 | a | 1970-01-01T00:00:00.000000008 |", + "| 2 | c | 1970-01-01T00:00:00.000000007 |", + "| 7 | e | 1970-01-01T00:00:00.000000006 |", + "| 9 | g | 1970-01-01T00:00:00.000000005 |", + "| 3 | j | 1970-01-01T00:00:00.000000008 |", + "| 10 | b | 1970-01-01T00:00:00.000000004 |", + "| 20 | d | 1970-01-01T00:00:00.000000006 |", + "| 70 | f | 1970-01-01T00:00:00.000000002 |", + "| 90 | h | 1970-01-01T00:00:00.000000002 |", + "| 30 | j | 1970-01-01T00:00:00.000000006 |", + 
"+----+---+-------------------------------+", + ], + 2, // 2 input streams + 2, // all 2 input streams are prefetched and polled + Arc::clone(&task_ctx), + ) + .await; + + // [b1, b2] + // return all by specifying large limit + _test_progressive_eval( + &[vec![b1.clone()], vec![b2.clone()]], + None, + Some(10), // limit = max num rows --> return all rows + &[ + "+----+---+-------------------------------+", + "| a | b | c |", + "+----+---+-------------------------------+", + "| 1 | a | 1970-01-01T00:00:00.000000008 |", + "| 2 | c | 1970-01-01T00:00:00.000000007 |", + "| 7 | e | 1970-01-01T00:00:00.000000006 |", + "| 9 | g | 1970-01-01T00:00:00.000000005 |", + "| 3 | j | 1970-01-01T00:00:00.000000008 |", + "| 10 | b | 1970-01-01T00:00:00.000000004 |", + "| 20 | d | 1970-01-01T00:00:00.000000006 |", + "| 70 | f | 1970-01-01T00:00:00.000000002 |", + "| 90 | h | 1970-01-01T00:00:00.000000002 |", + "| 30 | j | 1970-01-01T00:00:00.000000006 |", + "+----+---+-------------------------------+", + ], + 2, // 2 input streams + 2, // all 2 input streams are prefetched and polled + Arc::clone(&task_ctx), + ) + .await; + + // [b2, b1] + // return all by not specifying fetch limit + _test_progressive_eval( + &[vec![b2.clone()], vec![b1.clone()]], + None, + None, + &[ + "+----+---+-------------------------------+", + "| a | b | c |", + "+----+---+-------------------------------+", + "| 10 | b | 1970-01-01T00:00:00.000000004 |", + "| 20 | d | 1970-01-01T00:00:00.000000006 |", + "| 70 | f | 1970-01-01T00:00:00.000000002 |", + "| 90 | h | 1970-01-01T00:00:00.000000002 |", + "| 30 | j | 1970-01-01T00:00:00.000000006 |", + "| 1 | a | 1970-01-01T00:00:00.000000008 |", + "| 2 | c | 1970-01-01T00:00:00.000000007 |", + "| 7 | e | 1970-01-01T00:00:00.000000006 |", + "| 9 | g | 1970-01-01T00:00:00.000000005 |", + "| 3 | j | 1970-01-01T00:00:00.000000008 |", + "+----+---+-------------------------------+", + ], + 2, // 2 input streams + 2, // all 2 input streams are prefetched and polled + 
Arc::clone(&task_ctx), + ) + .await; + + // [b2, b1] + // return all by specifying large limit + _test_progressive_eval( + &[vec![b2], vec![b1]], + None, + Some(20), + &[ + "+----+---+-------------------------------+", + "| a | b | c |", + "+----+---+-------------------------------+", + "| 10 | b | 1970-01-01T00:00:00.000000004 |", + "| 20 | d | 1970-01-01T00:00:00.000000006 |", + "| 70 | f | 1970-01-01T00:00:00.000000002 |", + "| 90 | h | 1970-01-01T00:00:00.000000002 |", + "| 30 | j | 1970-01-01T00:00:00.000000006 |", + "| 1 | a | 1970-01-01T00:00:00.000000008 |", + "| 2 | c | 1970-01-01T00:00:00.000000007 |", + "| 7 | e | 1970-01-01T00:00:00.000000006 |", + "| 9 | g | 1970-01-01T00:00:00.000000005 |", + "| 3 | j | 1970-01-01T00:00:00.000000008 |", + "+----+---+-------------------------------+", + ], + 2, // 2 input streams + 2, // all 2 input streams are prefetched and polled + task_ctx, + ) + .await; + } + + #[tokio::test] + async fn test_return_all_on_different_length_batches() { + let task_ctx = Arc::new(TaskContext::default()); + let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3])); + let b: ArrayRef = Arc::new(StringArray::from_iter(vec![ + Some("a"), + Some("b"), + Some("c"), + Some("d"), + Some("e"), + ])); + let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 8])); + let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); + + let a: ArrayRef = Arc::new(Int32Array::from(vec![70, 90, 30])); + let b: ArrayRef = Arc::new(StringArray::from_iter(vec![ + Some("c"), + Some("d"), + Some("e"), + ])); + let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![4, 6, 2])); + let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); + + // [b1, b2] + _test_progressive_eval( + &[vec![b1.clone()], vec![b2.clone()]], + None, + None, + &[ + "+----+---+-------------------------------+", + "| a | b | c |", + "+----+---+-------------------------------+", + "| 1 | a | 
1970-01-01T00:00:00.000000008 |", + "| 2 | b | 1970-01-01T00:00:00.000000007 |", + "| 7 | c | 1970-01-01T00:00:00.000000006 |", + "| 9 | d | 1970-01-01T00:00:00.000000005 |", + "| 3 | e | 1970-01-01T00:00:00.000000008 |", + "| 70 | c | 1970-01-01T00:00:00.000000004 |", + "| 90 | d | 1970-01-01T00:00:00.000000006 |", + "| 30 | e | 1970-01-01T00:00:00.000000002 |", + "+----+---+-------------------------------+", + ], + 2, // 2 input streams + 2, // all 2 input streams are prefetched and polled + Arc::clone(&task_ctx), + ) + .await; + + // [b2, b1] + _test_progressive_eval( + &[vec![b2], vec![b1]], + None, + None, + &[ + "+----+---+-------------------------------+", + "| a | b | c |", + "+----+---+-------------------------------+", + "| 70 | c | 1970-01-01T00:00:00.000000004 |", + "| 90 | d | 1970-01-01T00:00:00.000000006 |", + "| 30 | e | 1970-01-01T00:00:00.000000002 |", + "| 1 | a | 1970-01-01T00:00:00.000000008 |", + "| 2 | b | 1970-01-01T00:00:00.000000007 |", + "| 7 | c | 1970-01-01T00:00:00.000000006 |", + "| 9 | d | 1970-01-01T00:00:00.000000005 |", + "| 3 | e | 1970-01-01T00:00:00.000000008 |", + "+----+---+-------------------------------+", + ], + 2, // 2 input streams + 2, // all 2 input streams are prefetched and polled + task_ctx, + ) + .await; + } + + #[tokio::test] + async fn test_fetch_limit_1() { + let task_ctx = Arc::new(TaskContext::default()); + let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3])); + let b: ArrayRef = Arc::new(StringArray::from_iter(vec![ + Some("a"), + Some("b"), + Some("c"), + Some("d"), + Some("e"), + ])); + let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 8])); + let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); + + let a: ArrayRef = Arc::new(Int32Array::from(vec![70, 90, 30])); + let b: ArrayRef = Arc::new(StringArray::from_iter(vec![ + Some("c"), + Some("d"), + Some("e"), + ])); + let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![4, 6, 2])); + 
let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); + + // [b2, b1] + // b2 has 3 rows. b1 has 5 rows + // Fetch limit is 1 --> return all 3 rows of the first batch (b2) that covers that limit + _test_progressive_eval( + &[vec![b2.clone()], vec![b1.clone()]], + None, + Some(1), + &[ + "+----+---+-------------------------------+", + "| a | b | c |", + "+----+---+-------------------------------+", + "| 70 | c | 1970-01-01T00:00:00.000000004 |", + "| 90 | d | 1970-01-01T00:00:00.000000006 |", + "| 30 | e | 1970-01-01T00:00:00.000000002 |", + "+----+---+-------------------------------+", + ], + 2, // 2 input streams + 2, // all 2 input streams are prefetched by default even thouggh only the first one is actally polled + Arc::clone(&task_ctx), + ) + .await; + + // [b1, b2] + // b1 has 5 rows. b2 has 3 rows + // Fetch limit is 1 --> return all 5 rows of the first batch (b1) that covers that limit + _test_progressive_eval( + &[vec![b1], vec![b2]], + None, + Some(1), + &[ + "+---+---+-------------------------------+", + "| a | b | c |", + "+---+---+-------------------------------+", + "| 1 | a | 1970-01-01T00:00:00.000000008 |", + "| 2 | b | 1970-01-01T00:00:00.000000007 |", + "| 7 | c | 1970-01-01T00:00:00.000000006 |", + "| 9 | d | 1970-01-01T00:00:00.000000005 |", + "| 3 | e | 1970-01-01T00:00:00.000000008 |", + "+---+---+-------------------------------+", + ], + 2, // 2 input streams + 2, // all 2 input streams are prefetched by default even thouggh only the first one is actally polled + task_ctx, + ) + .await; + } + + #[tokio::test] + async fn test_fetch_limit_equal_first_batch_size() { + let task_ctx = Arc::new(TaskContext::default()); + let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3])); + let b: ArrayRef = Arc::new(StringArray::from_iter(vec![ + Some("a"), + Some("b"), + Some("c"), + Some("d"), + Some("e"), + ])); + let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 8])); + let b1 = 
RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); + + let a: ArrayRef = Arc::new(Int32Array::from(vec![70, 90, 30])); + let b: ArrayRef = Arc::new(StringArray::from_iter(vec![ + Some("c"), + Some("d"), + Some("e"), + ])); + let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![4, 6, 2])); + let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); + + // [b2, b1] + // b2 has 3 rows. b1 has 5 rows + // Fetch limit is 3 --> return all 3 rows of the first batch (b2) that covers that limit + _test_progressive_eval( + &[vec![b2.clone()], vec![b1.clone()]], + None, + Some(3), + &[ + "+----+---+-------------------------------+", + "| a | b | c |", + "+----+---+-------------------------------+", + "| 70 | c | 1970-01-01T00:00:00.000000004 |", + "| 90 | d | 1970-01-01T00:00:00.000000006 |", + "| 30 | e | 1970-01-01T00:00:00.000000002 |", + "+----+---+-------------------------------+", + ], + 2, // 2 input streams + 2, // all 2 input streams are prefetched by default even though only the first one is actally polled + Arc::clone(&task_ctx), + ) + .await; + + // [b1, b2] + // b1 has 5 rows. 
b2 has 3 rows + // Fetch limit is 5 --> return all 5 rows of first batch (b1) that covers that limit + _test_progressive_eval( + &[vec![b1], vec![b2]], + None, + Some(5), + &[ + "+---+---+-------------------------------+", + "| a | b | c |", + "+---+---+-------------------------------+", + "| 1 | a | 1970-01-01T00:00:00.000000008 |", + "| 2 | b | 1970-01-01T00:00:00.000000007 |", + "| 7 | c | 1970-01-01T00:00:00.000000006 |", + "| 9 | d | 1970-01-01T00:00:00.000000005 |", + "| 3 | e | 1970-01-01T00:00:00.000000008 |", + "+---+---+-------------------------------+", + ], + 2, // 2 input streams + 2, // all 2 input streams are prefetched by default even though only the first one is actally polled + task_ctx, + ) + .await; + } + + #[tokio::test] + async fn test_fetch_limit_over_first_batch_size() { + let task_ctx = Arc::new(TaskContext::default()); + let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3])); + let b: ArrayRef = Arc::new(StringArray::from_iter(vec![ + Some("a"), + Some("b"), + Some("c"), + Some("d"), + Some("e"), + ])); + let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 8])); + let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); + + let a: ArrayRef = Arc::new(Int32Array::from(vec![70, 90, 30])); + let b: ArrayRef = Arc::new(StringArray::from_iter(vec![ + Some("c"), + Some("d"), + Some("e"), + ])); + let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![4, 6, 2])); + let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); + + // [b2, b1] + // b2 has 3 rows. 
b1 has 5 rows + // Fetch limit is 4 --> return all rows of both batches in the order of b2, b1 + _test_progressive_eval( + &[vec![b2.clone()], vec![b1.clone()]], + None, + Some(4), + &[ + "+----+---+-------------------------------+", + "| a | b | c |", + "+----+---+-------------------------------+", + "| 70 | c | 1970-01-01T00:00:00.000000004 |", + "| 90 | d | 1970-01-01T00:00:00.000000006 |", + "| 30 | e | 1970-01-01T00:00:00.000000002 |", + "| 1 | a | 1970-01-01T00:00:00.000000008 |", + "| 2 | b | 1970-01-01T00:00:00.000000007 |", + "| 7 | c | 1970-01-01T00:00:00.000000006 |", + "| 9 | d | 1970-01-01T00:00:00.000000005 |", + "| 3 | e | 1970-01-01T00:00:00.000000008 |", + "+----+---+-------------------------------+", + ], + 2, // 2 input streams + 2, // all 2 input streams are prefetched and polled + Arc::clone(&task_ctx), + ) + .await; + + // [b1, b2] + // b1 has 5 rows. b2 has 3 rows + // Fetch limit is 6 --> return all rows of both batches in the order of b1, b2 + _test_progressive_eval( + &[vec![b1], vec![b2]], + None, + Some(6), + &[ + "+----+---+-------------------------------+", + "| a | b | c |", + "+----+---+-------------------------------+", + "| 1 | a | 1970-01-01T00:00:00.000000008 |", + "| 2 | b | 1970-01-01T00:00:00.000000007 |", + "| 7 | c | 1970-01-01T00:00:00.000000006 |", + "| 9 | d | 1970-01-01T00:00:00.000000005 |", + "| 3 | e | 1970-01-01T00:00:00.000000008 |", + "| 70 | c | 1970-01-01T00:00:00.000000004 |", + "| 90 | d | 1970-01-01T00:00:00.000000006 |", + "| 30 | e | 1970-01-01T00:00:00.000000002 |", + "+----+---+-------------------------------+", + ], + 2, // 2 input streams + 2, // all 2 input streams are prefetched and polled + task_ctx, + ) + .await; + } + + #[tokio::test] + async fn test_three_partitions_with_nulls() { + let task_ctx = Arc::new(TaskContext::default()); + let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3])); + let b: ArrayRef = Arc::new(StringArray::from_iter(vec![ + Some("a"), + Some("b"), + Some("c"), + 
None, + Some("f"), + ])); + let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 8])); + let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); + + let a: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 70])); + let b: ArrayRef = Arc::new(StringArray::from_iter(vec![ + Some("e"), + Some("g"), + Some("h"), + ])); + let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![40, 60, 20])); + let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); + + let a: ArrayRef = Arc::new(Int32Array::from(vec![100, 200, 700, 900])); + let b: ArrayRef = Arc::new(StringArray::from_iter(vec![ + None, + Some("g"), + Some("h"), + Some("i"), + ])); + let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![4, 6, 2, 2])); + let b3 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); + + // [b1, b2, b3] + // b1 has 5 rows. b2 has 3 rows. b3 has 4 rows + // Fetch limit is 1 --> return all rows of the b1 + _test_progressive_eval( + &[vec![b1.clone()], vec![b2.clone()], vec![b3.clone()]], + None, + Some(1), + &[ + "+---+---+-------------------------------+", + "| a | b | c |", + "+---+---+-------------------------------+", + "| 1 | a | 1970-01-01T00:00:00.000000008 |", + "| 2 | b | 1970-01-01T00:00:00.000000007 |", + "| 7 | c | 1970-01-01T00:00:00.000000006 |", + "| 9 | | 1970-01-01T00:00:00.000000005 |", + "| 3 | f | 1970-01-01T00:00:00.000000008 |", + "+---+---+-------------------------------+", + ], + 3, // 3 input streams + 2, // 2 input streams are prefetched by default even though only the first one is polled + Arc::clone(&task_ctx), + ) + .await; + + // [b1, b2, b3] + // b1 has 5 rows. b2 has 3 rows. 
b3 has 4 rows + // Fetch limit is 7 --> return all rows of the b1 & b2 in the order of b1, b2 + _test_progressive_eval( + &[vec![b1.clone()], vec![b2.clone()], vec![b3.clone()]], + None, + Some(7), + &[ + "+----+---+-------------------------------+", + "| a | b | c |", + "+----+---+-------------------------------+", + "| 1 | a | 1970-01-01T00:00:00.000000008 |", + "| 2 | b | 1970-01-01T00:00:00.000000007 |", + "| 7 | c | 1970-01-01T00:00:00.000000006 |", + "| 9 | | 1970-01-01T00:00:00.000000005 |", + "| 3 | f | 1970-01-01T00:00:00.000000008 |", + "| 10 | e | 1970-01-01T00:00:00.000000040 |", + "| 20 | g | 1970-01-01T00:00:00.000000060 |", + "| 70 | h | 1970-01-01T00:00:00.000000020 |", + "+----+---+-------------------------------+", + ], + 3, // 3 input streams + 3, // since we need to poll 2 input streams, 3 streams are prefetched. Always one extra stream is prefetched + Arc::clone(&task_ctx), + ) + .await; + + // [b1, b2, b3] + // b1 has 5 rows. b2 has 3 rows. b3 has 4 rows + // Fetch limit is 50 --> return all rows of all batches in the order of b1, b2, b3 + _test_progressive_eval( + &[vec![b1], vec![b2], vec![b3]], + None, + Some(50), + &[ + "+-----+---+-------------------------------+", + "| a | b | c |", + "+-----+---+-------------------------------+", + "| 1 | a | 1970-01-01T00:00:00.000000008 |", + "| 2 | b | 1970-01-01T00:00:00.000000007 |", + "| 7 | c | 1970-01-01T00:00:00.000000006 |", + "| 9 | | 1970-01-01T00:00:00.000000005 |", + "| 3 | f | 1970-01-01T00:00:00.000000008 |", + "| 10 | e | 1970-01-01T00:00:00.000000040 |", + "| 20 | g | 1970-01-01T00:00:00.000000060 |", + "| 70 | h | 1970-01-01T00:00:00.000000020 |", + "| 100 | | 1970-01-01T00:00:00.000000004 |", + "| 200 | g | 1970-01-01T00:00:00.000000006 |", + "| 700 | h | 1970-01-01T00:00:00.000000002 |", + "| 900 | i | 1970-01-01T00:00:00.000000002 |", + "+-----+---+-------------------------------+", + ], + 3, // 3 input streams + 3, // 3 input streams are prefetched and polled + task_ctx, + ) + 
.await; + } + + #[tokio::test] + async fn test_four_partitions_with_nulls() { + let task_ctx = Arc::new(TaskContext::default()); + + // partition 1 + let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3])); + let b: ArrayRef = Arc::new(StringArray::from_iter(vec![ + Some("a"), + Some("b"), + Some("c"), + None, + Some("f"), + ])); + let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 8])); + let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); + + // partition 2 + let a: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 70])); + let b: ArrayRef = Arc::new(StringArray::from_iter(vec![ + Some("e"), + Some("g"), + Some("h"), + ])); + let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![40, 60, 20])); + let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); + + // partition 3 + let a: ArrayRef = Arc::new(Int32Array::from(vec![100, 200, 700, 900])); + let b: ArrayRef = Arc::new(StringArray::from_iter(vec![ + None, + Some("g"), + Some("h"), + Some("i"), + ])); + let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![4, 6, 2, 2])); + let b3 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); + + // partition 4 + let a: ArrayRef = Arc::new(Int32Array::from(vec![1000, 2000])); + let b: ArrayRef = Arc::new(StringArray::from_iter(vec![None, Some("x")])); + let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![40, 60])); + let b4 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); + + // [b1, b2, b3, b4] + // b1 has 5 rows. b2 has 3 rows. b3 has 4 rows. b4 has 2 rows + // Fetch limit is 0 --> return nothing. 
+ // Prefetch minimum 2 input streams + _test_progressive_eval( + &[ + vec![b1.clone()], + vec![b2.clone()], + vec![b3.clone()], + vec![b4.clone()], + ], + None, + Some(0), + &["++", "++"], + 4, // 4 input streams + 2, // 2 input streams are prefetched by default even though nothing is polled + Arc::clone(&task_ctx), + ) + .await; + + // [b1, b2, b3, b4] + // b1 has 5 rows. b2 has 3 rows. b3 has 4 rows. b4 has 2 rows + // Fetch limit is 3 --> return all 5 rows of b1 + // Prefetch minimum 2 input streams + _test_progressive_eval( + &[ + vec![b1.clone()], + vec![b2.clone()], + vec![b3.clone()], + vec![b4.clone()], + ], + None, + Some(3), + &[ + "+---+---+-------------------------------+", + "| a | b | c |", + "+---+---+-------------------------------+", + "| 1 | a | 1970-01-01T00:00:00.000000008 |", + "| 2 | b | 1970-01-01T00:00:00.000000007 |", + "| 7 | c | 1970-01-01T00:00:00.000000006 |", + "| 9 | | 1970-01-01T00:00:00.000000005 |", + "| 3 | f | 1970-01-01T00:00:00.000000008 |", + "+---+---+-------------------------------+", + ], + 4, // 4 input streams + 2, // 2 input streams are prefetched and one stream is polled + Arc::clone(&task_ctx), + ) + .await; + + // [b1, b2, b3, b4] + // b1 has 5 rows. b2 has 3 rows. b3 has 4 rows. 
b4 has 2 rows + // Fetch limit is 5 --> return all 5 rows of b1 + // Prefetch minimum 2 input streams + _test_progressive_eval( + &[ + vec![b1.clone()], + vec![b2.clone()], + vec![b3.clone()], + vec![b4.clone()], + ], + None, + Some(5), + &[ + "+---+---+-------------------------------+", + "| a | b | c |", + "+---+---+-------------------------------+", + "| 1 | a | 1970-01-01T00:00:00.000000008 |", + "| 2 | b | 1970-01-01T00:00:00.000000007 |", + "| 7 | c | 1970-01-01T00:00:00.000000006 |", + "| 9 | | 1970-01-01T00:00:00.000000005 |", + "| 3 | f | 1970-01-01T00:00:00.000000008 |", + "+---+---+-------------------------------+", + ], + 4, // 4 input streams + 2, // 2 input streams are prefetched and one stream is polled + Arc::clone(&task_ctx), + ) + .await; + + // [b1, b2, b3, b4] + // b1 has 5 rows. b2 has 3 rows. b3 has 4 rows. b4 has 2 rows + // Fetch limit is 8 --> return all 8 rows of b1 and b2 + // Prefetched 3 input streams since we will always prefetch one extra one + _test_progressive_eval( + &[ + vec![b1.clone()], + vec![b2.clone()], + vec![b3.clone()], + vec![b4.clone()], + ], + None, + Some(8), + &[ + "+----+---+-------------------------------+", + "| a | b | c |", + "+----+---+-------------------------------+", + "| 1 | a | 1970-01-01T00:00:00.000000008 |", + "| 2 | b | 1970-01-01T00:00:00.000000007 |", + "| 7 | c | 1970-01-01T00:00:00.000000006 |", + "| 9 | | 1970-01-01T00:00:00.000000005 |", + "| 3 | f | 1970-01-01T00:00:00.000000008 |", + "| 10 | e | 1970-01-01T00:00:00.000000040 |", + "| 20 | g | 1970-01-01T00:00:00.000000060 |", + "| 70 | h | 1970-01-01T00:00:00.000000020 |", + "+----+---+-------------------------------+", + ], + 4, // 4 input streams + 3, // 3 input streams are prefetched and 2 streams are polled + Arc::clone(&task_ctx), + ) + .await; + + // [b1, b2, b3, b4] + // b1 has 5 rows. b2 has 3 rows. b3 has 4 rows. 
b4 has 2 rows + // Fetch limit is 12 --> return all 12 rows of b1, b2 and b3 + // Prefetch 4 input streams since we will always prefetch one extra one + _test_progressive_eval( + &[ + vec![b1.clone()], + vec![b2.clone()], + vec![b3.clone()], + vec![b4.clone()], + ], + None, + Some(12), + &[ + "+-----+---+-------------------------------+", + "| a | b | c |", + "+-----+---+-------------------------------+", + "| 1 | a | 1970-01-01T00:00:00.000000008 |", + "| 2 | b | 1970-01-01T00:00:00.000000007 |", + "| 7 | c | 1970-01-01T00:00:00.000000006 |", + "| 9 | | 1970-01-01T00:00:00.000000005 |", + "| 3 | f | 1970-01-01T00:00:00.000000008 |", + "| 10 | e | 1970-01-01T00:00:00.000000040 |", + "| 20 | g | 1970-01-01T00:00:00.000000060 |", + "| 70 | h | 1970-01-01T00:00:00.000000020 |", + "| 100 | | 1970-01-01T00:00:00.000000004 |", + "| 200 | g | 1970-01-01T00:00:00.000000006 |", + "| 700 | h | 1970-01-01T00:00:00.000000002 |", + "| 900 | i | 1970-01-01T00:00:00.000000002 |", + "+-----+---+-------------------------------+", + ], + 4, // 4 input streams + 4, // 4 input streams are prefetched and 3 streams are polled + Arc::clone(&task_ctx), + ) + .await; + + // [b1, b2, b3, b4] + // b1 has 5 rows. b2 has 3 rows. b3 has 4 rows. 
b4 has 2 rows + // Fetch limit is 15 --> return all 15 rows of b1, b2, b3 and b4 + // Prefetched all 4 input streams + _test_progressive_eval( + &[ + vec![b1.clone()], + vec![b2.clone()], + vec![b3.clone()], + vec![b4.clone()], + ], + None, + Some(15), + &[ + "+------+---+-------------------------------+", + "| a | b | c |", + "+------+---+-------------------------------+", + "| 1 | a | 1970-01-01T00:00:00.000000008 |", + "| 2 | b | 1970-01-01T00:00:00.000000007 |", + "| 7 | c | 1970-01-01T00:00:00.000000006 |", + "| 9 | | 1970-01-01T00:00:00.000000005 |", + "| 3 | f | 1970-01-01T00:00:00.000000008 |", + "| 10 | e | 1970-01-01T00:00:00.000000040 |", + "| 20 | g | 1970-01-01T00:00:00.000000060 |", + "| 70 | h | 1970-01-01T00:00:00.000000020 |", + "| 100 | | 1970-01-01T00:00:00.000000004 |", + "| 200 | g | 1970-01-01T00:00:00.000000006 |", + "| 700 | h | 1970-01-01T00:00:00.000000002 |", + "| 900 | i | 1970-01-01T00:00:00.000000002 |", + "| 1000 | | 1970-01-01T00:00:00.000000040 |", + "| 2000 | x | 1970-01-01T00:00:00.000000060 |", + "+------+---+-------------------------------+", + ], + 4, // 4 input streams + 4, // 4 input streams are prefetched and polled + Arc::clone(&task_ctx), + ) + .await; + + // [b1, b2, b3, b4] + // b1 has 5 rows. b2 has 3 rows. b3 has 4 rows. 
b4 has 2 rows + // No fetch limit --> return all 15 rows of b1, b2, b3 and b4 + // Prefetched all 4 input streams + _test_progressive_eval( + &[ + vec![b1.clone()], + vec![b2.clone()], + vec![b3.clone()], + vec![b4.clone()], + ], + None, + None, // No fetch limit + &[ + "+------+---+-------------------------------+", + "| a | b | c |", + "+------+---+-------------------------------+", + "| 1 | a | 1970-01-01T00:00:00.000000008 |", + "| 2 | b | 1970-01-01T00:00:00.000000007 |", + "| 7 | c | 1970-01-01T00:00:00.000000006 |", + "| 9 | | 1970-01-01T00:00:00.000000005 |", + "| 3 | f | 1970-01-01T00:00:00.000000008 |", + "| 10 | e | 1970-01-01T00:00:00.000000040 |", + "| 20 | g | 1970-01-01T00:00:00.000000060 |", + "| 70 | h | 1970-01-01T00:00:00.000000020 |", + "| 100 | | 1970-01-01T00:00:00.000000004 |", + "| 200 | g | 1970-01-01T00:00:00.000000006 |", + "| 700 | h | 1970-01-01T00:00:00.000000002 |", + "| 900 | i | 1970-01-01T00:00:00.000000002 |", + "| 1000 | | 1970-01-01T00:00:00.000000040 |", + "| 2000 | x | 1970-01-01T00:00:00.000000060 |", + "+------+---+-------------------------------+", + ], + 4, // 4 input streams + 4, // 4 input streams are prefetched and polled + Arc::clone(&task_ctx), + ) + .await; + } + + async fn _test_progressive_eval( + partitions: &[Vec], + value_ranges: Option>, + fetch: Option, + expected_result: &[&str], + expected_num_input_streams: usize, + expected_num_read_input_streams: usize, + context: Arc, + ) { + let schema = if partitions.is_empty() { + // just whatever schema + let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2])); + let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap(); + batch.schema() + } else { + partitions[0][0].schema() + }; + + let exec = MemoryExec::try_new(partitions, schema, None).unwrap(); + let progressive = Arc::new(ProgressiveEvalExec::new( + Arc::new(exec), + value_ranges, + fetch, + )); + + let progressive_clone = Arc::clone(&progressive); + + let collected = collect(progressive, 
context).await.unwrap(); + assert_batches_eq!(expected_result, collected.as_slice()); + + // verify metrics + let metrics = progressive_clone.metrics().unwrap(); + let num_input_streams = Count::new(); + num_input_streams.add(expected_num_input_streams); + let num_read_input_streams = Count::new(); + num_read_input_streams.add(expected_num_read_input_streams); + + assert_eq!( + metrics.sum_by_name("num_inputs"), + Some(MetricValue::Count { + name: Borrowed("num_inputs"), + count: num_input_streams + }) + ); + assert_eq!( + metrics.sum_by_name("num_read_inputs"), + Some(MetricValue::Count { + name: Borrowed("num_read_inputs"), + count: num_read_input_streams + }) + ); + } + + #[tokio::test] + async fn test_merge_metrics() { + let task_ctx = Arc::new(TaskContext::default()); + let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2])); + let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("a"), Some("c")])); + let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).unwrap(); + + let a: ArrayRef = Arc::new(Int32Array::from(vec![10, 20])); + let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("b"), Some("d")])); + let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).unwrap(); + + let schema = b1.schema(); + let exec = MemoryExec::try_new(&[vec![b1], vec![b2]], schema, None).unwrap(); + let progressive = Arc::new(ProgressiveEvalExec::new(Arc::new(exec), None, None)); + + let collected = + collect(Arc::::clone(&progressive), task_ctx) + .await + .unwrap(); + let expected = [ + "+----+---+", + "| a | b |", + "+----+---+", + "| 1 | a |", + "| 2 | c |", + "| 10 | b |", + "| 20 | d |", + "+----+---+", + ]; + assert_batches_eq!(expected, collected.as_slice()); + + // Now, validate metrics + let metrics = progressive.metrics().unwrap(); + + assert_eq!(metrics.output_rows().unwrap(), 4); + assert!(metrics.elapsed_compute().unwrap() > 0); + + let num_input_streams = Count::new(); + num_input_streams.add(2); + assert_eq!( + 
metrics.sum_by_name("num_inputs"), + Some(MetricValue::Count { + name: Borrowed("num_inputs"), + count: num_input_streams + }) + ); + + let num_read_input_streams = Count::new(); + num_read_input_streams.add(2); + assert_eq!( + metrics.sum_by_name("num_read_inputs"), + Some(MetricValue::Count { + name: Borrowed("num_read_inputs"), + count: num_read_input_streams + }) + ); + + let mut saw_start = false; + let mut saw_end = false; + metrics.iter().for_each(|m| match m.value() { + MetricValue::StartTimestamp(ts) => { + saw_start = true; + assert!(nanos_from_timestamp(ts) > 0); + } + MetricValue::EndTimestamp(ts) => { + saw_end = true; + assert!(nanos_from_timestamp(ts) > 0); + } + _ => {} + }); + + assert!(saw_start); + assert!(saw_end); + } + + fn nanos_from_timestamp(ts: &Timestamp) -> i64 { + ts.value().unwrap().timestamp_nanos_opt().unwrap() + } + + #[tokio::test] + async fn test_drop_cancel() -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + let schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)])); + + let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 2)); + let refs = blocking_exec.refs(); + let progressive_exec = + Arc::new(ProgressiveEvalExec::new(blocking_exec, None, None)); + + let fut = collect(progressive_exec, task_ctx); + let mut fut = fut.boxed(); + + assert_is_pending(&mut fut); + drop(fut); + assert_strong_count_converges_to_zero(refs).await; + + Ok(()) + } + + /// A [`RecordBatchStream`] that is pending forever. + #[derive(Debug)] + pub struct BlockingStream { + /// Schema mocked by this stream. + schema: SchemaRef, + + /// Ref-counting helper to check if the stream are still in memory. 
+ _refs: Arc<()>, + } + + impl Stream for BlockingStream { + type Item = Result; + + fn poll_next( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Pending + } + } + + impl RecordBatchStream for BlockingStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + } +} From d24d3b30d726f41afba95152828ff6326d73ed83 Mon Sep 17 00:00:00 2001 From: NGA-TRAN Date: Mon, 13 May 2024 15:33:07 -0400 Subject: [PATCH 2/4] chore: update test output --- datafusion/sqllogictest/test_files/information_schema.slt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 6f31973fdb6fb..ced7815933b2a 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -226,6 +226,7 @@ datafusion.optimizer.max_passes 3 datafusion.optimizer.prefer_existing_sort false datafusion.optimizer.prefer_existing_union false datafusion.optimizer.prefer_hash_join true +datafusion.optimizer.progressive_eval_num_prefetch_input_streams 2 datafusion.optimizer.repartition_aggregations true datafusion.optimizer.repartition_file_min_size 10485760 datafusion.optimizer.repartition_file_scans true @@ -307,6 +308,7 @@ datafusion.optimizer.max_passes 3 Number of times that the optimizer will attemp datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`. 
datafusion.optimizer.prefer_existing_union false When set to true, the optimizer will not attempt to convert Union to Interleave datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory +datafusion.optimizer.progressive_eval_num_prefetch_input_streams 2 Number of input streams to prefect for ProgressiveEvalExec Since ProgressiveEvalExec only polls one stream at a time in their stream order, we do not need to prefetch all streams at once to save resources. However, if the streams' IO time is way more than their CPU/procesing time, prefetching them will help improve the performance. Default is 2 which means we will prefetch one extra stream before polling the current one. Increasing this value if IO time to read a stream is often much more than CPU time to process its previous one. datafusion.optimizer.repartition_aggregations true Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level datafusion.optimizer.repartition_file_min_size 10485760 Minimum total files size in bytes to perform file scan repartitioning. datafusion.optimizer.repartition_file_scans true When set to `true`, file groups will be repartitioned to achieve maximum parallelism. Currently Parquet and CSV formats are supported. If set to `true`, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false`, different files will be read in parallel, but repartitioning won't happen within a single file. 
From 7e0e0d840c45933f24b35df29c37995bdfc55cc6 Mon Sep 17 00:00:00 2001 From: NGA-TRAN Date: Mon, 13 May 2024 15:35:47 -0400 Subject: [PATCH 3/4] chore: fix typos --- datafusion/common/src/config.rs | 2 +- datafusion/sqllogictest/test_files/information_schema.slt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 62ff3a5ad318b..81c733147ca5c 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -585,7 +585,7 @@ config_namespace! { pub prefer_existing_union: bool, default = false - /// Number of input streams to prefect for ProgressiveEvalExec + /// Number of input streams to prefetch for ProgressiveEvalExec /// Since ProgressiveEvalExec only polls one stream at a time in their stream order, /// we do not need to prefetch all streams at once to save resources. However, if the /// streams' IO time is way more than their CPU/procesing time, prefetching them will help diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index ced7815933b2a..6e57d7260acab 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -308,7 +308,7 @@ datafusion.optimizer.max_passes 3 Number of times that the optimizer will attemp datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`. 
datafusion.optimizer.prefer_existing_union false When set to true, the optimizer will not attempt to convert Union to Interleave datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory -datafusion.optimizer.progressive_eval_num_prefetch_input_streams 2 Number of input streams to prefect for ProgressiveEvalExec Since ProgressiveEvalExec only polls one stream at a time in their stream order, we do not need to prefetch all streams at once to save resources. However, if the streams' IO time is way more than their CPU/procesing time, prefetching them will help improve the performance. Default is 2 which means we will prefetch one extra stream before polling the current one. Increasing this value if IO time to read a stream is often much more than CPU time to process its previous one. +datafusion.optimizer.progressive_eval_num_prefetch_input_streams 2 Number of input streams to prefetch for ProgressiveEvalExec Since ProgressiveEvalExec only polls one stream at a time in their stream order, we do not need to prefetch all streams at once to save resources. However, if the streams' IO time is way more than their CPU/procesing time, prefetching them will help improve the performance. Default is 2 which means we will prefetch one extra stream before polling the current one. Increasing this value if IO time to read a stream is often much more than CPU time to process its previous one. datafusion.optimizer.repartition_aggregations true Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level datafusion.optimizer.repartition_file_min_size 10485760 Minimum total files size in bytes to perform file scan repartitioning. 
datafusion.optimizer.repartition_file_scans true When set to `true`, file groups will be repartitioned to achieve maximum parallelism. Currently Parquet and CSV formats are supported. If set to `true`, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false`, different files will be read in parallel, but repartitioning won't happen within a single file. From 71efd8ff56246f88c77139f5e0d2a62831d602c6 Mon Sep 17 00:00:00 2001 From: NGA-TRAN Date: Tue, 14 May 2024 10:56:58 -0400 Subject: [PATCH 4/4] chore: update docs --- docs/source/user-guide/configs.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 0cfd81eff75a8..6e31ad218b996 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -106,6 +106,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.optimizer.hash_join_single_partition_threshold_rows | 131072 | The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition | | datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). | | datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave | +| datafusion.optimizer.progressive_eval_num_prefetch_input_streams | 2 | Number of input streams to prefetch for ProgressiveEvalExec Since ProgressiveEvalExec only polls one stream at a time in their stream order, we do not need to prefetch all streams at once to save resources. However, if the streams' IO time is way more than their CPU/procesing time, prefetching them will help improve the performance. 
Default is 2 which means we will prefetch one extra stream before polling the current one. Increasing this value if IO time to read a stream is often much more than CPU time to process its previous one. | | datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | | datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | | datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |