From d04c596262bc9478f272f206a0f6523aff555e0f Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Tue, 7 Apr 2026 14:09:02 -0300 Subject: [PATCH 1/4] feat: add ChildHandle and ActorId for type-erased actor management --- concurrency/src/child_handle.rs | 256 +++++++++++++++++++++++++++++++ concurrency/src/lib.rs | 2 + concurrency/src/tasks/actor.rs | 29 ++++ concurrency/src/threads/actor.rs | 30 ++++ 4 files changed, 317 insertions(+) create mode 100644 concurrency/src/child_handle.rs diff --git a/concurrency/src/child_handle.rs b/concurrency/src/child_handle.rs new file mode 100644 index 0000000..daa7703 --- /dev/null +++ b/concurrency/src/child_handle.rs @@ -0,0 +1,256 @@ +use crate::error::ExitReason; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Condvar, Mutex}; + +// --------------------------------------------------------------------------- +// ActorId +// --------------------------------------------------------------------------- + +static NEXT_ACTOR_ID: AtomicU64 = AtomicU64::new(1); + +/// Unique identity for an actor instance. Used by monitors and links to +/// identify actors without needing the concrete actor type. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct ActorId(u64); + +impl ActorId { + pub(crate) fn next() -> Self { + Self(NEXT_ACTOR_ID.fetch_add(1, Ordering::Relaxed)) + } +} + +impl std::fmt::Display for ActorId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "ActorId({})", self.0) + } +} + +// --------------------------------------------------------------------------- +// Completion — abstraction over tasks/threads completion signals +// --------------------------------------------------------------------------- + +/// Type-erased completion signal. Wraps the mode-specific completion mechanism +/// so `ChildHandle` works uniformly across tasks and threads. +#[derive(Clone)] +pub(crate) enum Completion { + /// Tasks mode: watch channel carrying Option. + Watch(spawned_rt::tasks::watch::Receiver>), + /// Threads mode: Mutex + Condvar carrying Option. + Condvar(Arc<(Mutex>, Condvar)>), +} + +// --------------------------------------------------------------------------- +// ChildHandle +// --------------------------------------------------------------------------- + +/// Type-erased cancel function. Wraps the mode-specific cancellation token. +type CancelFn = Arc; + +/// Type-erased handle to a running actor. Provides lifecycle operations +/// (stop, liveness check, exit reason) without knowing the actor's concrete type. +/// +/// Obtained via `ChildHandle::from(actor_ref)`. +/// +/// Unlike `ActorRef`, a `ChildHandle` cannot send messages — it only +/// provides supervision-related operations (stop, wait, monitor, link). +#[derive(Clone)] +pub struct ChildHandle { + id: ActorId, + cancel: CancelFn, + completion: Completion, +} + +impl std::fmt::Debug for ChildHandle { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ChildHandle") + .field("id", &self.id) + .finish_non_exhaustive() + } +} + +impl ChildHandle { + /// Create a ChildHandle for tasks mode. + pub(crate) fn from_tasks( + id: ActorId, + cancellation_token: spawned_rt::tasks::CancellationToken, + completion_rx: spawned_rt::tasks::watch::Receiver>, + ) -> Self { + Self { + id, + cancel: Arc::new(move || cancellation_token.cancel()), + completion: Completion::Watch(completion_rx), + } + } + + /// Create a ChildHandle for threads mode. + pub(crate) fn from_threads( + id: ActorId, + cancellation_token: spawned_rt::threads::CancellationToken, + completion: Arc<(Mutex>, Condvar)>, + ) -> Self { + Self { + id, + cancel: Arc::new(move || cancellation_token.cancel()), + completion: Completion::Condvar(completion), + } + } + + /// The actor's unique identity. + pub fn id(&self) -> ActorId { + self.id + } + + /// Signal the actor to stop. The actor will finish its current handler, + /// run `stopped()`, and exit. + pub fn stop(&self) { + (self.cancel)(); + } + + /// Returns `true` if the actor is still running. + pub fn is_alive(&self) -> bool { + self.exit_reason().is_none() + } + + /// Poll the exit reason. Returns `None` if the actor is still running. + pub fn exit_reason(&self) -> Option { + match &self.completion { + Completion::Watch(rx) => rx.borrow().clone(), + Completion::Condvar(completion) => { + let (lock, _) = &**completion; + let guard = lock.lock().unwrap_or_else(|p| p.into_inner()); + guard.clone() + } + } + } + + /// Block until the actor stops and return the exit reason. + /// + /// **Warning:** In tasks mode, this blocks the calling thread (not async). + /// Use `wait_exit_async()` from async code instead. + pub fn wait_exit_blocking(&self) -> ExitReason { + match &self.completion { + Completion::Watch(rx) => { + let mut rx = rx.clone(); + spawned_rt::threads::block_on(async move { + loop { + if let Some(reason) = rx.borrow_and_update().clone() { + return reason; + } + if rx.changed().await.is_err() { + return ExitReason::Kill; + } + } + }) + } + Completion::Condvar(completion) => { + let (lock, cvar) = &**completion; + let mut guard = lock.lock().unwrap_or_else(|p| p.into_inner()); + loop { + if let Some(reason) = guard.clone() { + return reason; + } + guard = cvar.wait(guard).unwrap_or_else(|p| p.into_inner()); + } + } + } + } + + /// Async wait until the actor stops and return the exit reason. + /// Only usable from async code (tasks mode). + pub async fn wait_exit_async(&self) -> ExitReason { + match &self.completion { + Completion::Watch(rx) => { + let mut rx = rx.clone(); + loop { + if let Some(reason) = rx.borrow_and_update().clone() { + return reason; + } + if rx.changed().await.is_err() { + return ExitReason::Kill; + } + } + } + Completion::Condvar(_) => { + // Fallback: spawn a blocking task to avoid blocking the async runtime + let handle = self.clone(); + spawned_rt::tasks::spawn_blocking(move || handle.wait_exit_blocking()) + .await + .unwrap_or(ExitReason::Kill) + } + } + } +} + +impl PartialEq for ChildHandle { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} + +impl Eq for ChildHandle {} + +impl std::hash::Hash for ChildHandle { + fn hash(&self, state: &mut H) { + self.id.hash(state); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn actor_id_is_unique() { + let a = ActorId::next(); + let b = ActorId::next(); + assert_ne!(a, b); + } + + #[test] + fn actor_id_display() { + let id = ActorId::next(); + let s = format!("{id}"); + assert!(s.starts_with("ActorId(")); + } + + #[test] + fn child_handle_from_threads_basics() { + let completion = Arc::new((Mutex::new(None), Condvar::new())); + let token = spawned_rt::threads::CancellationToken::new(); + let handle = ChildHandle::from_threads(ActorId::next(), token, completion.clone()); + + assert!(handle.is_alive()); + assert!(handle.exit_reason().is_none()); + + // Simulate actor completion + { + let (lock, cvar) = &*completion; + let mut guard = lock.lock().unwrap(); + *guard = Some(ExitReason::Normal); + cvar.notify_all(); + } + + assert!(!handle.is_alive()); + assert_eq!(handle.exit_reason(), Some(ExitReason::Normal)); + } + + #[test] + fn child_handle_stop() { + let completion = Arc::new((Mutex::new(None), Condvar::new())); + let token = spawned_rt::threads::CancellationToken::new(); + assert!(!token.is_cancelled()); + let handle = ChildHandle::from_threads(ActorId::next(), token.clone(), completion); + handle.stop(); + assert!(token.is_cancelled()); + } + + #[test] + fn child_handle_equality_by_id() { + let completion = Arc::new((Mutex::new(None), Condvar::new())); + let token = spawned_rt::threads::CancellationToken::new(); + let id = ActorId::next(); + let h1 = ChildHandle::from_threads(id, token.clone(), completion.clone()); + let h2 = ChildHandle::from_threads(id, token, completion); + assert_eq!(h1, h2); + } +} diff --git a/concurrency/src/lib.rs b/concurrency/src/lib.rs index d4230c0..e34000a 100644 --- a/concurrency/src/lib.rs +++ b/concurrency/src/lib.rs @@ -71,6 +71,7 @@ //! - `Recipient` (`Arc>`) for type-erased per-message references //! - [`tasks::Backend`] enum for choosing async runtime, blocking pool, or OS thread +pub mod child_handle; pub mod error; pub mod message; pub mod registry; @@ -78,6 +79,7 @@ pub mod response; pub mod tasks; pub mod threads; +pub use child_handle::{ActorId, ChildHandle}; pub use error::{ActorError, ExitReason}; pub use response::Response; pub use spawned_macros::{actor, protocol}; diff --git a/concurrency/src/tasks/actor.rs b/concurrency/src/tasks/actor.rs index 33f6cd6..70f2490 100644 --- a/concurrency/src/tasks/actor.rs +++ b/concurrency/src/tasks/actor.rs @@ -1,3 +1,4 @@ +use crate::child_handle::{ActorId, ChildHandle}; use crate::error::{panic_message, ActorError, ExitReason}; use crate::message::Message; use core::pin::pin; @@ -106,6 +107,7 @@ where /// /// Clone is cheap — it clones the inner channel sender and cancellation token. pub struct Context { + id: ActorId, sender: mpsc::Sender + Send>>, cancellation_token: CancellationToken, completion_rx: watch::Receiver>, @@ -114,6 +116,7 @@ pub struct Context { impl Clone for Context { fn clone(&self) -> Self { Self { + id: self.id, sender: self.sender.clone(), cancellation_token: self.cancellation_token.clone(), completion_rx: self.completion_rx.clone(), @@ -132,6 +135,7 @@ impl Context { /// or stream listeners from outside the actor. pub fn from_ref(actor_ref: &ActorRef) -> Self { Self { + id: actor_ref.id, sender: actor_ref.sender.clone(), cancellation_token: actor_ref.cancellation_token.clone(), completion_rx: actor_ref.completion_rx.clone(), @@ -211,6 +215,7 @@ impl Context { /// Get an `ActorRef` from this context. pub fn actor_ref(&self) -> ActorRef { ActorRef { + id: self.id, sender: self.sender.clone(), cancellation_token: self.cancellation_token.clone(), completion_rx: self.completion_rx.clone(), @@ -279,6 +284,7 @@ pub async fn request( /// To stop the actor, send an explicit shutdown message through your protocol, /// or call [`Context::stop`] from within a handler. pub struct ActorRef { + id: ActorId, sender: mpsc::Sender + Send>>, cancellation_token: CancellationToken, completion_rx: watch::Receiver>, @@ -293,6 +299,7 @@ impl Debug for ActorRef { impl Clone for ActorRef { fn clone(&self) -> Self { Self { + id: self.id, sender: self.sender.clone(), cancellation_token: self.cancellation_token.clone(), completion_rx: self.completion_rx.clone(), @@ -391,6 +398,26 @@ impl ActorRef { } } } + + /// The actor's unique identity. + pub fn id(&self) -> ActorId { + self.id + } + + /// Get a type-erased `ChildHandle` for this actor. + pub fn child_handle(&self) -> ChildHandle { + ChildHandle::from(self.clone()) + } +} + +impl From> for ChildHandle { + fn from(actor_ref: ActorRef) -> Self { + ChildHandle::from_tasks( + actor_ref.id, + actor_ref.cancellation_token.clone(), + actor_ref.completion_rx.clone(), + ) + } } // Bridge: ActorRef implements Receiver for any M that A handles @@ -419,12 +446,14 @@ impl ActorRef { let (completion_tx, completion_rx) = watch::channel(None); let actor_ref = ActorRef { + id: ActorId::next(), sender: tx.clone(), cancellation_token: cancellation_token.clone(), completion_rx, }; let ctx = Context { + id: actor_ref.id, sender: tx, cancellation_token: cancellation_token.clone(), completion_rx: actor_ref.completion_rx.clone(), diff --git a/concurrency/src/threads/actor.rs b/concurrency/src/threads/actor.rs index 8038699..732a00b 100644 --- a/concurrency/src/threads/actor.rs +++ b/concurrency/src/threads/actor.rs @@ -8,6 +8,7 @@ use std::{ time::Duration, }; +use crate::child_handle::{ActorId, ChildHandle}; use crate::error::{panic_message, ActorError, ExitReason}; use crate::message::Message; @@ -74,6 +75,7 @@ where /// /// Clone is cheap — it clones the inner channel sender and cancellation token. pub struct Context { + id: ActorId, sender: mpsc::Sender + Send>>, cancellation_token: CancellationToken, completion: Arc<(Mutex>, Condvar)>, @@ -82,6 +84,7 @@ pub struct Context { impl Clone for Context { fn clone(&self) -> Self { Self { + id: self.id, sender: self.sender.clone(), cancellation_token: self.cancellation_token.clone(), completion: self.completion.clone(), @@ -100,6 +103,7 @@ impl Context { /// or stream listeners from outside the actor. pub fn from_ref(actor_ref: &ActorRef) -> Self { Self { + id: actor_ref.id, sender: actor_ref.sender.clone(), cancellation_token: actor_ref.cancellation_token.clone(), completion: actor_ref.completion.clone(), @@ -178,6 +182,7 @@ impl Context { /// Get an `ActorRef` from this context. pub fn actor_ref(&self) -> ActorRef { ActorRef { + id: self.id, sender: self.sender.clone(), cancellation_token: self.cancellation_token.clone(), completion: self.completion.clone(), @@ -261,6 +266,7 @@ impl Drop for CompletionGuard { /// To stop the actor, send an explicit shutdown message through your protocol, /// or call [`Context::stop`] from within a handler. pub struct ActorRef { + id: ActorId, sender: mpsc::Sender + Send>>, cancellation_token: CancellationToken, completion: Arc<(Mutex>, Condvar)>, @@ -275,6 +281,7 @@ impl Debug for ActorRef { impl Clone for ActorRef { fn clone(&self) -> Self { Self { + id: self.id, sender: self.sender.clone(), cancellation_token: self.cancellation_token.clone(), completion: self.completion.clone(), @@ -373,6 +380,26 @@ impl ActorRef { completed = cvar.wait(completed).unwrap_or_else(|p| p.into_inner()); } } + + /// The actor's unique identity. + pub fn id(&self) -> ActorId { + self.id + } + + /// Get a type-erased `ChildHandle` for this actor. + pub fn child_handle(&self) -> ChildHandle { + ChildHandle::from(self.clone()) + } +} + +impl From> for ChildHandle { + fn from(actor_ref: ActorRef) -> Self { + ChildHandle::from_threads( + actor_ref.id, + actor_ref.cancellation_token.clone(), + actor_ref.completion.clone(), + ) + } } // Bridge: ActorRef implements Receiver for any M that A handles @@ -399,14 +426,17 @@ impl ActorRef { let (tx, rx) = mpsc::channel:: + Send>>(); let cancellation_token = CancellationToken::new(); let completion = Arc::new((Mutex::new(None), Condvar::new())); + let id = ActorId::next(); let actor_ref = ActorRef { + id, sender: tx.clone(), cancellation_token: cancellation_token.clone(), completion: completion.clone(), }; let ctx = Context { + id, sender: tx, cancellation_token: cancellation_token.clone(), completion: actor_ref.completion.clone(), From f9f52799e800a742713454ae1b388e91febec699 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Tue, 7 Apr 2026 15:03:51 -0300 Subject: [PATCH 2/4] fix: ChildHandle wait_exit_blocking inside tokio runtime, add integration tests --- concurrency/src/child_handle.rs | 162 ++++++++++++++++++++++++++++---- rt/src/tasks/mod.rs | 13 ++- rt/src/tasks/tokio/mod.rs | 4 +- 3 files changed, 158 insertions(+), 21 deletions(-) diff --git a/concurrency/src/child_handle.rs b/concurrency/src/child_handle.rs index daa7703..12695c7 100644 --- a/concurrency/src/child_handle.rs +++ b/concurrency/src/child_handle.rs @@ -123,24 +123,17 @@ impl ChildHandle { } } - /// Block until the actor stops and return the exit reason. + /// Block the calling thread until the actor stops and return the exit reason. /// - /// **Warning:** In tasks mode, this blocks the calling thread (not async). - /// Use `wait_exit_async()` from async code instead. + /// Safe to call from any context (sync or async). For the Watch variant, + /// uses the watch channel's built-in blocking recv. For Condvar, blocks + /// on the condvar directly. pub fn wait_exit_blocking(&self) -> ExitReason { match &self.completion { Completion::Watch(rx) => { - let mut rx = rx.clone(); - spawned_rt::threads::block_on(async move { - loop { - if let Some(reason) = rx.borrow_and_update().clone() { - return reason; - } - if rx.changed().await.is_err() { - return ExitReason::Kill; - } - } - }) + // wait_for_exit_watch_blocking is extracted to avoid holding the + // borrow on self across the loop + wait_for_exit_watch_blocking(&rx.clone()) } Completion::Condvar(completion) => { let (lock, cvar) = &**completion; @@ -156,7 +149,10 @@ impl ChildHandle { } /// Async wait until the actor stops and return the exit reason. - /// Only usable from async code (tasks mode). + /// + /// Works with both execution modes. For Watch (tasks-mode handles), awaits + /// the watch channel directly. For Condvar (threads-mode handles), delegates + /// to a blocking task via `spawn_blocking` to avoid blocking the async runtime. pub async fn wait_exit_async(&self) -> ExitReason { match &self.completion { Completion::Watch(rx) => { @@ -171,7 +167,6 @@ impl ChildHandle { } } Completion::Condvar(_) => { - // Fallback: spawn a blocking task to avoid blocking the async runtime let handle = self.clone(); spawned_rt::tasks::spawn_blocking(move || handle.wait_exit_blocking()) .await @@ -181,6 +176,37 @@ impl ChildHandle { } } +/// Blocking wait on a watch channel. Uses `block_in_place` if inside a tokio +/// runtime (safe from multi-threaded runtime), otherwise creates a temporary runtime. +fn wait_for_exit_watch_blocking( + rx: &spawned_rt::tasks::watch::Receiver>, +) -> ExitReason { + // Fast path: already done + if let Some(reason) = rx.borrow().clone() { + return reason; + } + + let mut rx = rx.clone(); + let wait = async move { + loop { + if let Some(reason) = rx.borrow_and_update().clone() { + return reason; + } + if rx.changed().await.is_err() { + return ExitReason::Kill; + } + } + }; + + // If inside a tokio runtime, use block_in_place + block_on to avoid + // "cannot start a runtime from within a runtime" panic. + if let Ok(handle) = spawned_rt::tasks::Handle::try_current() { + spawned_rt::tasks::block_in_place(|| handle.block_on(wait)) + } else { + spawned_rt::threads::block_on(wait) + } +} + impl PartialEq for ChildHandle { fn eq(&self, other: &Self) -> bool { self.id == other.id @@ -253,4 +279,108 @@ mod tests { let h2 = ChildHandle::from_threads(id, token, completion); assert_eq!(h1, h2); } + + // --- Integration tests using real actors --- + + #[test] + fn child_handle_from_threads_actor_ref() { + use crate::message::Message; + use crate::threads::actor::{Actor, ActorStart, Context, Handler}; + + struct Worker; + struct Stop; + impl Message for Stop { + type Result = (); + } + impl Actor for Worker {} + impl Handler for Worker { + fn handle(&mut self, _msg: Stop, ctx: &Context) { + ctx.stop(); + } + } + + let actor = Worker.start(); + let handle = actor.child_handle(); + + assert!(handle.is_alive()); + assert!(handle.exit_reason().is_none()); + assert_eq!(handle.id(), actor.id()); + + actor.send(Stop).unwrap(); + let reason = handle.wait_exit_blocking(); + assert_eq!(reason, ExitReason::Normal); + assert!(!handle.is_alive()); + } + + #[test] + fn child_handle_from_tasks_actor_ref() { + use crate::message::Message; + use crate::tasks::actor::{Actor, ActorStart, Context, Handler}; + + struct Worker; + struct Stop; + impl Message for Stop { + type Result = (); + } + impl Actor for Worker {} + impl Handler for Worker { + async fn handle(&mut self, _msg: Stop, ctx: &Context) { + ctx.stop(); + } + } + + let runtime = spawned_rt::tasks::Runtime::new().unwrap(); + runtime.block_on(async { + let actor = Worker.start(); + let handle = actor.child_handle(); + + assert!(handle.is_alive()); + assert!(handle.exit_reason().is_none()); + assert_eq!(handle.id(), actor.id()); + + actor.send(Stop).unwrap(); + let reason = handle.wait_exit_async().await; + assert_eq!(reason, ExitReason::Normal); + assert!(!handle.is_alive()); + }); + } + + #[test] + fn child_handle_stop_from_tasks_actor() { + use crate::tasks::actor::{Actor, ActorStart}; + + struct Idler; + impl Actor for Idler {} + + let runtime = spawned_rt::tasks::Runtime::new().unwrap(); + runtime.block_on(async { + let actor = Idler.start(); + let handle = actor.child_handle(); + + assert!(handle.is_alive()); + handle.stop(); + let reason = handle.wait_exit_async().await; + assert_eq!(reason, ExitReason::Normal); + }); + } + + #[test] + fn child_handle_wait_blocking_inside_runtime() { + use crate::tasks::actor::{Actor, ActorStart}; + + struct Idler; + impl Actor for Idler {} + + let runtime = spawned_rt::tasks::Runtime::new().unwrap(); + runtime.block_on(async { + let actor = Idler.start(); + let handle = actor.child_handle(); + handle.stop(); + + // This is the bug-fix test: wait_exit_blocking inside a tokio runtime + // should NOT panic (uses block_in_place internally) + let reason = spawned_rt::tasks::block_in_place(|| handle.wait_exit_blocking()); + assert_eq!(reason, ExitReason::Normal); + }); + } } diff --git a/rt/src/tasks/mod.rs b/rt/src/tasks/mod.rs index 17ca641..4d3e040 100644 --- a/rt/src/tasks/mod.rs +++ b/rt/src/tasks/mod.rs @@ -9,8 +9,6 @@ mod tokio; -use ::tokio::runtime::Handle; - use crate::tracing::init_tracing; pub use crate::tasks::tokio::mpsc; @@ -19,7 +17,9 @@ pub use crate::tasks::tokio::sleep; pub use crate::tasks::tokio::timeout; pub use crate::tasks::tokio::watch; pub use crate::tasks::tokio::CancellationToken; -pub use crate::tasks::tokio::{spawn, spawn_blocking, task_id, JoinHandle, Runtime}; +pub use crate::tasks::tokio::{ + block_in_place, spawn, spawn_blocking, task_id, Handle, JoinHandle, Runtime, +}; pub use crate::tasks::tokio::{BroadcastStream, ReceiverStream}; use std::future::Future; @@ -32,8 +32,15 @@ pub fn run(future: F) -> F::Output { } /// Block on a future using the current tokio runtime handle. +/// Panics if no tokio runtime is active. pub fn block_on(future: F) -> F::Output { Handle::current().block_on(future) } +/// Block on a future using the current tokio runtime handle. +/// Returns `None` if no tokio runtime is active. +pub fn try_block_on(future: F) -> Option { + Handle::try_current().ok().map(|h| h.block_on(future)) +} + pub use crate::tasks::tokio::ctrl_c; diff --git a/rt/src/tasks/tokio/mod.rs b/rt/src/tasks/tokio/mod.rs index af6c558..4c32248 100644 --- a/rt/src/tasks/tokio/mod.rs +++ b/rt/src/tasks/tokio/mod.rs @@ -4,8 +4,8 @@ pub mod oneshot; pub use tokio::sync::watch; pub use tokio::{ - runtime::Runtime, - task::{id as task_id, spawn, spawn_blocking, JoinHandle}, + runtime::{Handle, Runtime}, + task::{block_in_place, id as task_id, spawn, spawn_blocking, JoinHandle}, time::{sleep, timeout}, }; pub use tokio_stream::wrappers::{BroadcastStream, UnboundedReceiverStream as ReceiverStream}; From 5cf0218d89f3c7121412121d225836932e1bb491 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Tue, 7 Apr 2026 15:27:17 -0300 Subject: [PATCH 3/4] fix: address review feedback on ChildHandle PR --- concurrency/src/child_handle.rs | 9 ++++++++- concurrency/src/tasks/actor.rs | 9 +++++++-- concurrency/src/threads/actor.rs | 9 +++++++-- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/concurrency/src/child_handle.rs b/concurrency/src/child_handle.rs index 12695c7..a990e66 100644 --- a/concurrency/src/child_handle.rs +++ b/concurrency/src/child_handle.rs @@ -52,7 +52,7 @@ type CancelFn = Arc; /// Obtained via `ChildHandle::from(actor_ref)`. /// /// Unlike `ActorRef`, a `ChildHandle` cannot send messages — it only -/// provides supervision-related operations (stop, wait, monitor, link). +/// provides supervision-related operations (stop, wait, check liveness). #[derive(Clone)] pub struct ChildHandle { id: ActorId, @@ -153,6 +153,9 @@ impl ChildHandle { /// Works with both execution modes. For Watch (tasks-mode handles), awaits /// the watch channel directly. For Condvar (threads-mode handles), delegates /// to a blocking task via `spawn_blocking` to avoid blocking the async runtime. + /// + /// **Note:** When used with threads-mode handles, this consumes a thread from + /// tokio's blocking pool for the duration of the wait. pub async fn wait_exit_async(&self) -> ExitReason { match &self.completion { Completion::Watch(rx) => { @@ -178,6 +181,10 @@ impl ChildHandle { /// Blocking wait on a watch channel. Uses `block_in_place` if inside a tokio /// runtime (safe from multi-threaded runtime), otherwise creates a temporary runtime. +/// +/// Returns `ExitReason::Kill` if the watch sender is dropped without setting a +/// reason — this means the actor task was aborted externally (e.g., runtime +/// shutdown) without going through the normal exit path. fn wait_for_exit_watch_blocking( rx: &spawned_rt::tasks::watch::Receiver>, ) -> ExitReason { diff --git a/concurrency/src/tasks/actor.rs b/concurrency/src/tasks/actor.rs index 70f2490..1d241a8 100644 --- a/concurrency/src/tasks/actor.rs +++ b/concurrency/src/tasks/actor.rs @@ -142,6 +142,11 @@ impl Context { } } + /// The actor's unique identity. + pub fn id(&self) -> ActorId { + self.id + } + /// Signal the actor to stop. The current handler will finish, then /// `stopped()` is called and the actor exits. pub fn stop(&self) { @@ -414,8 +419,8 @@ impl From> for ChildHandle { fn from(actor_ref: ActorRef) -> Self { ChildHandle::from_tasks( actor_ref.id, - actor_ref.cancellation_token.clone(), - actor_ref.completion_rx.clone(), + actor_ref.cancellation_token, + actor_ref.completion_rx, ) } } diff --git a/concurrency/src/threads/actor.rs b/concurrency/src/threads/actor.rs index 732a00b..61fb57d 100644 --- a/concurrency/src/threads/actor.rs +++ b/concurrency/src/threads/actor.rs @@ -110,6 +110,11 @@ impl Context { } } + /// The actor's unique identity. + pub fn id(&self) -> ActorId { + self.id + } + /// Signal the actor to stop. The current handler will finish, then /// `stopped()` is called and the actor exits. pub fn stop(&self) { @@ -396,8 +401,8 @@ impl From> for ChildHandle { fn from(actor_ref: ActorRef) -> Self { ChildHandle::from_threads( actor_ref.id, - actor_ref.cancellation_token.clone(), - actor_ref.completion.clone(), + actor_ref.cancellation_token, + actor_ref.completion, ) } } From 260cc350d2a1fc902b3c08bf1526d616960880b5 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Wed, 8 Apr 2026 13:25:09 -0300 Subject: [PATCH 4/4] feat: extend exit_reason example with ChildHandle scenarios --- examples/exit_reason/src/main.rs | 46 +++++++++++++++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/examples/exit_reason/src/main.rs b/examples/exit_reason/src/main.rs index 263efc3..30d05ef 100644 --- a/examples/exit_reason/src/main.rs +++ b/examples/exit_reason/src/main.rs @@ -1,6 +1,6 @@ use spawned_concurrency::protocol; use spawned_concurrency::tasks::{Actor, ActorStart, Context, Handler}; -use spawned_concurrency::Response; +use spawned_concurrency::{ChildHandle, Response}; use spawned_rt::tasks as rt; use std::time::Duration; @@ -108,6 +108,50 @@ fn main() { worker4.join().await; println!(" After stop: {:?}", worker4.exit_reason()); + // 5. ChildHandle — type-erased supervision handle + println!("\n--- Scenario 5: ChildHandle ---"); + let worker5 = Worker::new("worker-5").start(); + let handle: ChildHandle = worker5.child_handle(); + println!(" ActorId: {}", handle.id()); + println!(" Same id as ActorRef: {}", handle.id() == worker5.id()); + println!(" is_alive: {}", handle.is_alive()); + + // Stop via ChildHandle (type-erased, no message sending) + handle.stop(); + let reason = handle.wait_exit_async().await; + println!(" Stopped via ChildHandle"); + println!(" Exit reason: {reason}"); + println!(" is_alive: {}", handle.is_alive()); + + // 6. ChildHandle from a panicking actor + println!("\n--- Scenario 6: ChildHandle observes panic ---"); + let worker6 = Worker::new("worker-6").start(); + let handle6 = worker6.child_handle(); + let _ = worker6.panic_now().await; + let reason = handle6.wait_exit_async().await; + println!(" Exit reason: {reason}"); + println!(" is_abnormal: {}", reason.is_abnormal()); + + // 7. Multiple ChildHandles from different actor types + println!("\n--- Scenario 7: Heterogeneous ChildHandle vec ---"); + struct Idler; + impl Actor for Idler {} + + let w = Worker::new("worker-7").start(); + let i = Idler.start(); + let handles: Vec = vec![w.child_handle(), i.child_handle()]; + println!(" {} actors managed via Vec", handles.len()); + for h in &handles { + println!(" {} — alive: {}", h.id(), h.is_alive()); + } + for h in &handles { + h.stop(); + } + for h in &handles { + let reason = h.wait_exit_async().await; + println!(" {} — exit: {reason}", h.id()); + } + // Give tracing a moment to flush rt::sleep(Duration::from_millis(50)).await; println!("\n=== Done ===");