Skip to content

Add MapAsync for concurrent collection processing#2408

Draft
GarrettBeatty wants to merge 3 commits into
gcbeatty/durable-parallelfrom
gcbeatty/durable-map
Draft

Add MapAsync for concurrent collection processing#2408
GarrettBeatty wants to merge 3 commits into
gcbeatty/durable-parallelfrom
gcbeatty/durable-map

Conversation

@GarrettBeatty
Copy link
Copy Markdown
Contributor

@GarrettBeatty GarrettBeatty commented Jun 5, 2026

#2216

What

Adds IDurableContext.MapAsync to Amazon.Lambda.DurableExecution. MapAsync processes a collection in parallel with one child context per item, mirroring the Python/JS/Java SDKs where Map is a sibling of Parallel sharing one concurrency engine. It reuses the IBatchResult<T> family and concurrency/completion machinery introduced by ParallelAsync in #2375.

Public API:

Type Purpose
IDurableContext.MapAsync<TItem, TResult>(IEnumerable<TItem>, Func<TItem, ...>, ...) Process each item concurrently in its own child context.
MapConfig MaxConcurrency, CompletionConfig, NestingType, ItemNamer. Defaults CompletionConfig to AllCompleted() (permissive), matching Python/Java Map — intentionally differs from ParallelConfig's AllSuccessful().
ItemNamer Supplies a per-item name for traces / test inspection. (No ItemBatcher — not implemented in any reference SDK.)
MapException Thrown when CompletionConfig signals FailureToleranceExceeded, so callers can distinguish Map failures from Parallel failures; carries the IBatchResult<T>.

Implementation notes:

  • Extracts a ConcurrentOperation<T> base holding all orchestration, completion, checkpoint, and replay logic. ParallelOperation and MapOperation are thin subclasses supplying only the per-unit (name, func), sub-type labels, and failure-exception factory.
  • Generalizes ParallelSummary / ParallelJsonContext into shared BatchSummary / BatchJsonContext.

Per-item checkpoint payloads are serialized via the ILambdaSerializer registered on ILambdaContext.Serializer — the same pattern as ParallelAsync / StepAsync / RunInChildContextAsync. The AOT story is determined entirely by which serializer the user registers with the runtime (e.g., SourceGeneratorLambdaJsonSerializer<TContext>).

Testing

24 new unit tests in MapOperationTests.cs, mirroring the Parallel set:

  • CompletionConfig matrix: AllSuccessful, AllCompleted, FirstSuccessful, MinSuccessful, ToleratedFailureCount, ToleratedFailurePercentage — both pass and fail thresholds.
  • Concurrency: MaxConcurrency enforced; unbounded when null.
  • Replay determinism: mixed-status replay, named vs. unnamed items via ItemNamer.
  • IBatchResult<T> accessors and GetResults / GetErrors / ThrowIfError semantics.

6 new integration functions/tests mirroring the Parallel set (require AWS credentials to run). Full suite 325/325 on net8.0 and net10.0.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

Implements IDurableContext.MapAsync, processing a collection in parallel
with one child context per item. Mirrors the Python/JS/Java SDKs, where
Map is a sibling of Parallel sharing one concurrency engine.

- Extract ConcurrentOperation<T> base holding all orchestration, completion,
  checkpoint, and replay logic; ParallelOperation and MapOperation are thin
  subclasses supplying only the per-unit (name, func), sub-type labels, and
  failure-exception factory.
- MapConfig defaults CompletionConfig to AllCompleted() (permissive), matching
  Python/Java Map; intentionally differs from ParallelConfig's AllSuccessful().
  Adds ItemNamer; no ItemBatcher (not implemented in any reference SDK).
- New MapException so callers can distinguish Map from Parallel failures.
- Generalize ParallelSummary/ParallelJsonContext into shared BatchSummary/
  BatchJsonContext.
- Tests: 24 unit tests (MapOperationTests) + 6 integration functions/tests
  mirroring the Parallel set. Full suite 325/325 on net8.0 and net10.0.

COPY bin/publish/ ${LAMBDA_TASK_ROOT}

ENTRYPOINT ["/var/task/bootstrap"]

COPY bin/publish/ ${LAMBDA_TASK_ROOT}

ENTRYPOINT ["/var/task/bootstrap"]

COPY bin/publish/ ${LAMBDA_TASK_ROOT}

ENTRYPOINT ["/var/task/bootstrap"]

COPY bin/publish/ ${LAMBDA_TASK_ROOT}

ENTRYPOINT ["/var/task/bootstrap"]

COPY bin/publish/ ${LAMBDA_TASK_ROOT}

ENTRYPOINT ["/var/task/bootstrap"]

COPY bin/publish/ ${LAMBDA_TASK_ROOT}

ENTRYPOINT ["/var/task/bootstrap"]
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds a new concurrent collection-processing primitive, IDurableContext.MapAsync, to the Amazon.Lambda.DurableExecution SDK. It mirrors the existing ParallelAsync concurrency engine while introducing Map-specific defaults (notably a permissive AllCompleted() completion policy) and Map-specific exception semantics.

Changes:

  • Introduces MapAsync + MapConfig + MapException, and a new MapOperation built on shared concurrency orchestration.
  • Refactors the ParallelAsync implementation by extracting shared logic into ConcurrentOperation<T> and generalizing parent checkpoint summaries to BatchSummary.
  • Adds unit and integration tests plus integration test Lambda function projects for Map scenarios (happy path, partial failure, max concurrency, first-successful, failure tolerance, replay determinism).

Reviewed changes

Copilot reviewed 41 out of 41 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
MAP-IMPLEMENTATION-PLAN.md Design/implementation plan for MapAsync and related refactors.
Libraries/test/Amazon.Lambda.DurableExecution.Tests/ParallelOperationTests.cs Updates replay JSON payload shape from Branches to Units for parallel tests.
Libraries/test/Amazon.Lambda.DurableExecution.Tests/MapOperationTests.cs Adds comprehensive unit tests for MapAsync behavior, replay, naming, and completion policies.
Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapReplayDeterminismFunction/MapReplayDeterminismFunction.csproj New integration test function project for map replay determinism.
Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapReplayDeterminismFunction/Function.cs Workflow that forces suspend/resume to validate per-item replay determinism.
Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapReplayDeterminismFunction/Dockerfile Container packaging for the replay determinism test function.
Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapPartialFailureFunction/MapPartialFailureFunction.csproj New integration test function project for permissive-default partial failure.
Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapPartialFailureFunction/Function.cs Workflow validating Map default AllCompleted() preserves partial failures without failing workflow.
Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapPartialFailureFunction/Dockerfile Container packaging for the partial failure test function.
Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapMaxConcurrencyFunction/MapMaxConcurrencyFunction.csproj New integration test function project for MaxConcurrency.
Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapMaxConcurrencyFunction/Function.cs Workflow validating dispatch throttling via durable waits + timestamps.
Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapMaxConcurrencyFunction/Dockerfile Container packaging for the max concurrency test function.
Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapHappyPathFunction/MapHappyPathFunction.csproj New integration test function project for map happy path.
Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapHappyPathFunction/Function.cs Workflow validating step-per-item processing and ItemNamer visibility.
Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapHappyPathFunction/Dockerfile Container packaging for the happy path test function.
Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFirstSuccessfulFunction/MapFirstSuccessfulFunction.csproj New integration test function project for first-successful short-circuit.
Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFirstSuccessfulFunction/Function.cs Workflow validating FirstSuccessful() and started/unfinished item reporting.
Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFirstSuccessfulFunction/Dockerfile Container packaging for the first-successful test function.
Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFailureToleranceFunction/MapFailureToleranceFunction.csproj New integration test function project for failure tolerance exceeded.
Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFailureToleranceFunction/Function.cs Workflow validating failure tolerance triggers MapException and fails workflow.
Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFailureToleranceFunction/Dockerfile Container packaging for the failure tolerance test function.
Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapReplayDeterminismTest.cs Integration test asserting deterministic item operation IDs and replayed step results.
Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapPartialFailureTest.cs Integration test asserting permissive default allows partial failure with SUCCEEDED workflow.
Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapMaxConcurrencyTest.cs Integration test asserting MaxConcurrency throttles dispatch waves.
Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapHappyPathTest.cs Integration test validating end-to-end happy path and history events/names.
Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapFirstSuccessfulTest.cs Integration test validating first-successful short-circuit behavior.
Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapFailureToleranceTest.cs Integration test validating failure tolerance triggers FAILED workflow + MapException indication.
Libraries/src/Amazon.Lambda.DurableExecution/Operation.cs Adds OperationSubTypes.Map and OperationSubTypes.MapItem.
Libraries/src/Amazon.Lambda.DurableExecution/MapConfig.cs Adds Map configuration object with permissive default completion config and ItemNamer.
Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelSummary.cs Removes the Parallel-specific summary type (replaced by shared BatchSummary).
Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelOperation.cs Refactors ParallelOperation to a thin subclass of ConcurrentOperation.
Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelJsonContext.cs Removes Parallel-specific JSON context (replaced by BatchJsonContext).
Libraries/src/Amazon.Lambda.DurableExecution/Internal/MapOperation.cs Adds MapOperation as a thin subclass of ConcurrentOperation.
Libraries/src/Amazon.Lambda.DurableExecution/Internal/ConcurrentOperation.cs Adds extracted shared orchestration/replay/checkpoint engine for Parallel + Map.
Libraries/src/Amazon.Lambda.DurableExecution/Internal/BatchSummary.cs Adds shared parent checkpoint payload type for concurrent ops.
Libraries/src/Amazon.Lambda.DurableExecution/Internal/BatchJsonContext.cs Adds shared source-gen JSON context for BatchSummary payloads.
Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs Adds public MapAsync<TItem, TResult> API with XML docs.
Libraries/src/Amazon.Lambda.DurableExecution/DurableExecutionException.cs Adds MapException type parallel to ParallelException.
Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs Wires MapAsync into the durable context runtime implementation.
Docs/durable-execution-design.md Updates design docs: Map default completion behavior and removes ItemBatcher references.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +14 to +21
internal sealed class BatchSummary
{
[JsonPropertyName("CompletionReason")]
public string? CompletionReason { get; set; }

[JsonPropertyName("Units")]
public IList<BatchUnitSummary> Units { get; set; } = new List<BatchUnitSummary>();
}
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is fine - im going to release mapasync with the parallasync change together

Comment on lines +527 to +539
var payload = JsonSerializer.Serialize(summary, BatchJsonContext.Default.BatchSummary);
var failed = failureException != null;

await EnqueueAsync(new SdkOperationUpdate
{
Id = OperationId,
Type = OperationTypes.Context,
Action = failed ? OperationAction.FAIL : OperationAction.SUCCEED,
SubType = ParentSubType,
Name = Name,
Payload = failed ? null : payload,
Error = failed ? BuildAggregateError(result, failureException!) : null
}, cancellationToken);
Comment on lines +292 to +307
// Build BatchItems for every unit in original order.
var items = new List<IBatchItem<T>>(unitCount);
for (var i = 0; i < unitCount; i++)
{
var (unitName, _) = GetUnit(i);
if (dispatched[i])
{
var outcome = slots[i];
items.Add(new BatchItem<T>
{
Index = i,
Name = unitName,
Status = outcome.Status,
Result = outcome.Status == BatchItemStatus.Succeeded ? outcome.Result : default,
Error = outcome.Status == BatchItemStatus.Failed ? outcome.Error : null
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants