Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions docs/cn/streaming_rpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ Streaming RPC保证:
- 全双工。
- 支持流控。
- 提供超时提醒

目前的实现还没有自动切割过大的消息,同一个tcp连接上的多个Stream之间可能有[Head-of-line blocking](https://en.wikipedia.org/wiki/Head-of-line_blocking)问题,请尽量避免过大的单个消息,实现自动切割后我们会告知并更新文档。
- 支持自动切割过大的消息,避免[Head-of-line blocking](https://en.wikipedia.org/wiki/Head-of-line_blocking)问题

例子见[example/streaming_echo_c++](https://github.com/apache/brpc/tree/master/example/streaming_echo_c++/)。

Expand Down
4 changes: 2 additions & 2 deletions docs/en/streaming_rpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ Streaming RPC ensures/provides:
- Full duplex
- Flow control
- Notification on timeout

We do not support segment large messages automatically so that multiple Streams on a single TCP connection may lead to [Head-of-line blocking](https://en.wikipedia.org/wiki/Head-of-line_blocking) problem. Please avoid putting huge data into single message until we provide automatic segmentation.
- We support segment large messages automatically to avoid [Head-of-line blocking](https://en.wikipedia.org/wiki/Head-of-line_bloc
king) problem.

For examples please refer to [example/streaming_echo_c++](https://github.com/apache/brpc/tree/master/example/streaming_echo_c++/).

Expand Down
64 changes: 54 additions & 10 deletions src/brpc/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ namespace brpc {

DECLARE_bool(usercode_in_pthread);
DECLARE_int64(socket_max_streams_unconsumed_bytes);
DEFINE_uint64(stream_write_max_segment_size, 512 * 1024 * 1024,
Comment thread
chenBright marked this conversation as resolved.
"Stream message exceeding this size will be automatically split into smaller segments");
BRPC_VALIDATE_GFLAG(stream_write_max_segment_size, PositiveInteger);

const static butil::IOBuf *TIMEOUT_TASK = (butil::IOBuf*)-1L;

Expand All @@ -60,6 +63,11 @@ Stream::Stream()
}

Stream::~Stream() {
// Clear pending buffer
if (_pending_buf != NULL) {
delete _pending_buf;
_pending_buf = NULL;
}
CHECK(_host_socket == NULL);
bthread_mutex_destroy(&_connect_mutex);
bthread_mutex_destroy(&_congestion_control_mutex);
Expand Down Expand Up @@ -154,18 +162,54 @@ ssize_t Stream::CutMessageIntoFileDescriptor(int /*fd*/,
}
butil::IOBuf out;
ssize_t len = 0;
ssize_t unwritten_data_size = 0;
for (size_t i = 0; i < size; ++i) {
StreamFrameMeta fm;
fm.set_stream_id(_remote_settings.stream_id());
fm.set_source_stream_id(id());
fm.set_frame_type(FRAME_TYPE_DATA);
// TODO: split large data
fm.set_has_continuation(false);
policy::PackStreamMessage(&out, fm, data_list[i]);
len += data_list[i]->length();
data_list[i]->clear();
butil::IOBuf *data = data_list[i];
size_t length = data->length();
if (length > FLAGS_stream_write_max_segment_size) {
if (unwritten_data_size) {
WriteToHostSocket(&out);
unwritten_data_size = 0;
out.clear();
}
// segmenting large data into multiple parts
butil::IOBuf segment_buf;
bool has_continuation = true;
while (has_continuation) {
data->cutn(&segment_buf, FLAGS_stream_write_max_segment_size);
StreamFrameMeta fm;
fm.set_stream_id(_remote_settings.stream_id());
fm.set_source_stream_id(id());
fm.set_frame_type(FRAME_TYPE_DATA);
has_continuation = !data->empty();
fm.set_has_continuation(has_continuation);
policy::PackStreamMessage(&out, fm, &segment_buf);
len += segment_buf.length();
segment_buf.clear();
WriteToHostSocket(&out);
out.clear();
}
} else {
if (unwritten_data_size + length > FLAGS_stream_write_max_segment_size) {
WriteToHostSocket(&out);
unwritten_data_size = 0;
out.clear();
}
unwritten_data_size += length;
StreamFrameMeta fm;
fm.set_stream_id(_remote_settings.stream_id());
fm.set_source_stream_id(id());
fm.set_frame_type(FRAME_TYPE_DATA);
fm.set_has_continuation(false);
policy::PackStreamMessage(&out, fm, data_list[i]);
len += length;
data_list[i]->clear();
}
}

if (!out.empty()) {
WriteToHostSocket(&out);
}
WriteToHostSocket(&out);
return len;
}

Expand Down
55 changes: 55 additions & 0 deletions test/brpc_streaming_rpc_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "brpc/controller.h"
#include "brpc/channel.h"
#include "brpc/socket.h"
#include "brpc/stream_impl.h"
#include "brpc/policy/streaming_rpc_protocol.h"
#include "echo.pb.h"
Expand Down Expand Up @@ -577,3 +578,57 @@ TEST_F(StreamingRpcTest, server_send_data_before_run_done) {
ASSERT_FALSE(handler.failed());
ASSERT_EQ(0, handler.idle_times());
}

TEST_F(StreamingRpcTest, segment_stream_data_automatically) {
GFLAGS_NAMESPACE::SetCommandLineOption("stream_write_max_segment_size", "1");
OrderedInputHandler handler;
brpc::StreamOptions opt;
opt.handler = &handler;
opt.messages_in_batch = 100;
brpc::Server server;
MyServiceWithStream service(opt);
ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
ASSERT_EQ(0, server.Start(9007, NULL));
brpc::Channel channel;
ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL));
brpc::Controller cntl;
brpc::StreamId request_stream;
brpc::StreamOptions request_stream_options;
ASSERT_EQ(0, StreamCreate(&request_stream, cntl, &request_stream_options));
brpc::ScopedStream stream_guard(request_stream);
test::EchoService_Stub stub(&channel);
stub.Echo(&cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText() << " request_stream=" << request_stream;
const int N = 1000;
for (int i = 0; i < N; ++i) {
int network = htonl(i);
butil::IOBuf out;
out.append(&network, sizeof(network));
ASSERT_EQ(0, brpc::StreamWrite(request_stream, out)) << "i=" << i;
}

brpc::SocketUniquePtr host_socket_ptr;
{
brpc::SocketUniquePtr ptr;
ASSERT_EQ(0, brpc::Socket::Address(request_stream, &ptr));
brpc::Stream *s = (brpc::Stream *)ptr->conn();
ASSERT_TRUE(s->_host_socket != NULL);
s->_host_socket->ReAddress(&host_socket_ptr);
}

ASSERT_EQ(0, brpc::StreamClose(request_stream));
server.Stop(0);
server.Join();
while (!handler.stopped()) {
usleep(100);
}
const int64_t now_ms = butil::cpuwide_time_ms();
host_socket_ptr->UpdateStatsEverySecond(now_ms);
brpc::SocketStat stat;
host_socket_ptr->GetStat(&stat);
ASSERT_LT(N * sizeof(N), stat.out_num_messages_m);
ASSERT_FALSE(handler.failed());
ASSERT_EQ(0, handler.idle_times());
ASSERT_EQ(N, handler._expected_next_value);
GFLAGS_NAMESPACE::SetCommandLineOption("stream_write_max_segment_size", "536870912");
}