Skip to content

fix: source redis lag/requeue, lane-growth bound, and honest reserved-seam docs#148

Merged
joshua-temple merged 6 commits into
mainfrom
fix/source-correctness
Jun 5, 2026
Merged

fix: source redis lag/requeue, lane-growth bound, and honest reserved-seam docs#148
joshua-temple merged 6 commits into
mainfrom
fix/source-correctness

Conversation

@joshua-temple
Copy link
Copy Markdown
Collaborator

What this change does

Remediates the source review findings (correctness + perf + coverage; no new feature behavior):

  • redis correctness: Lag() no longer hides a caught-up group (dropped the lag>0 guard); ActionNak now honors Result.Requeue via a per-entry requeue floor (with an injectable clock + tests).
  • source-core: batch result over-count is now surfaced as ErrBatchResultCount instead of silently dropped; kafka guards a second Subscribe on a transactional Inlet (errTransactionalSingleSubscribe).
  • bridge: empty EventID now emits a statemachine.exactly_once=false telemetry signal and is documented; DriveTx Save-failure-aborts-transaction path gets the missing test; doc.go documents the DriveTx mode; the drivefunc godoc no longer links a nonexistent type.
  • inert surface (verified): Deduper is actually consumed by source/idempotency — kept, and the misleading "the Hopper uses it" doc corrected to "reserved seam adapted by the idempotency middleware." Batched.SettleBatch documented as a reserved one-call seam (per-message Results make a single-result batch settle wrong). Dead retry AttemptHeader/WithClock removed (writing the header would need backend cooperation = feature work, explicitly skipped).
  • perf: WithMaxLanes (default 4096) bounds the per-key lane map/goroutine growth (the unbounded-growth leak) by folding keys onto a bounded lane set without breaking per-key ordering; laneKey hashes the partition key directly (no []byte alloc); jetstream/redis newMessage headers are now lazy.
  • docs/tests: source/cloudevents gains doc.go + README; added ReceiveBatch, kafka transactional-Subscribe, and cdc error-path coverage.

Note: WithMaxLanes is a new option (additive) rather than silent idle-eviction; flagging since it is the one piece of new public surface here.

Checklist

  • Signed off (DCO), conventional commits; feature work explicitly skipped (header stamping, true dedup wiring)
  • All source modules build/test/vet (-race) clean; examples/sourcedrive + e2e build

…loor

Lag now returns the group's lag from XINFO GROUPS as-is, including zero
when the group is caught up; the prior positive-only guard fell through to
XLEN and reported the full stream length for a fully-consumed group.

ActionNak now records a per-entry redelivery floor when Result.Requeue
exceeds the configured minimum idle, and NakRedeliver holds an entry back
until its floor passes, matching the documented contract. Message headers
are built lazily so a message whose headers are never read pays no
per-message header allocation.

Signed-off-by: Joshua Temple <joshua.temple@stablekernel.com>
…rt docs

Add WithMaxLanes so the Hopper's per-key lane set (and its goroutines and
map entries) is bounded under unbounded key cardinality; keys beyond the
bound fold onto lanes by hash without reordering a key's messages. laneKey
hashes the partition-key string directly, dropping a per-message []byte
allocation.

dispatchBatch now records ErrBatchResultCount on a batch result over-count
so the discarded extra results are visible in traces rather than swallowed.

Correct three inert-surface docs: Deduper is a reserved seam adapted by the
idempotency middleware, not consulted by the Hopper; Batched.SettleBatch is
a one-call settle seam the Hopper does not use (it settles per message); and
remove retry's dead AttemptHeader and WithClock/cfg.now, and make
WithJitterSource's scope honest. Add tests for ReceiveBatch, batch
over-count, WithMaxLanes folding, InProgress/Manual dispositions, and the
cdc DecodeEvent registry-error and RawJSON.As unmarshal-error paths.

Signed-off-by: Joshua Temple <joshua.temple@stablekernel.com>
…ignal

DriveFunc's godoc dropped the broken [Deduper] link for concrete guidance
(use Drive or the idempotency middleware). doc.go now documents the
transactional DriveTx mode alongside the durable and stateless modes.

Drive and DriveTx mark the span with statemachine.exactly_once=false when a
message yields no event id, so the silent loss of dedup is visible in
traces rather than passing unnoticed; EventID's godoc explains it. Add a
DriveTx Save-failure test covering the in-transaction persist-error abort.

Signed-off-by: Joshua Temple <joshua.temple@stablekernel.com>
A transactional inlet is backed by one GroupTransactSession that fences a
single consumer; a second Subscribe could not share it and would silently
return a subscription with no transact session (Begin would then report
not-transactional). Subscribe now rejects the second call with a sentinel
error. Add coverage for the transactional option assembly and the guard.

Signed-off-by: Joshua Temple <joshua.temple@stablekernel.com>
…s): add README and doc.go

jetstream newMessage no longer eagerly builds the header slice; Headers
materializes and caches it on first read, so a message whose headers are
never read pays no per-message header allocation.

Add a README and a doc.go for source/cloudevents, the only source adapter
that lacked both; the package comment moves to doc.go to match the suite's
documentation layout.

Signed-off-by: Joshua Temple <joshua.temple@stablekernel.com>
Signed-off-by: Joshua Temple <joshua.temple@stablekernel.com>
@joshua-temple joshua-temple merged commit 476e33c into main Jun 5, 2026
121 checks passed
@joshua-temple joshua-temple deleted the fix/source-correctness branch June 5, 2026 16:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant