From dd9a340c545735ddd0c88522c735520c576ea883 Mon Sep 17 00:00:00 2001 From: "emlautarom1-agent[bot]" <292495798+emlautarom1-agent[bot]@users.noreply.github.com> Date: Thu, 18 Jun 2026 19:03:11 -0300 Subject: [PATCH 01/16] feat(eth2api): add topic-preserving SSE event_stream The generated eventstream method returns EventStream, which discards the SSE event topic and fails to deserialize the beacon node's JSON-object payloads. Add a hand-written event_stream(topics) on EthBeaconNodeApiClient that yields BeaconNodeEvent { topic, data } with the raw payload preserved, so callers can dispatch by topic. --- Cargo.lock | 4 ++ Cargo.toml | 1 + crates/eth2api/Cargo.toml | 4 +- crates/eth2api/src/extensions.rs | 114 ++++++++++++++++++++++++++++++- 4 files changed, 120 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c0e2ec20..185b2efd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5471,6 +5471,7 @@ dependencies = [ "bon", "chrono", "flate2", + "futures", "hex", "k256", "pluto-build-proto", @@ -5495,6 +5496,7 @@ dependencies = [ "tokio-util", "tracing", "url", + "vise", "wiremock", ] @@ -5745,6 +5747,8 @@ dependencies = [ "chrono", "ethereum_ssz", "ethereum_ssz_derive", + "eventsource-stream", + "futures", "hex", "http", "oas3-gen-support", diff --git a/Cargo.toml b/Cargo.toml index 3093cf40..310f6d53 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,7 @@ crossbeam = "0.8.4" dyn-clone = "1.0" dyn-eq = "0.1.3" either = "1.13" +eventsource-stream = "0.2" futures = "0.3" futures-timer = "3.0" backon = "1.6.0" diff --git a/crates/eth2api/Cargo.toml b/crates/eth2api/Cargo.toml index fbc5c298..42d12519 100644 --- a/crates/eth2api/Cargo.toml +++ b/crates/eth2api/Cargo.toml @@ -15,10 +15,12 @@ ignored = ["bon", "http", "oas3-gen-support", "regex", "reqwest", "validator"] anyhow.workspace = true async-trait.workspace = true bon.workspace = true +eventsource-stream.workspace = true +futures.workspace = true http.workspace = true oas3-gen-support.workspace = 
true regex.workspace = true -reqwest.workspace = true +reqwest = { workspace = true, features = ["stream"] } serde_json.workspace = true serde_with.workspace = true serde.workspace = true diff --git a/crates/eth2api/src/extensions.rs b/crates/eth2api/src/extensions.rs index fc08e939..de1e511b 100644 --- a/crates/eth2api/src/extensions.rs +++ b/crates/eth2api/src/extensions.rs @@ -1,8 +1,11 @@ use crate::{ - ConsensusVersion, EthBeaconNodeApiClient, GetGenesisRequest, GetGenesisResponse, - GetGenesisResponseResponseData, GetSpecRequest, GetSpecResponse, ValidatorStatus, spec::phase0, + ConsensusVersion, EthBeaconNodeApiClient, EventstreamRequestQueryTopic, GetGenesisRequest, + GetGenesisResponse, GetGenesisResponseResponseData, GetSpecRequest, GetSpecResponse, + ValidatorStatus, spec::phase0, }; use chrono::{DateTime, Utc}; +use eventsource_stream::Eventsource; +use futures::{Stream, StreamExt}; use std::{collections::HashMap, time}; use tree_hash::TreeHash; @@ -34,6 +37,21 @@ pub enum EthBeaconNodeApiClientError { /// Domain type not found in the beacon spec response #[error("Domain type not found: {0}")] DomainTypeNotFound(String), + + /// Error while opening or reading the beacon node SSE event stream. + #[error("Event stream error: {0}")] + EventStream(String), +} + +/// A single Server-Sent Event from a beacon node: the event topic (the SSE +/// `event:` field, e.g. `head` or `chain_reorg`) and its raw, unparsed JSON +/// `data` payload. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BeaconNodeEvent { + /// The SSE event topic. + pub topic: String, + /// The raw JSON data payload. + pub data: String, } const FORKS: [ConsensusVersion; 6] = [ @@ -360,6 +378,55 @@ impl EthBeaconNodeApiClient { epoch, )) } + + /// Subscribes to the beacon node SSE stream (`GET /eth/v1/events`) for the + /// given topics. 
+ /// + /// Unlike the generated [`Self::eventstream`], the returned stream + /// preserves each event's topic and yields its raw JSON `data` + /// unparsed, so callers can dispatch on the topic and deserialize the + /// payload themselves. + pub async fn event_stream( + &self, + topics: &[EventstreamRequestQueryTopic], + ) -> Result< + impl Stream> + Send, + EthBeaconNodeApiClientError, + > { + let mut url = self.base_url.clone(); + url.path_segments_mut() + .map_err(|()| EthBeaconNodeApiClientError::EventStream("URL cannot be a base".into()))? + .push("eth") + .push("v1") + .push("events"); + + // Topics are sent as repeated `topics=` query pairs. + let query: Vec<(&str, String)> = topics + .iter() + .map(|topic| ("topics", topic.to_string())) + .collect(); + + let response = self + .client + .get(url) + .query(&query) + .header(reqwest::header::ACCEPT, "text/event-stream") + .send() + .await + .map_err(|err| EthBeaconNodeApiClientError::EventStream(err.to_string()))? + .error_for_status() + .map_err(|err| EthBeaconNodeApiClientError::EventStream(err.to_string()))?; + + let stream = response.bytes_stream().eventsource().map(|item| { + item.map(|event| BeaconNodeEvent { + topic: event.event, + data: event.data, + }) + .map_err(|err| EthBeaconNodeApiClientError::EventStream(err.to_string())) + }); + + Ok(stream) + } } #[cfg(test)] @@ -452,4 +519,47 @@ mod tests { ) ); } + + #[tokio::test] + async fn event_stream_preserves_topic_and_raw_data() { + use crate::EventstreamRequestQueryTopic; + use futures::StreamExt; + use wiremock::{ + Mock, MockServer, ResponseTemplate, + matchers::{method, path}, + }; + + let server = MockServer::start().await; + + let body = "event: head\ndata: {\"slot\":\"10\"}\n\n\ + event: chain_reorg\ndata: {\"slot\":\"20\",\"depth\":\"2\"}\n\n"; + + Mock::given(method("GET")) + .and(path("/eth/v1/events")) + .respond_with(ResponseTemplate::new(200).set_body_raw(body, "text/event-stream")) + .mount(&server) + .await; + + let client = 
EthBeaconNodeApiClient::with_base_url(server.uri()).expect("valid url"); + let stream = client + .event_stream(&[ + EventstreamRequestQueryTopic::Head, + EventstreamRequestQueryTopic::ChainReorg, + ]) + .await + .expect("open stream"); + futures::pin_mut!(stream); + + let first = stream.next().await.expect("first event").expect("ok event"); + assert_eq!(first.topic, "head"); + assert_eq!(first.data, r#"{"slot":"10"}"#); + + let second = stream + .next() + .await + .expect("second event") + .expect("ok event"); + assert_eq!(second.topic, "chain_reorg"); + assert_eq!(second.data, r#"{"slot":"20","depth":"2"}"#); + } } From 6612bb1bc7cb22fcd4a38555e4453660194168b8 Mon Sep 17 00:00:00 2001 From: "emlautarom1-agent[bot]" <292495798+emlautarom1-agent[bot]@users.noreply.github.com> Date: Thu, 18 Jun 2026 19:11:18 -0300 Subject: [PATCH 02/16] feat(app): implement beacon node SSE listener Add an actor-model SSE listener under crates/app/src/sse that subscribes to a beacon node's /eth/v1/events stream (head, chain_reorg, block, block_gossip), exports timing metrics, and notifies subscribers of chain reorgs. A reconnecting pump task forwards beacon events into the actor over an mpsc channel; the actor owns all state and processes events single- threaded. SseListenerBuilder::subscribe_chain_reorg returns an mpsc::Receiver that plugs directly into the scheduler's with_chain_reorgs, and the cloneable SseListenerHandle allows dynamic subscription at runtime. Both builder and listener live until a CancellationToken fires. 
--- crates/app/Cargo.toml | 2 + crates/app/src/lib.rs | 3 + crates/app/src/sse/metrics.rs | 45 ++ crates/app/src/sse/mod.rs | 746 ++++++++++++++++++++++++++++++++++ crates/app/src/sse/types.rs | 81 ++++ 5 files changed, 877 insertions(+) create mode 100644 crates/app/src/sse/metrics.rs create mode 100644 crates/app/src/sse/mod.rs create mode 100644 crates/app/src/sse/types.rs diff --git a/crates/app/Cargo.toml b/crates/app/Cargo.toml index 77f14edf..7b46eeb0 100644 --- a/crates/app/Cargo.toml +++ b/crates/app/Cargo.toml @@ -9,10 +9,12 @@ publish.workspace = true [dependencies] backon.workspace = true chrono.workspace = true +futures.workspace = true pluto-core.workspace = true pluto-eth2api.workspace = true tokio.workspace = true tokio-util.workspace = true +vise.workspace = true prost.workspace = true prost-types.workspace = true regex.workspace = true diff --git a/crates/app/src/lib.rs b/crates/app/src/lib.rs index 87d7061e..28529991 100644 --- a/crates/app/src/lib.rs +++ b/crates/app/src/lib.rs @@ -22,5 +22,8 @@ pub mod eth2wrap; /// Private key locking service. pub mod privkeylock; +/// Listen for SSE from Beacon Node +pub mod sse; + /// Utility helpers for archiving, extracting, and comparing files/directories. pub mod utils; diff --git a/crates/app/src/sse/metrics.rs b/crates/app/src/sse/metrics.rs new file mode 100644 index 00000000..0cab6a29 --- /dev/null +++ b/crates/app/src/sse/metrics.rs @@ -0,0 +1,45 @@ +//! Prometheus metrics for the beacon node SSE listener. +//! +//! The `app_beacon_node` prefix and bucket boundaries reproduce Charon's +//! `app/beacon_node` SSE metrics. All metrics are labelled by beacon node +//! address. + +use vise::{Gauge, Global, Histogram, LabeledFamily, Metrics}; + +/// Head delay buckets in seconds. +const HEAD_DELAY_BUCKETS: [f64; 6] = [2.0, 4.0, 6.0, 8.0, 10.0, 12.0]; +/// Chain reorg depth buckets in slots. 
+const REORG_DEPTH_BUCKETS: [f64; 6] = [1.0, 2.0, 4.0, 6.0, 8.0, 16.0]; +/// Block reception delay buckets in seconds. +const BLOCK_BUCKETS: [f64; 14] = [ + 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0, 6.0, 8.0, 10.0, 12.0, +]; + +/// Metrics for the beacon node SSE listener. +#[derive(Debug, Metrics)] +#[metrics(prefix = "app_beacon_node")] +pub struct SseMetrics { + /// Current beacon node head slot, supplied by the SSE endpoint. + #[metrics(labels = ["addr"])] + pub sse_head_slot: LabeledFamily>, + + /// Delay in seconds between slot start and head update. + #[metrics(buckets = &HEAD_DELAY_BUCKETS, labels = ["addr"])] + pub sse_head_delay: LabeledFamily, + + /// Chain reorg depth in slots. + #[metrics(buckets = &REORG_DEPTH_BUCKETS, labels = ["addr"])] + pub sse_chain_reorg_depth: LabeledFamily, + + /// Block reception via gossip delay in seconds. + #[metrics(buckets = &BLOCK_BUCKETS, labels = ["addr"])] + pub sse_block_gossip: LabeledFamily, + + /// Block imported into fork choice delay in seconds. + #[metrics(buckets = &BLOCK_BUCKETS, labels = ["addr"])] + pub sse_block: LabeledFamily, +} + +/// Global metrics for the beacon node SSE listener. +#[vise::register] +pub static SSE_METRICS: Global = Global::new(); diff --git a/crates/app/src/sse/mod.rs b/crates/app/src/sse/mod.rs new file mode 100644 index 00000000..cbe66339 --- /dev/null +++ b/crates/app/src/sse/mod.rs @@ -0,0 +1,746 @@ +//! Beacon node Server-Sent-Events (SSE) listener. +//! +//! Subscribes to a beacon node's `/eth/v1/events` stream and processes `head`, +//! `chain_reorg`, `block` and `block_gossip` events to export timing metrics +//! and notify subscribers of chain reorgs. +//! +//! The listener follows the actor model: a [`SseListenerBuilder`] wires up +//! subscriptions, [`SseListenerBuilder::build`] spawns a background actor (and +//! a reconnecting stream "pump") that live until a [`CancellationToken`] fires, +//! 
and the returned [`SseListenerHandle`] allows interacting with the running +//! actor. +//! +//! The actor is single-threaded: it owns all of its state and processes one +//! event at a time, so no locking is required. The event source is decoupled +//! from the actor via an mpsc channel — in production a pump task forwards +//! beacon node events into it; tests push [`SseEvent`]s directly. + +use std::time::Duration; + +use backon::{BackoffBuilder, ExponentialBuilder, Retryable}; +use chrono::{DateTime, Utc}; +use futures::StreamExt; +use tokio::sync; +use tokio_util::{future::FutureExt, sync::CancellationToken}; + +use pluto_eth2api::{BeaconNodeEvent, EthBeaconNodeApiClient, EventstreamRequestQueryTopic}; + +use crate::sse::{ + metrics::SSE_METRICS, + types::{ + BLOCK_EVENT, BLOCK_GOSSIP_EVENT, BlockEventData, BlockGossipEventData, CHAIN_REORG_EVENT, + ChainReorgEventData, HEAD_EVENT, HeadEventData, SseEvent, + }, +}; + +pub mod metrics; +pub mod types; + +/// Default buffer size for the channels used by the listener. +const CHANNEL_BUFFER_SIZE: usize = 100; + +/// Base delay between SSE reconnection attempts. +const DEFAULT_RETRY: Duration = Duration::from_secs(1); + +/// Topics the listener subscribes to. +const TOPICS: [EventstreamRequestQueryTopic; 4] = [ + EventstreamRequestQueryTopic::Head, + EventstreamRequestQueryTopic::ChainReorg, + EventstreamRequestQueryTopic::BlockGossip, + EventstreamRequestQueryTopic::Block, +]; + +/// Errors that can occur while setting up or running the SSE listener. +#[derive(Debug, thiserror::Error)] +pub enum SseListenerError { + /// Beacon Node API client error. + #[error("Error while fetching data from the Eth2 API: {0}")] + EthBeaconNodeApiClientError(#[from] pluto_eth2api::EthBeaconNodeApiClientError), + + /// The underlying SSE listener actor has been terminated. + #[error("SSE listener actor has been terminated")] + Terminated, +} + +type Result = std::result::Result; + +/// A builder for the SSE listener. 
+/// +/// Allows setting up chain reorg subscriptions before the listener is started. +/// The listener is started by calling [`SseListenerBuilder::build`]. +pub struct SseListenerBuilder { + reorg_subs: Vec>, +} + +impl SseListenerBuilder { + /// Constructs a new [`SseListenerBuilder`] with no subscriptions. + pub fn new() -> Self { + SseListenerBuilder { + reorg_subs: Vec::new(), + } + } + + /// Subscribes to chain reorg events, returning the receiving end of a + /// channel that yields the (deduplicated) reorg epochs. + /// + /// The returned receiver can be passed directly to consumers such as the + /// scheduler's `with_chain_reorgs`. + pub fn subscribe_chain_reorg(&mut self) -> sync::mpsc::Receiver { + let (tx, rx) = sync::mpsc::channel(CHANNEL_BUFFER_SIZE); + self.reorg_subs.push(tx); + rx + } + + /// Starts the SSE listener in the background. + /// + /// Blocks until the beacon node's genesis time and slot configuration have + /// been fetched (retrying on failure), then spawns the actor and a + /// reconnecting stream pump that run until `ct` is cancelled. 
+ pub async fn build( + self, + client: EthBeaconNodeApiClient, + ct: CancellationToken, + ) -> Result { + let (genesis_time, slot_duration, slots_per_epoch) = fetch_config(&client) + .with_cancellation_token(&ct) + .await + .ok_or(SseListenerError::Terminated)??; + + let addr = client.base_url.to_string(); + + let actor = SseListenerActor { + addr: addr.clone(), + genesis_time, + slot_duration, + slots_per_epoch, + last_reorg_epoch: 0, + reorg_subs: self.reorg_subs, + }; + + let (events_tx, events_rx) = sync::mpsc::channel(CHANNEL_BUFFER_SIZE); + let (msg_tx, msg_rx) = sync::mpsc::channel(CHANNEL_BUFFER_SIZE); + + tokio::spawn(run_pump(client, addr, events_tx, ct.clone())); + tokio::spawn(actor.run(events_rx, msg_rx, ct)); + + Ok(SseListenerHandle { sender: msg_tx }) + } +} + +impl Default for SseListenerBuilder { + fn default() -> Self { + Self::new() + } +} + +/// Messages sent to the [`SseListenerActor`]. +enum SseListenerMessage { + /// Subscribe to chain reorg events at runtime; the actor replies with the + /// receiving end of a freshly created channel. + SubscribeChainReorg { + resp: sync::oneshot::Sender>, + }, +} + +/// A handle to interact with the SSE listener actor. +/// +/// Cloning the handle is cheap and allows sending messages to the actor from +/// multiple tasks. +#[derive(Clone)] +pub struct SseListenerHandle { + sender: sync::mpsc::Sender, +} + +impl SseListenerHandle { + /// Subscribes to chain reorg events at runtime, returning the receiving end + /// of a channel that yields the (deduplicated) reorg epochs. + pub async fn subscribe_chain_reorg(&self) -> Result> { + let (tx, rx) = sync::oneshot::channel(); + self.sender + .send(SseListenerMessage::SubscribeChainReorg { resp: tx }) + .await + .map_err(|_| SseListenerError::Terminated)?; + + rx.await.map_err(|_| SseListenerError::Terminated) + } +} + +struct SseListenerActor { + addr: String, + + // Immutable network configuration. 
+ genesis_time: DateTime, + slot_duration: Duration, + slots_per_epoch: u64, + + last_reorg_epoch: u64, + reorg_subs: Vec>, +} + +impl SseListenerActor { + async fn run( + mut self, + mut events_rx: sync::mpsc::Receiver, + mut msg_rx: sync::mpsc::Receiver, + ct: CancellationToken, + ) { + loop { + tokio::select! { + biased; + + _ = ct.cancelled() => break, + + Some(msg) = msg_rx.recv() => match msg { + SseListenerMessage::SubscribeChainReorg { resp } => { + let (tx, rx) = sync::mpsc::channel(CHANNEL_BUFFER_SIZE); + self.reorg_subs.push(tx); + let _ = resp.send(rx); + } + }, + + Some(event) = events_rx.recv() => { + self.handle_event(&event); + }, + } + } + } + + fn handle_event(&mut self, event: &SseEvent) { + match event.topic.as_str() { + HEAD_EVENT => self.handle_head(event), + CHAIN_REORG_EVENT => self.handle_chain_reorg(event), + BLOCK_GOSSIP_EVENT => self.handle_block_gossip(event), + BLOCK_EVENT => self.handle_block(event), + _ => {} + } + } + + fn handle_head(&self, event: &SseEvent) { + let Some(head) = parse_payload::(HEAD_EVENT, &event.data, &self.addr) else { + return; + }; + + let Some(slot) = parse_u64("slot", &head.slot, &self.addr) else { + return; + }; + + if i64::try_from(slot).is_err() { + tracing::warn!(addr = %self.addr, slot, "Head slot value exceeds i64 range"); + return; + } + + // The chain's head is updated once a majority of the chain votes for a + // block, which realistically happens between 2/3 and 3/3 of the slot. 
+ let (delay, ok) = self.compute_delay(slot, event.timestamp, |delay| { + delay < to_chrono(self.slot_duration) + }); + let delay_s = delay_secs(delay); + + if ok { + SSE_METRICS.sse_head_delay[&self.addr].observe(delay_s); + } else { + tracing::debug!(addr = %self.addr, slot, delay_s, "Beacon node received head event too late"); + } + + SSE_METRICS.sse_head_slot[&self.addr].set(slot); + + tracing::debug!( + addr = %self.addr, + slot, + delay_s, + block = %head.block, + prev_ddr = %head.previous_duty_dependent_root, + curr_ddr = %head.current_duty_dependent_root, + "SSE head event" + ); + } + + fn handle_chain_reorg(&mut self, event: &SseEvent) { + let Some(reorg) = + parse_payload::(CHAIN_REORG_EVENT, &event.data, &self.addr) + else { + return; + }; + + let Some(slot) = parse_u64("slot", &reorg.slot, &self.addr) else { + return; + }; + let Some(depth) = parse_u64("depth", &reorg.depth, &self.addr) else { + return; + }; + + if slot < depth { + tracing::warn!(addr = %self.addr, slot, depth, "Invalid chain reorg event: depth exceeds slot"); + return; + } + + // `slot >= depth` is guaranteed above and `slots_per_epoch` is non-zero + // (validated by `fetch_slots_config`). + let reorg_epoch = slot + .checked_sub(depth) + .expect("slot >= depth") + .checked_div(self.slots_per_epoch) + .expect("non-zero slots per epoch"); + self.notify_chain_reorg(reorg_epoch); + + tracing::debug!( + addr = %self.addr, + slot, + epoch = %reorg.epoch, + reorg_epoch, + depth, + old_head_block = %reorg.old_head_block, + new_head_block = %reorg.new_head_block, + "SSE chain reorg event" + ); + + // Reorg depths fit comfortably in a `u32`; `f64::from` is lossless. 
+ let depth_f64 = f64::from(u32::try_from(depth).unwrap_or(u32::MAX)); + SSE_METRICS.sse_chain_reorg_depth[&self.addr].observe(depth_f64); + } + + fn handle_block_gossip(&self, event: &SseEvent) { + let Some(gossip) = + parse_payload::(BLOCK_GOSSIP_EVENT, &event.data, &self.addr) + else { + return; + }; + + let Some(slot) = parse_u64("slot", &gossip.slot, &self.addr) else { + return; + }; + + // A block should be received via gossip between 0/3 and 1/3 of the slot. + let third = self.slot_duration.checked_div(3).expect("non-zero divisor"); + let (delay, ok) = + self.compute_delay(slot, event.timestamp, |delay| delay < to_chrono(third)); + let delay_s = delay_secs(delay); + + if !ok { + tracing::debug!(addr = %self.addr, slot, delay_s, "Beacon node received block_gossip event too late"); + } + + tracing::debug!(addr = %self.addr, slot, delay_s, block = %gossip.block, "SSE block gossip event"); + + SSE_METRICS.sse_block_gossip[&self.addr].observe(delay_s); + } + + fn handle_block(&self, event: &SseEvent) { + let Some(block) = parse_payload::(BLOCK_EVENT, &event.data, &self.addr) + else { + return; + }; + + let Some(slot) = parse_u64("slot", &block.slot, &self.addr) else { + return; + }; + + // A block should be imported into fork choice between 0/3 and 1/3 of the + // slot. + let third = self.slot_duration.checked_div(3).expect("non-zero divisor"); + let (delay, ok) = + self.compute_delay(slot, event.timestamp, |delay| delay < to_chrono(third)); + let delay_s = delay_secs(delay); + + if !ok { + tracing::debug!(addr = %self.addr, slot, delay_s, "Beacon node received block event too late"); + } + + tracing::debug!(addr = %self.addr, slot, delay_s, block = %block.block, "SSE block event"); + + SSE_METRICS.sse_block[&self.addr].observe(delay_s); + } + + /// Notifies subscribers of a chain reorg, deduplicating consecutive events + /// for the same epoch. Subscribers whose receiver has been dropped are + /// pruned. 
+ fn notify_chain_reorg(&mut self, epoch: u64) { + if epoch == self.last_reorg_epoch { + return; + } + self.last_reorg_epoch = epoch; + + let addr = &self.addr; + self.reorg_subs.retain(|tx| match tx.try_send(epoch) { + Ok(()) => true, + Err(sync::mpsc::error::TrySendError::Full(_)) => { + tracing::warn!(addr = %addr, epoch, "Chain reorg subscriber lagging, dropping event"); + true + } + Err(sync::mpsc::error::TrySendError::Closed(_)) => false, + }); + } + + /// Computes the delay between the start of `slot` and the event timestamp, + /// reporting whether it falls within the expected window. + fn compute_delay( + &self, + slot: u64, + event_ts: DateTime, + delay_ok: impl Fn(chrono::Duration) -> bool, + ) -> (chrono::Duration, bool) { + // Slot times are small in practice (slot duration is a few whole + // seconds), so saturate on the unreachable overflow. + let slot = i64::try_from(slot).unwrap_or(i64::MAX); + let ms_per_slot = i64::try_from(self.slot_duration.as_millis()).unwrap_or(i64::MAX); + let offset = chrono::Duration::milliseconds(slot.saturating_mul(ms_per_slot)); + let slot_start = self + .genesis_time + .checked_add_signed(offset) + .unwrap_or(event_ts); + let delay = event_ts.signed_duration_since(slot_start); + + (delay, delay_ok(delay)) + } +} + +/// Fetches the genesis time, slot duration and slots per epoch, retrying on +/// failure. +async fn fetch_config(client: &EthBeaconNodeApiClient) -> Result<(DateTime, Duration, u64)> { + let genesis_time = (|| client.fetch_genesis_time()) + .retry(fast_backoff()) + .notify(|err, _| tracing::error!(err = ?err, "Failure fetching genesis time")) + .await?; + + let (slot_duration, slots_per_epoch) = (|| client.fetch_slots_config()) + .retry(fast_backoff()) + .notify(|err, _| tracing::error!(err = ?err, "Failure fetching slots config")) + .await?; + + Ok((genesis_time, slot_duration, slots_per_epoch)) +} + +/// Outcome of a single SSE stream connection. 
+enum StreamOutcome { + /// The stream ended cleanly (server closed the connection). + Ended, + /// A connection or read error occurred; the caller should back off. + Error, + /// The actor's event channel was closed; the pump should stop. + ChannelClosed, + /// The cancellation token fired. + Cancelled, +} + +/// Connects to the beacon node SSE stream and reconnects with exponential +/// backoff until the cancellation token fires or the actor goes away. +async fn run_pump( + client: EthBeaconNodeApiClient, + addr: String, + events_tx: sync::mpsc::Sender, + ct: CancellationToken, +) { + let mut backoff = reconnect_backoff().build(); + + loop { + match stream_once(&client, &addr, &events_tx, &ct).await { + StreamOutcome::Cancelled | StreamOutcome::ChannelClosed => break, + StreamOutcome::Ended => { + // Clean disconnect: reset the backoff and reconnect promptly. + backoff = reconnect_backoff().build(); + } + StreamOutcome::Error => { + let delay = backoff.next().unwrap_or(DEFAULT_RETRY); + tokio::select! { + biased; + _ = ct.cancelled() => break, + _ = tokio::time::sleep(delay) => {} + } + } + } + } + + tracing::debug!(addr = %addr, "SSE pump stopped"); +} + +/// Opens a single SSE connection and forwards events into `events_tx` until the +/// stream ends, errors, the channel closes, or the token is cancelled. +async fn stream_once( + client: &EthBeaconNodeApiClient, + addr: &str, + events_tx: &sync::mpsc::Sender, + ct: &CancellationToken, +) -> StreamOutcome { + tracing::debug!(addr = %addr, "Connecting to SSE stream"); + + let stream = match client.event_stream(&TOPICS).await { + Ok(stream) => stream, + Err(err) => { + tracing::warn!(err = %err, addr = %addr, "Failed to connect to SSE stream"); + return StreamOutcome::Error; + } + }; + futures::pin_mut!(stream); + + loop { + tokio::select! 
{ + biased; + + _ = ct.cancelled() => return StreamOutcome::Cancelled, + + item = stream.next() => match item { + None => return StreamOutcome::Ended, + Some(Err(err)) => { + tracing::warn!(err = %err, addr = %addr, "SSE stream read error"); + return StreamOutcome::Error; + } + Some(Ok(BeaconNodeEvent { topic, data })) => { + if data.is_empty() { + continue; + } + + let event = SseEvent { topic, data, timestamp: Utc::now() }; + if events_tx.send(event).await.is_err() { + return StreamOutcome::ChannelClosed; + } + } + }, + } + } +} + +/// Backoff used while waiting for the beacon node configuration. +fn fast_backoff() -> ExponentialBuilder { + ExponentialBuilder::default() + .with_min_delay(Duration::from_millis(100)) + .with_max_delay(Duration::from_secs(5)) + .with_factor(1.6) + .without_max_times() + .with_jitter() +} + +/// Backoff used between SSE reconnection attempts. +fn reconnect_backoff() -> ExponentialBuilder { + ExponentialBuilder::default() + .with_min_delay(DEFAULT_RETRY) + .with_max_delay(DEFAULT_RETRY.checked_mul(2).expect("within range")) + .with_factor(1.6) + .without_max_times() + .with_jitter() +} + +/// Parses an SSE event's JSON payload, logging and discarding malformed events. +fn parse_payload(topic: &str, data: &str, addr: &str) -> Option { + match serde_json::from_str(data) { + Ok(payload) => Some(payload), + Err(err) => { + tracing::warn!(err = ?err, addr = %addr, topic, "Failed to unmarshal SSE event"); + None + } + } +} + +/// Parses a stringified `u64` field, logging and discarding on failure. +fn parse_u64(field: &str, value: &str, addr: &str) -> Option { + match value.parse::() { + Ok(parsed) => Some(parsed), + Err(err) => { + tracing::warn!(err = ?err, addr = %addr, field, value, "Failed to parse SSE numeric field"); + None + } + } +} + +/// Converts a [`std::time::Duration`] to a [`chrono::Duration`], saturating on +/// overflow. 
+fn to_chrono(duration: Duration) -> chrono::Duration { + chrono::Duration::from_std(duration).unwrap_or(chrono::Duration::MAX) +} + +/// Returns the delay in fractional seconds for metrics and logging. +/// +/// Histogram values are non-negative seconds; a negative delay (an event +/// observed before the slot start, e.g. due to clock skew) is clamped to zero. +fn delay_secs(delay: chrono::Duration) -> f64 { + delay + .to_std() + .map(|delay| delay.as_secs_f64()) + .unwrap_or(0.0) +} + +#[cfg(test)] +mod tests { + use super::*; + + const SLOT_DURATION: Duration = Duration::from_secs(12); + const SLOTS_PER_EPOCH: u64 = 32; + + /// Ethereum mainnet genesis time. + fn genesis() -> DateTime { + DateTime::from_timestamp(1_606_824_023, 0).expect("valid timestamp") + } + + fn test_actor(reorg_subs: Vec>) -> SseListenerActor { + SseListenerActor { + addr: "test".to_string(), + genesis_time: genesis(), + slot_duration: SLOT_DURATION, + slots_per_epoch: SLOTS_PER_EPOCH, + last_reorg_epoch: 0, + reorg_subs, + } + } + + fn event(topic: &str, data: &str, timestamp: DateTime) -> SseEvent { + SseEvent { + topic: topic.to_string(), + data: data.to_string(), + timestamp, + } + } + + /// Returns the wall-clock time `offset_secs` into the given slot. 
+ fn slot_time(slot: u64, offset_secs: i64) -> DateTime { + let slot = i64::try_from(slot).unwrap(); + let per_slot = i64::try_from(SLOT_DURATION.as_secs()).unwrap(); + let secs = slot + .checked_mul(per_slot) + .unwrap() + .checked_add(offset_secs) + .unwrap(); + genesis() + .checked_add_signed(chrono::Duration::seconds(secs)) + .unwrap() + } + + #[test] + fn compute_delay_inside_and_outside_window() { + let actor = test_actor(vec![]); + let slot = 10; + + let (delay, ok) = + actor.compute_delay(slot, slot_time(slot, 5), |d| d < to_chrono(SLOT_DURATION)); + assert_eq!(delay, chrono::Duration::seconds(5)); + assert!(ok); + + let (delay, ok) = + actor.compute_delay(slot, slot_time(slot, 13), |d| d < to_chrono(SLOT_DURATION)); + assert_eq!(delay, chrono::Duration::seconds(13)); + assert!(!ok); + } + + #[test] + fn chain_reorg_notifies_and_dedups() { + let (tx, mut rx) = sync::mpsc::channel(8); + let mut actor = test_actor(vec![tx]); + + // slot 64, depth 0 => reorg_epoch = 64 / 32 = 2. + let data = r#"{"slot":"64","depth":"0","epoch":"2","old_head_block":"0xaa","new_head_block":"0xbb"}"#; + actor.handle_event(&event(CHAIN_REORG_EVENT, data, genesis())); + assert_eq!(rx.try_recv().unwrap(), 2); + + // Same epoch again => deduplicated, no notification. + actor.handle_event(&event(CHAIN_REORG_EVENT, data, genesis())); + assert!(rx.try_recv().is_err()); + + // New epoch => notified. + let data = r#"{"slot":"96","depth":"0","epoch":"3"}"#; + actor.handle_event(&event(CHAIN_REORG_EVENT, data, genesis())); + assert_eq!(rx.try_recv().unwrap(), 3); + } + + #[test] + fn chain_reorg_epoch_zero_first_event_is_deduped() { + // Parity with Charon: `last_reorg_epoch` starts at 0, so a first reorg at + // epoch 0 is treated as a duplicate and not notified. 
+ let (tx, mut rx) = sync::mpsc::channel(8); + let mut actor = test_actor(vec![tx]); + + let data = r#"{"slot":"0","depth":"0","epoch":"0"}"#; + actor.handle_event(&event(CHAIN_REORG_EVENT, data, genesis())); + assert!(rx.try_recv().is_err()); + } + + #[test] + fn chain_reorg_depth_exceeding_slot_is_ignored() { + let (tx, mut rx) = sync::mpsc::channel(8); + let mut actor = test_actor(vec![tx]); + + let data = r#"{"slot":"5","depth":"10","epoch":"0"}"#; + actor.handle_event(&event(CHAIN_REORG_EVENT, data, genesis())); + assert!(rx.try_recv().is_err()); + } + + #[test] + fn malformed_payload_is_skipped_and_processing_continues() { + let (tx, mut rx) = sync::mpsc::channel(8); + let mut actor = test_actor(vec![tx]); + + // Malformed JSON is logged and skipped, not propagated as a failure. + actor.handle_event(&event(CHAIN_REORG_EVENT, "{not json", genesis())); + assert!(rx.try_recv().is_err()); + + // A subsequent valid event is still processed. + let data = r#"{"slot":"64","depth":"0","epoch":"2"}"#; + actor.handle_event(&event(CHAIN_REORG_EVENT, data, genesis())); + assert_eq!(rx.try_recv().unwrap(), 2); + } + + #[test] + fn handles_all_event_types_without_panicking() { + let (tx, mut rx) = sync::mpsc::channel(8); + let mut actor = test_actor(vec![tx]); + let ts = slot_time(64, 5); + + actor.handle_event(&event( + HEAD_EVENT, + r#"{"slot":"64","block":"0xabc","previous_duty_dependent_root":"0x01","current_duty_dependent_root":"0x02"}"#, + ts, + )); + actor.handle_event(&event( + BLOCK_GOSSIP_EVENT, + r#"{"slot":"64","block":"0xabc"}"#, + ts, + )); + actor.handle_event(&event(BLOCK_EVENT, r#"{"slot":"64","block":"0xabc"}"#, ts)); + actor.handle_event(&event("unknown_topic", "{}", ts)); + + // None of the above are chain reorgs, so nothing is notified. 
+ assert!(rx.try_recv().is_err()); + } + + #[tokio::test] + async fn run_loop_forwards_events_and_stops_on_cancellation() { + let ct = CancellationToken::new(); + let (events_tx, events_rx) = sync::mpsc::channel(8); + let (_msg_tx, msg_rx) = sync::mpsc::channel(8); + let (reorg_tx, mut reorg_rx) = sync::mpsc::channel(8); + + let actor = test_actor(vec![reorg_tx]); + let handle = tokio::spawn(actor.run(events_rx, msg_rx, ct.clone())); + + let data = r#"{"slot":"64","depth":"0","epoch":"2"}"#; + events_tx + .send(event(CHAIN_REORG_EVENT, data, genesis())) + .await + .unwrap(); + + assert_eq!(reorg_rx.recv().await.unwrap(), 2); + + ct.cancel(); + handle.await.unwrap(); + } + + #[tokio::test] + async fn dynamic_subscription_via_handle() { + let ct = CancellationToken::new(); + let (events_tx, events_rx) = sync::mpsc::channel(8); + let (msg_tx, msg_rx) = sync::mpsc::channel(8); + + let actor = test_actor(vec![]); + tokio::spawn(actor.run(events_rx, msg_rx, ct.clone())); + + let handle = SseListenerHandle { sender: msg_tx }; + let mut reorg_rx = handle.subscribe_chain_reorg().await.unwrap(); + + let data = r#"{"slot":"64","depth":"0","epoch":"2"}"#; + events_tx + .send(event(CHAIN_REORG_EVENT, data, genesis())) + .await + .unwrap(); + + assert_eq!(reorg_rx.recv().await.unwrap(), 2); + + ct.cancel(); + } +} diff --git a/crates/app/src/sse/types.rs b/crates/app/src/sse/types.rs new file mode 100644 index 00000000..fbc1f460 --- /dev/null +++ b/crates/app/src/sse/types.rs @@ -0,0 +1,81 @@ +//! Event payloads and topics for the beacon node SSE stream. +//! +//! Only the fields consumed by the listener (for metrics, reorg detection, and +//! debug logging) are modelled; serde ignores any other fields the beacon node +//! sends. `#[serde(default)]` keeps deserialization lenient when a field is +//! absent. + +use chrono::{DateTime, Utc}; + +/// SSE topic for `head` events. +pub const HEAD_EVENT: &str = "head"; +/// SSE topic for `chain_reorg` events. 
+pub const CHAIN_REORG_EVENT: &str = "chain_reorg"; +/// SSE topic for `block_gossip` events. +pub const BLOCK_GOSSIP_EVENT: &str = "block_gossip"; +/// SSE topic for `block` events. +pub const BLOCK_EVENT: &str = "block"; + +/// A raw SSE event handed from the stream source to the listener actor. +/// +/// `timestamp` is captured when the event is received (production) or set +/// explicitly (tests), and is used to compute the per-event delay. +#[derive(Debug, Clone)] +pub struct SseEvent { + /// The event topic (the SSE `event:` field). + pub topic: String, + /// The raw, unparsed JSON `data` payload. + pub data: String, + /// The time the event was received. + pub timestamp: DateTime, +} + +/// Payload of a `head` event. +#[derive(Debug, Clone, Default, serde::Deserialize)] +#[serde(default)] +pub struct HeadEventData { + /// The slot of the new head. + pub slot: String, + /// The block root of the new head. + pub block: String, + /// The previous duty dependent root. + pub previous_duty_dependent_root: String, + /// The current duty dependent root. + pub current_duty_dependent_root: String, +} + +/// Payload of a `chain_reorg` event. +#[derive(Debug, Clone, Default, serde::Deserialize)] +#[serde(default)] +pub struct ChainReorgEventData { + /// The slot at which the reorg occurred. + pub slot: String, + /// The depth of the reorg in slots. + pub depth: String, + /// The epoch at which the reorg occurred. + pub epoch: String, + /// The block root of the old head. + pub old_head_block: String, + /// The block root of the new head. + pub new_head_block: String, +} + +/// Payload of a `block_gossip` event. +#[derive(Debug, Clone, Default, serde::Deserialize)] +#[serde(default)] +pub struct BlockGossipEventData { + /// The slot of the gossiped block. + pub slot: String, + /// The block root. + pub block: String, +} + +/// Payload of a `block` event. 
+#[derive(Debug, Clone, Default, serde::Deserialize)] +#[serde(default)] +pub struct BlockEventData { + /// The slot of the imported block. + pub slot: String, + /// The block root. + pub block: String, +} From 53e9d07d0824499d4c1e33dad37f2d8e6c14c6ad Mon Sep 17 00:00:00 2001 From: "emlautarom1-agent[bot]" <292495798+emlautarom1-agent[bot]@users.noreply.github.com> Date: Thu, 18 Jun 2026 19:39:03 -0300 Subject: [PATCH 03/16] refactor(app): expect on non-exhausting reconnect backoff The reconnect backoff is built with without_max_times, so next() never returns None. Use expect to surface a logic error instead of silently falling back to DEFAULT_RETRY. --- crates/app/src/sse/mod.rs | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/crates/app/src/sse/mod.rs b/crates/app/src/sse/mod.rs index cbe66339..350549be 100644 --- a/crates/app/src/sse/mod.rs +++ b/crates/app/src/sse/mod.rs @@ -69,6 +69,10 @@ type Result = std::result::Result; /// Allows setting up chain reorg subscriptions before the listener is started. /// The listener is started by calling [`SseListenerBuilder::build`]. pub struct SseListenerBuilder { + // CLAUDE: Instead of using a vector of subscribers, prefer to use a + // broadcast channel. This simplifies the logic since we don't need to + // keep a vector of senders, instead a single sender that broadcasts to all + // subscribers. reorg_subs: Vec>, } @@ -216,6 +220,11 @@ impl SseListenerActor { } fn handle_head(&self, event: &SseEvent) { + // CLAUDE: + // 1. On error there is nothing being raised or logged (uses immediate return). + // What does Charon do here? + // 2. The parsing is split in two steps; can we do a single parse of all the + // fields at once? If not, why? 
let Some(head) = parse_payload::(HEAD_EVENT, &event.data, &self.addr) else { return; }; @@ -256,6 +265,8 @@ impl SseListenerActor { } fn handle_chain_reorg(&mut self, event: &SseEvent) { + // CLAUDE: Same situation as in `handle_head`; check other similar instances in + // other methods (I skipped adding comments to those) let Some(reorg) = parse_payload::(CHAIN_REORG_EVENT, &event.data, &self.addr) else { @@ -360,6 +371,9 @@ impl SseListenerActor { } self.last_reorg_epoch = epoch; + // CLAUDE: By using tokio's broadcast channel, we can simply send the message + // and let the channel handle the subscribers, instead of manually iterating and + // pruning them. let addr = &self.addr; self.reorg_subs.retain(|tx| match tx.try_send(epoch) { Ok(()) => true, @@ -440,7 +454,9 @@ async fn run_pump( backoff = reconnect_backoff().build(); } StreamOutcome::Error => { - let delay = backoff.next().unwrap_or(DEFAULT_RETRY); + let delay = backoff + .next() + .expect("reconnect backoff is configured without a retry limit"); tokio::select! { biased; _ = ct.cancelled() => break, @@ -499,6 +515,8 @@ async fn stream_once( } } +// CLAUDE: Check if these backoff configurations already exist in other modules in Pluto + /// Backoff used while waiting for the beacon node configuration. fn fast_backoff() -> ExponentialBuilder { ExponentialBuilder::default() @@ -519,8 +537,10 @@ fn reconnect_backoff() -> ExponentialBuilder { .with_jitter() } + /// Parses an SSE event's JSON payload, logging and discarding malformed events. fn parse_payload(topic: &str, data: &str, addr: &str) -> Option { + // CLAUDE: I'm not sure about having this function alltogether. Explore removing this helper altogether with a simplified parsing logic that parses everything in one pass without resorting to additional functions (ex `parse_u64`). 
match serde_json::from_str(data) { Ok(payload) => Some(payload), Err(err) => { @@ -544,6 +564,7 @@ fn parse_u64(field: &str, value: &str, addr: &str) -> Option { /// Converts a [`std::time::Duration`] to a [`chrono::Duration`], saturating on /// overflow. fn to_chrono(duration: Duration) -> chrono::Duration { + // CLAUDE: Inline this function chrono::Duration::from_std(duration).unwrap_or(chrono::Duration::MAX) } From 61200bb34f64bef42c63752560d9b49421e8f601 Mon Sep 17 00:00:00 2001 From: "emlautarom1-agent[bot]" <292495798+emlautarom1-agent[bot]@users.noreply.github.com> Date: Thu, 18 Jun 2026 19:40:32 -0300 Subject: [PATCH 04/16] refactor(app): inline to_chrono helper into delay windows Compute each delay window inline where it is used instead of routing through a single-use to_chrono helper. --- crates/app/src/sse/mod.rs | 36 +++++++++++++++--------------------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/crates/app/src/sse/mod.rs b/crates/app/src/sse/mod.rs index 350549be..be4f9bd2 100644 --- a/crates/app/src/sse/mod.rs +++ b/crates/app/src/sse/mod.rs @@ -240,9 +240,9 @@ impl SseListenerActor { // The chain's head is updated once a majority of the chain votes for a // block, which realistically happens between 2/3 and 3/3 of the slot. - let (delay, ok) = self.compute_delay(slot, event.timestamp, |delay| { - delay < to_chrono(self.slot_duration) - }); + let window = + chrono::Duration::from_std(self.slot_duration).unwrap_or(chrono::Duration::MAX); + let (delay, ok) = self.compute_delay(slot, event.timestamp, |delay| delay < window); let delay_s = delay_secs(delay); if ok { @@ -323,8 +323,8 @@ impl SseListenerActor { // A block should be received via gossip between 0/3 and 1/3 of the slot. 
let third = self.slot_duration.checked_div(3).expect("non-zero divisor"); - let (delay, ok) = - self.compute_delay(slot, event.timestamp, |delay| delay < to_chrono(third)); + let window = chrono::Duration::from_std(third).unwrap_or(chrono::Duration::MAX); + let (delay, ok) = self.compute_delay(slot, event.timestamp, |delay| delay < window); let delay_s = delay_secs(delay); if !ok { @@ -349,8 +349,8 @@ impl SseListenerActor { // A block should be imported into fork choice between 0/3 and 1/3 of the // slot. let third = self.slot_duration.checked_div(3).expect("non-zero divisor"); - let (delay, ok) = - self.compute_delay(slot, event.timestamp, |delay| delay < to_chrono(third)); + let window = chrono::Duration::from_std(third).unwrap_or(chrono::Duration::MAX); + let (delay, ok) = self.compute_delay(slot, event.timestamp, |delay| delay < window); let delay_s = delay_secs(delay); if !ok { @@ -515,7 +515,8 @@ async fn stream_once( } } -// CLAUDE: Check if these backoff configurations already exist in other modules in Pluto +// CLAUDE: Check if these backoff configurations already exist in other modules +// in Pluto /// Backoff used while waiting for the beacon node configuration. fn fast_backoff() -> ExponentialBuilder { @@ -537,10 +538,11 @@ fn reconnect_backoff() -> ExponentialBuilder { .with_jitter() } - /// Parses an SSE event's JSON payload, logging and discarding malformed events. fn parse_payload(topic: &str, data: &str, addr: &str) -> Option { - // CLAUDE: I'm not sure about having this function alltogether. Explore removing this helper altogether with a simplified parsing logic that parses everything in one pass without resorting to additional functions (ex `parse_u64`). + // CLAUDE: I'm not sure about having this function alltogether. Explore removing + // this helper altogether with a simplified parsing logic that parses everything + // in one pass without resorting to additional functions (ex `parse_u64`). 
match serde_json::from_str(data) { Ok(payload) => Some(payload), Err(err) => { @@ -561,13 +563,6 @@ fn parse_u64(field: &str, value: &str, addr: &str) -> Option { } } -/// Converts a [`std::time::Duration`] to a [`chrono::Duration`], saturating on -/// overflow. -fn to_chrono(duration: Duration) -> chrono::Duration { - // CLAUDE: Inline this function - chrono::Duration::from_std(duration).unwrap_or(chrono::Duration::MAX) -} - /// Returns the delay in fractional seconds for metrics and logging. /// /// Histogram values are non-negative seconds; a negative delay (an event @@ -628,14 +623,13 @@ mod tests { fn compute_delay_inside_and_outside_window() { let actor = test_actor(vec![]); let slot = 10; + let window = chrono::Duration::from_std(SLOT_DURATION).unwrap(); - let (delay, ok) = - actor.compute_delay(slot, slot_time(slot, 5), |d| d < to_chrono(SLOT_DURATION)); + let (delay, ok) = actor.compute_delay(slot, slot_time(slot, 5), |d| d < window); assert_eq!(delay, chrono::Duration::seconds(5)); assert!(ok); - let (delay, ok) = - actor.compute_delay(slot, slot_time(slot, 13), |d| d < to_chrono(SLOT_DURATION)); + let (delay, ok) = actor.compute_delay(slot, slot_time(slot, 13), |d| d < window); assert_eq!(delay, chrono::Duration::seconds(13)); assert!(!ok); } From 586fed2c7cdaf2c9c6e3dd41987f5341e1659a8d Mon Sep 17 00:00:00 2001 From: "emlautarom1-agent[bot]" <292495798+emlautarom1-agent[bot]@users.noreply.github.com> Date: Thu, 18 Jun 2026 19:44:07 -0300 Subject: [PATCH 05/16] refactor(app): parse SSE payloads in a single typed pass Deserialize numeric fields (slot, depth) directly into u64 via serde_with::DisplayFromStr instead of decoding them as strings and re-parsing in a second step. This removes the parse_u64 helper and makes each handler parse its payload in one pass. Malformed payloads are logged (with the topic) and skipped. 
--- Cargo.lock | 1 + crates/app/Cargo.toml | 1 + crates/app/src/sse/mod.rs | 49 ++++++------------------------------- crates/app/src/sse/types.rs | 23 ++++++++++++----- 4 files changed, 26 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 185b2efd..0780e60d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5488,6 +5488,7 @@ dependencies = [ "reqwest 0.13.3", "serde", "serde_json", + "serde_with", "tar", "tempfile", "test-case", diff --git a/crates/app/Cargo.toml b/crates/app/Cargo.toml index 7b46eeb0..2977731a 100644 --- a/crates/app/Cargo.toml +++ b/crates/app/Cargo.toml @@ -26,6 +26,7 @@ serde.workspace = true serde_json.workspace = true hex.workspace = true k256.workspace = true +serde_with.workspace = true bon.workspace = true flate2.workspace = true tar.workspace = true diff --git a/crates/app/src/sse/mod.rs b/crates/app/src/sse/mod.rs index be4f9bd2..1aca45a5 100644 --- a/crates/app/src/sse/mod.rs +++ b/crates/app/src/sse/mod.rs @@ -220,18 +220,10 @@ impl SseListenerActor { } fn handle_head(&self, event: &SseEvent) { - // CLAUDE: - // 1. On error there is nothing being raised or logged (uses immediate return). - // What does Charon do here? - // 2. The parsing is split in two steps; can we do a single parse of all the - // fields at once? If not, why? 
let Some(head) = parse_payload::(HEAD_EVENT, &event.data, &self.addr) else { return; }; - - let Some(slot) = parse_u64("slot", &head.slot, &self.addr) else { - return; - }; + let slot = head.slot; if i64::try_from(slot).is_err() { tracing::warn!(addr = %self.addr, slot, "Head slot value exceeds i64 range"); @@ -265,20 +257,12 @@ impl SseListenerActor { } fn handle_chain_reorg(&mut self, event: &SseEvent) { - // CLAUDE: Same situation as in `handle_head`; check other similar instances in - // other methods (I skipped adding comments to those) let Some(reorg) = parse_payload::(CHAIN_REORG_EVENT, &event.data, &self.addr) else { return; }; - - let Some(slot) = parse_u64("slot", &reorg.slot, &self.addr) else { - return; - }; - let Some(depth) = parse_u64("depth", &reorg.depth, &self.addr) else { - return; - }; + let (slot, depth) = (reorg.slot, reorg.depth); if slot < depth { tracing::warn!(addr = %self.addr, slot, depth, "Invalid chain reorg event: depth exceeds slot"); @@ -316,10 +300,7 @@ impl SseListenerActor { else { return; }; - - let Some(slot) = parse_u64("slot", &gossip.slot, &self.addr) else { - return; - }; + let slot = gossip.slot; // A block should be received via gossip between 0/3 and 1/3 of the slot. let third = self.slot_duration.checked_div(3).expect("non-zero divisor"); @@ -341,10 +322,7 @@ impl SseListenerActor { else { return; }; - - let Some(slot) = parse_u64("slot", &block.slot, &self.addr) else { - return; - }; + let slot = block.slot; // A block should be imported into fork choice between 0/3 and 1/3 of the // slot. @@ -538,26 +516,13 @@ fn reconnect_backoff() -> ExponentialBuilder { .with_jitter() } -/// Parses an SSE event's JSON payload, logging and discarding malformed events. +/// Deserializes an SSE event's JSON payload (numeric fields and all) in a +/// single pass, logging and discarding malformed events. fn parse_payload(topic: &str, data: &str, addr: &str) -> Option { - // CLAUDE: I'm not sure about having this function alltogether. 
Explore removing - // this helper altogether with a simplified parsing logic that parses everything - // in one pass without resorting to additional functions (ex `parse_u64`). match serde_json::from_str(data) { Ok(payload) => Some(payload), Err(err) => { - tracing::warn!(err = ?err, addr = %addr, topic, "Failed to unmarshal SSE event"); - None - } - } -} - -/// Parses a stringified `u64` field, logging and discarding on failure. -fn parse_u64(field: &str, value: &str, addr: &str) -> Option { - match value.parse::() { - Ok(parsed) => Some(parsed), - Err(err) => { - tracing::warn!(err = ?err, addr = %addr, field, value, "Failed to parse SSE numeric field"); + tracing::warn!(err = ?err, addr = %addr, topic, "Failed to parse SSE event"); None } } diff --git a/crates/app/src/sse/types.rs b/crates/app/src/sse/types.rs index fbc1f460..5600785c 100644 --- a/crates/app/src/sse/types.rs +++ b/crates/app/src/sse/types.rs @@ -3,9 +3,11 @@ //! Only the fields consumed by the listener (for metrics, reorg detection, and //! debug logging) are modelled; serde ignores any other fields the beacon node //! sends. `#[serde(default)]` keeps deserialization lenient when a field is -//! absent. +//! absent. Numeric fields are sent by the beacon node as quoted strings and are +//! parsed directly into integers via [`serde_with::DisplayFromStr`]. use chrono::{DateTime, Utc}; +use serde_with::{DisplayFromStr, serde_as}; /// SSE topic for `head` events. pub const HEAD_EVENT: &str = "head"; @@ -31,11 +33,13 @@ pub struct SseEvent { } /// Payload of a `head` event. +#[serde_as] #[derive(Debug, Clone, Default, serde::Deserialize)] #[serde(default)] pub struct HeadEventData { /// The slot of the new head. - pub slot: String, + #[serde_as(as = "DisplayFromStr")] + pub slot: u64, /// The block root of the new head. pub block: String, /// The previous duty dependent root. @@ -45,13 +49,16 @@ pub struct HeadEventData { } /// Payload of a `chain_reorg` event. 
+#[serde_as] #[derive(Debug, Clone, Default, serde::Deserialize)] #[serde(default)] pub struct ChainReorgEventData { /// The slot at which the reorg occurred. - pub slot: String, + #[serde_as(as = "DisplayFromStr")] + pub slot: u64, /// The depth of the reorg in slots. - pub depth: String, + #[serde_as(as = "DisplayFromStr")] + pub depth: u64, /// The epoch at which the reorg occurred. pub epoch: String, /// The block root of the old head. @@ -61,21 +68,25 @@ pub struct ChainReorgEventData { } /// Payload of a `block_gossip` event. +#[serde_as] #[derive(Debug, Clone, Default, serde::Deserialize)] #[serde(default)] pub struct BlockGossipEventData { /// The slot of the gossiped block. - pub slot: String, + #[serde_as(as = "DisplayFromStr")] + pub slot: u64, /// The block root. pub block: String, } /// Payload of a `block` event. +#[serde_as] #[derive(Debug, Clone, Default, serde::Deserialize)] #[serde(default)] pub struct BlockEventData { /// The slot of the imported block. - pub slot: String, + #[serde_as(as = "DisplayFromStr")] + pub slot: u64, /// The block root. pub block: String, } From 4988660025d724e5b93312f4bff6839935b7e53a Mon Sep 17 00:00:00 2001 From: "emlautarom1-agent[bot]" <292495798+emlautarom1-agent[bot]@users.noreply.github.com> Date: Thu, 18 Jun 2026 20:10:09 -0300 Subject: [PATCH 06/16] chore(app): remove addressed review notes Resolved per review: keep the mpsc Vec of reorg subscribers (returns an mpsc::Receiver that plugs directly into the scheduler) and keep the local backoff helpers (consistent with the existing scheduler/bootnode pattern). --- crates/app/src/sse/mod.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/crates/app/src/sse/mod.rs b/crates/app/src/sse/mod.rs index 1aca45a5..9ff11fb9 100644 --- a/crates/app/src/sse/mod.rs +++ b/crates/app/src/sse/mod.rs @@ -69,10 +69,6 @@ type Result = std::result::Result; /// Allows setting up chain reorg subscriptions before the listener is started. 
/// The listener is started by calling [`SseListenerBuilder::build`]. pub struct SseListenerBuilder { - // CLAUDE: Instead of using a vector of subscribers, prefer to use a - // broadcast channel. This simplifies the logic since we don't need to - // keep a vector of senders, instead a single sender that broadcasts to all - // subscribers. reorg_subs: Vec>, } @@ -349,9 +345,6 @@ impl SseListenerActor { } self.last_reorg_epoch = epoch; - // CLAUDE: By using tokio's broadcast channel, we can simply send the message - // and let the channel handle the subscribers, instead of manually iterating and - // pruning them. let addr = &self.addr; self.reorg_subs.retain(|tx| match tx.try_send(epoch) { Ok(()) => true, @@ -493,9 +486,6 @@ async fn stream_once( } } -// CLAUDE: Check if these backoff configurations already exist in other modules -// in Pluto - /// Backoff used while waiting for the beacon node configuration. fn fast_backoff() -> ExponentialBuilder { ExponentialBuilder::default() From eb550cc09ef1b660b432dbbadcb07b1375f1f8cb Mon Sep 17 00:00:00 2001 From: Lautaro Emanuel Date: Thu, 18 Jun 2026 20:11:57 -0300 Subject: [PATCH 07/16] Add TODOs --- crates/app/src/sse/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/app/src/sse/mod.rs b/crates/app/src/sse/mod.rs index 9ff11fb9..cb3218d2 100644 --- a/crates/app/src/sse/mod.rs +++ b/crates/app/src/sse/mod.rs @@ -69,6 +69,8 @@ type Result = std::result::Result; /// Allows setting up chain reorg subscriptions before the listener is started. /// The listener is started by calling [`SseListenerBuilder::build`]. pub struct SseListenerBuilder { + // TODO: Prefer to use a `broadcast` channel here to simplify the subscription management. + // Requires revisiting the potential subscribers. reorg_subs: Vec>, } @@ -486,6 +488,8 @@ async fn stream_once( } } +// TODO: Extract these backoff configurations into a shared module. + /// Backoff used while waiting for the beacon node configuration. 
fn fast_backoff() -> ExponentialBuilder { ExponentialBuilder::default() From 2b4d72332fdf58601c8cac6da39805c5035398c9 Mon Sep 17 00:00:00 2001 From: "emlautarom1-agent[bot]" <292495798+emlautarom1-agent[bot]@users.noreply.github.com> Date: Fri, 19 Jun 2026 09:07:33 -0300 Subject: [PATCH 08/16] refactor(app): drop redundant head-slot i64 range guard The slot > i64::MAX guard mirrored Charon, where it exists to avoid a uint64->int64 wrap in its duration math. compute_delay already saturates the conversion, so the guard only skipped slots no real beacon node can emit; remove it. --- crates/app/src/sse/mod.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/crates/app/src/sse/mod.rs b/crates/app/src/sse/mod.rs index cb3218d2..2133fd8f 100644 --- a/crates/app/src/sse/mod.rs +++ b/crates/app/src/sse/mod.rs @@ -223,11 +223,6 @@ impl SseListenerActor { }; let slot = head.slot; - if i64::try_from(slot).is_err() { - tracing::warn!(addr = %self.addr, slot, "Head slot value exceeds i64 range"); - return; - } - // The chain's head is updated once a majority of the chain votes for a // block, which realistically happens between 2/3 and 3/3 of the slot. let window = @@ -258,8 +253,11 @@ impl SseListenerActor { let Some(reorg) = parse_payload::(CHAIN_REORG_EVENT, &event.data, &self.addr) else { + // CLAUDE: On failed parsing we don't do anything. What does Charon do? Should + // we at least log? return; }; + // CLAUDE: Inline these variables let (slot, depth) = (reorg.slot, reorg.depth); if slot < depth { From 2463e70ef5986e9873d5faabc4f326198f2c4e72 Mon Sep 17 00:00:00 2001 From: "emlautarom1-agent[bot]" <292495798+emlautarom1-agent[bot]@users.noreply.github.com> Date: Fri, 19 Jun 2026 09:08:26 -0300 Subject: [PATCH 09/16] refactor(app): inline slot/depth in handle_chain_reorg Use the parsed reorg fields directly instead of binding intermediate slot/depth locals. 
--- crates/app/src/sse/mod.rs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/crates/app/src/sse/mod.rs b/crates/app/src/sse/mod.rs index 2133fd8f..60bf1255 100644 --- a/crates/app/src/sse/mod.rs +++ b/crates/app/src/sse/mod.rs @@ -257,18 +257,16 @@ impl SseListenerActor { // we at least log? return; }; - // CLAUDE: Inline these variables - let (slot, depth) = (reorg.slot, reorg.depth); - - if slot < depth { - tracing::warn!(addr = %self.addr, slot, depth, "Invalid chain reorg event: depth exceeds slot"); + if reorg.slot < reorg.depth { + tracing::warn!(addr = %self.addr, slot = reorg.slot, depth = reorg.depth, "Invalid chain reorg event: depth exceeds slot"); return; } // `slot >= depth` is guaranteed above and `slots_per_epoch` is non-zero // (validated by `fetch_slots_config`). - let reorg_epoch = slot - .checked_sub(depth) + let reorg_epoch = reorg + .slot + .checked_sub(reorg.depth) .expect("slot >= depth") .checked_div(self.slots_per_epoch) .expect("non-zero slots per epoch"); @@ -276,17 +274,17 @@ impl SseListenerActor { tracing::debug!( addr = %self.addr, - slot, + slot = reorg.slot, epoch = %reorg.epoch, reorg_epoch, - depth, + depth = reorg.depth, old_head_block = %reorg.old_head_block, new_head_block = %reorg.new_head_block, "SSE chain reorg event" ); // Reorg depths fit comfortably in a `u32`; `f64::from` is lossless. 
- let depth_f64 = f64::from(u32::try_from(depth).unwrap_or(u32::MAX)); + let depth_f64 = f64::from(u32::try_from(reorg.depth).unwrap_or(u32::MAX)); SSE_METRICS.sse_chain_reorg_depth[&self.addr].observe(depth_f64); } From 7a75756a490db0a7ce0bddc8ad9a20e640297886 Mon Sep 17 00:00:00 2001 From: "emlautarom1-agent[bot]" <292495798+emlautarom1-agent[bot]@users.noreply.github.com> Date: Fri, 19 Jun 2026 09:09:45 -0300 Subject: [PATCH 10/16] refactor(app): inline SSE payload parsing with explicit logging Parse each event payload inline in its handler and log a warning on failure at the call site, instead of hiding it inside parse_payload. Makes the discard-on-malformed behavior visible. (Charon propagates the error and tears down the SSE connection; we log and continue, as a deliberate robustness choice.) --- crates/app/src/sse/mod.rs | 51 ++++++++++++++++++--------------------- 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/crates/app/src/sse/mod.rs b/crates/app/src/sse/mod.rs index 60bf1255..061ce319 100644 --- a/crates/app/src/sse/mod.rs +++ b/crates/app/src/sse/mod.rs @@ -218,8 +218,12 @@ impl SseListenerActor { } fn handle_head(&self, event: &SseEvent) { - let Some(head) = parse_payload::(HEAD_EVENT, &event.data, &self.addr) else { - return; + let head: HeadEventData = match serde_json::from_str(&event.data) { + Ok(head) => head, + Err(err) => { + tracing::warn!(err = ?err, addr = %self.addr, topic = HEAD_EVENT, "Failed to parse SSE event"); + return; + } }; let slot = head.slot; @@ -250,12 +254,12 @@ impl SseListenerActor { } fn handle_chain_reorg(&mut self, event: &SseEvent) { - let Some(reorg) = - parse_payload::(CHAIN_REORG_EVENT, &event.data, &self.addr) - else { - // CLAUDE: On failed parsing we don't do anything. What does Charon do? Should - // we at least log? 
- return; + let reorg: ChainReorgEventData = match serde_json::from_str(&event.data) { + Ok(reorg) => reorg, + Err(err) => { + tracing::warn!(err = ?err, addr = %self.addr, topic = CHAIN_REORG_EVENT, "Failed to parse SSE event"); + return; + } }; if reorg.slot < reorg.depth { tracing::warn!(addr = %self.addr, slot = reorg.slot, depth = reorg.depth, "Invalid chain reorg event: depth exceeds slot"); @@ -289,10 +293,12 @@ impl SseListenerActor { } fn handle_block_gossip(&self, event: &SseEvent) { - let Some(gossip) = - parse_payload::(BLOCK_GOSSIP_EVENT, &event.data, &self.addr) - else { - return; + let gossip: BlockGossipEventData = match serde_json::from_str(&event.data) { + Ok(gossip) => gossip, + Err(err) => { + tracing::warn!(err = ?err, addr = %self.addr, topic = BLOCK_GOSSIP_EVENT, "Failed to parse SSE event"); + return; + } }; let slot = gossip.slot; @@ -312,9 +318,12 @@ impl SseListenerActor { } fn handle_block(&self, event: &SseEvent) { - let Some(block) = parse_payload::(BLOCK_EVENT, &event.data, &self.addr) - else { - return; + let block: BlockEventData = match serde_json::from_str(&event.data) { + Ok(block) => block, + Err(err) => { + tracing::warn!(err = ?err, addr = %self.addr, topic = BLOCK_EVENT, "Failed to parse SSE event"); + return; + } }; let slot = block.slot; @@ -506,18 +515,6 @@ fn reconnect_backoff() -> ExponentialBuilder { .with_jitter() } -/// Deserializes an SSE event's JSON payload (numeric fields and all) in a -/// single pass, logging and discarding malformed events. -fn parse_payload(topic: &str, data: &str, addr: &str) -> Option { - match serde_json::from_str(data) { - Ok(payload) => Some(payload), - Err(err) => { - tracing::warn!(err = ?err, addr = %addr, topic, "Failed to parse SSE event"); - None - } - } -} - /// Returns the delay in fractional seconds for metrics and logging. 
/// /// Histogram values are non-negative seconds; a negative delay (an event From da7c040a5096f806b310b258a71a3b706777397d Mon Sep 17 00:00:00 2001 From: "emlautarom1-agent[bot]" <292495798+emlautarom1-agent[bot]@users.noreply.github.com> Date: Fri, 19 Jun 2026 16:59:26 -0300 Subject: [PATCH 11/16] fix(app): back off on unproductive SSE reconnects MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reset the reconnect backoff only after a connection that forwarded at least one event. Previously a clean EOF (StreamOutcome::Ended) reset the backoff and reconnected with zero delay, so a beacon node — or a proxy in front of it — that accepts and immediately closes the connection drove a tight connect/EOF/reconnect loop with no rate limiting. Both the Ended and Error arms now back off; only a productive connection resets it. --- crates/app/src/sse/mod.rs | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/crates/app/src/sse/mod.rs b/crates/app/src/sse/mod.rs index 061ce319..66e92609 100644 --- a/crates/app/src/sse/mod.rs +++ b/crates/app/src/sse/mod.rs @@ -404,10 +404,13 @@ async fn fetch_config(client: &EthBeaconNodeApiClient) -> Result<(DateTime, /// Outcome of a single SSE stream connection. enum StreamOutcome { - /// The stream ended cleanly (server closed the connection). - Ended, + /// The stream ended cleanly (server closed the connection). `productive` + /// is true if at least one event was forwarded before the close. + Ended { productive: bool }, /// A connection or read error occurred; the caller should back off. - Error, + /// `productive` is true if at least one event was forwarded before the + /// error. + Error { productive: bool }, /// The actor's event channel was closed; the pump should stop. ChannelClosed, /// The cancellation token fired. 
@@ -427,11 +430,14 @@ async fn run_pump( loop { match stream_once(&client, &addr, &events_tx, &ct).await { StreamOutcome::Cancelled | StreamOutcome::ChannelClosed => break, - StreamOutcome::Ended => { - // Clean disconnect: reset the backoff and reconnect promptly. - backoff = reconnect_backoff().build(); - } - StreamOutcome::Error => { + StreamOutcome::Ended { productive } | StreamOutcome::Error { productive } => { + // Reset the backoff only after a productive connection (one that + // forwarded at least one event). Otherwise a server that accepts + // and immediately closes the connection — or fails to connect — + // would drive a tight reconnect loop with no rate limiting. + if productive { + backoff = reconnect_backoff().build(); + } let delay = backoff .next() .expect("reconnect backoff is configured without a retry limit"); @@ -461,11 +467,12 @@ async fn stream_once( Ok(stream) => stream, Err(err) => { tracing::warn!(err = %err, addr = %addr, "Failed to connect to SSE stream"); - return StreamOutcome::Error; + return StreamOutcome::Error { productive: false }; } }; futures::pin_mut!(stream); + let mut productive = false; loop { tokio::select! 
{ biased; @@ -473,10 +480,10 @@ async fn stream_once( _ = ct.cancelled() => return StreamOutcome::Cancelled, item = stream.next() => match item { - None => return StreamOutcome::Ended, + None => return StreamOutcome::Ended { productive }, Some(Err(err)) => { tracing::warn!(err = %err, addr = %addr, "SSE stream read error"); - return StreamOutcome::Error; + return StreamOutcome::Error { productive }; } Some(Ok(BeaconNodeEvent { topic, data })) => { if data.is_empty() { @@ -487,6 +494,7 @@ async fn stream_once( if events_tx.send(event).await.is_err() { return StreamOutcome::ChannelClosed; } + productive = true; } }, } From 385f290aa3f3280995c465d43a81af97c59b7d1b Mon Sep 17 00:00:00 2001 From: "emlautarom1-agent[bot]" <292495798+emlautarom1-agent[bot]@users.noreply.github.com> Date: Fri, 19 Jun 2026 16:59:52 -0300 Subject: [PATCH 12/16] fix(app): stop SSE actor when event channel closes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The actor's select arm used `Some(event) = events_rx.recv()`, so when the pump dropped its sender the branch became permanently disabled and the actor parked forever on cancellation only — no events, no reconnection, no signal. Match the channel explicitly and break with an error log on closure so the listener shuts down instead of zombie-ing. --- crates/app/src/sse/mod.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/crates/app/src/sse/mod.rs b/crates/app/src/sse/mod.rs index 66e92609..5abb94ac 100644 --- a/crates/app/src/sse/mod.rs +++ b/crates/app/src/sse/mod.rs @@ -200,8 +200,15 @@ impl SseListenerActor { } }, - Some(event) = events_rx.recv() => { - self.handle_event(&event); + event = events_rx.recv() => match event { + Some(event) => self.handle_event(&event), + // The pump dropped its sender (e.g. it returned early or + // panicked). Stop instead of parking forever on a disabled + // branch with no events and no reconnection. 
+ None => { + tracing::error!(addr = %self.addr, "SSE event channel closed; stopping listener"); + break; + } }, } } From 35f145673ff117eb19333898f8fb204473257546 Mon Sep 17 00:00:00 2001 From: "emlautarom1-agent[bot]" <292495798+emlautarom1-agent[bot]@users.noreply.github.com> Date: Fri, 19 Jun 2026 17:01:52 -0300 Subject: [PATCH 13/16] refactor(eth2api): preserve SSE error source instead of stringifying MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the flat EventStream(String) variant — which dropped the source() chain, HTTP status and reqwest error classification — with two typed variants: EventStreamRequest(#[from] reqwest::Error) for the request send / non-success status, and EventStreamRead(#[from] EventStreamError) for stream read errors. The URL-cannot-be-base case reuses the existing RequestError(anyhow) variant. Callers can now inspect and propagate the real cause. --- crates/eth2api/src/extensions.rs | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/crates/eth2api/src/extensions.rs b/crates/eth2api/src/extensions.rs index de1e511b..14d82278 100644 --- a/crates/eth2api/src/extensions.rs +++ b/crates/eth2api/src/extensions.rs @@ -38,9 +38,14 @@ pub enum EthBeaconNodeApiClientError { #[error("Domain type not found: {0}")] DomainTypeNotFound(String), - /// Error while opening or reading the beacon node SSE event stream. - #[error("Event stream error: {0}")] - EventStream(String), + /// Error while opening the beacon node SSE event stream (request send or + /// non-success status). + #[error("Event stream request error: {0}")] + EventStreamRequest(#[from] reqwest::Error), + + /// Error while reading from the beacon node SSE event stream. 
+ #[error("Event stream read error: {0}")] + EventStreamRead(#[from] eventsource_stream::EventStreamError), } /// A single Server-Sent Event from a beacon node: the event topic (the SSE @@ -395,7 +400,11 @@ impl EthBeaconNodeApiClient { > { let mut url = self.base_url.clone(); url.path_segments_mut() - .map_err(|()| EthBeaconNodeApiClientError::EventStream("URL cannot be a base".into()))? + .map_err(|()| { + EthBeaconNodeApiClientError::RequestError(anyhow::anyhow!( + "base URL cannot be a base" + )) + })? .push("eth") .push("v1") .push("events"); @@ -412,17 +421,15 @@ impl EthBeaconNodeApiClient { .query(&query) .header(reqwest::header::ACCEPT, "text/event-stream") .send() - .await - .map_err(|err| EthBeaconNodeApiClientError::EventStream(err.to_string()))? - .error_for_status() - .map_err(|err| EthBeaconNodeApiClientError::EventStream(err.to_string()))?; + .await? + .error_for_status()?; let stream = response.bytes_stream().eventsource().map(|item| { item.map(|event| BeaconNodeEvent { topic: event.event, data: event.data, }) - .map_err(|err| EthBeaconNodeApiClientError::EventStream(err.to_string())) + .map_err(EthBeaconNodeApiClientError::EventStreamRead) }); Ok(stream) From cb40ef3a639876e0dd85b58543f68c60f88af165 Mon Sep 17 00:00:00 2001 From: "emlautarom1-agent[bot]" <292495798+emlautarom1-agent[bot]@users.noreply.github.com> Date: Fri, 19 Jun 2026 17:02:43 -0300 Subject: [PATCH 14/16] refactor(app): parse chain_reorg epoch as u64 The epoch field was a raw String while its siblings slot/depth are parsed to u64 via DisplayFromStr. Parse it the same way for consistency; the debug log records it as an integer like slot. 
--- crates/app/src/sse/mod.rs | 2 +- crates/app/src/sse/types.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/app/src/sse/mod.rs b/crates/app/src/sse/mod.rs index 5abb94ac..b4cda6c6 100644 --- a/crates/app/src/sse/mod.rs +++ b/crates/app/src/sse/mod.rs @@ -286,7 +286,7 @@ impl SseListenerActor { tracing::debug!( addr = %self.addr, slot = reorg.slot, - epoch = %reorg.epoch, + epoch = reorg.epoch, reorg_epoch, depth = reorg.depth, old_head_block = %reorg.old_head_block, diff --git a/crates/app/src/sse/types.rs b/crates/app/src/sse/types.rs index 5600785c..7c2e4594 100644 --- a/crates/app/src/sse/types.rs +++ b/crates/app/src/sse/types.rs @@ -60,7 +60,8 @@ pub struct ChainReorgEventData { #[serde_as(as = "DisplayFromStr")] pub depth: u64, /// The epoch at which the reorg occurred. - pub epoch: String, + #[serde_as(as = "DisplayFromStr")] + pub epoch: u64, /// The block root of the old head. pub old_head_block: String, /// The block root of the new head. From e69100862a00de3756434d089e4621da9d6b2673 Mon Sep 17 00:00:00 2001 From: "emlautarom1-agent[bot]" <292495798+emlautarom1-agent[bot]@users.noreply.github.com> Date: Fri, 19 Jun 2026 17:07:13 -0300 Subject: [PATCH 15/16] test(app): cover SSE pump and actor reconnection paths Add tests for the reconnection state machine that previously had no coverage: - stream_once reports productive vs unproductive connections (the backoff-reset signal), and returns Error/Cancelled/ChannelClosed for the respective conditions, exercised against a wiremock beacon node. - run_pump forwards events and stops on cancellation. - the actor stops (rather than zombie-ing) when its event channel closes while the cancellation token is still live. 
--- crates/app/src/sse/mod.rs | 132 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 132 insertions(+) diff --git a/crates/app/src/sse/mod.rs b/crates/app/src/sse/mod.rs index b4cda6c6..b89048c3 100644 --- a/crates/app/src/sse/mod.rs +++ b/crates/app/src/sse/mod.rs @@ -725,4 +725,136 @@ mod tests { ct.cancel(); } + + const HEAD_EVENT_BODY: &str = "event: head\ndata: {\"slot\":\"10\"}\n\n"; + + /// Starts a mock beacon node serving the given SSE body (and status) at + /// `/eth/v1/events` and returns a client pointed at it. The returned + /// `MockServer` must be kept alive for the duration of the test. + async fn mock_sse(status: u16, body: &str) -> (wiremock::MockServer, EthBeaconNodeApiClient) { + use wiremock::{ + Mock, ResponseTemplate, + matchers::{method, path}, + }; + + let server = wiremock::MockServer::start().await; + Mock::given(method("GET")) + .and(path("/eth/v1/events")) + .respond_with( + ResponseTemplate::new(status).set_body_raw(body.to_owned(), "text/event-stream"), + ) + .mount(&server) + .await; + + let client = EthBeaconNodeApiClient::with_base_url(server.uri()).expect("valid url"); + (server, client) + } + + #[tokio::test] + async fn run_loop_stops_when_event_channel_closes() { + // The pump dropping its sender must stop the actor, not leave it parked + // forever with no events and no reconnection (the token is never fired). 
+ let ct = CancellationToken::new(); + let (events_tx, events_rx) = sync::mpsc::channel(8); + let (_msg_tx, msg_rx) = sync::mpsc::channel(8); + + let actor = test_actor(vec![]); + let handle = tokio::spawn(actor.run(events_rx, msg_rx, ct.clone())); + + drop(events_tx); + + tokio::time::timeout(Duration::from_secs(5), handle) + .await + .expect("actor did not stop after the event channel closed") + .expect("actor task panicked"); + assert!( + !ct.is_cancelled(), + "actor stopped on its own, without cancellation" + ); + } + + #[tokio::test] + async fn stream_once_forwards_event_and_reports_productive() { + let (_server, client) = mock_sse(200, HEAD_EVENT_BODY).await; + let (events_tx, mut events_rx) = sync::mpsc::channel(8); + let ct = CancellationToken::new(); + + let outcome = stream_once(&client, "test", &events_tx, &ct).await; + + assert!(matches!(outcome, StreamOutcome::Ended { productive: true })); + let event = events_rx.try_recv().expect("event forwarded"); + assert_eq!(event.topic, HEAD_EVENT); + } + + #[tokio::test] + async fn stream_once_reports_unproductive_on_immediate_eof() { + // A connection that accepts and immediately closes with no events must + // report `productive: false` so the pump backs off instead of looping. 
+ let (_server, client) = mock_sse(200, "").await; + let (events_tx, _events_rx) = sync::mpsc::channel(8); + let ct = CancellationToken::new(); + + let outcome = stream_once(&client, "test", &events_tx, &ct).await; + + assert!(matches!( + outcome, + StreamOutcome::Ended { productive: false } + )); + } + + #[tokio::test] + async fn stream_once_reports_error_on_non_success_status() { + let (_server, client) = mock_sse(500, "").await; + let (events_tx, _events_rx) = sync::mpsc::channel(8); + let ct = CancellationToken::new(); + + let outcome = stream_once(&client, "test", &events_tx, &ct).await; + + assert!(matches!( + outcome, + StreamOutcome::Error { productive: false } + )); + } + + #[tokio::test] + async fn stream_once_returns_cancelled_when_token_fires() { + let (_server, client) = mock_sse(200, HEAD_EVENT_BODY).await; + let (events_tx, _events_rx) = sync::mpsc::channel(8); + let ct = CancellationToken::new(); + ct.cancel(); + + let outcome = stream_once(&client, "test", &events_tx, &ct).await; + + assert!(matches!(outcome, StreamOutcome::Cancelled)); + } + + #[tokio::test] + async fn stream_once_returns_channel_closed_when_receiver_dropped() { + let (_server, client) = mock_sse(200, HEAD_EVENT_BODY).await; + let (events_tx, events_rx) = sync::mpsc::channel(8); + drop(events_rx); + let ct = CancellationToken::new(); + + let outcome = stream_once(&client, "test", &events_tx, &ct).await; + + assert!(matches!(outcome, StreamOutcome::ChannelClosed)); + } + + #[tokio::test] + async fn run_pump_forwards_events_and_stops_on_cancellation() { + let (_server, client) = mock_sse(200, HEAD_EVENT_BODY).await; + let (events_tx, mut events_rx) = sync::mpsc::channel(8); + let ct = CancellationToken::new(); + + let pump = tokio::spawn(run_pump(client, "test".to_string(), events_tx, ct.clone())); + + let event = events_rx.recv().await.expect("event forwarded"); + assert_eq!(event.topic, HEAD_EVENT); + + ct.cancel(); + tokio::time::timeout(Duration::from_secs(5), pump) + .await + 
.expect("pump did not stop after cancellation") + .expect("pump task panicked"); + } } From c42680af440494e529bf432d07739c92e3fccd7c Mon Sep 17 00:00:00 2001 From: Lautaro Emanuel Date: Fri, 19 Jun 2026 17:19:48 -0300 Subject: [PATCH 16/16] Increase channel buffer size --- crates/app/src/sse/mod.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/crates/app/src/sse/mod.rs b/crates/app/src/sse/mod.rs index b89048c3..ffaf45bc 100644 --- a/crates/app/src/sse/mod.rs +++ b/crates/app/src/sse/mod.rs @@ -9,11 +9,6 @@ //! a reconnecting stream "pump") that live until a [`CancellationToken`] fires, //! and the returned [`SseListenerHandle`] allows interacting with the running //! actor. -//! -//! The actor is single-threaded: it owns all of its state and processes one -//! event at a time, so no locking is required. The event source is decoupled -//! from the actor via an mpsc channel — in production a pump task forwards -//! beacon node events into it; tests push [`SseEvent`]s directly. use std::time::Duration; @@ -37,7 +32,7 @@ pub mod metrics; pub mod types; /// Default buffer size for the channels used by the listener. -const CHANNEL_BUFFER_SIZE: usize = 100; +const CHANNEL_BUFFER_SIZE: usize = 1024; /// Base delay between SSE reconnection attempts. const DEFAULT_RETRY: Duration = Duration::from_secs(1);