Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ where
*reader = Some(reader_local);

// Initialize to the minimal input frontier.
let mut input_frontier = vec![<G::Timestamp as Timestamp>::minimum()];
let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());

move |input, output| {

Expand All @@ -589,14 +589,12 @@ where
// and sending smaller bites than we might have otherwise done.

// Assert that the frontier never regresses.
assert!(input.frontier().frontier().iter().all(|t1| input_frontier.iter().any(|t2: &G::Timestamp| t2.less_equal(t1))));

// Test to see if strict progress has occurred (any of the old frontier less equal
// to the new frontier).
let progress = input_frontier.iter().any(|t2| !input.frontier().less_equal(t2));

if progress {
assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &input.frontier().frontier()));

// Test to see if strict progress has occurred, which happens whenever the new
// frontier isn't equal to the previous. It is only in this case that we have any
// data processing to do.
if prev_frontier.borrow() != input.frontier().frontier() {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is right. I was a bit confused about the definition of antichain equality, but it looks like the implementation equates two things that have the same elements but in different orders.

// There are two cases to handle with some care:
//
// 1. If any held capabilities are not in advance of the new input frontier,
Expand Down Expand Up @@ -663,8 +661,8 @@ where
writer.seal(input.frontier().frontier().to_owned());
}

input_frontier.clear();
input_frontier.extend(input.frontier().frontier().iter().cloned());
prev_frontier.clear();
prev_frontier.extend(input.frontier().frontier().iter().cloned());
}

if let Some(mut fuel) = effort.clone() {
Expand Down
26 changes: 14 additions & 12 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ where
*reader = Some(reader_local.clone());

// Tracks the input frontier, used to populate the lower bound of new batches.
let mut input_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());
let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());

// For stashing input upserts, ordered increasing by time (`BinaryHeap` is a max-heap).
let mut priority_queue = BinaryHeap::<std::cmp::Reverse<(G::Timestamp, Tr::Key, Option<Tr::Val>)>>::new();
Expand All @@ -202,11 +202,13 @@ where
}
});

// Test to see if strict progress has occurred, which happens whenever any element of
// the old frontier is not greater or equal to the new frontier. It is only in this
// case that we have any data processing to do.
let progress = input_frontier.elements().iter().any(|t2| !input.frontier().less_equal(t2));
if progress {
// Assert that the frontier never regresses.
assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &input.frontier().frontier()));

// Test to see if strict progress has occurred, which happens whenever the new
// frontier isn't equal to the previous. It is only in this case that we have any
// data processing to do.
if prev_frontier.borrow() != input.frontier().frontier() {

// If there is at least one capability not in advance of the input frontier ...
if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {
Expand Down Expand Up @@ -293,8 +295,8 @@ where
builder.push(update);
}
}
let batch = builder.done(input_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
input_frontier.clone_from(&upper);
let batch = builder.done(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
prev_frontier.clone_from(&upper);

// Communicate `batch` to the arrangement and the stream.
writer.insert(batch.clone(), Some(capability.time().clone()));
Expand Down Expand Up @@ -325,12 +327,12 @@ where
}

// Update our view of the input frontier.
input_frontier.clear();
input_frontier.extend(input.frontier().frontier().iter().cloned());
prev_frontier.clear();
prev_frontier.extend(input.frontier().frontier().iter().cloned());

// Downgrade capabilities for `reader_local`.
reader_local.set_logical_compaction(input_frontier.borrow());
reader_local.set_physical_compaction(input_frontier.borrow());
reader_local.set_logical_compaction(prev_frontier.borrow());
reader_local.set_physical_compaction(prev_frontier.borrow());
}

if let Some(mut fuel) = effort.clone() {
Expand Down