Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 10 additions & 21 deletions src/bthread/fd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,11 @@ class EpollThread {
return -1;
}
#endif
if (butex_wait(butex, expected_val, abstime) < 0 &&
errno != EWOULDBLOCK && errno != EINTR) {
return -1;
while (butex->load(butil::memory_order_relaxed) == expected_val) {
if (butex_wait(butex, expected_val, abstime) < 0 &&
errno != EWOULDBLOCK && errno != EINTR) {
return -1;
}
}
return 0;
}
Expand Down Expand Up @@ -496,17 +498,11 @@ int bthread_connect(int sockfd, const sockaddr* serv_addr,
#endif
return -1;
}
int err;
socklen_t errlen = sizeof(err);
if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) {
PLOG(FATAL) << "Fail to getsockopt";
return -1;
}
if (err != 0) {
CHECK(err != EINPROGRESS);
errno = err;

if (butil::is_connected(sockfd) != 0) {
return -1;
}

return 0;
}

Expand Down Expand Up @@ -539,17 +535,10 @@ int bthread_timed_connect(int sockfd, const struct sockaddr* serv_addr,
return -1;
}

int err;
socklen_t errlen = sizeof(err);
if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) {
PLOG(FATAL) << "Fail to getsockopt";
return -1;
}
if (err != 0) {
CHECK(err != EINPROGRESS);
errno = err;
if (butil::is_connected(sockfd) != 0) {
return -1;
}

return 0;
}

Expand Down
88 changes: 40 additions & 48 deletions src/butil/endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ int BAIDU_WEAK bthread_timed_connect(

__END_DECLS

#include "details/extended_endpoint.hpp"
#include "butil/details/extended_endpoint.hpp"

namespace butil {

Expand Down Expand Up @@ -419,18 +419,6 @@ short kqueue_to_poll_events(int kqueue_events) {

int pthread_fd_wait(int fd, unsigned events,
const timespec* abstime) {
int diff_ms = -1;
if (abstime) {
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
int64_t now_us = butil::timespec_to_microseconds(now);
int64_t abstime_us = butil::timespec_to_microseconds(*abstime);
if (abstime_us <= now_us) {
errno = ETIMEDOUT;
return -1;
}
diff_ms = (abstime_us - now_us + 999L) / 1000L;
}
#if defined(OS_LINUX)
const short poll_events = epoll_to_poll_events(events);
#elif defined(OS_MACOSX)
Expand All @@ -441,13 +429,32 @@ int pthread_fd_wait(int fd, unsigned events,
return -1;
}
pollfd ufds = { fd, poll_events, 0 };
const int rc = poll(&ufds, 1, diff_ms);
if (rc < 0) {
return -1;
}
if (rc == 0) {
errno = ETIMEDOUT;
return -1;
int64_t abstime_us = -1;
if (NULL != abstime) {
abstime_us = butil::timespec_to_microseconds(*abstime);
}
while (true) {
int diff_ms = -1;
if (NULL != abstime) {
int64_t now_us = butil::gettimeofday_us();
if (abstime_us <= now_us) {
errno = ETIMEDOUT;
return -1;
}
diff_ms = (abstime_us - now_us + 999L) / 1000L;
}
int rc = poll(&ufds, 1, diff_ms);
if (rc > 0) {
break;
} else if (rc == 0) {
errno = ETIMEDOUT;
return -1;
} else {
if (errno == EINTR) {
continue;
}
return -1;
}
}
if (ufds.revents & POLLNVAL) {
errno = EBADF;
Expand All @@ -458,10 +465,6 @@ int pthread_fd_wait(int fd, unsigned events,

int pthread_timed_connect(int sockfd, const struct sockaddr* serv_addr,
socklen_t addrlen, const timespec* abstime) {
if (abstime == NULL) {
return ::connect(sockfd, serv_addr, addrlen);
}

bool is_blocking = butil::is_blocking(sockfd);
if (is_blocking) {
butil::make_non_blocking(sockfd);
Expand All @@ -485,15 +488,7 @@ int pthread_timed_connect(int sockfd, const struct sockaddr* serv_addr,
return -1;
}

int err;
socklen_t errlen = sizeof(err);
if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) {
PLOG(FATAL) << "Fail to getsockopt";
return -1;
}
if (err != 0) {
CHECK(err != EINPROGRESS);
errno = err;
if (is_connected(sockfd) != 0) {
return -1;
}
return 0;
Expand All @@ -513,22 +508,19 @@ int tcp_connect(const EndPoint& server, int* self_port, int connect_timeout_ms)
if (sockfd < 0) {
return -1;
}
int rc = 0;
if (connect_timeout_ms <= 0) {
if (bthread_connect != NULL) {
rc = bthread_connect(sockfd, (struct sockaddr*)&serv_addr, serv_addr_size);
} else {
rc = ::connect(sockfd, (struct sockaddr*) &serv_addr, serv_addr_size);
}
timespec abstime{};
timespec* abstime_ptr = NULL;
if (connect_timeout_ms > 0) {
abstime = butil::milliseconds_from_now(connect_timeout_ms);
abstime_ptr = &abstime;
}
int rc;
if (bthread_timed_connect != NULL) {
rc = bthread_timed_connect(sockfd, (struct sockaddr*)&serv_addr,
serv_addr_size, abstime_ptr);
} else {
timespec abstime = butil::milliseconds_from_now(connect_timeout_ms);
if (bthread_timed_connect != NULL) {
rc = bthread_timed_connect(sockfd, (struct sockaddr*)&serv_addr,
serv_addr_size, &abstime);
} else {
rc = pthread_timed_connect(sockfd, (struct sockaddr*) &serv_addr,
serv_addr_size, &abstime);
}
rc = pthread_timed_connect(sockfd, (struct sockaddr*) &serv_addr,
serv_addr_size, abstime_ptr);
}
if (rc < 0) {
return -1;
Expand Down
51 changes: 49 additions & 2 deletions src/butil/fd_utility.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@

// Date: Mon. Nov 7 14:47:36 CST 2011

#include "butil/build_config.h"
#include <fcntl.h> // fcntl()
#include <netinet/in.h> // IPPROTO_TCP
#include <sys/types.h>
#include <sys/socket.h> // setsockopt
#include <netinet/tcp.h> // TCP_NODELAY
#include <netinet/tcp.h>
#if defined(OS_MACOSX)
#include <netinet/tcp_fsm.h> // TCPS_ESTABLISHED, TCP6S_ESTABLISHED
#endif
#include "butil/logging.h"

namespace butil {

Expand Down Expand Up @@ -56,9 +62,50 @@ int make_close_on_exec(int fd) {
return fcntl(fd, F_SETFD, FD_CLOEXEC);
}

int make_no_delay(int socket) {
int make_no_delay(int sockfd) {
int flag = 1;
return setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag));
return setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag));
}

int is_connected(int sockfd) {
errno = 0;
int err;
socklen_t errlen = sizeof(err);
if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) {
PLOG(FATAL) << "Fail to getsockopt";
return -1;
}
if (err != 0) {
errno = err;
return -1;
}

#if defined(OS_LINUX)
struct tcp_info ti{};
socklen_t len = sizeof(ti);
if(getsockopt(sockfd, SOL_TCP, TCP_INFO, &ti, &len) < 0) {
PLOG(FATAL) << "Fail to getsockopt";
return -1;
}
if (ti.tcpi_state != TCP_ESTABLISHED) {
errno = ENOTCONN;
return -1;
}
#elif defined(OS_MACOSX)
struct tcp_connection_info ti{};
socklen_t len = sizeof(ti);
if (getsockopt(sockfd, IPPROTO_TCP, TCP_CONNECTION_INFO, &ti, &len) < 0) {
PLOG(FATAL) << "Fail to getsockopt";
return -1;
}
if (ti.tcpi_state != TCPS_ESTABLISHED &&
ti.tcpi_state != TCP6S_ESTABLISHED) {
errno = ENOTCONN;
return -1;
}
#endif

return 0;
}

} // namespace butil
5 changes: 4 additions & 1 deletion src/butil/fd_utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ int make_close_on_exec(int fd);

// Disable nagling on file descriptor |socket|.
// Returns 0 on success, -1 when error and errno is set (by setsockopt)
int make_no_delay(int socket);
int make_no_delay(int sockfd);

// Return true if the socket is connected.
int is_connected(int sockfd);

} // namespace butil

Expand Down
50 changes: 50 additions & 0 deletions test/bthread_fd_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@
#include "bthread/interrupt_pthread.h"
#include "bthread/bthread.h"
#include "bthread/unstable.h"
#include <netinet/tcp.h>
#if defined(OS_MACOSX)
#include <sys/types.h> // struct kevent
#include <sys/event.h> // kevent(), kqueue()
#include <netinet/tcp_fsm.h>
#endif

#ifndef NDEBUG
Expand Down Expand Up @@ -594,4 +596,52 @@ TEST(FDTest, bthread_connect) {
}
}

void TestConnectInterruptImpl(bool timed) {
butil::EndPoint ep;
ASSERT_EQ(0, butil::hostname2endpoint(g_hostname, 80, &ep));
struct sockaddr_storage serv_addr{};
socklen_t serv_addr_size = 0;
ASSERT_EQ(0, endpoint2sockaddr(ep, &serv_addr, &serv_addr_size));
butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0));
ASSERT_GE(sockfd, 0);

int rc;
if (timed) {
int64_t start_ms = butil::cpuwide_time_ms();
butil::tcp_connect(ep, NULL);
int64_t connect_ms = butil::cpuwide_time_ms() - start_ms;
LOG(INFO) << "Connect to " << ep << ", cost " << connect_ms << "ms";

timespec abstime = butil::milliseconds_from_now(connect_ms + 100);
rc = bthread_timed_connect(
sockfd, (struct sockaddr*) &serv_addr,
serv_addr_size, &abstime);
} else {
rc = bthread_timed_connect(
sockfd, (struct sockaddr*) &serv_addr,
serv_addr_size, NULL);
}
ASSERT_EQ(0, rc) << "errno=" << errno;
ASSERT_EQ(0, butil::is_connected(sockfd));

}

void* ConnectThread(void* arg) {
bool timed = *(bool*)arg;
TestConnectInterruptImpl(timed);
return NULL;
}

void TestConnectInterrupt(bool timed) {
bthread_t tid;
ASSERT_EQ(0, bthread_start_background(&tid, NULL, ConnectThread, &timed));
ASSERT_EQ(0, bthread_stop(tid));
ASSERT_EQ(0, bthread_join(tid, NULL));
}

TEST(FDTest, interrupt) {
TestConnectInterrupt(false);
TestConnectInterrupt(true);
}

} // namespace
Loading