From 351200886f910fcb99fdca55580ef6855c7c66fc Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Wed, 1 Apr 2026 11:31:13 +0200 Subject: [PATCH 1/2] feat(client): Add 100 MB per-batch body size limit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cap the total pre-compression body size per batch request at 100 MB in addition to the existing 1000-operation count limit. Previously, up to 1000 × 1 MB = 1 GB could be sent in a single request. Introduces `iter_batches`, a lazy iterator that splits classified operations into batches respecting both limits, replacing the `unfold`- based chunking in `send()`. The operation size is threaded through `Classified::Batchable` and `partition()` to avoid redundant stat calls. Co-Authored-By: Claude --- clients/rust/src/many.rs | 136 ++++++++++++++++++++++++++++++++------- 1 file changed, 114 insertions(+), 22 deletions(-) diff --git a/clients/rust/src/many.rs b/clients/rust/src/many.rs index 79a145ff..50174c1a 100644 --- a/clients/rust/src/many.rs +++ b/clients/rust/src/many.rs @@ -38,7 +38,8 @@ const MAX_INDIVIDUAL_CONCURRENCY: usize = 5; /// Maximum number of requests to the batch endpoint that can be executed concurrently. const MAX_BATCH_CONCURRENCY: usize = 3; -// TODO: add limit and logic for whole batch request body size +/// Maximum total body size (pre-compression) to include in a single batch request. +const MAX_BATCH_BODY_SIZE: u64 = 100 * 1024 * 1024; // 100 MB /// A builder that can be used to enqueue multiple operations. /// @@ -230,8 +231,8 @@ impl OperationContext { /// The result of classifying a single operation for batch processing. enum Classified { - /// The operation can be included in a batch request. - Batchable(BatchOperation), + /// The operation can be included in a batch request, with its estimated body size in bytes. + Batchable(BatchOperation, u64), /// The operation must be executed as an individual request (e.g., oversized file body). 
Individual(BatchOperation), /// An error was encountered during classification. @@ -522,12 +523,12 @@ async fn classify(op: BatchOperation) -> Classified { }; if size.is_some_and(|s| s <= MAX_BATCH_PART_SIZE as u64) { - Classified::Batchable(op) + Classified::Batchable(op, size.unwrap_or(0)) } else { Classified::Individual(op) } } - other => Classified::Batchable(other), + other => Classified::Batchable(other, 0), } } @@ -537,7 +538,7 @@ async fn partition( operations: Vec<BatchOperation>, ) -> ( - Vec<BatchOperation>, + Vec<(BatchOperation, u64)>, Vec<BatchOperation>, Vec, ) { @@ -547,7 +548,7 @@ async fn partition( let mut failed = Vec::new(); for item in classified { match item { - Classified::Batchable(op) => batchable.push(op), + Classified::Batchable(op, size) => batchable.push((op, size)), Classified::Individual(op) => individual.push(op), Classified::Failed(result) => failed.push(result), } @@ -615,6 +616,34 @@ async fn execute_batch(operations: Vec<BatchOperation>, session: &Session) -> Ve } } +/// Returns a lazy iterator over batches of operations. +/// +/// Each batch respects both the operation-count limit ([`MAX_BATCH_OPS`]) and the total body-size +/// limit ([`MAX_BATCH_BODY_SIZE`]). +fn iter_batches(ops: Vec<(BatchOperation, u64)>) -> impl Iterator<Item = Vec<BatchOperation>> { + let mut remaining = ops.into_iter().peekable(); + + std::iter::from_fn(move || { + remaining.peek()?; + let mut batch_size = 0; + let mut batch = Vec::new(); + + while let Some((_, op_size)) = remaining.peek() { + if batch.len() >= MAX_BATCH_OPS + || (!batch.is_empty() && batch_size + op_size > MAX_BATCH_BODY_SIZE) + { + break; + } + + let (op, op_size) = remaining.next().expect("peeked above"); + batch_size += op_size; + batch.push(op); + } + + Some(batch) + }) +} + impl ManyBuilder { /// Consumes this builder, returning a lazy stream over all the enqueued operations' results. 
/// @@ -636,21 +665,14 @@ impl ManyBuilder { }) .buffer_unordered(MAX_INDIVIDUAL_CONCURRENCY); - // Lazily chunk batchable operations and execute as batch requests, concurrently - let batch_results = futures_util::stream::unfold(batchable, |mut remaining| async { - if remaining.is_empty() { - return None; - } - let at = remaining.len().min(MAX_BATCH_OPS); - let chunk: Vec<_> = remaining.drain(..at).collect(); - Some((chunk, remaining)) - }) - .map(move |chunk| { - let session = session.clone(); - async move { execute_batch(chunk, &session).await } - }) - .buffer_unordered(MAX_BATCH_CONCURRENCY) - .flat_map(futures_util::stream::iter); + // Chunk batchable operations and execute as batch requests, concurrently + let batch_results = futures_util::stream::iter(iter_batches(batchable)) + .map(move |chunk| { + let session = session.clone(); + async move { execute_batch(chunk, &session).await } + }) + .buffer_unordered(MAX_BATCH_CONCURRENCY) + .flat_map(futures_util::stream::iter); let results = futures_util::stream::iter(failed) .chain(individual_results) @@ -674,3 +696,73 @@ impl ManyBuilder { self } } + +#[cfg(test)] +mod tests { + use super::*; + + /// Creates a dummy sized op for use in `iter_batches` tests. 
+ fn op(size: u64) -> (BatchOperation, u64) { + ( + BatchOperation::Delete { + key: "k".to_owned(), + }, + size, + ) + } + + fn batch_sizes(batches: &[Vec<BatchOperation>]) -> Vec<usize> { + batches.iter().map(Vec::len).collect() + } + + fn batches(ops: Vec<(BatchOperation, u64)>) -> Vec<Vec<BatchOperation>> { + iter_batches(ops).collect() + } + + #[test] + fn iter_batches_empty() { + assert!(batches(vec![]).is_empty()); + } + + #[test] + fn iter_batches_single_batch_count_limit() { + // 1000 tiny ops → exactly one batch + let ops: Vec<_> = (0..1000).map(|_| op(1)).collect(); + assert_eq!(batch_sizes(&batches(ops)), vec![1000]); + } + + #[test] + fn iter_batches_splits_on_count_limit() { + // 1001 tiny ops → two batches: 1000 + 1 + let ops: Vec<_> = (0..1001).map(|_| op(1)).collect(); + assert_eq!(batch_sizes(&batches(ops)), vec![1000, 1]); + } + + #[test] + fn iter_batches_exactly_at_size_limit() { + // 100 ops of 1 MB each = exactly 100 MB → one batch + let ops: Vec<_> = (0..100).map(|_| op(1024 * 1024)).collect(); + assert_eq!(batch_sizes(&batches(ops)), vec![100]); + } + + #[test] + fn iter_batches_splits_on_size_limit() { + // 101 ops of 1 MB each = 101 MB → two batches: 100 + 1 + let ops: Vec<_> = (0..101).map(|_| op(1024 * 1024)).collect(); + assert_eq!(batch_sizes(&batches(ops)), vec![100, 1]); + } + + #[test] + fn iter_batches_size_limit_hits_before_count_limit() { + // 200 ops of ~600 KB each → size limit triggers before the 1000-op count limit + let op_size = 600 * 1024; + let ops: Vec<_> = (0..200).map(|_| op(op_size)).collect(); + let result = batches(ops); + // Each batch holds floor(100 MB / 600 KB) ops + let per_batch = (MAX_BATCH_BODY_SIZE / op_size) as usize; + assert!(result.len() > 1, "expected multiple batches"); + for batch in &result[..result.len() - 1] { + assert_eq!(batch.len(), per_batch); + } + } +} From e6910fcc84cd5ba6ad3c67d2bc189aeefd210b4a Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Thu, 2 Apr 2026 09:04:28 +0200 Subject: [PATCH 2/2] ref: Apply review suggestion --- 
clients/rust/src/many.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/clients/rust/src/many.rs b/clients/rust/src/many.rs index 50174c1a..dac92baa 100644 --- a/clients/rust/src/many.rs +++ b/clients/rust/src/many.rs @@ -522,10 +522,9 @@ async fn classify(op: BatchOperation) -> Classified { body, }; - if size.is_some_and(|s| s <= MAX_BATCH_PART_SIZE as u64) { - Classified::Batchable(op, size.unwrap_or(0)) - } else { - Classified::Individual(op) + match size { + Some(s) if s <= MAX_BATCH_PART_SIZE as u64 => Classified::Batchable(op, s), + _ => Classified::Individual(op), } } other => Classified::Batchable(other, 0),