Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 20 additions & 17 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,34 @@
use std::collections::VecDeque;

use timely::communication::message::RefOrMut;
use timely::progress::frontier::Antichain;
use timely::progress::{frontier::Antichain, Timestamp};

use ::difference::Semigroup;

use trace::{Batcher, Builder};
use trace::implementations::Update;

/// Creates batches from unordered tuples.
pub struct MergeBatcher<U: Update> {
sorter: MergeSorter<(U::KeyOwned, U::ValOwned), U::Time, U::Diff>,
lower: Antichain<U::Time>,
frontier: Antichain<U::Time>,
phantom: ::std::marker::PhantomData<U>,
pub struct MergeBatcher<K, V, T, D> {
sorter: MergeSorter<(K, V), T, D>,
lower: Antichain<T>,
frontier: Antichain<T>,
}

impl<U: Update> Batcher for MergeBatcher<U> {
type Item = ((U::KeyOwned,U::ValOwned),U::Time,U::Diff);
type Time = U::Time;
impl<K, V, T, D> Batcher for MergeBatcher<K, V, T, D>
where
K: Ord + Clone,
V: Ord + Clone,
T: Timestamp,
D: Semigroup,
{
type Item = ((K,V),T,D);
type Time = T;

fn new() -> Self {
MergeBatcher {
sorter: MergeSorter::new(),
frontier: Antichain::new(),
lower: Antichain::from_elem(<U::Time as timely::progress::Timestamp>::minimum()),
phantom: ::std::marker::PhantomData,
lower: Antichain::from_elem(<T as timely::progress::Timestamp>::minimum()),
}
}

Expand All @@ -53,7 +56,7 @@ impl<U: Update> Batcher for MergeBatcher<U> {
// which we call `lower`, by assumption that after sealing a batcher we receive no more
// updates with times not greater than or equal to `upper`.
#[inline(never)]
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<U::Time>) -> B::Output {
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<T>) -> B::Output {

let mut merged = Vec::new();
self.sorter.finish_into(&mut merged);
Expand Down Expand Up @@ -126,23 +129,23 @@ impl<U: Update> Batcher for MergeBatcher<U> {
let mut buffer = Vec::new();
self.sorter.push(&mut buffer);
// We recycle buffers with allocations (capacity, and not zero-sized).
while buffer.capacity() > 0 && std::mem::size_of::<((U::KeyOwned,U::ValOwned),U::Time,U::Diff)>() > 0 {
while buffer.capacity() > 0 && std::mem::size_of::<((K,V),T,D)>() > 0 {
buffer = Vec::new();
self.sorter.push(&mut buffer);
}

let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(<U::Time as timely::progress::Timestamp>::minimum()));
let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(<T as timely::progress::Timestamp>::minimum()));
self.lower = upper;
seal
}

// the frontier of elements remaining after the most recent call to `self.seal`.
fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<U::Time> {
fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<T> {
self.frontier.borrow()
}
}

struct MergeSorter<D: Ord, T: Ord, R: Semigroup> {
struct MergeSorter<D, T, R> {
queue: Vec<Vec<Vec<(D, T, R)>>>, // each power-of-two length list of allocations.
stash: Vec<Vec<(D, T, R)>>,
}
Expand Down
50 changes: 23 additions & 27 deletions src/trace/implementations/merge_batcher_col.rs
Original file line number Diff line number Diff line change
@@ -1,46 +1,42 @@
//! A general purpose `Batcher` implementation based on merge sort for TimelyStack.

use std::marker::PhantomData;
use timely::Container;
use timely::communication::message::RefOrMut;
use timely::container::columnation::{Columnation, TimelyStack};
use timely::progress::frontier::Antichain;
use timely::progress::{frontier::Antichain, Timestamp};

use ::difference::Semigroup;

use trace::{Batcher, Builder};
use trace::implementations::Update;

/// Creates batches from unordered tuples.
pub struct ColumnatedMergeBatcher<U: Update>
pub struct ColumnatedMergeBatcher<K, V, T, D>
where
U::KeyOwned: Columnation,
U::ValOwned: Columnation,
U::Time: Columnation,
U::Diff: Columnation,
K: Columnation,
V: Columnation,
T: Columnation,
D: Columnation,
{
sorter: MergeSorterColumnation<(U::KeyOwned, U::ValOwned), U::Time, U::Diff>,
lower: Antichain<U::Time>,
frontier: Antichain<U::Time>,
phantom: PhantomData<U>,
sorter: MergeSorterColumnation<(K, V), T, D>,
lower: Antichain<T>,
frontier: Antichain<T>,
}

impl<U: Update> Batcher for ColumnatedMergeBatcher<U>
impl<K, V, T, D> Batcher for ColumnatedMergeBatcher<K, V, T, D>
where
U::KeyOwned: Columnation + 'static,
U::ValOwned: Columnation + 'static,
U::Time: Columnation + 'static,
U::Diff: Columnation + 'static,
K: Columnation + Ord + Clone + 'static,
V: Columnation + Ord + Clone + 'static,
T: Columnation + Timestamp + 'static,
D: Columnation + Semigroup + 'static,
{
type Item = ((U::KeyOwned,U::ValOwned),U::Time,U::Diff);
type Time = U::Time;
type Item = ((K,V),T,D);
type Time = T;

fn new() -> Self {
ColumnatedMergeBatcher {
sorter: MergeSorterColumnation::new(),
frontier: Antichain::new(),
lower: Antichain::from_elem(<U::Time as timely::progress::Timestamp>::minimum()),
phantom: PhantomData,
lower: Antichain::from_elem(<T as Timestamp>::minimum()),
}
}

Expand All @@ -64,7 +60,7 @@ where
// which we call `lower`, by assumption that after sealing a batcher we receive no more
// updates with times not greater than or equal to `upper`.
#[inline]
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<U::Time>) -> B::Output {
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<T>) -> B::Output {

let mut merged = Default::default();
self.sorter.finish_into(&mut merged);
Expand Down Expand Up @@ -106,7 +102,7 @@ where
if upper.less_equal(time) {
self.frontier.insert(time.clone());
if keep.is_empty() {
if keep.capacity() != MergeSorterColumnation::<(U::KeyOwned, U::ValOwned), U::Time, U::Diff>::buffer_size() {
if keep.capacity() != MergeSorterColumnation::<(K, V), T, D>::buffer_size() {
keep = self.sorter.empty();
}
} else if keep.len() == keep.capacity() {
Expand Down Expand Up @@ -134,13 +130,13 @@ where
// Drain buffers (fast reclamation).
self.sorter.clear_stash();

let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(<U::Time as timely::progress::Timestamp>::minimum()));
let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(<T as timely::progress::Timestamp>::minimum()));
self.lower = upper;
seal
}

// the frontier of elements remaining after the most recent call to `self.seal`.
fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<U::Time> {
fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<T> {
self.frontier.borrow()
}
}
Expand Down Expand Up @@ -186,13 +182,13 @@ impl<T: Columnation> TimelyStackQueue<T> {
}
}

struct MergeSorterColumnation<D: Ord+Columnation, T: Ord+Columnation, R: Semigroup+Columnation> {
struct MergeSorterColumnation<D: Columnation, T: Columnation, R: Columnation> {
queue: Vec<Vec<TimelyStack<(D, T, R)>>>, // each power-of-two length list of allocations.
stash: Vec<TimelyStack<(D, T, R)>>,
pending: Vec<(D, T, R)>,
}

impl<D: Ord+Clone+Columnation+'static, T: Ord+Clone+Columnation+'static, R: Semigroup+Columnation+'static> MergeSorterColumnation<D, T, R> {
impl<D: Ord+Columnation+'static, T: Ord+Columnation+'static, R: Semigroup+Columnation+'static> MergeSorterColumnation<D, T, R> {

const BUFFER_SIZE_BYTES: usize = 64 << 10;

Expand Down
12 changes: 6 additions & 6 deletions src/trace/implementations/ord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,41 +42,41 @@ use trace::abomonated_blanket_impls::AbomonatedBuilder;
/// A trace implementation using a spine of ordered lists.
pub type OrdValSpine<K, V, T, R, O=usize> = Spine<
Rc<OrdValBatch<Vector<((K,V),T,R), O>>>,
MergeBatcher<((K,V),T,R)>,
MergeBatcher<K,V,T,R>,
RcBuilder<OrdValBuilder<Vector<((K,V),T,R), O>>>,
>;

/// A trace implementation using a spine of abomonated ordered lists.
pub type OrdValSpineAbom<K, V, T, R, O=usize> = Spine<
Rc<Abomonated<OrdValBatch<Vector<((K,V),T,R), O>>, Vec<u8>>>,
MergeBatcher<((K,V),T,R)>,
MergeBatcher<K,V,T,R>,
AbomonatedBuilder<OrdValBuilder<Vector<((K,V),T,R), O>>>,
>;

/// A trace implementation for empty values using a spine of ordered lists.
pub type OrdKeySpine<K, T, R, O=usize> = Spine<
Rc<OrdKeyBatch<Vector<((K,()),T,R), O>>>,
MergeBatcher<((K,()),T,R)>,
MergeBatcher<K,(),T,R>,
RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R), O>>>,
>;

/// A trace implementation for empty values using a spine of abomonated ordered lists.
pub type OrdKeySpineAbom<K, T, R, O=usize> = Spine<
Rc<Abomonated<OrdKeyBatch<Vector<((K,()),T,R), O>>, Vec<u8>>>,
MergeBatcher<((K,()),T,R)>,
MergeBatcher<K,(),T,R>,
AbomonatedBuilder<OrdKeyBuilder<Vector<((K,()),T,R), O>>>,
>;

/// A trace implementation backed by columnar storage.
pub type ColValSpine<K, V, T, R, O=usize> = Spine<
Rc<OrdValBatch<TStack<((K,V),T,R), O>>>,
ColumnatedMergeBatcher<((K,V),T,R)>,
ColumnatedMergeBatcher<K,V,T,R>,
RcBuilder<OrdValBuilder<TStack<((K,V),T,R), O>>>,
>;
/// A trace implementation for empty values backed by columnar storage.
pub type ColKeySpine<K, T, R, O=usize> = Spine<
Rc<OrdKeyBatch<TStack<((K,()),T,R), O>>>,
ColumnatedMergeBatcher<((K,()),T,R)>,
ColumnatedMergeBatcher<K,(),T,R>,
RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R), O>>>,
>;

Expand Down
6 changes: 3 additions & 3 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use self::val_batch::{OrdValBatch, OrdValBuilder};
/// A trace implementation using a spine of ordered lists.
pub type OrdValSpine<K, V, T, R, O=usize> = Spine<
Rc<OrdValBatch<Vector<((K,V),T,R), O>>>,
MergeBatcher<((K,V),T,R)>,
MergeBatcher<K,V,T,R>,
RcBuilder<OrdValBuilder<Vector<((K,V),T,R), O>>>,
>;
// /// A trace implementation for empty values using a spine of ordered lists.
Expand All @@ -31,14 +31,14 @@ pub type OrdValSpine<K, V, T, R, O=usize> = Spine<
/// A trace implementation backed by columnar storage.
pub type ColValSpine<K, V, T, R, O=usize> = Spine<
Rc<OrdValBatch<TStack<((K,V),T,R), O>>>,
ColumnatedMergeBatcher<((K,V),T,R)>,
ColumnatedMergeBatcher<K,V,T,R>,
RcBuilder<OrdValBuilder<TStack<((K,V),T,R), O>>>,
>;

/// A trace implementation backed by columnar storage, using the `Preferred` layout.
pub type PreferredSpine<K, V, T, R, O=usize> = Spine<
Rc<OrdValBatch<Preferred<K,V,T,R,O>>>,
ColumnatedMergeBatcher<Preferred<K,V,T,R,O>>,
ColumnatedMergeBatcher<<K as ToOwned>::Owned,<V as ToOwned>::Owned,T,R>,
RcBuilder<OrdValBuilder<Preferred<K,V,T,R,O>>>,
>;

Expand Down
4 changes: 2 additions & 2 deletions src/trace/implementations/rhh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use self::val_batch::{RhhValBatch, RhhValBuilder};
/// A trace implementation using a spine of ordered lists.
pub type VecSpine<K, V, T, R, O=usize> = Spine<
Rc<RhhValBatch<Vector<((K,V),T,R), O>>>,
MergeBatcher<((K,V),T,R)>,
MergeBatcher<K,V,T,R>,
RcBuilder<RhhValBuilder<Vector<((K,V),T,R), O>>>,
>;
// /// A trace implementation for empty values using a spine of ordered lists.
Expand All @@ -29,7 +29,7 @@ pub type VecSpine<K, V, T, R, O=usize> = Spine<
/// A trace implementation backed by columnar storage.
pub type ColSpine<K, V, T, R, O=usize> = Spine<
Rc<RhhValBatch<TStack<((K,V),T,R), O>>>,
ColumnatedMergeBatcher<((K,V),T,R)>,
ColumnatedMergeBatcher<K,V,T,R>,
RcBuilder<RhhValBuilder<TStack<((K,V),T,R), O>>>,
>;
// /// A trace implementation backed by columnar storage.
Expand Down