Skip to content

Commit c5e55c9

Browse files
authored
Add "ring" queue mode for ROS2 payload and serde_bytes (#914)
1 parent 469151f commit c5e55c9

File tree

5 files changed

+88
-20
lines changed

5 files changed

+88
-20
lines changed

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,9 @@ serde = { version = "1.0", default-features = false, features = [
180180
"derive",
181181
"alloc",
182182
] }
183+
serde_bytes = { version = "0.11", default-features = false, features = [
184+
"alloc",
185+
] }
183186
serde_json = { version = "1.0", default-features = false }
184187
erased-serde = { version = "0.4", default-features = false, features = [
185188
"alloc",

components/bridges/cu_ros2_bridge/README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ Bridge-level config:
1414
Per-channel config (`bridges[].channels`):
1515
- `route`: ROS 2 topic path used by that channel.
1616

17+
Rx-only options:
18+
- `queue_mode`: `"fifo"` (default) First In First Out mode. Delivers samples in order, one per cycle. `"ring"` creates a buffer and drops oldest sample when full.
19+
- `ring_size`: depth of the ring buffer when `queue_mode` is `"ring"` (default `1`).
20+
1721
Example:
1822

1923
```ron
@@ -28,7 +32,10 @@ bridges: [
2832
},
2933
channels: [
3034
Tx(id: "outgoing", route: "/output"),
35+
// fifo: ordered delivery, one sample per cycle (default)
3136
Rx(id: "incoming", route: "/input"),
37+
// ring: always receive the freshest sample
38+
Rx(id: "sensor", route: "/sensor", config: {"queue_mode": "ring"}),
3239
],
3340
),
3441
],

components/bridges/cu_ros2_bridge/src/lib.rs

Lines changed: 75 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -134,14 +134,49 @@ where
134134
payload_registry().register::<Payload>();
135135
}
136136

137+
/// Per-channel Rx queue strategy, configured via `queue_mode` in the channel config.
138+
/// - `"fifo"` (default): ordered delivery, one sample per cycle.
139+
/// - `"ring"`: creates a buffer and drops the oldest sample when full; set `ring_size` (default 1) for buffer depth.
137140
#[derive(Debug, Clone)]
138-
struct Ros2ChannelConfig<Id: Copy> {
141+
enum RxQueueConfig {
142+
Fifo,
143+
Ring { size: usize },
144+
}
145+
146+
impl RxQueueConfig {
147+
fn from_config(config: Option<&ComponentConfig>) -> CuResult<Self> {
148+
let Some(cfg) = config else {
149+
return Ok(Self::Fifo);
150+
};
151+
match cfg.get::<String>("queue_mode")?.as_deref() {
152+
Some("ring") => Ok(Self::Ring {
153+
size: cfg.get::<u32>("ring_size")?.unwrap_or(1) as usize,
154+
}),
155+
Some("fifo") | None => Ok(Self::Fifo),
156+
Some(other) => Err(CuError::from(format!(
157+
"Ros2Bridge: unknown queue_mode '{other}'"
158+
))),
159+
}
160+
}
161+
}
162+
163+
#[derive(Debug, Clone)]
164+
struct Ros2TxChannelConfig<Id: Copy> {
165+
id: Id,
166+
route: String,
167+
}
168+
169+
#[derive(Debug, Clone)]
170+
struct Ros2RxChannelConfig<Id: Copy> {
139171
id: Id,
140172
route: String,
173+
queue: RxQueueConfig,
141174
}
142175

143-
type Ros2Subscriber =
144-
zenoh::pubsub::Subscriber<zenoh::handlers::FifoChannelHandler<zenoh::sample::Sample>>;
176+
enum Ros2Subscriber {
177+
Fifo(zenoh::pubsub::Subscriber<zenoh::handlers::FifoChannelHandler<zenoh::sample::Sample>>),
178+
Ring(zenoh::pubsub::Subscriber<zenoh::handlers::RingChannelHandler<zenoh::sample::Sample>>),
179+
}
145180

146181
struct Ros2TxChannel<Id: Copy> {
147182
id: Id,
@@ -156,6 +191,7 @@ struct Ros2RxChannel<Id: Copy> {
156191
id: Id,
157192
route: String,
158193
entity_id: u32,
194+
queue: RxQueueConfig,
159195
subscriber: Option<Ros2Subscriber>,
160196
subscriber_token: Option<zenoh::liveliness::LivelinessToken>,
161197
}
@@ -183,9 +219,9 @@ where
183219
namespace: String,
184220
node: String,
185221
#[reflect(ignore)]
186-
tx_channels: Vec<Ros2ChannelConfig<Tx::Id>>,
222+
tx_channels: Vec<Ros2TxChannelConfig<Tx::Id>>,
187223
#[reflect(ignore)]
188-
rx_channels: Vec<Ros2ChannelConfig<Rx::Id>>,
224+
rx_channels: Vec<Ros2RxChannelConfig<Rx::Id>>,
189225
#[reflect(ignore)]
190226
ctx: Option<Ros2Context<Tx::Id, Rx::Id>>,
191227
}
@@ -405,8 +441,20 @@ where
405441
))?;
406442

407443
let keyexpr = topic.pubsub_keyexpr(&node)?;
408-
let subscriber = zenoh::Wait::wait(ctx.session.declare_subscriber(keyexpr))
409-
.map_err(cu_error_map("Ros2Bridge: Failed to declare subscriber"))?;
444+
let subscriber = match ctx.rx_channels[rx_idx].queue {
445+
RxQueueConfig::Fifo => Ros2Subscriber::Fifo(
446+
zenoh::Wait::wait(ctx.session.declare_subscriber(keyexpr))
447+
.map_err(cu_error_map("Ros2Bridge: Failed to declare subscriber"))?,
448+
),
449+
RxQueueConfig::Ring { size } => Ros2Subscriber::Ring(
450+
zenoh::Wait::wait(
451+
ctx.session
452+
.declare_subscriber(keyexpr)
453+
.with(zenoh::handlers::RingChannel::new(size)),
454+
)
455+
.map_err(cu_error_map("Ros2Bridge: Failed to declare subscriber"))?,
456+
),
457+
};
410458

411459
ctx.rx_channels[rx_idx].subscriber_token = Some(subscriber_token);
412460
ctx.rx_channels[rx_idx].subscriber = Some(subscriber);
@@ -451,7 +499,7 @@ where
451499
let mut tx_cfgs = Vec::with_capacity(tx_channels.len());
452500
for channel in tx_channels {
453501
let route = Self::channel_route(channel)?;
454-
tx_cfgs.push(Ros2ChannelConfig {
502+
tx_cfgs.push(Ros2TxChannelConfig {
455503
id: channel.channel.id,
456504
route,
457505
});
@@ -460,9 +508,11 @@ where
460508
let mut rx_cfgs = Vec::with_capacity(rx_channels.len());
461509
for channel in rx_channels {
462510
let route = Self::channel_route(channel)?;
463-
rx_cfgs.push(Ros2ChannelConfig {
511+
let queue = RxQueueConfig::from_config(channel.config.as_ref())?;
512+
rx_cfgs.push(Ros2RxChannelConfig {
464513
id: channel.channel.id,
465514
route,
515+
queue,
466516
});
467517
}
468518

@@ -510,6 +560,7 @@ where
510560
id: channel.id,
511561
route: channel.route.clone(),
512562
entity_id: (index + 1) as u32,
563+
queue: channel.queue.clone(),
513564
subscriber: None,
514565
subscriber_token: None,
515566
})
@@ -596,15 +647,15 @@ where
596647

597648
msg.tov = Tov::Time(ctx.now());
598649

599-
let sample = {
600-
let subscriber = bridge_ctx.rx_channels[rx_idx]
601-
.subscriber
602-
.as_mut()
603-
.ok_or_else(|| CuError::from("Ros2Bridge: Rx subscriber not initialized"))?;
604-
subscriber
605-
.try_recv()
606-
.map_err(|e| CuError::from(format!("Ros2Bridge: receive failed: {e}")))?
607-
};
650+
let subscriber = bridge_ctx.rx_channels[rx_idx]
651+
.subscriber
652+
.as_mut()
653+
.ok_or_else(|| CuError::from("Ros2Bridge: Rx subscriber not initialized"))?;
654+
let sample = match subscriber {
655+
Ros2Subscriber::Fifo(s) => s.try_recv(),
656+
Ros2Subscriber::Ring(s) => s.try_recv(),
657+
}
658+
.map_err(|e| CuError::from(format!("Ros2Bridge: receive failed: {e}")))?;
608659

609660
if let Some(sample) = sample {
610661
let payload = sample.payload().to_bytes();
@@ -633,8 +684,12 @@ where
633684

634685
for channel in rx_channels {
635686
if let Some(subscriber) = channel.subscriber {
636-
zenoh::Wait::wait(subscriber.undeclare())
637-
.map_err(cu_error_map("Ros2Bridge: Failed to undeclare subscriber"))?;
687+
match subscriber {
688+
Ros2Subscriber::Fifo(s) => zenoh::Wait::wait(s.undeclare())
689+
.map_err(cu_error_map("Ros2Bridge: Failed to undeclare subscriber"))?,
690+
Ros2Subscriber::Ring(s) => zenoh::Wait::wait(s.undeclare())
691+
.map_err(cu_error_map("Ros2Bridge: Failed to undeclare subscriber"))?,
692+
}
638693
}
639694
}
640695

components/payloads/cu_ros2_payloads/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ description = "ROS2 Payloads that can be constructed from/to Copper Payloads for
1313
[dependencies]
1414
compact_str = { workspace = true }
1515
serde = { workspace = true }
16+
serde_bytes = { workspace = true }
1617
cu-sensor-payloads = { workspace = true }
1718
cu29 = { workspace = true }
1819

components/payloads/cu_ros2_payloads/src/sensor_msgs.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ pub struct PointCloud2 {
3737
pub is_bigendian: bool,
3838
pub point_step: u32,
3939
pub row_step: u32,
40+
#[serde(with = "serde_bytes")]
4041
pub data: Vec<u8>,
4142
pub is_dense: bool,
4243
}
@@ -50,6 +51,7 @@ pub struct Image {
5051
pub encoding: String,
5152
pub is_bigendian: u8,
5253
pub step: u32,
54+
#[serde(with = "serde_bytes")]
5355
pub data: Vec<u8>,
5456
}
5557

0 commit comments

Comments
 (0)