Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 116 additions & 25 deletions clients/rust/src/many.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ const MAX_INDIVIDUAL_CONCURRENCY: usize = 5;
/// Maximum number of requests to the batch endpoint that can be executed concurrently.
const MAX_BATCH_CONCURRENCY: usize = 3;

// TODO: add limit and logic for whole batch request body size
/// Maximum total body size (pre-compression) to include in a single batch request.
const MAX_BATCH_BODY_SIZE: u64 = 100 * 1024 * 1024; // 100 MB

/// A builder that can be used to enqueue multiple operations.
///
Expand Down Expand Up @@ -230,8 +231,8 @@ impl OperationContext {

/// The result of classifying a single operation for batch processing.
enum Classified {
/// The operation can be included in a batch request.
Batchable(BatchOperation),
/// The operation can be included in a batch request, with its estimated body size in bytes.
Batchable(BatchOperation, u64),
/// The operation must be executed as an individual request (e.g., oversized file body).
Individual(BatchOperation),
/// An error was encountered during classification.
Expand Down Expand Up @@ -521,13 +522,12 @@ async fn classify(op: BatchOperation) -> Classified {
body,
};

if size.is_some_and(|s| s <= MAX_BATCH_PART_SIZE as u64) {
Classified::Batchable(op)
} else {
Classified::Individual(op)
match size {
Some(s) if s <= MAX_BATCH_PART_SIZE as u64 => Classified::Batchable(op, s),
_ => Classified::Individual(op),
}
}
other => Classified::Batchable(other),
other => Classified::Batchable(other, 0),
}
}

Expand All @@ -537,7 +537,7 @@ async fn classify(op: BatchOperation) -> Classified {
async fn partition(
operations: Vec<BatchOperation>,
) -> (
Vec<BatchOperation>,
Vec<(BatchOperation, u64)>,
Vec<BatchOperation>,
Vec<OperationResult>,
) {
Expand All @@ -547,7 +547,7 @@ async fn partition(
let mut failed = Vec::new();
for item in classified {
match item {
Classified::Batchable(op) => batchable.push(op),
Classified::Batchable(op, size) => batchable.push((op, size)),
Classified::Individual(op) => individual.push(op),
Classified::Failed(result) => failed.push(result),
}
Expand Down Expand Up @@ -615,6 +615,34 @@ async fn execute_batch(operations: Vec<BatchOperation>, session: &Session) -> Ve
}
}

/// Returns a lazy iterator over batches of operations.
///
/// Each yielded batch satisfies both the per-batch operation-count cap
/// ([`MAX_BATCH_OPS`]) and the total body-size cap ([`MAX_BATCH_BODY_SIZE`]).
/// An operation is always admitted into an empty batch, so every batch holds
/// at least one operation and the iterator never yields an empty `Vec`.
fn iter_batches(ops: Vec<(BatchOperation, u64)>) -> impl Iterator<Item = Vec<BatchOperation>> {
    let mut pending = ops.into_iter().peekable();

    std::iter::from_fn(move || {
        // Finished: every operation has been handed out.
        pending.peek()?;

        let mut chunk = Vec::new();
        let mut chunk_bytes: u64 = 0;

        loop {
            let Some(&(_, next_bytes)) = pending.peek() else {
                break;
            };

            // Close the current batch when either cap would be exceeded. The
            // `is_empty` guard guarantees forward progress: the first op of a
            // batch is accepted unconditionally, regardless of its size.
            let at_count_cap = chunk.len() >= MAX_BATCH_OPS;
            let over_size_cap =
                !chunk.is_empty() && chunk_bytes + next_bytes > MAX_BATCH_BODY_SIZE;
            if at_count_cap || over_size_cap {
                break;
            }

            let (op, bytes) = pending.next().expect("peeked above");
            chunk_bytes += bytes;
            chunk.push(op);
        }

        Some(chunk)
    })
}

impl ManyBuilder {
/// Consumes this builder, returning a lazy stream over all the enqueued operations' results.
///
Expand All @@ -636,21 +664,14 @@ impl ManyBuilder {
})
.buffer_unordered(MAX_INDIVIDUAL_CONCURRENCY);

// Lazily chunk batchable operations and execute as batch requests, concurrently
let batch_results = futures_util::stream::unfold(batchable, |mut remaining| async {
if remaining.is_empty() {
return None;
}
let at = remaining.len().min(MAX_BATCH_OPS);
let chunk: Vec<_> = remaining.drain(..at).collect();
Some((chunk, remaining))
})
.map(move |chunk| {
let session = session.clone();
async move { execute_batch(chunk, &session).await }
})
.buffer_unordered(MAX_BATCH_CONCURRENCY)
.flat_map(futures_util::stream::iter);
// Chunk batchable operations and execute as batch requests, concurrently
let batch_results = futures_util::stream::iter(iter_batches(batchable))
.map(move |chunk| {
let session = session.clone();
async move { execute_batch(chunk, &session).await }
})
.buffer_unordered(MAX_BATCH_CONCURRENCY)
.flat_map(futures_util::stream::iter);

let results = futures_util::stream::iter(failed)
.chain(individual_results)
Expand All @@ -674,3 +695,73 @@ impl ManyBuilder {
self
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a dummy sized op for use in `iter_batches` tests.
    fn op(size: u64) -> (BatchOperation, u64) {
        (
            BatchOperation::Delete {
                key: "k".to_owned(),
            },
            size,
        )
    }

    /// Collects the per-batch operation counts, for concise assertions.
    fn batch_sizes(batches: &[Vec<BatchOperation>]) -> Vec<usize> {
        batches.iter().map(Vec::len).collect()
    }

    /// Drains `iter_batches` into a `Vec` for inspection.
    fn batches(ops: Vec<(BatchOperation, u64)>) -> Vec<Vec<BatchOperation>> {
        iter_batches(ops).collect()
    }

    #[test]
    fn iter_batches_empty() {
        assert!(batches(vec![]).is_empty());
    }

    #[test]
    fn iter_batches_single_batch_count_limit() {
        // Exactly MAX_BATCH_OPS tiny ops → exactly one batch.
        // (Reference the constant rather than hard-coding its value, so the
        // test keeps checking the boundary if the limit ever changes.)
        let ops: Vec<_> = (0..MAX_BATCH_OPS).map(|_| op(1)).collect();
        assert_eq!(batch_sizes(&batches(ops)), vec![MAX_BATCH_OPS]);
    }

    #[test]
    fn iter_batches_splits_on_count_limit() {
        // One op past the count limit → two batches: MAX_BATCH_OPS + 1.
        let ops: Vec<_> = (0..MAX_BATCH_OPS + 1).map(|_| op(1)).collect();
        assert_eq!(batch_sizes(&batches(ops)), vec![MAX_BATCH_OPS, 1]);
    }

    #[test]
    fn iter_batches_exactly_at_size_limit() {
        // Ops of 1 MB each summing to exactly MAX_BATCH_BODY_SIZE → one batch.
        let op_size = 1024 * 1024;
        let count = (MAX_BATCH_BODY_SIZE / op_size) as usize;
        let ops: Vec<_> = (0..count).map(|_| op(op_size)).collect();
        assert_eq!(batch_sizes(&batches(ops)), vec![count]);
    }

    #[test]
    fn iter_batches_splits_on_size_limit() {
        // One op past the size limit → two batches: full + 1.
        let op_size = 1024 * 1024;
        let full = (MAX_BATCH_BODY_SIZE / op_size) as usize;
        let ops: Vec<_> = (0..full + 1).map(|_| op(op_size)).collect();
        assert_eq!(batch_sizes(&batches(ops)), vec![full, 1]);
    }

    #[test]
    fn iter_batches_oversized_op_emitted_alone() {
        // A single op larger than the body limit still forms its own batch:
        // the `is_empty` guard admits the first op of a batch unconditionally.
        let ops = vec![op(MAX_BATCH_BODY_SIZE + 1), op(1)];
        assert_eq!(batch_sizes(&batches(ops)), vec![1, 1]);
    }

    #[test]
    fn iter_batches_size_limit_hits_before_count_limit() {
        // 200 ops of ~600 KB each → size limit triggers before the count limit.
        let op_size = 600 * 1024;
        let op_count = 200;
        let ops: Vec<_> = (0..op_count).map(|_| op(op_size)).collect();
        let result = batches(ops);

        // Each full batch holds floor(MAX_BATCH_BODY_SIZE / op_size) ops.
        let per_batch = (MAX_BATCH_BODY_SIZE / op_size) as usize;
        assert!(result.len() > 1, "expected multiple batches");
        for batch in &result[..result.len() - 1] {
            assert_eq!(batch.len(), per_batch);
        }
        // The final batch holds exactly the remainder…
        assert_eq!(
            result.last().expect("non-empty result").len(),
            op_count - (result.len() - 1) * per_batch,
        );
        // …and no operation was dropped or duplicated along the way.
        assert_eq!(result.iter().map(Vec::len).sum::<usize>(), op_count);
    }
}
Loading