diff --git a/examples/columnation.rs b/examples/columnation.rs new file mode 100644 index 000000000..fcc1238b9 --- /dev/null +++ b/examples/columnation.rs @@ -0,0 +1,101 @@ +extern crate timely; +extern crate differential_dataflow; + +use timely::dataflow::operators::probe::Handle; + +use differential_dataflow::input::Input; + +fn main() { + + let keys: usize = std::env::args().nth(1).unwrap().parse().unwrap(); + let size: usize = std::env::args().nth(2).unwrap().parse().unwrap(); + + let mode = std::env::args().any(|a| a == "new"); + + if mode { + println!("Running NEW arrangement"); + } + else { + println!("Running OLD arrangement"); + } + + let timer1 = ::std::time::Instant::now(); + let timer2 = timer1.clone(); + + // define a new computational scope, in which to run BFS + timely::execute_from_args(std::env::args(), move |worker| { + + // define BFS dataflow; return handles to roots and edges inputs + let mut probe = Handle::new(); + let (mut data_input, mut keys_input) = worker.dataflow(|scope| { + + use differential_dataflow::operators::{arrange::Arrange, JoinCore}; + use differential_dataflow::trace::implementations::ord::{OrdKeySpine, ColKeySpine}; + + let (data_input, data) = scope.new_collection::(); + let (keys_input, keys) = scope.new_collection::(); + + if mode { + let data = data.arrange::>(); + let keys = keys.arrange::>(); + keys.join_core(&data, |_k, &(), &()| Option::<()>::None) + .probe_with(&mut probe); + } + else { + let data = data.arrange::>(); + let keys = keys.arrange::>(); + keys.join_core(&data, |_k, &(), &()| Option::<()>::None) + .probe_with(&mut probe); + } + + (data_input, keys_input) + }); + + // Load up data in batches. + let mut counter = 0; + while counter < 10 * keys { + let mut i = worker.index(); + while i < size { + let val = (counter + i) % keys; + data_input.insert(format!("{:?}", val)); + i += worker.peers(); + } + counter += size; + data_input.advance_to(data_input.time() + 1); + data_input.flush(); + keys_input.advance_to(keys_input.time() + 1); + keys_input.flush(); + while probe.less_than(data_input.time()) { + worker.step(); + } + } + println!("{:?}\tloading complete", timer1.elapsed()); + + let mut queries = 0; + + while queries < 10 * keys { + let mut i = worker.index(); + while i < size { + let val = (queries + i) % keys; + keys_input.insert(format!("{:?}", val)); + i += worker.peers(); + } + queries += size; + data_input.advance_to(data_input.time() + 1); + data_input.flush(); + keys_input.advance_to(keys_input.time() + 1); + keys_input.flush(); + while probe.less_than(data_input.time()) { + worker.step(); + } + } + + println!("{:?}\tqueries complete", timer1.elapsed()); + + // loop { } + + }).unwrap(); + + println!("{:?}\tshut down", timer2.elapsed()); + +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 4bc4a6a50..2ff81e8b8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -71,6 +71,8 @@ //! of the new and old counts of the old and new degrees of the affected node). #![forbid(missing_docs)] +#![allow(array_into_iter)] + use std::fmt::Debug; diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index d452c7113..34988adb7 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -542,9 +542,6 @@ where // Capabilities for the lower envelope of updates in `batcher`. let mut capabilities = Antichain::>::new(); - let mut buffer = Vec::new(); - - let (activator, effort) = if let Some(effort) = self.inner.scope().config().get::("differential/idle_merge_effort").cloned() { (Some(self.scope().activator_for(&info.address[..])), Some(effort)) @@ -569,8 +566,7 @@ where input.for_each(|cap, data| { capabilities.insert(cap.retain()); - data.swap(&mut buffer); - batcher.push_batch(&mut buffer); + batcher.push_batch(data); }); // The frontier may have advanced by multiple elements, which is an issue because diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index f1f2264ff..83f4f63f9 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -1,5 +1,6 @@ //! A general purpose `Batcher` implementation based on radix sort. +use timely::communication::message::RefOrMut; use timely::progress::frontier::Antichain; use ::difference::Semigroup; @@ -33,8 +34,20 @@ where } #[inline(never)] - fn push_batch(&mut self, batch: &mut Vec<((B::Key,B::Val),B::Time,B::R)>) { - self.sorter.push(batch); + fn push_batch(&mut self, batch: RefOrMut>) { + // `batch` is either a shared reference or an owned allocations. + match batch { + RefOrMut::Ref(reference) => { + // This is a moment at which we could capture the allocations backing + // `batch` into a different form of region, rather than just cloning. + let mut owned: Vec<_> = self.sorter.empty(); + owned.clone_from(reference); + self.sorter.push(&mut owned); + }, + RefOrMut::Mut(reference) => { + self.sorter.push(reference); + } + } } // Sealing a batch means finding those updates with times not greater or equal to any time @@ -58,7 +71,6 @@ where for mut buffer in merged.drain(..) { for ((key, val), time, diff) in buffer.drain(..) { if upper.less_equal(&time) { - // keep_count += 1; self.frontier.insert(time.clone()); if keep.len() == keep.capacity() { if keep.len() > 0 { @@ -69,7 +81,6 @@ where keep.push(((key, val), time, diff)); } else { - // seal_count += 1; builder.push((key, val, time, diff)); } } diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index a7c4fcae1..5c995e048 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -12,13 +12,16 @@ use std::rc::Rc; use std::convert::{TryFrom, TryInto}; use std::marker::PhantomData; use std::fmt::Debug; +use std::ops::Deref; +use timely::container::columnation::TimelyStack; +use timely::container::columnation::Columnation; use timely::progress::{Antichain, frontier::AntichainRef}; use ::difference::Semigroup; use lattice::Lattice; -use trace::layers::{Trie, TupleBuilder}; +use trace::layers::{Trie, TupleBuilder, BatchContainer}; use trace::layers::Builder as TrieBuilder; use trace::layers::Cursor as TrieCursor; use trace::layers::ordered::{OrdOffset, OrderedLayer, OrderedBuilder, OrderedCursor}; @@ -46,67 +49,111 @@ pub type OrdKeySpine = Spine>>; /// A trace implementation for empty values using a spine of abomonated ordered lists. pub type OrdKeySpineAbom = Spine, Vec>>>; +/// A trace implementation backed by columnar storage. +pub type ColValSpine = Spine, TimelyStack>>>; +/// A trace implementation backed by columnar storage. +pub type ColKeySpine = Spine>>>; + + +/// A container that can retain/discard from some offset onward. +pub trait RetainFrom { + /// Retains elements from an index onwards that satisfy a predicate. + fn retain_frombool>(&mut self, index: usize, predicate: P); +} + +impl RetainFrom for Vec { + fn retain_frombool>(&mut self, index: usize, mut predicate: P) { + let mut write_position = index; + for position in index .. self.len() { + if predicate(position, &self[position]) { + self.swap(position, write_position); + write_position += 1; + } + } + self.truncate(write_position); + } +} + +impl RetainFrom for TimelyStack { + fn retain_frombool>(&mut self, index: usize, mut predicate: P) { + let mut position = index; + self.retain_from(index, |item| { + let result = predicate(position, item); + position += 1; + result + }) + } +} /// An immutable collection of update tuples, from a contiguous interval of logical times. #[derive(Debug, Abomonation)] -pub struct OrdValBatch +pub struct OrdValBatch, CV=Vec> where - K: Ord, - V: Ord, - T: Lattice, - O: OrdOffset, >::Error: Debug, >::Error: Debug + K: Ord+Clone, + V: Ord+Clone, + T: Clone+Lattice, + R: Clone, + O: OrdOffset, >::Error: Debug, >::Error: Debug, + CK: BatchContainer+Deref+RetainFrom, + CV: BatchContainer+Deref+RetainFrom, { /// Where all the dataz is. - pub layer: OrderedLayer, O>, O>, + pub layer: OrderedLayer, O, CV>, O, CK>, /// Description of the update times this layer represents. pub desc: Description, } -impl BatchReader for OrdValBatch +impl BatchReader for OrdValBatch where K: Ord+Clone+'static, V: Ord+Clone+'static, T: Lattice+Ord+Clone+'static, R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset, >::Error: Debug, >::Error: Debug, + CK: BatchContainer+Deref+RetainFrom, + CV: BatchContainer+Deref+RetainFrom, { type Key = K; type Val = V; type Time = T; type R = R; - type Cursor = OrdValCursor; + type Cursor = OrdValCursor; fn cursor(&self) -> Self::Cursor { OrdValCursor { cursor: self.layer.cursor(), phantom: std::marker::PhantomData } } - fn len(&self) -> usize { , O>, O> as Trie>::tuples(&self.layer) } + fn len(&self) -> usize { , O, CV>, O, CK> as Trie>::tuples(&self.layer) } fn description(&self) -> &Description { &self.desc } } -impl Batch for OrdValBatch +impl Batch for OrdValBatch where K: Ord+Clone+'static, V: Ord+Clone+'static, T: Lattice+timely::progress::Timestamp+Ord+Clone+::std::fmt::Debug+'static, R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset, >::Error: Debug, >::Error: Debug, + CK: BatchContainer+Deref+RetainFrom, + CV: BatchContainer+Deref+RetainFrom, { type Batcher = MergeBatcher; - type Builder = OrdValBuilder; - type Merger = OrdValMerger; + type Builder = OrdValBuilder; + type Merger = OrdValMerger; fn begin_merge(&self, other: &Self, compaction_frontier: Option>) -> Self::Merger { OrdValMerger::new(self, other, compaction_frontier) } } -impl OrdValBatch +impl OrdValBatch where K: Ord+Clone+'static, V: Ord+Clone+'static, T: Lattice+Ord+Clone+::std::fmt::Debug+'static, R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset, >::Error: Debug, >::Error: Debug, + CK: BatchContainer+Deref+RetainFrom, + CV: BatchContainer+Deref+RetainFrom, { - fn advance_builder_from(layer: &mut OrderedBuilder, O>, O>, frontier: AntichainRef, key_pos: usize) { + fn advance_builder_from(layer: &mut OrderedBuilder, O, CV>, O, CK>, frontier: AntichainRef, key_pos: usize) { let key_start = key_pos; let val_start: usize = layer.offs[key_pos].try_into().unwrap(); @@ -151,62 +198,59 @@ where layer.vals.vals.vals.truncate(write_position); layer.vals.offs[layer.vals.keys.len()] = O::try_from(write_position).unwrap(); - // 3. For each `(key, off)` pair, (values already sorted), filter vals, and rewrite `off`. - // This may leave `key` with an empty range. Filtering happens in step 4. + // 3. Remove values with empty histories. In addition, we need to update offsets + // in `layer.offs` to correctly reference the potentially moved values. let mut write_position = val_start; - for i in key_start .. layer.keys.len() { - - // NB: batch.layer.offs[i+1] must remain as is for the next iteration. - // instead, we update batch.layer.offs[i] - - let lower: usize = layer.offs[i].try_into().unwrap(); - let upper: usize = layer.offs[i+1].try_into().unwrap(); - - layer.offs[i] = O::try_from(write_position).unwrap(); - - // values should already be sorted, but some might now be empty. - for index in lower .. upper { - let val_lower: usize = layer.vals.offs[index].try_into().unwrap(); - let val_upper: usize = layer.vals.offs[index+1].try_into().unwrap(); - if val_lower < val_upper { - layer.vals.keys.swap(write_position, index); - layer.vals.offs[write_position+1] = layer.vals.offs[index+1]; - write_position += 1; - } + let vals_off = &mut layer.vals.offs; + let mut keys_pos = key_start; + let keys_off = &mut layer.offs; + layer.vals.keys.retain_from(val_start, |index, _item| { + // As we pass each key offset, record its new position. + if index == keys_off[keys_pos].try_into().unwrap() { + keys_off[keys_pos] = O::try_from(write_position).unwrap(); + keys_pos += 1; } - // batch.layer.offs[i+1] = write_position; - } - layer.vals.keys.truncate(write_position); + let lower = vals_off[index].try_into().unwrap(); + let upper = vals_off[index+1].try_into().unwrap(); + if lower < upper { + vals_off[write_position+1] = vals_off[index+1]; + write_position += 1; + true + } + else { false } + }); + debug_assert_eq!(write_position, layer.vals.keys.len()); layer.vals.offs.truncate(write_position + 1); layer.offs[layer.keys.len()] = O::try_from(write_position).unwrap(); // 4. Remove empty keys. + let offs = &mut layer.offs; let mut write_position = key_start; - for i in key_start .. layer.keys.len() { - - let lower: usize = layer.offs[i].try_into().unwrap(); - let upper: usize = layer.offs[i+1].try_into().unwrap(); - + layer.keys.retain_from(key_start, |index, _item| { + let lower = offs[index].try_into().unwrap(); + let upper = offs[index+1].try_into().unwrap(); if lower < upper { - layer.keys.swap(write_position, i); - // batch.layer.offs updated via `dedup` below; keeps me sane. + offs[write_position+1] = offs[index+1]; write_position += 1; + true } - } - layer.offs.dedup(); - layer.keys.truncate(write_position); - layer.offs.truncate(write_position+1); + else { false } + }); + debug_assert_eq!(write_position, layer.keys.len()); + layer.offs.truncate(layer.keys.len()+1); } } /// State for an in-progress merge. -pub struct OrdValMerger +pub struct OrdValMerger, CV=Vec> where K: Ord+Clone+'static, V: Ord+Clone+'static, T: Lattice+Ord+Clone+::std::fmt::Debug+'static, R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset, >::Error: Debug, >::Error: Debug, + CK: BatchContainer+Deref+RetainFrom, + CV: BatchContainer+Deref+RetainFrom, { // first batch, and position therein. lower1: usize, @@ -215,20 +259,22 @@ where lower2: usize, upper2: usize, // result that we are currently assembling. - result: , O>, O> as Trie>::MergeBuilder, + result: , O, CV>, O, CK> as Trie>::MergeBuilder, description: Description, should_compact: bool, } -impl Merger> for OrdValMerger +impl Merger> for OrdValMerger where K: Ord+Clone+'static, V: Ord+Clone+'static, T: Lattice+timely::progress::Timestamp+Ord+Clone+::std::fmt::Debug+'static, R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset, >::Error: Debug, >::Error: Debug, + CK: BatchContainer+Deref+RetainFrom, + CV: BatchContainer+Deref+RetainFrom, { - fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: Option>) -> Self { + fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: Option>) -> Self { assert!(batch1.upper() == batch2.lower()); @@ -244,12 +290,12 @@ where upper1: batch1.layer.keys(), lower2: 0, upper2: batch2.layer.keys(), - result: <, O>, O> as Trie>::MergeBuilder as MergeBuilder>::with_capacity(&batch1.layer, &batch2.layer), + result: <, O, CV>, O, CK> as Trie>::MergeBuilder as MergeBuilder>::with_capacity(&batch1.layer, &batch2.layer), description: description, should_compact: compaction_frontier.is_some(), } } - fn done(self) -> OrdValBatch { + fn done(self) -> OrdValBatch { assert!(self.lower1 == self.upper1); assert!(self.lower2 == self.upper2); @@ -259,7 +305,7 @@ where desc: self.description, } } - fn work(&mut self, source1: &OrdValBatch, source2: &OrdValBatch, fuel: &mut isize) { + fn work(&mut self, source1: &OrdValBatch, source2: &OrdValBatch, fuel: &mut isize) { let starting_updates = self.result.vals.vals.vals.len(); let mut effort = 0isize; @@ -298,7 +344,7 @@ where // if we are supplied a frontier, we should compact. if self.should_compact { - OrdValBatch::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); + OrdValBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); } *fuel -= effort; @@ -311,31 +357,35 @@ where /// A cursor for navigating a single layer. #[derive(Debug)] -pub struct OrdValCursor +pub struct OrdValCursor, CV=Vec> where V: Ord+Clone, T: Lattice+Ord+Clone, R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset, >::Error: Debug, >::Error: Debug, + CK: BatchContainer+Deref+RetainFrom, + CV: BatchContainer+Deref+RetainFrom, { - phantom: std::marker::PhantomData, - cursor: OrderedCursor, O>>, + phantom: std::marker::PhantomData<(K, CK, CV)>, + cursor: OrderedCursor, O, CV>>, } -impl Cursor for OrdValCursor +impl Cursor for OrdValCursor where K: Ord+Clone, V: Ord+Clone, T: Lattice+Ord+Clone, R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset, >::Error: Debug, >::Error: Debug, + CK: BatchContainer+Deref+RetainFrom, + CV: BatchContainer+Deref+RetainFrom, { type Key = K; type Val = V; type Time = T; type R = R; - type Storage = OrdValBatch; + type Storage = OrdValBatch; fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { &self.cursor.key(&storage.layer) } fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { &self.cursor.child.key(&storage.layer.vals) } @@ -358,34 +408,38 @@ where /// A builder for creating layers from unsorted update tuples. -pub struct OrdValBuilder +pub struct OrdValBuilder, CV=Vec> where - K: Ord, - V: Ord, - T: Ord+Lattice, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug + K: Ord+Clone, + V: Ord+Clone, + T: Ord+Clone+Lattice, + R: Clone+Semigroup, + O: OrdOffset, >::Error: Debug, >::Error: Debug, + CK: BatchContainer+Deref+RetainFrom, + CV: BatchContainer+Deref+RetainFrom, { - builder: OrderedBuilder, O>, O>, + builder: OrderedBuilder, O, CV>, O, CK>, } -impl Builder> for OrdValBuilder +impl Builder> for OrdValBuilder where K: Ord+Clone+'static, V: Ord+Clone+'static, T: Lattice+timely::progress::Timestamp+Ord+Clone+::std::fmt::Debug+'static, R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset, >::Error: Debug, >::Error: Debug, + CK: BatchContainer+Deref+RetainFrom, + CV: BatchContainer+Deref+RetainFrom, { fn new() -> Self { OrdValBuilder { - builder: OrderedBuilder::, O>, O>::new() + builder: OrderedBuilder::, O, CV>, O, CK>::new() } } fn with_capacity(cap: usize) -> Self { OrdValBuilder { - builder: , O>, O> as TupleBuilder>::with_capacity(cap) + builder: , O, CV>, O, CK> as TupleBuilder>::with_capacity(cap) } } @@ -395,7 +449,7 @@ where } #[inline(never)] - fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdValBatch { + fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdValBatch { OrdValBatch { layer: self.builder.done(), desc: Description::new(lower, upper, since) @@ -408,31 +462,34 @@ where /// An immutable collection of update tuples, from a contiguous interval of logical times. #[derive(Debug, Abomonation)] -pub struct OrdKeyBatch +pub struct OrdKeyBatch> where - K: Ord, - T: Lattice, - O: OrdOffset, >::Error: Debug, >::Error: Debug + K: Ord+Clone, + T: Clone+Lattice, + R: Clone, + O: OrdOffset, >::Error: Debug, >::Error: Debug, + CK: BatchContainer+Deref+RetainFrom, { /// Where all the dataz is. - pub layer: OrderedLayer, O>, + pub layer: OrderedLayer, O, CK>, /// Description of the update times this layer represents. pub desc: Description, } -impl BatchReader for OrdKeyBatch +impl BatchReader for OrdKeyBatch where K: Ord+Clone+'static, T: Lattice+Ord+Clone+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug + R: Clone+Semigroup, + O: OrdOffset, >::Error: Debug, >::Error: Debug, + CK: BatchContainer+Deref+RetainFrom, { type Key = K; type Val = (); type Time = T; type R = R; - type Cursor = OrdKeyCursor; + type Cursor = OrdKeyCursor; fn cursor(&self) -> Self::Cursor { OrdKeyCursor { valid: true, @@ -440,34 +497,36 @@ where phantom: PhantomData } } - fn len(&self) -> usize { , O> as Trie>::tuples(&self.layer) } + fn len(&self) -> usize { , O, CK> as Trie>::tuples(&self.layer) } fn description(&self) -> &Description { &self.desc } } -impl Batch for OrdKeyBatch +impl Batch for OrdKeyBatch where K: Ord+Clone+'static, T: Lattice+timely::progress::Timestamp+Ord+Clone+'static, R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset, >::Error: Debug, >::Error: Debug, + CK: BatchContainer+Deref+RetainFrom, { type Batcher = MergeBatcher; - type Builder = OrdKeyBuilder; - type Merger = OrdKeyMerger; + type Builder = OrdKeyBuilder; + type Merger = OrdKeyMerger; fn begin_merge(&self, other: &Self, compaction_frontier: Option>) -> Self::Merger { OrdKeyMerger::new(self, other, compaction_frontier) } } -impl OrdKeyBatch +impl OrdKeyBatch where K: Ord+Clone+'static, T: Lattice+Ord+Clone+'static, R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset, >::Error: Debug, >::Error: Debug, + CK: BatchContainer+Deref+RetainFrom, { - fn advance_builder_from(layer: &mut OrderedBuilder, O>, frontier: AntichainRef, key_pos: usize) { + fn advance_builder_from(layer: &mut OrderedBuilder, O, CK>, frontier: AntichainRef, key_pos: usize) { let key_start = key_pos; let time_start: usize = layer.offs[key_pos].try_into().unwrap(); @@ -512,31 +571,31 @@ where layer.offs[layer.keys.len()] = O::try_from(write_position).unwrap(); // 4. Remove empty keys. + let offs = &mut layer.offs; let mut write_position = key_start; - for i in key_start .. layer.keys.len() { - - let lower: usize = layer.offs[i].try_into().unwrap(); - let upper: usize = layer.offs[i+1].try_into().unwrap(); - + layer.keys.retain_from(key_start, |index, _item| { + let lower = offs[index].try_into().unwrap(); + let upper = offs[index+1].try_into().unwrap(); if lower < upper { - layer.keys.swap(write_position, i); - // batch.layer.offs updated via `dedup` below; keeps me sane. + offs[write_position+1] = offs[index+1]; write_position += 1; + true } - } - layer.offs.dedup(); - layer.keys.truncate(write_position); - layer.offs.truncate(write_position+1); + else { false } + }); + debug_assert_eq!(write_position, layer.keys.len()); + layer.offs.truncate(layer.keys.len()+1); } } /// State for an in-progress merge. -pub struct OrdKeyMerger +pub struct OrdKeyMerger> where K: Ord+Clone+'static, T: Lattice+Ord+Clone+'static, R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset, >::Error: Debug, >::Error: Debug, + CK: BatchContainer+Deref+RetainFrom, { // first batch, and position therein. lower1: usize, @@ -545,19 +604,20 @@ where lower2: usize, upper2: usize, // result that we are currently assembling. - result: , O> as Trie>::MergeBuilder, + result: , O, CK> as Trie>::MergeBuilder, description: Description, should_compact: bool, } -impl Merger> for OrdKeyMerger +impl Merger> for OrdKeyMerger where K: Ord+Clone+'static, T: Lattice+timely::progress::Timestamp+Ord+Clone+'static, R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset, >::Error: Debug, >::Error: Debug, + CK: BatchContainer+Deref+RetainFrom, { - fn new(batch1: &OrdKeyBatch, batch2: &OrdKeyBatch, compaction_frontier: Option>) -> Self { + fn new(batch1: &OrdKeyBatch, batch2: &OrdKeyBatch, compaction_frontier: Option>) -> Self { assert!(batch1.upper() == batch2.lower()); @@ -573,12 +633,12 @@ where upper1: batch1.layer.keys(), lower2: 0, upper2: batch2.layer.keys(), - result: <, O> as Trie>::MergeBuilder as MergeBuilder>::with_capacity(&batch1.layer, &batch2.layer), + result: <, O, CK> as Trie>::MergeBuilder as MergeBuilder>::with_capacity(&batch1.layer, &batch2.layer), description: description, should_compact: compaction_frontier.is_some(), } } - fn done(self) -> OrdKeyBatch { + fn done(self) -> OrdKeyBatch { assert!(self.lower1 == self.upper1); assert!(self.lower2 == self.upper2); @@ -588,7 +648,7 @@ where desc: self.description, } } - fn work(&mut self, source1: &OrdKeyBatch, source2: &OrdKeyBatch, fuel: &mut isize) { + fn work(&mut self, source1: &OrdKeyBatch, source2: &OrdKeyBatch, fuel: &mut isize) { let starting_updates = self.result.vals.vals.len(); let mut effort = 0isize; @@ -633,7 +693,7 @@ where // if we are supplied a frontier, we should compact. if self.should_compact { - OrdKeyBatch::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); + OrdKeyBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); } *fuel -= effort; @@ -647,25 +707,26 @@ where /// A cursor for navigating a single layer. #[derive(Debug)] -pub struct OrdKeyCursor { +pub struct OrdKeyCursor> { valid: bool, cursor: OrderedCursor>, - phantom: PhantomData<(K, O)> + phantom: PhantomData<(K, O, CK)>, } -impl Cursor for OrdKeyCursor +impl Cursor for OrdKeyCursor where K: Ord+Clone, T: Lattice+Ord+Clone, R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset, >::Error: Debug, >::Error: Debug, + CK: BatchContainer+Deref+RetainFrom, { type Key = K; type Val = (); type Time = T; type R = R; - type Storage = OrdKeyBatch; + type Storage = OrdKeyBatch; fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { &self.cursor.key(&storage.layer) } fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() } @@ -688,33 +749,35 @@ where /// A builder for creating layers from unsorted update tuples. -pub struct OrdKeyBuilder +pub struct OrdKeyBuilder> where - K: Ord, - T: Ord+Lattice, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug + K: Ord+Clone, + T: Ord+Clone+Lattice, + R: Clone+Semigroup, + O: OrdOffset, >::Error: Debug, >::Error: Debug, + CK: BatchContainer+Deref+RetainFrom, { - builder: OrderedBuilder, O>, + builder: OrderedBuilder, O, CK>, } -impl Builder> for OrdKeyBuilder +impl Builder> for OrdKeyBuilder where K: Ord+Clone+'static, T: Lattice+timely::progress::Timestamp+Ord+Clone+'static, R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset, >::Error: Debug, >::Error: Debug, + CK: BatchContainer+Deref+RetainFrom, { fn new() -> Self { OrdKeyBuilder { - builder: OrderedBuilder::, O>::new() + builder: OrderedBuilder::, O, CK>::new() } } fn with_capacity(cap: usize) -> Self { OrdKeyBuilder { - builder: , O> as TupleBuilder>::with_capacity(cap) + builder: , O, CK> as TupleBuilder>::with_capacity(cap) } } @@ -724,7 +787,7 @@ where } #[inline(never)] - fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdKeyBatch { + fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdKeyBatch { OrdKeyBatch { layer: self.builder.done(), desc: Description::new(lower, upper, since) diff --git a/src/trace/layers/mod.rs b/src/trace/layers/mod.rs index 29ebd46e5..e0d26088a 100644 --- a/src/trace/layers/mod.rs +++ b/src/trace/layers/mod.rs @@ -4,6 +4,9 @@ //! in the next layer. Similarly, ranges of elements in the layer itself may correspond //! to single elements in the layer above. +use timely::container::columnation::TimelyStack; +use timely::container::columnation::Columnation; + pub mod ordered; pub mod ordered_leaf; // pub mod hashed; @@ -106,6 +109,73 @@ pub trait Cursor { fn reposition(&mut self, storage: &Storage, lower: usize, upper: usize); } +/// A general-purpose container resembling `Vec`. +pub trait BatchContainer: Default { + /// The type of contained item. + type Item; + /// Inserts an owned item. + fn push(&mut self, item: Self::Item); + /// Inserts a borrowed item. + fn copy(&mut self, item: &Self::Item); + /// Extends from a slice of items. + fn copy_slice(&mut self, slice: &[Self::Item]); + /// Creates a new container with sufficient capacity. + fn with_capacity(size: usize) -> Self; + /// Reserves additional capacity. + fn reserve(&mut self, additional: usize); + /// Creates a new container with sufficient capacity. + fn merge_capacity(cont1: &Self, cont2: &Self) -> Self; +} + +impl BatchContainer for Vec { + type Item = T; + fn push(&mut self, item: T) { + self.push(item); + } + fn copy(&mut self, item: &T) { + self.push(item.clone()); + } + fn copy_slice(&mut self, slice: &[T]) { + self.extend_from_slice(slice); + } + fn with_capacity(size: usize) -> Self { + Vec::with_capacity(size) + } + fn reserve(&mut self, additional: usize) { + self.reserve(additional); + } + fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { + Vec::with_capacity(cont1.len() + cont2.len()) + } +} + +impl BatchContainer for TimelyStack { + type Item = T; + fn push(&mut self, item: T) { + self.copy(&item); + } + fn copy(&mut self, item: &T) { + self.copy(item); + } + fn copy_slice(&mut self, slice: &[T]) { + self.reserve_items(slice.iter()); + for item in slice.iter() { + self.copy(item); + } + } + fn with_capacity(size: usize) -> Self { + Self::with_capacity(size) + } + fn reserve(&mut self, _additional: usize) { + } + fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { + let mut new = Self::default(); + new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2))); + new + } +} + + /// Reports the number of elements satisfing the predicate. /// /// This methods *relies strongly* on the assumption that the predicate diff --git a/src/trace/layers/ordered.rs b/src/trace/layers/ordered.rs index c6b6f1a1c..e2d34a8bd 100644 --- a/src/trace/layers/ordered.rs +++ b/src/trace/layers/ordered.rs @@ -1,9 +1,9 @@ //! Implementation using ordered keys and exponential search. -use super::{Trie, Cursor, Builder, MergeBuilder, TupleBuilder, advance}; +use super::{Trie, Cursor, Builder, MergeBuilder, TupleBuilder, BatchContainer, advance}; use std::convert::{TryFrom, TryInto}; use std::fmt::Debug; -use std::ops::{Sub,Add}; +use std::ops::{Sub,Add,Deref}; /// Trait for types used as offsets into an ordered layer. /// This is usually `usize`, but `u32` can also be used in applications @@ -20,13 +20,14 @@ where /// /// In this representation, the values for `keys[i]` are found at `vals[offs[i] .. offs[i+1]]`. #[derive(Debug, Eq, PartialEq, Clone, Abomonation)] -pub struct OrderedLayer +pub struct OrderedLayer> where K: Ord, + C: BatchContainer+Deref, O: OrdOffset, >::Error: Debug, >::Error: Debug { /// The keys of the layer. - pub keys: Vec, + pub keys: C, /// The offsets associate with each key. /// /// The bounds for `keys[i]` are `(offs[i], offs[i+1]`). The offset array is guaranteed to be one @@ -36,16 +37,17 @@ where pub vals: L, } -impl Trie for OrderedLayer +impl Trie for OrderedLayer where K: Ord+Clone, + C: BatchContainer+Deref, L: Trie, O: OrdOffset, >::Error: Debug, >::Error: Debug { type Item = (K, L::Item); type Cursor = OrderedCursor; - type MergeBuilder = OrderedBuilder; - type TupleBuilder = OrderedBuilder; + type MergeBuilder = OrderedBuilder; + type TupleBuilder = OrderedBuilder; fn keys(&self) -> usize { self.keys.len() } fn tuples(&self) -> usize { self.vals.tuples() } @@ -72,26 +74,28 @@ where } /// Assembles a layer of this -pub struct OrderedBuilder +pub struct OrderedBuilder> where - K: Ord, + K: Ord+Clone, + C: BatchContainer+Deref, O: OrdOffset, >::Error: Debug, >::Error: Debug { /// Keys - pub keys: Vec, + pub keys: C, /// Offsets pub offs: Vec, /// The next layer down pub vals: L, } -impl Builder for OrderedBuilder +impl Builder for OrderedBuilder where K: Ord+Clone, + C: BatchContainer+Deref, L: Builder, O: OrdOffset, >::Error: Debug, >::Error: Debug { - type Trie = OrderedLayer; + type Trie = OrderedLayer; fn boundary(&mut self) -> usize { self.offs[self.keys.len()] = O::try_from(self.vals.boundary()).unwrap(); self.keys.len() @@ -108,9 +112,10 @@ where } } -impl MergeBuilder for OrderedBuilder +impl MergeBuilder for OrderedBuilder where K: Ord+Clone, + C: BatchContainer+Deref, L: MergeBuilder, O: OrdOffset, >::Error: Debug, >::Error: Debug { @@ -118,7 +123,7 @@ where let mut offs = Vec::with_capacity(other1.keys() + other2.keys() + 1); offs.push(O::try_from(0 as usize).unwrap()); OrderedBuilder { - keys: Vec::with_capacity(other1.keys() + other2.keys()), + keys: C::merge_capacity(&other1.keys, &other2.keys), offs: offs, vals: L::with_capacity(&other1.vals, &other2.vals), } @@ -129,7 +134,7 @@ where let other_basis = other.offs[lower]; let self_basis = self.offs.last().map(|&x| x).unwrap_or(O::try_from(0).unwrap()); - self.keys.extend_from_slice(&other.keys[lower .. upper]); + self.keys.copy_slice(&other.keys[lower .. upper]); for index in lower .. upper { self.offs.push((other.offs[index + 1] + self_basis) - other_basis); } @@ -154,9 +159,10 @@ where } } -impl OrderedBuilder +impl OrderedBuilder where K: Ord+Clone, + C: BatchContainer+Deref, L: MergeBuilder, O: OrdOffset, >::Error: Debug, >::Error: Debug { @@ -183,7 +189,7 @@ where (&trie2.vals, trie2.offs[*lower2].try_into().unwrap(), trie2.offs[*lower2 + 1].try_into().unwrap()) ); if upper > lower { - self.keys.push(trie1.keys[*lower1].clone()); + self.keys.copy(&trie1.keys[*lower1]); self.offs.push(O::try_from(upper).unwrap()); } @@ -201,19 +207,20 @@ where } } -impl TupleBuilder for OrderedBuilder +impl TupleBuilder for OrderedBuilder where K: Ord+Clone, + C: BatchContainer+Deref, L: TupleBuilder, O: OrdOffset, >::Error: Debug, >::Error: Debug { type Item = (K, L::Item); - fn new() -> Self { OrderedBuilder { keys: Vec::new(), offs: vec![O::try_from(0).unwrap()], vals: L::new() } } + fn new() -> Self { OrderedBuilder { keys: C::default(), offs: vec![O::try_from(0).unwrap()], vals: L::new() } } fn with_capacity(cap: usize) -> Self { let mut offs = Vec::with_capacity(cap + 1); offs.push(O::try_from(0).unwrap()); OrderedBuilder{ - keys: Vec::with_capacity(cap), + keys: C::with_capacity(cap), offs: offs, vals: L::with_capacity(cap), } @@ -236,23 +243,22 @@ where /// A cursor with a child cursor that is updated as we move. #[derive(Debug)] pub struct OrderedCursor { - // keys: OwningRef, [K]>, - // offs: OwningRef, [usize]>, pos: usize, bounds: (usize, usize), /// The cursor for the trie layer below this one. pub child: L::Cursor, } -impl Cursor> for OrderedCursor +impl Cursor> for OrderedCursor where - K: Ord, + K: Ord+Clone, + C: BatchContainer+Deref, L: Trie, O: OrdOffset, >::Error: Debug, >::Error: Debug { type Key = K; - fn key<'a>(&self, storage: &'a OrderedLayer) -> &'a Self::Key { &storage.keys[self.pos] } - fn step(&mut self, storage: &OrderedLayer) { + fn key<'a>(&self, storage: &'a OrderedLayer) -> &'a Self::Key { &storage.keys[self.pos] } + fn step(&mut self, storage: &OrderedLayer) { self.pos += 1; if self.valid(storage) { self.child.reposition(&storage.vals, storage.offs[self.pos].try_into().unwrap(), storage.offs[self.pos + 1].try_into().unwrap()); @@ -261,21 +267,21 @@ where self.pos = self.bounds.1; } } - fn seek(&mut self, storage: &OrderedLayer, key: &Self::Key) { + fn seek(&mut self, storage: &OrderedLayer, key: &Self::Key) { self.pos += advance(&storage.keys[self.pos .. self.bounds.1], |k| k.lt(key)); if self.valid(storage) { self.child.reposition(&storage.vals, storage.offs[self.pos].try_into().unwrap(), storage.offs[self.pos + 1].try_into().unwrap()); } } // fn size(&self) -> usize { self.bounds.1 - self.bounds.0 } - fn valid(&self, _storage: &OrderedLayer) -> bool { self.pos < self.bounds.1 } - fn rewind(&mut self, storage: &OrderedLayer) { + fn valid(&self, _storage: &OrderedLayer) -> bool { self.pos < self.bounds.1 } + fn rewind(&mut self, storage: &OrderedLayer) { self.pos = self.bounds.0; if self.valid(storage) { self.child.reposition(&storage.vals, storage.offs[self.pos].try_into().unwrap(), storage.offs[self.pos + 1].try_into().unwrap()); } } - fn reposition(&mut self, storage: &OrderedLayer, lower: usize, upper: usize) { + fn reposition(&mut self, storage: &OrderedLayer, lower: usize, upper: usize) { self.pos = lower; self.bounds = (lower, upper); if self.valid(storage) { diff --git a/src/trace/layers/ordered_leaf.rs b/src/trace/layers/ordered_leaf.rs index b01f299d3..9e7727669 100644 --- a/src/trace/layers/ordered_leaf.rs +++ b/src/trace/layers/ordered_leaf.rs @@ -2,22 +2,29 @@ use ::difference::Semigroup; -use super::{Trie, Cursor, Builder, MergeBuilder, TupleBuilder, advance}; +use super::{Trie, Cursor, Builder, MergeBuilder, TupleBuilder, BatchContainer, advance}; +use std::ops::Deref; /// A layer of unordered values. #[derive(Debug, Eq, PartialEq, Clone, Abomonation)] -pub struct OrderedLeaf { +pub struct OrderedLeaf> +where + C: BatchContainer+Deref, +{ /// Unordered values. - pub vals: Vec<(K, R)>, + pub vals: C, } -impl Trie for OrderedLeaf { +impl Trie for OrderedLeaf +where + C: BatchContainer+Deref, +{ type Item = (K, R); type Cursor = OrderedLeafCursor; - type MergeBuilder = OrderedLeafBuilder; - type TupleBuilder = OrderedLeafBuilder; + type MergeBuilder = OrderedLeafBuilder; + type TupleBuilder = OrderedLeafBuilder; fn keys(&self) -> usize { self.vals.len() } - fn tuples(&self) -> usize { as Trie>::keys(&self) } + fn tuples(&self) -> usize { as Trie>::keys(&self) } fn cursor_from(&self, lower: usize, upper: usize) -> Self::Cursor { OrderedLeafCursor { bounds: (lower, upper), @@ -27,26 +34,35 @@ impl Trie for OrderedLeaf { } /// A builder for unordered values. -pub struct OrderedLeafBuilder { +pub struct OrderedLeafBuilder> +where + C: BatchContainer+Deref, +{ /// Unordered values. - pub vals: Vec<(K, R)>, + pub vals: C, } -impl Builder for OrderedLeafBuilder { - type Trie = OrderedLeaf; +impl Builder for OrderedLeafBuilder +where + C: BatchContainer+Deref, +{ + type Trie = OrderedLeaf; fn boundary(&mut self) -> usize { self.vals.len() } fn done(self) -> Self::Trie { OrderedLeaf { vals: self.vals } } } -impl MergeBuilder for OrderedLeafBuilder { +impl MergeBuilder for OrderedLeafBuilder +where + C: BatchContainer+Deref, +{ fn with_capacity(other1: &Self::Trie, other2: &Self::Trie) -> Self { OrderedLeafBuilder { - vals: Vec::with_capacity( as Trie>::keys(other1) + as Trie>::keys(other2)), + vals: C::merge_capacity(&other1.vals, &other2.vals), } } #[inline] fn copy_range(&mut self, other: &Self::Trie, lower: usize, upper: usize) { - self.vals.extend_from_slice(&other.vals[lower .. upper]); + self.vals.copy_slice(&other.vals[lower .. upper]); } fn push_merge(&mut self, other1: (&Self::Trie, usize, usize), other2: (&Self::Trie, usize, usize)) -> usize { @@ -63,7 +79,7 @@ impl MergeBuilder for OrderedLeafBuilder // determine how far we can advance lower1 until we reach/pass lower2 let step = 1 + advance(&trie1.vals[(1+lower1)..upper1], |x| x.0 < trie2.vals[lower2].0); let step = std::cmp::min(step, 1000); - as MergeBuilder>::copy_range(self, trie1, lower1, lower1 + step); + as MergeBuilder>::copy_range(self, trie1, lower1, lower1 + step); lower1 += step; } ::std::cmp::Ordering::Equal => { @@ -81,23 +97,26 @@ impl MergeBuilder for OrderedLeafBuilder // determine how far we can advance lower2 until we reach/pass lower1 let step = 1 + advance(&trie2.vals[(1+lower2)..upper2], |x| x.0 < trie1.vals[lower1].0); let step = std::cmp::min(step, 1000); - as MergeBuilder>::copy_range(self, trie2, lower2, lower2 + step); + as MergeBuilder>::copy_range(self, trie2, lower2, lower2 + step); lower2 += step; } } } - if lower1 < upper1 { as MergeBuilder>::copy_range(self, trie1, lower1, upper1); } - if lower2 < upper2 { as MergeBuilder>::copy_range(self, trie2, lower2, upper2); } + if lower1 < upper1 { as MergeBuilder>::copy_range(self, trie1, lower1, upper1); } + if lower2 < upper2 { as MergeBuilder>::copy_range(self, trie2, lower2, upper2); } self.vals.len() } } -impl TupleBuilder for OrderedLeafBuilder { +impl TupleBuilder for OrderedLeafBuilder +where + C: BatchContainer+Deref, +{ type Item = (K, R); - fn new() -> Self { OrderedLeafBuilder { vals: Vec::new() } } - fn with_capacity(cap: usize) -> Self { OrderedLeafBuilder { vals: Vec::with_capacity(cap) } } + fn new() -> Self { OrderedLeafBuilder { vals: C::default() } } + fn with_capacity(cap: usize) -> Self { OrderedLeafBuilder { vals: C::with_capacity(cap) } } #[inline] fn push_tuple(&mut self, tuple: (K, R)) { self.vals.push(tuple) } } @@ -110,23 +129,26 @@ pub struct OrderedLeafCursor { bounds: (usize, usize), } -impl Cursor> for OrderedLeafCursor { +impl Cursor> for OrderedLeafCursor +where + C: BatchContainer+Deref, +{ type Key = (K, R); - fn key<'a>(&self, storage: &'a OrderedLeaf) -> &'a Self::Key { &storage.vals[self.pos] } - fn step(&mut self, storage: &OrderedLeaf) { + fn key<'a>(&self, storage: &'a OrderedLeaf) -> &'a Self::Key { &storage.vals[self.pos] } + fn step(&mut self, storage: &OrderedLeaf) { self.pos += 1; if !self.valid(storage) { self.pos = self.bounds.1; } } - fn seek(&mut self, _storage: &OrderedLeaf, _key: &Self::Key) { + fn seek(&mut self, _storage: &OrderedLeaf, _key: &Self::Key) { panic!("seeking in an OrderedLeafCursor; should be fine, panic is wrong."); } - fn valid(&self, _storage: &OrderedLeaf) -> bool { self.pos < self.bounds.1 } - fn rewind(&mut self, _storage: &OrderedLeaf) { + fn valid(&self, _storage: &OrderedLeaf) -> bool { self.pos < self.bounds.1 } + fn rewind(&mut self, _storage: &OrderedLeaf) { self.pos = self.bounds.0; } - fn reposition(&mut self, _storage: &OrderedLeaf, lower: usize, upper: usize) { + fn reposition(&mut self, _storage: &OrderedLeaf, lower: usize, upper: usize) { self.pos = lower; self.bounds = (lower, upper); } diff --git a/src/trace/mod.rs b/src/trace/mod.rs index f1775050f..fc340e432 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -13,6 +13,7 @@ pub mod implementations; pub mod layers; pub mod wrappers; +use timely::communication::message::RefOrMut; use timely::progress::{Antichain, frontier::AntichainRef}; use timely::progress::Timestamp; @@ -290,7 +291,7 @@ pub trait Batcher { /// Allocates a new empty batcher. fn new() -> Self; /// Adds an unordered batch of elements to the batcher. - fn push_batch(&mut self, batch: &mut Vec<((Output::Key, Output::Val), Output::Time, Output::R)>); + fn push_batch(&mut self, batch: RefOrMut>); /// Returns all updates not greater or equal to an element of `upper`. fn seal(&mut self, upper: Antichain) -> Output; /// Returns the lower envelope of contained update times. @@ -336,6 +337,7 @@ pub trait Merger { pub mod rc_blanket_impls { use std::rc::Rc; + use timely::communication::message::RefOrMut; use timely::progress::{Antichain, frontier::AntichainRef}; use super::{Batch, BatchReader, Batcher, Builder, Merger, Cursor, Description}; @@ -415,7 +417,7 @@ pub mod rc_blanket_impls { /// Functionality for collecting and batching updates. impl Batcher> for RcBatcher { fn new() -> Self { RcBatcher { batcher: >::new() } } - fn push_batch(&mut self, batch: &mut Vec<((B::Key, B::Val), B::Time, B::R)>) { self.batcher.push_batch(batch) } + fn push_batch(&mut self, batch: RefOrMut>) { self.batcher.push_batch(batch) } fn seal(&mut self, upper: Antichain) -> Rc { Rc::new(self.batcher.seal(upper)) } fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { self.batcher.frontier() } } @@ -450,6 +452,7 @@ pub mod abomonated_blanket_impls { use abomonation::{Abomonation, measure}; use abomonation::abomonated::Abomonated; + use timely::communication::message::RefOrMut; use timely::progress::{Antichain, frontier::AntichainRef}; use super::{Batch, BatchReader, Batcher, Builder, Merger, Cursor, Description}; @@ -530,7 +533,7 @@ pub mod abomonated_blanket_impls { /// Functionality for collecting and batching updates. impl Batcher>> for AbomonatedBatcher { fn new() -> Self { AbomonatedBatcher { batcher: >::new() } } - fn push_batch(&mut self, batch: &mut Vec<((B::Key, B::Val), B::Time, B::R)>) { self.batcher.push_batch(batch) } + fn push_batch(&mut self, batch: RefOrMut>) { self.batcher.push_batch(batch) } fn seal(&mut self, upper: Antichain) -> Abomonated> { let batch = self.batcher.seal(upper); let mut bytes = Vec::with_capacity(measure(&batch)); diff --git a/tests/trace.rs b/tests/trace.rs index f87ffa33e..d00c4497e 100644 --- a/tests/trace.rs +++ b/tests/trace.rs @@ -21,11 +21,12 @@ fn get_trace() -> Spine>> { { let mut batcher = <::Batch as Batch>::Batcher::new(); - batcher.push_batch(&mut vec![ + use timely::communication::message::RefOrMut; + batcher.push_batch(RefOrMut::Mut(&mut vec![ ((1, 2), 0, 1), ((2, 3), 1, 1), ((2, 3), 2, -1), - ]); + ])); let batch_ts = &[1, 2, 3]; let batches = batch_ts.iter().map(move |i| batcher.seal(Antichain::from_elem(*i)));