diff --git a/src/lattice.rs b/src/lattice.rs index 69428d2f2..2b27fd637 100644 --- a/src/lattice.rs +++ b/src/lattice.rs @@ -222,12 +222,45 @@ implement_lattice!((), ()); /// ``` pub fn antichain_join(one: &[T], other: &[T]) -> Antichain { let mut upper = Antichain::new(); + antichain_join_into(one, other, &mut upper); + upper +} + +/// Returns the "smallest" minimal antichain "greater or equal" to both inputs. +/// +/// This method is primarily meant for cases where one cannot use the methods +/// of `Antichain`'s `PartialOrder` implementation, such as when one has only +/// references rather than owned antichains. +/// +/// This function is similar to [antichain_join] but reuses an existing allocation. +/// The provided antichain is cleared before inserting elements. +/// +/// # Examples +/// +/// ``` +/// # extern crate timely; +/// # extern crate differential_dataflow; +/// # use timely::PartialOrder; +/// # use timely::order::Product; +/// # use timely::progress::Antichain; +/// # use differential_dataflow::lattice::Lattice; +/// # use differential_dataflow::lattice::antichain_join_into; +/// # fn main() { +/// +/// let mut join = Antichain::new(); +/// let f1 = &[Product::new(3, 7), Product::new(5, 6)]; +/// let f2 = &[Product::new(4, 6)]; +/// antichain_join_into(f1, f2, &mut join); +/// assert_eq!(&*join.elements(), &[Product::new(4, 7), Product::new(5, 6)]); +/// # } +/// ``` +pub fn antichain_join_into(one: &[T], other: &[T], upper: &mut Antichain) { + upper.clear(); for time1 in one { for time2 in other { upper.insert(time1.join(time2)); } } - upper } /// Returns the "greatest" minimal antichain "less or equal" to both inputs. diff --git a/src/operators/arrange/agent.rs b/src/operators/arrange/agent.rs index e2f72ed50..13c3b02cd 100644 --- a/src/operators/arrange/agent.rs +++ b/src/operators/arrange/agent.rs @@ -36,6 +36,7 @@ where queues: Weak>>>, logical_compaction: Antichain, physical_compaction: Antichain, + temp_antichain: Antichain, operator: ::timely::dataflow::operators::generic::OperatorInfo, logging: Option<::logging::Logger>, @@ -57,9 +58,10 @@ where fn set_logical_compaction(&mut self, frontier: AntichainRef) { // This method does not enforce that `frontier` is greater or equal to `self.logical_compaction`. // Instead, it determines the joint consequences of both guarantees and moves forward with that. - let new_frontier = crate::lattice::antichain_join(&self.logical_compaction.borrow()[..], &frontier[..]); - self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), new_frontier.borrow()); - self.logical_compaction = new_frontier; + crate::lattice::antichain_join_into(&self.logical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain); + self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), self.temp_antichain.borrow()); + ::std::mem::swap(&mut self.logical_compaction, &mut self.temp_antichain); + self.temp_antichain.clear(); } fn get_logical_compaction(&mut self) -> AntichainRef { self.logical_compaction.borrow() @@ -67,9 +69,10 @@ where fn set_physical_compaction(&mut self, frontier: AntichainRef) { // This method does not enforce that `frontier` is greater or equal to `self.physical_compaction`. // Instead, it determines the joint consequences of both guarantees and moves forward with that. - let new_frontier = crate::lattice::antichain_join(&self.physical_compaction.borrow()[..], &frontier[..]); - self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), new_frontier.borrow()); - self.physical_compaction = new_frontier; + crate::lattice::antichain_join_into(&self.physical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain); + self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), self.temp_antichain.borrow()); + ::std::mem::swap(&mut self.physical_compaction, &mut self.temp_antichain); + self.temp_antichain.clear(); } fn get_physical_compaction(&mut self) -> AntichainRef { self.physical_compaction.borrow() @@ -105,6 +108,7 @@ where queues: Rc::downgrade(&queues), logical_compaction: trace.borrow().logical_compaction.frontier().to_owned(), physical_compaction: trace.borrow().physical_compaction.frontier().to_owned(), + temp_antichain: Antichain::new(), operator, logging, }; @@ -544,6 +548,7 @@ where physical_compaction: self.physical_compaction.clone(), operator: self.operator.clone(), logging: self.logging.clone(), + temp_antichain: Antichain::new(), } } } diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index b2f3b01e5..8624247e6 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -212,14 +212,15 @@ where Some((CursorList::new(cursors, &storage), storage)) } fn set_logical_compaction(&mut self, frontier: AntichainRef) { - // TODO: Re-use allocation - self.logical_frontier = frontier.to_owned(); + self.logical_frontier.clear(); + self.logical_frontier.extend(frontier.iter().cloned()); } fn get_logical_compaction(&mut self) -> AntichainRef { self.logical_frontier.borrow() } fn set_physical_compaction(&mut self, frontier: AntichainRef) { // We should never request to rewind the frontier. debug_assert!(PartialOrder::less_equal(&self.physical_frontier.borrow(), &frontier), "FAIL\tthrough frontier !<= new frontier {:?} {:?}\n", self.physical_frontier, frontier); - self.physical_frontier = frontier.to_owned(); + self.physical_frontier.clear(); + self.physical_frontier.extend(frontier.iter().cloned()); self.consider_merges(); } fn get_physical_compaction(&mut self) -> AntichainRef { self.physical_frontier.borrow() }