From a3453dd4c76d2b43252eb78372d4f3b2377d9853 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 10 Sep 2021 13:15:12 +0200 Subject: [PATCH 1/4] antichain_join_into: Same as antichain_join, permits allocation reuse Signed-off-by: Moritz Hoffmann --- src/lattice.rs | 35 ++++++++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/src/lattice.rs b/src/lattice.rs index 69428d2f2..2b27fd637 100644 --- a/src/lattice.rs +++ b/src/lattice.rs @@ -222,12 +222,45 @@ implement_lattice!((), ()); /// ``` pub fn antichain_join(one: &[T], other: &[T]) -> Antichain { let mut upper = Antichain::new(); + antichain_join_into(one, other, &mut upper); + upper +} + +/// Returns the "smallest" minimal antichain "greater or equal" to both inputs. +/// +/// This method is primarily meant for cases where one cannot use the methods +/// of `Antichain`'s `PartialOrder` implementation, such as when one has only +/// references rather than owned antichains. +/// +/// This function is similar to [antichain_join] but reuses an existing allocation. +/// The provided antichain is cleared before inserting elements. +/// +/// # Examples +/// +/// ``` +/// # extern crate timely; +/// # extern crate differential_dataflow; +/// # use timely::PartialOrder; +/// # use timely::order::Product; +/// # use timely::progress::Antichain; +/// # use differential_dataflow::lattice::Lattice; +/// # use differential_dataflow::lattice::antichain_join_into; +/// # fn main() { +/// +/// let mut join = Antichain::new(); +/// let f1 = &[Product::new(3, 7), Product::new(5, 6)]; +/// let f2 = &[Product::new(4, 6)]; +/// antichain_join_into(f1, f2, &mut join); +/// assert_eq!(&*join.elements(), &[Product::new(4, 7), Product::new(5, 6)]); +/// # } +/// ``` +pub fn antichain_join_into(one: &[T], other: &[T], upper: &mut Antichain) { + upper.clear(); for time1 in one { for time2 in other { upper.insert(time1.join(time2)); } } - upper } /// Returns the "greatest" minimal antichain "less or equal" to both inputs. From 0b61402ff87d6fc416e59dc564e020cefc90ae44 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 10 Sep 2021 13:16:27 +0200 Subject: [PATCH 2/4] spine/set_physical_compaction: Reuse allocation Signed-off-by: Moritz Hoffmann --- src/trace/implementations/spine_fueled.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index b2f3b01e5..697e97818 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -219,7 +219,8 @@ where fn set_physical_compaction(&mut self, frontier: AntichainRef) { // We should never request to rewind the frontier. debug_assert!(PartialOrder::less_equal(&self.physical_frontier.borrow(), &frontier), "FAIL\tthrough frontier !<= new frontier {:?} {:?}\n", self.physical_frontier, frontier); - self.physical_frontier = frontier.to_owned(); + self.physical_frontier.clear(); + self.physical_frontier.extend(frontier.iter().cloned()); self.consider_merges(); } fn get_physical_compaction(&mut self) -> AntichainRef { self.physical_frontier.borrow() } From dc49c3340eb1cfd5c48439ce2fc288ce03ff27f7 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 10 Sep 2021 13:18:55 +0200 Subject: [PATCH 3/4] agent: Avoid temporary allocations by keeping a temporary buffer This commit changes the TraceAgent to keep a buffer for temporary antichains around, which is used for setting the logical and physical compaction. The upside is that it avoids temporary allocations, but on the downside it keeps another antichain around. This should be fine, unless there is a defective use case where the size of the antichain grows significantly and the potentially unused memory will not be reclaimed anymore. Signed-off-by: Moritz Hoffmann --- src/operators/arrange/agent.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/operators/arrange/agent.rs b/src/operators/arrange/agent.rs index e2f72ed50..13c3b02cd 100644 --- a/src/operators/arrange/agent.rs +++ b/src/operators/arrange/agent.rs @@ -36,6 +36,7 @@ where queues: Weak>>>, logical_compaction: Antichain, physical_compaction: Antichain, + temp_antichain: Antichain, operator: ::timely::dataflow::operators::generic::OperatorInfo, logging: Option<::logging::Logger>, @@ -57,9 +58,10 @@ where fn set_logical_compaction(&mut self, frontier: AntichainRef) { // This method does not enforce that `frontier` is greater or equal to `self.logical_compaction`. // Instead, it determines the joint consequences of both guarantees and moves forward with that. - let new_frontier = crate::lattice::antichain_join(&self.logical_compaction.borrow()[..], &frontier[..]); - self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), new_frontier.borrow()); - self.logical_compaction = new_frontier; + crate::lattice::antichain_join_into(&self.logical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain); + self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), self.temp_antichain.borrow()); + ::std::mem::swap(&mut self.logical_compaction, &mut self.temp_antichain); + self.temp_antichain.clear(); } fn get_logical_compaction(&mut self) -> AntichainRef { self.logical_compaction.borrow() @@ -67,9 +69,10 @@ where fn set_physical_compaction(&mut self, frontier: AntichainRef) { // This method does not enforce that `frontier` is greater or equal to `self.physical_compaction`. // Instead, it determines the joint consequences of both guarantees and moves forward with that. - let new_frontier = crate::lattice::antichain_join(&self.physical_compaction.borrow()[..], &frontier[..]); - self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), new_frontier.borrow()); - self.physical_compaction = new_frontier; + crate::lattice::antichain_join_into(&self.physical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain); + self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), self.temp_antichain.borrow()); + ::std::mem::swap(&mut self.physical_compaction, &mut self.temp_antichain); + self.temp_antichain.clear(); } fn get_physical_compaction(&mut self) -> AntichainRef { self.physical_compaction.borrow() @@ -105,6 +108,7 @@ where queues: Rc::downgrade(&queues), logical_compaction: trace.borrow().logical_compaction.frontier().to_owned(), physical_compaction: trace.borrow().physical_compaction.frontier().to_owned(), + temp_antichain: Antichain::new(), operator, logging, }; @@ -544,6 +548,7 @@ where physical_compaction: self.physical_compaction.clone(), operator: self.operator.clone(), logging: self.logging.clone(), + temp_antichain: Antichain::new(), } } } From ca257effea26f43ec73abd22d9196132f366d2ad Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 10 Sep 2021 15:06:07 +0200 Subject: [PATCH 4/4] spine/set_logical_compaction: Reuse frontier allocation Signed-off-by: Moritz Hoffmann --- src/trace/implementations/spine_fueled.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index 697e97818..8624247e6 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -212,8 +212,8 @@ where Some((CursorList::new(cursors, &storage), storage)) } fn set_logical_compaction(&mut self, frontier: AntichainRef) { - // TODO: Re-use allocation - self.logical_frontier = frontier.to_owned(); + self.logical_frontier.clear(); + self.logical_frontier.extend(frontier.iter().cloned()); } fn get_logical_compaction(&mut self) -> AntichainRef { self.logical_frontier.borrow() } fn set_physical_compaction(&mut self, frontier: AntichainRef) {