34 changes: 29 additions & 5 deletions bin/ethlambda/src/main.rs
@@ -16,6 +16,7 @@ use std::{
path::{Path, PathBuf},
sync::Arc,
};
use tokio::sync::Notify;

use clap::Parser;
use ethlambda_blockchain::key_manager::ValidatorKeyPair;
@@ -204,21 +205,44 @@ async fn main() -> eyre::Result<()> {
})
.inspect_err(|err| error!(%err, "Failed to send InitBlockChain — actors not wired"))?;

tokio::spawn(async move {
let _ = ethlambda_rpc::start_metrics_server(metrics_socket)
let shutdown_notify = Arc::new(Notify::new());
Comment on lines 207 to +208
Collaborator

Can we use a CancellationToken here?

let metrics_shutdown = shutdown_notify.clone();
let api_shutdown = shutdown_notify.clone();

let metrics_handle = tokio::spawn(async move {
let shutdown_future = async move {
metrics_shutdown.notified().await;
};
let _ = ethlambda_rpc::start_metrics_server(metrics_socket, shutdown_future)
.await
.inspect_err(|err| error!(%err, "Metrics server failed"));
});
tokio::spawn(async move {
let _ = ethlambda_rpc::start_api_server(api_socket, store, aggregator)
let api_handle = tokio::spawn(async move {
let shutdown_future = async move {
api_shutdown.notified().await;
};
let _ = ethlambda_rpc::start_api_server(api_socket, store, aggregator, shutdown_future)
.await
.inspect_err(|err| error!(%err, "API server failed"));
});

info!("Node initialized");

tokio::signal::ctrl_c().await.ok();
println!("Shutting down...");
info!("Shutdown signal received, stopping actors and servers...");

let blockchain_ref = blockchain.actor_ref().clone();
let p2p_ref = p2p.actor_ref().clone();
blockchain_ref.context().stop();
p2p_ref.context().stop();
shutdown_notify.notify_waiters();
Contributor

**P1: `notify_waiters()` can silently drop shutdown signals**

`Notify::notify_waiters()` only wakes futures that are *currently registered* as waiters at the moment of the call — it does not store a permit for futures that haven't been polled yet. The shutdown futures inside the spawned tasks (`async move { X.notified().await }`) don't register themselves until axum polls the shutdown future for the first time, which happens asynchronously after `tokio::spawn`. If `ctrl_c` fires before the spawned tasks have had a chance to run, both notifications are silently dropped and `api_handle.await` / `metrics_handle.await` will block indefinitely — the process never exits.

The clearest fix is two separate `Arc<Notify>` handles each called with `notify_one()`, which stores a permit that survives until consumed:

```rust
let api_shutdown_notify = Arc::new(Notify::new());
let metrics_shutdown_notify = Arc::new(Notify::new());
// ... pass clones into spawned tasks ...
api_shutdown_notify.notify_one();
metrics_shutdown_notify.notify_one();
```

Alternatively, a `tokio::sync::watch` channel with a stored value avoids this race entirely for any number of listeners.
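
For illustration, a minimal sketch of the `watch`-based alternative mentioned above, mirroring the structure of `main.rs` in this diff (variable names are illustrative, not part of the PR):

```rust
use tokio::sync::watch;

// One sender kept in main, one receiver per server task.
let (shutdown_tx, shutdown_rx) = watch::channel(false);

let mut api_rx = shutdown_rx.clone();
let api_shutdown = async move {
    // changed() resolves even if the value was updated before this future
    // is first polled, so the ctrl_c-before-spawn race does not apply.
    let _ = api_rx.changed().await;
};

let mut metrics_rx = shutdown_rx;
let metrics_shutdown = async move {
    let _ = metrics_rx.changed().await;
};

// Later, on Ctrl+C:
let _ = shutdown_tx.send(true);
```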

Collaborator

This is why I suggested CancellationToken, since it was made specifically for these cases.
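
For reference, a minimal sketch of what the `CancellationToken` approach could look like in `main.rs`. This assumes the `tokio-util` crate is added as a dependency, which is not part of this diff; the server signature is the one introduced here:

```rust
use tokio_util::sync::CancellationToken;

let shutdown_token = CancellationToken::new();

let api_token = shutdown_token.clone();
let api_handle = tokio::spawn(async move {
    // cancelled() is level-triggered: it resolves even if cancel() was
    // called before this future is first polled, so there is no race.
    let _ = ethlambda_rpc::start_api_server(
        api_socket,
        store,
        aggregator,
        async move { api_token.cancelled().await },
    )
    .await
    .inspect_err(|err| error!(%err, "API server failed"));
});

// Later, on Ctrl+C:
shutdown_token.cancel();
let _ = api_handle.await;
```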


blockchain_ref.join().await;
p2p_ref.join().await;
let _ = api_handle.await;
let _ = metrics_handle.await;
Comment on lines +240 to +243
Contributor

**P2: No timeout on shutdown awaits**

All four `await` calls — `blockchain_ref.join()`, `p2p_ref.join()`, `api_handle.await`, and `metrics_handle.await` — are unbounded. If any actor gets stuck in a long-running handler or the HTTP servers hold open long-lived connections, the process will hang indefinitely. Consider wrapping the shutdown sequence in `tokio::time::timeout` so the node can force-exit after a reasonable deadline (e.g., 30 s) even if something stalls.
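
A minimal sketch of that suggestion, wrapping the existing shutdown awaits (the 30 s deadline is an arbitrary example):

```rust
use std::time::Duration;

let shutdown = async {
    blockchain_ref.join().await;
    p2p_ref.join().await;
    let _ = api_handle.await;
    let _ = metrics_handle.await;
};

// Force-exit the shutdown sequence if it stalls.
if tokio::time::timeout(Duration::from_secs(30), shutdown).await.is_err() {
    error!("Graceful shutdown timed out, exiting anyway");
}
```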

Collaborator

I think it'd be better to do "1 ctrl+C to stop, N ctrl+C to abort immediately" instead. We can tackle that in another PR, if we find it necessary
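
If that route is taken in a follow-up PR, one possible sketch (not part of this diff) is to register a second signal listener once graceful shutdown has started:

```rust
// After the first Ctrl+C has triggered graceful shutdown:
tokio::spawn(async {
    // A second Ctrl+C aborts the process immediately.
    let _ = tokio::signal::ctrl_c().await;
    eprintln!("Received second Ctrl+C, aborting");
    std::process::exit(1);
});
```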


info!("Shutdown complete");

Ok(())
}
14 changes: 11 additions & 3 deletions crates/net/rpc/src/lib.rs
@@ -20,23 +20,31 @@ pub async fn start_api_server(
address: SocketAddr,
store: Store,
aggregator: AggregatorController,
shutdown: impl std::future::Future<Output = ()> + Send + 'static,
Collaborator

Let's try to use the specific type here, instead of generics

) -> Result<(), std::io::Error> {
let api_router = build_api_router(store).layer(Extension(aggregator));

let listener = tokio::net::TcpListener::bind(address).await?;
axum::serve(listener, api_router).await?;
axum::serve(listener, api_router)
.with_graceful_shutdown(shutdown)
.await?;

Ok(())
}

pub async fn start_metrics_server(address: SocketAddr) -> Result<(), std::io::Error> {
pub async fn start_metrics_server(
address: SocketAddr,
shutdown: impl std::future::Future<Output = ()> + Send + 'static,
) -> Result<(), std::io::Error> {
let metrics_router = metrics::start_prometheus_metrics_api();
let debug_router = build_debug_router();

let app = Router::new().merge(metrics_router).merge(debug_router);

let listener = tokio::net::TcpListener::bind(address).await?;
axum::serve(listener, app).await?;
axum::serve(listener, app)
.with_graceful_shutdown(shutdown)
.await?;

Ok(())
}
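
One way to read the reviewer's suggestion to use a specific type instead of generics is to accept a `CancellationToken` directly. This is only a sketch and assumes a `tokio-util` dependency, which is not part of this diff:

```rust
use tokio_util::sync::CancellationToken;

pub async fn start_api_server(
    address: SocketAddr,
    store: Store,
    aggregator: AggregatorController,
    shutdown: CancellationToken,
) -> Result<(), std::io::Error> {
    let api_router = build_api_router(store).layer(Extension(aggregator));

    let listener = tokio::net::TcpListener::bind(address).await?;
    axum::serve(listener, api_router)
        // Resolve the graceful-shutdown future when the token is cancelled.
        .with_graceful_shutdown(async move { shutdown.cancelled().await })
        .await?;

    Ok(())
}
```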