Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
349 changes: 224 additions & 125 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
[workspace]
members = [
"rt",
"examples/ping_pong",
"examples/bank",
"examples/bank_threads",
"examples/name_server",
"examples/name_server_with_error",
"examples/ping_pong",
"examples/ping_pong_threads",
"examples/updater",
"examples/bank",
"examples/updater_threads",
]

[workspace.dependencies]
Expand Down
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
# Spawned
Simple library for concurrent Rust.
Library for concurrent Rust.

# Goals:

- Provide a framework to build robust, scalable and efficient applications in concurrent Rust.
- Set coding guidelines to apply along LambdaClass repositories and codebase.
- Starting point to ideate what we want for Concrete.

# Versions:

Two versions exist in their own submodules:
- threads: no use of async/await. Just IO threads code.
- tasks: a runtime is required to run async/await code. The runtime is selected in `spawned_rt::tasks` module that abstracts it.

# Inspiration:

- [Erlang](https://www.erlang.org/)
- [Commonware.xyz](https://commonware.xyz)
- [Ractor](https://slawlor.github.io/ractor/)
- [Actors with Tokio](https://ryhl.io/blog/actors-with-tokio/)
- [Vale.dev](https://vale.dev/)
- [Gleam](https://gleam.run/)
4 changes: 4 additions & 0 deletions concurrency/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# Spawned concurrency
Some basic traits and structs to implement à-la-Erlang concurrent code.

Currently two versions:

- threads: no use of async/await. Just IO threads code
- tasks: a runtime is required to run async/await code. It uses `spawned_rt::tasks` module that abstracts the runtime.
11 changes: 0 additions & 11 deletions concurrency/src/error.rs

This file was deleted.

15 changes: 4 additions & 11 deletions concurrency/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,5 @@
//! λ-kit concurrency
//! Some basic traits and structs to implement À-la-Erlang concurrent code.
//! spawned concurrency
//! Some basic traits and structs to implement concurrent code à-la-Erlang.

mod error;
mod gen_server;
mod process;
mod time;

pub use error::GenServerError;
pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg};
pub use process::{Process, ProcessInfo, send};
pub use time::send_after;
pub mod tasks;
pub mod threads;
11 changes: 11 additions & 0 deletions concurrency/src/tasks/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#[derive(Debug)]
pub enum GenServerError {
CallbackError,
ServerError,
}

impl<T> From<spawned_rt::tasks::mpsc::SendError<T>> for GenServerError {
fn from(_value: spawned_rt::tasks::mpsc::SendError<T>) -> Self {
Self::ServerError
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
//! GernServer trait and structs to create an abstraction similar to Erlang gen_server.
//! GenServer trait and structs to create an abstraction similar to Erlang gen_server.
//! See examples/name_server for a usage example.
use futures::future::FutureExt as _;
use spawned_rt::{self as rt, JoinHandle, mpsc, oneshot};
use spawned_rt::tasks::{self as rt, mpsc, oneshot, JoinHandle};
use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe};

use crate::error::GenServerError;
use super::error::GenServerError;

#[derive(Debug)]
pub struct GenServerHandle<G: GenServer + 'static> {
Expand Down
12 changes: 12 additions & 0 deletions concurrency/src/tasks/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
//! spawned concurrency
//! Runtime tasks-based traits and structs to implement concurrent code à-la-Erlang.

mod error;
mod gen_server;
mod process;
mod time;

pub use error::GenServerError;
pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg};
pub use process::{send, Process, ProcessInfo};
pub use time::send_after;
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
//! Process trait and struct to create a process abstraction similar to Erlang processes.
//! See examples/ping_pong for a usage example.

use spawned_rt::tasks::{self as rt, mpsc, JoinHandle};
use std::future::Future;

use spawned_rt::{self as rt, JoinHandle, mpsc};

#[derive(Debug)]
pub struct ProcessInfo<T> {
pub tx: mpsc::Sender<T>,
Expand Down
21 changes: 21 additions & 0 deletions concurrency/src/tasks/time.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use std::time::Duration;

use spawned_rt::tasks::{self as rt, mpsc::Sender, JoinHandle};

use super::{GenServer, GenServerInMsg};

// Sends a message after a given period to the specified GenServer. The task terminates
// once the send has completed
pub fn send_after<T>(
period: Duration,
tx: Sender<GenServerInMsg<T>>,
message: T::InMsg,
) -> JoinHandle<()>
where
T: GenServer + 'static,
{
rt::spawn(async move {
rt::sleep(period).await;
let _ = tx.send(GenServerInMsg::Cast { message });
})
}
11 changes: 11 additions & 0 deletions concurrency/src/threads/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#[derive(Debug)]
pub enum GenServerError {
CallbackError,
ServerError,
}

impl<T> From<spawned_rt::threads::mpsc::SendError<T>> for GenServerError {
fn from(_value: spawned_rt::threads::mpsc::SendError<T>) -> Self {
Self::ServerError
}
}
179 changes: 179 additions & 0 deletions concurrency/src/threads/gen_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
//! GenServer trait and structs to create an abstraction similar to Erlang gen_server.
//! See examples/name_server for a usage example.
use spawned_rt::threads::{self as rt, mpsc, oneshot, JoinHandle};
use std::{
fmt::Debug,
panic::{catch_unwind, AssertUnwindSafe},
};

use super::error::GenServerError;

#[derive(Debug)]
pub struct GenServerHandle<G: GenServer + 'static> {
pub tx: mpsc::Sender<GenServerInMsg<G>>,
#[allow(unused)]
handle: JoinHandle<()>,
}

impl<G: GenServer> GenServerHandle<G> {
pub(crate) fn new(mut initial_state: G::State) -> Self {
let (tx, mut rx) = mpsc::channel::<GenServerInMsg<G>>();
let tx_clone = tx.clone();
let mut gen_server: G = GenServer::new();
let handle = rt::spawn(move || {
if gen_server
.run(&tx_clone, &mut rx, &mut initial_state)
.is_err()
{
tracing::trace!("GenServer crashed")
};
});
GenServerHandle { tx, handle }
}

pub fn sender(&self) -> mpsc::Sender<GenServerInMsg<G>> {
self.tx.clone()
}

pub fn call(&mut self, message: G::InMsg) -> Result<G::OutMsg, GenServerError> {
let (oneshot_tx, oneshot_rx) = oneshot::channel::<Result<G::OutMsg, GenServerError>>();
self.tx.send(GenServerInMsg::Call {
sender: oneshot_tx,
message,
})?;
match oneshot_rx.recv() {
Ok(result) => result,
Err(_) => Err(GenServerError::ServerError),
}
}

pub fn cast(&mut self, message: G::InMsg) -> Result<(), GenServerError> {
self.tx
.send(GenServerInMsg::Cast { message })
.map_err(|_error| GenServerError::ServerError)
}
}

pub enum GenServerInMsg<A: GenServer> {
Call {
sender: oneshot::Sender<Result<A::OutMsg, GenServerError>>,
message: A::InMsg,
},
Cast {
message: A::InMsg,
},
}

pub enum CallResponse<U> {
Reply(U),
Stop(U),
}

pub enum CastResponse {
NoReply,
Stop,
}

pub trait GenServer
where
Self: Send + Sized,
{
type InMsg: Send + Sized;
type OutMsg: Send + Sized;
type State: Clone + Send;
type Error: Debug;

fn new() -> Self;

fn start(initial_state: Self::State) -> GenServerHandle<Self> {
GenServerHandle::new(initial_state)
}

fn run(
&mut self,
tx: &mpsc::Sender<GenServerInMsg<Self>>,
rx: &mut mpsc::Receiver<GenServerInMsg<Self>>,
state: &mut Self::State,
) -> Result<(), GenServerError> {
self.main_loop(tx, rx, state)?;
Ok(())
}

fn main_loop(
&mut self,
tx: &mpsc::Sender<GenServerInMsg<Self>>,
rx: &mut mpsc::Receiver<GenServerInMsg<Self>>,
state: &mut Self::State,
) -> Result<(), GenServerError> {
loop {
if !self.receive(tx, rx, state)? {
break;
}
}
tracing::trace!("Stopping GenServer");
Ok(())
}

fn receive(
&mut self,
tx: &mpsc::Sender<GenServerInMsg<Self>>,
rx: &mut mpsc::Receiver<GenServerInMsg<Self>>,
state: &mut Self::State,
) -> Result<bool, GenServerError> {
let message = rx.recv().ok();

// Save current state in case of a rollback
let state_clone = state.clone();

let (keep_running, error) = match message {
Some(GenServerInMsg::Call { sender, message }) => {
let (keep_running, error, response) =
match catch_unwind(AssertUnwindSafe(|| self.handle_call(message, tx, state))) {
Ok(response) => match response {
CallResponse::Reply(response) => (true, None, Ok(response)),
CallResponse::Stop(response) => (false, None, Ok(response)),
},
Err(error) => (true, Some(error), Err(GenServerError::CallbackError)),
};
// Send response back
if sender.send(response).is_err() {
tracing::trace!("GenServer failed to send response back, client must have died")
};
(keep_running, error)
}
Some(GenServerInMsg::Cast { message }) => {
match catch_unwind(AssertUnwindSafe(|| self.handle_cast(message, tx, state))) {
Ok(response) => match response {
CastResponse::NoReply => (true, None),
CastResponse::Stop => (false, None),
},
Err(error) => (true, Some(error)),
}
}
None => {
// Channel has been closed; won't receive further messages. Stop the server.
(false, None)
}
};
if let Some(error) = error {
tracing::trace!("Error in callback, reverting state - Error: '{error:?}'");
// Restore initial state (ie. dismiss any change)
*state = state_clone;
};
Ok(keep_running)
}

fn handle_call(
&mut self,
message: Self::InMsg,
tx: &mpsc::Sender<GenServerInMsg<Self>>,
state: &mut Self::State,
) -> CallResponse<Self::OutMsg>;

fn handle_cast(
&mut self,
_message: Self::InMsg,
_tx: &mpsc::Sender<GenServerInMsg<Self>>,
state: &mut Self::State,
) -> CastResponse;
}
11 changes: 11 additions & 0 deletions concurrency/src/threads/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
//! spawned concurrency
//! IO threads-based traits and structs to implement concurrent code à-la-Erlang.

mod error;
mod gen_server;
mod process;
mod time;

pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg};
pub use process::{send, Process, ProcessInfo};
pub use time::send_after;
Loading