Skip to content

Commit bf81e16

Browse files
committed
kafka perf tweaks
1 parent 516d06f commit bf81e16

File tree

2 files changed

+11
-3
lines changed

2 files changed

+11
-3
lines changed

cmd/tap/outbox.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"context"
55
"log/slog"
6+
"strconv"
67
"sync"
78
"sync/atomic"
89
"time"
@@ -231,7 +232,14 @@ func (o *Outbox) kafkaProduceAsync(evt *OutboxEvt) {
231232
o.AckEvent(evt.ID)
232233
}
233234

234-
if err := o.kafkaProducer.ProduceAsync(context.Background(), evt.Did, evt.Event, cb); err != nil {
235+
var key string
236+
// TODO: ordering bool
237+
if true {
238+
key = strconv.FormatUint(uint64(evt.ID), 10)
239+
} else {
240+
key = evt.Did
241+
}
242+
if err := o.kafkaProducer.ProduceAsync(context.Background(), key, evt.Event, cb); err != nil {
235243
logger.Error("error queueing event for production", "error", err)
236244
kafkaEventsProduced.WithLabelValues("error_queueing").Inc()
237245
}

cmd/tap/tap.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,8 @@ func NewTap(config TapConfig) (*Tap, error) {
132132
logger.With("component", "kafkaProducer"),
133133
config.KafkaBootstrapServers,
134134
config.KafkaOutputTopic,
135-
atkafka.WithEnsureTopic(true), // ensures that the topic has been created
136-
atkafka.WithTopicPartitions(24),
135+
atkafka.WithEnsureTopic(true), // ensures that the topic has been created
136+
atkafka.WithTopicPartitions(100), // TODO: partition count env var
137137
)
138138
if err != nil {
139139
return nil, fmt.Errorf("failed to create kafka producer: %w", err)

0 commit comments

Comments
 (0)