From 9d8d3750d1753502c99bb93f5890e4c58f905bf7 Mon Sep 17 00:00:00 2001 From: wangwei <1261385937@qq.com> Date: Thu, 15 Sep 2022 20:17:39 +0800 Subject: [PATCH 1/4] fix windows compile error (C-style designated initializer syntax) --- ut/socket_ut.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ut/socket_ut.cpp b/ut/socket_ut.cpp index 36b6a65b..9e4a2529 100644 --- a/ut/socket_ut.cpp +++ b/ut/socket_ut.cpp @@ -43,7 +43,7 @@ TEST(Socketcase, timeoutrecv) { std::this_thread::sleep_for(std::chrono::seconds(1)); try { - Socket socket(addr, SocketTimeoutParams { .recv_timeout = Seconds(5), .send_timeout = Seconds(5) }); + Socket socket(addr, SocketTimeoutParams { Seconds(5), Seconds(5) }); std::unique_ptr ptr_input_stream = socket.makeInputStream(); char buf[1024]; From ebc7ab1aba7bd035deeebc44ca0be9860d739992 Mon Sep 17 00:00:00 2001 From: wangwei <1261385937@qq.com> Date: Thu, 15 Sep 2022 21:29:18 +0800 Subject: [PATCH 2/4] add windows socket timeout implementation. --- clickhouse/base/socket.cpp | 28 ++++++++++++++++++++++++---- clickhouse/base/socket.h | 7 +++++-- ut/socket_ut.cpp | 14 ++++++++++---- 3 files changed, 39 insertions(+), 10 deletions(-) diff --git a/clickhouse/base/socket.cpp b/clickhouse/base/socket.cpp index e0f8fb1c..dbfc11c3 100644 --- a/clickhouse/base/socket.cpp +++ b/clickhouse/base/socket.cpp @@ -114,11 +114,27 @@ void SetNonBlock(SOCKET fd, bool value) { void SetTimeout(SOCKET fd, const SocketTimeoutParams& timeout_params) { #if defined(_unix_) - timeval recv_timeout { .tv_sec = timeout_params.recv_timeout.count(), .tv_usec = 0 }; - setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &recv_timeout, sizeof(recv_timeout)); + timeval recv_timeout { timeout_params.recv_timeout_s.count(), timeout_params.recv_timeout_us.count() }; + auto recv_ret = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &recv_timeout, sizeof(recv_timeout)); - timeval send_timeout { .tv_sec = timeout_params.send_timeout.count(), .tv_usec = 0 }; - setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &send_timeout, sizeof(send_timeout)); + timeval send_timeout { timeout_params.send_timeout_s.count(), timeout_params.send_timeout_us.count() }; + auto send_ret = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &send_timeout, sizeof(send_timeout)); + + if (recv_ret == -1 || send_ret == -1) { + throw std::system_error(getSocketErrorCode(), getErrorCategory(), "fail to set socket timeout"); + } +#else + const struct timeval recv_tv { timeout_params.recv_timeout_s.count(), timeout_params.recv_timeout_us.count()}; + DWORD recv_timeout = recv_tv.tv_sec * 1000 + recv_tv.tv_usec / 1000; + auto recv_ret = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&recv_timeout, sizeof(DWORD)); + + const struct timeval send_tv { timeout_params.send_timeout_s.count(), timeout_params.send_timeout_us.count()}; + DWORD send_timeout = send_tv.tv_sec * 1000 + send_tv.tv_usec / 1000; + auto send_ret = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, (const char*)&send_timeout, sizeof(DWORD)); + + if (recv_ret == SOCKET_ERROR || send_ret == SOCKET_ERROR) { + throw std::system_error(getSocketErrorCode(), getErrorCategory(), "fail to set socket timeout"); + } #endif }; @@ -244,6 +260,10 @@ Socket::Socket(const NetworkAddress& addr, const SocketTimeoutParams& timeout_pa : handle_(SocketConnect(addr, timeout_params)) {} +Socket::Socket(const NetworkAddress & addr) + : handle_(SocketConnect(addr, SocketTimeoutParams{})) +{} + Socket::Socket(Socket&& other) noexcept : handle_(other.handle_) { diff --git a/clickhouse/base/socket.h b/clickhouse/base/socket.h index b3d916e1..eaee403b 100644 --- a/clickhouse/base/socket.h +++ b/clickhouse/base/socket.h @@ -83,13 +83,16 @@ class SocketFactory { struct SocketTimeoutParams { - const std::chrono::seconds recv_timeout {0}; - const std::chrono::seconds send_timeout {0}; + const std::chrono::seconds recv_timeout_s {0}; + const std::chrono::seconds send_timeout_s {0}; + const std::chrono::microseconds recv_timeout_us{ 0 }; + const std::chrono::microseconds send_timeout_us{ 0 }; }; class Socket : public SocketBase { public: Socket(const NetworkAddress& addr, const SocketTimeoutParams& timeout_params); + Socket(const NetworkAddress& addr); Socket(Socket&& other) noexcept; Socket& operator=(Socket&& other) noexcept; diff --git a/ut/socket_ut.cpp b/ut/socket_ut.cpp index 9e4a2529..5a263435 100644 --- a/ut/socket_ut.cpp +++ b/ut/socket_ut.cpp @@ -18,7 +18,7 @@ TEST(Socketcase, connecterror) { std::this_thread::sleep_for(std::chrono::seconds(1)); try { - Socket socket(addr, SocketTimeoutParams {}); + Socket socket(addr); } catch (const std::system_error& e) { FAIL(); } @@ -26,7 +26,7 @@ TEST(Socketcase, connecterror) { std::this_thread::sleep_for(std::chrono::seconds(1)); server.stop(); try { - Socket socket(addr, SocketTimeoutParams {}); + Socket socket(addr); FAIL(); } catch (const std::system_error& e) { ASSERT_NE(EINPROGRESS,e.code().value()); @@ -49,8 +49,14 @@ TEST(Socketcase, timeoutrecv) { char buf[1024]; ptr_input_stream->Read(buf, sizeof(buf)); - } catch (const std::system_error& e) { - ASSERT_EQ(EAGAIN, e.code().value()); + } + catch (const std::system_error& e) { +#if defined(_unix_) + auto expected = EAGAIN; +#else + auto expected = WSAETIMEDOUT; +#endif + ASSERT_EQ(expected, e.code().value()); } std::this_thread::sleep_for(std::chrono::seconds(1)); From 79c9dd3b7bec74c973a40a10e14f9a9deeef225b Mon Sep 17 00:00:00 2001 From: wangwei <1261385937@qq.com> Date: Thu, 15 Sep 2022 21:46:53 +0800 Subject: [PATCH 3/4] fix macOS compile --- clickhouse/base/socket.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clickhouse/base/socket.cpp b/clickhouse/base/socket.cpp index dbfc11c3..76ffef90 100644 --- a/clickhouse/base/socket.cpp +++ b/clickhouse/base/socket.cpp @@ -114,10 +114,10 @@ void SetNonBlock(SOCKET fd, bool value) { void SetTimeout(SOCKET fd, const SocketTimeoutParams& timeout_params) { #if defined(_unix_) - timeval recv_timeout { timeout_params.recv_timeout_s.count(), timeout_params.recv_timeout_us.count() }; + timeval recv_timeout { timeout_params.recv_timeout_s.count(), static_cast(timeout_params.recv_timeout_us.count()) }; auto recv_ret = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &recv_timeout, sizeof(recv_timeout)); - timeval send_timeout { timeout_params.send_timeout_s.count(), timeout_params.send_timeout_us.count() }; + timeval send_timeout { timeout_params.send_timeout_s.count(), static_cast(timeout_params.send_timeout_us.count()) }; auto send_ret = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &send_timeout, sizeof(send_timeout)); if (recv_ret == -1 || send_ret == -1) { From 9e3491f5a8a61b2ec03c5886fc96e16eee32feef Mon Sep 17 00:00:00 2001 From: wangwei <1261385937@qq.com> Date: Mon, 19 Sep 2022 23:40:24 +0800 Subject: [PATCH 4/4] optimize interface --- clickhouse/base/socket.cpp | 12 +++++------- clickhouse/base/socket.h | 6 ++---- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/clickhouse/base/socket.cpp b/clickhouse/base/socket.cpp index 76ffef90..e9ed5fd2 100644 --- a/clickhouse/base/socket.cpp +++ b/clickhouse/base/socket.cpp @@ -114,22 +114,20 @@ void SetNonBlock(SOCKET fd, bool value) { void SetTimeout(SOCKET fd, const SocketTimeoutParams& timeout_params) { #if defined(_unix_) - timeval recv_timeout { timeout_params.recv_timeout_s.count(), static_cast(timeout_params.recv_timeout_us.count()) }; + timeval recv_timeout{ timeout_params.recv_timeout.count() / 1000, static_cast(timeout_params.recv_timeout.count() % 1000 * 1000) }; auto recv_ret = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &recv_timeout, sizeof(recv_timeout)); - timeval send_timeout { timeout_params.send_timeout_s.count(), static_cast(timeout_params.send_timeout_us.count()) }; + timeval send_timeout{ timeout_params.send_timeout.count() / 1000, static_cast(timeout_params.send_timeout.count() % 1000 * 1000) }; auto send_ret = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &send_timeout, sizeof(send_timeout)); if (recv_ret == -1 || send_ret == -1) { throw std::system_error(getSocketErrorCode(), getErrorCategory(), "fail to set socket timeout"); } #else - const struct timeval recv_tv { timeout_params.recv_timeout_s.count(), timeout_params.recv_timeout_us.count()}; - DWORD recv_timeout = recv_tv.tv_sec * 1000 + recv_tv.tv_usec / 1000; + DWORD recv_timeout = static_cast(timeout_params.recv_timeout.count()); auto recv_ret = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&recv_timeout, sizeof(DWORD)); - - const struct timeval send_tv { timeout_params.send_timeout_s.count(), timeout_params.send_timeout_us.count()}; - DWORD send_timeout = send_tv.tv_sec * 1000 + send_tv.tv_usec / 1000; + + DWORD send_timeout = static_cast(timeout_params.send_timeout.count()); auto send_ret = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, (const char*)&send_timeout, sizeof(DWORD)); if (recv_ret == SOCKET_ERROR || send_ret == SOCKET_ERROR) { diff --git a/clickhouse/base/socket.h b/clickhouse/base/socket.h index eaee403b..c68f250d 100644 --- a/clickhouse/base/socket.h +++ b/clickhouse/base/socket.h @@ -83,10 +83,8 @@ class SocketFactory { struct SocketTimeoutParams { - const std::chrono::seconds recv_timeout_s {0}; - const std::chrono::seconds send_timeout_s {0}; - const std::chrono::microseconds recv_timeout_us{ 0 }; - const std::chrono::microseconds send_timeout_us{ 0 }; + std::chrono::milliseconds recv_timeout{ 0 }; + std::chrono::milliseconds send_timeout{ 0 }; }; class Socket : public SocketBase {