Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion src/lattice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,45 @@ implement_lattice!((), ());
/// ```
pub fn antichain_join<T: Lattice>(one: &[T], other: &[T]) -> Antichain<T> {
let mut upper = Antichain::new();
antichain_join_into(one, other, &mut upper);
upper
}

/// Returns the "smallest" minimal antichain "greater or equal" to both inputs.
///
/// This method is primarily meant for cases where one cannot use the methods
/// of `Antichain`'s `PartialOrder` implementation, such as when one has only
/// references rather than owned antichains.
///
/// This function is similar to [antichain_join] but reuses an existing allocation.
/// The provided antichain is cleared before inserting elements.
///
/// # Examples
///
/// ```
/// # extern crate timely;
/// # extern crate differential_dataflow;
/// # use timely::PartialOrder;
/// # use timely::order::Product;
/// # use timely::progress::Antichain;
/// # use differential_dataflow::lattice::Lattice;
/// # use differential_dataflow::lattice::antichain_join_into;
/// # fn main() {
///
/// let mut join = Antichain::new();
/// let f1 = &[Product::new(3, 7), Product::new(5, 6)];
/// let f2 = &[Product::new(4, 6)];
/// antichain_join_into(f1, f2, &mut join);
/// assert_eq!(&*join.elements(), &[Product::new(4, 7), Product::new(5, 6)]);
/// # }
/// ```
pub fn antichain_join_into<T: Lattice>(one: &[T], other: &[T], upper: &mut Antichain<T>) {
upper.clear();
for time1 in one {
for time2 in other {
upper.insert(time1.join(time2));
}
}
upper
}

/// Returns the "greatest" minimal antichain "less or equal" to both inputs.
Expand Down
17 changes: 11 additions & 6 deletions src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ where
queues: Weak<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
logical_compaction: Antichain<Tr::Time>,
physical_compaction: Antichain<Tr::Time>,
temp_antichain: Antichain<Tr::Time>,

operator: ::timely::dataflow::operators::generic::OperatorInfo,
logging: Option<::logging::Logger>,
Expand All @@ -57,19 +58,21 @@ where
fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
// This method does not enforce that `frontier` is greater or equal to `self.logical_compaction`.
// Instead, it determines the joint consequences of both guarantees and moves forward with that.
let new_frontier = crate::lattice::antichain_join(&self.logical_compaction.borrow()[..], &frontier[..]);
self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), new_frontier.borrow());
self.logical_compaction = new_frontier;
crate::lattice::antichain_join_into(&self.logical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain);
self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), self.temp_antichain.borrow());
::std::mem::swap(&mut self.logical_compaction, &mut self.temp_antichain);
self.temp_antichain.clear();
}
fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> {
self.logical_compaction.borrow()
}
fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
// This method does not enforce that `frontier` is greater or equal to `self.physical_compaction`.
// Instead, it determines the joint consequences of both guarantees and moves forward with that.
let new_frontier = crate::lattice::antichain_join(&self.physical_compaction.borrow()[..], &frontier[..]);
self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), new_frontier.borrow());
self.physical_compaction = new_frontier;
crate::lattice::antichain_join_into(&self.physical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain);
self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), self.temp_antichain.borrow());
::std::mem::swap(&mut self.physical_compaction, &mut self.temp_antichain);
self.temp_antichain.clear();
}
fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> {
self.physical_compaction.borrow()
Expand Down Expand Up @@ -105,6 +108,7 @@ where
queues: Rc::downgrade(&queues),
logical_compaction: trace.borrow().logical_compaction.frontier().to_owned(),
physical_compaction: trace.borrow().physical_compaction.frontier().to_owned(),
temp_antichain: Antichain::new(),
operator,
logging,
};
Expand Down Expand Up @@ -544,6 +548,7 @@ where
physical_compaction: self.physical_compaction.clone(),
operator: self.operator.clone(),
logging: self.logging.clone(),
temp_antichain: Antichain::new(),
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/trace/implementations/spine_fueled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,14 +212,15 @@ where
Some((CursorList::new(cursors, &storage), storage))
}
fn set_logical_compaction(&mut self, frontier: AntichainRef<T>) {
// TODO: Re-use allocation
self.logical_frontier = frontier.to_owned();
self.logical_frontier.clear();
self.logical_frontier.extend(frontier.iter().cloned());
}
fn get_logical_compaction(&mut self) -> AntichainRef<T> { self.logical_frontier.borrow() }
fn set_physical_compaction(&mut self, frontier: AntichainRef<T>) {
// We should never request to rewind the frontier.
debug_assert!(PartialOrder::less_equal(&self.physical_frontier.borrow(), &frontier), "FAIL\tthrough frontier !<= new frontier {:?} {:?}\n", self.physical_frontier, frontier);
self.physical_frontier = frontier.to_owned();
self.physical_frontier.clear();
self.physical_frontier.extend(frontier.iter().cloned());
self.consider_merges();
}
fn get_physical_compaction(&mut self) -> AntichainRef<T> { self.physical_frontier.borrow() }
Expand Down