Skip to content

feat(app): beacon node SSE listener#494

Open
emlautarom1-agent[bot] wants to merge 16 commits into
mainfrom
emlautarom1/app-sse
Open

feat(app): beacon node SSE listener#494
emlautarom1-agent[bot] wants to merge 16 commits into
mainfrom
emlautarom1/app-sse

Conversation

@emlautarom1-agent

@emlautarom1-agent emlautarom1-agent Bot commented Jun 19, 2026

Copy link
Copy Markdown
Contributor

Closes #183

Summary

Ports Charon's app/sse to Pluto: an actor-model listener that subscribes to a beacon node's /eth/v1/events stream, exports timing metrics for head/block/block_gossip events, and notifies subscribers of chain_reorg epochs. The reorg Receiver<u64> is designed to feed scheduler.with_chain_reorgs directly.

A SseListenerBuilder wires up subscriptions; build() fetches the beacon config (with retry), spawns the actor plus a reconnecting stream "pump" under a CancellationToken, and returns a cloneable SseListenerHandle for runtime subscription.

Differences from Charon

  1. Single beacon node. Charon fans the listener across every configured beacon address; Pluto runs against one EthBeaconNodeApiClient (matching the scheduler). Metrics still carry an addr label for parity.
  2. Log-and-continue on malformed payloads. Charon returns an error from the event handler, which tears down the SSE connection and stops the listener for that node until restart. We log a warning and keep processing — one bad frame can't kill the listener.
  3. Topic-preserving event fetch. The generated eventstream discards the SSE topic and parses data as a JSON string (which fails on the object payloads). Added eth2api::event_stream, which yields {topic, data} so events can be dispatched by topic.
  4. last_reorg_epoch starts at 0 (parity with Charon's zero-value Epoch): a first-ever reorg at epoch 0 is deduped away. Kept for parity.

Out of scope

CLI flags / run-command wiring — the scheduler itself isn't wired into a run command yet. The pieces are shaped so the reorg receiver plugs straight into the scheduler when that wiring lands.

Inline comments on the diff flag the TODOs and the parity-sensitive spots for reviewers.

emlautarom1-agent Bot and others added 10 commits June 18, 2026 19:03
The generated eventstream method returns EventStream<String>, which
discards the SSE event topic and fails to deserialize the beacon node's
JSON-object payloads. Add a hand-written event_stream(topics) on
EthBeaconNodeApiClient that yields BeaconNodeEvent { topic, data } with
the raw payload preserved, so callers can dispatch by topic.
Add an actor-model SSE listener under crates/app/src/sse that subscribes
to a beacon node's /eth/v1/events stream (head, chain_reorg, block,
block_gossip), exports timing metrics, and notifies subscribers of chain
reorgs.

A reconnecting pump task forwards beacon events into the actor over an
mpsc channel; the actor owns all state and processes events single-
threaded. SseListenerBuilder::subscribe_chain_reorg returns an
mpsc::Receiver<u64> that plugs directly into the scheduler's
with_chain_reorgs, and the cloneable SseListenerHandle allows dynamic
subscription at runtime. Both builder and listener live until a
CancellationToken fires.
The reconnect backoff is built with without_max_times, so next() never
returns None. Use expect to surface a logic error instead of silently
falling back to DEFAULT_RETRY.
Compute each delay window inline where it is used instead of routing
through a single-use to_chrono helper.
Deserialize numeric fields (slot, depth) directly into u64 via
serde_with::DisplayFromStr instead of decoding them as strings and
re-parsing in a second step. This removes the parse_u64 helper and makes
each handler parse its payload in one pass. Malformed payloads are
logged (with the topic) and skipped.
Resolved per review: keep the mpsc Vec of reorg subscribers (returns an
mpsc::Receiver that plugs directly into the scheduler) and keep the local
backoff helpers (consistent with the existing scheduler/bootnode pattern).
The slot > i64::MAX guard mirrored Charon, where it exists to avoid a
uint64->int64 wrap in its duration math. compute_delay already saturates
the conversion, so the guard only skipped slots no real beacon node can
emit; remove it.
Use the parsed reorg fields directly instead of binding intermediate
slot/depth locals.
Parse each event payload inline in its handler and log a warning on
failure at the call site, instead of hiding it inside parse_payload.
Makes the discard-on-malformed behavior visible. (Charon propagates the
error and tears down the SSE connection; we log and continue, as a
deliberate robustness choice.)

@emlautarom1-agent emlautarom1-agent Bot left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inline notes on the TODOs and parity-sensitive spots for reviewers.

Comment thread crates/app/src/sse/mod.rs
Comment on lines +72 to +74
// TODO: Prefer to use a `broadcast` channel here to simplify the subscription management.
// Requires revisiting the potential subscribers.
reorg_subs: Vec<sync::mpsc::Sender<u64>>,

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO (open question). Subscribers are tracked as a Vec<mpsc::Sender<u64>> and fanned out manually. A broadcast channel would remove the bookkeeping (dedup/prune), but it changes the receiver type the scheduler consumes, so it needs a look at who actually subscribes first. Kept as mpsc for now.

Comment thread crates/app/src/sse/mod.rs
genesis_time,
slot_duration,
slots_per_epoch,
last_reorg_epoch: 0,

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Charon parity quirk. last_reorg_epoch starts at 0 to match Charon's zero-value Epoch. Consequence: a first-ever reorg at epoch 0 is treated as a duplicate and not emitted. Intentionally kept for parity (covered by a test).

Comment thread crates/app/src/sse/mod.rs
Comment on lines +221 to +227
let head: HeadEventData = match serde_json::from_str(&event.data) {
Ok(head) => head,
Err(err) => {
tracing::warn!(err = ?err, addr = %self.addr, topic = HEAD_EVENT, "Failed to parse SSE event");
return;
}
};

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Differs from Charon. On a malformed payload Charon returns an error, which tears down the SSE connection and stops the listener for that node until restart. We log a warning and continue, so one bad frame can't kill the listener. This pattern is repeated in the other three handlers.

Comment thread crates/app/src/sse/mod.rs
Comment on lines +356 to +363
self.reorg_subs.retain(|tx| match tx.try_send(epoch) {
Ok(()) => true,
Err(sync::mpsc::error::TrySendError::Full(_)) => {
tracing::warn!(addr = %addr, epoch, "Chain reorg subscriber lagging, dropping event");
true
}
Err(sync::mpsc::error::TrySendError::Closed(_)) => false,
});

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fan-out uses non-blocking try_send so a slow/dead subscriber never stalls the actor: closed receivers are pruned, and a full buffer logs a lag warning but keeps the subscriber. Reorgs are rare, so back-pressure is a non-issue.

Comment thread crates/app/src/sse/mod.rs
}
}

// TODO: Extract these backoff configurations into a shared module.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO. The two backoff builders here duplicate config that likely belongs in a shared module (the scheduler has a similar fast_backoff). Left as-is to keep this PR scoped.

Comment on lines +389 to +393
pub async fn event_stream(
&self,
topics: &[EventstreamRequestQueryTopic],
) -> Result<
impl Stream<Item = Result<BeaconNodeEvent, EthBeaconNodeApiClientError>> + Send,

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New API (why not the generated eventstream). The generated client discards the SSE event: topic and parses data as a JSON string, which fails on the beacon node's object payloads. event_stream yields raw {topic, data} pairs so the listener can dispatch by topic and parse each payload itself.

@emlautarom1 emlautarom1 left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Manually checked, comments are accurate, all good.

@varex83

varex83 commented Jun 19, 2026

Copy link
Copy Markdown
Collaborator

@varex83agent /review-pr

@varex83agent varex83agent left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review: beacon node SSE listener

A clean, well-documented port of Charon's app/sse. The actor model is a good fit, metric names/buckets/labels match Charon exactly (verified against v1.7.1 app/sse/metrics.go), the reorg-epoch dedup and last_reorg_epoch=0 behavior are faithful, and the event_stream extension is a sensible fix for the topic-discarding generated method. Test coverage of the handler logic is solid.

Three things should be addressed before merge:

  1. Tight reconnect loop on clean/immediate disconnect (run_pump). StreamOutcome::Ended resets the backoff to fresh and reconnects with zero delay. Any clean EOF — a beacon node or fronting proxy that accepts and immediately closes the connection with no events — drives a connect→EOF→reset→reconnect busy-loop with no rate limiting, saturating a core and hammering the beacon node. Only the Error arm sleeps.
  2. Unbounded SSE event buffering → OOM (event_stream). bytes_stream().eventsource() (eventsource-stream 0.2) buffers a single event with no size cap; a beacon node that streams an endless data: field (or never sends the terminating blank line) grows memory until OOM. The mpsc(100) backpressure only bounds completed-event count, not single-event size.
  3. Zombie actor if the pump exits without cancelling ct. If run_pump ever panics (the .expect() paths) or returns early, events_tx drops, the Some(event) = events_rx.recv() arm goes permanently disabled, and the actor runs idle forever with no reconnection and no signal — "metrics just stopped." Consider tying the two task lifetimes (a child token, or having the pump cancel ct on exit).

The .expect() arithmetic on beacon-node-derived values is correctly guarded today, and the biased select does NOT busy-loop on channel close (the still-Pending arms keep it parked) — both verified.

Remaining items are minor (metrics-parity divergences worth documenting: negative-delay clamp, non-2xx-retries-vs-Charon-stops; EventStream(String) dropping the error source; reorg-drop on slow subscribers; epoch: String typing; reconnection-path test gap) and a couple of nits. Requesting changes for the three items above.

Comment thread crates/app/src/sse/mod.rs Outdated
Comment thread crates/eth2api/src/extensions.rs
Comment thread crates/app/src/sse/mod.rs Outdated
Comment thread crates/app/src/sse/mod.rs
Comment thread crates/eth2api/src/extensions.rs Outdated
Comment thread crates/app/src/sse/mod.rs
Comment thread crates/app/src/sse/types.rs Outdated
Comment thread crates/app/src/sse/mod.rs
Comment thread crates/app/src/sse/mod.rs
Comment thread crates/app/src/sse/mod.rs

@varex83 varex83 left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM overall! One thing to keep in mind is the capacity of the channels, because if it will be too small it will drop old elements on capacity reached

Comment thread crates/app/src/sse/mod.rs
}

/// Messages sent to the [`SseListenerActor`].
enum SseListenerMessage {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this enum here? Will we extend it in the future? If not, it would be good to simplify it to the struct

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We won't extend it but it's for consistency across actors: they all accept *Message enum and dispatch actions based on the actual message.

emlautarom1-agent Bot and others added 6 commits June 19, 2026 16:59
Reset the reconnect backoff only after a connection that forwarded at
least one event. Previously a clean EOF (StreamOutcome::Ended) reset the
backoff and reconnected with zero delay, so a beacon node — or a proxy in
front of it — that accepts and immediately closes the connection drove a
tight connect/EOF/reconnect loop with no rate limiting. Both the Ended
and Error arms now back off; only a productive connection resets it.
The actor's select arm used `Some(event) = events_rx.recv()`, so when the
pump dropped its sender the branch became permanently disabled and the
actor parked forever on cancellation only — no events, no reconnection,
no signal. Match the channel explicitly and break with an error log on
closure so the listener shuts down instead of zombie-ing.
Replace the flat EventStream(String) variant — which dropped the
source() chain, HTTP status and reqwest error classification — with two
typed variants: EventStreamRequest(#[from] reqwest::Error) for the
request send / non-success status, and EventStreamRead(#[from]
EventStreamError) for stream read errors. The URL-cannot-be-base case
reuses the existing RequestError(anyhow) variant. Callers can now inspect
and propagate the real cause.
The epoch field was a raw String while its siblings slot/depth are parsed
to u64 via DisplayFromStr. Parse it the same way for consistency; the
debug log records it as an integer like slot.
Add tests for the reconnection state machine that previously had no
coverage:
- stream_once reports productive vs unproductive connections (the
  backoff-reset signal), and returns Error/Cancelled/ChannelClosed for
  the respective conditions, exercised against a wiremock beacon node.
- run_pump forwards events and stops on cancellation.
- the actor stops (rather than zombie-ing) when its event channel
  closes while the cancellation token is still live.
@emlautarom1

Copy link
Copy Markdown
Collaborator

@varex83 Considering the type of events, these should happen every ~12s (Eth Mainnet) so I think we're fine; I've increased the default channel size to 1024 just to be sure.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement app/sse

3 participants