diff --git a/clients/rust/src/many.rs b/clients/rust/src/many.rs index 79a145ff..dac92baa 100644 --- a/clients/rust/src/many.rs +++ b/clients/rust/src/many.rs @@ -38,7 +38,8 @@ const MAX_INDIVIDUAL_CONCURRENCY: usize = 5; /// Maximum number of requests to the batch endpoint that can be executed concurrently. const MAX_BATCH_CONCURRENCY: usize = 3; -// TODO: add limit and logic for whole batch request body size +/// Maximum total body size (pre-compression) to include in a single batch request. +const MAX_BATCH_BODY_SIZE: u64 = 100 * 1024 * 1024; // 100 MB /// A builder that can be used to enqueue multiple operations. /// @@ -230,8 +231,8 @@ impl OperationContext { /// The result of classifying a single operation for batch processing. enum Classified { - /// The operation can be included in a batch request. - Batchable(BatchOperation), + /// The operation can be included in a batch request, with its estimated body size in bytes. + Batchable(BatchOperation, u64), /// The operation must be executed as an individual request (e.g., oversized file body). Individual(BatchOperation), /// An error was encountered during classification. 
@@ -521,13 +522,12 @@ async fn classify(op: BatchOperation) -> Classified { body, }; - if size.is_some_and(|s| s <= MAX_BATCH_PART_SIZE as u64) { - Classified::Batchable(op) - } else { - Classified::Individual(op) + match size { + Some(s) if s <= MAX_BATCH_PART_SIZE as u64 => Classified::Batchable(op, s), + _ => Classified::Individual(op), } } - other => Classified::Batchable(other), + other => Classified::Batchable(other, 0), } } @@ -537,7 +537,7 @@ async fn classify(op: BatchOperation) -> Classified { async fn partition( operations: Vec<BatchOperation>, ) -> ( - Vec<BatchOperation>, + Vec<(BatchOperation, u64)>, Vec<BatchOperation>, Vec, ) { @@ -547,7 +547,7 @@ async fn partition( let mut failed = Vec::new(); for item in classified { match item { - Classified::Batchable(op) => batchable.push(op), + Classified::Batchable(op, size) => batchable.push((op, size)), Classified::Individual(op) => individual.push(op), Classified::Failed(result) => failed.push(result), } @@ -615,6 +615,34 @@ async fn execute_batch(operations: Vec, session: &Session) -> Ve } } +/// Returns a lazy iterator over batches of operations. +/// +/// Each batch respects both the operation-count limit ([`MAX_BATCH_OPS`]) and the total body-size +/// limit ([`MAX_BATCH_BODY_SIZE`]). +fn iter_batches(ops: Vec<(BatchOperation, u64)>) -> impl Iterator<Item = Vec<BatchOperation>> { + let mut remaining = ops.into_iter().peekable(); + + std::iter::from_fn(move || { + remaining.peek()?; + let mut batch_size = 0; + let mut batch = Vec::new(); + + while let Some((_, op_size)) = remaining.peek() { + if batch.len() >= MAX_BATCH_OPS + || (!batch.is_empty() && batch_size + op_size > MAX_BATCH_BODY_SIZE) + { + break; + } + + let (op, op_size) = remaining.next().expect("peeked above"); + batch_size += op_size; + batch.push(op); + } + + Some(batch) + }) +} + impl ManyBuilder { /// Consumes this builder, returning a lazy stream over all the enqueued operations' results.
/// @@ -636,21 +664,14 @@ impl ManyBuilder { }) .buffer_unordered(MAX_INDIVIDUAL_CONCURRENCY); - // Lazily chunk batchable operations and execute as batch requests, concurrently - let batch_results = futures_util::stream::unfold(batchable, |mut remaining| async { - if remaining.is_empty() { - return None; - } - let at = remaining.len().min(MAX_BATCH_OPS); - let chunk: Vec<_> = remaining.drain(..at).collect(); - Some((chunk, remaining)) - }) - .map(move |chunk| { - let session = session.clone(); - async move { execute_batch(chunk, &session).await } - }) - .buffer_unordered(MAX_BATCH_CONCURRENCY) - .flat_map(futures_util::stream::iter); + // Chunk batchable operations and execute as batch requests, concurrently + let batch_results = futures_util::stream::iter(iter_batches(batchable)) + .map(move |chunk| { + let session = session.clone(); + async move { execute_batch(chunk, &session).await } + }) + .buffer_unordered(MAX_BATCH_CONCURRENCY) + .flat_map(futures_util::stream::iter); let results = futures_util::stream::iter(failed) .chain(individual_results) @@ -674,3 +695,73 @@ impl ManyBuilder { self } } + +#[cfg(test)] +mod tests { + use super::*; + + /// Creates a dummy sized op for use in `iter_batches` tests. 
+ fn op(size: u64) -> (BatchOperation, u64) { + ( + BatchOperation::Delete { + key: "k".to_owned(), + }, + size, + ) + } + + fn batch_sizes(batches: &[Vec<BatchOperation>]) -> Vec<usize> { + batches.iter().map(Vec::len).collect() + } + + fn batches(ops: Vec<(BatchOperation, u64)>) -> Vec<Vec<BatchOperation>> { + iter_batches(ops).collect() + } + + #[test] + fn iter_batches_empty() { + assert!(batches(vec![]).is_empty()); + } + + #[test] + fn iter_batches_single_batch_count_limit() { + // 1000 tiny ops → exactly one batch + let ops: Vec<_> = (0..1000).map(|_| op(1)).collect(); + assert_eq!(batch_sizes(&batches(ops)), vec![1000]); + } + + #[test] + fn iter_batches_splits_on_count_limit() { + // 1001 tiny ops → two batches: 1000 + 1 + let ops: Vec<_> = (0..1001).map(|_| op(1)).collect(); + assert_eq!(batch_sizes(&batches(ops)), vec![1000, 1]); + } + + #[test] + fn iter_batches_exactly_at_size_limit() { + // 100 ops of 1 MB each = exactly 100 MB → one batch + let ops: Vec<_> = (0..100).map(|_| op(1024 * 1024)).collect(); + assert_eq!(batch_sizes(&batches(ops)), vec![100]); + } + + #[test] + fn iter_batches_splits_on_size_limit() { + // 101 ops of 1 MB each = 101 MB → two batches: 100 + 1 + let ops: Vec<_> = (0..101).map(|_| op(1024 * 1024)).collect(); + assert_eq!(batch_sizes(&batches(ops)), vec![100, 1]); + } + + #[test] + fn iter_batches_size_limit_hits_before_count_limit() { + // 200 ops of ~600 KB each → size limit triggers before the 1000-op count limit + let op_size = 600 * 1024; + let ops: Vec<_> = (0..200).map(|_| op(op_size)).collect(); + let result = batches(ops); + // Each batch holds floor(100 MB / 600 KB) ops + let per_batch = (MAX_BATCH_BODY_SIZE / op_size) as usize; + assert!(result.len() > 1, "expected multiple batches"); + for batch in &result[..result.len() - 1] { + assert_eq!(batch.len(), per_batch); + } + } +}