Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 41 additions & 26 deletions src/brpc/rdma/rdma_endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -878,10 +878,14 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
}

ibv_send_wr* bad = NULL;
if (ibv_post_send(_resource->qp, &wr, &bad) < 0) {
int err = ibv_post_send(_resource->qp, &wr, &bad);
if (err != 0) {
// We use another mechanism to guarantee that the Send Queue is never full,
// so we treat this error as unrecoverable.
PLOG(WARNING) << "Fail to ibv_post_send";
LOG(WARNING) << "Fail to ibv_post_send: " << berror(err)
<< ", window=" << window
<< ", sq_current=" << _sq_current;
errno = err;
return -1;
}

Expand Down Expand Up @@ -920,10 +924,11 @@ int RdmaEndpoint::SendImm(uint32_t imm) {
wr.send_flags |= IBV_SEND_SIGNALED;

ibv_send_wr* bad = NULL;
if (ibv_post_send(_resource->qp, &wr, &bad) < 0) {
int err = ibv_post_send(_resource->qp, &wr, &bad);
if (err != 0) {
// We use another mechanism to guarantee that the Send Queue is never full,
// so we treat this error as unrecoverable.
PLOG(WARNING) << "Fail to ibv_post_send";
LOG(WARNING) << "Fail to ibv_post_send: " << berror(err);
return -1;
}
return 0;
Expand Down Expand Up @@ -1004,8 +1009,9 @@ int RdmaEndpoint::DoPostRecv(void* block, size_t block_size) {
wr.sg_list = &sge;

ibv_recv_wr* bad = NULL;
if (ibv_post_recv(_resource->qp, &wr, &bad) < 0) {
PLOG(WARNING) << "Fail to ibv_post_recv";
int err = ibv_post_recv(_resource->qp, &wr, &bad);
if (err != 0) {
LOG(WARNING) << "Fail to ibv_post_recv: " << berror(err);
return -1;
}
return 0;
Expand Down Expand Up @@ -1143,8 +1149,9 @@ int RdmaEndpoint::AllocateResources() {
return -1;
}

if (ibv_req_notify_cq(_resource->cq, 1) < 0) {
PLOG(WARNING) << "Fail to arm CQ comp channel";
int err = ibv_req_notify_cq(_resource->cq, 1);
if (err != 0) {
LOG(WARNING) << "Fail to arm CQ comp channel: " << berror(err);
return -1;
}
} else {
Expand Down Expand Up @@ -1186,12 +1193,13 @@ int RdmaEndpoint::BringUpQp(uint16_t lid, ibv_gid gid, uint32_t qp_num) {
attr.pkey_index = 0; // TODO: support more pkey use in future
attr.port_num = GetRdmaPortNum();
attr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE;
if (IbvModifyQp(_resource->qp, &attr, (ibv_qp_attr_mask)(
IBV_QP_STATE |
int err = IbvModifyQp(_resource->qp, &attr, (ibv_qp_attr_mask)(
IBV_QP_STATE |
IBV_QP_PKEY_INDEX |
IBV_QP_PORT |
IBV_QP_ACCESS_FLAGS)) < 0) {
PLOG(WARNING) << "Fail to modify QP from RESET to INIT";
IBV_QP_ACCESS_FLAGS));
if (err != 0) {
LOG(WARNING) << "Fail to modify QP from RESET to INIT: " << berror(err);
return -1;
}

Expand All @@ -1217,15 +1225,16 @@ int RdmaEndpoint::BringUpQp(uint16_t lid, ibv_gid gid, uint32_t qp_num) {
attr.rq_psn = 0;
attr.max_dest_rd_atomic = 0;
attr.min_rnr_timer = 0; // We do not allow rnr error
if (IbvModifyQp(_resource->qp, &attr, (ibv_qp_attr_mask)(
err = IbvModifyQp(_resource->qp, &attr, (ibv_qp_attr_mask)(
IBV_QP_STATE |
IBV_QP_PATH_MTU |
IBV_QP_MIN_RNR_TIMER |
IBV_QP_AV |
IBV_QP_MAX_DEST_RD_ATOMIC |
IBV_QP_DEST_QPN |
IBV_QP_RQ_PSN)) < 0) {
PLOG(WARNING) << "Fail to modify QP from INIT to RTR";
IBV_QP_RQ_PSN));
if (err != 0) {
LOG(WARNING) << "Fail to modify QP from INIT to RTR: " << berror(err);
return -1;
}

Expand All @@ -1235,14 +1244,15 @@ int RdmaEndpoint::BringUpQp(uint16_t lid, ibv_gid gid, uint32_t qp_num) {
attr.rnr_retry = 0; // We do not allow rnr error
attr.sq_psn = 0;
attr.max_rd_atomic = 0;
if (IbvModifyQp(_resource->qp, &attr, (ibv_qp_attr_mask)(
err = IbvModifyQp(_resource->qp, &attr, (ibv_qp_attr_mask)(
IBV_QP_STATE |
IBV_QP_RNR_RETRY |
IBV_QP_RETRY_CNT |
IBV_QP_TIMEOUT |
IBV_QP_SQ_PSN |
IBV_QP_MAX_QP_RD_ATOMIC)) < 0) {
PLOG(WARNING) << "Fail to modify QP from RTR to RTS";
IBV_QP_MAX_QP_RD_ATOMIC));
if (err != 0) {
LOG(WARNING) << "Fail to modify QP from RTR to RTS: " << berror(err);
return -1;
}

Expand Down Expand Up @@ -1270,17 +1280,20 @@ void RdmaEndpoint::DeallocateResources() {
if (_resource->comp_channel) {
fd = _resource->comp_channel->fd;
}
int err;
if (!move_to_rdma_resource_list) {
if (_resource->qp) {
if (IbvDestroyQp(_resource->qp) < 0) {
PLOG(WARNING) << "Fail to destroy QP";
err = IbvDestroyQp(_resource->qp);
if (err != 0) {
LOG(WARNING) << "Fail to destroy QP: " << berror(err);
}
_resource->qp = NULL;
}
if (_resource->cq) {
IbvAckCqEvents(_resource->cq, _cq_events);
if (IbvDestroyCq(_resource->cq) < 0) {
PLOG(WARNING) << "Fail to destroy CQ";
err = IbvDestroyCq(_resource->cq);
if (err != 0) {
PLOG(WARNING) << "Fail to destroy CQ: " << berror(err);
Comment thread
live4thee marked this conversation as resolved.
}
_resource->cq = NULL;
}
Expand All @@ -1289,8 +1302,9 @@ void RdmaEndpoint::DeallocateResources() {
// so that we should remove it from epoll fd first
_socket->_io_event.RemoveConsumer(fd);
fd = -1;
if (IbvDestroyCompChannel(_resource->comp_channel) < 0) {
PLOG(WARNING) << "Fail to destroy CQ channel";
err = IbvDestroyCompChannel(_resource->comp_channel);
if (err != 0) {
LOG(WARNING) << "Fail to destroy CQ channel: " << berror(err);
}
_resource->comp_channel = NULL;
}
Expand Down Expand Up @@ -1328,7 +1342,7 @@ static const int MAX_CQ_EVENTS = 128;
int RdmaEndpoint::GetAndAckEvents() {
int events = 0; void* context = NULL;
while (1) {
if (IbvGetCqEvent(_resource->comp_channel, &_resource->cq, &context) < 0) {
if (IbvGetCqEvent(_resource->comp_channel, &_resource->cq, &context) != 0) {
Comment thread
yanglimingcn marked this conversation as resolved.
if (errno != EAGAIN) {
return -1;
}
Expand Down Expand Up @@ -1392,7 +1406,8 @@ void RdmaEndpoint::PollCq(Socket* m) {
// that the event arrives after the poll but before the notify,
// we should re-poll the CQ once after the notify to check if
// there is an available CQE.
if (ibv_req_notify_cq(ep->_resource->cq, 1) < 0) {
errno = ibv_req_notify_cq(ep->_resource->cq, 1);
if (errno != 0) {
const int saved_errno = errno;
PLOG(WARNING) << "Fail to arm CQ comp channel: " << s->description();
s->SetFailed(saved_errno, "Fail to arm cq channel from %s: %s",
Expand Down
11 changes: 6 additions & 5 deletions src/brpc/rdma/rdma_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ void BlockDeallocate(void* buf) {

static void FindRdmaLid() {
ibv_port_attr attr;
if (IbvQueryPort(g_context, g_port_num, &attr) < 0) {
if (IbvQueryPort(g_context, g_port_num, &attr) != 0) {
return;
}
g_lid = attr.lid;
Expand All @@ -215,7 +215,7 @@ static bool FindRdmaGid(ibv_context* context) {
bool found = false;
for (int i = g_gid_tbl_len - 1; i >= 0; --i) {
ibv_gid gid;
if (IbvQueryGid(context, g_port_num, i, &gid) < 0) {
if (IbvQueryGid(context, g_port_num, i, &gid) != 0) {
continue;
}
if (gid.global.interface_id == 0) {
Expand Down Expand Up @@ -245,7 +245,7 @@ static void OnRdmaAsyncEvent(Socket* m) {
int progress = Socket::PROGRESS_INIT;
do {
ibv_async_event event;
if (IbvGetAsyncEvent(g_context, &event) < 0) {
if (IbvGetAsyncEvent(g_context, &event) != 0) {
break;
}
LOG(WARNING) << "rdma async event: " << IbvEventTypeStr(event.event_type);
Expand Down Expand Up @@ -405,7 +405,8 @@ static ibv_context* OpenDevice(int num_total, int* num_available_devices) {
continue;
}
ibv_port_attr attr;
if (IbvQueryPort(context.get(), uint8_t(FLAGS_rdma_port), &attr) < 0) {
errno = IbvQueryPort(context.get(), uint8_t(FLAGS_rdma_port), &attr);
if (errno != 0) {
PLOG(WARNING) << "Fail to query port " << FLAGS_rdma_port << " on "
<< dev_name;
continue;
Expand Down Expand Up @@ -522,7 +523,7 @@ static void GlobalRdmaInitializeOrDieImpl() {
}

ibv_device_attr attr;
if (IbvQueryDevice(g_context, &attr) < 0) {
if (IbvQueryDevice(g_context, &attr) != 0) {
PLOG(ERROR) << "Fail to get the device information";
ExitWithError();
}
Expand Down
Loading