Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
make context ref mut
  • Loading branch information
axos88 committed Nov 16, 2025
commit 4697c137257ac0e995498b82735fc1f87038a9dd
7 changes: 5 additions & 2 deletions example-service/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@ async fn main() -> anyhow::Result<()> {
let client = WorldClient::new(client::Config::default(), transport.await?).spawn();

let hello = async move {
let mut context = context::current();
let mut context2 = context::current();

// Send the request twice, just to be safe! ;)
tokio::select! {
hello1 = client.hello(context::current(), format!("{}1", flags.name)) => { hello1 }
hello2 = client.hello(context::current(), format!("{}2", flags.name)) => { hello2 }
hello1 = client.hello(&mut context, format!("{}1", flags.name)) => { hello1 }
hello2 = client.hello(&mut context2, format!("{}2", flags.name)) => { hello2 }
}
}
.instrument(tracing::info_span!("Two Hellos"))
Expand Down
2 changes: 1 addition & 1 deletion example-service/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ struct Flags {
struct HelloServer(SocketAddr);

impl World for HelloServer {
async fn hello(self, _: context::Context, name: String) -> String {
async fn hello(self, _: &mut context::Context, name: String) -> String {
let sleep_time =
Duration::from_millis(Uniform::new_inclusive(1, 10).sample(&mut thread_rng()));
time::sleep(sleep_time).await;
Expand Down
8 changes: 4 additions & 4 deletions plugins/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ fn collect_cfg_attrs(rpcs: &[RpcMethod]) -> Vec<Vec<&Attribute>> {
/// #[derive(Clone)]
/// struct CalculatorServer;
/// impl Calculator for CalculatorServer {
/// async fn add(self, context: Context, a: i32, b: i32) -> i32 {
/// async fn add(self, context: &mut Context, a: i32, b: i32) -> i32 {
/// a + b
/// }
/// }
Expand Down Expand Up @@ -558,7 +558,7 @@ impl ServiceGenerator<'_> {
)| {
quote! {
#( #attrs )*
async fn #ident(self, context: ::tarpc::context::Context, #( #args ),*) -> #output;
async fn #ident(self, context: &mut ::tarpc::context::Context, #( #args ),*) -> #output;
}
},
);
Expand Down Expand Up @@ -622,7 +622,7 @@ impl ServiceGenerator<'_> {
type Resp = #response_ident;


async fn serve(self, ctx: ::tarpc::context::Context, req: #request_ident)
async fn serve(self, ctx: &mut ::tarpc::context::Context, req: #request_ident)
-> ::core::result::Result<#response_ident, ::tarpc::ServerError> {
match req {
#(
Expand Down Expand Up @@ -786,7 +786,7 @@ impl ServiceGenerator<'_> {
#(
#[allow(unused)]
#( #method_attrs )*
#vis fn #method_idents(&self, ctx: ::tarpc::context::Context, #( #args ),*)
#vis fn #method_idents<'a>(&'a self, ctx: &'a mut ::tarpc::context::Context, #( #args ),*)
-> impl ::core::future::Future<Output = ::core::result::Result<#return_types, ::tarpc::client::RpcError>> + '_ {
let request = #request_ident::#camel_case_idents { #( #arg_pats ),* };
let resp = self.0.call(ctx, request);
Expand Down
14 changes: 7 additions & 7 deletions plugins/tests/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ fn att_service_trait() {
}

impl Foo for () {
async fn two_part(self, _: context::Context, s: String, i: i32) -> (String, i32) {
async fn two_part(self, _: &mut context::Context, s: String, i: i32) -> (String, i32) {
(s, i)
}

async fn bar(self, _: context::Context, s: String) -> String {
async fn bar(self, _: &mut context::Context, s: String) -> String {
s
}

async fn baz(self, _: context::Context) {}
async fn baz(self, _: &mut context::Context) {}
}
}

Expand All @@ -39,18 +39,18 @@ fn raw_idents() {
impl r#trait for () {
async fn r#await(
self,
_: context::Context,
_: &mut context::Context,
r#struct: r#yield,
r#enum: i32,
) -> (r#yield, i32) {
(r#struct, r#enum)
}

async fn r#fn(self, _: context::Context, r#impl: r#yield) -> r#yield {
async fn r#fn(self, _: &mut context::Context, r#impl: r#yield) -> r#yield {
r#impl
}

async fn r#async(self, _: context::Context) {}
async fn r#async(self, _: &mut context::Context) {}
}
}

Expand All @@ -64,7 +64,7 @@ fn service_with_cfg_rpc() {
}

impl Foo for () {
async fn foo(self, _: context::Context) {}
async fn foo(self, _: &mut context::Context) {}
}
}

Expand Down
4 changes: 2 additions & 2 deletions tarpc/examples/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ pub trait World {
struct HelloServer;

impl World for HelloServer {
async fn hello(self, _: context::Context, name: String) -> String {
async fn hello(self, _: &mut context::Context, name: String) -> String {
format!("Hey, {name}!")
}
}
Expand All @@ -134,7 +134,7 @@ async fn main() -> anyhow::Result<()> {

println!(
"{}",
client.hello(context::current(), "friend".into()).await?
client.hello(&mut context::current(), "friend".into()).await?
);
Ok(())
}
4 changes: 2 additions & 2 deletions tarpc/examples/custom_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub trait PingService {
struct Service;

impl PingService for Service {
async fn ping(self, _: Context) {}
async fn ping(self, _: &mut Context) {}
}

#[tokio::main]
Expand Down Expand Up @@ -52,7 +52,7 @@ async fn main() -> anyhow::Result<()> {
let transport = transport::new(codec_builder.new_framed(conn), Bincode::default());
PingServiceClient::new(Default::default(), transport)
.spawn()
.ping(tarpc::context::current())
.ping(&mut tarpc::context::current())
.await?;

Ok(())
Expand Down
23 changes: 14 additions & 9 deletions tarpc/examples/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ struct Subscriber {
}

impl subscriber::Subscriber for Subscriber {
async fn topics(self, _: context::Context) -> Vec<String> {
async fn topics(self, _: &mut context::Context) -> Vec<String> {
self.topics.clone()
}

async fn receive(self, _: context::Context, topic: String, message: String) {
async fn receive(self, _: &mut context::Context, topic: String, message: String) {
info!(local_addr = %self.local_addr, %topic, %message, "ReceivedMessage")
}
}
Expand Down Expand Up @@ -210,7 +210,7 @@ impl Publisher {
subscriber: subscriber::SubscriberClient,
) {
// Populate the topics
if let Ok(topics) = subscriber.topics(context::current()).await {
if let Ok(topics) = subscriber.topics(&mut context::current()).await {
self.clients.lock().unwrap().insert(
subscriber_addr,
Subscription {
Expand Down Expand Up @@ -263,15 +263,20 @@ impl Publisher {
}

impl publisher::Publisher for Publisher {
async fn publish(self, _: context::Context, topic: String, message: String) {
async fn publish(self, _: &mut context::Context, topic: String, message: String) {
info!("received message to publish.");
let mut subscribers = match self.subscriptions.read().unwrap().get(&topic) {
None => return,
Some(subscriptions) => subscriptions.clone(),
};
let mut publications = Vec::new();


for client in subscribers.values_mut() {
publications.push(client.receive(context::current(), topic.clone(), message.clone()));
publications.push(async {
let mut context = context::current();
client.receive(&mut context, topic.clone(), message.clone()).await
});
}
// Ignore failing subscribers. In a real pubsub, you'd want to continually retry until
// subscribers ack. Of course, a lot would be different in a real pubsub :)
Expand Down Expand Up @@ -342,26 +347,26 @@ async fn main() -> anyhow::Result<()> {
.spawn();

publisher
.publish(context::current(), "calculus".into(), "sqrt(2)".into())
.publish(&mut context::current(), "calculus".into(), "sqrt(2)".into())
.await?;

publisher
.publish(
context::current(),
&mut context::current(),
"cool shorts".into(),
"hello to all".into(),
)
.await?;

publisher
.publish(context::current(), "history".into(), "napoleon".to_string())
.publish(&mut context::current(), "history".into(), "napoleon".to_string())
.await?;

drop(_subscriber0);

publisher
.publish(
context::current(),
&mut context::current(),
"cool shorts".into(),
"hello to who?".into(),
)
Expand Down
4 changes: 2 additions & 2 deletions tarpc/examples/readme.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub trait World {
struct HelloServer;

impl World for HelloServer {
async fn hello(self, _: context::Context, name: String) -> String {
async fn hello(self, _: &mut context::Context, name: String) -> String {
format!("Hello, {name}!")
}
}
Expand All @@ -46,7 +46,7 @@ async fn main() -> anyhow::Result<()> {
// The client has an RPC method for each RPC defined in the annotated trait. It takes the same
// args as defined, with the addition of a Context, which is always the first arg. The Context
// specifies a deadline and trace information which can be helpful in debugging requests.
let hello = client.hello(context::current(), "Stim".to_string()).await?;
let hello = client.hello(&mut context::current(), "Stim".to_string()).await?;

println!("{hello}");

Expand Down
4 changes: 2 additions & 2 deletions tarpc/examples/tls_over_tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub trait PingService {
struct Service;

impl PingService for Service {
async fn ping(self, _: Context) -> String {
async fn ping(self, _: &mut Context) -> String {
"🔒".to_owned()
}
}
Expand Down Expand Up @@ -146,7 +146,7 @@ async fn main() -> anyhow::Result<()> {
let transport = transport::new(codec_builder.new_framed(stream), Bincode::default());
let answer = PingServiceClient::new(Default::default(), transport)
.spawn()
.ping(tarpc::context::current())
.ping(&mut tarpc::context::current())
.await?;

println!("ping answer: {answer}");
Expand Down
10 changes: 5 additions & 5 deletions tarpc/examples/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub mod double {
struct AddServer;

impl AddService for AddServer {
async fn add(self, _: context::Context, x: i32, y: i32) -> i32 {
async fn add(self, _: &mut context::Context, x: i32, y: i32) -> i32 {
x + y
}
}
Expand All @@ -70,9 +70,9 @@ impl<Stub> DoubleService for DoubleServer<Stub>
where
Stub: AddStub + Clone + Send + Sync + 'static,
{
async fn double(self, _: context::Context, x: i32) -> Result<i32, String> {
async fn double(self, _: &mut context::Context, x: i32) -> Result<i32, String> {
self.add_client
.add(context::current(), x, x)
.add(&mut context::current(), x, x)
.await
.map_err(|e| e.to_string())
}
Expand Down Expand Up @@ -193,9 +193,9 @@ async fn main() -> anyhow::Result<()> {
let double_client =
double::DoubleClient::new(client::Config::default(), to_double_server).spawn();

let ctx = context::current();
let mut ctx = context::current();
for _ in 1..=5 {
tracing::info!("{:?}", double_client.double(ctx, 1).await?);
tracing::info!("{:?}", double_client.double(&mut ctx, 1).await?);
}

tracer_provider.shutdown()?;
Expand Down
9 changes: 6 additions & 3 deletions tarpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ where
otel.kind = "client",
otel.name = %request.name())
)]
pub async fn call(&self, mut ctx: context::Context, request: Req) -> Result<Resp, RpcError> {
pub async fn call(&self, ctx: &mut context::Context, request: Req) -> Result<Resp, RpcError> {
let span = Span::current();
ctx.trace_context = trace::Context::try_from(&span).unwrap_or_else(|_| {
tracing::trace!(
Expand All @@ -153,7 +153,10 @@ where
};
self.to_dispatch
.send(DispatchRequest {
ctx,
ctx: context::Context {
deadline: ctx.deadline,
trace_context: ctx.trace_context.clone(),
},
span,
request_id,
request,
Expand Down Expand Up @@ -881,7 +884,7 @@ mod tests {
let (dispatch, channel, _server_channel) = set_up();
drop(dispatch);
// error on send
let resp = channel.call(current(), "hi".to_string()).await;
let resp = channel.call(&mut current(), "hi".to_string()).await;
assert_matches!(resp, Err(RpcError::Shutdown));
}

Expand Down
6 changes: 3 additions & 3 deletions tarpc/src/client/stub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub trait Stub {
type Resp;

/// Calls a remote service.
async fn call(&self, ctx: context::Context, request: Self::Req)
async fn call(&self, ctx: &mut context::Context, request: Self::Req)
-> Result<Self::Resp, RpcError>;
}

Expand All @@ -35,7 +35,7 @@ where
type Req = Req;
type Resp = Resp;

async fn call(&self, ctx: context::Context, request: Req) -> Result<Self::Resp, RpcError> {
async fn call(&self, ctx: &mut context::Context, request: Req) -> Result<Self::Resp, RpcError> {
Self::call(self, ctx, request).await
}
}
Expand All @@ -46,7 +46,7 @@ where
{
type Req = S::Req;
type Resp = S::Resp;
async fn call(&self, ctx: context::Context, req: Self::Req) -> Result<Self::Resp, RpcError> {
async fn call(&self, ctx: &mut context::Context, req: Self::Req) -> Result<Self::Resp, RpcError> {
self.clone().serve(ctx, req).await.map_err(RpcError::Server)
}
}
10 changes: 5 additions & 5 deletions tarpc/src/client/stub/load_balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ mod round_robin {

async fn call(
&self,
ctx: context::Context,
ctx: &mut context::Context,
request: Self::Req,
) -> Result<Stub::Resp, RpcError> {
let next = self.stubs.next();
Expand Down Expand Up @@ -119,7 +119,7 @@ mod consistent_hash {

async fn call(
&self,
ctx: context::Context,
ctx: &mut context::Context,
request: Self::Req,
) -> Result<Stub::Resp, RpcError> {
let index = usize::try_from(self.hasher.hash_one(&request) % self.stubs_len).expect(
Expand Down Expand Up @@ -200,13 +200,13 @@ mod consistent_hash {
)?;

for _ in 0..2 {
let resp = stub.call(context::current(), 'a').await?;
let resp = stub.call(&mut context::current(), 'a').await?;
assert_eq!(resp, 1);

let resp = stub.call(context::current(), 'b').await?;
let resp = stub.call(&mut context::current(), 'b').await?;
assert_eq!(resp, 2);

let resp = stub.call(context::current(), 'c').await?;
let resp = stub.call(&mut context::current(), 'c').await?;
assert_eq!(resp, 3);
}

Expand Down
2 changes: 1 addition & 1 deletion tarpc/src/client/stub/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ where
type Req = Req;
type Resp = Resp;

async fn call(&self, _: context::Context, request: Self::Req) -> Result<Resp, RpcError> {
async fn call(&self, _: &mut context::Context, request: Self::Req) -> Result<Resp, RpcError> {
self.responses
.get(&request)
.cloned()
Expand Down
Loading
Loading