Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions source/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,12 @@ func (hp *Hopper) dispatchBatch(ctx context.Context, bh BatchHandler, buf []*del
hp.settle(ctx, span, d, r)
}

// Surface a count mismatch loudly rather than letting it pass. An under-count
// already terminated each unmatched message as poison above; an over-count
// (more results than messages) silently discards the extra results, so record
// the sentinel here too so the discard is visible in traces, not swallowed.
if len(results) != len(live) {
span.RecordError(ErrBatchResultCount)
span.SetStatus(telemetry.StatusError, "batch result count mismatch")
}
}
64 changes: 64 additions & 0 deletions source/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,70 @@ func TestHopper_BatchResultCountMismatch(t *testing.T) {
}
}

// TestHopper_BatchResultOverCount checks that returning more results than the
// batch holds settles every message and does not panic: the extra results are
// discarded and both real messages are acked. Only the settlement counts are
// asserted here; the span error recorded for the mismatch is not inspected by
// this test.
func TestHopper_BatchResultOverCount(t *testing.T) {
	t.Parallel()
	h := memsource.NewHarness(t,
		[]source.Option{source.WithBatch(2, 0)},
		memsource.Msg{Key: "k"},
		memsource.Msg{Key: "k"},
	)
	h.RunBatch(func(_ context.Context, _ []source.Message) []source.Result {
		// Three results for two messages: the third is extra and must be dropped
		// without affecting the two real messages.
		return []source.Result{source.Ack(), source.Ack(), source.Ack()}
	})
	h.AssertCounts(memsource.Counts{Acked: 2})
}

// TestHopper_ReceiveBatchSubscribesAndRuns covers the ReceiveBatch entry point,
// the batch analog of Receive: it subscribes through the inlet, drives the batch
// handler, and settles every message.
func TestHopper_ReceiveBatchSubscribesAndRuns(t *testing.T) {
	t.Parallel()
	in := memsource.New(
		memsource.WithMessages(
			memsource.Msg{Key: "k", Value: []byte("a")},
			memsource.Msg{Key: "k", Value: []byte("b")},
		),
	)
	hp := source.New(source.WithBatch(2, 0))

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	done := make(chan error, 1)
	go func() {
		done <- hp.ReceiveBatch(ctx, in, source.SubscribeConfig{Topics: []string{"t"}},
			func(_ context.Context, ms []source.Message) []source.Result {
				res := make([]source.Result, len(ms))
				for i := range res {
					res[i] = source.Ack()
				}
				return res
			})
	}()

	// ReceiveBatch does not auto-close the subscription's stream, so the test has
	// to cancel to make it drain. Poll until both queued messages are acked
	// instead of sleeping a fixed interval: a fixed sleep can cancel before the
	// batch is handled on a slow machine and fail the count check below. The poll
	// is bounded, so a Hopper that never acks still reaches the final assertion.
	// NOTE(review): assumes the ledger may be read while the Hopper is running —
	// confirm memsource guards it.
	const (
		settleWait = time.Second
		pollEvery  = time.Millisecond
	)
	for deadline := time.Now().Add(settleWait); in.Ledger().Counts().Acked < 2 && time.Now().Before(deadline); {
		time.Sleep(pollEvery)
	}
	cancel()

	select {
	case err := <-done:
		if err != nil {
			t.Fatalf("ReceiveBatch returned %v, want nil", err)
		}
	case <-time.After(2 * time.Second):
		t.Fatal("ReceiveBatch did not return")
	}
	if c := in.Ledger().Counts(); c.Acked != 2 {
		t.Fatalf("acked = %d, want 2", c.Acked)
	}
}

// TestHopper_BatchDecodeFailureIsolated checks one undecodable message terminates
// alone without poisoning its batch-mates.
func TestHopper_BatchDecodeFailureIsolated(t *testing.T) {
Expand Down
40 changes: 31 additions & 9 deletions source/capability.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,26 @@ type OrderedDelivery interface {
}

// Batched is a [Subscription] that can yield and settle messages in batches,
// amortizing per-message overhead. The [Hopper]'s batch mode (see
// [Hopper.RunBatch]) uses [Batched.NextBatch] to fetch whole batches from the
// backend when this capability is present; an unbatched subscription is fetched
// one message at a time.
//
// Settlement is a separate decision. A [BatchHandler] returns one [Result] per
// message, so the Hopper settles each message by its own Result through
// [Subscription.Settle] rather than collapsing a batch onto a single outcome —
// a slow handler that fails the third message of five must not nak the other
// four. [Batched.SettleBatch] is therefore a reserved seam the Hopper does not
// call: it is the one-call settle path for an adapter or a caller that already
// knows a whole batch shares one outcome (a uniform ack after a bulk commit),
// not a substitute for per-message settlement.
type Batched interface {
// NextBatch returns up to limit messages, blocking for at least one, or
// ctx.Err()/ErrDrained as [Subscription.Next] would.
NextBatch(ctx context.Context, limit int) ([]Message, error)
// SettleBatch applies the single result r to every message in ms in one call.
// It is for a caller settling a uniform batch directly; the Hopper settles per
// message (one Result each) and does not call it.
SettleBatch(ctx context.Context, ms []Message, r Result) error
}

Expand Down Expand Up @@ -175,13 +188,22 @@ type ProducedRecord struct {
Headers Headers
}

// Deduper suppresses re-delivery of an already-processed message by an
// idempotency key. It is a reserved seam, not a capability the [Hopper] consults
// on its own: the Hopper never type-asserts a subscription to Deduper and never
// calls [Deduper.Seen]. Deduplication is opt-in middleware — wire a Deduper into
// the source/idempotency middleware with its WithDeduper option (it adapts a
// Deduper to the middleware's store via FromDeduper) and add that middleware to
// the Hopper, or call Seen yourself from a handler. The no-op default, with no
// such middleware, is "no deduplication".
//
// Exactly-once into a statechart is provided separately by the source/statemachine
// bridge's version idempotency (a redelivered, already-applied event id is
// skipped against the persisted version); that path does not use this interface.
type Deduper interface {
// Seen reports whether key has already been processed, recording it if not, so
// a caller (the idempotency middleware, or a handler) can skip a duplicate by
// acking without re-running the work.
Seen(ctx context.Context, key string) (bool, error)
}

Expand Down
35 changes: 35 additions & 0 deletions source/cdc/cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,41 @@ func TestDecodeEvent_WrongTypeIsPoison(t *testing.T) {
}
}

func TestDecodeEvent_RegistryErrorPropagates(t *testing.T) {
	t.Parallel()

	// The registry's default codec always fails, so DecodeEvent has nothing to
	// return but the registry's decode error; the test requires that error to be
	// non-nil and classified as poison rather than masked.
	failing := source.CodecFunc(func([]byte, source.Headers) (any, error) {
		return nil, errors.New("codec exploded")
	})
	reg := source.NewRegistry().SetDefault(failing)

	_, err := cdc.DecodeEvent(reg, changeMessage{value: []byte(`{"op":"c"}`)})
	switch {
	case err == nil:
		t.Fatal("DecodeEvent error = nil, want registry decode error")
	case !errors.Is(err, source.ErrPoison):
		t.Fatalf("DecodeEvent error = %v, want poison-classified decode error", err)
	}
}

func TestRawJSON_As_UnmarshalErrorReported(t *testing.T) {
	t.Parallel()

	// The payload is a create, so it carries an after-image. Projecting that
	// object into an incompatible destination must fail, and the failure must not
	// be reported as ErrMissingImage, because the image is present.
	const payload = `{"op":"c","after":{"id":1,"name":"ada"}}`
	event := decode(t, []byte(payload))

	var target int // the after-image is a JSON object; an int cannot hold it
	switch err := event.After.As(&target); {
	case err == nil:
		t.Fatal("RawJSON.As error = nil, want unmarshal error")
	case errors.Is(err, cdc.ErrMissingImage):
		t.Fatalf("RawJSON.As error = %v, want unmarshal error not ErrMissingImage", err)
	}
}

func TestSourceHeaders(t *testing.T) {
t.Parallel()

Expand Down
46 changes: 46 additions & 0 deletions source/cloudevents/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# source/cloudevents

A [`crucible/source`](../) codec that decodes inbound messages into
[CloudEvents](https://cloudevents.io). Runtime dependency:
[`github.com/cloudevents/sdk-go/v2`](https://github.com/cloudevents/sdk-go).

```go
reg := source.NewRegistry().
SetDefault(cloudevents.New()). // binary mode by content type
Register(cloudevents.StructuredContentType, cloudevents.New()) // structured JSON

ev, order, err := cloudevents.DecodeData[Order](reg, m)
```

Construct a `Codec` with `New` and register it on a `source.Registry` under the
content types you accept (or as the registry default). There is no package-level
format registration: every codec is instance-scoped, so two codecs in one
process never share mutable state.

## Content modes

The CloudEvents spec defines two ways an event rides a transport, and this codec
accepts both, selecting by the message's content type (see `Detect`):

- **Structured** — the whole event (attributes and data) is one JSON document in
the body, carried under `application/cloudevents+json`
(`StructuredContentType`).
- **Binary** — the event's attributes ride as `ce-`-prefixed headers and the data
is the raw body, with the body's own media type in the `datacontenttype`
header (or the message's content type).

A content type whose media type begins with `application/cloudevents` decodes as
structured; anything else decodes as binary.

## Decoded value

`Decode` yields a `cloudevents.Event` (the SDK's canonical event). Recover it
from a handler with `EventOf`, or in one call with `DecodeEvent`. Decode the data
payload into a concrete type with `DataAs`, or use the generic `DecodeData[T]` to
decode and project in one step. Extension attributes are surfaced through the
core `source.Headers` (see `Extensions`, prefixed with `ExtensionHeaderPrefix`)
so a handler reads them the same way it reads any other inbound metadata.

## Stability

Experimental (pre-v1); the API may change until the suite locks v1.0.0.
34 changes: 0 additions & 34 deletions source/cloudevents/cloudevents.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,5 @@
// SPDX-License-Identifier: Apache-2.0

// Package cloudevents is a source codec that decodes inbound messages into
// CloudEvents. It plugs into a source.Registry as an instance-scoped
// [source.Codec]: construct one with [New] and register it under the content
// types you accept; there is no global format registration (the CloudEvents
// SDK's package-level format registry is deliberately not used, so two codecs
// in one process never share mutable state).
//
// # Content modes
//
// The CloudEvents spec defines two ways an event rides a transport, and this
// codec accepts both, selecting between them by the message's content type:
//
// - Structured mode: the entire event — attributes and data — is one JSON
// document in the body, carried under "application/cloudevents+json".
// - Binary mode: the event's attributes ride as "ce-"-prefixed headers and
// the data is the raw body, with the body's own media type in the
// "datacontenttype" header (or the message's content-type).
//
// A content type whose media type begins with "application/cloudevents" (the
// structured prefix) decodes as structured; anything else decodes as binary.
// See [Detect].
//
// # Decoded value
//
// Decode yields a [cloudevents.Event] (the SDK's canonical event). Recover it
// from a handler with [EventOf], and decode its data payload into a concrete
// type with [DataAs] or the generic [DecodeData] helper. Extension attributes
// are surfaced through the core [source.Headers] (see [Extensions]) rather than
// a magic-string map, so a handler reads them the same way it reads any other
// inbound metadata.
//
// # Stability
//
// Experimental (pre-v1); the API may change until the suite locks v1.0.0.
package cloudevents

import (
Expand Down
37 changes: 37 additions & 0 deletions source/cloudevents/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// SPDX-License-Identifier: Apache-2.0

// Package cloudevents is a source codec that decodes inbound messages into
// CloudEvents. It plugs into a source.Registry as an instance-scoped
// [source.Codec]: construct one with [New] and register it under the content
// types you accept; there is no global format registration (the CloudEvents
// SDK's package-level format registry is deliberately not used, so two codecs
// in one process never share mutable state).
//
// # Content modes
//
// The CloudEvents spec defines two ways an event rides a transport, and this
// codec accepts both, selecting between them by the message's content type:
//
// - Structured mode: the entire event — attributes and data — is one JSON
// document in the body, carried under "application/cloudevents+json".
// - Binary mode: the event's attributes ride as "ce-"-prefixed headers and
// the data is the raw body, with the body's own media type in the
// "datacontenttype" header (or the message's content-type).
//
// A content type whose media type begins with "application/cloudevents" (the
// structured prefix) decodes as structured; anything else decodes as binary.
// See [Detect].
//
// # Decoded value
//
// Decode yields a [cloudevents.Event] (the SDK's canonical event). Recover it
// from a handler with [EventOf], and decode its data payload into a concrete
// type with [DataAs] or the generic [DecodeData] helper. Extension attributes
// are surfaced through the core [source.Headers] (see [Extensions]) rather than
// a magic-string map, so a handler reads them the same way it reads any other
// inbound metadata.
//
// # Stability
//
// Experimental (pre-v1); the API may change until the suite locks v1.0.0.
package cloudevents
38 changes: 30 additions & 8 deletions source/hopper.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ import (
// A lane is a single goroutine that processes its queue strictly in arrival
// order, so two messages with the same key are never reordered, while distinct
// keys run in parallel up to [WithConcurrency]. This is the guarantee a
// statechart instance needs: its events arrive in order. The number of distinct
// lanes (and their goroutines) is bounded by [WithMaxLanes]; keys beyond the
// bound share a lane by hash, which serializes only the unrelated colliding keys
// and never reorders a single key's messages.
//
// Backpressure. [WithMaxInFlight] bounds the messages delivered but not yet
// settled; when the window is full the fetch loop blocks before pulling the next
Expand All @@ -44,6 +47,7 @@ type Hopper struct {
middleware []Middleware
concurrency int
maxInFlight int
maxLanes int
batch batchConfig

received telemetry.Counter
Expand Down Expand Up @@ -76,6 +80,7 @@ func New(opts ...Option) *Hopper {
middleware: append([]Middleware(nil), cfg.middleware...),
concurrency: cfg.concurrency,
maxInFlight: cfg.maxInFlight,
maxLanes: cfg.maxLanes,
batch: cfg.batch,
received: m.Counter("source.received", telemetry.WithDescription("messages received from the subscription")),
acked: m.Counter("source.acked", telemetry.WithDescription("messages acknowledged after successful handling")),
Expand Down Expand Up @@ -349,16 +354,25 @@ func (hp *Hopper) reportLag(ctx context.Context, sub Subscription) {
}

// laneKey selects the ordered lane for m: its PartitionKey when non-empty, else
// a hash of its Key, else lane 0 (every keyless message shares one ordered
// lane, preserving global order for them).
// a hash of its Key, else lane 0 (every keyless message shares one ordered lane,
// preserving global order for them). The hash is folded onto the bounded lane
// set (see [WithMaxLanes]) so the number of lanes never exceeds maxLanes; a key
// always maps to the same lane, so folding never reorders a key's messages.
func (hp *Hopper) laneKey(m Message) uint64 {
if pk := m.PartitionKey(); pk != "" {
return hashBytes([]byte(pk))
var h uint64
switch {
case m.PartitionKey() != "":
// Hash the string directly, avoiding a []byte conversion per message.
h = hashString(m.PartitionKey())
case len(m.Key()) > 0:
h = hashBytes(m.Key())
default:
return 0
}
if k := m.Key(); len(k) > 0 {
return hashBytes(k)
if hp.maxLanes > 0 {
return h % uint64(hp.maxLanes)
}
return 0
return h
}

func hashBytes(b []byte) uint64 {
Expand All @@ -367,6 +381,14 @@ func hashBytes(b []byte) uint64 {
return h.Sum64()
}

// hashString returns the 64-bit FNV-1a hash of s: the same value that writing
// the bytes of s to hash/fnv's New64a and calling Sum64 produces.
//
// The hash is computed inline over the string's bytes. Going through
// io.WriteString would not avoid a copy here: the hash/fnv hashes do not
// implement io.StringWriter, so io.WriteString falls back to Write([]byte(s)).
// The loop below reads s in place and allocates nothing.
func hashString(s string) uint64 {
	const (
		offset64 uint64 = 14695981039346656037 // FNV 64-bit offset basis
		prime64  uint64 = 1099511628211        // FNV 64-bit prime
	)
	h := offset64
	for i := 0; i < len(s); i++ {
		// FNV-1a: xor the byte in first, then multiply by the prime.
		h ^= uint64(s[i])
		h *= prime64
	}
	return h
}

// Close begins a graceful drain: the running Hopper stops fetching, finishes and
// settles in-flight messages, and Run returns. Close is idempotent and never
// blocks on the drain; it signals, and Run completes the drain.
Expand Down
Loading
Loading