From 2bf3f501810c691df58c2d247a1759632a416b3f Mon Sep 17 00:00:00 2001 From: jenrryyou Date: Mon, 2 Sep 2024 17:34:50 +0800 Subject: [PATCH 1/4] feat: batch create and accept stream --- docs/cn/streaming_rpc.md | 17 ++- docs/en/streaming_rpc.md | 16 +- .../streaming_batch_echo_c++/CMakeLists.txt | 140 ++++++++++++++++++ example/streaming_batch_echo_c++/client.cpp | 91 ++++++++++++ example/streaming_batch_echo_c++/echo.proto | 33 +++++ example/streaming_batch_echo_c++/server.cpp | 117 +++++++++++++++ src/brpc/channel.cpp | 2 +- src/brpc/controller.cpp | 44 ++++-- src/brpc/controller.h | 6 +- .../details/controller_private_accessor.h | 4 +- src/brpc/policy/baidu_rpc_protocol.cpp | 38 +++-- src/brpc/stream.cpp | 96 +++++++++--- src/brpc/stream.h | 15 ++ src/brpc/stream_impl.h | 5 +- src/brpc/streaming_rpc_meta.proto | 1 + 15 files changed, 571 insertions(+), 54 deletions(-) create mode 100644 example/streaming_batch_echo_c++/CMakeLists.txt create mode 100644 example/streaming_batch_echo_c++/client.cpp create mode 100644 example/streaming_batch_echo_c++/echo.proto create mode 100644 example/streaming_batch_echo_c++/server.cpp diff --git a/docs/cn/streaming_rpc.md b/docs/cn/streaming_rpc.md index 1480b83b27..c9f9ab3673 100644 --- a/docs/cn/streaming_rpc.md +++ b/docs/cn/streaming_rpc.md @@ -23,7 +23,7 @@ Streaming RPC保证: # 建立Stream -目前Stream都由Client端建立。Client先在本地创建一个Stream,再通过一次RPC(必须使用baidu_std协议)与指定的Service建立一个Stream,如果Service在收到请求之后选择接受这个Stream, 那在response返回Client后Stream就会建立成功。过程中的任何错误都把RPC标记为失败,同时也意味着Stream创建失败。用linux下建立连接的过程打比方,Client先创建[socket](http://linux.die.net/man/7/socket)(创建Stream),再调用[connect](http://linux.die.net/man/2/connect)尝试与远端建立连接(通过RPC建立Stream),远端[accept](http://linux.die.net/man/2/accept)后连接就建立了(service接受后创建成功)。 +目前Stream都由Client端建立。Client先在本地创建一个或者多个Stream,再通过一次RPC(必须使用baidu_std协议)与指定的Service建立一个Stream,如果Service在收到请求之后选择接受这批Stream, 那在response返回Client后这批Stream就会建立成功。过程中的任何错误都把RPC标记为失败,同时也意味着Stream创建失败。用linux下建立连接的过程打比方,Client先创建[socket](http://linux.die.net/man/7/socket)(创建Stream),再调用[connect](http://linux.die.net/man/2/connect)尝试与远端建立连接(通过RPC建立Stream),远端[accept](http://linux.die.net/man/2/accept)后连接就建立了(service接受后创建成功)。 > 如果Client尝试向不支持Streaming RPC的老Server建立Stream,将总是失败。 @@ -58,11 +58,18 @@ struct StreamOptions // NULL, the Stream will be created with default options // Return 0 on success, -1 otherwise int StreamCreate(StreamId* request_stream, Controller &cntl, const StreamOptions* options); + +// [Called at the client side for creating multiple streams] +// Create streams at client-side along with the |cntl|, which will be connected +// when receiving the response with streams from server-side. If |options| is +// NULL, the stream will be created with default options +// Return 0 on success, -1 otherwise +int StreamCreate(StreamIds& request_streams, int request_stream_size, Controller& cntl, const StreamOptions* options); ``` # 接受Stream -如果client在RPC上附带了一个Stream, service在收到RPC后可以通过调用StreamAccept接受。接受后Server端对应产生的Stream存放在response_stream中,Server可通过这个Stream向Client发送数据。 +如果client在RPC上附带了一个或者多个Stream, service在收到RPC后可以通过调用StreamAccept接受。接受后Server端对应产生的Stream存放在response_stream中,Server可通过这个Stream向Client发送数据。 ```c++ // [Called at the server side] @@ -70,6 +77,12 @@ int StreamCreate(StreamId* request_stream, Controller &cntl, const StreamOptions // (cntl.has_remote_stream() returns false), this method would fail. // Return 0 on success, -1 otherwise. int StreamAccept(StreamId* response_stream, Controller &cntl, const StreamOptions* options); + +// [Called at the server side for accepting multiple streams] +// Accept the streams. If client didn't create streams with the request +// (cntl.has_remote_stream() returns false), this method would fail. +// Return 0 on success, -1 otherwise. +int StreamAccept(StreamIds& response_stream, Controller& cntl, const StreamOptions* options); ``` # 读取Stream diff --git a/docs/en/streaming_rpc.md b/docs/en/streaming_rpc.md index 80302015fe..8e06714887 100644 --- a/docs/en/streaming_rpc.md +++ b/docs/en/streaming_rpc.md @@ -23,7 +23,7 @@ For examples please refer to [example/streaming_echo_c++](https://github.com/apa # Create a Stream -Currently stream is established by the client only. A new Stream object is created in client and then is used to issue an RPC (through baidu_std protocol) to the specified service. The service could accept this stream by responding to the request without error, thus a Stream is created once the client receives the response successfully. Any error during this process fails the RPC and thus fails the Stream creation. Take the Linux environment as an example, the client creates a [socket](http://linux.die.net/man/7/socket) first (creates a Stream), and then tries to establish a connection with the remote side by [connect](http://linux.die.net/man/2/connect) (establish a Stream through RPC). Finally the stream has been created once the remote side [accept](http://linux.die.net/man/2/accept) the request. +Currently streams are established by the client only. The new Stream objects are created in client and then are used to issue an RPC (through baidu_std protocol) to the specified service. The service could accept these streams by responding to the request without error, thus the Streams are created once the client receives the response successfully. Any error during this process fails the RPC and thus fails the Stream creation. Take the Linux environment as an example, the client creates a [socket](http://linux.die.net/man/7/socket) first (creates a Stream), and then tries to establish a connection with the remote side by [connect](http://linux.die.net/man/2/connect) (establish a Stream through RPC). Finally the stream has been created once the remote side [accept](http://linux.die.net/man/2/accept) the request. > If the client tries to establish a stream to a server that doesn't support streaming RPC, it will always return failure. @@ -58,6 +58,14 @@ struct StreamOptions // NULL, the Stream will be created with default options // Return 0 on success, -1 otherwise int StreamCreate(StreamId* request_stream, Controller &cntl, const StreamOptions* options); + +// [Called at the client side for creating multiple streams] +// Create streams at client-side along with the |cntl|, which will be connected +// when receiving the response with streams from server-side. If |options| is +// NULL, the stream will be created with default options +// Return 0 on success, -1 otherwise +int StreamCreate(StreamIds& request_streams, int request_stream_size, Controller& cntl, + const StreamOptions* options); ``` # Accept a Stream @@ -70,6 +78,12 @@ If a Stream is attached inside the request of an RPC, the service can accept the // (cntl.has_remote_stream() returns false), this method would fail. // Return 0 on success, -1 otherwise. int StreamAccept(StreamId* response_stream, Controller &cntl, const StreamOptions* options); + +// [Called at the server side for accepting multiple streams] +// Accept the streams. If client didn't create streams with the request +// (cntl.has_remote_stream() returns false), this method would fail. +// Return 0 on success, -1 otherwise. +int StreamAccept(StreamIds& response_stream, Controller& cntl, const StreamOptions* options); ``` # Read from a Stream diff --git a/example/streaming_batch_echo_c++/CMakeLists.txt b/example/streaming_batch_echo_c++/CMakeLists.txt new file mode 100644 index 0000000000..1d152ef9a0 --- /dev/null +++ b/example/streaming_batch_echo_c++/CMakeLists.txt @@ -0,0 +1,140 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +cmake_minimum_required(VERSION 2.8.10) +project(streaming_batch_echo_c++ C CXX) + +option(LINK_SO "Whether examples are linked dynamically" OFF) + +execute_process( + COMMAND bash -c "find ${PROJECT_SOURCE_DIR}/../.. -type d -regex \".*output/include$\" | head -n1 | xargs dirname | tr -d '\n'" + OUTPUT_VARIABLE OUTPUT_PATH +) + +set(CMAKE_PREFIX_PATH ${OUTPUT_PATH}) + +include(FindThreads) +include(FindProtobuf) +protobuf_generate_cpp(PROTO_SRC PROTO_HEADER echo.proto) +# include PROTO_HEADER +include_directories(${CMAKE_CURRENT_BINARY_DIR}) + +# Search for libthrift* by best effort. If it is not found and brpc is +# compiled with thrift protocol enabled, a link error would be reported. +find_library(THRIFT_LIB NAMES thrift) +if (NOT THRIFT_LIB) + set(THRIFT_LIB "") +endif() + +find_path(BRPC_INCLUDE_PATH NAMES brpc/server.h) +if(LINK_SO) + find_library(BRPC_LIB NAMES brpc) +else() + find_library(BRPC_LIB NAMES libbrpc.a brpc) +endif() +if((NOT BRPC_INCLUDE_PATH) OR (NOT BRPC_LIB)) + message(FATAL_ERROR "Fail to find brpc") +endif() +include_directories(${BRPC_INCLUDE_PATH}) + +find_path(GFLAGS_INCLUDE_PATH gflags/gflags.h) +find_library(GFLAGS_LIBRARY NAMES gflags libgflags) +if((NOT GFLAGS_INCLUDE_PATH) OR (NOT GFLAGS_LIBRARY)) + message(FATAL_ERROR "Fail to find gflags") +endif() +include_directories(${GFLAGS_INCLUDE_PATH}) + +execute_process( + COMMAND bash -c "grep \"namespace [_A-Za-z0-9]\\+ {\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $2}' | tr -d '\n'" + OUTPUT_VARIABLE GFLAGS_NS +) +if(${GFLAGS_NS} STREQUAL "GFLAGS_NAMESPACE") + execute_process( + COMMAND bash -c "grep \"#define GFLAGS_NAMESPACE [_A-Za-z0-9]\\+\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $3}' | tr -d '\n'" + OUTPUT_VARIABLE GFLAGS_NS + ) +endif() +if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") + include(CheckFunctionExists) + CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME) + if(NOT HAVE_CLOCK_GETTIME) + set(DEFINE_CLOCK_GETTIME "-DNO_CLOCK_GETTIME_IN_MAC") + endif() +endif() + +set(CMAKE_CPP_FLAGS "${DEFINE_CLOCK_GETTIME} -DGFLAGS_NS=${GFLAGS_NS}") +set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -DNDEBUG -O2 -D__const__=__unused__ -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer") + +if(CMAKE_VERSION VERSION_LESS "3.1.3") + if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + endif() + if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + endif() +else() + set(CMAKE_CXX_STANDARD 11) + set(CMAKE_CXX_STANDARD_REQUIRED ON) +endif() + +find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h) +find_library(LEVELDB_LIB NAMES leveldb) +if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB)) + message(FATAL_ERROR "Fail to find leveldb") +endif() +include_directories(${LEVELDB_INCLUDE_PATH}) + +if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") + set(OPENSSL_ROOT_DIR + "/usr/local/opt/openssl" # Homebrew installed OpenSSL + ) +endif() + +find_package(OpenSSL) +include_directories(${OPENSSL_INCLUDE_DIR}) + +set(DYNAMIC_LIB + ${CMAKE_THREAD_LIBS_INIT} + ${GFLAGS_LIBRARY} + ${PROTOBUF_LIBRARIES} + ${LEVELDB_LIB} + ${OPENSSL_CRYPTO_LIBRARY} + ${OPENSSL_SSL_LIBRARY} + ${THRIFT_LIB} + dl +) + +if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") + set(DYNAMIC_LIB ${DYNAMIC_LIB} + pthread + "-framework CoreFoundation" + "-framework CoreGraphics" + "-framework CoreData" + "-framework CoreText" + "-framework Security" + "-framework Foundation" + "-Wl,-U,_MallocExtension_ReleaseFreeMemory" + "-Wl,-U,_ProfilerStart" + "-Wl,-U,_ProfilerStop" + "-Wl,-U,__Z13GetStackTracePPvii") +endif() + +add_executable(streaming_batch_echo_client client.cpp ${PROTO_SRC} ${PROTO_HEADER}) +add_executable(streaming_batch_echo_server server.cpp ${PROTO_SRC} ${PROTO_HEADER}) + +target_link_libraries(streaming_batch_echo_client ${BRPC_LIB} ${DYNAMIC_LIB}) +target_link_libraries(streaming_batch_echo_server ${BRPC_LIB} ${DYNAMIC_LIB}) diff --git a/example/streaming_batch_echo_c++/client.cpp b/example/streaming_batch_echo_c++/client.cpp new file mode 100644 index 0000000000..9088ac5ab7 --- /dev/null +++ b/example/streaming_batch_echo_c++/client.cpp @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// A client sending requests to server in batch every 1 second. + +#include +#include +#include +#include +#include "echo.pb.h" + +DEFINE_bool(send_attachment, true, "Carry attachment along with requests"); +DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short"); +DEFINE_string(server, "0.0.0.0:8001", "IP Address of server"); +DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds"); +DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); + +int main(int argc, char* argv[]) { + // Parse gflags. We recommend you to use gflags as well. + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); + + // A Channel represents a communication line to a Server. Notice that + // Channel is thread-safe and can be shared by all threads in your program. + brpc::Channel channel; + + // Initialize the channel, NULL means using default options. + brpc::ChannelOptions options; + options.protocol = brpc::PROTOCOL_BAIDU_STD; + options.connection_type = FLAGS_connection_type; + options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/; + options.max_retry = FLAGS_max_retry; + if (channel.Init(FLAGS_server.c_str(), NULL) != 0) { + LOG(ERROR) << "Fail to initialize channel"; + return -1; + } + + // Normally, you should not call a Channel directly, but instead construct + // a stub Service wrapping it. stub can be shared by all threads as well. + example::EchoService_Stub stub(&channel); + brpc::Controller cntl; + brpc::StreamIds streams; + if (brpc::StreamCreate(streams, 3, cntl, NULL) != 0) { + LOG(ERROR) << "Fail to create stream"; + return -1; + } + for(size_t i = 0; i < streams.size(); ++i) { + LOG(INFO) << "Created Stream=" << streams[i]; + } + example::EchoRequest request; + example::EchoResponse response; + request.set_message("I'm a RPC to connect stream"); + stub.Echo(&cntl, &request, &response, NULL); + if (cntl.Failed()) { + LOG(ERROR) << "Fail to connect stream, " << cntl.ErrorText(); + return -1; + } + + while (!brpc::IsAskedToQuit()) { + butil::IOBuf msg1; + msg1.append("abcdefghijklmnopqrstuvwxyz"); + CHECK_EQ(0, brpc::StreamWrite(streams[0], msg1)); + butil::IOBuf msg2; + msg2.append("0123456789"); + CHECK_EQ(0, brpc::StreamWrite(streams[1], msg2)); + sleep(1); + butil::IOBuf msg3; + msg3.append("hello world"); + CHECK_EQ(0, brpc::StreamWrite(streams[2], msg3)); + sleep(1); + } + + CHECK_EQ(0, brpc::StreamClose(streams[0])); + CHECK_EQ(0, brpc::StreamClose(streams[1])); + CHECK_EQ(0, brpc::StreamClose(streams[2])); + LOG(INFO) << "EchoClient is going to quit"; + return 0; +} diff --git a/example/streaming_batch_echo_c++/echo.proto b/example/streaming_batch_echo_c++/echo.proto new file mode 100644 index 0000000000..2b39627fe8 --- /dev/null +++ b/example/streaming_batch_echo_c++/echo.proto @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +syntax="proto2"; +package example; + +option cc_generic_services = true; + +message EchoRequest { + required string message = 1; +}; + +message EchoResponse { + required string message = 1; +}; + +service EchoService { + rpc Echo(EchoRequest) returns (EchoResponse); +}; diff --git a/example/streaming_batch_echo_c++/server.cpp b/example/streaming_batch_echo_c++/server.cpp new file mode 100644 index 0000000000..4f879c4f0a --- /dev/null +++ b/example/streaming_batch_echo_c++/server.cpp @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// A server to receive EchoRequest and send back EchoResponse. + +#include +#include +#include +#include "echo.pb.h" +#include + +DEFINE_bool(send_attachment, true, "Carry attachment along with response"); +DEFINE_int32(port, 8001, "TCP Port of this server"); +DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no " + "read/write operations during the last `idle_timeout_s'"); + +class StreamReceiver : public brpc::StreamInputHandler { +public: + virtual int on_received_messages(brpc::StreamId id, + butil::IOBuf *const messages[], + size_t size) { + std::ostringstream os; + for (size_t i = 0; i < size; ++i) { + os << "msg[" << i << "]=" << *messages[i]; + } + LOG(INFO) << "Received from Stream=" << id << ": " << os.str(); + return 0; + } + virtual void on_idle_timeout(brpc::StreamId id) { + LOG(INFO) << "Stream=" << id << " has no data transmission for a while"; + } + virtual void on_closed(brpc::StreamId id) { + LOG(INFO) << "Stream=" << id << " is closed"; + } + + virtual void on_finished(brpc::StreamId id, int32_t finish_code) { + LOG(INFO) << "Stream=" << id << " is finished, code " << finish_code; + } +}; + +// Your implementation of example::EchoService +class StreamingBatchEchoService : public example::EchoService { +public: + virtual ~StreamingBatchEchoService() { + brpc::StreamClose(_sds[0]); + brpc::StreamClose(_sds[1]); + brpc::StreamClose(_sds[2]); + }; + virtual void Echo(google::protobuf::RpcController* controller, + const example::EchoRequest* /*request*/, + example::EchoResponse* response, + google::protobuf::Closure* done) { + // This object helps you to call done->Run() in RAII style. If you need + // to process the request asynchronously, pass done_guard.release(). + brpc::ClosureGuard done_guard(done); + + brpc::Controller* cntl = + static_cast(controller); + brpc::StreamOptions stream_options; + stream_options.handler = &_receiver; + if (brpc::StreamAccept(_sds, *cntl, &stream_options) != 0) { + cntl->SetFailed("Fail to accept stream"); + return; + } + response->set_message("Accepted stream"); + } + +private: + StreamReceiver _receiver; + brpc::StreamIds _sds; +}; + +int main(int argc, char* argv[]) { + // Parse gflags. We recommend you to use gflags as well. + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); + + // Generally you only need one Server. + brpc::Server server; + + // Instance of your service. + StreamingBatchEchoService echo_service_impl; + + // Add the service into server. Notice the second parameter, because the + // service is put on stack, we don't want server to delete it, otherwise + // use brpc::SERVER_OWNS_SERVICE. + if (server.AddService(&echo_service_impl, + brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { + LOG(ERROR) << "Fail to add service"; + return -1; + } + + // Start the server. + brpc::ServerOptions options; + options.idle_timeout_sec = FLAGS_idle_timeout_s; + if (server.Start(FLAGS_port, &options) != 0) { + LOG(ERROR) << "Fail to start EchoServer"; + return -1; + } + + // Wait until Ctrl-C is pressed, then Stop() and Join() the server. + server.RunUntilAskedToQuit(); + return 0; +} diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp index c15611ea8f..ab07ad3bff 100644 --- a/src/brpc/channel.cpp +++ b/src/brpc/channel.cpp @@ -535,7 +535,7 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, return cntl->HandleSendFailed(); } - if (cntl->_request_stream != INVALID_STREAM_ID) { + if (!cntl->_request_streams.empty()) { // Currently we cannot handle retry and backup request correctly cntl->set_max_retry(0); cntl->set_backup_request_ms(-1); diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index afebb3c282..7d242ef91d 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -290,8 +290,8 @@ void Controller::ResetPods() { _http_response = NULL; _request_user_fields = NULL; _response_user_fields = NULL; - _request_stream = INVALID_STREAM_ID; - _response_stream = INVALID_STREAM_ID; + _request_streams.clear(); + _response_streams.clear(); _remote_stream_settings = NULL; _auth_flags = 0; } @@ -1398,34 +1398,54 @@ void* Controller::session_local_data() { } void Controller::HandleStreamConnection(Socket *host_socket) { - if (_request_stream == INVALID_STREAM_ID) { + if (_request_streams.empty()) { CHECK(!has_remote_stream()); return; } - SocketUniquePtr ptr; + size_t stream_num = _request_streams.size(); + std::vector ptrs(stream_num); if (!FailedInline()) { - if (Socket::Address(_request_stream, &ptr) != 0) { - if (!FailedInline()) { - SetFailed(EREQUEST, "Request stream=%" PRIu64 " was closed before responded", - _request_stream); - } - } else if (_remote_stream_settings == NULL) { + if (_remote_stream_settings == NULL) { if (!FailedInline()) { SetFailed(EREQUEST, "The server didn't accept the stream"); } + } else { + for (size_t i = 0; i < stream_num; ++i) { + if (Socket::Address(_request_streams[i], &ptrs[i]) != 0) { + if (!FailedInline()) { + SetFailed(EREQUEST, "Request stream=%" PRIu64 " was closed before responded", + _request_streams[i]); + break; + } + } + } } } if (FailedInline()) { - Stream::SetFailed(_request_stream, _error_code, + Stream::SetFailed(_request_streams, _error_code, "%s", _error_text.c_str()); if (_remote_stream_settings != NULL) { policy::SendStreamRst(host_socket, _remote_stream_settings->stream_id()); + for (int i = 0; i < _remote_stream_settings->extra_stream_ids().size(); ++i) { + policy::SendStreamRst(host_socket, + _remote_stream_settings->extra_stream_ids()[i]); + } } return; } - Stream* s = (Stream*)ptr->conn(); + Stream* s = (Stream*)ptrs[0]->conn(); s->SetConnected(_remote_stream_settings); + if (stream_num > 1) { + const auto& extra_stream_ids = _remote_stream_settings->extra_stream_ids(); + _remote_stream_settings->clear_extra_stream_ids(); + for (size_t i = 1; i < stream_num; ++i) { + Stream* extra_stream = (Stream *) ptrs[i]->conn(); + _remote_stream_settings->set_stream_id(extra_stream_ids[i - 1]); + s->shareHostSocket(*extra_stream); + extra_stream->SetConnected(_remote_stream_settings); + } + } } // TODO: Need more security advices from professionals. diff --git a/src/brpc/controller.h b/src/brpc/controller.h index 9b3c0201ae..2a4ec6b58a 100644 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -123,7 +123,9 @@ friend class schan::Sender; friend class schan::SubDone; friend class policy::OnServerStreamCreated; friend int StreamCreate(StreamId*, Controller&, const StreamOptions*); +friend int StreamCreate(StreamIds&, int, Controller&, const StreamOptions*); friend int StreamAccept(StreamId*, Controller&, const StreamOptions*); +friend int StreamAccept(StreamIds&, Controller&, const StreamOptions*); friend void policy::ProcessMongoRequest(InputMessageBase*); friend void policy::ProcessThriftRequest(InputMessageBase*); // << Flags >> @@ -866,9 +868,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // TODO: Replace following fields with StreamCreator // Defined at client side - StreamId _request_stream; + StreamIds _request_streams; // Defined at server side - StreamId _response_stream; + StreamIds _response_streams; // Defined at both sides StreamSettings *_remote_stream_settings; diff --git a/src/brpc/details/controller_private_accessor.h b/src/brpc/details/controller_private_accessor.h index 1be7df8bb8..db40ca1541 100644 --- a/src/brpc/details/controller_private_accessor.h +++ b/src/brpc/details/controller_private_accessor.h @@ -119,8 +119,8 @@ class ControllerPrivateAccessor { return _cntl->_remote_stream_settings; } - StreamId request_stream() { return _cntl->_request_stream; } - StreamId response_stream() { return _cntl->_response_stream; } + StreamIds request_streams() { return _cntl->_request_streams; } + StreamIds response_streams() { return _cntl->_response_streams; } void set_method(const google::protobuf::MethodDescriptor* method) { _cntl->_method = method; } diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index 504895b185..a7920fa729 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -216,10 +216,12 @@ void SendRpcResponse(int64_t correlation_id, ClosureGuard guard(brpc::NewCallback( cntl, &Controller::CallAfterRpcResp, req, res)); - StreamId response_stream_id = accessor.response_stream(); + StreamIds response_stream_ids = accessor.response_streams(); if (cntl->IsCloseConnection()) { - StreamClose(response_stream_id); + for(size_t i = 0; i < response_stream_ids.size(); ++i) { + StreamClose(response_stream_ids[i]); + } sock->SetFailed(); return; } @@ -262,11 +264,16 @@ void SendRpcResponse(int64_t correlation_id, meta.set_attachment_size(attached_size); } SocketUniquePtr stream_ptr; - if (response_stream_id != INVALID_STREAM_ID) { + if (!response_stream_ids.empty()) { + StreamId response_stream_id = response_stream_ids[0]; if (Socket::Address(response_stream_id, &stream_ptr) == 0) { - Stream* s = (Stream*)stream_ptr->conn(); - s->FillSettings(meta.mutable_stream_settings()); + Stream* s = (Stream *) stream_ptr->conn(); + StreamSettings *stream_settings = meta.mutable_stream_settings(); + s->FillSettings(stream_settings); s->SetHostSocket(sock); + for (size_t i = 1; i < response_stream_ids.size(); ++i) { + stream_settings->mutable_extra_stream_ids()->Add(response_stream_ids[i]); + } } else { LOG(WARNING) << "Stream=" << response_stream_id << " was closed before sending response"; @@ -302,7 +309,7 @@ void SendRpcResponse(int64_t correlation_id, // Response_stream can be INVALID_STREAM_ID when error occurs. if (SendStreamData(sock, &res_buf, accessor.remote_stream_settings()->stream_id(), - accessor.response_stream()) != 0) { + accessor.response_streams()[0]) != 0) { const int errcode = errno; std::string error_text = butil::string_printf(64, "Fail to write into %s", sock->description().c_str()); @@ -716,6 +723,10 @@ void ProcessRpcResponse(InputMessageBase* msg_base) { << "Fail to lock correlation_id=" << cid << ": " << berror(rc); if (remote_stream_id != INVALID_STREAM_ID) { SendStreamRst(msg->socket(), meta.stream_settings().stream_id()); + const auto & extra_stream_ids = meta.stream_settings().extra_stream_ids(); + for (int i = 0; i < extra_stream_ids.size(); ++i) { + policy::SendStreamRst(msg->socket(), extra_stream_ids[i]); + } } return; } @@ -825,15 +836,20 @@ void PackRpcRequest(butil::IOBuf* req_buf, request_meta->set_request_id(cntl->request_id()); } meta.set_correlation_id(correlation_id); - StreamId request_stream_id = accessor.request_stream(); - if (request_stream_id != INVALID_STREAM_ID) { + StreamIds request_stream_ids = accessor.request_streams(); + if (!request_stream_ids.empty()) { + StreamSettings* stream_settings = meta.mutable_stream_settings(); + StreamId request_stream_id = request_stream_ids[0]; SocketUniquePtr ptr; if (Socket::Address(request_stream_id, &ptr) != 0) { - return cntl->SetFailed(EREQUEST, "Stream=%" PRIu64 " was closed", + return cntl->SetFailed(EREQUEST, "Stream=%" PRIu64 " was closed", request_stream_id); } - Stream *s = (Stream*)ptr->conn(); - s->FillSettings(meta.mutable_stream_settings()); + Stream* s = (Stream*) ptr->conn(); + s->FillSettings(stream_settings); + for (size_t i = 1; i < request_stream_ids.size(); ++i) { + stream_settings->mutable_extra_stream_ids()->Add(request_stream_ids[i]); + } } if (cntl->has_request_user_fields() && !cntl->request_user_fields()->empty()) { diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp index 27f87c6f49..ef1fc2cf23 100644 --- a/src/brpc/stream.cpp +++ b/src/brpc/stream.cpp @@ -68,7 +68,7 @@ Stream::~Stream() { int Stream::Create(const StreamOptions &options, const StreamSettings *remote_settings, - StreamId *id) { + StreamId *id, bool parse_rpc_response) { Stream* s = new Stream(); s->_host_socket = NULL; s->_fake_socket_weak_ref = NULL; @@ -88,10 +88,8 @@ int Stream::Create(const StreamOptions &options, if (remote_settings != NULL) { s->_remote_settings.MergeFrom(*remote_settings); - s->_parse_rpc_response = false; - } else { - s->_parse_rpc_response = true; } + s->_parse_rpc_response = parse_rpc_response; if (bthread_id_list_init(&s->_writable_wait_list, 8, 8/*FIXME*/)) { delete s; return -1; @@ -668,6 +666,10 @@ void Stream::Close(int error_code, const char* reason_fmt, ...) { return TriggerOnConnectIfNeed(); } +int Stream::shareHostSocket(Stream& other_stream) { + return other_stream.SetHostSocket(_host_socket); +} + int Stream::SetFailed(StreamId id, int error_code, const char* reason_fmt, ...) { SocketUniquePtr ptr; if (Socket::AddressFailedAsWell(id, &ptr) == -1) { @@ -682,6 +684,16 @@ int Stream::SetFailed(StreamId id, int error_code, const char* reason_fmt, ...) return 0; } +int Stream::SetFailed(const StreamIds& ids, int error_code, const char* reason_fmt, ...) { + va_list ap; + va_start(ap, reason_fmt); + for(size_t i = 0; i< ids.size(); ++i) { + Stream::SetFailed(ids[i], error_code, reason_fmt, ap); + } + va_end(ap); + return 0; +} + void Stream::HandleRpcResponse(butil::IOBuf* response_buffer) { CHECK(!_remote_settings.IsInitialized()); CHECK(_host_socket != NULL); @@ -759,37 +771,73 @@ int StreamClose(StreamId stream_id) { int StreamCreate(StreamId *request_stream, Controller &cntl, const StreamOptions* options) { - if (cntl._request_stream != INVALID_STREAM_ID) { + if (request_stream == NULL) { + LOG(ERROR) << "request_stream is NULL"; + return -1; + } + StreamIds request_streams; + StreamCreate(request_streams, 1, cntl, options); + *request_stream = request_streams[0]; + return 0; +} + +int StreamCreate(StreamIds& request_streams, int request_stream_size, Controller & cntl, + const StreamOptions* options) { + if (!cntl._request_streams.empty()) { LOG(ERROR) << "Can't create request stream more than once"; return -1; } - if (request_stream == NULL) { - LOG(ERROR) << "request_stream is NULL"; + if (!request_streams.empty()) { + LOG(ERROR) << "request_streams should be empty"; return -1; } - StreamId stream_id; StreamOptions opt; if (options != NULL) { opt = *options; } - if (Stream::Create(opt, NULL, &stream_id) != 0) { - LOG(ERROR) << "Fail to create stream"; - return -1; + for (auto i = 0; i < request_stream_size; ++i) { + StreamId stream_id; + if (Stream::Create(opt, NULL, &stream_id) != 0) { + // Close already created streams + Stream::SetFailed(request_streams, 0 , "Fail to create stream at %d index", i); + LOG(ERROR) << "Fail to create stream"; + return -1; + } + cntl._request_streams.push_back(stream_id); + request_streams.push_back(stream_id); } - cntl._request_stream = stream_id; - *request_stream = stream_id; return 0; } int StreamAccept(StreamId* response_stream, Controller &cntl, const StreamOptions* options) { + if (response_stream == NULL) { + LOG(ERROR) << "response_stream is NULL"; + return -1; + } + StreamIds response_streams; + int res = StreamAccept(response_streams, cntl, options); + if(res != 0) { + return res; + } + if(response_streams.size() != 1) { + Stream::SetFailed(response_streams, 0, "Logic error"); + LOG(ERROR) << "accept more than one response_stream"; + return -1; + } + *response_stream = response_streams[0]; + return 0; +} - if (cntl._response_stream != INVALID_STREAM_ID) { +int StreamAccept(StreamIds& response_streams, Controller& cntl, + const StreamOptions* options) { + if (!cntl._response_streams.empty()) { LOG(ERROR) << "Can't create response stream more than once"; return -1; } - if (response_stream == NULL) { - LOG(ERROR) << "response_stream is NULL"; + + if (!response_streams.empty()) { + LOG(ERROR) << "response_streams should be empty"; return -1; } if (!cntl.has_remote_stream()) { @@ -800,13 +848,17 @@ int StreamAccept(StreamId* response_stream, Controller &cntl, if (options != NULL) { opt = *options; } - StreamId stream_id; - if (Stream::Create(opt, cntl._remote_stream_settings, &stream_id) != 0) { - LOG(ERROR) << "Fail to create stream"; - return -1; + for (auto i = 0; i <= cntl._remote_stream_settings->extra_stream_ids_size(); ++i) { + StreamId stream_id; + if (Stream::Create(opt, cntl._remote_stream_settings, &stream_id, false) != 0) { + Stream::SetFailed(response_streams, 0, "Fail to accept stream at %d index", i); + LOG(ERROR) << "Fail to accept stream"; + return -1; + } + cntl._response_streams.push_back(stream_id); + response_streams.push_back(stream_id); } - cntl._response_stream = stream_id; - *response_stream = stream_id; + return 0; } diff --git a/src/brpc/stream.h b/src/brpc/stream.h index f222ba0940..36c0def70f 100644 --- a/src/brpc/stream.h +++ b/src/brpc/stream.h @@ -28,6 +28,7 @@ namespace brpc { class Controller; typedef SocketId StreamId; +using StreamIds = std::vector; const StreamId INVALID_STREAM_ID = (StreamId)-1L; namespace detail { @@ -105,6 +106,14 @@ struct StreamWriteOptions { int StreamCreate(StreamId* request_stream, Controller &cntl, const StreamOptions* options); +// [Called at the client side for creating multiple streams] +// Create streams at client-side along with the |cntl|, which will be connected +// when receiving the response with streams from server-side. If |options| is +// NULL, the stream will be created with default options +// Return 0 on success, -1 otherwise +int StreamCreate(StreamIds& request_streams, int request_stream_size, Controller& cntl, + const StreamOptions* options); + // [Called at the server side] // Accept the stream. If client didn't create a stream with the request // (cntl.has_remote_stream() returns false), this method would fail. @@ -112,6 +121,12 @@ int StreamCreate(StreamId* request_stream, Controller &cntl, int StreamAccept(StreamId* response_stream, Controller &cntl, const StreamOptions* options); +// [Called at the server side for accepting multiple streams] +// Accept the streams. If client didn't create streams with the request +// (cntl.has_remote_stream() returns false), this method would fail. +// Return 0 on success, -1 otherwise. +int StreamAccept(StreamIds& response_stream, Controller& cntl, + const StreamOptions* options); // Write |message| into |stream_id|. The remote-side handler will received the // message by the written order // Returns 0 on success, errno otherwise diff --git a/src/brpc/stream_impl.h b/src/brpc/stream_impl.h index db92dd63d6..c57126a2eb 100644 --- a/src/brpc/stream_impl.h +++ b/src/brpc/stream_impl.h @@ -46,7 +46,7 @@ class BAIDU_CACHELINE_ALIGNMENT Stream : public SocketConnection { const StreamWriteOptions* options = NULL); static int Create(const StreamOptions& options, const StreamSettings *remote_settings, - StreamId *id); + StreamId *id, bool parse_rpc_response = true); StreamId id() { return _id; } int OnReceived(const StreamFrameMeta& fm, butil::IOBuf *buf, Socket* sock); @@ -63,8 +63,11 @@ class BAIDU_CACHELINE_ALIGNMENT Stream : public SocketConnection { void FillSettings(StreamSettings *settings); static int SetFailed(StreamId id, int error_code, const char* reason_fmt, ...) __attribute__ ((__format__ (__printf__, 3, 4))); + static int SetFailed(const StreamIds& ids, int error_code, const char* reason_fmt, ...) + __attribute__ ((__format__ (__printf__, 3, 4))); void Close(int error_code, const char* reason_fmt, ...) __attribute__ ((__format__ (__printf__, 3, 4))); + int shareHostSocket(Stream& other_stream); private: friend void StreamWait(StreamId stream_id, const timespec *due_time, diff --git a/src/brpc/streaming_rpc_meta.proto b/src/brpc/streaming_rpc_meta.proto index 05d83217af..5474583d2f 100644 --- a/src/brpc/streaming_rpc_meta.proto +++ b/src/brpc/streaming_rpc_meta.proto @@ -25,6 +25,7 @@ message StreamSettings { required int64 stream_id = 1; optional bool need_feedback = 2 [default = false]; optional bool writable = 3 [default = false]; + repeated int64 extra_stream_ids = 4; } enum FrameType { From 650d2f9c8e43456585881f06118bc1bf3a20e1dd Mon Sep 17 00:00:00 2001 From: jenrryyou Date: Tue, 3 Sep 2024 22:16:00 +0800 Subject: [PATCH 2/4] fix protobuf 22.5 compilation error related to thread_local in MacOS --- .github/workflows/ci-macos.yml | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/.github/workflows/ci-macos.yml b/.github/workflows/ci-macos.yml index 685d4e6b00..50ab2e0294 100644 --- a/.github/workflows/ci-macos.yml +++ b/.github/workflows/ci-macos.yml @@ -57,7 +57,7 @@ jobs: cd build make -j ${{env.proc_num}} - compile-with-make-protobuf22: + compile-with-make-protobuf23: runs-on: macos-latest # https://github.com/actions/runner-images steps: @@ -67,9 +67,9 @@ jobs: run: | brew install openssl gnu-getopt coreutils gflags leveldb # abseil 20230125.3 - curl -o abseil.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/1e04597501b3096952608efcb13301119a830b35/Formula/abseil.rb - # protobuf 22.5 - curl -o protobuf.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/1e04597501b3096952608efcb13301119a830b35/Formula/protobuf.rb + curl -o abseil.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/b85b8dbf23ad509f163677a88ac72268f31e9c4a/Formula/abseil.rb + # protobuf 23.3 + curl -o protobuf.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/b85b8dbf23ad509f163677a88ac72268f31e9c4a/Formula/protobuf.rb HOMEBREW_NO_INSTALLED_DEPENDENTS_CHECK=1 brew install --formula --ignore-dependencies ./abseil.rb ./protobuf.rb - name: config_brpc @@ -82,7 +82,7 @@ jobs: run: | make -j ${{env.proc_num}} - compile-with-cmake-protobuf22: + compile-with-cmake-protobuf23: runs-on: macos-latest steps: @@ -92,9 +92,9 @@ jobs: run: | brew install openssl gflags leveldb # abseil 20230125.3 - curl -o abseil.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/1e04597501b3096952608efcb13301119a830b35/Formula/abseil.rb - # protobuf 22.5 - curl -o protobuf.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/1e04597501b3096952608efcb13301119a830b35/Formula/protobuf.rb + curl -o abseil.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/b85b8dbf23ad509f163677a88ac72268f31e9c4a/Formula/abseil.rb + # protobuf 23.3 + curl -o protobuf.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/b85b8dbf23ad509f163677a88ac72268f31e9c4a/Formula/protobuf.rb HOMEBREW_NO_INSTALLED_DEPENDENTS_CHECK=1 brew install --formula --ignore-dependencies ./abseil.rb ./protobuf.rb - name: cmake From 69f5380f9c3d5928de26d7354c2c67ccae91453f Mon Sep 17 00:00:00 2001 From: jenrryyou Date: Wed, 4 Sep 2024 20:53:40 +0800 Subject: [PATCH 3/4] refine style --- src/brpc/controller.cpp | 2 +- src/brpc/stream.cpp | 2 +- src/brpc/stream_impl.h | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index 7d242ef91d..271ccfa485 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -1442,7 +1442,7 @@ void Controller::HandleStreamConnection(Socket *host_socket) { for (size_t i = 1; i < stream_num; ++i) { Stream* extra_stream = (Stream *) ptrs[i]->conn(); _remote_stream_settings->set_stream_id(extra_stream_ids[i - 1]); - s->shareHostSocket(*extra_stream); + s->ShareHostSocket(*extra_stream); extra_stream->SetConnected(_remote_stream_settings); } } diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp index ef1fc2cf23..62e92a9d6c 100644 --- a/src/brpc/stream.cpp +++ b/src/brpc/stream.cpp @@ -666,7 +666,7 @@ void Stream::Close(int error_code, const char* reason_fmt, ...) { return TriggerOnConnectIfNeed(); } -int Stream::shareHostSocket(Stream& other_stream) { +int Stream::ShareHostSocket(Stream& other_stream) { return other_stream.SetHostSocket(_host_socket); } diff --git a/src/brpc/stream_impl.h b/src/brpc/stream_impl.h index c57126a2eb..66e0d7191b 100644 --- a/src/brpc/stream_impl.h +++ b/src/brpc/stream_impl.h @@ -67,7 +67,7 @@ class BAIDU_CACHELINE_ALIGNMENT Stream : public SocketConnection { __attribute__ ((__format__ (__printf__, 3, 4))); void Close(int error_code, const char* reason_fmt, ...) __attribute__ ((__format__ (__printf__, 3, 4))); - int shareHostSocket(Stream& other_stream); + int ShareHostSocket(Stream& other_stream); private: friend void StreamWait(StreamId stream_id, const timespec *due_time, From 58c392ab3b2726fc70d505ed0c4df68fe1018db5 Mon Sep 17 00:00:00 2001 From: jenrryyou Date: Mon, 30 Sep 2024 21:48:14 +0800 Subject: [PATCH 4/4] modify code based on the code review feedback and add more tests modify code based on the code review feedback and add more tests --- example/streaming_batch_echo_c++/client.cpp | 29 +++++++++++++- example/streaming_batch_echo_c++/server.cpp | 15 ++++--- src/brpc/policy/baidu_rpc_protocol.cpp | 30 +++++++++----- src/brpc/stream.cpp | 43 +++++++++++++++------ 4 files changed, 91 insertions(+), 26 deletions(-) diff --git a/example/streaming_batch_echo_c++/client.cpp b/example/streaming_batch_echo_c++/client.cpp index 9088ac5ab7..ea58a191b8 100644 --- a/example/streaming_batch_echo_c++/client.cpp +++ b/example/streaming_batch_echo_c++/client.cpp @@ -29,6 +29,30 @@ DEFINE_string(server, "0.0.0.0:8001", "IP Address of server"); DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds"); DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); +class StreamClientReceiver : public brpc::StreamInputHandler { +public: + virtual int on_received_messages(brpc::StreamId id, + butil::IOBuf *const messages[], + size_t size) { + std::ostringstream os; + for (size_t i = 0; i < size; ++i) { + os << "msg[" << i << "]=" << *messages[i]; + } + LOG(INFO) << "Received from Stream=" << id << ": " << os.str(); + return 0; + } + virtual void on_idle_timeout(brpc::StreamId id) { + LOG(INFO) << "Stream=" << id << " has no data transmission for a while"; + } + virtual void on_closed(brpc::StreamId id) { + LOG(INFO) << "Stream=" << id << " is closed"; + } + + virtual void on_finished(brpc::StreamId id, int32_t finish_code) { + LOG(INFO) << "Stream=" << id << " is finished, code " << finish_code; + } +}; + int main(int argc, char* argv[]) { // Parse gflags. We recommend you to use gflags as well. GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); @@ -51,9 +75,12 @@ int main(int argc, char* argv[]) { // Normally, you should not call a Channel directly, but instead construct // a stub Service wrapping it. stub can be shared by all threads as well. example::EchoService_Stub stub(&channel); + StreamClientReceiver receiver; brpc::Controller cntl; brpc::StreamIds streams; - if (brpc::StreamCreate(streams, 3, cntl, NULL) != 0) { + brpc::StreamOptions stream_options; + stream_options.handler = &receiver; + if (brpc::StreamCreate(streams, 3, cntl, &stream_options) != 0) { LOG(ERROR) << "Fail to create stream"; return -1; } diff --git a/example/streaming_batch_echo_c++/server.cpp b/example/streaming_batch_echo_c++/server.cpp index 4f879c4f0a..3690344a8f 100644 --- a/example/streaming_batch_echo_c++/server.cpp +++ b/example/streaming_batch_echo_c++/server.cpp @@ -37,7 +37,8 @@ class StreamReceiver : public brpc::StreamInputHandler { for (size_t i = 0; i < size; ++i) { os << "msg[" << i << "]=" << *messages[i]; } - LOG(INFO) << "Received from Stream=" << id << ": " << os.str(); + auto res = brpc::StreamWrite(id, *messages[0]); + LOG(INFO) << "Received from Stream=" << id << ": " << os.str() << " and write back result: " << res; return 0; } virtual void on_idle_timeout(brpc::StreamId id) { @@ -56,9 +57,7 @@ class StreamReceiver : public brpc::StreamInputHandler { class StreamingBatchEchoService : public example::EchoService { public: virtual ~StreamingBatchEchoService() { - brpc::StreamClose(_sds[0]); - brpc::StreamClose(_sds[1]); - brpc::StreamClose(_sds[2]); + closeStreams(); }; virtual void Echo(google::protobuf::RpcController* controller, const example::EchoRequest* /*request*/, @@ -67,7 +66,7 @@ class StreamingBatchEchoService : public example::EchoService { // This object helps you to call done->Run() in RAII style. If you need // to process the request asynchronously, pass done_guard.release(). brpc::ClosureGuard done_guard(done); - + closeStreams(); brpc::Controller* cntl = static_cast(controller); brpc::StreamOptions stream_options; @@ -80,6 +79,12 @@ class StreamingBatchEchoService : public example::EchoService { } private: + void closeStreams() { + for(auto i = 0; i < _sds.size(); ++i) { + brpc::StreamClose(_sds[i]); + } + _sds.clear(); + } StreamReceiver _receiver; brpc::StreamIds _sds; }; diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index a7920fa729..cc2dcbd27d 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -263,9 +263,10 @@ void SendRpcResponse(int64_t correlation_id, if (attached_size > 0) { meta.set_attachment_size(attached_size); } + StreamId response_stream_id = INVALID_STREAM_ID; SocketUniquePtr stream_ptr; if (!response_stream_ids.empty()) { - StreamId response_stream_id = response_stream_ids[0]; + response_stream_id = response_stream_ids[0]; if (Socket::Address(response_stream_id, &stream_ptr) == 0) { Stream* s = (Stream *) stream_ptr->conn(); StreamSettings *stream_settings = meta.mutable_stream_settings(); @@ -309,24 +310,35 @@ void SendRpcResponse(int64_t correlation_id, // Response_stream can be INVALID_STREAM_ID when error occurs. if (SendStreamData(sock, &res_buf, accessor.remote_stream_settings()->stream_id(), - accessor.response_streams()[0]) != 0) { + response_stream_id) != 0) { const int errcode = errno; std::string error_text = butil::string_printf(64, "Fail to write into %s", sock->description().c_str()); PLOG_IF(WARNING, errcode != EPIPE) << error_text; cntl->SetFailed(errcode, "%s", error_text.c_str()); - if(stream_ptr) { - ((Stream*)stream_ptr->conn())->Close(errcode, "%s", - error_text.c_str()); - } + Stream::SetFailed(response_stream_ids, errcode, "%s", + error_text.c_str()); return; } + // Now it's ok the mark these server-side streams as connected as all the + // written user data would follower the RPC response. + // Reuse stream_ptr to avoid address first stream id again if(stream_ptr) { - // Now it's ok the mark this server-side stream as connected as all the - // written user data would follower the RPC response. ((Stream*)stream_ptr->conn())->SetConnected(); } + for (size_t i = 1; i < response_stream_ids.size(); ++i) { + StreamId extra_stream_id = response_stream_ids[i]; + SocketUniquePtr extra_stream_ptr; + if (Socket::Address(extra_stream_id, &extra_stream_ptr) == 0) { + Stream* extra_stream = (Stream *) extra_stream_ptr->conn(); + extra_stream->SetHostSocket(sock); + extra_stream->SetConnected(); + } else { + LOG(WARNING) << "Stream=" << extra_stream_id + << " was closed before sending response"; + } + } } else{ // Have the risk of unlimited pending responses, in which case, tell // users to set max_concurrency. @@ -722,7 +734,7 @@ void ProcessRpcResponse(InputMessageBase* msg_base) { LOG_IF(ERROR, rc != EINVAL && rc != EPERM) << "Fail to lock correlation_id=" << cid << ": " << berror(rc); if (remote_stream_id != INVALID_STREAM_ID) { - SendStreamRst(msg->socket(), meta.stream_settings().stream_id()); + SendStreamRst(msg->socket(), remote_stream_id); const auto & extra_stream_ids = meta.stream_settings().extra_stream_ids(); for (int i = 0; i < extra_stream_ids.size(); ++i) { policy::SendStreamRst(msg->socket(), extra_stream_ids[i]); diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp index 62e92a9d6c..73d6405190 100644 --- a/src/brpc/stream.cpp +++ b/src/brpc/stream.cpp @@ -797,7 +797,8 @@ int StreamCreate(StreamIds& request_streams, int request_stream_size, Controller } for (auto i = 0; i < request_stream_size; ++i) { StreamId stream_id; - if (Stream::Create(opt, NULL, &stream_id) != 0) { + bool parse_rpc_response = (i == 0); // Only the first stream need parse rpc + if (Stream::Create(opt, NULL, &stream_id, parse_rpc_response) != 0) { // Close already created streams Stream::SetFailed(request_streams, 0 , "Fail to create stream at %d index", i); LOG(ERROR) << "Fail to create stream"; @@ -821,8 +822,10 @@ int StreamAccept(StreamId* response_stream, Controller &cntl, return res; } if(response_streams.size() != 1) { - Stream::SetFailed(response_streams, 0, "Logic error"); - LOG(ERROR) << "accept more than one response_stream"; + Stream::SetFailed(response_streams, EINVAL, + "misusing StreamAccept for single stream to accept multiple streams"); + cntl._response_streams.clear(); + LOG(ERROR) << "misusing StreamAccept for single stream to accept multiple streams"; return -1; } *response_stream = response_streams[0]; @@ -848,15 +851,33 @@ int StreamAccept(StreamIds& response_streams, Controller& cntl, if (options != NULL) { opt = *options; } - for (auto i = 0; i <= cntl._remote_stream_settings->extra_stream_ids_size(); ++i) { - StreamId stream_id; - if (Stream::Create(opt, cntl._remote_stream_settings, &stream_id, false) != 0) { - Stream::SetFailed(response_streams, 0, "Fail to accept stream at %d index", i); - LOG(ERROR) << "Fail to accept stream"; - return -1; + StreamId stream_id; + if (Stream::Create(opt, cntl._remote_stream_settings, &stream_id, false) != 0) { + Stream::SetFailed(response_streams, 0, "Fail to accept stream"); + LOG(ERROR) << "Fail to accept stream"; + return -1; + } + + cntl._response_streams.push_back(stream_id); + response_streams.push_back(stream_id); + if(!cntl._remote_stream_settings->extra_stream_ids().empty()) { + StreamSettings stream_remote_settings; + stream_remote_settings.MergeFrom(*cntl._remote_stream_settings); + //Only the first stream needs extra_stream_ids settings + stream_remote_settings.clear_extra_stream_ids(); + for (auto i = 0; i < cntl._remote_stream_settings->extra_stream_ids_size(); ++i) { + stream_remote_settings.set_stream_id(cntl._remote_stream_settings->extra_stream_ids()[i]); + StreamId extra_stream_id; + if (Stream::Create(opt, &stream_remote_settings, &extra_stream_id, false) != 0) { + Stream::SetFailed(response_streams, 0, "Fail to accept stream at %d index", i); + cntl._response_streams.clear(); + response_streams.clear(); + LOG(ERROR) << "Fail to accept stream"; + return -1; + } + cntl._response_streams.push_back(extra_stream_id); + response_streams.push_back(extra_stream_id); } - cntl._response_streams.push_back(stream_id); - response_streams.push_back(stream_id); } return 0;