From fce5c1dfb8cdfad730682f4b7bb8679730ca3ee8 Mon Sep 17 00:00:00 2001 From: dicethedev Date: Tue, 28 Apr 2026 05:29:13 +0100 Subject: [PATCH] fix: stop genservers gracefully --- bin/ethlambda/src/main.rs | 34 +++++++++++++++++++++++++++++----- crates/net/rpc/src/lib.rs | 14 +++++++++++--- 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index 3c3f816..a52edab 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -16,6 +16,7 @@ use std::{ path::{Path, PathBuf}, sync::Arc, }; +use tokio::sync::Notify; use clap::Parser; use ethlambda_blockchain::key_manager::ValidatorKeyPair; @@ -204,13 +205,23 @@ async fn main() -> eyre::Result<()> { }) .inspect_err(|err| error!(%err, "Failed to send InitBlockChain — actors not wired"))?; - tokio::spawn(async move { - let _ = ethlambda_rpc::start_metrics_server(metrics_socket) + let shutdown_notify = Arc::new(Notify::new()); + let metrics_shutdown = shutdown_notify.clone(); + let api_shutdown = shutdown_notify.clone(); + + let metrics_handle = tokio::spawn(async move { + let shutdown_future = async move { + metrics_shutdown.notified().await; + }; + let _ = ethlambda_rpc::start_metrics_server(metrics_socket, shutdown_future) .await .inspect_err(|err| error!(%err, "Metrics server failed")); }); - tokio::spawn(async move { - let _ = ethlambda_rpc::start_api_server(api_socket, store, aggregator) + let api_handle = tokio::spawn(async move { + let shutdown_future = async move { + api_shutdown.notified().await; + }; + let _ = ethlambda_rpc::start_api_server(api_socket, store, aggregator, shutdown_future) .await .inspect_err(|err| error!(%err, "API server failed")); }); @@ -218,7 +229,20 @@ async fn main() -> eyre::Result<()> { info!("Node initialized"); tokio::signal::ctrl_c().await.ok(); - println!("Shutting down..."); + info!("Shutdown signal received, stopping actors and servers..."); + + let blockchain_ref = blockchain.actor_ref().clone(); + let p2p_ref = p2p.actor_ref().clone(); + blockchain_ref.context().stop(); + p2p_ref.context().stop(); + shutdown_notify.notify_waiters(); + + blockchain_ref.join().await; + p2p_ref.join().await; + let _ = api_handle.await; + let _ = metrics_handle.await; + + info!("Shutdown complete"); Ok(()) } diff --git a/crates/net/rpc/src/lib.rs b/crates/net/rpc/src/lib.rs index acec7fa..21f031b 100644 --- a/crates/net/rpc/src/lib.rs +++ b/crates/net/rpc/src/lib.rs @@ -20,23 +20,31 @@ pub async fn start_api_server( address: SocketAddr, store: Store, aggregator: AggregatorController, + shutdown: impl std::future::Future + Send + 'static, ) -> Result<(), std::io::Error> { let api_router = build_api_router(store).layer(Extension(aggregator)); let listener = tokio::net::TcpListener::bind(address).await?; - axum::serve(listener, api_router).await?; + axum::serve(listener, api_router) + .with_graceful_shutdown(shutdown) + .await?; Ok(()) } -pub async fn start_metrics_server(address: SocketAddr) -> Result<(), std::io::Error> { +pub async fn start_metrics_server( + address: SocketAddr, + shutdown: impl std::future::Future + Send + 'static, +) -> Result<(), std::io::Error> { let metrics_router = metrics::start_prometheus_metrics_api(); let debug_router = build_debug_router(); let app = Router::new().merge(metrics_router).merge(debug_router); let listener = tokio::net::TcpListener::bind(address).await?; - axum::serve(listener, app).await?; + axum::serve(listener, app) + .with_graceful_shutdown(shutdown) + .await?; Ok(()) }