Skip to content

Commit 2cae3dd

Browse files
authored
Merge branch 'master' into feat/cli-login-logout
2 parents fbc8362 + f8be024 commit 2cae3dd

File tree

75 files changed

+2919
-3700
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

75 files changed

+2919
-3700
lines changed

Cargo.lock

Lines changed: 2 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,6 @@ dirs = "6.0.0"
135135
dlopen2 = "0.8.2"
136136
dotenvy = "0.15.7"
137137
elasticsearch = { version = "9.1.0-alpha.1", features = ["rustls-tls"], default-features = false }
138-
enum_dispatch = "0.3.13"
139138
enumset = "1.1"
140139
env_logger = "0.11.9"
141140
err_trail = { version = "0.11.0", features = ["tracing"] }

DEPENDENCIES.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,6 @@ embedded-io: 0.6.1, "Apache-2.0 OR MIT",
297297
encode_unicode: 1.0.0, "Apache-2.0 OR MIT",
298298
encoding_rs: 0.8.35, "(Apache-2.0 OR MIT) AND BSD-3-Clause",
299299
enum-as-inner: 0.6.1, "Apache-2.0 OR MIT",
300-
enum_dispatch: 0.3.13, "Apache-2.0 OR MIT",
301300
enumset: 1.1.10, "Apache-2.0 OR MIT",
302301
enumset_derive: 0.14.0, "Apache-2.0 OR MIT",
303302
equator: 0.4.2, "MIT",

core/binary_protocol/src/framing.rs

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,19 +48,40 @@ impl<'a> RequestFrame<'a> {
4848
/// Size of the frame header: `[length:4][code:4]`.
4949
pub const HEADER_SIZE: usize = 8;
5050

51+
/// Validate a frame length field and return the payload size.
52+
///
53+
/// Transport layers that read the length and code fields incrementally
54+
/// (e.g. compio completion-based I/O) can use this to validate the length
55+
/// before reading the payload, without buffering the entire frame.
56+
///
57+
/// # Errors
58+
/// Returns `WireError::Validation` if `frame_length < 4` (too small to
59+
/// contain even the command code).
60+
pub fn payload_length(frame_length: u32) -> Result<u32, WireError> {
61+
frame_length
62+
.checked_sub(4)
63+
.ok_or(WireError::Validation(Cow::Borrowed(
64+
"request frame length must be at least 4 (code size)",
65+
)))
66+
}
67+
68+
/// Construct a frame from pre-parsed header fields and a payload slice.
69+
///
70+
/// Used by transport layers that read the header incrementally (e.g.
71+
/// compio completion-based I/O) and then read the payload separately.
72+
#[must_use]
73+
pub const fn from_parts(code: u32, payload: &'a [u8]) -> Self {
74+
Self { code, payload }
75+
}
76+
5177
/// Decode a request frame from a complete buffer.
5278
///
5379
/// # Errors
5480
/// Returns `WireError::UnexpectedEof` if the buffer is too short.
5581
pub fn decode(buf: &'a [u8]) -> Result<(Self, usize), WireError> {
56-
let length = read_u32_le(buf, 0)? as usize;
57-
if length < 4 {
58-
return Err(WireError::Validation(Cow::Borrowed(
59-
"request frame length must be at least 4 (code size)",
60-
)));
61-
}
82+
let frame_length = read_u32_le(buf, 0)?;
83+
let payload_len = Self::payload_length(frame_length)? as usize;
6284
let code = read_u32_le(buf, 4)?;
63-
let payload_len = length - 4;
6485
let payload = read_bytes(buf, Self::HEADER_SIZE, payload_len)?;
6586
let total = Self::HEADER_SIZE + payload_len;
6687
Ok((Self { code, payload }, total))
@@ -222,6 +243,23 @@ mod tests {
222243
assert!(RequestFrame::decode(&buf).is_err());
223244
}
224245

246+
#[test]
247+
fn payload_length_valid() {
248+
assert_eq!(RequestFrame::payload_length(4).unwrap(), 0);
249+
assert_eq!(RequestFrame::payload_length(104).unwrap(), 100);
250+
assert_eq!(
251+
RequestFrame::payload_length(u32::MAX).unwrap(),
252+
u32::MAX - 4
253+
);
254+
}
255+
256+
#[test]
257+
fn payload_length_too_small() {
258+
assert!(RequestFrame::payload_length(0).is_err());
259+
assert!(RequestFrame::payload_length(1).is_err());
260+
assert!(RequestFrame::payload_length(3).is_err());
261+
}
262+
225263
#[test]
226264
fn request_encoded_size() {
227265
assert_eq!(RequestFrame::encoded_size(0), Some(8));

core/binary_protocol/src/requests/system/get_snapshot.rs

Lines changed: 87 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,57 @@
1616
// under the License.
1717

1818
use crate::WireError;
19-
use crate::codec::{WireDecode, WireEncode};
20-
use bytes::BytesMut;
19+
use crate::codec::{WireDecode, WireEncode, read_u8};
20+
use bytes::{BufMut, BytesMut};
2121

22-
/// `GetSnapshot` request. Wire format: empty.
22+
/// `GetSnapshot` request.
23+
///
24+
/// Wire format:
25+
/// ```text
26+
/// [compression:1][types_count:1][snapshot_type:1]*
27+
/// ```
2328
#[derive(Debug, Clone, PartialEq, Eq)]
24-
pub struct GetSnapshotRequest;
29+
pub struct GetSnapshotRequest {
30+
/// Compression method code (1=Stored, 2=Deflated, 3=Bzip2, 4=Zstd, 5=Lzma, 6=Xz).
31+
pub compression: u8,
32+
/// Snapshot type codes (1=FilesystemOverview, 2=ProcessList, 3=ResourceUsage,
33+
/// 4=Test, 5=ServerLogs, 6=ServerConfig, 100=All).
34+
pub snapshot_types: Vec<u8>,
35+
}
2536

2637
impl WireEncode for GetSnapshotRequest {
2738
fn encoded_size(&self) -> usize {
28-
0
39+
1 + 1 + self.snapshot_types.len()
2940
}
3041

31-
fn encode(&self, _buf: &mut BytesMut) {}
42+
fn encode(&self, buf: &mut BytesMut) {
43+
buf.put_u8(self.compression);
44+
let count =
45+
u8::try_from(self.snapshot_types.len()).expect("snapshot_types count exceeds u8::MAX");
46+
buf.put_u8(count);
47+
for &code in &self.snapshot_types {
48+
buf.put_u8(code);
49+
}
50+
}
3251
}
3352

3453
impl WireDecode for GetSnapshotRequest {
35-
fn decode(_buf: &[u8]) -> Result<(Self, usize), WireError> {
36-
Ok((Self, 0))
54+
fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
55+
let compression = read_u8(buf, 0)?;
56+
let types_count = read_u8(buf, 1)? as usize;
57+
let mut pos = 2;
58+
let mut snapshot_types = Vec::with_capacity(types_count);
59+
for _ in 0..types_count {
60+
snapshot_types.push(read_u8(buf, pos)?);
61+
pos += 1;
62+
}
63+
Ok((
64+
Self {
65+
compression,
66+
snapshot_types,
67+
},
68+
pos,
69+
))
3770
}
3871
}
3972

@@ -43,11 +76,54 @@ mod tests {
4376

4477
#[test]
4578
fn roundtrip() {
46-
let req = GetSnapshotRequest;
79+
let req = GetSnapshotRequest {
80+
compression: 2,
81+
snapshot_types: vec![1, 5],
82+
};
83+
let bytes = req.to_bytes();
84+
assert_eq!(bytes.len(), 4); // 1 + 1 + 2
85+
let (decoded, consumed) = GetSnapshotRequest::decode(&bytes).unwrap();
86+
assert_eq!(consumed, bytes.len());
87+
assert_eq!(decoded, req);
88+
}
89+
90+
#[test]
91+
fn roundtrip_empty_types() {
92+
let req = GetSnapshotRequest {
93+
compression: 1,
94+
snapshot_types: vec![],
95+
};
96+
let bytes = req.to_bytes();
97+
assert_eq!(bytes.len(), 2);
98+
let (decoded, consumed) = GetSnapshotRequest::decode(&bytes).unwrap();
99+
assert_eq!(consumed, 2);
100+
assert_eq!(decoded, req);
101+
}
102+
103+
#[test]
104+
fn roundtrip_all_type() {
105+
let req = GetSnapshotRequest {
106+
compression: 2,
107+
snapshot_types: vec![100],
108+
};
47109
let bytes = req.to_bytes();
48-
assert!(bytes.is_empty());
49110
let (decoded, consumed) = GetSnapshotRequest::decode(&bytes).unwrap();
50-
assert_eq!(consumed, 0);
111+
assert_eq!(consumed, bytes.len());
51112
assert_eq!(decoded, req);
52113
}
114+
115+
#[test]
116+
fn truncated_returns_error() {
117+
let req = GetSnapshotRequest {
118+
compression: 2,
119+
snapshot_types: vec![1, 2, 3],
120+
};
121+
let bytes = req.to_bytes();
122+
for i in 0..bytes.len() {
123+
assert!(
124+
GetSnapshotRequest::decode(&bytes[..i]).is_err(),
125+
"expected error for truncation at byte {i}"
126+
);
127+
}
128+
}
53129
}

core/binary_protocol/src/requests/users/create_user.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,10 @@ impl WireEncode for CreateUserRequest {
4242
+ self.password.len()
4343
+ 1 // status
4444
+ 1 // has_permissions
45-
+ 4 // permissions_len (always present)
46-
+ self.permissions.as_ref().map_or(0, WireEncode::encoded_size)
45+
+ self
46+
.permissions
47+
.as_ref()
48+
.map_or(0, |p| 4 + p.encoded_size())
4749
}
4850

4951
fn encode(&self, buf: &mut BytesMut) {
@@ -60,7 +62,6 @@ impl WireEncode for CreateUserRequest {
6062
buf.put_slice(&perm_bytes);
6163
} else {
6264
buf.put_u8(0);
63-
buf.put_u32_le(0);
6465
}
6566
}
6667
}
@@ -80,10 +81,9 @@ impl WireDecode for CreateUserRequest {
8081
let has_permissions = read_u8(buf, pos)?;
8182
pos += 1;
8283

83-
let perm_len = read_u32_le(buf, pos)? as usize;
84-
pos += 4;
85-
86-
let permissions = if has_permissions == 1 && perm_len > 0 {
84+
let permissions = if has_permissions == 1 {
85+
let perm_len = read_u32_le(buf, pos)? as usize;
86+
pos += 4;
8787
let (perms, consumed) = WirePermissions::decode(&buf[pos..])?;
8888
if consumed != perm_len {
8989
return Err(WireError::Validation(Cow::Owned(format!(
@@ -197,8 +197,8 @@ mod tests {
197197
};
198198
let bytes = req.to_bytes();
199199
// username: [1, b'u'] + password: [1, b'p'] + status: [0]
200-
// + has_perm: [0] + perm_len: [0,0,0,0]
201-
let expected: &[u8] = &[1, b'u', 1, b'p', 0, 0, 0, 0, 0, 0];
200+
// + has_perm: [0]
201+
let expected: &[u8] = &[1, b'u', 1, b'p', 0, 0];
202202
assert_eq!(&bytes[..], expected);
203203
}
204204
}

core/binary_protocol/src/requests/users/update_permissions.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@ impl WireEncode for UpdatePermissionsRequest {
3636
fn encoded_size(&self) -> usize {
3737
self.user_id.encoded_size()
3838
+ 1 // has_permissions
39-
+ 4 // permissions_len (always present)
40-
+ self.permissions.as_ref().map_or(0, WireEncode::encoded_size)
39+
+ self
40+
.permissions
41+
.as_ref()
42+
.map_or(0, |p| 4 + p.encoded_size())
4143
}
4244

4345
fn encode(&self, buf: &mut BytesMut) {
@@ -50,7 +52,6 @@ impl WireEncode for UpdatePermissionsRequest {
5052
buf.put_slice(&perm_bytes);
5153
} else {
5254
buf.put_u8(0);
53-
buf.put_u32_le(0);
5455
}
5556
}
5657
}
@@ -62,10 +63,9 @@ impl WireDecode for UpdatePermissionsRequest {
6263
let has_permissions = read_u8(buf, pos)?;
6364
pos += 1;
6465

65-
let perm_len = read_u32_le(buf, pos)? as usize;
66-
pos += 4;
67-
68-
let permissions = if has_permissions == 1 && perm_len > 0 {
66+
let permissions = if has_permissions == 1 {
67+
let perm_len = read_u32_le(buf, pos)? as usize;
68+
pos += 4;
6969
let (perms, consumed) = WirePermissions::decode(&buf[pos..])?;
7070
if consumed != perm_len {
7171
return Err(WireError::Validation(Cow::Owned(format!(

core/integration/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ harness_derive = { workspace = true }
4242
humantime = { workspace = true }
4343
iggy = { workspace = true }
4444
iggy-cli = { workspace = true }
45+
iggy_binary_protocol = { workspace = true }
4546
iggy_common = { workspace = true }
4647
iggy_connector_sdk = { workspace = true, features = ["api"] }
4748
keyring = { workspace = true }
@@ -68,7 +69,6 @@ serial_test = { workspace = true }
6869
server = { workspace = true }
6970
socket2 = { workspace = true }
7071
sqlx = { workspace = true }
71-
strum = { workspace = true }
7272
sysinfo = { workspace = true }
7373
tempfile = { workspace = true }
7474
test-case = { workspace = true }

0 commit comments

Comments
 (0)