From 0e95080429c485b1f603d11e2f7781f9f3d1281f Mon Sep 17 00:00:00 2001 From: Petros Angelatos Date: Thu, 1 Jul 2021 15:07:09 +0200 Subject: [PATCH] use PartialOrder implementation where appropriate These two places had a copy pasted implementation of the PartialOrder logic so this replaces it with calling the appropriate trait implementations Signed-off-by: Petros Angelatos --- src/operators/arrange/arrangement.rs | 18 ++++++++---------- src/operators/arrange/upsert.rs | 26 ++++++++++++++------------ 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 953d6706c..09a48e248 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -569,7 +569,7 @@ where *reader = Some(reader_local); // Initialize to the minimal input frontier. - let mut input_frontier = vec![::minimum()]; + let mut prev_frontier = Antichain::from_elem(::minimum()); move |input, output| { @@ -589,14 +589,12 @@ where // and sending smaller bites than we might have otherwise done. // Assert that the frontier never regresses. - assert!(input.frontier().frontier().iter().all(|t1| input_frontier.iter().any(|t2: &G::Timestamp| t2.less_equal(t1)))); - - // Test to see if strict progress has occurred (any of the old frontier less equal - // to the new frontier). - let progress = input_frontier.iter().any(|t2| !input.frontier().less_equal(t2)); - - if progress { + assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &input.frontier().frontier())); + // Test to see if strict progress has occurred, which happens whenever the new + // frontier isn't equal to the previous. It is only in this case that we have any + // data processing to do. + if prev_frontier.borrow() != input.frontier().frontier() { // There are two cases to handle with some care: // // 1. If any held capabilities are not in advance of the new input frontier, @@ -663,8 +661,8 @@ where writer.seal(input.frontier().frontier().to_owned()); } - input_frontier.clear(); - input_frontier.extend(input.frontier().frontier().iter().cloned()); + prev_frontier.clear(); + prev_frontier.extend(input.frontier().frontier().iter().cloned()); } if let Some(mut fuel) = effort.clone() { diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index fd36934c7..978d9596f 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -185,7 +185,7 @@ where *reader = Some(reader_local.clone()); // Tracks the input frontier, used to populate the lower bound of new batches. - let mut input_frontier = Antichain::from_elem(::minimum()); + let mut prev_frontier = Antichain::from_elem(::minimum()); // For stashing input upserts, ordered increasing by time (`BinaryHeap` is a max-heap). let mut priority_queue = BinaryHeap::)>>::new(); @@ -202,11 +202,13 @@ where } }); - // Test to see if strict progress has occurred, which happens whenever any element of - // the old frontier is not greater or equal to the new frontier. It is only in this - // case that we have any data processing to do. - let progress = input_frontier.elements().iter().any(|t2| !input.frontier().less_equal(t2)); - if progress { + // Assert that the frontier never regresses. + assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &input.frontier().frontier())); + + // Test to see if strict progress has occurred, which happens whenever the new + // frontier isn't equal to the previous. It is only in this case that we have any + // data processing to do. + if prev_frontier.borrow() != input.frontier().frontier() { // If there is at least one capability not in advance of the input frontier ... if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) { @@ -293,8 +295,8 @@ where builder.push(update); } } - let batch = builder.done(input_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum())); - input_frontier.clone_from(&upper); + let batch = builder.done(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum())); + prev_frontier.clone_from(&upper); // Communicate `batch` to the arrangement and the stream. writer.insert(batch.clone(), Some(capability.time().clone())); @@ -325,12 +327,12 @@ where } // Update our view of the input frontier. - input_frontier.clear(); - input_frontier.extend(input.frontier().frontier().iter().cloned()); + prev_frontier.clear(); + prev_frontier.extend(input.frontier().frontier().iter().cloned()); // Downgrade capabilities for `reader_local`. - reader_local.set_logical_compaction(input_frontier.borrow()); - reader_local.set_physical_compaction(input_frontier.borrow()); + reader_local.set_logical_compaction(prev_frontier.borrow()); + reader_local.set_physical_compaction(prev_frontier.borrow()); } if let Some(mut fuel) = effort.clone() {