Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions misc/cargo-vet/audits.toml
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ notes = "Reads and writes files under a prefix controlled by the caller."
[[audits.differential-dataflow]]
who = "Moritz Hoffmann <mh@materialize.com>"
criteria = "safe-to-deploy"
version = "0.12.0@git:07ed40c3c44e6fc43efa5decdaa6e57ba9ce7fac"
version = "0.12.0@git:a3f0f31d280b41254fd98fe51ef614622addd949"

[[audits.domain]]
who = "Matt Jibson <matt.jibson@gmail.com>"
Expand Down Expand Up @@ -480,22 +480,22 @@ version = "0.17.0"
[[audits.timely]]
who = "Moritz Hoffmann <mh@materialize.com>"
criteria = "safe-to-deploy"
version = "0.12.0@git:dfb4721ac698a3a26dcecf7826d8a3f4fd23952f"
version = "0.12.0@git:89bcb738c573f2b2b23ac3ec71d858c5afc54bb0"

[[audits.timely_bytes]]
who = "Moritz Hoffmann <mh@materialize.com>"
criteria = "safe-to-deploy"
version = "0.12.0@git:dfb4721ac698a3a26dcecf7826d8a3f4fd23952f"
version = "0.12.0@git:89bcb738c573f2b2b23ac3ec71d858c5afc54bb0"

[[audits.timely_communication]]
who = "Moritz Hoffmann <mh@materialize.com>"
criteria = "safe-to-deploy"
version = "0.12.0@git:dfb4721ac698a3a26dcecf7826d8a3f4fd23952f"
version = "0.12.0@git:89bcb738c573f2b2b23ac3ec71d858c5afc54bb0"

[[audits.timely_logging]]
who = "Moritz Hoffmann <mh@materialize.com>"
criteria = "safe-to-deploy"
version = "0.12.0@git:dfb4721ac698a3a26dcecf7826d8a3f4fd23952f"
version = "0.12.0@git:89bcb738c573f2b2b23ac3ec71d858c5afc54bb0"

[[audits.tokio-postgres]]
who = "Roshan Jobanputra <roshan@materialize.com>"
Expand Down
1 change: 0 additions & 1 deletion src/compute/src/compute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1137,7 +1137,6 @@ impl IndexPeek {
for<'a> Tr::Key<'a>: ToDatumIter,
for<'a> Tr::Val<'a>: ToDatumIter,
Tr::KeyOwned: Columnation + Data + FromDatumIter + ToDatumIter,
Tr::ValOwned: Columnation + Data + ToDatumIter,
{
let max_result_size = usize::cast_from(max_result_size);
let count_byte_size = std::mem::size_of::<NonZeroUsize>();
Expand Down
77 changes: 46 additions & 31 deletions src/compute/src/extensions/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,20 @@ where
G::Timestamp: Lattice + Ord,
{
/// Applies `reduce` to arranged data, and returns an arrangement of output data.
fn mz_reduce_abelian<L, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
fn mz_reduce_abelian<L, V, F, T2>(
&self,
name: &str,
from: F,
logic: L,
) -> Arranged<G, TraceAgent<T2>>
where
T2: for<'a> Trace<Key<'a> = T1::Key<'a>, Time = G::Timestamp> + 'static,
T2::ValOwned: Data,
V: Data,
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Diff: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwned, T2::Diff)>)
+ 'static,
T2::Builder: Builder<Input = ((T1::KeyOwned, V), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>) + 'static,
Arranged<G, TraceAgent<T2>>: ArrangementSize;
}

Expand All @@ -48,20 +53,26 @@ where
T1::Diff: Semigroup,
{
/// Applies `reduce` to arranged data, and returns an arrangement of output data.
fn mz_reduce_abelian<L, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
fn mz_reduce_abelian<L, V, F, T2>(
&self,
name: &str,
from: F,
logic: L,
) -> Arranged<G, TraceAgent<T2>>
where
T2: for<'a> Trace<Key<'a> = T1::Key<'a>, Time = G::Timestamp> + 'static,
T2::ValOwned: Data,
V: Data,
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Diff: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwned, T2::Diff)>)
+ 'static,
T2::Builder: Builder<Input = ((T1::KeyOwned, V), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>) + 'static,
Arranged<G, TraceAgent<T2>>: ArrangementSize,
{
// Allow access to `reduce_abelian` since we're within Mz's wrapper and force arrangement size logging.
#[allow(clippy::disallowed_methods)]
Arranged::<_, _>::reduce_abelian::<_, T2>(self, name, logic).log_arrangement_size()
Arranged::<_, _>::reduce_abelian::<_, _, _, T2>(self, name, from, logic)
.log_arrangement_size()
}
}

Expand All @@ -76,32 +87,34 @@ where
/// are produced as a result. The method is useful for reductions that need to present different
/// output views on the same input data. An example is producing an error-free reduction output
/// along with a separate error output indicating when the error-free output is valid.
fn reduce_pair<L1, T1, L2, T2>(
fn reduce_pair<L1, V1, F1, T1, L2, V2, F2, T2>(
&self,
name1: &str,
name2: &str,
from1: F1,
from2: F2,
logic1: L1,
logic2: L2,
) -> (Arranged<G, TraceAgent<T1>>, Arranged<G, TraceAgent<T2>>)
where
T1: Trace
+ for<'a> TraceReader<Key<'a> = Tr::Key<'a>, KeyOwned = Tr::KeyOwned, Time = G::Timestamp>
+ 'static,
T1::ValOwned: Data,
T1::Diff: Abelian,
T1::Batch: Batch,
T1::Builder: Builder<Input = ((T1::KeyOwned, T1::ValOwned), T1::Time, T1::Diff)>,
L1: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(T1::ValOwned, T1::Diff)>)
+ 'static,
T1::Builder: Builder<Input = ((T1::KeyOwned, V1), T1::Time, T1::Diff)>,
L1: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(V1, T1::Diff)>) + 'static,
V1: Data,
F1: Fn(T1::Val<'_>) -> V1 + 'static,
T2: Trace
+ for<'a> TraceReader<Key<'a> = Tr::Key<'a>, KeyOwned = Tr::KeyOwned, Time = G::Timestamp>
+ 'static,
T2::ValOwned: Data,
T2::Diff: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((T2::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
L2: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(T2::ValOwned, T2::Diff)>)
+ 'static,
T2::Builder: Builder<Input = ((T2::KeyOwned, V2), T2::Time, T2::Diff)>,
L2: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(V2, T2::Diff)>) + 'static,
V2: Data,
F2: Fn(T2::Val<'_>) -> V2 + 'static,
Arranged<G, TraceAgent<T1>>: ArrangementSize,
Arranged<G, TraceAgent<T2>>: ArrangementSize;
}
Expand All @@ -112,37 +125,39 @@ where
Tr: TraceReader<Time = G::Timestamp> + Clone + 'static,
Tr::Diff: Semigroup,
{
fn reduce_pair<L1, T1, L2, T2>(
fn reduce_pair<L1, V1, F1, T1, L2, V2, F2, T2>(
&self,
name1: &str,
name2: &str,
from1: F1,
from2: F2,
logic1: L1,
logic2: L2,
) -> (Arranged<G, TraceAgent<T1>>, Arranged<G, TraceAgent<T2>>)
where
T1: Trace
+ for<'a> TraceReader<Key<'a> = Tr::Key<'a>, KeyOwned = Tr::KeyOwned, Time = G::Timestamp>
+ 'static,
T1::ValOwned: Data,
T1::Diff: Abelian,
T1::Batch: Batch,
T1::Builder: Builder<Input = ((T1::KeyOwned, T1::ValOwned), T1::Time, T1::Diff)>,
L1: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(T1::ValOwned, T1::Diff)>)
+ 'static,
T1::Builder: Builder<Input = ((T1::KeyOwned, V1), T1::Time, T1::Diff)>,
L1: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(V1, T1::Diff)>) + 'static,
V1: Data,
F1: Fn(T1::Val<'_>) -> V1 + 'static,
T2: Trace
+ for<'a> TraceReader<Key<'a> = Tr::Key<'a>, KeyOwned = Tr::KeyOwned, Time = G::Timestamp>
+ 'static,
T2::ValOwned: Data,
T2::Diff: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((T2::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
L2: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(T2::ValOwned, T2::Diff)>)
+ 'static,
T2::Builder: Builder<Input = ((T2::KeyOwned, V2), T2::Time, T2::Diff)>,
L2: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(V2, T2::Diff)>) + 'static,
V2: Data,
F2: Fn(T2::Val<'_>) -> V2 + 'static,
Arranged<G, TraceAgent<T1>>: ArrangementSize,
Arranged<G, TraceAgent<T2>>: ArrangementSize,
{
let arranged1 = self.mz_reduce_abelian::<L1, T1>(name1, logic1);
let arranged2 = self.mz_reduce_abelian::<L2, T2>(name2, logic2);
let arranged1 = self.mz_reduce_abelian::<L1, _, _, T1>(name1, from1, logic1);
let arranged2 = self.mz_reduce_abelian::<L2, _, _, T2>(name2, from2, logic2);
(arranged1, arranged2)
}
}
8 changes: 5 additions & 3 deletions src/compute/src/logging/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use mz_ore::cast::CastFrom;
use mz_repr::{Datum, Diff, GlobalId, Timestamp};
use mz_timely_util::replay::MzReplay;
use timely::communication::Allocate;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::channels::pushers::buffer::Session;
use timely::dataflow::channels::pushers::{Counter, Tee};
Expand Down Expand Up @@ -473,7 +474,8 @@ struct ArrangementSizeState {

type Update<D> = (D, Timestamp, Diff);
type Pusher<D> = Counter<Timestamp, Vec<Update<D>>, Tee<Timestamp, Vec<Update<D>>>>;
type OutputSession<'a, D> = Session<'a, Timestamp, Vec<Update<D>>, Pusher<D>>;
type OutputSession<'a, D> =
Session<'a, Timestamp, CapacityContainerBuilder<Vec<Update<D>>>, Pusher<D>>;

/// Bundled output sessions used by the demux operator.
struct DemuxOutput<'a> {
Expand Down Expand Up @@ -978,7 +980,7 @@ where
let diff = buffer.iter().map(|(_d, _t, r)| r).sum();
logger.log(ComputeEvent::ErrorCount { export_id, diff });

output.session(&cap).give_vec(&mut buffer);
output.session(&cap).give_container(&mut buffer);
});
}
})
Expand All @@ -1001,7 +1003,7 @@ where
let diff = buffer.iter().map(sum_batch_diffs).sum();
logger.log(ComputeEvent::ErrorCount { export_id, diff });

output.session(&cap).give_vec(&mut buffer);
output.session(&cap).give_container(&mut buffer);
});
}
})
Expand Down
Loading