From c5f7da4bee18447240aecc1f7e5719d9b55c87c6 Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Sun, 16 Jun 2024 23:24:00 +0800 Subject: [PATCH 1/2] Fix tcp connect interrupt --- src/bthread/fd.cpp | 31 ++++++---------- src/butil/endpoint.cpp | 62 ++++++++++++++------------------ src/butil/fd_utility.cpp | 51 ++++++++++++++++++++++++-- src/butil/fd_utility.h | 5 ++- test/bthread_fd_unittest.cpp | 50 ++++++++++++++++++++++++++ test/endpoint_unittest.cpp | 70 ++++++++++++++++++++++++++++++++++++ test/run_tests.sh | 2 +- 7 files changed, 210 insertions(+), 61 deletions(-) diff --git a/src/bthread/fd.cpp b/src/bthread/fd.cpp index e97cee2c3b..b65dca4838 100644 --- a/src/bthread/fd.cpp +++ b/src/bthread/fd.cpp @@ -264,9 +264,11 @@ class EpollThread { return -1; } #endif - if (butex_wait(butex, expected_val, abstime) < 0 && - errno != EWOULDBLOCK && errno != EINTR) { - return -1; + while (butex->load(butil::memory_order_relaxed) == expected_val) { + if (butex_wait(butex, expected_val, abstime) < 0 && + errno != EWOULDBLOCK && errno != EINTR) { + return -1; + } } return 0; } @@ -496,17 +498,11 @@ int bthread_connect(int sockfd, const sockaddr* serv_addr, #endif return -1; } - int err; - socklen_t errlen = sizeof(err); - if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) { - PLOG(FATAL) << "Fail to getsockopt"; - return -1; - } - if (err != 0) { - CHECK(err != EINPROGRESS); - errno = err; + + if (butil::is_connected(sockfd) != 0) { return -1; } + return 0; } @@ -539,17 +535,10 @@ int bthread_timed_connect(int sockfd, const struct sockaddr* serv_addr, return -1; } - int err; - socklen_t errlen = sizeof(err); - if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) { - PLOG(FATAL) << "Fail to getsockopt"; - return -1; - } - if (err != 0) { - CHECK(err != EINPROGRESS); - errno = err; + if (butil::is_connected(sockfd) != 0) { return -1; } + return 0; } diff --git a/src/butil/endpoint.cpp b/src/butil/endpoint.cpp index cfd536a06f..06d0613b43 100644 --- a/src/butil/endpoint.cpp +++ b/src/butil/endpoint.cpp @@ -61,7 +61,7 @@ int BAIDU_WEAK bthread_timed_connect( __END_DECLS -#include "details/extended_endpoint.hpp" +#include "butil/details/extended_endpoint.hpp" namespace butil { @@ -441,13 +441,18 @@ int pthread_fd_wait(int fd, unsigned events, return -1; } pollfd ufds = { fd, poll_events, 0 }; - const int rc = poll(&ufds, 1, diff_ms); - if (rc < 0) { - return -1; - } - if (rc == 0) { - errno = ETIMEDOUT; - return -1; + while (true) { + int rc = poll(&ufds, 1, diff_ms); + if (rc < 0 && errno != EINTR) { + return -1; + } + if (rc == 0) { + errno = ETIMEDOUT; + return -1; + } + if (rc > 0) { + break; + } } if (ufds.revents & POLLNVAL) { errno = EBADF; @@ -458,10 +463,6 @@ int pthread_fd_wait(int fd, unsigned events, int pthread_timed_connect(int sockfd, const struct sockaddr* serv_addr, socklen_t addrlen, const timespec* abstime) { - if (abstime == NULL) { - return ::connect(sockfd, serv_addr, addrlen); - } - bool is_blocking = butil::is_blocking(sockfd); if (is_blocking) { butil::make_non_blocking(sockfd); @@ -485,15 +486,7 @@ int pthread_timed_connect(int sockfd, const struct sockaddr* serv_addr, return -1; } - int err; - socklen_t errlen = sizeof(err); - if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) { - PLOG(FATAL) << "Fail to getsockopt"; - return -1; - } - if (err != 0) { - CHECK(err != EINPROGRESS); - errno = err; + if (is_connected(sockfd) != 0) { return -1; } return 0; @@ -513,22 +506,19 @@ int tcp_connect(const EndPoint& server, int* self_port, int connect_timeout_ms) if (sockfd < 0) { return -1; } - int rc = 0; - if (connect_timeout_ms <= 0) { - if (bthread_connect != NULL) { - rc = bthread_connect(sockfd, (struct sockaddr*)&serv_addr, serv_addr_size); - } else { - rc = ::connect(sockfd, (struct sockaddr*) &serv_addr, serv_addr_size); - } + timespec abstime{}; + timespec* abstime_ptr = NULL; + if (connect_timeout_ms > 0) { + abstime = butil::milliseconds_from_now(connect_timeout_ms); + abstime_ptr = &abstime; + } + int rc; + if (bthread_timed_connect != NULL) { + rc = bthread_timed_connect(sockfd, (struct sockaddr*)&serv_addr, + serv_addr_size, abstime_ptr); } else { - timespec abstime = butil::milliseconds_from_now(connect_timeout_ms); - if (bthread_timed_connect != NULL) { - rc = bthread_timed_connect(sockfd, (struct sockaddr*)&serv_addr, - serv_addr_size, &abstime); - } else { - rc = pthread_timed_connect(sockfd, (struct sockaddr*) &serv_addr, - serv_addr_size, &abstime); - } + rc = pthread_timed_connect(sockfd, (struct sockaddr*) &serv_addr, + serv_addr_size, abstime_ptr); } if (rc < 0) { return -1; diff --git a/src/butil/fd_utility.cpp b/src/butil/fd_utility.cpp index 4557776930..52410f8c4f 100644 --- a/src/butil/fd_utility.cpp +++ b/src/butil/fd_utility.cpp @@ -17,11 +17,17 @@ // Date: Mon. Nov 7 14:47:36 CST 2011 +#include "butil/build_config.h" #include // fcntl() #include // IPPROTO_TCP #include #include // setsockopt #include // TCP_NODELAY +#include +#if defined(OS_MACOSX) +#include // TCPS_ESTABLISHED, TCP6S_ESTABLISHED +#endif +#include "butil/logging.h" namespace butil { @@ -56,9 +62,50 @@ int make_close_on_exec(int fd) { return fcntl(fd, F_SETFD, FD_CLOEXEC); } -int make_no_delay(int socket) { +int make_no_delay(int sockfd) { int flag = 1; - return setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag)); + return setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag)); +} + +int is_connected(int sockfd) { + errno = 0; + int err; + socklen_t errlen = sizeof(err); + if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) { + PLOG(FATAL) << "Fail to getsockopt"; + return -1; + } + if (err != 0) { + errno = err; + return -1; + } + +#if defined(OS_LINUX) + struct tcp_info ti{}; + socklen_t len = sizeof(ti); + if(getsockopt(sockfd, SOL_TCP, TCP_INFO, &ti, &len) < 0) { + PLOG(FATAL) << "Fail to getsockopt"; + return -1; + } + if (ti.tcpi_state != TCP_ESTABLISHED) { + errno = ENOTCONN; + return -1; + } +#elif defined(OS_MACOSX) + struct tcp_connection_info ti{}; + socklen_t len = sizeof(ti); + if (getsockopt(sockfd, IPPROTO_TCP, TCP_CONNECTION_INFO, &ti, &len) < 0) { + PLOG(FATAL) << "Fail to getsockopt"; + return -1; + } + if (ti.tcpi_state != TCPS_ESTABLISHED && + ti.tcpi_state != TCP6S_ESTABLISHED) { + errno = ENOTCONN; + return -1; + } +#endif + + return 0; } } // namespace butil diff --git a/src/butil/fd_utility.h b/src/butil/fd_utility.h index 8d93363d6b..5c920eb389 100644 --- a/src/butil/fd_utility.h +++ b/src/butil/fd_utility.h @@ -41,7 +41,10 @@ int make_close_on_exec(int fd); // Disable nagling on file descriptor |socket|. // Returns 0 on success, -1 when error and errno is set (by setsockopt) -int make_no_delay(int socket); +int make_no_delay(int sockfd); + +// Return true if the socket is connected. +int is_connected(int sockfd); } // namespace butil diff --git a/test/bthread_fd_unittest.cpp b/test/bthread_fd_unittest.cpp index ec94f79f08..32c7701f4e 100644 --- a/test/bthread_fd_unittest.cpp +++ b/test/bthread_fd_unittest.cpp @@ -34,9 +34,11 @@ #include "bthread/interrupt_pthread.h" #include "bthread/bthread.h" #include "bthread/unstable.h" +#include #if defined(OS_MACOSX) #include // struct kevent #include // kevent(), kqueue() +#include #endif #ifndef NDEBUG @@ -594,4 +596,52 @@ TEST(FDTest, bthread_connect) { } } +void TestConnectInterruptImpl(bool timed) { + butil::EndPoint ep; + ASSERT_EQ(0, butil::hostname2endpoint(g_hostname, 80, &ep)); + struct sockaddr_storage serv_addr{}; + socklen_t serv_addr_size = 0; + ASSERT_EQ(0, endpoint2sockaddr(ep, &serv_addr, &serv_addr_size)); + butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0)); + ASSERT_GE(sockfd, 0); + + int rc; + if (timed) { + int64_t start_ms = butil::cpuwide_time_ms(); + butil::tcp_connect(ep, NULL); + int64_t connect_ms = butil::cpuwide_time_ms() - start_ms; + LOG(INFO) << "Connect to " << ep << ", cost " << connect_ms << "ms"; + + timespec abstime = butil::milliseconds_from_now(connect_ms + 1); + rc = bthread_timed_connect( + sockfd, (struct sockaddr*) &serv_addr, + serv_addr_size, &abstime); + } else { + rc = bthread_timed_connect( + sockfd, (struct sockaddr*) &serv_addr, + serv_addr_size, NULL); + } + ASSERT_EQ(0, rc) << "errno=" << errno; + ASSERT_EQ(0, butil::is_connected(sockfd)); + +} + +void* ConnectThread(void* arg) { + bool timed = *(bool*)arg; + TestConnectInterruptImpl(timed); + return NULL; +} + +void TestConnectInterrupt(bool timed) { + bthread_t tid; + ASSERT_EQ(0, bthread_start_background(&tid, NULL, ConnectThread, &timed)); + ASSERT_EQ(0, bthread_stop(tid)); + ASSERT_EQ(0, bthread_join(tid, NULL)); +} + +TEST(FDTest, interrupt) { + TestConnectInterrupt(false); + TestConnectInterrupt(true); +} + } // namespace diff --git a/test/endpoint_unittest.cpp b/test/endpoint_unittest.cpp index fcb23a7b27..a785be19b9 100644 --- a/test/endpoint_unittest.cpp +++ b/test/endpoint_unittest.cpp @@ -23,6 +23,10 @@ #include "butil/logging.h" #include "butil/containers/flat_map.h" #include "butil/details/extended_endpoint.hpp" +#include +#if defined(OS_MACOSX) +#include +#endif namespace butil { int pthread_timed_connect(int sockfd, const struct sockaddr* serv_addr, @@ -528,4 +532,70 @@ TEST(EndPointTest, tcp_connect) { } } +bool g_connect_startd = false; + +void TestConnectInterruptImpl(bool timed) { + butil::EndPoint ep; + ASSERT_EQ(0, butil::hostname2endpoint(g_hostname, 80, &ep)); + + struct sockaddr_storage serv_addr{}; + socklen_t serv_addr_size = 0; + ASSERT_EQ(0, endpoint2sockaddr(ep, &serv_addr, &serv_addr_size)); + butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0)); + ASSERT_LE(0, sockfd); + + int rc; + if (timed) { + int64_t start_ms = butil::cpuwide_time_ms(); + butil::tcp_connect(ep, NULL); + int64_t connect_ms = butil::cpuwide_time_ms() - start_ms; + LOG(INFO) << "Connect to " << ep << ", cost " << connect_ms << "ms"; + + timespec abstime = butil::milliseconds_from_now(connect_ms + 1); + rc = butil::pthread_timed_connect( + sockfd, (struct sockaddr*) &serv_addr, + serv_addr_size, &abstime); + } else { + rc = butil::pthread_timed_connect( + sockfd, (struct sockaddr*) &serv_addr, + serv_addr_size, NULL); + } + ASSERT_EQ(0, rc) << "errno=" << errno; + ASSERT_EQ(0, butil::is_connected(sockfd)); +} + +void* ConnectThread(void* arg) { + bool timed = *(bool*)arg; + TestConnectInterruptImpl(timed); + return NULL; +} + +void sig_handler(int sig) { + LOG(INFO) << "sig=" << sig; +} + +void register_sigurg() { + signal(SIGURG, sig_handler); +} + +void TestConnectInterrupt(bool timed) { + g_connect_startd = false; + pthread_t tid; + ASSERT_EQ(0, pthread_create(&tid, NULL, ConnectThread, &timed)); + + while (g_connect_startd) { + usleep(1000); + } + + ASSERT_EQ(0, pthread_kill(tid, SIGURG)); + + pthread_join(tid, NULL); +} + +TEST(EndPointTest, interrupt) { + register_sigurg(); + TestConnectInterrupt(false); + TestConnectInterrupt(true); +} + } // end of namespace diff --git a/test/run_tests.sh b/test/run_tests.sh index ebd648402a..142977975a 100755 --- a/test/run_tests.sh +++ b/test/run_tests.sh @@ -43,7 +43,7 @@ print_bt () { COREFILE=$(find . -name "core*" -type f -printf "%T@ %p\n" | sort -k 1 -n | cut -d' ' -f 2- | tail -n 1) if [ ! -z "$COREFILE" ]; then >&2 echo "corefile=$COREFILE prog=$1" - gdb -c "$COREFILE" $1 -ex "thread apply all bt" -ex "set pagination 0" -batch; + gdb -c "$COREFILE" $1 -ex "bt" -ex "thread apply all bt" -ex "set pagination 0" -batch; fi } From 5edf24ec1f2987ee4393bbea9a5a77744fe5b0fa Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Thu, 20 Jun 2024 21:12:42 +0800 Subject: [PATCH 2/2] Recalculate timeout --- src/butil/endpoint.cpp | 40 +++++++++++++++++++----------------- test/bthread_fd_unittest.cpp | 2 +- 2 files changed, 22 insertions(+), 20 deletions(-) diff --git a/src/butil/endpoint.cpp b/src/butil/endpoint.cpp index 06d0613b43..2a7d9e3c14 100644 --- a/src/butil/endpoint.cpp +++ b/src/butil/endpoint.cpp @@ -419,18 +419,6 @@ short kqueue_to_poll_events(int kqueue_events) { int pthread_fd_wait(int fd, unsigned events, const timespec* abstime) { - int diff_ms = -1; - if (abstime) { - timespec now; - clock_gettime(CLOCK_REALTIME, &now); - int64_t now_us = butil::timespec_to_microseconds(now); - int64_t abstime_us = butil::timespec_to_microseconds(*abstime); - if (abstime_us <= now_us) { - errno = ETIMEDOUT; - return -1; - } - diff_ms = (abstime_us - now_us + 999L) / 1000L; - } #if defined(OS_LINUX) const short poll_events = epoll_to_poll_events(events); #elif defined(OS_MACOSX) @@ -441,17 +429,31 @@ int pthread_fd_wait(int fd, unsigned events, return -1; } pollfd ufds = { fd, poll_events, 0 }; + int64_t abstime_us = -1; + if (NULL != abstime) { + abstime_us = butil::timespec_to_microseconds(*abstime); + } while (true) { - int rc = poll(&ufds, 1, diff_ms); - if (rc < 0 && errno != EINTR) { - return -1; - } - if (rc == 0) { - errno = ETIMEDOUT; - return -1; + int diff_ms = -1; + if (NULL != abstime) { + int64_t now_us = butil::gettimeofday_us(); + if (abstime_us <= now_us) { + errno = ETIMEDOUT; + return -1; + } + diff_ms = (abstime_us - now_us + 999L) / 1000L; } + int rc = poll(&ufds, 1, diff_ms); if (rc > 0) { break; + } else if (rc == 0) { + errno = ETIMEDOUT; + return -1; + } else { + if (errno == EINTR) { + continue; + } + return -1; } } if (ufds.revents & POLLNVAL) { diff --git a/test/bthread_fd_unittest.cpp b/test/bthread_fd_unittest.cpp index 32c7701f4e..ec5df53677 100644 --- a/test/bthread_fd_unittest.cpp +++ b/test/bthread_fd_unittest.cpp @@ -612,7 +612,7 @@ void TestConnectInterruptImpl(bool timed) { int64_t connect_ms = butil::cpuwide_time_ms() - start_ms; LOG(INFO) << "Connect to " << ep << ", cost " << connect_ms << "ms"; - timespec abstime = butil::milliseconds_from_now(connect_ms + 1); + timespec abstime = butil::milliseconds_from_now(connect_ms + 100); rc = bthread_timed_connect( sockfd, (struct sockaddr*) &serv_addr, serv_addr_size, &abstime);