Commit 7241bd1

lewiszlw and alamb authored
Support limit in StreamingTableExec (apache#10309)
* Support limit in StreamingTableExec
* Update Display and add test
* refine limit test
* Add unit test for limit
* clippy

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent a0fccbf commit 7241bd1

File tree

11 files changed: +197 -6 lines changed
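Before the per-file diffs, the shape of the change in one place: the optional row limit received by `TableProvider::scan` is now threaded into `StreamingTableExec::try_new`, which takes it as a new trailing parameter. A minimal caller-side sketch of the new signature (the `limited_scan` helper and its arguments are illustrative, not code from this commit):

use std::sync::Arc;

use arrow_schema::SchemaRef;
use datafusion_common::Result;
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};

/// Build a streaming scan that yields at most `fetch` rows per partition.
fn limited_scan(
    schema: SchemaRef,
    partitions: Vec<Arc<dyn PartitionStream>>,
    fetch: usize,
) -> Result<StreamingTableExec> {
    StreamingTableExec::try_new(
        schema,
        partitions,
        None,        // projection: Option<&Vec<usize>>
        vec![],      // projected_output_ordering: no declared ordering
        true,        // infinite source
        Some(fetch), // new in this commit: limit pushed into the scan
    )
}

Note the limit is applied to each partition's stream at execution time, so a multi-partition plan still needs a global limit operator above the scan to cap the total row count.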

datafusion/core/src/datasource/stream.rs

Lines changed: 2 additions & 1 deletion
@@ -242,7 +242,7 @@ impl TableProvider for StreamTable {
         _state: &SessionState,
         projection: Option<&Vec<usize>>,
         _filters: &[Expr],
-        _limit: Option<usize>,
+        limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let projected_schema = match projection {
             Some(p) => {
@@ -258,6 +258,7 @@ impl TableProvider for StreamTable {
             projection,
             projected_schema,
             true,
+            limit,
         )?))
     }


datafusion/core/src/datasource/streaming.rs

Lines changed: 2 additions & 2 deletions
@@ -88,15 +88,15 @@ impl TableProvider for StreamingTable {
         _state: &SessionState,
         projection: Option<&Vec<usize>>,
         _filters: &[Expr],
-        _limit: Option<usize>,
+        limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        // TODO: push limit down
         Ok(Arc::new(StreamingTableExec::try_new(
             self.schema.clone(),
             self.partitions.clone(),
             projection,
             None,
             self.infinite,
+            limit,
         )?))
     }
 }
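With the `// TODO: push limit down` resolved, a logical `LIMIT` over a `StreamingTable` can now reach the scan instead of being applied only above it. A hedged end-to-end sketch (the table name, helper function, and setup are illustrative, not from this commit):

use std::sync::Arc;

use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

/// Register a streaming provider and run a query whose fetch the
/// optimizer can push down into StreamingTableExec.
async fn limited_query(ctx: &SessionContext, table: Arc<dyn TableProvider>) -> Result<()> {
    ctx.register_table("events", table)?;
    let df = ctx.sql("SELECT * FROM events LIMIT 10").await?;
    df.show().await?; // with pushdown, the scan itself can stop early
    Ok(())
}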

datafusion/core/src/physical_optimizer/projection_pushdown.rs

Lines changed: 2 additions & 0 deletions
@@ -259,6 +259,7 @@ fn try_swapping_with_streaming_table(
         Some(new_projections.as_ref()),
         lex_orderings,
         streaming_table.is_infinite(),
+        streaming_table.limit(),
     )
     .map(|e| Some(Arc::new(e) as _))
 }
@@ -1860,6 +1861,7 @@ mod tests {
         ]
         .into_iter(),
         true,
+        None,
     )?;
     let projection = Arc::new(ProjectionExec::try_new(
         vec![

datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs

Lines changed: 1 addition & 0 deletions
@@ -1475,6 +1475,7 @@ mod tests {
             Some(&projection),
             vec![sort_exprs],
             true,
+            None,
         )
         .unwrap(),
     )

datafusion/core/src/test/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -331,6 +331,7 @@ pub fn stream_exec_ordered(
             None,
             vec![sort_exprs],
             true,
+            None,
         )
         .unwrap(),
     )

datafusion/physical-plan/src/limit.rs

Lines changed: 2 additions & 2 deletions
@@ -431,7 +431,7 @@ impl ExecutionPlan for LocalLimitExec {
 }

 /// A Limit stream skips `skip` rows, and then fetch up to `fetch` rows.
-struct LimitStream {
+pub struct LimitStream {
     /// The remaining number of rows to skip
     skip: usize,
     /// The remaining number of rows to produce
@@ -446,7 +446,7 @@ struct LimitStream {
 }

 impl LimitStream {
-    fn new(
+    pub fn new(
         input: SendableRecordBatchStream,
         skip: usize,
         fetch: Option<usize>,
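Because `LimitStream` and `LimitStream::new` are now `pub`, other physical operators can reuse the same row-capping machinery, which is exactly what `StreamingTableExec` does in the next file. A minimal sketch under the signature shown above (the `cap_stream` helper name is illustrative):

use datafusion_execution::SendableRecordBatchStream;
use datafusion_physical_plan::limit::LimitStream;
use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};

/// Wrap `input` so at most `fetch` rows are produced; nothing is skipped.
fn cap_stream(input: SendableRecordBatchStream, fetch: usize) -> SendableRecordBatchStream {
    // A throwaway metrics set; a real operator would pass in its own
    // so the row counters surface in EXPLAIN ANALYZE.
    let metrics = ExecutionPlanMetricsSet::new();
    let baseline = BaselineMetrics::new(&metrics, /* partition */ 0);
    Box::pin(LimitStream::new(input, 0, Some(fetch), baseline))
}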

datafusion/physical-plan/src/streaming.rs

Lines changed: 117 additions & 1 deletion
@@ -31,6 +31,8 @@ use datafusion_common::{internal_err, plan_err, Result};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};

+use crate::limit::LimitStream;
+use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use async_trait::async_trait;
 use futures::stream::StreamExt;
 use log::debug;
@@ -58,7 +60,9 @@ pub struct StreamingTableExec {
     projected_schema: SchemaRef,
     projected_output_ordering: Vec<LexOrdering>,
     infinite: bool,
+    limit: Option<usize>,
     cache: PlanProperties,
+    metrics: ExecutionPlanMetricsSet,
 }

 impl StreamingTableExec {
@@ -69,6 +73,7 @@ impl StreamingTableExec {
         projection: Option<&Vec<usize>>,
         projected_output_ordering: impl IntoIterator<Item = LexOrdering>,
         infinite: bool,
+        limit: Option<usize>,
     ) -> Result<Self> {
         for x in partitions.iter() {
             let partition_schema = x.schema();
@@ -99,7 +104,9 @@ impl StreamingTableExec {
             projection: projection.cloned().map(Into::into),
             projected_output_ordering,
             infinite,
+            limit,
             cache,
+            metrics: ExecutionPlanMetricsSet::new(),
         })
     }

@@ -127,6 +134,10 @@ impl StreamingTableExec {
         self.infinite
     }

+    pub fn limit(&self) -> Option<usize> {
+        self.limit
+    }
+
     /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
     fn compute_properties(
         schema: SchemaRef,
@@ -180,6 +191,9 @@ impl DisplayAs for StreamingTableExec {
         if self.infinite {
             write!(f, ", infinite_source=true")?;
         }
+        if let Some(fetch) = self.limit {
+            write!(f, ", fetch={fetch}")?;
+        }

         display_orderings(f, &self.projected_output_ordering)?;

@@ -224,14 +238,116 @@ impl ExecutionPlan for StreamingTableExec {
         ctx: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         let stream = self.partitions[partition].execute(ctx);
-        Ok(match self.projection.clone() {
+        let projected_stream = match self.projection.clone() {
             Some(projection) => Box::pin(RecordBatchStreamAdapter::new(
                 self.projected_schema.clone(),
                 stream.map(move |x| {
                     x.and_then(|b| b.project(projection.as_ref()).map_err(Into::into))
                 }),
             )),
             None => stream,
+        };
+        Ok(match self.limit {
+            None => projected_stream,
+            Some(fetch) => {
+                let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
+                Box::pin(LimitStream::new(
+                    projected_stream,
+                    0,
+                    Some(fetch),
+                    baseline_metrics,
+                ))
+            }
         })
     }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::collect_partitioned;
+    use crate::streaming::PartitionStream;
+    use crate::test::{make_partition, TestPartitionStream};
+    use arrow::record_batch::RecordBatch;
+
+    #[tokio::test]
+    async fn test_no_limit() {
+        let exec = TestBuilder::new()
+            // make 2 batches, each with 100 rows
+            .with_batches(vec![make_partition(100), make_partition(100)])
+            .build();
+
+        let counts = collect_num_rows(Arc::new(exec)).await;
+        assert_eq!(counts, vec![200]);
+    }
+
+    #[tokio::test]
+    async fn test_limit() {
+        let exec = TestBuilder::new()
+            // make 2 batches, each with 100 rows
+            .with_batches(vec![make_partition(100), make_partition(100)])
+            // limit to only the first 75 rows back
+            .with_limit(Some(75))
+            .build();

+        let counts = collect_num_rows(Arc::new(exec)).await;
+        assert_eq!(counts, vec![75]);
+    }
+
+    /// Runs the provided execution plan and returns a vector of the number of
+    /// rows in each partition
+    async fn collect_num_rows(exec: Arc<dyn ExecutionPlan>) -> Vec<usize> {
+        let ctx = Arc::new(TaskContext::default());
+        let partition_batches = collect_partitioned(exec, ctx).await.unwrap();
+        partition_batches
+            .into_iter()
+            .map(|batches| batches.iter().map(|b| b.num_rows()).sum::<usize>())
+            .collect()
+    }
+
+    #[derive(Default)]
+    struct TestBuilder {
+        schema: Option<SchemaRef>,
+        partitions: Vec<Arc<dyn PartitionStream>>,
+        projection: Option<Vec<usize>>,
+        projected_output_ordering: Vec<LexOrdering>,
+        infinite: bool,
+        limit: Option<usize>,
+    }
+
+    impl TestBuilder {
+        fn new() -> Self {
+            Self::default()
+        }
+
+        /// Set the batches for the stream
+        fn with_batches(mut self, batches: Vec<RecordBatch>) -> Self {
+            let stream = TestPartitionStream::new_with_batches(batches);
+            self.schema = Some(stream.schema().clone());
+            self.partitions = vec![Arc::new(stream)];
+            self
+        }
+
+        /// Set the limit for the stream
+        fn with_limit(mut self, limit: Option<usize>) -> Self {
+            self.limit = limit;
+            self
+        }
+
+        fn build(self) -> StreamingTableExec {
+            StreamingTableExec::try_new(
+                self.schema.unwrap(),
+                self.partitions,
+                self.projection.as_ref(),
+                self.projected_output_ordering,
+                self.infinite,
+                self.limit,
+            )
+            .unwrap()
+        }
+    }
 }
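Two observable effects of this file's changes, beyond the unit tests above: the limit is applied after projection and separately to each partition's stream (each gets its own `LimitStream`), and the limit now appears in the plan's `Display` output via the new `fetch=` field. Assuming the pre-existing fields render as in other DataFusion plan listings (an assumption; only the `fetch` part is confirmed by this diff), a limited infinite scan would display roughly as:

StreamingTableExec: partition_sizes=1, infinite_source=true, fetch=75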

datafusion/physical-plan/src/test.rs

Lines changed: 26 additions & 0 deletions
@@ -23,9 +23,12 @@ use std::sync::Arc;

 use arrow_array::{ArrayRef, Int32Array, RecordBatch};
 use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use futures::{Future, FutureExt};

 use crate::memory::MemoryExec;
+use crate::stream::RecordBatchStreamAdapter;
+use crate::streaming::PartitionStream;
 use crate::ExecutionPlan;

 pub mod exec;
@@ -121,3 +124,26 @@ pub fn mem_exec(partitions: usize) -> MemoryExec {
     let projection = None;
     MemoryExec::try_new(&data, schema, projection).unwrap()
 }
+
+// construct a stream partition for test purposes
+pub struct TestPartitionStream {
+    pub schema: SchemaRef,
+    pub batches: Vec<RecordBatch>,
+}
+
+impl TestPartitionStream {
+    /// Create a new stream partition with the provided batches
+    pub fn new_with_batches(batches: Vec<RecordBatch>) -> Self {
+        let schema = batches[0].schema();
+        Self { schema, batches }
+    }
+}
+impl PartitionStream for TestPartitionStream {
+    fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
+        let stream = futures::stream::iter(self.batches.clone().into_iter().map(Ok));
+        Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream))
+    }
+}
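The new helper gives in-crate tests a `PartitionStream` backed by in-memory batches. A minimal usage sketch from inside the `datafusion-physical-plan` crate (the test name is illustrative; `make_partition(n)` builds a single-column batch with `n` rows):

use std::sync::Arc;

use datafusion_execution::TaskContext;
use futures::StreamExt;

use crate::streaming::PartitionStream;
use crate::test::{make_partition, TestPartitionStream};

#[tokio::test]
async fn drains_test_partition_stream() {
    // Two batches of 5 rows each, exposed as one stream partition.
    let stream =
        TestPartitionStream::new_with_batches(vec![make_partition(5), make_partition(5)]);
    let mut s = stream.execute(Arc::new(TaskContext::default()));
    let mut rows = 0;
    while let Some(batch) = s.next().await {
        rows += batch.unwrap().num_rows();
    }
    assert_eq!(rows, 10);
}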

datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs

Lines changed: 1 addition & 0 deletions
@@ -1512,6 +1512,7 @@ mod tests {
             None,
             orderings,
             is_infinite,
+            None,
         )?) as _;
         Ok(source)
     }

datafusion/physical-plan/src/windows/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -624,6 +624,7 @@ mod tests {
         None,
         Some(sort_exprs),
         infinite_source,
+        None,
     )?))
 }
