Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions docs/cn/streaming_rpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ Streaming RPC保证:
- 全双工。
- 支持流控。
- 提供超时提醒

目前的实现还没有自动切割过大的消息,同一个tcp连接上的多个Stream之间可能有[Head-of-line blocking](https://en.wikipedia.org/wiki/Head-of-line_blocking)问题,请尽量避免过大的单个消息,实现自动切割后我们会告知并更新文档。
- 自动切割过大的消息

例子见[example/streaming_echo_c++](https://github.com/brpc/brpc/tree/master/example/streaming_echo_c++/)。

Expand Down
3 changes: 1 addition & 2 deletions docs/en/streaming_rpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ Streaming RPC ensures/provides:
- Full duplex
- Flow control
- Notification on timeout

We do not support segment large messages automatically so that multiple Streams on a single TCP connection may lead to [Head-of-line blocking](https://en.wikipedia.org/wiki/Head-of-line_blocking) problem. Please avoid putting huge data into single message until we provide automatic segmentation.
- support segment large messages automatically

For examples please refer to [example/streaming_echo_c++](https://github.com/brpc/brpc/tree/master/example/streaming_echo_c++/).

Expand Down
29 changes: 21 additions & 8 deletions src/brpc/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
namespace brpc {

DECLARE_bool(usercode_in_pthread);

DEFINE_uint64(max_stream_data_frame_size, 64 * 1024 * 1024,
"Maximum size of a transmission unit that we used to cut the message.");
const static butil::IOBuf *TIMEOUT_TASK = (butil::IOBuf*)-1L;

Stream::Stream()
Expand Down Expand Up @@ -140,20 +141,32 @@ ssize_t Stream::CutMessageIntoFileDescriptor(int /*fd*/,
errno = EBADF;
return -1;
}
butil::IOBuf out;
ssize_t len = 0;
for (size_t i = 0; i < size; ++i) {
butil::IOBuf *data = data_list[i];
size_t length = data->length();
uint64_t trans_unit = FLAGS_max_stream_data_frame_size == 0
? length
: FLAGS_max_stream_data_frame_size;
int packet_num = ceil((double)length / (double)trans_unit);
Comment thread
Aaaaaaron marked this conversation as resolved.

butil::IOBuf split_data;
for (int j = 0; j < packet_num; j++) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

用while (!data->empty()) 来判断是不是更直接。也不用计算packet_num了。

butil::IOBuf out;
data->cutn(&split_data, trans_unit);
bool has_continuation = (j != packet_num - 1);
StreamFrameMeta fm;
fm.set_stream_id(_remote_settings.stream_id());
fm.set_source_stream_id(id());
fm.set_frame_type(FRAME_TYPE_DATA);
// TODO: split large data
fm.set_has_continuation(false);
policy::PackStreamMessage(&out, fm, data_list[i]);
len += data_list[i]->length();
data_list[i]->clear();
fm.set_has_continuation(has_continuation);
policy::PackStreamMessage(&out, fm, &split_data);
WriteToHostSocket(&out);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果has_continuation再Write
否则放到最后统一Write

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里是指如果消息没有达到 max_stream_data_frame_size, 就还是和之前一样放到最后统一 write 吗

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是的

len += (ssize_t)split_data.length();
split_data.clear();
}
data->clear();
}
WriteToHostSocket(&out);
return len;
}

Expand Down