From 783a705a75ee8a66a77493280413145e05646a7c Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Thu, 18 Jun 2026 11:38:26 -0700 Subject: [PATCH 1/4] Improve Deprecated Mapping Macro --- src/config/deprecated.rs | 50 +++++++- src/config/mod.rs | 240 +++++++++------------------------------ 2 files changed, 97 insertions(+), 193 deletions(-) diff --git a/src/config/deprecated.rs b/src/config/deprecated.rs index 22c575b0..5b395ad7 100644 --- a/src/config/deprecated.rs +++ b/src/config/deprecated.rs @@ -5,15 +5,55 @@ use crate::config::store::DatabaseAdapter; macro_rules! map { () => {}; + // Two or more optional mappings separated by commans + ( + $deprecated:expr => some($current_base:ident $(.$current_field:ident)+) if $provided:ident, + $($rest:tt)+ + ) => { + crate::config::deprecated::map! { + $deprecated => some($current_base$(.$current_field)+) if $provided + }; + + crate::config::deprecated::map! { + $($rest)+ + }; + }; + + // Two or more mappings separated by commans + ( + $deprecated:expr => $current_base:ident $(.$current_field:ident)+ if $provided:ident, + $($rest:tt)+ + ) => { + crate::config::deprecated::map! { + $deprecated => $current_base$(.$current_field)+ if $provided + }; + + crate::config::deprecated::map! { + $($rest)+ + }; + }; + // An optional deprecated value always wins - ($deprecated:expr => some($current:expr)) => { - $current = $deprecated.take(); + ($deprecated:expr => some($current_base:ident $(.$current_field:ident)+) if $provided:ident) => { + let key = stringify!($current_base$(.$current_field)+) + .strip_prefix("self.") + .unwrap_or(stringify!($current_base$(.$current_field)+)); + + if !$provided(key) { + $current_base$(.$current_field)+ = $deprecated.take(); + } }; // A plain deprecated value only wins when it's provided - ($deprecated:expr => $current:expr) => { - if let Some(v) = $deprecated.take() { - $current = v; + ($deprecated:expr => $current_base:ident $(.$current_field:ident)+ if $provided:ident) => { + let key = stringify!($current_base$(.$current_field)+) + .strip_prefix("self.") + .unwrap_or(stringify!($current_base$(.$current_field)+)); + + if !$provided(key) { + if let Some(v) = $deprecated.take() { + $current_base$(.$current_field)+ = v; + } } }; } diff --git a/src/config/mod.rs b/src/config/mod.rs index 0f1beb4e..efc71460 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -313,232 +313,96 @@ impl Config { /// - If the user does not provide `store.db_max_size`, use the deprecated field `db_max_size` instead fn map_deprecated_options(&mut self, builder: &mut Figment) { // Nested function definition since it's not used anywhere else - fn user_provided(builder: &Figment, key: &str) -> bool { + let provided = |key: &str| { builder .find_metadata(key) .is_some_and(|metadata| metadata.name != DEFAULT_CONFIG_PROVIDER) - } + }; - // Map deprecated push mode configuration options - if !user_provided(builder, "fetch.threads") { - deprecated::map! { - self.deprecated.fetch_threads => self.fetch.threads - }; - } + // Map deprecated fetch configuration options + deprecated::map! { + self.deprecated.fetch_threads => self.fetch.threads if provided, + self.deprecated.fetch_batch_size => self.fetch.batch_length if provided + }; - if !user_provided(builder, "fetch.backoff") + if !provided("fetch.backoff") && let Some(v) = self.deprecated.fetch_wait_ms { self.fetch.backoff = Duration::from_millis(v); } - if !user_provided(builder, "fetch.batch_length") { - deprecated::map! { - self.deprecated.fetch_batch_size => self.fetch.batch_length - }; - } - - if !user_provided(builder, "push.threads") { - deprecated::map! { - self.deprecated.push_threads => self.push.threads - }; - } + // Map deprecated push configuration options + deprecated::map! { + self.deprecated.push_threads => self.push.threads if provided, + self.deprecated.push_queue_size => self.push.queue.size if provided, + self.deprecated.batch_push_updates => self.push.update.batched if provided, + self.deprecated.push_update_batch_size => self.push.update.batch.length if provided + }; - if !user_provided(builder, "push.timeout") + if !provided("push.timeout") && let Some(v) = self.deprecated.push_timeout_ms { self.push.timeout = Duration::from_millis(v); } - if !user_provided(builder, "push.queue.size") { - deprecated::map! { - self.deprecated.push_queue_size => self.push.queue.size - }; - } - - if !user_provided(builder, "push.queue.timeout") + if !provided("push.queue.timeout") && let Some(v) = self.deprecated.push_queue_timeout_ms { self.push.queue.timeout = Duration::from_millis(v); } - if !user_provided(builder, "push.update.batched") { - deprecated::map! { - self.deprecated.batch_push_updates => self.push.update.batched - }; - } - - if !user_provided(builder, "push.update.batch.length") { - deprecated::map! { - self.deprecated.push_update_batch_size => self.push.update.batch.length - }; - } - - if !user_provided(builder, "push.update.batch.interval") + if !provided("push.update.batch.interval") && let Some(v) = self.deprecated.push_update_interval_ms { self.push.update.batch.interval = Duration::from_millis(v.into()); } // Map deprecated Postgres configuration options - if !user_provided(builder, "store.pg.run_migrations") { - deprecated::map! { - self.deprecated.run_migrations => self.store.pg.run_migrations - }; - } - - if !user_provided(builder, "store.pg.host") { - deprecated::map! { - self.deprecated.pg_host => self.store.pg.host - }; - } - - if !user_provided(builder, "store.pg.port") { - deprecated::map! { - self.deprecated.pg_port => self.store.pg.port - }; - } - - if !user_provided(builder, "store.pg.ddl_username") { - deprecated::map! { - self.deprecated.pg_ddl_username => self.store.pg.ddl_username - }; - } - - if !user_provided(builder, "store.pg.username") { - deprecated::map! { - self.deprecated.pg_username => self.store.pg.username - }; - } - - if !user_provided(builder, "store.pg.password") { - deprecated::map! { - self.deprecated.pg_password => self.store.pg.password - }; - } - - if !user_provided(builder, "store.pg.ddl_password") { - deprecated::map! { - self.deprecated.pg_ddl_password => self.store.pg.ddl_password - }; - } - - if !user_provided(builder, "store.pg.database_name") { - deprecated::map! { - self.deprecated.pg_database_name => self.store.pg.database_name - }; - } - - if !user_provided(builder, "store.pg.default_database_name") { - deprecated::map! { - self.deprecated.pg_default_database_name => self.store.pg.default_database_name - }; - } - - if !user_provided(builder, "store.pg.query_params") { - deprecated::map! { - self.deprecated.pg_extra_query_params => some(self.store.pg.query_params) - }; - } + deprecated::map! { + self.deprecated.run_migrations => self.store.pg.run_migrations if provided, + self.deprecated.pg_host => self.store.pg.host if provided, + self.deprecated.pg_port => self.store.pg.port if provided, + self.deprecated.pg_ddl_username => self.store.pg.ddl_username if provided, + self.deprecated.pg_username => self.store.pg.username if provided, + self.deprecated.pg_password => self.store.pg.password if provided, + self.deprecated.pg_ddl_password => self.store.pg.ddl_password if provided, + self.deprecated.pg_database_name => self.store.pg.database_name if provided, + self.deprecated.pg_default_database_name => self.store.pg.default_database_name if provided, + self.deprecated.pg_extra_query_params => some(self.store.pg.query_params) if provided + }; // Map deprecated SQLite configuration options - if !user_provided(builder, "store.sqlite.path") { - deprecated::map! { - self.deprecated.db_path => self.store.sqlite.path - }; - } - - if !user_provided(builder, "store.sqlite.vacuum_page_count") { - deprecated::map! { - self.deprecated.vacuum_page_count => some(self.store.sqlite.vacuum_page_count) - }; - } - - if !user_provided(builder, "store.sqlite.enable_status_metrics") { - deprecated::map! { - self.deprecated.enable_sqlite_status_metrics => self.store.sqlite.enable_status_metrics - }; - } + deprecated::map! { + self.deprecated.db_path => self.store.sqlite.path if provided, + self.deprecated.vacuum_page_count => some(self.store.sqlite.vacuum_page_count) if provided, + self.deprecated.enable_sqlite_status_metrics => self.store.sqlite.enable_status_metrics if provided + }; // Map deprecated retry configuration options - if !user_provided(builder, "store.retry.max_retries") { - deprecated::map! { - self.deprecated.db_query_max_retries => self.store.retry.max_retries - }; - } + deprecated::map! { + self.deprecated.db_query_max_retries => self.store.retry.max_retries if provided + }; - if !user_provided(builder, "store.retry.delay") + if !provided("store.retry.delay") && let Some(v) = self.deprecated.db_query_retry_delay_ms { self.store.retry.delay = Duration::from_millis(v); } // Map deprecated store configuration options - if !user_provided(builder, "store.adapter") { - deprecated::map! { - self.deprecated.database_adapter => self.store.adapter - }; - } - - if !user_provided(builder, "store.insert_failure_backoff_ms") { - deprecated::map! { - self.deprecated.db_write_failure_backoff_ms => self.store.insert_failure_backoff_ms - }; - } - - if !user_provided(builder, "store.insert_batch_max_length") { - deprecated::map! { - self.deprecated.db_insert_batch_max_len => self.store.insert_batch_max_length - }; - } - - if !user_provided(builder, "store.insert_batch_max_bytes") { - deprecated::map! { - self.deprecated.db_insert_batch_max_size => self.store.insert_batch_max_bytes - }; - } - - if !user_provided(builder, "store.insert_batch_max_time_ms") { - deprecated::map! { - self.deprecated.db_insert_batch_max_time_ms => self.store.insert_batch_max_time_ms - }; - } - - if !user_provided(builder, "store.max_size") { - deprecated::map! { - self.deprecated.db_max_size => some(self.store.max_size) - }; - } - - if !user_provided(builder, "store.max_pending_count") { - deprecated::map! { - self.deprecated.max_pending_count => self.store.max_pending_count - }; - } - - if !user_provided(builder, "store.max_delay_count") { - deprecated::map! { - self.deprecated.max_delay_count => self.store.max_delay_count - }; - } - - if !user_provided(builder, "store.max_processing_count") { - deprecated::map! { - self.deprecated.max_processing_count => self.store.max_processing_count - }; - } - - if !user_provided(builder, "store.max_processing_attempts") { - deprecated::map! { - self.deprecated.max_processing_attempts => self.store.max_processing_attempts - }; - } - - if !user_provided(builder, "store.processing_deadline_grace_sec") { - deprecated::map! { - self.deprecated.processing_deadline_grace_sec => self.store.processing_deadline_grace_sec - }; - } + deprecated::map! { + self.deprecated.database_adapter => self.store.adapter if provided, + self.deprecated.db_write_failure_backoff_ms => self.store.insert_failure_backoff_ms if provided, + self.deprecated.db_insert_batch_max_len => self.store.insert_batch_max_length if provided, + self.deprecated.db_insert_batch_max_size => self.store.insert_batch_max_bytes if provided, + self.deprecated.db_insert_batch_max_time_ms => self.store.insert_batch_max_time_ms if provided, + self.deprecated.db_max_size => some(self.store.max_size) if provided, + self.deprecated.max_pending_count => self.store.max_pending_count if provided, + self.deprecated.max_delay_count => self.store.max_delay_count if provided, + self.deprecated.max_processing_count => self.store.max_processing_count if provided, + self.deprecated.max_processing_attempts => self.store.max_processing_attempts if provided, + self.deprecated.processing_deadline_grace_sec => self.store.processing_deadline_grace_sec if provided + }; } /// Normalize the legacy single-topic config into the new multi-topic From 82a9fe62df683718cbdeb19411322374aff9139b Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Thu, 18 Jun 2026 13:42:37 -0700 Subject: [PATCH 2/4] Fix Spaces in Key --- src/config/deprecated.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/config/deprecated.rs b/src/config/deprecated.rs index 5b395ad7..1bd40cee 100644 --- a/src/config/deprecated.rs +++ b/src/config/deprecated.rs @@ -35,9 +35,9 @@ macro_rules! map { // An optional deprecated value always wins ($deprecated:expr => some($current_base:ident $(.$current_field:ident)+) if $provided:ident) => { - let key = stringify!($current_base$(.$current_field)+) + let key = concat!(stringify!($current_base), $(".", stringify!($current_field)),+) .strip_prefix("self.") - .unwrap_or(stringify!($current_base$(.$current_field)+)); + .unwrap(); if !$provided(key) { $current_base$(.$current_field)+ = $deprecated.take(); @@ -46,9 +46,9 @@ macro_rules! map { // A plain deprecated value only wins when it's provided ($deprecated:expr => $current_base:ident $(.$current_field:ident)+ if $provided:ident) => { - let key = stringify!($current_base$(.$current_field)+) + let key = concat!(stringify!($current_base), $(".", stringify!($current_field)),+) .strip_prefix("self.") - .unwrap_or(stringify!($current_base$(.$current_field)+)); + .unwrap(); if !$provided(key) { if let Some(v) = $deprecated.take() { From efd595587edb9547072656dd2ab5c95fdca4618c Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Thu, 18 Jun 2026 14:10:37 -0700 Subject: [PATCH 3/4] Remove `some` Branch from Macro, Replace w/`as` Clause --- src/config/deprecated.rs | 16 ++++---- src/config/mod.rs | 87 +++++++++++++++++----------------------- 2 files changed, 45 insertions(+), 58 deletions(-) diff --git a/src/config/deprecated.rs b/src/config/deprecated.rs index 1bd40cee..53cb31a3 100644 --- a/src/config/deprecated.rs +++ b/src/config/deprecated.rs @@ -5,13 +5,13 @@ use crate::config::store::DatabaseAdapter; macro_rules! map { () => {}; - // Two or more optional mappings separated by commans + // Two or more transformed mappings separated by commas ( - $deprecated:expr => some($current_base:ident $(.$current_field:ident)+) if $provided:ident, + $deprecated_base:ident $(.$deprecated_field:ident)+ as $mapper:path => $current_base:ident $(.$current_field:ident)+ if $provided:ident, $($rest:tt)+ ) => { crate::config::deprecated::map! { - $deprecated => some($current_base$(.$current_field)+) if $provided + $deprecated_base$(.$deprecated_field)+ as $mapper => $current_base$(.$current_field)+ if $provided }; crate::config::deprecated::map! { @@ -19,7 +19,7 @@ macro_rules! map { }; }; - // Two or more mappings separated by commans + // Two or more mappings separated by commas ( $deprecated:expr => $current_base:ident $(.$current_field:ident)+ if $provided:ident, $($rest:tt)+ @@ -33,14 +33,16 @@ macro_rules! map { }; }; - // An optional deprecated value always wins - ($deprecated:expr => some($current_base:ident $(.$current_field:ident)+) if $provided:ident) => { + // A transformed deprecated value only wins when it's provided + ($deprecated_base:ident $(.$deprecated_field:ident)+ as $mapper:path => $current_base:ident $(.$current_field:ident)+ if $provided:ident) => { let key = concat!(stringify!($current_base), $(".", stringify!($current_field)),+) .strip_prefix("self.") .unwrap(); if !$provided(key) { - $current_base$(.$current_field)+ = $deprecated.take(); + if let Some(v) = $deprecated_base$(.$deprecated_field)+.take() { + $current_base$(.$current_field)+ = $mapper(v); + } } }; diff --git a/src/config/mod.rs b/src/config/mod.rs index efc71460..a5af19ed 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -319,76 +319,61 @@ impl Config { .is_some_and(|metadata| metadata.name != DEFAULT_CONFIG_PROVIDER) }; + // Convert a 'u64' into a duration. + fn duration>(v: T) -> Duration { + Duration::from_millis(v.into()) + } + + // Wrap anything inside `Some`. + fn optional(v: T) -> Option { + Some(v) + } + // Map deprecated fetch configuration options deprecated::map! { - self.deprecated.fetch_threads => self.fetch.threads if provided, - self.deprecated.fetch_batch_size => self.fetch.batch_length if provided + self.deprecated.fetch_threads => self.fetch.threads if provided, + self.deprecated.fetch_batch_size => self.fetch.batch_length if provided, + self.deprecated.fetch_wait_ms as duration => self.fetch.backoff if provided }; - if !provided("fetch.backoff") - && let Some(v) = self.deprecated.fetch_wait_ms - { - self.fetch.backoff = Duration::from_millis(v); - } - // Map deprecated push configuration options deprecated::map! { - self.deprecated.push_threads => self.push.threads if provided, - self.deprecated.push_queue_size => self.push.queue.size if provided, - self.deprecated.batch_push_updates => self.push.update.batched if provided, - self.deprecated.push_update_batch_size => self.push.update.batch.length if provided + self.deprecated.push_threads => self.push.threads if provided, + self.deprecated.push_queue_size => self.push.queue.size if provided, + self.deprecated.batch_push_updates => self.push.update.batched if provided, + self.deprecated.push_update_batch_size => self.push.update.batch.length if provided, + self.deprecated.push_timeout_ms as duration => self.push.timeout if provided, + self.deprecated.push_queue_timeout_ms as duration => self.push.queue.timeout if provided, + self.deprecated.push_update_interval_ms as duration => self.push.update.batch.interval if provided }; - if !provided("push.timeout") - && let Some(v) = self.deprecated.push_timeout_ms - { - self.push.timeout = Duration::from_millis(v); - } - - if !provided("push.queue.timeout") - && let Some(v) = self.deprecated.push_queue_timeout_ms - { - self.push.queue.timeout = Duration::from_millis(v); - } - - if !provided("push.update.batch.interval") - && let Some(v) = self.deprecated.push_update_interval_ms - { - self.push.update.batch.interval = Duration::from_millis(v.into()); - } - // Map deprecated Postgres configuration options deprecated::map! { - self.deprecated.run_migrations => self.store.pg.run_migrations if provided, - self.deprecated.pg_host => self.store.pg.host if provided, - self.deprecated.pg_port => self.store.pg.port if provided, - self.deprecated.pg_ddl_username => self.store.pg.ddl_username if provided, - self.deprecated.pg_username => self.store.pg.username if provided, - self.deprecated.pg_password => self.store.pg.password if provided, - self.deprecated.pg_ddl_password => self.store.pg.ddl_password if provided, - self.deprecated.pg_database_name => self.store.pg.database_name if provided, - self.deprecated.pg_default_database_name => self.store.pg.default_database_name if provided, - self.deprecated.pg_extra_query_params => some(self.store.pg.query_params) if provided + self.deprecated.run_migrations => self.store.pg.run_migrations if provided, + self.deprecated.pg_host => self.store.pg.host if provided, + self.deprecated.pg_port => self.store.pg.port if provided, + self.deprecated.pg_ddl_username => self.store.pg.ddl_username if provided, + self.deprecated.pg_username => self.store.pg.username if provided, + self.deprecated.pg_password => self.store.pg.password if provided, + self.deprecated.pg_ddl_password => self.store.pg.ddl_password if provided, + self.deprecated.pg_database_name => self.store.pg.database_name if provided, + self.deprecated.pg_default_database_name => self.store.pg.default_database_name if provided, + self.deprecated.pg_extra_query_params as optional => self.store.pg.query_params if provided }; // Map deprecated SQLite configuration options deprecated::map! { - self.deprecated.db_path => self.store.sqlite.path if provided, - self.deprecated.vacuum_page_count => some(self.store.sqlite.vacuum_page_count) if provided, - self.deprecated.enable_sqlite_status_metrics => self.store.sqlite.enable_status_metrics if provided + self.deprecated.db_path => self.store.sqlite.path if provided, + self.deprecated.vacuum_page_count as optional => self.store.sqlite.vacuum_page_count if provided, + self.deprecated.enable_sqlite_status_metrics => self.store.sqlite.enable_status_metrics if provided }; // Map deprecated retry configuration options deprecated::map! { - self.deprecated.db_query_max_retries => self.store.retry.max_retries if provided + self.deprecated.db_query_max_retries => self.store.retry.max_retries if provided, + self.deprecated.db_query_retry_delay_ms as duration => self.store.retry.delay if provided }; - if !provided("store.retry.delay") - && let Some(v) = self.deprecated.db_query_retry_delay_ms - { - self.store.retry.delay = Duration::from_millis(v); - } - // Map deprecated store configuration options deprecated::map! { self.deprecated.database_adapter => self.store.adapter if provided, @@ -396,7 +381,7 @@ impl Config { self.deprecated.db_insert_batch_max_len => self.store.insert_batch_max_length if provided, self.deprecated.db_insert_batch_max_size => self.store.insert_batch_max_bytes if provided, self.deprecated.db_insert_batch_max_time_ms => self.store.insert_batch_max_time_ms if provided, - self.deprecated.db_max_size => some(self.store.max_size) if provided, + self.deprecated.db_max_size as optional => self.store.max_size if provided, self.deprecated.max_pending_count => self.store.max_pending_count if provided, self.deprecated.max_delay_count => self.store.max_delay_count if provided, self.deprecated.max_processing_count => self.store.max_processing_count if provided, From 228739838038c9bc3780809d2433c72cb7450a77 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Thu, 18 Jun 2026 14:19:14 -0700 Subject: [PATCH 4/4] Move Validators (Forgot) --- src/config/fetch.rs | 14 +++----------- src/config/mod.rs | 1 + src/config/push.rs | 16 ++++------------ src/config/validate.rs | 20 ++++++++++++++++++++ 4 files changed, 28 insertions(+), 23 deletions(-) create mode 100644 src/config/validate.rs diff --git a/src/config/fetch.rs b/src/config/fetch.rs index 26d22059..30ed4b63 100644 --- a/src/config/fetch.rs +++ b/src/config/fetch.rs @@ -1,23 +1,15 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; -use validator::{Validate, ValidationError}; +use validator::Validate; +use crate::config::validate; use crate::fetch::MAX_FETCH_THREADS; -// (TODO) Create a `validate` module to keep all of our custom validators (there are now at least two). -fn validate_power_of_two(n: usize) -> Result<(), ValidationError> { - if n.is_power_of_two() { - Ok(()) - } else { - Err(ValidationError::new("not_power_of_two")) - } -} - #[derive(PartialEq, Debug, Deserialize, Serialize, Validate)] pub struct FetchConfig { /// The number of concurrent fetch loops in push mode, which should be ≤ `MAX_FETCH_THREADS` and a power of two. - #[validate(range(min = 1, max = MAX_FETCH_THREADS), custom(function = "validate_power_of_two"))] + #[validate(range(min = 1, max = MAX_FETCH_THREADS), custom(function = "validate::power_of_two"))] pub threads: usize, /// Time in milliseconds to wait between fetch attempts when no pending activation is found. diff --git a/src/config/mod.rs b/src/config/mod.rs index a5af19ed..e5e8a414 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -24,6 +24,7 @@ pub mod kafka; pub mod push; pub mod raw; pub mod store; +pub mod validate; use deprecated::DeprecatedConfig; use kafka::{ClusterConfig, TopicConfig}; diff --git a/src/config/push.rs b/src/config/push.rs index 45c909c9..ddcc4c73 100644 --- a/src/config/push.rs +++ b/src/config/push.rs @@ -1,18 +1,10 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; -use validator::{Validate, ValidationError}; +use validator::Validate; use crate::config::batch::BatchConfig; - -// (TODO) Create a `validate` module to keep all of our custom validators (there are now at least two). -fn validate_nonzero_duration(duration: &Duration) -> Result<(), ValidationError> { - if duration.is_zero() { - Err(ValidationError::new("nonzero_duration")) - } else { - Ok(()) - } -} +use crate::config::validate; #[derive(PartialEq, Debug, Deserialize, Serialize, Validate)] pub struct PushQueueConfig { @@ -22,7 +14,7 @@ pub struct PushQueueConfig { /// Maximum time to wait when submitting an activation to the push pool. #[serde(with = "crate::serde::duration")] - #[validate(custom(function = "validate_nonzero_duration"))] + #[validate(custom(function = "validate::nonzero_duration"))] pub timeout: Duration, } @@ -66,7 +58,7 @@ pub struct PushConfig { /// Maximum time for a single push RPC to the worker service. This should be greater than the worker's internal timeout. #[serde(with = "crate::serde::duration")] - #[validate(custom(function = "validate_nonzero_duration"))] + #[validate(custom(function = "validate::nonzero_duration"))] pub timeout: Duration, /// The push queue configuration. diff --git a/src/config/validate.rs b/src/config/validate.rs new file mode 100644 index 00000000..d9240154 --- /dev/null +++ b/src/config/validate.rs @@ -0,0 +1,20 @@ +use std::time::Duration; +use validator::ValidationError; + +/// Ensure `n` is a power of two. Used to validate fetch thread count. +pub fn power_of_two(n: usize) -> Result<(), ValidationError> { + if n.is_power_of_two() { + Ok(()) + } else { + Err(ValidationError::new("not_power_of_two")) + } +} + +/// Ensure duration is greater than zero to avoid crashes when initializing intervals. +pub fn nonzero_duration(duration: &Duration) -> Result<(), ValidationError> { + if duration.is_zero() { + Err(ValidationError::new("nonzero_duration")) + } else { + Ok(()) + } +}