Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
ClickHouse C++ client [![Build Status](https://travis-ci.org/ClickHouse/clickhouse-cpp.svg?branch=master)](https://travis-ci.org/ClickHouse/clickhouse-cpp)
ClickHouse C++ client [![Linux](https://github.com/ClickHouse/clickhouse-cpp/actions/workflows/linux.yml/badge.svg)](https://github.com/ClickHouse/clickhouse-cpp/actions/workflows/linux.yml) [![macOS](https://github.com/ClickHouse/clickhouse-cpp/actions/workflows/macos.yml/badge.svg)](https://github.com/ClickHouse/clickhouse-cpp/actions/workflows/macos.yml)
=====

C++ client for [ClickHouse](https://clickhouse.tech/).
C++ client for [ClickHouse](https://clickhouse.com/).

## Supported data types

Expand Down
3 changes: 1 addition & 2 deletions clickhouse/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
SET ( clickhouse-cpp-lib-src
base/coded.cpp
base/compressed.cpp
base/input.cpp
base/output.cpp
base/platform.cpp
base/socket.cpp
base/wire_format.cpp

columns/array.cpp
columns/date.cpp
Expand Down Expand Up @@ -72,7 +72,6 @@ INSTALL(FILES query.h DESTINATION include/clickhouse/)

# base
INSTALL(FILES base/buffer.h DESTINATION include/clickhouse/base/)
INSTALL(FILES base/coded.h DESTINATION include/clickhouse/base/)
INSTALL(FILES base/compressed.h DESTINATION include/clickhouse/base/)
INSTALL(FILES base/input.h DESTINATION include/clickhouse/base/)
INSTALL(FILES base/output.h DESTINATION include/clickhouse/base/)
Expand Down
100 changes: 0 additions & 100 deletions clickhouse/base/coded.cpp

This file was deleted.

65 changes: 0 additions & 65 deletions clickhouse/base/coded.h

This file was deleted.

75 changes: 71 additions & 4 deletions clickhouse/base/compressed.cpp
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
#include "compressed.h"
#include "wire_format.h"
#include "output.h"

#include <cityhash/city.h>
#include <lz4/lz4.h>
#include <stdexcept>
#include <system_error>


namespace {
static const size_t HEADER_SIZE = 9;
static const size_t EXTRA_PREALLOCATE_COMPRESS_BUFFER = 15;
static const uint8_t COMPRESSION_METHOD = 0x82;
#define DBMS_MAX_COMPRESSED_SIZE 0x40000000ULL // 1GB
}

namespace clickhouse {

CompressedInput::CompressedInput(CodedInputStream* input)
CompressedInput::CompressedInput(InputStream* input)
: input_(input)
{
}
Expand Down Expand Up @@ -50,7 +57,7 @@ bool CompressedInput::Decompress() {
return false;
}

if (method != 0x82) {
if (method != COMPRESSION_METHOD) {
throw std::runtime_error("unsupported compression method " +
std::to_string(int(method)));
} else {
Expand All @@ -75,7 +82,7 @@ bool CompressedInput::Decompress() {
out.Write(&original, sizeof(original));
}

if (!WireFormat::ReadBytes(input_, tmp.data() + 9, compressed - 9)) {
if (!WireFormat::ReadBytes(input_, tmp.data() + HEADER_SIZE, compressed - HEADER_SIZE)) {
return false;
} else {
if (hash != CityHash128((const char*)tmp.data(), compressed)) {
Expand All @@ -85,7 +92,7 @@ bool CompressedInput::Decompress() {

data_ = Buffer(original);

if (LZ4_decompress_safe((const char*)tmp.data() + 9, (char*)data_.data(), compressed - 9, original) < 0) {
if (LZ4_decompress_safe((const char*)tmp.data() + HEADER_SIZE, (char*)data_.data(), compressed - HEADER_SIZE, original) < 0) {
throw std::runtime_error("can't decompress data");
} else {
mem_.Reset(data_.data(), original);
Expand All @@ -95,4 +102,64 @@ bool CompressedInput::Decompress() {
return true;
}


CompressedOutput::CompressedOutput(OutputStream * destination, size_t max_compressed_chunk_size)
: destination_(destination),
max_compressed_chunk_size_(max_compressed_chunk_size)
{
}

CompressedOutput::~CompressedOutput() {
Flush();
}

size_t CompressedOutput::DoWrite(const void* data, size_t len) {
const size_t original_len = len;
const size_t max_chunk_size = max_compressed_chunk_size_ ? max_compressed_chunk_size_ : len;

while (len > 0)
{
auto to_compress = std::min(len, max_chunk_size);
if (!Compress(data, to_compress))
break;

len -= to_compress;
data = reinterpret_cast<const char*>(data) + to_compress;
}

return original_len - len;
}

void CompressedOutput::DoFlush() {
destination_->Flush();
}

bool CompressedOutput::Compress(const void * data, size_t len) {

const size_t expected_out_size = LZ4_compressBound(len);
compressed_buffer_.resize(std::max(compressed_buffer_.size(), expected_out_size + HEADER_SIZE + EXTRA_PREALLOCATE_COMPRESS_BUFFER));

const int compressed_size = LZ4_compress_default(
(const char*)data,
(char*)compressed_buffer_.data() + HEADER_SIZE,
len,
compressed_buffer_.size() - HEADER_SIZE);

{
auto header = compressed_buffer_.data();
WriteUnaligned(header, COMPRESSION_METHOD);
// Compressed data size with header
WriteUnaligned(header + 1, static_cast<uint32_t>(compressed_size + HEADER_SIZE));
// Original data size
WriteUnaligned(header + 5, static_cast<uint32_t>(len));
}

WireFormat::WriteFixed(destination_, CityHash128(
(const char*)compressed_buffer_.data(), compressed_size + HEADER_SIZE));
WireFormat::WriteBytes(destination_, compressed_buffer_.data(), compressed_size + HEADER_SIZE);

destination_->Flush();
return true;
}

}
25 changes: 22 additions & 3 deletions clickhouse/base/compressed.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#pragma once

#include "coded.h"
#include "input.h"
#include "output.h"
#include "buffer.h"

namespace clickhouse {

class CompressedInput : public ZeroCopyInput {
public:
CompressedInput(CodedInputStream* input);
CompressedInput(InputStream* input);
~CompressedInput();

protected:
Expand All @@ -15,10 +17,27 @@ class CompressedInput : public ZeroCopyInput {
bool Decompress();

private:
CodedInputStream* const input_;
InputStream* const input_;

Buffer data_;
ArrayInput mem_;
};

class CompressedOutput : public OutputStream {
public:
CompressedOutput(OutputStream * destination, size_t max_compressed_chunk_size = 0);
~CompressedOutput();

protected:
size_t DoWrite(const void* data, size_t len) override;
void DoFlush() override;
bool Compress(const void * data, size_t len);


private:
OutputStream * destination_;
Buffer compressed_buffer_;
size_t max_compressed_chunk_size_;
};

}
15 changes: 15 additions & 0 deletions clickhouse/base/input.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,21 @@

namespace clickhouse {

bool ZeroCopyInput::Skip(size_t bytes) {
while (bytes > 0) {
const void* ptr;
size_t len = Next(&ptr, bytes);

if (len == 0) {
return false;
}

bytes -= len;
}

return true;
}

size_t ZeroCopyInput::DoRead(void* buf, size_t len) {
const void* ptr;
size_t result = DoNext(&ptr, len);
Expand Down
Loading