From 16f55f8ac8fbd6ef9e051fbf789adbf92fa4d360 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 9 Apr 2024 12:00:52 -0400 Subject: [PATCH 1/4] Extract batcher input to assoc type, arrange_core freestanding Signed-off-by: Moritz Hoffmann --- .github/workflows/deploy.yml | 2 +- .github/workflows/test.yml | 4 +- interactive/src/command.rs | 6 +- interactive/src/logging.rs | 6 +- src/operators/arrange/arrangement.rs | 290 +++++++++--------- src/operators/consolidate.rs | 2 +- src/trace/implementations/merge_batcher.rs | 1 + .../implementations/merge_batcher_col.rs | 1 + src/trace/mod.rs | 4 +- 9 files changed, 165 insertions(+), 151 deletions(-) diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index babc088a8..41c27604b 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -9,7 +9,7 @@ jobs: deploy: runs-on: ubuntu-22.04 steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - run: cargo install mdbook --version 0.4.31 - run: cd mdbook && mdbook build - uses: JamesIves/github-pages-deploy-action@v4 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e4643725d..e3c9b6087 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -16,7 +16,7 @@ jobs: toolchain: - stable - 1.72 - name: cargo test on ${{ matrix.os }} + name: cargo test on ${{ matrix.os }}, rust ${{ matrix.toolchain }} runs-on: ${{ matrix.os }}-latest steps: - uses: actions/checkout@v4 @@ -24,7 +24,7 @@ jobs: with: toolchain: ${{ matrix.toolchain }} - name: Cargo test - run: cargo test + run: cargo test --workspace --all-targets # Check formatting with rustfmt mdbook: diff --git a/interactive/src/command.rs b/interactive/src/command.rs index c31963afe..d795ec13a 100644 --- a/interactive/src/command.rs +++ b/interactive/src/command.rs @@ -151,7 +151,7 @@ where println!("\tTimely logging connection {} of {}", index, number); let socket = listener.incoming().next().unwrap().unwrap(); socket.set_nonblocking(true).expect("failed to set nonblocking"); - streams.push(EventReader::::new(socket)); + streams.push(EventReader::,_>::new(socket)); } println!("\tAll logging connections established"); @@ -174,7 +174,7 @@ where for _ in 0 .. number { let socket = listener.incoming().next().unwrap().unwrap(); socket.set_nonblocking(true).expect("failed to set nonblocking"); - streams.push(EventReader::::new(socket)); + streams.push(EventReader::,_>::new(socket)); } } crate::logging::publish_differential_logging(manager, worker, granularity, &name_as, streams); @@ -195,4 +195,4 @@ where pub fn serialize_into(&self, writer: W) { bincode::serialize_into(writer, self).expect("bincode: serialization failed"); } -} \ No newline at end of file +} diff --git a/interactive/src/logging.rs b/interactive/src/logging.rs index cfc524660..be07fa4b6 100644 --- a/interactive/src/logging.rs +++ b/interactive/src/logging.rs @@ -30,7 +30,7 @@ where V: ExchangeData+Hash+LoggingValue+Datum, A: Allocate, I : IntoIterator, - ::Item: EventIterator+'static + ::Item: EventIterator>+'static { let (operates, channels, schedule, messages, shutdown, park, text) = worker.dataflow(move |scope| { @@ -217,7 +217,7 @@ where V: ExchangeData+Hash+LoggingValue+Datum, A: Allocate, I : IntoIterator, - ::Item: EventIterator+'static + ::Item: EventIterator>+'static { let (merge,batch) = worker.dataflow(move |scope| { @@ -280,4 +280,4 @@ where manager.traces.set_unkeyed(&Plan::Source(format!("logs/{}/differential/arrange/batch", name)), &batch); manager.traces.set_unkeyed(&Plan::Source(format!("logs/{}/differential/arrange/merge", name)), &merge); -} \ No newline at end of file +} diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index fe65a702d..617dfca2b 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -19,7 +19,7 @@ use timely::dataflow::operators::{Enter, Map}; use timely::order::PartialOrder; -use timely::dataflow::{Scope, Stream}; +use timely::dataflow::{Scope, Stream, StreamCore}; use timely::dataflow::operators::generic::Operator; use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline, Exchange}; use timely::progress::Timestamp; @@ -511,7 +511,7 @@ where V: ExchangeData, R: ExchangeData, Tr::Batch: Batch, - Tr::Batcher: Batcher, + Tr::Batcher: Batcher, Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>, Tr::Builder: Builder, { self.arrange_named("Arrange") @@ -527,7 +527,7 @@ where V: ExchangeData, R: ExchangeData, Tr::Batch: Batch, - Tr::Batcher: Batcher, + Tr::Batcher: Batcher, Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>, Tr::Builder: Builder, { let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); @@ -547,7 +547,7 @@ where R: Clone, Tr: Trace+'static, Tr::Batch: Batch, - Tr::Batcher: Batcher, + Tr::Batcher: Batcher, Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>, Tr::Builder: Builder, ; } @@ -561,164 +561,174 @@ where R: Semigroup, { fn arrange_core(&self, pact: P, name: &str) -> Arranged> - where - P: ParallelizationContract>, - Tr: Trace+'static, - Tr::Batch: Batch, - Tr::Batcher: Batcher, - Tr::Builder: Builder, + where + P: ParallelizationContract>, + Tr: Trace+'static, + Tr::Batch: Batch, + Tr::Batcher: Batcher, Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>, + Tr::Builder: Builder, { - // The `Arrange` operator is tasked with reacting to an advancing input - // frontier by producing the sequence of batches whose lower and upper - // bounds are those frontiers, containing updates at times greater or - // equal to lower and not greater or equal to upper. - // - // The operator uses its batch type's `Batcher`, which accepts update - // triples and responds to requests to "seal" batches (presented as new - // upper frontiers). - // - // Each sealed batch is presented to the trace, and if at all possible - // transmitted along the outgoing channel. Empty batches may not have - // a corresponding capability, as they are only retained for actual data - // held by the batcher, which may prevents the operator from sending an - // empty batch. - - let mut reader: Option> = None; - - // fabricate a data-parallel operator using the `unary_notify` pattern. - let stream = { - - let reader = &mut reader; - - self.inner.unary_frontier(pact, name, move |_capability, info| { - - // Acquire a logger for arrange events. - let logger = { - let scope = self.scope(); - let register = scope.log_register(); - register.get::("differential/arrange") - }; - - // Where we will deposit received updates, and from which we extract batches. - let mut batcher = Tr::Batcher::new(logger.clone(), info.global_id); - - // Capabilities for the lower envelope of updates in `batcher`. - let mut capabilities = Antichain::>::new(); - - let activator = Some(self.scope().activator_for(&info.address[..])); - let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator); - // If there is default exertion logic set, install it. - if let Some(exert_logic) = self.inner.scope().config().get::("differential/default_exert_logic").cloned() { - empty_trace.set_exert_logic(exert_logic); - } - - let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger); - - *reader = Some(reader_local); - - // Initialize to the minimal input frontier. - let mut prev_frontier = Antichain::from_elem(::minimum()); - - move |input, output| { - - // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities. - // We don't have to keep all capabilities, but we need to be able to form output messages - // when we realize that time intervals are complete. + arrange_core(&self.inner, pact, name) + } +} +fn arrange_core(stream: &StreamCore::Input>, pact: P, name: &str) -> Arranged> +where + G: Scope, + G::Timestamp: Lattice, + P: ParallelizationContract::Input>, + Tr: Trace+'static, + Tr::Batch: Batch, + Tr::Batcher: Batcher