From 50e9826180baa28d23434433d1399db99374d8d6 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 2 Jun 2026 11:08:52 +0200 Subject: [PATCH 01/17] ref(config): Implement multi-topic config ref STREAM-1042 We want to eventually support multiple topics being consumed. This PR adds the new structure. --- src/config.rs | 520 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 508 insertions(+), 12 deletions(-) diff --git a/src/config.rs b/src/config.rs index 17705933..1b099c68 100644 --- a/src/config.rs +++ b/src/config.rs @@ -10,6 +10,56 @@ use serde::{Deserialize, Serialize}; use crate::Args; use crate::logging::LogFormat; +/// Configuration for a single Kafka topic in multi-topic mode. +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct TopicConfig { + /// Which cluster this topic is on (key into kafka_clusters) + pub cluster: String, + /// Consumer group for this topic + pub consumer_group: String, + /// If true, this topic is produce-only (e.g. retry topics). + /// Defaults to false, meaning the topic is consumed. + #[serde(default)] + pub produce_only: bool, + /// Raw mode settings. If set, this topic uses raw mode. + #[serde(default)] + pub raw: Option, +} + +/// Raw mode settings for a topic. +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct RawModeConfig { + /// The namespace to assign to raw mode activations. + pub namespace: Option, + /// The application to assign to raw mode activations. + pub application: Option, + /// The taskname to assign to raw mode activations. + pub taskname: Option, + /// Processing deadline duration in seconds for raw mode activations. + pub processing_deadline_duration: Option, +} + +/// Configuration for a Kafka cluster. +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct ClusterConfig { + /// Comma-separated list of broker addresses + pub address: String, + /// The security method used for authentication eg. sasl_plaintext + pub security_protocol: Option, + /// The hashing algorithm used for authentication eg. scram-sha-256 + pub sasl_mechanism: Option, + /// The sasl username for authentication + pub sasl_username: Option, + /// The sasl password for authentication + pub sasl_password: Option, + /// The location to the CA certificate file + pub ssl_ca_location: Option, + /// The location to the certificate file + pub ssl_certificate_location: Option, + /// The location to the private key file + pub ssl_key_location: Option, +} + #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Deserialize, Serialize)] #[serde(rename_all = "lowercase")] pub enum DatabaseAdapter { @@ -351,6 +401,16 @@ pub struct Config { /// This is an u16 because 1) we don't want to allow signed numbers 2) it can be cast into i32 /// (which we use elsewhere) without error conditions. It doesn't actually have to be that small. pub raw_processing_deadline_duration: u16, + + /// Topic configurations. After normalization, this always contains + /// at least one entry (from legacy config or explicit kafka_topics). + #[serde(default)] + pub kafka_topics: BTreeMap, + + /// Kafka cluster configurations. + /// After normalization, this always contains at least the "default" cluster. + #[serde(default)] + pub kafka_clusters: BTreeMap, } impl Default for Config { @@ -448,6 +508,8 @@ impl Default for Config { raw_application: None, raw_taskname: None, raw_processing_deadline_duration: 30, + kafka_topics: BTreeMap::new(), + kafka_clusters: BTreeMap::new(), } } } @@ -459,17 +521,144 @@ impl Config { if let Some(path) = &args.config { builder = builder.merge(Yaml::file(path)); } - builder = builder.merge(Env::prefixed("TASKBROKER_")); - let config = builder.extract()?; + // Use split("__") to support nested config via envvars like: + // TASKBROKER_KAFKA_TOPICS__PROFILES__CLUSTER=my-cluster + builder = builder.merge(Env::prefixed("TASKBROKER_").split("__")); + let mut config: Config = builder.extract()?; + config.normalize_and_validate()?; Ok(config) } - /// Convert the application Config into rdkafka::ClientConfig + /// Normalize legacy single-topic config into the new multi-topic format, + /// then validate the result. + /// + /// Legacy fields (`kafka_topic`, `kafka_cluster`, etc.) are merged into + /// `kafka_topics`/`kafka_clusters`. New-style config takes precedence. + /// After this, `kafka_topics` and `kafka_clusters` are always populated. + fn normalize_and_validate(&mut self) -> Result<(), Box> { + const DEFAULT_CLUSTER: &str = "default"; + + // Build cluster config from legacy fields + let legacy_cluster = ClusterConfig { + address: self.kafka_cluster.clone(), + security_protocol: self.kafka_security_protocol.clone(), + sasl_mechanism: self.kafka_sasl_mechanism.clone(), + sasl_username: self.kafka_sasl_username.clone(), + sasl_password: self.kafka_sasl_password.clone(), + ssl_ca_location: self.kafka_ssl_ca_location.clone(), + ssl_certificate_location: self.kafka_ssl_certificate_location.clone(), + ssl_key_location: self.kafka_ssl_key_location.clone(), + }; + + // Add the legacy cluster (won't overwrite if "default" already exists) + self.kafka_clusters + .entry(DEFAULT_CLUSTER.to_owned()) + .or_insert(legacy_cluster); + + // Build topic config from legacy fields + let raw_config = if self.raw_mode { + Some(RawModeConfig { + namespace: self.raw_namespace.clone(), + application: self.raw_application.clone(), + taskname: self.raw_taskname.clone(), + processing_deadline_duration: Some(self.raw_processing_deadline_duration), + }) + } else { + None + }; + + let legacy_topic = TopicConfig { + cluster: DEFAULT_CLUSTER.to_owned(), + consumer_group: self.kafka_consumer_group.clone(), + produce_only: false, + raw: raw_config, + }; + + // Add legacy topic (new-style config takes precedence) + self.kafka_topics + .entry(self.kafka_topic.clone()) + .or_insert(legacy_topic); + + // Add retry topic if configured (only if not already present) + if let Some(ref retry_topic) = self.kafka_retry_topic { + self.kafka_topics + .entry(retry_topic.clone()) + .or_insert_with(|| TopicConfig { + cluster: DEFAULT_CLUSTER.to_owned(), + consumer_group: self.kafka_consumer_group.clone(), + produce_only: !self.kafka_consume_retry_topic, + raw: None, + }); + } + + // Validate cluster references + for (topic_name, topic_config) in &self.kafka_topics { + self.cluster(&topic_config.cluster).map_err(|_| { + Box::new(figment::Error::from(format!( + "topic '{}' references unknown cluster '{}'", + topic_name, topic_config.cluster + ))) + })?; + } + + // Validate exactly one consumable topic + self.consumable_topic()?; + + Ok(()) + } + + /// Get the single consumable topic and its config. + /// Returns an error if there are zero or multiple consumable topics. + pub fn consumable_topic(&self) -> Result<(&str, &TopicConfig), Box> { + let mut consumable = self + .kafka_topics + .iter() + .filter(|(_, cfg)| !cfg.produce_only); + + let first = consumable.next().ok_or_else(|| { + Box::new(figment::Error::from( + "no consumable topic configured (all topics have produce_only: true)".to_owned(), + )) + })?; + + if consumable.next().is_some() { + let count = self + .kafka_topics + .values() + .filter(|t| !t.produce_only) + .count(); + return Err(Box::new(figment::Error::from(format!( + "multi-topic consumption is not yet supported: {} consumable topics configured, maximum is 1", + count + )))); + } + + Ok((first.0.as_str(), first.1)) + } + + /// Get cluster config by name. + /// Returns an error if the cluster doesn't exist. + pub fn cluster(&self, name: &str) -> Result<&ClusterConfig, Box> { + self.kafka_clusters + .get(name) + .ok_or_else(|| Box::new(figment::Error::from(format!("unknown cluster: {}", name)))) + } + + /// Convert the application Config into rdkafka::ClientConfig for consumer. + /// Uses the single consumable topic's cluster. + /// Panics if config wasn't validated (call from_args, not extract directly). pub fn kafka_consumer_config(&self) -> ClientConfig { + let (_, topic_config) = self + .consumable_topic() + .expect("consumable_topic failed - was config validated?"); + let cluster = self + .cluster(&topic_config.cluster) + .expect("cluster lookup failed - was config validated?"); + let mut new_config = ClientConfig::new(); let config = new_config - .set("bootstrap.servers", self.kafka_cluster.clone()) - .set("group.id", self.kafka_consumer_group.clone()) + .set("bootstrap.servers", cluster.address.clone()) + .set("group.id", topic_config.consumer_group.clone()) .set( "session.timeout.ms", self.kafka_session_timeout_ms.to_string(), @@ -486,25 +675,25 @@ impl Config { ) .set("enable.auto.offset.store", "false"); - if let Some(sasl_mechanism) = &self.kafka_sasl_mechanism { + if let Some(ref sasl_mechanism) = cluster.sasl_mechanism { config.set("sasl.mechanism", sasl_mechanism); } - if let Some(sasl_username) = &self.kafka_sasl_username { + if let Some(ref sasl_username) = cluster.sasl_username { config.set("sasl.username", sasl_username); } - if let Some(sasl_password) = &self.kafka_sasl_password { + if let Some(ref sasl_password) = cluster.sasl_password { config.set("sasl.password", sasl_password); } - if let Some(security_protocol) = &self.kafka_security_protocol { + if let Some(ref security_protocol) = cluster.security_protocol { config.set("security.protocol", security_protocol); } - if let Some(ssl_ca_location) = &self.kafka_ssl_ca_location { + if let Some(ref ssl_ca_location) = cluster.ssl_ca_location { config.set("ssl.ca.location", ssl_ca_location); } - if let Some(ssl_certificate_location) = &self.kafka_ssl_certificate_location { + if let Some(ref ssl_certificate_location) = cluster.ssl_certificate_location { config.set("ssl.certificate.location", ssl_certificate_location); } - if let Some(ssl_private_key_location) = &self.kafka_ssl_key_location { + if let Some(ref ssl_private_key_location) = cluster.ssl_key_location { config.set("ssl.key.location", ssl_private_key_location); } @@ -936,4 +1125,311 @@ mod tests { Ok(()) }); } + + #[test] + fn test_multi_topic_config_from_yaml() { + use super::{ClusterConfig, RawModeConfig}; + + Jail::expect_with(|jail| { + // Set kafka_topic to match the consumable topic so they merge + jail.create_file( + "config.yaml", + r#" +kafka_topic: profiles + +kafka_topics: + profiles: + cluster: profiles-cluster + consumer_group: taskbroker-profiles + raw: + application: profiles + profiles-retry: + cluster: profiles-cluster + consumer_group: taskbroker-profiles-retry + produce_only: true + +kafka_clusters: + profiles-cluster: + address: 10.0.0.1:9092 +"#, + )?; + + let args = Args { + config: Some("config.yaml".to_owned()), + }; + let config = Config::from_args(&args).unwrap(); + + let topics = &config.kafka_topics; + // 2 topics from yaml, legacy kafka_topic merges with "profiles" + assert_eq!(topics.len(), 2); + + let profiles = topics.get("profiles").unwrap(); + assert_eq!(profiles.cluster, "profiles-cluster"); + assert_eq!(profiles.consumer_group, "taskbroker-profiles"); + assert!(!profiles.produce_only); + assert_eq!( + profiles.raw, + Some(RawModeConfig { + namespace: None, + application: Some("profiles".to_owned()), + taskname: None, + processing_deadline_duration: None, + }) + ); + + let retry = topics.get("profiles-retry").unwrap(); + assert!(retry.produce_only); + + let clusters = &config.kafka_clusters; + // 2 clusters: profiles-cluster from yaml + default from legacy kafka_cluster + assert_eq!(clusters.len(), 2); + assert_eq!( + clusters.get("profiles-cluster"), + Some(&ClusterConfig { + address: "10.0.0.1:9092".to_owned(), + security_protocol: None, + sasl_mechanism: None, + sasl_username: None, + sasl_password: None, + ssl_ca_location: None, + ssl_certificate_location: None, + ssl_key_location: None, + }) + ); + // Legacy "default" cluster also exists + assert!(clusters.contains_key("default")); + + // Test consumable_topic() and cluster() helpers + let (topic_name, topic_config) = config.consumable_topic().unwrap(); + assert_eq!(topic_name, "profiles"); + assert_eq!(topic_config.cluster, "profiles-cluster"); + + let cluster = config.cluster("profiles-cluster").unwrap(); + assert_eq!(cluster.address, "10.0.0.1:9092"); + + Ok(()) + }); + } + + #[test] + fn test_multi_topic_config_from_env() { + Jail::expect_with(|jail| { + // Set kafka_topic to match the new topic so they merge + jail.set_env("TASKBROKER_KAFKA_TOPIC", "profiles"); + // Note: figment lowercases env var keys after splitting on "__", + // so MY_CLUSTER becomes my_cluster (with underscore, not hyphen). + // The cluster reference value is preserved as-is. + jail.set_env("TASKBROKER_KAFKA_TOPICS__PROFILES__CLUSTER", "my_cluster"); + jail.set_env( + "TASKBROKER_KAFKA_TOPICS__PROFILES__CONSUMER_GROUP", + "taskbroker-profiles", + ); + jail.set_env( + "TASKBROKER_KAFKA_CLUSTERS__MY_CLUSTER__ADDRESS", + "10.0.0.2:9092", + ); + + let args = Args { config: None }; + let config = Config::from_args(&args).unwrap(); + + let topics = &config.kafka_topics; + // "profiles" from env + legacy kafka_topic merges with it + assert_eq!(topics.len(), 1); + + let profiles = topics.get("profiles").unwrap(); + assert_eq!(profiles.cluster, "my_cluster"); + assert_eq!(profiles.consumer_group, "taskbroker-profiles"); + + let clusters = &config.kafka_clusters; + assert_eq!(clusters.get("my_cluster").unwrap().address, "10.0.0.2:9092"); + // Legacy "default" cluster also exists + assert!(clusters.contains_key("default")); + + // Test consumable_topic() helper + let (topic_name, _) = config.consumable_topic().unwrap(); + assert_eq!(topic_name, "profiles"); + + Ok(()) + }); + } + + #[test] + fn test_multi_topic_unknown_cluster_reference() { + // When kafka_topics references a cluster that doesn't exist in kafka_clusters, + // validation should fail. The "default" cluster is always added from legacy config, + // but custom cluster references must exist. + Jail::expect_with(|jail| { + jail.create_file( + "config.yaml", + r#" +kafka_topic: profiles +kafka_topics: + profiles: + cluster: nonexistent-cluster + consumer_group: taskbroker-profiles +"#, + )?; + + let args = Args { + config: Some("config.yaml".to_owned()), + }; + let err = Config::from_args(&args).unwrap_err(); + assert!( + err.to_string().contains("unknown cluster"), + "unexpected error: {}", + err + ); + + Ok(()) + }); + } + + #[test] + fn test_multi_topic_validates_cluster_references() { + Jail::expect_with(|jail| { + // Set kafka_topic to match the profile so legacy topic merges + jail.create_file( + "config.yaml", + r#" +kafka_topic: profiles +kafka_topics: + profiles: + cluster: nonexistent-cluster + consumer_group: taskbroker-profiles + +kafka_clusters: + other-cluster: + address: 10.0.0.1:9092 +"#, + )?; + + let args = Args { + config: Some("config.yaml".to_owned()), + }; + let err = Config::from_args(&args).unwrap_err(); + assert!( + err.to_string().contains("unknown cluster"), + "unexpected error: {}", + err + ); + + Ok(()) + }); + } + + #[test] + fn test_multi_topic_rejects_multiple_consumable_topics() { + Jail::expect_with(|jail| { + // Two consumable topics in kafka_topics - should fail even with legacy merging + // because both profiles and subscriptions are consumable + jail.create_file( + "config.yaml", + r#" +kafka_topic: profiles +kafka_topics: + profiles: + cluster: my-cluster + consumer_group: taskbroker-profiles + subscriptions: + cluster: my-cluster + consumer_group: taskbroker-subscriptions + +kafka_clusters: + my-cluster: + address: 10.0.0.1:9092 +"#, + )?; + + let args = Args { + config: Some("config.yaml".to_owned()), + }; + let err = Config::from_args(&args).unwrap_err(); + assert!( + err.to_string() + .contains("multi-topic consumption is not yet supported"), + "unexpected error: {}", + err + ); + + Ok(()) + }); + } + + #[test] + fn test_multi_topic_allows_one_consumable_with_produce_only() { + Jail::expect_with(|jail| { + // One consumable topic (profiles), one produce-only (profiles-retry) + // Legacy kafka_topic merges with profiles + jail.create_file( + "config.yaml", + r#" +kafka_topic: profiles +kafka_topics: + profiles: + cluster: my-cluster + consumer_group: taskbroker-profiles + profiles-retry: + cluster: my-cluster + consumer_group: taskbroker-profiles-retry + produce_only: true + +kafka_clusters: + my-cluster: + address: 10.0.0.1:9092 +"#, + )?; + + let args = Args { + config: Some("config.yaml".to_owned()), + }; + let config = Config::from_args(&args).unwrap(); + + let topics = &config.kafka_topics; + assert_eq!(topics.len(), 2); + + // One consumable, one produce-only + assert!(!topics.get("profiles").unwrap().produce_only); + assert!(topics.get("profiles-retry").unwrap().produce_only); + + // consumable_topic() returns the one consumable topic + let (topic_name, _) = config.consumable_topic().unwrap(); + assert_eq!(topic_name, "profiles"); + + Ok(()) + }); + } + + #[test] + fn test_multi_topic_rejects_zero_consumable_topics() { + Jail::expect_with(|jail| { + // All topics are produce-only - should fail + jail.create_file( + "config.yaml", + r#" +kafka_topic: profiles +kafka_topics: + profiles: + cluster: my-cluster + consumer_group: taskbroker-profiles + produce_only: true + +kafka_clusters: + my-cluster: + address: 10.0.0.1:9092 +"#, + )?; + + let args = Args { + config: Some("config.yaml".to_owned()), + }; + let err = Config::from_args(&args).unwrap_err(); + assert!( + err.to_string().contains("no consumable topic"), + "unexpected error: {}", + err + ); + + Ok(()) + }); + } } From da5f1d425b36a32d0510b334bb943aed4edfd369 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 2 Jun 2026 12:44:06 +0200 Subject: [PATCH 02/17] Actually deprecate the old fields with warnings --- src/config.rs | 185 ++++++++++++++++++++------------ src/kafka/activation_batcher.rs | 16 ++- src/kafka/deserialize_raw.rs | 5 +- src/main.rs | 10 +- src/test_utils.rs | 20 ++-- src/upkeep.rs | 19 +++- 6 files changed, 169 insertions(+), 86 deletions(-) diff --git a/src/config.rs b/src/config.rs index 1b099c68..3a1e4281 100644 --- a/src/config.rs +++ b/src/config.rs @@ -6,6 +6,7 @@ use figment::providers::{Env, Format, Yaml}; use figment::{Figment, Metadata, Profile, Provider}; use rdkafka::ClientConfig; use serde::{Deserialize, Serialize}; +use tracing::warn; use crate::Args; use crate::logging::LogFormat; @@ -115,37 +116,47 @@ pub struct Config { /// We support a list of secrets to allow for key rotation. pub grpc_shared_secret: Vec, - /// Comma separated list of kafka brokers to connect to - pub kafka_cluster: String, + /// Comma separated list of kafka brokers to connect to. + /// Deprecated: use kafka_clusters instead. + pub kafka_cluster: Option, - /// The kafka consumer group name - pub kafka_consumer_group: String, + /// The kafka consumer group name. + /// Deprecated: use kafka_topics instead. + pub kafka_consumer_group: Option, /// The topic to fetch task messages from. - pub kafka_topic: String, + /// Deprecated: use kafka_topics instead. + pub kafka_topic: Option, /// The topic to produce demoted "long" namespace tasks to. pub kafka_long_topic: String, - /// The security method used for authentication eg. sasl_plaintext + /// The security method used for authentication eg. sasl_plaintext. + /// Deprecated: use kafka_clusters instead. pub kafka_security_protocol: Option, - /// The hashing algorithm used for authentication eg. scram-sha-256 + /// The hashing algorithm used for authentication eg. scram-sha-256. + /// Deprecated: use kafka_clusters instead. pub kafka_sasl_mechanism: Option, - /// The sasl username for ingesting messages + /// The sasl username for ingesting messages. + /// Deprecated: use kafka_clusters instead. pub kafka_sasl_username: Option, - /// The sasl password for ingesting messages + /// The sasl password for ingesting messages. + /// Deprecated: use kafka_clusters instead. pub kafka_sasl_password: Option, - /// The location to the CA certificate file + /// The location to the CA certificate file. + /// Deprecated: use kafka_clusters instead. pub kafka_ssl_ca_location: Option, - /// The location to the certificate file + /// The location to the certificate file. + /// Deprecated: use kafka_clusters instead. pub kafka_ssl_certificate_location: Option, - /// The location to the private key file + /// The location to the private key file. + /// Deprecated: use kafka_clusters instead. pub kafka_ssl_key_location: Option, /// Whether to create missing topics if they don't exist. @@ -426,8 +437,8 @@ impl Default for Config { grpc_shared_secret: vec![], statsd_addr: "127.0.0.1:8126".parse().unwrap(), default_metrics_tags: Default::default(), - kafka_cluster: "127.0.0.1:9092".to_owned(), - kafka_consumer_group: "taskworker".to_owned(), + kafka_cluster: Some("127.0.0.1:9092".to_owned()), + kafka_consumer_group: Some("taskworker".to_owned()), kafka_sasl_mechanism: None, kafka_sasl_username: None, kafka_sasl_password: None, @@ -435,7 +446,7 @@ impl Default for Config { kafka_ssl_certificate_location: None, kafka_ssl_key_location: None, kafka_security_protocol: None, - kafka_topic: "taskworker".to_owned(), + kafka_topic: Some("taskworker".to_owned()), kafka_long_topic: "taskworker-long".to_owned(), create_missing_topics: false, kafka_deadletter_cluster: None, @@ -538,57 +549,87 @@ impl Config { fn normalize_and_validate(&mut self) -> Result<(), Box> { const DEFAULT_CLUSTER: &str = "default"; - // Build cluster config from legacy fields - let legacy_cluster = ClusterConfig { - address: self.kafka_cluster.clone(), - security_protocol: self.kafka_security_protocol.clone(), - sasl_mechanism: self.kafka_sasl_mechanism.clone(), - sasl_username: self.kafka_sasl_username.clone(), - sasl_password: self.kafka_sasl_password.clone(), - ssl_ca_location: self.kafka_ssl_ca_location.clone(), - ssl_certificate_location: self.kafka_ssl_certificate_location.clone(), - ssl_key_location: self.kafka_ssl_key_location.clone(), - }; + // Validate that legacy fields are used together + match (&self.kafka_cluster, &self.kafka_topic) { + (Some(_), None) => { + return Err(Box::new(figment::Error::from( + "kafka_cluster is set but kafka_topic is not; \ + either set both or use kafka_clusters/kafka_topics instead" + .to_owned(), + ))); + } + (None, Some(_)) => { + return Err(Box::new(figment::Error::from( + "kafka_topic is set but kafka_cluster is not; \ + either set both or use kafka_clusters/kafka_topics instead" + .to_owned(), + ))); + } + _ => {} + } - // Add the legacy cluster (won't overwrite if "default" already exists) - self.kafka_clusters - .entry(DEFAULT_CLUSTER.to_owned()) - .or_insert(legacy_cluster); - - // Build topic config from legacy fields - let raw_config = if self.raw_mode { - Some(RawModeConfig { - namespace: self.raw_namespace.clone(), - application: self.raw_application.clone(), - taskname: self.raw_taskname.clone(), - processing_deadline_duration: Some(self.raw_processing_deadline_duration), - }) - } else { - None - }; + // Build cluster config from legacy fields (if present) + if let Some(ref address) = self.kafka_cluster { + warn!("kafka_cluster is deprecated, use kafka_clusters instead"); + let legacy_cluster = ClusterConfig { + address: address.clone(), + security_protocol: self.kafka_security_protocol.clone(), + sasl_mechanism: self.kafka_sasl_mechanism.clone(), + sasl_username: self.kafka_sasl_username.clone(), + sasl_password: self.kafka_sasl_password.clone(), + ssl_ca_location: self.kafka_ssl_ca_location.clone(), + ssl_certificate_location: self.kafka_ssl_certificate_location.clone(), + ssl_key_location: self.kafka_ssl_key_location.clone(), + }; - let legacy_topic = TopicConfig { - cluster: DEFAULT_CLUSTER.to_owned(), - consumer_group: self.kafka_consumer_group.clone(), - produce_only: false, - raw: raw_config, - }; + // Add the legacy cluster (won't overwrite if "default" already exists) + self.kafka_clusters + .entry(DEFAULT_CLUSTER.to_owned()) + .or_insert(legacy_cluster); + } - // Add legacy topic (new-style config takes precedence) - self.kafka_topics - .entry(self.kafka_topic.clone()) - .or_insert(legacy_topic); + // Build topic config from legacy fields (if present) + if let Some(ref topic_name) = self.kafka_topic { + warn!("kafka_topic is deprecated, use kafka_topics instead"); + let consumer_group = self + .kafka_consumer_group + .clone() + .unwrap_or_else(|| "taskworker".to_owned()); - // Add retry topic if configured (only if not already present) - if let Some(ref retry_topic) = self.kafka_retry_topic { + let raw_config = if self.raw_mode { + Some(RawModeConfig { + namespace: self.raw_namespace.clone(), + application: self.raw_application.clone(), + taskname: self.raw_taskname.clone(), + processing_deadline_duration: Some(self.raw_processing_deadline_duration), + }) + } else { + None + }; + + let legacy_topic = TopicConfig { + cluster: DEFAULT_CLUSTER.to_owned(), + consumer_group: consumer_group.clone(), + produce_only: false, + raw: raw_config, + }; + + // Add legacy topic (new-style config takes precedence) self.kafka_topics - .entry(retry_topic.clone()) - .or_insert_with(|| TopicConfig { - cluster: DEFAULT_CLUSTER.to_owned(), - consumer_group: self.kafka_consumer_group.clone(), - produce_only: !self.kafka_consume_retry_topic, - raw: None, - }); + .entry(topic_name.clone()) + .or_insert(legacy_topic); + + // Add retry topic if configured (only if not already present) + if let Some(ref retry_topic) = self.kafka_retry_topic { + self.kafka_topics + .entry(retry_topic.clone()) + .or_insert_with(|| TopicConfig { + cluster: DEFAULT_CLUSTER.to_owned(), + consumer_group, + produce_only: !self.kafka_consume_retry_topic, + raw: None, + }); + } } // Validate cluster references @@ -701,14 +742,24 @@ impl Config { } /// Convert the application Config into rdkafka::ClientConfig + /// Convert the application Config into rdkafka::ClientConfig for producer (DLQ). + /// Falls back to consumable topic's cluster if kafka_deadletter_cluster is not set. + /// Panics if config wasn't validated. pub fn kafka_producer_config(&self) -> ClientConfig { + let (_, topic_config) = self + .consumable_topic() + .expect("consumable_topic failed - was config validated?"); + let default_cluster = self + .cluster(&topic_config.cluster) + .expect("cluster lookup failed - was config validated?"); + let mut new_config = ClientConfig::new(); let config = new_config .set( "bootstrap.servers", self.kafka_deadletter_cluster .as_ref() - .unwrap_or(&self.kafka_cluster), + .unwrap_or(&default_cluster.address), ) .set("message.max.bytes", format!("{}", self.max_message_size)); if let Some(sasl_mechanism) = &self.kafka_deadletter_sasl_mechanism { @@ -769,7 +820,7 @@ mod tests { assert_eq!(config.log_filter, "info,librdkafka=warn,h2=off"); assert_eq!(config.log_format, LogFormat::Text); assert_eq!(config.grpc_port, 50051); - assert_eq!(config.kafka_topic, "taskworker"); + assert_eq!(config.kafka_topic, Some("taskworker".to_owned())); assert_eq!(config.db_path, "./taskbroker-inflight.sqlite"); assert_eq!(config.max_pending_count, 2048); assert_eq!(config.max_processing_count, 2048); @@ -823,16 +874,16 @@ mod tests { assert_eq!(config.log_format, LogFormat::Json); assert_eq!( config.kafka_cluster, - "10.0.0.1:9092,10.0.0.2:9092".to_owned() + Some("10.0.0.1:9092,10.0.0.2:9092".to_owned()) ); assert_eq!( config.default_metrics_tags, BTreeMap::from([("key_1".to_owned(), "value_1".to_owned())]) ); - assert_eq!(config.kafka_consumer_group, "taskworker".to_owned()); + assert_eq!(config.kafka_consumer_group, Some("taskworker".to_owned())); assert_eq!(config.kafka_auto_offset_reset, "earliest".to_owned()); assert_eq!(config.kafka_session_timeout_ms, 6000.to_owned()); - assert_eq!(config.kafka_topic, "error-tasks".to_owned()); + assert_eq!(config.kafka_topic, Some("error-tasks".to_owned())); assert_eq!(config.kafka_deadletter_topic, "error-tasks-dlq".to_owned()); assert_eq!(config.database_adapter, DatabaseAdapter::Postgres); assert_eq!(config.db_path, "./taskbroker-error.sqlite".to_owned()); @@ -886,7 +937,7 @@ mod tests { assert_eq!(config.sentry_dsn, None); assert_eq!(config.sentry_env, None); assert_eq!(config.log_filter, "error"); - assert_eq!(config.kafka_topic, "taskworker".to_owned()); + assert_eq!(config.kafka_topic, Some("taskworker".to_owned())); assert_eq!(config.kafka_deadletter_topic, "taskworker-dlq".to_owned()); assert_eq!(config.db_path, "./taskbroker-inflight.sqlite".to_owned()); assert_eq!(config.max_pending_count, 2048); diff --git a/src/kafka/activation_batcher.rs b/src/kafka/activation_batcher.rs index 4449db35..b1546267 100644 --- a/src/kafka/activation_batcher.rs +++ b/src/kafka/activation_batcher.rs @@ -31,10 +31,17 @@ pub struct ActivationBatcherConfig { impl ActivationBatcherConfig { /// Convert from application configuration into ActivationBatcher config. pub fn from_config(config: &Config) -> Self { + let (topic_name, topic_config) = config + .consumable_topic() + .expect("no consumable topic configured"); + let cluster = config + .cluster(&topic_config.cluster) + .expect("cluster not found"); + Self { producer_config: config.kafka_producer_config(), - kafka_cluster: config.kafka_cluster.clone(), - kafka_topic: config.kafka_topic.clone(), + kafka_cluster: cluster.address.clone(), + kafka_topic: topic_name.to_owned(), kafka_long_topic: config.kafka_long_topic.clone(), send_timeout_ms: config.kafka_send_timeout_ms, max_batch_time_ms: config.db_insert_batch_max_time_ms, @@ -367,7 +374,10 @@ demoted_topic: taskworker-demoted"#; runtime_config, ); - assert_eq!(batcher.producer_cluster, config.kafka_cluster.clone()); + assert_eq!( + batcher.producer_cluster, + config.kafka_cluster.clone().unwrap() + ); let activation_0 = ActivationBuilder::new() .id("0") diff --git a/src/kafka/deserialize_raw.rs b/src/kafka/deserialize_raw.rs index d885de9f..89577850 100644 --- a/src/kafka/deserialize_raw.rs +++ b/src/kafka/deserialize_raw.rs @@ -45,8 +45,11 @@ impl RawConfig { ); if let Some(ref retry_topic) = config.kafka_retry_topic { + let (main_topic, _) = config + .consumable_topic() + .expect("no consumable topic configured"); assert!( - retry_topic != &config.kafka_topic, + retry_topic != main_topic, "kafka_retry_topic cannot equal kafka_topic when raw_mode is enabled" ); } diff --git a/src/main.rs b/src/main.rs index 8dd4a99d..77753b53 100644 --- a/src/main.rs +++ b/src/main.rs @@ -76,9 +76,12 @@ async fn main() -> Result<(), Error> { // If this is an environment where the topics might not exist, check and create them. if config.create_missing_topics { let kafka_client_config = config.kafka_consumer_config(); + let (main_topic, _) = config + .consumable_topic() + .map_err(|e| anyhow!("invalid config: {}", e))?; create_missing_topics( kafka_client_config.clone(), - &config.kafka_topic, + main_topic, config.default_topic_partitions, ) .await?; @@ -161,7 +164,10 @@ async fn main() -> Result<(), Error> { let runtime_config_manager = runtime_config_manager.clone(); // Build list of topics to consume from - let mut topics_to_consume = vec![consumer_config.kafka_topic.clone()]; + let (main_topic, _) = consumer_config + .consumable_topic() + .expect("invalid config: no consumable topic"); + let mut topics_to_consume = vec![main_topic.to_owned()]; if consumer_config.kafka_consume_retry_topic && let Some(ref retry_topic) = consumer_config.kafka_retry_topic { diff --git a/src/test_utils.rs b/src/test_utils.rs index 3dcbe887..d6940827 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -306,7 +306,9 @@ pub fn create_integration_config() -> Arc { pg_password: get_pg_password(), pg_database_name: get_pg_database_name(), run_migrations: true, - kafka_topic: "taskbroker-test".into(), + kafka_topic: Some("taskbroker-test".into()), + kafka_cluster: Some("127.0.0.1:9092".into()), + kafka_consumer_group: Some("taskworker".into()), kafka_auto_offset_reset: "earliest".into(), ..Config::default() }; @@ -326,7 +328,9 @@ pub fn create_integration_config_with_ssl() -> Arc { pg_database_name: get_pg_database_name(), pg_extra_query_params: Some("sslmode=require".to_string()), run_migrations: true, - kafka_topic: "taskbroker-test".into(), + kafka_topic: Some("taskbroker-test".into()), + kafka_cluster: Some("127.0.0.1:9092".into()), + kafka_consumer_group: Some("taskworker".into()), kafka_auto_offset_reset: "earliest".into(), ..Config::default() }; @@ -342,7 +346,9 @@ pub fn create_integration_config_with_topic(topic: String) -> Config { pg_password: get_pg_password(), pg_database_name: get_pg_database_name(), run_migrations: true, - kafka_topic: topic, + kafka_topic: Some(topic), + kafka_cluster: Some("127.0.0.1:9092".into()), + kafka_consumer_group: Some("taskworker".into()), kafka_auto_offset_reset: "earliest".into(), ..Config::default() } @@ -365,15 +371,13 @@ pub async fn reset_topic(config: Arc) { .create() .expect("Could not create admin client"); + let (main_topic, _) = config.consumable_topic().expect("no consumable topic"); let options = AdminOptions::default(); admin_client - .delete_topics( - &[config.kafka_topic.as_ref(), &config.kafka_deadletter_topic], - &options, - ) + .delete_topics(&[main_topic, &config.kafka_deadletter_topic], &options) .await .expect("Could not delete topic"); - let new_topic = NewTopic::new(&config.kafka_topic, 1, TopicReplication::Fixed(0)); + let new_topic = NewTopic::new(main_topic, 1, TopicReplication::Fixed(0)); let new_dlq_topic = NewTopic::new( &config.kafka_deadletter_topic, 1, diff --git a/src/upkeep.rs b/src/upkeep.rs index faee0ff1..ed869415 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -156,10 +156,12 @@ pub async fn do_upkeep( let handle_retries_start = Instant::now(); if let Ok(retries) = store.get_retry_activations().await { // Use retry topic if configured, otherwise fall back to main topic + let (main_topic, _) = config.consumable_topic().expect("no consumable topic"); + let main_topic_owned = main_topic.to_owned(); let retry_target_topic = config .kafka_retry_topic .as_ref() - .unwrap_or(&config.kafka_topic); + .unwrap_or(&main_topic_owned); // 2. Append retries to kafka let deliveries = retries @@ -320,16 +322,22 @@ pub async fn do_upkeep( // 12. Forward tasks from demoted namespaces to `runtime_config.demoted_topic` let demoted_namespaces = runtime_config.demoted_namespaces.clone(); + let (main_topic, main_topic_config) = config.consumable_topic().expect("no consumable topic"); + let main_cluster = config + .cluster(&main_topic_config.cluster) + .expect("cluster not found") + .address + .clone(); let forward_cluster = runtime_config .demoted_topic_cluster .clone() - .unwrap_or(config.kafka_cluster.clone()); + .unwrap_or(main_cluster.clone()); let forward_topic = runtime_config .demoted_topic .clone() .unwrap_or(config.kafka_long_topic.clone()); - let same_cluster = forward_cluster == config.kafka_cluster; - let same_topic = forward_topic == config.kafka_topic; + let same_cluster = forward_cluster == main_cluster; + let same_topic = forward_topic == main_topic; if !(demoted_namespaces.is_empty() || (same_cluster && same_topic)) { let forward_demoted_start = Instant::now(); let mut forward_producer_config = config.kafka_producer_config(); @@ -742,7 +750,8 @@ mod tests { assert_eq!(store.count().await.unwrap(), 1); assert_eq!(result_context.retried, 1); - let messages = consume_topic(config.clone(), config.kafka_topic.as_ref(), 1).await; + let (main_topic, _) = config.consumable_topic().unwrap(); + let messages = consume_topic(config.clone(), main_topic, 1).await; assert_eq!(messages.len(), 1); let activation = &messages[0]; From 9350108f082148fb8cc7fdf0de64f44e5766d6a4 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 2 Jun 2026 13:17:47 +0200 Subject: [PATCH 03/17] Wire up the new fields to application, remove kafka_consume_retry_topic again, deprecate old fields --- src/config.rs | 283 +++++++++++++++++++++----------- src/kafka/activation_batcher.rs | 35 ++-- src/main.rs | 7 +- src/test_utils.rs | 19 ++- 4 files changed, 222 insertions(+), 122 deletions(-) diff --git a/src/config.rs b/src/config.rs index 3a1e4281..643ec737 100644 --- a/src/config.rs +++ b/src/config.rs @@ -117,15 +117,19 @@ pub struct Config { pub grpc_shared_secret: Vec, /// Comma separated list of kafka brokers to connect to. - /// Deprecated: use kafka_clusters instead. + /// Deprecated: use kafka_clusters instead. Mutually exclusive with the new + /// format; defaults to None (the historical "127.0.0.1:9092" default is + /// applied during normalization when no kafka config is provided at all). pub kafka_cluster: Option, /// The kafka consumer group name. - /// Deprecated: use kafka_topics instead. + /// Deprecated: use kafka_topics instead. Mutually exclusive with the new + /// format; defaults to None (the historical "taskworker" default applies). pub kafka_consumer_group: Option, /// The topic to fetch task messages from. - /// Deprecated: use kafka_topics instead. + /// Deprecated: use kafka_topics instead. Mutually exclusive with the new + /// format; defaults to None (the historical "taskworker" default applies). pub kafka_topic: Option, /// The topic to produce demoted "long" namespace tasks to. @@ -195,12 +199,6 @@ pub struct Config { /// Required for raw_mode where the main topic has other consumers. pub kafka_retry_topic: Option, - /// Whether to consume from the retry topic. - /// When false (default), this taskbroker only produces to the retry topic - /// but does not consume from it, allowing the retry topic to be shared - /// across multiple taskbroker instances. - pub kafka_consume_retry_topic: bool, - /// The default number of partitions for a topic pub default_topic_partitions: i32, @@ -425,6 +423,9 @@ pub struct Config { } impl Default for Config { + /// Field defaults. `kafka_topics`/`kafka_clusters` are left empty; call + /// [`Config::normalize_and_validate`] (as `from_args` does) to populate + /// them from the legacy fields before using the kafka helpers. fn default() -> Self { Self { sentry_dsn: None, @@ -437,8 +438,8 @@ impl Default for Config { grpc_shared_secret: vec![], statsd_addr: "127.0.0.1:8126".parse().unwrap(), default_metrics_tags: Default::default(), - kafka_cluster: Some("127.0.0.1:9092".to_owned()), - kafka_consumer_group: Some("taskworker".to_owned()), + kafka_cluster: None, + kafka_consumer_group: None, kafka_sasl_mechanism: None, kafka_sasl_username: None, kafka_sasl_password: None, @@ -446,7 +447,7 @@ impl Default for Config { kafka_ssl_certificate_location: None, kafka_ssl_key_location: None, kafka_security_protocol: None, - kafka_topic: Some("taskworker".to_owned()), + kafka_topic: None, kafka_long_topic: "taskworker-long".to_owned(), create_missing_topics: false, kafka_deadletter_cluster: None, @@ -459,7 +460,6 @@ impl Default for Config { kafka_deadletter_ssl_certificate_location: None, kafka_deadletter_ssl_key_location: None, kafka_retry_topic: None, - kafka_consume_retry_topic: false, default_topic_partitions: 1, kafka_session_timeout_ms: 6000, kafka_auto_commit_interval_ms: 5000, @@ -540,61 +540,111 @@ impl Config { Ok(config) } - /// Normalize legacy single-topic config into the new multi-topic format, - /// then validate the result. + /// Normalize the legacy single-topic config into the new multi-topic + /// format, then validate the result. /// - /// Legacy fields (`kafka_topic`, `kafka_cluster`, etc.) are merged into - /// `kafka_topics`/`kafka_clusters`. New-style config takes precedence. - /// After this, `kafka_topics` and `kafka_clusters` are always populated. - fn normalize_and_validate(&mut self) -> Result<(), Box> { + /// The legacy fields (`kafka_topic`, `kafka_cluster`, etc.) and the new + /// fields (`kafka_topics`, `kafka_clusters`) are mutually exclusive: mixing + /// them is a hard error. When only legacy fields are used (including the + /// zero-config case, where the historical `taskworker` defaults apply), they + /// are normalized into `kafka_topics`/`kafka_clusters`. After this, + /// `kafka_topics` and `kafka_clusters` are always populated. + pub(crate) fn normalize_and_validate(&mut self) -> Result<(), Box> { const DEFAULT_CLUSTER: &str = "default"; + const DEFAULT_TOPIC: &str = "taskworker"; + const DEFAULT_CLUSTER_ADDRESS: &str = "127.0.0.1:9092"; + const DEFAULT_CONSUMER_GROUP: &str = "taskworker"; + + let uses_new_format = !self.kafka_topics.is_empty() || !self.kafka_clusters.is_empty(); + // Any explicitly-set legacy field tied to the main consumed cluster. + // (Deadletter fields are not legacy duals and are intentionally excluded.) + let uses_legacy = self.kafka_topic.is_some() + || self.kafka_cluster.is_some() + || self.kafka_consumer_group.is_some() + || self.kafka_security_protocol.is_some() + || self.kafka_sasl_mechanism.is_some() + || self.kafka_sasl_username.is_some() + || self.kafka_sasl_password.is_some() + || self.kafka_ssl_ca_location.is_some() + || self.kafka_ssl_certificate_location.is_some() + || self.kafka_ssl_key_location.is_some(); + + if uses_new_format && uses_legacy { + return Err(Box::new(figment::Error::from( + "cannot mix the deprecated kafka_topic/kafka_cluster/kafka_consumer_group \ + (and related kafka_sasl_*/kafka_ssl_* fields) with kafka_topics/kafka_clusters; \ + use one config format or the other" + .to_owned(), + ))); + } - // Validate that legacy fields are used together - match (&self.kafka_cluster, &self.kafka_topic) { - (Some(_), None) => { + if uses_new_format { + // New format: the maps are the source of truth. Require both halves + // so a topic always has a cluster to resolve against. + if self.kafka_topics.is_empty() { return Err(Box::new(figment::Error::from( - "kafka_cluster is set but kafka_topic is not; \ - either set both or use kafka_clusters/kafka_topics instead" - .to_owned(), + "kafka_clusters is set but kafka_topics is empty".to_owned(), ))); } - (None, Some(_)) => { + if self.kafka_clusters.is_empty() { return Err(Box::new(figment::Error::from( - "kafka_topic is set but kafka_cluster is not; \ - either set both or use kafka_clusters/kafka_topics instead" - .to_owned(), + "kafka_topics is set but kafka_clusters is empty".to_owned(), ))); } - _ => {} - } - - // Build cluster config from legacy fields (if present) - if let Some(ref address) = self.kafka_cluster { - warn!("kafka_cluster is deprecated, use kafka_clusters instead"); - let legacy_cluster = ClusterConfig { - address: address.clone(), - security_protocol: self.kafka_security_protocol.clone(), - sasl_mechanism: self.kafka_sasl_mechanism.clone(), - sasl_username: self.kafka_sasl_username.clone(), - sasl_password: self.kafka_sasl_password.clone(), - ssl_ca_location: self.kafka_ssl_ca_location.clone(), - ssl_certificate_location: self.kafka_ssl_certificate_location.clone(), - ssl_key_location: self.kafka_ssl_key_location.clone(), - }; + } else { + // Legacy / zero-config path. Legacy topic and cluster must be set + // together (or neither, in which case the historical defaults apply). + match (&self.kafka_cluster, &self.kafka_topic) { + (Some(_), None) => { + return Err(Box::new(figment::Error::from( + "kafka_cluster is set but kafka_topic is not; \ + either set both or use kafka_clusters/kafka_topics instead" + .to_owned(), + ))); + } + (None, Some(_)) => { + return Err(Box::new(figment::Error::from( + "kafka_topic is set but kafka_cluster is not; \ + either set both or use kafka_clusters/kafka_topics instead" + .to_owned(), + ))); + } + _ => {} + } - // Add the legacy cluster (won't overwrite if "default" already exists) - self.kafka_clusters - .entry(DEFAULT_CLUSTER.to_owned()) - .or_insert(legacy_cluster); - } + if self.kafka_cluster.is_some() { + warn!("kafka_cluster is deprecated, use kafka_clusters instead"); + } + if self.kafka_topic.is_some() { + warn!("kafka_topic is deprecated, use kafka_topics instead"); + } - // Build topic config from legacy fields (if present) - if let Some(ref topic_name) = self.kafka_topic { - warn!("kafka_topic is deprecated, use kafka_topics instead"); + let topic_name = self + .kafka_topic + .clone() + .unwrap_or_else(|| DEFAULT_TOPIC.to_owned()); + let address = self + .kafka_cluster + .clone() + .unwrap_or_else(|| DEFAULT_CLUSTER_ADDRESS.to_owned()); let consumer_group = self .kafka_consumer_group .clone() - .unwrap_or_else(|| "taskworker".to_owned()); + .unwrap_or_else(|| DEFAULT_CONSUMER_GROUP.to_owned()); + + self.kafka_clusters.insert( + DEFAULT_CLUSTER.to_owned(), + ClusterConfig { + address, + security_protocol: self.kafka_security_protocol.clone(), + sasl_mechanism: self.kafka_sasl_mechanism.clone(), + sasl_username: self.kafka_sasl_username.clone(), + sasl_password: self.kafka_sasl_password.clone(), + ssl_ca_location: self.kafka_ssl_ca_location.clone(), + ssl_certificate_location: self.kafka_ssl_certificate_location.clone(), + ssl_key_location: self.kafka_ssl_key_location.clone(), + }, + ); let raw_config = if self.raw_mode { Some(RawModeConfig { @@ -607,26 +657,26 @@ impl Config { None }; - let legacy_topic = TopicConfig { - cluster: DEFAULT_CLUSTER.to_owned(), - consumer_group: consumer_group.clone(), - produce_only: false, - raw: raw_config, - }; - - // Add legacy topic (new-style config takes precedence) - self.kafka_topics - .entry(topic_name.clone()) - .or_insert(legacy_topic); + self.kafka_topics.insert( + topic_name, + TopicConfig { + cluster: DEFAULT_CLUSTER.to_owned(), + consumer_group: consumer_group.clone(), + produce_only: false, + raw: raw_config, + }, + ); - // Add retry topic if configured (only if not already present) + // Add the retry topic if configured (unless it collides with the + // main topic). Retry topics are produce-only; this taskbroker writes + // retries to them but does not consume from them. if let Some(ref retry_topic) = self.kafka_retry_topic { self.kafka_topics .entry(retry_topic.clone()) .or_insert_with(|| TopicConfig { cluster: DEFAULT_CLUSTER.to_owned(), consumer_group, - produce_only: !self.kafka_consume_retry_topic, + produce_only: true, raw: None, }); } @@ -820,7 +870,12 @@ mod tests { assert_eq!(config.log_filter, "info,librdkafka=warn,h2=off"); assert_eq!(config.log_format, LogFormat::Text); assert_eq!(config.grpc_port, 50051); - assert_eq!(config.kafka_topic, Some("taskworker".to_owned())); + // The legacy kafka fields default to None now; the historical + // "taskworker" default is applied during normalization (see + // test_from_args_env_test). + assert_eq!(config.kafka_topic, None); + assert_eq!(config.kafka_cluster, None); + assert_eq!(config.kafka_consumer_group, None); assert_eq!(config.db_path, "./taskbroker-inflight.sqlite"); assert_eq!(config.max_pending_count, 2048); assert_eq!(config.max_processing_count, 2048); @@ -880,7 +935,12 @@ mod tests { config.default_metrics_tags, BTreeMap::from([("key_1".to_owned(), "value_1".to_owned())]) ); - assert_eq!(config.kafka_consumer_group, Some("taskworker".to_owned())); + // kafka_consumer_group is unset in the yaml, so the legacy field + // stays None and normalization applies the "taskworker" default. + assert_eq!(config.kafka_consumer_group, None); + let (topic_name, topic_config) = config.consumable_topic().unwrap(); + assert_eq!(topic_name, "error-tasks"); + assert_eq!(topic_config.consumer_group, "taskworker"); assert_eq!(config.kafka_auto_offset_reset, "earliest".to_owned()); assert_eq!(config.kafka_session_timeout_ms, 6000.to_owned()); assert_eq!(config.kafka_topic, Some("error-tasks".to_owned())); @@ -937,7 +997,15 @@ mod tests { assert_eq!(config.sentry_dsn, None); assert_eq!(config.sentry_env, None); assert_eq!(config.log_filter, "error"); - assert_eq!(config.kafka_topic, Some("taskworker".to_owned())); + // Zero-config: legacy fields stay None, but normalization applies + // the historical "taskworker" default as the consumable topic. + assert_eq!(config.kafka_topic, None); + let (topic_name, topic_config) = config.consumable_topic().unwrap(); + assert_eq!(topic_name, "taskworker"); + assert_eq!( + config.cluster(&topic_config.cluster).unwrap().address, + "127.0.0.1:9092" + ); assert_eq!(config.kafka_deadletter_topic, "taskworker-dlq".to_owned()); assert_eq!(config.db_path, "./taskbroker-inflight.sqlite".to_owned()); assert_eq!(config.max_pending_count, 2048); @@ -1182,12 +1250,9 @@ mod tests { use super::{ClusterConfig, RawModeConfig}; Jail::expect_with(|jail| { - // Set kafka_topic to match the consumable topic so they merge jail.create_file( "config.yaml", r#" -kafka_topic: profiles - kafka_topics: profiles: cluster: profiles-cluster @@ -1211,7 +1276,6 @@ kafka_clusters: let config = Config::from_args(&args).unwrap(); let topics = &config.kafka_topics; - // 2 topics from yaml, legacy kafka_topic merges with "profiles" assert_eq!(topics.len(), 2); let profiles = topics.get("profiles").unwrap(); @@ -1232,8 +1296,9 @@ kafka_clusters: assert!(retry.produce_only); let clusters = &config.kafka_clusters; - // 2 clusters: profiles-cluster from yaml + default from legacy kafka_cluster - assert_eq!(clusters.len(), 2); + // Only the explicitly-declared cluster exists; no legacy "default" + // cluster is injected when the new format is used. + assert_eq!(clusters.len(), 1); assert_eq!( clusters.get("profiles-cluster"), Some(&ClusterConfig { @@ -1247,8 +1312,7 @@ kafka_clusters: ssl_key_location: None, }) ); - // Legacy "default" cluster also exists - assert!(clusters.contains_key("default")); + assert!(!clusters.contains_key("default")); // Test consumable_topic() and cluster() helpers let (topic_name, topic_config) = config.consumable_topic().unwrap(); @@ -1265,8 +1329,6 @@ kafka_clusters: #[test] fn test_multi_topic_config_from_env() { Jail::expect_with(|jail| { - // Set kafka_topic to match the new topic so they merge - jail.set_env("TASKBROKER_KAFKA_TOPIC", "profiles"); // Note: figment lowercases env var keys after splitting on "__", // so MY_CLUSTER becomes my_cluster (with underscore, not hyphen). // The cluster reference value is preserved as-is. @@ -1284,7 +1346,6 @@ kafka_clusters: let config = Config::from_args(&args).unwrap(); let topics = &config.kafka_topics; - // "profiles" from env + legacy kafka_topic merges with it assert_eq!(topics.len(), 1); let profiles = topics.get("profiles").unwrap(); @@ -1292,9 +1353,9 @@ kafka_clusters: assert_eq!(profiles.consumer_group, "taskbroker-profiles"); let clusters = &config.kafka_clusters; + assert_eq!(clusters.len(), 1); assert_eq!(clusters.get("my_cluster").unwrap().address, "10.0.0.2:9092"); - // Legacy "default" cluster also exists - assert!(clusters.contains_key("default")); + assert!(!clusters.contains_key("default")); // Test consumable_topic() helper let (topic_name, _) = config.consumable_topic().unwrap(); @@ -1305,18 +1366,49 @@ kafka_clusters: } #[test] - fn test_multi_topic_unknown_cluster_reference() { - // When kafka_topics references a cluster that doesn't exist in kafka_clusters, - // validation should fail. The "default" cluster is always added from legacy config, - // but custom cluster references must exist. + fn test_rejects_mixing_legacy_and_new_format() { Jail::expect_with(|jail| { jail.create_file( "config.yaml", r#" -kafka_topic: profiles +kafka_topic: legacy-topic +kafka_cluster: 127.0.0.1:9092 + kafka_topics: profiles: - cluster: nonexistent-cluster + cluster: my-cluster + consumer_group: taskbroker-profiles + +kafka_clusters: + my-cluster: + address: 10.0.0.1:9092 +"#, + )?; + + let args = Args { + config: Some("config.yaml".to_owned()), + }; + let err = Config::from_args(&args).unwrap_err(); + assert!( + err.to_string().contains("cannot mix"), + "unexpected error: {}", + err + ); + + Ok(()) + }); + } + + #[test] + fn test_multi_topic_requires_clusters() { + // kafka_topics without any kafka_clusters is a misconfiguration. + Jail::expect_with(|jail| { + jail.create_file( + "config.yaml", + r#" +kafka_topics: + profiles: + cluster: my-cluster consumer_group: taskbroker-profiles "#, )?; @@ -1326,7 +1418,7 @@ kafka_topics: }; let err = Config::from_args(&args).unwrap_err(); assert!( - err.to_string().contains("unknown cluster"), + err.to_string().contains("kafka_clusters is empty"), "unexpected error: {}", err ); @@ -1338,11 +1430,9 @@ kafka_topics: #[test] fn test_multi_topic_validates_cluster_references() { Jail::expect_with(|jail| { - // Set kafka_topic to match the profile so legacy topic merges jail.create_file( "config.yaml", r#" -kafka_topic: profiles kafka_topics: profiles: cluster: nonexistent-cluster @@ -1371,12 +1461,10 @@ kafka_clusters: #[test] fn test_multi_topic_rejects_multiple_consumable_topics() { Jail::expect_with(|jail| { - // Two consumable topics in kafka_topics - should fail even with legacy merging - // because both profiles and subscriptions are consumable + // Two consumable topics - the guard for this PR rejects this. jail.create_file( "config.yaml", r#" -kafka_topic: profiles kafka_topics: profiles: cluster: my-cluster @@ -1409,12 +1497,10 @@ kafka_clusters: #[test] fn test_multi_topic_allows_one_consumable_with_produce_only() { Jail::expect_with(|jail| { - // One consumable topic (profiles), one produce-only (profiles-retry) - // Legacy kafka_topic merges with profiles + // One consumable topic (profiles), one produce-only (profiles-retry). jail.create_file( "config.yaml", r#" -kafka_topic: profiles kafka_topics: profiles: cluster: my-cluster @@ -1457,7 +1543,6 @@ kafka_clusters: jail.create_file( "config.yaml", r#" -kafka_topic: profiles kafka_topics: profiles: cluster: my-cluster diff --git a/src/kafka/activation_batcher.rs b/src/kafka/activation_batcher.rs index b1546267..a5a1ba73 100644 --- a/src/kafka/activation_batcher.rs +++ b/src/kafka/activation_batcher.rs @@ -248,7 +248,9 @@ demoted_namespaces: let runtime_config = Arc::new( RuntimeConfigManager::new(Some(config_file.path().to_str().unwrap().to_string())).await, ); - let config = Arc::new(Config::default()); + let mut config = Config::default(); + config.normalize_and_validate().unwrap(); + let config = Arc::new(config); let mut batcher = ActivationBatcher::new( ActivationBatcherConfig::from_config(&config), runtime_config, @@ -269,7 +271,9 @@ demoted_namespaces: #[tokio::test] async fn test_drop_task_due_to_expiry() { let runtime_config = Arc::new(RuntimeConfigManager::new(None).await); - let config = Arc::new(Config::default()); + let mut config = Config::default(); + config.normalize_and_validate().unwrap(); + let config = Arc::new(config); let mut batcher = ActivationBatcher::new( ActivationBatcherConfig::from_config(&config), runtime_config, @@ -291,11 +295,13 @@ demoted_namespaces: #[tokio::test] async fn test_close_by_bytes_limit() { let runtime_config = Arc::new(RuntimeConfigManager::new(None).await); - let config = Arc::new(Config { + let mut config = Config { db_insert_batch_max_size: 1, db_insert_batch_max_len: 2, ..Default::default() - }); + }; + config.normalize_and_validate().unwrap(); + let config = Arc::new(config); let mut batcher = ActivationBatcher::new( ActivationBatcherConfig::from_config(&config), @@ -319,11 +325,13 @@ demoted_namespaces: #[tokio::test] async fn test_close_by_rows_limit() { let runtime_config = Arc::new(RuntimeConfigManager::new(None).await); - let config = Arc::new(Config { + let mut config = Config { db_insert_batch_max_size: 100000, db_insert_batch_max_len: 2, ..Default::default() - }); + }; + config.normalize_and_validate().unwrap(); + let config = Arc::new(config); let mut batcher = ActivationBatcher::new( ActivationBatcherConfig::from_config(&config), @@ -368,16 +376,21 @@ demoted_topic: taskworker-demoted"#; let runtime_config = Arc::new( RuntimeConfigManager::new(Some(config_file.path().to_str().unwrap().to_string())).await, ); - let config = Arc::new(Config::default()); + let mut config = Config::default(); + config.normalize_and_validate().unwrap(); + let config = Arc::new(config); let mut batcher = ActivationBatcher::new( ActivationBatcherConfig::from_config(&config), runtime_config, ); - assert_eq!( - batcher.producer_cluster, - config.kafka_cluster.clone().unwrap() - ); + let (_, topic_config) = config.consumable_topic().unwrap(); + let cluster_address = config + .cluster(&topic_config.cluster) + .unwrap() + .address + .clone(); + assert_eq!(batcher.producer_cluster, cluster_address); let activation_0 = ActivationBuilder::new() .id("0") diff --git a/src/main.rs b/src/main.rs index 77753b53..edf59e4a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -167,12 +167,7 @@ async fn main() -> Result<(), Error> { let (main_topic, _) = consumer_config .consumable_topic() .expect("invalid config: no consumable topic"); - let mut topics_to_consume = vec![main_topic.to_owned()]; - if consumer_config.kafka_consume_retry_topic - && let Some(ref retry_topic) = consumer_config.kafka_retry_topic - { - topics_to_consume.push(retry_topic.clone()); - } + let topics_to_consume = vec![main_topic.to_owned()]; async move { // The consumer has an internal thread that listens for cancellations, so it doesn't need diff --git a/src/test_utils.rs b/src/test_utils.rs index d6940827..1b51530b 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -264,9 +264,12 @@ pub fn make_activations(count: u32) -> Vec { make_activations_with_namespace(namespace, count) } -/// Create a basic default [`Config`] +/// Create a basic default [`Config`], normalized so the kafka helpers +/// (`consumable_topic`, `cluster`, ...) are usable. pub fn create_config() -> Arc { - Arc::new(Config::default()) + let mut config = Config::default(); + config.normalize_and_validate().unwrap(); + Arc::new(config) } /// Create an ActivationStore instance @@ -299,7 +302,7 @@ pub async fn create_test_store(adapter: &str) -> Arc { /// and earliest auto_offset_reset. This is intended to be combined /// with [`reset_topic`] pub fn create_integration_config() -> Arc { - let config = Config { + let mut config = Config { pg_host: get_pg_host(), pg_port: get_pg_port(), pg_username: get_pg_username(), @@ -312,6 +315,7 @@ pub fn create_integration_config() -> Arc { kafka_auto_offset_reset: "earliest".into(), ..Config::default() }; + config.normalize_and_validate().unwrap(); Arc::new(config) } @@ -320,7 +324,7 @@ pub fn create_integration_config() -> Arc { /// and earliest auto_offset_reset. This is intended to be combined /// with [`reset_topic`] pub fn create_integration_config_with_ssl() -> Arc { - let config = Config { + let mut config = Config { pg_host: get_pg_host(), pg_port: get_pg_port(), pg_username: get_pg_username(), @@ -334,12 +338,13 @@ pub fn create_integration_config_with_ssl() -> Arc { kafka_auto_offset_reset: "earliest".into(), ..Config::default() }; + config.normalize_and_validate().unwrap(); Arc::new(config) } pub fn create_integration_config_with_topic(topic: String) -> Config { - Config { + let mut config = Config { pg_host: get_pg_host(), pg_port: get_pg_port(), pg_username: get_pg_username(), @@ -351,7 +356,9 @@ pub fn create_integration_config_with_topic(topic: String) -> Config { kafka_consumer_group: Some("taskworker".into()), kafka_auto_offset_reset: "earliest".into(), ..Config::default() - } + }; + config.normalize_and_validate().unwrap(); + config } /// Create a kafka producer for a given config From b0b8268ac3d50bb9ca7eaf0867bf2450538c14ed Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 2 Jun 2026 13:20:57 +0200 Subject: [PATCH 04/17] Remove too restrictive validation --- src/config.rs | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/src/config.rs b/src/config.rs index 643ec737..55151a77 100644 --- a/src/config.rs +++ b/src/config.rs @@ -592,26 +592,6 @@ impl Config { ))); } } else { - // Legacy / zero-config path. Legacy topic and cluster must be set - // together (or neither, in which case the historical defaults apply). - match (&self.kafka_cluster, &self.kafka_topic) { - (Some(_), None) => { - return Err(Box::new(figment::Error::from( - "kafka_cluster is set but kafka_topic is not; \ - either set both or use kafka_clusters/kafka_topics instead" - .to_owned(), - ))); - } - (None, Some(_)) => { - return Err(Box::new(figment::Error::from( - "kafka_topic is set but kafka_cluster is not; \ - either set both or use kafka_clusters/kafka_topics instead" - .to_owned(), - ))); - } - _ => {} - } - if self.kafka_cluster.is_some() { warn!("kafka_cluster is deprecated, use kafka_clusters instead"); } From 1ccc32399dbd5ff8861d786c3e14295b586b7fc1 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 2 Jun 2026 13:31:45 +0200 Subject: [PATCH 05/17] Migrate kafka_deadletter to the new format too --- src/config.rs | 264 ++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 221 insertions(+), 43 deletions(-) diff --git a/src/config.rs b/src/config.rs index 55151a77..48bfdfda 100644 --- a/src/config.rs +++ b/src/config.rs @@ -166,32 +166,42 @@ pub struct Config { /// Whether to create missing topics if they don't exist. pub create_missing_topics: bool, - /// Comma separated list of kafka brokers to - /// publish dead letter messages on + /// Comma separated list of kafka brokers to publish dead letter messages on. + /// Deprecated: declare the deadletter topic in kafka_topics (produce_only) + /// with a cluster reference instead. pub kafka_deadletter_cluster: Option, - /// The kafka topic to publish dead letter messages on + /// The kafka topic to publish dead letter messages on. + /// Still valid in the new format: it names the produce-only topic in + /// kafka_topics whose cluster the deadletter producer connects to. pub kafka_deadletter_topic: String, - /// The security method used for authentication to the DLQ eg. sasl_plaintext + /// The security method used for authentication to the DLQ eg. sasl_plaintext. + /// Deprecated: configure auth on the referenced cluster in kafka_clusters. pub kafka_deadletter_security_protocol: Option, - /// The hashing algorithm used for authentication to the DLQ eg. scram-sha-256 + /// The hashing algorithm used for authentication to the DLQ eg. scram-sha-256. + /// Deprecated: configure auth on the referenced cluster in kafka_clusters. pub kafka_deadletter_sasl_mechanism: Option, - /// The sasl username for DLQ publishing + /// The sasl username for DLQ publishing. + /// Deprecated: configure auth on the referenced cluster in kafka_clusters. pub kafka_deadletter_sasl_username: Option, - /// The sasl password for DLQ publishing + /// The sasl password for DLQ publishing. + /// Deprecated: configure auth on the referenced cluster in kafka_clusters. pub kafka_deadletter_sasl_password: Option, - /// The location to the DLQ CA certificate file + /// The location to the DLQ CA certificate file. + /// Deprecated: configure auth on the referenced cluster in kafka_clusters. pub kafka_deadletter_ssl_ca_location: Option, - /// The location to the DLQ certificate file + /// The location to the DLQ certificate file. + /// Deprecated: configure auth on the referenced cluster in kafka_clusters. pub kafka_deadletter_ssl_certificate_location: Option, - /// The location to the DLQ private key file + /// The location to the DLQ private key file. + /// Deprecated: configure auth on the referenced cluster in kafka_clusters. pub kafka_deadletter_ssl_key_location: Option, /// The topic to publish retry task activations to. @@ -551,13 +561,15 @@ impl Config { /// `kafka_topics` and `kafka_clusters` are always populated. pub(crate) fn normalize_and_validate(&mut self) -> Result<(), Box> { const DEFAULT_CLUSTER: &str = "default"; + const DEADLETTER_CLUSTER: &str = "deadletter"; const DEFAULT_TOPIC: &str = "taskworker"; const DEFAULT_CLUSTER_ADDRESS: &str = "127.0.0.1:9092"; const DEFAULT_CONSUMER_GROUP: &str = "taskworker"; let uses_new_format = !self.kafka_topics.is_empty() || !self.kafka_clusters.is_empty(); - // Any explicitly-set legacy field tied to the main consumed cluster. - // (Deadletter fields are not legacy duals and are intentionally excluded.) + // Any explicitly-set deprecated field describing a cluster (the main + // consumed cluster or the deadletter cluster). kafka_deadletter_topic is + // NOT deprecated and is intentionally excluded. let uses_legacy = self.kafka_topic.is_some() || self.kafka_cluster.is_some() || self.kafka_consumer_group.is_some() @@ -567,13 +579,21 @@ impl Config { || self.kafka_sasl_password.is_some() || self.kafka_ssl_ca_location.is_some() || self.kafka_ssl_certificate_location.is_some() - || self.kafka_ssl_key_location.is_some(); + || self.kafka_ssl_key_location.is_some() + || self.kafka_deadletter_cluster.is_some() + || self.kafka_deadletter_security_protocol.is_some() + || self.kafka_deadletter_sasl_mechanism.is_some() + || self.kafka_deadletter_sasl_username.is_some() + || self.kafka_deadletter_sasl_password.is_some() + || self.kafka_deadletter_ssl_ca_location.is_some() + || self.kafka_deadletter_ssl_certificate_location.is_some() + || self.kafka_deadletter_ssl_key_location.is_some(); if uses_new_format && uses_legacy { return Err(Box::new(figment::Error::from( - "cannot mix the deprecated kafka_topic/kafka_cluster/kafka_consumer_group \ - (and related kafka_sasl_*/kafka_ssl_* fields) with kafka_topics/kafka_clusters; \ - use one config format or the other" + "cannot mix the deprecated kafka_cluster/kafka_topic/kafka_consumer_group/\ + kafka_deadletter_cluster (and related kafka_sasl_*/kafka_ssl_*/kafka_deadletter_* \ + auth fields) with kafka_topics/kafka_clusters; use one config format or the other" .to_owned(), ))); } @@ -598,6 +618,12 @@ impl Config { if self.kafka_topic.is_some() { warn!("kafka_topic is deprecated, use kafka_topics instead"); } + if self.kafka_deadletter_cluster.is_some() { + warn!( + "kafka_deadletter_cluster is deprecated, declare the deadletter topic in \ + kafka_topics with a cluster reference instead" + ); + } let topic_name = self .kafka_topic @@ -615,7 +641,7 @@ impl Config { self.kafka_clusters.insert( DEFAULT_CLUSTER.to_owned(), ClusterConfig { - address, + address: address.clone(), security_protocol: self.kafka_security_protocol.clone(), sasl_mechanism: self.kafka_sasl_mechanism.clone(), sasl_username: self.kafka_sasl_username.clone(), @@ -626,6 +652,29 @@ impl Config { }, ); + // Migrate the deprecated deadletter cluster/auth fields into a + // dedicated cluster. The deadletter producer historically falls back + // to the main cluster's address when kafka_deadletter_cluster is + // unset, while using its own (possibly empty) auth. + self.kafka_clusters.insert( + DEADLETTER_CLUSTER.to_owned(), + ClusterConfig { + address: self + .kafka_deadletter_cluster + .clone() + .unwrap_or_else(|| address.clone()), + security_protocol: self.kafka_deadletter_security_protocol.clone(), + sasl_mechanism: self.kafka_deadletter_sasl_mechanism.clone(), + sasl_username: self.kafka_deadletter_sasl_username.clone(), + sasl_password: self.kafka_deadletter_sasl_password.clone(), + ssl_ca_location: self.kafka_deadletter_ssl_ca_location.clone(), + ssl_certificate_location: self + .kafka_deadletter_ssl_certificate_location + .clone(), + ssl_key_location: self.kafka_deadletter_ssl_key_location.clone(), + }, + ); + let raw_config = if self.raw_mode { Some(RawModeConfig { namespace: self.raw_namespace.clone(), @@ -647,6 +696,18 @@ impl Config { }, ); + // Register the deadletter topic as a produce-only topic, just like + // the retry topic below. Its name stays configured via the (still + // valid) kafka_deadletter_topic field. + self.kafka_topics + .entry(self.kafka_deadletter_topic.clone()) + .or_insert_with(|| TopicConfig { + cluster: DEADLETTER_CLUSTER.to_owned(), + consumer_group: consumer_group.clone(), + produce_only: true, + raw: None, + }); + // Add the retry topic if configured (unless it collides with the // main topic). Retry topics are produce-only; this taskbroker writes // retries to them but does not consume from them. @@ -675,6 +736,16 @@ impl Config { // Validate exactly one consumable topic self.consumable_topic()?; + // The deadletter topic must be a declared topic so the producer can + // resolve its cluster. In legacy mode it was added above; in the new + // format the user must declare it (produce-only) in kafka_topics. + if !self.kafka_topics.contains_key(&self.kafka_deadletter_topic) { + return Err(Box::new(figment::Error::from(format!( + "kafka_deadletter_topic '{}' is not defined in kafka_topics", + self.kafka_deadletter_topic + )))); + } + Ok(()) } @@ -771,46 +842,43 @@ impl Config { config.clone() } - /// Convert the application Config into rdkafka::ClientConfig - /// Convert the application Config into rdkafka::ClientConfig for producer (DLQ). - /// Falls back to consumable topic's cluster if kafka_deadletter_cluster is not set. + /// Convert the application Config into rdkafka::ClientConfig for the + /// deadletter / forwarding producer. The producer connects to the cluster + /// of the `kafka_deadletter_topic` entry in `kafka_topics` (in legacy mode + /// this is the migrated "deadletter" cluster). /// Panics if config wasn't validated. pub fn kafka_producer_config(&self) -> ClientConfig { - let (_, topic_config) = self - .consumable_topic() - .expect("consumable_topic failed - was config validated?"); - let default_cluster = self - .cluster(&topic_config.cluster) + let dlq_topic = self + .kafka_topics + .get(&self.kafka_deadletter_topic) + .expect("deadletter topic not in kafka_topics - was config validated?"); + let cluster = self + .cluster(&dlq_topic.cluster) .expect("cluster lookup failed - was config validated?"); let mut new_config = ClientConfig::new(); let config = new_config - .set( - "bootstrap.servers", - self.kafka_deadletter_cluster - .as_ref() - .unwrap_or(&default_cluster.address), - ) + .set("bootstrap.servers", cluster.address.clone()) .set("message.max.bytes", format!("{}", self.max_message_size)); - if let Some(sasl_mechanism) = &self.kafka_deadletter_sasl_mechanism { + if let Some(ref sasl_mechanism) = cluster.sasl_mechanism { config.set("sasl.mechanism", sasl_mechanism); } - if let Some(sasl_username) = &self.kafka_deadletter_sasl_username { + if let Some(ref sasl_username) = cluster.sasl_username { config.set("sasl.username", sasl_username); } - if let Some(sasl_password) = &self.kafka_deadletter_sasl_password { + if let Some(ref sasl_password) = cluster.sasl_password { config.set("sasl.password", sasl_password); } - if let Some(security_protocol) = &self.kafka_deadletter_security_protocol { + if let Some(ref security_protocol) = cluster.security_protocol { config.set("security.protocol", security_protocol); } - if let Some(ssl_ca_location) = &self.kafka_deadletter_ssl_ca_location { + if let Some(ref ssl_ca_location) = cluster.ssl_ca_location { config.set("ssl.ca.location", ssl_ca_location); } - if let Some(ssl_certificate_location) = &self.kafka_deadletter_ssl_certificate_location { + if let Some(ref ssl_certificate_location) = cluster.ssl_certificate_location { config.set("ssl.certificate.location", ssl_certificate_location); } - if let Some(ssl_private_key_location) = &self.kafka_deadletter_ssl_key_location { + if let Some(ref ssl_private_key_location) = cluster.ssl_key_location { config.set("ssl.key.location", ssl_private_key_location); } @@ -1233,6 +1301,8 @@ mod tests { jail.create_file( "config.yaml", r#" +kafka_deadletter_topic: profiles-dlq + kafka_topics: profiles: cluster: profiles-cluster @@ -1243,6 +1313,10 @@ kafka_topics: cluster: profiles-cluster consumer_group: taskbroker-profiles-retry produce_only: true + profiles-dlq: + cluster: profiles-cluster + consumer_group: taskbroker-profiles-dlq + produce_only: true kafka_clusters: profiles-cluster: @@ -1256,7 +1330,7 @@ kafka_clusters: let config = Config::from_args(&args).unwrap(); let topics = &config.kafka_topics; - assert_eq!(topics.len(), 2); + assert_eq!(topics.len(), 3); let profiles = topics.get("profiles").unwrap(); assert_eq!(profiles.cluster, "profiles-cluster"); @@ -1317,6 +1391,21 @@ kafka_clusters: "TASKBROKER_KAFKA_TOPICS__PROFILES__CONSUMER_GROUP", "taskbroker-profiles", ); + // The deadletter topic must be a declared topic. The key segment is + // lowercased to "profiles_dlq", so kafka_deadletter_topic must match. + jail.set_env("TASKBROKER_KAFKA_DEADLETTER_TOPIC", "profiles_dlq"); + jail.set_env( + "TASKBROKER_KAFKA_TOPICS__PROFILES_DLQ__CLUSTER", + "my_cluster", + ); + jail.set_env( + "TASKBROKER_KAFKA_TOPICS__PROFILES_DLQ__CONSUMER_GROUP", + "taskbroker-profiles-dlq", + ); + jail.set_env( + "TASKBROKER_KAFKA_TOPICS__PROFILES_DLQ__PRODUCE_ONLY", + "true", + ); jail.set_env( "TASKBROKER_KAFKA_CLUSTERS__MY_CLUSTER__ADDRESS", "10.0.0.2:9092", @@ -1326,11 +1415,12 @@ kafka_clusters: let config = Config::from_args(&args).unwrap(); let topics = &config.kafka_topics; - assert_eq!(topics.len(), 1); + assert_eq!(topics.len(), 2); let profiles = topics.get("profiles").unwrap(); assert_eq!(profiles.cluster, "my_cluster"); assert_eq!(profiles.consumer_group, "taskbroker-profiles"); + assert!(topics.get("profiles_dlq").unwrap().produce_only); let clusters = &config.kafka_clusters; assert_eq!(clusters.len(), 1); @@ -1477,10 +1567,12 @@ kafka_clusters: #[test] fn test_multi_topic_allows_one_consumable_with_produce_only() { Jail::expect_with(|jail| { - // One consumable topic (profiles), one produce-only (profiles-retry). + // One consumable topic (profiles), two produce-only (retry + dlq). jail.create_file( "config.yaml", r#" +kafka_deadletter_topic: profiles-dlq + kafka_topics: profiles: cluster: my-cluster @@ -1489,6 +1581,10 @@ kafka_topics: cluster: my-cluster consumer_group: taskbroker-profiles-retry produce_only: true + profiles-dlq: + cluster: my-cluster + consumer_group: taskbroker-profiles-dlq + produce_only: true kafka_clusters: my-cluster: @@ -1502,11 +1598,12 @@ kafka_clusters: let config = Config::from_args(&args).unwrap(); let topics = &config.kafka_topics; - assert_eq!(topics.len(), 2); + assert_eq!(topics.len(), 3); - // One consumable, one produce-only + // One consumable, two produce-only assert!(!topics.get("profiles").unwrap().produce_only); assert!(topics.get("profiles-retry").unwrap().produce_only); + assert!(topics.get("profiles-dlq").unwrap().produce_only); // consumable_topic() returns the one consumable topic let (topic_name, _) = config.consumable_topic().unwrap(); @@ -1548,4 +1645,85 @@ kafka_clusters: Ok(()) }); } + + #[test] + fn test_multi_topic_requires_deadletter_topic() { + // In the new format the deadletter topic must be declared in kafka_topics. + Jail::expect_with(|jail| { + jail.create_file( + "config.yaml", + r#" +kafka_topics: + profiles: + cluster: my-cluster + consumer_group: taskbroker-profiles + +kafka_clusters: + my-cluster: + address: 10.0.0.1:9092 +"#, + )?; + + let args = Args { + config: Some("config.yaml".to_owned()), + }; + let err = Config::from_args(&args).unwrap_err(); + assert!( + err.to_string() + .contains("kafka_deadletter_topic 'taskworker-dlq' is not defined"), + "unexpected error: {}", + err + ); + + Ok(()) + }); + } + + #[test] + fn test_deadletter_producer_uses_its_topic_cluster() { + // The deadletter producer connects to the cluster of the deadletter + // topic, which can differ from the consumable topic's cluster. + Jail::expect_with(|jail| { + jail.create_file( + "config.yaml", + r#" +kafka_deadletter_topic: profiles-dlq + +kafka_topics: + profiles: + cluster: main-cluster + consumer_group: taskbroker-profiles + profiles-dlq: + cluster: dlq-cluster + consumer_group: taskbroker-profiles-dlq + produce_only: true + +kafka_clusters: + main-cluster: + address: 10.0.0.1:9092 + dlq-cluster: + address: 10.9.9.9:9092 +"#, + )?; + + let args = Args { + config: Some("config.yaml".to_owned()), + }; + let config = Config::from_args(&args).unwrap(); + + let consumer_config = config.kafka_consumer_config(); + assert_eq!( + consumer_config.get("bootstrap.servers").unwrap(), + "10.0.0.1:9092" + ); + + let producer_config = config.kafka_producer_config(); + assert_eq!( + producer_config.get("bootstrap.servers").unwrap(), + "10.9.9.9:9092" + ); + + Ok(()) + }); + } } From f50a17400ceb33b021e70abc017c2566afb74df9 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 2 Jun 2026 13:49:12 +0200 Subject: [PATCH 06/17] Add per-topic session timeout, auto commit, and auto offset reset --- src/config.rs | 152 ++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 141 insertions(+), 11 deletions(-) diff --git a/src/config.rs b/src/config.rs index 48bfdfda..67a9ff87 100644 --- a/src/config.rs +++ b/src/config.rs @@ -25,6 +25,18 @@ pub struct TopicConfig { /// Raw mode settings. If set, this topic uses raw mode. #[serde(default)] pub raw: Option, + /// The kafka session timeout in ms for this topic's consumer. + /// Falls back to the global `kafka_session_timeout_ms` when unset. + #[serde(default)] + pub session_timeout_ms: Option, + /// The interval in ms at which this topic's consumer auto-commits. + /// Falls back to the global `kafka_auto_commit_interval_ms` when unset. + #[serde(default)] + pub auto_commit_interval_ms: Option, + /// The auto offset reset policy for this topic's consumer. + /// Falls back to the global `kafka_auto_offset_reset` when unset. + #[serde(default)] + pub auto_offset_reset: Option, } /// Raw mode settings for a topic. @@ -212,13 +224,19 @@ pub struct Config { /// The default number of partitions for a topic pub default_topic_partitions: i32, - /// The kafka session timeout in ms + /// The kafka session timeout in ms. + /// Used as the default for topics that don't set their own + /// `session_timeout_ms`. pub kafka_session_timeout_ms: usize, /// The amount of ms that the consumer will commit at. + /// Used as the default for topics that don't set their own + /// `auto_commit_interval_ms`. pub kafka_auto_commit_interval_ms: usize, - /// The amount of ms that the consumer will commit at. + /// The auto offset reset policy for the consumer. + /// Used as the default for topics that don't set their own + /// `auto_offset_reset`. pub kafka_auto_offset_reset: String, /// The number of ms for timeouts when publishing messages to kafka. @@ -693,6 +711,9 @@ impl Config { consumer_group: consumer_group.clone(), produce_only: false, raw: raw_config, + session_timeout_ms: None, + auto_commit_interval_ms: None, + auto_offset_reset: None, }, ); @@ -706,6 +727,9 @@ impl Config { consumer_group: consumer_group.clone(), produce_only: true, raw: None, + session_timeout_ms: None, + auto_commit_interval_ms: None, + auto_offset_reset: None, }); // Add the retry topic if configured (unless it collides with the @@ -719,6 +743,9 @@ impl Config { consumer_group, produce_only: true, raw: None, + session_timeout_ms: None, + auto_commit_interval_ms: None, + auto_offset_reset: None, }); } } @@ -797,24 +824,30 @@ impl Config { .cluster(&topic_config.cluster) .expect("cluster lookup failed - was config validated?"); + // Per-topic consumer settings, falling back to the global defaults. + let session_timeout_ms = topic_config + .session_timeout_ms + .unwrap_or(self.kafka_session_timeout_ms); + let auto_commit_interval_ms = topic_config + .auto_commit_interval_ms + .unwrap_or(self.kafka_auto_commit_interval_ms); + let auto_offset_reset = topic_config + .auto_offset_reset + .clone() + .unwrap_or_else(|| self.kafka_auto_offset_reset.clone()); + let mut new_config = ClientConfig::new(); let config = new_config .set("bootstrap.servers", cluster.address.clone()) .set("group.id", topic_config.consumer_group.clone()) - .set( - "session.timeout.ms", - self.kafka_session_timeout_ms.to_string(), - ) + .set("session.timeout.ms", session_timeout_ms.to_string()) .set("enable.partition.eof", "false") .set("enable.auto.commit", "true") .set( "auto.commit.interval.ms", - self.kafka_auto_commit_interval_ms.to_string(), - ) - .set( - "auto.offset.reset", - self.kafka_auto_offset_reset.to_string(), + auto_commit_interval_ms.to_string(), ) + .set("auto.offset.reset", auto_offset_reset) .set("enable.auto.offset.store", "false"); if let Some(ref sasl_mechanism) = cluster.sasl_mechanism { @@ -1679,6 +1712,103 @@ kafka_clusters: }); } + #[test] + fn test_per_topic_consumer_settings_override_globals() { + Jail::expect_with(|jail| { + jail.create_file( + "config.yaml", + r#" +kafka_deadletter_topic: profiles-dlq +kafka_session_timeout_ms: 6000 +kafka_auto_commit_interval_ms: 5000 +kafka_auto_offset_reset: latest + +kafka_topics: + profiles: + cluster: my-cluster + consumer_group: taskbroker-profiles + session_timeout_ms: 12000 + auto_commit_interval_ms: 1000 + auto_offset_reset: earliest + profiles-dlq: + cluster: my-cluster + consumer_group: taskbroker-profiles-dlq + produce_only: true + +kafka_clusters: + my-cluster: + address: 10.0.0.1:9092 +"#, + )?; + + let args = Args { + config: Some("config.yaml".to_owned()), + }; + let config = Config::from_args(&args).unwrap(); + let consumer_config = config.kafka_consumer_config(); + + // Per-topic values win over the globals. + assert_eq!(consumer_config.get("session.timeout.ms").unwrap(), "12000"); + assert_eq!( + consumer_config.get("auto.commit.interval.ms").unwrap(), + "1000" + ); + assert_eq!( + consumer_config.get("auto.offset.reset").unwrap(), + "earliest" + ); + + Ok(()) + }); + } + + #[test] + fn test_topic_consumer_settings_fall_back_to_globals() { + Jail::expect_with(|jail| { + jail.create_file( + "config.yaml", + r#" +kafka_deadletter_topic: profiles-dlq +kafka_session_timeout_ms: 7000 +kafka_auto_commit_interval_ms: 2000 +kafka_auto_offset_reset: earliest + +kafka_topics: + profiles: + cluster: my-cluster + consumer_group: taskbroker-profiles + profiles-dlq: + cluster: my-cluster + consumer_group: taskbroker-profiles-dlq + produce_only: true + +kafka_clusters: + my-cluster: + address: 10.0.0.1:9092 +"#, + )?; + + let args = Args { + config: Some("config.yaml".to_owned()), + }; + let config = Config::from_args(&args).unwrap(); + let consumer_config = config.kafka_consumer_config(); + + // No per-topic overrides, so the globals are used. + assert_eq!(consumer_config.get("session.timeout.ms").unwrap(), "7000"); + assert_eq!( + consumer_config.get("auto.commit.interval.ms").unwrap(), + "2000" + ); + assert_eq!( + consumer_config.get("auto.offset.reset").unwrap(), + "earliest" + ); + + Ok(()) + }); + } + #[test] fn test_deadletter_producer_uses_its_topic_cluster() { // The deadletter producer connects to the cluster of the deadletter From e27965ce7360c343f0679ef5073564667c72ae37 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 2 Jun 2026 13:52:46 +0200 Subject: [PATCH 07/17] Fix clippy useless vec! warning Co-Authored-By: Claude Opus 4.8 --- src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index edf59e4a..d26ab9b9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -167,7 +167,7 @@ async fn main() -> Result<(), Error> { let (main_topic, _) = consumer_config .consumable_topic() .expect("invalid config: no consumable topic"); - let topics_to_consume = vec![main_topic.to_owned()]; + let topics_to_consume = [main_topic.to_owned()]; async move { // The consumer has an internal thread that listens for cancellations, so it doesn't need From b1cc58079033ee3824edc5e5eea3622397b84f7e Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 2 Jun 2026 14:05:34 +0200 Subject: [PATCH 08/17] Fix upkeep tests building unnormalized configs The upkeep tests built Config literals (overriding kafka_deadletter_topic after normalization, or skipping normalization entirely), so kafka_producer_config() panicked because the deadletter topic was never registered in kafka_topics. Introduce create_integration_config_from_base(base) as the single normalized-config builder and rebuild the other integration helpers on top of it. Tests now pass their overrides via the base config so normalization runs after all fields are set. Co-Authored-By: Claude Opus 4.8 --- src/test_utils.rs | 57 +++++++++++++++++++---------------------------- src/upkeep.rs | 22 +++++++++--------- 2 files changed, 35 insertions(+), 44 deletions(-) diff --git a/src/test_utils.rs b/src/test_utils.rs index 1b51530b..964e180e 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -298,10 +298,13 @@ pub async fn create_test_store(adapter: &str) -> Arc { } } -/// Create a Config instance that uses a testing topic -/// and earliest auto_offset_reset. This is intended to be combined -/// with [`reset_topic`] -pub fn create_integration_config() -> Arc { +/// Create a normalized integration [`Config`] from a caller-supplied `base`. +/// The standard integration defaults (pg connection, local kafka cluster, +/// consumer group, earliest offset reset) are applied first, then `base` +/// overrides them, and finally the config is normalized — so any field the +/// caller sets (e.g. `kafka_deadletter_topic`) is in place before the kafka +/// topic maps are built. +pub fn create_integration_config_from_base(base: Config) -> Config { let mut config = Config { pg_host: get_pg_host(), pg_port: get_pg_port(), @@ -309,56 +312,42 @@ pub fn create_integration_config() -> Arc { pg_password: get_pg_password(), pg_database_name: get_pg_database_name(), run_migrations: true, - kafka_topic: Some("taskbroker-test".into()), kafka_cluster: Some("127.0.0.1:9092".into()), kafka_consumer_group: Some("taskworker".into()), kafka_auto_offset_reset: "earliest".into(), - ..Config::default() + ..base }; config.normalize_and_validate().unwrap(); + config +} - Arc::new(config) +/// Create a Config instance that uses a testing topic +/// and earliest auto_offset_reset. This is intended to be combined +/// with [`reset_topic`] +pub fn create_integration_config() -> Arc { + Arc::new(create_integration_config_from_base(Config { + kafka_topic: Some("taskbroker-test".into()), + ..Config::default() + })) } /// Create a Config instance that uses SSL /// and earliest auto_offset_reset. This is intended to be combined /// with [`reset_topic`] pub fn create_integration_config_with_ssl() -> Arc { - let mut config = Config { - pg_host: get_pg_host(), - pg_port: get_pg_port(), - pg_username: get_pg_username(), - pg_password: get_pg_password(), - pg_database_name: get_pg_database_name(), + Arc::new(create_integration_config_from_base(Config { pg_extra_query_params: Some("sslmode=require".to_string()), - run_migrations: true, kafka_topic: Some("taskbroker-test".into()), - kafka_cluster: Some("127.0.0.1:9092".into()), - kafka_consumer_group: Some("taskworker".into()), - kafka_auto_offset_reset: "earliest".into(), ..Config::default() - }; - config.normalize_and_validate().unwrap(); - - Arc::new(config) + })) } +/// Create a normalized integration [`Config`] using the given main topic. pub fn create_integration_config_with_topic(topic: String) -> Config { - let mut config = Config { - pg_host: get_pg_host(), - pg_port: get_pg_port(), - pg_username: get_pg_username(), - pg_password: get_pg_password(), - pg_database_name: get_pg_database_name(), - run_migrations: true, + create_integration_config_from_base(Config { kafka_topic: Some(topic), - kafka_cluster: Some("127.0.0.1:9092".into()), - kafka_consumer_group: Some("taskworker".into()), - kafka_auto_offset_reset: "earliest".into(), ..Config::default() - }; - config.normalize_and_validate().unwrap(); - config + }) } /// Create a kafka producer for a given config diff --git a/src/upkeep.rs b/src/upkeep.rs index ed869415..dd5eaade 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -599,7 +599,7 @@ mod tests { use crate::store::activation::ActivationStatus; use crate::test_utils::{ StatusCount, assert_counts, consume_topic, create_config, - create_integration_config_with_topic, create_producer, create_test_store, make_activations, + create_integration_config_from_base, create_producer, create_test_store, make_activations, replace_retry_state, reset_topic, }; use crate::upkeep::{create_retry_activation, do_upkeep}; @@ -690,10 +690,11 @@ mod tests { #[case::sqlite("sqlite")] #[case::postgres("postgres")] async fn test_retry_activation_is_appended_to_kafka(#[case] adapter: &str) { - let config = Arc::new(Config { + let config = Arc::new(create_integration_config_from_base(Config { + kafka_topic: Some(format!("taskbroker-test-{adapter}")), kafka_deadletter_topic: format!("taskbroker-test-{adapter}-dlq"), - ..create_integration_config_with_topic(format!("taskbroker-test-{adapter}")) - }); + ..Default::default() + })); let runtime_config = Arc::new(RuntimeConfigManager::new(None).await); let start_time = Utc::now(); @@ -1047,10 +1048,11 @@ mod tests { #[case::sqlite("sqlite")] #[case::postgres("postgres")] async fn test_remove_at_remove_failed_publish_to_kafka(#[case] adapter: &str) { - let config = Arc::new(Config { + let config = Arc::new(create_integration_config_from_base(Config { + kafka_topic: Some(format!("taskbroker-test-{adapter}")), kafka_deadletter_topic: format!("taskbroker-test-{adapter}-dlq"), - ..create_integration_config_with_topic(format!("taskbroker-test-{adapter}")) - }); + ..Default::default() + })); let runtime_config = Arc::new(RuntimeConfigManager::new(None).await); reset_topic(config.clone()).await; @@ -1432,11 +1434,11 @@ demoted_namespaces: #[case::sqlite("sqlite")] #[case::postgres("postgres")] async fn test_full_vacuum_on_upkeep(#[case] adapter: &str) { - let raw_config = Config { + let config = Arc::new(create_integration_config_from_base(Config { + kafka_topic: Some(format!("taskbroker-test-full-vacuum-{adapter}")), full_vacuum_on_start: true, ..Default::default() - }; - let config = Arc::new(raw_config); + })); let runtime_config = Arc::new(RuntimeConfigManager::new(None).await); let store = create_test_store(adapter).await; From 3f3ca3c5d7b501d34c57d3c147bec530deba80be Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 2 Jun 2026 14:27:19 +0200 Subject: [PATCH 09/17] Guard kafka_topics/clusters inserts against silent collisions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The legacy normalization path keys topics and clusters by name. A deadletter topic configured with the same name as the consumed topic previously collapsed silently into the main topic's entry (via entry().or_insert_with()), so the deadletter producer would resolve to the main cluster instead of the deadletter cluster — misrouting deadletter messages. Use the insert return value to reject that collision with a clear error, and assert the remaining internal inserts (clusters, main topic) never overwrite an existing entry. The intentional retry-topic alias of the main topic is left as-is. ref STREAM-1042 Co-Authored-By: Claude Opus 4.8 --- src/config.rs | 72 ++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 60 insertions(+), 12 deletions(-) diff --git a/src/config.rs b/src/config.rs index 67a9ff87..fa3ec522 100644 --- a/src/config.rs +++ b/src/config.rs @@ -656,7 +656,7 @@ impl Config { .clone() .unwrap_or_else(|| DEFAULT_CONSUMER_GROUP.to_owned()); - self.kafka_clusters.insert( + let prev = self.kafka_clusters.insert( DEFAULT_CLUSTER.to_owned(), ClusterConfig { address: address.clone(), @@ -669,12 +669,16 @@ impl Config { ssl_key_location: self.kafka_ssl_key_location.clone(), }, ); + assert!( + prev.is_none(), + "internal: duplicate '{DEFAULT_CLUSTER}' cluster" + ); // Migrate the deprecated deadletter cluster/auth fields into a // dedicated cluster. The deadletter producer historically falls back // to the main cluster's address when kafka_deadletter_cluster is // unset, while using its own (possibly empty) auth. - self.kafka_clusters.insert( + let prev = self.kafka_clusters.insert( DEADLETTER_CLUSTER.to_owned(), ClusterConfig { address: self @@ -692,6 +696,10 @@ impl Config { ssl_key_location: self.kafka_deadletter_ssl_key_location.clone(), }, ); + assert!( + prev.is_none(), + "internal: duplicate '{DEADLETTER_CLUSTER}' cluster" + ); let raw_config = if self.raw_mode { Some(RawModeConfig { @@ -704,8 +712,8 @@ impl Config { None }; - self.kafka_topics.insert( - topic_name, + let prev = self.kafka_topics.insert( + topic_name.clone(), TopicConfig { cluster: DEFAULT_CLUSTER.to_owned(), consumer_group: consumer_group.clone(), @@ -716,13 +724,21 @@ impl Config { auto_offset_reset: None, }, ); - - // Register the deadletter topic as a produce-only topic, just like - // the retry topic below. Its name stays configured via the (still - // valid) kafka_deadletter_topic field. - self.kafka_topics - .entry(self.kafka_deadletter_topic.clone()) - .or_insert_with(|| TopicConfig { + assert!(prev.is_none(), "internal: duplicate topic '{topic_name}'"); + + // Register the deadletter topic as a produce-only topic on its own + // cluster, just like the retry topic below. Its name stays + // configured via the (still valid) kafka_deadletter_topic field. + // + // Topics are keyed by name, so a deadletter topic sharing the main + // topic's name cannot also carry the deadletter cluster: this insert + // would silently collapse the two into the consumed topic's cluster. + // A non-empty return value means such a collision happened (the main + // topic is the only prior entry), so reject it rather than misroute + // deadletter messages. + let prev = self.kafka_topics.insert( + self.kafka_deadletter_topic.clone(), + TopicConfig { cluster: DEADLETTER_CLUSTER.to_owned(), consumer_group: consumer_group.clone(), produce_only: true, @@ -730,7 +746,14 @@ impl Config { session_timeout_ms: None, auto_commit_interval_ms: None, auto_offset_reset: None, - }); + }, + ); + if prev.is_some() { + return Err(Box::new(figment::Error::from(format!( + "kafka_deadletter_topic '{}' must differ from the consumed topic '{}'", + self.kafka_deadletter_topic, topic_name + )))); + } // Add the retry topic if configured (unless it collides with the // main topic). Retry topics are produce-only; this taskbroker writes @@ -1712,6 +1735,31 @@ kafka_clusters: }); } + #[test] + fn test_legacy_rejects_deadletter_topic_equal_to_main_topic() { + // Topics are keyed by name, so a deadletter topic that shares the main + // topic's name cannot carry its own (deadletter) cluster. Rather than + // silently route deadletter messages to the consumed topic's cluster, + // normalization must reject the collision. + Jail::expect_with(|jail| { + jail.set_env("TASKBROKER_KAFKA_TOPIC", "taskworker"); + jail.set_env("TASKBROKER_KAFKA_DEADLETTER_TOPIC", "taskworker"); + + let args = Args { config: None }; + let err = Config::from_args(&args).unwrap_err(); + assert!( + err.to_string().contains( + "kafka_deadletter_topic 'taskworker' must differ from the consumed topic \ + 'taskworker'" + ), + "unexpected error: {}", + err + ); + + Ok(()) + }); + } + #[test] fn test_per_topic_consumer_settings_override_globals() { Jail::expect_with(|jail| { From 69da19d0f57fedd1d98993e2a60be97db32f43be Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 2 Jun 2026 14:28:15 +0200 Subject: [PATCH 10/17] Trim verbose deadletter collision comment Co-Authored-By: Claude Opus 4.8 --- src/config.rs | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/src/config.rs b/src/config.rs index fa3ec522..7e01632b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -727,15 +727,8 @@ impl Config { assert!(prev.is_none(), "internal: duplicate topic '{topic_name}'"); // Register the deadletter topic as a produce-only topic on its own - // cluster, just like the retry topic below. Its name stays - // configured via the (still valid) kafka_deadletter_topic field. - // - // Topics are keyed by name, so a deadletter topic sharing the main - // topic's name cannot also carry the deadletter cluster: this insert - // would silently collapse the two into the consumed topic's cluster. - // A non-empty return value means such a collision happened (the main - // topic is the only prior entry), so reject it rather than misroute - // deadletter messages. + // cluster. A non-empty return value means it collided with the main + // topic, which would route deadletter messages to the wrong cluster. let prev = self.kafka_topics.insert( self.kafka_deadletter_topic.clone(), TopicConfig { From a02e3757d35fcc35eec99ec98144822f63133da7 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 2 Jun 2026 14:28:33 +0200 Subject: [PATCH 11/17] Reword deadletter comment: wrong topic Co-Authored-By: Claude Opus 4.8 --- src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/config.rs b/src/config.rs index 7e01632b..d49bf0f3 100644 --- a/src/config.rs +++ b/src/config.rs @@ -728,7 +728,7 @@ impl Config { // Register the deadletter topic as a produce-only topic on its own // cluster. A non-empty return value means it collided with the main - // topic, which would route deadletter messages to the wrong cluster. + // topic, which would route deadletter messages to the wrong topic. let prev = self.kafka_topics.insert( self.kafka_deadletter_topic.clone(), TopicConfig { From eff4da68bb5b2f46fe46e4c8c9277e4684c9c0ef Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 2 Jun 2026 14:42:20 +0200 Subject: [PATCH 12/17] Reject kafka_retry_topic colliding with the deadletter topic The retry topic may alias the main topic (retries get re-enqueued there), but aliasing the deadletter topic silently gave the retry topic the deadletter cluster/role via entry().or_insert_with(). Reject that collision during normalization. ref STREAM-1042 Co-Authored-By: Claude Opus 4.8 --- src/config.rs | 38 +++++++++++++++++++++++++++++++++++--- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/src/config.rs b/src/config.rs index d49bf0f3..ba9c71a8 100644 --- a/src/config.rs +++ b/src/config.rs @@ -748,10 +748,19 @@ impl Config { )))); } - // Add the retry topic if configured (unless it collides with the - // main topic). Retry topics are produce-only; this taskbroker writes - // retries to them but does not consume from them. + // Add the retry topic if configured. Retry topics are produce-only; + // this taskbroker writes retries to them but does not consume from + // them. Aliasing the main topic is allowed (retries are re-enqueued + // there), but aliasing the deadletter topic is not: the names would + // collide and the retry would silently inherit the deadletter + // cluster/role. if let Some(ref retry_topic) = self.kafka_retry_topic { + if retry_topic == &self.kafka_deadletter_topic { + return Err(Box::new(figment::Error::from(format!( + "kafka_retry_topic '{}' must differ from kafka_deadletter_topic", + retry_topic + )))); + } self.kafka_topics .entry(retry_topic.clone()) .or_insert_with(|| TopicConfig { @@ -1753,6 +1762,29 @@ kafka_clusters: }); } + #[test] + fn test_legacy_rejects_retry_topic_equal_to_deadletter_topic() { + // The retry topic may alias the main topic, but not the deadletter + // topic: that collision would silently give the retry topic the + // deadletter cluster/role. + Jail::expect_with(|jail| { + jail.set_env("TASKBROKER_KAFKA_RETRY_TOPIC", "taskworker-dlq"); + jail.set_env("TASKBROKER_KAFKA_DEADLETTER_TOPIC", "taskworker-dlq"); + + let args = Args { config: None }; + let err = Config::from_args(&args).unwrap_err(); + assert!( + err.to_string().contains( + "kafka_retry_topic 'taskworker-dlq' must differ from kafka_deadletter_topic" + ), + "unexpected error: {}", + err + ); + + Ok(()) + }); + } + #[test] fn test_per_topic_consumer_settings_override_globals() { Jail::expect_with(|jail| { From 7caad4daceee6c7bfbad5bb60cee247840a0b69a Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 2 Jun 2026 14:42:49 +0200 Subject: [PATCH 13/17] remove dead code --- src/test_utils.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/test_utils.rs b/src/test_utils.rs index 964e180e..31455a0f 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -342,14 +342,6 @@ pub fn create_integration_config_with_ssl() -> Arc { })) } -/// Create a normalized integration [`Config`] using the given main topic. -pub fn create_integration_config_with_topic(topic: String) -> Config { - create_integration_config_from_base(Config { - kafka_topic: Some(topic), - ..Config::default() - }) -} - /// Create a kafka producer for a given config pub fn create_producer(config: Arc) -> Arc { let producer: FutureProducer = config From fb61fe7e4755b8ed6b828524b65da5cd90bae074 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 2 Jun 2026 14:48:53 +0200 Subject: [PATCH 14/17] Warn when demoted-namespace forwarder reuses mismatched DLQ auth The forwarding producer reuses the deadletter cluster's credentials and only overrides bootstrap.servers, so forwarding to a different cluster with its own auth can fail to publish. Behavior is unchanged for compatibility, but we now warn when the forward cluster differs from the deadletter cluster and the latter has auth configured. ref STREAM-1042 Co-Authored-By: Claude Opus 4.8 --- src/config.rs | 36 +++++++++++++++++++++++++++++------- src/upkeep.rs | 16 +++++++++++++++- 2 files changed, 44 insertions(+), 8 deletions(-) diff --git a/src/config.rs b/src/config.rs index ba9c71a8..6fc8067a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -73,6 +73,21 @@ pub struct ClusterConfig { pub ssl_key_location: Option, } +impl ClusterConfig { + /// Whether any authentication / TLS settings are configured for this + /// cluster. Used to detect when a producer would carry credentials that + /// only apply to this specific cluster. + pub fn has_auth(&self) -> bool { + self.security_protocol.is_some() + || self.sasl_mechanism.is_some() + || self.sasl_username.is_some() + || self.sasl_password.is_some() + || self.ssl_ca_location.is_some() + || self.ssl_certificate_location.is_some() + || self.ssl_key_location.is_some() + } +} + #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Deserialize, Serialize)] #[serde(rename_all = "lowercase")] pub enum DatabaseAdapter { @@ -900,19 +915,26 @@ impl Config { config.clone() } + /// The cluster the deadletter / forwarding producer connects to: the + /// cluster of the `kafka_deadletter_topic` entry in `kafka_topics` (in + /// legacy mode this is the migrated "deadletter" cluster). + /// Panics if config wasn't validated. + pub fn kafka_producer_cluster(&self) -> &ClusterConfig { + let dlq_topic = self + .kafka_topics + .get(&self.kafka_deadletter_topic) + .expect("deadletter topic not in kafka_topics - was config validated?"); + self.cluster(&dlq_topic.cluster) + .expect("cluster lookup failed - was config validated?") + } + /// Convert the application Config into rdkafka::ClientConfig for the /// deadletter / forwarding producer. The producer connects to the cluster /// of the `kafka_deadletter_topic` entry in `kafka_topics` (in legacy mode /// this is the migrated "deadletter" cluster). /// Panics if config wasn't validated. pub fn kafka_producer_config(&self) -> ClientConfig { - let dlq_topic = self - .kafka_topics - .get(&self.kafka_deadletter_topic) - .expect("deadletter topic not in kafka_topics - was config validated?"); - let cluster = self - .cluster(&dlq_topic.cluster) - .expect("cluster lookup failed - was config validated?"); + let cluster = self.kafka_producer_cluster(); let mut new_config = ClientConfig::new(); let config = new_config diff --git a/src/upkeep.rs b/src/upkeep.rs index dd5eaade..e407cd10 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -14,7 +14,7 @@ use sentry_protos::taskbroker::v1::TaskActivation; use tokio::{fs, join, select, time}; use tonic_health::ServingStatus; use tonic_health::server::HealthReporter; -use tracing::{debug, error, info, instrument}; +use tracing::{debug, error, info, instrument, warn}; use uuid::Uuid; use crate::SERVICE_NAME; @@ -340,6 +340,20 @@ pub async fn do_upkeep( let same_topic = forward_topic == main_topic; if !(demoted_namespaces.is_empty() || (same_cluster && same_topic)) { let forward_demoted_start = Instant::now(); + // The forwarding producer reuses the deadletter cluster's credentials + // (see Config::kafka_producer_config) and only overrides + // bootstrap.servers. If we're forwarding to a different cluster that has + // its own auth, those credentials won't apply there and publishing will + // fail. We keep the legacy behavior (reuse + override) for compatibility + // but warn so the misconfiguration is visible. + let dlq_cluster = config.kafka_producer_cluster(); + if forward_cluster != dlq_cluster.address && dlq_cluster.has_auth() { + warn!( + "forwarding demoted-namespace tasks to cluster '{}', but the producer reuses the \ + deadletter cluster '{}' credentials; publishing may fail if their auth differs", + forward_cluster, dlq_cluster.address + ); + } let mut forward_producer_config = config.kafka_producer_config(); forward_producer_config.set("bootstrap.servers", &forward_cluster); let forward_producer: Arc = Arc::new( From 32d189d72129d151e256ef8420c2f6eb4d57a61e Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 2 Jun 2026 14:58:32 +0200 Subject: [PATCH 15/17] Require the deadletter topic to share the retry target's cluster The upkeep producer connects to the deadletter topic's cluster but is also reused to publish retries to the retry topic (or the consumed topic when none is configured). A single producer reaches only one cluster, so a deadletter topic on a different cluster than the retry target silently misroutes retries to the wrong brokers. Reject that during normalization. Replaces a bogus test that asserted the deadletter topic could live on its own cluster, which the shared-producer implementation cannot actually support. ref STREAM-1042 Co-Authored-By: Claude Opus 4.8 --- src/config.rs | 48 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/src/config.rs b/src/config.rs index 6fc8067a..0b57101a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -813,6 +813,28 @@ impl Config { )))); } + // The upkeep producer connects to the deadletter topic's cluster but is + // also reused to publish retries to the retry topic (or the consumed + // topic when no retry topic is configured). A single producer can only + // reach one cluster, so the retry target must resolve to the same + // cluster address as the deadletter topic; otherwise retries would be + // published to the wrong brokers. + let (consumed_topic, _) = self.consumable_topic()?; + let retry_target = self.kafka_retry_topic.as_deref().unwrap_or(consumed_topic); + let deadletter_address = &self + .cluster(&self.kafka_topics[&self.kafka_deadletter_topic].cluster)? + .address; + if let Some(retry_topic_config) = self.kafka_topics.get(retry_target) { + let retry_address = &self.cluster(&retry_topic_config.cluster)?.address; + if retry_address != deadletter_address { + return Err(Box::new(figment::Error::from(format!( + "retry target topic '{}' is on cluster '{}', but deadletter topic '{}' is on \ + '{}'; they share a single producer and must be on the same cluster", + retry_target, retry_address, self.kafka_deadletter_topic, deadletter_address + )))); + } + } + Ok(()) } @@ -1905,9 +1927,11 @@ kafka_clusters: } #[test] - fn test_deadletter_producer_uses_its_topic_cluster() { - // The deadletter producer connects to the cluster of the deadletter - // topic, which can differ from the consumable topic's cluster. + fn test_rejects_deadletter_topic_on_different_cluster() { + // The upkeep producer is shared between deadletter publishing and retry + // publishing (to the consumed topic), and a single producer can only + // reach one cluster. Putting the deadletter topic on a different cluster + // than the consumed topic would misroute retries, so it's rejected. Jail::expect_with(|jail| { jail.create_file( "config.yaml", @@ -1934,18 +1958,12 @@ kafka_clusters: let args = Args { config: Some("config.yaml".to_owned()), }; - let config = Config::from_args(&args).unwrap(); - - let consumer_config = config.kafka_consumer_config(); - assert_eq!( - consumer_config.get("bootstrap.servers").unwrap(), - "10.0.0.1:9092" - ); - - let producer_config = config.kafka_producer_config(); - assert_eq!( - producer_config.get("bootstrap.servers").unwrap(), - "10.9.9.9:9092" + let err = Config::from_args(&args).unwrap_err(); + assert!( + err.to_string() + .contains("they share a single producer and must be on the same cluster"), + "unexpected error: {}", + err ); Ok(()) From 4aa6fca5958ec14814e9a3d33423db50096010a8 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 2 Jun 2026 15:04:05 +0200 Subject: [PATCH 16/17] Require kafka_retry_topic to be declared in kafka_topics In the new format kafka_retry_topic was never registered as a topic, so a retry topic on a different cluster would silently fall through to the deadletter producer's cluster with no validation. Require the retry target to be a declared topic so its cluster is known and checked against the deadletter cluster. ref STREAM-1042 Co-Authored-By: Claude Opus 4.8 --- src/config.rs | 71 ++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 59 insertions(+), 12 deletions(-) diff --git a/src/config.rs b/src/config.rs index 0b57101a..fdcffc31 100644 --- a/src/config.rs +++ b/src/config.rs @@ -816,23 +816,28 @@ impl Config { // The upkeep producer connects to the deadletter topic's cluster but is // also reused to publish retries to the retry topic (or the consumed // topic when no retry topic is configured). A single producer can only - // reach one cluster, so the retry target must resolve to the same - // cluster address as the deadletter topic; otherwise retries would be - // published to the wrong brokers. + // reach one cluster, so the retry target must be a declared topic on + // the same cluster address as the deadletter topic; otherwise retries + // would be published to the wrong brokers. In the legacy format the + // retry topic is registered above; in the new format the user must + // declare it in kafka_topics. let (consumed_topic, _) = self.consumable_topic()?; let retry_target = self.kafka_retry_topic.as_deref().unwrap_or(consumed_topic); + let retry_topic_config = self.kafka_topics.get(retry_target).ok_or_else(|| { + Box::new(figment::Error::from(format!( + "kafka_retry_topic '{retry_target}' is not defined in kafka_topics" + ))) + })?; + let retry_address = &self.cluster(&retry_topic_config.cluster)?.address; let deadletter_address = &self .cluster(&self.kafka_topics[&self.kafka_deadletter_topic].cluster)? .address; - if let Some(retry_topic_config) = self.kafka_topics.get(retry_target) { - let retry_address = &self.cluster(&retry_topic_config.cluster)?.address; - if retry_address != deadletter_address { - return Err(Box::new(figment::Error::from(format!( - "retry target topic '{}' is on cluster '{}', but deadletter topic '{}' is on \ - '{}'; they share a single producer and must be on the same cluster", - retry_target, retry_address, self.kafka_deadletter_topic, deadletter_address - )))); - } + if retry_address != deadletter_address { + return Err(Box::new(figment::Error::from(format!( + "retry target topic '{}' is on cluster '{}', but deadletter topic '{}' is on \ + '{}'; they share a single producer and must be on the same cluster", + retry_target, retry_address, self.kafka_deadletter_topic, deadletter_address + )))); } Ok(()) @@ -1926,6 +1931,48 @@ kafka_clusters: }); } + #[test] + fn test_new_format_requires_retry_topic_to_be_declared() { + // In the new format kafka_retry_topic must be declared in kafka_topics + // so its cluster is known; otherwise retries would silently use the + // deadletter producer's cluster. + Jail::expect_with(|jail| { + jail.create_file( + "config.yaml", + r#" +kafka_deadletter_topic: profiles-dlq +kafka_retry_topic: profiles-retry + +kafka_topics: + profiles: + cluster: my-cluster + consumer_group: taskbroker-profiles + profiles-dlq: + cluster: my-cluster + consumer_group: taskbroker-profiles-dlq + produce_only: true + +kafka_clusters: + my-cluster: + address: 10.0.0.1:9092 +"#, + )?; + + let args = Args { + config: Some("config.yaml".to_owned()), + }; + let err = Config::from_args(&args).unwrap_err(); + assert!( + err.to_string() + .contains("kafka_retry_topic 'profiles-retry' is not defined in kafka_topics"), + "unexpected error: {}", + err + ); + + Ok(()) + }); + } + #[test] fn test_rejects_deadletter_topic_on_different_cluster() { // The upkeep producer is shared between deadletter publishing and retry From f16ad8155ed514b7d4d53652269289730c0173b9 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 2 Jun 2026 15:04:40 +0200 Subject: [PATCH 17/17] Warn on deprecated kafka_consumer_group in legacy normalization kafka_consumer_group is documented as deprecated and participates in the legacy/new-format mutual-exclusivity check, but unlike the other deprecated fields it emitted no warning when set on its own. Add the matching deprecation warning. ref STREAM-1042 Co-Authored-By: Claude Opus 4.8 --- src/config.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/config.rs b/src/config.rs index fdcffc31..1e63062b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -651,6 +651,9 @@ impl Config { if self.kafka_topic.is_some() { warn!("kafka_topic is deprecated, use kafka_topics instead"); } + if self.kafka_consumer_group.is_some() { + warn!("kafka_consumer_group is deprecated, use kafka_topics instead"); + } if self.kafka_deadletter_cluster.is_some() { warn!( "kafka_deadletter_cluster is deprecated, declare the deadletter topic in \