Skip to content

Commit 7425458

Browse files
Merge pull request #4 from NetSys/vector_tx_unix
C++-ize RX/TX vector implementation for UnixSocketPort
2 parents 18e21b2 + bbc3d72 commit 7425458

File tree

3 files changed

+84
-130
lines changed

3 files changed

+84
-130
lines changed

CONTRIBUTING.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,3 +57,4 @@ Please add your name to the end of this file and include this file to the PR, un
5757
* Eran Gampel
5858
* Tamás Lévai
5959
* Matthew Mussomele
60+
* Anton Ivanov

core/drivers/unix_socket.cc

Lines changed: 71 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -96,17 +96,20 @@ void UnixSocketAcceptThread::Run() {
9696
}
9797
}
9898

99-
static void replenishVector(struct iovec *iovecs, bess::Packet **pktvec, int cnt) {
100-
for (int i=0; i < cnt; i++) {
101-
pktvec[i] = current_worker.packet_pool()->Alloc();
102-
if (pktvec[i]) {
103-
iovecs[i].iov_base = pktvec[i]->data();
104-
iovecs[i].iov_len = SNBUF_DATA;
105-
} else {
106-
iovecs[i].iov_base = NULL; /* vectors can have holes, it will just drop the packet */
107-
iovecs[i].iov_len = 0;
108-
}
99+
void UnixSocketPort::ReplenishRecvVector(int cnt) {
100+
DCHECK_LE(cnt, bess::PacketBatch::kMaxBurst);
101+
bool allocated =
102+
current_worker.packet_pool()->AllocBulk(pkt_recv_vector_.data(), cnt);
103+
104+
for (int i = 0; i < cnt; i++) {
105+
if (allocated) {
106+
recv_iovecs_[i] = {.iov_base = pkt_recv_vector_[i]->data(),
107+
.iov_len = SNBUF_DATA};
108+
} else {
109+
// vectors can have holes, it will just drop the packet
110+
recv_iovecs_[i] = {.iov_base = nullptr, .iov_len = 0};
109111
}
112+
}
110113
}
111114

112115
CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) {
@@ -171,36 +174,20 @@ CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) {
171174
return CommandFailure(errno, "unable to start accept thread");
172175
}
173176

174-
send_vector = new(std::nothrow) mmsghdr[VECTOR_QUANTUM]{};
175-
send_iovecs = new(std::nothrow) iovec[VECTOR_QUANTUM * MAX_SEGS_IN_VECTOR]{};
176-
177-
recv_vector = new(std::nothrow) mmsghdr[VECTOR_QUANTUM]{};
178-
recv_iovecs = new(std::nothrow) iovec[VECTOR_QUANTUM]{};
179-
pkt_recv_vector = new(std::nothrow) bess::Packet*[VECTOR_QUANTUM];
180-
181-
for (int i=0; i < VECTOR_QUANTUM; i++) {
182-
pkt_recv_vector[i] = nullptr;
183-
recv_iovecs[i].iov_base = NULL;
184-
recv_iovecs[i].iov_len = 0;
185-
recv_vector[i].msg_hdr.msg_iov = &recv_iovecs[i];
186-
recv_vector[i].msg_hdr.msg_iovlen = 1;
187-
recv_vector[i].msg_hdr.msg_name = NULL;
188-
recv_vector[i].msg_hdr.msg_namelen = 0;
189-
recv_vector[i].msg_hdr.msg_control = NULL;
190-
recv_vector[i].msg_hdr.msg_controllen = 0;
191-
recv_vector[i].msg_hdr.msg_flags = 0;
192-
}
193-
194-
replenishVector(recv_iovecs, pkt_recv_vector, VECTOR_QUANTUM);
195-
196-
if ((send_vector == nullptr) || (send_iovecs == nullptr) || (recv_iovecs == nullptr) || (recv_vector == nullptr) || (pkt_recv_vector == nullptr)) {
197-
DeInit();
198-
return CommandFailure(errno, "failed to allocate vector buffers");
177+
for (size_t i = 0; i < bess::PacketBatch::kMaxBurst; i++) {
178+
recv_vector_[i] = {.msg_hdr = {.msg_name = nullptr,
179+
.msg_namelen = 0,
180+
.msg_iov = &recv_iovecs_[i],
181+
.msg_iovlen = 1,
182+
.msg_control = nullptr,
183+
.msg_controllen = 0,
184+
.msg_flags = 0},
185+
.msg_len = 0};
199186
}
200187

201-
for (int i=0; i<VECTOR_QUANTUM; i++) {
202-
send_vector[i].msg_hdr.msg_iov = &send_iovecs[i * MAX_SEGS_IN_VECTOR];
203-
}
188+
recv_iovecs_.fill({.iov_base = nullptr, .iov_len = 0});
189+
pkt_recv_vector_.fill(nullptr);
190+
ReplenishRecvVector(bess::PacketBatch::kMaxBurst);
204191

205192
return CommandSuccess();
206193
}
@@ -215,21 +202,9 @@ void UnixSocketPort::DeInit() {
215202
if (client_fd_ != kNotConnectedFd) {
216203
close(client_fd_);
217204
}
218-
if (send_vector != nullptr)
219-
delete[] send_vector;
220-
if (send_iovecs != nullptr)
221-
delete[] send_iovecs;
222-
if (recv_vector != nullptr)
223-
delete[] recv_vector;
224-
if (recv_iovecs != nullptr)
225-
delete[] recv_iovecs;
226-
if (pkt_recv_vector != nullptr) {
227-
for (int i = 0; i < VECTOR_QUANTUM ; i++) {
228-
if (pkt_recv_vector[i]) {
229-
bess::Packet::Free(pkt_recv_vector[i]);
230-
}
231-
}
232-
delete[] pkt_recv_vector;
205+
206+
for (auto *pkt : pkt_recv_vector_) {
207+
bess::Packet::Free(pkt);
233208
}
234209
}
235210

@@ -251,36 +226,31 @@ int UnixSocketPort::RecvPackets(queue_t qid, bess::Packet **pkts, int cnt) {
251226
int received = 0;
252227

253228
while (received < cnt) {
254-
int ret = recvmmsg(client_fd, recv_vector, std::min(cnt - received, VECTOR_QUANTUM), 0, NULL);
255-
256-
if (ret > 0) {
257-
for (int i=0; i<ret; i++) {
258-
if ((recv_iovecs[i].iov_base != NULL) && (recv_vector[i].msg_len > 0)) {
259-
pkt_recv_vector[i]->append(recv_vector[i].msg_len);
260-
pkts[received++] = pkt_recv_vector[i];
261-
}
262-
}
263-
replenishVector(recv_iovecs, pkt_recv_vector, ret);
264-
} else {
265-
break;
229+
int ret =
230+
recvmmsg(client_fd, recv_vector_.data(), cnt - received, 0, nullptr);
231+
232+
if (ret > 0) {
233+
for (int i = 0; i < ret; i++) {
234+
if ((recv_iovecs_[i].iov_base != nullptr) &&
235+
(recv_vector_[i].msg_len > 0)) {
236+
pkt_recv_vector_[i]->append(recv_vector_[i].msg_len);
237+
pkts[received++] = pkt_recv_vector_[i];
238+
}
266239
}
240+
ReplenishRecvVector(ret);
241+
} else {
242+
break;
243+
}
267244
}
268245

269246
last_idle_ns_ = (received == 0) ? now_ns : 0;
270247

271248
return received;
272249
}
273250

274-
static void enqueuePacket(struct iovec *iov, bess::Packet *pkt, int nb_segs) {
275-
for (int j = 0; j < nb_segs; j++) {
276-
iov[j].iov_base = pkt->head_data();
277-
iov[j].iov_len = pkt->head_len();
278-
pkt = pkt->next();
279-
}
280-
}
281-
282251
int UnixSocketPort::SendPackets(queue_t qid, bess::Packet **pkts, int cnt) {
283-
int sent = 0, vecindex = 0;
252+
int i;
253+
int sent = 0;
284254
int client_fd = client_fd_;
285255

286256
DCHECK_EQ(qid, 0);
@@ -289,56 +259,39 @@ int UnixSocketPort::SendPackets(queue_t qid, bess::Packet **pkts, int cnt) {
289259
return 0;
290260
}
291261

292-
for (int i = 0; i < cnt; i++) {
262+
size_t iovec_idx = 0;
263+
for (i = 0; i < cnt; i++) {
293264
bess::Packet *pkt = pkts[i];
294-
295265
int nb_segs = pkt->nb_segs();
296-
if ((nb_segs <= MAX_SEGS_IN_VECTOR) || (vecindex > VECTOR_QUANTUM)) {
297-
enqueuePacket(send_vector[vecindex].msg_hdr.msg_iov, pkt, nb_segs);
298-
send_vector[vecindex].msg_hdr.msg_iovlen = nb_segs;
299-
vecindex++;
300-
} else {
301-
if (vecindex > 0) {
302-
int vecret;
303-
vecret = sendmmsg(client_fd, send_vector, vecindex, 0);
304-
vecindex = 0;
305-
if (vecret < 0)
306-
break;
307-
sent += vecret;
308-
if (vecret < vecindex)
309-
break;
310-
}
311-
if (nb_segs > MAX_SEGS_IN_VECTOR) {
312-
struct iovec iov[nb_segs];
313-
314-
struct msghdr msg = msghdr();
315-
msg.msg_iov = iov;
316-
msg.msg_iovlen = nb_segs;
317-
318-
enqueuePacket(iov, pkt, nb_segs);
319266

320-
ssize_t ret;
321-
322-
ret = sendmsg(client_fd, &msg, 0);
323-
if (ret < 0) {
324-
break;
325-
}
267+
for (int j = 0; j < nb_segs; j++) {
268+
if (iovec_idx >= send_iovecs_.size()) {
269+
break;
270+
}
271+
send_iovecs_[iovec_idx++] = {
272+
.iov_base = pkt->head_data(),
273+
.iov_len = static_cast<size_t>(pkt->head_len())};
274+
pkt = pkt->next();
275+
}
326276

327-
sent++;
328-
}
329-
}
277+
send_vector_[i] = {
278+
.msg_hdr = {.msg_name = nullptr,
279+
.msg_namelen = 0,
280+
.msg_iov = &send_iovecs_[iovec_idx - nb_segs],
281+
.msg_iovlen = static_cast<size_t>(nb_segs),
282+
.msg_control = nullptr,
283+
.msg_controllen = 0,
284+
.msg_flags = 0},
285+
.msg_len = 0};
330286
}
331287

332-
/* Final flush if there is an outstanding vector */
333-
334-
if (vecindex > 0) {
335-
int vecret;
336-
vecret = sendmmsg(client_fd, send_vector, vecindex, 0);
337-
if (vecret > 0)
338-
sent += vecret;
339-
}
340-
if (sent) {
341-
bess::Packet::Free(pkts, sent);
288+
if (!send_vector_.empty()) {
289+
sent = sendmmsg(client_fd, send_vector_.data(), i, 0);
290+
if (sent > 0) {
291+
bess::Packet::Free(pkts, sent);
292+
} else {
293+
sent = 0;
294+
}
342295
}
343296

344297
return sent;

core/drivers/unix_socket.h

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,13 @@
3030

3131
#ifndef BESS_DRIVERS_UNIXSOCKET_H_
3232
#define BESS_DRIVERS_UNIXSOCKET_H_
33-
#ifndef _GNU_SOURCE
34-
#define _GNU_SOURCE
35-
#endif
36-
#include <sys/types.h>
33+
3734
#include <sys/socket.h>
35+
#include <sys/types.h>
3836
#include <sys/un.h>
3937
#include <unistd.h>
4038

39+
#include <array>
4140
#include <atomic>
4241
#include <thread>
4342

@@ -46,9 +45,6 @@
4645

4746
#include "../utils/syscallthread.h"
4847

49-
#define MAX_SEGS_IN_VECTOR 8
50-
#define VECTOR_QUANTUM 64
51-
5248
class UnixSocketPort;
5349

5450
// We promise to block only in ppoll(),
@@ -103,11 +99,15 @@ class UnixSocketPort final : public Port {
10399
int SendPackets(queue_t qid, bess::Packet **pkts, int cnt) override;
104100

105101
private:
106-
107-
// These rely on there beeing no multiqueue support !!!
108-
struct mmsghdr *send_vector, *recv_vector;
109-
bess::Packet **pkt_recv_vector;
110-
struct iovec *send_iovecs, *recv_iovecs;
102+
void ReplenishRecvVector(int cnt);
103+
104+
// These rely on there being no multiqueue support !!!
105+
std::array<bess::Packet *, bess::PacketBatch::kMaxBurst> pkt_recv_vector_;
106+
std::array<mmsghdr, bess::PacketBatch::kMaxBurst> recv_vector_;
107+
std::array<iovec, bess::PacketBatch::kMaxBurst> recv_iovecs_;
108+
// send_iovecs reserves *8 elements for segmented packets
109+
std::array<mmsghdr, bess::PacketBatch::kMaxBurst> send_vector_;
110+
std::array<iovec, bess::PacketBatch::kMaxBurst * 8> send_iovecs_;
111111

112112
// Value for a disconnected socket.
113113
static const int kNotConnectedFd = -1;

0 commit comments

Comments
 (0)