Skip to content

Commit 9da1094

Browse files
committed
fix: less abusable API
1 parent ecac0ad commit 9da1094

File tree

1 file changed

+29
-21
lines changed

1 file changed

+29
-21
lines changed

process/src/process.rs

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,6 @@ pub struct Process<M: MessageBounds> {
3434

3535
/// Active modules by name
3636
modules: HashMap<String, Arc<dyn Module<M>>>,
37-
38-
/// A handle to the monitor task, if one is running
39-
monitor: Option<JoinHandle<()>>,
4037
}
4138

4239
impl<M: MessageBounds> Process<M> {
@@ -101,23 +98,22 @@ impl<M: MessageBounds> Process<M> {
10198
config,
10299
context,
103100
modules: HashMap::new(),
104-
monitor: None,
105101
}
106102
}
107103

108104
/// Run the process
109-
pub async fn run(mut self) -> Result<()> {
110-
self.start().await?;
105+
pub async fn run(self) -> Result<()> {
106+
let process = self.start().await?;
111107

112108
// Wait for SIGINT
113109
tokio::signal::ctrl_c().await?;
114110

115111
info!("SIGINT received. Shutting down...");
116112

117-
self.stop().await
113+
process.stop().await
118114
}
119115

120-
pub async fn start(&mut self) -> Result<()> {
116+
pub async fn start(self) -> Result<RunningProcess<M>> {
121117
info!("Initialising...");
122118

123119
let mut monitor = None;
@@ -163,9 +159,7 @@ impl<M: MessageBounds> Process<M> {
163159

164160
info!("Running...");
165161

166-
if let Some(monitor) = monitor {
167-
self.monitor = Some(tokio::spawn(monitor.monitor()));
168-
}
162+
let monitor_task = monitor.map(|m| tokio::spawn(m.monitor()));
169163

170164
// Send the startup message if required
171165
let _ = self.context.startup_watch.send(true);
@@ -177,9 +171,32 @@ impl<M: MessageBounds> Process<M> {
177171
.unwrap_or_else(|e| error!("Failed to publish: {e}"));
178172
}
179173

180-
Ok(())
174+
Ok(RunningProcess {
175+
context: self.context,
176+
monitor: monitor_task,
177+
})
178+
}
179+
}
180+
181+
/// Module registry implementation
182+
impl<M: MessageBounds> ModuleRegistry<M> for Process<M> {
183+
/// Register a module
184+
fn register(&mut self, module: Arc<dyn Module<M>>) {
185+
let name = module.get_name();
186+
self.modules.insert(name.to_string(), module.clone());
181187
}
188+
}
182189

190+
pub struct RunningProcess<M: MessageBounds> {
191+
/// Global context
192+
context: GlobalContext<M>,
193+
194+
/// A handle to the monitor task, if one is running
195+
monitor: Option<JoinHandle<()>>,
196+
}
197+
198+
impl<M: MessageBounds> RunningProcess<M> {
199+
/// Gracefully stop the process.
183200
pub async fn stop(self) -> Result<()> {
184201
// Shutdown the message bus and all subscriptions (before losing modules)
185202
let _ = self.context.message_bus.shutdown().await;
@@ -191,12 +208,3 @@ impl<M: MessageBounds> Process<M> {
191208
Ok(())
192209
}
193210
}
194-
195-
/// Module registry implementation
196-
impl<M: MessageBounds> ModuleRegistry<M> for Process<M> {
197-
/// Register a module
198-
fn register(&mut self, module: Arc<dyn Module<M>>) {
199-
let name = module.get_name();
200-
self.modules.insert(name.to_string(), module.clone());
201-
}
202-
}

0 commit comments

Comments
 (0)