Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 48 additions & 6 deletions src/config/deprecated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,57 @@ use crate::config::store::DatabaseAdapter;
macro_rules! map {
() => {};

// An optional deprecated value always wins
($deprecated:expr => some($current:expr)) => {
$current = $deprecated.take();
// Two or more transformed mappings separated by commas
(
$deprecated_base:ident $(.$deprecated_field:ident)+ as $mapper:path => $current_base:ident $(.$current_field:ident)+ if $provided:ident,
$($rest:tt)+
) => {
crate::config::deprecated::map! {
$deprecated_base$(.$deprecated_field)+ as $mapper => $current_base$(.$current_field)+ if $provided
};

crate::config::deprecated::map! {
$($rest)+
};
};

// Two or more mappings separated by commas
(
$deprecated:expr => $current_base:ident $(.$current_field:ident)+ if $provided:ident,
$($rest:tt)+
) => {
crate::config::deprecated::map! {
$deprecated => $current_base$(.$current_field)+ if $provided
};

crate::config::deprecated::map! {
$($rest)+
};
};

// A transformed deprecated value only wins when it's provided
($deprecated_base:ident $(.$deprecated_field:ident)+ as $mapper:path => $current_base:ident $(.$current_field:ident)+ if $provided:ident) => {
let key = concat!(stringify!($current_base), $(".", stringify!($current_field)),+)
.strip_prefix("self.")
.unwrap();

if !$provided(key) {
if let Some(v) = $deprecated_base$(.$deprecated_field)+.take() {
$current_base$(.$current_field)+ = $mapper(v);
}
}
};

// A plain deprecated value only wins when it's provided
($deprecated:expr => $current:expr) => {
if let Some(v) = $deprecated.take() {
$current = v;
($deprecated:expr => $current_base:ident $(.$current_field:ident)+ if $provided:ident) => {
let key = concat!(stringify!($current_base), $(".", stringify!($current_field)),+)
.strip_prefix("self.")
.unwrap();

if !$provided(key) {
if let Some(v) = $deprecated.take() {
$current_base$(.$current_field)+ = v;
}
}
};
}
Expand Down
14 changes: 3 additions & 11 deletions src/config/fetch.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,15 @@
use std::time::Duration;

use serde::{Deserialize, Serialize};
use validator::{Validate, ValidationError};
use validator::Validate;

use crate::config::validate;
use crate::fetch::MAX_FETCH_THREADS;

// (TODO) Create a `validate` module to keep all of our custom validators (there are now at least two).
fn validate_power_of_two(n: usize) -> Result<(), ValidationError> {
if n.is_power_of_two() {
Ok(())
} else {
Err(ValidationError::new("not_power_of_two"))
}
}

#[derive(PartialEq, Debug, Deserialize, Serialize, Validate)]
pub struct FetchConfig {
/// The number of concurrent fetch loops in push mode, which should be ≤ `MAX_FETCH_THREADS` and a power of two.
#[validate(range(min = 1, max = MAX_FETCH_THREADS), custom(function = "validate_power_of_two"))]
#[validate(range(min = 1, max = MAX_FETCH_THREADS), custom(function = "validate::power_of_two"))]
pub threads: usize,

/// Time in milliseconds to wait between fetch attempts when no pending activation is found.
Expand Down
268 changes: 59 additions & 209 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub mod kafka;
pub mod push;
pub mod raw;
pub mod store;
pub mod validate;

use deprecated::DeprecatedConfig;
use kafka::{ClusterConfig, TopicConfig};
Expand Down Expand Up @@ -313,232 +314,81 @@ impl Config {
/// - If the user does not provide `store.db_max_size`, use the deprecated field `db_max_size` instead
fn map_deprecated_options(&mut self, builder: &mut Figment) {
// Nested function definition since it's not used anywhere else
fn user_provided(builder: &Figment, key: &str) -> bool {
let provided = |key: &str| {
builder
.find_metadata(key)
.is_some_and(|metadata| metadata.name != DEFAULT_CONFIG_PROVIDER)
}

// Map deprecated push mode configuration options
if !user_provided(builder, "fetch.threads") {
deprecated::map! {
self.deprecated.fetch_threads => self.fetch.threads
};
}

if !user_provided(builder, "fetch.backoff")
&& let Some(v) = self.deprecated.fetch_wait_ms
{
self.fetch.backoff = Duration::from_millis(v);
}

if !user_provided(builder, "fetch.batch_length") {
deprecated::map! {
self.deprecated.fetch_batch_size => self.fetch.batch_length
};
}

if !user_provided(builder, "push.threads") {
deprecated::map! {
self.deprecated.push_threads => self.push.threads
};
}

if !user_provided(builder, "push.timeout")
&& let Some(v) = self.deprecated.push_timeout_ms
{
self.push.timeout = Duration::from_millis(v);
}

if !user_provided(builder, "push.queue.size") {
deprecated::map! {
self.deprecated.push_queue_size => self.push.queue.size
};
}
};

if !user_provided(builder, "push.queue.timeout")
&& let Some(v) = self.deprecated.push_queue_timeout_ms
{
self.push.queue.timeout = Duration::from_millis(v);
// Convert a 'u64' into a duration.
fn duration<T: Into<u64>>(v: T) -> Duration {
Duration::from_millis(v.into())
}

if !user_provided(builder, "push.update.batched") {
deprecated::map! {
self.deprecated.batch_push_updates => self.push.update.batched
};
// Wrap anything inside `Some`.
fn optional<T>(v: T) -> Option<T> {
Some(v)
}

if !user_provided(builder, "push.update.batch.length") {
deprecated::map! {
self.deprecated.push_update_batch_size => self.push.update.batch.length
};
}
// Map deprecated fetch configuration options
deprecated::map! {
self.deprecated.fetch_threads => self.fetch.threads if provided,
self.deprecated.fetch_batch_size => self.fetch.batch_length if provided,
self.deprecated.fetch_wait_ms as duration => self.fetch.backoff if provided
};

if !user_provided(builder, "push.update.batch.interval")
&& let Some(v) = self.deprecated.push_update_interval_ms
{
self.push.update.batch.interval = Duration::from_millis(v.into());
}
// Map deprecated push configuration options
deprecated::map! {
self.deprecated.push_threads => self.push.threads if provided,
self.deprecated.push_queue_size => self.push.queue.size if provided,
self.deprecated.batch_push_updates => self.push.update.batched if provided,
self.deprecated.push_update_batch_size => self.push.update.batch.length if provided,
self.deprecated.push_timeout_ms as duration => self.push.timeout if provided,
self.deprecated.push_queue_timeout_ms as duration => self.push.queue.timeout if provided,
self.deprecated.push_update_interval_ms as duration => self.push.update.batch.interval if provided
};

// Map deprecated Postgres configuration options
if !user_provided(builder, "store.pg.run_migrations") {
deprecated::map! {
self.deprecated.run_migrations => self.store.pg.run_migrations
};
}

if !user_provided(builder, "store.pg.host") {
deprecated::map! {
self.deprecated.pg_host => self.store.pg.host
};
}

if !user_provided(builder, "store.pg.port") {
deprecated::map! {
self.deprecated.pg_port => self.store.pg.port
};
}

if !user_provided(builder, "store.pg.ddl_username") {
deprecated::map! {
self.deprecated.pg_ddl_username => self.store.pg.ddl_username
};
}

if !user_provided(builder, "store.pg.username") {
deprecated::map! {
self.deprecated.pg_username => self.store.pg.username
};
}

if !user_provided(builder, "store.pg.password") {
deprecated::map! {
self.deprecated.pg_password => self.store.pg.password
};
}

if !user_provided(builder, "store.pg.ddl_password") {
deprecated::map! {
self.deprecated.pg_ddl_password => self.store.pg.ddl_password
};
}

if !user_provided(builder, "store.pg.database_name") {
deprecated::map! {
self.deprecated.pg_database_name => self.store.pg.database_name
};
}

if !user_provided(builder, "store.pg.default_database_name") {
deprecated::map! {
self.deprecated.pg_default_database_name => self.store.pg.default_database_name
};
}

if !user_provided(builder, "store.pg.query_params") {
deprecated::map! {
self.deprecated.pg_extra_query_params => some(self.store.pg.query_params)
};
}
deprecated::map! {
self.deprecated.run_migrations => self.store.pg.run_migrations if provided,
self.deprecated.pg_host => self.store.pg.host if provided,
self.deprecated.pg_port => self.store.pg.port if provided,
self.deprecated.pg_ddl_username => self.store.pg.ddl_username if provided,
self.deprecated.pg_username => self.store.pg.username if provided,
self.deprecated.pg_password => self.store.pg.password if provided,
self.deprecated.pg_ddl_password => self.store.pg.ddl_password if provided,
self.deprecated.pg_database_name => self.store.pg.database_name if provided,
self.deprecated.pg_default_database_name => self.store.pg.default_database_name if provided,
self.deprecated.pg_extra_query_params as optional => self.store.pg.query_params if provided
};

// Map deprecated SQLite configuration options
if !user_provided(builder, "store.sqlite.path") {
deprecated::map! {
self.deprecated.db_path => self.store.sqlite.path
};
}

if !user_provided(builder, "store.sqlite.vacuum_page_count") {
deprecated::map! {
self.deprecated.vacuum_page_count => some(self.store.sqlite.vacuum_page_count)
};
}

if !user_provided(builder, "store.sqlite.enable_status_metrics") {
deprecated::map! {
self.deprecated.enable_sqlite_status_metrics => self.store.sqlite.enable_status_metrics
};
}
deprecated::map! {
self.deprecated.db_path => self.store.sqlite.path if provided,
self.deprecated.vacuum_page_count as optional => self.store.sqlite.vacuum_page_count if provided,
self.deprecated.enable_sqlite_status_metrics => self.store.sqlite.enable_status_metrics if provided
};

// Map deprecated retry configuration options
if !user_provided(builder, "store.retry.max_retries") {
deprecated::map! {
self.deprecated.db_query_max_retries => self.store.retry.max_retries
};
}

if !user_provided(builder, "store.retry.delay")
&& let Some(v) = self.deprecated.db_query_retry_delay_ms
{
self.store.retry.delay = Duration::from_millis(v);
}
deprecated::map! {
self.deprecated.db_query_max_retries => self.store.retry.max_retries if provided,
self.deprecated.db_query_retry_delay_ms as duration => self.store.retry.delay if provided
};

// Map deprecated store configuration options
if !user_provided(builder, "store.adapter") {
deprecated::map! {
self.deprecated.database_adapter => self.store.adapter
};
}

if !user_provided(builder, "store.insert_failure_backoff_ms") {
deprecated::map! {
self.deprecated.db_write_failure_backoff_ms => self.store.insert_failure_backoff_ms
};
}

if !user_provided(builder, "store.insert_batch_max_length") {
deprecated::map! {
self.deprecated.db_insert_batch_max_len => self.store.insert_batch_max_length
};
}

if !user_provided(builder, "store.insert_batch_max_bytes") {
deprecated::map! {
self.deprecated.db_insert_batch_max_size => self.store.insert_batch_max_bytes
};
}

if !user_provided(builder, "store.insert_batch_max_time_ms") {
deprecated::map! {
self.deprecated.db_insert_batch_max_time_ms => self.store.insert_batch_max_time_ms
};
}

if !user_provided(builder, "store.max_size") {
deprecated::map! {
self.deprecated.db_max_size => some(self.store.max_size)
};
}

if !user_provided(builder, "store.max_pending_count") {
deprecated::map! {
self.deprecated.max_pending_count => self.store.max_pending_count
};
}

if !user_provided(builder, "store.max_delay_count") {
deprecated::map! {
self.deprecated.max_delay_count => self.store.max_delay_count
};
}

if !user_provided(builder, "store.max_processing_count") {
deprecated::map! {
self.deprecated.max_processing_count => self.store.max_processing_count
};
}

if !user_provided(builder, "store.max_processing_attempts") {
deprecated::map! {
self.deprecated.max_processing_attempts => self.store.max_processing_attempts
};
}

if !user_provided(builder, "store.processing_deadline_grace_sec") {
deprecated::map! {
self.deprecated.processing_deadline_grace_sec => self.store.processing_deadline_grace_sec
};
}
deprecated::map! {
self.deprecated.database_adapter => self.store.adapter if provided,
self.deprecated.db_write_failure_backoff_ms => self.store.insert_failure_backoff_ms if provided,
self.deprecated.db_insert_batch_max_len => self.store.insert_batch_max_length if provided,
self.deprecated.db_insert_batch_max_size => self.store.insert_batch_max_bytes if provided,
self.deprecated.db_insert_batch_max_time_ms => self.store.insert_batch_max_time_ms if provided,
self.deprecated.db_max_size as optional => self.store.max_size if provided,
self.deprecated.max_pending_count => self.store.max_pending_count if provided,
self.deprecated.max_delay_count => self.store.max_delay_count if provided,
self.deprecated.max_processing_count => self.store.max_processing_count if provided,
self.deprecated.max_processing_attempts => self.store.max_processing_attempts if provided,
self.deprecated.processing_deadline_grace_sec => self.store.processing_deadline_grace_sec if provided
};
}

/// Normalize the legacy single-topic config into the new multi-topic
Expand Down
Loading
Loading