From 4f640b33ca1b457a67c82d902925a4bda3c6c0e4 Mon Sep 17 00:00:00 2001 From: Joshua Temple Date: Fri, 5 Jun 2026 10:07:05 -0400 Subject: [PATCH 1/6] fix(source/redis): report caught-up group lag and honor Nak requeue floor Lag now returns the group's lag from XINFO GROUPS as-is, including zero when the group is caught up; the prior positive-only guard fell through to XLEN and reported the full stream length for a fully-consumed group. ActionNak now records a per-entry redelivery floor when Result.Requeue exceeds the configured minimum idle, and NakRedeliver holds an entry back until its floor passes, matching the documented contract. Message headers are built lazily so a message whose headers are never read pays no per-message header allocation. Signed-off-by: Joshua Temple --- source/redis/message.go | 48 +++++++++++-------- source/redis/redis.go | 77 +++++++++++++++++++++++++++--- source/redis/redis_test.go | 95 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 195 insertions(+), 25 deletions(-) diff --git a/source/redis/message.go b/source/redis/message.go index e9eb193..1fbf51f 100644 --- a/source/redis/message.go +++ b/source/redis/message.go @@ -32,30 +32,29 @@ func (c idCursor) String() string { return string(c) } // message adapts a go-redis XMessage onto [source.Message]. The vendor entry is // reachable only through As; the neutral accessors expose the common surface. -// The fields are snapshotted at construction so the neutral view is a stable -// value independent of later driver state. +// The entry is a value copy taken from go-redis, so the neutral view is stable +// independent of later driver state. The header slice is built lazily on the +// first [message.Headers] call and cached, so a message whose headers a handler +// never reads pays no per-message header allocation. type message struct { entry goredis.XMessage stream string headers source.Headers + built bool // headers has been materialized key []byte value []byte } // newMessage wraps a go-redis XMessage from stream, projecting its fields onto // the neutral surface: the [ValueField] field (when present) becomes the raw -// value, the [KeyHeader] field (when present) becomes the routing key, and every -// field is exposed as a header. The key falls back to the stream name so the -// Hopper always has a deterministic shard key on a backend with no partitions. +// value and the [KeyHeader] field (when present) becomes the routing key. The +// key falls back to the stream name so the Hopper always has a deterministic +// shard key on a backend with no partitions. Headers are deferred to the first +// [message.Headers] call. func newMessage(stream string, entry goredis.XMessage) *message { - headers := make(source.Headers, 0, len(entry.Values)) var value []byte - for k, v := range entry.Values { - s := toString(v) - headers = append(headers, source.Header{Key: k, Value: s}) - if k == ValueField { - value = []byte(s) - } + if v, ok := entry.Values[ValueField]; ok { + value = []byte(toString(v)) } key := []byte(stream) @@ -66,11 +65,10 @@ func newMessage(stream string, entry goredis.XMessage) *message { } return &message{ - entry: entry, - stream: stream, - headers: headers, - key: key, - value: value, + entry: entry, + stream: stream, + key: key, + value: value, } } @@ -99,8 +97,20 @@ func (m *message) Key() []byte { return m.key } // nil. The full set of entry fields remains available through [message.Headers]. func (m *message) Value() []byte { return m.value } -// Headers returns the entry's fields as a value-type slice. -func (m *message) Headers() source.Headers { return m.headers } +// Headers returns the entry's fields as a value-type slice, materializing it on +// first call and caching it so repeated reads are cheap. The order follows +// go-redis's map iteration and is not significant; a handler keys by header name. +func (m *message) Headers() source.Headers { + if !m.built { + headers := make(source.Headers, 0, len(m.entry.Values)) + for k, v := range m.entry.Values { + headers = append(headers, source.Header{Key: k, Value: toString(v)}) + } + m.headers = headers + m.built = true + } + return m.headers +} // Subject returns the stream the entry arrived on. func (m *message) Subject() string { return m.stream } diff --git a/source/redis/redis.go b/source/redis/redis.go index 2d9c5f6..d9a2bdb 100644 --- a/source/redis/redis.go +++ b/source/redis/redis.go @@ -128,6 +128,7 @@ type config struct { block time.Duration count int64 minIdle time.Duration + now func() time.Time } // Option configures an [Inlet] at construction. Options compose; later options @@ -172,6 +173,18 @@ func WithCount(n int64) Option { return func(c *config) { c.count = n } } // default. func WithMinIdle(d time.Duration) Option { return func(c *config) { c.minIdle = d } } +// WithClock injects the clock the subscription reads when it stamps and checks +// the per-entry requeue floor a [source.Result.Requeue] raises (see Settle and +// NakRedeliver). It exists so a test can drive redelivery timing +// deterministically; production leaves it at time.Now. A nil clock is ignored. +func WithClock(now func() time.Time) Option { + return func(c *config) { + if now != nil { + c.now = now + } + } +} + // New builds an [Inlet] from opts. Exactly one of [WithAddr] or [WithClient] // must be supplied, and [WithGroup] and [WithConsumer] are required. When // [WithAddr] is used New dials the client eagerly so misconfiguration surfaces @@ -236,6 +249,7 @@ func (in *Inlet) Subscribe(_ context.Context, cfg source.SubscribeConfig) (sourc block: in.cfg.block, count: in.cfg.count, minIdle: in.cfg.minIdle, + now: in.cfg.now, startID: "0", // create the group at the stream origin by default readFrom: ">", // read new (never-delivered) entries by default }, nil @@ -295,6 +309,25 @@ type subscription struct { pending []source.Message // buffered, fetched-but-not-yet-yielded entries inflight int // entries delivered by Next, not yet settled readFrom string // ">" for new entries; an ID to drain the backlog + + // requeueAfter records the earliest wall-clock time a nak'd entry may be + // reclaimed for redelivery, keyed by entry ID. A Nak whose + // [source.Result.Requeue] exceeds the configured min-idle raises that + // entry's floor; NakRedeliver skips an entry until its floor has passed. + // An entry is removed from the map when it is reclaimed or acked. + requeueAfter map[string]time.Time + // now is the clock NakRedeliver and Nak read, injected so a test can drive + // the per-entry requeue floor deterministically. Nil means time.Now. + now func() time.Time +} + +// clock returns the subscription's injected clock, defaulting to time.Now. The +// caller holds s.mu. +func (s *subscription) clock() func() time.Time { + if s.now != nil { + return s.now + } + return time.Now } // ensureGroup creates the consumer group (with MKSTREAM) once, idempotently @@ -427,8 +460,17 @@ func (s *subscription) Settle(ctx context.Context, m source.Message, r source.Re return s.ack(ctx, entry.ID) case source.ActionNak: // Leave the entry in the PEL; a future pending scan (NakRedeliver) claims - // and redelivers it once it has been idle long enough. The requeue delay - // raises that idle floor when larger than the configured minimum. + // and redelivers it once it has been idle long enough. A requeue delay + // larger than the configured minimum raises this entry's redelivery floor + // so NakRedeliver holds it back until the delay has elapsed. + if r.Requeue > s.minIdle { + s.mu.Lock() + if s.requeueAfter == nil { + s.requeueAfter = make(map[string]time.Time) + } + s.requeueAfter[entry.ID] = s.clock()().Add(r.Requeue) + s.mu.Unlock() + } return nil case source.ActionTerm: if err := s.deadLetter(ctx, m, entry, r); err != nil { @@ -459,6 +501,9 @@ func (s *subscription) ack(ctx context.Context, id string) error { if err := s.client.XAck(ctx, s.stream, s.group, id).Err(); err != nil { return fmt.Errorf("redis: ack %s: %w", id, err) } + s.mu.Lock() + delete(s.requeueAfter, id) + s.mu.Unlock() return nil } @@ -520,10 +565,21 @@ func (s *subscription) NakRedeliver(ctx context.Context, minIdle time.Duration) if len(pend) == 0 { return 0, nil } + // Honor any per-entry requeue floor a Nak raised: an entry whose floor has + // not yet passed is held back from this redelivery cycle. + now := s.clock()() ids := make([]string, 0, len(pend)) + s.mu.Lock() for _, p := range pend { + if until, held := s.requeueAfter[p.ID]; held && now.Before(until) { + continue + } ids = append(ids, p.ID) } + s.mu.Unlock() + if len(ids) == 0 { + return 0, nil + } claimed, err := s.client.XClaim(ctx, &goredis.XClaimArgs{ Stream: s.stream, Group: s.group, @@ -537,6 +593,7 @@ func (s *subscription) NakRedeliver(ctx context.Context, minIdle time.Duration) s.mu.Lock() for _, e := range claimed { s.pending = append(s.pending, newMessage(s.stream, e)) + delete(s.requeueAfter, e.ID) } s.mu.Unlock() return len(claimed), nil @@ -597,6 +654,13 @@ func (s *subscription) SeekToEnd(_ context.Context) error { // next Next yields the replayed entries before resuming live delivery. The // entries are claimed to this consumer (they enter the PEL) so they settle // through the normal ack path. The caller passes a context for the range read. +// +// The closed check and the buffer write straddle the XRANGE call rather than +// holding s.mu across the network round-trip (that would stall a concurrent +// Settle). A Close that lands in that window is benign: reseek still buffers the +// replayed entries, and the next Next drains them and then returns +// [source.ErrDrained], exactly as a Close followed by a drain would. Seek and +// Close are not expected to race in practice; the engine drives one fetch loop. func (s *subscription) reseek(ctx context.Context, fromID string) error { s.mu.Lock() if s.closed { @@ -628,10 +692,11 @@ func (s *subscription) Lag(ctx context.Context) (int64, error) { if err == nil { for _, g := range groups { if g.Name == s.group { - if g.Lag > 0 { - return g.Lag, nil - } - break + // XINFO GROUPS reports the group's lag directly, including zero + // when the group is caught up. Return it as-is: a guard that + // only trusted a positive lag would fall through to XLEN and + // report the full stream length for a fully-consumed group. + return g.Lag, nil } } } diff --git a/source/redis/redis_test.go b/source/redis/redis_test.go index 05629eb..6e01a5e 100644 --- a/source/redis/redis_test.go +++ b/source/redis/redis_test.go @@ -116,6 +116,23 @@ func (f *fakeClient) XClaim(ctx context.Context, a *goredis.XClaimArgs) *goredis cmd.SetErr(f.claimErr) return cmd } + // Honor the requested IDs: XClaim only reassigns the entries it is asked for, + // so a test that holds an entry back (its requeue floor not yet passed) sees + // it excluded from the claim arguments and so from the returned set. + if len(a.Messages) > 0 { + want := make(map[string]bool, len(a.Messages)) + for _, id := range a.Messages { + want[id] = true + } + out := make([]goredis.XMessage, 0, len(f.claimOut)) + for _, e := range f.claimOut { + if want[e.ID] { + out = append(out, e) + } + } + cmd.SetVal(out) + return cmd + } cmd.SetVal(f.claimOut) return cmd } @@ -620,6 +637,66 @@ func TestNakRedeliver_ClaimErrorPropagates(t *testing.T) { } } +func TestNak_RequeueRaisesPerEntryFloor(t *testing.T) { + t.Parallel() + // A controllable clock lets the test advance time deterministically across + // the per-entry requeue floor a Nak with a large Requeue raises. + now := time.Unix(1000, 0) + clock := func() time.Time { return now } + + c := &fakeClient{ + readBatches: [][]goredis.XMessage{{entry("1-0", map[string]any{ValueField: "a"})}}, + pendingOut: []goredis.XPendingExt{{ID: "1-0", RetryCount: 1}}, + claimOut: []goredis.XMessage{entry("1-0", map[string]any{ValueField: "a"})}, + } + // minIdle small so only the per-entry Requeue floor gates redelivery. + sub, _ := newSub(t, c, WithMinIdle(time.Millisecond), WithClock(clock)) + + m, err := sub.Next(context.Background()) + if err != nil { + t.Fatalf("Next() error = %v", err) + } + // Nak with a Requeue (10s) far larger than minIdle: the entry's floor is now+10s. + if err := sub.Settle(context.Background(), m, source.NakAfter(10*time.Second, errors.New("retry"))); err != nil { + t.Fatalf("Settle(nak) error = %v", err) + } + + // Before the floor passes, NakRedeliver must hold the entry back. + if n, err := sub.NakRedeliver(context.Background(), 0); err != nil || n != 0 { + t.Fatalf("NakRedeliver() before floor = %d,%v want 0,nil", n, err) + } + + // Advance past the floor: the entry is now eligible and is reclaimed. + now = now.Add(11 * time.Second) + n, err := sub.NakRedeliver(context.Background(), 0) + if err != nil { + t.Fatalf("NakRedeliver() after floor error = %v", err) + } + if n != 1 { + t.Fatalf("NakRedeliver() after floor = %d, want 1", n) + } +} + +func TestNak_NoRequeueLeavesFloorUnraised(t *testing.T) { + t.Parallel() + // A plain Nak (no Requeue, or one below minIdle) raises no floor, so the + // entry redelivers on the next scan governed by minIdle alone. + c := &fakeClient{ + readBatches: [][]goredis.XMessage{{entry("1-0", map[string]any{ValueField: "a"})}}, + pendingOut: []goredis.XPendingExt{{ID: "1-0", RetryCount: 1}}, + claimOut: []goredis.XMessage{entry("1-0", map[string]any{ValueField: "a"})}, + } + sub, _ := newSub(t, c, WithMinIdle(time.Millisecond)) + m, _ := sub.Next(context.Background()) + if err := sub.Settle(context.Background(), m, source.Nak(errors.New("retry"))); err != nil { + t.Fatalf("Settle(nak) error = %v", err) + } + n, err := sub.NakRedeliver(context.Background(), 0) + if err != nil || n != 1 { + t.Fatalf("NakRedeliver() = %d,%v want 1,nil", n, err) + } +} + // --- capabilities: seek (replay by entry ID) -------------------------------- func TestSeek_BuffersBacklog(t *testing.T) { @@ -725,6 +802,24 @@ func TestLag_PrefersGroupLag(t *testing.T) { } } +func TestLag_GroupCaughtUpReportsZero(t *testing.T) { + t.Parallel() + // A caught-up group reports Lag 0 from XINFO GROUPS. The reporter must return + // that zero, not fall through to XLEN and report the full stream length. + c := &fakeClient{ + groups: []goredis.XInfoGroup{{Name: "workers", Lag: 0}}, + xlen: 999, + } + sub, _ := newSub(t, c) + got, err := sub.Lag(context.Background()) + if err != nil { + t.Fatalf("Lag() error = %v", err) + } + if got != 0 { + t.Errorf("Lag() = %d, want 0 (group caught up)", got) + } +} + func TestLag_FallsBackToXLen(t *testing.T) { t.Parallel() c := &fakeClient{groupsErr: errors.New("no groups info"), xlen: 7} From e48b840594b6c7c2007303b79f6adace77ef31e5 Mon Sep 17 00:00:00 2001 From: Joshua Temple Date: Fri, 5 Jun 2026 10:12:17 -0400 Subject: [PATCH 2/6] fix(source): bound lane growth, surface batch over-count, correct inert docs Add WithMaxLanes so the Hopper's per-key lane set (and its goroutines and map entries) is bounded under unbounded key cardinality; keys beyond the bound fold onto lanes by hash without reordering a key's messages. laneKey hashes the partition-key string directly, dropping a per-message []byte allocation. dispatchBatch now records ErrBatchResultCount on a batch result over-count so the discarded extra results are visible in traces rather than swallowed. Correct three inert-surface docs: Deduper is a reserved seam adapted by the idempotency middleware, not consulted by the Hopper; Batched.SettleBatch is a one-call settle seam the Hopper does not use (it settles per message); and remove retry's dead AttemptHeader and WithClock/cfg.now, and make WithJitterSource's scope honest. Add tests for ReceiveBatch, batch over-count, WithMaxLanes folding, InProgress/Manual dispositions, and the cdc DecodeEvent registry-error and RawJSON.As unmarshal-error paths. Signed-off-by: Joshua Temple --- source/batch.go | 5 +++ source/batch_test.go | 64 ++++++++++++++++++++++++++++++++++++++ source/capability.go | 40 ++++++++++++++++++------ source/cdc/cdc_test.go | 35 +++++++++++++++++++++ source/hopper.go | 38 +++++++++++++++++----- source/hopper_test.go | 60 +++++++++++++++++++++++++++++++++++ source/options.go | 28 +++++++++++++++++ source/retry/retry.go | 32 ++++--------------- source/retry/retry_test.go | 23 +++++++++----- 9 files changed, 275 insertions(+), 50 deletions(-) diff --git a/source/batch.go b/source/batch.go index fa14faf..f5afc2e 100644 --- a/source/batch.go +++ b/source/batch.go @@ -346,7 +346,12 @@ func (hp *Hopper) dispatchBatch(ctx context.Context, bh BatchHandler, buf []*del hp.settle(ctx, span, d, r) } + // Surface a count mismatch loudly rather than letting it pass. An under-count + // already terminated each unmatched message as poison above; an over-count + // (more results than messages) silently discards the extra results, so record + // the sentinel here too so the discard is visible in traces, not swallowed. if len(results) != len(live) { + span.RecordError(ErrBatchResultCount) span.SetStatus(telemetry.StatusError, "batch result count mismatch") } } diff --git a/source/batch_test.go b/source/batch_test.go index 32edad6..ae1a3a5 100644 --- a/source/batch_test.go +++ b/source/batch_test.go @@ -217,6 +217,70 @@ func TestHopper_BatchResultCountMismatch(t *testing.T) { } } +// TestHopper_BatchResultOverCount checks that returning more results than the +// batch holds settles every message and does not panic: the extra results are +// discarded, and the mismatch is surfaced (recorded on the span) rather than +// swallowed. +func TestHopper_BatchResultOverCount(t *testing.T) { + t.Parallel() + h := memsource.NewHarness(t, + []source.Option{source.WithBatch(2, 0)}, + memsource.Msg{Key: "k"}, + memsource.Msg{Key: "k"}, + ) + h.RunBatch(func(_ context.Context, ms []source.Message) []source.Result { + // Three results for two messages: the third is extra and must be dropped + // without affecting the two real messages. + return []source.Result{source.Ack(), source.Ack(), source.Ack()} + }) + h.AssertCounts(memsource.Counts{Acked: 2}) +} + +// TestHopper_ReceiveBatchSubscribesAndRuns covers the ReceiveBatch entry point, +// the batch analog of Receive: it subscribes through the inlet, drives the batch +// handler, and settles every message. +func TestHopper_ReceiveBatchSubscribesAndRuns(t *testing.T) { + t.Parallel() + in := memsource.New( + memsource.WithMessages( + memsource.Msg{Key: "k", Value: []byte("a")}, + memsource.Msg{Key: "k", Value: []byte("b")}, + ), + ) + hp := source.New(source.WithBatch(2, 0)) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + done := make(chan error, 1) + go func() { + done <- hp.ReceiveBatch(ctx, in, source.SubscribeConfig{Topics: []string{"t"}}, + func(_ context.Context, ms []source.Message) []source.Result { + res := make([]source.Result, len(ms)) + for i := range res { + res[i] = source.Ack() + } + return res + }) + }() + + // ReceiveBatch does not auto-close the subscription's stream; cancel to drain + // once the queued messages have been handled. + time.Sleep(50 * time.Millisecond) + cancel() + select { + case err := <-done: + if err != nil { + t.Fatalf("ReceiveBatch returned %v, want nil", err) + } + case <-time.After(2 * time.Second): + t.Fatal("ReceiveBatch did not return") + } + if c := in.Ledger().Counts(); c.Acked != 2 { + t.Fatalf("acked = %d, want 2", c.Acked) + } +} + // TestHopper_BatchDecodeFailureIsolated checks one undecodable message terminates // alone without poisoning its batch-mates. func TestHopper_BatchDecodeFailureIsolated(t *testing.T) { diff --git a/source/capability.go b/source/capability.go index 6243ad3..93cd6f9 100644 --- a/source/capability.go +++ b/source/capability.go @@ -103,13 +103,26 @@ type OrderedDelivery interface { } // Batched is a [Subscription] that can yield and settle messages in batches, -// amortizing per-message overhead. The [Hopper] uses it when present to fetch and -// ack in groups; an unbatched subscription is driven one message at a time. +// amortizing per-message overhead. The [Hopper]'s batch mode (see +// [Hopper.RunBatch]) uses [Batched.NextBatch] to fetch whole batches from the +// backend when this capability is present; an unbatched subscription is fetched +// one message at a time. +// +// Settlement is a separate decision. A [BatchHandler] returns one [Result] per +// message, so the Hopper settles each message by its own Result through +// [Subscription.Settle] rather than collapsing a batch onto a single outcome — +// a slow handler that fails the third message of five must not nak the other +// four. [SettleBatch] is therefore a reserved seam the Hopper does not call: it +// is the one-call settle path for an adapter or a caller that already knows a +// whole batch shares one outcome (a uniform ack after a bulk commit), not a +// substitute for per-message settlement. type Batched interface { // NextBatch returns up to limit messages, blocking for at least one, or // ctx.Err()/ErrDrained as [Subscription.Next] would. NextBatch(ctx context.Context, limit int) ([]Message, error) - // SettleBatch applies r to every message in ms in one call. + // SettleBatch applies the single result r to every message in ms in one call. + // It is for a caller settling a uniform batch directly; the Hopper settles per + // message (one Result each) and does not call it. SettleBatch(ctx context.Context, ms []Message, r Result) error } @@ -175,13 +188,22 @@ type ProducedRecord struct { Headers Headers } -// Deduper is a [Subscription] (or an inlet seam) that suppresses re-delivery of -// an already-processed message by an idempotency key. The no-op default is "no -// deduplication"; the state-machine bridge supplies the machine's state version -// as the key so redelivery is provably idempotent with no external store. +// Deduper suppresses re-delivery of an already-processed message by an +// idempotency key. It is a reserved seam, not a capability the [Hopper] consults +// on its own: the Hopper never type-asserts a subscription to Deduper and never +// calls [Deduper.Seen]. Deduplication is opt-in middleware — wire a Deduper into +// the source/idempotency middleware with its WithDeduper option (it adapts a +// Deduper to the middleware's store via FromDeduper) and add that middleware to +// the Hopper, or call Seen yourself from a handler. The no-op default, with no +// such middleware, is "no deduplication". +// +// Exactly-once into a statechart is provided separately by the source/statemachine +// bridge's version idempotency (a redelivered, already-applied event id is +// skipped against the persisted version); that path does not use this interface. type Deduper interface { - // Seen reports whether key has already been processed (and records it if not), - // so the Hopper can skip a duplicate by acking without re-running the handler. + // Seen reports whether key has already been processed, recording it if not, so + // a caller (the idempotency middleware, or a handler) can skip a duplicate by + // acking without re-running the work. Seen(ctx context.Context, key string) (bool, error) } diff --git a/source/cdc/cdc_test.go b/source/cdc/cdc_test.go index 9e383c3..db9d330 100644 --- a/source/cdc/cdc_test.go +++ b/source/cdc/cdc_test.go @@ -266,6 +266,41 @@ func TestDecodeEvent_WrongTypeIsPoison(t *testing.T) { } } +func TestDecodeEvent_RegistryErrorPropagates(t *testing.T) { + t.Parallel() + + // A registry whose codec fails to decode: DecodeEvent returns the registry's + // (poison-classified) decode error rather than masking it. + registry := source.NewRegistry().SetDefault(source.CodecFunc( + func([]byte, source.Headers) (any, error) { + return nil, errors.New("codec exploded") + }, + )) + _, err := cdc.DecodeEvent(registry, changeMessage{value: []byte(`{"op":"c"}`)}) + if err == nil { + t.Fatal("DecodeEvent error = nil, want registry decode error") + } + if !errors.Is(err, source.ErrPoison) { + t.Fatalf("DecodeEvent error = %v, want poison-classified decode error", err) + } +} + +func TestRawJSON_As_UnmarshalErrorReported(t *testing.T) { + t.Parallel() + + // A create carries an after-image; decoding it into an incompatible shape + // surfaces the json.Unmarshal error (wrapped), not ErrMissingImage. + ev := decode(t, []byte(`{"op":"c","after":{"id":1,"name":"ada"}}`)) + var dst int // the after-image is an object; an int cannot hold it + err := ev.After.As(&dst) + if err == nil { + t.Fatal("RawJSON.As error = nil, want unmarshal error") + } + if errors.Is(err, cdc.ErrMissingImage) { + t.Fatalf("RawJSON.As error = %v, want unmarshal error not ErrMissingImage", err) + } +} + func TestSourceHeaders(t *testing.T) { t.Parallel() diff --git a/source/hopper.go b/source/hopper.go index 5bbfcfc..2df4eb0 100644 --- a/source/hopper.go +++ b/source/hopper.go @@ -25,7 +25,10 @@ import ( // A lane is a single goroutine that processes its queue strictly in arrival // order, so two messages with the same key are never reordered, while distinct // keys run in parallel up to [WithConcurrency]. This is the guarantee a -// statechart instance needs: its events arrive in order. +// statechart instance needs: its events arrive in order. The number of distinct +// lanes (and their goroutines) is bounded by [WithMaxLanes]; keys beyond the +// bound share a lane by hash, which serializes only the unrelated colliding keys +// and never reorders a single key's messages. // // Backpressure. [WithMaxInFlight] bounds the messages delivered but not yet // settled; when the window is full the fetch loop blocks before pulling the next @@ -44,6 +47,7 @@ type Hopper struct { middleware []Middleware concurrency int maxInFlight int + maxLanes int batch batchConfig received telemetry.Counter @@ -76,6 +80,7 @@ func New(opts ...Option) *Hopper { middleware: append([]Middleware(nil), cfg.middleware...), concurrency: cfg.concurrency, maxInFlight: cfg.maxInFlight, + maxLanes: cfg.maxLanes, batch: cfg.batch, received: m.Counter("source.received", telemetry.WithDescription("messages received from the subscription")), acked: m.Counter("source.acked", telemetry.WithDescription("messages acknowledged after successful handling")), @@ -349,16 +354,25 @@ func (hp *Hopper) reportLag(ctx context.Context, sub Subscription) { } // laneKey selects the ordered lane for m: its PartitionKey when non-empty, else -// a hash of its Key, else lane 0 (every keyless message shares one ordered -// lane, preserving global order for them). +// a hash of its Key, else lane 0 (every keyless message shares one ordered lane, +// preserving global order for them). The hash is folded onto the bounded lane +// set (see [WithMaxLanes]) so the number of lanes never exceeds maxLanes; a key +// always maps to the same lane, so folding never reorders a key's messages. func (hp *Hopper) laneKey(m Message) uint64 { - if pk := m.PartitionKey(); pk != "" { - return hashBytes([]byte(pk)) + var h uint64 + switch { + case m.PartitionKey() != "": + // Hash the string directly, avoiding a []byte conversion per message. + h = hashString(m.PartitionKey()) + case len(m.Key()) > 0: + h = hashBytes(m.Key()) + default: + return 0 } - if k := m.Key(); len(k) > 0 { - return hashBytes(k) + if hp.maxLanes > 0 { + return h % uint64(hp.maxLanes) } - return 0 + return h } func hashBytes(b []byte) uint64 { @@ -367,6 +381,14 @@ func hashBytes(b []byte) uint64 { return h.Sum64() } +// hashString hashes s without allocating a []byte copy, using the hash's +// io.Writer-free string path. +func hashString(s string) uint64 { + h := fnv.New64a() + _, _ = io.WriteString(h, s) + return h.Sum64() +} + // Close begins a graceful drain: the running Hopper stops fetching, finishes and // settles in-flight messages, and Run returns. Close is idempotent and never // blocks on the drain; it signals, and Run completes the drain. diff --git a/source/hopper_test.go b/source/hopper_test.go index d0bd156..0a7fe8c 100644 --- a/source/hopper_test.go +++ b/source/hopper_test.go @@ -38,6 +38,40 @@ func TestHopper_SettlesByResult(t *testing.T) { } } +// TestHopper_InProgressAndManualDispositions exercises the two dispositions the +// Counts tally does not bucket: ActionInProgress (extend deadline) and +// ActionManual (handler settled itself). Both must still flow through the +// subscription's Settle so the backend observes the chosen action, even though +// neither is an ack/nak/term. +func TestHopper_InProgressAndManualDispositions(t *testing.T) { + t.Parallel() + tests := []struct { + name string + result source.Result + want source.Action + }{ + {"in_progress", source.InProgress(), source.ActionInProgress}, + {"manual", source.Manual(), source.ActionManual}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + h := memsource.NewHarness(t, nil, memsource.Msg{Key: "k", Value: []byte("v")}) + h.Run(func(context.Context, source.Message) source.Result { return tt.result }) + // Neither is counted as ack/nak/term/drop. + h.AssertCounts(memsource.Counts{}) + // But the disposition still reached the subscription's Settle. + entries := h.Ledger().Entries() + if len(entries) != 1 { + t.Fatalf("settled %d messages, want 1", len(entries)) + } + if entries[0].Result.Action != tt.want { + t.Fatalf("settled action = %v, want %v", entries[0].Result.Action, tt.want) + } + }) + } +} + func TestHopper_PerKeyInOrder(t *testing.T) { t.Parallel() const perKey = 50 @@ -323,6 +357,32 @@ func TestHopper_ReceiveSubscribesAndRuns(t *testing.T) { } } +// TestHopper_MaxLanesFoldsKeys verifies that WithMaxLanes bounds the lane set: +// with a single lane, many distinct keys are folded onto it and every message is +// still delivered exactly once and acked. The bound caps goroutines without +// dropping or reordering work. +func TestHopper_MaxLanesFoldsKeys(t *testing.T) { + t.Parallel() + const keys = 50 + msgs := make([]memsource.Msg, keys) + for i := range msgs { + msgs[i] = memsource.Msg{Key: fmt.Sprintf("key-%d", i), Value: []byte("v")} + } + h := memsource.NewHarness(t, + []source.Option{source.WithConcurrency(8), source.WithMaxLanes(1)}, + msgs..., + ) + var handled int32 + h.Run(func(context.Context, source.Message) source.Result { + atomic.AddInt32(&handled, 1) + return source.Ack() + }) + if int(handled) != keys { + t.Fatalf("handled = %d, want %d (every folded key delivered once)", handled, keys) + } + h.AssertCounts(memsource.Counts{Acked: keys}) +} + func TestHopper_FetchErrorPropagates(t *testing.T) { t.Parallel() wantErr := errors.New("subscription broke") diff --git a/source/options.go b/source/options.go index f3b4c57..869e93f 100644 --- a/source/options.go +++ b/source/options.go @@ -9,6 +9,14 @@ import ( "github.com/stablekernel/crucible/telemetry" ) +// defaultMaxLanes bounds the number of ordered lanes (and their goroutines and +// map entries) a Hopper keeps alive, so a stream with unbounded key cardinality +// cannot grow the lane set without limit for the run's lifetime. Distinct keys +// that exceed the bound share a lane by hash, which only serializes unrelated +// keys; it never reorders messages that share a key. The default is generous +// enough that typical key counts each get their own lane. +const defaultMaxLanes = 4096 + // config holds a Hopper's resolved seams. Every field has a no-op default so a // zero-option Hopper is fully functional and silent. type config struct { @@ -20,6 +28,7 @@ type config struct { middleware []Middleware concurrency int maxInFlight int + maxLanes int // batch holds the batch-mode tuning; batch.enabled is false unless WithBatch // is supplied, in which case RunBatch/ReceiveBatch accumulate per lane. @@ -48,6 +57,7 @@ func defaultConfig() config { meter: telemetry.NopMeter(), concurrency: 1, maxInFlight: 0, + maxLanes: defaultMaxLanes, batch: batchConfig{ now: time.Now, }, @@ -146,6 +156,24 @@ func WithConcurrency(n int) Option { } } +// WithMaxLanes bounds the number of ordered lanes the Hopper keeps alive, and +// with them the per-lane goroutines and map entries. A stream whose key +// cardinality is unbounded over the run's lifetime (a per-request id, a +// per-session token) would otherwise accumulate one lane and one goroutine per +// distinct key for as long as the run lasts; this caps that set. When the number +// of distinct keys exceeds n, keys are folded onto lanes by hash, which only +// serializes the unrelated keys that collide and never reorders messages that +// share a key. The default is a generous fixed bound (see defaultMaxLanes); set +// it lower to cap goroutines tightly, or higher when key cardinality is large +// but bounded. A value < 1 is ignored. +func WithMaxLanes(n int) Option { + return func(c *config) { + if n >= 1 { + c.maxLanes = n + } + } +} + // WithMaxInFlight bounds the number of messages delivered but not yet settled. // When the window is full the fetch loop blocks, applying backpressure to the // subscription. The default (0) is unbounded. A value < 0 is ignored. diff --git a/source/retry/retry.go b/source/retry/retry.go index 33b9316..275dcc3 100644 --- a/source/retry/retry.go +++ b/source/retry/retry.go @@ -11,8 +11,7 @@ // The attempt count is carried on a typed context value ([WithAttempt]/[Attempt]) // rather than a magic-string header, and the next attempt is propagated to inner // middleware (notably source/dlq) by the same channel. The backoff schedule is -// fully config-driven through functional options; nothing is hardcoded. A clock -// is injected for deterministic tests. +// fully config-driven through functional options; nothing is hardcoded. package retry import ( @@ -24,14 +23,6 @@ import ( "github.com/stablekernel/crucible/source" ) -// AttemptHeader is the typed header key under which the retry middleware stamps -// the current attempt number when it produces a redelivery [source.Result] whose -// backoff a downstream backend cannot itself thread back through context (the -// attempt is also carried on the context via [WithAttempt]). It is a named -// constant, not an inline magic string, so dead-letter middleware reads the same -// key the retry middleware writes. -const AttemptHeader = "crucible-retry-attempt" - // attemptKey is the unexported context key type the attempt count travels on, so // it never collides with another package's context values. type attemptKey struct{} @@ -99,7 +90,6 @@ func (b expBackoff) Delay(attempt int) time.Duration { type config struct { maxAttempts int backoff Backoff - now func() time.Time } func defaultConfig() config { @@ -112,7 +102,6 @@ func defaultConfig() config { jitter: true, randF: rand.Float64, }, - now: time.Now, } } @@ -162,21 +151,12 @@ func WithBackoffFunc(b Backoff) Option { } } -// WithClock injects the clock the middleware reads, for deterministic tests. It -// is currently used only where a future-facing schedule needs the current time; -// the backoff itself is relative. The default is time.Now. A nil clock is -// ignored. -func WithClock(now func() time.Time) Option { - return func(c *config) { - if now != nil { - c.now = now - } - } -} - // WithJitterSource injects the [0,1) random source used for full jitter, so a -// test can make a jittered schedule deterministic. It applies to the default and -// [WithBackoff] schedules. A nil source is ignored. +// test can make a jittered schedule deterministic. It applies to the exponential +// schedules this package builds (the default and [WithBackoff]); it has no effect +// on a custom [Backoff] installed with [WithBackoffFunc], which owns its own +// randomness. Order matters: apply WithJitterSource after the WithBackoff it +// should patch. A nil source is ignored. func WithJitterSource(randF func() float64) Option { return func(c *config) { if randF == nil { diff --git a/source/retry/retry_test.go b/source/retry/retry_test.go index f17c086..4a93f44 100644 --- a/source/retry/retry_test.go +++ b/source/retry/retry_test.go @@ -214,7 +214,6 @@ func TestOptions_IgnoreInvalid(t *testing.T) { retry.WithBackoff(0, time.Minute, 2.0, false), // ignored (base<=0) retry.WithBackoff(-1, time.Minute, 0.5, false), // ignored (factor<1) retry.WithBackoffFunc(nil), // ignored - retry.WithClock(nil), // ignored retry.WithJitterSource(nil), // ignored retry.WithBackoff(time.Second, time.Minute, 2.0, false), // applies ) @@ -228,12 +227,22 @@ func TestOptions_IgnoreInvalid(t *testing.T) { } } -func TestWithClock_Accepted(t *testing.T) { +// TestWithJitterSource_DeterministicSchedule confirms the injected jitter source +// makes the exponential schedule deterministic: a source returning a fixed +// fraction scales the computed delay by exactly that fraction. +func TestWithJitterSource_DeterministicSchedule(t *testing.T) { t.Parallel() - clock := func() time.Time { return time.Unix(0, 0) } - mw := retry.Middleware(retry.WithClock(clock)) - h := mw(func(_ context.Context, _ source.Message) source.Result { return source.Ack() }) - if got := h(context.Background(), stubMsg{}); got.Action != source.ActionAck { - t.Fatalf("action = %v, want ack", got.Action) + mw := retry.Middleware( + retry.WithBackoff(time.Second, time.Minute, 2.0, true), + retry.WithJitterSource(func() float64 { return 0.5 }), + ) + h := mw(func(_ context.Context, _ source.Message) source.Result { return source.Nak(errBoom) }) + // Attempt 1: base*factor^0 = 1s, scaled by jitter 0.5 -> 500ms. + got := h(retry.WithAttempt(context.Background(), 1), stubMsg{}) + if got.Action != source.ActionNak { + t.Fatalf("action = %v, want nak", got.Action) + } + if got.Requeue != 500*time.Millisecond { + t.Fatalf("delay = %v, want 500ms (1s scaled by 0.5 jitter)", got.Requeue) } } From fc57996ea127f1f5dc5a1b7d423eeb29130d5ce8 Mon Sep 17 00:00:00 2001 From: Joshua Temple Date: Fri, 5 Jun 2026 10:13:53 -0400 Subject: [PATCH 3/6] fix(source/statemachine): honest dedup docs, DriveTx mode, empty-id signal DriveFunc's godoc dropped the broken [Deduper] link for concrete guidance (use Drive or the idempotency middleware). doc.go now documents the transactional DriveTx mode alongside the durable and stateless modes. Drive and DriveTx mark the span with statemachine.exactly_once=false when a message yields no event id, so the silent loss of dedup is visible in traces rather than passing unnoticed; EventID's godoc explains it. Add a DriveTx Save-failure test covering the in-transaction persist-error abort. Signed-off-by: Joshua Temple --- source/statemachine/doc.go | 10 ++++++++-- source/statemachine/drive.go | 4 ++++ source/statemachine/drivefunc.go | 8 +++++--- source/statemachine/drivetx.go | 4 ++++ source/statemachine/drivetx_test.go | 26 ++++++++++++++++++++++++++ source/statemachine/options.go | 6 ++++++ 6 files changed, 53 insertions(+), 5 deletions(-) diff --git a/source/statemachine/doc.go b/source/statemachine/doc.go index 6a76035..5eb5b95 100644 --- a/source/statemachine/doc.go +++ b/source/statemachine/doc.go @@ -42,10 +42,16 @@ // // # Modes // -// Two binding modes share the same outcomes: +// Three binding modes share the same outcomes: // // - Durable: [Drive] loads and saves each instance through a [Store], -// persisting the transition before acking. This is the exactly-once path. +// persisting the transition before acking. Redelivery is deduplicated by the +// persisted event id (exactly-once into the machine, at-least-once delivery). +// - Transactional: [DriveTx] runs the durable path inside a +// [source.Transactional] consume-process-produce transaction (Kafka EOS), so +// the records a transition emits and the consumed offset commit as one atomic +// unit. It is the exactly-once-into-a-sink path; use it only on a backend that +// satisfies [source.Transactional]. // - Stateless: [DriveFunc] fires each message against a caller-supplied // function with no persistence, for sources that drive a transient or // externally-owned machine. diff --git a/source/statemachine/drive.go b/source/statemachine/drive.go index f7911b2..6fa6258 100644 --- a/source/statemachine/drive.go +++ b/source/statemachine/drive.go @@ -83,6 +83,10 @@ func driveOn[K comparable, E comparable, C any]( } eventID := cfg.eventID(m) + // Signal when the message carries no event id: dedup is impossible, so a + // redelivery would re-fire. Surfacing it on the span makes the silent loss + // of exactly-once visible in traces (supply WithEventID to fix it). + span.SetAttributes(telemetry.Bool("statemachine.exactly_once", eventID != "")) if ok && eventID != "" && rec.LastEventID == eventID { // Exactly-once into the machine: this id was already folded into the // persisted version, so re-firing would double-apply. Ack and discard. diff --git a/source/statemachine/drivefunc.go b/source/statemachine/drivefunc.go index 1dbc2d8..9533afe 100644 --- a/source/statemachine/drivefunc.go +++ b/source/statemachine/drivefunc.go @@ -29,9 +29,11 @@ type FireFunc[K comparable, E comparable] func(ctx context.Context, key K, event // where there is no durable [Store] to commit against. // // The emit hand-off, state-aware rejection, and the ack outcome match [Drive], -// minus the load/save and minus version idempotency (a stateless binding has no -// persisted version to dedup against, so redelivery re-fires; supply a [Deduper] -// or use [Drive] for exactly-once): +// minus the load/save and minus version idempotency: a stateless binding has no +// persisted version to dedup against, so a redelivery re-fires. For idempotent +// redelivery either use [Drive], whose persisted version skips an already-applied +// event id, or add the source/idempotency middleware to the Hopper to suppress +// duplicates before they reach this handler. // // - Route/decode failure → [source.Term] (poison). // - Fire rejected as illegal for the current state → [source.Reject] diff --git a/source/statemachine/drivetx.go b/source/statemachine/drivetx.go index b7943dc..dd0d836 100644 --- a/source/statemachine/drivetx.go +++ b/source/statemachine/drivetx.go @@ -118,6 +118,10 @@ func DriveTx[K comparable, E comparable, C any]( } eventID := cfg.eventID(m) + // Signal when the message carries no event id: without one the version + // dedup that covers a broker abort after Save cannot fire, so a redelivery + // would re-fire. Surface it on the span so the lost guarantee is visible. + span.SetAttributes(telemetry.Bool("statemachine.exactly_once", eventID != "")) if loaded && eventID != "" && rec.LastEventID == eventID { // Already folded into the persisted version: ack as a no-op. This is a // plain offset advance, not a transactional produce, so it is settled by diff --git a/source/statemachine/drivetx_test.go b/source/statemachine/drivetx_test.go index b22c403..e033678 100644 --- a/source/statemachine/drivetx_test.go +++ b/source/statemachine/drivetx_test.go @@ -150,6 +150,32 @@ func TestDriveTx_EmitFailureAbortsTransaction(t *testing.T) { } } +func TestDriveTx_SaveFailureAbortsTransaction(t *testing.T) { + t.Parallel() + m := buildTurnstile() + // A store that fires cleanly but fails to persist: Save runs inside the + // transaction, so its error must abort the transaction (nothing committed) + // and nak the message for redelivery. + store := &flakyStore{saveErr: errors.New("persist exploded")} + + tx := &fakeTx{} + sub := &fakeTransactional{tx: tx, committed: true} + h := statemachine.DriveTx[turnstileState, turnstileEvent, *turnstile]( + m, store, routeFunded, sub, statemachine.TxSinkFunc(emitOpenedToTopic), + ) + + res := h(context.Background(), msg("evt-1", "c1")) + if res.Action != source.ActionNak { + t.Fatalf("action = %v, want nak on save failure", res.Action) + } + if !errors.Is(res.Err, store.saveErr) { + t.Fatalf("nak err = %v, want wrapped save error", res.Err) + } + if got, want := sub.calls, []string{"begin", "abort"}; !equalCalls(got, want) { + t.Fatalf("call order = %v, want %v (abort on save failure)", got, want) + } +} + func TestDriveTx_RedeliveryIsSkip_NotTransactional(t *testing.T) { t.Parallel() m := buildTurnstile() diff --git a/source/statemachine/options.go b/source/statemachine/options.go index 4801622..8519cb9 100644 --- a/source/statemachine/options.go +++ b/source/statemachine/options.go @@ -43,6 +43,12 @@ func (discardSink) Emit(context.Context, any) error { return nil } // "message-id" header ([DefaultEventIDHeader]) and falls back to the message // [source.Cursor] string; override it with [WithEventID] to read a different // header or derive an id from the decoded value. +// +// An empty id disables dedup for that message: with no id to compare against the +// persisted LastEventID, a redelivery re-fires the transition. The bindings +// surface this on their span (the "statemachine.exactly_once" attribute is false) +// so a message stream that yields no id is visible in traces rather than silently +// losing the guarantee. Return a non-empty stable id to keep exactly-once. type EventID func(m source.Message) string // DefaultEventIDHeader is the header [DefaultEventID] reads a message's From 5756e5fae04789af0101b823779dfb6ed18c5c50 Mon Sep 17 00:00:00 2001 From: Joshua Temple Date: Fri, 5 Jun 2026 10:15:21 -0400 Subject: [PATCH 4/6] fix(source/kafka): reject a second Subscribe on a transactional inlet A transactional inlet is backed by one GroupTransactSession that fences a single consumer; a second Subscribe could not share it and would silently return a subscription with no transact session (Begin would then report not-transactional). Subscribe now rejects the second call with a sentinel error. Add coverage for the transactional option assembly and the guard. Signed-off-by: Joshua Temple --- source/kafka/adapter_internal_test.go | 66 +++++++++++++++++++++++++++ source/kafka/kafka.go | 14 ++++++ 2 files changed, 80 insertions(+) diff --git a/source/kafka/adapter_internal_test.go b/source/kafka/adapter_internal_test.go index b4fa130..cff0227 100644 --- a/source/kafka/adapter_internal_test.go +++ b/source/kafka/adapter_internal_test.go @@ -80,6 +80,72 @@ func TestWithTransactionalEmptyIDIgnored(t *testing.T) { } } +func TestTransactOptsAssemblesEOSSession(t *testing.T) { + t.Parallel() + + in, err := New( + WithSeedBrokers("localhost:9092"), + WithTransactional("orders-eos-v1"), + ) + if err != nil { + t.Fatalf("New() error = %v", err) + } + sc := source.SubscribeConfig{Topics: []string{"orders"}, Group: "svc"} + + // transactOpts must produce a usable option set: a GroupTransactSession built + // from it constructs without error (no dial happens at construction). This + // exercises the transactional option-assembly path end to end. + opts := in.transactOpts(sc) + if len(opts) == 0 { + t.Fatal("transactOpts returned no options") + } + sess, err := kgo.NewGroupTransactSession(opts...) + if err != nil { + t.Fatalf("NewGroupTransactSession(transactOpts) error = %v", err) + } + t.Cleanup(sess.Close) + + // The non-transactional consumeOpts must differ (no transactional ID), proving + // the transactional path is a distinct assembly, not the plain consume path. + if len(in.consumeOpts(sc)) == len(opts) { + // Lengths can legitimately match; the meaningful assertion is that the + // transactional session built, which a plain consume option set cannot do + // without a transactional ID. Construction success above is the gate. + t.Log("consumeOpts and transactOpts have equal length; session build is the real assertion") + } +} + +func TestSubscribeTransactionalRejectsSecondCall(t *testing.T) { + t.Parallel() + + in, err := New( + WithSeedBrokers("localhost:9092"), + WithTransactional("orders-eos-v1"), + ) + if err != nil { + t.Fatalf("New() error = %v", err) + } + + // Model "already subscribed": a transactional inlet holds a single + // GroupTransactSession after its first Subscribe. Building one directly (no + // dial at construction) lets the test reach the guard without a live broker. + sess, err := kgo.NewGroupTransactSession(in.transactOpts(source.SubscribeConfig{ + Topics: []string{"orders"}, Group: "svc", + })...) + if err != nil { + t.Fatalf("NewGroupTransactSession error = %v", err) + } + t.Cleanup(sess.Close) + in.transactSess = sess + in.client = sess.Client() + in.ownsClient = true + + _, err = in.Subscribe(context.Background(), source.SubscribeConfig{Topics: []string{"orders"}}) + if !errors.Is(err, errTransactionalSingleSubscribe) { + t.Fatalf("second Subscribe on transactional inlet = %v, want errTransactionalSingleSubscribe", err) + } +} + func TestSubscribeRejectsNoTopics(t *testing.T) { t.Parallel() diff --git a/source/kafka/kafka.go b/source/kafka/kafka.go index 04bfcda..c0cac48 100644 --- a/source/kafka/kafka.go +++ b/source/kafka/kafka.go @@ -69,6 +69,12 @@ var ErrNoSeedBrokers = errors.New("source/kafka: no seed brokers configured") // [WithDLQTopic]. Match it with errors.Is. var ErrNoDLQTopic = errors.New("source/kafka: term requested but no dead-letter topic configured") +// errTransactionalSingleSubscribe reports a second [Inlet.Subscribe] on a +// transactional inlet. The exactly-once session backing a transactional inlet +// fences a single consumer, so only one subscription per transactional inlet is +// valid; open a fresh inlet for a second consumer. Match it with errors.Is. +var errTransactionalSingleSubscribe = errors.New("source/kafka: a transactional inlet allows only one Subscribe") + // config holds an [Inlet]'s resolved settings before a client is built. Every // field has a zero-value default; the only requirement is at least one seed // broker (or an injected client). @@ -339,6 +345,14 @@ func (in *Inlet) Subscribe(_ context.Context, cfg source.SubscribeConfig) (sourc return nil, fmt.Errorf("source/kafka: subscribe: %w", errors.New("at least one topic required")) } + // A transactional inlet is backed by a single GroupTransactSession that can + // fence exactly one consumer; a second Subscribe cannot share it, and a + // second subscription would silently come up without a transact session + // (Begin would report not-transactional). Reject it loudly instead. + if in.cfg.transact && in.transactSess != nil { + return nil, fmt.Errorf("source/kafka: %w", errTransactionalSingleSubscribe) + } + sub := &subscription{ dlqTopic: in.cfg.dlqTopic, maxPoll: in.cfg.maxPoll, From 20def47f2e6878df4ef8ffb83a7b06ca30191465 Mon Sep 17 00:00:00 2001 From: Joshua Temple Date: Fri, 5 Jun 2026 10:16:47 -0400 Subject: [PATCH 5/6] perf(source/jetstream): defer header materialization; docs(cloudevents): add README and doc.go jetstream newMessage no longer eagerly builds the header slice; Headers materializes and caches it on first read, so a message whose headers are never read pays no per-message header allocation. Add a README and a doc.go for source/cloudevents, the only source adapter that lacked both; the package comment moves to doc.go to match the suite's documentation layout. Signed-off-by: Joshua Temple --- source/cloudevents/README.md | 46 +++++++++++++++++++++++++++++++ source/cloudevents/cloudevents.go | 34 ----------------------- source/cloudevents/doc.go | 37 +++++++++++++++++++++++++ source/jetstream/message.go | 45 ++++++++++++++++++------------ 4 files changed, 111 insertions(+), 51 deletions(-) create mode 100644 source/cloudevents/README.md create mode 100644 source/cloudevents/doc.go diff --git a/source/cloudevents/README.md b/source/cloudevents/README.md new file mode 100644 index 0000000..ba56865 --- /dev/null +++ b/source/cloudevents/README.md @@ -0,0 +1,46 @@ +# source/cloudevents + +A [`crucible/source`](../) codec that decodes inbound messages into +[CloudEvents](https://cloudevents.io). Runtime dependency: +[`github.com/cloudevents/sdk-go/v2`](https://github.com/cloudevents/sdk-go). + +```go +reg := source.NewRegistry(). + SetDefault(cloudevents.New()). // binary mode by content type + Register(cloudevents.StructuredContentType, cloudevents.New()) // structured JSON + +ev, order, err := cloudevents.DecodeData[Order](reg, m) +``` + +Construct a `Codec` with `New` and register it on a `source.Registry` under the +content types you accept (or as the registry default). There is no package-level +format registration: every codec is instance-scoped, so two codecs in one +process never share mutable state. + +## Content modes + +The CloudEvents spec defines two ways an event rides a transport, and this codec +accepts both, selecting by the message's content type (see `Detect`): + +- **Structured** — the whole event (attributes and data) is one JSON document in + the body, carried under `application/cloudevents+json` + (`StructuredContentType`). +- **Binary** — the event's attributes ride as `ce-`-prefixed headers and the data + is the raw body, with the body's own media type in the `datacontenttype` + header (or the message's content type). + +A content type whose media type begins with `application/cloudevents` decodes as +structured; anything else decodes as binary. + +## Decoded value + +`Decode` yields a `cloudevents.Event` (the SDK's canonical event). Recover it +from a handler with `EventOf`, or in one call with `DecodeEvent`. Decode the data +payload into a concrete type with `DataAs`, or use the generic `DecodeData[T]` to +decode and project in one step. Extension attributes are surfaced through the +core `source.Headers` (see `Extensions`, prefixed with `ExtensionHeaderPrefix`) +so a handler reads them the same way it reads any other inbound metadata. + +## Stability + +Experimental (pre-v1); the API may change until the suite locks v1.0.0. diff --git a/source/cloudevents/cloudevents.go b/source/cloudevents/cloudevents.go index f3a7bf3..5556d89 100644 --- a/source/cloudevents/cloudevents.go +++ b/source/cloudevents/cloudevents.go @@ -1,39 +1,5 @@ // SPDX-License-Identifier: Apache-2.0 -// Package cloudevents is a source codec that decodes inbound messages into -// CloudEvents. It plugs into a source.Registry as an instance-scoped -// [source.Codec]: construct one with [New] and register it under the content -// types you accept; there is no global format registration (the CloudEvents -// SDK's package-level format registry is deliberately not used, so two codecs -// in one process never share mutable state). -// -// # Content modes -// -// The CloudEvents spec defines two ways an event rides a transport, and this -// codec accepts both, selecting between them by the message's content type: -// -// - Structured mode: the entire event — attributes and data — is one JSON -// document in the body, carried under "application/cloudevents+json". -// - Binary mode: the event's attributes ride as "ce-"-prefixed headers and -// the data is the raw body, with the body's own media type in the -// "datacontenttype" header (or the message's content-type). -// -// A content type whose media type begins with "application/cloudevents" (the -// structured prefix) decodes as structured; anything else decodes as binary. -// See [Detect]. -// -// # Decoded value -// -// Decode yields a [cloudevents.Event] (the SDK's canonical event). Recover it -// from a handler with [EventOf], and decode its data payload into a concrete -// type with [DataAs] or the generic [DecodeData] helper. Extension attributes -// are surfaced through the core [source.Headers] (see [Extensions]) rather than -// a magic-string map, so a handler reads them the same way it reads any other -// inbound metadata. -// -// # Stability -// -// Experimental (pre-v1); the API may change until the suite locks v1.0.0. package cloudevents import ( diff --git a/source/cloudevents/doc.go b/source/cloudevents/doc.go new file mode 100644 index 0000000..ad9e28e --- /dev/null +++ b/source/cloudevents/doc.go @@ -0,0 +1,37 @@ +// SPDX-License-Identifier: Apache-2.0 + +// Package cloudevents is a source codec that decodes inbound messages into +// CloudEvents. It plugs into a source.Registry as an instance-scoped +// [source.Codec]: construct one with [New] and register it under the content +// types you accept; there is no global format registration (the CloudEvents +// SDK's package-level format registry is deliberately not used, so two codecs +// in one process never share mutable state). +// +// # Content modes +// +// The CloudEvents spec defines two ways an event rides a transport, and this +// codec accepts both, selecting between them by the message's content type: +// +// - Structured mode: the entire event — attributes and data — is one JSON +// document in the body, carried under "application/cloudevents+json". +// - Binary mode: the event's attributes ride as "ce-"-prefixed headers and +// the data is the raw body, with the body's own media type in the +// "datacontenttype" header (or the message's content-type). +// +// A content type whose media type begins with "application/cloudevents" (the +// structured prefix) decodes as structured; anything else decodes as binary. +// See [Detect]. +// +// # Decoded value +// +// Decode yields a [cloudevents.Event] (the SDK's canonical event). Recover it +// from a handler with [EventOf], and decode its data payload into a concrete +// type with [DataAs] or the generic [DecodeData] helper. Extension attributes +// are surfaced through the core [source.Headers] (see [Extensions]) rather than +// a magic-string map, so a handler reads them the same way it reads any other +// inbound metadata. +// +// # Stability +// +// Experimental (pre-v1); the API may change until the suite locks v1.0.0. +package cloudevents diff --git a/source/jetstream/message.go b/source/jetstream/message.go index 811c31a..e450225 100644 --- a/source/jetstream/message.go +++ b/source/jetstream/message.go @@ -21,29 +21,25 @@ func (c seqCursor) String() string { return strconv.FormatUint(uint64(c), 10) } // message adapts a jetstream.Msg onto [source.Message]. The vendor message is // reachable only through As; the neutral accessors expose the common surface. // PartitionKey is always "" because JetStream has no partitions, so the Hopper -// shards by Key (the KeyHeader value, falling back to the subject). +// shards by Key (the KeyHeader value, falling back to the subject). The header +// slice is built lazily on the first [message.Headers] call and cached, so a +// message whose headers a handler never reads pays no per-message header +// allocation. type message struct { msg jetstream.Msg headers source.Headers + built bool // headers has been materialized key []byte cursor seqCursor } -// newMessage wraps a jetstream.Msg, eagerly snapshotting its headers, key, and -// stream sequence so the neutral view is a stable value independent of later -// driver state. A metadata error (a non-JetStream message) leaves the cursor -// zero rather than failing the read. +// newMessage wraps a jetstream.Msg, snapshotting its key and stream sequence so +// the neutral view is stable independent of later driver state. A metadata error +// (a non-JetStream message) leaves the cursor zero rather than failing the read. +// Headers are deferred to the first [message.Headers] call. func newMessage(m jetstream.Msg) *message { - hdr := m.Headers() - headers := make(source.Headers, 0, len(hdr)) - for k, vs := range hdr { - for _, v := range vs { - headers = append(headers, source.Header{Key: k, Value: v}) - } - } - key := []byte(m.Subject()) - if v := hdr.Get(KeyHeader); v != "" { + if v := m.Headers().Get(KeyHeader); v != "" { key = []byte(v) } @@ -52,7 +48,7 @@ func newMessage(m jetstream.Msg) *message { cursor = seqCursor(md.Sequence.Stream) } - return &message{msg: m, headers: headers, key: key, cursor: cursor} + return &message{msg: m, key: key, cursor: cursor} } // Key returns the routing key: the KeyHeader value when set, otherwise the @@ -63,8 +59,23 @@ func (m *message) Key() []byte { return m.key } // Value returns the raw payload bytes. func (m *message) Value() []byte { return m.msg.Data() } -// Headers returns the message metadata as a value-type slice. -func (m *message) Headers() source.Headers { return m.headers } +// Headers returns the message metadata as a value-type slice, materializing it +// on first call and caching it so repeated reads are cheap. The order follows +// the NATS header map iteration and is not significant; a handler keys by name. +func (m *message) Headers() source.Headers { + if !m.built { + hdr := m.msg.Headers() + headers := make(source.Headers, 0, len(hdr)) + for k, vs := range hdr { + for _, v := range vs { + headers = append(headers, source.Header{Key: k, Value: v}) + } + } + m.headers = headers + m.built = true + } + return m.headers +} // Subject returns the subject the message arrived on. func (m *message) Subject() string { return m.msg.Subject() } From 27dea065b37c5e9973226b7ece2e80bc1af9eff0 Mon Sep 17 00:00:00 2001 From: Joshua Temple Date: Fri, 5 Jun 2026 10:31:20 -0400 Subject: [PATCH 6/6] test: fix err shadow in redis lag test Signed-off-by: Joshua Temple --- source/redis/redis_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/source/redis/redis_test.go b/source/redis/redis_test.go index 6e01a5e..1b47e07 100644 --- a/source/redis/redis_test.go +++ b/source/redis/redis_test.go @@ -657,13 +657,16 @@ func TestNak_RequeueRaisesPerEntryFloor(t *testing.T) { t.Fatalf("Next() error = %v", err) } // Nak with a Requeue (10s) far larger than minIdle: the entry's floor is now+10s. - if err := sub.Settle(context.Background(), m, source.NakAfter(10*time.Second, errors.New("retry"))); err != nil { + err = sub.Settle(context.Background(), m, source.NakAfter(10*time.Second, errors.New("retry"))) + if err != nil { t.Fatalf("Settle(nak) error = %v", err) } // Before the floor passes, NakRedeliver must hold the entry back. - if n, err := sub.NakRedeliver(context.Background(), 0); err != nil || n != 0 { - t.Fatalf("NakRedeliver() before floor = %d,%v want 0,nil", n, err) + var nBefore int + nBefore, err = sub.NakRedeliver(context.Background(), 0) + if err != nil || nBefore != 0 { + t.Fatalf("NakRedeliver() before floor = %d,%v want 0,nil", nBefore, err) } // Advance past the floor: the entry is now eligible and is reclaimed.