From 2eae4a2379789c33772cf2ab74ab78b98bf247e1 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 9 Mar 2021 15:56:19 -0500 Subject: [PATCH] remove unused implementations --- src/trace/implementations/graph.rs | 277 ----- src/trace/implementations/hash.rs | 268 ----- src/trace/implementations/mod.rs | 5 +- src/trace/implementations/spine_fueled.rs | 963 ++++++++++++------ src/trace/implementations/spine_fueled_neu.rs | 917 ----------------- src/trace/implementations/vec.rs | 300 ------ 6 files changed, 653 insertions(+), 2077 deletions(-) delete mode 100644 src/trace/implementations/graph.rs delete mode 100644 src/trace/implementations/hash.rs delete mode 100644 src/trace/implementations/spine_fueled_neu.rs delete mode 100644 src/trace/implementations/vec.rs diff --git a/src/trace/implementations/graph.rs b/src/trace/implementations/graph.rs deleted file mode 100644 index c52bffd59..000000000 --- a/src/trace/implementations/graph.rs +++ /dev/null @@ -1,277 +0,0 @@ -//! Trace and batch implementations based on sorted ranges. -//! -//! The types and type aliases in this module start with either -//! -//! * `OrdVal`: Collections whose data have the form `(key, val)` where `key` is ordered. -//! * `OrdKey`: Collections whose data have the form `key` where `key` is ordered. -//! -//! Although `OrdVal` is more general than `OrdKey`, the latter has a simpler representation -//! and should consume fewer resources (computation and memory) when it applies. 
- -// use std::cmp::Ordering; -use std::rc::Rc; - -// use ::Diff; -// use lattice::Lattice; - -use trace::{Batch, BatchReader, Builder, Merger, Cursor, Trace, TraceReader}; -use trace::description::Description; -use trace::rc_blanket_impls::RcBatchCursor; - -// use trace::layers::MergeBuilder; - -use super::spine_fueled::Spine; -use super::merge_batcher::MergeBatcher; - -use timely::progress::nested::product::Product; -use timely::progress::timestamp::RootTimestamp; - -type Node = u32; - -/// -struct GraphSpine where N: Ord+Clone+'static { - spine: Spine, isize, Rc>> -} - -impl TraceReader, isize> for GraphSpine -where - N: Ord+Clone+'static, -{ - type Batch = Rc>; - type Cursor = RcBatchCursor, isize, GraphBatch>; - - fn cursor_through(&mut self, upper: &[Product]) -> Option<(Self::Cursor, , isize>>::Storage)> { - - let mut batch = Vec::new(); - self.spine.map_batches(|b| batch.push(b.clone())); - assert!(batch.len() <= 1); - - if upper == &[] { - batch.pop().map(|b| (b.cursor(), b)) - } - else { None } - } - fn set_logical_compaction(&mut self, frontier: &[Product]) { - self.spine.set_logical_compaction(frontier) - } - fn get_logical_compaction(&mut self) -> &[Product] { self.spine.get_logical_compaction() } - fn set_physical_compaction(&mut self, frontier: &[Product]) { - self.spine.set_physical_compaction(frontier) - } - fn get_physical_compaction(&mut self) -> &[Product] { &self.spine.get_physical_compaction() } - - fn map_batches(&self, f: F) { - self.spine.map_batches(f) - } -} - -// A trace implementation for any key type that can be borrowed from or converted into `Key`. -// TODO: Almost all this implementation seems to be generic with respect to the trace and batch types. -impl Trace, isize> for GraphSpine -where - N: Ord+Clone+'static, -{ - - fn new() -> Self { - GraphSpine { - spine: Spine::, isize, Rc>>::new() - } - } - - // Ideally, this method acts as insertion of `batch`, even if we are not yet able to begin - // merging the batch. 
This means it is a good time to perform amortized work proportional - // to the size of batch. - fn insert(&mut self, batch: Self::Batch) { - self.spine.insert(batch) - } - - fn close(&mut self) { - self.spine.close() - } -} - -/// -#[derive(Debug, Abomonation)] -pub struct GraphBatch { - index: usize, - peers: usize, - keys: Vec, - nodes: Vec, - edges: Vec, - desc: Description>, -} - -impl BatchReader, isize> for GraphBatch where N: Ord+Clone+'static { - type Cursor = GraphCursor; - fn cursor(&self) -> Self::Cursor { GraphCursor { key: self.index as Node, key_pos: 0, val_pos: 0 } } - fn len(&self) -> usize { self.edges.len() } - fn description(&self) -> &Description> { &self.desc } -} - -impl Batch, isize> for GraphBatch where N: Ord+Clone+'static { - type Batcher = MergeBatcher, isize, Self>; - type Builder = GraphBuilder; - type Merger = GraphMerger; - - fn begin_merge(&self, other: &Self) -> Self::Merger { - GraphMerger::new(self, other) - } -} - -/// -pub struct GraphMerger { } - -impl Merger, isize, GraphBatch> for GraphMerger where N: Ord+Clone+'static { - fn new(_batch1: &GraphBatch, _batch2: &GraphBatch) -> Self { - panic!("Cannot merge GraphBatch; they are static"); - } - fn done(self) -> GraphBatch { - panic!("Cannot merge GraphBatch; they are static"); - } - fn work(&mut self, _source1: &GraphBatch, _source2: &GraphBatch, _frontier: &Option>>, _fuel: &mut usize) { - panic!("Cannot merge GraphBatch; they are static"); - } -} - -/// A cursor for navigating a single layer. 
-#[derive(Debug)] -pub struct GraphCursor { - key: Node, - key_pos: usize, - val_pos: usize, -} - -impl Cursor, isize> for GraphCursor where N: Ord+Clone { - - type Storage = GraphBatch; - - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Node { &storage.keys[self.key_pos] } - fn val<'a>(&self, storage: &'a Self::Storage) -> &'a N { &storage.edges[self.val_pos] } - fn map_times, isize)>(&mut self, _storage: &Self::Storage, mut logic: L) { - logic(&Product::new(RootTimestamp, ()), 1); - } - fn key_valid(&self, storage: &Self::Storage) -> bool { (self.key_pos + 1) < storage.nodes.len() } - fn val_valid(&self, storage: &Self::Storage) -> bool { - self.val_pos < storage.nodes[self.key_pos + 1] - } - fn step_key(&mut self, storage: &Self::Storage){ - if self.key_valid(storage) { - self.key_pos += 1; - self.key += storage.peers as Node; - } - } - fn seek_key(&mut self, storage: &Self::Storage, key: &Node) { - if self.key_valid(storage) { - self.key_pos = (*key as usize) / storage.peers; - if self.key_pos + 1 >= storage.nodes.len() { - self.key_pos = storage.nodes.len() - 1; - } - self.val_pos = storage.nodes[self.key_pos]; - self.key = (storage.peers * self.key_pos + storage.index) as Node; - } - } - fn step_val(&mut self, storage: &Self::Storage) { - if self.val_valid(storage) { - self.val_pos += 1; - } - } - fn seek_val(&mut self, storage: &Self::Storage, val: &N) { - if self.val_valid(storage) { - let lower = self.val_pos; - let upper = storage.nodes[self.key_pos + 1]; - - self.val_pos += advance(&storage.edges[lower .. upper], |tuple| tuple < val); - } - } - fn rewind_keys(&mut self, storage: &Self::Storage) { self.key_pos = 0; self.key = storage.index as Node; } - fn rewind_vals(&mut self, storage: &Self::Storage) { - if self.key_valid(storage) { - self.val_pos = storage.nodes[self.key_pos]; - } - } -} - -/// A builder for creating layers from unsorted update tuples. 
-pub struct GraphBuilder { - index: usize, - peers: usize, - keys: Vec, - nodes: Vec, - edges: Vec, - -} - -impl Builder, isize, GraphBatch> for GraphBuilder where N: Ord+Clone+'static { - - fn new() -> Self { Self::with_capacity(0) } - fn with_capacity(cap: usize) -> Self { - GraphBuilder { - index: 0, - peers: 1, - keys: Vec::new(), - nodes: Vec::new(), - edges: Vec::with_capacity(cap), - } - } - - #[inline] - fn push(&mut self, (key, val, _time, _diff): (Node, N, Product, isize)) { - while self.nodes.len() <= (key as usize) / self.peers { - self.keys.push((self.peers * self.nodes.len() + self.index) as Node); - self.nodes.push(self.edges.len()); - } - - self.edges.push(val); - } - - #[inline(never)] - fn done(mut self, lower: &[Product], upper: &[Product], since: &[Product]) -> GraphBatch { - println!("GraphBuilder::done(): {} nodes, {} edges.", self.nodes.len(), self.edges.len()); - - self.nodes.push(self.edges.len()); - GraphBatch { - index: self.index, - peers: self.peers, - keys: self.keys, - nodes: self.nodes, - edges: self.edges, - desc: Description::new(lower, upper, since) - } - } -} - - -/// Reports the number of elements satisfing the predicate. -/// -/// This methods *relies strongly* on the assumption that the predicate -/// stays false once it becomes false, a joint property of the predicate -/// and the slice. This allows `advance` to use exponential search to -/// count the number of elements in time logarithmic in the result. -#[inline(never)] -pub fn advancebool>(slice: &[T], function: F) -> usize { - - // start with no advance - let mut index = 0; - if index < slice.len() && function(&slice[index]) { - - // advance in exponentially growing steps. - let mut step = 1; - while index + step < slice.len() && function(&slice[index + step]) { - index += step; - step = step << 1; - } - - // advance in exponentially shrinking steps. 
- step = step >> 1; - while step > 0 { - if index + step < slice.len() && function(&slice[index + step]) { - index += step; - } - step = step >> 1; - } - - index += 1; - } - - index -} diff --git a/src/trace/implementations/hash.rs b/src/trace/implementations/hash.rs deleted file mode 100644 index 0f245cb36..000000000 --- a/src/trace/implementations/hash.rs +++ /dev/null @@ -1,268 +0,0 @@ -//! Trace and batch implementations based on Robin Hood hashing. -//! -//! The types and type aliases in this module start with either -//! -//! * `HashVal`: Collections whose data have the form `(key, val)` where `key` is hash-ordered. -//! * `HashKey`: Collections whose data have the form `key` where `key` is hash-ordered. -//! -//! Although `HashVal` is more general than `HashKey`, the latter has a simpler representation -//! and should consume fewer resources (computation and memory) when it applies. - -use std::rc::Rc; - -use ::Diff; -use hashable::HashOrdered; - -use trace::layers::{Trie, TupleBuilder}; -use trace::layers::Builder as TrieBuilder; -use trace::layers::Cursor as TrieCursor; -use trace::layers::hashed::{HashedLayer, HashedBuilder, HashedCursor}; -use trace::layers::ordered::{OrderedLayer, OrderedBuilder}; -use trace::layers::ordered_leaf::{OrderedLeaf, OrderedLeafBuilder}; - -use lattice::Lattice; -use trace::{Batch, BatchReader, Builder, Cursor}; -use trace::description::Description; - -use super::spine::Spine; -use super::radix_batcher::RadixBatcher; - -/// A trace implementation using a spine of hash-map batches. -pub type HashValSpine = Spine>>; -/// A trace implementation for empty values using a spine of hash-map batches. -pub type HashKeySpine = Spine>>; - - -/// An immutable collection of update tuples, from a contiguous interval of logical times. -#[derive(Debug)] -pub struct HashValBatch { - /// Where all the dataz is. - pub layer: HashedLayer>>, - /// Description of the update times this layer represents. 
- pub desc: Description, -} - -impl BatchReader for Rc> -where K: Clone+Default+HashOrdered+'static, V: Clone+Ord+'static, T: Lattice+Ord+Clone+Default+'static, R: Diff { - type Cursor = HashValCursor; - fn cursor(&self) -> (Self::Cursor, >::Storage) { - let cursor = HashValCursor { - cursor: self.layer.cursor() - }; - - (cursor, self.clone()) - } - fn len(&self) -> usize { >> as Trie>::tuples(&self.layer) } - fn description(&self) -> &Description { &self.desc } -} - -impl Batch for Rc> -where K: Clone+Default+HashOrdered+'static, V: Clone+Ord+'static, T: Lattice+Ord+Clone+Default+'static, R: Diff { - type Batcher = RadixBatcher; - type Builder = HashValBuilder; - fn merge(&self, other: &Self) -> Self { - - // Things are horribly wrong if this is not true. - assert!(self.desc.upper() == other.desc.lower()); - - // one of self.desc.since or other.desc.since needs to be not behind the other... - let since = if self.desc.since().iter().all(|t1| other.desc.since().iter().any(|t2| t2.less_equal(t1))) { - other.desc.since() - } - else { - self.desc.since() - }; - - Rc::new(HashValBatch { - layer: >> as Trie>::merge(&self.layer, &other.layer), - desc: Description::new(self.desc.lower(), other.desc.upper(), since), - }) - } -} - -/// A cursor for navigating a single layer. 
-#[derive(Debug)] -pub struct HashValCursor { - cursor: HashedCursor>>, -} - -impl Cursor for HashValCursor -where K: Clone+HashOrdered, V: Ord+Clone, T: Lattice+Ord+Clone, R: Diff { - - type Storage = Rc>;//OrderedLayer>>; - - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { &self.cursor.key(&storage.layer) } - fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { &self.cursor.child.key(&storage.layer.vals) } - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { - self.cursor.child.child.rewind(&storage.layer.vals.vals); - while self.cursor.child.child.valid(&storage.layer.vals.vals) { - logic(&self.cursor.child.child.key(&storage.layer.vals.vals).0, self.cursor.child.child.key(&storage.layer.vals.vals).1); - self.cursor.child.child.step(&storage.layer.vals.vals); - } - } - fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.valid(&storage.layer) } - fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.child.valid(&storage.layer.vals) } - fn step_key(&mut self, storage: &Self::Storage){ self.cursor.step(&storage.layer); } - fn seek_key(&mut self, storage: &Self::Storage, key: &K) { self.cursor.seek(&storage.layer, key); } - fn step_val(&mut self, storage: &Self::Storage) { self.cursor.child.step(&storage.layer.vals); } - fn seek_val(&mut self, storage: &Self::Storage, val: &V) { self.cursor.child.seek(&storage.layer.vals, val); } - fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind(&storage.layer); } - fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.child.rewind(&storage.layer.vals); } -} - -/// A builder for creating layers from unsorted update tuples. 
-pub struct HashValBuilder { - builder: HashedBuilder>>, -} - -impl Builder>> for HashValBuilder -where K: Clone+Default+HashOrdered+'static, V: Ord+Clone+'static, T: Lattice+Ord+Clone+Default+'static, R: Diff { - - fn new() -> Self { - HashValBuilder { - builder: HashedBuilder::>>::new() - } - } - fn with_capacity(cap: usize) -> Self { - HashValBuilder { - builder: HashedBuilder::>>::with_capacity(cap) - } - } - - #[inline] - fn push(&mut self, (key, val, time, diff): (K, V, T, R)) { - self.builder.push_tuple((key, (val, (time, diff)))); - } - - #[inline(never)] - fn done(self, lower: &[T], upper: &[T], since: &[T]) -> Rc> { - Rc::new(HashValBatch { - layer: self.builder.done(), - desc: Description::new(lower, upper, since) - }) - } -} - - - - -/// An immutable collection of update tuples, from a contiguous interval of logical times. -#[derive(Debug)] -pub struct HashKeyBatch { - /// Where all the dataz is. - pub layer: HashedLayer>, - /// Description of the update times this layer represents. - pub desc: Description, -} - -impl BatchReader for Rc> -where K: Clone+Default+HashOrdered+'static, T: Lattice+Ord+Clone+Default+'static, R: Diff { - type Cursor = HashKeyCursor; - // fn cursor(&self) -> (Self::Cursor, >::Storage) { - - fn cursor(&self) -> (Self::Cursor, >::Storage) { - let cursor = HashKeyCursor { - empty: (), - valid: true, - cursor: self.layer.cursor(), - }; - (cursor, self.clone()) - - // HashKeyCursor { - // empty: (), - // valid: true, - // cursor: self.layer.cursor(OwningRef::new(self.clone()).map(|x| &x.layer).erase_owner()), - // } - } - fn len(&self) -> usize { > as Trie>::tuples(&self.layer) } - fn description(&self) -> &Description { &self.desc } -} - -impl Batch for Rc> -where K: Clone+Default+HashOrdered+'static, T: Lattice+Ord+Clone+Default+'static, R: Diff { - type Batcher = RadixBatcher; - type Builder = HashKeyBuilder; - fn merge(&self, other: &Self) -> Self { - - // Things are horribly wrong if this is not true. 
- assert!(self.desc.upper() == other.desc.lower()); - - // one of self.desc.since or other.desc.since needs to be not behind the other... - let since = if self.desc.since().iter().all(|t1| other.desc.since().iter().any(|t2| t2.less_equal(t1))) { - other.desc.since() - } - else { - self.desc.since() - }; - - Rc::new(HashKeyBatch { - layer: > as Trie>::merge(&self.layer, &other.layer), - desc: Description::new(self.desc.lower(), other.desc.upper(), since), - }) - } -} - -/// A cursor for navigating a single layer. -#[derive(Debug)] -pub struct HashKeyCursor { - valid: bool, - empty: (), - cursor: HashedCursor>, -} - -impl Cursor for HashKeyCursor { - - type Storage = Rc>; - - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { &self.cursor.key(&storage.layer) } - fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { unsafe { ::std::mem::transmute(&self.empty) } } - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { - self.cursor.child.rewind(&storage.layer.vals); - while self.cursor.child.valid(&storage.layer.vals) { - logic(&self.cursor.child.key(&storage.layer.vals).0, self.cursor.child.key(&storage.layer.vals).1); - self.cursor.child.step(&storage.layer.vals); - } - } - fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.valid(&storage.layer) } - fn val_valid(&self, _storage: &Self::Storage) -> bool { self.valid } - fn step_key(&mut self, storage: &Self::Storage){ self.cursor.step(&storage.layer); self.valid = true; } - fn seek_key(&mut self, storage: &Self::Storage, key: &K) { self.cursor.seek(&storage.layer, key); self.valid = true; } - fn step_val(&mut self, _storage: &Self::Storage) { self.valid = false; } - fn seek_val(&mut self, _storage: &Self::Storage, _val: &()) { } - fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind(&storage.layer); self.valid = true; } - fn rewind_vals(&mut self, _storage: &Self::Storage) { self.valid = true; } -} - -/// A builder for creating layers from unsorted update 
tuples. -pub struct HashKeyBuilder { - builder: HashedBuilder>, -} - -impl Builder>> for HashKeyBuilder -where K: Clone+Default+HashOrdered+'static, T: Lattice+Ord+Clone+Default+'static, R: Diff { - - fn new() -> Self { - HashKeyBuilder { - builder: HashedBuilder::>::new() - } - } - fn with_capacity(cap: usize) -> Self { - HashKeyBuilder { - builder: HashedBuilder::>::with_capacity(cap) - } - } - - #[inline] - fn push(&mut self, (key, _, time, diff): (K, (), T, R)) { - self.builder.push_tuple((key, (time, diff))); - } - - #[inline(never)] - fn done(self, lower: &[T], upper: &[T], since: &[T]) -> Rc> { - Rc::new(HashKeyBatch { - layer: self.builder.done(), - desc: Description::new(lower, upper, since) - }) - } -} - diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index 1663dfbc5..4eee120de 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -38,13 +38,10 @@ //! into 'ordered builder'". Then the builder would be bright enough to emit a "batch" for the composite //! trace, rather than just a batch of the type merged. -// pub mod spine_fueled; -pub mod spine_fueled_neu; -pub use self::spine_fueled_neu as spine_fueled; +pub mod spine_fueled; mod merge_batcher; pub use self::merge_batcher::MergeBatcher as Batcher; pub mod ord; -// pub mod hash; diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index 368136cc5..b2f3b01e5 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -3,92 +3,84 @@ //! The `Spine` is a general-purpose trace implementation based on collection and merging //! immutable batches of updates. It is generic with respect to the batch type, and can be //! instantiated for any implementor of `trace::Batch`. +//! +//! ## Design +//! +//! This spine is represented as a list of layers, where each element in the list is either +//! +//! 1. MergeState::Vacant empty +//! 2. 
MergeState::Single a single batch +//! 3. MergeState::Double a pair of batches +//! +//! Each "batch" has the option to be `None`, indicating a non-batch that nonetheless acts +//! as a number of updates proportionate to the level at which it exists (for bookkeeping). +//! +//! Each of the batches at layer i contains at most 2^i elements. The sequence of batches +//! should have the upper bound of one match the lower bound of the next. Batches may be +//! logically empty, with matching upper and lower bounds, as a bookkeeping mechanism. +//! +//! Each batch at layer i is treated as if it contains exactly 2^i elements, even though it +//! may actually contain fewer elements. This allows us to decouple the physical representation +//! from logical amounts of effort invested in each batch. It allows us to begin compaction and +//! to reduce the number of updates, without compromising our ability to continue to move +//! updates along the spine. We are explicitly making the trade-off that while some batches +//! might compact at lower levels, we want to treat them as if they contained their full set of +//! updates for accounting reasons (to apply work to higher levels). +//! +//! We maintain the invariant that for any in-progress merge at level k there should be fewer +//! than 2^k records at levels lower than k. That is, even if we were to apply an unbounded +//! amount of effort to those records, we would not have enough records to prompt a merge into +//! the in-progress merge. Ideally, we maintain the extended invariant that for any in-progress +//! merge at level k, the remaining effort required (number of records minus applied effort) is +//! less than the number of records that would need to be added to reach 2^k records in layers +//! below. +//! +//! ## Mathematics +//! +//! When a merge is initiated, there should be a non-negative *deficit* of updates before the layers +//! below could plausibly produce a new batch for the currently merging layer. 
We must determine a +//! factor of proportionality, so that newly arrived updates provide at least that amount of "fuel" +//! towards the merging layer, so that the merge completes before lower levels invade. +//! +//! ### Deficit: +//! +//! A new merge is initiated only in response to the completion of a prior merge, or the introduction +//! of new records from outside. The latter case is special, and will maintain our invariant trivially, +//! so we will focus on the former case. +//! +//! When a merge at level k completes, assuming we have maintained our invariant then there should be +//! fewer than 2^k records at lower levels. The newly created merge at level k+1 will require up to +//! 2^(k+2) units of work, and should not expect a new batch until strictly more than 2^k records are +//! added. This means that a factor of proportionality of four should be sufficient to ensure that +//! the merge completes before a new merge is initiated. +//! +//! When new records get introduced, we will need to roll up any batches at lower levels, which we +//! treat as the introduction of records. Each of these virtual records introduced should be +//! accounted for in terms of the fuel it should contribute, as it results in the promotion of batches closer to +//! in-progress merges. +//! +//! ### Fuel sharing +//! +//! We like the idea of applying fuel preferentially to merges at *lower* levels, under the idea that +//! they are easier to complete, and we benefit from fewer total merges in progress. This does delay +//! the completion of merges at higher levels, and may not obviously be a total win. If we choose to +//! do this, we should make sure that we correctly account for completed merges at low layers: they +//! should still extract fuel from new updates even though they have completed, at least until they +//! have paid back any "debt" to higher layers by continuing to provide fuel as updates arrive. 
+ use std::fmt::Debug; +use ::logging::Logger; use ::difference::Semigroup; use lattice::Lattice; use trace::{Batch, BatchReader, Trace, TraceReader}; -// use trace::cursor::cursor_list::CursorList; use trace::cursor::{Cursor, CursorList}; use trace::Merger; use ::timely::dataflow::operators::generic::OperatorInfo; - -enum MergeState> { - Merging(B, B, Option>, >::Merger), - Complete(B), -} - -impl> MergeState { - fn complete(mut self, logger: &mut Option<::logging::Logger>, operator: usize, scale: usize) -> B { - if let MergeState::Merging(ref source1, ref source2, ref frontier, ref mut in_progress) = self { - let mut fuel = usize::max_value(); - in_progress.work(source1, source2, frontier, &mut fuel); - assert!(fuel > 0); - } - match self { - // ALLOC: Here is where we may de-allocate batches. - MergeState::Merging(b1, b2, _, finished) => { - let finished = finished.done(); - logger.as_ref().map(|l| - l.log(::logging::MergeEvent { - operator, - scale, - length1: b1.len(), - length2: b2.len(), - complete: Some(finished.len()), - }) - ); - finished - }, - MergeState::Complete(x) => x, - } - } - - fn is_complete(&self) -> bool { - match *self { - MergeState::Complete(_) => true, - _ => false, - } - } - fn begin_merge(batch1: B, batch2: B, frontier: Option>) -> Self { - assert!(batch1.upper() == batch2.lower()); - let begin_merge = >::begin_merge(&batch1, &batch2); - MergeState::Merging(batch1, batch2, frontier, begin_merge) - } - fn work(mut self, fuel: &mut usize, logger: &mut Option<::logging::Logger>, operator: usize, scale: usize) -> Self { - if let MergeState::Merging(ref source1, ref source2, ref frontier, ref mut in_progress) = self { - in_progress.work(source1, source2, frontier, fuel); - } - if *fuel > 0 { - match self { - // ALLOC: Here is where we may de-allocate batches. 
- MergeState::Merging(b1, b2, _, finished) => { - let finished = finished.done(); - logger.as_ref().map(|l| - l.log(::logging::MergeEvent { - operator, - scale, - length1: b1.len(), - length2: b2.len(), - complete: Some(finished.len()), - }) - ); - MergeState::Complete(finished) - }, - MergeState::Complete(x) => MergeState::Complete(x), - } - } - else { self } - } - fn len(&self) -> usize { - match *self { - MergeState::Merging(ref batch1, ref batch2, _, _) => batch1.len() + batch2.len(), - MergeState::Complete(ref batch) => batch.len(), - } - } -} +use ::timely::progress::{Antichain, frontier::AntichainRef}; +use ::timely::order::PartialOrder; /// An append-only collection of update tuples. /// @@ -97,13 +89,13 @@ impl> MergeState { /// other immutable collections. pub struct Spine> { operator: OperatorInfo, - logger: Option<::logging::Logger>, + logger: Option, phantom: ::std::marker::PhantomData<(K, V, R)>, - logical_frontier: Vec, // Times after which the trace must accumulate correctly. - physical_frontier: Vec, // Times after which the trace must be able to subset its inputs. - merging: Vec>>,// Several possibly shared collections of updates. + logical_frontier: Antichain, // Times after which the trace must accumulate correctly. + physical_frontier: Antichain, // Times after which the trace must be able to subset its inputs. + merging: Vec>,// Several possibly shared collections of updates. pending: Vec, // Batches at times in advance of `frontier`. - upper: Vec, + upper: Antichain, effort: usize, activator: Option, } @@ -112,7 +104,7 @@ impl TraceReader for Spine where K: Ord+Clone, // Clone is required by `batch::advance_*` (in-place could remove). V: Ord+Clone, // Clone is required by `batch::advance_*` (in-place could remove). 
- T: Lattice+Ord+Clone+Debug+Default, + T: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, R: Semigroup, B: Batch+Clone+'static, { @@ -124,7 +116,16 @@ where type Batch = B; type Cursor = CursorList>::Cursor>; - fn cursor_through(&mut self, upper: &[T]) -> Option<(Self::Cursor, >::Storage)> { + fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, >::Storage)> { + + // If `upper` is the minimum frontier, we can return an empty cursor. + // This can happen with operators that are written to expect the ability to acquire cursors + // for their prior frontiers, and which start at `[T::minimum()]`, such as `Reduce`, sadly. + if upper.less_equal(&T::minimum()) { + let cursors = Vec::new(); + let storage = Vec::new(); + return Some((CursorList::new(cursors, &storage), storage)); + } // The supplied `upper` should have the property that for each of our // batch `lower` and `upper` frontiers, the supplied upper is comparable @@ -137,34 +138,47 @@ where // supplied upper it had better be empty. // We shouldn't grab a cursor into a closed trace, right? - assert!(self.logical_frontier.len() > 0, "cursor_through({:?}) called for closed trace", upper); + assert!(self.logical_frontier.borrow().len() > 0); // Check that `upper` is greater or equal to `self.physical_frontier`. // Otherwise, the cut could be in `self.merging` and it is user error anyhow. 
- assert!(upper.iter().all(|t1| self.physical_frontier.iter().any(|t2| t2.less_equal(t1)))); + // assert!(upper.iter().all(|t1| self.physical_frontier.iter().any(|t2| t2.less_equal(t1)))); + assert!(PartialOrder::less_equal(&self.physical_frontier.borrow(), &upper)); let mut cursors = Vec::new(); let mut storage = Vec::new(); for merge_state in self.merging.iter().rev() { - match *merge_state { - Some(MergeState::Merging(ref batch1, ref batch2, _, _)) => { - if !batch1.is_empty() { - cursors.push(batch1.cursor()); - storage.push(batch1.clone()); - } - if !batch2.is_empty() { - cursors.push(batch2.cursor()); - storage.push(batch2.clone()); + match merge_state { + MergeState::Double(variant) => { + match variant { + MergeVariant::InProgress(batch1, batch2, _) => { + if !batch1.is_empty() { + cursors.push(batch1.cursor()); + storage.push(batch1.clone()); + } + if !batch2.is_empty() { + cursors.push(batch2.cursor()); + storage.push(batch2.clone()); + } + }, + MergeVariant::Complete(Some((batch, _))) => { + if !batch.is_empty() { + cursors.push(batch.cursor()); + storage.push(batch.clone()); + } + } + MergeVariant::Complete(None) => { }, } }, - Some(MergeState::Complete(ref batch)) => { + MergeState::Single(Some(batch)) => { if !batch.is_empty() { cursors.push(batch.cursor()); storage.push(batch.clone()); } }, - None => { } + MergeState::Single(None) => { }, + MergeState::Vacant => { }, } } @@ -180,10 +194,10 @@ where // TODO: It is not clear if this is the 100% correct logic, due // to the possible non-total-orderedness of the frontiers. 
- let include_lower = upper.iter().all(|t1| batch.lower().iter().any(|t2| t2.less_equal(t1))); - let include_upper = upper.iter().all(|t1| batch.upper().iter().any(|t2| t2.less_equal(t1))); + let include_lower = PartialOrder::less_equal(&batch.lower().borrow(), &upper); + let include_upper = PartialOrder::less_equal(&batch.upper().borrow(), &upper); - if include_lower != include_upper && upper != batch.lower() { + if include_lower != include_upper && upper != batch.lower().borrow() { panic!("`cursor_through`: `upper` straddles batch"); } @@ -197,30 +211,26 @@ where Some((CursorList::new(cursors, &storage), storage)) } - fn set_logical_compaction(&mut self, frontier: &[T]) { - self.logical_frontier = frontier.to_vec(); - - // Commenting out for now; causes problems in `read_upper()`. - // If one has an urgent need to release these resources, it - // is probably best just to drop the trace. - - // if self.logical_frontier.len() == 0 { - // self.drop_batches(); - // } + fn set_logical_compaction(&mut self, frontier: AntichainRef) { + // TODO: Re-use allocation + self.logical_frontier = frontier.to_owned(); } - fn get_logical_compaction(&mut self) -> &[T] { &self.logical_frontier[..] } - fn set_physical_compaction(&mut self, frontier: &[T]) { - self.physical_frontier = frontier.to_vec(); + fn get_logical_compaction(&mut self) -> AntichainRef { self.logical_frontier.borrow() } + fn set_physical_compaction(&mut self, frontier: AntichainRef) { + // We should never request to rewind the frontier. + debug_assert!(PartialOrder::less_equal(&self.physical_frontier.borrow(), &frontier), "FAIL\tthrough frontier !<= new frontier {:?} {:?}\n", self.physical_frontier, frontier); + self.physical_frontier = frontier.to_owned(); self.consider_merges(); } - fn get_physical_compaction(&mut self) -> &[T] { &self.physical_frontier[..] 
} + fn get_physical_compaction(&mut self) -> AntichainRef { self.physical_frontier.borrow() } fn map_batches(&self, mut f: F) { for batch in self.merging.iter().rev() { - match *batch { - Some(MergeState::Merging(ref batch1, ref batch2, _, _)) => { f(batch1); f(batch2); }, - Some(MergeState::Complete(ref batch)) => { f(batch); }, - None => { }, + match batch { + MergeState::Double(MergeVariant::InProgress(batch1, batch2, _)) => { f(batch1); f(batch2); }, + MergeState::Double(MergeVariant::Complete(Some((batch, _)))) => { f(batch) }, + MergeState::Single(Some(batch)) => { f(batch) }, + _ => { }, } } for batch in self.pending.iter() { @@ -235,23 +245,43 @@ impl Trace for Spine where K: Ord+Clone, V: Ord+Clone, - T: Lattice+Ord+Clone+Debug+Default, + T: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, R: Semigroup, B: Batch+Clone+'static, { - fn new( info: ::timely::dataflow::operators::generic::OperatorInfo, logging: Option<::logging::Logger>, activator: Option, ) -> Self { - Self::with_effort(4, info, logging, activator) + Self::with_effort(1, info, logging, activator) } - fn exert(&mut self, effort: usize) { - let batch_size = effort.next_power_of_two(); - let batch_index = batch_size.trailing_zeros() as usize; - self.work_for(batch_size, batch_index); + /// Apply some amount of effort to trace maintenance. + /// + /// The units of effort are updates, and the method should be + /// thought of as analogous to inserting as many empty updates, + /// where the trace is permitted to perform proportionate work. + fn exert(&mut self, effort: &mut isize) { + // If there is work to be done, ... + self.tidy_layers(); + if !self.reduced() { + + // If any merges exist, we can directly call `apply_fuel`. + if self.merging.iter().any(|b| b.is_double()) { + self.apply_fuel(effort); + } + // Otherwise, we'll need to introduce fake updates to move merges along. + else { + // Introduce an empty batch with roughly *effort number of virtual updates. 
+ let level = (*effort as usize).next_power_of_two().trailing_zeros() as usize; + self.introduce_batch(None, level); + } + // We were not in reduced form, so let's check again in the future. + if let Some(activator) = &self.activator { + activator.activate(); + } + } } // Ideally, this method acts as insertion of `batch`, even if we are not yet able to begin @@ -259,26 +289,28 @@ where // to the size of batch. fn insert(&mut self, batch: Self::Batch) { + // Log the introduction of a batch. self.logger.as_ref().map(|l| l.log(::logging::BatchEvent { operator: self.operator.global_id, length: batch.len() })); assert!(batch.lower() != batch.upper()); - assert_eq!(batch.lower(), &self.upper[..]); + assert_eq!(batch.lower(), &self.upper); - self.upper = batch.upper().to_vec(); + self.upper.clone_from(batch.upper()); // TODO: Consolidate or discard empty batches. self.pending.push(batch); self.consider_merges(); } + /// Completes the trace with a final empty batch. fn close(&mut self) { - if !self.upper.is_empty() { + if !self.upper.borrow().is_empty() { use trace::Builder; let builder = B::Builder::new(); - let batch = builder.done(&self.upper[..], &[], &[T::minimum()]); + let batch = builder.done(self.upper.clone(), Antichain::new(), Antichain::from_elem(T::minimum())); self.insert(batch); } } @@ -307,11 +339,30 @@ where fn drop_batches(&mut self) { if let Some(logger) = &self.logger { for batch in self.merging.drain(..) 
{ - if let Some(batch) = batch { - logger.log(::logging::DropEvent { - operator: self.operator.global_id, - length: batch.len(), - }); + match batch { + MergeState::Single(Some(batch)) => { + logger.log(::logging::DropEvent { + operator: self.operator.global_id, + length: batch.len(), + }); + }, + MergeState::Double(MergeVariant::InProgress(batch1, batch2, _)) => { + logger.log(::logging::DropEvent { + operator: self.operator.global_id, + length: batch1.len(), + }); + logger.log(::logging::DropEvent { + operator: self.operator.global_id, + length: batch2.len(), + }); + }, + MergeState::Double(MergeVariant::Complete(Some((batch, _)))) => { + logger.log(::logging::DropEvent { + operator: self.operator.global_id, + length: batch.len(), + }); + } + _ => { }, } } for batch in self.pending.drain(..) { @@ -328,10 +379,40 @@ impl Spine where K: Ord+Clone, V: Ord+Clone, - T: Lattice+Ord+Clone+Debug+Default, + T: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, R: Semigroup, B: Batch, { + /// True iff there is at most one non-empty batch in `self.merging`. + /// + /// When true, there is no maintenance work to perform in the trace, other than compaction. + /// We do not yet have logic in place to determine if compaction would improve a trace, so + /// for now we are ignoring that. + fn reduced(&self) -> bool { + let mut non_empty = 0; + for index in 0 .. self.merging.len() { + if self.merging[index].is_double() { return false; } + if self.merging[index].len() > 0 { non_empty += 1; } + if non_empty > 1 { return false; } + } + true + } + + /// Describes the merge progress of layers in the trace. + /// + /// Intended for diagnostics rather than public consumption. 
+ #[allow(dead_code)] + fn describe(&self) -> Vec<(usize, usize)> { + self.merging + .iter() + .map(|b| match b { + MergeState::Vacant => (0, 0), + x @ MergeState::Single(_) => (1, x.len()), + x @ MergeState::Double(_) => (2, x.len()), + }) + .collect() + } + /// Allocates a fueled `Spine` with a specified effort multiplier. /// /// This trace will merge batches progressively, with each inserted batch applying a multiple @@ -351,226 +432,486 @@ where operator, logger, phantom: ::std::marker::PhantomData, - logical_frontier: vec![::minimum()], - physical_frontier: vec![::minimum()], + logical_frontier: Antichain::from_elem(::minimum()), + physical_frontier: Antichain::from_elem(::minimum()), merging: Vec::new(), pending: Vec::new(), - upper: vec![Default::default()], + upper: Antichain::from_elem(::minimum()), effort, activator, } } - // Migrate data from `self.pending` into `self.merging`. + /// Migrate data from `self.pending` into `self.merging`. + /// + /// This method reflects on the bookmarks held by others that may prevent merging, and in the + /// case that new batches can be introduced to the pile of mergeable batches, it gets on that. #[inline(never)] fn consider_merges(&mut self) { - // We have a new design here, in an attempt to rationalize progressive merging of batches. - // - // Batches arrive with a number of records, and are assigned a power-of-two "size", which is this - // number rounded up to the next power of two. Batches are placed in a slot based on their size, - // and each slot can be one of: - // - // i. empty, - // ii. occupied by a batch, - // iii. occupied by a merge in progress (two batches). + // TODO: Consider merging pending batches before introducing them. + // TODO: We could use a `VecDeque` here to draw from the front and append to the back. 
+ while self.pending.len() > 0 && PartialOrder::less_equal(self.pending[0].upper(), &self.physical_frontier) + // self.physical_frontier.iter().all(|t1| self.pending[0].upper().iter().any(|t2| t2.less_equal(t1))) + { + // Batch can be taken in optimized insertion. + // Otherwise it is inserted normally at the end of the method. + let mut batch = Some(self.pending.remove(0)); + + // If `batch` and the most recently inserted batch are both empty, we can just fuse them. + // We can also replace a structurally empty batch with this empty batch, preserving the + // apparent record count but now with non-trivial lower and upper bounds. + if batch.as_ref().unwrap().len() == 0 { + if let Some(position) = self.merging.iter().position(|m| !m.is_vacant()) { + if self.merging[position].is_single() && self.merging[position].len() == 0 { + self.insert_at(batch.take(), position); + let merged = self.complete_at(position); + self.merging[position] = MergeState::Single(merged); + } + } + } + + // Normal insertion for the batch. + if let Some(batch) = batch { + let index = batch.len().next_power_of_two(); + self.introduce_batch(Some(batch), index.trailing_zeros() as usize); + } + } + + // Having performed all of our work, if more than one batch remains reschedule ourself. + if !self.reduced() { + if let Some(activator) = &self.activator { + activator.activate(); + } + } + } + + /// Introduces a batch at an indicated level. + /// + /// The level indication is often related to the size of the batch, but + /// it can also be used to artificially fuel the computation by supplying + /// empty batches at non-trivial indices, to move merges along. + pub fn introduce_batch(&mut self, batch: Option, batch_index: usize) { + + // Step 0. Determine an amount of fuel to use for the computation. 
// - // Our goal is to track the behavior of hypothetical ideal merge strategy, in which whenever two - // elements want to occupy the same slot, we merge them and re-evaluate which slot they should be in. - // Perhaps they are fine where they were, but most likely they want to advance to the next slot. - // If the next slot is free they move there and stop, and if it is occupied we continue to merge - // until we find a free slot (or introduce a new empty slot at the end of the list). + // Fuel is used to drive maintenance of the data structure, + // and in particular are used to make progress through merges + // that are in progress. The amount of fuel to use should be + // proportional to the number of records introduced, so that + // we are guaranteed to complete all merges before they are + // required as arguments to merges again. // - // We do not actually want to execute the merges in this *sequence*, because it may involve a great - // deal of computation all at once, but we do want to execute this *set* of merges. + // The fuel use policy is negotiable, in that we might aim + // to use relatively less when we can, so that we return + // control promptly, or we might account more work to larger + // batches. Not clear to me which are best, of if there + // should be a configuration knob controlling this. + + // The amount of fuel to use is proportional to 2^batch_index, scaled + // by a factor of self.effort which determines how eager we are in + // performing maintenance work. We need to ensure that each merge in + // progress receives fuel for each introduced batch, and so multiply + // by that as well. + if batch_index > 32 { println!("Large batch index: {}", batch_index); } + + // We believe that eight units of fuel is sufficient for each introduced + // record, accounted as four for each record, and a potential four more + // for each virtual record associated with promoting existing smaller + // batches. 
We could try and make this be less, or be scaled to merges + // based on their deficit at time of instantiation. For now, we remain + // conservative. + let mut fuel = 8 << batch_index; + // Scale up by the effort parameter, which is calibrated to one as the + // minimum amount of effort. + fuel *= self.effort; + // Convert to an `isize` so we can observe any fuel shortfall. + let mut fuel = fuel as isize; + + // Step 1. Apply fuel to each in-progress merge. // - // Our strategy is that we initiate merges and perform work progressively, and when a merge completes - // we immediately consider initiating a merge with the next slot (if appropriate). The intent is that - // any merge we initiate will cascade along exactly as in the eager merging case, as no new batch can - // overtake any merges in progress. + // Before we can introduce new updates, we must apply any + // fuel to in-progress merges, as this fuel is what ensures + // that the merges will be complete by the time we insert + // the updates. + self.apply_fuel(&mut fuel); + + // Step 2. We must ensure the invariant that adjacent layers do not + // contain two batches will be satisfied when we insert the + // batch. We forcibly completing all merges at layers lower + // than and including `batch_index`, so that the new batch + // is inserted into an empty layer. // - // Our main technical issue is that we need to ensure that merges complete before other batches want - // to initiate a merge with their resulting batch. + // We could relax this to "strictly less than `batch_index`" + // if the layer above has only a single batch in it, which + // seems not implausible if it has been the focus of effort. // - // To this end, as batches are introduced, we perform an amount of work on all existing merges that - // is proportional to the size of the introduced batch. 
The intent is that once a merge is initiated, - // in the eager case, we must introduce at least as many records as the slot the merge lands in before - // we will spill out of the slot the merge is departing. There is the technicality that the amount of - // work we perform on a merge is proportional to the new batch size *times* the number of slots above - // it until the next merge is reached. These slots could be merges in progress, if the one we work on - // would cascade in the reference merge sequence. - - // We maintain a few invariants as we execute, with the intent of maintaining a minimal - // representation. First, batches are ordered from newest to oldest. + // This should be interpreted as the introduction of some + // volume of fake updates, and we will need to fuel merges + // by a proportional amount to ensure that they are not + // surprised later on. The number of fake updates should + // correspond to the deficit for the layer, which perhaps + // we should track explicitly. + self.roll_up(batch_index); + + // Step 3. This insertion should be into an empty layer. It is a + // logical error otherwise, as we may be violating our + // invariant, from which all wonderment derives. + self.insert_at(batch, batch_index); + + // Step 4. Tidy the largest layers. // - // 1. merging[i].lower == merging[i+1].upper (unless either is None). - // 2. large batches never have small indices. + // It is important that we not tidy only smaller layers, + // as their ascension is what ensures the merging and + // eventual compaction of the largest layers. + self.tidy_layers(); + } - while self.pending.len() > 0 && - self.physical_frontier.iter().all(|t1| self.pending[0].upper().iter().any(|t2| t2.less_equal(t1))) - { - // this could be a VecDeque, if we ever notice this. - let batch = self.pending.remove(0); - - // Step 0: Determine batch size and target slot. 
- let batch_size = batch.len().next_power_of_two(); - let batch_index = batch_size.trailing_zeros() as usize; - while self.merging.len() <= batch_index { self.merging.push(None); } - - if self.merging.len() > 32 { eprintln!("large progressive merge; len: {:?}", self.merging.len()); } - - // Step 1: Forcibly merge batches in lower slots. - for position in 0 .. batch_index { - if let Some(batch) = self.merging[position].take() { - let batch = batch.complete(&mut self.logger, self.operator.global_id, position); - if let Some(batch2) = self.merging[position+1].take() { - let batch2 = batch2.complete(&mut self.logger, self.operator.global_id, position); - self.logger.as_ref().map(|l| l.log( - ::logging::MergeEvent { - operator: self.operator.global_id, - scale: position, - length1: batch.len(), - length2: batch2.len(), - complete: None, - } - )); - self.merging[position+1] = Some(MergeState::begin_merge(batch2, batch, None)); - } - else { - self.merging[position+1] = Some(MergeState::Complete(batch)); - }; - } + /// Ensures that an insertion at layer `index` will succeed. + /// + /// This method is subject to the constraint that all existing batches + /// should occur at higher levels, which requires it to "roll up" batches + /// present at lower levels before the method is called. In doing this, + /// we should not introduce more virtual records than 2^index, as that + /// is the amount of excess fuel we have budgeted for completing merges. + fn roll_up(&mut self, index: usize) { + + // Ensure entries sufficient for `index`. + while self.merging.len() <= index { + self.merging.push(MergeState::Vacant); + } + + // We only need to roll up if there are non-vacant layers. + if self.merging[.. index].iter().any(|m| !m.is_vacant()) { + + // Collect and merge all batches at layers up to but not including `index`. + let mut merged = None; + for i in 0 .. 
index { + self.insert_at(merged, i); + merged = self.complete_at(i); } - // Step 2: Insert new batch at target position - if let Some(batch2) = self.merging[batch_index].take() { - let batch2 = batch2.complete(&mut self.logger, self.operator.global_id, batch_index); - let frontier = if batch_index == self.merging.len()-1 { Some(self.get_logical_compaction.clone()) } else { None }; + // The merged results should be introduced at level `index`, which should + // be ready to absorb them (possibly creating a new merge at the time). + self.insert_at(merged, index); + + // If the insertion results in a merge, we should complete it to ensure + // the upcoming insertion at `index` does not panic. + if self.merging[index].is_double() { + let merged = self.complete_at(index); + self.insert_at(merged, index + 1); + } + } + } + + /// Applies an amount of fuel to merges in progress. + /// + /// The supplied `fuel` is for each in progress merge, and if we want to spend + /// the fuel non-uniformly (e.g. prioritizing merges at low layers) we could do + /// so in order to maintain fewer batches on average (at the risk of completing + /// merges of large batches later, but tbh probably not much later). + pub fn apply_fuel(&mut self, fuel: &mut isize) { + // For the moment our strategy is to apply fuel independently to each merge + // in progress, rather than prioritizing small merges. This sounds like a + // great idea, but we need better accounting in place to ensure that merges + // that borrow against later layers but then complete still "acquire" fuel + // to pay back their debts. + for index in 0 .. self.merging.len() { + // Give each level independent fuel, for now. + let mut fuel = *fuel; + // Pass along various logging stuffs, in case we need to report success. + self.merging[index].work(&mut fuel); + // `fuel` could have a deficit at this point, meaning we over-spent when + // we took a merge step. 
We could ignore this, or maintain the deficit + // and account future fuel against it before spending again. It isn't + // clear why that would be especially helpful to do; we might want to + // avoid overspends at multiple layers in the same invocation (to limit + // latencies), but there is probably a rich policy space here. + + // If a merge completes, we can immediately merge it in to the next + // level, which is "guaranteed" to be complete at this point, by our + // fueling discipline. + if self.merging[index].is_complete() { + let complete = self.complete_at(index); + self.insert_at(complete, index+1); + } + } + } + + /// Inserts a batch at a specific location. + /// + /// This is a non-public internal method that can panic if we try and insert into a + /// layer which already contains two batches (and is still in the process of merging). + fn insert_at(&mut self, batch: Option, index: usize) { + // Ensure the spine is large enough. + while self.merging.len() <= index { + self.merging.push(MergeState::Vacant); + } + + // Insert the batch at the location. + match self.merging[index].take() { + MergeState::Vacant => { + self.merging[index] = MergeState::Single(batch); + } + MergeState::Single(old) => { + // Log the initiation of a merge. 
self.logger.as_ref().map(|l| l.log( ::logging::MergeEvent { operator: self.operator.global_id, - scale: batch_index, - length1: batch.len(), - length2: batch2.len(), + scale: index, + length1: old.as_ref().map(|b| b.len()).unwrap_or(0), + length2: batch.as_ref().map(|b| b.len()).unwrap_or(0), complete: None, } )); - self.merging[batch_index] = Some(MergeState::begin_merge(batch2, batch, frontier)); + let compaction_frontier = Some(self.logical_frontier.borrow()); + self.merging[index] = MergeState::begin_merge(old, batch, compaction_frontier); } - else { - self.merging[batch_index] = Some(MergeState::Complete(batch)); + MergeState::Double(_) => { + panic!("Attempted to insert batch into incomplete merge!") } + }; + } - self.work_for(batch_size, batch_index); + /// Completes and extracts what ever is at layer `index`. + fn complete_at(&mut self, index: usize) -> Option { + if let Some((merged, inputs)) = self.merging[index].complete() { + if let Some((input1, input2)) = inputs { + // Log the completion of a merge from existing parts. + self.logger.as_ref().map(|l| l.log( + ::logging::MergeEvent { + operator: self.operator.global_id, + scale: index, + length1: input1.len(), + length2: input2.len(), + complete: Some(merged.len()), + } + )); + } + Some(merged) + } + else { + None } } - /// Performs merging work commensurate with `batch_size` records. - /// - /// This method is public so that it can be invoked by external parties. - /// In principle, this work might be better suited to a thread pool with - /// more insight into available cycles. As long as the method is used as - /// above, the merging work needs to happen immediately (or thereabouts) - /// to avoid falling behind on merges. - pub fn work_for(&mut self, batch_size: usize, batch_index: usize) { - - // Step 3: Perform `size` work on each in-progress merge, from large to small. - // For non-merges, accumulate fuel, as we may need to apply it to merges - // that result at us. 
- let mut fuel = 0; - for position in (batch_index .. self.merging.len()).rev() { - - // We add fuel for any merge that may lead to this location. - fuel += (2 * batch_size).saturating_mul(self.effort); - - // We now move to the right, merging until we stop merging or run out of fuel. - let mut new_position = position; - while self.merging[new_position].as_ref().map(|x| !x.is_complete()).unwrap_or(false) && fuel > 0 { - if let Some(mut batch) = self.merging[new_position].take() { - - // Apply work with accumulated fuel. - batch = batch.work(&mut fuel, &mut self.logger, self.operator.global_id, position); - - // If we have a complete batch, and it wants to be in the next slot ... - if batch.is_complete() && batch.len() >= (1 << new_position) {//.next_power_of_two().trailing_zeros() as usize > new_position { - - new_position += 1; - if self.merging.len() <= new_position { self.merging.push(None); } - - // If the next slot is actually occupied, must start a merge. - if let Some(mut batch2) = self.merging[new_position].take() { - if !batch2.is_complete() { - let mut temp_fuel = usize::max_value(); - batch2 = batch2.work(&mut temp_fuel, &mut self.logger, self.operator.global_id, position); - self.logger.as_ref().map(|l| l.log( - ::logging::MergeShortfall { - operator: self.operator.global_id, - scale: new_position, - shortfall: usize::max_value() - temp_fuel, - } - )); - } - let batch1 = batch.complete(&mut self.logger, self.operator.global_id, position); - let batch2 = batch2.complete(&mut self.logger, self.operator.global_id, position); - // if this is the last position, engage compaction. - let frontier = if new_position+1 == self.merging.len() { Some(self.get_logical_compaction.clone()) } else { None }; - self.logger.as_ref().map(|l| l.log( - ::logging::MergeEvent { - operator: self.operator.global_id, - scale: position, - length1: batch1.len(), - length2: batch2.len(), - complete: None, + /// Attempts to draw down large layers to size appropriate layers. 
+ fn tidy_layers(&mut self) { + + // If the largest layer is complete (not merging), we can attempt + // to draw it down to the next layer. This is permitted if we can + // maintain our invariant that below each merge there are at most + // half the records that would be required to invade the merge. + if !self.merging.is_empty() { + let mut length = self.merging.len(); + if self.merging[length-1].is_single() { + + // To move a batch down, we require that it contain few + // enough records that the lower level is appropriate, + // and that moving the batch would not create a merge + // violating our invariant. + + let appropriate_level = self.merging[length-1].len().next_power_of_two().trailing_zeros() as usize; + + // Continue only as far as is appropriate + while appropriate_level < length-1 { + + match self.merging[length-2].take() { + // Vacant or structurally empty batches can be absorbed. + MergeState::Vacant | MergeState::Single(None) => { + self.merging.remove(length-2); + length = self.merging.len(); + } + // Single batches may initiate a merge, if sizes are + // within bounds, but terminate the loop either way. + MergeState::Single(Some(batch)) => { + + // Determine the number of records that might lead + // to a merge. Importantly, this is not the number + // of actual records, but the sum of upper bounds + // based on indices. 
+ let mut smaller = 0; + for (index, batch) in self.merging[..(length-2)].iter().enumerate() { + match batch { + MergeState::Vacant => { }, + MergeState::Single(_) => { smaller += 1 << index; }, + MergeState::Double(_) => { smaller += 2 << index; }, } - )); - self.merging[new_position] = Some(MergeState::begin_merge(batch2, batch1, frontier)); + } + + if smaller <= (1 << length) / 8 { + self.merging.remove(length-2); + self.insert_at(Some(batch), length-2); + } + else { + self.merging[length-2] = MergeState::Single(Some(batch)); + } + return; } - else { - self.merging[new_position] = Some(batch); + // If a merge is in progress there is nothing to do. + MergeState::Double(state) => { + self.merging[length-2] = MergeState::Double(state); + return; } } - else { - self.merging[new_position] = Some(batch); - } - } - else { - // We can't be here. The while condition ensures that an entry exists. } } } + } +} - // Step 4: Consider migrating complete batches to lower bins, if appropriate. - for index in (1 .. self.merging.len()).rev() { - if self.merging[index].as_ref().map(|x| x.is_complete() && x.len() < (1 << (index-1))).unwrap_or(false) { - if self.merging[index-1].is_none() { - self.merging[index-1] = self.merging[index].take(); - } - } + +/// Describes the state of a layer. +/// +/// A layer can be empty, contain a single batch, or contain a pair of batches +/// that are in the process of merging into a batch for the next layer. +enum MergeState> { + /// An empty layer, containing no updates. + Vacant, + /// A layer containing a single batch. + /// + /// The `None` variant is used to represent a structurally empty batch present + /// to ensure the progress of maintenance work. + Single(Option), + /// A layer containing two batches, in the process of merging. + Double(MergeVariant), +} + +impl> MergeState { + + /// The number of actual updates contained in the level. 
+ fn len(&self) -> usize { + match self { + MergeState::Single(Some(b)) => b.len(), + MergeState::Double(MergeVariant::InProgress(b1,b2,_)) => b1.len() + b2.len(), + MergeState::Double(MergeVariant::Complete(Some((b, _)))) => b.len(), + _ => 0, } - while self.merging.last().map(|x| x.is_none()) == Some(true) { self.merging.pop(); } - - if let Some(activator) = self.activator.as_mut() { - let mut should_activate = false; - let mut status = String::new(); - for batch in self.merging.iter() { - if let Some(batch) = batch { - if !batch.is_complete() { - should_activate = true; - status.push('M'); - } - else { - status.push('C'); - } + } - } - else { - status.push('E'); - } + /// True only for the MergeState::Vacant variant. + fn is_vacant(&self) -> bool { + if let MergeState::Vacant = self { true } else { false } + } + + /// True only for the MergeState::Single variant. + fn is_single(&self) -> bool { + if let MergeState::Single(_) = self { true } else { false } + } + + /// True only for the MergeState::Double variant. + fn is_double(&self) -> bool { + if let MergeState::Double(_) = self { true } else { false } + } + + /// Immediately complete any merge. + /// + /// The result is either a batch, if there is a non-trivial batch to return + /// or `None` if there is no meaningful batch to return. This does not distinguish + /// between Vacant entries and structurally empty batches, which should be done + /// with the `is_complete()` method. + /// + /// There is the addional option of input batches. + fn complete(&mut self) -> Option<(B, Option<(B, B)>)> { + match std::mem::replace(self, MergeState::Vacant) { + MergeState::Vacant => None, + MergeState::Single(batch) => batch.map(|b| (b, None)), + MergeState::Double(variant) => variant.complete(), + } + } + + /// True iff the layer is a complete merge, ready for extraction. 
+ fn is_complete(&mut self) -> bool { + if let MergeState::Double(MergeVariant::Complete(_)) = self { + true + } + else { + false + } + } + + /// Performs a bounded amount of work towards a merge. + /// + /// If the merge completes, the resulting batch is returned. + /// If a batch is returned, it is the obligation of the caller + /// to correctly install the result. + fn work(&mut self, fuel: &mut isize) { + // We only perform work for merges in progress. + if let MergeState::Double(layer) = self { + layer.work(fuel) + } + } + + /// Extract the merge state, typically temporarily. + fn take(&mut self) -> Self { + std::mem::replace(self, MergeState::Vacant) + } + + /// Initiates the merge of an "old" batch with a "new" batch. + /// + /// The upper frontier of the old batch should match the lower + /// frontier of the new batch, with the resulting batch describing + /// their composed interval, from the lower frontier of the old + /// batch to the upper frontier of the new batch. + /// + /// Either batch may be `None` which corresponds to a structurally + /// empty batch whose upper and lower froniers are equal. This + /// option exists purely for bookkeeping purposes, and no computation + /// is performed to merge the two batches. 
+ fn begin_merge(batch1: Option, batch2: Option, compaction_frontier: Option>) -> MergeState { + let variant = + match (batch1, batch2) { + (Some(batch1), Some(batch2)) => { + assert!(batch1.upper() == batch2.lower()); + let begin_merge = >::begin_merge(&batch1, &batch2, compaction_frontier); + MergeVariant::InProgress(batch1, batch2, begin_merge) } - println!("activator present; should_activate: {:?},\t{:?}", should_activate, status); - if should_activate { - activator.activate(); + (None, Some(x)) => MergeVariant::Complete(Some((x, None))), + (Some(x), None) => MergeVariant::Complete(Some((x, None))), + (None, None) => MergeVariant::Complete(None), + }; + + MergeState::Double(variant) + } +} + +enum MergeVariant> { + /// Describes an actual in-progress merge between two non-trivial batches. + InProgress(B, B, >::Merger), + /// A merge that requires no further work. May or may not represent a non-trivial batch. + Complete(Option<(B, Option<(B, B)>)>), +} + +impl> MergeVariant { + + /// Completes and extracts the batch, unless structurally empty. + /// + /// The result is either `None`, for structurally empty batches, + /// or a batch and optionally input batches from which it derived. + fn complete(mut self) -> Option<(B, Option<(B, B)>)> { + let mut fuel = isize::max_value(); + self.work(&mut fuel); + if let MergeVariant::Complete(batch) = self { batch } + else { panic!("Failed to complete a merge!"); } + } + + /// Applies some amount of work, potentially completing the merge. + /// + /// In case the work completes, the source batches are returned. + /// This allows the caller to manage the released resources. 
+ fn work(&mut self, fuel: &mut isize) { + let variant = std::mem::replace(self, MergeVariant::Complete(None)); + if let MergeVariant::InProgress(b1,b2,mut merge) = variant { + merge.work(&b1,&b2,fuel); + if *fuel > 0 { + *self = MergeVariant::Complete(Some((merge.done(), Some((b1,b2))))); } + else { + *self = MergeVariant::InProgress(b1,b2,merge); + } + } + else { + *self = variant; } } } diff --git a/src/trace/implementations/spine_fueled_neu.rs b/src/trace/implementations/spine_fueled_neu.rs deleted file mode 100644 index b2f3b01e5..000000000 --- a/src/trace/implementations/spine_fueled_neu.rs +++ /dev/null @@ -1,917 +0,0 @@ -//! An append-only collection of update batches. -//! -//! The `Spine` is a general-purpose trace implementation based on collection and merging -//! immutable batches of updates. It is generic with respect to the batch type, and can be -//! instantiated for any implementor of `trace::Batch`. -//! -//! ## Design -//! -//! This spine is represented as a list of layers, where each element in the list is either -//! -//! 1. MergeState::Vacant empty -//! 2. MergeState::Single a single batch -//! 3. MergeState::Double a pair of batches -//! -//! Each "batch" has the option to be `None`, indicating a non-batch that nonetheless acts -//! as a number of updates proportionate to the level at which it exists (for bookkeeping). -//! -//! Each of the batches at layer i contains at most 2^i elements. The sequence of batches -//! should have the upper bound of one match the lower bound of the next. Batches may be -//! logically empty, with matching upper and lower bounds, as a bookkeeping mechanism. -//! -//! Each batch at layer i is treated as if it contains exactly 2^i elements, even though it -//! may actually contain fewer elements. This allows us to decouple the physical representation -//! from logical amounts of effort invested in each batch. It allows us to begin compaction and -//! 
to reduce the number of updates, without compromising our ability to continue to move -//! updates along the spine. We are explicitly making the trade-off that while some batches -//! might compact at lower levels, we want to treat them as if they contained their full set of -//! updates for accounting reasons (to apply work to higher levels). -//! -//! We maintain the invariant that for any in-progress merge at level k there should be fewer -//! than 2^k records at levels lower than k. That is, even if we were to apply an unbounded -//! amount of effort to those records, we would not have enough records to prompt a merge into -//! the in-progress merge. Ideally, we maintain the extended invariant that for any in-progress -//! merge at level k, the remaining effort required (number of records minus applied effort) is -//! less than the number of records that would need to be added to reach 2^k records in layers -//! below. -//! -//! ## Mathematics -//! -//! When a merge is initiated, there should be a non-negative *deficit* of updates before the layers -//! below could plausibly produce a new batch for the currently merging layer. We must determine a -//! factor of proportionality, so that newly arrived updates provide at least that amount of "fuel" -//! towards the merging layer, so that the merge completes before lower levels invade. -//! -//! ### Deficit: -//! -//! A new merge is initiated only in response to the completion of a prior merge, or the introduction -//! of new records from outside. The latter case is special, and will maintain our invariant trivially, -//! so we will focus on the former case. -//! -//! When a merge at level k completes, assuming we have maintained our invariant then there should be -//! fewer than 2^k records at lower levels. The newly created merge at level k+1 will require up to -//! 2^k+2 units of work, and should not expect a new batch until strictly more than 2^k records are -//! added. 
This means that a factor of proportionality of four should be sufficient to ensure that -//! the merge completes before a new merge is initiated. -//! -//! When new records get introduced, we will need to roll up any batches at lower levels, which we -//! treat as the introduction of records. Each of these virtual records introduced should either be -//! accounted for the fuel it should contribute, as it results in the promotion of batches closer to -//! in-progress merges. -//! -//! ### Fuel sharing -//! -//! We like the idea of applying fuel preferentially to merges at *lower* levels, under the idea that -//! they are easier to complete, and we benefit from fewer total merges in progress. This does delay -//! the completion of merges at higher levels, and may not obviously be a total win. If we choose to -//! do this, we should make sure that we correctly account for completed merges at low layers: they -//! should still extract fuel from new updates even though they have completed, at least until they -//! have paid back any "debt" to higher layers by continuing to provide fuel as updates arrive. - - -use std::fmt::Debug; - -use ::logging::Logger; -use ::difference::Semigroup; -use lattice::Lattice; -use trace::{Batch, BatchReader, Trace, TraceReader}; -use trace::cursor::{Cursor, CursorList}; -use trace::Merger; - -use ::timely::dataflow::operators::generic::OperatorInfo; -use ::timely::progress::{Antichain, frontier::AntichainRef}; -use ::timely::order::PartialOrder; - -/// An append-only collection of update tuples. -/// -/// A spine maintains a small number of immutable collections of update tuples, merging the collections when -/// two have similar sizes. In this way, it allows the addition of more tuples, which may then be merged with -/// other immutable collections. 
-pub struct Spine> { - operator: OperatorInfo, - logger: Option, - phantom: ::std::marker::PhantomData<(K, V, R)>, - logical_frontier: Antichain, // Times after which the trace must accumulate correctly. - physical_frontier: Antichain, // Times after which the trace must be able to subset its inputs. - merging: Vec>,// Several possibly shared collections of updates. - pending: Vec, // Batches at times in advance of `frontier`. - upper: Antichain, - effort: usize, - activator: Option, -} - -impl TraceReader for Spine -where - K: Ord+Clone, // Clone is required by `batch::advance_*` (in-place could remove). - V: Ord+Clone, // Clone is required by `batch::advance_*` (in-place could remove). - T: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, - R: Semigroup, - B: Batch+Clone+'static, -{ - type Key = K; - type Val = V; - type Time = T; - type R = R; - - type Batch = B; - type Cursor = CursorList>::Cursor>; - - fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, >::Storage)> { - - // If `upper` is the minimum frontier, we can return an empty cursor. - // This can happen with operators that are written to expect the ability to acquire cursors - // for their prior frontiers, and which start at `[T::minimum()]`, such as `Reduce`, sadly. - if upper.less_equal(&T::minimum()) { - let cursors = Vec::new(); - let storage = Vec::new(); - return Some((CursorList::new(cursors, &storage), storage)); - } - - // The supplied `upper` should have the property that for each of our - // batch `lower` and `upper` frontiers, the supplied upper is comparable - // to the frontier; it should not be incomparable, because the frontiers - // that we created form a total order. If it is, there is a bug. - // - // We should acquire a cursor including all batches whose upper is less - // or equal to the supplied upper, excluding all batches whose lower is - // greater or equal to the supplied upper, and if a batch straddles the - // supplied upper it had better be empty. 
- - // We shouldn't grab a cursor into a closed trace, right? - assert!(self.logical_frontier.borrow().len() > 0); - - // Check that `upper` is greater or equal to `self.physical_frontier`. - // Otherwise, the cut could be in `self.merging` and it is user error anyhow. - // assert!(upper.iter().all(|t1| self.physical_frontier.iter().any(|t2| t2.less_equal(t1)))); - assert!(PartialOrder::less_equal(&self.physical_frontier.borrow(), &upper)); - - let mut cursors = Vec::new(); - let mut storage = Vec::new(); - - for merge_state in self.merging.iter().rev() { - match merge_state { - MergeState::Double(variant) => { - match variant { - MergeVariant::InProgress(batch1, batch2, _) => { - if !batch1.is_empty() { - cursors.push(batch1.cursor()); - storage.push(batch1.clone()); - } - if !batch2.is_empty() { - cursors.push(batch2.cursor()); - storage.push(batch2.clone()); - } - }, - MergeVariant::Complete(Some((batch, _))) => { - if !batch.is_empty() { - cursors.push(batch.cursor()); - storage.push(batch.clone()); - } - } - MergeVariant::Complete(None) => { }, - } - }, - MergeState::Single(Some(batch)) => { - if !batch.is_empty() { - cursors.push(batch.cursor()); - storage.push(batch.clone()); - } - }, - MergeState::Single(None) => { }, - MergeState::Vacant => { }, - } - } - - for batch in self.pending.iter() { - - if !batch.is_empty() { - - // For a non-empty `batch`, it is a catastrophic error if `upper` - // requires some-but-not-all of the updates in the batch. We can - // determine this from `upper` and the lower and upper bounds of - // the batch itself. - // - // TODO: It is not clear if this is the 100% correct logic, due - // to the possible non-total-orderedness of the frontiers. 
- - let include_lower = PartialOrder::less_equal(&batch.lower().borrow(), &upper); - let include_upper = PartialOrder::less_equal(&batch.upper().borrow(), &upper); - - if include_lower != include_upper && upper != batch.lower().borrow() { - panic!("`cursor_through`: `upper` straddles batch"); - } - - // include pending batches - if include_upper { - cursors.push(batch.cursor()); - storage.push(batch.clone()); - } - } - } - - Some((CursorList::new(cursors, &storage), storage)) - } - fn set_logical_compaction(&mut self, frontier: AntichainRef) { - // TODO: Re-use allocation - self.logical_frontier = frontier.to_owned(); - } - fn get_logical_compaction(&mut self) -> AntichainRef { self.logical_frontier.borrow() } - fn set_physical_compaction(&mut self, frontier: AntichainRef) { - // We should never request to rewind the frontier. - debug_assert!(PartialOrder::less_equal(&self.physical_frontier.borrow(), &frontier), "FAIL\tthrough frontier !<= new frontier {:?} {:?}\n", self.physical_frontier, frontier); - self.physical_frontier = frontier.to_owned(); - self.consider_merges(); - } - fn get_physical_compaction(&mut self) -> AntichainRef { self.physical_frontier.borrow() } - - fn map_batches(&self, mut f: F) { - for batch in self.merging.iter().rev() { - match batch { - MergeState::Double(MergeVariant::InProgress(batch1, batch2, _)) => { f(batch1); f(batch2); }, - MergeState::Double(MergeVariant::Complete(Some((batch, _)))) => { f(batch) }, - MergeState::Single(Some(batch)) => { f(batch) }, - _ => { }, - } - } - for batch in self.pending.iter() { - f(batch); - } - } -} - -// A trace implementation for any key type that can be borrowed from or converted into `Key`. -// TODO: Almost all this implementation seems to be generic with respect to the trace and batch types. 
-impl Trace for Spine -where - K: Ord+Clone, - V: Ord+Clone, - T: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, - R: Semigroup, - B: Batch+Clone+'static, -{ - fn new( - info: ::timely::dataflow::operators::generic::OperatorInfo, - logging: Option<::logging::Logger>, - activator: Option, - ) -> Self { - Self::with_effort(1, info, logging, activator) - } - - /// Apply some amount of effort to trace maintenance. - /// - /// The units of effort are updates, and the method should be - /// thought of as analogous to inserting as many empty updates, - /// where the trace is permitted to perform proportionate work. - fn exert(&mut self, effort: &mut isize) { - // If there is work to be done, ... - self.tidy_layers(); - if !self.reduced() { - - // If any merges exist, we can directly call `apply_fuel`. - if self.merging.iter().any(|b| b.is_double()) { - self.apply_fuel(effort); - } - // Otherwise, we'll need to introduce fake updates to move merges along. - else { - // Introduce an empty batch with roughly *effort number of virtual updates. - let level = (*effort as usize).next_power_of_two().trailing_zeros() as usize; - self.introduce_batch(None, level); - } - // We were not in reduced form, so let's check again in the future. - if let Some(activator) = &self.activator { - activator.activate(); - } - } - } - - // Ideally, this method acts as insertion of `batch`, even if we are not yet able to begin - // merging the batch. This means it is a good time to perform amortized work proportional - // to the size of batch. - fn insert(&mut self, batch: Self::Batch) { - - // Log the introduction of a batch. - self.logger.as_ref().map(|l| l.log(::logging::BatchEvent { - operator: self.operator.global_id, - length: batch.len() - })); - - assert!(batch.lower() != batch.upper()); - assert_eq!(batch.lower(), &self.upper); - - self.upper.clone_from(batch.upper()); - - // TODO: Consolidate or discard empty batches. 
- self.pending.push(batch); - self.consider_merges(); - } - - /// Completes the trace with a final empty batch. - fn close(&mut self) { - if !self.upper.borrow().is_empty() { - use trace::Builder; - let builder = B::Builder::new(); - let batch = builder.done(self.upper.clone(), Antichain::new(), Antichain::from_elem(T::minimum())); - self.insert(batch); - } - } -} - -// Drop implementation allows us to log batch drops, to zero out maintained totals. -impl Drop for Spine -where - T: Lattice+Ord, - R: Semigroup, - B: Batch, -{ - fn drop(&mut self) { - self.drop_batches(); - } -} - - -impl Spine -where - T: Lattice+Ord, - R: Semigroup, - B: Batch, -{ - /// Drops and logs batches. Used in `set_logical_compaction` and drop. - fn drop_batches(&mut self) { - if let Some(logger) = &self.logger { - for batch in self.merging.drain(..) { - match batch { - MergeState::Single(Some(batch)) => { - logger.log(::logging::DropEvent { - operator: self.operator.global_id, - length: batch.len(), - }); - }, - MergeState::Double(MergeVariant::InProgress(batch1, batch2, _)) => { - logger.log(::logging::DropEvent { - operator: self.operator.global_id, - length: batch1.len(), - }); - logger.log(::logging::DropEvent { - operator: self.operator.global_id, - length: batch2.len(), - }); - }, - MergeState::Double(MergeVariant::Complete(Some((batch, _)))) => { - logger.log(::logging::DropEvent { - operator: self.operator.global_id, - length: batch.len(), - }); - } - _ => { }, - } - } - for batch in self.pending.drain(..) { - logger.log(::logging::DropEvent { - operator: self.operator.global_id, - length: batch.len(), - }); - } - } - } -} - -impl Spine -where - K: Ord+Clone, - V: Ord+Clone, - T: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, - R: Semigroup, - B: Batch, -{ - /// True iff there is at most one non-empty batch in `self.merging`. - /// - /// When true, there is no maintenance work to perform in the trace, other than compaction. 
- /// We do not yet have logic in place to determine if compaction would improve a trace, so - /// for now we are ignoring that. - fn reduced(&self) -> bool { - let mut non_empty = 0; - for index in 0 .. self.merging.len() { - if self.merging[index].is_double() { return false; } - if self.merging[index].len() > 0 { non_empty += 1; } - if non_empty > 1 { return false; } - } - true - } - - /// Describes the merge progress of layers in the trace. - /// - /// Intended for diagnostics rather than public consumption. - #[allow(dead_code)] - fn describe(&self) -> Vec<(usize, usize)> { - self.merging - .iter() - .map(|b| match b { - MergeState::Vacant => (0, 0), - x @ MergeState::Single(_) => (1, x.len()), - x @ MergeState::Double(_) => (2, x.len()), - }) - .collect() - } - - /// Allocates a fueled `Spine` with a specified effort multiplier. - /// - /// This trace will merge batches progressively, with each inserted batch applying a multiple - /// of the batch's length in effort to each merge. The `effort` parameter is that multiplier. - /// This value should be at least one for the merging to happen; a value of zero is not helpful. - pub fn with_effort( - mut effort: usize, - operator: OperatorInfo, - logger: Option<::logging::Logger>, - activator: Option, - ) -> Self { - - // Zero effort is .. not smart. - if effort == 0 { effort = 1; } - - Spine { - operator, - logger, - phantom: ::std::marker::PhantomData, - logical_frontier: Antichain::from_elem(::minimum()), - physical_frontier: Antichain::from_elem(::minimum()), - merging: Vec::new(), - pending: Vec::new(), - upper: Antichain::from_elem(::minimum()), - effort, - activator, - } - } - - /// Migrate data from `self.pending` into `self.merging`. - /// - /// This method reflects on the bookmarks held by others that may prevent merging, and in the - /// case that new batches can be introduced to the pile of mergeable batches, it gets on that. 
- #[inline(never)] - fn consider_merges(&mut self) { - - // TODO: Consider merging pending batches before introducing them. - // TODO: We could use a `VecDeque` here to draw from the front and append to the back. - while self.pending.len() > 0 && PartialOrder::less_equal(self.pending[0].upper(), &self.physical_frontier) - // self.physical_frontier.iter().all(|t1| self.pending[0].upper().iter().any(|t2| t2.less_equal(t1))) - { - // Batch can be taken in optimized insertion. - // Otherwise it is inserted normally at the end of the method. - let mut batch = Some(self.pending.remove(0)); - - // If `batch` and the most recently inserted batch are both empty, we can just fuse them. - // We can also replace a structurally empty batch with this empty batch, preserving the - // apparent record count but now with non-trivial lower and upper bounds. - if batch.as_ref().unwrap().len() == 0 { - if let Some(position) = self.merging.iter().position(|m| !m.is_vacant()) { - if self.merging[position].is_single() && self.merging[position].len() == 0 { - self.insert_at(batch.take(), position); - let merged = self.complete_at(position); - self.merging[position] = MergeState::Single(merged); - } - } - } - - // Normal insertion for the batch. - if let Some(batch) = batch { - let index = batch.len().next_power_of_two(); - self.introduce_batch(Some(batch), index.trailing_zeros() as usize); - } - } - - // Having performed all of our work, if more than one batch remains reschedule ourself. - if !self.reduced() { - if let Some(activator) = &self.activator { - activator.activate(); - } - } - } - - /// Introduces a batch at an indicated level. - /// - /// The level indication is often related to the size of the batch, but - /// it can also be used to artificially fuel the computation by supplying - /// empty batches at non-trivial indices, to move merges along. - pub fn introduce_batch(&mut self, batch: Option, batch_index: usize) { - - // Step 0. 
Determine an amount of fuel to use for the computation. - // - // Fuel is used to drive maintenance of the data structure, - // and in particular are used to make progress through merges - // that are in progress. The amount of fuel to use should be - // proportional to the number of records introduced, so that - // we are guaranteed to complete all merges before they are - // required as arguments to merges again. - // - // The fuel use policy is negotiable, in that we might aim - // to use relatively less when we can, so that we return - // control promptly, or we might account more work to larger - // batches. Not clear to me which are best, of if there - // should be a configuration knob controlling this. - - // The amount of fuel to use is proportional to 2^batch_index, scaled - // by a factor of self.effort which determines how eager we are in - // performing maintenance work. We need to ensure that each merge in - // progress receives fuel for each introduced batch, and so multiply - // by that as well. - if batch_index > 32 { println!("Large batch index: {}", batch_index); } - - // We believe that eight units of fuel is sufficient for each introduced - // record, accounted as four for each record, and a potential four more - // for each virtual record associated with promoting existing smaller - // batches. We could try and make this be less, or be scaled to merges - // based on their deficit at time of instantiation. For now, we remain - // conservative. - let mut fuel = 8 << batch_index; - // Scale up by the effort parameter, which is calibrated to one as the - // minimum amount of effort. - fuel *= self.effort; - // Convert to an `isize` so we can observe any fuel shortfall. - let mut fuel = fuel as isize; - - // Step 1. Apply fuel to each in-progress merge. 
- // - // Before we can introduce new updates, we must apply any - // fuel to in-progress merges, as this fuel is what ensures - // that the merges will be complete by the time we insert - // the updates. - self.apply_fuel(&mut fuel); - - // Step 2. We must ensure the invariant that adjacent layers do not - // contain two batches will be satisfied when we insert the - // batch. We forcibly completing all merges at layers lower - // than and including `batch_index`, so that the new batch - // is inserted into an empty layer. - // - // We could relax this to "strictly less than `batch_index`" - // if the layer above has only a single batch in it, which - // seems not implausible if it has been the focus of effort. - // - // This should be interpreted as the introduction of some - // volume of fake updates, and we will need to fuel merges - // by a proportional amount to ensure that they are not - // surprised later on. The number of fake updates should - // correspond to the deficit for the layer, which perhaps - // we should track explicitly. - self.roll_up(batch_index); - - // Step 3. This insertion should be into an empty layer. It is a - // logical error otherwise, as we may be violating our - // invariant, from which all wonderment derives. - self.insert_at(batch, batch_index); - - // Step 4. Tidy the largest layers. - // - // It is important that we not tidy only smaller layers, - // as their ascension is what ensures the merging and - // eventual compaction of the largest layers. - self.tidy_layers(); - } - - /// Ensures that an insertion at layer `index` will succeed. - /// - /// This method is subject to the constraint that all existing batches - /// should occur at higher levels, which requires it to "roll up" batches - /// present at lower levels before the method is called. In doing this, - /// we should not introduce more virtual records than 2^index, as that - /// is the amount of excess fuel we have budgeted for completing merges. 
- fn roll_up(&mut self, index: usize) { - - // Ensure entries sufficient for `index`. - while self.merging.len() <= index { - self.merging.push(MergeState::Vacant); - } - - // We only need to roll up if there are non-vacant layers. - if self.merging[.. index].iter().any(|m| !m.is_vacant()) { - - // Collect and merge all batches at layers up to but not including `index`. - let mut merged = None; - for i in 0 .. index { - self.insert_at(merged, i); - merged = self.complete_at(i); - } - - // The merged results should be introduced at level `index`, which should - // be ready to absorb them (possibly creating a new merge at the time). - self.insert_at(merged, index); - - // If the insertion results in a merge, we should complete it to ensure - // the upcoming insertion at `index` does not panic. - if self.merging[index].is_double() { - let merged = self.complete_at(index); - self.insert_at(merged, index + 1); - } - } - } - - /// Applies an amount of fuel to merges in progress. - /// - /// The supplied `fuel` is for each in progress merge, and if we want to spend - /// the fuel non-uniformly (e.g. prioritizing merges at low layers) we could do - /// so in order to maintain fewer batches on average (at the risk of completing - /// merges of large batches later, but tbh probably not much later). - pub fn apply_fuel(&mut self, fuel: &mut isize) { - // For the moment our strategy is to apply fuel independently to each merge - // in progress, rather than prioritizing small merges. This sounds like a - // great idea, but we need better accounting in place to ensure that merges - // that borrow against later layers but then complete still "acquire" fuel - // to pay back their debts. - for index in 0 .. self.merging.len() { - // Give each level independent fuel, for now. - let mut fuel = *fuel; - // Pass along various logging stuffs, in case we need to report success. 
- self.merging[index].work(&mut fuel); - // `fuel` could have a deficit at this point, meaning we over-spent when - // we took a merge step. We could ignore this, or maintain the deficit - // and account future fuel against it before spending again. It isn't - // clear why that would be especially helpful to do; we might want to - // avoid overspends at multiple layers in the same invocation (to limit - // latencies), but there is probably a rich policy space here. - - // If a merge completes, we can immediately merge it in to the next - // level, which is "guaranteed" to be complete at this point, by our - // fueling discipline. - if self.merging[index].is_complete() { - let complete = self.complete_at(index); - self.insert_at(complete, index+1); - } - } - } - - /// Inserts a batch at a specific location. - /// - /// This is a non-public internal method that can panic if we try and insert into a - /// layer which already contains two batches (and is still in the process of merging). - fn insert_at(&mut self, batch: Option, index: usize) { - // Ensure the spine is large enough. - while self.merging.len() <= index { - self.merging.push(MergeState::Vacant); - } - - // Insert the batch at the location. - match self.merging[index].take() { - MergeState::Vacant => { - self.merging[index] = MergeState::Single(batch); - } - MergeState::Single(old) => { - // Log the initiation of a merge. - self.logger.as_ref().map(|l| l.log( - ::logging::MergeEvent { - operator: self.operator.global_id, - scale: index, - length1: old.as_ref().map(|b| b.len()).unwrap_or(0), - length2: batch.as_ref().map(|b| b.len()).unwrap_or(0), - complete: None, - } - )); - let compaction_frontier = Some(self.logical_frontier.borrow()); - self.merging[index] = MergeState::begin_merge(old, batch, compaction_frontier); - } - MergeState::Double(_) => { - panic!("Attempted to insert batch into incomplete merge!") - } - }; - } - - /// Completes and extracts what ever is at layer `index`. 
- fn complete_at(&mut self, index: usize) -> Option { - if let Some((merged, inputs)) = self.merging[index].complete() { - if let Some((input1, input2)) = inputs { - // Log the completion of a merge from existing parts. - self.logger.as_ref().map(|l| l.log( - ::logging::MergeEvent { - operator: self.operator.global_id, - scale: index, - length1: input1.len(), - length2: input2.len(), - complete: Some(merged.len()), - } - )); - } - Some(merged) - } - else { - None - } - } - - /// Attempts to draw down large layers to size appropriate layers. - fn tidy_layers(&mut self) { - - // If the largest layer is complete (not merging), we can attempt - // to draw it down to the next layer. This is permitted if we can - // maintain our invariant that below each merge there are at most - // half the records that would be required to invade the merge. - if !self.merging.is_empty() { - let mut length = self.merging.len(); - if self.merging[length-1].is_single() { - - // To move a batch down, we require that it contain few - // enough records that the lower level is appropriate, - // and that moving the batch would not create a merge - // violating our invariant. - - let appropriate_level = self.merging[length-1].len().next_power_of_two().trailing_zeros() as usize; - - // Continue only as far as is appropriate - while appropriate_level < length-1 { - - match self.merging[length-2].take() { - // Vacant or structurally empty batches can be absorbed. - MergeState::Vacant | MergeState::Single(None) => { - self.merging.remove(length-2); - length = self.merging.len(); - } - // Single batches may initiate a merge, if sizes are - // within bounds, but terminate the loop either way. - MergeState::Single(Some(batch)) => { - - // Determine the number of records that might lead - // to a merge. Importantly, this is not the number - // of actual records, but the sum of upper bounds - // based on indices. 
- let mut smaller = 0; - for (index, batch) in self.merging[..(length-2)].iter().enumerate() { - match batch { - MergeState::Vacant => { }, - MergeState::Single(_) => { smaller += 1 << index; }, - MergeState::Double(_) => { smaller += 2 << index; }, - } - } - - if smaller <= (1 << length) / 8 { - self.merging.remove(length-2); - self.insert_at(Some(batch), length-2); - } - else { - self.merging[length-2] = MergeState::Single(Some(batch)); - } - return; - } - // If a merge is in progress there is nothing to do. - MergeState::Double(state) => { - self.merging[length-2] = MergeState::Double(state); - return; - } - } - } - } - } - } -} - - -/// Describes the state of a layer. -/// -/// A layer can be empty, contain a single batch, or contain a pair of batches -/// that are in the process of merging into a batch for the next layer. -enum MergeState> { - /// An empty layer, containing no updates. - Vacant, - /// A layer containing a single batch. - /// - /// The `None` variant is used to represent a structurally empty batch present - /// to ensure the progress of maintenance work. - Single(Option), - /// A layer containing two batches, in the process of merging. - Double(MergeVariant), -} - -impl> MergeState { - - /// The number of actual updates contained in the level. - fn len(&self) -> usize { - match self { - MergeState::Single(Some(b)) => b.len(), - MergeState::Double(MergeVariant::InProgress(b1,b2,_)) => b1.len() + b2.len(), - MergeState::Double(MergeVariant::Complete(Some((b, _)))) => b.len(), - _ => 0, - } - } - - /// True only for the MergeState::Vacant variant. - fn is_vacant(&self) -> bool { - if let MergeState::Vacant = self { true } else { false } - } - - /// True only for the MergeState::Single variant. - fn is_single(&self) -> bool { - if let MergeState::Single(_) = self { true } else { false } - } - - /// True only for the MergeState::Double variant. 
- fn is_double(&self) -> bool { - if let MergeState::Double(_) = self { true } else { false } - } - - /// Immediately complete any merge. - /// - /// The result is either a batch, if there is a non-trivial batch to return - /// or `None` if there is no meaningful batch to return. This does not distinguish - /// between Vacant entries and structurally empty batches, which should be done - /// with the `is_complete()` method. - /// - /// There is the addional option of input batches. - fn complete(&mut self) -> Option<(B, Option<(B, B)>)> { - match std::mem::replace(self, MergeState::Vacant) { - MergeState::Vacant => None, - MergeState::Single(batch) => batch.map(|b| (b, None)), - MergeState::Double(variant) => variant.complete(), - } - } - - /// True iff the layer is a complete merge, ready for extraction. - fn is_complete(&mut self) -> bool { - if let MergeState::Double(MergeVariant::Complete(_)) = self { - true - } - else { - false - } - } - - /// Performs a bounded amount of work towards a merge. - /// - /// If the merge completes, the resulting batch is returned. - /// If a batch is returned, it is the obligation of the caller - /// to correctly install the result. - fn work(&mut self, fuel: &mut isize) { - // We only perform work for merges in progress. - if let MergeState::Double(layer) = self { - layer.work(fuel) - } - } - - /// Extract the merge state, typically temporarily. - fn take(&mut self) -> Self { - std::mem::replace(self, MergeState::Vacant) - } - - /// Initiates the merge of an "old" batch with a "new" batch. - /// - /// The upper frontier of the old batch should match the lower - /// frontier of the new batch, with the resulting batch describing - /// their composed interval, from the lower frontier of the old - /// batch to the upper frontier of the new batch. - /// - /// Either batch may be `None` which corresponds to a structurally - /// empty batch whose upper and lower froniers are equal. 
This - /// option exists purely for bookkeeping purposes, and no computation - /// is performed to merge the two batches. - fn begin_merge(batch1: Option, batch2: Option, compaction_frontier: Option>) -> MergeState { - let variant = - match (batch1, batch2) { - (Some(batch1), Some(batch2)) => { - assert!(batch1.upper() == batch2.lower()); - let begin_merge = >::begin_merge(&batch1, &batch2, compaction_frontier); - MergeVariant::InProgress(batch1, batch2, begin_merge) - } - (None, Some(x)) => MergeVariant::Complete(Some((x, None))), - (Some(x), None) => MergeVariant::Complete(Some((x, None))), - (None, None) => MergeVariant::Complete(None), - }; - - MergeState::Double(variant) - } -} - -enum MergeVariant> { - /// Describes an actual in-progress merge between two non-trivial batches. - InProgress(B, B, >::Merger), - /// A merge that requires no further work. May or may not represent a non-trivial batch. - Complete(Option<(B, Option<(B, B)>)>), -} - -impl> MergeVariant { - - /// Completes and extracts the batch, unless structurally empty. - /// - /// The result is either `None`, for structurally empty batches, - /// or a batch and optionally input batches from which it derived. - fn complete(mut self) -> Option<(B, Option<(B, B)>)> { - let mut fuel = isize::max_value(); - self.work(&mut fuel); - if let MergeVariant::Complete(batch) = self { batch } - else { panic!("Failed to complete a merge!"); } - } - - /// Applies some amount of work, potentially completing the merge. - /// - /// In case the work completes, the source batches are returned. - /// This allows the caller to manage the released resources. 
- fn work(&mut self, fuel: &mut isize) { - let variant = std::mem::replace(self, MergeVariant::Complete(None)); - if let MergeVariant::InProgress(b1,b2,mut merge) = variant { - merge.work(&b1,&b2,fuel); - if *fuel > 0 { - *self = MergeVariant::Complete(Some((merge.done(), Some((b1,b2))))); - } - else { - *self = MergeVariant::InProgress(b1,b2,merge); - } - } - else { - *self = variant; - } - } -} diff --git a/src/trace/implementations/vec.rs b/src/trace/implementations/vec.rs deleted file mode 100644 index f9ccfc284..000000000 --- a/src/trace/implementations/vec.rs +++ /dev/null @@ -1,300 +0,0 @@ -//! Trace and batch implementations based on sorted ranges. -//! -//! The types and type aliases in this module start with either -//! -//! * `OrdVal`: Collections whose data have the form `(key, val)` where `key` is ordered. -//! * `OrdKey`: Collections whose data have the form `key` where `key` is ordered. -//! -//! Although `OrdVal` is more general than `OrdKey`, the latter has a simpler representation -//! and should consume fewer resources (computation and memory) when it applies. 
- -use std::cmp::Ordering; - -use ::Diff; -use lattice::Lattice; - -use trace::{Batch, BatchReader, Builder, Merger, Cursor}; -use trace::description::Description; - -// use trace::layers::MergeBuilder; - -// use super::spine_fueled::Spine; -use super::merge_batcher::MergeBatcher; - -#[derive(Debug, Abomonation)] -pub struct VecBatch { - pub list: Vec<(K, V, T, R)>, - pub desc: Description, -} - -impl BatchReader for VecBatch -where K: Ord+Clone+'static, V: Ord+Clone+'static, T: Lattice+Ord+Clone+'static, R: Diff { - type Cursor = VecCursor; - fn cursor(&self) -> Self::Cursor { VecCursor { key_pos: 0, val_pos: 0 } } - fn len(&self) -> usize { self.list.len() } - fn description(&self) -> &Description { &self.desc } -} - -impl Batch for VecBatch -where K: Ord+Clone+'static, V: Ord+Clone+'static, T: Lattice+Ord+Clone+::std::fmt::Debug+'static, R: Diff { - type Batcher = MergeBatcher; - type Builder = VecBuilder; - type Merger = VecMerger; - - fn begin_merge(&self, other: &Self) -> Self::Merger { - VecMerger::new(self, other) - } -} - -impl VecBatch -where K: Ord+Clone+'static, V: Ord+Clone+'static, T: Lattice+Ord+Clone+::std::fmt::Debug+'static, R: Diff { - fn advance_builder_from(list: &mut Vec<(K,V,T,R)>, frontier: &[T], key_pos: usize) { - - for index in key_pos .. list.len() { - list[index].2 = list[index].2.advance_by(frontier); - } - - // TODO: Already sorted by key, val; could sort restricted ranges. - list[key_pos ..].sort(); - - for index in key_pos .. (list.len() - 1) { - if list[index].0 == list[index+1].0 && list[index].1 == list[index+1].1 && list[index].2 == list[index+1].2 { - list[index+1].3 = list[index+1].3 + list[index].3; - list[index].3 = R::zero(); - } - } - - let mut write_position = key_pos; - for index in key_pos .. list.len() { - if !list[index].3.is_zero() { - list.swap(write_position, index); - write_position += 1; - } - } - - list.truncate(write_position); - } -} - -/// State for an in-progress merge. 
-pub struct VecMerger { - // first batch, and position therein. - lower1: usize, - upper1: usize, - // second batch, and position therein. - lower2: usize, - upper2: usize, - // result that we are currently assembling. - result: Vec<(K,V,T,R)>, - description: Description, -} - -impl Merger> for VecMerger -where K: Ord+Clone+'static, V: Ord+Clone+'static, T: Lattice+Ord+Clone+::std::fmt::Debug+'static, R: Diff { - fn new(batch1: &VecBatch, batch2: &VecBatch) -> Self { - - assert!(batch1.upper() == batch2.lower()); - - let since = if batch1.description().since().iter().all(|t1| batch2.description().since().iter().any(|t2| t2.less_equal(t1))) { - batch2.description().since() - } - else { - batch1.description().since() - }; - - let description = Description::new(batch1.lower(), batch2.upper(), since); - - VecMerger { - lower1: 0, - upper1: batch1.list.len(), - lower2: 0, - upper2: batch2.list.len(), - result: Vec::with_capacity(batch1.len() + batch2.len()), - description: description, - } - } - fn done(self) -> VecBatch { - - assert!(self.lower1 == self.upper1); - assert!(self.lower2 == self.upper2); - - VecBatch { - list: self.result, - desc: self.description, - } - } - fn work(&mut self, source1: &VecBatch, source2: &VecBatch, frontier: &Option>, fuel: &mut usize) { - - let mut effort = 0; - - let initial_pos = self.result.len(); - - // while both mergees are still active - while self.lower1 < self.upper1 && self.lower2 < self.upper2 && effort < *fuel { - - let tuple1 = (&source1.list[self.lower1].0,&source1.list[self.lower1].1,&source1.list[self.lower1].2); - let tuple2 = (&source2.list[self.lower2].0,&source2.list[self.lower2].1,&source1.list[self.lower2].2); - - match tuple1.cmp(&tuple2) { - Ordering::Less => { - self.result.push(source1.list[self.lower1].clone()); - self.lower1 += 1; - effort += 1; - }, - Ordering::Equal => { - let mut new_element = source1.list[self.lower1].clone(); - new_element.3 = new_element.3 + source2.list[self.lower2].3; - 
self.result.push(new_element); - self.lower1 += 1; - self.lower2 += 1; - effort += 1; - } - Ordering::Greater => { - self.result.push(source2.list[self.lower2].clone()); - self.lower2 += 1; - effort += 1; - } - } - } - - if self.lower1 == self.upper1 || self.lower2 == self.upper2 { - // these are just copies, so let's bite the bullet and just do them. - if self.lower1 < self.upper1 { - self.result.extend(source1.list[self.lower1 .. self.upper1].iter().cloned()); - self.lower1 = self.upper1; - } - if self.lower2 < self.upper2 { - self.result.extend(source2.list[self.lower2 .. self.upper2].iter().cloned()); - self.lower2 = self.upper2; - } - } - - // if we are supplied a frontier, we should compact. - if let Some(frontier) = frontier.as_ref() { - VecBatch::advance_builder_from(&mut self.result, frontier, initial_pos) - } - - if effort >= *fuel { *fuel = 0; } - else { *fuel -= effort; } - } -} - -/// A cursor for navigating a single layer. -#[derive(Debug)] -pub struct VecCursor { - key_pos: usize, - val_pos: usize, -} - -impl Cursor for VecCursor -where K: Ord+Clone, V: Ord+Clone, T: Lattice+Ord+Clone, R: Diff { - - type Storage = VecBatch; - - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { &storage.list[self.key_pos].0 } - fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { &storage.list[self.val_pos].1 } - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { - let key_val = (self.key(storage), self.val(storage)); - let mut temp_index = self.val_pos; - while temp_index < storage.list.len() && key_val == (&storage.list[temp_index].0,&storage.list[temp_index].1) { - logic(&storage.list[temp_index].2, storage.list[temp_index].3); - temp_index += 1; - } - } - fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_pos < storage.list.len() } - fn val_valid(&self, storage: &Self::Storage) -> bool { - self.val_pos < storage.list.len() && storage.list[self.key_pos].0 == storage.list[self.val_pos].0 - } - fn step_key(&mut self, storage: 
&Self::Storage){ - if let Some(key) = self.get_key(storage) { - let step = advance(&storage.list[self.key_pos..], |tuple| &tuple.0 == key); - self.key_pos += step; - } - } - fn seek_key(&mut self, storage: &Self::Storage, key: &K) { - if self.key_valid(storage) { - let step = advance(&storage.list[self.key_pos..], |tuple| &tuple.0 < key); - self.key_pos += step; - } - } - fn step_val(&mut self, storage: &Self::Storage) { - if let Some(val) = self.get_val(storage) { - let step = advance(&storage.list[self.val_pos..], |tuple| &tuple.0 == self.key(storage) && &tuple.1 == val); - self.val_pos += step; - } - } - fn seek_val(&mut self, storage: &Self::Storage, val: &V) { - if self.val_valid(storage) { - let step = advance(&storage.list[self.val_pos..], |tuple| &tuple.0 == self.key(storage) && &tuple.1 < val); - self.val_pos += step; - } - } - fn rewind_keys(&mut self, _storage: &Self::Storage) { self.key_pos = 0; } - fn rewind_vals(&mut self, storage: &Self::Storage) { - if let Some(key) = self.get_key(storage) { - self.rewind_keys(storage); - self.seek_key(storage, key); - } - } -} - - -/// A builder for creating layers from unsorted update tuples. -pub struct VecBuilder { - list: Vec<(K,V,T,R)>, -} - -impl Builder> for VecBuilder -where K: Ord+Clone+'static, V: Ord+Clone+'static, T: Lattice+Ord+Clone+::std::fmt::Debug+'static, R: Diff { - - fn new() -> Self { VecBuilder { list: Vec::new() } } - fn with_capacity(cap: usize) -> Self { VecBuilder { list: Vec::with_capacity(cap) } } - - #[inline] - fn push(&mut self, (key, val, time, diff): (K, V, T, R)) { self.list.push((key,val,time,diff)); } - - #[inline(never)] - fn done(mut self, lower: &[T], upper: &[T], since: &[T]) -> VecBatch { - self.list.sort(); - VecBatch { - list: self.list, - desc: Description::new(lower, upper, since) - } - } -} - - -/// Reports the number of elements satisfing the predicate. 
-/// -/// This methods *relies strongly* on the assumption that the predicate -/// stays false once it becomes false, a joint property of the predicate -/// and the slice. This allows `advance` to use exponential search to -/// count the number of elements in time logarithmic in the result. -#[inline(never)] -pub fn advancebool>(slice: &[T], function: F) -> usize { - - // start with no advance - let mut index = 0; - if index < slice.len() && function(&slice[index]) { - - // advance in exponentially growing steps. - let mut step = 1; - while index + step < slice.len() && function(&slice[index + step]) { - index += step; - step = step << 1; - } - - // advance in exponentially shrinking steps. - step = step >> 1; - while step > 0 { - if index + step < slice.len() && function(&slice[index + step]) { - index += step; - } - step = step >> 1; - } - - index += 1; - } - - index -}