Skip to content

bug(connectors): PollingMessages auto-commit commits offsets before sink processing #2928

@mlevkov

Description

@mlevkov

Description

The AutoCommitWhen::PollingMessages strategy commits consumer group offsets before consume() is called on the sink plugin. This means if consume() fails (and the failure is already silently discarded per #2927), the messages are permanently lost — the consumer group has already advanced past them.

Sequence

1. consumer.next()     → messages received
2. Offsets committed   → consumer group advances  ← PROBLEM
3. process_messages()  → calls sink consume() via FFI
4. consume() fails     → messages already committed, lost forever

Location

core/connectors/runtime/src/sink.rs:

  • Line 421: Consumer configured with AutoCommitWhen::PollingMessages

    .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages))
  • Lines 266-344 (consume_messages loop): consumer.next() at line 272 triggers the auto-commit, but process_messages() doesn't execute until line 311.

Impact

Combined with #2927 (consume return value discarded), at-least-once delivery is not achievable with the current runtime for any sink connector. If consume() fails and the process restarts, those messages will never be retried because the offsets have already been committed.

Suggested Fix

Change auto-commit strategy to AutoCommitWhen::ConsumerStopped or AutoCommit::Disabled, and commit offsets after successful consume():

// Option A: Commit after processing
.auto_commit(AutoCommit::Disabled)
// ... in the consume loop, after successful consume():
consumer.store_offset(offset).await?;

// Option B: Use AfterPollingMessages if available
.auto_commit(AutoCommit::When(AutoCommitWhen::AfterProcessing))

The exact API depends on the Iggy SDK's consumer group offset management capabilities.

Related

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions