Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions quickwit/quickwit-compaction/src/compaction_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use quickwit_doc_mapper::DocMapper;
use quickwit_indexing::actors::{
MergeExecutor, MergeSplitDownloader, Packager, Publisher, Uploader, UploaderType,
};
use quickwit_indexing::merge_policy::MergeOperation;
use quickwit_indexing::merge_policy::{MergeOperation, MergeSource};
use quickwit_indexing::{IndexingSplitStore, SplitsUpdateMailbox};
use quickwit_metrics::{counter, gauge, histogram, label_values};
use quickwit_proto::indexing::MergePipelineId;
Expand Down Expand Up @@ -351,7 +351,7 @@ impl CompactionPipeline {
self.pipeline_start = Some(now);
// Kick off the pipeline.
merge_split_downloader_mailbox
.try_send_message(self.merge_operation.clone())
.try_send_message(MergeSource::Operation(self.merge_operation.clone()))
.map_err(|err| {
anyhow::anyhow!("failed to send merge operation to downloader: {err:?}")
})?;
Expand Down
7 changes: 3 additions & 4 deletions quickwit/quickwit-indexing/failpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use quickwit_common::rand::append_random_suffix;
use quickwit_common::split_file;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_indexing::actors::MergeExecutor;
use quickwit_indexing::merge_policy::{MergeOperation, MergeTask};
use quickwit_indexing::merge_policy::{MergeOperation, MergeSource, MergeTask};
use quickwit_indexing::models::MergeScratch;
use quickwit_indexing::{TestSandbox, get_tantivy_directory_from_split_bundle};
use quickwit_metastore::{
Expand Down Expand Up @@ -285,10 +285,9 @@ async fn test_merge_executor_controlled_directory_kill_switch() -> anyhow::Resul
tantivy_dirs.push(get_tantivy_directory_from_split_bundle(&dest_filepath).unwrap());
}
let merge_operation = MergeOperation::new_merge_operation(split_metadatas);
let merge_task = MergeTask::from_merge_operation_for_test(merge_operation.clone());
let merge_task = MergeTask::from_merge_operation_for_test(merge_operation);
let merge_scratch = MergeScratch {
merge_operation,
merge_task: Some(merge_task),
merge_source: MergeSource::Task(merge_task),
merge_scratch_directory,
downloaded_splits_directory,
tantivy_dirs,
Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1398,9 +1398,9 @@ mod tests {
// change whenever IndexingSettings fields are added/removed. Recompute
// by temporarily adding a test that prints
// `indexing_pipeline_params_fingerprint(&index_config, &source_config)`.
const PARAMS_FINGERPRINT_INGEST_API: u64 = 1637744865450232394;
const PARAMS_FINGERPRINT_SOURCE_1: u64 = 1705211905504908791;
const PARAMS_FINGERPRINT_SOURCE_2: u64 = 8706667372658059428;
const PARAMS_FINGERPRINT_INGEST_API: u64 = 7973087274884969148;
const PARAMS_FINGERPRINT_SOURCE_1: u64 = 9420938500552890840;
const PARAMS_FINGERPRINT_SOURCE_2: u64 = 16199199787360162635;

quickwit_common::setup_logging_for_tests();
let transport = ChannelTransport::default();
Expand Down
29 changes: 16 additions & 13 deletions quickwit/quickwit-indexing/src/actors/merge_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use tracing::{debug, error, info, instrument, warn};

use crate::actors::Packager;
use crate::controlled_directory::ControlledDirectory;
use crate::merge_policy::MergeOperationType;
use crate::merge_policy::{MergeOperationType, MergeSource};
use crate::models::{IndexedSplit, IndexedSplitBatch, MergeScratch, PublishLock, SplitAttrs};

#[derive(Clone)]
Expand Down Expand Up @@ -85,20 +85,20 @@ impl Actor for MergeExecutor {
impl Handler<MergeScratch> for MergeExecutor {
type Reply = ();

#[instrument(level = "info", name = "merge_executor", parent = merge_scratch.merge_operation.merge_parent_span.id(), skip_all)]
#[instrument(level = "info", name = "merge_executor", parent = merge_scratch.merge_source.as_operation().merge_parent_span.id(), skip_all)]
async fn handle(
&mut self,
merge_scratch: MergeScratch,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
let start = Instant::now();
let MergeScratch {
merge_operation,
merge_task,
merge_source,
tantivy_dirs,
merge_scratch_directory,
..
} = merge_scratch;
let merge_operation = merge_source.as_operation();
// On nodes running the split compaction architecture, merge pipelines are ephemeral, and we
// need to make sure there aren't too many CPU-bound operations occurring concurrently.
let _cpu_permit = match &self.merge_execution_semaphore {
Expand Down Expand Up @@ -164,15 +164,20 @@ impl Handler<MergeScratch> for MergeExecutor {
operation_type = %merge_operation.operation_type,
"merge-operation-success"
);
let batch_parent_span = merge_operation.merge_parent_span.clone();
let merge_task_opt = match merge_source {
MergeSource::Task(task) => Some(task),
MergeSource::Operation(_) => None,
Comment on lines +168 to +170

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep the merge task alive through publish

When this branch forwards a scheduled `MergeTask`, it eventually lands in `SplitsUpdate::merge_task`. However, the logs publisher destructures `SplitsUpdate` with `..` at `quickwit/quickwit-indexing/src/actors/log_publisher_impl.rs:68-75`, which drops that field before `publish_splits` is awaited. For the old merge/delete pipelines, this releases the scheduler permit and removes the operation from the planner/delete inventory before the replacement is actually committed. As a result, a slow or failed publish can let another merge be scheduled and make the `known_split_ids` GC treat the input splits as no longer in flight. If this forwarding is meant to preserve the merge-task lifetime, hold the task and drop it only after publish succeeds, as the parquet publisher does.

Useful? React with 👍 / 👎.

};
ctx.send_message(
&self.merge_packager_mailbox,
IndexedSplitBatch {
splits: vec![indexed_split],
checkpoint_delta_opt: Default::default(),
publish_lock: PublishLock::default(),
publish_token_opt: None,
batch_parent_span: merge_operation.merge_parent_span.clone(),
merge_task_opt: merge_task,
batch_parent_span,
merge_task_opt,
},
)
.await?;
Expand Down Expand Up @@ -615,7 +620,7 @@ mod tests {
use tantivy::{Document, ReloadPolicy, TantivyDocument};

use super::*;
use crate::merge_policy::{MergeOperation, MergeTask};
use crate::merge_policy::{MergeOperation, MergeSource, MergeTask};
use crate::{TestSandbox, get_tantivy_directory_from_split_bundle, new_split_id};

#[tokio::test]
Expand Down Expand Up @@ -664,10 +669,9 @@ mod tests {
tantivy_dirs.push(get_tantivy_directory_from_split_bundle(&dest_filepath).unwrap())
}
let merge_operation = MergeOperation::new_merge_operation(split_metas);
let merge_task = MergeTask::from_merge_operation_for_test(merge_operation.clone());
let merge_task = MergeTask::from_merge_operation_for_test(merge_operation);
let merge_scratch = MergeScratch {
merge_operation,
merge_task: Some(merge_task),
merge_source: MergeSource::Task(merge_task),
tantivy_dirs,
merge_scratch_directory,
downloaded_splits_directory,
Expand Down Expand Up @@ -810,10 +814,9 @@ mod tests {
.await?;
let tantivy_dir = get_tantivy_directory_from_split_bundle(&dest_filepath).unwrap();
let merge_operation = MergeOperation::new_delete_and_merge_operation(new_split_metadata);
let merge_task = MergeTask::from_merge_operation_for_test(merge_operation.clone());
let merge_task = MergeTask::from_merge_operation_for_test(merge_operation);
let merge_scratch = MergeScratch {
merge_operation,
merge_task: Some(merge_task),
merge_source: MergeSource::Task(merge_task),
tantivy_dirs: vec![tantivy_dir],
merge_scratch_directory,
downloaded_splits_directory,
Expand Down
32 changes: 18 additions & 14 deletions quickwit/quickwit-indexing/src/actors/merge_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ mod tests {

use crate::actors::MergePlanner;
use crate::merge_policy::{
MergePolicy, MergeTask, StableLogMergePolicy, merge_policy_from_settings,
MergePolicy, MergeSource, StableLogMergePolicy, merge_policy_from_settings,
};
use crate::models::NewSplits;

Expand Down Expand Up @@ -481,36 +481,40 @@ mod tests {
};
merge_planner_mailbox.send_message(message).await?;
merge_planner_handle.process_pending_and_observe().await;
let operations = merge_split_downloader_inbox.drain_for_test_typed::<MergeTask>();
let operations = merge_split_downloader_inbox.drain_for_test_typed::<MergeSource>();
assert_eq!(operations.len(), 3);
let mut merge_operations = operations
.into_iter()
.sorted_by_key(|op| (op.splits[0].partition_id, op.splits[0].doc_mapping_uid));
let mut merge_operations = operations.into_iter().sorted_by_key(|op| {
let op = op.as_operation();
(op.splits[0].partition_id, op.splits[0].doc_mapping_uid)
});

let first_merge_operation = merge_operations.next().unwrap();
assert_eq!(first_merge_operation.splits.len(), 4);
assert_eq!(first_merge_operation.as_operation().splits.len(), 4);
assert!(
first_merge_operation
.as_operation()
.splits
.iter()
.all(|split| split.partition_id == 1
&& split.doc_mapping_uid == doc_mapping_uid1)
);

let second_merge_operation = merge_operations.next().unwrap();
assert_eq!(second_merge_operation.splits.len(), 3);
assert_eq!(second_merge_operation.as_operation().splits.len(), 3);
assert!(
second_merge_operation
.as_operation()
.splits
.iter()
.all(|split| split.partition_id == 1
&& split.doc_mapping_uid == doc_mapping_uid2)
);

let third_merge_operation = merge_operations.next().unwrap();
assert_eq!(third_merge_operation.splits.len(), 3);
assert_eq!(third_merge_operation.as_operation().splits.len(), 3);
assert!(
third_merge_operation
.as_operation()
.splits
.iter()
.all(|split| split.partition_id == 2
Expand Down Expand Up @@ -580,7 +584,7 @@ mod tests {
// We wait for the first merge ops. If we sent the Quit message right away, it would have
// been queue before first `PlanMerge` message.
let merge_task_res = merge_split_downloader_inbox
.recv_typed_message::<MergeTask>()
.recv_typed_message::<MergeSource>()
.await;
assert!(merge_task_res.is_ok());

Expand All @@ -594,15 +598,15 @@ mod tests {

let _ = merge_planner_handle.process_pending_and_observe().await;

let merge_ops = merge_split_downloader_inbox.drain_for_test_typed::<MergeTask>();
let merge_ops = merge_split_downloader_inbox.drain_for_test_typed::<MergeSource>();

assert!(merge_ops.is_empty());

merge_planner_mailbox.send_message(Command::Quit).await?;

let (exit_status, _last_state) = merge_planner_handle.join().await;
assert!(matches!(exit_status, ActorExitStatus::Quit));
let merge_ops = merge_split_downloader_inbox.drain_for_test_typed::<MergeTask>();
let merge_ops = merge_split_downloader_inbox.drain_for_test_typed::<MergeSource>();
assert!(merge_ops.is_empty());
universe.assert_quit().await;
Ok(())
Expand Down Expand Up @@ -672,7 +676,7 @@ mod tests {
merge_planner_mailbox.send_message(Command::Quit).await?;
let (exit_status, _last_state) = merge_planner_handle.join().await;
assert!(matches!(exit_status, ActorExitStatus::Quit));
let merge_tasks = merge_split_downloader_inbox.drain_for_test_typed::<MergeTask>();
let merge_tasks = merge_split_downloader_inbox.drain_for_test_typed::<MergeSource>();

assert!(merge_tasks.is_empty());
universe.assert_quit().await;
Expand Down Expand Up @@ -750,7 +754,7 @@ mod tests {

// Instead, we wait for the first merge ops.
let merge_task_res = merge_split_downloader_inbox
.recv_typed_message::<MergeTask>()
.recv_typed_message::<MergeSource>()
.await;
assert!(merge_task_res.is_ok());

Expand All @@ -759,7 +763,7 @@ mod tests {
let (exit_status, _last_state) = merge_planner_handle.join().await;

assert!(matches!(exit_status, ActorExitStatus::Quit));
let merge_tasks = merge_split_downloader_inbox.drain_for_test_typed::<MergeTask>();
let merge_tasks = merge_split_downloader_inbox.drain_for_test_typed::<MergeSource>();
assert!(merge_tasks.is_empty());

universe.assert_quit().await;
Expand Down
38 changes: 19 additions & 19 deletions quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tracing::error;
use super::MergeSplitDownloader;
#[cfg(feature = "metrics")]
use super::parquet_pipeline::{ParquetMergeSplitDownloader, ParquetMergeTask};
use crate::merge_policy::{MergeOperation, MergeTask, compute_merge_score};
use crate::merge_policy::{MergeOperation, MergeSource, MergeTask, compute_merge_score};
use crate::metrics::{ONGOING_MERGE_OPERATIONS, PENDING_MERGE_BYTES, PENDING_MERGE_OPERATIONS};

pub struct MergePermit {
Expand Down Expand Up @@ -229,7 +229,7 @@ impl MergeSchedulerService {
self.pending_merge_bytes -= merge_task.merge_operation.total_num_bytes();
PENDING_MERGE_OPERATIONS.set(self.pending_merge_queue.len() as f64);
PENDING_MERGE_BYTES.set(self.pending_merge_bytes as f64);
match split_downloader_mailbox.try_send_message(merge_task) {
match split_downloader_mailbox.try_send_message(MergeSource::Task(merge_task)) {
Ok(_) => {}
Err(quickwit_actors::TrySendError::Full(_)) => {
// The split downloader mailbox has an unbounded queue capacity,
Expand Down Expand Up @@ -452,7 +452,7 @@ mod tests {
use tokio::time::timeout;

use super::*;
use crate::merge_policy::{MergeOperation, MergeTask};
use crate::merge_policy::{MergeOperation, MergeSource};

fn build_merge_operation(num_splits: usize, num_bytes_per_split: u64) -> MergeOperation {
let splits: Vec<SplitMetadata> = std::iter::repeat_with(|| SplitMetadata {
Expand Down Expand Up @@ -530,58 +530,58 @@ mod tests {
.unwrap();
}
{
let merge_task: MergeTask = merge_split_downloader_inbox
.recv_typed_message::<MergeTask>()
let merge_source = merge_split_downloader_inbox
.recv_typed_message::<MergeSource>()
.await
.unwrap();
assert_eq!(
merge_task.merge_operation.splits[0].footer_offsets.end,
merge_source.as_operation().splits[0].footer_offsets.end,
4_000_000
);
let merge_task2: MergeTask = merge_split_downloader_inbox
.recv_typed_message::<MergeTask>()
let merge_source2 = merge_split_downloader_inbox
.recv_typed_message::<MergeSource>()
.await
.unwrap();
assert_eq!(
merge_task2.merge_operation.splits[0].footer_offsets.end,
merge_source2.as_operation().splits[0].footer_offsets.end,
3_000_000
);
assert!(
timeout(
Duration::from_millis(200),
merge_split_downloader_inbox.recv_typed_message::<MergeTask>()
merge_split_downloader_inbox.recv_typed_message::<MergeSource>()
)
.await
.is_err()
);
}
{
let merge_task: MergeTask = merge_split_downloader_inbox
.recv_typed_message::<MergeTask>()
let merge_source = merge_split_downloader_inbox
.recv_typed_message::<MergeSource>()
.await
.unwrap();
assert_eq!(
merge_task.merge_operation.splits[0].footer_offsets.end,
merge_source.as_operation().splits[0].footer_offsets.end,
1_000_000
);
}
{
let merge_task: MergeTask = merge_split_downloader_inbox
.recv_typed_message::<MergeTask>()
let merge_source = merge_split_downloader_inbox
.recv_typed_message::<MergeSource>()
.await
.unwrap();
assert_eq!(
merge_task.merge_operation.splits[0].footer_offsets.end,
merge_source.as_operation().splits[0].footer_offsets.end,
2_000_000
);
}
{
let merge_task: MergeTask = merge_split_downloader_inbox
.recv_typed_message::<MergeTask>()
let merge_source = merge_split_downloader_inbox
.recv_typed_message::<MergeSource>()
.await
.unwrap();
assert_eq!(
merge_task.merge_operation.splits[0].footer_offsets.end,
merge_source.as_operation().splits[0].footer_offsets.end,
5_000_000
);
}
Expand Down
Loading
Loading