44 changes: 42 additions & 2 deletions README.md
@@ -66,10 +66,50 @@ cargo run -- -c ./config/config-sentry-dev.yaml
## Configuration

Taskbroker uses YAML files for configuration, and all of the available
-configuration options can be found in [Config](https://github.com/getsentry/taskbroker/blob/main/src/config.rs#L15)
+configuration options can be found in [Config](https://github.com/getsentry/taskbroker/blob/main/src/config.rs).

All configuration options can also be defined as environment variables using
-the `TASKBROKER_` prefix.
+the `TASKBROKER_` prefix. Nested values use a `__` separator, e.g.
+`TASKBROKER_KAFKA_CLUSTERS__DEFAULT__ADDRESS=127.0.0.1:9092`.

### Kafka topics and clusters

Kafka is configured with two maps: `kafka_clusters` defines the brokers (and
optional auth) for each cluster, and `kafka_topics` declares each topic and
which cluster it lives on. Exactly one topic must be consumable; the rest
(retry, dead-letter) are `produce_only`. The dead-letter and retry topics must
be on the same cluster, because they share a single upkeep producer.

```yaml
kafka_deadletter_topic: taskworker-dlq

kafka_clusters:
  default:
    address: 127.0.0.1:9092
    # optional auth, applied to every topic on this cluster:
    # security_protocol: sasl_ssl
    # sasl_mechanism: scram-sha-256
    # sasl_username: ...
    # sasl_password: ...

kafka_topics:
  taskworker:
    cluster: default
    consumer_group: taskworker
  taskworker-retry:
    cluster: default
    consumer_group: taskworker
    produce_only: true
  taskworker-dlq:
    cluster: default
    consumer_group: taskworker
    produce_only: true
```

The older flat `kafka_cluster` / `kafka_topic` / `kafka_consumer_group` fields
(and the `kafka_deadletter_*` auth fields) are deprecated but still supported.
See [docs/kafka-config-migration.md](docs/kafka-config-migration.md) for how to
move from the legacy fields to the new format.

## Tests

11 changes: 10 additions & 1 deletion devservices/config.yml
@@ -36,7 +36,16 @@ services:
   taskbroker:
     image: ghcr.io/getsentry/taskbroker:nightly
     environment:
-      TASKBROKER_KAFKA_CLUSTER: "kafka-kafka-1:9093"
+      # Multi-topic/cluster config expressed via env vars. Nested keys use the
+      # "__" separator; figment lowercases each key segment, so topic/cluster
+      # names can only contain underscores (not hyphens) when set this way.
+      TASKBROKER_KAFKA_CLUSTERS__DEFAULT__ADDRESS: "kafka-kafka-1:9093"
+      TASKBROKER_KAFKA_TOPICS__TASKWORKER__CLUSTER: "default"
+      TASKBROKER_KAFKA_TOPICS__TASKWORKER__CONSUMER_GROUP: "taskworker"
+      TASKBROKER_KAFKA_DEADLETTER_TOPIC: "taskworker_dlq"
+      TASKBROKER_KAFKA_TOPICS__TASKWORKER_DLQ__CLUSTER: "default"
+      TASKBROKER_KAFKA_TOPICS__TASKWORKER_DLQ__CONSUMER_GROUP: "taskworker"
+      TASKBROKER_KAFKA_TOPICS__TASKWORKER_DLQ__PRODUCE_ONLY: "true"
       TASKBROKER_CREATE_MISSING_TOPICS: "true"
     ports:
       - 127.0.0.1:50051:50051
212 changes: 212 additions & 0 deletions docs/kafka-config-migration.md
@@ -0,0 +1,212 @@
# Migrating Kafka config to `kafka_clusters` / `kafka_topics`

Taskbroker historically configured Kafka with a set of flat fields describing a
single consumed cluster plus a dead-letter cluster. As of #663 these are
deprecated in favor of two maps:

- `kafka_clusters` — named clusters, each with an address and optional auth.
- `kafka_topics` — named topics, each pointing at a cluster.

The legacy fields still work (they are normalized into the maps at startup, and
emit a deprecation `warn!`), but the two formats are **mutually exclusive** —
you cannot mix a deprecated field with `kafka_clusters`/`kafka_topics` in the
same config.
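
The mutual-exclusion rule can be pictured with a small check. This is a minimal sketch in Python, not taskbroker's actual (Rust) validation: the function names are hypothetical, and the set of deprecated keys is taken from the mapping tables in this document, treating every `kafka_deadletter_*` key except `kafka_deadletter_topic` and every top-level `raw_*` key as deprecated.

```python
# Hypothetical helper, for illustration only; not taskbroker's real code.
# Keys listed as deprecated in the "Field mapping" tables of this document.
DEPRECATED_KEYS = {
    "kafka_cluster",
    "kafka_topic",
    "kafka_consumer_group",
    "kafka_security_protocol",
    "kafka_sasl_mechanism",
    "kafka_sasl_username",
    "kafka_sasl_password",
    "kafka_ssl_ca_location",
    "kafka_ssl_certificate_location",
    "kafka_ssl_key_location",
}


def is_deprecated(key):
    """Return True if a top-level config key belongs to the legacy format."""
    if key in DEPRECATED_KEYS:
        return True
    # kafka_deadletter_topic is explicitly NOT deprecated.
    if key.startswith("kafka_deadletter_") and key != "kafka_deadletter_topic":
        return True
    # Assumption: all top-level raw_* fields count as legacy.
    return key.startswith("raw_")


def conflicting_keys(config):
    """List legacy keys that appear alongside kafka_clusters/kafka_topics."""
    uses_maps = "kafka_clusters" in config or "kafka_topics" in config
    if not uses_maps:
        return []  # a purely legacy config is still accepted
    return sorted(key for key in config if is_deprecated(key))
```

An empty result means the config uses one format consistently; any returned key would have to be moved into the maps before the config could load.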

## Field mapping

| Legacy field | New location |
| --- | --- |
| `kafka_cluster` | `kafka_clusters.<name>.address` |
| `kafka_topic` | a key under `kafka_topics` (the one consumable topic) |
| `kafka_consumer_group` | `kafka_topics.<topic>.consumer_group` |
| `kafka_security_protocol` | `kafka_clusters.<name>.security_protocol` |
| `kafka_sasl_mechanism` | `kafka_clusters.<name>.sasl_mechanism` |
| `kafka_sasl_username` | `kafka_clusters.<name>.sasl_username` |
| `kafka_sasl_password` | `kafka_clusters.<name>.sasl_password` |
| `kafka_ssl_ca_location` | `kafka_clusters.<name>.ssl_ca_location` |
| `kafka_ssl_certificate_location` | `kafka_clusters.<name>.ssl_certificate_location` |
| `kafka_ssl_key_location` | `kafka_clusters.<name>.ssl_key_location` |
| `kafka_deadletter_cluster` | a separate cluster's `address` |
| `kafka_deadletter_security_protocol` (and other `kafka_deadletter_*` auth) | auth on the dead-letter cluster |
| `kafka_deadletter_topic` | **not deprecated** — still a top-level field; must name a `produce_only` topic declared in `kafka_topics` |
| `kafka_retry_topic` | **not deprecated** — still a top-level field; must name a `produce_only` topic declared in `kafka_topics` |

Raw-mode fields move under the consumed topic's `raw:` block:

| Legacy field | New location |
| --- | --- |
| `raw_mode` | presence of a `raw:` block on the topic |
| `raw_namespace` | `kafka_topics.<topic>.raw.namespace` |
| `raw_application` | `kafka_topics.<topic>.raw.application` |
| `raw_taskname` | `kafka_topics.<topic>.raw.taskname` |
| `raw_processing_deadline_duration` | `kafka_topics.<topic>.raw.processing_deadline_duration` |

The top-level `raw_*` fields are deprecated; the legacy global `raw_mode` is
migrated onto the consumed topic's `raw` block during normalization.

### Removed

- `kafka_consume_retry_topic` was removed. Retry topics are declared like any
other topic in `kafka_topics`; to consume one, declare it as a normal
(non-`produce_only`) consumed topic rather than toggling a flag.

## Rules enforced at startup

- At least one topic must be consumable (i.e. not `produce_only`).
- `kafka_deadletter_topic` must be declared in `kafka_topics`.
- `kafka_retry_topic` is **mandatory when a consumed topic uses raw mode**: raw
messages aren't activations, so retries (which are activation-encoded) cannot
loop back into the raw topic and must go to a separate activation-encoded
topic. For a single non-raw consumed topic, retries fall back to that topic
when `kafka_retry_topic` is unset.
- The retry target and the dead-letter topic must resolve to the **same cluster
address** — they share a single upkeep producer.
- Every topic's `cluster` must reference a cluster defined in `kafka_clusters`.
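
The rules above can be sketched as a standalone check. This is an illustrative Python sketch under stated assumptions, not the validation taskbroker performs in Rust: `validate` is a hypothetical name, the config is assumed to be already parsed into plain dictionaries, and the case of several non-raw consumed topics with no `kafka_retry_topic` is left unchecked because the rules do not describe it.

```python
def validate(config):
    """Return a list of rule violations for a new-format config dict.

    Hypothetical helper for illustration; taskbroker's real checks live in
    its Rust config code and may differ in detail.
    """
    errors = []
    clusters = config.get("kafka_clusters", {})
    topics = config.get("kafka_topics", {})

    # Every topic's cluster must reference a defined cluster.
    for name, topic in topics.items():
        if topic.get("cluster") not in clusters:
            errors.append(f"topic {name!r} references an unknown cluster")

    # At least one topic must be consumable (not produce_only).
    consumed = [n for n, t in topics.items() if not t.get("produce_only", False)]
    if not consumed:
        errors.append("at least one topic must be consumable")

    # The dead-letter topic must be declared in kafka_topics.
    dlq = config.get("kafka_deadletter_topic")
    if dlq not in topics:
        errors.append("kafka_deadletter_topic must be declared in kafka_topics")

    # Retry target: explicit, mandatory with raw mode, else the single consumed topic.
    retry = config.get("kafka_retry_topic")
    if retry is None:
        if any("raw" in topics[n] for n in consumed):
            errors.append("kafka_retry_topic is required when a consumed topic uses raw mode")
        elif len(consumed) == 1:
            retry = consumed[0]
    elif retry not in topics:
        errors.append("kafka_retry_topic must be declared in kafka_topics")

    def address(topic_name):
        """Resolve a topic name to its cluster address, or None if unresolved."""
        cluster = clusters.get(topics.get(topic_name, {}).get("cluster"))
        return cluster.get("address") if cluster else None

    # Retry target and dead-letter topic share one upkeep producer.
    retry_addr, dlq_addr = address(retry), address(dlq)
    if retry_addr is not None and dlq_addr is not None and retry_addr != dlq_addr:
        errors.append("retry and dead-letter topics must resolve to the same cluster address")

    return errors
```

An empty list means the config passes every rule in this sketch; each string otherwise names one violated rule.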

## Examples

### Before (legacy, single cluster)

```yaml
kafka_cluster: 127.0.0.1:9092
kafka_topic: taskworker
kafka_consumer_group: taskworker
kafka_deadletter_topic: taskworker-dlq
kafka_retry_topic: taskworker-retry
```

### After

```yaml
kafka_deadletter_topic: taskworker-dlq
kafka_retry_topic: taskworker-retry

kafka_clusters:
  default:
    address: 127.0.0.1:9092

kafka_topics:
  taskworker:
    cluster: default
    consumer_group: taskworker
  taskworker-retry:
    cluster: default
    consumer_group: taskworker
    produce_only: true
  taskworker-dlq:
    cluster: default
    consumer_group: taskworker
    produce_only: true
```
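
The single-cluster conversion shown above is mechanical, so it can be sketched as a small function. This is a hedged Python illustration, not taskbroker's own startup normalization: `normalize_legacy` is a hypothetical name, the cluster name `default` is an assumption borrowed from the example, and auth fields, a separate dead-letter cluster, and raw mode are deliberately not handled.

```python
def normalize_legacy(legacy, cluster_name="default"):
    """Convert a flat single-cluster legacy config into the map-based format.

    Hypothetical helper for illustration. The cluster name is an assumption;
    the name taskbroker assigns during its own normalization is not stated
    in this document.
    """
    group = legacy["kafka_consumer_group"]
    topics = {
        # The legacy kafka_topic becomes the one consumable topic.
        legacy["kafka_topic"]: {"cluster": cluster_name, "consumer_group": group},
    }
    new = {
        "kafka_clusters": {cluster_name: {"address": legacy["kafka_cluster"]}},
        "kafka_topics": topics,
    }
    # Retry and dead-letter topics stay top-level fields, but each must also
    # be declared in kafka_topics as a produce_only topic.
    for key in ("kafka_retry_topic", "kafka_deadletter_topic"):
        name = legacy.get(key)
        if name is None:
            continue
        new[key] = name
        # setdefault keeps an existing entry, e.g. if the name is already declared.
        topics.setdefault(
            name,
            {"cluster": cluster_name, "consumer_group": group, "produce_only": True},
        )
    return new
```

Feeding it the legacy example above yields the same structure as the "After" YAML, which makes it usable as a quick cross-check when migrating by hand.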

### Before (separate dead-letter cluster, with auth)

```yaml
kafka_cluster: main-brokers:9092
kafka_topic: profiles
kafka_consumer_group: taskbroker-profiles
kafka_security_protocol: sasl_ssl
kafka_sasl_mechanism: scram-sha-256
kafka_sasl_username: main-user
kafka_sasl_password: main-pass

kafka_deadletter_cluster: dlq-brokers:9092
kafka_deadletter_topic: profiles-dlq
kafka_retry_topic: profiles-retry
```

### After

The retry and dead-letter topics share the upkeep producer, so they must sit on
the same cluster (`dlq-brokers` here):

```yaml
kafka_deadletter_topic: profiles-dlq
kafka_retry_topic: profiles-retry

kafka_clusters:
  main:
    address: main-brokers:9092
    security_protocol: sasl_ssl
    sasl_mechanism: scram-sha-256
    sasl_username: main-user
    sasl_password: main-pass
  deadletter:
    address: dlq-brokers:9092

kafka_topics:
  profiles:
    cluster: main
    consumer_group: taskbroker-profiles
  profiles-retry:
    cluster: deadletter
    consumer_group: taskbroker-profiles
    produce_only: true
  profiles-dlq:
    cluster: deadletter
    consumer_group: taskbroker-profiles
    produce_only: true
```

### Raw mode (per-topic)

Before — raw mode configured via the global `raw_*` fields:

```yaml
kafka_cluster: 127.0.0.1:9092
kafka_topic: profiles
kafka_consumer_group: ingest-profiles
raw_mode: true
raw_namespace: ingest.profiling.passthrough
raw_application: sentry
raw_taskname: sentry.profiles.task.process_profile_from_kafka
```

After — raw mode moves onto the consumed topic's `raw` block, and a retry topic
becomes mandatory (raw messages aren't activations, so retries can't loop back
into the raw topic). Here retries go to the `taskworker` topic, which another
taskbroker consumes:

```yaml
kafka_deadletter_topic: taskworker-dlq
kafka_retry_topic: taskworker

kafka_clusters:
  default:
    address: 127.0.0.1:9092

kafka_topics:
  profiles:
    cluster: default
    consumer_group: ingest-profiles
    raw:
      namespace: ingest.profiling.passthrough
      application: sentry
      taskname: sentry.profiles.task.process_profile_from_kafka
      processing_deadline_duration: 30
  taskworker:
    cluster: default
    consumer_group: ingest-profiles
    produce_only: true
  taskworker-dlq:
    cluster: default
    consumer_group: ingest-profiles
    produce_only: true
```

## Environment variables

Both formats are also settable via `TASKBROKER_`-prefixed env vars, with `__` as
the nesting separator:

```
TASKBROKER_KAFKA_CLUSTERS__DEFAULT__ADDRESS=127.0.0.1:9092
TASKBROKER_KAFKA_TOPICS__TASKWORKER__CLUSTER=default
TASKBROKER_KAFKA_TOPICS__TASKWORKER__CONSUMER_GROUP=taskworker
```

Note: figment lowercases each key segment after splitting on `__`, so topic and
cluster **names set via env vars can only contain underscores, not hyphens**
(e.g. `taskworker_dlq`, not `taskworker-dlq`). The `kafka_deadletter_topic`
value must then match that lowercased key. For names with hyphens, use a YAML
config file instead.
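
To make the key mapping concrete, here is a minimal Python sketch of the behavior described above: strip the prefix, split on `__`, lowercase each segment, and nest. It is an approximation for illustration only, not figment's implementation; `env_to_nested` is a hypothetical name, and values are left as strings rather than parsed into booleans or numbers.

```python
def env_to_nested(environ, prefix="TASKBROKER_", sep="__"):
    """Build a nested dict from prefixed environment variables.

    Hypothetical helper that mimics the documented mapping; it does not
    reproduce figment's value parsing or error handling.
    """
    result = {}
    for name, value in environ.items():
        if not name.startswith(prefix):
            continue  # unrelated variables are ignored
        # Each "__"-separated segment becomes one lowercased nesting level.
        segments = [segment.lower() for segment in name[len(prefix):].split(sep)]
        node = result
        for segment in segments[:-1]:
            node = node.setdefault(segment, {})
        node[segments[-1]] = value
    return result
```

In this sketch `TASKBROKER_KAFKA_TOPICS__TASKWORKER_DLQ__CLUSTER` produces the topic key `taskworker_dlq`, which is why `kafka_deadletter_topic` has to be set to `taskworker_dlq` rather than `taskworker-dlq` when everything is configured through env vars.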
14 changes: 13 additions & 1 deletion k8s/k8s_stateful_set.yaml
@@ -37,8 +37,20 @@ spec:
 - name: taskbroker
   image: us-central1-docker.pkg.dev/sentryio/taskbroker/image:8ff9bdee7be16ecf73bc9a20a7db6f2d9dda2e95
   env:
-    - name: TASKBROKER_KAFKA_CLUSTER
+    - name: TASKBROKER_KAFKA_CLUSTERS__DEFAULT__ADDRESS
       value: "kafka-001:9092"
+    - name: TASKBROKER_KAFKA_TOPICS__TASKWORKER__CLUSTER
+      value: "default"
+    - name: TASKBROKER_KAFKA_TOPICS__TASKWORKER__CONSUMER_GROUP
+      value: "taskworker"
+    - name: TASKBROKER_KAFKA_DEADLETTER_TOPIC
+      value: "taskworker_dlq"
+    - name: TASKBROKER_KAFKA_TOPICS__TASKWORKER_DLQ__CLUSTER
+      value: "default"
+    - name: TASKBROKER_KAFKA_TOPICS__TASKWORKER_DLQ__CONSUMER_GROUP
+      value: "taskworker"
+    - name: TASKBROKER_KAFKA_TOPICS__TASKWORKER_DLQ__PRODUCE_ONLY
+      value: "true"
   ports:
     - containerPort: 50051
       name: grpc