From c243cac1cd7ddf74a1eeee05f1c50961200b3ae3 Mon Sep 17 00:00:00 2001 From: Ivan Trofimov Date: Sat, 29 Jan 2022 03:24:07 +0300 Subject: [PATCH 1/9] allow Socket injection into Client --- clickhouse/client.cpp | 58 +++++++++++++++++++++++++++++++++---------- clickhouse/client.h | 3 +++ 2 files changed, 48 insertions(+), 13 deletions(-) diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index 47f5c370..3dee6d26 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -85,6 +85,7 @@ std::ostream& operator<<(std::ostream& os, const ClientOptions& opt) { class Client::Impl { public: Impl(const ClientOptions& opts); + Impl(const ClientOptions& opts, std::unique_ptr socket); ~Impl(); void ExecuteQuery(Query query); @@ -122,6 +123,8 @@ class Client::Impl { void WriteBlock(const Block& block, OutputStream* output); + void InitializeStreams(std::unique_ptr&& socket); + private: /// In case of network errors tries to reconnect to server and /// call fuc several times. @@ -162,6 +165,8 @@ class Client::Impl { std::unique_ptr socket_; + bool is_external_socket_initialization_{false}; + #if defined(WITH_OPENSSL) std::unique_ptr ssl_context_; #endif @@ -194,6 +199,18 @@ Client::Impl::Impl(const ClientOptions& opts) } } +Client::Impl::Impl(const ClientOptions& opts, std::unique_ptr socket) + : options_(opts) + , events_(nullptr) +{ + InitializeStreams(std::move(socket)); + is_external_socket_initialization_ = true; + + if (options_.compression_method != CompressionMethod::None) { + compression_ = CompressionState::Enable; + } +} + Client::Impl::~Impl() { } @@ -295,7 +312,12 @@ void Client::Impl::Ping() { } } + + void Client::Impl::ResetConnection() { + if (is_external_socket_initialization_) { + throw std::runtime_error("Can not reset connection, socket was provided externally"); + } std::unique_ptr socket; @@ -336,19 +358,7 @@ void Client::Impl::ResetConnection() { socket->SetTcpNoDelay(options_.tcp_nodelay); } - OutputStreams output_streams; - auto socket_output = output_streams.Add(socket->makeOutputStream()); - auto output = output_streams.AddNew(socket_output); - - InputStreams input_streams; - auto socket_input = input_streams.Add(socket->makeInputStream()); - auto input = input_streams.AddNew(socket_input); - - std::swap(output_streams, output_streams_); - std::swap(input_streams, input_streams_); - std::swap(socket, socket_); - output_ = output; - input_ = input; + InitializeStreams(std::move(socket)); #if defined(WITH_OPENSSL) std::swap(ssl_context_, ssl_context); @@ -705,6 +715,22 @@ void Client::Impl::SendData(const Block& block) { output_->Flush(); } +void Client::Impl::InitializeStreams(std::unique_ptr&& socket) { + OutputStreams output_streams; + auto socket_output = output_streams.Add(socket->makeOutputStream()); + auto output = output_streams.AddNew(socket_output); + + InputStreams input_streams; + auto socket_input = input_streams.Add(socket->makeInputStream()); + auto input = input_streams.AddNew(socket_input); + + std::swap(output_streams, output_streams_); + std::swap(input_streams, input_streams_); + std::swap(socket, socket_); + output_ = output; + input_ = input; +} + bool Client::Impl::SendHello() { WireFormat::WriteUInt64(output_, ClientCodes::Hello); WireFormat::WriteString(output_, std::string(DBMS_NAME) + " client"); @@ -796,6 +822,12 @@ Client::Client(const ClientOptions& opts) { } +Client::Client(const ClientOptions& opts, std::unique_ptr socket) + : options_(opts) + , impl_(new Impl(opts, std::move(socket))) +{ +} + Client::~Client() { } diff --git a/clickhouse/client.h b/clickhouse/client.h index 58611f49..aaf91b32 100644 --- a/clickhouse/client.h +++ b/clickhouse/client.h @@ -163,12 +163,15 @@ struct ClientOptions { std::ostream& operator<<(std::ostream& os, const ClientOptions& options); +class Socket; + /** * */ class Client { public: Client(const ClientOptions& opts); + Client(const ClientOptions& opts, std::unique_ptr socket); ~Client(); /// Intends for execute arbitrary queries. From bcde2985176e7be8be9e281e5dbdde1fd4e6bf01 Mon Sep 17 00:00:00 2001 From: Ivan Trofimov Date: Mon, 31 Jan 2022 22:54:00 +0300 Subject: [PATCH 2/9] add abstract base class for Socket --- clickhouse/base/socket.cpp | 3 +++ clickhouse/base/socket.h | 16 ++++++++++++---- clickhouse/base/sslsocket.h | 2 +- clickhouse/client.cpp | 13 ++++++------- clickhouse/client.h | 2 +- 5 files changed, 23 insertions(+), 13 deletions(-) diff --git a/clickhouse/base/socket.cpp b/clickhouse/base/socket.cpp index b00e3207..8b19898f 100644 --- a/clickhouse/base/socket.cpp +++ b/clickhouse/base/socket.cpp @@ -174,6 +174,9 @@ const std::string & NetworkAddress::Host() const { } +SocketBase::~SocketBase() = default; + + Socket::Socket(const NetworkAddress& addr) : handle_(SocketConnect(addr)) {} diff --git a/clickhouse/base/socket.h b/clickhouse/base/socket.h index 00bda042..74d67501 100644 --- a/clickhouse/base/socket.h +++ b/clickhouse/base/socket.h @@ -47,14 +47,22 @@ class NetworkAddress { struct addrinfo* info_; }; +class SocketBase { +public: + virtual ~SocketBase(); + + virtual std::unique_ptr makeInputStream() const = 0; + virtual std::unique_ptr makeOutputStream() const = 0; +}; + -class Socket { +class Socket : public SocketBase { public: Socket(const NetworkAddress& addr); Socket(Socket&& other) noexcept; Socket& operator=(Socket&& other) noexcept; - virtual ~Socket(); + ~Socket() override; /// @params idle the time (in seconds) the connection needs to remain /// idle before TCP starts sending keepalive probes. @@ -66,8 +74,8 @@ class Socket { /// @params nodelay whether to enable TCP_NODELAY void SetTcpNoDelay(bool nodelay) noexcept; - virtual std::unique_ptr makeInputStream() const; - virtual std::unique_ptr makeOutputStream() const; + std::unique_ptr makeInputStream() const override; + std::unique_ptr makeOutputStream() const override; protected: Socket(const Socket&) = delete; diff --git a/clickhouse/base/sslsocket.h b/clickhouse/base/sslsocket.h index 0a85eef0..5c666162 100644 --- a/clickhouse/base/sslsocket.h +++ b/clickhouse/base/sslsocket.h @@ -44,7 +44,7 @@ class SSLSocket : public Socket { public: explicit SSLSocket(const NetworkAddress& addr, const SSLParams & ssl_params, SSLContext& context); SSLSocket(SSLSocket &&) = default; - ~SSLSocket() = default; + ~SSLSocket() override = default; SSLSocket(const SSLSocket & ) = delete; SSLSocket& operator=(const SSLSocket & ) = delete; diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index 3dee6d26..5da584f2 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -85,7 +85,7 @@ std::ostream& operator<<(std::ostream& os, const ClientOptions& opt) { class Client::Impl { public: Impl(const ClientOptions& opts); - Impl(const ClientOptions& opts, std::unique_ptr socket); + Impl(const ClientOptions& opts, std::unique_ptr socket); ~Impl(); void ExecuteQuery(Query query); @@ -123,7 +123,7 @@ class Client::Impl { void WriteBlock(const Block& block, OutputStream* output); - void InitializeStreams(std::unique_ptr&& socket); + void InitializeStreams(std::unique_ptr&& socket); private: /// In case of network errors tries to reconnect to server and @@ -163,8 +163,7 @@ class Client::Impl { OutputStreams output_streams_; OutputStream* output_; - std::unique_ptr socket_; - + std::unique_ptr socket_; bool is_external_socket_initialization_{false}; #if defined(WITH_OPENSSL) @@ -199,7 +198,7 @@ Client::Impl::Impl(const ClientOptions& opts) } } -Client::Impl::Impl(const ClientOptions& opts, std::unique_ptr socket) +Client::Impl::Impl(const ClientOptions& opts, std::unique_ptr socket) : options_(opts) , events_(nullptr) { @@ -715,7 +714,7 @@ void Client::Impl::SendData(const Block& block) { output_->Flush(); } -void Client::Impl::InitializeStreams(std::unique_ptr&& socket) { +void Client::Impl::InitializeStreams(std::unique_ptr&& socket) { OutputStreams output_streams; auto socket_output = output_streams.Add(socket->makeOutputStream()); auto output = output_streams.AddNew(socket_output); @@ -822,7 +821,7 @@ Client::Client(const ClientOptions& opts) { } -Client::Client(const ClientOptions& opts, std::unique_ptr socket) +Client::Client(const ClientOptions& opts, std::unique_ptr socket) : options_(opts) , impl_(new Impl(opts, std::move(socket))) { diff --git a/clickhouse/client.h b/clickhouse/client.h index aaf91b32..0d99f5f8 100644 --- a/clickhouse/client.h +++ b/clickhouse/client.h @@ -171,7 +171,7 @@ class Socket; class Client { public: Client(const ClientOptions& opts); - Client(const ClientOptions& opts, std::unique_ptr socket); + Client(const ClientOptions& opts, std::unique_ptr socket); ~Client(); /// Intends for execute arbitrary queries. From 14470fbb2726684ae58bbf8e419bf4fcd863150d Mon Sep 17 00:00:00 2001 From: Ivan Trofimov Date: Mon, 31 Jan 2022 22:57:09 +0300 Subject: [PATCH 3/9] fix forwarding --- clickhouse/client.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clickhouse/client.h b/clickhouse/client.h index 0d99f5f8..d3e15072 100644 --- a/clickhouse/client.h +++ b/clickhouse/client.h @@ -163,7 +163,7 @@ struct ClientOptions { std::ostream& operator<<(std::ostream& os, const ClientOptions& options); -class Socket; +class SocketBase; /** * From 1bef85cb8aa23c7ea03d2067ea9b35972ca184c9 Mon Sep 17 00:00:00 2001 From: Ivan Trofimov Date: Wed, 2 Feb 2022 04:47:31 +0300 Subject: [PATCH 4/9] do handshake when initialized externally --- clickhouse/client.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index 5da584f2..099a0f4b 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -163,8 +163,8 @@ class Client::Impl { OutputStreams output_streams_; OutputStream* output_; - std::unique_ptr socket_; bool is_external_socket_initialization_{false}; + std::unique_ptr socket_; #if defined(WITH_OPENSSL) std::unique_ptr ssl_context_; @@ -201,9 +201,12 @@ Client::Impl::Impl(const ClientOptions& opts) Client::Impl::Impl(const ClientOptions& opts, std::unique_ptr socket) : options_(opts) , events_(nullptr) + , is_external_socket_initialization_(true) { InitializeStreams(std::move(socket)); - is_external_socket_initialization_ = true; + if (!Handshake()) { + throw std::runtime_error("fail to connect to " + options_.host); + } if (options_.compression_method != CompressionMethod::None) { compression_ = CompressionState::Enable; From d7d5399a9bc56929ef17bb8b3b7d120b20a80c16 Mon Sep 17 00:00:00 2001 From: Ivan Trofimov Date: Wed, 2 Feb 2022 04:59:34 +0300 Subject: [PATCH 5/9] cleanup --- clickhouse/client.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index 90464bce..c6f2e31e 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -313,8 +313,6 @@ void Client::Impl::Ping() { } } - - void Client::Impl::ResetConnection() { if (is_external_socket_initialization_) { throw std::runtime_error("Can not reset connection, socket was provided externally"); From 0dca48d5ccb47e6ee88226fc635207eb4aa4ebd0 Mon Sep 17 00:00:00 2001 From: Ivan Trofimov Date: Fri, 4 Feb 2022 00:56:16 +0300 Subject: [PATCH 6/9] rework --- clickhouse/base/socket.cpp | 32 ++++++++++ clickhouse/base/socket.h | 24 +++++++ clickhouse/base/sslsocket.cpp | 32 +++++++++- clickhouse/base/sslsocket.h | 13 +++- clickhouse/client.cpp | 117 +++++++++++++--------------------- clickhouse/client.h | 13 +++- 6 files changed, 153 insertions(+), 78 deletions(-) diff --git a/clickhouse/base/socket.cpp b/clickhouse/base/socket.cpp index e4f27b1f..13159f0e 100644 --- a/clickhouse/base/socket.cpp +++ b/clickhouse/base/socket.cpp @@ -1,5 +1,6 @@ #include "socket.h" #include "singleton.h" +#include "../client.h" #include #include @@ -215,6 +216,8 @@ const std::string & NetworkAddress::Host() const { SocketBase::~SocketBase() = default; +SocketFactory::~SocketFactory() = default; + Socket::Socket(const NetworkAddress& addr) : handle_(SocketConnect(addr)) @@ -289,6 +292,35 @@ std::unique_ptr Socket::makeOutputStream() const { return std::make_unique(handle_); } +NonSecureSocketFactory::~NonSecureSocketFactory() {} + +std::unique_ptr NonSecureSocketFactory::connect(const ClientOptions &opts) { + const auto address = NetworkAddress(opts.host, std::to_string(opts.port)); + + auto socket = doConnect(opts, address); + setSocketOptions(*socket, opts); + + return socket; +} + +std::unique_ptr NonSecureSocketFactory::doConnect(const ClientOptions &opts, + const NetworkAddress& address) { + (void)opts; + return std::make_unique(address); +} + +void NonSecureSocketFactory::setSocketOptions(Socket &socket, const ClientOptions &opts) { + if (opts.tcp_keepalive) { + socket.SetTcpKeepAlive( + static_cast(opts.tcp_keepalive_idle.count()), + static_cast(opts.tcp_keepalive_intvl.count()), + static_cast(opts.tcp_keepalive_cnt)); + } + if (opts.tcp_nodelay) { + socket.SetTcpNoDelay(opts.tcp_nodelay); + } +} + SocketInput::SocketInput(SOCKET s) : s_(s) { diff --git a/clickhouse/base/socket.h b/clickhouse/base/socket.h index e031c333..70a4ef1b 100644 --- a/clickhouse/base/socket.h +++ b/clickhouse/base/socket.h @@ -28,6 +28,8 @@ struct addrinfo; namespace clickhouse { +struct ClientOptions; + /** Address of a host to establish connection to. * */ @@ -67,6 +69,14 @@ class SocketBase { }; +class SocketFactory { +public: + virtual ~SocketFactory(); + + virtual std::unique_ptr connect(const ClientOptions& opts) = 0; +}; + + class Socket : public SocketBase { public: Socket(const NetworkAddress& addr); @@ -97,6 +107,20 @@ class Socket : public SocketBase { }; +class NonSecureSocketFactory : public SocketFactory { +public: + ~NonSecureSocketFactory() override; + + std::unique_ptr connect(const ClientOptions& opts) override; + +protected: + virtual std::unique_ptr doConnect(const ClientOptions& opts, + const NetworkAddress& address); + + void setSocketOptions(Socket& socket, const ClientOptions& opts); +}; + + class SocketInput : public InputStream { public: explicit SocketInput(SOCKET s); diff --git a/clickhouse/base/sslsocket.cpp b/clickhouse/base/sslsocket.cpp index 927fd671..b722bfba 100644 --- a/clickhouse/base/sslsocket.cpp +++ b/clickhouse/base/sslsocket.cpp @@ -1,4 +1,5 @@ #include "sslsocket.h" +#include "../client.h" #include @@ -135,9 +136,11 @@ SSL_CTX * SSLContext::getContext() { << "\n\t handshake state: " << SSL_get_state(ssl_) \ << std::endl */ -SSLSocket::SSLSocket(const NetworkAddress& addr, const SSLParams & ssl_params, SSLContext& context) +SSLSocket::SSLSocket(const NetworkAddress& addr, const SSLParams & ssl_params, + std::unique_ptr context) : Socket(addr) - , ssl_(SSL_new(context.getContext()), &SSL_free) + , context_(std::move(context)) + , ssl_(SSL_new(context_->getContext()), &SSL_free) { auto ssl = ssl_.get(); if (!ssl) @@ -181,6 +184,31 @@ SSLSocket::SSLSocket(const NetworkAddress& addr, const SSLParams & ssl_params, S } } +SSLSocketFactory::~SSLSocketFactory() {} + +std::unique_ptr SSLSocketFactory::doConnect(const ClientOptions& opts, + const NetworkAddress& address) { + std::unique_ptr ssl_context; + const auto ssl_options = opts.ssl_options; + const auto ssl_params = SSLParams{ + ssl_options.path_to_ca_files, + ssl_options.path_to_ca_directory, + ssl_options.use_default_ca_locations, + ssl_options.context_options, + ssl_options.min_protocol_version, + ssl_options.max_protocol_version, + ssl_options.use_sni + }; + + if (ssl_options.ssl_context) + ssl_context = std::make_unique(*ssl_options.ssl_context); + else { + ssl_context = std::make_unique(ssl_params); + } + + return std::make_unique(address, ssl_params, std::move(ssl_context)); +} + std::unique_ptr SSLSocket::makeInputStream() const { return std::make_unique(ssl_.get()); } diff --git a/clickhouse/base/sslsocket.h b/clickhouse/base/sslsocket.h index 5c666162..b33f3eee 100644 --- a/clickhouse/base/sslsocket.h +++ b/clickhouse/base/sslsocket.h @@ -42,7 +42,8 @@ class SSLContext class SSLSocket : public Socket { public: - explicit SSLSocket(const NetworkAddress& addr, const SSLParams & ssl_params, SSLContext& context); + explicit SSLSocket(const NetworkAddress& addr, const SSLParams & ssl_params, + std::unique_ptr context); SSLSocket(SSLSocket &&) = default; ~SSLSocket() override = default; @@ -53,9 +54,19 @@ class SSLSocket : public Socket { std::unique_ptr makeOutputStream() const override; private: + std::unique_ptr context_; std::unique_ptr ssl_; }; +class SSLSocketFactory : public NonSecureSocketFactory { +public: + ~SSLSocketFactory() override; + +protected: + std::unique_ptr doConnect(const ClientOptions& opts, + const NetworkAddress& address) override; +}; + class SSLSocketInput : public InputStream { public: explicit SSLSocketInput(SSL *ssl); diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index c6f2e31e..4bf9b0bf 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -81,10 +81,36 @@ std::ostream& operator<<(std::ostream& os, const ClientOptions& opt) { return os; } + +namespace { + +std::unique_ptr GetSocketFactory(const ClientOptions& opts) { + (void)opts; +#if defined(WITH_OPENSSL) + if (opts.ssl_options.use_ssl) + return std::make_unique(); + else +#endif + return std::make_unique(); +} + +std::unique_ptr GetDefaultSleepImpl() { + return std::make_unique(); +} + +} + +void SleepImpl::sleepFor(const std::chrono::milliseconds &duration) { + std::this_thread::sleep_for(duration); +} + + class Client::Impl { public: Impl(const ClientOptions& opts); - Impl(const ClientOptions& opts, std::unique_ptr socket); + Impl(const ClientOptions& opts, + std::unique_ptr socket_factory, + std::unique_ptr sleep_impl); ~Impl(); void ExecuteQuery(Query query); @@ -162,7 +188,8 @@ class Client::Impl { OutputStreams output_streams_; OutputStream* output_; - bool is_external_socket_initialization_{false}; + std::unique_ptr socket_factory_; + std::unique_ptr sleep_impl_; std::unique_ptr socket_; #if defined(WITH_OPENSSL) @@ -174,12 +201,17 @@ class Client::Impl { Client::Impl::Impl(const ClientOptions& opts) + : Impl(opts, GetSocketFactory(opts), GetDefaultSleepImpl()) {} + +Client::Impl::Impl(const ClientOptions& opts, + std::unique_ptr socket_factory, + std::unique_ptr sleep_impl) : options_(opts) , events_(nullptr) + , socket_factory_(std::move(socket_factory)) + , sleep_impl_(std::move(sleep_impl)) { - // TODO: throw on big-endianness of platform - - for (unsigned int i = 0; ; ) { + for (unsigned int i = 0;; ++i) { try { ResetConnection(); break; @@ -188,7 +220,7 @@ Client::Impl::Impl(const ClientOptions& opts) throw; } - std::this_thread::sleep_for(options_.retry_timeout); + sleep_impl_->sleepFor(options_.retry_timeout); } } @@ -197,21 +229,6 @@ Client::Impl::Impl(const ClientOptions& opts) } } -Client::Impl::Impl(const ClientOptions& opts, std::unique_ptr socket) - : options_(opts) - , events_(nullptr) - , is_external_socket_initialization_(true) -{ - InitializeStreams(std::move(socket)); - if (!Handshake()) { - throw std::runtime_error("fail to connect to " + options_.host); - } - - if (options_.compression_method != CompressionMethod::None) { - compression_ = CompressionState::Enable; - } -} - Client::Impl::~Impl() { } @@ -314,55 +331,7 @@ void Client::Impl::Ping() { } void Client::Impl::ResetConnection() { - if (is_external_socket_initialization_) { - throw std::runtime_error("Can not reset connection, socket was provided externally"); - } - - std::unique_ptr socket; - - const auto address = NetworkAddress(options_.host, std::to_string(options_.port)); -#if defined(WITH_OPENSSL) - // TODO: maybe do not re-create context multiple times upon reconnection - that doesn't make sense. - std::unique_ptr ssl_context; - if (options_.ssl_options.use_ssl) { - const auto ssl_options = options_.ssl_options; - const auto ssl_params = SSLParams { - ssl_options.path_to_ca_files, - ssl_options.path_to_ca_directory, - ssl_options.use_default_ca_locations, - ssl_options.context_options, - ssl_options.min_protocol_version, - ssl_options.max_protocol_version, - ssl_options.use_sni - }; - - if (ssl_options.ssl_context) - ssl_context = std::make_unique(*ssl_options.ssl_context); - else { - ssl_context = std::make_unique(ssl_params); - } - - socket = std::make_unique(address, ssl_params, *ssl_context); - } - else -#endif - socket = std::make_unique(address); - - if (options_.tcp_keepalive) { - socket->SetTcpKeepAlive( - static_cast(options_.tcp_keepalive_idle.count()), - static_cast(options_.tcp_keepalive_intvl.count()), - static_cast(options_.tcp_keepalive_cnt)); - } - if (options_.tcp_nodelay) { - socket->SetTcpNoDelay(options_.tcp_nodelay); - } - - InitializeStreams(std::move(socket)); - -#if defined(WITH_OPENSSL) - std::swap(ssl_context_, ssl_context); -#endif + InitializeStreams(socket_factory_->connect(options_)); if (!Handshake()) { throw std::runtime_error("fail to connect to " + options_.host); @@ -803,7 +772,7 @@ void Client::Impl::RetryGuard(std::function func) { bool ok = true; try { - std::this_thread::sleep_for(options_.retry_timeout); + sleep_impl_->sleepFor(options_.retry_timeout); ResetConnection(); } catch (...) { ok = false; @@ -822,9 +791,11 @@ Client::Client(const ClientOptions& opts) { } -Client::Client(const ClientOptions& opts, std::unique_ptr socket) +Client::Client(const ClientOptions& opts, + std::unique_ptr socket_factory, + std::unique_ptr sleep_impl) : options_(opts) - , impl_(new Impl(opts, std::move(socket))) + , impl_(new Impl(opts, std::move(socket_factory), std::move(sleep_impl))) { } diff --git a/clickhouse/client.h b/clickhouse/client.h index d3e15072..87b1a2ad 100644 --- a/clickhouse/client.h +++ b/clickhouse/client.h @@ -163,7 +163,14 @@ struct ClientOptions { std::ostream& operator<<(std::ostream& os, const ClientOptions& options); -class SocketBase; +class SocketFactory; + +class SleepImpl { +public: + virtual ~SleepImpl() = default; + + virtual void sleepFor(const std::chrono::milliseconds& duration); +}; /** * @@ -171,7 +178,9 @@ class SocketBase; class Client { public: Client(const ClientOptions& opts); - Client(const ClientOptions& opts, std::unique_ptr socket); + Client(const ClientOptions& opts, + std::unique_ptr socket_factory, + std::unique_ptr sleep_impl); ~Client(); /// Intends for execute arbitrary queries. From e26d7851d13aff911506fb5bf648ab9c18764df9 Mon Sep 17 00:00:00 2001 From: Ivan Trofimov Date: Fri, 4 Feb 2022 01:01:27 +0300 Subject: [PATCH 7/9] oops --- clickhouse/client.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index 4bf9b0bf..05763351 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -211,7 +211,7 @@ Client::Impl::Impl(const ClientOptions& opts, , socket_factory_(std::move(socket_factory)) , sleep_impl_(std::move(sleep_impl)) { - for (unsigned int i = 0;; ++i) { + for (unsigned int i = 0; ; ) { try { ResetConnection(); break; From aa8a7d006249f7ecb98ae58cfe7409c5ed8221a7 Mon Sep 17 00:00:00 2001 From: Ivan Trofimov Date: Fri, 4 Feb 2022 01:23:56 +0300 Subject: [PATCH 8/9] build fix --- clickhouse/base/sslsocket.cpp | 2 ++ clickhouse/base/sslsocket.h | 2 ++ 2 files changed, 4 insertions(+) diff --git a/clickhouse/base/sslsocket.cpp b/clickhouse/base/sslsocket.cpp index b722bfba..068dfddb 100644 --- a/clickhouse/base/sslsocket.cpp +++ b/clickhouse/base/sslsocket.cpp @@ -186,6 +186,7 @@ SSLSocket::SSLSocket(const NetworkAddress& addr, const SSLParams & ssl_params, SSLSocketFactory::~SSLSocketFactory() {} +#if defined(WITH_OPENSSL) std::unique_ptr SSLSocketFactory::doConnect(const ClientOptions& opts, const NetworkAddress& address) { std::unique_ptr ssl_context; @@ -208,6 +209,7 @@ std::unique_ptr SSLSocketFactory::doConnect(const ClientOptions& opts, return std::make_unique(address, ssl_params, std::move(ssl_context)); } +#endif std::unique_ptr SSLSocket::makeInputStream() const { return std::make_unique(ssl_.get()); diff --git a/clickhouse/base/sslsocket.h b/clickhouse/base/sslsocket.h index b33f3eee..56f6979c 100644 --- a/clickhouse/base/sslsocket.h +++ b/clickhouse/base/sslsocket.h @@ -62,9 +62,11 @@ class SSLSocketFactory : public NonSecureSocketFactory { public: ~SSLSocketFactory() override; +#if defined(WITH_OPENSSL) protected: std::unique_ptr doConnect(const ClientOptions& opts, const NetworkAddress& address) override; +#endif }; class SSLSocketInput : public InputStream { From f0aa52b5bcc9a78e544c7581f6194f24fdd79f31 Mon Sep 17 00:00:00 2001 From: Ivan Trofimov Date: Mon, 7 Feb 2022 15:10:57 +0300 Subject: [PATCH 9/9] cr fixes --- clickhouse/base/socket.cpp | 11 +++++--- clickhouse/base/socket.h | 8 ++++-- clickhouse/base/sslsocket.cpp | 53 ++++++++++++++++++----------------- clickhouse/base/sslsocket.h | 13 +++++---- clickhouse/client.cpp | 29 ++++++------------- clickhouse/client.h | 10 +------ 6 files changed, 56 insertions(+), 68 deletions(-) diff --git a/clickhouse/base/socket.cpp b/clickhouse/base/socket.cpp index 13159f0e..a11cd2f8 100644 --- a/clickhouse/base/socket.cpp +++ b/clickhouse/base/socket.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #if !defined(_win_) # include @@ -218,6 +219,10 @@ SocketBase::~SocketBase() = default; SocketFactory::~SocketFactory() = default; +void SocketFactory::sleepFor(const std::chrono::milliseconds& duration) { + std::this_thread::sleep_for(duration); +} + Socket::Socket(const NetworkAddress& addr) : handle_(SocketConnect(addr)) @@ -297,15 +302,13 @@ NonSecureSocketFactory::~NonSecureSocketFactory() {} std::unique_ptr NonSecureSocketFactory::connect(const ClientOptions &opts) { const auto address = NetworkAddress(opts.host, std::to_string(opts.port)); - auto socket = doConnect(opts, address); + auto socket = doConnect(address); setSocketOptions(*socket, opts); return socket; } -std::unique_ptr NonSecureSocketFactory::doConnect(const ClientOptions &opts, - const NetworkAddress& address) { - (void)opts; +std::unique_ptr NonSecureSocketFactory::doConnect(const NetworkAddress& address) { return std::make_unique(address); } diff --git a/clickhouse/base/socket.h b/clickhouse/base/socket.h index 70a4ef1b..e7cacc19 100644 --- a/clickhouse/base/socket.h +++ b/clickhouse/base/socket.h @@ -6,6 +6,7 @@ #include #include +#include #if defined(_win_) # include @@ -73,7 +74,11 @@ class SocketFactory { public: virtual ~SocketFactory(); + // TODO: move connection-related options to ConnectionOptions structure. + virtual std::unique_ptr connect(const ClientOptions& opts) = 0; + + virtual void sleepFor(const std::chrono::milliseconds& duration); }; @@ -114,8 +119,7 @@ class NonSecureSocketFactory : public SocketFactory { std::unique_ptr connect(const ClientOptions& opts) override; protected: - virtual std::unique_ptr doConnect(const ClientOptions& opts, - const NetworkAddress& address); + virtual std::unique_ptr doConnect(const NetworkAddress& address); void setSocketOptions(Socket& socket, const ClientOptions& opts); }; diff --git a/clickhouse/base/sslsocket.cpp b/clickhouse/base/sslsocket.cpp index 068dfddb..641ff014 100644 --- a/clickhouse/base/sslsocket.cpp +++ b/clickhouse/base/sslsocket.cpp @@ -101,6 +101,19 @@ SSL_CTX * prepareSSLContext(const clickhouse::SSLParams & context_params) { #undef HANDLE_SSL_CTX_ERROR } +clickhouse::SSLParams GetSSLParams(const clickhouse::ClientOptions& opts) { + const auto& ssl_options = opts.ssl_options; + return clickhouse::SSLParams{ + ssl_options.path_to_ca_files, + ssl_options.path_to_ca_directory, + ssl_options.use_default_ca_locations, + ssl_options.context_options, + ssl_options.min_protocol_version, + ssl_options.max_protocol_version, + ssl_options.use_sni + }; +} + } namespace clickhouse { @@ -137,10 +150,9 @@ SSL_CTX * SSLContext::getContext() { << std::endl */ SSLSocket::SSLSocket(const NetworkAddress& addr, const SSLParams & ssl_params, - std::unique_ptr context) + SSLContext& context) : Socket(addr) - , context_(std::move(context)) - , ssl_(SSL_new(context_->getContext()), &SSL_free) + , ssl_(SSL_new(context.getContext()), &SSL_free) { auto ssl = ssl_.get(); if (!ssl) @@ -184,32 +196,21 @@ SSLSocket::SSLSocket(const NetworkAddress& addr, const SSLParams & ssl_params, } } -SSLSocketFactory::~SSLSocketFactory() {} - -#if defined(WITH_OPENSSL) -std::unique_ptr SSLSocketFactory::doConnect(const ClientOptions& opts, - const NetworkAddress& address) { - std::unique_ptr ssl_context; - const auto ssl_options = opts.ssl_options; - const auto ssl_params = SSLParams{ - ssl_options.path_to_ca_files, - ssl_options.path_to_ca_directory, - ssl_options.use_default_ca_locations, - ssl_options.context_options, - ssl_options.min_protocol_version, - ssl_options.max_protocol_version, - ssl_options.use_sni - }; - - if (ssl_options.ssl_context) - ssl_context = std::make_unique(*ssl_options.ssl_context); - else { - ssl_context = std::make_unique(ssl_params); +SSLSocketFactory::SSLSocketFactory(const ClientOptions& opts) + : NonSecureSocketFactory() + , ssl_params_(GetSSLParams(opts)) { + if (opts.ssl_options.ssl_context) { + ssl_context_ = std::make_unique(*opts.ssl_options.ssl_context); + } else { + ssl_context_ = std::make_unique(ssl_params_); } +} + +SSLSocketFactory::~SSLSocketFactory() = default; - return std::make_unique(address, ssl_params, std::move(ssl_context)); +std::unique_ptr SSLSocketFactory::doConnect(const NetworkAddress& address) { + return std::make_unique(address, ssl_params_, *ssl_context_); } -#endif std::unique_ptr SSLSocket::makeInputStream() const { return std::make_unique(ssl_.get()); diff --git a/clickhouse/base/sslsocket.h b/clickhouse/base/sslsocket.h index 56f6979c..bd478975 100644 --- a/clickhouse/base/sslsocket.h +++ b/clickhouse/base/sslsocket.h @@ -43,7 +43,7 @@ class SSLContext class SSLSocket : public Socket { public: explicit SSLSocket(const NetworkAddress& addr, const SSLParams & ssl_params, - std::unique_ptr context); + SSLContext& context); SSLSocket(SSLSocket &&) = default; ~SSLSocket() override = default; @@ -54,19 +54,20 @@ class SSLSocket : public Socket { std::unique_ptr makeOutputStream() const override; private: - std::unique_ptr context_; std::unique_ptr ssl_; }; class SSLSocketFactory : public NonSecureSocketFactory { public: + explicit SSLSocketFactory(const ClientOptions& opts); ~SSLSocketFactory() override; -#if defined(WITH_OPENSSL) protected: - std::unique_ptr doConnect(const ClientOptions& opts, - const NetworkAddress& address) override; -#endif + std::unique_ptr doConnect(const NetworkAddress& address) override; + +private: + const SSLParams ssl_params_; + std::unique_ptr ssl_context_; }; class SSLSocketInput : public InputStream { diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index 05763351..2bb5c593 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -88,20 +88,12 @@ std::unique_ptr GetSocketFactory(const ClientOptions& opts) { (void)opts; #if defined(WITH_OPENSSL) if (opts.ssl_options.use_ssl) - return std::make_unique(); + return std::make_unique(opts); else #endif return std::make_unique(); } -std::unique_ptr GetDefaultSleepImpl() { - return std::make_unique(); -} - -} - -void SleepImpl::sleepFor(const std::chrono::milliseconds &duration) { - std::this_thread::sleep_for(duration); } @@ -109,8 +101,7 @@ class Client::Impl { public: Impl(const ClientOptions& opts); Impl(const ClientOptions& opts, - std::unique_ptr socket_factory, - std::unique_ptr sleep_impl); + std::unique_ptr socket_factory); ~Impl(); void ExecuteQuery(Query query); @@ -189,7 +180,6 @@ class Client::Impl { OutputStream* output_; std::unique_ptr socket_factory_; - std::unique_ptr sleep_impl_; std::unique_ptr socket_; #if defined(WITH_OPENSSL) @@ -201,15 +191,13 @@ class Client::Impl { Client::Impl::Impl(const ClientOptions& opts) - : Impl(opts, GetSocketFactory(opts), GetDefaultSleepImpl()) {} + : Impl(opts, GetSocketFactory(opts)) {} Client::Impl::Impl(const ClientOptions& opts, - std::unique_ptr socket_factory, - std::unique_ptr sleep_impl) + std::unique_ptr socket_factory) : options_(opts) , events_(nullptr) , socket_factory_(std::move(socket_factory)) - , sleep_impl_(std::move(sleep_impl)) { for (unsigned int i = 0; ; ) { try { @@ -220,7 +208,7 @@ Client::Impl::Impl(const ClientOptions& opts, throw; } - sleep_impl_->sleepFor(options_.retry_timeout); + socket_factory_->sleepFor(options_.retry_timeout); } } @@ -772,7 +760,7 @@ void Client::Impl::RetryGuard(std::function func) { bool ok = true; try { - sleep_impl_->sleepFor(options_.retry_timeout); + socket_factory_->sleepFor(options_.retry_timeout); ResetConnection(); } catch (...) { ok = false; @@ -792,10 +780,9 @@ Client::Client(const ClientOptions& opts) } Client::Client(const ClientOptions& opts, - std::unique_ptr socket_factory, - std::unique_ptr sleep_impl) + std::unique_ptr socket_factory) : options_(opts) - , impl_(new Impl(opts, std::move(socket_factory), std::move(sleep_impl))) + , impl_(new Impl(opts, std::move(socket_factory))) { } diff --git a/clickhouse/client.h b/clickhouse/client.h index 87b1a2ad..8f48cdaa 100644 --- a/clickhouse/client.h +++ b/clickhouse/client.h @@ -165,13 +165,6 @@ std::ostream& operator<<(std::ostream& os, const ClientOptions& options); class SocketFactory; -class SleepImpl { -public: - virtual ~SleepImpl() = default; - - virtual void sleepFor(const std::chrono::milliseconds& duration); -}; - /** * */ @@ -179,8 +172,7 @@ class Client { public: Client(const ClientOptions& opts); Client(const ClientOptions& opts, - std::unique_ptr socket_factory, - std::unique_ptr sleep_impl); + std::unique_ptr socket_factory); ~Client(); /// Intends for execute arbitrary queries.