From b90209ba50674e2f89ba9205b0eda3f2b806a145 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sun, 26 May 2024 13:22:27 -0400 Subject: [PATCH 1/3] Remove all opinions on OwnedItem --- src/trace/implementations/mod.rs | 48 ++++++++++++-------------------- src/trace/implementations/rhh.rs | 15 ++++++---- 2 files changed, 27 insertions(+), 36 deletions(-) diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index a41f6924c..681e03504 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -90,15 +90,15 @@ pub trait Layout { /// The represented update. type Target: Update + ?Sized; /// Container for update keys. - type KeyContainer: BatchContainer::Key> + PushInto<::Key>; + type KeyContainer: BatchContainer + PushInto<::Key>; /// Container for update vals. - type ValContainer: BatchContainer::Val> + PushInto<::Val>; + type ValContainer: BatchContainer + PushInto<::Val>; /// Container for update vals. type UpdContainer: PushInto<(::Time, ::Diff)> + - for<'a> BatchContainer = &'a (::Time, ::Diff), OwnedItem = (::Time, ::Diff)>; + for<'a> BatchContainer = &'a (::Time, ::Diff)>; /// Container for offsets. - type OffsetContainer: BatchContainer + PushInto; + type OffsetContainer: for<'a> BatchContainer = usize> + PushInto; } /// A layout that uses vectors @@ -142,7 +142,7 @@ where /// Examples include types that implement `Clone` who prefer pub trait PreferredContainer : ToOwned { /// The preferred container for the type. - type Container: BatchContainer + PushInto; + type Container: BatchContainer + PushInto; } impl PreferredContainer for T { @@ -191,7 +191,6 @@ where } use std::convert::TryInto; -use std::ops::Deref; use abomonation_derive::Abomonation; use crate::trace::cursor::IntoOwned; @@ -285,46 +284,33 @@ impl<'a> Iterator for OffsetListIter<'a> { } } -/// Helper struct to provide `IntoOwned` for `Copy` types. -#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy)] -pub struct Wrapper(T); - -impl Deref for Wrapper { - type Target = T; - - fn deref(&self) -> &Self::Target { - &self.0 +impl PushInto for OffsetList { + fn push_into(&mut self, item: usize) { + self.push(item); } } -impl<'a, T: Copy + Ord> IntoOwned<'a> for Wrapper { - type Owned = T; - +impl<'a> IntoOwned<'a> for usize { + type Owned = usize; fn into_owned(self) -> Self::Owned { - self.0 + self } fn clone_onto(&self, other: &mut Self::Owned) { - *other = self.0; + *other = *self; } fn borrow_as(owned: &'a Self::Owned) -> Self { - Self(*owned) - } -} - -impl PushInto for OffsetList { - fn push_into(&mut self, item: usize) { - self.push(item); + *owned } } impl BatchContainer for OffsetList { type OwnedItem = usize; - type ReadItem<'a> = Wrapper; + type ReadItem<'a> = usize; fn copy(&mut self, item: Self::ReadItem<'_>) { - self.push(item.0); + self.push(item); } fn copy_range(&mut self, other: &Self, start: usize, end: usize) { @@ -342,7 +328,7 @@ impl BatchContainer for OffsetList { } fn index(&self, index: usize) -> Self::ReadItem<'_> { - Wrapper(self.index(index)) + self.index(index) } fn len(&self) -> usize { @@ -425,8 +411,10 @@ impl BuilderInput> for TimelyStack<(( <::Container as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = K::Owned>, V: Ord+ToOwned+PreferredContainer + ?Sized, V::Owned: Columnation + Ord+Clone+'static, + for<'a> <::Container as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = V::Owned>, T: Columnation + Ord+Lattice+Timestamp+Clone, R: Columnation + Semigroup+Clone, { diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index e5279cb8b..98f1341c0 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -150,6 +150,7 @@ mod val_batch { impl RhhValStorage where ::Key: Default + HashOrdered, + for<'a> ::ReadItem<'a>: HashOrdered, { /// Lower and upper bounds in `self.vals` corresponding to the key at `index`. fn values_for_key(&self, index: usize) -> (usize, usize) { @@ -181,7 +182,7 @@ mod val_batch { /// If `offset` is specified, we will insert it at the appropriate location. If it is not specified, /// we leave `keys_offs` ready to receive it as the next `push`. This is so that builders that may /// not know the final offset at the moment of key insertion can prepare for receiving the offset. - fn insert_key(&mut self, key: ::Key, offset: Option) { + fn insert_key(&mut self, key: ::ReadItem<'_>, offset: Option) { let desired = self.desired_location(&key); // Were we to push the key now, it would be at `self.keys.len()`, so while that is wrong, // push additional blank entries in. @@ -194,7 +195,7 @@ mod val_batch { // Now we insert the key. Even if it is no longer the desired location because of contention. // If an offset has been supplied we insert it, and otherwise leave it for future determination. - self.keys.push(key); + self.keys.copy(key); if let Some(offset) = offset { self.keys_offs.push(offset); } @@ -330,6 +331,7 @@ mod val_batch { where ::Key: Default + HashOrdered, RhhValBatch: Batch::Time>, + for<'a> ::ReadItem<'a>: HashOrdered, { fn new(batch1: &RhhValBatch, batch2: &RhhValBatch, compaction_frontier: AntichainRef<::Time>) -> Self { @@ -422,6 +424,7 @@ mod val_batch { impl RhhValMerger where ::Key: Default + HashOrdered, + for<'a> ::ReadItem<'a>: HashOrdered, { /// Copy the next key in `source`. /// @@ -445,7 +448,7 @@ mod val_batch { // If we have pushed any values, copy the key as well. if self.result.vals.len() > init_vals { - self.result.insert_key(source.keys.index(cursor).into_owned(), Some(self.result.vals.len())); + self.result.insert_key(source.keys.index(cursor), Some(self.result.vals.len())); } } /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors. @@ -465,7 +468,7 @@ mod val_batch { let (lower1, upper1) = source1.values_for_key(self.key_cursor1); let (lower2, upper2) = source2.values_for_key(self.key_cursor2); if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) { - self.result.insert_key(source1.keys.index(self.key_cursor1).into_owned(), Some(off)); + self.result.insert_key(source1.keys.index(self.key_cursor1), Some(off)); } // Increment cursors in either case; the keys are merged. self.key_cursor1 += 1; @@ -736,9 +739,9 @@ mod val_batch { impl Builder for RhhValBuilder where ::Key: Default + HashOrdered, - // RhhValBatch: Batch::Key, Val=::Val, Time=::Time, Diff=::Diff>, CI: for<'a> BuilderInput = ::Key, Time=::Time, Diff=::Diff>, for<'a> L::ValContainer: PushInto>, + for<'a> ::ReadItem<'a>: HashOrdered + IntoOwned<'a, Owned = ::Key>, { type Input = CI; type Time = ::Time; @@ -796,7 +799,7 @@ mod val_batch { self.push_update(time, diff); self.result.vals.push(val); // Insert the key, but with no specified offset. - self.result.insert_key(key, None); + self.result.insert_key(IntoOwned::borrow_as(&key), None); } } } From 35c7628d70371b85c7efc2379a5dcb20602d431c Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sun, 26 May 2024 13:27:17 -0400 Subject: [PATCH 2/3] Remove BatchContainer::OwnedItem --- .../implementations/huffman_container.rs | 1 - src/trace/implementations/mod.rs | 14 +- src/trace/implementations/option_container.rs | 152 ------------------ 3 files changed, 3 insertions(+), 164 deletions(-) delete mode 100644 src/trace/implementations/option_container.rs diff --git a/src/trace/implementations/huffman_container.rs b/src/trace/implementations/huffman_container.rs index 83f20be0e..c3d2496b6 100644 --- a/src/trace/implementations/huffman_container.rs +++ b/src/trace/implementations/huffman_container.rs @@ -50,7 +50,6 @@ impl PushInto> for HuffmanContainer { } impl BatchContainer for HuffmanContainer { - type OwnedItem = Vec; type ReadItem<'a> = Wrapped<'a, B>; fn copy(&mut self, item: Self::ReadItem<'_>) { diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index 681e03504..c0c7d1aa5 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -45,7 +45,6 @@ pub mod merge_batcher_col; pub mod ord_neu; pub mod rhh; pub mod huffman_container; -pub mod option_container; // Opinionated takes on default spines. pub use self::ord_neu::OrdValSpine as ValSpine; @@ -306,7 +305,6 @@ impl<'a> IntoOwned<'a> for usize { } impl BatchContainer for OffsetList { - type OwnedItem = usize; type ReadItem<'a> = usize; fn copy(&mut self, item: Self::ReadItem<'_>) { @@ -445,14 +443,11 @@ pub mod containers { use timely::container::PushInto; use std::borrow::ToOwned; - use crate::trace::IntoOwned; /// A general-purpose container resembling `Vec`. pub trait BatchContainer: 'static { - /// An type that all `Self::ReadItem<'_>` can be converted into. - type OwnedItem; /// The type that can be read back out of the container. - type ReadItem<'a>: Copy + IntoOwned<'a, Owned = Self::OwnedItem> + Ord + for<'b> PartialOrd>; + type ReadItem<'a>: Copy + Ord + for<'b> PartialOrd>; /// Push an item into this container fn push(&mut self, item: D) where Self: PushInto { @@ -535,8 +530,7 @@ pub mod containers { // All `T: Clone` also implement `ToOwned`, but without the constraint Rust // struggles to understand why the owned type must be `T` (i.e. the one blanket impl). impl BatchContainer for Vec { - type OwnedItem = T; - type ReadItem<'a> = &'a Self::OwnedItem; + type ReadItem<'a> = &'a T; fn copy(&mut self, item: &T) { self.push(item.clone()); @@ -561,8 +555,7 @@ pub mod containers { // The `ToOwned` requirement exists to satisfy `self.reserve_items`, who must for now // be presented with the actual contained type, rather than a type that borrows into it. impl + 'static> BatchContainer for TimelyStack { - type OwnedItem = T; - type ReadItem<'a> = &'a Self::OwnedItem; + type ReadItem<'a> = &'a T; fn copy(&mut self, item: &T) { self.copy(item); @@ -626,7 +619,6 @@ pub mod containers { where B: Ord + Clone + Sized + 'static, { - type OwnedItem = Vec; type ReadItem<'a> = &'a [B]; fn copy(&mut self, item: Self::ReadItem<'_>) { diff --git a/src/trace/implementations/option_container.rs b/src/trace/implementations/option_container.rs deleted file mode 100644 index abc8ef541..000000000 --- a/src/trace/implementations/option_container.rs +++ /dev/null @@ -1,152 +0,0 @@ -//! A container optimized for identical contents. - -use crate::trace::cursor::IntoOwned; -use crate::trace::implementations::BatchContainer; - -/// A container that effectively represents default values. -/// -/// This container is meant to be a minimal non-trivial container, -/// and may be useful in unifying `OrdVal` and `OrdKey` spines. -pub struct OptionContainer { - /// Number of default items pushed. - defaults: usize, - /// Spill-over for non-empty rows. - container: C, -} - -use timely::container::PushInto; -impl PushInto for OptionContainer -where - C: BatchContainer + PushInto, - C::OwnedItem: Default + Ord, -{ - fn push_into(&mut self, item: C::OwnedItem) { - if item == Default::default() && self.container.is_empty() { - self.defaults += 1; - } - else { - self.container.push(item) - } - } -} - -impl BatchContainer for OptionContainer -where - C: BatchContainer , - C::OwnedItem: Default + Ord, -{ - type OwnedItem = C::OwnedItem; - type ReadItem<'a> = OptionWrapper<'a, C>; - - fn copy<'a>(&mut self, item: Self::ReadItem<'a>) { - if item.eq(&IntoOwned::borrow_as(&Default::default())) && self.container.is_empty() { - self.defaults += 1; - } - else { - if let Some(item) = item.inner { - self.container.copy(item); - } - else { - self.container.copy(IntoOwned::borrow_as(&Default::default())); - } - } - } - fn with_capacity(size: usize) -> Self { - Self { - defaults: 0, - container: C::with_capacity(size), - } - } - fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { - Self { - defaults: 0, - container: C::merge_capacity(&cont1.container, &cont2.container), - } - } - fn index(&self, index: usize) -> Self::ReadItem<'_> { - if index < self.defaults { - OptionWrapper { inner: None } - } - else { - OptionWrapper { inner: Some(self.container.index(index - self.defaults))} - } - } - fn len(&self) -> usize { - self.container.len() + self.defaults - } -} - -/// A read wrapper capable of cheaply representing a default value. -pub struct OptionWrapper<'a, C: BatchContainer> { - inner: Option>, -} - -impl<'a, C: BatchContainer> Copy for OptionWrapper<'a, C> { } -impl<'a, C: BatchContainer> Clone for OptionWrapper<'a, C> { - fn clone(&self) -> Self { *self } -} - - -use std::cmp::Ordering; - -impl<'a, 'b, C: BatchContainer> PartialEq> for OptionWrapper<'b, C> -where - C::OwnedItem: Default + Ord, -{ - fn eq(&self, other: &OptionWrapper<'a, C>) -> bool { - match (&self.inner, &other.inner) { - (None, None) => true, - (None, Some(item2)) => item2.eq(& as IntoOwned>::borrow_as(&Default::default())), - (Some(item1), None) => item1.eq(& as IntoOwned>::borrow_as(&Default::default())), - (Some(item1), Some(item2)) => item1.eq(item2) - } - } -} - -impl<'a, C: BatchContainer> Eq for OptionWrapper<'a, C> where C::OwnedItem: Default + Ord { } - -impl<'a, 'b, C: BatchContainer> PartialOrd> for OptionWrapper<'b, C> where -C::OwnedItem: Default + Ord, -{ - fn partial_cmp(&self, other: &OptionWrapper<'a, C>) -> Option { - let default = Default::default(); - match (&self.inner, &other.inner) { - (None, None) => Some(Ordering::Equal), - (None, Some(item2)) => item2.partial_cmp(&C::ReadItem::<'_>::borrow_as(&default)).map(|x| x.reverse()), - (Some(item1), None) => item1.partial_cmp(&C::ReadItem::<'_>::borrow_as(&default)), - (Some(item1), Some(item2)) => item1.partial_cmp(item2) - } - } -} -impl<'a, C: BatchContainer> Ord for OptionWrapper<'a, C> where -C::OwnedItem: Default + Ord, -{ - fn cmp(&self, other: &Self) -> Ordering { - self.partial_cmp(other).unwrap() - } -} - - -impl<'a, C: BatchContainer> IntoOwned<'a> for OptionWrapper<'a, C> -where - C::OwnedItem : Default + Ord, -{ - type Owned = C::OwnedItem; - - fn into_owned(self) -> Self::Owned { - self.inner.map(|r| r.into_owned()).unwrap_or_else(Default::default) - } - fn clone_onto(&self, other: &mut Self::Owned) { - if let Some(item) = &self.inner { - item.clone_onto(other) - } - else { - *other = Default::default(); - } - } - fn borrow_as(owned: &'a Self::Owned) -> Self { - Self { - inner: Some(IntoOwned::borrow_as(owned)) - } - } -} From b0a818bb69d76dbbd2579d673dbc93db57aa9902 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sun, 26 May 2024 13:38:20 -0400 Subject: [PATCH 3/3] Remove PushInto bounds --- src/trace/implementations/mod.rs | 5 ++-- src/trace/implementations/ord_neu.rs | 40 ++++++++++++++-------------- src/trace/implementations/rhh.rs | 30 ++++++++++----------- 3 files changed, 38 insertions(+), 37 deletions(-) diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index c0c7d1aa5..6a0fe72c9 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -89,15 +89,16 @@ pub trait Layout { /// The represented update. type Target: Update + ?Sized; /// Container for update keys. + // NB: The `PushInto` constraint is only required by `rhh.rs` to push default values. type KeyContainer: BatchContainer + PushInto<::Key>; /// Container for update vals. - type ValContainer: BatchContainer + PushInto<::Val>; + type ValContainer: BatchContainer; /// Container for update vals. type UpdContainer: PushInto<(::Time, ::Diff)> + for<'a> BatchContainer = &'a (::Time, ::Diff)>; /// Container for offsets. - type OffsetContainer: for<'a> BatchContainer = usize> + PushInto; + type OffsetContainer: for<'a> BatchContainer = usize>; } /// A layout that uses vectors diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index 01b3d7c8c..4c7fc006a 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -215,9 +215,9 @@ mod val_batch { // Mark explicit types because type inference fails to resolve it. let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs; - keys_offs.push(0); + keys_offs.copy(0); let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs; - vals_offs.push(0); + vals_offs.copy(0); OrdValMerger { key_cursor1: 0, @@ -281,7 +281,7 @@ mod val_batch { while lower < upper { self.stash_updates_for_val(source, lower); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.push(off); + self.result.vals_offs.copy(off); self.result.vals.copy(source.vals.index(lower)); } lower += 1; @@ -290,7 +290,7 @@ mod val_batch { // If we have pushed any values, copy the key as well. if self.result.vals.len() > init_vals { self.result.keys.copy(source.keys.index(cursor)); - self.result.keys_offs.push(self.result.vals.len()); + self.result.keys_offs.copy(self.result.vals.len()); } } /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors. @@ -310,7 +310,7 @@ mod val_batch { let (lower2, upper2) = source2.values_for_key(self.key_cursor2); if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) { self.result.keys.copy(source1.keys.index(self.key_cursor1)); - self.result.keys_offs.push(off); + self.result.keys_offs.copy(off); } // Increment cursors in either case; the keys are merged. self.key_cursor1 += 1; @@ -343,7 +343,7 @@ mod val_batch { // Extend stash by updates, with logical compaction applied. self.stash_updates_for_val(source1, lower1); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.push(off); + self.result.vals_offs.copy(off); self.result.vals.copy(source1.vals.index(lower1)); } lower1 += 1; @@ -352,7 +352,7 @@ mod val_batch { self.stash_updates_for_val(source1, lower1); self.stash_updates_for_val(source2, lower2); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.push(off); + self.result.vals_offs.copy(off); self.result.vals.copy(source1.vals.index(lower1)); } lower1 += 1; @@ -362,7 +362,7 @@ mod val_batch { // Extend stash by updates, with logical compaction applied. self.stash_updates_for_val(source2, lower2); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.push(off); + self.result.vals_offs.copy(off); self.result.vals.copy(source2.vals.index(lower2)); } lower2 += 1; @@ -373,7 +373,7 @@ mod val_batch { while lower1 < upper1 { self.stash_updates_for_val(source1, lower1); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.push(off); + self.result.vals_offs.copy(off); self.result.vals.copy(source1.vals.index(lower1)); } lower1 += 1; @@ -381,7 +381,7 @@ mod val_batch { while lower2 < upper2 { self.stash_updates_for_val(source2, lower2); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.push(off); + self.result.vals_offs.copy(off); self.result.vals.copy(source2.vals.index(lower2)); } lower2 += 1; @@ -577,16 +577,16 @@ mod val_batch { self.push_update(time, diff); } else { // New value; complete representation of prior value. - self.result.vals_offs.push(self.result.updates.len()); + self.result.vals_offs.copy(self.result.updates.len()); if self.singleton.take().is_some() { self.singletons += 1; } self.push_update(time, diff); self.result.vals.push(val); } } else { // New key; complete representation of prior key. - self.result.vals_offs.push(self.result.updates.len()); + self.result.vals_offs.copy(self.result.updates.len()); if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); + self.result.keys_offs.copy(self.result.vals.len()); self.push_update(time, diff); self.result.vals.push(val); self.result.keys.push(key); @@ -597,10 +597,10 @@ mod val_batch { #[inline(never)] fn done(mut self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdValBatch { // Record the final offsets - self.result.vals_offs.push(self.result.updates.len()); + self.result.vals_offs.copy(self.result.updates.len()); // Remove any pending singleton, and if it was set increment our count. if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); + self.result.keys_offs.copy(self.result.vals.len()); OrdValBatch { updates: self.result.updates.len() + self.singletons, storage: self.result, @@ -749,7 +749,7 @@ mod key_batch { }; let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs; - keys_offs.push(0); + keys_offs.copy(0); OrdKeyMerger { key_cursor1: 0, @@ -809,7 +809,7 @@ mod key_batch { fn copy_key(&mut self, source: &OrdKeyStorage, cursor: usize) { self.stash_updates_for_key(source, cursor); if let Some(off) = self.consolidate_updates() { - self.result.keys_offs.push(off); + self.result.keys_offs.copy(off); self.result.keys.copy(source.keys.index(cursor)); } } @@ -829,7 +829,7 @@ mod key_batch { self.stash_updates_for_key(source1, self.key_cursor1); self.stash_updates_for_key(source2, self.key_cursor2); if let Some(off) = self.consolidate_updates() { - self.result.keys_offs.push(off); + self.result.keys_offs.copy(off); self.result.keys.copy(source1.keys.index(self.key_cursor1)); } // Increment cursors in either case; the keys are merged. @@ -1015,7 +1015,7 @@ mod key_batch { self.push_update(time, diff); } else { // New key; complete representation of prior key. - self.result.keys_offs.push(self.result.updates.len()); + self.result.keys_offs.copy(self.result.updates.len()); // Remove any pending singleton, and if it was set increment our count. if self.singleton.take().is_some() { self.singletons += 1; } self.push_update(time, diff); @@ -1027,7 +1027,7 @@ mod key_batch { #[inline(never)] fn done(mut self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdKeyBatch { // Record the final offsets - self.result.keys_offs.push(self.result.updates.len()); + self.result.keys_offs.copy(self.result.updates.len()); // Remove any pending singleton, and if it was set increment our count. if self.singleton.take().is_some() { self.singletons += 1; } OrdKeyBatch { diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index 98f1341c0..168ce761b 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -190,14 +190,14 @@ mod val_batch { // We insert a default (dummy) key and repeat the offset to indicate this. let current_offset = self.keys_offs.index(self.keys.len()).into_owned(); self.keys.push(Default::default()); - self.keys_offs.push(current_offset); + self.keys_offs.copy(current_offset); } // Now we insert the key. Even if it is no longer the desired location because of contention. // If an offset has been supplied we insert it, and otherwise leave it for future determination. self.keys.copy(key); if let Some(offset) = offset { - self.keys_offs.push(offset); + self.keys_offs.copy(offset); } self.key_count += 1; } @@ -363,9 +363,9 @@ mod val_batch { // Mark explicit types because type inference fails to resolve it. let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs; - keys_offs.push(0); + keys_offs.copy(0); let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs; - vals_offs.push(0); + vals_offs.copy(0); RhhValMerger { key_cursor1: 0, @@ -440,7 +440,7 @@ mod val_batch { while lower < upper { self.stash_updates_for_val(source, lower); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.push(off); + self.result.vals_offs.copy(off); self.result.vals.copy(source.vals.index(lower)); } lower += 1; @@ -501,7 +501,7 @@ mod val_batch { // Extend stash by updates, with logical compaction applied. self.stash_updates_for_val(source1, lower1); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.push(off); + self.result.vals_offs.copy(off); self.result.vals.copy(source1.vals.index(lower1)); } lower1 += 1; @@ -510,7 +510,7 @@ mod val_batch { self.stash_updates_for_val(source1, lower1); self.stash_updates_for_val(source2, lower2); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.push(off); + self.result.vals_offs.copy(off); self.result.vals.copy(source1.vals.index(lower1)); } lower1 += 1; @@ -520,7 +520,7 @@ mod val_batch { // Extend stash by updates, with logical compaction applied. self.stash_updates_for_val(source2, lower2); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.push(off); + self.result.vals_offs.copy(off); self.result.vals.copy(source2.vals.index(lower2)); } lower2 += 1; @@ -531,7 +531,7 @@ mod val_batch { while lower1 < upper1 { self.stash_updates_for_val(source1, lower1); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.push(off); + self.result.vals_offs.copy(off); self.result.vals.copy(source1.vals.index(lower1)); } lower1 += 1; @@ -539,7 +539,7 @@ mod val_batch { while lower2 < upper2 { self.stash_updates_for_val(source2, lower2); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.push(off); + self.result.vals_offs.copy(off); self.result.vals.copy(source2.vals.index(lower2)); } lower2 += 1; @@ -786,16 +786,16 @@ mod val_batch { self.push_update(time, diff); } else { // New value; complete representation of prior value. - self.result.vals_offs.push(self.result.updates.len()); + self.result.vals_offs.copy(self.result.updates.len()); if self.singleton.take().is_some() { self.singletons += 1; } self.push_update(time, diff); self.result.vals.push(val); } } else { // New key; complete representation of prior key. - self.result.vals_offs.push(self.result.updates.len()); + self.result.vals_offs.copy(self.result.updates.len()); if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); + self.result.keys_offs.copy(self.result.vals.len()); self.push_update(time, diff); self.result.vals.push(val); // Insert the key, but with no specified offset. @@ -807,10 +807,10 @@ mod val_batch { #[inline(never)] fn done(mut self, lower: Antichain, upper: Antichain, since: Antichain) -> RhhValBatch { // Record the final offsets - self.result.vals_offs.push(self.result.updates.len()); + self.result.vals_offs.copy(self.result.updates.len()); // Remove any pending singleton, and if it was set increment our count. if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); + self.result.keys_offs.copy(self.result.vals.len()); RhhValBatch { updates: self.result.updates.len() + self.singletons, storage: self.result,