diff --git a/Cargo.lock b/Cargo.lock index c0e2ec20..0780e60d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5471,6 +5471,7 @@ dependencies = [ "bon", "chrono", "flate2", + "futures", "hex", "k256", "pluto-build-proto", @@ -5487,6 +5488,7 @@ dependencies = [ "reqwest 0.13.3", "serde", "serde_json", + "serde_with", "tar", "tempfile", "test-case", @@ -5495,6 +5497,7 @@ dependencies = [ "tokio-util", "tracing", "url", + "vise", "wiremock", ] @@ -5745,6 +5748,8 @@ dependencies = [ "chrono", "ethereum_ssz", "ethereum_ssz_derive", + "eventsource-stream", + "futures", "hex", "http", "oas3-gen-support", diff --git a/Cargo.toml b/Cargo.toml index 3093cf40..310f6d53 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,7 @@ crossbeam = "0.8.4" dyn-clone = "1.0" dyn-eq = "0.1.3" either = "1.13" +eventsource-stream = "0.2" futures = "0.3" futures-timer = "3.0" backon = "1.6.0" diff --git a/crates/app/Cargo.toml b/crates/app/Cargo.toml index 77f14edf..2977731a 100644 --- a/crates/app/Cargo.toml +++ b/crates/app/Cargo.toml @@ -9,10 +9,12 @@ publish.workspace = true [dependencies] backon.workspace = true chrono.workspace = true +futures.workspace = true pluto-core.workspace = true pluto-eth2api.workspace = true tokio.workspace = true tokio-util.workspace = true +vise.workspace = true prost.workspace = true prost-types.workspace = true regex.workspace = true @@ -24,6 +26,7 @@ serde.workspace = true serde_json.workspace = true hex.workspace = true k256.workspace = true +serde_with.workspace = true bon.workspace = true flate2.workspace = true tar.workspace = true diff --git a/crates/app/src/lib.rs b/crates/app/src/lib.rs index 87d7061e..28529991 100644 --- a/crates/app/src/lib.rs +++ b/crates/app/src/lib.rs @@ -22,5 +22,8 @@ pub mod eth2wrap; /// Private key locking service. pub mod privkeylock; +/// Listen for SSE from Beacon Node +pub mod sse; + /// Utility helpers for archiving, extracting, and comparing files/directories. 
pub mod utils; diff --git a/crates/app/src/sse/metrics.rs b/crates/app/src/sse/metrics.rs new file mode 100644 index 00000000..0cab6a29 --- /dev/null +++ b/crates/app/src/sse/metrics.rs @@ -0,0 +1,45 @@ +//! Prometheus metrics for the beacon node SSE listener. +//! +//! The `app_beacon_node` prefix and bucket boundaries reproduce Charon's +//! `app/beacon_node` SSE metrics. All metrics are labelled by beacon node +//! address. + +use vise::{Gauge, Global, Histogram, LabeledFamily, Metrics}; + +/// Head delay buckets in seconds. +const HEAD_DELAY_BUCKETS: [f64; 6] = [2.0, 4.0, 6.0, 8.0, 10.0, 12.0]; +/// Chain reorg depth buckets in slots. +const REORG_DEPTH_BUCKETS: [f64; 6] = [1.0, 2.0, 4.0, 6.0, 8.0, 16.0]; +/// Block reception delay buckets in seconds. +const BLOCK_BUCKETS: [f64; 14] = [ + 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0, 6.0, 8.0, 10.0, 12.0, +]; + +/// Metrics for the beacon node SSE listener. +#[derive(Debug, Metrics)] +#[metrics(prefix = "app_beacon_node")] +pub struct SseMetrics { + /// Current beacon node head slot, supplied by the SSE endpoint. + #[metrics(labels = ["addr"])] + pub sse_head_slot: LabeledFamily>, + + /// Delay in seconds between slot start and head update. + #[metrics(buckets = &HEAD_DELAY_BUCKETS, labels = ["addr"])] + pub sse_head_delay: LabeledFamily, + + /// Chain reorg depth in slots. + #[metrics(buckets = &REORG_DEPTH_BUCKETS, labels = ["addr"])] + pub sse_chain_reorg_depth: LabeledFamily, + + /// Block reception via gossip delay in seconds. + #[metrics(buckets = &BLOCK_BUCKETS, labels = ["addr"])] + pub sse_block_gossip: LabeledFamily, + + /// Block imported into fork choice delay in seconds. + #[metrics(buckets = &BLOCK_BUCKETS, labels = ["addr"])] + pub sse_block: LabeledFamily, +} + +/// Global metrics for the beacon node SSE listener. 
+#[vise::register] +pub static SSE_METRICS: Global = Global::new(); diff --git a/crates/app/src/sse/mod.rs b/crates/app/src/sse/mod.rs new file mode 100644 index 00000000..ffaf45bc --- /dev/null +++ b/crates/app/src/sse/mod.rs @@ -0,0 +1,855 @@ +//! Beacon node Server-Sent-Events (SSE) listener. +//! +//! Subscribes to a beacon node's `/eth/v1/events` stream and processes `head`, +//! `chain_reorg`, `block` and `block_gossip` events to export timing metrics +//! and notify subscribers of chain reorgs. +//! +//! The listener follows the actor model: a [`SseListenerBuilder`] wires up +//! subscriptions, [`SseListenerBuilder::build`] spawns a background actor (and +//! a reconnecting stream "pump") that live until a [`CancellationToken`] fires, +//! and the returned [`SseListenerHandle`] allows interacting with the running +//! actor. + +use std::time::Duration; + +use backon::{BackoffBuilder, ExponentialBuilder, Retryable}; +use chrono::{DateTime, Utc}; +use futures::StreamExt; +use tokio::sync; +use tokio_util::{future::FutureExt, sync::CancellationToken}; + +use pluto_eth2api::{BeaconNodeEvent, EthBeaconNodeApiClient, EventstreamRequestQueryTopic}; + +use crate::sse::{ + metrics::SSE_METRICS, + types::{ + BLOCK_EVENT, BLOCK_GOSSIP_EVENT, BlockEventData, BlockGossipEventData, CHAIN_REORG_EVENT, + ChainReorgEventData, HEAD_EVENT, HeadEventData, SseEvent, + }, +}; + +pub mod metrics; +pub mod types; + +/// Default buffer size for the channels used by the listener. +const CHANNEL_BUFFER_SIZE: usize = 1024; + +/// Base delay between SSE reconnection attempts. +const DEFAULT_RETRY: Duration = Duration::from_secs(1); + +/// Topics the listener subscribes to. +const TOPICS: [EventstreamRequestQueryTopic; 4] = [ + EventstreamRequestQueryTopic::Head, + EventstreamRequestQueryTopic::ChainReorg, + EventstreamRequestQueryTopic::BlockGossip, + EventstreamRequestQueryTopic::Block, +]; + +/// Errors that can occur while setting up or running the SSE listener. 
+#[derive(Debug, thiserror::Error)] +pub enum SseListenerError { + /// Beacon Node API client error. + #[error("Error while fetching data from the Eth2 API: {0}")] + EthBeaconNodeApiClientError(#[from] pluto_eth2api::EthBeaconNodeApiClientError), + + /// The underlying SSE listener actor has been terminated. + #[error("SSE listener actor has been terminated")] + Terminated, +} + +type Result = std::result::Result; + +/// A builder for the SSE listener. +/// +/// Allows setting up chain reorg subscriptions before the listener is started. +/// The listener is started by calling [`SseListenerBuilder::build`]. +pub struct SseListenerBuilder { + // TODO: Prefer to use a `broadcast` channel here to simplify the subscription management. + // Requires revisiting the potential subscribers. + reorg_subs: Vec>, +} + +impl SseListenerBuilder { + /// Constructs a new [`SseListenerBuilder`] with no subscriptions. + pub fn new() -> Self { + SseListenerBuilder { + reorg_subs: Vec::new(), + } + } + + /// Subscribes to chain reorg events, returning the receiving end of a + /// channel that yields the (deduplicated) reorg epochs. + /// + /// The returned receiver can be passed directly to consumers such as the + /// scheduler's `with_chain_reorgs`. + pub fn subscribe_chain_reorg(&mut self) -> sync::mpsc::Receiver { + let (tx, rx) = sync::mpsc::channel(CHANNEL_BUFFER_SIZE); + self.reorg_subs.push(tx); + rx + } + + /// Starts the SSE listener in the background. + /// + /// Blocks until the beacon node's genesis time and slot configuration have + /// been fetched (retrying on failure), then spawns the actor and a + /// reconnecting stream pump that run until `ct` is cancelled. 
+ pub async fn build( + self, + client: EthBeaconNodeApiClient, + ct: CancellationToken, + ) -> Result { + let (genesis_time, slot_duration, slots_per_epoch) = fetch_config(&client) + .with_cancellation_token(&ct) + .await + .ok_or(SseListenerError::Terminated)??; + + let addr = client.base_url.to_string(); + + let actor = SseListenerActor { + addr: addr.clone(), + genesis_time, + slot_duration, + slots_per_epoch, + last_reorg_epoch: 0, + reorg_subs: self.reorg_subs, + }; + + let (events_tx, events_rx) = sync::mpsc::channel(CHANNEL_BUFFER_SIZE); + let (msg_tx, msg_rx) = sync::mpsc::channel(CHANNEL_BUFFER_SIZE); + + tokio::spawn(run_pump(client, addr, events_tx, ct.clone())); + tokio::spawn(actor.run(events_rx, msg_rx, ct)); + + Ok(SseListenerHandle { sender: msg_tx }) + } +} + +impl Default for SseListenerBuilder { + fn default() -> Self { + Self::new() + } +} + +/// Messages sent to the [`SseListenerActor`]. +enum SseListenerMessage { + /// Subscribe to chain reorg events at runtime; the actor replies with the + /// receiving end of a freshly created channel. + SubscribeChainReorg { + resp: sync::oneshot::Sender>, + }, +} + +/// A handle to interact with the SSE listener actor. +/// +/// Cloning the handle is cheap and allows sending messages to the actor from +/// multiple tasks. +#[derive(Clone)] +pub struct SseListenerHandle { + sender: sync::mpsc::Sender, +} + +impl SseListenerHandle { + /// Subscribes to chain reorg events at runtime, returning the receiving end + /// of a channel that yields the (deduplicated) reorg epochs. + pub async fn subscribe_chain_reorg(&self) -> Result> { + let (tx, rx) = sync::oneshot::channel(); + self.sender + .send(SseListenerMessage::SubscribeChainReorg { resp: tx }) + .await + .map_err(|_| SseListenerError::Terminated)?; + + rx.await.map_err(|_| SseListenerError::Terminated) + } +} + +struct SseListenerActor { + addr: String, + + // Immutable network configuration. 
+ genesis_time: DateTime, + slot_duration: Duration, + slots_per_epoch: u64, + + last_reorg_epoch: u64, + reorg_subs: Vec>, +} + +impl SseListenerActor { + async fn run( + mut self, + mut events_rx: sync::mpsc::Receiver, + mut msg_rx: sync::mpsc::Receiver, + ct: CancellationToken, + ) { + loop { + tokio::select! { + biased; + + _ = ct.cancelled() => break, + + Some(msg) = msg_rx.recv() => match msg { + SseListenerMessage::SubscribeChainReorg { resp } => { + let (tx, rx) = sync::mpsc::channel(CHANNEL_BUFFER_SIZE); + self.reorg_subs.push(tx); + let _ = resp.send(rx); + } + }, + + event = events_rx.recv() => match event { + Some(event) => self.handle_event(&event), + // The pump dropped its sender (e.g. it returned early or + // panicked). Stop instead of parking forever on a disabled + // branch with no events and no reconnection. + None => { + tracing::error!(addr = %self.addr, "SSE event channel closed; stopping listener"); + break; + } + }, + } + } + } + + fn handle_event(&mut self, event: &SseEvent) { + match event.topic.as_str() { + HEAD_EVENT => self.handle_head(event), + CHAIN_REORG_EVENT => self.handle_chain_reorg(event), + BLOCK_GOSSIP_EVENT => self.handle_block_gossip(event), + BLOCK_EVENT => self.handle_block(event), + _ => {} + } + } + + fn handle_head(&self, event: &SseEvent) { + let head: HeadEventData = match serde_json::from_str(&event.data) { + Ok(head) => head, + Err(err) => { + tracing::warn!(err = ?err, addr = %self.addr, topic = HEAD_EVENT, "Failed to parse SSE event"); + return; + } + }; + let slot = head.slot; + + // The chain's head is updated once a majority of the chain votes for a + // block, which realistically happens between 2/3 and 3/3 of the slot. 
+ let window = + chrono::Duration::from_std(self.slot_duration).unwrap_or(chrono::Duration::MAX); + let (delay, ok) = self.compute_delay(slot, event.timestamp, |delay| delay < window); + let delay_s = delay_secs(delay); + + if ok { + SSE_METRICS.sse_head_delay[&self.addr].observe(delay_s); + } else { + tracing::debug!(addr = %self.addr, slot, delay_s, "Beacon node received head event too late"); + } + + SSE_METRICS.sse_head_slot[&self.addr].set(slot); + + tracing::debug!( + addr = %self.addr, + slot, + delay_s, + block = %head.block, + prev_ddr = %head.previous_duty_dependent_root, + curr_ddr = %head.current_duty_dependent_root, + "SSE head event" + ); + } + + fn handle_chain_reorg(&mut self, event: &SseEvent) { + let reorg: ChainReorgEventData = match serde_json::from_str(&event.data) { + Ok(reorg) => reorg, + Err(err) => { + tracing::warn!(err = ?err, addr = %self.addr, topic = CHAIN_REORG_EVENT, "Failed to parse SSE event"); + return; + } + }; + if reorg.slot < reorg.depth { + tracing::warn!(addr = %self.addr, slot = reorg.slot, depth = reorg.depth, "Invalid chain reorg event: depth exceeds slot"); + return; + } + + // `slot >= depth` is guaranteed above and `slots_per_epoch` is non-zero + // (validated by `fetch_slots_config`). + let reorg_epoch = reorg + .slot + .checked_sub(reorg.depth) + .expect("slot >= depth") + .checked_div(self.slots_per_epoch) + .expect("non-zero slots per epoch"); + self.notify_chain_reorg(reorg_epoch); + + tracing::debug!( + addr = %self.addr, + slot = reorg.slot, + epoch = reorg.epoch, + reorg_epoch, + depth = reorg.depth, + old_head_block = %reorg.old_head_block, + new_head_block = %reorg.new_head_block, + "SSE chain reorg event" + ); + + // Reorg depths fit comfortably in a `u32`; `f64::from` is lossless. 
+ let depth_f64 = f64::from(u32::try_from(reorg.depth).unwrap_or(u32::MAX)); + SSE_METRICS.sse_chain_reorg_depth[&self.addr].observe(depth_f64); + } + + fn handle_block_gossip(&self, event: &SseEvent) { + let gossip: BlockGossipEventData = match serde_json::from_str(&event.data) { + Ok(gossip) => gossip, + Err(err) => { + tracing::warn!(err = ?err, addr = %self.addr, topic = BLOCK_GOSSIP_EVENT, "Failed to parse SSE event"); + return; + } + }; + let slot = gossip.slot; + + // A block should be received via gossip between 0/3 and 1/3 of the slot. + let third = self.slot_duration.checked_div(3).expect("non-zero divisor"); + let window = chrono::Duration::from_std(third).unwrap_or(chrono::Duration::MAX); + let (delay, ok) = self.compute_delay(slot, event.timestamp, |delay| delay < window); + let delay_s = delay_secs(delay); + + if !ok { + tracing::debug!(addr = %self.addr, slot, delay_s, "Beacon node received block_gossip event too late"); + } + + tracing::debug!(addr = %self.addr, slot, delay_s, block = %gossip.block, "SSE block gossip event"); + + SSE_METRICS.sse_block_gossip[&self.addr].observe(delay_s); + } + + fn handle_block(&self, event: &SseEvent) { + let block: BlockEventData = match serde_json::from_str(&event.data) { + Ok(block) => block, + Err(err) => { + tracing::warn!(err = ?err, addr = %self.addr, topic = BLOCK_EVENT, "Failed to parse SSE event"); + return; + } + }; + let slot = block.slot; + + // A block should be imported into fork choice between 0/3 and 1/3 of the + // slot. 
+ let third = self.slot_duration.checked_div(3).expect("non-zero divisor"); + let window = chrono::Duration::from_std(third).unwrap_or(chrono::Duration::MAX); + let (delay, ok) = self.compute_delay(slot, event.timestamp, |delay| delay < window); + let delay_s = delay_secs(delay); + + if !ok { + tracing::debug!(addr = %self.addr, slot, delay_s, "Beacon node received block event too late"); + } + + tracing::debug!(addr = %self.addr, slot, delay_s, block = %block.block, "SSE block event"); + + SSE_METRICS.sse_block[&self.addr].observe(delay_s); + } + + /// Notifies subscribers of a chain reorg, deduplicating consecutive events + /// for the same epoch. Subscribers whose receiver has been dropped are + /// pruned. + fn notify_chain_reorg(&mut self, epoch: u64) { + if epoch == self.last_reorg_epoch { + return; + } + self.last_reorg_epoch = epoch; + + let addr = &self.addr; + self.reorg_subs.retain(|tx| match tx.try_send(epoch) { + Ok(()) => true, + Err(sync::mpsc::error::TrySendError::Full(_)) => { + tracing::warn!(addr = %addr, epoch, "Chain reorg subscriber lagging, dropping event"); + true + } + Err(sync::mpsc::error::TrySendError::Closed(_)) => false, + }); + } + + /// Computes the delay between the start of `slot` and the event timestamp, + /// reporting whether it falls within the expected window. + fn compute_delay( + &self, + slot: u64, + event_ts: DateTime, + delay_ok: impl Fn(chrono::Duration) -> bool, + ) -> (chrono::Duration, bool) { + // Slot times are small in practice (slot duration is a few whole + // seconds), so saturate on the unreachable overflow. 
+ let slot = i64::try_from(slot).unwrap_or(i64::MAX); + let ms_per_slot = i64::try_from(self.slot_duration.as_millis()).unwrap_or(i64::MAX); + let offset = chrono::Duration::milliseconds(slot.saturating_mul(ms_per_slot)); + let slot_start = self + .genesis_time + .checked_add_signed(offset) + .unwrap_or(event_ts); + let delay = event_ts.signed_duration_since(slot_start); + + (delay, delay_ok(delay)) + } +} + +/// Fetches the genesis time, slot duration and slots per epoch, retrying on +/// failure. +async fn fetch_config(client: &EthBeaconNodeApiClient) -> Result<(DateTime, Duration, u64)> { + let genesis_time = (|| client.fetch_genesis_time()) + .retry(fast_backoff()) + .notify(|err, _| tracing::error!(err = ?err, "Failure fetching genesis time")) + .await?; + + let (slot_duration, slots_per_epoch) = (|| client.fetch_slots_config()) + .retry(fast_backoff()) + .notify(|err, _| tracing::error!(err = ?err, "Failure fetching slots config")) + .await?; + + Ok((genesis_time, slot_duration, slots_per_epoch)) +} + +/// Outcome of a single SSE stream connection. +enum StreamOutcome { + /// The stream ended cleanly (server closed the connection). `productive` + /// is true if at least one event was forwarded before the close. + Ended { productive: bool }, + /// A connection or read error occurred; the caller should back off. + /// `productive` is true if at least one event was forwarded before the + /// error. + Error { productive: bool }, + /// The actor's event channel was closed; the pump should stop. + ChannelClosed, + /// The cancellation token fired. + Cancelled, +} + +/// Connects to the beacon node SSE stream and reconnects with exponential +/// backoff until the cancellation token fires or the actor goes away. 
+async fn run_pump( + client: EthBeaconNodeApiClient, + addr: String, + events_tx: sync::mpsc::Sender, + ct: CancellationToken, +) { + let mut backoff = reconnect_backoff().build(); + + loop { + match stream_once(&client, &addr, &events_tx, &ct).await { + StreamOutcome::Cancelled | StreamOutcome::ChannelClosed => break, + StreamOutcome::Ended { productive } | StreamOutcome::Error { productive } => { + // Reset the backoff only after a productive connection (one that + // forwarded at least one event). Otherwise a server that accepts + // and immediately closes the connection — or fails to connect — + // would drive a tight reconnect loop with no rate limiting. + if productive { + backoff = reconnect_backoff().build(); + } + let delay = backoff + .next() + .expect("reconnect backoff is configured without a retry limit"); + tokio::select! { + biased; + _ = ct.cancelled() => break, + _ = tokio::time::sleep(delay) => {} + } + } + } + } + + tracing::debug!(addr = %addr, "SSE pump stopped"); +} + +/// Opens a single SSE connection and forwards events into `events_tx` until the +/// stream ends, errors, the channel closes, or the token is cancelled. +async fn stream_once( + client: &EthBeaconNodeApiClient, + addr: &str, + events_tx: &sync::mpsc::Sender, + ct: &CancellationToken, +) -> StreamOutcome { + tracing::debug!(addr = %addr, "Connecting to SSE stream"); + + let stream = match client.event_stream(&TOPICS).await { + Ok(stream) => stream, + Err(err) => { + tracing::warn!(err = %err, addr = %addr, "Failed to connect to SSE stream"); + return StreamOutcome::Error { productive: false }; + } + }; + futures::pin_mut!(stream); + + let mut productive = false; + loop { + tokio::select! 
{ + biased; + + _ = ct.cancelled() => return StreamOutcome::Cancelled, + + item = stream.next() => match item { + None => return StreamOutcome::Ended { productive }, + Some(Err(err)) => { + tracing::warn!(err = %err, addr = %addr, "SSE stream read error"); + return StreamOutcome::Error { productive }; + } + Some(Ok(BeaconNodeEvent { topic, data })) => { + if data.is_empty() { + continue; + } + + let event = SseEvent { topic, data, timestamp: Utc::now() }; + if events_tx.send(event).await.is_err() { + return StreamOutcome::ChannelClosed; + } + productive = true; + } + }, + } + } +} + +// TODO: Extract these backoff configurations into a shared module. + +/// Backoff used while waiting for the beacon node configuration. +fn fast_backoff() -> ExponentialBuilder { + ExponentialBuilder::default() + .with_min_delay(Duration::from_millis(100)) + .with_max_delay(Duration::from_secs(5)) + .with_factor(1.6) + .without_max_times() + .with_jitter() +} + +/// Backoff used between SSE reconnection attempts. +fn reconnect_backoff() -> ExponentialBuilder { + ExponentialBuilder::default() + .with_min_delay(DEFAULT_RETRY) + .with_max_delay(DEFAULT_RETRY.checked_mul(2).expect("within range")) + .with_factor(1.6) + .without_max_times() + .with_jitter() +} + +/// Returns the delay in fractional seconds for metrics and logging. +/// +/// Histogram values are non-negative seconds; a negative delay (an event +/// observed before the slot start, e.g. due to clock skew) is clamped to zero. +fn delay_secs(delay: chrono::Duration) -> f64 { + delay + .to_std() + .map(|delay| delay.as_secs_f64()) + .unwrap_or(0.0) +} + +#[cfg(test)] +mod tests { + use super::*; + + const SLOT_DURATION: Duration = Duration::from_secs(12); + const SLOTS_PER_EPOCH: u64 = 32; + + /// Ethereum mainnet genesis time. 
+ fn genesis() -> DateTime { + DateTime::from_timestamp(1_606_824_023, 0).expect("valid timestamp") + } + + fn test_actor(reorg_subs: Vec>) -> SseListenerActor { + SseListenerActor { + addr: "test".to_string(), + genesis_time: genesis(), + slot_duration: SLOT_DURATION, + slots_per_epoch: SLOTS_PER_EPOCH, + last_reorg_epoch: 0, + reorg_subs, + } + } + + fn event(topic: &str, data: &str, timestamp: DateTime) -> SseEvent { + SseEvent { + topic: topic.to_string(), + data: data.to_string(), + timestamp, + } + } + + /// Returns the wall-clock time `offset_secs` into the given slot. + fn slot_time(slot: u64, offset_secs: i64) -> DateTime { + let slot = i64::try_from(slot).unwrap(); + let per_slot = i64::try_from(SLOT_DURATION.as_secs()).unwrap(); + let secs = slot + .checked_mul(per_slot) + .unwrap() + .checked_add(offset_secs) + .unwrap(); + genesis() + .checked_add_signed(chrono::Duration::seconds(secs)) + .unwrap() + } + + #[test] + fn compute_delay_inside_and_outside_window() { + let actor = test_actor(vec![]); + let slot = 10; + let window = chrono::Duration::from_std(SLOT_DURATION).unwrap(); + + let (delay, ok) = actor.compute_delay(slot, slot_time(slot, 5), |d| d < window); + assert_eq!(delay, chrono::Duration::seconds(5)); + assert!(ok); + + let (delay, ok) = actor.compute_delay(slot, slot_time(slot, 13), |d| d < window); + assert_eq!(delay, chrono::Duration::seconds(13)); + assert!(!ok); + } + + #[test] + fn chain_reorg_notifies_and_dedups() { + let (tx, mut rx) = sync::mpsc::channel(8); + let mut actor = test_actor(vec![tx]); + + // slot 64, depth 0 => reorg_epoch = 64 / 32 = 2. + let data = r#"{"slot":"64","depth":"0","epoch":"2","old_head_block":"0xaa","new_head_block":"0xbb"}"#; + actor.handle_event(&event(CHAIN_REORG_EVENT, data, genesis())); + assert_eq!(rx.try_recv().unwrap(), 2); + + // Same epoch again => deduplicated, no notification. 
+ actor.handle_event(&event(CHAIN_REORG_EVENT, data, genesis())); + assert!(rx.try_recv().is_err()); + + // New epoch => notified. + let data = r#"{"slot":"96","depth":"0","epoch":"3"}"#; + actor.handle_event(&event(CHAIN_REORG_EVENT, data, genesis())); + assert_eq!(rx.try_recv().unwrap(), 3); + } + + #[test] + fn chain_reorg_epoch_zero_first_event_is_deduped() { + // Parity with Charon: `last_reorg_epoch` starts at 0, so a first reorg at + // epoch 0 is treated as a duplicate and not notified. + let (tx, mut rx) = sync::mpsc::channel(8); + let mut actor = test_actor(vec![tx]); + + let data = r#"{"slot":"0","depth":"0","epoch":"0"}"#; + actor.handle_event(&event(CHAIN_REORG_EVENT, data, genesis())); + assert!(rx.try_recv().is_err()); + } + + #[test] + fn chain_reorg_depth_exceeding_slot_is_ignored() { + let (tx, mut rx) = sync::mpsc::channel(8); + let mut actor = test_actor(vec![tx]); + + let data = r#"{"slot":"5","depth":"10","epoch":"0"}"#; + actor.handle_event(&event(CHAIN_REORG_EVENT, data, genesis())); + assert!(rx.try_recv().is_err()); + } + + #[test] + fn malformed_payload_is_skipped_and_processing_continues() { + let (tx, mut rx) = sync::mpsc::channel(8); + let mut actor = test_actor(vec![tx]); + + // Malformed JSON is logged and skipped, not propagated as a failure. + actor.handle_event(&event(CHAIN_REORG_EVENT, "{not json", genesis())); + assert!(rx.try_recv().is_err()); + + // A subsequent valid event is still processed. 
+ let data = r#"{"slot":"64","depth":"0","epoch":"2"}"#; + actor.handle_event(&event(CHAIN_REORG_EVENT, data, genesis())); + assert_eq!(rx.try_recv().unwrap(), 2); + } + + #[test] + fn handles_all_event_types_without_panicking() { + let (tx, mut rx) = sync::mpsc::channel(8); + let mut actor = test_actor(vec![tx]); + let ts = slot_time(64, 5); + + actor.handle_event(&event( + HEAD_EVENT, + r#"{"slot":"64","block":"0xabc","previous_duty_dependent_root":"0x01","current_duty_dependent_root":"0x02"}"#, + ts, + )); + actor.handle_event(&event( + BLOCK_GOSSIP_EVENT, + r#"{"slot":"64","block":"0xabc"}"#, + ts, + )); + actor.handle_event(&event(BLOCK_EVENT, r#"{"slot":"64","block":"0xabc"}"#, ts)); + actor.handle_event(&event("unknown_topic", "{}", ts)); + + // None of the above are chain reorgs, so nothing is notified. + assert!(rx.try_recv().is_err()); + } + + #[tokio::test] + async fn run_loop_forwards_events_and_stops_on_cancellation() { + let ct = CancellationToken::new(); + let (events_tx, events_rx) = sync::mpsc::channel(8); + let (_msg_tx, msg_rx) = sync::mpsc::channel(8); + let (reorg_tx, mut reorg_rx) = sync::mpsc::channel(8); + + let actor = test_actor(vec![reorg_tx]); + let handle = tokio::spawn(actor.run(events_rx, msg_rx, ct.clone())); + + let data = r#"{"slot":"64","depth":"0","epoch":"2"}"#; + events_tx + .send(event(CHAIN_REORG_EVENT, data, genesis())) + .await + .unwrap(); + + assert_eq!(reorg_rx.recv().await.unwrap(), 2); + + ct.cancel(); + handle.await.unwrap(); + } + + #[tokio::test] + async fn dynamic_subscription_via_handle() { + let ct = CancellationToken::new(); + let (events_tx, events_rx) = sync::mpsc::channel(8); + let (msg_tx, msg_rx) = sync::mpsc::channel(8); + + let actor = test_actor(vec![]); + tokio::spawn(actor.run(events_rx, msg_rx, ct.clone())); + + let handle = SseListenerHandle { sender: msg_tx }; + let mut reorg_rx = handle.subscribe_chain_reorg().await.unwrap(); + + let data = r#"{"slot":"64","depth":"0","epoch":"2"}"#; + events_tx + 
.send(event(CHAIN_REORG_EVENT, data, genesis())) + .await + .unwrap(); + + assert_eq!(reorg_rx.recv().await.unwrap(), 2); + + ct.cancel(); + } + + const HEAD_EVENT_BODY: &str = "event: head\ndata: {\"slot\":\"10\"}\n\n"; + + /// Starts a mock beacon node serving the given SSE body (and status) at + /// `/eth/v1/events` and returns a client pointed at it. The returned + /// `MockServer` must be kept alive for the duration of the test. + async fn mock_sse(status: u16, body: &str) -> (wiremock::MockServer, EthBeaconNodeApiClient) { + use wiremock::{ + Mock, ResponseTemplate, + matchers::{method, path}, + }; + + let server = wiremock::MockServer::start().await; + Mock::given(method("GET")) + .and(path("/eth/v1/events")) + .respond_with( + ResponseTemplate::new(status).set_body_raw(body.to_owned(), "text/event-stream"), + ) + .mount(&server) + .await; + + let client = EthBeaconNodeApiClient::with_base_url(server.uri()).expect("valid url"); + (server, client) + } + + #[tokio::test] + async fn run_loop_stops_when_event_channel_closes() { + // The pump dropping its sender must stop the actor, not leave it parked + // forever with no events and no reconnection (the token is never fired). 
+ let ct = CancellationToken::new(); + let (events_tx, events_rx) = sync::mpsc::channel(8); + let (_msg_tx, msg_rx) = sync::mpsc::channel(8); + + let actor = test_actor(vec![]); + let handle = tokio::spawn(actor.run(events_rx, msg_rx, ct.clone())); + + drop(events_tx); + + tokio::time::timeout(Duration::from_secs(5), handle) + .await + .expect("actor did not stop after the event channel closed") + .expect("actor task panicked"); + assert!( + !ct.is_cancelled(), + "actor stopped on its own, without cancellation" + ); + } + + #[tokio::test] + async fn stream_once_forwards_event_and_reports_productive() { + let (_server, client) = mock_sse(200, HEAD_EVENT_BODY).await; + let (events_tx, mut events_rx) = sync::mpsc::channel(8); + let ct = CancellationToken::new(); + + let outcome = stream_once(&client, "test", &events_tx, &ct).await; + + assert!(matches!(outcome, StreamOutcome::Ended { productive: true })); + let event = events_rx.try_recv().expect("event forwarded"); + assert_eq!(event.topic, HEAD_EVENT); + } + + #[tokio::test] + async fn stream_once_reports_unproductive_on_immediate_eof() { + // A connection that accepts and immediately closes with no events must + // report `productive: false` so the pump backs off instead of looping. 
+ let (_server, client) = mock_sse(200, "").await; + let (events_tx, _events_rx) = sync::mpsc::channel(8); + let ct = CancellationToken::new(); + + let outcome = stream_once(&client, "test", &events_tx, &ct).await; + + assert!(matches!( + outcome, + StreamOutcome::Ended { productive: false } + )); + } + + #[tokio::test] + async fn stream_once_reports_error_on_non_success_status() { + let (_server, client) = mock_sse(500, "").await; + let (events_tx, _events_rx) = sync::mpsc::channel(8); + let ct = CancellationToken::new(); + + let outcome = stream_once(&client, "test", &events_tx, &ct).await; + + assert!(matches!( + outcome, + StreamOutcome::Error { productive: false } + )); + } + + #[tokio::test] + async fn stream_once_returns_cancelled_when_token_fires() { + let (_server, client) = mock_sse(200, HEAD_EVENT_BODY).await; + let (events_tx, _events_rx) = sync::mpsc::channel(8); + let ct = CancellationToken::new(); + ct.cancel(); + + let outcome = stream_once(&client, "test", &events_tx, &ct).await; + + assert!(matches!(outcome, StreamOutcome::Cancelled)); + } + + #[tokio::test] + async fn stream_once_returns_channel_closed_when_receiver_dropped() { + let (_server, client) = mock_sse(200, HEAD_EVENT_BODY).await; + let (events_tx, events_rx) = sync::mpsc::channel(8); + drop(events_rx); + let ct = CancellationToken::new(); + + let outcome = stream_once(&client, "test", &events_tx, &ct).await; + + assert!(matches!(outcome, StreamOutcome::ChannelClosed)); + } + + #[tokio::test] + async fn run_pump_forwards_events_and_stops_on_cancellation() { + let (_server, client) = mock_sse(200, HEAD_EVENT_BODY).await; + let (events_tx, mut events_rx) = sync::mpsc::channel(8); + let ct = CancellationToken::new(); + + let pump = tokio::spawn(run_pump(client, "test".to_string(), events_tx, ct.clone())); + + let event = events_rx.recv().await.expect("event forwarded"); + assert_eq!(event.topic, HEAD_EVENT); + + ct.cancel(); + tokio::time::timeout(Duration::from_secs(5), pump) + .await + 
.expect("pump did not stop after cancellation") + .expect("pump task panicked"); + } +} diff --git a/crates/app/src/sse/types.rs b/crates/app/src/sse/types.rs new file mode 100644 index 00000000..7c2e4594 --- /dev/null +++ b/crates/app/src/sse/types.rs @@ -0,0 +1,93 @@ +//! Event payloads and topics for the beacon node SSE stream. +//! +//! Only the fields consumed by the listener (for metrics, reorg detection, and +//! debug logging) are modelled; serde ignores any other fields the beacon node +//! sends. `#[serde(default)]` keeps deserialization lenient when a field is +//! absent. Numeric fields are sent by the beacon node as quoted strings and are +//! parsed directly into integers via [`serde_with::DisplayFromStr`]. + +use chrono::{DateTime, Utc}; +use serde_with::{DisplayFromStr, serde_as}; + +/// SSE topic for `head` events. +pub const HEAD_EVENT: &str = "head"; +/// SSE topic for `chain_reorg` events. +pub const CHAIN_REORG_EVENT: &str = "chain_reorg"; +/// SSE topic for `block_gossip` events. +pub const BLOCK_GOSSIP_EVENT: &str = "block_gossip"; +/// SSE topic for `block` events. +pub const BLOCK_EVENT: &str = "block"; + +/// A raw SSE event handed from the stream source to the listener actor. +/// +/// `timestamp` is captured when the event is received (production) or set +/// explicitly (tests), and is used to compute the per-event delay. +#[derive(Debug, Clone)] +pub struct SseEvent { + /// The event topic (the SSE `event:` field). + pub topic: String, + /// The raw, unparsed JSON `data` payload. + pub data: String, + /// The time the event was received. + pub timestamp: DateTime, +} + +/// Payload of a `head` event. +#[serde_as] +#[derive(Debug, Clone, Default, serde::Deserialize)] +#[serde(default)] +pub struct HeadEventData { + /// The slot of the new head. + #[serde_as(as = "DisplayFromStr")] + pub slot: u64, + /// The block root of the new head. + pub block: String, + /// The previous duty dependent root. 
+ pub previous_duty_dependent_root: String, + /// The current duty dependent root. + pub current_duty_dependent_root: String, +} + +/// Payload of a `chain_reorg` event. +#[serde_as] +#[derive(Debug, Clone, Default, serde::Deserialize)] +#[serde(default)] +pub struct ChainReorgEventData { + /// The slot at which the reorg occurred. + #[serde_as(as = "DisplayFromStr")] + pub slot: u64, + /// The depth of the reorg in slots. + #[serde_as(as = "DisplayFromStr")] + pub depth: u64, + /// The epoch at which the reorg occurred. + #[serde_as(as = "DisplayFromStr")] + pub epoch: u64, + /// The block root of the old head. + pub old_head_block: String, + /// The block root of the new head. + pub new_head_block: String, +} + +/// Payload of a `block_gossip` event. +#[serde_as] +#[derive(Debug, Clone, Default, serde::Deserialize)] +#[serde(default)] +pub struct BlockGossipEventData { + /// The slot of the gossiped block. + #[serde_as(as = "DisplayFromStr")] + pub slot: u64, + /// The block root. + pub block: String, +} + +/// Payload of a `block` event. +#[serde_as] +#[derive(Debug, Clone, Default, serde::Deserialize)] +#[serde(default)] +pub struct BlockEventData { + /// The slot of the imported block. + #[serde_as(as = "DisplayFromStr")] + pub slot: u64, + /// The block root. 
+ pub block: String, +} diff --git a/crates/eth2api/Cargo.toml b/crates/eth2api/Cargo.toml index fbc5c298..42d12519 100644 --- a/crates/eth2api/Cargo.toml +++ b/crates/eth2api/Cargo.toml @@ -15,10 +15,12 @@ ignored = ["bon", "http", "oas3-gen-support", "regex", "reqwest", "validator"] anyhow.workspace = true async-trait.workspace = true bon.workspace = true +eventsource-stream.workspace = true +futures.workspace = true http.workspace = true oas3-gen-support.workspace = true regex.workspace = true -reqwest.workspace = true +reqwest = { workspace = true, features = ["stream"] } serde_json.workspace = true serde_with.workspace = true serde.workspace = true diff --git a/crates/eth2api/src/extensions.rs b/crates/eth2api/src/extensions.rs index fc08e939..14d82278 100644 --- a/crates/eth2api/src/extensions.rs +++ b/crates/eth2api/src/extensions.rs @@ -1,8 +1,11 @@ use crate::{ - ConsensusVersion, EthBeaconNodeApiClient, GetGenesisRequest, GetGenesisResponse, - GetGenesisResponseResponseData, GetSpecRequest, GetSpecResponse, ValidatorStatus, spec::phase0, + ConsensusVersion, EthBeaconNodeApiClient, EventstreamRequestQueryTopic, GetGenesisRequest, + GetGenesisResponse, GetGenesisResponseResponseData, GetSpecRequest, GetSpecResponse, + ValidatorStatus, spec::phase0, }; use chrono::{DateTime, Utc}; +use eventsource_stream::Eventsource; +use futures::{Stream, StreamExt}; use std::{collections::HashMap, time}; use tree_hash::TreeHash; @@ -34,6 +37,26 @@ pub enum EthBeaconNodeApiClientError { /// Domain type not found in the beacon spec response #[error("Domain type not found: {0}")] DomainTypeNotFound(String), + + /// Error while opening the beacon node SSE event stream (request send or + /// non-success status). + #[error("Event stream request error: {0}")] + EventStreamRequest(#[from] reqwest::Error), + + /// Error while reading from the beacon node SSE event stream. 
+ #[error("Event stream read error: {0}")] + EventStreamRead(#[from] eventsource_stream::EventStreamError<reqwest::Error>), +} + +/// A single Server-Sent Event from a beacon node: the event topic (the SSE +/// `event:` field, e.g. `head` or `chain_reorg`) and its raw, unparsed JSON +/// `data` payload. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BeaconNodeEvent { + /// The SSE event topic. + pub topic: String, + /// The raw JSON data payload. + pub data: String, } const FORKS: [ConsensusVersion; 6] = [ @@ -360,6 +383,57 @@ impl EthBeaconNodeApiClient { epoch, )) } + + /// Subscribes to the beacon node SSE stream (`GET /eth/v1/events`) for the + /// given topics. + /// + /// Unlike the generated [`Self::eventstream`], the returned stream + /// preserves each event's topic and yields its raw JSON `data` + /// unparsed, so callers can dispatch on the topic and deserialize the + /// payload themselves. + pub async fn event_stream( + &self, + topics: &[EventstreamRequestQueryTopic], + ) -> Result< + impl Stream<Item = Result<BeaconNodeEvent, EthBeaconNodeApiClientError>> + Send, + EthBeaconNodeApiClientError, + > { + let mut url = self.base_url.clone(); + url.path_segments_mut() + .map_err(|()| { + EthBeaconNodeApiClientError::RequestError(anyhow::anyhow!( + "base URL cannot be a base" + )) + })? + .push("eth") + .push("v1") + .push("events"); + + // Topics are sent as repeated `topics=` query pairs. + let query: Vec<(&str, String)> = topics + .iter() + .map(|topic| ("topics", topic.to_string())) + .collect(); + + let response = self + .client + .get(url) + .query(&query) + .header(reqwest::header::ACCEPT, "text/event-stream") + .send() + .await? 
+ .error_for_status()?; + + let stream = response.bytes_stream().eventsource().map(|item| { + item.map(|event| BeaconNodeEvent { + topic: event.event, + data: event.data, + }) + .map_err(EthBeaconNodeApiClientError::EventStreamRead) + }); + + Ok(stream) + } } #[cfg(test)] @@ -452,4 +526,47 @@ mod tests { ) ); } + + #[tokio::test] + async fn event_stream_preserves_topic_and_raw_data() { + use crate::EventstreamRequestQueryTopic; + use futures::StreamExt; + use wiremock::{ + Mock, MockServer, ResponseTemplate, + matchers::{method, path}, + }; + + let server = MockServer::start().await; + + let body = "event: head\ndata: {\"slot\":\"10\"}\n\n\ + event: chain_reorg\ndata: {\"slot\":\"20\",\"depth\":\"2\"}\n\n"; + + Mock::given(method("GET")) + .and(path("/eth/v1/events")) + .respond_with(ResponseTemplate::new(200).set_body_raw(body, "text/event-stream")) + .mount(&server) + .await; + + let client = EthBeaconNodeApiClient::with_base_url(server.uri()).expect("valid url"); + let stream = client + .event_stream(&[ + EventstreamRequestQueryTopic::Head, + EventstreamRequestQueryTopic::ChainReorg, + ]) + .await + .expect("open stream"); + futures::pin_mut!(stream); + + let first = stream.next().await.expect("first event").expect("ok event"); + assert_eq!(first.topic, "head"); + assert_eq!(first.data, r#"{"slot":"10"}"#); + + let second = stream + .next() + .await + .expect("second event") + .expect("ok event"); + assert_eq!(second.topic, "chain_reorg"); + assert_eq!(second.data, r#"{"slot":"20","depth":"2"}"#); + } }