
Sliding window bug: Fail to ibv_post_send: Cannot allocate memory, window=3, sq_current=98 #3132

Description

@sunce4t

Describe the bug
This error means that the sliding window allows sending, but the RDMA SQ (Send Queue) has no free space for the request.

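The sliding window represents how much more the sender may send. Currently, the window is updated when a recv CQE carrying imm (that is, carrying acks) is polled and processed.
The RDMA SQ is a ring buffer that holds requests that have been posted but not yet completed; the SQ pointer is updated when a send CQE is polled.
So the SQ and the sliding window are updated at different times.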
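To make the SQ side of this concrete, here is a minimal, hypothetical model of the SQ as a fixed-capacity ring (SendQueueRing and its methods are illustrative names, not brpc or libibverbs APIs). Posting advances the tail; only polling a send CQE advances the head, so that is the only point where slots are reclaimed. A full ring returns ENOMEM, the same "Cannot allocate memory" error as in the title.

```cpp
#include <cassert>
#include <cerrno>
#include <cstdint>

// Hypothetical model of an RDMA Send Queue as a ring buffer (illustration only).
class SendQueueRing {
 public:
  explicit SendQueueRing(uint32_t capacity) : capacity_(capacity) {}

  // Like posting to a full SQ: fail with ENOMEM when every slot is in use.
  int Post() {
    if (tail_ - head_ == capacity_) return ENOMEM;
    ++tail_;
    return 0;
  }

  // Polling a send CQE retires `n` WRs (the signaled WR plus any unsignaled
  // WRs posted before it). This is the only place slots are reclaimed.
  void OnSendCqe(uint32_t n) {
    assert(n <= tail_ - head_);
    head_ += n;
  }

  uint64_t Outstanding() const { return tail_ - head_; }

 private:
  uint64_t capacity_;
  uint64_t head_ = 0;  // oldest WR not yet retired
  uint64_t tail_ = 0;  // position of the next WR to be posted
};
```

Nothing on the window side (acks arriving) touches head_ in this model, which is exactly the mismatch described above.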

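Logically, if a recv CQE carrying imm is being processed, the RDMA Send previously issued by the sender has already completed, so its send CQE should have been generated before the recv CQE.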

As illustrated below:

This is the ordering brpc expects: the SQ is updated before the sliding window value is updated.

[Image: expected ordering, send CQE before the recv CQE carrying imm]

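When this error occurs, however, the situation is as follows:
1. The sender issues a large number of RDMA Send operations and fills the SQ.
2. The sender then polls a recv CQE carrying imm (at that moment there is certainly no other send CQE; if there were one, the SQ pointer would also be updated, because brpc polls 32 CQEs at a time) and enters the handling logic that updates the sliding window.
3. After the window is updated, some thread sees a nonzero window and tries to send, but the SQ has no room left for the request, so the error is reported.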
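The sequence above can be replayed with a minimal, hypothetical model that keeps the window credits and the free SQ slots as two separate counters, each updated only at its own event (SenderModel and PostResult are illustrative names, not brpc code). The window is refilled by the acks in a recv CQE's imm; SQ slots come back only when a send CQE is polled.

```cpp
#include <cassert>
#include <cstdint>

enum class PostResult { kOk, kWindowClosed, kSqFull };

// Hypothetical sender state: two counters that are updated at different events.
struct SenderModel {
  uint32_t window;   // credits granted by the receiver's acks
  uint32_t sq_free;  // free Send Queue slots

  PostResult Post() {
    if (window == 0) return PostResult::kWindowClosed;  // normal back-pressure
    if (sq_free == 0) return PostResult::kSqFull;       // the bug: window open, SQ full
    --window;
    --sq_free;
    return PostResult::kOk;
  }

  // Window update point: handling a recv CQE whose imm carries `acks`.
  void OnRecvCqeWithImm(uint32_t acks) { window += acks; }

  // SQ update point: polling a send CQE that retires `n` WRs.
  void OnSendCqe(uint32_t n) { sq_free += n; }
};
```

In this model, kSqFull is reachable only when OnRecvCqeWithImm runs while the send CQEs for the acked WRs are still unpolled, which is steps 2 and 3 above.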

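Why is there no send CQE before the recv CQE carrying imm?
In brpc's logic, imm means the receiver has received the data (which implies the RDMA Send has completed) and is sending an acknowledgment back to the sender.
Since the sender has already generated the recv CQE carrying imm, why has the completed RDMA Send not yet generated a send CQE?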

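According to the IB spec, when different WQs share a CQ, the order in which their CQEs are generated is not guaranteed; only the order of CQEs within the same WQ is guaranteed: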

[Image: excerpt from the IB spec on CQE ordering]

Therefore, because brpc's SQ and RQ share one CQ, there is no guarantee that the send CQE is generated before the sender receives the recv CQE carrying imm.

See the following diagram:

[Image: recv CQE carrying imm generated before the send CQE]
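The ordering rule can be illustrated by enumerating every order in which a shared CQ could legally surface completions from two queues when only per-queue FIFO order is guaranteed. This is a hypothetical sketch (Interleave and AllOrders are illustrative names): 'S' stands for the next send CQE and 'R' for the next recv CQE, and since each queue's own order is fixed, a string of letters fully identifies one merge.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Enumerate all merges of `sends` send CQEs and `recvs` recv CQEs that keep
// each queue's own order (the only ordering a shared CQ guarantees).
void Interleave(int s, int r, int sends, int recvs, std::string& cur,
                std::vector<std::string>& out) {
  if (s == sends && r == recvs) {
    out.push_back(cur);
    return;
  }
  if (s < sends) {  // next CQE comes from the SQ
    cur.push_back('S');
    Interleave(s + 1, r, sends, recvs, cur, out);
    cur.pop_back();
  }
  if (r < recvs) {  // next CQE comes from the RQ
    cur.push_back('R');
    Interleave(s, r + 1, sends, recvs, cur, out);
    cur.pop_back();
  }
}

std::vector<std::string> AllOrders(int sends, int recvs) {
  std::vector<std::string> out;
  std::string cur;
  Interleave(0, 0, sends, recvs, cur, out);
  return out;
}
```

With one send and one recv, both "SR" (the order brpc expects) and "RS" (the order in the diagram) are legal, so code that relies on "SR" can break under real hardware timing.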

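I therefore believe this problem is caused by two factors together:
1. The sliding window and the RDMA SQ are updated at different times;
2. The order in which CQEs are generated is nondeterministic.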

To Reproduce
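It was triggered by production traffic; a local test program did not reproduce it. However, the logs show that every time the error was reported, the service was sending a large amount of data.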

Expected behavior
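We expect the sliding window and the RDMA SQ to be updated consistently.
We have implemented a patch that guarantees the sliding window is always updated after the RDMA SQ (ensuring the SQ has space before the window is updated).
We added _remote_acks to record the number of acks confirmed by the receiver. Because ordering within a single WQ is guaranteed, we use the _sq_to_update queue to record the number of unsignaled WRs, and the _sq_update_flag queue to record the send CQEs that have been polled.
When processing a recv CQE carrying imm, the patch checks whether a send CQE has been polled; if not, it only adds this CQE's acks to _remote_acks and does not update the window.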
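The bookkeeping described above can be sketched as a single-threaded model. This is a hypothetical simplification (WindowGate is an illustrative name): the real patch uses atomics and three mutexes, clears _sbuf, and applies the WakeAsEpollOut threshold, all of which are omitted here. The comments point to the patch's member each field stands in for.

```cpp
#include <cassert>
#include <cstdint>
#include <queue>

// Hypothetical single-threaded sketch of the patch's window gating.
class WindowGate {
 public:
  explicit WindowGate(uint32_t window) : window_(window) {}

  // Post one data WR. Each signaled WR closes a group of WRs whose SQ slots
  // are retired together by that WR's send CQE.
  bool TryPost(bool signaled) {
    if (window_ == 0) return false;
    --window_;
    ++unsignaled_;  // counts the signaled WR itself too, as the patch does
    if (signaled) {
      sq_to_update_.push(unsignaled_);  // stands in for _sq_to_update
      unsignaled_ = 0;
    }
    return true;
  }

  // Send CQE: only record that one signaled group has left the SQ.
  void OnSendCqe() { ++polled_send_cqes_; }  // stands in for _sq_update_flag

  // Recv CQE with imm: bank the acks, then release window only for groups
  // that are both acked and already retired from the SQ. As in the patch,
  // this drain runs only in the recv-CQE handler.
  void OnRecvCqeWithImm(uint32_t acks) {
    remote_acks_ += acks;  // stands in for _remote_acks
    while (!sq_to_update_.empty() && remote_acks_ >= sq_to_update_.front() &&
           polled_send_cqes_ > 0) {
      --polled_send_cqes_;
      uint32_t n = sq_to_update_.front();
      sq_to_update_.pop();
      remote_acks_ -= n;
      window_ += n;
    }
  }

  uint32_t window() const { return window_; }
  uint32_t remote_acks() const { return remote_acks_; }

 private:
  uint32_t window_;
  uint32_t unsignaled_ = 0;
  uint32_t polled_send_cqes_ = 0;
  uint32_t remote_acks_ = 0;
  std::queue<uint32_t> sq_to_update_;
};
```

Feeding both CQE orders through this model gives the same final window; in the reversed order the acks are only banked until a send CQE has been polled and a later recv CQE with imm runs the drain.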

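Analysis:
The patch does not affect the normal case (send CQE generated first, then the recv CQE); behavior there is unchanged.
When the recv CQE is generated before the send CQE, the patch forces the window update to happen after the send CQE, so the SQ cannot overflow.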

diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp
index 1d502a98..dac82bb7 100644
--- a/src/brpc/rdma/rdma_endpoint.cpp
+++ b/src/brpc/rdma/rdma_endpoint.cpp

@@ -101,7 +114,7 @@ static const uint32_t ACK_MSG_RDMA_OK = 0x1;

 static butil::Mutex* g_rdma_resource_mutex = NULL;
 static RdmaResource* g_rdma_resource_list = NULL;
-
+static butil::atomic<uint64_t> g_wr_id(0);
 struct HelloMessage {
     void Serialize(void* data) const;
     void Deserialize(void* data);
@@ -191,7 +204,11 @@ RdmaEndpoint::RdmaEndpoint(Socket* s)
     , _remote_window_capacity(0)
     , _window_size(0)
     , _new_rq_wrs(0)
+    , _remote_acks(0)
+    , _m_sq_unsignaled(0)
 {
+    LOG(INFO) << "_remote_acks: " << _remote_acks.load();
+    LOG(INFO) << "_m_sq_unsignaled: " << _m_sq_unsignaled;
     if (_sq_size < MIN_QP_SIZE) {
         _sq_size = MIN_QP_SIZE;
     }
@@ -208,6 +225,11 @@ RdmaEndpoint::RdmaEndpoint(Socket* s)
 }

 RdmaEndpoint::~RdmaEndpoint() {
+    LOG(INFO) << _window_size << " " << _remote_acks << " " << _sq_update_flag.size() << " " << _sq_to_update.size() << " " << _sq_unsignaled << " " << _sq_unsignaled;
+    while(_sq_to_update.empty() == false) {
+        LOG(INFO) << _sq_to_update.front();
+       _sq_to_update.pop();
+    }
     Reset();
     bthread::butex_destroy(_read_butex);
 }
@@ -231,6 +253,8 @@ void RdmaEndpoint::Reset() {
     _new_rq_wrs = 0;
     _sq_sent = 0;
     _rq_received = 0;
+    _remote_acks.store(0, butil::memory_order_relaxed);
+    _m_sq_unsignaled.store(0, butil::memory_order_relaxed);
 }

 void RdmaConnect::StartConnect(const Socket* socket,
@@ -878,15 +902,36 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
         }

         ibv_send_wr* bad = NULL;
-        int err = ibv_post_send(_resource->qp, &wr, &bad);
-        if (err != 0) {
-            // We use other way to guarantee the Send Queue is not full.
-            // So we just consider this error as an unrecoverable error.
-            LOG(WARNING) << "Fail to ibv_post_send: " << berror(err)
+
+        wr.wr_id = g_wr_id.fetch_add(1, butil::memory_order_relaxed);
+        {
+            BAIDU_SCOPED_LOCK(_m_sq_mutex);
+            int err = ibv_post_send(_resource->qp, &wr, &bad);
+            if (err != 0) {
+                // We use other way to guarantee the Send Queue is not full.
+                // So we just consider this error as an unrecoverable error.
+                LOG(WARNING) << "Fail to ibv_post_send: " << berror(err)
                          << ", window=" << window
                          << ", sq_current=" << _sq_current;
                 errno = err;
                 return -1;
+            }
+            _m_sq_unsignaled.fetch_add(1, butil::memory_order_release);
+            uint16_t cur_unsignaled = 0;
+            if(wr.send_flags & IBV_SEND_SIGNALED) {
+                cur_unsignaled = _m_sq_unsignaled.exchange(0, butil::memory_order_acquire);
+            }
+            if(cur_unsignaled != 0) {
+                BAIDU_SCOPED_LOCK(_sq_update_mutex);
+                _sq_to_update.push(cur_unsignaled);
+               //LOG(INFO) << "send signaled before: " << cur_unsignaled;
+            }
         }

         ++_sq_current;
@@ -924,13 +969,26 @@ int RdmaEndpoint::SendImm(uint32_t imm) {
     wr.send_flags |= IBV_SEND_SIGNALED;

     ibv_send_wr* bad = NULL;
-    int err = ibv_post_send(_resource->qp, &wr, &bad);
-    if (err != 0) {
-        // We use other way to guarantee the Send Queue is not full.
-        // So we just consider this error as an unrecoverable error.
-        LOG(WARNING) << "Fail to ibv_post_send: " << berror(err);
-        return -1;
+    wr.wr_id = g_wr_id.fetch_add(1, butil::memory_order_relaxed);
+    uint16_t cur_unsignaled = 0;
+    {
+        BAIDU_SCOPED_LOCK(_m_sq_mutex);
+        int err = ibv_post_send(_resource->qp, &wr, &bad);
+        if (err != 0) {
+            // We use other way to guarantee the Send Queue is not full.
+            // So we just consider this error as an unrecoverable error.
+            LOG(WARNING) << "Fail to ibv_post_send: " << berror(err);
+            return -1;
+        }
+        cur_unsignaled = _m_sq_unsignaled.exchange(0, butil::memory_order_relaxed);
+        {
+            BAIDU_SCOPED_LOCK(_sq_update_mutex);
+            _sq_to_update.push(cur_unsignaled);
+            //LOG(INFO) << "wr id: " << wr.wr_id <<  " SendImm before: " << cur_unsignaled;
+        }
+
     }
+
     return 0;
 }
@@ -938,8 +996,11 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
     bool zerocopy = FLAGS_rdma_recv_zerocopy;
     switch (wc.opcode) {
     case IBV_WC_SEND: {  // send completion
-        // Do nothing
-        break;
+        {
+            BAIDU_SCOPED_LOCK(_sq_flag_mutex);
+           _sq_update_flag.push(true);
+       }
+       break;
     }
     case IBV_WC_RECV: {  // recv completion
         // Please note that only the first wc.byte_len bytes is valid
@@ -959,24 +1020,66 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
         }
         if (wc.imm_data > 0) {
             // Clear sbuf here because we ignore event wakeup for send completions
-            uint32_t acks = butil::NetToHost32(wc.imm_data);
-            uint32_t num = acks;
-            while (num > 0) {
-                _sbuf[_sq_sent++].clear();
-                if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
-                    _sq_sent = 0;
-                }
-                --num;
-            }
-            butil::subtle::MemoryBarrier();

             // Update window
-            uint32_t wnd_thresh = _local_window_capacity / 8;
-            if (_window_size.fetch_add(acks, butil::memory_order_relaxed) >= wnd_thresh
-                    || acks >= wnd_thresh) {
+            //uint32_t wnd_thresh = _local_window_capacity / 8;
+            //if (_window_size.fetch_add(acks, butil::memory_order_relaxed) >= wnd_thresh
+            //        || acks >= wnd_thresh) {
                 // Do not wake up writing thread right after _window_size > 0.
                 // Otherwise the writing thread may switch to background too quickly.
-                _socket->WakeAsEpollOut();
+            //    _socket->WakeAsEpollOut();
+            //}
+           uint32_t acks = butil::NetToHost32(wc.imm_data);
+           //LOG(INFO) << "acks: " << acks;
+            _remote_acks.fetch_add(acks, butil::memory_order_relaxed);
+            {
+                BAIDU_SCOPED_LOCK(_sq_update_mutex);
+                while(_sq_to_update.empty() == false &&  _remote_acks.load() >= _sq_to_update.front()) {
+                    {
+                        BAIDU_SCOPED_LOCK(_sq_flag_mutex);
+                        if(_sq_update_flag.empty() == true) {
+                            break;
+                        }
+                        _sq_update_flag.pop();
+                    }
+                    uint32_t wnd_to_update = _sq_to_update.front();
+                    _sq_to_update.pop();
+                    _remote_acks.fetch_sub(wnd_to_update, butil::memory_order_relaxed);
+
+                    //LOG(INFO)  << wnd_to_update << " " <<   _remote_acks << " " << _sq_update_flag.size() << " " << _sq_to_update.size();
+                   uint32_t num = wnd_to_update;
+                    while(num > 0) {
+                        _sbuf[_sq_sent++].clear();
+                        if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
+                            _sq_sent = 0;
+                        }
+                        --num;
+                    }
+                    butil::subtle::MemoryBarrier();
+                    uint32_t wnd_thresh = _local_window_capacity / 8;
+                    if (_window_size.fetch_add(wnd_to_update, butil::memory_order_relaxed) >= wnd_thresh
+                            || acks >= wnd_thresh) {
+            // Do not wake up writing thread right after _window_size > 0.
+            // Otherwise the writing thread may switch to background too quickly.
+                                _socket->WakeAsEpollOut();
+                    }
+                }
             }
         }
diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h
index de7cd5f6..114eb682 100644
--- a/src/brpc/rdma/rdma_endpoint.h
+++ b/src/brpc/rdma/rdma_endpoint.h
@@ -31,7 +31,7 @@
 #include "butil/containers/mpsc_queue.h"
 #include "brpc/socket.h"

-
+#include <queue>
 namespace brpc {
 class Socket;
 namespace rdma {
@@ -262,6 +262,13 @@ private:
     // The number of new WRs posted in the local Recv Queue
     butil::atomic<uint16_t> _new_rq_wrs;

+    butil::atomic<int> _remote_acks;
+    butil::Mutex _m_sq_mutex;
+    butil::Mutex _sq_update_mutex;
+    butil::Mutex _sq_flag_mutex;
+    butil::atomic<uint16_t> _m_sq_unsignaled;
+    std::queue<uint16_t> _sq_to_update;
+    std::queue<bool> _sq_update_flag;
     // butex for inform read events on TCP fd during handshake
     butil::atomic<int> *_read_butex;

Versions
OS:
Compiler:
brpc:
protobuf:

Additional context/screenshots

Metadata

Assignees: No one assigned
Labels: bug (the code does not work as expected)
Type: No type
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests