diff --git a/clickhouse/block.cpp b/clickhouse/block.cpp index aca77c00..28f0ddc9 100644 --- a/clickhouse/block.cpp +++ b/clickhouse/block.cpp @@ -71,6 +71,11 @@ const BlockInfo& Block::Info() const { return info_; } +/// Set block info +void Block::SetInfo(BlockInfo info) { + info_ = std::move(info); +} + /// Count of rows in the block. size_t Block::GetRowCount() const { return rows_; diff --git a/clickhouse/block.h b/clickhouse/block.h index a647f12d..5b8f57da 100644 --- a/clickhouse/block.h +++ b/clickhouse/block.h @@ -73,6 +73,9 @@ class Block { const BlockInfo& Info() const; + /// Set block info + void SetInfo(BlockInfo info); + /// Count of rows in the block. size_t GetRowCount() const; diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index 209f70c1..46a7235e 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -35,8 +35,9 @@ #define DBMS_MIN_REVISION_WITH_VERSION_PATCH 54401 #define DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE 54405 #define DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA 54410 +#define DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO 54420 -#define REVISION DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA +#define REVISION DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO namespace clickhouse { @@ -408,6 +409,15 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) { return false; } } + if (REVISION >= DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO) + { + if (!WireFormat::ReadUInt64(*input_, &info.written_rows)) { + return false; + } + if (!WireFormat::ReadUInt64(*input_, &info.written_bytes)) { + return false; + } + } if (events_) { events_->OnProgress(info); @@ -475,7 +485,7 @@ bool Client::Impl::ReadBlock(InputStream& input, Block* block) { return false; } - // TODO use data + block->SetInfo(std::move(info)); } uint64_t num_columns = 0; diff --git a/clickhouse/query.h b/clickhouse/query.h index ae98690d..d4fbdda5 100644 --- a/clickhouse/query.h +++ b/clickhouse/query.h @@ -48,6 +48,8 @@ struct Progress { uint64_t rows = 0; uint64_t bytes = 0; uint64_t total_rows = 0; + uint64_t written_rows = 0; + uint64_t written_bytes = 0; }; diff --git a/ut/client_ut.cpp b/ut/client_ut.cpp index 5cc1b81a..969efb18 100644 --- a/ut/client_ut.cpp +++ b/ut/client_ut.cpp @@ -1010,6 +1010,35 @@ TEST_P(ClientCase, RoundtripArrayTString) { EXPECT_TRUE(CompareRecursive(*array, *result_typed)); } +TEST_P(ClientCase, OnProgress) { + Block block; + createTableWithOneColumn(block); + + std::optional received_progress; + Query query("INSERT INTO " + table_name + " (*) VALUES (\'Foo\'), (\'Bar\')" ); + query.OnProgress([&](const Progress& progress) { + received_progress = progress; + }); + client_->Execute(query); + + ASSERT_TRUE(received_progress.has_value()); + + EXPECT_GE(received_progress->rows, 0u); + EXPECT_LE(received_progress->rows, 2u); + + EXPECT_GE(received_progress->bytes, 0u); + EXPECT_LE(received_progress->bytes, 10000u); + + EXPECT_GE(received_progress->total_rows, 0u); + EXPECT_LE(received_progress->total_rows, 2u); + + EXPECT_GE(received_progress->written_rows, 0u); + EXPECT_LE(received_progress->written_rows, 2u); + + EXPECT_GE(received_progress->written_bytes, 0u); + EXPECT_LE(received_progress->written_bytes, 10000u); +} + const auto LocalHostEndpoint = ClientOptions() .SetHost( getEnvOrDefault("CLICKHOUSE_HOST", "localhost")) .SetPort( getEnvOrDefault("CLICKHOUSE_PORT", "9000"))