Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 41 additions & 7 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,12 +766,10 @@ impl Config {
))));
}

// Add the retry topic if configured. Retry topics are produce-only;
// this taskbroker writes retries to them but does not consume from
// them. Aliasing the main topic is allowed (retries are re-enqueued
// there), but aliasing the deadletter topic is not: the names would
// collide and the retry would silently inherit the deadletter
// cluster/role.
// Register the retry topic on the deadletter cluster: retries are
// published by the upkeep producer, which is the same producer used
// for the deadletter topic (see kafka_producer_cluster). Aliasing
// the deadletter topic is rejected to avoid a name collision.
if let Some(ref retry_topic) = self.kafka_retry_topic {
if retry_topic == &self.kafka_deadletter_topic {
return Err(Box::new(figment::Error::from(format!(
Expand All @@ -782,7 +780,7 @@ impl Config {
self.kafka_topics
.entry(retry_topic.clone())
.or_insert_with(|| TopicConfig {
cluster: DEFAULT_CLUSTER.to_owned(),
cluster: DEADLETTER_CLUSTER.to_owned(),
consumer_group,
produce_only: true,
raw: None,
Expand Down Expand Up @@ -2019,4 +2017,40 @@ kafka_clusters:
Ok(())
});
}

#[test]
fn test_legacy_retry_topic_uses_deadletter_cluster() {
// Reproduces the `ingest-profiles-raw` pool: the main consumer topic
// lives on a different cluster than the retry+DLQ topics. The retry
// producer is the upkeep/deadletter producer, so a distinct legacy
// retry topic must be registered on the deadletter cluster, not the
// main consumer cluster. Otherwise the same-cluster validation compares
// the retry topic against the wrong cluster and rejects this config.
Jail::expect_with(|jail| {
jail.set_env("TASKBROKER_KAFKA_CLUSTER", "kafka-profiles:9092");
jail.set_env("TASKBROKER_KAFKA_TOPIC", "profiles");
jail.set_env("TASKBROKER_KAFKA_RETRY_TOPIC", "taskworker-ingest");
jail.set_env("TASKBROKER_KAFKA_DEADLETTER_TOPIC", "taskworker-ingest-dlq");
jail.set_env("TASKBROKER_KAFKA_DEADLETTER_CLUSTER", "kafka-small:9092");

let args = Args { config: None };
let config = Config::from_args(&args).expect("legacy retry config should validate");

// The retry topic resolves to the deadletter cluster (where the
// upkeep producer actually publishes), not the main consumer cluster.
let retry_topic = config
.kafka_topics
.get("taskworker-ingest")
.expect("retry topic registered");
assert!(retry_topic.produce_only);
assert_eq!(
config.cluster(&retry_topic.cluster).unwrap().address,
"kafka-small:9092"
);
// And it matches the producer's cluster.
assert_eq!(config.kafka_producer_cluster().address, "kafka-small:9092");

Ok(())
});
}
}
Loading