
feat(connectors): Delta Lake Sink Connector #2889

Open

kriti-sc wants to merge 11 commits into apache:master from kriti-sc:delta-sink

Conversation

@kriti-sc (Contributor) commented Mar 7, 2026

Which issue does this PR close?

Closes #1852

Rationale

Delta Lake is an open-source storage layer that brings ACID transactions and table semantics to data lakes, and it is very popular in modern streaming analytics architectures.

What changed?

Introduces a Delta Lake Sink Connector that enables writing data from Iggy to Delta Lake.

The Delta Lake writing logic is heavily inspired by the kafka-delta-ingest project, to build on proven ground for writing to Delta Lake.

Local Execution

  1. Produced 32632 messages with schema user_id: String, user_type: u8, email: String, source: String, state: String, created_at: DateTime<Utc>, message: String using a sample data producer.
  2. Consumed the messages using the Delta Lake sink and created a Delta table on the local filesystem.
  3. Verified the number of rows in the Delta table and the schema.
  4. Added unit tests and e2e tests, both passing.
[image] Left: messages produced; right (top): messages consumed by the Delta sink; right (bottom): inspecting the Delta table in Python.

AI Usage

If AI tools were used, please answer:

  1. Which tools? Claude Code
  2. Scope of usage? generated functions
  3. How did you verify the generated code works correctly? Manual testing by producing data into Iggy and then running the sink and verifying local Delta Lake creation, unit tests and e2e tests for local Delta Lake and Delta Lake on S3.
  4. Can you explain every line of the code if asked? Yes

@codecov bot commented Mar 7, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 72.02%. Comparing base (93d7f89) to head (0be9027).
⚠️ Report is 2 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master    #2889      +/-   ##
============================================
- Coverage     72.17%   72.02%   -0.15%     
  Complexity      930      930              
============================================
  Files          1122     1124       +2     
  Lines         93502    93430      -72     
  Branches      70851    70789      -62     
============================================
- Hits          67488    67297     -191     
- Misses        23447    23563     +116     
- Partials       2567     2570       +3     
Components Coverage Δ
Rust Core 72.73% <ø> (-0.19%) ⬇️
Java SDK 62.08% <ø> (ø)
C# SDK 67.43% <ø> (-0.21%) ⬇️
Python SDK 81.43% <ø> (ø)
Node SDK 91.53% <ø> (ø)
Go SDK 38.68% <ø> (ø)
see 40 files with indirect coverage changes

@hubcio (Contributor) left a comment

overall good direction, but needs a bit of refining. thanks for the contribution @kriti-sc

})?;

// Flush buffers to object store and commit to Delta log
let version = match state.writer.flush_and_commit(&mut state.table).await {
Contributor:

every consume() call does flush_and_commit, which creates a new parquet file and a new JSON transaction log entry in _delta_log/. at high throughput with, say, 1000-message batches at 1M ops/sec, that's 1000 parquet files and 1000 log entries per second. delta lake degrades catastrophically under this - metadata parsing slows, checkpoint overhead grows, and cloud object store LIST calls become a bottleneck. the sink needs a buffering strategy: accumulate across multiple consume() calls and flush on a configurable row count, byte threshold, or time interval - not per batch.

Contributor (author):

there are two knobs, poll_interval and batch_size, to control delta write frequency and file size. does this address your concern, or are you thinking of a different issue @hubcio?
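For context, the two knobs named above might look roughly like this in a sink configuration. Only poll_interval and batch_size come from the discussion; the values and surrounding shape are illustrative, not the connector's actual config schema.

```toml
# hypothetical sink config sketch
poll_interval = "5s"  # how often the sink polls Iggy for new messages
batch_size = 1000     # max messages pulled per poll
```

Note that both knobs bound the size of a single consume() batch; on their own they still imply one commit per batch rather than one commit per accumulated threshold.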

let version = match state.writer.flush_and_commit(&mut state.table).await {
Ok(v) => v,
Err(e) => {
state.writer.reset();
Contributor:

when write() succeeds (line 133) but flush_and_commit() fails here, reset() clears the internal parquet buffer. those messages are permanently lost with no retry path. since the connector runtime uses AutoCommitWhen::PollingMessages (offset committed before consume), the consumer offset may already have been advanced past these messages. there's no retry, no DLQ, no metric - the messages just vanish.

Contributor (author):

Agreed. I was planning to propose a DLQ strategy across sinks and sources. Is it OK to defer this concern for later? @hubcio

Contributor:

sure

Contributor (author):

Additionally, added a TODO for implementing a retry strategy and metrics.

StructField::new("count", DataType::Primitive(PrimitiveType::Integer), true),
StructField::new("amount", DataType::Primitive(PrimitiveType::Double), true),
StructField::new("active", DataType::Primitive(PrimitiveType::Boolean), true),
StructField::new("timestamp", DataType::Primitive(PrimitiveType::Long), true),
Contributor:

the test fixture declares timestamp as PrimitiveType::Long, not PrimitiveType::Timestamp. this means all integration tests bypass the coercion logic entirely. an e2e test with a Timestamp-typed column would catch issues like the TimestampNtz gap and null-to-"null" coercion bug at the integration level.

Contributor (author):

Agree on fixing the schema type. Timestamp better reflects real usage.

Disagree on the second part. Catching specific bugs like the TimestampNtz gap or null coercion is the unit test's responsibility. My rubric with the integration test is to verify the happy path end-to-end.
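The agreed fixture fix would change only the timestamp line, following the same deltalake StructField pattern quoted in the diff above (this is a fragment, shown for illustration):

```rust
// corrected fixture field: use a Timestamp-typed column so the test
// exercises the timestamp coercion path rather than plain Long
StructField::new("timestamp", DataType::Primitive(PrimitiveType::Timestamp), true),
```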

@hubcio (Contributor) commented Mar 26, 2026

@kriti-sc I will finish review after both #2925 and #2933 get merged and you rebase against these.



Development

Successfully merging this pull request may close these issues.

Implement Delta Lake connectors

4 participants