From 5e88c2f3e0cb83d62cf9b6d0636885283f11a2fa Mon Sep 17 00:00:00 2001 From: den818 Date: Sun, 9 Oct 2022 13:15:54 +0400 Subject: [PATCH 1/6] feat support CLIENT_WRITE_INFO --- clickhouse/client.cpp | 12 +++++++++++- clickhouse/query.h | 3 +++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index 209f70c1..4ef5ca70 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -35,8 +35,9 @@ #define DBMS_MIN_REVISION_WITH_VERSION_PATCH 54401 #define DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE 54405 #define DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA 54410 +#define DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO 54420 -#define REVISION DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA +#define REVISION DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO namespace clickhouse { @@ -408,6 +409,15 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) { return false; } } + if (REVISION >= DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO) + { + if (!WireFormat::ReadUInt64(*input_, &info.written_rows)) { + return false; + } + if (!WireFormat::ReadUInt64(*input_, &info.written_bytes)) { + return false; + } + } if (events_) { events_->OnProgress(info); diff --git a/clickhouse/query.h b/clickhouse/query.h index ae98690d..5a0319d8 100644 --- a/clickhouse/query.h +++ b/clickhouse/query.h @@ -48,6 +48,8 @@ struct Progress { uint64_t rows = 0; uint64_t bytes = 0; uint64_t total_rows = 0; + uint64_t written_rows = 0; + uint64_t written_bytes = 0; }; @@ -92,6 +94,7 @@ class Query : public QueryEvents { return query_id_; } + /// Set handler for receiving result data. inline Query& OnData(SelectCallback cb) { select_cb_ = std::move(cb); From 576ef2b52cd3bf5a0de67e6ce948eecdd3cdd329 Mon Sep 17 00:00:00 2001 From: den818 Date: Sun, 9 Oct 2022 13:46:36 +0400 Subject: [PATCH 2/6] fix codestyle --- clickhouse/query.h | 1 - 1 file changed, 1 deletion(-) diff --git a/clickhouse/query.h b/clickhouse/query.h index 5a0319d8..d4fbdda5 100644 --- a/clickhouse/query.h +++ b/clickhouse/query.h @@ -94,7 +94,6 @@ class Query : public QueryEvents { return query_id_; } - /// Set handler for receiving result data. inline Query& OnData(SelectCallback cb) { select_cb_ = std::move(cb); From 72504b25eeadbc6217aae5ccf7abea5f1522d1ef Mon Sep 17 00:00:00 2001 From: den818 Date: Mon, 17 Oct 2022 23:20:26 +0400 Subject: [PATCH 3/6] tests --- clickhouse/block.cpp | 5 +++++ clickhouse/block.h | 3 +++ clickhouse/client.cpp | 2 +- ut/client_ut.cpp | 17 +++++++++++++++++ 4 files changed, 26 insertions(+), 1 deletion(-) diff --git a/clickhouse/block.cpp b/clickhouse/block.cpp index aca77c00..28f0ddc9 100644 --- a/clickhouse/block.cpp +++ b/clickhouse/block.cpp @@ -71,6 +71,11 @@ const BlockInfo& Block::Info() const { return info_; } +/// Set block info +void Block::SetInfo(BlockInfo info) { + info_ = std::move(info); +} + /// Count of rows in the block. size_t Block::GetRowCount() const { return rows_; diff --git a/clickhouse/block.h b/clickhouse/block.h index a647f12d..5b8f57da 100644 --- a/clickhouse/block.h +++ b/clickhouse/block.h @@ -73,6 +73,9 @@ class Block { const BlockInfo& Info() const; + /// Set block info + void SetInfo(BlockInfo info); + /// Count of rows in the block. size_t GetRowCount() const; diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index 4ef5ca70..46a7235e 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -485,7 +485,7 @@ bool Client::Impl::ReadBlock(InputStream& input, Block* block) { return false; } - // TODO use data + block->SetInfo(std::move(info)); } uint64_t num_columns = 0; diff --git a/ut/client_ut.cpp b/ut/client_ut.cpp index 5cc1b81a..9e32fd77 100644 --- a/ut/client_ut.cpp +++ b/ut/client_ut.cpp @@ -1010,6 +1010,23 @@ TEST_P(ClientCase, RoundtripArrayTString) { EXPECT_TRUE(CompareRecursive(*array, *result_typed)); } +TEST_P(ClientCase, WriteInfo) { + Block block; + createTableWithOneColumn(block); + + std::optional received_progress; + Query query("INSERT INTO " + table_name + " (*) VALUES (\'Foo\'), (\'Bar\')" ); + query.OnProgress([&](const Progress& progress) { + received_progress = progress; + }); + client_->Execute(query); + + EXPECT_TRUE(received_progress.has_value()); + // server for some reason sent "rows" instead "written_rows" + EXPECT_GT(received_progress->rows + received_progress->written_rows , 0ul); + EXPECT_GT(received_progress->bytes + received_progress->written_bytes, 0ul); +} + const auto LocalHostEndpoint = ClientOptions() .SetHost( getEnvOrDefault("CLICKHOUSE_HOST", "localhost")) .SetPort( getEnvOrDefault("CLICKHOUSE_PORT", "9000")) From 058fd628e4dee7420d4470103566743f43f803cd Mon Sep 17 00:00:00 2001 From: den818 Date: Mon, 17 Oct 2022 23:39:49 +0400 Subject: [PATCH 4/6] fix test --- ut/client_ut.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/ut/client_ut.cpp b/ut/client_ut.cpp index 9e32fd77..ae1b28be 100644 --- a/ut/client_ut.cpp +++ b/ut/client_ut.cpp @@ -1010,7 +1010,7 @@ TEST_P(ClientCase, RoundtripArrayTString) { EXPECT_TRUE(CompareRecursive(*array, *result_typed)); } -TEST_P(ClientCase, WriteInfo) { +TEST_P(ClientCase, OnProgress) { Block block; createTableWithOneColumn(block); @@ -1022,9 +1022,8 @@ TEST_P(ClientCase, WriteInfo) { client_->Execute(query); EXPECT_TRUE(received_progress.has_value()); - // server for some reason sent "rows" instead "written_rows" - EXPECT_GT(received_progress->rows + received_progress->written_rows , 0ul); - EXPECT_GT(received_progress->bytes + received_progress->written_bytes, 0ul); + // Unfortunately server has different behavior in different version. + // So checking value of rows, bytes, etc is absolutely useless } const auto LocalHostEndpoint = ClientOptions() From 6aa45e68232a01f8a06e4b594aa05ab7dee66ffe Mon Sep 17 00:00:00 2001 From: den818 Date: Tue, 18 Oct 2022 15:40:46 +0400 Subject: [PATCH 5/6] fix tests --- ut/client_ut.cpp | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/ut/client_ut.cpp b/ut/client_ut.cpp index ae1b28be..7cb57fa5 100644 --- a/ut/client_ut.cpp +++ b/ut/client_ut.cpp @@ -1022,8 +1022,21 @@ TEST_P(ClientCase, OnProgress) { client_->Execute(query); EXPECT_TRUE(received_progress.has_value()); - // Unfortunately server has different behavior in different version. - // So checking value of rows, bytes, etc is absolutely useless + + EXPECT_GE(received_progress->rows, 0u); + EXPECT_LE(received_progress->rows, 2u); + + EXPECT_GE(received_progress->bytes, 0u); + EXPECT_LE(received_progress->bytes, 10000u); + + EXPECT_GE(received_progress->total_rows, 0u); + EXPECT_LE(received_progress->total_rows, 2u); + + EXPECT_GE(received_progress->written_rows, 0u); + EXPECT_LE(received_progress->written_rows, 2u); + + EXPECT_GE(received_progress->written_bytes, 0u); + EXPECT_LE(received_progress->written_bytes, 10000u); } const auto LocalHostEndpoint = ClientOptions() From 85f8c35d4577850b0c0062620055be7f9fb4ce37 Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Tue, 18 Oct 2022 16:58:58 +0400 Subject: [PATCH 6/6] Minor test fix Test driver wouldn't crash if client hasn't received the progress --- ut/client_ut.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ut/client_ut.cpp b/ut/client_ut.cpp index 7cb57fa5..969efb18 100644 --- a/ut/client_ut.cpp +++ b/ut/client_ut.cpp @@ -1021,7 +1021,7 @@ TEST_P(ClientCase, OnProgress) { }); client_->Execute(query); - EXPECT_TRUE(received_progress.has_value()); + ASSERT_TRUE(received_progress.has_value()); EXPECT_GE(received_progress->rows, 0u); EXPECT_LE(received_progress->rows, 2u);