diff --git a/concurrency/src/child_handle.rs b/concurrency/src/child_handle.rs new file mode 100644 index 0000000..a990e66 --- /dev/null +++ b/concurrency/src/child_handle.rs @@ -0,0 +1,393 @@ +use crate::error::ExitReason; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Condvar, Mutex}; + +// --------------------------------------------------------------------------- +// ActorId +// --------------------------------------------------------------------------- + +static NEXT_ACTOR_ID: AtomicU64 = AtomicU64::new(1); + +/// Unique identity for an actor instance. Used by monitors and links to +/// identify actors without needing the concrete actor type. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct ActorId(u64); + +impl ActorId { + pub(crate) fn next() -> Self { + Self(NEXT_ACTOR_ID.fetch_add(1, Ordering::Relaxed)) + } +} + +impl std::fmt::Display for ActorId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "ActorId({})", self.0) + } +} + +// --------------------------------------------------------------------------- +// Completion — abstraction over tasks/threads completion signals +// --------------------------------------------------------------------------- + +/// Type-erased completion signal. Wraps the mode-specific completion mechanism +/// so `ChildHandle` works uniformly across tasks and threads. +#[derive(Clone)] +pub(crate) enum Completion { + /// Tasks mode: watch channel carrying Option. + Watch(spawned_rt::tasks::watch::Receiver>), + /// Threads mode: Mutex + Condvar carrying Option. + Condvar(Arc<(Mutex>, Condvar)>), +} + +// --------------------------------------------------------------------------- +// ChildHandle +// --------------------------------------------------------------------------- + +/// Type-erased cancel function. Wraps the mode-specific cancellation token. +type CancelFn = Arc; + +/// Type-erased handle to a running actor. Provides lifecycle operations +/// (stop, liveness check, exit reason) without knowing the actor's concrete type. +/// +/// Obtained via `ChildHandle::from(actor_ref)`. +/// +/// Unlike `ActorRef`, a `ChildHandle` cannot send messages — it only +/// provides supervision-related operations (stop, wait, check liveness). +#[derive(Clone)] +pub struct ChildHandle { + id: ActorId, + cancel: CancelFn, + completion: Completion, +} + +impl std::fmt::Debug for ChildHandle { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ChildHandle") + .field("id", &self.id) + .finish_non_exhaustive() + } +} + +impl ChildHandle { + /// Create a ChildHandle for tasks mode. + pub(crate) fn from_tasks( + id: ActorId, + cancellation_token: spawned_rt::tasks::CancellationToken, + completion_rx: spawned_rt::tasks::watch::Receiver>, + ) -> Self { + Self { + id, + cancel: Arc::new(move || cancellation_token.cancel()), + completion: Completion::Watch(completion_rx), + } + } + + /// Create a ChildHandle for threads mode. + pub(crate) fn from_threads( + id: ActorId, + cancellation_token: spawned_rt::threads::CancellationToken, + completion: Arc<(Mutex>, Condvar)>, + ) -> Self { + Self { + id, + cancel: Arc::new(move || cancellation_token.cancel()), + completion: Completion::Condvar(completion), + } + } + + /// The actor's unique identity. + pub fn id(&self) -> ActorId { + self.id + } + + /// Signal the actor to stop. The actor will finish its current handler, + /// run `stopped()`, and exit. + pub fn stop(&self) { + (self.cancel)(); + } + + /// Returns `true` if the actor is still running. + pub fn is_alive(&self) -> bool { + self.exit_reason().is_none() + } + + /// Poll the exit reason. Returns `None` if the actor is still running. + pub fn exit_reason(&self) -> Option { + match &self.completion { + Completion::Watch(rx) => rx.borrow().clone(), + Completion::Condvar(completion) => { + let (lock, _) = &**completion; + let guard = lock.lock().unwrap_or_else(|p| p.into_inner()); + guard.clone() + } + } + } + + /// Block the calling thread until the actor stops and return the exit reason. + /// + /// Safe to call from any context (sync or async). For the Watch variant, + /// uses the watch channel's built-in blocking recv. For Condvar, blocks + /// on the condvar directly. + pub fn wait_exit_blocking(&self) -> ExitReason { + match &self.completion { + Completion::Watch(rx) => { + // wait_for_exit_watch_blocking is extracted to avoid holding the + // borrow on self across the loop + wait_for_exit_watch_blocking(&rx.clone()) + } + Completion::Condvar(completion) => { + let (lock, cvar) = &**completion; + let mut guard = lock.lock().unwrap_or_else(|p| p.into_inner()); + loop { + if let Some(reason) = guard.clone() { + return reason; + } + guard = cvar.wait(guard).unwrap_or_else(|p| p.into_inner()); + } + } + } + } + + /// Async wait until the actor stops and return the exit reason. + /// + /// Works with both execution modes. For Watch (tasks-mode handles), awaits + /// the watch channel directly. For Condvar (threads-mode handles), delegates + /// to a blocking task via `spawn_blocking` to avoid blocking the async runtime. + /// + /// **Note:** When used with threads-mode handles, this consumes a thread from + /// tokio's blocking pool for the duration of the wait. + pub async fn wait_exit_async(&self) -> ExitReason { + match &self.completion { + Completion::Watch(rx) => { + let mut rx = rx.clone(); + loop { + if let Some(reason) = rx.borrow_and_update().clone() { + return reason; + } + if rx.changed().await.is_err() { + return ExitReason::Kill; + } + } + } + Completion::Condvar(_) => { + let handle = self.clone(); + spawned_rt::tasks::spawn_blocking(move || handle.wait_exit_blocking()) + .await + .unwrap_or(ExitReason::Kill) + } + } + } +} + +/// Blocking wait on a watch channel. Uses `block_in_place` if inside a tokio +/// runtime (safe from multi-threaded runtime), otherwise creates a temporary runtime. +/// +/// Returns `ExitReason::Kill` if the watch sender is dropped without setting a +/// reason — this means the actor task was aborted externally (e.g., runtime +/// shutdown) without going through the normal exit path. +fn wait_for_exit_watch_blocking( + rx: &spawned_rt::tasks::watch::Receiver>, +) -> ExitReason { + // Fast path: already done + if let Some(reason) = rx.borrow().clone() { + return reason; + } + + let mut rx = rx.clone(); + let wait = async move { + loop { + if let Some(reason) = rx.borrow_and_update().clone() { + return reason; + } + if rx.changed().await.is_err() { + return ExitReason::Kill; + } + } + }; + + // If inside a tokio runtime, use block_in_place + block_on to avoid + // "cannot start a runtime from within a runtime" panic. + if let Ok(handle) = spawned_rt::tasks::Handle::try_current() { + spawned_rt::tasks::block_in_place(|| handle.block_on(wait)) + } else { + spawned_rt::threads::block_on(wait) + } +} + +impl PartialEq for ChildHandle { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} + +impl Eq for ChildHandle {} + +impl std::hash::Hash for ChildHandle { + fn hash(&self, state: &mut H) { + self.id.hash(state); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn actor_id_is_unique() { + let a = ActorId::next(); + let b = ActorId::next(); + assert_ne!(a, b); + } + + #[test] + fn actor_id_display() { + let id = ActorId::next(); + let s = format!("{id}"); + assert!(s.starts_with("ActorId(")); + } + + #[test] + fn child_handle_from_threads_basics() { + let completion = Arc::new((Mutex::new(None), Condvar::new())); + let token = spawned_rt::threads::CancellationToken::new(); + let handle = ChildHandle::from_threads(ActorId::next(), token, completion.clone()); + + assert!(handle.is_alive()); + assert!(handle.exit_reason().is_none()); + + // Simulate actor completion + { + let (lock, cvar) = &*completion; + let mut guard = lock.lock().unwrap(); + *guard = Some(ExitReason::Normal); + cvar.notify_all(); + } + + assert!(!handle.is_alive()); + assert_eq!(handle.exit_reason(), Some(ExitReason::Normal)); + } + + #[test] + fn child_handle_stop() { + let completion = Arc::new((Mutex::new(None), Condvar::new())); + let token = spawned_rt::threads::CancellationToken::new(); + assert!(!token.is_cancelled()); + let handle = ChildHandle::from_threads(ActorId::next(), token.clone(), completion); + handle.stop(); + assert!(token.is_cancelled()); + } + + #[test] + fn child_handle_equality_by_id() { + let completion = Arc::new((Mutex::new(None), Condvar::new())); + let token = spawned_rt::threads::CancellationToken::new(); + let id = ActorId::next(); + let h1 = ChildHandle::from_threads(id, token.clone(), completion.clone()); + let h2 = ChildHandle::from_threads(id, token, completion); + assert_eq!(h1, h2); + } + + // --- Integration tests using real actors --- + + #[test] + fn child_handle_from_threads_actor_ref() { + use crate::message::Message; + use crate::threads::actor::{Actor, ActorStart, Context, Handler}; + + struct Worker; + struct Stop; + impl Message for Stop { + type Result = (); + } + impl Actor for Worker {} + impl Handler for Worker { + fn handle(&mut self, _msg: Stop, ctx: &Context) { + ctx.stop(); + } + } + + let actor = Worker.start(); + let handle = actor.child_handle(); + + assert!(handle.is_alive()); + assert!(handle.exit_reason().is_none()); + assert_eq!(handle.id(), actor.id()); + + actor.send(Stop).unwrap(); + let reason = handle.wait_exit_blocking(); + assert_eq!(reason, ExitReason::Normal); + assert!(!handle.is_alive()); + } + + #[test] + fn child_handle_from_tasks_actor_ref() { + use crate::message::Message; + use crate::tasks::actor::{Actor, ActorStart, Context, Handler}; + + struct Worker; + struct Stop; + impl Message for Stop { + type Result = (); + } + impl Actor for Worker {} + impl Handler for Worker { + async fn handle(&mut self, _msg: Stop, ctx: &Context) { + ctx.stop(); + } + } + + let runtime = spawned_rt::tasks::Runtime::new().unwrap(); + runtime.block_on(async { + let actor = Worker.start(); + let handle = actor.child_handle(); + + assert!(handle.is_alive()); + assert!(handle.exit_reason().is_none()); + assert_eq!(handle.id(), actor.id()); + + actor.send(Stop).unwrap(); + let reason = handle.wait_exit_async().await; + assert_eq!(reason, ExitReason::Normal); + assert!(!handle.is_alive()); + }); + } + + #[test] + fn child_handle_stop_from_tasks_actor() { + use crate::tasks::actor::{Actor, ActorStart}; + + struct Idler; + impl Actor for Idler {} + + let runtime = spawned_rt::tasks::Runtime::new().unwrap(); + runtime.block_on(async { + let actor = Idler.start(); + let handle = actor.child_handle(); + + assert!(handle.is_alive()); + handle.stop(); + let reason = handle.wait_exit_async().await; + assert_eq!(reason, ExitReason::Normal); + }); + } + + #[test] + fn child_handle_wait_blocking_inside_runtime() { + use crate::tasks::actor::{Actor, ActorStart}; + + struct Idler; + impl Actor for Idler {} + + let runtime = spawned_rt::tasks::Runtime::new().unwrap(); + runtime.block_on(async { + let actor = Idler.start(); + let handle = actor.child_handle(); + handle.stop(); + + // This is the bug-fix test: wait_exit_blocking inside a tokio runtime + // should NOT panic (uses block_in_place internally) + let reason = spawned_rt::tasks::block_in_place(|| handle.wait_exit_blocking()); + assert_eq!(reason, ExitReason::Normal); + }); + } +} diff --git a/concurrency/src/lib.rs b/concurrency/src/lib.rs index d4230c0..e34000a 100644 --- a/concurrency/src/lib.rs +++ b/concurrency/src/lib.rs @@ -71,6 +71,7 @@ //! - `Recipient` (`Arc>`) for type-erased per-message references //! - [`tasks::Backend`] enum for choosing async runtime, blocking pool, or OS thread +pub mod child_handle; pub mod error; pub mod message; pub mod registry; @@ -78,6 +79,7 @@ pub mod response; pub mod tasks; pub mod threads; +pub use child_handle::{ActorId, ChildHandle}; pub use error::{ActorError, ExitReason}; pub use response::Response; pub use spawned_macros::{actor, protocol}; diff --git a/concurrency/src/tasks/actor.rs b/concurrency/src/tasks/actor.rs index 33f6cd6..1d241a8 100644 --- a/concurrency/src/tasks/actor.rs +++ b/concurrency/src/tasks/actor.rs @@ -1,3 +1,4 @@ +use crate::child_handle::{ActorId, ChildHandle}; use crate::error::{panic_message, ActorError, ExitReason}; use crate::message::Message; use core::pin::pin; @@ -106,6 +107,7 @@ where /// /// Clone is cheap — it clones the inner channel sender and cancellation token. pub struct Context { + id: ActorId, sender: mpsc::Sender + Send>>, cancellation_token: CancellationToken, completion_rx: watch::Receiver>, @@ -114,6 +116,7 @@ pub struct Context { impl Clone for Context { fn clone(&self) -> Self { Self { + id: self.id, sender: self.sender.clone(), cancellation_token: self.cancellation_token.clone(), completion_rx: self.completion_rx.clone(), @@ -132,12 +135,18 @@ impl Context { /// or stream listeners from outside the actor. pub fn from_ref(actor_ref: &ActorRef) -> Self { Self { + id: actor_ref.id, sender: actor_ref.sender.clone(), cancellation_token: actor_ref.cancellation_token.clone(), completion_rx: actor_ref.completion_rx.clone(), } } + /// The actor's unique identity. + pub fn id(&self) -> ActorId { + self.id + } + /// Signal the actor to stop. The current handler will finish, then /// `stopped()` is called and the actor exits. pub fn stop(&self) { @@ -211,6 +220,7 @@ impl Context { /// Get an `ActorRef` from this context. pub fn actor_ref(&self) -> ActorRef { ActorRef { + id: self.id, sender: self.sender.clone(), cancellation_token: self.cancellation_token.clone(), completion_rx: self.completion_rx.clone(), @@ -279,6 +289,7 @@ pub async fn request( /// To stop the actor, send an explicit shutdown message through your protocol, /// or call [`Context::stop`] from within a handler. pub struct ActorRef { + id: ActorId, sender: mpsc::Sender + Send>>, cancellation_token: CancellationToken, completion_rx: watch::Receiver>, @@ -293,6 +304,7 @@ impl Debug for ActorRef { impl Clone for ActorRef { fn clone(&self) -> Self { Self { + id: self.id, sender: self.sender.clone(), cancellation_token: self.cancellation_token.clone(), completion_rx: self.completion_rx.clone(), @@ -391,6 +403,26 @@ impl ActorRef { } } } + + /// The actor's unique identity. + pub fn id(&self) -> ActorId { + self.id + } + + /// Get a type-erased `ChildHandle` for this actor. + pub fn child_handle(&self) -> ChildHandle { + ChildHandle::from(self.clone()) + } +} + +impl From> for ChildHandle { + fn from(actor_ref: ActorRef) -> Self { + ChildHandle::from_tasks( + actor_ref.id, + actor_ref.cancellation_token, + actor_ref.completion_rx, + ) + } } // Bridge: ActorRef implements Receiver for any M that A handles @@ -419,12 +451,14 @@ impl ActorRef { let (completion_tx, completion_rx) = watch::channel(None); let actor_ref = ActorRef { + id: ActorId::next(), sender: tx.clone(), cancellation_token: cancellation_token.clone(), completion_rx, }; let ctx = Context { + id: actor_ref.id, sender: tx, cancellation_token: cancellation_token.clone(), completion_rx: actor_ref.completion_rx.clone(), diff --git a/concurrency/src/threads/actor.rs b/concurrency/src/threads/actor.rs index 8038699..61fb57d 100644 --- a/concurrency/src/threads/actor.rs +++ b/concurrency/src/threads/actor.rs @@ -8,6 +8,7 @@ use std::{ time::Duration, }; +use crate::child_handle::{ActorId, ChildHandle}; use crate::error::{panic_message, ActorError, ExitReason}; use crate::message::Message; @@ -74,6 +75,7 @@ where /// /// Clone is cheap — it clones the inner channel sender and cancellation token. pub struct Context { + id: ActorId, sender: mpsc::Sender + Send>>, cancellation_token: CancellationToken, completion: Arc<(Mutex>, Condvar)>, @@ -82,6 +84,7 @@ pub struct Context { impl Clone for Context { fn clone(&self) -> Self { Self { + id: self.id, sender: self.sender.clone(), cancellation_token: self.cancellation_token.clone(), completion: self.completion.clone(), @@ -100,12 +103,18 @@ impl Context { /// or stream listeners from outside the actor. pub fn from_ref(actor_ref: &ActorRef) -> Self { Self { + id: actor_ref.id, sender: actor_ref.sender.clone(), cancellation_token: actor_ref.cancellation_token.clone(), completion: actor_ref.completion.clone(), } } + /// The actor's unique identity. + pub fn id(&self) -> ActorId { + self.id + } + /// Signal the actor to stop. The current handler will finish, then /// `stopped()` is called and the actor exits. pub fn stop(&self) { @@ -178,6 +187,7 @@ impl Context { /// Get an `ActorRef` from this context. pub fn actor_ref(&self) -> ActorRef { ActorRef { + id: self.id, sender: self.sender.clone(), cancellation_token: self.cancellation_token.clone(), completion: self.completion.clone(), @@ -261,6 +271,7 @@ impl Drop for CompletionGuard { /// To stop the actor, send an explicit shutdown message through your protocol, /// or call [`Context::stop`] from within a handler. pub struct ActorRef { + id: ActorId, sender: mpsc::Sender + Send>>, cancellation_token: CancellationToken, completion: Arc<(Mutex>, Condvar)>, @@ -275,6 +286,7 @@ impl Debug for ActorRef { impl Clone for ActorRef { fn clone(&self) -> Self { Self { + id: self.id, sender: self.sender.clone(), cancellation_token: self.cancellation_token.clone(), completion: self.completion.clone(), @@ -373,6 +385,26 @@ impl ActorRef { completed = cvar.wait(completed).unwrap_or_else(|p| p.into_inner()); } } + + /// The actor's unique identity. + pub fn id(&self) -> ActorId { + self.id + } + + /// Get a type-erased `ChildHandle` for this actor. + pub fn child_handle(&self) -> ChildHandle { + ChildHandle::from(self.clone()) + } +} + +impl From> for ChildHandle { + fn from(actor_ref: ActorRef) -> Self { + ChildHandle::from_threads( + actor_ref.id, + actor_ref.cancellation_token, + actor_ref.completion, + ) + } } // Bridge: ActorRef implements Receiver for any M that A handles @@ -399,14 +431,17 @@ impl ActorRef { let (tx, rx) = mpsc::channel:: + Send>>(); let cancellation_token = CancellationToken::new(); let completion = Arc::new((Mutex::new(None), Condvar::new())); + let id = ActorId::next(); let actor_ref = ActorRef { + id, sender: tx.clone(), cancellation_token: cancellation_token.clone(), completion: completion.clone(), }; let ctx = Context { + id, sender: tx, cancellation_token: cancellation_token.clone(), completion: actor_ref.completion.clone(), diff --git a/examples/exit_reason/src/main.rs b/examples/exit_reason/src/main.rs index 263efc3..30d05ef 100644 --- a/examples/exit_reason/src/main.rs +++ b/examples/exit_reason/src/main.rs @@ -1,6 +1,6 @@ use spawned_concurrency::protocol; use spawned_concurrency::tasks::{Actor, ActorStart, Context, Handler}; -use spawned_concurrency::Response; +use spawned_concurrency::{ChildHandle, Response}; use spawned_rt::tasks as rt; use std::time::Duration; @@ -108,6 +108,50 @@ fn main() { worker4.join().await; println!(" After stop: {:?}", worker4.exit_reason()); + // 5. ChildHandle — type-erased supervision handle + println!("\n--- Scenario 5: ChildHandle ---"); + let worker5 = Worker::new("worker-5").start(); + let handle: ChildHandle = worker5.child_handle(); + println!(" ActorId: {}", handle.id()); + println!(" Same id as ActorRef: {}", handle.id() == worker5.id()); + println!(" is_alive: {}", handle.is_alive()); + + // Stop via ChildHandle (type-erased, no message sending) + handle.stop(); + let reason = handle.wait_exit_async().await; + println!(" Stopped via ChildHandle"); + println!(" Exit reason: {reason}"); + println!(" is_alive: {}", handle.is_alive()); + + // 6. ChildHandle from a panicking actor + println!("\n--- Scenario 6: ChildHandle observes panic ---"); + let worker6 = Worker::new("worker-6").start(); + let handle6 = worker6.child_handle(); + let _ = worker6.panic_now().await; + let reason = handle6.wait_exit_async().await; + println!(" Exit reason: {reason}"); + println!(" is_abnormal: {}", reason.is_abnormal()); + + // 7. Multiple ChildHandles from different actor types + println!("\n--- Scenario 7: Heterogeneous ChildHandle vec ---"); + struct Idler; + impl Actor for Idler {} + + let w = Worker::new("worker-7").start(); + let i = Idler.start(); + let handles: Vec = vec![w.child_handle(), i.child_handle()]; + println!(" {} actors managed via Vec", handles.len()); + for h in &handles { + println!(" {} — alive: {}", h.id(), h.is_alive()); + } + for h in &handles { + h.stop(); + } + for h in &handles { + let reason = h.wait_exit_async().await; + println!(" {} — exit: {reason}", h.id()); + } + // Give tracing a moment to flush rt::sleep(Duration::from_millis(50)).await; println!("\n=== Done ==="); diff --git a/rt/src/tasks/mod.rs b/rt/src/tasks/mod.rs index 17ca641..4d3e040 100644 --- a/rt/src/tasks/mod.rs +++ b/rt/src/tasks/mod.rs @@ -9,8 +9,6 @@ mod tokio; -use ::tokio::runtime::Handle; - use crate::tracing::init_tracing; pub use crate::tasks::tokio::mpsc; @@ -19,7 +17,9 @@ pub use crate::tasks::tokio::sleep; pub use crate::tasks::tokio::timeout; pub use crate::tasks::tokio::watch; pub use crate::tasks::tokio::CancellationToken; -pub use crate::tasks::tokio::{spawn, spawn_blocking, task_id, JoinHandle, Runtime}; +pub use crate::tasks::tokio::{ + block_in_place, spawn, spawn_blocking, task_id, Handle, JoinHandle, Runtime, +}; pub use crate::tasks::tokio::{BroadcastStream, ReceiverStream}; use std::future::Future; @@ -32,8 +32,15 @@ pub fn run(future: F) -> F::Output { } /// Block on a future using the current tokio runtime handle. +/// Panics if no tokio runtime is active. pub fn block_on(future: F) -> F::Output { Handle::current().block_on(future) } +/// Block on a future using the current tokio runtime handle. +/// Returns `None` if no tokio runtime is active. +pub fn try_block_on(future: F) -> Option { + Handle::try_current().ok().map(|h| h.block_on(future)) +} + pub use crate::tasks::tokio::ctrl_c; diff --git a/rt/src/tasks/tokio/mod.rs b/rt/src/tasks/tokio/mod.rs index af6c558..4c32248 100644 --- a/rt/src/tasks/tokio/mod.rs +++ b/rt/src/tasks/tokio/mod.rs @@ -4,8 +4,8 @@ pub mod oneshot; pub use tokio::sync::watch; pub use tokio::{ - runtime::Runtime, - task::{id as task_id, spawn, spawn_blocking, JoinHandle}, + runtime::{Handle, Runtime}, + task::{block_in_place, id as task_id, spawn, spawn_blocking, JoinHandle}, time::{sleep, timeout}, }; pub use tokio_stream::wrappers::{BroadcastStream, UnboundedReceiverStream as ReceiverStream};