Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions src/brpc/input_messenger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,20 @@ DEFINE_bool(socket_keepalive, false,
"Enable keepalive of sockets if this value is true");

DEFINE_int32(socket_keepalive_idle_s, -1,
"Set idle time of sockets before keepalive if this value is positive");
"Set idle time for socket keepalive in seconds if this value is positive");

DEFINE_int32(socket_keepalive_interval_s, -1,
"Set interval of sockets between keepalives if this value is positive");
"Set interval between keepalives in seconds if this value is positive");

DEFINE_int32(socket_keepalive_count, -1,
"Set number of keepalives of sockets before close if this value is positive");
"Set number of keepalives before death if this value is positive");

DEFINE_int32(socket_tcp_user_timeout_ms, -1,
"If this value is positive, set number of milliseconds that transmitted "
"data may remain unacknowledged, or bufferred data may remain untransmitted "
"(due to zero window size) before TCP will forcibly close the corresponding "
"connection and return ETIMEDOUT to the application. Only linux supports "
"TCP_USER_TIMEOUT.");

DECLARE_bool(usercode_in_pthread);
DECLARE_bool(usercode_in_coroutine);
Expand Down Expand Up @@ -501,6 +508,7 @@ int InputMessenger::Create(const butil::EndPoint& remote_side,
options.keepalive_options->keepalive_count
= FLAGS_socket_keepalive_count;
}
options.tcp_user_timeout_ms = FLAGS_socket_tcp_user_timeout_ms;
return Socket::Create(options, id);
}

Expand Down Expand Up @@ -535,6 +543,9 @@ int InputMessenger::Create(SocketOptions options, SocketId* id) {
= FLAGS_socket_keepalive_count;
}
}
if (options.tcp_user_timeout_ms <= 0) {
options.tcp_user_timeout_ms = FLAGS_socket_tcp_user_timeout_ms;
}
return Socket::Create(options, id);
}

Expand Down
49 changes: 35 additions & 14 deletions src/brpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ Socket::Socket(Forbidden f)
, _stream_set(NULL)
, _total_streams_unconsumed_size(0)
, _ninflight_app_health_check(0)
, _tcp_user_timeout_ms(-1)
, _http_request_method(HTTP_METHOD_GET) {
CreateVarsOnce();
pthread_mutex_init(&_id_wait_list_mutex, NULL);
Expand Down Expand Up @@ -597,6 +598,21 @@ int Socket::ResetFileDescriptor(int fd) {
// turn off nagling.
// OK to fail, namely unix domain socket does not support this.
butil::make_no_delay(fd);

SetSocketOptions(fd);

if (_on_edge_triggered_events) {
if (_io_event.AddConsumer(fd) != 0) {
PLOG(ERROR) << "Fail to add SocketId=" << id()
<< " into EventDispatcher";
_fd.store(-1, butil::memory_order_release);
return -1;
}
}
return 0;
}

void Socket::SetSocketOptions(int fd) {
if (_tos > 0 &&
setsockopt(fd, IPPROTO_IP, IP_TOS, &_tos, sizeof(_tos)) != 0) {
PLOG(ERROR) << "Fail to set tos of fd=" << fd << " to " << _tos;
Expand All @@ -618,27 +634,21 @@ int Socket::ResetFileDescriptor(int fd) {
}
}

EnableKeepaliveIfNeeded(fd);

if (_on_edge_triggered_events) {
if (_io_event.AddConsumer(fd) != 0) {
PLOG(ERROR) << "Fail to add SocketId=" << id()
<< " into EventDispatcher";
_fd.store(-1, butil::memory_order_release);
return -1;
#if defined(OS_LINUX)
if (_tcp_user_timeout_ms > 0) {
if (setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT,
&_tcp_user_timeout_ms, sizeof(_tcp_user_timeout_ms)) != 0) {
PLOG(ERROR) << "Fail to set TCP_USER_TIMEOUT of fd=" << fd;
}
}
return 0;
}
#endif

void Socket::EnableKeepaliveIfNeeded(int fd) {
if (!_keepalive_options) {
return;
}

int keepalive = 1;
if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive,
sizeof(keepalive)) != 0) {
if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof(keepalive)) != 0) {
PLOG(ERROR) << "Fail to set keepalive of fd=" << fd;
return;
}
Expand Down Expand Up @@ -782,6 +792,7 @@ int Socket::OnCreated(const SocketOptions& options) {
_last_writetime_us.store(cpuwide_now, butil::memory_order_relaxed);
_unwritten_bytes.store(0, butil::memory_order_relaxed);
_keepalive_options = options.keepalive_options;
_tcp_user_timeout_ms = options.tcp_user_timeout_ms;
CHECK(NULL == _write_head.load(butil::memory_order_relaxed));
_is_write_shutdown = false;
int fd = options.fd;
Expand Down Expand Up @@ -1388,7 +1399,7 @@ int Socket::CheckConnected(int sockfd) {
butil::EndPoint local_point;
CHECK_EQ(0, butil::get_local_side(sockfd, &local_point));
LOG(INFO) << "Connected to " << remote_side()
<< " via fd=" << (int)sockfd << " SocketId=" << id()
<< " via fd=" << sockfd << " SocketId=" << id()
<< " local_side=" << local_point;
}

Expand Down Expand Up @@ -2501,6 +2512,16 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
#endif
}

#if defined(OS_LINUX)
{
int tcp_user_timeout = 0;
socklen_t len = sizeof(tcp_user_timeout);
if (getsockopt(fd, SOL_TCP, TCP_USER_TIMEOUT, &tcp_user_timeout, &len) == 0) {
os << "\ntcp_user_timeout=" << tcp_user_timeout;
}
}
#endif

#if defined(OS_MACOSX)
struct tcp_connection_info ti;
socklen_t len = sizeof(ti);
Expand Down
52 changes: 29 additions & 23 deletions src/brpc/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,60 +234,57 @@ struct SocketSSLContext {
};

struct SocketKeepaliveOptions {
SocketKeepaliveOptions()
: keepalive_idle_s(-1)
, keepalive_interval_s(-1)
, keepalive_count(-1)
{}
// Start keeplives after this period.
int keepalive_idle_s;
int keepalive_idle_s{-1};
// Interval between keepalives.
int keepalive_interval_s;
int keepalive_interval_s{-1};
// Number of keepalives before death.
int keepalive_count;
int keepalive_count{-1};
};

// TODO: Comment fields
struct SocketOptions {
SocketOptions();

// If `fd' is non-negative, set `fd' to be non-blocking and take the
// ownership. Socket will close the fd(if needed) and call
// user->BeforeRecycle() before recycling.
int fd;
int fd{-1};
butil::EndPoint remote_side;
// If `connect_on_create' is true and `fd' is less than 0,
// a client connection will be established to remote_side()
// regarding deadline `connect_abstime' when Socket is being created.
// Default: false, means that a connection will be established
// on first write.
bool connect_on_create;
bool connect_on_create{false};
// Default: NULL, means no timeout.
const timespec* connect_abstime;
SocketUser* user;
const timespec* connect_abstime{NULL};
SocketUser* user{NULL};
// When *edge-triggered* events happen on the file descriptor, callback
// `on_edge_triggered_events' will be called. Inside the callback, user
// shall read fd() in non-blocking mode until all data has been read
// or EAGAIN is met, otherwise the callback will not be called again
// until new data arrives. The callback will not be called from more than
// one thread at any time.
void (*on_edge_triggered_events)(Socket*);
int health_check_interval_s;
void (*on_edge_triggered_events)(Socket*){NULL};
int health_check_interval_s{-1};
// Only accept ssl connection.
bool force_ssl;
bool force_ssl{false};
std::shared_ptr<SocketSSLContext> initial_ssl_ctx;
bool use_rdma;
bthread_keytable_pool_t* keytable_pool;
SocketConnection* conn;
bool use_rdma{false};
bthread_keytable_pool_t* keytable_pool{NULL};
SocketConnection* conn{NULL};
std::shared_ptr<AppConnect> app_connect;
// The created socket will set parsing_context with this value.
Destroyable* initial_parsing_context;
Destroyable* initial_parsing_context{NULL};

// Socket keepalive related options.
// Refer to `SocketKeepaliveOptions' for details.
std::shared_ptr<SocketKeepaliveOptions> keepalive_options;
// https://github.com/apache/brpc/issues/1154
// https://github.com/grpc/grpc/pull/16419/files
// Only linux supports TCP_USER_TIMEOUT.
int tcp_user_timeout_ms{ -1};
// Tag of this socket
bthread_tag_t bthread_tag;
bthread_tag_t bthread_tag{BTHREAD_TAG_DEFAULT};
};

// Abstractions on reading from and writing into file descriptors.
Expand Down Expand Up @@ -725,7 +722,7 @@ friend void DereferenceSocket(Socket*);

int ResetFileDescriptor(int fd);

void EnableKeepaliveIfNeeded(int fd);
void SetSocketOptions(int fd);

// Wait until nref hits `expected_nref' and reset some internal resources.
int WaitAndReset(int32_t expected_nref);
Expand Down Expand Up @@ -973,6 +970,15 @@ friend void DereferenceSocket(Socket*);
// non-NULL means that keepalive is on.
std::shared_ptr<SocketKeepaliveOptions> _keepalive_options;

// Only linux supports TCP_USER_TIMEOUT.
// When the value is greater than 0, it specifies the maximum
// amount of time in milliseconds that transmitted data may
// remain unacknowledged, or bufferred data may remain
// untransmitted (due to zero window size) before TCP will
// forcibly close the corresponding connection and return
// ETIMEDOUT to the application.
int _tcp_user_timeout_ms;

HttpMethod _http_request_method;
};

Expand Down
15 changes: 0 additions & 15 deletions src/brpc/socket_inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,6 @@

namespace brpc {

inline SocketOptions::SocketOptions()
: fd(-1)
, connect_on_create(false)
Comment thread
yanglimingcn marked this conversation as resolved.
, connect_abstime(NULL)
, user(NULL)
, on_edge_triggered_events(NULL)
, health_check_interval_s(-1)
, force_ssl(false)
, use_rdma(false)
, keytable_pool(NULL)
, conn(NULL)
, app_connect(NULL)
, initial_parsing_context(NULL)
, bthread_tag(BTHREAD_TAG_DEFAULT) {}

inline bool Socket::MoreReadEvents(int* progress) {
// Fail to CAS means that new events arrived.
return !_nevent.compare_exchange_strong(
Expand Down
4 changes: 2 additions & 2 deletions src/brpc/versioned_ref_with_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ class VersionedRefWithId {
// Create a VersionedRefWithId, put the identifier into `id'.
// `args' will be passed to OnCreated() directly.
// Returns 0 on success, -1 otherwise.
template<typename ... Args>
template<typename... Args>
static int Create(VRefId* id, Args&&... args);

// Place the VersionedRefWithId associated with identifier `id' into
Expand Down Expand Up @@ -350,7 +350,7 @@ void DereferenceVersionedRefWithId(T* r) {
}

template <typename T>
template<typename ... Args>
template<typename... Args>
int VersionedRefWithId<T>::Create(VRefId* id, Args&&... args) {
resource_id_t slot;
T* const t = butil::get_resource(&slot, Forbidden());
Expand Down
Loading