Skip to content

split large data when using brpc streaming#1947

Closed
Aaaaaaron wants to merge 2 commits into
apache:masterfrom
Aaaaaaron:stream_split_mesg
Closed

split large data when using brpc streaming#1947
Aaaaaaron wants to merge 2 commits into
apache:masterfrom
Aaaaaaron:stream_split_mesg

Conversation

@Aaaaaaron

Copy link
Copy Markdown
Member

https://github.com/apache/incubator-brpc/blob/master/docs/cn/streaming_rpc.md

"目前的实现还没有自动切割过大的消息,同一个tcp连接上的多个Stream之间可能有Head-of-line blocking问题,请尽量避免过大的单个消息,实现自动切割后我们会告知并更新文档"

尝试解决这个 todo.

@Aaaaaaron Aaaaaaron force-pushed the stream_split_mesg branch 3 times, most recently from 04a6930 to 67b6b29 Compare October 9, 2022 03:00
Comment thread src/brpc/stream.cpp
Comment thread src/brpc/stream.cpp
int packet_num = ceil((double)length / (double)trans_unit);

butil::IOBuf split_data;
for (int j = 0; j < packet_num; j++) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

用while (!data->empty()) 来判断是不是更直接。也不用计算packet_num了。

Comment thread src/brpc/stream.cpp
data_list[i]->clear();
fm.set_has_continuation(has_continuation);
policy::PackStreamMessage(&out, fm, &split_data);
WriteToHostSocket(&out);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果has_continuation再Write
否则放到最后统一Write

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里是指如果消息没有达到 max_stream_data_frame_size, 就还是和之前一样放到最后统一 write 吗

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是的

Comment thread docs/en/streaming_rpc.md Outdated
Comment thread src/brpc/stream.cpp Outdated
@wwbmmm

wwbmmm commented Oct 11, 2022

Copy link
Copy Markdown
Contributor

另外,建议补充下对应单测

@Aaaaaaron

Copy link
Copy Markdown
Member Author

@wwbmmm 多谢 review, ut 一直想加的, 没有想到合适的测法, 请问你这边有啥建议吗

@wwbmmm

wwbmmm commented Oct 12, 2022

Copy link
Copy Markdown
Contributor

@wwbmmm 多谢 review, ut 一直想加的, 没有想到合适的测法, 请问你这边有啥建议吗

可以把max_stream_data_frame_size设置成一个比较小的值,然后发一个比较长的包,看是否有分多次发送(通过Socket的GetOrNewSharedPart()->out_num_messages可以获取write的次数)

@jenrryyou

jenrryyou commented Oct 24, 2022

Copy link
Copy Markdown
Contributor

想到一个问题,如果还没有收到带结束标记的数据帧(has_continuation=false),这个stream就关闭了(收到close或者主动关闭),_pending_buf已经保存的iobuf数据没有释放的相关代码,可能存在内存泄漏。

@wwbmmm

wwbmmm commented Nov 23, 2022

Copy link
Copy Markdown
Contributor

@Aaaaaaron 这个PR和master有冲突了,能否解决一下。并补充单测,谢谢!

@Aaaaaaron

Copy link
Copy Markdown
Member Author

@wwbmmm 好的 谢谢 不好意思最近比较忙😂

@Superskyyy

Copy link
Copy Markdown
Member

@Aaaaaaron 请问这个可以合并了么

@chenBright

Copy link
Copy Markdown
Contributor

@Aaaaaaron cc

@chenBright

Copy link
Copy Markdown
Contributor

Closed this as completed in #2889 .

@chenBright chenBright closed this Feb 17, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants