Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ fn main() {
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"flat" => {
use differential_dataflow::trace::implementations::ord_neu::FlatKeySpine;
let data = data.arrange::<FlatKeySpine<_,_,_>>();
let keys = keys.arrange::<FlatKeySpine<_,_,_>>();
keys.join_core(&data, |_k, (), ()| Option::<()>::None)
.probe_with(&mut probe);
}
_ => {
println!("unreconized mode: {:?}", mode)
}
Expand Down
26 changes: 1 addition & 25 deletions src/trace/cursor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,7 @@ pub mod cursor_list;

pub use self::cursor_list::CursorList;

use std::borrow::Borrow;

/// A reference type corresponding to an owned type, supporting conversion in each direction.
///
/// This trait can be implemented by a GAT, and enables owned types to be borrowed as a GAT.
/// This trait is analogous to `ToOwned`, but not as prescriptive. Specifically, it avoids the
/// requirement that the other trait implement `Borrow`, for which a borrow must result in a
/// `&'self Borrowed`, which cannot move the lifetime into a GAT borrowed type.
pub trait IntoOwned<'a> {
/// Owned type into which this type can be converted.
type Owned;
/// Conversion from an instance of this type to the owned type.
fn into_owned(self) -> Self::Owned;
/// Clones `self` onto an existing instance of the owned type.
fn clone_onto(self, other: &mut Self::Owned);
/// Borrows an owned instance as oneself.
fn borrow_as(owned: &'a Self::Owned) -> Self;
}

impl<'a, T: ToOwned+?Sized> IntoOwned<'a> for &'a T {
type Owned = T::Owned;
fn into_owned(self) -> Self::Owned { self.to_owned() }
fn clone_onto(self, other: &mut Self::Owned) { <T as ToOwned>::clone_into(self, other) }
fn borrow_as(owned: &'a Self::Owned) -> Self { owned.borrow() }
}
pub use timely::container::flatcontainer::IntoOwned;

/// A cursor for navigating ordered `(key, val, time, diff)` updates.
pub trait Cursor {
Expand Down
129 changes: 114 additions & 15 deletions src/trace/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ where
type OffsetContainer = OffsetList;
}

/// A layout based on timely stacks
pub struct FlatLayout<U: Update> {
phantom: std::marker::PhantomData<U>,
}

/// A type with a preferred container.
///
/// Examples include types that implement `Clone` who prefer
Expand Down Expand Up @@ -291,21 +296,6 @@ impl PushInto<usize> for OffsetList {
}
}

impl<'a> IntoOwned<'a> for usize {
type Owned = usize;
fn into_owned(self) -> Self::Owned {
self
}

fn clone_onto(self, other: &mut Self::Owned) {
*other = self;
}

fn borrow_as(owned: &'a Self::Owned) -> Self {
*owned
}
}

impl BatchContainer for OffsetList {
type Owned = usize;
type ReadItem<'a> = usize;
Expand Down Expand Up @@ -410,6 +400,77 @@ where
}
}

mod flatcontainer {
use timely::container::columnation::{Columnation, TimelyStack};
use timely::container::flatcontainer::{Containerized, FlatStack, Push, Region};
use timely::progress::Timestamp;
use crate::difference::Semigroup;
use crate::lattice::Lattice;
use crate::trace::implementations::{BuilderInput, FlatLayout, Layout, OffsetList, Update};

impl<U: Update> Layout for FlatLayout<U>
where
U::Key: Containerized,
for<'a> <U::Key as Containerized>::Region: Push<U::Key> + Push<<<U::Key as Containerized>::Region as Region>::ReadItem<'a>>,
for<'a> <<U::Key as Containerized>::Region as Region>::ReadItem<'a>: Copy + Ord,
U::Val: Containerized,
for<'a> <U::Val as Containerized>::Region: Push<U::Val> + Push<<<U::Val as Containerized>::Region as Region>::ReadItem<'a>>,
for<'a> <<U::Val as Containerized>::Region as Region>::ReadItem<'a>: Copy + Ord,
U::Time: Containerized,
<U::Time as Containerized>::Region: Region<Owned=U::Time>,
for<'a> <U::Time as Containerized>::Region: Push<U::Time> + Push<<<U::Time as Containerized>::Region as Region>::ReadItem<'a>>,
for<'a> <<U::Time as Containerized>::Region as Region>::ReadItem<'a>: Copy + Ord,
U::Diff: Containerized,
<U::Diff as Containerized>::Region: Region<Owned=U::Diff>,
for<'a> <U::Diff as Containerized>::Region: Push<U::Diff> + Push<<<U::Diff as Containerized>::Region as Region>::ReadItem<'a>>,
for<'a> <<U::Diff as Containerized>::Region as Region>::ReadItem<'a>: Copy + Ord,
{
type Target = U;
type KeyContainer = FlatStack<<U::Key as Containerized>::Region>;
type ValContainer = FlatStack<<U::Val as Containerized>::Region>;
type TimeContainer = FlatStack<<U::Time as Containerized>::Region>;
type DiffContainer = FlatStack<<U::Diff as Containerized>::Region>;
type OffsetContainer = OffsetList;
}

impl<K,V,T,R> BuilderInput<FlatLayout<((K, V), T, R)>> for TimelyStack<((K, V), T, R)>
where
K: Ord + Columnation + Containerized + Clone + 'static,
for<'a> K::Region: Push<K> + Push<<K::Region as Region>::ReadItem<'a>>,
for<'a> <K::Region as Region>::ReadItem<'a>: Copy + Ord,
for<'a> K: PartialEq<<K::Region as Region>::ReadItem<'a>>,
V: Ord + Columnation + Containerized + Clone + 'static,
for<'a> V::Region: Push<V> + Push<<V::Region as Region>::ReadItem<'a>>,
for<'a> <V::Region as Region>::ReadItem<'a>: Copy + Ord,
for<'a> V: PartialEq<<V::Region as Region>::ReadItem<'a>>,
T: Timestamp + Lattice + Columnation + Containerized + Clone + 'static,
for<'a> T::Region: Region<Owned=T> + Push<T> + Push<<T::Region as Region>::ReadItem<'a>>,
for<'a> <T::Region as Region>::ReadItem<'a>: Copy + Ord,
for<'a> T: PartialEq<<T::Region as Region>::ReadItem<'a>>,
R: Ord + Clone + Semigroup + Columnation + Containerized + 'static,
for<'a> R::Region: Region<Owned=R> + Push<R> + Push<<R::Region as Region>::ReadItem<'a>>,
for<'a> <R::Region as Region>::ReadItem<'a>: Copy + Ord,
for<'a> R: PartialEq<<R::Region as Region>::ReadItem<'a>>,

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be that some of the regions in flat container don't have a proper PartialEq implementation.

{
type Key<'a> = &'a K;
type Val<'a> = &'a V;
type Time = T;
type Diff = R;

fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
(key, val, time.clone(), diff.clone())
}

fn key_eq(this: &&K, other: <<K as Containerized>::Region as Region>::ReadItem<'_>) -> bool {
**this == <K as Containerized>::Region::reborrow(other)
}

fn val_eq(this: &&V, other: <<V as Containerized>::Region as Region>::ReadItem<'_>) -> bool {
**this == <V as Containerized>::Region::reborrow(other)
}
}
}

impl<K,V,T,R> BuilderInput<Preferred<K, V, T, R>> for TimelyStack<((<K as ToOwned>::Owned, <V as ToOwned>::Owned), T, R)>
where
K: Ord+ToOwned+PreferredContainer + ?Sized,
Expand Down Expand Up @@ -599,6 +660,44 @@ pub mod containers {
}
}

mod flatcontainer {
use timely::container::flatcontainer::{FlatStack, Push, Region};
use crate::trace::implementations::BatchContainer;

impl<R> BatchContainer for FlatStack<R>
where
for<'a> R: Region + Push<<R as Region>::ReadItem<'a>> + 'static,
for<'a> R::ReadItem<'a>: Copy + Ord,
{
type Owned = R::Owned;
type ReadItem<'a> = R::ReadItem<'a>;

fn copy(&mut self, item: Self::ReadItem<'_>) {
self.copy(item);
}

fn with_capacity(size: usize) -> Self {
Self::with_capacity(size)
}

fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
Self::merge_capacity([cont1, cont2].into_iter())
}

fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
R::reborrow(item)
}

fn index(&self, index: usize) -> Self::ReadItem<'_> {
self.get(index)
}

fn len(&self) -> usize {
self.len()
}
}
}

/// A container that accepts slices `[B::Item]`.
pub struct SliceContainer<B> {
/// Offsets that bound each contained slice.
Expand Down
16 changes: 15 additions & 1 deletion src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger};
use crate::trace::implementations::merge_batcher_col::ColumnationMerger;
use crate::trace::rc_blanket_impls::RcBuilder;

use super::{Update, Layout, Vector, TStack, Preferred};
use super::{Update, Layout, Vector, TStack, Preferred, FlatLayout};

pub use self::val_batch::{OrdValBatch, OrdValBuilder};
pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
Expand All @@ -37,6 +37,13 @@ pub type ColValSpine<K, V, T, R> = Spine<
RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>,
>;

/// A trace implementation backed by flatcontainer storage.
pub type FlatValSpine<K, V, T, R> = Spine<
Rc<OrdValBatch<FlatLayout<((K,V),T,R)>>>,
MergeBatcher<ColumnationMerger<((K,V),T,R)>, T>,
RcBuilder<OrdValBuilder<FlatLayout<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>,
>;

/// A trace implementation using a spine of ordered lists.
pub type OrdKeySpine<K, T, R> = Spine<
Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>,
Expand All @@ -53,6 +60,13 @@ pub type ColKeySpine<K, T, R> = Spine<
RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>,
>;

/// A trace implementation backed by flatcontainer storage.
pub type FlatKeySpine<K, T, R> = Spine<
Rc<OrdValBatch<FlatLayout<((K,()),T,R)>>>,
MergeBatcher<ColumnationMerger<((K,()),T,R)>, T>,
RcBuilder<OrdValBuilder<FlatLayout<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>,
>;

/// A trace implementation backed by columnar storage.
pub type PreferredSpine<K, V, T, R> = Spine<
Rc<OrdValBatch<Preferred<K,V,T,R>>>,
Expand Down