From 20e2ba4fcf2fc9cf77b5070328880a735db8ab63 Mon Sep 17 00:00:00 2001 From: Humberto Yusta Date: Mon, 5 May 2025 13:48:40 +0200 Subject: [PATCH 1/4] feat(client): cleanup of zombie processes in child process client Using process wrap library, Process Group for Unix and Job Object for Windows --- crates/rmcp/Cargo.toml | 1 + crates/rmcp/src/transport/child_process.rs | 49 ++++++++++++++-------- 2 files changed, 33 insertions(+), 17 deletions(-) diff --git a/crates/rmcp/Cargo.toml b/crates/rmcp/Cargo.toml index 088c72fb2..b79c1b84d 100644 --- a/crates/rmcp/Cargo.toml +++ b/crates/rmcp/Cargo.toml @@ -18,6 +18,7 @@ serde_json = "1.0" thiserror = "2" chrono = { version = "0.4.38", features = ["serde"] } tokio = { version = "1", features = ["sync", "macros", "rt", "time"] } +process-wrap = { version = "8.2", features = ["tokio1"] } futures = "0.3" tracing = { version = "0.1" } tokio-util = { version = "0.7" } diff --git a/crates/rmcp/src/transport/child_process.rs b/crates/rmcp/src/transport/child_process.rs index 640d6414e..53ef844eb 100644 --- a/crates/rmcp/src/transport/child_process.rs +++ b/crates/rmcp/src/transport/child_process.rs @@ -1,4 +1,5 @@ use futures::{Sink, Stream}; +use process_wrap::tokio::{TokioChildWrapper, TokioCommandWrap}; use tokio::{ io::AsyncRead, process::{ChildStdin, ChildStdout}, @@ -8,29 +9,39 @@ use super::IntoTransport; use crate::service::{RxJsonRpcMessage, ServiceRole, TxJsonRpcMessage}; pub(crate) fn child_process( - mut child: tokio::process::Child, -) -> std::io::Result<(tokio::process::Child, (ChildStdout, ChildStdin))> { - if child.stdin.is_none() { - return Err(std::io::Error::other("std in was taken")); - } - if child.stdout.is_none() { - return Err(std::io::Error::other("std out was taken")); - } - let child_stdin = child.stdin.take().expect("already checked"); - let child_stdout = child.stdout.take().expect("already checked"); + mut child: Box, +) -> std::io::Result<(Box, (ChildStdout, ChildStdin))> { + let child_stdin = match child.inner_mut().stdin().take() { + Some(stdin) => stdin, + None => return Err(std::io::Error::other("std in was taken")), + }; + let child_stdout = match child.inner_mut().stdout().take() { + Some(stdout) => stdout, + None => return Err(std::io::Error::other("std out was taken")), + }; Ok((child, (child_stdout, child_stdin))) } pub struct TokioChildProcess { - child: tokio::process::Child, + child: ChildWithCleanup, child_stdin: ChildStdin, child_stdout: ChildStdout, } +pub struct ChildWithCleanup { + inner: Box, +} + +impl Drop for ChildWithCleanup { + fn drop(&mut self) { + let _ = self.inner.start_kill(); + } +} + // we hold the child process with stdout, for it's easier to implement AsyncRead pin_project_lite::pin_project! { pub struct TokioChildProcessOut { - child: tokio::process::Child, + child: ChildWithCleanup, #[pin] child_stdout: ChildStdout, } @@ -47,14 +58,18 @@ impl AsyncRead for TokioChildProcessOut { } impl TokioChildProcess { - pub fn new(child: &mut tokio::process::Command) -> std::io::Result { - child - .kill_on_drop(true) + pub fn new(mut command: tokio::process::Command) -> std::io::Result { + command .stdin(std::process::Stdio::piped()) .stdout(std::process::Stdio::piped()); - let (child, (child_stdout, child_stdin)) = child_process(child.spawn()?)?; + let mut command_wrap = TokioCommandWrap::from(command); + #[cfg(unix)] + command_wrap.wrap(process_wrap::tokio::ProcessGroup::leader()); + #[cfg(windows)] + command_wrap.wrap(process_wrap::tokio::JobObject); + let (child, (child_stdout, child_stdin)) = child_process(command_wrap.spawn()?)?; Ok(Self { - child, + child: ChildWithCleanup { inner: child }, child_stdin, child_stdout, }) From dedf2048ad0ab1b1a4aca7ba49b90d81da4e170a Mon Sep 17 00:00:00 2001 From: Humberto Yusta Date: Tue, 6 May 2025 13:19:12 +0200 Subject: [PATCH 2/4] fix: install extra dep based on feature, update examples take owned command Install process-wrap if transport-child-process feature is enabled. Update examples to take ownership of the command instead of mutable reference --- crates/rmcp/Cargo.toml | 9 +++++++-- crates/rmcp/tests/test_with_js.rs | 6 +++--- crates/rmcp/tests/test_with_python.rs | 9 ++++----- examples/clients/src/collection.rs | 9 +++------ examples/clients/src/everything_stdio.rs | 11 ++++------- examples/clients/src/std_io.rs | 8 +++----- examples/rig-integration/src/config/mcp.rs | 11 +++++------ 7 files changed, 29 insertions(+), 34 deletions(-) diff --git a/crates/rmcp/Cargo.toml b/crates/rmcp/Cargo.toml index b79c1b84d..bce1ba668 100644 --- a/crates/rmcp/Cargo.toml +++ b/crates/rmcp/Cargo.toml @@ -18,7 +18,6 @@ serde_json = "1.0" thiserror = "2" chrono = { version = "0.4.38", features = ["serde"] } tokio = { version = "1", features = ["sync", "macros", "rt", "time"] } -process-wrap = { version = "8.2", features = ["tokio1"] } futures = "0.3" tracing = { version = "0.1" } tokio-util = { version = "0.7" } @@ -46,6 +45,8 @@ url = { version = "2.4", optional = true } # For tower compatibility tower-service = { version = "0.3", optional = true } +# for child process transport +process-wrap = { version = "8.2", features = ["tokio1"], optional = true} # for ws transport # tokio-tungstenite ={ version = "0.26", optional = true } @@ -66,7 +67,11 @@ macros = ["dep:rmcp-macros", "dep:paste"] transport-sse = ["dep:reqwest", "dep:sse-stream", "dep:url"] transport-async-rw = ["tokio/io-util", "tokio-util/codec"] transport-io = ["transport-async-rw", "tokio/io-std"] -transport-child-process = ["transport-async-rw", "tokio/process"] +transport-child-process = [ + "transport-async-rw", + "tokio/process", + "dep:process-wrap", +] transport-sse-server = [ "transport-async-rw", "dep:axum", diff --git a/crates/rmcp/tests/test_with_js.rs b/crates/rmcp/tests/test_with_js.rs index f5431beef..f927bf2fc 100644 --- a/crates/rmcp/tests/test_with_js.rs +++ b/crates/rmcp/tests/test_with_js.rs @@ -53,9 +53,9 @@ async fn test_with_js_server() -> anyhow::Result<()> { .spawn()? .wait() .await?; - let transport = TokioChildProcess::new( - tokio::process::Command::new("node").arg("tests/test_with_js/server.js"), - )?; + let mut cmd = tokio::process::Command::new("node"); + cmd.arg("tests/test_with_js/server.js"); + let transport = TokioChildProcess::new(cmd)?; let client = ().serve(transport).await?; let resources = client.list_all_resources().await?; diff --git a/crates/rmcp/tests/test_with_python.rs b/crates/rmcp/tests/test_with_python.rs index 7e0fbd713..8f42abbec 100644 --- a/crates/rmcp/tests/test_with_python.rs +++ b/crates/rmcp/tests/test_with_python.rs @@ -54,11 +54,10 @@ async fn test_with_python_server() -> anyhow::Result<()> { .spawn()? .wait() .await?; - let transport = TokioChildProcess::new( - tokio::process::Command::new("uv") - .arg("run") - .arg("tests/test_with_python/server.py"), - )?; + let mut cmd = tokio::process::Command::new("uv"); + cmd.arg("run"); + cmd.arg("tests/test_with_python/server.py"); + let transport = TokioChildProcess::new(cmd)?; let client = ().serve(transport).await?; let resources = client.list_all_resources().await?; diff --git a/examples/clients/src/collection.rs b/examples/clients/src/collection.rs index f474ea85f..f8d1339fa 100644 --- a/examples/clients/src/collection.rs +++ b/examples/clients/src/collection.rs @@ -18,12 +18,9 @@ async fn main() -> Result<()> { let mut client_list = HashMap::new(); for idx in 0..10 { - let service = () - .into_dyn() - .serve(TokioChildProcess::new( - Command::new("uvx").arg("mcp-server-git"), - )?) - .await?; + let mut cmd = Command::new("uvx"); + cmd.arg("mcp-client-git"); + let service = ().into_dyn().serve(TokioChildProcess::new(cmd)?).await?; client_list.insert(idx, service); } diff --git a/examples/clients/src/everything_stdio.rs b/examples/clients/src/everything_stdio.rs index 091e9053e..7c4a408f3 100644 --- a/examples/clients/src/everything_stdio.rs +++ b/examples/clients/src/everything_stdio.rs @@ -20,13 +20,10 @@ async fn main() -> Result<()> { .init(); // Start server - let service = () - .serve(TokioChildProcess::new( - Command::new("npx") - .arg("-y") - .arg("@modelcontextprotocol/server-everything"), - )?) - .await?; + let mut cmd = Command::new("npx"); + cmd.arg("-y"); + cmd.arg("@modelcontextprotocol/server-everything"); + let service = ().serve(TokioChildProcess::new(cmd)?).await?; // Initialize let server_info = service.peer_info(); diff --git a/examples/clients/src/std_io.rs b/examples/clients/src/std_io.rs index 987418915..fca245ede 100644 --- a/examples/clients/src/std_io.rs +++ b/examples/clients/src/std_io.rs @@ -13,11 +13,9 @@ async fn main() -> Result<()> { ) .with(tracing_subscriber::fmt::layer()) .init(); - let service = () - .serve(TokioChildProcess::new( - Command::new("uvx").arg("mcp-server-git"), - )?) - .await?; + let mut cmd = Command::new("uvx"); + cmd.arg("mcp-server-git"); + let service = ().serve(TokioChildProcess::new(cmd)?).await?; // or // serve_client( diff --git a/examples/rig-integration/src/config/mcp.rs b/examples/rig-integration/src/config/mcp.rs index ba42f2bc0..96e0fe24b 100644 --- a/examples/rig-integration/src/config/mcp.rs +++ b/examples/rig-integration/src/config/mcp.rs @@ -69,12 +69,11 @@ impl McpServerTransportConfig { args, envs, } => { - let transport = rmcp::transport::TokioChildProcess::new( - tokio::process::Command::new(command) - .args(args) - .envs(envs) - .stderr(Stdio::null()), - )?; + let mut cmd = tokio::process::Command::new(command); + cmd.args(args); + cmd.envs(envs); + cmd.stderr(Stdio::null()); + let transport = rmcp::transport::TokioChildProcess::new(cmd)?; ().serve(transport).await? } }; From 9da3f84ebeff2c90b653846ec52d5db3d3742631 Mon Sep 17 00:00:00 2001 From: Humberto Yusta Date: Wed, 7 May 2025 15:36:54 +0200 Subject: [PATCH 3/4] fix: update other examples, comments and readme to take ownership of command Updated more examples, comments and readme to take command instead of mutable ref. Also small fix in cargo toml of example to use local path. --- README.md | 10 +++++----- crates/rmcp/src/lib.rs | 8 +++----- docs/readme/README.zh-cn.md | 8 ++++---- examples/clients/src/std_io.rs | 7 +------ examples/rig-integration/src/config/mcp.rs | 4 +--- examples/simple-chat-client/Cargo.toml | 2 +- examples/simple-chat-client/src/config.rs | 13 ++++++------- 7 files changed, 21 insertions(+), 31 deletions(-) diff --git a/README.md b/README.md index a174a4050..7650757a3 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ rmcp = { git = "https://github.com/modelcontextprotocol/rust-sdk", branch = "mai ### Quick start -Start a client in one line: +Start a client: ```rust, ignore use rmcp::{ServiceExt, transport::TokioChildProcess}; @@ -30,9 +30,9 @@ use tokio::process::Command; #[tokio::main] async fn main() -> Result<(), Box> { - let client = ().serve( - TokioChildProcess::new(Command::new("npx").arg("-y").arg("@modelcontextprotocol/server-everything"))? - ).await?; + let mut cmd = Command::new("npx"); + cmd.arg("-y").arg("@modelcontextprotocol/server-everything"); + let client = ().serve(TokioChildProcess::new(cmd)?).await?; Ok(()) } ``` @@ -86,7 +86,7 @@ let server = service.serve(transport).await?; Once the server is initialized, you can send requests or notifications: ```rust, ignore -// request +// request let roots = server.list_roots().await?; // or send notification diff --git a/crates/rmcp/src/lib.rs b/crates/rmcp/src/lib.rs index f82a37b75..ca1f37d40 100644 --- a/crates/rmcp/src/lib.rs +++ b/crates/rmcp/src/lib.rs @@ -60,11 +60,9 @@ //! use tokio::process::Command; //! //! async fn client() -> Result<()> { -//! let service = () -//! .serve(TokioChildProcess::new( -//! Command::new("uvx").arg("mcp-server-git"), -//! )?) -//! .await?; +//! let mut cmd = Command::new("uvx"); +//! cmd.arg("mcp-server-git"); +//! let service = ().serve(TokioChildProcess::new(cmd)?).await?; //! //! // Initialize //! let server_info = service.peer_info(); diff --git a/docs/readme/README.zh-cn.md b/docs/readme/README.zh-cn.md index 606138565..c6937a3af 100644 --- a/docs/readme/README.zh-cn.md +++ b/docs/readme/README.zh-cn.md @@ -20,9 +20,9 @@ rmcp = { git = "https://github.com/modelcontextprotocol/rust-sdk", branch = "mai use rmcp::{ServiceExt, transport::TokioChildProcess}; use tokio::process::Command; -let client = ().serve( - TokioChildProcess::new(Command::new("npx").arg("-y").arg("@modelcontextprotocol/server-everything"))? -).await?; +let mut cmd = Command::new("npx"); +cmd.arg("-y").arg("@modelcontextprotocol/server-everything"); +let client = ().serve(TokioChildProcess::new(cmd)?).await?; ``` #### 1. 构建传输层 @@ -63,7 +63,7 @@ let server = service.serve(transport).await?; 一旦服务初始化完成,你可以发送请求或通知: ```rust, ignore -// 请求 +// 请求 let roots = server.list_roots().await?; // 或发送通知 diff --git a/examples/clients/src/std_io.rs b/examples/clients/src/std_io.rs index fca245ede..0f7ce8f12 100644 --- a/examples/clients/src/std_io.rs +++ b/examples/clients/src/std_io.rs @@ -17,12 +17,7 @@ async fn main() -> Result<()> { cmd.arg("mcp-server-git"); let service = ().serve(TokioChildProcess::new(cmd)?).await?; - // or - // serve_client( - // (), - // TokioChildProcess::new(Command::new("uvx").arg("mcp-server-git"))?, - // ) - // .await?; + // or serve_client((), TokioChildProcess::new(cmd)?).await?; // Initialize let server_info = service.peer_info(); diff --git a/examples/rig-integration/src/config/mcp.rs b/examples/rig-integration/src/config/mcp.rs index 96e0fe24b..9bbb6df98 100644 --- a/examples/rig-integration/src/config/mcp.rs +++ b/examples/rig-integration/src/config/mcp.rs @@ -70,9 +70,7 @@ impl McpServerTransportConfig { envs, } => { let mut cmd = tokio::process::Command::new(command); - cmd.args(args); - cmd.envs(envs); - cmd.stderr(Stdio::null()); + cmd.args(args).envs(envs).stderr(Stdio::null()); let transport = rmcp::transport::TokioChildProcess::new(cmd)?; ().serve(transport).await? } diff --git a/examples/simple-chat-client/Cargo.toml b/examples/simple-chat-client/Cargo.toml index 5387e9245..9b2a99310 100644 --- a/examples/simple-chat-client/Cargo.toml +++ b/examples/simple-chat-client/Cargo.toml @@ -13,7 +13,7 @@ thiserror = "1.0" async-trait = "0.1" futures = "0.3" toml = "0.8" -rmcp = { git = "https://github.com/modelcontextprotocol/rust-sdk", features = [ +rmcp = { path = "../../crates/rmcp", features = [ "client", "transport-child-process", "transport-sse", diff --git a/examples/simple-chat-client/src/config.rs b/examples/simple-chat-client/src/config.rs index e84d5b05b..3c5d0bb78 100644 --- a/examples/simple-chat-client/src/config.rs +++ b/examples/simple-chat-client/src/config.rs @@ -52,13 +52,12 @@ impl McpServerTransportConfig { args, envs, } => { - let transport = rmcp::transport::child_process::TokioChildProcess::new( - tokio::process::Command::new(command) - .args(args) - .envs(envs) - .stderr(Stdio::inherit()) - .stdout(Stdio::inherit()), - )?; + let mut cmd = tokio::process::Command::new(command); + cmd.args(args) + .envs(envs) + .stderr(Stdio::inherit()) + .stdout(Stdio::inherit()); + let transport = rmcp::transport::child_process::TokioChildProcess::new(cmd)?; ().serve(transport).await? } }; From 9edc6c824200a2d7f4b213c1b2b9a56c601c9341 Mon Sep 17 00:00:00 2001 From: Humberto Yusta Date: Sat, 17 May 2025 17:30:28 +0200 Subject: [PATCH 4/4] refactor: add configure command ext Added configure command ext to tokio process command so that you can use .configure() to use inline commands for mcp stdio client. Added warning if start kill process fails --- README.md | 8 ++++---- crates/rmcp/src/lib.rs | 6 +++--- crates/rmcp/src/transport.rs | 2 +- crates/rmcp/src/transport/child_process.rs | 15 ++++++++++++++- crates/rmcp/tests/test_with_js.rs | 9 +++++---- crates/rmcp/tests/test_with_python.rs | 9 ++++----- docs/readme/README.zh-cn.md | 8 ++++---- examples/clients/src/collection.rs | 17 +++++++++++++---- examples/clients/src/everything_stdio.rs | 13 ++++++++----- examples/clients/src/std_io.rs | 16 ++++++++++++---- examples/rig-integration/src/config/mcp.rs | 10 ++++++---- examples/simple-chat-client/src/config.rs | 16 +++++++++------- 12 files changed, 83 insertions(+), 46 deletions(-) diff --git a/README.md b/README.md index 7650757a3..b7b471065 100644 --- a/README.md +++ b/README.md @@ -25,14 +25,14 @@ rmcp = { git = "https://github.com/modelcontextprotocol/rust-sdk", branch = "mai Start a client: ```rust, ignore -use rmcp::{ServiceExt, transport::TokioChildProcess}; +use rmcp::{ServiceExt, transport::{TokioChildProcess, ConfigureCommandExt}}; use tokio::process::Command; #[tokio::main] async fn main() -> Result<(), Box> { - let mut cmd = Command::new("npx"); - cmd.arg("-y").arg("@modelcontextprotocol/server-everything"); - let client = ().serve(TokioChildProcess::new(cmd)?).await?; + let client = ().serve(TokioChildProcess::new(Command::new("npx").configure(|cmd| { + cmd.arg("-y").arg("@modelcontextprotocol/server-everything"); + }))?).await?; Ok(()) } ``` diff --git a/crates/rmcp/src/lib.rs b/crates/rmcp/src/lib.rs index ca1f37d40..af015ee83 100644 --- a/crates/rmcp/src/lib.rs +++ b/crates/rmcp/src/lib.rs @@ -60,9 +60,9 @@ //! use tokio::process::Command; //! //! async fn client() -> Result<()> { -//! let mut cmd = Command::new("uvx"); -//! cmd.arg("mcp-server-git"); -//! let service = ().serve(TokioChildProcess::new(cmd)?).await?; +//! let service = ().serve(TokioChildProcess::new(Command::new("uvx").configure(|cmd| { +//! cmd.arg("mcp-server-git"); +//! }))?).await?; //! //! // Initialize //! let server_info = service.peer_info(); diff --git a/crates/rmcp/src/transport.rs b/crates/rmcp/src/transport.rs index 9b49f7622..f544dca33 100644 --- a/crates/rmcp/src/transport.rs +++ b/crates/rmcp/src/transport.rs @@ -44,7 +44,7 @@ use crate::service::{RxJsonRpcMessage, ServiceRole, TxJsonRpcMessage}; #[cfg(feature = "transport-child-process")] pub mod child_process; #[cfg(feature = "transport-child-process")] -pub use child_process::TokioChildProcess; +pub use child_process::{ConfigureCommandExt, TokioChildProcess}; #[cfg(feature = "transport-async-rw")] pub mod io; diff --git a/crates/rmcp/src/transport/child_process.rs b/crates/rmcp/src/transport/child_process.rs index 53ef844eb..37c5d00ce 100644 --- a/crates/rmcp/src/transport/child_process.rs +++ b/crates/rmcp/src/transport/child_process.rs @@ -34,7 +34,9 @@ pub struct ChildWithCleanup { impl Drop for ChildWithCleanup { fn drop(&mut self) { - let _ = self.inner.start_kill(); + if let Err(e) = self.inner.start_kill() { + tracing::warn!("Failed to kill child process: {e}"); + } } } @@ -103,3 +105,14 @@ impl IntoTransport for TokioChildProcess ) } } + +pub trait ConfigureCommandExt { + fn configure(self, f: impl FnOnce(&mut Self)) -> Self; +} + +impl ConfigureCommandExt for tokio::process::Command { + fn configure(mut self, f: impl FnOnce(&mut Self)) -> Self { + f(&mut self); + self + } +} diff --git a/crates/rmcp/tests/test_with_js.rs b/crates/rmcp/tests/test_with_js.rs index f927bf2fc..f7247420b 100644 --- a/crates/rmcp/tests/test_with_js.rs +++ b/crates/rmcp/tests/test_with_js.rs @@ -1,6 +1,6 @@ use rmcp::{ ServiceExt, - transport::{SseServer, TokioChildProcess}, + transport::{ConfigureCommandExt, SseServer, TokioChildProcess}, }; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; mod common; @@ -53,9 +53,10 @@ async fn test_with_js_server() -> anyhow::Result<()> { .spawn()? .wait() .await?; - let mut cmd = tokio::process::Command::new("node"); - cmd.arg("tests/test_with_js/server.js"); - let transport = TokioChildProcess::new(cmd)?; + let transport = + TokioChildProcess::new(tokio::process::Command::new("node").configure(|cmd| { + cmd.arg("tests/test_with_js/server.js"); + }))?; let client = ().serve(transport).await?; let resources = client.list_all_resources().await?; diff --git a/crates/rmcp/tests/test_with_python.rs b/crates/rmcp/tests/test_with_python.rs index 8f42abbec..f8fa56ee6 100644 --- a/crates/rmcp/tests/test_with_python.rs +++ b/crates/rmcp/tests/test_with_python.rs @@ -1,6 +1,6 @@ use rmcp::{ ServiceExt, - transport::{SseServer, TokioChildProcess}, + transport::{ConfigureCommandExt, SseServer, TokioChildProcess}, }; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; mod common; @@ -54,10 +54,9 @@ async fn test_with_python_server() -> anyhow::Result<()> { .spawn()? .wait() .await?; - let mut cmd = tokio::process::Command::new("uv"); - cmd.arg("run"); - cmd.arg("tests/test_with_python/server.py"); - let transport = TokioChildProcess::new(cmd)?; + let transport = TokioChildProcess::new(tokio::process::Command::new("uv").configure(|cmd| { + cmd.arg("run").arg("tests/test_with_python/server.py"); + }))?; let client = ().serve(transport).await?; let resources = client.list_all_resources().await?; diff --git a/docs/readme/README.zh-cn.md b/docs/readme/README.zh-cn.md index c6937a3af..cc730c1da 100644 --- a/docs/readme/README.zh-cn.md +++ b/docs/readme/README.zh-cn.md @@ -17,12 +17,12 @@ rmcp = { git = "https://github.com/modelcontextprotocol/rust-sdk", branch = "mai ### 快速上手 一行代码启动客户端: ```rust -use rmcp::{ServiceExt, transport::TokioChildProcess}; +use rmcp::{ServiceExt, transport::{TokioChildProcess, ConfigureCommandExt}}; use tokio::process::Command; -let mut cmd = Command::new("npx"); -cmd.arg("-y").arg("@modelcontextprotocol/server-everything"); -let client = ().serve(TokioChildProcess::new(cmd)?).await?; +let client = ().serve(TokioChildProcess::new(Command::new("npx").configure(|cmd| { + cmd.arg("-y").arg("@modelcontextprotocol/server-everything"); +}))?).await?; ``` #### 1. 构建传输层 diff --git a/examples/clients/src/collection.rs b/examples/clients/src/collection.rs index f8d1339fa..244d369c4 100644 --- a/examples/clients/src/collection.rs +++ b/examples/clients/src/collection.rs @@ -1,7 +1,11 @@ use std::collections::HashMap; use anyhow::Result; -use rmcp::{model::CallToolRequestParam, service::ServiceExt, transport::TokioChildProcess}; +use rmcp::{ + model::CallToolRequestParam, + service::ServiceExt, + transport::{ConfigureCommandExt, TokioChildProcess}, +}; use tokio::process::Command; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -18,9 +22,14 @@ async fn main() -> Result<()> { let mut client_list = HashMap::new(); for idx in 0..10 { - let mut cmd = Command::new("uvx"); - cmd.arg("mcp-client-git"); - let service = ().into_dyn().serve(TokioChildProcess::new(cmd)?).await?; + let service = () + .into_dyn() + .serve(TokioChildProcess::new(Command::new("uvx").configure( + |cmd| { + cmd.arg("mcp-client-git"); + }, + ))?) + .await?; client_list.insert(idx, service); } diff --git a/examples/clients/src/everything_stdio.rs b/examples/clients/src/everything_stdio.rs index 7c4a408f3..a2a7475aa 100644 --- a/examples/clients/src/everything_stdio.rs +++ b/examples/clients/src/everything_stdio.rs @@ -3,7 +3,7 @@ use rmcp::{ ServiceExt, model::{CallToolRequestParam, GetPromptRequestParam, ReadResourceRequestParam}, object, - transport::TokioChildProcess, + transport::{ConfigureCommandExt, TokioChildProcess}, }; use tokio::process::Command; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -20,10 +20,13 @@ async fn main() -> Result<()> { .init(); // Start server - let mut cmd = Command::new("npx"); - cmd.arg("-y"); - cmd.arg("@modelcontextprotocol/server-everything"); - let service = ().serve(TokioChildProcess::new(cmd)?).await?; + let service = () + .serve(TokioChildProcess::new(Command::new("npx").configure( + |cmd| { + cmd.arg("-y").arg("@modelcontextprotocol/server-everything"); + }, + ))?) + .await?; // Initialize let server_info = service.peer_info(); diff --git a/examples/clients/src/std_io.rs b/examples/clients/src/std_io.rs index 0f7ce8f12..0e04763d6 100644 --- a/examples/clients/src/std_io.rs +++ b/examples/clients/src/std_io.rs @@ -1,5 +1,9 @@ use anyhow::Result; -use rmcp::{model::CallToolRequestParam, service::ServiceExt, transport::TokioChildProcess}; +use rmcp::{ + model::CallToolRequestParam, + service::ServiceExt, + transport::{ConfigureCommandExt, TokioChildProcess}, +}; use tokio::process::Command; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -13,9 +17,13 @@ async fn main() -> Result<()> { ) .with(tracing_subscriber::fmt::layer()) .init(); - let mut cmd = Command::new("uvx"); - cmd.arg("mcp-server-git"); - let service = ().serve(TokioChildProcess::new(cmd)?).await?; + let service = () + .serve(TokioChildProcess::new(Command::new("uvx").configure( + |cmd| { + cmd.arg("mcp-server-git"); + }, + ))?) + .await?; // or serve_client((), TokioChildProcess::new(cmd)?).await?; diff --git a/examples/rig-integration/src/config/mcp.rs b/examples/rig-integration/src/config/mcp.rs index 9bbb6df98..834f2fda6 100644 --- a/examples/rig-integration/src/config/mcp.rs +++ b/examples/rig-integration/src/config/mcp.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, process::Stdio}; -use rmcp::{RoleClient, ServiceExt, service::RunningService}; +use rmcp::{RoleClient, ServiceExt, service::RunningService, transport::ConfigureCommandExt}; use serde::{Deserialize, Serialize}; use crate::mcp_adaptor::McpManager; @@ -69,9 +69,11 @@ impl McpServerTransportConfig { args, envs, } => { - let mut cmd = tokio::process::Command::new(command); - cmd.args(args).envs(envs).stderr(Stdio::null()); - let transport = rmcp::transport::TokioChildProcess::new(cmd)?; + let transport = rmcp::transport::TokioChildProcess::new( + tokio::process::Command::new(command).configure(|cmd| { + cmd.args(args).envs(envs).stderr(Stdio::null()); + }), + )?; ().serve(transport).await? } }; diff --git a/examples/simple-chat-client/src/config.rs b/examples/simple-chat-client/src/config.rs index 3c5d0bb78..c363d5593 100644 --- a/examples/simple-chat-client/src/config.rs +++ b/examples/simple-chat-client/src/config.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, path::Path, process::Stdio}; use anyhow::Result; -use rmcp::{RoleClient, ServiceExt, service::RunningService}; +use rmcp::{RoleClient, ServiceExt, service::RunningService, transport::ConfigureCommandExt}; use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize)] @@ -52,12 +52,14 @@ impl McpServerTransportConfig { args, envs, } => { - let mut cmd = tokio::process::Command::new(command); - cmd.args(args) - .envs(envs) - .stderr(Stdio::inherit()) - .stdout(Stdio::inherit()); - let transport = rmcp::transport::child_process::TokioChildProcess::new(cmd)?; + let transport = rmcp::transport::child_process::TokioChildProcess::new( + tokio::process::Command::new(command).configure(|cmd| { + cmd.args(args) + .envs(envs) + .stderr(Stdio::inherit()) + .stdout(Stdio::inherit()); + }), + )?; ().serve(transport).await? } };