feat(connectors): add MongoDB sink connector#2815

Merged
hubcio merged 10 commits into apache:master from amuldotexe:codex/2739-sink-sync
Mar 9, 2026

Conversation

@amuldotexe
Contributor

Which issue does this PR close?

Partially addresses #2739 (MongoDB sink only).
MongoDB source support will follow in a separate PR.

Rationale

We need a MongoDB sink connector with explicit failure behavior so writes are never reported as successful when they are not.

What changed?

Before this change, MongoDB sink support was missing for connector runtime users.
This PR adds the MongoDB sink connector, including insert/write logic, retry handling for transient failures, explicit duplicate-key failure behavior, metadata mapping, and delivery-semantics documentation.
It also adds sink-focused integration and unit tests to validate payload formats, batch behavior, auto-create collection behavior, and non-silent failure paths.
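
The retry handling described above can be pictured with a small sketch. It assumes a hypothetical `WriteError` classification and a hypothetical `with_retry` helper; neither name comes from the connector code, which is not shown in this thread. Only errors classified as transient are retried, so a duplicate-key failure surfaces immediately instead of being swallowed.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical error classification for this sketch only.
#[derive(Debug, Clone, PartialEq)]
enum WriteError {
    Transient(String),    // e.g. a network blip or a primary election
    DuplicateKey(String), // unique index violation, retrying cannot help
    Permanent(String),    // anything else that should fail fast
}

/// Calls `op` and retries only on transient errors, up to `max_retries` extra attempts.
/// The closure receives the zero-based attempt number.
fn with_retry<T, F>(max_retries: u32, base_delay: Duration, mut op: F) -> Result<T, WriteError>
where
    F: FnMut(u32) -> Result<T, WriteError>,
{
    let mut attempt = 0;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            Err(WriteError::Transient(_)) if attempt < max_retries => {
                // Linear backoff: the delay grows with the attempt number.
                sleep(base_delay * (attempt + 1));
                attempt += 1;
            }
            // Non-transient errors, or transient errors past the budget, are returned as-is.
            Err(err) => return Err(err),
        }
    }
}
```

A caller would wrap the actual insert in the closure and map driver errors into the classification first; how the real connector classifies errors is not stated here.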

Local Execution

  • Passed: cargo fmt --all -- --check

  • Passed: cargo clippy -p iggy_connector_mongodb_sink --all-targets -- -D warnings

  • Passed: cargo test -p iggy_connector_mongodb_sink (13 passed)

  • Passed: cargo test -p integration --test mod -- mongodb_sink (10 passed)

  • Pre-commit hooks ran

  • Docker E2E proof image: ghcr.io/amuldotexe/iggy-mongodb-sink-demo:issue-2739-05fbec16

  • Docker smoke result: SMOKE_OK ... docs=3

  • GHCR package: https://github.com/users/amuldotexe/packages/container/package/iggy-mongodb-sink-demo

  • Key sink tests:

  • duplicate_key_is_explicit_failure_and_not_silent_success

  • ordered_duplicate_partial_insert_has_exact_accounting

  • schema_validation_mid_batch_surfaces_hard_error_and_partial_prefix

  • write_concern_timeout_does_not_report_full_success

  • retryable_write_failover_keeps_single_doc_per_id

  • no_writes_performed_label_path_preserves_state_accuracy

  • json_messages_sink_to_mongodb

  • binary_messages_sink_as_bson_binary

  • large_batch_processed_correctly

  • auto_create_collection_on_open

  • given_no_client_should_return_error_not_silent_ok

AI Usage

  1. Which tools?
    Codex, Claude Code, Rust Rover
  2. Scope of usage?
    PRD, connector precedent analysis, TDD, implementation, and PR prep
  3. How did you verify the generated code works correctly?
    Ran all local format, clippy, unit, and integration checks listed above
  4. Can you explain every line of the code if asked?
    Yes

@amuldotexe
Contributor Author

amuldotexe commented Feb 25, 2026

Docker proof of working (sink-only demo artifact, outside PR code scope):

Quick pull:

docker pull ghcr.io/amuldotexe/iggy-mongodb-sink-demo:issue-2739-05fbec16

Contributor

@krishvishal krishvishal left a comment

@amuldotexe I've added a few comments.

It seems that inserts in MongoDB can fail after partial writes. Check whether other places where insert is used are affected by this and handle them accordingly.

@amuldotexe
Contributor Author

Thanks @krishvishal for the detailed feedback

I will work on this and get back with an updated PR

@codecov

codecov bot commented Feb 26, 2026

Codecov Report

❌ Patch coverage is 87.05036% with 72 lines in your changes missing coverage. Please review.
✅ Project coverage is 70.36%. Comparing base (7d20f31) to head (b806bab).
⚠️ Report is 2 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| core/connectors/sinks/mongodb_sink/src/lib.rs | 87.05% | 59 Missing and 13 partials ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #2815      +/-   ##
============================================
+ Coverage     70.25%   70.36%   +0.11%     
  Complexity      763      763              
============================================
  Files          1040     1041       +1     
  Lines         85888    86443     +555     
  Branches      62164    62728     +564     
============================================
+ Hits          60338    60827     +489     
- Misses        23035    23081      +46     
- Partials       2515     2535      +20     
| Flag | Coverage Δ |
|---|---|
| csharp | 67.43% <ø> (-0.19%) ⬇️ |
| go | 36.37% <ø> (ø) |
| java | 55.15% <ø> (ø) |
| node | 91.48% <ø> (-0.04%) ⬇️ |
| python | 81.57% <ø> (ø) |
| rust | 70.68% <87.05%> (+0.15%) ⬆️ |

| Files with missing lines | Coverage Δ |
|---|---|
| core/connectors/runtime/src/sink.rs | 74.51% <ø> (+0.17%) ⬆️ |
| core/connectors/sdk/src/lib.rs | 56.00% <ø> (ø) |
| core/connectors/sdk/src/sink.rs | 76.82% <ø> (+10.59%) ⬆️ |
| core/connectors/sinks/mongodb_sink/src/lib.rs | 87.05% <87.05%> (ø) |

... and 10 files with indirect coverage changes

@atharvalade
Contributor

Adding to @krishvishal's points, which already cover a lot of ground. Two more things I think are blockers here.

First, there's a liveness issue with how partial failures interact with offset commits. When any batch inside process_messages fails, the whole call returns Err, so the consumer offset never advances. But ordered insert_many already committed the documents before the failure point. On the next poll the upstream re-sends everything, those already committed docs hit duplicate _id errors, which are non-transient so retries fail immediately, offset still doesn't commit, and you're stuck in a permanent loop. The connector can never recover from even a single transient mid-batch failure.
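
To make the loop concrete, here is a toy in-memory model of it. It does not use the MongoDB driver; `FakeCollection`, `WriteReport`, and `polls_until_commit` are hypothetical names for this sketch, and the transient failure is injected by hand. It shows the stuck case (ordered insert, duplicates treated as hard errors) next to one possible way out: treating duplicate-key errors on replay as already written and continuing past them.

```rust
use std::collections::HashSet;

/// Summary of one simulated batch write (hypothetical type for this sketch).
#[derive(Debug, Default, PartialEq)]
struct WriteReport {
    inserted: usize,
    duplicates: usize,
    transient: bool,
}

/// In-memory stand-in for a collection with a unique `_id` index.
struct FakeCollection {
    ids: HashSet<u64>,
    /// When set, the write at this batch index fails once with a transient error.
    fail_once_at: Option<usize>,
}

impl FakeCollection {
    fn new(fail_once_at: Option<usize>) -> Self {
        Self { ids: HashSet::new(), fail_once_at }
    }

    /// Ordered mode stops at the first duplicate; unordered mode skips it and continues.
    /// Documents written before a failure stay written in both modes.
    fn insert_batch(&mut self, batch: &[u64], ordered: bool) -> WriteReport {
        let mut report = WriteReport::default();
        for (index, id) in batch.iter().enumerate() {
            if self.fail_once_at == Some(index) {
                self.fail_once_at = None;
                report.transient = true;
                break;
            }
            if self.ids.insert(*id) {
                report.inserted += 1;
            } else {
                report.duplicates += 1;
                if ordered {
                    break;
                }
            }
        }
        report
    }
}

/// Re-delivers the same batch until the offset could be committed.
/// Returns the poll number that succeeded, or None if the sink never recovers.
fn polls_until_commit(
    col: &mut FakeCollection,
    batch: &[u64],
    tolerate_duplicates: bool,
    max_polls: usize,
) -> Option<usize> {
    for poll in 1..=max_polls {
        let report = col.insert_batch(batch, !tolerate_duplicates);
        let ok = !report.transient && (tolerate_duplicates || report.duplicates == 0);
        if ok {
            return Some(poll);
        }
    }
    None
}
```

With a four-message batch and one injected transient failure at index 2, the strict variant never commits because every replay hits a duplicate at index 0, while the tolerant variant commits on the second poll with all four documents present.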

Second, message.id.to_string() as the MongoDB _id isn't safe when you're consuming multiple topics into the same collection. Two unrelated messages from different topics can easily share the same id, and whichever one lands first wins while the other silently fails. The _id should be a composite of stream, topic, partition, and message id to avoid collisions and quiet data loss.
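
A minimal sketch of the composite key idea follows. The function name `composite_id`, the field order, the `:` separator, and the `u128` message id type are all assumptions of this example, not the format the connector ended up using.

```rust
/// Builds a document `_id` from the full message coordinates so that equal
/// message ids from different streams, topics, or partitions do not collide.
/// The ':' separator and the field order are assumptions of this sketch.
fn composite_id(stream: &str, topic: &str, partition: u32, message_id: u128) -> String {
    format!("{stream}:{topic}:{partition}:{message_id}")
}
```

For example, `composite_id("s", "orders", 1, 42)` and `composite_id("s", "payments", 1, 42)` yield different keys even though the message id is the same. A plain separator is only unambiguous if stream and topic names cannot contain it; otherwise a nested document or length-prefixed encoding would be safer.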

@amuldotexe
Contributor Author

Addressed the review feedback in 07bd2317.

What changed:

  • Removed the shared runtime sink callback-status change from this PR. core/connectors/runtime/src/sink.rs is back to connector-runtime behavior, so the MongoDB PR no longer changes sink behavior for all connectors.
  • Completed the remaining range-safe metadata handling in the MongoDB sink for checksum, timestamp, and origin timestamp.
  • Kept the sink-local fixes already on the branch: ordered(false) batch inserts, duplicate-key replay tolerance, composite _id, AtomicU64 counters, config normalization in new(), and warning on unknown payload_format.
  • Re-scoped the two MongoDB integration tests that had been asserting runtime last_error / ConnectorStatus::Error semantics. They now validate connector-local MongoDB outcomes only.
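
One plausible reading of "range-safe" in the list above: BSON's 64-bit integer type is signed, so an unsigned 64-bit value such as a checksum or timestamp may not fit. The sketch below shows that idea only; `MetaValue` and `range_safe` are hypothetical names, and falling back to a string is an assumption, not necessarily what the connector does.

```rust
/// Hypothetical stand-in for the value written to the document.
#[derive(Debug, PartialEq)]
enum MetaValue {
    Int64(i64),
    Text(String),
}

/// Converts an unsigned 64-bit metadata field without wrapping or truncating.
/// Values that fit in a signed 64-bit integer stay numeric; larger values
/// fall back to their decimal string form (an assumption of this sketch).
fn range_safe(value: u64) -> MetaValue {
    match i64::try_from(value) {
        Ok(v) => MetaValue::Int64(v),
        Err(_) => MetaValue::Text(value.to_string()),
    }
}
```

The point of using `i64::try_from` rather than an `as` cast is that an out-of-range value is detected instead of silently turning negative.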

Validation run on this head:

  • cargo clippy -p iggy_connector_mongodb_sink -p iggy-connectors --all-targets --all-features -- -D warnings
  • cargo test -p iggy-connectors
  • cargo test -p iggy_connector_mongodb_sink
  • cargo build -p server --bin iggy-server
  • cargo build -p iggy-connectors --bin iggy-connectors
  • cargo build -p iggy_connector_mongodb_sink --lib
  • cargo test -p integration --test mod connectors::mongodb::mongodb_sink::
  • cargo build --all-targets --all-features

Current PR head: 07bd2317f52b25484e91c0c697dd43d32ec4b2d9.

@krishvishal @atharvalade please take another look when convenient.

Contributor

@krishvishal krishvishal left a comment

All review comments addressed. LGTM.

numinnex previously approved these changes Mar 9, 2026
@hubcio
Contributor

hubcio commented Mar 9, 2026

@amuldotexe looks good, but please fix the CI. after that we can merge.

@amuldotexe
Contributor Author

@hubcio fixed CI; it was an unused import.

@hubcio hubcio merged commit f914860 into apache:master Mar 9, 2026
61 checks passed
@avirajkhare00
Contributor

Following up here for continuity:

Referencing both here in case anyone revisits this thread later.

7 participants