From 491b1d0a66aa98fe9f8cdba8c82945fa0837d28a Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 2 Jun 2026 17:13:55 +0200 Subject: [PATCH 1/2] fix(config): legacy retry topic uses deadletter cluster In legacy-config normalization a distinct kafka_retry_topic was registered on DEFAULT_CLUSTER (the main consumer cluster). But retries are produced by the upkeep producer, which is the same producer used for the DLQ and connects to the deadletter topic's cluster (kafka_producer_config -> kafka_producer_cluster). The same-cluster validation then compared the retry topic's mis-assigned DEFAULT_CLUSTER against the deadletter cluster and falsely rejected configs where the main consumer cluster differs from the deadletter cluster. This only diverges when kafka_deadletter_cluster is explicitly set; when it is unset the deadletter cluster falls back to the main address, so the change is a no-op for those pools. For the ingest-profiles-raw pool the main consumer is on kafka-profiles while retry+DLQ are co-located on kafka-small, which surfaced the bug: retry target topic 'taskworker-ingest' is on cluster 'kafka-profiles...', but deadletter topic 'taskworker-ingest-dlq' is on 'kafka-small...'; they share a single producer and must be on the same cluster Register the legacy retry topic on DEADLETTER_CLUSTER so the config model matches the cluster the producer actually connects to. ref STREAM-1042 Co-Authored-By: Claude Opus 4.8 --- src/config.rs | 54 ++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 49 insertions(+), 5 deletions(-) diff --git a/src/config.rs b/src/config.rs index 1e63062b..09a199b4 100644 --- a/src/config.rs +++ b/src/config.rs @@ -768,10 +768,18 @@ impl Config { // Add the retry topic if configured. Retry topics are produce-only; // this taskbroker writes retries to them but does not consume from - // them. Aliasing the main topic is allowed (retries are re-enqueued - // there), but aliasing the deadletter topic is not: the names would - // collide and the retry would silently inherit the deadletter - // cluster/role. + // them. Retries are published by the upkeep producer, which is the + // same producer used for the deadletter topic and therefore connects + // to the deadletter cluster (see kafka_producer_cluster). A distinct + // legacy retry topic must therefore be registered on the deadletter + // cluster, not the main consumer cluster -- otherwise the + // same-cluster validation below compares the retry topic against the + // wrong cluster and rejects configs where the main consumer cluster + // differs from the deadletter cluster. Aliasing the main topic is + // allowed (retries are re-enqueued there, which requires the main and + // deadletter clusters to coincide), but aliasing the deadletter topic + // is not: the names would collide and the retry would silently + // inherit the deadletter role. if let Some(ref retry_topic) = self.kafka_retry_topic { if retry_topic == &self.kafka_deadletter_topic { return Err(Box::new(figment::Error::from(format!( @@ -782,7 +790,7 @@ impl Config { self.kafka_topics .entry(retry_topic.clone()) .or_insert_with(|| TopicConfig { - cluster: DEFAULT_CLUSTER.to_owned(), + cluster: DEADLETTER_CLUSTER.to_owned(), consumer_group, produce_only: true, raw: None, @@ -2019,4 +2027,40 @@ kafka_clusters: Ok(()) }); } + + #[test] + fn test_legacy_retry_topic_uses_deadletter_cluster() { + // Reproduces the `ingest-profiles-raw` pool: the main consumer topic + // lives on a different cluster than the retry+DLQ topics. The retry + // producer is the upkeep/deadletter producer, so a distinct legacy + // retry topic must be registered on the deadletter cluster, not the + // main consumer cluster. Otherwise the same-cluster validation compares + // the retry topic against the wrong cluster and rejects this config. + Jail::expect_with(|jail| { + jail.set_env("TASKBROKER_KAFKA_CLUSTER", "kafka-profiles:9092"); + jail.set_env("TASKBROKER_KAFKA_TOPIC", "profiles"); + jail.set_env("TASKBROKER_KAFKA_RETRY_TOPIC", "taskworker-ingest"); + jail.set_env("TASKBROKER_KAFKA_DEADLETTER_TOPIC", "taskworker-ingest-dlq"); + jail.set_env("TASKBROKER_KAFKA_DEADLETTER_CLUSTER", "kafka-small:9092"); + + let args = Args { config: None }; + let config = Config::from_args(&args).expect("legacy retry config should validate"); + + // The retry topic resolves to the deadletter cluster (where the + // upkeep producer actually publishes), not the main consumer cluster. + let retry_topic = config + .kafka_topics + .get("taskworker-ingest") + .expect("retry topic registered"); + assert!(retry_topic.produce_only); + assert_eq!( + config.cluster(&retry_topic.cluster).unwrap().address, + "kafka-small:9092" + ); + // And it matches the producer's cluster. + assert_eq!(config.kafka_producer_cluster().address, "kafka-small:9092"); + + Ok(()) + }); + } } From 11bffce963a6b520a486edd516badbba07e539cd Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 2 Jun 2026 17:16:02 +0200 Subject: [PATCH 2/2] chore(config): shorten retry topic comment Co-Authored-By: Claude Opus 4.8 --- src/config.rs | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/src/config.rs b/src/config.rs index 09a199b4..a013fece 100644 --- a/src/config.rs +++ b/src/config.rs @@ -766,20 +766,10 @@ impl Config { )))); } - // Add the retry topic if configured. Retry topics are produce-only; - // this taskbroker writes retries to them but does not consume from - // them. Retries are published by the upkeep producer, which is the - // same producer used for the deadletter topic and therefore connects - // to the deadletter cluster (see kafka_producer_cluster). A distinct - // legacy retry topic must therefore be registered on the deadletter - // cluster, not the main consumer cluster -- otherwise the - // same-cluster validation below compares the retry topic against the - // wrong cluster and rejects configs where the main consumer cluster - // differs from the deadletter cluster. Aliasing the main topic is - // allowed (retries are re-enqueued there, which requires the main and - // deadletter clusters to coincide), but aliasing the deadletter topic - // is not: the names would collide and the retry would silently - // inherit the deadletter role. + // Register the retry topic on the deadletter cluster: retries are + // published by the upkeep producer, which is the same producer used + // for the deadletter topic (see kafka_producer_cluster). Aliasing + // the deadletter topic is rejected to avoid a name collision. if let Some(ref retry_topic) = self.kafka_retry_topic { if retry_topic == &self.kafka_deadletter_topic { return Err(Box::new(figment::Error::from(format!(