From dea27e3fd4d2c2da6c57e348d4597c3597944c1c Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 26 Apr 2024 15:23:38 -0400 Subject: [PATCH 1/5] Use new merge batcher, remove ValOwned Signed-off-by: Moritz Hoffmann --- Cargo.lock | 4 +- Cargo.toml | 11 +- src/compute/src/compute_state.rs | 1 - src/compute/src/extensions/reduce.rs | 77 +++++---- src/compute/src/render.rs | 41 ++--- src/compute/src/render/context.rs | 2 +- src/compute/src/render/reduce.rs | 240 +++++++++++++++------------ src/compute/src/render/threshold.rs | 21 ++- src/compute/src/render/top_k.rs | 49 +++--- src/compute/src/typedefs.rs | 7 +- src/timely-util/src/operator.rs | 2 +- 11 files changed, 254 insertions(+), 201 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e5d1410829e7a..e4b3964d982ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1973,7 +1973,7 @@ checksum = "0e25ea47919b1560c4e3b7fe0aaab9becf5b84a10325ddf7db0f0ba5e1026499" [[package]] name = "differential-dataflow" version = "0.12.0" -source = "git+https://github.com/MaterializeInc/differential-dataflow.git#07ed40c3c44e6fc43efa5decdaa6e57ba9ce7fac" +source = "git+https://github.com/antiguru/differential-dataflow.git?branch=container_merge_batcher#c5b1b1efcbcf4b54d234cb093db248f626dac160" dependencies = [ "abomonation", "abomonation_derive", @@ -2029,7 +2029,7 @@ checksum = "923dea538cea0aa3025e8685b20d6ee21ef99c4f77e954a30febbaac5ec73a97" [[package]] name = "dogsdogsdogs" version = "0.1.0" -source = "git+https://github.com/MaterializeInc/differential-dataflow.git#07ed40c3c44e6fc43efa5decdaa6e57ba9ce7fac" +source = "git+https://github.com/antiguru/differential-dataflow.git?branch=container_merge_batcher#c5b1b1efcbcf4b54d234cb093db248f626dac160" dependencies = [ "abomonation", "abomonation_derive", diff --git a/Cargo.toml b/Cargo.toml index 4b44d91ba772b..baf62bc6df960 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -174,6 +174,13 @@ debug = 2 # tend to get rewritten or disappear (e.g., because a PR is force pushed or gets # merged), after which point it becomes impossible to build that historical # version of Materialize. +[patch."https://github.com/TimelyDataflow/timely-dataflow"] +# Projects that do not reliably release to crates.io. +timely = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } +timely_bytes = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } +timely_communication = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } +timely_container = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } +timely_logging = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } [patch.crates-io] # Projects that do not reliably release to crates.io. timely = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } @@ -181,8 +188,8 @@ timely_bytes = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } timely_communication = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } timely_container = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } timely_logging = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } -differential-dataflow = { git = "https://github.com/MaterializeInc/differential-dataflow.git" } -dogsdogsdogs = { git = "https://github.com/MaterializeInc/differential-dataflow.git" } +differential-dataflow = { git = "https://github.com/antiguru/differential-dataflow.git", branch = "container_merge_batcher" } +dogsdogsdogs = { git = "https://github.com/antiguru/differential-dataflow.git", branch = "container_merge_batcher" } # Waiting on https://github.com/sfackler/rust-postgres/pull/752. postgres = { git = "https://github.com/MaterializeInc/rust-postgres" } diff --git a/src/compute/src/compute_state.rs b/src/compute/src/compute_state.rs index 896612a954c49..232a2895e757f 100644 --- a/src/compute/src/compute_state.rs +++ b/src/compute/src/compute_state.rs @@ -1137,7 +1137,6 @@ impl IndexPeek { for<'a> Tr::Key<'a>: ToDatumIter, for<'a> Tr::Val<'a>: ToDatumIter, Tr::KeyOwned: Columnation + Data + FromDatumIter + ToDatumIter, - Tr::ValOwned: Columnation + Data + ToDatumIter, { let max_result_size = usize::cast_from(max_result_size); let count_byte_size = std::mem::size_of::(); diff --git a/src/compute/src/extensions/reduce.rs b/src/compute/src/extensions/reduce.rs index 74a1adf1ca4dc..643df0651f405 100644 --- a/src/compute/src/extensions/reduce.rs +++ b/src/compute/src/extensions/reduce.rs @@ -28,15 +28,20 @@ where G::Timestamp: Lattice + Ord, { /// Applies `reduce` to arranged data, and returns an arrangement of output data. - fn mz_reduce_abelian(&self, name: &str, logic: L) -> Arranged> + fn mz_reduce_abelian( + &self, + name: &str, + from: F, + logic: L, + ) -> Arranged> where T2: for<'a> Trace = T1::Key<'a>, Time = G::Timestamp> + 'static, - T2::ValOwned: Data, + V: Data, + F: Fn(T2::Val<'_>) -> V + 'static, T2::Diff: Abelian, T2::Batch: Batch, - T2::Builder: Builder, - L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwned, T2::Diff)>) - + 'static, + T2::Builder: Builder, + L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>) + 'static, Arranged>: ArrangementSize; } @@ -48,20 +53,26 @@ where T1::Diff: Semigroup, { /// Applies `reduce` to arranged data, and returns an arrangement of output data. - fn mz_reduce_abelian(&self, name: &str, logic: L) -> Arranged> + fn mz_reduce_abelian( + &self, + name: &str, + from: F, + logic: L, + ) -> Arranged> where T2: for<'a> Trace = T1::Key<'a>, Time = G::Timestamp> + 'static, - T2::ValOwned: Data, + V: Data, + F: Fn(T2::Val<'_>) -> V + 'static, T2::Diff: Abelian, T2::Batch: Batch, - T2::Builder: Builder, - L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwned, T2::Diff)>) - + 'static, + T2::Builder: Builder, + L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>) + 'static, Arranged>: ArrangementSize, { // Allow access to `reduce_abelian` since we're within Mz's wrapper and force arrangement size logging. #[allow(clippy::disallowed_methods)] - Arranged::<_, _>::reduce_abelian::<_, T2>(self, name, logic).log_arrangement_size() + Arranged::<_, _>::reduce_abelian::<_, _, _, T2>(self, name, from, logic) + .log_arrangement_size() } } @@ -76,10 +87,12 @@ where /// are produced as a result. The method is useful for reductions that need to present different /// output views on the same input data. An example is producing an error-free reduction output /// along with a separate error output indicating when the error-free output is valid. - fn reduce_pair( + fn reduce_pair( &self, name1: &str, name2: &str, + from1: F1, + from2: F2, logic1: L1, logic2: L2, ) -> (Arranged>, Arranged>) @@ -87,21 +100,21 @@ where T1: Trace + for<'a> TraceReader = Tr::Key<'a>, KeyOwned = Tr::KeyOwned, Time = G::Timestamp> + 'static, - T1::ValOwned: Data, T1::Diff: Abelian, T1::Batch: Batch, - T1::Builder: Builder, - L1: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(T1::ValOwned, T1::Diff)>) - + 'static, + T1::Builder: Builder, + L1: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(V1, T1::Diff)>) + 'static, + V1: Data, + F1: Fn(T1::Val<'_>) -> V1 + 'static, T2: Trace + for<'a> TraceReader = Tr::Key<'a>, KeyOwned = Tr::KeyOwned, Time = G::Timestamp> + 'static, - T2::ValOwned: Data, T2::Diff: Abelian, T2::Batch: Batch, - T2::Builder: Builder, - L2: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(T2::ValOwned, T2::Diff)>) - + 'static, + T2::Builder: Builder, + L2: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(V2, T2::Diff)>) + 'static, + V2: Data, + F2: Fn(T2::Val<'_>) -> V2 + 'static, Arranged>: ArrangementSize, Arranged>: ArrangementSize; } @@ -112,10 +125,12 @@ where Tr: TraceReader