Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion .github/workflows/_test_examples.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
uses: ./.github/actions/utils/setup-rust-with-cache
with:
cache-targets: false # Only cache registry and git deps, not target dir (sccache handles that)
save-cache: "false" # Only builds server+examples, let Rust test job save comprehensive cache
save-cache: "false" # Only builds server+examples, let Rust test job save comprehensive cache

- name: Setup Node with cache for examples
if: inputs.component == 'examples-suite'
Expand Down Expand Up @@ -152,6 +152,15 @@ jobs:
if: inputs.component == 'examples-suite' && inputs.task == 'examples-node'
run: |
echo "Running Node.js examples tests..."
# Build the local Node SDK first (examples use file: link to it)
cd foreign/node
npm ci
npm run build
cd ../..
# Install examples dependencies (will use the local SDK)
cd examples/node
npm ci
cd ../..
./scripts/run-node-examples-from-readme.sh

- name: Run Java examples
Expand Down
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,9 @@ humantime = "2.3.0"
hwlocality = "1.0.0-alpha.11"
iceberg = "0.8.0"
iceberg-catalog-rest = "0.8.0"
iggy = { path = "core/sdk", version = "0.8.2-edge.2" }
iggy_binary_protocol = { path = "core/binary_protocol", version = "0.8.2-edge.2" }
iggy_common = { path = "core/common", version = "0.8.2-edge.2" }
iggy = { path = "core/sdk", version = "0.8.3-edge.1" }
iggy_binary_protocol = { path = "core/binary_protocol", version = "0.8.3-edge.1" }
iggy_common = { path = "core/common", version = "0.8.3-edge.1" }
iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.1.3-edge.1" }
integration = { path = "core/integration" }
journal = { path = "core/journal" }
Expand Down
14 changes: 7 additions & 7 deletions DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -386,14 +386,14 @@ icu_provider: 2.1.1, "Unicode-3.0",
ident_case: 1.0.1, "Apache-2.0 OR MIT",
idna: 1.1.0, "Apache-2.0 OR MIT",
idna_adapter: 1.2.1, "Apache-2.0 OR MIT",
iggy: 0.8.2-edge.2, "Apache-2.0",
iggy-bench: 0.3.2-edge.1, "Apache-2.0",
iggy: 0.8.3-edge.1, "Apache-2.0",
iggy-bench: 0.3.3-edge.1, "Apache-2.0",
iggy-bench-dashboard-server: 0.5.1-edge.1, "Apache-2.0",
iggy-cli: 0.10.2-edge.1, "Apache-2.0",
iggy-cli: 0.10.3-edge.1, "Apache-2.0",
iggy-connectors: 0.2.3-edge.1, "Apache-2.0",
iggy-mcp: 0.2.2-edge.1, "Apache-2.0",
iggy_binary_protocol: 0.8.2-edge.2, "Apache-2.0",
iggy_common: 0.8.2-edge.2, "Apache-2.0",
iggy-mcp: 0.2.3-edge.1, "Apache-2.0",
iggy_binary_protocol: 0.8.3-edge.1, "Apache-2.0",
iggy_common: 0.8.3-edge.1, "Apache-2.0",
iggy_connector_elasticsearch_sink: 0.2.2-edge.1, "Apache-2.0",
iggy_connector_elasticsearch_source: 0.2.2-edge.1, "Apache-2.0",
iggy_connector_iceberg_sink: 0.2.2-edge.1, "Apache-2.0",
Expand Down Expand Up @@ -720,7 +720,7 @@ serde_with_macros: 3.16.1, "Apache-2.0 OR MIT",
serde_yaml_ng: 0.10.0, "MIT",
serial_test: 3.3.1, "MIT",
serial_test_derive: 3.3.1, "MIT",
server: 0.6.2-edge.1, "Apache-2.0",
server: 0.6.3-edge.1, "Apache-2.0",
sha1: 0.10.6, "Apache-2.0 OR MIT",
sha2: 0.10.9, "Apache-2.0 OR MIT",
sha3: 0.10.8, "Apache-2.0 OR MIT",
Expand Down
2 changes: 1 addition & 1 deletion core/ai/mcp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

[package]
name = "iggy-mcp"
version = "0.2.2-edge.1"
version = "0.2.3-edge.1"
description = "MCP Server for Iggy message streaming platform"
edition = "2024"
license = "Apache-2.0"
Expand Down
2 changes: 1 addition & 1 deletion core/bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

[package]
name = "iggy-bench"
version = "0.3.2-edge.1"
version = "0.3.3-edge.1"
edition = "2024"
license = "Apache-2.0"
repository = "https://github.com/apache/iggy"
Expand Down
2 changes: 1 addition & 1 deletion core/binary_protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

[package]
name = "iggy_binary_protocol"
version = "0.8.2-edge.2"
version = "0.8.3-edge.1"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2024"
license = "Apache-2.0"
Expand Down
2 changes: 1 addition & 1 deletion core/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

[package]
name = "iggy-cli"
version = "0.10.2-edge.1"
version = "0.10.3-edge.1"
edition = "2024"
authors = ["bartosz.ciesla@gmail.com"]
repository = "https://github.com/apache/iggy"
Expand Down
2 changes: 1 addition & 1 deletion core/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
[package]
name = "iggy_common"
version = "0.8.2-edge.2"
version = "0.8.3-edge.1"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2024"
license = "Apache-2.0"
Expand Down
2 changes: 2 additions & 0 deletions core/common/src/error/iggy_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,8 @@ pub enum IggyError {
ProducerClosed = 4057,
#[error("Invalid offset: {0}")]
InvalidOffset(u64) = 4100,
#[error("Invalid reserved field value: {0}, expected: 0")]
InvalidReservedField(u64) = 4101,
#[error("Consumer group with ID: {0} for topic with ID: {1} was not found.")]
ConsumerGroupIdNotFound(Identifier, Identifier) = 5000,
#[error("Invalid consumer group ID")]
Expand Down
1 change: 1 addition & 0 deletions core/common/src/types/message/iggy_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ impl IggyMessage {
origin_timestamp: IggyTimestamp::now().as_micros(),
user_headers_length,
payload_length: payload.len() as u32,
reserved: 0,
};

let user_headers = user_headers.map(|h| h.to_bytes());
Expand Down
142 changes: 141 additions & 1 deletion core/common/src/types/message/message_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use bytes::{BufMut, Bytes, BytesMut};
use serde::{Deserialize, Serialize};
use std::ops::Range;

pub const IGGY_MESSAGE_HEADER_SIZE: usize = 8 + 16 + 8 + 8 + 8 + 4 + 4;
pub const IGGY_MESSAGE_HEADER_SIZE: usize = 8 + 16 + 8 + 8 + 8 + 4 + 4 + 8;
pub const IGGY_MESSAGE_HEADER_RANGE: Range<usize> = 0..IGGY_MESSAGE_HEADER_SIZE;

pub const IGGY_MESSAGE_CHECKSUM_OFFSET_RANGE: Range<usize> = 0..8;
Expand All @@ -31,6 +31,7 @@ pub const IGGY_MESSAGE_TIMESTAMP_OFFSET_RANGE: Range<usize> = 32..40;
pub const IGGY_MESSAGE_ORIGIN_TIMESTAMP_OFFSET_RANGE: Range<usize> = 40..48;
pub const IGGY_MESSAGE_HEADERS_LENGTH_OFFSET_RANGE: Range<usize> = 48..52;
pub const IGGY_MESSAGE_PAYLOAD_LENGTH_OFFSET_RANGE: Range<usize> = 52..56;
pub const IGGY_MESSAGE_RESERVED_OFFSET_RANGE: Range<usize> = 56..64;

#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct IggyMessageHeader {
Expand All @@ -41,6 +42,8 @@ pub struct IggyMessageHeader {
pub origin_timestamp: u64,
pub user_headers_length: u32,
pub payload_length: u32,
// Reserved for future use
pub reserved: u64,
}

impl Sizeable for IggyMessageHeader {
Expand Down Expand Up @@ -91,6 +94,17 @@ impl IggyMessageHeader {
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
),
reserved: {
let reserved = u64::from_le_bytes(
bytes[IGGY_MESSAGE_RESERVED_OFFSET_RANGE]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
if reserved != 0 {
return Err(IggyError::InvalidReservedField(reserved));
}
reserved
},
})
}
}
Expand All @@ -105,6 +119,7 @@ impl BytesSerializable for IggyMessageHeader {
bytes.put_u64_le(self.origin_timestamp);
bytes.put_u32_le(self.user_headers_length);
bytes.put_u32_le(self.payload_length);
bytes.put_u64_le(self.reserved);
bytes.freeze()
}

Expand Down Expand Up @@ -155,6 +170,16 @@ impl BytesSerializable for IggyMessageHeader {
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);

let reserved = u64::from_le_bytes(
bytes[IGGY_MESSAGE_RESERVED_OFFSET_RANGE]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);

if reserved != 0 {
return Err(IggyError::InvalidReservedField(reserved));
}

Ok(IggyMessageHeader {
checksum,
id,
Expand All @@ -163,6 +188,121 @@ impl BytesSerializable for IggyMessageHeader {
origin_timestamp,
user_headers_length: headers_length,
payload_length,
reserved,
})
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn should_serialize_and_deserialize_header() {
let header = IggyMessageHeader {
checksum: 123456789,
id: 987654321,
offset: 100,
timestamp: 1000000,
origin_timestamp: 999999,
user_headers_length: 50,
payload_length: 200,
reserved: 0,
};

let bytes = header.to_bytes();
assert_eq!(bytes.len(), IGGY_MESSAGE_HEADER_SIZE);

let deserialized = IggyMessageHeader::from_bytes(bytes).unwrap();
assert_eq!(header, deserialized);
}

#[test]
fn should_serialize_header_to_correct_size() {
let header = IggyMessageHeader::default();
let bytes = header.to_bytes();
assert_eq!(bytes.len(), IGGY_MESSAGE_HEADER_SIZE);
assert_eq!(bytes.len(), 64);
}

#[test]
fn should_fail_to_deserialize_invalid_size() {
let bytes = Bytes::from(vec![0u8; 56]);
let result = IggyMessageHeader::from_bytes(bytes);
assert!(result.is_err());
}

#[test]
fn should_deserialize_from_raw_bytes() {
let header = IggyMessageHeader {
checksum: 111,
id: 222,
offset: 333,
timestamp: 444,
origin_timestamp: 555,
user_headers_length: 66,
payload_length: 77,
reserved: 0,
};

let bytes = header.to_bytes();
let deserialized = IggyMessageHeader::from_raw_bytes(&bytes).unwrap();
assert_eq!(header.checksum, deserialized.checksum);
assert_eq!(header.id, deserialized.id);
assert_eq!(header.offset, deserialized.offset);
assert_eq!(header.timestamp, deserialized.timestamp);
assert_eq!(header.origin_timestamp, deserialized.origin_timestamp);
assert_eq!(header.user_headers_length, deserialized.user_headers_length);
assert_eq!(header.payload_length, deserialized.payload_length);
}

#[test]
fn should_reject_non_zero_reserved_field_from_bytes() {
let header = IggyMessageHeader {
checksum: 123456789,
id: 987654321,
offset: 100,
timestamp: 1000000,
origin_timestamp: 999999,
user_headers_length: 50,
payload_length: 200,
reserved: 0,
};

let mut bytes = header.to_bytes().to_vec();
let non_zero_reserved: u64 = 42;
bytes[IGGY_MESSAGE_RESERVED_OFFSET_RANGE].copy_from_slice(&non_zero_reserved.to_le_bytes());

let result = IggyMessageHeader::from_bytes(Bytes::from(bytes));
assert!(result.is_err());
assert_eq!(
result.unwrap_err(),
IggyError::InvalidReservedField(non_zero_reserved)
);
}

#[test]
fn should_reject_non_zero_reserved_field_from_raw_bytes() {
let header = IggyMessageHeader {
checksum: 111,
id: 222,
offset: 333,
timestamp: 444,
origin_timestamp: 555,
user_headers_length: 66,
payload_length: 77,
reserved: 0,
};

let mut bytes = header.to_bytes().to_vec();
let non_zero_reserved: u64 = 123456789;
bytes[IGGY_MESSAGE_RESERVED_OFFSET_RANGE].copy_from_slice(&non_zero_reserved.to_le_bytes());

let result = IggyMessageHeader::from_raw_bytes(&bytes);
assert!(result.is_err());
assert_eq!(
result.unwrap_err(),
IggyError::InvalidReservedField(non_zero_reserved)
);
}
}
Loading
Loading