feat(connectors): Clickhouse Sink Connector#2886

Open
kriti-sc wants to merge 4 commits into apache:master from kriti-sc:clickhouse-sink

Conversation

@kriti-sc
Contributor

@kriti-sc kriti-sc commented Mar 6, 2026

Which issue does this PR close?

Closes #2539

Rationale

Clickhouse is a real-time data analytics engine and is very popular in modern analytics architectures.

What changed?

This PR introduces a Clickhouse Sink Connector that enables writing data from Iggy to Clickhouse.

The Clickhouse writing logic is heavily inspired by the official Clickhouse Kafka Connector.

Local Execution

  • Produced 30456 + 29060 messages (rows) with schema user_id: String, user_type: u8, email: String, source: String, state: String, created_at: DateTime, message: String using the sample data producer.
  • Consumed the messages with the Clickhouse sink and wrote them into the target Clickhouse table.
  • Verified schema and number of rows in Clickhouse.
  • Added unit tests and e2e tests, both passing.

Images 1&2: Produced 30456 + 29060 rows into Iggy in two batches
Image 3: Verified schema and number of rows in Clickhouse


AI Usage

  1. Which tools? (e.g., GitHub Copilot, Claude, ChatGPT) Claude Code
  2. Scope of usage? (e.g., autocomplete, generated functions, entire implementation) generated functions
  3. How did you verify the generated code works correctly? Manual testing by producing data into Iggy and then running the sink and verifying insertion into Clickhouse, unit tests and e2e tests for different Clickhouse insert configurations.
  4. Can you explain every line of the code if asked? Yes

@codecov

codecov bot commented Mar 6, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 65.95%. Comparing base (601e597) to head (4cd3eb8).

Additional details and impacted files
@@             Coverage Diff              @@
##             master    #2886      +/-   ##
============================================
- Coverage     72.18%   65.95%   -6.23%     
  Complexity      930      930              
============================================
  Files          1122     1059      -63     
  Lines         93502    88695    -4807     
  Branches      70851    70773      -78     
============================================
- Hits          67494    58500    -8994     
- Misses        23441    27927    +4486     
+ Partials       2567     2268     -299     
Components Coverage Δ
Rust Core 72.91% <ø> (-0.03%) ⬇️
Java SDK 62.08% <ø> (ø)
C# SDK 12.59% <ø> (-55.05%) ⬇️
Python SDK 81.43% <ø> (ø)
Node SDK 87.29% <ø> (-4.12%) ⬇️
Go SDK 8.35% <ø> (-30.34%) ⬇️
see 214 files with indirect coverage changes

Comment on lines +19 to +30
//! RowBinary / RowBinaryWithDefaults byte serialization.
//!
//! Follows the ClickHouse binary format specification:
//! <https://clickhouse.com/docs/en/interfaces/formats#rowbinary>
//!
//! Key layout rules:
//! - All integers are **little-endian**.
//! - Strings are prefixed with an **unsigned LEB128 varint** length.
//! - `Nullable(T)`: 1-byte null marker (`0x01` = null, `0x00` = not null)
//!   followed by T bytes when not null.
//! - `RowBinaryWithDefaults`: each top-level column is preceded by a 1-byte
//!   flag (`0x01` = use server DEFAULT, `0x00` = value follows).
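The layout rules quoted above can be sketched in Rust. This is a hedged illustration with hypothetical helper names (`write_leb128`, `write_nullable_string`), not the connector's actual API:

```rust
/// Unsigned LEB128 varint, as used for String length prefixes in RowBinary.
fn write_leb128(buf: &mut Vec<u8>, mut v: u64) {
    loop {
        let byte = (v & 0x7f) as u8;
        v >>= 7;
        if v == 0 {
            buf.push(byte);
            break;
        }
        // High bit set = more bytes follow.
        buf.push(byte | 0x80);
    }
}

/// Nullable(String): 1-byte null marker, then varint length + raw bytes.
fn write_nullable_string(buf: &mut Vec<u8>, s: Option<&str>) {
    match s {
        None => buf.push(0x01), // 0x01 = NULL, nothing follows
        Some(s) => {
            buf.push(0x00); // 0x00 = not null, value follows
            write_leb128(buf, s.len() as u64);
            buf.extend_from_slice(s.as_bytes());
        }
    }
}
```

For example, a non-null `"hi"` encodes as the marker `0x00`, the varint length `0x02`, then the two UTF-8 bytes.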

@abonander abonander commented Mar 6, 2026


Would you mind explaining the reasoning to choose a bespoke implementation here over using the official Rust client? It uses HTTP and RowBinary serialization by default, so it's not clear what's being gained here.

Contributor Author


The official client is not suitable for use with Iggy because it requires the target table schema to be defined at compile time using statically typed Rust structs. In contrast, Iggy connectors expect the schema to be provided dynamically via configuration.

Even if the ClickHouse client were used, a dynamic encoder would still need to be implemented to convert runtime data into the required binary format. In that case, the client would only simplify some HTTP request handling while leaving the core complexity unresolved.

Supporting the binary ingestion format is important because it provides the best ingestion performance in ClickHouse.

Let me know if this addresses your question, or if there are other considerations I may have overlooked.



We hadn't previously thought a whole lot about supporting dynamic serialization, but with your feedback as well as some comments we've received privately, it definitely seems like something we should look at. Thank you.

@kriti-sc kriti-sc requested a review from abonander March 7, 2026 10:58
Comment on lines +194 to +208
let f = coerce_f64(value)?;
let scale_factor = 10f64.powi(*scale as i32);
let int_val = (f * scale_factor).round() as i128;
if *precision <= 9 {
buf.extend_from_slice(&(int_val as i32).to_le_bytes());
} else if *precision <= 18 {
buf.extend_from_slice(&(int_val as i64).to_le_bytes());
} else {
// Int128: two little-endian 64-bit words, low word first
let lo = int_val as i64;
let hi = (int_val >> 64) as i64;
buf.extend_from_slice(&lo.to_le_bytes());
buf.extend_from_slice(&hi.to_le_bytes());
}
}
Contributor


The as i32 / as i64 casts on int_val are truncating in Rust. If the incoming value exceeds the column's declared precision, the lower bits silently wrap around and you'll write wrong data into ClickHouse with no error. ClickHouse won't reject it, since RowBinary is trusted input. Could you add bounds checks before the casts and return InvalidRecord if the scaled value doesn't fit?
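One hedged sketch of the requested guard (hypothetical error type and function name, not the PR's actual code). Checking the scaled value against the declared precision up front also makes the subsequent narrowing casts safe, since 10^9 fits in i32 and 10^18 fits in i64:

```rust
#[derive(Debug)]
struct InvalidRecord(String);

fn encode_decimal(buf: &mut Vec<u8>, int_val: i128, precision: u8) -> Result<(), InvalidRecord> {
    // Assumes precision <= 38 (ClickHouse's Decimal128 maximum), so the
    // power below cannot overflow i128.
    let bound = 10i128.pow(precision as u32);
    if int_val <= -bound || int_val >= bound {
        return Err(InvalidRecord(format!(
            "scaled value {int_val} exceeds Decimal({precision}) range"
        )));
    }
    if precision <= 9 {
        buf.extend_from_slice(&(int_val as i32).to_le_bytes()); // safe: |int_val| < 10^9
    } else if precision <= 18 {
        buf.extend_from_slice(&(int_val as i64).to_le_bytes()); // safe: |int_val| < 10^18
    } else {
        buf.extend_from_slice(&int_val.to_le_bytes()); // i128: 16 little-endian bytes
    }
    Ok(())
}
```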

@github-actions

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs.

If you need a review, please ensure CI is green and the PR is rebased on the latest master. Don't hesitate to ping the maintainers - either @core on Discord or by mentioning them directly here on the PR.

Thank you for your contribution!

@github-actions github-actions bot added and then removed the stale (Inactive issue or pull request) label Mar 17, 2026
Contributor

@hubcio hubcio left a comment


overall good direction, just needs a little bit of polishing

"IPv6" => Ok(ChType::IPv6),

// ── Explicitly unsupported ─────────────────────────────────────────
s if s.starts_with("LowCardinality") => {
Contributor


the error message says "LowCardinality uses dictionary encoding that is not supported in RowBinary mode" - this is incorrect. clickhouse docs explicitly state that in RowBinary format, LowCardinality columns are serialized identically to their underlying type - no dictionary encoding on the wire.

since clickhouse auto-wraps String columns in LowCardinality(String) by default, this makes RowBinary mode fail on nearly all production tables. fix: unwrap LowCardinality(T) and parse T:

if let Some(inner) = strip_wrapper(s, "LowCardinality") {
    return parse_type_inner(inner);
}
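The `strip_wrapper` helper in the snippet above isn't shown in the diff; a hedged sketch of what it might look like, assuming it only needs to peel `Wrapper(Inner)` off the type name:

```rust
/// Returns the inner type name if `s` has the form `wrapper(Inner)`.
/// Hypothetical helper matching the snippet above, not the PR's code.
fn strip_wrapper<'a>(s: &'a str, wrapper: &str) -> Option<&'a str> {
    s.strip_prefix(wrapper)? // must start with e.g. "LowCardinality"
        .strip_prefix('(')?  // then an opening paren
        .strip_suffix(')')   // and end with the matching close
}
```

With this, `LowCardinality(String)` unwraps to `String` and is then parsed as its underlying type, matching how RowBinary serializes it on the wire.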

ChType::DateTime64(precision) => {
// Unix time scaled by 10^precision as Int64.
let secs_f64 = coerce_to_unix_seconds_f64(value)?;
let scale = 10i64.pow(*precision as u32) as f64;
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

10i64.pow(*precision as u32) overflows i64::MAX when precision > 18 - panics in debug, silently wraps in release. clickhouse allows DateTime64 precision 0-9, but the parser at schema.rs:213-215 accepts any u8 without validation. add a bounds check at parse time.
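A minimal sketch of the suggested parse-time check (hypothetical function name and error type, not the PR's code), rejecting the precision before it can reach the overflowing `pow`:

```rust
/// Validate DateTime64 precision at schema-parse time.
/// ClickHouse only allows precisions 0..=9.
fn validate_datetime64_precision(p: u8) -> Result<u8, String> {
    if p > 9 {
        return Err(format!("DateTime64 precision {p} out of range (0..=9)"));
    }
    Ok(p)
}
```

Once the parser guarantees `precision <= 9`, `10i64.pow(precision as u32)` in the encoder is at most 10^9 and can no longer overflow.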

fn coerce_u64(value: &OwnedValue) -> Result<u64, Error> {
match value {
OwnedValue::Static(simd_json::StaticNode::U64(n)) => Ok(*n),
OwnedValue::Static(simd_json::StaticNode::I64(n)) => Ok(*n as u64),
Contributor

The reason will be displayed to describe this comment to others. Learn more.

*n as u64 where n is I64(-1) produces u64::MAX (18446744073709551615). silently corrupts all UInt columns. needs a if *n < 0 { return Err(...) } guard.
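The suggested guard can be written with `u64::try_from`, which fails on negative input instead of wrapping. A hedged sketch (hypothetical function name, not the PR's code):

```rust
/// Convert an i64 JSON value to u64 for a UInt column,
/// rejecting negative values instead of wrapping them.
fn coerce_i64_to_u64(n: i64) -> Result<u64, String> {
    u64::try_from(n).map_err(|_| format!("negative value {n} for UInt column"))
}
```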


// ── Decimal ──────────────────────────────────────────────────────────
ChType::Decimal(precision, scale) => {
let f = coerce_f64(value)?;
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all Decimal values are coerced through f64 which has ~15 significant digits. Decimal64 supports 18 digits, Decimal128 supports 38. values above 2^53 are silently rounded before scaling. for string inputs, the value could be parsed directly into a scaled integer without the f64 intermediary.
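One hedged sketch of the string path suggested above (hypothetical helper, not the PR's code): parse the digits directly into a scaled i128, so precision is never lost to an f64 round-trip:

```rust
/// Parse a decimal string like "123.45" into an integer scaled by
/// 10^scale, without going through f64. Returns None on malformed
/// input, overflow, or more fractional digits than the column's scale.
fn parse_scaled_decimal(s: &str, scale: u32) -> Option<i128> {
    let (int_part, frac_part) = match s.split_once('.') {
        Some((i, f)) => (i, f),
        None => (s, ""),
    };
    if frac_part.len() > scale as usize {
        return None; // would require rounding; reject in this sketch
    }
    let (negative, int_digits) = match int_part.strip_prefix('-') {
        Some(rest) => (true, rest),
        None => (false, int_part),
    };
    let mut v: i128 = int_digits.parse().ok()?;
    // Append the fractional digits...
    for c in frac_part.chars() {
        v = v.checked_mul(10)?.checked_add(c.to_digit(10)? as i128)?;
    }
    // ...then pad with zeros up to the declared scale.
    for _ in frac_part.len()..scale as usize {
        v = v.checked_mul(10)?;
    }
    Some(if negative { -v } else { v })
}
```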


if is_retryable_status(status) {
attempts += 1;
if attempts >= max_retries {
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with max_retries = 3, the loop makes at most 3 total attempts (1 initial + 2 retries), not "retries up to 3 times" as the README says. either change the condition to attempts > max_retries or update the docs to say "total attempts".
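The arithmetic can be checked with a minimal model of the loop (a sketch of the condition being discussed, not the PR's actual code): every iteration that always fails increments `attempts` and gives up once `attempts >= max_retries`.

```rust
/// Model of the retry loop's counting: every attempt fails, and the
/// loop gives up when `attempts >= max_retries`. Returns the total
/// number of attempts made.
fn count_attempts(max_retries: u32) -> u32 {
    let mut attempts = 0;
    loop {
        attempts += 1; // one (failing) attempt
        if attempts >= max_retries {
            return attempts; // give up
        }
    }
}
```

With `max_retries = 3` this performs 3 total attempts (1 initial + 2 retries), confirming the comment above.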

let s = coerce_to_string(value)?;
let bytes = s.as_bytes();
// Pad or truncate to exactly n bytes
let mut fixed = vec![0u8; *n];
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

vec![0u8; *n] heap-allocates for every FixedString column in every row. write directly to buf instead:

let copy_len = bytes.len().min(*n);
buf.extend_from_slice(&bytes[..copy_len]);
buf.extend(std::iter::repeat(0u8).take(*n - copy_len));

// Input: standard hyphenated UUID string "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
ChType::Uuid => {
let s = coerce_to_string(value)?;
let hex: String = s.chars().filter(|c| c.is_ascii_hexdigit()).collect();
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let hex: String = s.chars().filter(...).collect() allocates 32 bytes on the heap per UUID. UUIDs are common columns - at scale this adds up. parse the UUID in-place from the hyphenated form using known dash positions, or use a [u8; 32] stack buffer.
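A hedged sketch of the allocation-free parse suggested above (hypothetical helper; any RowBinary-specific byte-order handling is left to the caller): validate the dash positions, then decode hex pairs straight into a stack array.

```rust
/// Parse a hyphenated UUID string ("xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx")
/// into 16 bytes on the stack, with no heap allocation.
fn parse_uuid_bytes(s: &str) -> Option<[u8; 16]> {
    let b = s.as_bytes();
    // Fixed layout: 36 chars with dashes at known positions.
    if b.len() != 36 || b[8] != b'-' || b[13] != b'-' || b[18] != b'-' || b[23] != b'-' {
        return None;
    }
    let mut out = [0u8; 16];
    let mut oi = 0;
    let mut i = 0;
    while i < 36 {
        if b[i] == b'-' {
            i += 1;
            continue;
        }
        // Decode one hex pair into one output byte.
        let hi = (b[i] as char).to_digit(16)? as u8;
        let lo = (b[i + 1] as char).to_digit(16)? as u8;
        out[oi] = (hi << 4) | lo;
        oi += 1;
        i += 2;
    }
    Some(out)
}
```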

} else if *precision <= 18 {
buf.extend_from_slice(&(int_val as i64).to_le_bytes());
} else {
// Int128: two little-endian 64-bit words, low word first
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

manual i128 split into two i64 words is unnecessary. buf.extend_from_slice(&int_val.to_le_bytes()) produces the identical 16 bytes and is clearer.
