diff --git a/src/config/deprecated.rs b/src/config/deprecated.rs index 22c575b0..53cb31a3 100644 --- a/src/config/deprecated.rs +++ b/src/config/deprecated.rs @@ -5,15 +5,57 @@ use crate::config::store::DatabaseAdapter; macro_rules! map { () => {}; - // An optional deprecated value always wins - ($deprecated:expr => some($current:expr)) => { - $current = $deprecated.take(); + // Two or more transformed mappings separated by commas + ( + $deprecated_base:ident $(.$deprecated_field:ident)+ as $mapper:path => $current_base:ident $(.$current_field:ident)+ if $provided:ident, + $($rest:tt)+ + ) => { + crate::config::deprecated::map! { + $deprecated_base$(.$deprecated_field)+ as $mapper => $current_base$(.$current_field)+ if $provided + }; + + crate::config::deprecated::map! { + $($rest)+ + }; + }; + + // Two or more mappings separated by commas + ( + $deprecated:expr => $current_base:ident $(.$current_field:ident)+ if $provided:ident, + $($rest:tt)+ + ) => { + crate::config::deprecated::map! { + $deprecated => $current_base$(.$current_field)+ if $provided + }; + + crate::config::deprecated::map! { + $($rest)+ + }; + }; + + // A transformed deprecated value only wins when it's provided + ($deprecated_base:ident $(.$deprecated_field:ident)+ as $mapper:path => $current_base:ident $(.$current_field:ident)+ if $provided:ident) => { + let key = concat!(stringify!($current_base), $(".", stringify!($current_field)),+) + .strip_prefix("self.") + .unwrap(); + + if !$provided(key) { + if let Some(v) = $deprecated_base$(.$deprecated_field)+.take() { + $current_base$(.$current_field)+ = $mapper(v); + } + } }; // A plain deprecated value only wins when it's provided - ($deprecated:expr => $current:expr) => { - if let Some(v) = $deprecated.take() { - $current = v; + ($deprecated:expr => $current_base:ident $(.$current_field:ident)+ if $provided:ident) => { + let key = concat!(stringify!($current_base), $(".", stringify!($current_field)),+) + .strip_prefix("self.") + .unwrap(); + + if !$provided(key) { + if let Some(v) = $deprecated.take() { + $current_base$(.$current_field)+ = v; + } } }; } diff --git a/src/config/fetch.rs b/src/config/fetch.rs index 26d22059..30ed4b63 100644 --- a/src/config/fetch.rs +++ b/src/config/fetch.rs @@ -1,23 +1,15 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; -use validator::{Validate, ValidationError}; +use validator::Validate; +use crate::config::validate; use crate::fetch::MAX_FETCH_THREADS; -// (TODO) Create a `validate` module to keep all of our custom validators (there are now at least two). -fn validate_power_of_two(n: usize) -> Result<(), ValidationError> { - if n.is_power_of_two() { - Ok(()) - } else { - Err(ValidationError::new("not_power_of_two")) - } -} - #[derive(PartialEq, Debug, Deserialize, Serialize, Validate)] pub struct FetchConfig { /// The number of concurrent fetch loops in push mode, which should be ≤ `MAX_FETCH_THREADS` and a power of two. - #[validate(range(min = 1, max = MAX_FETCH_THREADS), custom(function = "validate_power_of_two"))] + #[validate(range(min = 1, max = MAX_FETCH_THREADS), custom(function = "validate::power_of_two"))] pub threads: usize, /// Time in milliseconds to wait between fetch attempts when no pending activation is found. diff --git a/src/config/mod.rs b/src/config/mod.rs index 0f1beb4e..e5e8a414 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -24,6 +24,7 @@ pub mod kafka; pub mod push; pub mod raw; pub mod store; +pub mod validate; use deprecated::DeprecatedConfig; use kafka::{ClusterConfig, TopicConfig}; @@ -313,232 +314,81 @@ impl Config { /// - If the user does not provide `store.db_max_size`, use the deprecated field `db_max_size` instead fn map_deprecated_options(&mut self, builder: &mut Figment) { // Nested function definition since it's not used anywhere else - fn user_provided(builder: &Figment, key: &str) -> bool { + let provided = |key: &str| { builder .find_metadata(key) .is_some_and(|metadata| metadata.name != DEFAULT_CONFIG_PROVIDER) - } - - // Map deprecated push mode configuration options - if !user_provided(builder, "fetch.threads") { - deprecated::map! { - self.deprecated.fetch_threads => self.fetch.threads - }; - } - - if !user_provided(builder, "fetch.backoff") - && let Some(v) = self.deprecated.fetch_wait_ms - { - self.fetch.backoff = Duration::from_millis(v); - } - - if !user_provided(builder, "fetch.batch_length") { - deprecated::map! { - self.deprecated.fetch_batch_size => self.fetch.batch_length - }; - } - - if !user_provided(builder, "push.threads") { - deprecated::map! { - self.deprecated.push_threads => self.push.threads - }; - } - - if !user_provided(builder, "push.timeout") - && let Some(v) = self.deprecated.push_timeout_ms - { - self.push.timeout = Duration::from_millis(v); - } - - if !user_provided(builder, "push.queue.size") { - deprecated::map! { - self.deprecated.push_queue_size => self.push.queue.size - }; - } + }; - if !user_provided(builder, "push.queue.timeout") - && let Some(v) = self.deprecated.push_queue_timeout_ms - { - self.push.queue.timeout = Duration::from_millis(v); + // Convert a 'u64' into a duration. + fn duration>(v: T) -> Duration { + Duration::from_millis(v.into()) } - if !user_provided(builder, "push.update.batched") { - deprecated::map! { - self.deprecated.batch_push_updates => self.push.update.batched - }; + // Wrap anything inside `Some`. + fn optional(v: T) -> Option { + Some(v) } - if !user_provided(builder, "push.update.batch.length") { - deprecated::map! { - self.deprecated.push_update_batch_size => self.push.update.batch.length - }; - } + // Map deprecated fetch configuration options + deprecated::map! { + self.deprecated.fetch_threads => self.fetch.threads if provided, + self.deprecated.fetch_batch_size => self.fetch.batch_length if provided, + self.deprecated.fetch_wait_ms as duration => self.fetch.backoff if provided + }; - if !user_provided(builder, "push.update.batch.interval") - && let Some(v) = self.deprecated.push_update_interval_ms - { - self.push.update.batch.interval = Duration::from_millis(v.into()); - } + // Map deprecated push configuration options + deprecated::map! { + self.deprecated.push_threads => self.push.threads if provided, + self.deprecated.push_queue_size => self.push.queue.size if provided, + self.deprecated.batch_push_updates => self.push.update.batched if provided, + self.deprecated.push_update_batch_size => self.push.update.batch.length if provided, + self.deprecated.push_timeout_ms as duration => self.push.timeout if provided, + self.deprecated.push_queue_timeout_ms as duration => self.push.queue.timeout if provided, + self.deprecated.push_update_interval_ms as duration => self.push.update.batch.interval if provided + }; // Map deprecated Postgres configuration options - if !user_provided(builder, "store.pg.run_migrations") { - deprecated::map! { - self.deprecated.run_migrations => self.store.pg.run_migrations - }; - } - - if !user_provided(builder, "store.pg.host") { - deprecated::map! { - self.deprecated.pg_host => self.store.pg.host - }; - } - - if !user_provided(builder, "store.pg.port") { - deprecated::map! { - self.deprecated.pg_port => self.store.pg.port - }; - } - - if !user_provided(builder, "store.pg.ddl_username") { - deprecated::map! { - self.deprecated.pg_ddl_username => self.store.pg.ddl_username - }; - } - - if !user_provided(builder, "store.pg.username") { - deprecated::map! { - self.deprecated.pg_username => self.store.pg.username - }; - } - - if !user_provided(builder, "store.pg.password") { - deprecated::map! { - self.deprecated.pg_password => self.store.pg.password - }; - } - - if !user_provided(builder, "store.pg.ddl_password") { - deprecated::map! { - self.deprecated.pg_ddl_password => self.store.pg.ddl_password - }; - } - - if !user_provided(builder, "store.pg.database_name") { - deprecated::map! { - self.deprecated.pg_database_name => self.store.pg.database_name - }; - } - - if !user_provided(builder, "store.pg.default_database_name") { - deprecated::map! { - self.deprecated.pg_default_database_name => self.store.pg.default_database_name - }; - } - - if !user_provided(builder, "store.pg.query_params") { - deprecated::map! { - self.deprecated.pg_extra_query_params => some(self.store.pg.query_params) - }; - } + deprecated::map! { + self.deprecated.run_migrations => self.store.pg.run_migrations if provided, + self.deprecated.pg_host => self.store.pg.host if provided, + self.deprecated.pg_port => self.store.pg.port if provided, + self.deprecated.pg_ddl_username => self.store.pg.ddl_username if provided, + self.deprecated.pg_username => self.store.pg.username if provided, + self.deprecated.pg_password => self.store.pg.password if provided, + self.deprecated.pg_ddl_password => self.store.pg.ddl_password if provided, + self.deprecated.pg_database_name => self.store.pg.database_name if provided, + self.deprecated.pg_default_database_name => self.store.pg.default_database_name if provided, + self.deprecated.pg_extra_query_params as optional => self.store.pg.query_params if provided + }; // Map deprecated SQLite configuration options - if !user_provided(builder, "store.sqlite.path") { - deprecated::map! { - self.deprecated.db_path => self.store.sqlite.path - }; - } - - if !user_provided(builder, "store.sqlite.vacuum_page_count") { - deprecated::map! { - self.deprecated.vacuum_page_count => some(self.store.sqlite.vacuum_page_count) - }; - } - - if !user_provided(builder, "store.sqlite.enable_status_metrics") { - deprecated::map! { - self.deprecated.enable_sqlite_status_metrics => self.store.sqlite.enable_status_metrics - }; - } + deprecated::map! { + self.deprecated.db_path => self.store.sqlite.path if provided, + self.deprecated.vacuum_page_count as optional => self.store.sqlite.vacuum_page_count if provided, + self.deprecated.enable_sqlite_status_metrics => self.store.sqlite.enable_status_metrics if provided + }; // Map deprecated retry configuration options - if !user_provided(builder, "store.retry.max_retries") { - deprecated::map! { - self.deprecated.db_query_max_retries => self.store.retry.max_retries - }; - } - - if !user_provided(builder, "store.retry.delay") - && let Some(v) = self.deprecated.db_query_retry_delay_ms - { - self.store.retry.delay = Duration::from_millis(v); - } + deprecated::map! { + self.deprecated.db_query_max_retries => self.store.retry.max_retries if provided, + self.deprecated.db_query_retry_delay_ms as duration => self.store.retry.delay if provided + }; // Map deprecated store configuration options - if !user_provided(builder, "store.adapter") { - deprecated::map! { - self.deprecated.database_adapter => self.store.adapter - }; - } - - if !user_provided(builder, "store.insert_failure_backoff_ms") { - deprecated::map! { - self.deprecated.db_write_failure_backoff_ms => self.store.insert_failure_backoff_ms - }; - } - - if !user_provided(builder, "store.insert_batch_max_length") { - deprecated::map! { - self.deprecated.db_insert_batch_max_len => self.store.insert_batch_max_length - }; - } - - if !user_provided(builder, "store.insert_batch_max_bytes") { - deprecated::map! { - self.deprecated.db_insert_batch_max_size => self.store.insert_batch_max_bytes - }; - } - - if !user_provided(builder, "store.insert_batch_max_time_ms") { - deprecated::map! { - self.deprecated.db_insert_batch_max_time_ms => self.store.insert_batch_max_time_ms - }; - } - - if !user_provided(builder, "store.max_size") { - deprecated::map! { - self.deprecated.db_max_size => some(self.store.max_size) - }; - } - - if !user_provided(builder, "store.max_pending_count") { - deprecated::map! { - self.deprecated.max_pending_count => self.store.max_pending_count - }; - } - - if !user_provided(builder, "store.max_delay_count") { - deprecated::map! { - self.deprecated.max_delay_count => self.store.max_delay_count - }; - } - - if !user_provided(builder, "store.max_processing_count") { - deprecated::map! { - self.deprecated.max_processing_count => self.store.max_processing_count - }; - } - - if !user_provided(builder, "store.max_processing_attempts") { - deprecated::map! { - self.deprecated.max_processing_attempts => self.store.max_processing_attempts - }; - } - - if !user_provided(builder, "store.processing_deadline_grace_sec") { - deprecated::map! { - self.deprecated.processing_deadline_grace_sec => self.store.processing_deadline_grace_sec - }; - } + deprecated::map! { + self.deprecated.database_adapter => self.store.adapter if provided, + self.deprecated.db_write_failure_backoff_ms => self.store.insert_failure_backoff_ms if provided, + self.deprecated.db_insert_batch_max_len => self.store.insert_batch_max_length if provided, + self.deprecated.db_insert_batch_max_size => self.store.insert_batch_max_bytes if provided, + self.deprecated.db_insert_batch_max_time_ms => self.store.insert_batch_max_time_ms if provided, + self.deprecated.db_max_size as optional => self.store.max_size if provided, + self.deprecated.max_pending_count => self.store.max_pending_count if provided, + self.deprecated.max_delay_count => self.store.max_delay_count if provided, + self.deprecated.max_processing_count => self.store.max_processing_count if provided, + self.deprecated.max_processing_attempts => self.store.max_processing_attempts if provided, + self.deprecated.processing_deadline_grace_sec => self.store.processing_deadline_grace_sec if provided + }; } /// Normalize the legacy single-topic config into the new multi-topic diff --git a/src/config/push.rs b/src/config/push.rs index 45c909c9..ddcc4c73 100644 --- a/src/config/push.rs +++ b/src/config/push.rs @@ -1,18 +1,10 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; -use validator::{Validate, ValidationError}; +use validator::Validate; use crate::config::batch::BatchConfig; - -// (TODO) Create a `validate` module to keep all of our custom validators (there are now at least two). -fn validate_nonzero_duration(duration: &Duration) -> Result<(), ValidationError> { - if duration.is_zero() { - Err(ValidationError::new("nonzero_duration")) - } else { - Ok(()) - } -} +use crate::config::validate; #[derive(PartialEq, Debug, Deserialize, Serialize, Validate)] pub struct PushQueueConfig { @@ -22,7 +14,7 @@ pub struct PushQueueConfig { /// Maximum time to wait when submitting an activation to the push pool. #[serde(with = "crate::serde::duration")] - #[validate(custom(function = "validate_nonzero_duration"))] + #[validate(custom(function = "validate::nonzero_duration"))] pub timeout: Duration, } @@ -66,7 +58,7 @@ pub struct PushConfig { /// Maximum time for a single push RPC to the worker service. This should be greater than the worker's internal timeout. #[serde(with = "crate::serde::duration")] - #[validate(custom(function = "validate_nonzero_duration"))] + #[validate(custom(function = "validate::nonzero_duration"))] pub timeout: Duration, /// The push queue configuration. diff --git a/src/config/validate.rs b/src/config/validate.rs new file mode 100644 index 00000000..d9240154 --- /dev/null +++ b/src/config/validate.rs @@ -0,0 +1,20 @@ +use std::time::Duration; +use validator::ValidationError; + +/// Ensure `n` is a power of two. Used to validate fetch thread count. +pub fn power_of_two(n: usize) -> Result<(), ValidationError> { + if n.is_power_of_two() { + Ok(()) + } else { + Err(ValidationError::new("not_power_of_two")) + } +} + +/// Ensure duration is greater than zero to avoid crashes when initializing intervals. +pub fn nonzero_duration(duration: &Duration) -> Result<(), ValidationError> { + if duration.is_zero() { + Err(ValidationError::new("nonzero_duration")) + } else { + Ok(()) + } +}