From b28ffa56fd571b6e2759ffc56e272d878b84770c Mon Sep 17 00:00:00 2001 From: Quang Le Date: Mon, 25 May 2026 17:03:24 +0700 Subject: [PATCH 1/3] feat(core): implement consensus/instance --- crates/core/src/consensus/instance.rs | 384 ++++++++++++++++++++++++++ crates/core/src/consensus/mod.rs | 3 + 2 files changed, 387 insertions(+) create mode 100644 crates/core/src/consensus/instance.rs diff --git a/crates/core/src/consensus/instance.rs b/crates/core/src/consensus/instance.rs new file mode 100644 index 00000000..a2c55ac9 --- /dev/null +++ b/crates/core/src/consensus/instance.rs @@ -0,0 +1,384 @@ +//! Consensus instance I/O channels. +//! +//! `InstanceIo` owns the bounded channels and one-time lifecycle flags for one +//! consensus instance. +//! +//! # Usage +//! +//! Keep one `InstanceIo` in component state for each active instance. +//! Producer paths enqueue through the crate-visible senders with `try_send`, so +//! a full channel surfaces immediately to the caller. +//! +//! Receiver ownership is explicit. Tokio receivers are single-consumer, so the +//! task that drives an instance must call each needed `take_*_rx` method once at +//! its ownership boundary. A second call returns `Error::ReceiverAlreadyTaken`. +//! +//! The receive buffer accepts `RECV_BUFFER_SIZE` inbound messages before the +//! runner starts. The hash, protobuf value, verify, error, and decided-at +//! channels have capacity one because each represents one input or output slot +//! for this instance. +//! +//! `mark_proposed` and `mark_participated` reject duplicate entrypoints. +//! `maybe_start` performs the one-way transition into running state; only the +//! caller that receives `true` should spawn the runner. + +use std::{ + error::Error as StdError, + sync::{ + Mutex, + atomic::{AtomicBool, Ordering}, + }, +}; + +use prost_types::Any; +use tokio::{sync::mpsc, time::Instant}; + +/// Receive-buffer channel capacity. +pub const RECV_BUFFER_SIZE: usize = 100; + +/// Instance I/O errors. +#[derive(Debug, thiserror::Error, PartialEq, Eq)] +pub enum Error { + /// Participate was already called for this instance. + #[error("already participated")] + AlreadyParticipated, + + /// Propose was already called for this instance. + #[error("already proposed")] + AlreadyProposed, + + /// Receiver ownership was already transferred. + #[error("receiver already taken: {channel}")] + ReceiverAlreadyTaken { + /// Channel name. + channel: &'static str, + }, + + /// Receiver mutex was poisoned. + #[error("receiver state poisoned: {channel}")] + ReceiverStatePoisoned { + /// Channel name. + channel: &'static str, + }, +} + +/// Instance I/O result. +pub type Result = std::result::Result; + +/// Boxed error returned by the runner on unsuccessful completion. +pub type RunnerError = Box; + +type ReceiverSlot = Mutex>>; + +/// Completion result sent by a consensus instance runner. +pub type RunnerResult = std::result::Result<(), RunnerError>; + +/// Async input/output state for a single consensus instance. +/// +/// Sender fields are crate-visible so component code can enqueue directly. +/// Receiver fields stay private because each receiver must move exactly once to +/// the task that owns that stream. +// TODO: Remove once the instance runner wires these senders. +#[allow(dead_code)] +#[derive(Debug)] +pub struct InstanceIo { + participated: AtomicBool, + proposed: AtomicBool, + running: AtomicBool, + + /// Buffers inbound messages that may arrive before the runner starts. + pub(crate) recv_tx: mpsc::Sender, + recv_rx: ReceiverSlot, + + /// Supplies the local proposal hash. + pub(crate) hash_tx: mpsc::Sender<[u8; 32]>, + hash_rx: ReceiverSlot<[u8; 32]>, + + /// Supplies the local proposal value. + /// + /// Uses `Any` as a concrete wire container so instance I/O can store and + /// forward protobuf payloads before consensus-specific code decodes the + /// concrete value type. + pub(crate) value_tx: mpsc::Sender, + value_rx: ReceiverSlot, + + /// Supplies the value used to verify an external proposal. + /// + /// Uses the same `Any` representation as `value_tx`, keeping proposal and + /// verification paths on one payload format. + pub(crate) verify_tx: mpsc::Sender, + verify_rx: ReceiverSlot, + + /// Publishes the runner completion result. + pub(crate) err_tx: mpsc::Sender, + err_rx: ReceiverSlot, + + /// Publishes the decision timestamp. + pub(crate) decided_at_tx: mpsc::Sender, + decided_at_rx: ReceiverSlot, +} + +impl InstanceIo { + /// Creates empty channels and clears all lifecycle flags. + pub fn new() -> Self { + let (recv_tx, recv_rx) = mpsc::channel(RECV_BUFFER_SIZE); + let (hash_tx, hash_rx) = mpsc::channel(1); + let (value_tx, value_rx) = mpsc::channel(1); + let (verify_tx, verify_rx) = mpsc::channel(1); + let (err_tx, err_rx) = mpsc::channel(1); + let (decided_at_tx, decided_at_rx) = mpsc::channel(1); + + Self { + participated: AtomicBool::new(false), + proposed: AtomicBool::new(false), + running: AtomicBool::new(false), + recv_tx, + recv_rx: Mutex::new(Some(recv_rx)), + hash_tx, + hash_rx: Mutex::new(Some(hash_rx)), + value_tx, + value_rx: Mutex::new(Some(value_rx)), + verify_tx, + verify_rx: Mutex::new(Some(verify_rx)), + err_tx, + err_rx: Mutex::new(Some(err_rx)), + decided_at_tx, + decided_at_rx: Mutex::new(Some(decided_at_rx)), + } + } + + /// Marks the participate entrypoint as used. + /// + /// Returns [`Error::AlreadyParticipated`] on duplicate calls. + pub fn mark_participated(&self) -> Result<()> { + self.participated + .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) + .map(|_| ()) + .map_err(|_| Error::AlreadyParticipated) + } + + /// Marks the propose entrypoint as used. + /// + /// Returns [`Error::AlreadyProposed`] on duplicate calls. + pub fn mark_proposed(&self) -> Result<()> { + self.proposed + .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) + .map(|_| ()) + .map_err(|_| Error::AlreadyProposed) + } + + /// Returns `true` if this call owns starting the runner. + /// + /// This is a one-way transition. Completion does not reset the flag. + pub fn maybe_start(&self) -> bool { + self.running + .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + } + + /// Transfers receive-buffer ownership to the runner. + pub fn take_recv_rx(&self) -> Result> { + take_receiver(&self.recv_rx, "recv") + } + + /// Transfers local proposal hash ownership to the runner. + pub fn take_hash_rx(&self) -> Result> { + take_receiver(&self.hash_rx, "hash") + } + + /// Transfers local proposal value ownership to the runner. + pub fn take_value_rx(&self) -> Result> { + take_receiver(&self.value_rx, "value") + } + + /// Transfers external proposal verification ownership to the runner. + pub fn take_verify_rx(&self) -> Result> { + take_receiver(&self.verify_rx, "verify") + } + + /// Transfers runner result ownership to the waiting task. + pub fn take_err_rx(&self) -> Result> { + take_receiver(&self.err_rx, "err") + } + + /// Transfers decision timestamp ownership to the waiting task. + pub fn take_decided_at_rx(&self) -> Result> { + take_receiver(&self.decided_at_rx, "decided_at") + } +} + +impl Default for InstanceIo { + fn default() -> Self { + Self::new() + } +} + +fn take_receiver( + receiver: &Mutex>>, + channel: &'static str, +) -> Result> { + receiver + .lock() + .map_err(|_| Error::ReceiverStatePoisoned { channel })? + .take() + .ok_or(Error::ReceiverAlreadyTaken { channel }) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use tokio::sync::mpsc::error::TrySendError; + + use super::*; + + #[derive(Debug, thiserror::Error)] + #[error("test error")] + struct TestError; + + type TestIo = InstanceIo; + + #[test] + fn mark_participated() { + let io = TestIo::<()>::new(); + + assert_eq!(Ok(()), io.mark_participated()); + assert_eq!(Err(Error::AlreadyParticipated), io.mark_participated()); + } + + #[test] + fn mark_proposed() { + let io = TestIo::<()>::new(); + + assert_eq!(Ok(()), io.mark_proposed()); + assert_eq!(Err(Error::AlreadyProposed), io.mark_proposed()); + } + + #[test] + fn maybe_start() { + let io = TestIo::<()>::new(); + + assert!(io.maybe_start()); + assert!(!io.maybe_start()); + assert!(!io.maybe_start()); + } + + #[test] + fn recv_buffer_capacity_is_100() { + let io = TestIo::::new(); + + for msg in 0..RECV_BUFFER_SIZE { + assert!(io.recv_tx.try_send(msg).is_ok()); + } + + assert!(matches!( + io.recv_tx.try_send(RECV_BUFFER_SIZE), + Err(TrySendError::Full(RECV_BUFFER_SIZE)) + )); + } + + #[test] + fn single_item_channels_have_capacity_1() { + let io = TestIo::<()>::new(); + + assert!(io.hash_tx.try_send([0; 32]).is_ok()); + match io.hash_tx.try_send([1; 32]) { + Err(TrySendError::Full(value)) => assert_eq!([1; 32], value), + result => panic!("unexpected hash send result: {result:?}"), + } + + assert!(io.value_tx.try_send(proto_value()).is_ok()); + assert!(matches!( + io.value_tx.try_send(proto_value()), + Err(TrySendError::Full(_)) + )); + + assert!(io.verify_tx.try_send(proto_value()).is_ok()); + assert!(matches!( + io.verify_tx.try_send(proto_value()), + Err(TrySendError::Full(_)) + )); + + assert!(io.err_tx.try_send(Ok(())).is_ok()); + assert!(matches!( + io.err_tx.try_send(Err(Box::new(TestError))), + Err(TrySendError::Full(Err(_))) + )); + + let decided_at = Instant::now(); + assert!(io.decided_at_tx.try_send(decided_at).is_ok()); + assert!(matches!( + io.decided_at_tx.try_send(decided_at), + Err(TrySendError::Full(_)) + )); + } + + #[test] + fn recv_buffering_before_start_does_not_start_instance() { + let io = TestIo::::new(); + + assert!(io.recv_tx.try_send(1).is_ok()); + assert!(io.maybe_start()); + assert!(!io.maybe_start()); + } + + #[test] + fn receiver_ownership_can_only_be_taken_once() { + let io = TestIo::<()>::new(); + + assert!(io.take_recv_rx().is_ok()); + assert_receiver_already_taken(io.take_recv_rx(), "recv"); + + assert!(io.take_hash_rx().is_ok()); + assert_receiver_already_taken(io.take_hash_rx(), "hash"); + + assert!(io.take_value_rx().is_ok()); + assert_receiver_already_taken(io.take_value_rx(), "value"); + + assert!(io.take_verify_rx().is_ok()); + assert_receiver_already_taken(io.take_verify_rx(), "verify"); + + assert!(io.take_err_rx().is_ok()); + assert_receiver_already_taken(io.take_err_rx(), "err"); + + assert!(io.take_decided_at_rx().is_ok()); + assert_receiver_already_taken(io.take_decided_at_rx(), "decided_at"); + } + + #[test] + fn concurrent_maybe_start_returns_true_once() { + let io = Arc::new(TestIo::<()>::new()); + let mut handles = Vec::new(); + + for _ in 0..32 { + let io = Arc::clone(&io); + handles.push(std::thread::spawn(move || io.maybe_start())); + } + + let started = handles + .into_iter() + .map(|handle| match handle.join() { + Ok(started) => started, + Err(_) => panic!("maybe_start thread panicked"), + }) + .filter(|started| *started) + .count(); + + assert_eq!(1, started); + assert!(!io.maybe_start()); + } + + fn assert_receiver_already_taken( + result: std::result::Result, Error>, + channel: &'static str, + ) { + assert!(matches!( + result, + Err(Error::ReceiverAlreadyTaken { channel: actual }) if actual == channel + )); + } + + fn proto_value() -> Any { + Any::default() + } +} diff --git a/crates/core/src/consensus/mod.rs b/crates/core/src/consensus/mod.rs index c2712567..35e33056 100644 --- a/crates/core/src/consensus/mod.rs +++ b/crates/core/src/consensus/mod.rs @@ -7,5 +7,8 @@ /// Consensus protocols. pub mod protocols; +/// Consensus instance I/O channels. +pub mod instance; + /// Consensus round timers. pub mod timer; From 5e39abc3d7d31d31ba2fac62fda452294767d32b Mon Sep 17 00:00:00 2001 From: Quang Le Date: Mon, 25 May 2026 17:04:23 +0700 Subject: [PATCH 2/3] fix: lint --- crates/core/src/consensus/instance.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/core/src/consensus/instance.rs b/crates/core/src/consensus/instance.rs index a2c55ac9..15c8a6dd 100644 --- a/crates/core/src/consensus/instance.rs +++ b/crates/core/src/consensus/instance.rs @@ -10,8 +10,9 @@ //! a full channel surfaces immediately to the caller. //! //! Receiver ownership is explicit. Tokio receivers are single-consumer, so the -//! task that drives an instance must call each needed `take_*_rx` method once at -//! its ownership boundary. A second call returns `Error::ReceiverAlreadyTaken`. +//! task that drives an instance must call each needed `take_*_rx` method once +//! at its ownership boundary. A second call returns +//! `Error::ReceiverAlreadyTaken`. //! //! The receive buffer accepts `RECV_BUFFER_SIZE` inbound messages before the //! runner starts. The hash, protobuf value, verify, error, and decided-at From fe581ae0dace20bbf632135e9a44d9a623dda4bb Mon Sep 17 00:00:00 2001 From: Quang Le Date: Mon, 25 May 2026 17:30:43 +0700 Subject: [PATCH 3/3] fix: address comments --- crates/core/src/consensus/instance.rs | 42 +++++++++++---------------- 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/crates/core/src/consensus/instance.rs b/crates/core/src/consensus/instance.rs index 15c8a6dd..6c26546f 100644 --- a/crates/core/src/consensus/instance.rs +++ b/crates/core/src/consensus/instance.rs @@ -26,7 +26,7 @@ use std::{ error::Error as StdError, sync::{ - Mutex, + Mutex, PoisonError, atomic::{AtomicBool, Ordering}, }, }; @@ -54,13 +54,6 @@ pub enum Error { /// Channel name. channel: &'static str, }, - - /// Receiver mutex was poisoned. - #[error("receiver state poisoned: {channel}")] - ReceiverStatePoisoned { - /// Channel name. - channel: &'static str, - }, } /// Instance I/O result. @@ -83,6 +76,8 @@ pub type RunnerResult = std::result::Result<(), RunnerError>; #[allow(dead_code)] #[derive(Debug)] pub struct InstanceIo { + // Lifecycle flags are duplicate/start guards only. They do not publish or + // synchronize channel payloads or runner state. participated: AtomicBool, proposed: AtomicBool, running: AtomicBool, @@ -97,16 +92,15 @@ pub struct InstanceIo { /// Supplies the local proposal value. /// - /// Uses `Any` as a concrete wire container so instance I/O can store and - /// forward protobuf payloads before consensus-specific code decodes the - /// concrete value type. + /// `Any` is only the wire container at this boundary. Runner wiring owns + /// the codec and type-url convention before decoding the concrete value. pub(crate) value_tx: mpsc::Sender, value_rx: ReceiverSlot, /// Supplies the value used to verify an external proposal. /// - /// Uses the same `Any` representation as `value_tx`, keeping proposal and - /// verification paths on one payload format. + /// Uses the same `Any` wire-container convention as `value_tx`, keeping + /// proposal and verification paths on one payload format. pub(crate) verify_tx: mpsc::Sender, verify_rx: ReceiverSlot, @@ -220,7 +214,7 @@ fn take_receiver( ) -> Result> { receiver .lock() - .map_err(|_| Error::ReceiverStatePoisoned { channel })? + .unwrap_or_else(PoisonError::into_inner) .take() .ok_or(Error::ReceiverAlreadyTaken { channel }) } @@ -237,11 +231,9 @@ mod tests { #[error("test error")] struct TestError; - type TestIo = InstanceIo; - #[test] fn mark_participated() { - let io = TestIo::<()>::new(); + let io = InstanceIo::<()>::new(); assert_eq!(Ok(()), io.mark_participated()); assert_eq!(Err(Error::AlreadyParticipated), io.mark_participated()); @@ -249,7 +241,7 @@ mod tests { #[test] fn mark_proposed() { - let io = TestIo::<()>::new(); + let io = InstanceIo::<()>::new(); assert_eq!(Ok(()), io.mark_proposed()); assert_eq!(Err(Error::AlreadyProposed), io.mark_proposed()); @@ -257,7 +249,7 @@ mod tests { #[test] fn maybe_start() { - let io = TestIo::<()>::new(); + let io = InstanceIo::<()>::new(); assert!(io.maybe_start()); assert!(!io.maybe_start()); @@ -266,7 +258,7 @@ mod tests { #[test] fn recv_buffer_capacity_is_100() { - let io = TestIo::::new(); + let io = InstanceIo::::new(); for msg in 0..RECV_BUFFER_SIZE { assert!(io.recv_tx.try_send(msg).is_ok()); @@ -280,7 +272,7 @@ mod tests { #[test] fn single_item_channels_have_capacity_1() { - let io = TestIo::<()>::new(); + let io = InstanceIo::<()>::new(); assert!(io.hash_tx.try_send([0; 32]).is_ok()); match io.hash_tx.try_send([1; 32]) { @@ -315,8 +307,8 @@ mod tests { } #[test] - fn recv_buffering_before_start_does_not_start_instance() { - let io = TestIo::::new(); + fn recv_tx_send_does_not_consume_start_token() { + let io = InstanceIo::::new(); assert!(io.recv_tx.try_send(1).is_ok()); assert!(io.maybe_start()); @@ -325,7 +317,7 @@ mod tests { #[test] fn receiver_ownership_can_only_be_taken_once() { - let io = TestIo::<()>::new(); + let io = InstanceIo::<()>::new(); assert!(io.take_recv_rx().is_ok()); assert_receiver_already_taken(io.take_recv_rx(), "recv"); @@ -348,7 +340,7 @@ mod tests { #[test] fn concurrent_maybe_start_returns_true_once() { - let io = Arc::new(TestIo::<()>::new()); + let io = Arc::new(InstanceIo::<()>::new()); let mut handles = Vec::new(); for _ in 0..32 {