Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,6 @@ pub struct IndexerConfig {
pub enable_cooperative_indexing: bool,
#[serde(default = "IndexerConfig::default_cpu_capacity")]
pub cpu_capacity: CpuCapacity,
/// When true, the compactor service is not implicitly started on indexer
/// nodes. Dedicated compactor nodes must be deployed separately.
/// When false (default), every indexer node also runs the compactor.
#[serde(default = "IndexerConfig::default_enable_standalone_compactors")]
pub enable_standalone_compactors: bool,
/// If true, run Parquet merges through the streaming column-major engine
/// (`execute_merge_operation`). If false (default), use the in-memory
/// `merge_sorted_parquet_files` engine. The legacy in-memory engine is
Expand Down Expand Up @@ -216,10 +211,6 @@ impl IndexerConfig {
CpuCapacity::one_cpu_thread() * (quickwit_common::num_cpus() as u32)
}

fn default_enable_standalone_compactors() -> bool {
false
}

fn default_parquet_merge_use_streaming_engine() -> bool {
false
}
Expand All @@ -236,7 +227,6 @@ impl IndexerConfig {
cpu_capacity: PIPELINE_FULL_CAPACITY * 4u32,
max_merge_write_throughput: None,
merge_concurrency: NonZeroUsize::new(3).unwrap(),
enable_standalone_compactors: false,
parquet_merge_use_streaming_engine: Self::default_parquet_merge_use_streaming_engine(),
};
Ok(indexer_config)
Expand All @@ -254,7 +244,6 @@ impl Default for IndexerConfig {
cpu_capacity: Self::default_cpu_capacity(),
merge_concurrency: Self::default_merge_concurrency(),
max_merge_write_throughput: None,
enable_standalone_compactors: Self::default_enable_standalone_compactors(),
parquet_merge_use_streaming_engine: Self::default_parquet_merge_use_streaming_engine(),
}
}
Expand Down Expand Up @@ -861,6 +850,8 @@ pub struct NodeConfig {
pub ingest_api_config: IngestApiConfig,
pub jaeger_config: JaegerConfig,
pub compactor_config: CompactorConfig,
#[serde(skip_serializing)]
pub enable_standalone_compactors: bool,
}

impl NodeConfig {
Expand Down Expand Up @@ -925,6 +916,14 @@ impl NodeConfig {
pub fn for_test_from_ports(rest_listen_port: u16, grpc_listen_port: u16) -> Self {
serialize::node_config_for_tests_from_ports(rest_listen_port, grpc_listen_port)
}

/// Test config with `enable_standalone_compactors = true`.
#[cfg(any(test, feature = "testsuite"))]
pub fn for_test_with_standalone_compactors() -> Self {
let mut node_config = Self::for_test();
node_config.enable_standalone_compactors = true;
node_config
}
}

#[cfg(test)]
Expand Down
51 changes: 47 additions & 4 deletions quickwit/quickwit-config/src/node_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,7 @@ impl NodeConfigBuilder {
let node_id = self.node_id.resolve(env_vars).map(NodeId::new)?;
let availability_zone = self.availability_zone.resolve_optional(env_vars)?;

self.indexer_config.enable_standalone_compactors =
self.enable_standalone_compactors.resolve(env_vars)?;
let enable_standalone_compactors = self.enable_standalone_compactors.resolve(env_vars)?;

let enabled_services: HashSet<QuickwitService> = self
.enabled_services
Expand Down Expand Up @@ -343,6 +342,7 @@ impl NodeConfigBuilder {
ingest_api_config: self.ingest_api_config,
jaeger_config: self.jaeger_config,
compactor_config: self.compactor_config,
enable_standalone_compactors,
};

validate(&node_config)?;
Expand All @@ -361,7 +361,7 @@ fn validate(node_config: &NodeConfig) -> anyhow::Result<()> {
warn!("peer seeds are empty");
}
if node_config.is_service_enabled(QuickwitService::Compactor)
&& !node_config.indexer_config.enable_standalone_compactors
&& !node_config.enable_standalone_compactors
{
bail!(
"the `compactor` service can only be enabled when `enable_standalone_compactors` is \
Expand Down Expand Up @@ -552,6 +552,7 @@ pub fn node_config_for_tests_from_ports(
ingest_api_config: IngestApiConfig::default(),
jaeger_config: JaegerConfig::default(),
compactor_config: CompactorConfig::default(),
enable_standalone_compactors: false,
}
}

Expand Down Expand Up @@ -681,7 +682,6 @@ mod tests {
cpu_capacity: IndexerConfig::default_cpu_capacity(),
enable_cooperative_indexing: false,
max_merge_write_throughput: Some(ByteSize::mb(100)),
enable_standalone_compactors: false,
parquet_merge_use_streaming_engine: true,
}
);
Expand Down Expand Up @@ -977,6 +977,49 @@ mod tests {
.unwrap();
}

#[tokio::test]
async fn test_compactor_service_requires_standalone_flag() {
// Compactor service enabled while `enable_standalone_compactors` is off is an
// invalid combination: the default indexer-local merge pipeline is in use, so a
// dedicated compactor must not run.
let error = NodeConfigBuilder {
enabled_services: ConfigValue::for_test(List(vec![
"indexer".to_string(),
"compactor".to_string(),
])),
..Default::default()
}
.build_and_validate(&HashMap::new())
.await
.unwrap_err();
assert!(
error.to_string().contains("enable_standalone_compactors"),
"expected error to mention `enable_standalone_compactors`, got: {error}",
);
}

#[tokio::test]
async fn test_compactor_service_with_standalone_flag_validates() {
// All services enabled, including the compactor, with the standalone flag on
// is a valid configuration.
let node_config = NodeConfigBuilder {
enabled_services: ConfigValue::for_test(List(vec![
"control_plane".to_string(),
"indexer".to_string(),
"searcher".to_string(),
"janitor".to_string(),
"metastore".to_string(),
"compactor".to_string(),
])),
enable_standalone_compactors: ConfigValue::for_test(true),
..Default::default()
}
.build_and_validate(&HashMap::new())
.await
.unwrap();
assert!(node_config.is_service_enabled(QuickwitService::Compactor));
}

#[tokio::test]
async fn test_peer_socket_addrs() {
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,6 @@ impl TestNodeConfig {
tcp_listener_resolver.add_listener(grpc_tcp_listener).await;
config.indexer_config.enable_otlp_endpoint = self.enable_otlp;
config.enabled_services.clone_from(&self.services);
if config.enabled_services.contains(&QuickwitService::Indexer)
&& !config.indexer_config.enable_standalone_compactors
{
config.enabled_services.insert(QuickwitService::Compactor);
}
config.jaeger_config.enable_endpoint = true;
config.cluster_id.clone_from(&cluster_id);
config.node_id = NodeId::new(format!("test-node-{node_idx}"));
Expand Down
44 changes: 0 additions & 44 deletions quickwit/quickwit-integration-tests/src/tests/basic_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,47 +162,3 @@ async fn test_multi_nodes_cluster() {

sandbox.shutdown().await.unwrap();
}

#[tokio::test]
async fn test_indexer_implicitly_runs_compactor() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is that test being removed?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally, we were going to force standalone compactors to be false (meaning indexers still had to do merges). To still get merges, we were going to have indexer nodes run both the indexer and compactor service. We decided since then to not go that route, and indexers should no longer implicitly run the compactor service as a result.

quickwit_common::setup_logging_for_tests();
let sandbox = ClusterSandboxBuilder::default()
.add_node([QuickwitService::Searcher])
.add_node([QuickwitService::Metastore])
.add_node([QuickwitService::ControlPlane])
.add_node([QuickwitService::Janitor])
.add_node([QuickwitService::Indexer])
.build_and_start()
.await;

let cluster_snapshot = sandbox
.rest_client(QuickwitService::Searcher)
.cluster()
.snapshot()
.await
.unwrap();
assert_eq!(cluster_snapshot.ready_nodes.len(), 5);

// The indexer node should also advertise the compactor service.
let indexer_node = cluster_snapshot
.chitchat_state_snapshot
.node_states
.iter()
.find(|node_state| {
node_state
.get("enabled_services")
.map(|s| s.contains("indexer"))
.unwrap_or(false)
})
.expect("indexer node not found in cluster state");
let services = indexer_node.get("enabled_services").unwrap();
assert!(
services.contains("compactor"),
"indexer node should implicitly run the compactor, got: {services}"
);

sandbox.shutdown().await.unwrap();
}

// TODO: add test_standalone_compactor_node once the test config builder
// resolves QW_ENABLE_STANDALONE_COMPACTORS from the environment.
23 changes: 10 additions & 13 deletions quickwit/quickwit-serve/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ async fn get_compaction_planner_client_if_needed(
Option<CompactionPlannerServiceClient>,
Option<ActorHandle<CompactionPlanner>>,
)> {
if !node_config.indexer_config.enable_standalone_compactors {
if !node_config.enable_standalone_compactors {
return Ok((None, None));
}
let is_janitor = node_config.is_service_enabled(QuickwitService::Janitor);
Expand Down Expand Up @@ -649,12 +649,11 @@ pub async fn serve_quickwit(
let indexing_service_opt = if node_config.is_service_enabled(QuickwitService::Indexer) {
// if standalone compactors is enabled, indexing pipelines don't perform any merges.
// if standalone compactors is disabled, indexing pipelines perform all merges as before.
let merge_scheduler_mailbox_opt =
if !node_config.indexer_config.enable_standalone_compactors {
Some(spawn_merge_scheduler_service(&universe, &node_config))
} else {
None
};
let merge_scheduler_mailbox_opt = if !node_config.enable_standalone_compactors {
Some(spawn_merge_scheduler_service(&universe, &node_config))
} else {
None
};

let split_cache = indexing_split_cache.clone();
let indexing_service = start_indexing_service(
Expand Down Expand Up @@ -852,7 +851,7 @@ pub async fn serve_quickwit(
};

let compactor_supervisor_opt = if node_config.is_service_enabled(QuickwitService::Compactor)
&& node_config.indexer_config.enable_standalone_compactors
&& node_config.enable_standalone_compactors
{
let compaction_dir = node_config.data_dir_path.join("compaction");
fs::create_dir_all(&compaction_dir)?;
Expand Down Expand Up @@ -1993,8 +1992,7 @@ mod tests {
let metastore = MetastoreServiceClient::from_mock(MockMetastoreService::new());

// Janitor + indexer with standalone compactors enabled: planner client is returned.
let mut node_config = NodeConfig::for_test();
node_config.indexer_config.enable_standalone_compactors = true;
let mut node_config = NodeConfig::for_test_with_standalone_compactors();
node_config.enabled_services =
HashSet::from([QuickwitService::Janitor, QuickwitService::Indexer]);
let (client_opt, handle_opt) =
Expand Down Expand Up @@ -2028,7 +2026,7 @@ mod tests {

// Standalone compactors disabled: short-circuit returns (None, None) regardless of
// which services are enabled.
node_config.indexer_config.enable_standalone_compactors = false;
node_config.enable_standalone_compactors = false;
node_config.enabled_services =
HashSet::from([QuickwitService::Janitor, QuickwitService::Indexer]);
let (client_opt, handle_opt) =
Expand All @@ -2050,8 +2048,7 @@ mod tests {
let universe = Universe::new();
let metastore = MetastoreServiceClient::from_mock(MockMetastoreService::new());

let mut node_config = NodeConfig::for_test();
node_config.indexer_config.enable_standalone_compactors = true;
let mut node_config = NodeConfig::for_test_with_standalone_compactors();
node_config.enabled_services =
HashSet::from([QuickwitService::Indexer, QuickwitService::Compactor]);
let result =
Expand Down
Loading