Skip to content

Commit c19a600

Browse files
authored
feat(rust): add sans-IO frame codec and command dispatch table (#2967)
1 parent 5cc7c37 commit c19a600

File tree

22 files changed

+1261
-158
lines changed

22 files changed

+1261
-158
lines changed

core/binary_protocol/src/codec.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,32 @@ pub fn read_u8(buf: &[u8], offset: usize) -> Result<u8, WireError> {
7575
})
7676
}
7777

78+
/// Helper to read a `u16` LE from `buf` at `offset`.
79+
///
80+
/// # Errors
81+
/// Returns `WireError::UnexpectedEof` if fewer than 2 bytes remain.
82+
#[allow(clippy::missing_panics_doc)]
83+
#[inline]
84+
pub fn read_u16_le(buf: &[u8], offset: usize) -> Result<u16, WireError> {
85+
let end = offset
86+
.checked_add(2)
87+
.ok_or_else(|| WireError::UnexpectedEof {
88+
offset,
89+
need: 2,
90+
have: buf.len().saturating_sub(offset),
91+
})?;
92+
let slice = buf
93+
.get(offset..end)
94+
.ok_or_else(|| WireError::UnexpectedEof {
95+
offset,
96+
need: 2,
97+
have: buf.len().saturating_sub(offset),
98+
})?;
99+
Ok(u16::from_le_bytes(
100+
slice.try_into().expect("slice is exactly 2 bytes"),
101+
))
102+
}
103+
78104
/// Helper to read a `u32` LE from `buf` at `offset`.
79105
///
80106
/// # Errors
@@ -224,3 +250,35 @@ pub fn read_bytes(buf: &[u8], offset: usize, len: usize) -> Result<&[u8], WireEr
224250
have: buf.len().saturating_sub(offset),
225251
})
226252
}
253+
254+
/// Cap a pre-allocation hint so a bogus wire count cannot cause OOM.
255+
/// The actual count is validated by the decode loop - this only limits
256+
/// the upfront allocation.
257+
#[inline]
258+
#[must_use]
259+
pub fn capped_capacity(count: usize, remaining: usize, min_item_size: usize) -> usize {
260+
if min_item_size == 0 {
261+
return count;
262+
}
263+
count.min(remaining / min_item_size)
264+
}
265+
266+
#[cfg(test)]
267+
mod tests {
268+
use super::*;
269+
270+
#[test]
271+
fn capped_capacity_limits_allocation() {
272+
assert_eq!(capped_capacity(1_000_000, 100, 10), 10);
273+
assert_eq!(capped_capacity(5, 100, 10), 5);
274+
assert_eq!(capped_capacity(10, 100, 10), 10);
275+
assert_eq!(capped_capacity(11, 100, 10), 10);
276+
assert_eq!(capped_capacity(0, 100, 10), 0);
277+
assert_eq!(capped_capacity(100, 0, 10), 0);
278+
}
279+
280+
#[test]
281+
fn capped_capacity_zero_item_size_returns_count() {
282+
assert_eq!(capped_capacity(1_000_000, 100, 0), 1_000_000);
283+
}
284+
}

core/binary_protocol/src/codes.rs

Lines changed: 5 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -87,58 +87,14 @@ pub const DELETE_CONSUMER_GROUP_CODE: u32 = 603;
8787
pub const JOIN_CONSUMER_GROUP_CODE: u32 = 604;
8888
pub const LEAVE_CONSUMER_GROUP_CODE: u32 = 605;
8989

90+
/// Lookup the human-readable name for a command code.
91+
///
9092
/// # Errors
9193
/// Returns `WireError::UnknownCommand` if the code is not recognized.
9294
pub const fn command_name(code: u32) -> Result<&'static str, WireError> {
93-
match code {
94-
PING_CODE => Ok("ping"),
95-
GET_STATS_CODE => Ok("stats"),
96-
GET_SNAPSHOT_FILE_CODE => Ok("snapshot"),
97-
GET_CLUSTER_METADATA_CODE => Ok("cluster.metadata"),
98-
GET_ME_CODE => Ok("me"),
99-
GET_CLIENT_CODE => Ok("client.get"),
100-
GET_CLIENTS_CODE => Ok("client.list"),
101-
GET_USER_CODE => Ok("user.get"),
102-
GET_USERS_CODE => Ok("user.list"),
103-
CREATE_USER_CODE => Ok("user.create"),
104-
DELETE_USER_CODE => Ok("user.delete"),
105-
UPDATE_USER_CODE => Ok("user.update"),
106-
UPDATE_PERMISSIONS_CODE => Ok("user.permissions"),
107-
CHANGE_PASSWORD_CODE => Ok("user.password"),
108-
LOGIN_USER_CODE => Ok("user.login"),
109-
LOGOUT_USER_CODE => Ok("user.logout"),
110-
GET_PERSONAL_ACCESS_TOKENS_CODE => Ok("personal_access_token.list"),
111-
CREATE_PERSONAL_ACCESS_TOKEN_CODE => Ok("personal_access_token.create"),
112-
DELETE_PERSONAL_ACCESS_TOKEN_CODE => Ok("personal_access_token.delete"),
113-
LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE => Ok("personal_access_token.login"),
114-
POLL_MESSAGES_CODE => Ok("message.poll"),
115-
SEND_MESSAGES_CODE => Ok("message.send"),
116-
FLUSH_UNSAVED_BUFFER_CODE => Ok("message.flush_unsaved_buffer"),
117-
GET_CONSUMER_OFFSET_CODE => Ok("consumer_offset.get"),
118-
STORE_CONSUMER_OFFSET_CODE => Ok("consumer_offset.store"),
119-
DELETE_CONSUMER_OFFSET_CODE => Ok("consumer_offset.delete"),
120-
GET_STREAM_CODE => Ok("stream.get"),
121-
GET_STREAMS_CODE => Ok("stream.list"),
122-
CREATE_STREAM_CODE => Ok("stream.create"),
123-
DELETE_STREAM_CODE => Ok("stream.delete"),
124-
UPDATE_STREAM_CODE => Ok("stream.update"),
125-
PURGE_STREAM_CODE => Ok("stream.purge"),
126-
GET_TOPIC_CODE => Ok("topic.get"),
127-
GET_TOPICS_CODE => Ok("topic.list"),
128-
CREATE_TOPIC_CODE => Ok("topic.create"),
129-
DELETE_TOPIC_CODE => Ok("topic.delete"),
130-
UPDATE_TOPIC_CODE => Ok("topic.update"),
131-
PURGE_TOPIC_CODE => Ok("topic.purge"),
132-
CREATE_PARTITIONS_CODE => Ok("partition.create"),
133-
DELETE_PARTITIONS_CODE => Ok("partition.delete"),
134-
DELETE_SEGMENTS_CODE => Ok("segment.delete"),
135-
GET_CONSUMER_GROUP_CODE => Ok("consumer_group.get"),
136-
GET_CONSUMER_GROUPS_CODE => Ok("consumer_group.list"),
137-
CREATE_CONSUMER_GROUP_CODE => Ok("consumer_group.create"),
138-
DELETE_CONSUMER_GROUP_CODE => Ok("consumer_group.delete"),
139-
JOIN_CONSUMER_GROUP_CODE => Ok("consumer_group.join"),
140-
LEAVE_CONSUMER_GROUP_CODE => Ok("consumer_group.leave"),
141-
_ => Err(WireError::UnknownCommand(code)),
95+
match crate::dispatch::lookup_command(code) {
96+
Some(meta) => Ok(meta.name),
97+
None => Err(WireError::UnknownCommand(code)),
14298
}
14399
}
144100

core/binary_protocol/src/consensus/operation.rs

Lines changed: 37 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -95,64 +95,48 @@ impl Operation {
9595
}
9696

9797
/// Bidirectional mapping: `Operation` -> client command code.
98+
///
99+
/// Delegates to the dispatch table as the single source of truth.
98100
#[must_use]
99101
pub const fn to_command_code(&self) -> Option<u32> {
100-
use crate::codes;
101102
match self {
102103
Self::Reserved => None,
103-
Self::CreateStream => Some(codes::CREATE_STREAM_CODE),
104-
Self::UpdateStream => Some(codes::UPDATE_STREAM_CODE),
105-
Self::DeleteStream => Some(codes::DELETE_STREAM_CODE),
106-
Self::PurgeStream => Some(codes::PURGE_STREAM_CODE),
107-
Self::CreateTopic => Some(codes::CREATE_TOPIC_CODE),
108-
Self::UpdateTopic => Some(codes::UPDATE_TOPIC_CODE),
109-
Self::DeleteTopic => Some(codes::DELETE_TOPIC_CODE),
110-
Self::PurgeTopic => Some(codes::PURGE_TOPIC_CODE),
111-
Self::CreatePartitions => Some(codes::CREATE_PARTITIONS_CODE),
112-
Self::DeletePartitions => Some(codes::DELETE_PARTITIONS_CODE),
113-
Self::DeleteSegments => Some(codes::DELETE_SEGMENTS_CODE),
114-
Self::CreateConsumerGroup => Some(codes::CREATE_CONSUMER_GROUP_CODE),
115-
Self::DeleteConsumerGroup => Some(codes::DELETE_CONSUMER_GROUP_CODE),
116-
Self::CreateUser => Some(codes::CREATE_USER_CODE),
117-
Self::UpdateUser => Some(codes::UPDATE_USER_CODE),
118-
Self::DeleteUser => Some(codes::DELETE_USER_CODE),
119-
Self::ChangePassword => Some(codes::CHANGE_PASSWORD_CODE),
120-
Self::UpdatePermissions => Some(codes::UPDATE_PERMISSIONS_CODE),
121-
Self::CreatePersonalAccessToken => Some(codes::CREATE_PERSONAL_ACCESS_TOKEN_CODE),
122-
Self::DeletePersonalAccessToken => Some(codes::DELETE_PERSONAL_ACCESS_TOKEN_CODE),
123-
Self::SendMessages => Some(codes::SEND_MESSAGES_CODE),
124-
Self::StoreConsumerOffset => Some(codes::STORE_CONSUMER_OFFSET_CODE),
104+
Self::CreateStream
105+
| Self::UpdateStream
106+
| Self::DeleteStream
107+
| Self::PurgeStream
108+
| Self::CreateTopic
109+
| Self::UpdateTopic
110+
| Self::DeleteTopic
111+
| Self::PurgeTopic
112+
| Self::CreatePartitions
113+
| Self::DeletePartitions
114+
| Self::DeleteSegments
115+
| Self::CreateConsumerGroup
116+
| Self::DeleteConsumerGroup
117+
| Self::CreateUser
118+
| Self::UpdateUser
119+
| Self::DeleteUser
120+
| Self::ChangePassword
121+
| Self::UpdatePermissions
122+
| Self::CreatePersonalAccessToken
123+
| Self::DeletePersonalAccessToken
124+
| Self::SendMessages
125+
| Self::StoreConsumerOffset => match crate::dispatch::lookup_by_operation(*self) {
126+
Some(meta) => Some(meta.code),
127+
None => None,
128+
},
125129
}
126130
}
127131

128132
/// Bidirectional mapping: client command code -> `Operation`.
133+
///
134+
/// Delegates to the dispatch table as the single source of truth.
129135
#[must_use]
130136
pub const fn from_command_code(code: u32) -> Option<Self> {
131-
use crate::codes;
132-
match code {
133-
codes::CREATE_STREAM_CODE => Some(Self::CreateStream),
134-
codes::UPDATE_STREAM_CODE => Some(Self::UpdateStream),
135-
codes::DELETE_STREAM_CODE => Some(Self::DeleteStream),
136-
codes::PURGE_STREAM_CODE => Some(Self::PurgeStream),
137-
codes::CREATE_TOPIC_CODE => Some(Self::CreateTopic),
138-
codes::UPDATE_TOPIC_CODE => Some(Self::UpdateTopic),
139-
codes::DELETE_TOPIC_CODE => Some(Self::DeleteTopic),
140-
codes::PURGE_TOPIC_CODE => Some(Self::PurgeTopic),
141-
codes::CREATE_PARTITIONS_CODE => Some(Self::CreatePartitions),
142-
codes::DELETE_PARTITIONS_CODE => Some(Self::DeletePartitions),
143-
codes::DELETE_SEGMENTS_CODE => Some(Self::DeleteSegments),
144-
codes::CREATE_CONSUMER_GROUP_CODE => Some(Self::CreateConsumerGroup),
145-
codes::DELETE_CONSUMER_GROUP_CODE => Some(Self::DeleteConsumerGroup),
146-
codes::CREATE_USER_CODE => Some(Self::CreateUser),
147-
codes::UPDATE_USER_CODE => Some(Self::UpdateUser),
148-
codes::DELETE_USER_CODE => Some(Self::DeleteUser),
149-
codes::CHANGE_PASSWORD_CODE => Some(Self::ChangePassword),
150-
codes::UPDATE_PERMISSIONS_CODE => Some(Self::UpdatePermissions),
151-
codes::CREATE_PERSONAL_ACCESS_TOKEN_CODE => Some(Self::CreatePersonalAccessToken),
152-
codes::DELETE_PERSONAL_ACCESS_TOKEN_CODE => Some(Self::DeletePersonalAccessToken),
153-
codes::SEND_MESSAGES_CODE => Some(Self::SendMessages),
154-
codes::STORE_CONSUMER_OFFSET_CODE => Some(Self::StoreConsumerOffset),
155-
_ => None,
137+
match crate::dispatch::lookup_command(code) {
138+
Some(meta) => meta.operation,
139+
None => None,
156140
}
157141
}
158142
}
@@ -204,10 +188,11 @@ mod tests {
204188

205189
#[test]
206190
fn read_only_commands_have_no_operation() {
207-
assert!(Operation::from_command_code(crate::PING_CODE).is_none());
208-
assert!(Operation::from_command_code(crate::GET_STATS_CODE).is_none());
209-
assert!(Operation::from_command_code(crate::GET_STREAM_CODE).is_none());
210-
assert!(Operation::from_command_code(crate::POLL_MESSAGES_CODE).is_none());
191+
use crate::codes::{GET_STATS_CODE, GET_STREAM_CODE, PING_CODE, POLL_MESSAGES_CODE};
192+
assert!(Operation::from_command_code(PING_CODE).is_none());
193+
assert!(Operation::from_command_code(GET_STATS_CODE).is_none());
194+
assert!(Operation::from_command_code(GET_STREAM_CODE).is_none());
195+
assert!(Operation::from_command_code(POLL_MESSAGES_CODE).is_none());
211196
}
212197

213198
#[test]

0 commit comments

Comments
 (0)