From 9d65e695b1a81290a1cd0f7ed68a72a8d815863a Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 12 Feb 2025 14:29:49 +0100 Subject: [PATCH 1/3] Upgrade columnar, timely Signed-off-by: Moritz Hoffmann --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9ecc75a2b..b4867da22 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,10 +45,10 @@ bytemuck = "1.18.0" serde = { version = "1.0", features = ["derive"] } fnv="1.0.2" timely = {workspace = true} -columnar = "0.2" +columnar = "0.3" [workspace.dependencies] -timely = { version = "0.17", default-features = false } +timely = { version = "0.18", default-features = false } #timely = { path = "../timely-dataflow/timely/", default-features = false } [features] From e0ca9badec608fb7570c23eeffcd4acd20c20f8a Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 12 Feb 2025 14:57:14 +0100 Subject: [PATCH 2/3] Correct columnar example Signed-off-by: Moritz Hoffmann --- examples/columnar.rs | 51 +++++++++++++++++--------------------------- 1 file changed, 20 insertions(+), 31 deletions(-) diff --git a/examples/columnar.rs b/examples/columnar.rs index 637e8394e..80e41c521 100644 --- a/examples/columnar.rs +++ b/examples/columnar.rs @@ -154,8 +154,8 @@ mod container { } } - use columnar::{Clear, Len, Index, AsBytes, FromBytes}; - use columnar::bytes::serialization::decode; + use columnar::{Clear, Len, Index, FromBytes}; + use columnar::bytes::{EncodeDecode, Indexed}; use columnar::common::IterOwn; use timely::Container; @@ -163,8 +163,8 @@ mod container { fn len(&self) -> usize { match self { Column::Typed(t) => t.len(), - Column::Bytes(b) => <>::Borrowed<'_> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(b))).len(), - Column::Align(a) => <>::Borrowed<'_> as FromBytes>::from_bytes(&mut decode(a)).len(), + Column::Bytes(b) => <>::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).len(), + Column::Align(a) => <>::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(a)).len(), } } // This sets the `Bytes` variant to be an empty `Typed` variant, appropriate for pushing into. @@ -181,8 +181,8 @@ mod container { fn iter<'a>(&'a self) -> Self::Iter<'a> { match self { Column::Typed(t) => t.borrow().into_iter(), - Column::Bytes(b) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(b))).into_iter(), - Column::Align(a) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(a)).into_iter(), + Column::Bytes(b) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).into_iter(), + Column::Align(a) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_iter(), } } @@ -191,8 +191,8 @@ mod container { fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { match self { Column::Typed(t) => t.borrow().into_iter(), - Column::Bytes(b) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(b))).into_iter(), - Column::Align(a) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(a)).into_iter(), + Column::Bytes(b) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).into_iter(), + Column::Align(a) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_iter(), } } } @@ -202,7 +202,7 @@ mod container { fn at_capacity(&self) -> bool { match self { Self::Typed(t) => { - let length_in_bytes = t.borrow().length_in_words() * 8; + let length_in_bytes = Indexed::length_in_bytes(&t.borrow()); length_in_bytes >= (1 << 20) }, Self::Bytes(_) => true, @@ -249,7 +249,7 @@ mod container { fn length_in_bytes(&self) -> usize { match self { // We'll need one u64 for the length, then the length rounded up to a multiple of 8. - Column::Typed(t) => 8 * t.borrow().length_in_words(), + Column::Typed(t) => Indexed::length_in_bytes(&t.borrow()), Column::Bytes(b) => b.len(), Column::Align(a) => 8 * a.len(), } @@ -257,20 +257,7 @@ mod container { fn into_bytes(&self, writer: &mut W) { match self { - Column::Typed(t) => { - use columnar::Container; - // Columnar data is serialized as a sequence of `u64` values, with each `[u8]` slice - // serialize as first its length in bytes, and then as many `u64` values as needed. - // Padding should be added, but only for alignment; no specific values are required. - for (align, bytes) in t.borrow().as_bytes() { - assert!(align <= 8); - let length: u64 = bytes.len().try_into().unwrap(); - writer.write_all(bytemuck::cast_slice(std::slice::from_ref(&length))).unwrap(); - writer.write_all(bytes).unwrap(); - let padding: usize = ((8 - (length % 8)) % 8).try_into().unwrap(); - writer.write_all(&[0; 8][..padding]).unwrap(); - } - }, + Column::Typed(t) => Indexed::write(writer, &t.borrow()).unwrap(), Column::Bytes(b) => writer.write_all(b).unwrap(), Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(), } @@ -281,9 +268,11 @@ mod container { use builder::ColumnBuilder; mod builder { - use std::collections::VecDeque; - use columnar::{Columnar, Clear, Len, AsBytes, Push}; + + use columnar::{Columnar, Clear, Len, Push}; + use columnar::bytes::{EncodeDecode, Indexed}; + use super::Column; /// A container builder for `Column`. @@ -303,11 +292,11 @@ mod builder { self.current.push(item); // If there is less than 10% slop with 2MB backing allocations, mint a container. use columnar::Container; - let words = self.current.borrow().length_in_words(); + let words = Indexed::length_in_words(&self.current.borrow()); let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1); if round - words < round / 10 { let mut alloc = Vec::with_capacity(words); - columnar::bytes::serialization::encode(&mut alloc, self.current.borrow().as_bytes()); + Indexed::encode(&mut alloc, &self.current.borrow()); self.pending.push_back(Column::Align(alloc.into_boxed_slice())); self.current.clear(); } @@ -342,9 +331,9 @@ mod builder { fn finish(&mut self) -> Option<&mut Self::Container> { if !self.current.is_empty() { use columnar::Container; - let words = self.current.borrow().length_in_words(); + let words = Indexed::length_in_words(&self.current.borrow()); let mut alloc = Vec::with_capacity(words); - columnar::bytes::serialization::encode(&mut alloc, self.current.borrow().as_bytes()); + Indexed::encode(&mut alloc, &self.current.borrow()); self.pending.push_back(Column::Align(alloc.into_boxed_slice())); self.current.clear(); } @@ -405,7 +394,7 @@ pub mod batcher { } } - impl<'a, D, T, R, C2> PushInto<&'a mut Column<(D, T, R)>> for Chunker + impl<'a, D, T, R, C2> PushInto<&'a mut Column<(D, T, R)>> for Chunker where D: Columnar, for<'b> D::Ref<'b>: Ord + Copy, From 0c5736896da050414db7ce33cf8d946a488b3f46 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 12 Feb 2025 15:07:58 +0100 Subject: [PATCH 3/3] Tack on some auxiliary changes Fix versions in mdbook to precise versions (=), make check against Timely master more robust. Signed-off-by: Moritz Hoffmann --- .github/workflows/deploy.yml | 4 ++-- .github/workflows/test-timely-master.yml | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 5acef5c16..134e49053 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -15,8 +15,8 @@ jobs: run: | export DIFFERENTIAL_VERSION=$(cargo metadata --format-version 1 | jq -r '.packages[] | select(.name == "differential-dataflow") | .version') export TIMELY_VERSION=$(cargo metadata --format-version 1 | jq -r '.packages[] | select(.name == "timely") | .version') - sed -i "s/^differential-dataflow = .*/differential = \"$DIFFERENTIAL_VERSION\"/" mdbook/src/chapter_0/chapter_0_0.md - sed -i "s/^timely = .*/timely = \"$TIMELY_VERSION\"/" mdbook/src/chapter_0/chapter_0_0.md + sed -i "s/^differential-dataflow = .*/differential = \"=$DIFFERENTIAL_VERSION\"/" mdbook/src/chapter_0/chapter_0_0.md + sed -i "s/^timely = .*/timely = \"=$TIMELY_VERSION\"/" mdbook/src/chapter_0/chapter_0_0.md - run: cd mdbook && mdbook build - uses: JamesIves/github-pages-deploy-action@v4 with: diff --git a/.github/workflows/test-timely-master.yml b/.github/workflows/test-timely-master.yml index d198f24f5..cdc04866a 100644 --- a/.github/workflows/test-timely-master.yml +++ b/.github/workflows/test-timely-master.yml @@ -25,10 +25,13 @@ jobs: [patch.crates-io] timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" } EOF + - name: Cargo upgrade + run: | + cargo install cargo-edit + cargo upgrade -p timely --incompatible - name: Cargo check against Timely master run: cargo check --all-targets - name: Cargo test against Timely master - working-directory: timely_master run: cargo test notify: