From 41fb784db40ba284c79362a2e34c0b908bf266db Mon Sep 17 00:00:00 2001 From: jenrryyou Date: Fri, 7 Feb 2025 17:33:35 +0800 Subject: [PATCH] Support segment large stream messages automatically --- docs/cn/streaming_rpc.md | 3 +- docs/en/streaming_rpc.md | 4 +- src/brpc/stream.cpp | 64 +++++++++++++++++++++++----- test/brpc_streaming_rpc_unittest.cpp | 55 ++++++++++++++++++++++++ 4 files changed, 112 insertions(+), 14 deletions(-) diff --git a/docs/cn/streaming_rpc.md b/docs/cn/streaming_rpc.md index c9f9ab3673..6bdb2f2913 100644 --- a/docs/cn/streaming_rpc.md +++ b/docs/cn/streaming_rpc.md @@ -16,8 +16,7 @@ Streaming RPC保证: - 全双工。 - 支持流控。 - 提供超时提醒 - -目前的实现还没有自动切割过大的消息,同一个tcp连接上的多个Stream之间可能有[Head-of-line blocking](https://en.wikipedia.org/wiki/Head-of-line_blocking)问题,请尽量避免过大的单个消息,实现自动切割后我们会告知并更新文档。 +- 支持自动切割过大的消息,避免[Head-of-line blocking](https://en.wikipedia.org/wiki/Head-of-line_blocking)问题 例子见[example/streaming_echo_c++](https://github.com/apache/brpc/tree/master/example/streaming_echo_c++/)。 diff --git a/docs/en/streaming_rpc.md b/docs/en/streaming_rpc.md index 8e06714887..7a41c24dc8 100644 --- a/docs/en/streaming_rpc.md +++ b/docs/en/streaming_rpc.md @@ -16,8 +16,8 @@ Streaming RPC ensures/provides: - Full duplex - Flow control - Notification on timeout - -We do not support segment large messages automatically so that multiple Streams on a single TCP connection may lead to [Head-of-line blocking](https://en.wikipedia.org/wiki/Head-of-line_blocking) problem. Please avoid putting huge data into single message until we provide automatic segmentation. +- Large messages are segmented automatically to avoid the +[Head-of-line blocking](https://en.wikipedia.org/wiki/Head-of-line_blocking) problem. For examples please refer to [example/streaming_echo_c++](https://github.com/apache/brpc/tree/master/example/streaming_echo_c++/). 
diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp index 73d6405190..68397b57ec 100644 --- a/src/brpc/stream.cpp +++ b/src/brpc/stream.cpp @@ -36,6 +36,9 @@ namespace brpc { DECLARE_bool(usercode_in_pthread); DECLARE_int64(socket_max_streams_unconsumed_bytes); +DEFINE_uint64(stream_write_max_segment_size, 512 * 1024 * 1024, + "Stream message exceeding this size will be automatically split into smaller segments"); +BRPC_VALIDATE_GFLAG(stream_write_max_segment_size, PositiveInteger); const static butil::IOBuf *TIMEOUT_TASK = (butil::IOBuf*)-1L; @@ -60,6 +63,11 @@ Stream::Stream() } Stream::~Stream() { + // Clear pending buffer + if (_pending_buf != NULL) { + delete _pending_buf; + _pending_buf = NULL; + } CHECK(_host_socket == NULL); bthread_mutex_destroy(&_connect_mutex); bthread_mutex_destroy(&_congestion_control_mutex); @@ -154,18 +162,54 @@ ssize_t Stream::CutMessageIntoFileDescriptor(int /*fd*/, } butil::IOBuf out; ssize_t len = 0; + ssize_t unwritten_data_size = 0; for (size_t i = 0; i < size; ++i) { - StreamFrameMeta fm; - fm.set_stream_id(_remote_settings.stream_id()); - fm.set_source_stream_id(id()); - fm.set_frame_type(FRAME_TYPE_DATA); - // TODO: split large data - fm.set_has_continuation(false); - policy::PackStreamMessage(&out, fm, data_list[i]); - len += data_list[i]->length(); - data_list[i]->clear(); + butil::IOBuf *data = data_list[i]; + size_t length = data->length(); + if (length > FLAGS_stream_write_max_segment_size) { + if (unwritten_data_size) { + WriteToHostSocket(&out); + unwritten_data_size = 0; + out.clear(); + } + // segmenting large data into multiple parts + butil::IOBuf segment_buf; + bool has_continuation = true; + while (has_continuation) { + data->cutn(&segment_buf, FLAGS_stream_write_max_segment_size); + StreamFrameMeta fm; + fm.set_stream_id(_remote_settings.stream_id()); + fm.set_source_stream_id(id()); + fm.set_frame_type(FRAME_TYPE_DATA); + has_continuation = !data->empty(); + fm.set_has_continuation(has_continuation); + 
policy::PackStreamMessage(&out, fm, &segment_buf); + len += segment_buf.length(); + segment_buf.clear(); + WriteToHostSocket(&out); + out.clear(); + } + } else { + if (unwritten_data_size + length > FLAGS_stream_write_max_segment_size) { + WriteToHostSocket(&out); + unwritten_data_size = 0; + out.clear(); + } + unwritten_data_size += length; + StreamFrameMeta fm; + fm.set_stream_id(_remote_settings.stream_id()); + fm.set_source_stream_id(id()); + fm.set_frame_type(FRAME_TYPE_DATA); + fm.set_has_continuation(false); + policy::PackStreamMessage(&out, fm, data_list[i]); + len += length; + data_list[i]->clear(); + } + } + + if (!out.empty()) { + WriteToHostSocket(&out); } - WriteToHostSocket(&out); return len; } diff --git a/test/brpc_streaming_rpc_unittest.cpp b/test/brpc_streaming_rpc_unittest.cpp index df6a37d888..b0dd4a395e 100644 --- a/test/brpc_streaming_rpc_unittest.cpp +++ b/test/brpc_streaming_rpc_unittest.cpp @@ -24,6 +24,7 @@ #include "brpc/controller.h" #include "brpc/channel.h" +#include "brpc/socket.h" #include "brpc/stream_impl.h" #include "brpc/policy/streaming_rpc_protocol.h" #include "echo.pb.h" @@ -577,3 +578,57 @@ TEST_F(StreamingRpcTest, server_send_data_before_run_done) { ASSERT_FALSE(handler.failed()); ASSERT_EQ(0, handler.idle_times()); } + +TEST_F(StreamingRpcTest, segment_stream_data_automatically) { + GFLAGS_NAMESPACE::SetCommandLineOption("stream_write_max_segment_size", "1"); + OrderedInputHandler handler; + brpc::StreamOptions opt; + opt.handler = &handler; + opt.messages_in_batch = 100; + brpc::Server server; + MyServiceWithStream service(opt); + ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE)); + ASSERT_EQ(0, server.Start(9007, NULL)); + brpc::Channel channel; + ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL)); + brpc::Controller cntl; + brpc::StreamId request_stream; + brpc::StreamOptions request_stream_options; + ASSERT_EQ(0, StreamCreate(&request_stream, cntl, &request_stream_options)); + brpc::ScopedStream 
stream_guard(request_stream); + test::EchoService_Stub stub(&channel); + stub.Echo(&cntl, &request, &response, NULL); + ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText() << " request_stream=" << request_stream; + const int N = 1000; + for (int i = 0; i < N; ++i) { + int network = htonl(i); + butil::IOBuf out; + out.append(&network, sizeof(network)); + ASSERT_EQ(0, brpc::StreamWrite(request_stream, out)) << "i=" << i; + } + + brpc::SocketUniquePtr host_socket_ptr; + { + brpc::SocketUniquePtr ptr; + ASSERT_EQ(0, brpc::Socket::Address(request_stream, &ptr)); + brpc::Stream *s = (brpc::Stream *)ptr->conn(); + ASSERT_TRUE(s->_host_socket != NULL); + s->_host_socket->ReAddress(&host_socket_ptr); + } + + ASSERT_EQ(0, brpc::StreamClose(request_stream)); + server.Stop(0); + server.Join(); + while (!handler.stopped()) { + usleep(100); + } + const int64_t now_ms = butil::cpuwide_time_ms(); + host_socket_ptr->UpdateStatsEverySecond(now_ms); + brpc::SocketStat stat; + host_socket_ptr->GetStat(&stat); + ASSERT_LT(N * sizeof(N), stat.out_num_messages_m); + ASSERT_FALSE(handler.failed()); + ASSERT_EQ(0, handler.idle_times()); + ASSERT_EQ(N, handler._expected_next_value); + GFLAGS_NAMESPACE::SetCommandLineOption("stream_write_max_segment_size", "536870912"); +}