diff --git a/README.md b/README.md index a47076fb..d2014f77 100644 --- a/README.md +++ b/README.md @@ -66,10 +66,50 @@ cargo run -- -c ./config/config-sentry-dev.yaml ## Configuration Taskbroker uses YAML files for configuration, and all of the available -configuration options can be found in [Config](https://github.com/getsentry/taskbroker/blob/main/src/config.rs#L15) +configuration options can be found in [Config](https://github.com/getsentry/taskbroker/blob/main/src/config.rs). All configuration options can also be defined as environment variables using -the `TASKBROKER_` prefix. +the `TASKBROKER_` prefix. Nested values use a `__` separator, e.g. +`TASKBROKER_KAFKA_CLUSTERS__DEFAULT__ADDRESS=127.0.0.1:9092`. + +### Kafka topics and clusters + +Kafka is configured with two maps: `kafka_clusters` defines the brokers (and +optional auth) for each cluster, and `kafka_topics` declares each topic and +which cluster it lives on. Exactly one topic must be consumable; the rest +(retry, dead-letter) are `produce_only`. The dead-letter and retry topics must +share a cluster with the upkeep producer. + +```yaml +kafka_deadletter_topic: taskworker-dlq + +kafka_clusters: + default: + address: 127.0.0.1:9092 + # optional auth, applied to every topic on this cluster: + # security_protocol: sasl_ssl + # sasl_mechanism: scram-sha-256 + # sasl_username: ... + # sasl_password: ... + +kafka_topics: + taskworker: + cluster: default + consumer_group: taskworker + taskworker-retry: + cluster: default + consumer_group: taskworker + produce_only: true + taskworker-dlq: + cluster: default + consumer_group: taskworker + produce_only: true +``` + +The older flat `kafka_cluster` / `kafka_topic` / `kafka_consumer_group` fields +(and the `kafka_deadletter_*` auth fields) are deprecated but still supported. +See [docs/kafka-config-migration.md](docs/kafka-config-migration.md) for how to +move from the legacy fields to the new format. ## Tests diff --git a/devservices/config.yml b/devservices/config.yml index adcda65a..d2591d4a 100644 --- a/devservices/config.yml +++ b/devservices/config.yml @@ -36,7 +36,16 @@ services: taskbroker: image: ghcr.io/getsentry/taskbroker:nightly environment: - TASKBROKER_KAFKA_CLUSTER: "kafka-kafka-1:9093" + # Multi-topic/cluster config expressed via env vars. Nested keys use the + # "__" separator; figment lowercases each key segment, so topic/cluster + # names can only contain underscores (not hyphens) when set this way. + TASKBROKER_KAFKA_CLUSTERS__DEFAULT__ADDRESS: "kafka-kafka-1:9093" + TASKBROKER_KAFKA_TOPICS__TASKWORKER__CLUSTER: "default" + TASKBROKER_KAFKA_TOPICS__TASKWORKER__CONSUMER_GROUP: "taskworker" + TASKBROKER_KAFKA_DEADLETTER_TOPIC: "taskworker_dlq" + TASKBROKER_KAFKA_TOPICS__TASKWORKER_DLQ__CLUSTER: "default" + TASKBROKER_KAFKA_TOPICS__TASKWORKER_DLQ__CONSUMER_GROUP: "taskworker" + TASKBROKER_KAFKA_TOPICS__TASKWORKER_DLQ__PRODUCE_ONLY: "true" TASKBROKER_CREATE_MISSING_TOPICS: "true" ports: - 127.0.0.1:50051:50051 diff --git a/docs/kafka-config-migration.md b/docs/kafka-config-migration.md new file mode 100644 index 00000000..2ba7e33d --- /dev/null +++ b/docs/kafka-config-migration.md @@ -0,0 +1,212 @@ +# Migrating Kafka config to `kafka_clusters` / `kafka_topics` + +Taskbroker historically configured Kafka with a set of flat fields describing a +single consumed cluster plus a dead-letter cluster. As of #663 these are +deprecated in favor of two maps: + +- `kafka_clusters` — named clusters, each with an address and optional auth. +- `kafka_topics` — named topics, each pointing at a cluster. + +The legacy fields still work (they are normalized into the maps at startup, and +emit a deprecation `warn!`), but the two formats are **mutually exclusive** — +you cannot mix a deprecated field with `kafka_clusters`/`kafka_topics` in the +same config. + +## Field mapping + +| Legacy field | New location | +| --- | --- | +| `kafka_cluster` | `kafka_clusters..address` | +| `kafka_topic` | a key under `kafka_topics` (the one consumable topic) | +| `kafka_consumer_group` | `kafka_topics..consumer_group` | +| `kafka_security_protocol` | `kafka_clusters..security_protocol` | +| `kafka_sasl_mechanism` | `kafka_clusters..sasl_mechanism` | +| `kafka_sasl_username` | `kafka_clusters..sasl_username` | +| `kafka_sasl_password` | `kafka_clusters..sasl_password` | +| `kafka_ssl_ca_location` | `kafka_clusters..ssl_ca_location` | +| `kafka_ssl_certificate_location` | `kafka_clusters..ssl_certificate_location` | +| `kafka_ssl_key_location` | `kafka_clusters..ssl_key_location` | +| `kafka_deadletter_cluster` | a separate cluster's `address` | +| `kafka_deadletter_security_protocol` (and other `kafka_deadletter_*` auth) | auth on the dead-letter cluster | +| `kafka_deadletter_topic` | **not deprecated** — still a top-level field; must name a `produce_only` topic declared in `kafka_topics` | +| `kafka_retry_topic` | **not deprecated** — still a top-level field; must name a `produce_only` topic declared in `kafka_topics` | + +Raw-mode fields move under the consumed topic's `raw:` block: + +| Legacy field | New location | +| --- | --- | +| `raw_mode` | presence of a `raw:` block on the topic | +| `raw_namespace` | `kafka_topics..raw.namespace` | +| `raw_application` | `kafka_topics..raw.application` | +| `raw_taskname` | `kafka_topics..raw.taskname` | +| `raw_processing_deadline_duration` | `kafka_topics..raw.processing_deadline_duration` | + +The top-level `raw_*` fields are deprecated; the legacy global `raw_mode` is +migrated onto the consumed topic's `raw` block during normalization. + +### Removed + +- `kafka_consume_retry_topic` was removed. Retry topics are declared like any + other topic in `kafka_topics`; to consume one, declare it as a normal + (non-`produce_only`) consumed topic rather than toggling a flag. + +## Rules enforced at startup + +- At least one topic must be consumable (i.e. not `produce_only`). +- `kafka_deadletter_topic` must be declared in `kafka_topics`. +- `kafka_retry_topic` is **mandatory when a consumed topic uses raw mode**: raw + messages aren't activations, so retries (which are activation-encoded) cannot + loop back into the raw topic and must go to a separate activation-encoded + topic. For a single non-raw consumed topic, retries fall back to that topic + when `kafka_retry_topic` is unset. +- The retry target and the dead-letter topic must resolve to the **same cluster + address** — they share a single upkeep producer. +- Every topic's `cluster` must reference a cluster defined in `kafka_clusters`. + +## Examples + +### Before (legacy, single cluster) + +```yaml +kafka_cluster: 127.0.0.1:9092 +kafka_topic: taskworker +kafka_consumer_group: taskworker +kafka_deadletter_topic: taskworker-dlq +kafka_retry_topic: taskworker-retry +``` + +### After + +```yaml +kafka_deadletter_topic: taskworker-dlq +kafka_retry_topic: taskworker-retry + +kafka_clusters: + default: + address: 127.0.0.1:9092 + +kafka_topics: + taskworker: + cluster: default + consumer_group: taskworker + taskworker-retry: + cluster: default + consumer_group: taskworker + produce_only: true + taskworker-dlq: + cluster: default + consumer_group: taskworker + produce_only: true +``` + +### Before (separate dead-letter cluster, with auth) + +```yaml +kafka_cluster: main-brokers:9092 +kafka_topic: profiles +kafka_consumer_group: taskbroker-profiles +kafka_security_protocol: sasl_ssl +kafka_sasl_mechanism: scram-sha-256 +kafka_sasl_username: main-user +kafka_sasl_password: main-pass + +kafka_deadletter_cluster: dlq-brokers:9092 +kafka_deadletter_topic: profiles-dlq +kafka_retry_topic: profiles-retry +``` + +### After + +The retry and dead-letter topics share the upkeep producer, so they must sit on +the same cluster (`dlq-brokers` here): + +```yaml +kafka_deadletter_topic: profiles-dlq +kafka_retry_topic: profiles-retry + +kafka_clusters: + main: + address: main-brokers:9092 + security_protocol: sasl_ssl + sasl_mechanism: scram-sha-256 + sasl_username: main-user + sasl_password: main-pass + deadletter: + address: dlq-brokers:9092 + +kafka_topics: + profiles: + cluster: main + consumer_group: taskbroker-profiles + profiles-retry: + cluster: deadletter + consumer_group: taskbroker-profiles + produce_only: true + profiles-dlq: + cluster: deadletter + consumer_group: taskbroker-profiles + produce_only: true +``` + +### Raw mode (per-topic) + +Before — raw mode configured via the global `raw_*` fields: + +```yaml +kafka_cluster: 127.0.0.1:9092 +kafka_topic: profiles +kafka_consumer_group: ingest-profiles +raw_mode: true +raw_namespace: ingest.profiling.passthrough +raw_application: sentry +raw_taskname: sentry.profiles.task.process_profile_from_kafka +``` + +After — raw mode moves onto the consumed topic's `raw` block, and a retry topic +becomes mandatory (raw messages aren't activations, so retries can't loop back +into the raw topic). Here retries go to the `taskworker` topic, which another +taskbroker consumes: + +```yaml +kafka_deadletter_topic: taskworker-dlq +kafka_retry_topic: taskworker + +kafka_clusters: + default: + address: 127.0.0.1:9092 + +kafka_topics: + profiles: + cluster: default + consumer_group: ingest-profiles + raw: + namespace: ingest.profiling.passthrough + application: sentry + taskname: sentry.profiles.task.process_profile_from_kafka + processing_deadline_duration: 30 + taskworker: + cluster: default + consumer_group: ingest-profiles + produce_only: true + taskworker-dlq: + cluster: default + consumer_group: ingest-profiles + produce_only: true +``` + +## Environment variables + +Both formats are also settable via `TASKBROKER_`-prefixed env vars, with `__` as +the nesting separator: + +``` +TASKBROKER_KAFKA_CLUSTERS__DEFAULT__ADDRESS=127.0.0.1:9092 +TASKBROKER_KAFKA_TOPICS__TASKWORKER__CLUSTER=default +TASKBROKER_KAFKA_TOPICS__TASKWORKER__CONSUMER_GROUP=taskworker +``` + +Note: figment lowercases each key segment after splitting on `__`, so topic and +cluster **names set via env vars can only contain underscores, not hyphens** +(e.g. `taskworker_dlq`, not `taskworker-dlq`). The `kafka_deadletter_topic` +value must then match that lowercased key. For names with hyphens, use a YAML +config file instead. diff --git a/k8s/k8s_stateful_set.yaml b/k8s/k8s_stateful_set.yaml index 83d7207d..97aa53a2 100644 --- a/k8s/k8s_stateful_set.yaml +++ b/k8s/k8s_stateful_set.yaml @@ -37,8 +37,20 @@ spec: - name: taskbroker image: us-central1-docker.pkg.dev/sentryio/taskbroker/image:8ff9bdee7be16ecf73bc9a20a7db6f2d9dda2e95 env: - - name: TASKBROKER_KAFKA_CLUSTER + - name: TASKBROKER_KAFKA_CLUSTERS__DEFAULT__ADDRESS value: "kafka-001:9092" + - name: TASKBROKER_KAFKA_TOPICS__TASKWORKER__CLUSTER + value: "default" + - name: TASKBROKER_KAFKA_TOPICS__TASKWORKER__CONSUMER_GROUP + value: "taskworker" + - name: TASKBROKER_KAFKA_DEADLETTER_TOPIC + value: "taskworker_dlq" + - name: TASKBROKER_KAFKA_TOPICS__TASKWORKER_DLQ__CLUSTER + value: "default" + - name: TASKBROKER_KAFKA_TOPICS__TASKWORKER_DLQ__CONSUMER_GROUP + value: "taskworker" + - name: TASKBROKER_KAFKA_TOPICS__TASKWORKER_DLQ__PRODUCE_ONLY + value: "true" ports: - containerPort: 50051 name: grpc