diff --git a/src/config.rs b/src/config.rs index 1e63062b..a013fece 100644 --- a/src/config.rs +++ b/src/config.rs @@ -766,12 +766,10 @@ impl Config { )))); } - // Add the retry topic if configured. Retry topics are produce-only; - // this taskbroker writes retries to them but does not consume from - // them. Aliasing the main topic is allowed (retries are re-enqueued - // there), but aliasing the deadletter topic is not: the names would - // collide and the retry would silently inherit the deadletter - // cluster/role. + // Register the retry topic on the deadletter cluster: retries are + // published by the upkeep producer, which is the same producer used + // for the deadletter topic (see kafka_producer_cluster). Aliasing + // the deadletter topic is rejected to avoid a name collision. if let Some(ref retry_topic) = self.kafka_retry_topic { if retry_topic == &self.kafka_deadletter_topic { return Err(Box::new(figment::Error::from(format!( @@ -782,7 +780,7 @@ impl Config { self.kafka_topics .entry(retry_topic.clone()) .or_insert_with(|| TopicConfig { - cluster: DEFAULT_CLUSTER.to_owned(), + cluster: DEADLETTER_CLUSTER.to_owned(), consumer_group, produce_only: true, raw: None, @@ -2019,4 +2017,40 @@ kafka_clusters: Ok(()) }); } + + #[test] + fn test_legacy_retry_topic_uses_deadletter_cluster() { + // Reproduces the `ingest-profiles-raw` pool: the main consumer topic + // lives on a different cluster than the retry+DLQ topics. The retry + // producer is the upkeep/deadletter producer, so a distinct legacy + // retry topic must be registered on the deadletter cluster, not the + // main consumer cluster. Otherwise the same-cluster validation compares + // the retry topic against the wrong cluster and rejects this config. + Jail::expect_with(|jail| { + jail.set_env("TASKBROKER_KAFKA_CLUSTER", "kafka-profiles:9092"); + jail.set_env("TASKBROKER_KAFKA_TOPIC", "profiles"); + jail.set_env("TASKBROKER_KAFKA_RETRY_TOPIC", "taskworker-ingest"); + jail.set_env("TASKBROKER_KAFKA_DEADLETTER_TOPIC", "taskworker-ingest-dlq"); + jail.set_env("TASKBROKER_KAFKA_DEADLETTER_CLUSTER", "kafka-small:9092"); + + let args = Args { config: None }; + let config = Config::from_args(&args).expect("legacy retry config should validate"); + + // The retry topic resolves to the deadletter cluster (where the + // upkeep producer actually publishes), not the main consumer cluster. + let retry_topic = config + .kafka_topics + .get("taskworker-ingest") + .expect("retry topic registered"); + assert!(retry_topic.produce_only); + assert_eq!( + config.cluster(&retry_topic.cluster).unwrap().address, + "kafka-small:9092" + ); + // And it matches the producer's cluster. + assert_eq!(config.kafka_producer_cluster().address, "kafka-small:9092"); + + Ok(()) + }); + } }