Skip to content

Commit 4c2b676

Browse files
authored
Merge pull request #24 from input-output-hk/sg/more-graceful-shutdown
Support more graceful shutdown
2 parents d14774a + 9da1094 commit 4c2b676

File tree

2 files changed

+41
-16
lines changed

2 files changed

+41
-16
lines changed

process/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# Caraytid process - loads and runs modules
22
[package]
33
name = "caryatid_process"
4-
version = "0.13.1"
4+
version = "0.14.0"
55
edition = "2021"
66
authors = ["Paul Clark <paul.clark@iohk.io>"]
77
description = "Library for building a Caryatid process"

process/src/process.rs

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ use caryatid_sdk::{Context, MessageBounds, MessageBus, Module, ModuleRegistry};
88
use config::Config;
99
use std::collections::HashMap;
1010
use std::sync::Arc;
11-
use tokio::signal::unix::{signal, SignalKind};
1211
use tokio::sync::watch::Sender;
12+
use tokio::task::JoinHandle;
1313
use tracing::{error, info, warn};
1414

1515
mod in_memory_bus;
@@ -102,7 +102,18 @@ impl<M: MessageBounds> Process<M> {
102102
}
103103

104104
/// Run the process
105-
pub async fn run(&self) -> Result<()> {
105+
pub async fn run(self) -> Result<()> {
106+
let process = self.start().await?;
107+
108+
// Wait for SIGINT
109+
tokio::signal::ctrl_c().await?;
110+
111+
info!("SIGINT received. Shutting down...");
112+
113+
process.stop().await
114+
}
115+
116+
pub async fn start(self) -> Result<RunningProcess<M>> {
106117
info!("Initialising...");
107118

108119
let mut monitor = None;
@@ -148,9 +159,7 @@ impl<M: MessageBounds> Process<M> {
148159

149160
info!("Running...");
150161

151-
if let Some(monitor) = monitor {
152-
tokio::spawn(monitor.monitor());
153-
}
162+
let monitor_task = monitor.map(|m| tokio::spawn(m.monitor()));
154163

155164
// Send the startup message if required
156165
let _ = self.context.startup_watch.send(true);
@@ -162,16 +171,10 @@ impl<M: MessageBounds> Process<M> {
162171
.unwrap_or_else(|e| error!("Failed to publish: {e}"));
163172
}
164173

165-
// Wait for SIGTERM
166-
let mut sigterm = signal(SignalKind::terminate()).expect("Can't set signal");
167-
sigterm.recv().await;
168-
169-
info!("SIGTERM received. Shutting down...");
170-
171-
// Shutdown the message bus and all subscriptions (before losing modules)
172-
let _ = self.context.message_bus.shutdown().await;
173-
174-
Ok(())
174+
Ok(RunningProcess {
175+
context: self.context,
176+
monitor: monitor_task,
177+
})
175178
}
176179
}
177180

@@ -183,3 +186,25 @@ impl<M: MessageBounds> ModuleRegistry<M> for Process<M> {
183186
self.modules.insert(name.to_string(), module.clone());
184187
}
185188
}
189+
190+
pub struct RunningProcess<M: MessageBounds> {
191+
/// Global context
192+
context: GlobalContext<M>,
193+
194+
/// A handle to the monitor task, if one is running
195+
monitor: Option<JoinHandle<()>>,
196+
}
197+
198+
impl<M: MessageBounds> RunningProcess<M> {
199+
/// Gracefully stop the process.
200+
pub async fn stop(self) -> Result<()> {
201+
// Shutdown the message bus and all subscriptions (before losing modules)
202+
let _ = self.context.message_bus.shutdown().await;
203+
204+
if let Some(monitor) = self.monitor {
205+
monitor.abort();
206+
}
207+
208+
Ok(())
209+
}
210+
}

0 commit comments

Comments
 (0)