diff --git a/Cargo.toml b/Cargo.toml index a35cad91f..d494087d2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,6 @@ serde = "1.0" serde_derive = "1.0" abomonation = "0.7" abomonation_derive = "0.5" -timely_sort="0.1.6" #timely = { version = "0.11", default-features = false } timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false } #timely = { path = "../timely-dataflow/timely/", default-features = false } diff --git a/dogsdogsdogs/Cargo.toml b/dogsdogsdogs/Cargo.toml index 2ea48707d..cae04f734 100644 --- a/dogsdogsdogs/Cargo.toml +++ b/dogsdogsdogs/Cargo.toml @@ -8,7 +8,6 @@ license = "MIT" abomonation = "0.7" abomonation_derive = "0.5" timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false } -timely_sort = "0.1.6" differential-dataflow = { path = "../", default-features = false } serde = "1" serde_derive = "1" diff --git a/dogsdogsdogs/src/operators/lookup_map.rs b/dogsdogsdogs/src/operators/lookup_map.rs index 3715ccfe4..b0dad9b28 100644 --- a/dogsdogsdogs/src/operators/lookup_map.rs +++ b/dogsdogsdogs/src/operators/lookup_map.rs @@ -6,8 +6,6 @@ use timely::dataflow::channels::pact::{Pipeline, Exchange}; use timely::dataflow::operators::Operator; use timely::progress::Antichain; -use timely_sort::Unsigned; - use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable}; use differential_dataflow::difference::{Semigroup, Monoid}; use differential_dataflow::lattice::Lattice; @@ -58,7 +56,7 @@ where let mut key: Tr::Key = supplied_key0; let exchange = Exchange::new(move |update: &(D,G::Timestamp,R)| { logic1(&update.0, &mut key); - key.hashed().as_u64() + key.hashed().into() }); let mut key1: Tr::Key = supplied_key1; diff --git a/src/hashable.rs b/src/hashable.rs index 2b0279e01..c0929b278 100644 --- a/src/hashable.rs +++ b/src/hashable.rs @@ -15,11 +15,6 @@ //! distributed integers can perhaps do something simpler (like report their own value). use std::hash::Hasher; -use std::ops::Deref; - -use abomonation::Abomonation; - -use timely_sort::Unsigned; /// Types with a `hashed` method, producing an unsigned output of some type. /// @@ -28,7 +23,7 @@ use timely_sort::Unsigned; /// can take advantage of the smaller size. pub trait Hashable { /// The type of the output value. - type Output: Unsigned+Copy; + type Output: Into+Copy; /// A well-distributed integer derived from the data. fn hashed(&self) -> Self::Output; } @@ -41,139 +36,3 @@ impl Hashable for T { h.finish() } } - -/// A marker trait for types whose `Ord` implementation orders first by `hashed()`. -/// -/// Types implementing this trait *must* implement `Ord` and satisfy the property that two values -/// with different hashes have the same order as their hashes. This trait allows implementations -/// that sort by hash value to rely on the `Ord` implementation of the type. -pub trait HashOrdered : Ord+Hashable { } -impl HashOrdered for OrdWrapper { } -impl HashOrdered for HashableWrapper { } -impl HashOrdered for UnsignedWrapper { } - -// It would be great to use the macros for these, but I couldn't figure out how to get it -// to work with constraints (i.e. `Hashable`) on the generic parameters. -impl Abomonation for OrdWrapper { - #[inline] unsafe fn entomb(&self, write: &mut W) -> ::std::io::Result<()> { - self.item.entomb(write) - } - #[inline] unsafe fn exhume<'a,'b>(&'a mut self, mut bytes: &'b mut [u8]) -> Option<&'b mut [u8]> { - let temp = bytes; - bytes = self.item.exhume(temp)?; - Some(bytes) - } -} - -// It would be great to use the macros for these, but I couldn't figure out how to get it -// to work with constraints (i.e. `Hashable`) on the generic parameters. -impl Abomonation for HashableWrapper { - - #[inline] unsafe fn entomb(&self, write: &mut W) -> ::std::io::Result<()> { - self.item.entomb(write) - } - #[inline] unsafe fn exhume<'a,'b>(&'a mut self, mut bytes: &'b mut [u8]) -> Option<&'b mut [u8]> { - let temp = bytes; - bytes = self.item.exhume(temp)?; - Some(bytes) - } -} - -impl Abomonation for UnsignedWrapper { - - #[inline] unsafe fn entomb(&self, write: &mut W) -> ::std::io::Result<()> { - self.item.entomb(write) - } - #[inline] unsafe fn exhume<'a,'b>(&'a mut self, mut bytes: &'b mut [u8]) -> Option<&'b mut [u8]> { - let temp = bytes; - bytes = self.item.exhume(temp)?; - Some(bytes) - } -} - - -/// A wrapper around hashable types that ensures an implementation of `Ord` that compares -/// hash values first. -#[derive(Clone, Eq, PartialEq, Debug, Default)] -pub struct OrdWrapper { - /// The item, so you can grab it. - pub item: T -} - -impl PartialOrd for OrdWrapper { - #[inline] - fn partial_cmp(&self, other: &Self) -> Option<::std::cmp::Ordering> { - (self.item.hashed(), &self.item).partial_cmp(&(other.item.hashed(), &other.item)) - } -} -impl Ord for OrdWrapper { - #[inline] - fn cmp(&self, other: &Self) -> ::std::cmp::Ordering { - (self.item.hashed(), &self.item).cmp(&(other.item.hashed(), &other.item)) - } -} - -impl Hashable for OrdWrapper { - type Output = T::Output; - fn hashed(&self) -> T::Output { self.item.hashed() } -} - -impl Deref for OrdWrapper { - type Target = T; - fn deref(&self) -> &T { &self.item } -} - - -/// Wrapper to stash hash value with the actual value. -#[derive(Clone, Default, Ord, PartialOrd, Eq, PartialEq, Debug, Copy)] -pub struct HashableWrapper { - hash: T::Output, - /// The item, for reference. - pub item: T, -} - -impl Hashable for HashableWrapper { - type Output = T::Output; - #[inline] - fn hashed(&self) -> T::Output { self.hash } -} - -impl Deref for HashableWrapper { - type Target = T; - #[inline] - fn deref(&self) -> &T { &self.item } -} - -impl From for HashableWrapper { - #[inline] - fn from(item: T) -> HashableWrapper { - HashableWrapper { - hash: item.hashed(), - item, - } - } -} - -/// A wrapper around an unsigned integer, providing `hashed` as the value itself. -#[derive(Clone, Ord, PartialOrd, Eq, PartialEq, Default, Debug, Copy)] -pub struct UnsignedWrapper { - /// The item. - pub item: T, -} - -impl Hashable for UnsignedWrapper { - type Output = T; - #[inline] - fn hashed(&self) -> Self::Output { self.item } -} - -impl Deref for UnsignedWrapper { - type Target = T; - #[inline] - fn deref(&self) -> &T { &self.item } -} - -impl From for UnsignedWrapper { - #[inline] - fn from(item: T) -> Self { UnsignedWrapper { item } } -} diff --git a/src/lib.rs b/src/lib.rs index 814254d8f..4bc4a6a50 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -92,7 +92,6 @@ impl ExchangeData for T { } extern crate fnv; extern crate timely; -extern crate timely_sort; #[macro_use] extern crate abomonation_derive; diff --git a/src/operators/arrange/agent.rs b/src/operators/arrange/agent.rs index 289ed4c56..0ec09eb6e 100644 --- a/src/operators/arrange/agent.rs +++ b/src/operators/arrange/agent.rs @@ -187,7 +187,6 @@ where /// use differential_dataflow::operators::reduce::Reduce; /// use differential_dataflow::trace::Trace; /// use differential_dataflow::trace::implementations::ord::OrdValSpine; - /// use differential_dataflow::hashable::OrdWrapper; /// /// fn main() { /// ::timely::execute(Config::thread(), |worker| { @@ -247,7 +246,6 @@ where /// use differential_dataflow::operators::reduce::Reduce; /// use differential_dataflow::trace::Trace; /// use differential_dataflow::trace::implementations::ord::OrdValSpine; - /// use differential_dataflow::hashable::OrdWrapper; /// /// fn main() { /// ::timely::execute(Config::thread(), |worker| { @@ -361,7 +359,6 @@ where /// use differential_dataflow::trace::Trace; /// use differential_dataflow::trace::TraceReader; /// use differential_dataflow::trace::implementations::ord::OrdValSpine; - /// use differential_dataflow::hashable::OrdWrapper; /// use differential_dataflow::input::Input; /// /// fn main() { diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 23fbe655e..e2af5df93 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -26,8 +26,6 @@ use timely::progress::Timestamp; use timely::progress::{Antichain, frontier::AntichainRef}; use timely::dataflow::operators::Capability; -use timely_sort::Unsigned; - use ::{Data, ExchangeData, Collection, AsCollection, Hashable}; use ::difference::Semigroup; use lattice::Lattice; @@ -274,7 +272,7 @@ where Tr: 'static, { // while the arrangement is already correctly distributed, the query stream may not be. - let exchange = Exchange::new(move |update: &(Tr::Key,G::Timestamp)| update.0.hashed().as_u64()); + let exchange = Exchange::new(move |update: &(Tr::Key,G::Timestamp)| update.0.hashed().into()); queries.binary_frontier(&self.stream, exchange, Pipeline, "TraceQuery", move |_capability, _info| { let mut trace = Some(self.trace.clone()); @@ -484,7 +482,7 @@ where Tr::Batch: Batch, Tr::Cursor: Cursor, { - let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().as_u64()); + let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); self.arrange_core(exchange, name) } diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index aef44d465..fd36934c7 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -115,8 +115,6 @@ use timely::progress::Timestamp; use timely::progress::Antichain; use timely::dataflow::operators::Capability; -use timely_sort::Unsigned; - use ::{ExchangeData, Hashable}; use lattice::Lattice; use trace::{Trace, TraceReader, Batch, Cursor}; @@ -157,7 +155,7 @@ where let reader = &mut reader; - let exchange = Exchange::new(move |update: &(Tr::Key,Option,G::Timestamp)| (update.0).hashed().as_u64()); + let exchange = Exchange::new(move |update: &(Tr::Key,Option,G::Timestamp)| (update.0).hashed().into()); stream.unary_frontier(exchange, name, move |_capability, info| { diff --git a/src/operators/join.rs b/src/operators/join.rs index 59db65e0d..3762a25a4 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -237,7 +237,6 @@ pub trait JoinCore where G::Time /// use differential_dataflow::operators::join::JoinCore; /// use differential_dataflow::trace::Trace; /// use differential_dataflow::trace::implementations::ord::OrdValSpine; - /// use differential_dataflow::hashable::OrdWrapper; /// /// fn main() { /// ::timely::example(|scope| { diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index f037ba463..3c27c5476 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -253,7 +253,6 @@ pub trait ReduceCore where G::Timestam /// use differential_dataflow::operators::reduce::ReduceCore; /// use differential_dataflow::trace::Trace; /// use differential_dataflow::trace::implementations::ord::OrdValSpine; - /// use differential_dataflow::hashable::OrdWrapper; /// /// fn main() { /// ::timely::example(|scope| { diff --git a/tests/trace.rs b/tests/trace.rs index 6bb32990e..1c22f4665 100644 --- a/tests/trace.rs +++ b/tests/trace.rs @@ -6,8 +6,6 @@ use std::rc::Rc; use timely::dataflow::operators::generic::OperatorInfo; use timely::progress::{Antichain, frontier::AntichainRef}; -use differential_dataflow::hashable::UnsignedWrapper; - use differential_dataflow::trace::implementations::ord::OrdValBatch; use differential_dataflow::trace::{Trace, TraceReader, Batch, Batcher}; use differential_dataflow::trace::cursor::CursorDebug; @@ -15,18 +13,18 @@ use differential_dataflow::trace::implementations::spine_fueled::Spine; pub type OrdValSpine = Spine>>; -type IntegerTrace = OrdValSpine, u64, usize, i64>; +type IntegerTrace = OrdValSpine; -fn get_trace() -> Spine, u64, usize, i64, Rc, u64, usize, i64>>> { +fn get_trace() -> Spine>> { let op_info = OperatorInfo::new(0, 0, &[]); let mut trace = IntegerTrace::new(op_info, None, None); { - let mut batcher = <::Batch as Batch, u64, usize, i64>>::Batcher::new(); + let mut batcher = <::Batch as Batch>::Batcher::new(); batcher.push_batch(&mut vec![ - ((1.into(), 2), 0, 1), - ((2.into(), 3), 1, 1), - ((2.into(), 3), 2, -1), + ((1, 2), 0, 1), + ((2, 3), 1, 1), + ((2, 3), 2, -1), ]); let batch_ts = &[1, 2, 3]; @@ -44,21 +42,21 @@ fn test_trace() { let (mut cursor1, storage1) = trace.cursor_through(AntichainRef::new(&[1])).unwrap(); let vec_1 = cursor1.to_vec(&storage1); - assert_eq!(vec_1, vec![((1.into(), 2), vec![(0, 1)])]); + assert_eq!(vec_1, vec![((1, 2), vec![(0, 1)])]); let (mut cursor2, storage2) = trace.cursor_through(AntichainRef::new(&[2])).unwrap(); let vec_2 = cursor2.to_vec(&storage2); println!("--> {:?}", vec_2); assert_eq!(vec_2, vec![ - ((1.into(), 2), vec![(0, 1)]), - ((2.into(), 3), vec![(1, 1)]), + ((1, 2), vec![(0, 1)]), + ((2, 3), vec![(1, 1)]), ]); let (mut cursor3, storage3) = trace.cursor_through(AntichainRef::new(&[3])).unwrap(); let vec_3 = cursor3.to_vec(&storage3); assert_eq!(vec_3, vec![ - ((1.into(), 2), vec![(0, 1)]), - ((2.into(), 3), vec![(1, 1), (2, -1)]), + ((1, 2), vec![(0, 1)]), + ((2, 3), vec![(1, 1), (2, -1)]), ]); let (mut cursor4, storage4) = trace.cursor();