diff --git a/source/batch.go b/source/batch.go index fa14faf..f5afc2e 100644 --- a/source/batch.go +++ b/source/batch.go @@ -346,7 +346,12 @@ func (hp *Hopper) dispatchBatch(ctx context.Context, bh BatchHandler, buf []*del hp.settle(ctx, span, d, r) } + // Surface a count mismatch loudly rather than letting it pass. An under-count + // already terminated each unmatched message as poison above; an over-count + // (more results than messages) silently discards the extra results, so record + // the sentinel here too so the discard is visible in traces, not swallowed. if len(results) != len(live) { + span.RecordError(ErrBatchResultCount) span.SetStatus(telemetry.StatusError, "batch result count mismatch") } } diff --git a/source/batch_test.go b/source/batch_test.go index 32edad6..ae1a3a5 100644 --- a/source/batch_test.go +++ b/source/batch_test.go @@ -217,6 +217,70 @@ func TestHopper_BatchResultCountMismatch(t *testing.T) { } } +// TestHopper_BatchResultOverCount checks that returning more results than the +// batch holds settles every message and does not panic: the extra results are +// discarded, and the mismatch is surfaced (recorded on the span) rather than +// swallowed. +func TestHopper_BatchResultOverCount(t *testing.T) { + t.Parallel() + h := memsource.NewHarness(t, + []source.Option{source.WithBatch(2, 0)}, + memsource.Msg{Key: "k"}, + memsource.Msg{Key: "k"}, + ) + h.RunBatch(func(_ context.Context, ms []source.Message) []source.Result { + // Three results for two messages: the third is extra and must be dropped + // without affecting the two real messages. + return []source.Result{source.Ack(), source.Ack(), source.Ack()} + }) + h.AssertCounts(memsource.Counts{Acked: 2}) +} + +// TestHopper_ReceiveBatchSubscribesAndRuns covers the ReceiveBatch entry point, +// the batch analog of Receive: it subscribes through the inlet, drives the batch +// handler, and settles every message. +func TestHopper_ReceiveBatchSubscribesAndRuns(t *testing.T) { + t.Parallel() + in := memsource.New( + memsource.WithMessages( + memsource.Msg{Key: "k", Value: []byte("a")}, + memsource.Msg{Key: "k", Value: []byte("b")}, + ), + ) + hp := source.New(source.WithBatch(2, 0)) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + done := make(chan error, 1) + go func() { + done <- hp.ReceiveBatch(ctx, in, source.SubscribeConfig{Topics: []string{"t"}}, + func(_ context.Context, ms []source.Message) []source.Result { + res := make([]source.Result, len(ms)) + for i := range res { + res[i] = source.Ack() + } + return res + }) + }() + + // ReceiveBatch does not auto-close the subscription's stream; cancel to drain + // once the queued messages have been handled. + time.Sleep(50 * time.Millisecond) + cancel() + select { + case err := <-done: + if err != nil { + t.Fatalf("ReceiveBatch returned %v, want nil", err) + } + case <-time.After(2 * time.Second): + t.Fatal("ReceiveBatch did not return") + } + if c := in.Ledger().Counts(); c.Acked != 2 { + t.Fatalf("acked = %d, want 2", c.Acked) + } +} + // TestHopper_BatchDecodeFailureIsolated checks one undecodable message terminates // alone without poisoning its batch-mates. func TestHopper_BatchDecodeFailureIsolated(t *testing.T) { diff --git a/source/capability.go b/source/capability.go index 6243ad3..93cd6f9 100644 --- a/source/capability.go +++ b/source/capability.go @@ -103,13 +103,26 @@ type OrderedDelivery interface { } // Batched is a [Subscription] that can yield and settle messages in batches, -// amortizing per-message overhead. The [Hopper] uses it when present to fetch and -// ack in groups; an unbatched subscription is driven one message at a time. +// amortizing per-message overhead. The [Hopper]'s batch mode (see +// [Hopper.RunBatch]) uses [Batched.NextBatch] to fetch whole batches from the +// backend when this capability is present; an unbatched subscription is fetched +// one message at a time. +// +// Settlement is a separate decision. A [BatchHandler] returns one [Result] per +// message, so the Hopper settles each message by its own Result through +// [Subscription.Settle] rather than collapsing a batch onto a single outcome — +// a slow handler that fails the third message of five must not nak the other +// four. [SettleBatch] is therefore a reserved seam the Hopper does not call: it +// is the one-call settle path for an adapter or a caller that already knows a +// whole batch shares one outcome (a uniform ack after a bulk commit), not a +// substitute for per-message settlement. type Batched interface { // NextBatch returns up to limit messages, blocking for at least one, or // ctx.Err()/ErrDrained as [Subscription.Next] would. NextBatch(ctx context.Context, limit int) ([]Message, error) - // SettleBatch applies r to every message in ms in one call. + // SettleBatch applies the single result r to every message in ms in one call. + // It is for a caller settling a uniform batch directly; the Hopper settles per + // message (one Result each) and does not call it. SettleBatch(ctx context.Context, ms []Message, r Result) error } @@ -175,13 +188,22 @@ type ProducedRecord struct { Headers Headers } -// Deduper is a [Subscription] (or an inlet seam) that suppresses re-delivery of -// an already-processed message by an idempotency key. The no-op default is "no -// deduplication"; the state-machine bridge supplies the machine's state version -// as the key so redelivery is provably idempotent with no external store. +// Deduper suppresses re-delivery of an already-processed message by an +// idempotency key. It is a reserved seam, not a capability the [Hopper] consults +// on its own: the Hopper never type-asserts a subscription to Deduper and never +// calls [Deduper.Seen]. Deduplication is opt-in middleware — wire a Deduper into +// the source/idempotency middleware with its WithDeduper option (it adapts a +// Deduper to the middleware's store via FromDeduper) and add that middleware to +// the Hopper, or call Seen yourself from a handler. The no-op default, with no +// such middleware, is "no deduplication". +// +// Exactly-once into a statechart is provided separately by the source/statemachine +// bridge's version idempotency (a redelivered, already-applied event id is +// skipped against the persisted version); that path does not use this interface. type Deduper interface { - // Seen reports whether key has already been processed (and records it if not), - // so the Hopper can skip a duplicate by acking without re-running the handler. + // Seen reports whether key has already been processed, recording it if not, so + // a caller (the idempotency middleware, or a handler) can skip a duplicate by + // acking without re-running the work. Seen(ctx context.Context, key string) (bool, error) } diff --git a/source/cdc/cdc_test.go b/source/cdc/cdc_test.go index 9e383c3..db9d330 100644 --- a/source/cdc/cdc_test.go +++ b/source/cdc/cdc_test.go @@ -266,6 +266,41 @@ func TestDecodeEvent_WrongTypeIsPoison(t *testing.T) { } } +func TestDecodeEvent_RegistryErrorPropagates(t *testing.T) { + t.Parallel() + + // A registry whose codec fails to decode: DecodeEvent returns the registry's + // (poison-classified) decode error rather than masking it. + registry := source.NewRegistry().SetDefault(source.CodecFunc( + func([]byte, source.Headers) (any, error) { + return nil, errors.New("codec exploded") + }, + )) + _, err := cdc.DecodeEvent(registry, changeMessage{value: []byte(`{"op":"c"}`)}) + if err == nil { + t.Fatal("DecodeEvent error = nil, want registry decode error") + } + if !errors.Is(err, source.ErrPoison) { + t.Fatalf("DecodeEvent error = %v, want poison-classified decode error", err) + } +} + +func TestRawJSON_As_UnmarshalErrorReported(t *testing.T) { + t.Parallel() + + // A create carries an after-image; decoding it into an incompatible shape + // surfaces the json.Unmarshal error (wrapped), not ErrMissingImage. + ev := decode(t, []byte(`{"op":"c","after":{"id":1,"name":"ada"}}`)) + var dst int // the after-image is an object; an int cannot hold it + err := ev.After.As(&dst) + if err == nil { + t.Fatal("RawJSON.As error = nil, want unmarshal error") + } + if errors.Is(err, cdc.ErrMissingImage) { + t.Fatalf("RawJSON.As error = %v, want unmarshal error not ErrMissingImage", err) + } +} + func TestSourceHeaders(t *testing.T) { t.Parallel() diff --git a/source/cloudevents/README.md b/source/cloudevents/README.md new file mode 100644 index 0000000..ba56865 --- /dev/null +++ b/source/cloudevents/README.md @@ -0,0 +1,46 @@ +# source/cloudevents + +A [`crucible/source`](../) codec that decodes inbound messages into +[CloudEvents](https://cloudevents.io). Runtime dependency: +[`github.com/cloudevents/sdk-go/v2`](https://github.com/cloudevents/sdk-go). + +```go +reg := source.NewRegistry(). + SetDefault(cloudevents.New()). // binary mode by content type + Register(cloudevents.StructuredContentType, cloudevents.New()) // structured JSON + +ev, order, err := cloudevents.DecodeData[Order](reg, m) +``` + +Construct a `Codec` with `New` and register it on a `source.Registry` under the +content types you accept (or as the registry default). There is no package-level +format registration: every codec is instance-scoped, so two codecs in one +process never share mutable state. + +## Content modes + +The CloudEvents spec defines two ways an event rides a transport, and this codec +accepts both, selecting by the message's content type (see `Detect`): + +- **Structured** — the whole event (attributes and data) is one JSON document in + the body, carried under `application/cloudevents+json` + (`StructuredContentType`). +- **Binary** — the event's attributes ride as `ce-`-prefixed headers and the data + is the raw body, with the body's own media type in the `datacontenttype` + header (or the message's content type). + +A content type whose media type begins with `application/cloudevents` decodes as +structured; anything else decodes as binary. + +## Decoded value + +`Decode` yields a `cloudevents.Event` (the SDK's canonical event). Recover it +from a handler with `EventOf`, or in one call with `DecodeEvent`. Decode the data +payload into a concrete type with `DataAs`, or use the generic `DecodeData[T]` to +decode and project in one step. Extension attributes are surfaced through the +core `source.Headers` (see `Extensions`, prefixed with `ExtensionHeaderPrefix`) +so a handler reads them the same way it reads any other inbound metadata. + +## Stability + +Experimental (pre-v1); the API may change until the suite locks v1.0.0. diff --git a/source/cloudevents/cloudevents.go b/source/cloudevents/cloudevents.go index f3a7bf3..5556d89 100644 --- a/source/cloudevents/cloudevents.go +++ b/source/cloudevents/cloudevents.go @@ -1,39 +1,5 @@ // SPDX-License-Identifier: Apache-2.0 -// Package cloudevents is a source codec that decodes inbound messages into -// CloudEvents. It plugs into a source.Registry as an instance-scoped -// [source.Codec]: construct one with [New] and register it under the content -// types you accept; there is no global format registration (the CloudEvents -// SDK's package-level format registry is deliberately not used, so two codecs -// in one process never share mutable state). -// -// # Content modes -// -// The CloudEvents spec defines two ways an event rides a transport, and this -// codec accepts both, selecting between them by the message's content type: -// -// - Structured mode: the entire event — attributes and data — is one JSON -// document in the body, carried under "application/cloudevents+json". -// - Binary mode: the event's attributes ride as "ce-"-prefixed headers and -// the data is the raw body, with the body's own media type in the -// "datacontenttype" header (or the message's content-type). -// -// A content type whose media type begins with "application/cloudevents" (the -// structured prefix) decodes as structured; anything else decodes as binary. -// See [Detect]. -// -// # Decoded value -// -// Decode yields a [cloudevents.Event] (the SDK's canonical event). Recover it -// from a handler with [EventOf], and decode its data payload into a concrete -// type with [DataAs] or the generic [DecodeData] helper. Extension attributes -// are surfaced through the core [source.Headers] (see [Extensions]) rather than -// a magic-string map, so a handler reads them the same way it reads any other -// inbound metadata. -// -// # Stability -// -// Experimental (pre-v1); the API may change until the suite locks v1.0.0. package cloudevents import ( diff --git a/source/cloudevents/doc.go b/source/cloudevents/doc.go new file mode 100644 index 0000000..ad9e28e --- /dev/null +++ b/source/cloudevents/doc.go @@ -0,0 +1,37 @@ +// SPDX-License-Identifier: Apache-2.0 + +// Package cloudevents is a source codec that decodes inbound messages into +// CloudEvents. It plugs into a source.Registry as an instance-scoped +// [source.Codec]: construct one with [New] and register it under the content +// types you accept; there is no global format registration (the CloudEvents +// SDK's package-level format registry is deliberately not used, so two codecs +// in one process never share mutable state). +// +// # Content modes +// +// The CloudEvents spec defines two ways an event rides a transport, and this +// codec accepts both, selecting between them by the message's content type: +// +// - Structured mode: the entire event — attributes and data — is one JSON +// document in the body, carried under "application/cloudevents+json". +// - Binary mode: the event's attributes ride as "ce-"-prefixed headers and +// the data is the raw body, with the body's own media type in the +// "datacontenttype" header (or the message's content-type). +// +// A content type whose media type begins with "application/cloudevents" (the +// structured prefix) decodes as structured; anything else decodes as binary. +// See [Detect]. +// +// # Decoded value +// +// Decode yields a [cloudevents.Event] (the SDK's canonical event). Recover it +// from a handler with [EventOf], and decode its data payload into a concrete +// type with [DataAs] or the generic [DecodeData] helper. Extension attributes +// are surfaced through the core [source.Headers] (see [Extensions]) rather than +// a magic-string map, so a handler reads them the same way it reads any other +// inbound metadata. +// +// # Stability +// +// Experimental (pre-v1); the API may change until the suite locks v1.0.0. +package cloudevents diff --git a/source/hopper.go b/source/hopper.go index 5bbfcfc..2df4eb0 100644 --- a/source/hopper.go +++ b/source/hopper.go @@ -25,7 +25,10 @@ import ( // A lane is a single goroutine that processes its queue strictly in arrival // order, so two messages with the same key are never reordered, while distinct // keys run in parallel up to [WithConcurrency]. This is the guarantee a -// statechart instance needs: its events arrive in order. +// statechart instance needs: its events arrive in order. The number of distinct +// lanes (and their goroutines) is bounded by [WithMaxLanes]; keys beyond the +// bound share a lane by hash, which serializes only the unrelated colliding keys +// and never reorders a single key's messages. // // Backpressure. [WithMaxInFlight] bounds the messages delivered but not yet // settled; when the window is full the fetch loop blocks before pulling the next @@ -44,6 +47,7 @@ type Hopper struct { middleware []Middleware concurrency int maxInFlight int + maxLanes int batch batchConfig received telemetry.Counter @@ -76,6 +80,7 @@ func New(opts ...Option) *Hopper { middleware: append([]Middleware(nil), cfg.middleware...), concurrency: cfg.concurrency, maxInFlight: cfg.maxInFlight, + maxLanes: cfg.maxLanes, batch: cfg.batch, received: m.Counter("source.received", telemetry.WithDescription("messages received from the subscription")), acked: m.Counter("source.acked", telemetry.WithDescription("messages acknowledged after successful handling")), @@ -349,16 +354,25 @@ func (hp *Hopper) reportLag(ctx context.Context, sub Subscription) { } // laneKey selects the ordered lane for m: its PartitionKey when non-empty, else -// a hash of its Key, else lane 0 (every keyless message shares one ordered -// lane, preserving global order for them). +// a hash of its Key, else lane 0 (every keyless message shares one ordered lane, +// preserving global order for them). The hash is folded onto the bounded lane +// set (see [WithMaxLanes]) so the number of lanes never exceeds maxLanes; a key +// always maps to the same lane, so folding never reorders a key's messages. func (hp *Hopper) laneKey(m Message) uint64 { - if pk := m.PartitionKey(); pk != "" { - return hashBytes([]byte(pk)) + var h uint64 + switch { + case m.PartitionKey() != "": + // Hash the string directly, avoiding a []byte conversion per message. + h = hashString(m.PartitionKey()) + case len(m.Key()) > 0: + h = hashBytes(m.Key()) + default: + return 0 } - if k := m.Key(); len(k) > 0 { - return hashBytes(k) + if hp.maxLanes > 0 { + return h % uint64(hp.maxLanes) } - return 0 + return h } func hashBytes(b []byte) uint64 { @@ -367,6 +381,14 @@ func hashBytes(b []byte) uint64 { return h.Sum64() } +// hashString hashes s without allocating a []byte copy, using the hash's +// io.Writer-free string path. +func hashString(s string) uint64 { + h := fnv.New64a() + _, _ = io.WriteString(h, s) + return h.Sum64() +} + // Close begins a graceful drain: the running Hopper stops fetching, finishes and // settles in-flight messages, and Run returns. Close is idempotent and never // blocks on the drain; it signals, and Run completes the drain. diff --git a/source/hopper_test.go b/source/hopper_test.go index d0bd156..0a7fe8c 100644 --- a/source/hopper_test.go +++ b/source/hopper_test.go @@ -38,6 +38,40 @@ func TestHopper_SettlesByResult(t *testing.T) { } } +// TestHopper_InProgressAndManualDispositions exercises the two dispositions the +// Counts tally does not bucket: ActionInProgress (extend deadline) and +// ActionManual (handler settled itself). Both must still flow through the +// subscription's Settle so the backend observes the chosen action, even though +// neither is an ack/nak/term. +func TestHopper_InProgressAndManualDispositions(t *testing.T) { + t.Parallel() + tests := []struct { + name string + result source.Result + want source.Action + }{ + {"in_progress", source.InProgress(), source.ActionInProgress}, + {"manual", source.Manual(), source.ActionManual}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + h := memsource.NewHarness(t, nil, memsource.Msg{Key: "k", Value: []byte("v")}) + h.Run(func(context.Context, source.Message) source.Result { return tt.result }) + // Neither is counted as ack/nak/term/drop. + h.AssertCounts(memsource.Counts{}) + // But the disposition still reached the subscription's Settle. + entries := h.Ledger().Entries() + if len(entries) != 1 { + t.Fatalf("settled %d messages, want 1", len(entries)) + } + if entries[0].Result.Action != tt.want { + t.Fatalf("settled action = %v, want %v", entries[0].Result.Action, tt.want) + } + }) + } +} + func TestHopper_PerKeyInOrder(t *testing.T) { t.Parallel() const perKey = 50 @@ -323,6 +357,32 @@ func TestHopper_ReceiveSubscribesAndRuns(t *testing.T) { } } +// TestHopper_MaxLanesFoldsKeys verifies that WithMaxLanes bounds the lane set: +// with a single lane, many distinct keys are folded onto it and every message is +// still delivered exactly once and acked. The bound caps goroutines without +// dropping or reordering work. +func TestHopper_MaxLanesFoldsKeys(t *testing.T) { + t.Parallel() + const keys = 50 + msgs := make([]memsource.Msg, keys) + for i := range msgs { + msgs[i] = memsource.Msg{Key: fmt.Sprintf("key-%d", i), Value: []byte("v")} + } + h := memsource.NewHarness(t, + []source.Option{source.WithConcurrency(8), source.WithMaxLanes(1)}, + msgs..., + ) + var handled int32 + h.Run(func(context.Context, source.Message) source.Result { + atomic.AddInt32(&handled, 1) + return source.Ack() + }) + if int(handled) != keys { + t.Fatalf("handled = %d, want %d (every folded key delivered once)", handled, keys) + } + h.AssertCounts(memsource.Counts{Acked: keys}) +} + func TestHopper_FetchErrorPropagates(t *testing.T) { t.Parallel() wantErr := errors.New("subscription broke") diff --git a/source/jetstream/message.go b/source/jetstream/message.go index 811c31a..e450225 100644 --- a/source/jetstream/message.go +++ b/source/jetstream/message.go @@ -21,29 +21,25 @@ func (c seqCursor) String() string { return strconv.FormatUint(uint64(c), 10) } // message adapts a jetstream.Msg onto [source.Message]. The vendor message is // reachable only through As; the neutral accessors expose the common surface. // PartitionKey is always "" because JetStream has no partitions, so the Hopper -// shards by Key (the KeyHeader value, falling back to the subject). +// shards by Key (the KeyHeader value, falling back to the subject). The header +// slice is built lazily on the first [message.Headers] call and cached, so a +// message whose headers a handler never reads pays no per-message header +// allocation. type message struct { msg jetstream.Msg headers source.Headers + built bool // headers has been materialized key []byte cursor seqCursor } -// newMessage wraps a jetstream.Msg, eagerly snapshotting its headers, key, and -// stream sequence so the neutral view is a stable value independent of later -// driver state. A metadata error (a non-JetStream message) leaves the cursor -// zero rather than failing the read. +// newMessage wraps a jetstream.Msg, snapshotting its key and stream sequence so +// the neutral view is stable independent of later driver state. A metadata error +// (a non-JetStream message) leaves the cursor zero rather than failing the read. +// Headers are deferred to the first [message.Headers] call. func newMessage(m jetstream.Msg) *message { - hdr := m.Headers() - headers := make(source.Headers, 0, len(hdr)) - for k, vs := range hdr { - for _, v := range vs { - headers = append(headers, source.Header{Key: k, Value: v}) - } - } - key := []byte(m.Subject()) - if v := hdr.Get(KeyHeader); v != "" { + if v := m.Headers().Get(KeyHeader); v != "" { key = []byte(v) } @@ -52,7 +48,7 @@ func newMessage(m jetstream.Msg) *message { cursor = seqCursor(md.Sequence.Stream) } - return &message{msg: m, headers: headers, key: key, cursor: cursor} + return &message{msg: m, key: key, cursor: cursor} } // Key returns the routing key: the KeyHeader value when set, otherwise the @@ -63,8 +59,23 @@ func (m *message) Key() []byte { return m.key } // Value returns the raw payload bytes. func (m *message) Value() []byte { return m.msg.Data() } -// Headers returns the message metadata as a value-type slice. -func (m *message) Headers() source.Headers { return m.headers } +// Headers returns the message metadata as a value-type slice, materializing it +// on first call and caching it so repeated reads are cheap. The order follows +// the NATS header map iteration and is not significant; a handler keys by name. +func (m *message) Headers() source.Headers { + if !m.built { + hdr := m.msg.Headers() + headers := make(source.Headers, 0, len(hdr)) + for k, vs := range hdr { + for _, v := range vs { + headers = append(headers, source.Header{Key: k, Value: v}) + } + } + m.headers = headers + m.built = true + } + return m.headers +} // Subject returns the subject the message arrived on. func (m *message) Subject() string { return m.msg.Subject() } diff --git a/source/kafka/adapter_internal_test.go b/source/kafka/adapter_internal_test.go index b4fa130..cff0227 100644 --- a/source/kafka/adapter_internal_test.go +++ b/source/kafka/adapter_internal_test.go @@ -80,6 +80,72 @@ func TestWithTransactionalEmptyIDIgnored(t *testing.T) { } } +func TestTransactOptsAssemblesEOSSession(t *testing.T) { + t.Parallel() + + in, err := New( + WithSeedBrokers("localhost:9092"), + WithTransactional("orders-eos-v1"), + ) + if err != nil { + t.Fatalf("New() error = %v", err) + } + sc := source.SubscribeConfig{Topics: []string{"orders"}, Group: "svc"} + + // transactOpts must produce a usable option set: a GroupTransactSession built + // from it constructs without error (no dial happens at construction). This + // exercises the transactional option-assembly path end to end. + opts := in.transactOpts(sc) + if len(opts) == 0 { + t.Fatal("transactOpts returned no options") + } + sess, err := kgo.NewGroupTransactSession(opts...) + if err != nil { + t.Fatalf("NewGroupTransactSession(transactOpts) error = %v", err) + } + t.Cleanup(sess.Close) + + // The non-transactional consumeOpts must differ (no transactional ID), proving + // the transactional path is a distinct assembly, not the plain consume path. + if len(in.consumeOpts(sc)) == len(opts) { + // Lengths can legitimately match; the meaningful assertion is that the + // transactional session built, which a plain consume option set cannot do + // without a transactional ID. Construction success above is the gate. + t.Log("consumeOpts and transactOpts have equal length; session build is the real assertion") + } +} + +func TestSubscribeTransactionalRejectsSecondCall(t *testing.T) { + t.Parallel() + + in, err := New( + WithSeedBrokers("localhost:9092"), + WithTransactional("orders-eos-v1"), + ) + if err != nil { + t.Fatalf("New() error = %v", err) + } + + // Model "already subscribed": a transactional inlet holds a single + // GroupTransactSession after its first Subscribe. Building one directly (no + // dial at construction) lets the test reach the guard without a live broker. + sess, err := kgo.NewGroupTransactSession(in.transactOpts(source.SubscribeConfig{ + Topics: []string{"orders"}, Group: "svc", + })...) + if err != nil { + t.Fatalf("NewGroupTransactSession error = %v", err) + } + t.Cleanup(sess.Close) + in.transactSess = sess + in.client = sess.Client() + in.ownsClient = true + + _, err = in.Subscribe(context.Background(), source.SubscribeConfig{Topics: []string{"orders"}}) + if !errors.Is(err, errTransactionalSingleSubscribe) { + t.Fatalf("second Subscribe on transactional inlet = %v, want errTransactionalSingleSubscribe", err) + } +} + func TestSubscribeRejectsNoTopics(t *testing.T) { t.Parallel() diff --git a/source/kafka/kafka.go b/source/kafka/kafka.go index 04bfcda..c0cac48 100644 --- a/source/kafka/kafka.go +++ b/source/kafka/kafka.go @@ -69,6 +69,12 @@ var ErrNoSeedBrokers = errors.New("source/kafka: no seed brokers configured") // [WithDLQTopic]. Match it with errors.Is. var ErrNoDLQTopic = errors.New("source/kafka: term requested but no dead-letter topic configured") +// errTransactionalSingleSubscribe reports a second [Inlet.Subscribe] on a +// transactional inlet. The exactly-once session backing a transactional inlet +// fences a single consumer, so only one subscription per transactional inlet is +// valid; open a fresh inlet for a second consumer. Match it with errors.Is. +var errTransactionalSingleSubscribe = errors.New("source/kafka: a transactional inlet allows only one Subscribe") + // config holds an [Inlet]'s resolved settings before a client is built. Every // field has a zero-value default; the only requirement is at least one seed // broker (or an injected client). @@ -339,6 +345,14 @@ func (in *Inlet) Subscribe(_ context.Context, cfg source.SubscribeConfig) (sourc return nil, fmt.Errorf("source/kafka: subscribe: %w", errors.New("at least one topic required")) } + // A transactional inlet is backed by a single GroupTransactSession that can + // fence exactly one consumer; a second Subscribe cannot share it, and a + // second subscription would silently come up without a transact session + // (Begin would report not-transactional). Reject it loudly instead. + if in.cfg.transact && in.transactSess != nil { + return nil, fmt.Errorf("source/kafka: %w", errTransactionalSingleSubscribe) + } + sub := &subscription{ dlqTopic: in.cfg.dlqTopic, maxPoll: in.cfg.maxPoll, diff --git a/source/options.go b/source/options.go index f3b4c57..869e93f 100644 --- a/source/options.go +++ b/source/options.go @@ -9,6 +9,14 @@ import ( "github.com/stablekernel/crucible/telemetry" ) +// defaultMaxLanes bounds the number of ordered lanes (and their goroutines and +// map entries) a Hopper keeps alive, so a stream with unbounded key cardinality +// cannot grow the lane set without limit for the run's lifetime. Distinct keys +// that exceed the bound share a lane by hash, which only serializes unrelated +// keys; it never reorders messages that share a key. The default is generous +// enough that typical key counts each get their own lane. +const defaultMaxLanes = 4096 + // config holds a Hopper's resolved seams. Every field has a no-op default so a // zero-option Hopper is fully functional and silent. type config struct { @@ -20,6 +28,7 @@ type config struct { middleware []Middleware concurrency int maxInFlight int + maxLanes int // batch holds the batch-mode tuning; batch.enabled is false unless WithBatch // is supplied, in which case RunBatch/ReceiveBatch accumulate per lane. @@ -48,6 +57,7 @@ func defaultConfig() config { meter: telemetry.NopMeter(), concurrency: 1, maxInFlight: 0, + maxLanes: defaultMaxLanes, batch: batchConfig{ now: time.Now, }, @@ -146,6 +156,24 @@ func WithConcurrency(n int) Option { } } +// WithMaxLanes bounds the number of ordered lanes the Hopper keeps alive, and +// with them the per-lane goroutines and map entries. A stream whose key +// cardinality is unbounded over the run's lifetime (a per-request id, a +// per-session token) would otherwise accumulate one lane and one goroutine per +// distinct key for as long as the run lasts; this caps that set. When the number +// of distinct keys exceeds n, keys are folded onto lanes by hash, which only +// serializes the unrelated keys that collide and never reorders messages that +// share a key. The default is a generous fixed bound (see defaultMaxLanes); set +// it lower to cap goroutines tightly, or higher when key cardinality is large +// but bounded. A value < 1 is ignored. +func WithMaxLanes(n int) Option { + return func(c *config) { + if n >= 1 { + c.maxLanes = n + } + } +} + // WithMaxInFlight bounds the number of messages delivered but not yet settled. // When the window is full the fetch loop blocks, applying backpressure to the // subscription. The default (0) is unbounded. A value < 0 is ignored. diff --git a/source/redis/message.go b/source/redis/message.go index e9eb193..1fbf51f 100644 --- a/source/redis/message.go +++ b/source/redis/message.go @@ -32,30 +32,29 @@ func (c idCursor) String() string { return string(c) } // message adapts a go-redis XMessage onto [source.Message]. The vendor entry is // reachable only through As; the neutral accessors expose the common surface. -// The fields are snapshotted at construction so the neutral view is a stable -// value independent of later driver state. +// The entry is a value copy taken from go-redis, so the neutral view is stable +// independent of later driver state. The header slice is built lazily on the +// first [message.Headers] call and cached, so a message whose headers a handler +// never reads pays no per-message header allocation. type message struct { entry goredis.XMessage stream string headers source.Headers + built bool // headers has been materialized key []byte value []byte } // newMessage wraps a go-redis XMessage from stream, projecting its fields onto // the neutral surface: the [ValueField] field (when present) becomes the raw -// value, the [KeyHeader] field (when present) becomes the routing key, and every -// field is exposed as a header. The key falls back to the stream name so the -// Hopper always has a deterministic shard key on a backend with no partitions. +// value and the [KeyHeader] field (when present) becomes the routing key. The +// key falls back to the stream name so the Hopper always has a deterministic +// shard key on a backend with no partitions. Headers are deferred to the first +// [message.Headers] call. func newMessage(stream string, entry goredis.XMessage) *message { - headers := make(source.Headers, 0, len(entry.Values)) var value []byte - for k, v := range entry.Values { - s := toString(v) - headers = append(headers, source.Header{Key: k, Value: s}) - if k == ValueField { - value = []byte(s) - } + if v, ok := entry.Values[ValueField]; ok { + value = []byte(toString(v)) } key := []byte(stream) @@ -66,11 +65,10 @@ func newMessage(stream string, entry goredis.XMessage) *message { } return &message{ - entry: entry, - stream: stream, - headers: headers, - key: key, - value: value, + entry: entry, + stream: stream, + key: key, + value: value, } } @@ -99,8 +97,20 @@ func (m *message) Key() []byte { return m.key } // nil. The full set of entry fields remains available through [message.Headers]. func (m *message) Value() []byte { return m.value } -// Headers returns the entry's fields as a value-type slice. -func (m *message) Headers() source.Headers { return m.headers } +// Headers returns the entry's fields as a value-type slice, materializing it on +// first call and caching it so repeated reads are cheap. The order follows +// go-redis's map iteration and is not significant; a handler keys by header name. +func (m *message) Headers() source.Headers { + if !m.built { + headers := make(source.Headers, 0, len(m.entry.Values)) + for k, v := range m.entry.Values { + headers = append(headers, source.Header{Key: k, Value: toString(v)}) + } + m.headers = headers + m.built = true + } + return m.headers +} // Subject returns the stream the entry arrived on. func (m *message) Subject() string { return m.stream } diff --git a/source/redis/redis.go b/source/redis/redis.go index 2d9c5f6..d9a2bdb 100644 --- a/source/redis/redis.go +++ b/source/redis/redis.go @@ -128,6 +128,7 @@ type config struct { block time.Duration count int64 minIdle time.Duration + now func() time.Time } // Option configures an [Inlet] at construction. Options compose; later options @@ -172,6 +173,18 @@ func WithCount(n int64) Option { return func(c *config) { c.count = n } } // default. func WithMinIdle(d time.Duration) Option { return func(c *config) { c.minIdle = d } } +// WithClock injects the clock the subscription reads when it stamps and checks +// the per-entry requeue floor a [source.Result.Requeue] raises (see Settle and +// NakRedeliver). It exists so a test can drive redelivery timing +// deterministically; production leaves it at time.Now. A nil clock is ignored. +func WithClock(now func() time.Time) Option { + return func(c *config) { + if now != nil { + c.now = now + } + } +} + // New builds an [Inlet] from opts. Exactly one of [WithAddr] or [WithClient] // must be supplied, and [WithGroup] and [WithConsumer] are required. When // [WithAddr] is used New dials the client eagerly so misconfiguration surfaces @@ -236,6 +249,7 @@ func (in *Inlet) Subscribe(_ context.Context, cfg source.SubscribeConfig) (sourc block: in.cfg.block, count: in.cfg.count, minIdle: in.cfg.minIdle, + now: in.cfg.now, startID: "0", // create the group at the stream origin by default readFrom: ">", // read new (never-delivered) entries by default }, nil @@ -295,6 +309,25 @@ type subscription struct { pending []source.Message // buffered, fetched-but-not-yet-yielded entries inflight int // entries delivered by Next, not yet settled readFrom string // ">" for new entries; an ID to drain the backlog + + // requeueAfter records the earliest wall-clock time a nak'd entry may be + // reclaimed for redelivery, keyed by entry ID. A Nak whose + // [source.Result.Requeue] exceeds the configured min-idle raises that + // entry's floor; NakRedeliver skips an entry until its floor has passed. + // An entry is removed from the map when it is reclaimed or acked. + requeueAfter map[string]time.Time + // now is the clock NakRedeliver and Nak read, injected so a test can drive + // the per-entry requeue floor deterministically. Nil means time.Now. + now func() time.Time +} + +// clock returns the subscription's injected clock, defaulting to time.Now. The +// caller holds s.mu. +func (s *subscription) clock() func() time.Time { + if s.now != nil { + return s.now + } + return time.Now } // ensureGroup creates the consumer group (with MKSTREAM) once, idempotently @@ -427,8 +460,17 @@ func (s *subscription) Settle(ctx context.Context, m source.Message, r source.Re return s.ack(ctx, entry.ID) case source.ActionNak: // Leave the entry in the PEL; a future pending scan (NakRedeliver) claims - // and redelivers it once it has been idle long enough. The requeue delay - // raises that idle floor when larger than the configured minimum. + // and redelivers it once it has been idle long enough. A requeue delay + // larger than the configured minimum raises this entry's redelivery floor + // so NakRedeliver holds it back until the delay has elapsed. + if r.Requeue > s.minIdle { + s.mu.Lock() + if s.requeueAfter == nil { + s.requeueAfter = make(map[string]time.Time) + } + s.requeueAfter[entry.ID] = s.clock()().Add(r.Requeue) + s.mu.Unlock() + } return nil case source.ActionTerm: if err := s.deadLetter(ctx, m, entry, r); err != nil { @@ -459,6 +501,9 @@ func (s *subscription) ack(ctx context.Context, id string) error { if err := s.client.XAck(ctx, s.stream, s.group, id).Err(); err != nil { return fmt.Errorf("redis: ack %s: %w", id, err) } + s.mu.Lock() + delete(s.requeueAfter, id) + s.mu.Unlock() return nil } @@ -520,10 +565,21 @@ func (s *subscription) NakRedeliver(ctx context.Context, minIdle time.Duration) if len(pend) == 0 { return 0, nil } + // Honor any per-entry requeue floor a Nak raised: an entry whose floor has + // not yet passed is held back from this redelivery cycle. + now := s.clock()() ids := make([]string, 0, len(pend)) + s.mu.Lock() for _, p := range pend { + if until, held := s.requeueAfter[p.ID]; held && now.Before(until) { + continue + } ids = append(ids, p.ID) } + s.mu.Unlock() + if len(ids) == 0 { + return 0, nil + } claimed, err := s.client.XClaim(ctx, &goredis.XClaimArgs{ Stream: s.stream, Group: s.group, @@ -537,6 +593,7 @@ func (s *subscription) NakRedeliver(ctx context.Context, minIdle time.Duration) s.mu.Lock() for _, e := range claimed { s.pending = append(s.pending, newMessage(s.stream, e)) + delete(s.requeueAfter, e.ID) } s.mu.Unlock() return len(claimed), nil @@ -597,6 +654,13 @@ func (s *subscription) SeekToEnd(_ context.Context) error { // next Next yields the replayed entries before resuming live delivery. The // entries are claimed to this consumer (they enter the PEL) so they settle // through the normal ack path. The caller passes a context for the range read. +// +// The closed check and the buffer write straddle the XRANGE call rather than +// holding s.mu across the network round-trip (that would stall a concurrent +// Settle). A Close that lands in that window is benign: reseek still buffers the +// replayed entries, and the next Next drains them and then returns +// [source.ErrDrained], exactly as a Close followed by a drain would. Seek and +// Close are not expected to race in practice; the engine drives one fetch loop. func (s *subscription) reseek(ctx context.Context, fromID string) error { s.mu.Lock() if s.closed { @@ -628,10 +692,11 @@ func (s *subscription) Lag(ctx context.Context) (int64, error) { if err == nil { for _, g := range groups { if g.Name == s.group { - if g.Lag > 0 { - return g.Lag, nil - } - break + // XINFO GROUPS reports the group's lag directly, including zero + // when the group is caught up. Return it as-is: a guard that + // only trusted a positive lag would fall through to XLEN and + // report the full stream length for a fully-consumed group. + return g.Lag, nil } } } diff --git a/source/redis/redis_test.go b/source/redis/redis_test.go index 05629eb..1b47e07 100644 --- a/source/redis/redis_test.go +++ b/source/redis/redis_test.go @@ -116,6 +116,23 @@ func (f *fakeClient) XClaim(ctx context.Context, a *goredis.XClaimArgs) *goredis cmd.SetErr(f.claimErr) return cmd } + // Honor the requested IDs: XClaim only reassigns the entries it is asked for, + // so a test that holds an entry back (its requeue floor not yet passed) sees + // it excluded from the claim arguments and so from the returned set. + if len(a.Messages) > 0 { + want := make(map[string]bool, len(a.Messages)) + for _, id := range a.Messages { + want[id] = true + } + out := make([]goredis.XMessage, 0, len(f.claimOut)) + for _, e := range f.claimOut { + if want[e.ID] { + out = append(out, e) + } + } + cmd.SetVal(out) + return cmd + } cmd.SetVal(f.claimOut) return cmd } @@ -620,6 +637,69 @@ func TestNakRedeliver_ClaimErrorPropagates(t *testing.T) { } } +func TestNak_RequeueRaisesPerEntryFloor(t *testing.T) { + t.Parallel() + // A controllable clock lets the test advance time deterministically across + // the per-entry requeue floor a Nak with a large Requeue raises. + now := time.Unix(1000, 0) + clock := func() time.Time { return now } + + c := &fakeClient{ + readBatches: [][]goredis.XMessage{{entry("1-0", map[string]any{ValueField: "a"})}}, + pendingOut: []goredis.XPendingExt{{ID: "1-0", RetryCount: 1}}, + claimOut: []goredis.XMessage{entry("1-0", map[string]any{ValueField: "a"})}, + } + // minIdle small so only the per-entry Requeue floor gates redelivery. + sub, _ := newSub(t, c, WithMinIdle(time.Millisecond), WithClock(clock)) + + m, err := sub.Next(context.Background()) + if err != nil { + t.Fatalf("Next() error = %v", err) + } + // Nak with a Requeue (10s) far larger than minIdle: the entry's floor is now+10s. + err = sub.Settle(context.Background(), m, source.NakAfter(10*time.Second, errors.New("retry"))) + if err != nil { + t.Fatalf("Settle(nak) error = %v", err) + } + + // Before the floor passes, NakRedeliver must hold the entry back. + var nBefore int + nBefore, err = sub.NakRedeliver(context.Background(), 0) + if err != nil || nBefore != 0 { + t.Fatalf("NakRedeliver() before floor = %d,%v want 0,nil", nBefore, err) + } + + // Advance past the floor: the entry is now eligible and is reclaimed. + now = now.Add(11 * time.Second) + n, err := sub.NakRedeliver(context.Background(), 0) + if err != nil { + t.Fatalf("NakRedeliver() after floor error = %v", err) + } + if n != 1 { + t.Fatalf("NakRedeliver() after floor = %d, want 1", n) + } +} + +func TestNak_NoRequeueLeavesFloorUnraised(t *testing.T) { + t.Parallel() + // A plain Nak (no Requeue, or one below minIdle) raises no floor, so the + // entry redelivers on the next scan governed by minIdle alone. + c := &fakeClient{ + readBatches: [][]goredis.XMessage{{entry("1-0", map[string]any{ValueField: "a"})}}, + pendingOut: []goredis.XPendingExt{{ID: "1-0", RetryCount: 1}}, + claimOut: []goredis.XMessage{entry("1-0", map[string]any{ValueField: "a"})}, + } + sub, _ := newSub(t, c, WithMinIdle(time.Millisecond)) + m, _ := sub.Next(context.Background()) + if err := sub.Settle(context.Background(), m, source.Nak(errors.New("retry"))); err != nil { + t.Fatalf("Settle(nak) error = %v", err) + } + n, err := sub.NakRedeliver(context.Background(), 0) + if err != nil || n != 1 { + t.Fatalf("NakRedeliver() = %d,%v want 1,nil", n, err) + } +} + // --- capabilities: seek (replay by entry ID) -------------------------------- func TestSeek_BuffersBacklog(t *testing.T) { @@ -725,6 +805,24 @@ func TestLag_PrefersGroupLag(t *testing.T) { } } +func TestLag_GroupCaughtUpReportsZero(t *testing.T) { + t.Parallel() + // A caught-up group reports Lag 0 from XINFO GROUPS. The reporter must return + // that zero, not fall through to XLEN and report the full stream length. + c := &fakeClient{ + groups: []goredis.XInfoGroup{{Name: "workers", Lag: 0}}, + xlen: 999, + } + sub, _ := newSub(t, c) + got, err := sub.Lag(context.Background()) + if err != nil { + t.Fatalf("Lag() error = %v", err) + } + if got != 0 { + t.Errorf("Lag() = %d, want 0 (group caught up)", got) + } +} + func TestLag_FallsBackToXLen(t *testing.T) { t.Parallel() c := &fakeClient{groupsErr: errors.New("no groups info"), xlen: 7} diff --git a/source/retry/retry.go b/source/retry/retry.go index 33b9316..275dcc3 100644 --- a/source/retry/retry.go +++ b/source/retry/retry.go @@ -11,8 +11,7 @@ // The attempt count is carried on a typed context value ([WithAttempt]/[Attempt]) // rather than a magic-string header, and the next attempt is propagated to inner // middleware (notably source/dlq) by the same channel. The backoff schedule is -// fully config-driven through functional options; nothing is hardcoded. A clock -// is injected for deterministic tests. +// fully config-driven through functional options; nothing is hardcoded. package retry import ( @@ -24,14 +23,6 @@ import ( "github.com/stablekernel/crucible/source" ) -// AttemptHeader is the typed header key under which the retry middleware stamps -// the current attempt number when it produces a redelivery [source.Result] whose -// backoff a downstream backend cannot itself thread back through context (the -// attempt is also carried on the context via [WithAttempt]). It is a named -// constant, not an inline magic string, so dead-letter middleware reads the same -// key the retry middleware writes. -const AttemptHeader = "crucible-retry-attempt" - // attemptKey is the unexported context key type the attempt count travels on, so // it never collides with another package's context values. type attemptKey struct{} @@ -99,7 +90,6 @@ func (b expBackoff) Delay(attempt int) time.Duration { type config struct { maxAttempts int backoff Backoff - now func() time.Time } func defaultConfig() config { @@ -112,7 +102,6 @@ func defaultConfig() config { jitter: true, randF: rand.Float64, }, - now: time.Now, } } @@ -162,21 +151,12 @@ func WithBackoffFunc(b Backoff) Option { } } -// WithClock injects the clock the middleware reads, for deterministic tests. It -// is currently used only where a future-facing schedule needs the current time; -// the backoff itself is relative. The default is time.Now. A nil clock is -// ignored. -func WithClock(now func() time.Time) Option { - return func(c *config) { - if now != nil { - c.now = now - } - } -} - // WithJitterSource injects the [0,1) random source used for full jitter, so a -// test can make a jittered schedule deterministic. It applies to the default and -// [WithBackoff] schedules. A nil source is ignored. +// test can make a jittered schedule deterministic. It applies to the exponential +// schedules this package builds (the default and [WithBackoff]); it has no effect +// on a custom [Backoff] installed with [WithBackoffFunc], which owns its own +// randomness. Order matters: apply WithJitterSource after the WithBackoff it +// should patch. A nil source is ignored. func WithJitterSource(randF func() float64) Option { return func(c *config) { if randF == nil { diff --git a/source/retry/retry_test.go b/source/retry/retry_test.go index f17c086..4a93f44 100644 --- a/source/retry/retry_test.go +++ b/source/retry/retry_test.go @@ -214,7 +214,6 @@ func TestOptions_IgnoreInvalid(t *testing.T) { retry.WithBackoff(0, time.Minute, 2.0, false), // ignored (base<=0) retry.WithBackoff(-1, time.Minute, 0.5, false), // ignored (factor<1) retry.WithBackoffFunc(nil), // ignored - retry.WithClock(nil), // ignored retry.WithJitterSource(nil), // ignored retry.WithBackoff(time.Second, time.Minute, 2.0, false), // applies ) @@ -228,12 +227,22 @@ func TestOptions_IgnoreInvalid(t *testing.T) { } } -func TestWithClock_Accepted(t *testing.T) { +// TestWithJitterSource_DeterministicSchedule confirms the injected jitter source +// makes the exponential schedule deterministic: a source returning a fixed +// fraction scales the computed delay by exactly that fraction. +func TestWithJitterSource_DeterministicSchedule(t *testing.T) { t.Parallel() - clock := func() time.Time { return time.Unix(0, 0) } - mw := retry.Middleware(retry.WithClock(clock)) - h := mw(func(_ context.Context, _ source.Message) source.Result { return source.Ack() }) - if got := h(context.Background(), stubMsg{}); got.Action != source.ActionAck { - t.Fatalf("action = %v, want ack", got.Action) + mw := retry.Middleware( + retry.WithBackoff(time.Second, time.Minute, 2.0, true), + retry.WithJitterSource(func() float64 { return 0.5 }), + ) + h := mw(func(_ context.Context, _ source.Message) source.Result { return source.Nak(errBoom) }) + // Attempt 1: base*factor^0 = 1s, scaled by jitter 0.5 -> 500ms. + got := h(retry.WithAttempt(context.Background(), 1), stubMsg{}) + if got.Action != source.ActionNak { + t.Fatalf("action = %v, want nak", got.Action) + } + if got.Requeue != 500*time.Millisecond { + t.Fatalf("delay = %v, want 500ms (1s scaled by 0.5 jitter)", got.Requeue) } } diff --git a/source/statemachine/doc.go b/source/statemachine/doc.go index 6a76035..5eb5b95 100644 --- a/source/statemachine/doc.go +++ b/source/statemachine/doc.go @@ -42,10 +42,16 @@ // // # Modes // -// Two binding modes share the same outcomes: +// Three binding modes share the same outcomes: // // - Durable: [Drive] loads and saves each instance through a [Store], -// persisting the transition before acking. This is the exactly-once path. +// persisting the transition before acking. Redelivery is deduplicated by the +// persisted event id (exactly-once into the machine, at-least-once delivery). +// - Transactional: [DriveTx] runs the durable path inside a +// [source.Transactional] consume-process-produce transaction (Kafka EOS), so +// the records a transition emits and the consumed offset commit as one atomic +// unit. It is the exactly-once-into-a-sink path; use it only on a backend that +// satisfies [source.Transactional]. // - Stateless: [DriveFunc] fires each message against a caller-supplied // function with no persistence, for sources that drive a transient or // externally-owned machine. diff --git a/source/statemachine/drive.go b/source/statemachine/drive.go index f7911b2..6fa6258 100644 --- a/source/statemachine/drive.go +++ b/source/statemachine/drive.go @@ -83,6 +83,10 @@ func driveOn[K comparable, E comparable, C any]( } eventID := cfg.eventID(m) + // Signal when the message carries no event id: dedup is impossible, so a + // redelivery would re-fire. Surfacing it on the span makes the silent loss + // of exactly-once visible in traces (supply WithEventID to fix it). + span.SetAttributes(telemetry.Bool("statemachine.exactly_once", eventID != "")) if ok && eventID != "" && rec.LastEventID == eventID { // Exactly-once into the machine: this id was already folded into the // persisted version, so re-firing would double-apply. Ack and discard. diff --git a/source/statemachine/drivefunc.go b/source/statemachine/drivefunc.go index 1dbc2d8..9533afe 100644 --- a/source/statemachine/drivefunc.go +++ b/source/statemachine/drivefunc.go @@ -29,9 +29,11 @@ type FireFunc[K comparable, E comparable] func(ctx context.Context, key K, event // where there is no durable [Store] to commit against. // // The emit hand-off, state-aware rejection, and the ack outcome match [Drive], -// minus the load/save and minus version idempotency (a stateless binding has no -// persisted version to dedup against, so redelivery re-fires; supply a [Deduper] -// or use [Drive] for exactly-once): +// minus the load/save and minus version idempotency: a stateless binding has no +// persisted version to dedup against, so a redelivery re-fires. For idempotent +// redelivery either use [Drive], whose persisted version skips an already-applied +// event id, or add the source/idempotency middleware to the Hopper to suppress +// duplicates before they reach this handler. // // - Route/decode failure → [source.Term] (poison). // - Fire rejected as illegal for the current state → [source.Reject] diff --git a/source/statemachine/drivetx.go b/source/statemachine/drivetx.go index b7943dc..dd0d836 100644 --- a/source/statemachine/drivetx.go +++ b/source/statemachine/drivetx.go @@ -118,6 +118,10 @@ func DriveTx[K comparable, E comparable, C any]( } eventID := cfg.eventID(m) + // Signal when the message carries no event id: without one the version + // dedup that covers a broker abort after Save cannot fire, so a redelivery + // would re-fire. Surface it on the span so the lost guarantee is visible. + span.SetAttributes(telemetry.Bool("statemachine.exactly_once", eventID != "")) if loaded && eventID != "" && rec.LastEventID == eventID { // Already folded into the persisted version: ack as a no-op. This is a // plain offset advance, not a transactional produce, so it is settled by diff --git a/source/statemachine/drivetx_test.go b/source/statemachine/drivetx_test.go index b22c403..e033678 100644 --- a/source/statemachine/drivetx_test.go +++ b/source/statemachine/drivetx_test.go @@ -150,6 +150,32 @@ func TestDriveTx_EmitFailureAbortsTransaction(t *testing.T) { } } +func TestDriveTx_SaveFailureAbortsTransaction(t *testing.T) { + t.Parallel() + m := buildTurnstile() + // A store that fires cleanly but fails to persist: Save runs inside the + // transaction, so its error must abort the transaction (nothing committed) + // and nak the message for redelivery. + store := &flakyStore{saveErr: errors.New("persist exploded")} + + tx := &fakeTx{} + sub := &fakeTransactional{tx: tx, committed: true} + h := statemachine.DriveTx[turnstileState, turnstileEvent, *turnstile]( + m, store, routeFunded, sub, statemachine.TxSinkFunc(emitOpenedToTopic), + ) + + res := h(context.Background(), msg("evt-1", "c1")) + if res.Action != source.ActionNak { + t.Fatalf("action = %v, want nak on save failure", res.Action) + } + if !errors.Is(res.Err, store.saveErr) { + t.Fatalf("nak err = %v, want wrapped save error", res.Err) + } + if got, want := sub.calls, []string{"begin", "abort"}; !equalCalls(got, want) { + t.Fatalf("call order = %v, want %v (abort on save failure)", got, want) + } +} + func TestDriveTx_RedeliveryIsSkip_NotTransactional(t *testing.T) { t.Parallel() m := buildTurnstile() diff --git a/source/statemachine/options.go b/source/statemachine/options.go index 4801622..8519cb9 100644 --- a/source/statemachine/options.go +++ b/source/statemachine/options.go @@ -43,6 +43,12 @@ func (discardSink) Emit(context.Context, any) error { return nil } // "message-id" header ([DefaultEventIDHeader]) and falls back to the message // [source.Cursor] string; override it with [WithEventID] to read a different // header or derive an id from the decoded value. +// +// An empty id disables dedup for that message: with no id to compare against the +// persisted LastEventID, a redelivery re-fires the transition. The bindings +// surface this on their span (the "statemachine.exactly_once" attribute is false) +// so a message stream that yields no id is visible in traces rather than silently +// losing the guarantee. Return a non-empty stable id to keep exactly-once. type EventID func(m source.Message) string // DefaultEventIDHeader is the header [DefaultEventID] reads a message's