diff --git a/src/config.rs b/src/config.rs index 17705933..1e63062b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -6,10 +6,88 @@ use figment::providers::{Env, Format, Yaml}; use figment::{Figment, Metadata, Profile, Provider}; use rdkafka::ClientConfig; use serde::{Deserialize, Serialize}; +use tracing::warn; use crate::Args; use crate::logging::LogFormat; +/// Configuration for a single Kafka topic in multi-topic mode. +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct TopicConfig { + /// Which cluster this topic is on (key into kafka_clusters) + pub cluster: String, + /// Consumer group for this topic + pub consumer_group: String, + /// If true, this topic is produce-only (e.g. retry topics). + /// Defaults to false, meaning the topic is consumed. + #[serde(default)] + pub produce_only: bool, + /// Raw mode settings. If set, this topic uses raw mode. + #[serde(default)] + pub raw: Option, + /// The kafka session timeout in ms for this topic's consumer. + /// Falls back to the global `kafka_session_timeout_ms` when unset. + #[serde(default)] + pub session_timeout_ms: Option, + /// The interval in ms at which this topic's consumer auto-commits. + /// Falls back to the global `kafka_auto_commit_interval_ms` when unset. + #[serde(default)] + pub auto_commit_interval_ms: Option, + /// The auto offset reset policy for this topic's consumer. + /// Falls back to the global `kafka_auto_offset_reset` when unset. + #[serde(default)] + pub auto_offset_reset: Option, +} + +/// Raw mode settings for a topic. +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct RawModeConfig { + /// The namespace to assign to raw mode activations. + pub namespace: Option, + /// The application to assign to raw mode activations. + pub application: Option, + /// The taskname to assign to raw mode activations. + pub taskname: Option, + /// Processing deadline duration in seconds for raw mode activations. + pub processing_deadline_duration: Option, +} + +/// Configuration for a Kafka cluster. +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct ClusterConfig { + /// Comma-separated list of broker addresses + pub address: String, + /// The security method used for authentication eg. sasl_plaintext + pub security_protocol: Option, + /// The hashing algorithm used for authentication eg. scram-sha-256 + pub sasl_mechanism: Option, + /// The sasl username for authentication + pub sasl_username: Option, + /// The sasl password for authentication + pub sasl_password: Option, + /// The location to the CA certificate file + pub ssl_ca_location: Option, + /// The location to the certificate file + pub ssl_certificate_location: Option, + /// The location to the private key file + pub ssl_key_location: Option, +} + +impl ClusterConfig { + /// Whether any authentication / TLS settings are configured for this + /// cluster. Used to detect when a producer would carry credentials that + /// only apply to this specific cluster. + pub fn has_auth(&self) -> bool { + self.security_protocol.is_some() + || self.sasl_mechanism.is_some() + || self.sasl_username.is_some() + || self.sasl_password.is_some() + || self.ssl_ca_location.is_some() + || self.ssl_certificate_location.is_some() + || self.ssl_key_location.is_some() + } +} + #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Deserialize, Serialize)] #[serde(rename_all = "lowercase")] pub enum DatabaseAdapter { @@ -65,68 +143,92 @@ pub struct Config { /// We support a list of secrets to allow for key rotation. pub grpc_shared_secret: Vec, - /// Comma separated list of kafka brokers to connect to - pub kafka_cluster: String, + /// Comma separated list of kafka brokers to connect to. + /// Deprecated: use kafka_clusters instead. Mutually exclusive with the new + /// format; defaults to None (the historical "127.0.0.1:9092" default is + /// applied during normalization when no kafka config is provided at all). + pub kafka_cluster: Option, - /// The kafka consumer group name - pub kafka_consumer_group: String, + /// The kafka consumer group name. + /// Deprecated: use kafka_topics instead. Mutually exclusive with the new + /// format; defaults to None (the historical "taskworker" default applies). + pub kafka_consumer_group: Option, /// The topic to fetch task messages from. - pub kafka_topic: String, + /// Deprecated: use kafka_topics instead. Mutually exclusive with the new + /// format; defaults to None (the historical "taskworker" default applies). + pub kafka_topic: Option, /// The topic to produce demoted "long" namespace tasks to. pub kafka_long_topic: String, - /// The security method used for authentication eg. sasl_plaintext + /// The security method used for authentication eg. sasl_plaintext. + /// Deprecated: use kafka_clusters instead. pub kafka_security_protocol: Option, - /// The hashing algorithm used for authentication eg. scram-sha-256 + /// The hashing algorithm used for authentication eg. scram-sha-256. + /// Deprecated: use kafka_clusters instead. pub kafka_sasl_mechanism: Option, - /// The sasl username for ingesting messages + /// The sasl username for ingesting messages. + /// Deprecated: use kafka_clusters instead. pub kafka_sasl_username: Option, - /// The sasl password for ingesting messages + /// The sasl password for ingesting messages. + /// Deprecated: use kafka_clusters instead. pub kafka_sasl_password: Option, - /// The location to the CA certificate file + /// The location to the CA certificate file. + /// Deprecated: use kafka_clusters instead. pub kafka_ssl_ca_location: Option, - /// The location to the certificate file + /// The location to the certificate file. + /// Deprecated: use kafka_clusters instead. pub kafka_ssl_certificate_location: Option, - /// The location to the private key file + /// The location to the private key file. + /// Deprecated: use kafka_clusters instead. pub kafka_ssl_key_location: Option, /// Whether to create missing topics if they don't exist. pub create_missing_topics: bool, - /// Comma separated list of kafka brokers to - /// publish dead letter messages on + /// Comma separated list of kafka brokers to publish dead letter messages on. + /// Deprecated: declare the deadletter topic in kafka_topics (produce_only) + /// with a cluster reference instead. pub kafka_deadletter_cluster: Option, - /// The kafka topic to publish dead letter messages on + /// The kafka topic to publish dead letter messages on. + /// Still valid in the new format: it names the produce-only topic in + /// kafka_topics whose cluster the deadletter producer connects to. pub kafka_deadletter_topic: String, - /// The security method used for authentication to the DLQ eg. sasl_plaintext + /// The security method used for authentication to the DLQ eg. sasl_plaintext. + /// Deprecated: configure auth on the referenced cluster in kafka_clusters. pub kafka_deadletter_security_protocol: Option, - /// The hashing algorithm used for authentication to the DLQ eg. scram-sha-256 + /// The hashing algorithm used for authentication to the DLQ eg. scram-sha-256. + /// Deprecated: configure auth on the referenced cluster in kafka_clusters. pub kafka_deadletter_sasl_mechanism: Option, - /// The sasl username for DLQ publishing + /// The sasl username for DLQ publishing. + /// Deprecated: configure auth on the referenced cluster in kafka_clusters. pub kafka_deadletter_sasl_username: Option, - /// The sasl password for DLQ publishing + /// The sasl password for DLQ publishing. + /// Deprecated: configure auth on the referenced cluster in kafka_clusters. pub kafka_deadletter_sasl_password: Option, - /// The location to the DLQ CA certificate file + /// The location to the DLQ CA certificate file. + /// Deprecated: configure auth on the referenced cluster in kafka_clusters. pub kafka_deadletter_ssl_ca_location: Option, - /// The location to the DLQ certificate file + /// The location to the DLQ certificate file. + /// Deprecated: configure auth on the referenced cluster in kafka_clusters. pub kafka_deadletter_ssl_certificate_location: Option, - /// The location to the DLQ private key file + /// The location to the DLQ private key file. + /// Deprecated: configure auth on the referenced cluster in kafka_clusters. pub kafka_deadletter_ssl_key_location: Option, /// The topic to publish retry task activations to. @@ -134,22 +236,22 @@ pub struct Config { /// Required for raw_mode where the main topic has other consumers. pub kafka_retry_topic: Option, - /// Whether to consume from the retry topic. - /// When false (default), this taskbroker only produces to the retry topic - /// but does not consume from it, allowing the retry topic to be shared - /// across multiple taskbroker instances. - pub kafka_consume_retry_topic: bool, - /// The default number of partitions for a topic pub default_topic_partitions: i32, - /// The kafka session timeout in ms + /// The kafka session timeout in ms. + /// Used as the default for topics that don't set their own + /// `session_timeout_ms`. pub kafka_session_timeout_ms: usize, /// The amount of ms that the consumer will commit at. + /// Used as the default for topics that don't set their own + /// `auto_commit_interval_ms`. pub kafka_auto_commit_interval_ms: usize, - /// The amount of ms that the consumer will commit at. + /// The auto offset reset policy for the consumer. + /// Used as the default for topics that don't set their own + /// `auto_offset_reset`. pub kafka_auto_offset_reset: String, /// The number of ms for timeouts when publishing messages to kafka. @@ -351,9 +453,22 @@ pub struct Config { /// This is an u16 because 1) we don't want to allow signed numbers 2) it can be cast into i32 /// (which we use elsewhere) without error conditions. It doesn't actually have to be that small. pub raw_processing_deadline_duration: u16, + + /// Topic configurations. After normalization, this always contains + /// at least one entry (from legacy config or explicit kafka_topics). + #[serde(default)] + pub kafka_topics: BTreeMap, + + /// Kafka cluster configurations. + /// After normalization, this always contains at least the "default" cluster. + #[serde(default)] + pub kafka_clusters: BTreeMap, } impl Default for Config { + /// Field defaults. `kafka_topics`/`kafka_clusters` are left empty; call + /// [`Config::normalize_and_validate`] (as `from_args` does) to populate + /// them from the legacy fields before using the kafka helpers. fn default() -> Self { Self { sentry_dsn: None, @@ -366,8 +481,8 @@ impl Default for Config { grpc_shared_secret: vec![], statsd_addr: "127.0.0.1:8126".parse().unwrap(), default_metrics_tags: Default::default(), - kafka_cluster: "127.0.0.1:9092".to_owned(), - kafka_consumer_group: "taskworker".to_owned(), + kafka_cluster: None, + kafka_consumer_group: None, kafka_sasl_mechanism: None, kafka_sasl_username: None, kafka_sasl_password: None, @@ -375,7 +490,7 @@ impl Default for Config { kafka_ssl_certificate_location: None, kafka_ssl_key_location: None, kafka_security_protocol: None, - kafka_topic: "taskworker".to_owned(), + kafka_topic: None, kafka_long_topic: "taskworker-long".to_owned(), create_missing_topics: false, kafka_deadletter_cluster: None, @@ -388,7 +503,6 @@ impl Default for Config { kafka_deadletter_ssl_certificate_location: None, kafka_deadletter_ssl_key_location: None, kafka_retry_topic: None, - kafka_consume_retry_topic: false, default_topic_partitions: 1, kafka_session_timeout_ms: 6000, kafka_auto_commit_interval_ms: 5000, @@ -448,6 +562,8 @@ impl Default for Config { raw_application: None, raw_taskname: None, raw_processing_deadline_duration: 30, + kafka_topics: BTreeMap::new(), + kafka_clusters: BTreeMap::new(), } } } @@ -459,88 +575,420 @@ impl Config { if let Some(path) = &args.config { builder = builder.merge(Yaml::file(path)); } - builder = builder.merge(Env::prefixed("TASKBROKER_")); - let config = builder.extract()?; + // Use split("__") to support nested config via envvars like: + // TASKBROKER_KAFKA_TOPICS__PROFILES__CLUSTER=my-cluster + builder = builder.merge(Env::prefixed("TASKBROKER_").split("__")); + let mut config: Config = builder.extract()?; + config.normalize_and_validate()?; Ok(config) } - /// Convert the application Config into rdkafka::ClientConfig + /// Normalize the legacy single-topic config into the new multi-topic + /// format, then validate the result. + /// + /// The legacy fields (`kafka_topic`, `kafka_cluster`, etc.) and the new + /// fields (`kafka_topics`, `kafka_clusters`) are mutually exclusive: mixing + /// them is a hard error. When only legacy fields are used (including the + /// zero-config case, where the historical `taskworker` defaults apply), they + /// are normalized into `kafka_topics`/`kafka_clusters`. After this, + /// `kafka_topics` and `kafka_clusters` are always populated. + pub(crate) fn normalize_and_validate(&mut self) -> Result<(), Box> { + const DEFAULT_CLUSTER: &str = "default"; + const DEADLETTER_CLUSTER: &str = "deadletter"; + const DEFAULT_TOPIC: &str = "taskworker"; + const DEFAULT_CLUSTER_ADDRESS: &str = "127.0.0.1:9092"; + const DEFAULT_CONSUMER_GROUP: &str = "taskworker"; + + let uses_new_format = !self.kafka_topics.is_empty() || !self.kafka_clusters.is_empty(); + // Any explicitly-set deprecated field describing a cluster (the main + // consumed cluster or the deadletter cluster). kafka_deadletter_topic is + // NOT deprecated and is intentionally excluded. + let uses_legacy = self.kafka_topic.is_some() + || self.kafka_cluster.is_some() + || self.kafka_consumer_group.is_some() + || self.kafka_security_protocol.is_some() + || self.kafka_sasl_mechanism.is_some() + || self.kafka_sasl_username.is_some() + || self.kafka_sasl_password.is_some() + || self.kafka_ssl_ca_location.is_some() + || self.kafka_ssl_certificate_location.is_some() + || self.kafka_ssl_key_location.is_some() + || self.kafka_deadletter_cluster.is_some() + || self.kafka_deadletter_security_protocol.is_some() + || self.kafka_deadletter_sasl_mechanism.is_some() + || self.kafka_deadletter_sasl_username.is_some() + || self.kafka_deadletter_sasl_password.is_some() + || self.kafka_deadletter_ssl_ca_location.is_some() + || self.kafka_deadletter_ssl_certificate_location.is_some() + || self.kafka_deadletter_ssl_key_location.is_some(); + + if uses_new_format && uses_legacy { + return Err(Box::new(figment::Error::from( + "cannot mix the deprecated kafka_cluster/kafka_topic/kafka_consumer_group/\ + kafka_deadletter_cluster (and related kafka_sasl_*/kafka_ssl_*/kafka_deadletter_* \ + auth fields) with kafka_topics/kafka_clusters; use one config format or the other" + .to_owned(), + ))); + } + + if uses_new_format { + // New format: the maps are the source of truth. Require both halves + // so a topic always has a cluster to resolve against. + if self.kafka_topics.is_empty() { + return Err(Box::new(figment::Error::from( + "kafka_clusters is set but kafka_topics is empty".to_owned(), + ))); + } + if self.kafka_clusters.is_empty() { + return Err(Box::new(figment::Error::from( + "kafka_topics is set but kafka_clusters is empty".to_owned(), + ))); + } + } else { + if self.kafka_cluster.is_some() { + warn!("kafka_cluster is deprecated, use kafka_clusters instead"); + } + if self.kafka_topic.is_some() { + warn!("kafka_topic is deprecated, use kafka_topics instead"); + } + if self.kafka_consumer_group.is_some() { + warn!("kafka_consumer_group is deprecated, use kafka_topics instead"); + } + if self.kafka_deadletter_cluster.is_some() { + warn!( + "kafka_deadletter_cluster is deprecated, declare the deadletter topic in \ + kafka_topics with a cluster reference instead" + ); + } + + let topic_name = self + .kafka_topic + .clone() + .unwrap_or_else(|| DEFAULT_TOPIC.to_owned()); + let address = self + .kafka_cluster + .clone() + .unwrap_or_else(|| DEFAULT_CLUSTER_ADDRESS.to_owned()); + let consumer_group = self + .kafka_consumer_group + .clone() + .unwrap_or_else(|| DEFAULT_CONSUMER_GROUP.to_owned()); + + let prev = self.kafka_clusters.insert( + DEFAULT_CLUSTER.to_owned(), + ClusterConfig { + address: address.clone(), + security_protocol: self.kafka_security_protocol.clone(), + sasl_mechanism: self.kafka_sasl_mechanism.clone(), + sasl_username: self.kafka_sasl_username.clone(), + sasl_password: self.kafka_sasl_password.clone(), + ssl_ca_location: self.kafka_ssl_ca_location.clone(), + ssl_certificate_location: self.kafka_ssl_certificate_location.clone(), + ssl_key_location: self.kafka_ssl_key_location.clone(), + }, + ); + assert!( + prev.is_none(), + "internal: duplicate '{DEFAULT_CLUSTER}' cluster" + ); + + // Migrate the deprecated deadletter cluster/auth fields into a + // dedicated cluster. The deadletter producer historically falls back + // to the main cluster's address when kafka_deadletter_cluster is + // unset, while using its own (possibly empty) auth. + let prev = self.kafka_clusters.insert( + DEADLETTER_CLUSTER.to_owned(), + ClusterConfig { + address: self + .kafka_deadletter_cluster + .clone() + .unwrap_or_else(|| address.clone()), + security_protocol: self.kafka_deadletter_security_protocol.clone(), + sasl_mechanism: self.kafka_deadletter_sasl_mechanism.clone(), + sasl_username: self.kafka_deadletter_sasl_username.clone(), + sasl_password: self.kafka_deadletter_sasl_password.clone(), + ssl_ca_location: self.kafka_deadletter_ssl_ca_location.clone(), + ssl_certificate_location: self + .kafka_deadletter_ssl_certificate_location + .clone(), + ssl_key_location: self.kafka_deadletter_ssl_key_location.clone(), + }, + ); + assert!( + prev.is_none(), + "internal: duplicate '{DEADLETTER_CLUSTER}' cluster" + ); + + let raw_config = if self.raw_mode { + Some(RawModeConfig { + namespace: self.raw_namespace.clone(), + application: self.raw_application.clone(), + taskname: self.raw_taskname.clone(), + processing_deadline_duration: Some(self.raw_processing_deadline_duration), + }) + } else { + None + }; + + let prev = self.kafka_topics.insert( + topic_name.clone(), + TopicConfig { + cluster: DEFAULT_CLUSTER.to_owned(), + consumer_group: consumer_group.clone(), + produce_only: false, + raw: raw_config, + session_timeout_ms: None, + auto_commit_interval_ms: None, + auto_offset_reset: None, + }, + ); + assert!(prev.is_none(), "internal: duplicate topic '{topic_name}'"); + + // Register the deadletter topic as a produce-only topic on its own + // cluster. A non-empty return value means it collided with the main + // topic, which would route deadletter messages to the wrong topic. + let prev = self.kafka_topics.insert( + self.kafka_deadletter_topic.clone(), + TopicConfig { + cluster: DEADLETTER_CLUSTER.to_owned(), + consumer_group: consumer_group.clone(), + produce_only: true, + raw: None, + session_timeout_ms: None, + auto_commit_interval_ms: None, + auto_offset_reset: None, + }, + ); + if prev.is_some() { + return Err(Box::new(figment::Error::from(format!( + "kafka_deadletter_topic '{}' must differ from the consumed topic '{}'", + self.kafka_deadletter_topic, topic_name + )))); + } + + // Add the retry topic if configured. Retry topics are produce-only; + // this taskbroker writes retries to them but does not consume from + // them. Aliasing the main topic is allowed (retries are re-enqueued + // there), but aliasing the deadletter topic is not: the names would + // collide and the retry would silently inherit the deadletter + // cluster/role. + if let Some(ref retry_topic) = self.kafka_retry_topic { + if retry_topic == &self.kafka_deadletter_topic { + return Err(Box::new(figment::Error::from(format!( + "kafka_retry_topic '{}' must differ from kafka_deadletter_topic", + retry_topic + )))); + } + self.kafka_topics + .entry(retry_topic.clone()) + .or_insert_with(|| TopicConfig { + cluster: DEFAULT_CLUSTER.to_owned(), + consumer_group, + produce_only: true, + raw: None, + session_timeout_ms: None, + auto_commit_interval_ms: None, + auto_offset_reset: None, + }); + } + } + + // Validate cluster references + for (topic_name, topic_config) in &self.kafka_topics { + self.cluster(&topic_config.cluster).map_err(|_| { + Box::new(figment::Error::from(format!( + "topic '{}' references unknown cluster '{}'", + topic_name, topic_config.cluster + ))) + })?; + } + + // Validate exactly one consumable topic + self.consumable_topic()?; + + // The deadletter topic must be a declared topic so the producer can + // resolve its cluster. In legacy mode it was added above; in the new + // format the user must declare it (produce-only) in kafka_topics. + if !self.kafka_topics.contains_key(&self.kafka_deadletter_topic) { + return Err(Box::new(figment::Error::from(format!( + "kafka_deadletter_topic '{}' is not defined in kafka_topics", + self.kafka_deadletter_topic + )))); + } + + // The upkeep producer connects to the deadletter topic's cluster but is + // also reused to publish retries to the retry topic (or the consumed + // topic when no retry topic is configured). A single producer can only + // reach one cluster, so the retry target must be a declared topic on + // the same cluster address as the deadletter topic; otherwise retries + // would be published to the wrong brokers. In the legacy format the + // retry topic is registered above; in the new format the user must + // declare it in kafka_topics. + let (consumed_topic, _) = self.consumable_topic()?; + let retry_target = self.kafka_retry_topic.as_deref().unwrap_or(consumed_topic); + let retry_topic_config = self.kafka_topics.get(retry_target).ok_or_else(|| { + Box::new(figment::Error::from(format!( + "kafka_retry_topic '{retry_target}' is not defined in kafka_topics" + ))) + })?; + let retry_address = &self.cluster(&retry_topic_config.cluster)?.address; + let deadletter_address = &self + .cluster(&self.kafka_topics[&self.kafka_deadletter_topic].cluster)? + .address; + if retry_address != deadletter_address { + return Err(Box::new(figment::Error::from(format!( + "retry target topic '{}' is on cluster '{}', but deadletter topic '{}' is on \ + '{}'; they share a single producer and must be on the same cluster", + retry_target, retry_address, self.kafka_deadletter_topic, deadletter_address + )))); + } + + Ok(()) + } + + /// Get the single consumable topic and its config. + /// Returns an error if there are zero or multiple consumable topics. + pub fn consumable_topic(&self) -> Result<(&str, &TopicConfig), Box> { + let mut consumable = self + .kafka_topics + .iter() + .filter(|(_, cfg)| !cfg.produce_only); + + let first = consumable.next().ok_or_else(|| { + Box::new(figment::Error::from( + "no consumable topic configured (all topics have produce_only: true)".to_owned(), + )) + })?; + + if consumable.next().is_some() { + let count = self + .kafka_topics + .values() + .filter(|t| !t.produce_only) + .count(); + return Err(Box::new(figment::Error::from(format!( + "multi-topic consumption is not yet supported: {} consumable topics configured, maximum is 1", + count + )))); + } + + Ok((first.0.as_str(), first.1)) + } + + /// Get cluster config by name. + /// Returns an error if the cluster doesn't exist. + pub fn cluster(&self, name: &str) -> Result<&ClusterConfig, Box> { + self.kafka_clusters + .get(name) + .ok_or_else(|| Box::new(figment::Error::from(format!("unknown cluster: {}", name)))) + } + + /// Convert the application Config into rdkafka::ClientConfig for consumer. + /// Uses the single consumable topic's cluster. + /// Panics if config wasn't validated (call from_args, not extract directly). pub fn kafka_consumer_config(&self) -> ClientConfig { + let (_, topic_config) = self + .consumable_topic() + .expect("consumable_topic failed - was config validated?"); + let cluster = self + .cluster(&topic_config.cluster) + .expect("cluster lookup failed - was config validated?"); + + // Per-topic consumer settings, falling back to the global defaults. + let session_timeout_ms = topic_config + .session_timeout_ms + .unwrap_or(self.kafka_session_timeout_ms); + let auto_commit_interval_ms = topic_config + .auto_commit_interval_ms + .unwrap_or(self.kafka_auto_commit_interval_ms); + let auto_offset_reset = topic_config + .auto_offset_reset + .clone() + .unwrap_or_else(|| self.kafka_auto_offset_reset.clone()); + let mut new_config = ClientConfig::new(); let config = new_config - .set("bootstrap.servers", self.kafka_cluster.clone()) - .set("group.id", self.kafka_consumer_group.clone()) - .set( - "session.timeout.ms", - self.kafka_session_timeout_ms.to_string(), - ) + .set("bootstrap.servers", cluster.address.clone()) + .set("group.id", topic_config.consumer_group.clone()) + .set("session.timeout.ms", session_timeout_ms.to_string()) .set("enable.partition.eof", "false") .set("enable.auto.commit", "true") .set( "auto.commit.interval.ms", - self.kafka_auto_commit_interval_ms.to_string(), - ) - .set( - "auto.offset.reset", - self.kafka_auto_offset_reset.to_string(), + auto_commit_interval_ms.to_string(), ) + .set("auto.offset.reset", auto_offset_reset) .set("enable.auto.offset.store", "false"); - if let Some(sasl_mechanism) = &self.kafka_sasl_mechanism { + if let Some(ref sasl_mechanism) = cluster.sasl_mechanism { config.set("sasl.mechanism", sasl_mechanism); } - if let Some(sasl_username) = &self.kafka_sasl_username { + if let Some(ref sasl_username) = cluster.sasl_username { config.set("sasl.username", sasl_username); } - if let Some(sasl_password) = &self.kafka_sasl_password { + if let Some(ref sasl_password) = cluster.sasl_password { config.set("sasl.password", sasl_password); } - if let Some(security_protocol) = &self.kafka_security_protocol { + if let Some(ref security_protocol) = cluster.security_protocol { config.set("security.protocol", security_protocol); } - if let Some(ssl_ca_location) = &self.kafka_ssl_ca_location { + if let Some(ref ssl_ca_location) = cluster.ssl_ca_location { config.set("ssl.ca.location", ssl_ca_location); } - if let Some(ssl_certificate_location) = &self.kafka_ssl_certificate_location { + if let Some(ref ssl_certificate_location) = cluster.ssl_certificate_location { config.set("ssl.certificate.location", ssl_certificate_location); } - if let Some(ssl_private_key_location) = &self.kafka_ssl_key_location { + if let Some(ref ssl_private_key_location) = cluster.ssl_key_location { config.set("ssl.key.location", ssl_private_key_location); } config.clone() } - /// Convert the application Config into rdkafka::ClientConfig + /// The cluster the deadletter / forwarding producer connects to: the + /// cluster of the `kafka_deadletter_topic` entry in `kafka_topics` (in + /// legacy mode this is the migrated "deadletter" cluster). + /// Panics if config wasn't validated. + pub fn kafka_producer_cluster(&self) -> &ClusterConfig { + let dlq_topic = self + .kafka_topics + .get(&self.kafka_deadletter_topic) + .expect("deadletter topic not in kafka_topics - was config validated?"); + self.cluster(&dlq_topic.cluster) + .expect("cluster lookup failed - was config validated?") + } + + /// Convert the application Config into rdkafka::ClientConfig for the + /// deadletter / forwarding producer. The producer connects to the cluster + /// of the `kafka_deadletter_topic` entry in `kafka_topics` (in legacy mode + /// this is the migrated "deadletter" cluster). + /// Panics if config wasn't validated. pub fn kafka_producer_config(&self) -> ClientConfig { + let cluster = self.kafka_producer_cluster(); + let mut new_config = ClientConfig::new(); let config = new_config - .set( - "bootstrap.servers", - self.kafka_deadletter_cluster - .as_ref() - .unwrap_or(&self.kafka_cluster), - ) + .set("bootstrap.servers", cluster.address.clone()) .set("message.max.bytes", format!("{}", self.max_message_size)); - if let Some(sasl_mechanism) = &self.kafka_deadletter_sasl_mechanism { + if let Some(ref sasl_mechanism) = cluster.sasl_mechanism { config.set("sasl.mechanism", sasl_mechanism); } - if let Some(sasl_username) = &self.kafka_deadletter_sasl_username { + if let Some(ref sasl_username) = cluster.sasl_username { config.set("sasl.username", sasl_username); } - if let Some(sasl_password) = &self.kafka_deadletter_sasl_password { + if let Some(ref sasl_password) = cluster.sasl_password { config.set("sasl.password", sasl_password); } - if let Some(security_protocol) = &self.kafka_deadletter_security_protocol { + if let Some(ref security_protocol) = cluster.security_protocol { config.set("security.protocol", security_protocol); } - if let Some(ssl_ca_location) = &self.kafka_deadletter_ssl_ca_location { + if let Some(ref ssl_ca_location) = cluster.ssl_ca_location { config.set("ssl.ca.location", ssl_ca_location); } - if let Some(ssl_certificate_location) = &self.kafka_deadletter_ssl_certificate_location { + if let Some(ref ssl_certificate_location) = cluster.ssl_certificate_location { config.set("ssl.certificate.location", ssl_certificate_location); } - if let Some(ssl_private_key_location) = &self.kafka_deadletter_ssl_key_location { + if let Some(ref ssl_private_key_location) = cluster.ssl_key_location { config.set("ssl.key.location", ssl_private_key_location); } @@ -580,7 +1028,12 @@ mod tests { assert_eq!(config.log_filter, "info,librdkafka=warn,h2=off"); assert_eq!(config.log_format, LogFormat::Text); assert_eq!(config.grpc_port, 50051); - assert_eq!(config.kafka_topic, "taskworker"); + // The legacy kafka fields default to None now; the historical + // "taskworker" default is applied during normalization (see + // test_from_args_env_test). + assert_eq!(config.kafka_topic, None); + assert_eq!(config.kafka_cluster, None); + assert_eq!(config.kafka_consumer_group, None); assert_eq!(config.db_path, "./taskbroker-inflight.sqlite"); assert_eq!(config.max_pending_count, 2048); assert_eq!(config.max_processing_count, 2048); @@ -634,16 +1087,21 @@ mod tests { assert_eq!(config.log_format, LogFormat::Json); assert_eq!( config.kafka_cluster, - "10.0.0.1:9092,10.0.0.2:9092".to_owned() + Some("10.0.0.1:9092,10.0.0.2:9092".to_owned()) ); assert_eq!( config.default_metrics_tags, BTreeMap::from([("key_1".to_owned(), "value_1".to_owned())]) ); - assert_eq!(config.kafka_consumer_group, "taskworker".to_owned()); + // kafka_consumer_group is unset in the yaml, so the legacy field + // stays None and normalization applies the "taskworker" default. + assert_eq!(config.kafka_consumer_group, None); + let (topic_name, topic_config) = config.consumable_topic().unwrap(); + assert_eq!(topic_name, "error-tasks"); + assert_eq!(topic_config.consumer_group, "taskworker"); assert_eq!(config.kafka_auto_offset_reset, "earliest".to_owned()); assert_eq!(config.kafka_session_timeout_ms, 6000.to_owned()); - assert_eq!(config.kafka_topic, "error-tasks".to_owned()); + assert_eq!(config.kafka_topic, Some("error-tasks".to_owned())); assert_eq!(config.kafka_deadletter_topic, "error-tasks-dlq".to_owned()); assert_eq!(config.database_adapter, DatabaseAdapter::Postgres); assert_eq!(config.db_path, "./taskbroker-error.sqlite".to_owned()); @@ -697,7 +1155,15 @@ mod tests { assert_eq!(config.sentry_dsn, None); assert_eq!(config.sentry_env, None); assert_eq!(config.log_filter, "error"); - assert_eq!(config.kafka_topic, "taskworker".to_owned()); + // Zero-config: legacy fields stay None, but normalization applies + // the historical "taskworker" default as the consumable topic. + assert_eq!(config.kafka_topic, None); + let (topic_name, topic_config) = config.consumable_topic().unwrap(); + assert_eq!(topic_name, "taskworker"); + assert_eq!( + config.cluster(&topic_config.cluster).unwrap().address, + "127.0.0.1:9092" + ); assert_eq!(config.kafka_deadletter_topic, "taskworker-dlq".to_owned()); assert_eq!(config.db_path, "./taskbroker-inflight.sqlite".to_owned()); assert_eq!(config.max_pending_count, 2048); @@ -936,4 +1402,621 @@ mod tests { Ok(()) }); } + + #[test] + fn test_multi_topic_config_from_yaml() { + use super::{ClusterConfig, RawModeConfig}; + + Jail::expect_with(|jail| { + jail.create_file( + "config.yaml", + r#" +kafka_deadletter_topic: profiles-dlq + +kafka_topics: + profiles: + cluster: profiles-cluster + consumer_group: taskbroker-profiles + raw: + application: profiles + profiles-retry: + cluster: profiles-cluster + consumer_group: taskbroker-profiles-retry + produce_only: true + profiles-dlq: + cluster: profiles-cluster + consumer_group: taskbroker-profiles-dlq + produce_only: true + +kafka_clusters: + profiles-cluster: + address: 10.0.0.1:9092 +"#, + )?; + + let args = Args { + config: Some("config.yaml".to_owned()), + }; + let config = Config::from_args(&args).unwrap(); + + let topics = &config.kafka_topics; + assert_eq!(topics.len(), 3); + + let profiles = topics.get("profiles").unwrap(); + assert_eq!(profiles.cluster, "profiles-cluster"); + assert_eq!(profiles.consumer_group, "taskbroker-profiles"); + assert!(!profiles.produce_only); + assert_eq!( + profiles.raw, + Some(RawModeConfig { + namespace: None, + application: Some("profiles".to_owned()), + taskname: None, + processing_deadline_duration: None, + }) + ); + + let retry = topics.get("profiles-retry").unwrap(); + assert!(retry.produce_only); + + let clusters = &config.kafka_clusters; + // Only the explicitly-declared cluster exists; no legacy "default" + // cluster is injected when the new format is used. + assert_eq!(clusters.len(), 1); + assert_eq!( + clusters.get("profiles-cluster"), + Some(&ClusterConfig { + address: "10.0.0.1:9092".to_owned(), + security_protocol: None, + sasl_mechanism: None, + sasl_username: None, + sasl_password: None, + ssl_ca_location: None, + ssl_certificate_location: None, + ssl_key_location: None, + }) + ); + assert!(!clusters.contains_key("default")); + + // Test consumable_topic() and cluster() helpers + let (topic_name, topic_config) = config.consumable_topic().unwrap(); + assert_eq!(topic_name, "profiles"); + assert_eq!(topic_config.cluster, "profiles-cluster"); + + let cluster = config.cluster("profiles-cluster").unwrap(); + assert_eq!(cluster.address, "10.0.0.1:9092"); + + Ok(()) + }); + } + + #[test] + fn test_multi_topic_config_from_env() { + Jail::expect_with(|jail| { + // Note: figment lowercases env var keys after splitting on "__", + // so MY_CLUSTER becomes my_cluster (with underscore, not hyphen). + // The cluster reference value is preserved as-is. + jail.set_env("TASKBROKER_KAFKA_TOPICS__PROFILES__CLUSTER", "my_cluster"); + jail.set_env( + "TASKBROKER_KAFKA_TOPICS__PROFILES__CONSUMER_GROUP", + "taskbroker-profiles", + ); + // The deadletter topic must be a declared topic. The key segment is + // lowercased to "profiles_dlq", so kafka_deadletter_topic must match. + jail.set_env("TASKBROKER_KAFKA_DEADLETTER_TOPIC", "profiles_dlq"); + jail.set_env( + "TASKBROKER_KAFKA_TOPICS__PROFILES_DLQ__CLUSTER", + "my_cluster", + ); + jail.set_env( + "TASKBROKER_KAFKA_TOPICS__PROFILES_DLQ__CONSUMER_GROUP", + "taskbroker-profiles-dlq", + ); + jail.set_env( + "TASKBROKER_KAFKA_TOPICS__PROFILES_DLQ__PRODUCE_ONLY", + "true", + ); + jail.set_env( + "TASKBROKER_KAFKA_CLUSTERS__MY_CLUSTER__ADDRESS", + "10.0.0.2:9092", + ); + + let args = Args { config: None }; + let config = Config::from_args(&args).unwrap(); + + let topics = &config.kafka_topics; + assert_eq!(topics.len(), 2); + + let profiles = topics.get("profiles").unwrap(); + assert_eq!(profiles.cluster, "my_cluster"); + assert_eq!(profiles.consumer_group, "taskbroker-profiles"); + assert!(topics.get("profiles_dlq").unwrap().produce_only); + + let clusters = &config.kafka_clusters; + assert_eq!(clusters.len(), 1); + assert_eq!(clusters.get("my_cluster").unwrap().address, "10.0.0.2:9092"); + assert!(!clusters.contains_key("default")); + + // Test consumable_topic() helper + let (topic_name, _) = config.consumable_topic().unwrap(); + assert_eq!(topic_name, "profiles"); + + Ok(()) + }); + } + + #[test] + fn test_rejects_mixing_legacy_and_new_format() { + Jail::expect_with(|jail| { + jail.create_file( + "config.yaml", + r#" +kafka_topic: legacy-topic +kafka_cluster: 127.0.0.1:9092 + +kafka_topics: + profiles: + cluster: my-cluster + consumer_group: taskbroker-profiles + +kafka_clusters: + my-cluster: + address: 10.0.0.1:9092 +"#, + )?; + + let args = Args { + config: Some("config.yaml".to_owned()), + }; + let err = Config::from_args(&args).unwrap_err(); + assert!( + err.to_string().contains("cannot mix"), + "unexpected error: {}", + err + ); + + Ok(()) + }); + } + + #[test] + fn test_multi_topic_requires_clusters() { + // kafka_topics without any kafka_clusters is a misconfiguration. + Jail::expect_with(|jail| { + jail.create_file( + "config.yaml", + r#" +kafka_topics: + profiles: + cluster: my-cluster + consumer_group: taskbroker-profiles +"#, + )?; + + let args = Args { + config: Some("config.yaml".to_owned()), + }; + let err = Config::from_args(&args).unwrap_err(); + assert!( + err.to_string().contains("kafka_clusters is empty"), + "unexpected error: {}", + err + ); + + Ok(()) + }); + } + + #[test] + fn test_multi_topic_validates_cluster_references() { + Jail::expect_with(|jail| { + jail.create_file( + "config.yaml", + r#" +kafka_topics: + profiles: + cluster: nonexistent-cluster + consumer_group: taskbroker-profiles + +kafka_clusters: + other-cluster: + address: 10.0.0.1:9092 +"#, + )?; + + let args = Args { + config: Some("config.yaml".to_owned()), + }; + let err = Config::from_args(&args).unwrap_err(); + assert!( + err.to_string().contains("unknown cluster"), + "unexpected error: {}", + err + ); + + Ok(()) + }); + } + + #[test] + fn test_multi_topic_rejects_multiple_consumable_topics() { + Jail::expect_with(|jail| { + // Two consumable topics - the guard for this PR rejects this. + jail.create_file( + "config.yaml", + r#" +kafka_topics: + profiles: + cluster: my-cluster + consumer_group: taskbroker-profiles + subscriptions: + cluster: my-cluster + consumer_group: taskbroker-subscriptions + +kafka_clusters: + my-cluster: + address: 10.0.0.1:9092 +"#, + )?; + + let args = Args { + config: Some("config.yaml".to_owned()), + }; + let err = Config::from_args(&args).unwrap_err(); + assert!( + err.to_string() + .contains("multi-topic consumption is not yet supported"), + "unexpected error: {}", + err + ); + + Ok(()) + }); + } + + #[test] + fn test_multi_topic_allows_one_consumable_with_produce_only() { + Jail::expect_with(|jail| { + // One consumable topic (profiles), two produce-only (retry + dlq). + jail.create_file( + "config.yaml", + r#" +kafka_deadletter_topic: profiles-dlq + +kafka_topics: + profiles: + cluster: my-cluster + consumer_group: taskbroker-profiles + profiles-retry: + cluster: my-cluster + consumer_group: taskbroker-profiles-retry + produce_only: true + profiles-dlq: + cluster: my-cluster + consumer_group: taskbroker-profiles-dlq + produce_only: true + +kafka_clusters: + my-cluster: + address: 10.0.0.1:9092 +"#, + )?; + + let args = Args { + config: Some("config.yaml".to_owned()), + }; + let config = Config::from_args(&args).unwrap(); + + let topics = &config.kafka_topics; + assert_eq!(topics.len(), 3); + + // One consumable, two produce-only + assert!(!topics.get("profiles").unwrap().produce_only); + assert!(topics.get("profiles-retry").unwrap().produce_only); + assert!(topics.get("profiles-dlq").unwrap().produce_only); + + // consumable_topic() returns the one consumable topic + let (topic_name, _) = config.consumable_topic().unwrap(); + assert_eq!(topic_name, "profiles"); + + Ok(()) + }); + } + + #[test] + fn test_multi_topic_rejects_zero_consumable_topics() { + Jail::expect_with(|jail| { + // All topics are produce-only - should fail + jail.create_file( + "config.yaml", + r#" +kafka_topics: + profiles: + cluster: my-cluster + consumer_group: taskbroker-profiles + produce_only: true + +kafka_clusters: + my-cluster: + address: 10.0.0.1:9092 +"#, + )?; + + let args = Args { + config: Some("config.yaml".to_owned()), + }; + let err = Config::from_args(&args).unwrap_err(); + assert!( + err.to_string().contains("no consumable topic"), + "unexpected error: {}", + err + ); + + Ok(()) + }); + } + + #[test] + fn test_multi_topic_requires_deadletter_topic() { + // In the new format the deadletter topic must be declared in kafka_topics. + Jail::expect_with(|jail| { + jail.create_file( + "config.yaml", + r#" +kafka_topics: + profiles: + cluster: my-cluster + consumer_group: taskbroker-profiles + +kafka_clusters: + my-cluster: + address: 10.0.0.1:9092 +"#, + )?; + + let args = Args { + config: Some("config.yaml".to_owned()), + }; + let err = Config::from_args(&args).unwrap_err(); + assert!( + err.to_string() + .contains("kafka_deadletter_topic 'taskworker-dlq' is not defined"), + "unexpected error: {}", + err + ); + + Ok(()) + }); + } + + #[test] + fn test_legacy_rejects_deadletter_topic_equal_to_main_topic() { + // Topics are keyed by name, so a deadletter topic that shares the main + // topic's name cannot carry its own (deadletter) cluster. Rather than + // silently route deadletter messages to the consumed topic's cluster, + // normalization must reject the collision. + Jail::expect_with(|jail| { + jail.set_env("TASKBROKER_KAFKA_TOPIC", "taskworker"); + jail.set_env("TASKBROKER_KAFKA_DEADLETTER_TOPIC", "taskworker"); + + let args = Args { config: None }; + let err = Config::from_args(&args).unwrap_err(); + assert!( + err.to_string().contains( + "kafka_deadletter_topic 'taskworker' must differ from the consumed topic \ + 'taskworker'" + ), + "unexpected error: {}", + err + ); + + Ok(()) + }); + } + + #[test] + fn test_legacy_rejects_retry_topic_equal_to_deadletter_topic() { + // The retry topic may alias the main topic, but not the deadletter + // topic: that collision would silently give the retry topic the + // deadletter cluster/role. + Jail::expect_with(|jail| { + jail.set_env("TASKBROKER_KAFKA_RETRY_TOPIC", "taskworker-dlq"); + jail.set_env("TASKBROKER_KAFKA_DEADLETTER_TOPIC", "taskworker-dlq"); + + let args = Args { config: None }; + let err = Config::from_args(&args).unwrap_err(); + assert!( + err.to_string().contains( + "kafka_retry_topic 'taskworker-dlq' must differ from kafka_deadletter_topic" + ), + "unexpected error: {}", + err + ); + + Ok(()) + }); + } + + #[test] + fn test_per_topic_consumer_settings_override_globals() { + Jail::expect_with(|jail| { + jail.create_file( + "config.yaml", + r#" +kafka_deadletter_topic: profiles-dlq +kafka_session_timeout_ms: 6000 +kafka_auto_commit_interval_ms: 5000 +kafka_auto_offset_reset: latest + +kafka_topics: + profiles: + cluster: my-cluster + consumer_group: taskbroker-profiles + session_timeout_ms: 12000 + auto_commit_interval_ms: 1000 + auto_offset_reset: earliest + profiles-dlq: + cluster: my-cluster + consumer_group: taskbroker-profiles-dlq + produce_only: true + +kafka_clusters: + my-cluster: + address: 10.0.0.1:9092 +"#, + )?; + + let args = Args { + config: Some("config.yaml".to_owned()), + }; + let config = Config::from_args(&args).unwrap(); + let consumer_config = config.kafka_consumer_config(); + + // Per-topic values win over the globals. + assert_eq!(consumer_config.get("session.timeout.ms").unwrap(), "12000"); + assert_eq!( + consumer_config.get("auto.commit.interval.ms").unwrap(), + "1000" + ); + assert_eq!( + consumer_config.get("auto.offset.reset").unwrap(), + "earliest" + ); + + Ok(()) + }); + } + + #[test] + fn test_topic_consumer_settings_fall_back_to_globals() { + Jail::expect_with(|jail| { + jail.create_file( + "config.yaml", + r#" +kafka_deadletter_topic: profiles-dlq +kafka_session_timeout_ms: 7000 +kafka_auto_commit_interval_ms: 2000 +kafka_auto_offset_reset: earliest + +kafka_topics: + profiles: + cluster: my-cluster + consumer_group: taskbroker-profiles + profiles-dlq: + cluster: my-cluster + consumer_group: taskbroker-profiles-dlq + produce_only: true + +kafka_clusters: + my-cluster: + address: 10.0.0.1:9092 +"#, + )?; + + let args = Args { + config: Some("config.yaml".to_owned()), + }; + let config = Config::from_args(&args).unwrap(); + let consumer_config = config.kafka_consumer_config(); + + // No per-topic overrides, so the globals are used. + assert_eq!(consumer_config.get("session.timeout.ms").unwrap(), "7000"); + assert_eq!( + consumer_config.get("auto.commit.interval.ms").unwrap(), + "2000" + ); + assert_eq!( + consumer_config.get("auto.offset.reset").unwrap(), + "earliest" + ); + + Ok(()) + }); + } + + #[test] + fn test_new_format_requires_retry_topic_to_be_declared() { + // In the new format kafka_retry_topic must be declared in kafka_topics + // so its cluster is known; otherwise retries would silently use the + // deadletter producer's cluster. + Jail::expect_with(|jail| { + jail.create_file( + "config.yaml", + r#" +kafka_deadletter_topic: profiles-dlq +kafka_retry_topic: profiles-retry + +kafka_topics: + profiles: + cluster: my-cluster + consumer_group: taskbroker-profiles + profiles-dlq: + cluster: my-cluster + consumer_group: taskbroker-profiles-dlq + produce_only: true + +kafka_clusters: + my-cluster: + address: 10.0.0.1:9092 +"#, + )?; + + let args = Args { + config: Some("config.yaml".to_owned()), + }; + let err = Config::from_args(&args).unwrap_err(); + assert!( + err.to_string() + .contains("kafka_retry_topic 'profiles-retry' is not defined in kafka_topics"), + "unexpected error: {}", + err + ); + + Ok(()) + }); + } + + #[test] + fn test_rejects_deadletter_topic_on_different_cluster() { + // The upkeep producer is shared between deadletter publishing and retry + // publishing (to the consumed topic), and a single producer can only + // reach one cluster. Putting the deadletter topic on a different cluster + // than the consumed topic would misroute retries, so it's rejected. + Jail::expect_with(|jail| { + jail.create_file( + "config.yaml", + r#" +kafka_deadletter_topic: profiles-dlq + +kafka_topics: + profiles: + cluster: main-cluster + consumer_group: taskbroker-profiles + profiles-dlq: + cluster: dlq-cluster + consumer_group: taskbroker-profiles-dlq + produce_only: true + +kafka_clusters: + main-cluster: + address: 10.0.0.1:9092 + dlq-cluster: + address: 10.9.9.9:9092 +"#, + )?; + + let args = Args { + config: Some("config.yaml".to_owned()), + }; + let err = Config::from_args(&args).unwrap_err(); + assert!( + err.to_string() + .contains("they share a single producer and must be on the same cluster"), + "unexpected error: {}", + err + ); + + Ok(()) + }); + } } diff --git a/src/kafka/activation_batcher.rs b/src/kafka/activation_batcher.rs index 4449db35..a5a1ba73 100644 --- a/src/kafka/activation_batcher.rs +++ b/src/kafka/activation_batcher.rs @@ -31,10 +31,17 @@ pub struct ActivationBatcherConfig { impl ActivationBatcherConfig { /// Convert from application configuration into ActivationBatcher config. pub fn from_config(config: &Config) -> Self { + let (topic_name, topic_config) = config + .consumable_topic() + .expect("no consumable topic configured"); + let cluster = config + .cluster(&topic_config.cluster) + .expect("cluster not found"); + Self { producer_config: config.kafka_producer_config(), - kafka_cluster: config.kafka_cluster.clone(), - kafka_topic: config.kafka_topic.clone(), + kafka_cluster: cluster.address.clone(), + kafka_topic: topic_name.to_owned(), kafka_long_topic: config.kafka_long_topic.clone(), send_timeout_ms: config.kafka_send_timeout_ms, max_batch_time_ms: config.db_insert_batch_max_time_ms, @@ -241,7 +248,9 @@ demoted_namespaces: let runtime_config = Arc::new( RuntimeConfigManager::new(Some(config_file.path().to_str().unwrap().to_string())).await, ); - let config = Arc::new(Config::default()); + let mut config = Config::default(); + config.normalize_and_validate().unwrap(); + let config = Arc::new(config); let mut batcher = ActivationBatcher::new( ActivationBatcherConfig::from_config(&config), runtime_config, @@ -262,7 +271,9 @@ demoted_namespaces: #[tokio::test] async fn test_drop_task_due_to_expiry() { let runtime_config = Arc::new(RuntimeConfigManager::new(None).await); - let config = Arc::new(Config::default()); + let mut config = Config::default(); + config.normalize_and_validate().unwrap(); + let config = Arc::new(config); let mut batcher = ActivationBatcher::new( ActivationBatcherConfig::from_config(&config), runtime_config, @@ -284,11 +295,13 @@ demoted_namespaces: #[tokio::test] async fn test_close_by_bytes_limit() { let runtime_config = Arc::new(RuntimeConfigManager::new(None).await); - let config = Arc::new(Config { + let mut config = Config { db_insert_batch_max_size: 1, db_insert_batch_max_len: 2, ..Default::default() - }); + }; + config.normalize_and_validate().unwrap(); + let config = Arc::new(config); let mut batcher = ActivationBatcher::new( ActivationBatcherConfig::from_config(&config), @@ -312,11 +325,13 @@ demoted_namespaces: #[tokio::test] async fn test_close_by_rows_limit() { let runtime_config = Arc::new(RuntimeConfigManager::new(None).await); - let config = Arc::new(Config { + let mut config = Config { db_insert_batch_max_size: 100000, db_insert_batch_max_len: 2, ..Default::default() - }); + }; + config.normalize_and_validate().unwrap(); + let config = Arc::new(config); let mut batcher = ActivationBatcher::new( ActivationBatcherConfig::from_config(&config), @@ -361,13 +376,21 @@ demoted_topic: taskworker-demoted"#; let runtime_config = Arc::new( RuntimeConfigManager::new(Some(config_file.path().to_str().unwrap().to_string())).await, ); - let config = Arc::new(Config::default()); + let mut config = Config::default(); + config.normalize_and_validate().unwrap(); + let config = Arc::new(config); let mut batcher = ActivationBatcher::new( ActivationBatcherConfig::from_config(&config), runtime_config, ); - assert_eq!(batcher.producer_cluster, config.kafka_cluster.clone()); + let (_, topic_config) = config.consumable_topic().unwrap(); + let cluster_address = config + .cluster(&topic_config.cluster) + .unwrap() + .address + .clone(); + assert_eq!(batcher.producer_cluster, cluster_address); let activation_0 = ActivationBuilder::new() .id("0") diff --git a/src/kafka/deserialize_raw.rs b/src/kafka/deserialize_raw.rs index d885de9f..89577850 100644 --- a/src/kafka/deserialize_raw.rs +++ b/src/kafka/deserialize_raw.rs @@ -45,8 +45,11 @@ impl RawConfig { ); if let Some(ref retry_topic) = config.kafka_retry_topic { + let (main_topic, _) = config + .consumable_topic() + .expect("no consumable topic configured"); assert!( - retry_topic != &config.kafka_topic, + retry_topic != main_topic, "kafka_retry_topic cannot equal kafka_topic when raw_mode is enabled" ); } diff --git a/src/main.rs b/src/main.rs index 8dd4a99d..d26ab9b9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -76,9 +76,12 @@ async fn main() -> Result<(), Error> { // If this is an environment where the topics might not exist, check and create them. if config.create_missing_topics { let kafka_client_config = config.kafka_consumer_config(); + let (main_topic, _) = config + .consumable_topic() + .map_err(|e| anyhow!("invalid config: {}", e))?; create_missing_topics( kafka_client_config.clone(), - &config.kafka_topic, + main_topic, config.default_topic_partitions, ) .await?; @@ -161,12 +164,10 @@ async fn main() -> Result<(), Error> { let runtime_config_manager = runtime_config_manager.clone(); // Build list of topics to consume from - let mut topics_to_consume = vec![consumer_config.kafka_topic.clone()]; - if consumer_config.kafka_consume_retry_topic - && let Some(ref retry_topic) = consumer_config.kafka_retry_topic - { - topics_to_consume.push(retry_topic.clone()); - } + let (main_topic, _) = consumer_config + .consumable_topic() + .expect("invalid config: no consumable topic"); + let topics_to_consume = [main_topic.to_owned()]; async move { // The consumer has an internal thread that listens for cancellations, so it doesn't need diff --git a/src/test_utils.rs b/src/test_utils.rs index 3dcbe887..31455a0f 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -264,9 +264,12 @@ pub fn make_activations(count: u32) -> Vec { make_activations_with_namespace(namespace, count) } -/// Create a basic default [`Config`] +/// Create a basic default [`Config`], normalized so the kafka helpers +/// (`consumable_topic`, `cluster`, ...) are usable. pub fn create_config() -> Arc { - Arc::new(Config::default()) + let mut config = Config::default(); + config.normalize_and_validate().unwrap(); + Arc::new(config) } /// Create an ActivationStore instance @@ -295,57 +298,48 @@ pub async fn create_test_store(adapter: &str) -> Arc { } } -/// Create a Config instance that uses a testing topic -/// and earliest auto_offset_reset. This is intended to be combined -/// with [`reset_topic`] -pub fn create_integration_config() -> Arc { - let config = Config { +/// Create a normalized integration [`Config`] from a caller-supplied `base`. +/// The standard integration defaults (pg connection, local kafka cluster, +/// consumer group, earliest offset reset) are applied first, then `base` +/// overrides them, and finally the config is normalized — so any field the +/// caller sets (e.g. `kafka_deadletter_topic`) is in place before the kafka +/// topic maps are built. +pub fn create_integration_config_from_base(base: Config) -> Config { + let mut config = Config { pg_host: get_pg_host(), pg_port: get_pg_port(), pg_username: get_pg_username(), pg_password: get_pg_password(), pg_database_name: get_pg_database_name(), run_migrations: true, - kafka_topic: "taskbroker-test".into(), + kafka_cluster: Some("127.0.0.1:9092".into()), + kafka_consumer_group: Some("taskworker".into()), kafka_auto_offset_reset: "earliest".into(), - ..Config::default() + ..base }; + config.normalize_and_validate().unwrap(); + config +} - Arc::new(config) +/// Create a Config instance that uses a testing topic +/// and earliest auto_offset_reset. This is intended to be combined +/// with [`reset_topic`] +pub fn create_integration_config() -> Arc { + Arc::new(create_integration_config_from_base(Config { + kafka_topic: Some("taskbroker-test".into()), + ..Config::default() + })) } /// Create a Config instance that uses SSL /// and earliest auto_offset_reset. This is intended to be combined /// with [`reset_topic`] pub fn create_integration_config_with_ssl() -> Arc { - let config = Config { - pg_host: get_pg_host(), - pg_port: get_pg_port(), - pg_username: get_pg_username(), - pg_password: get_pg_password(), - pg_database_name: get_pg_database_name(), + Arc::new(create_integration_config_from_base(Config { pg_extra_query_params: Some("sslmode=require".to_string()), - run_migrations: true, - kafka_topic: "taskbroker-test".into(), - kafka_auto_offset_reset: "earliest".into(), + kafka_topic: Some("taskbroker-test".into()), ..Config::default() - }; - - Arc::new(config) -} - -pub fn create_integration_config_with_topic(topic: String) -> Config { - Config { - pg_host: get_pg_host(), - pg_port: get_pg_port(), - pg_username: get_pg_username(), - pg_password: get_pg_password(), - pg_database_name: get_pg_database_name(), - run_migrations: true, - kafka_topic: topic, - kafka_auto_offset_reset: "earliest".into(), - ..Config::default() - } + })) } /// Create a kafka producer for a given config @@ -365,15 +359,13 @@ pub async fn reset_topic(config: Arc) { .create() .expect("Could not create admin client"); + let (main_topic, _) = config.consumable_topic().expect("no consumable topic"); let options = AdminOptions::default(); admin_client - .delete_topics( - &[config.kafka_topic.as_ref(), &config.kafka_deadletter_topic], - &options, - ) + .delete_topics(&[main_topic, &config.kafka_deadletter_topic], &options) .await .expect("Could not delete topic"); - let new_topic = NewTopic::new(&config.kafka_topic, 1, TopicReplication::Fixed(0)); + let new_topic = NewTopic::new(main_topic, 1, TopicReplication::Fixed(0)); let new_dlq_topic = NewTopic::new( &config.kafka_deadletter_topic, 1, diff --git a/src/upkeep.rs b/src/upkeep.rs index faee0ff1..e407cd10 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -14,7 +14,7 @@ use sentry_protos::taskbroker::v1::TaskActivation; use tokio::{fs, join, select, time}; use tonic_health::ServingStatus; use tonic_health::server::HealthReporter; -use tracing::{debug, error, info, instrument}; +use tracing::{debug, error, info, instrument, warn}; use uuid::Uuid; use crate::SERVICE_NAME; @@ -156,10 +156,12 @@ pub async fn do_upkeep( let handle_retries_start = Instant::now(); if let Ok(retries) = store.get_retry_activations().await { // Use retry topic if configured, otherwise fall back to main topic + let (main_topic, _) = config.consumable_topic().expect("no consumable topic"); + let main_topic_owned = main_topic.to_owned(); let retry_target_topic = config .kafka_retry_topic .as_ref() - .unwrap_or(&config.kafka_topic); + .unwrap_or(&main_topic_owned); // 2. Append retries to kafka let deliveries = retries @@ -320,18 +322,38 @@ pub async fn do_upkeep( // 12. Forward tasks from demoted namespaces to `runtime_config.demoted_topic` let demoted_namespaces = runtime_config.demoted_namespaces.clone(); + let (main_topic, main_topic_config) = config.consumable_topic().expect("no consumable topic"); + let main_cluster = config + .cluster(&main_topic_config.cluster) + .expect("cluster not found") + .address + .clone(); let forward_cluster = runtime_config .demoted_topic_cluster .clone() - .unwrap_or(config.kafka_cluster.clone()); + .unwrap_or(main_cluster.clone()); let forward_topic = runtime_config .demoted_topic .clone() .unwrap_or(config.kafka_long_topic.clone()); - let same_cluster = forward_cluster == config.kafka_cluster; - let same_topic = forward_topic == config.kafka_topic; + let same_cluster = forward_cluster == main_cluster; + let same_topic = forward_topic == main_topic; if !(demoted_namespaces.is_empty() || (same_cluster && same_topic)) { let forward_demoted_start = Instant::now(); + // The forwarding producer reuses the deadletter cluster's credentials + // (see Config::kafka_producer_config) and only overrides + // bootstrap.servers. If we're forwarding to a different cluster that has + // its own auth, those credentials won't apply there and publishing will + // fail. We keep the legacy behavior (reuse + override) for compatibility + // but warn so the misconfiguration is visible. + let dlq_cluster = config.kafka_producer_cluster(); + if forward_cluster != dlq_cluster.address && dlq_cluster.has_auth() { + warn!( + "forwarding demoted-namespace tasks to cluster '{}', but the producer reuses the \ + deadletter cluster '{}' credentials; publishing may fail if their auth differs", + forward_cluster, dlq_cluster.address + ); + } let mut forward_producer_config = config.kafka_producer_config(); forward_producer_config.set("bootstrap.servers", &forward_cluster); let forward_producer: Arc = Arc::new( @@ -591,7 +613,7 @@ mod tests { use crate::store::activation::ActivationStatus; use crate::test_utils::{ StatusCount, assert_counts, consume_topic, create_config, - create_integration_config_with_topic, create_producer, create_test_store, make_activations, + create_integration_config_from_base, create_producer, create_test_store, make_activations, replace_retry_state, reset_topic, }; use crate::upkeep::{create_retry_activation, do_upkeep}; @@ -682,10 +704,11 @@ mod tests { #[case::sqlite("sqlite")] #[case::postgres("postgres")] async fn test_retry_activation_is_appended_to_kafka(#[case] adapter: &str) { - let config = Arc::new(Config { + let config = Arc::new(create_integration_config_from_base(Config { + kafka_topic: Some(format!("taskbroker-test-{adapter}")), kafka_deadletter_topic: format!("taskbroker-test-{adapter}-dlq"), - ..create_integration_config_with_topic(format!("taskbroker-test-{adapter}")) - }); + ..Default::default() + })); let runtime_config = Arc::new(RuntimeConfigManager::new(None).await); let start_time = Utc::now(); @@ -742,7 +765,8 @@ mod tests { assert_eq!(store.count().await.unwrap(), 1); assert_eq!(result_context.retried, 1); - let messages = consume_topic(config.clone(), config.kafka_topic.as_ref(), 1).await; + let (main_topic, _) = config.consumable_topic().unwrap(); + let messages = consume_topic(config.clone(), main_topic, 1).await; assert_eq!(messages.len(), 1); let activation = &messages[0]; @@ -1038,10 +1062,11 @@ mod tests { #[case::sqlite("sqlite")] #[case::postgres("postgres")] async fn test_remove_at_remove_failed_publish_to_kafka(#[case] adapter: &str) { - let config = Arc::new(Config { + let config = Arc::new(create_integration_config_from_base(Config { + kafka_topic: Some(format!("taskbroker-test-{adapter}")), kafka_deadletter_topic: format!("taskbroker-test-{adapter}-dlq"), - ..create_integration_config_with_topic(format!("taskbroker-test-{adapter}")) - }); + ..Default::default() + })); let runtime_config = Arc::new(RuntimeConfigManager::new(None).await); reset_topic(config.clone()).await; @@ -1423,11 +1448,11 @@ demoted_namespaces: #[case::sqlite("sqlite")] #[case::postgres("postgres")] async fn test_full_vacuum_on_upkeep(#[case] adapter: &str) { - let raw_config = Config { + let config = Arc::new(create_integration_config_from_base(Config { + kafka_topic: Some(format!("taskbroker-test-full-vacuum-{adapter}")), full_vacuum_on_start: true, ..Default::default() - }; - let config = Arc::new(raw_config); + })); let runtime_config = Arc::new(RuntimeConfigManager::new(None).await); let store = create_test_store(adapter).await;