Safe, async-first parallel operators with bounded concurrency, retries, cancellation, and streaming backpressure for I/O-heavy workloads.
Safe, async-first parallel operators with bounded concurrency, retries, and backpressure for I/O-heavy workloads. Docs
Key Features:
- Bounded Concurrency - Control max parallel operations with backpressure
- Adaptive Concurrency - Auto-scale workers based on latency and success rate (AIMD algorithm)
- Retry Policies - Automatic retries with exponential backoff for transient errors
- Circuit Breaker - Prevent cascading failures with automatic service protection
- Rate Limiting - Token bucket algorithm for controlling operation rates
- Error Handling Modes - FailFast, CollectAndContinue, or BestEffort
- Streaming Support - Process results incrementally via
IAsyncEnumerable<T> - Ordered Output - Maintain input sequence order when needed
- Runtime Metrics - Built-in monitoring via EventCounters and custom callbacks
- Progress Reporting - Periodic snapshots with throughput, ETA, and percent-complete
- Cancellation - Full
CancellationTokensupport throughout - Lifecycle Hooks - OnStart, OnComplete, OnRetry, OnError, OnThrottle, OnDrain callbacks
- Fallback Values - Supply default results for failed items instead of throwing
- Per-Item Timeouts - Enforce timeouts for individual operations
- Works with both
IEnumerable<T>andIAsyncEnumerable<T>
Enterprise observability for Rivulet.Core with EventListener wrappers, metric aggregators, and health check integration. Docs
Key Features:
- EventListener Wrappers: Console, File, and Structured JSON logging
- Metrics Aggregation: Time-window based metric aggregation with statistics
- Prometheus Export: Export metrics in Prometheus text format
- Health Check Integration: Microsoft.Extensions.Diagnostics.HealthChecks support
- Fluent Builder API: Easy configuration with DiagnosticsBuilder
OpenTelemetry integration for Rivulet.Core providing distributed tracing, metrics export, and comprehensive observability. Docs
Key Features:
- Distributed Tracing: Automatic activity creation with parent-child relationships
- Metrics Export: Bridge EventCounters to OpenTelemetry Meters
- Retry Tracking: Record retry attempts as activity events
- Circuit Breaker Events: Track circuit state changes in traces
- Adaptive Concurrency: Monitor concurrency adjustments
- Error Correlation: Link errors with retry attempts and transient classification
Integration package for using Rivulet with Microsoft.Extensions.Hosting, ASP.NET Core, and the .NET Generic Host. Docs
Key Features:
- Dependency Injection integration
- Configuration binding for
ParallelOptionsRivulet - Base classes for parallel background services
- Health checks for monitoring parallel operations
- Support for ASP.NET Core and Worker Services
Testing utilities for Rivulet parallel operations including deterministic schedulers, virtual time, fake channels, and chaos injection. Docs
Key Features:
- VirtualTimeProvider: Control time in tests without actual delays
- FakeChannel: Testable channel implementation with operation tracking
- ChaosInjector: Inject failures and delays for resilience testing
- ConcurrencyAsserter: Assert and verify concurrency behavior
Parallel HTTP operations with automatic retries, resilient downloads, and HttpClientFactory integration. Docs
Key Features:
- HttpClientFactory integration
- Connection pooling awareness
- Transient error handling (timeouts, 5xx responses)
- Bounded concurrency to avoid overwhelming servers
- Progress reporting for downloads
Parallel file and directory operations with bounded concurrency, resilience, and streaming support for efficient I/O processing. Docs
Key Features:
- Safe concurrent file access
- Directory tree processing
- File pattern matching (glob patterns)
- Progress reporting
- Atomic write operations
Safe parallel SQL operations with connection pooling awareness and bulk operations. Docs
Key Features:
- Works with any ADO.NET provider
- Connection pooling awareness
- Transaction support
- Parameterized queries
- Respects database connection pool limits
SQL Server-specific optimizations for Rivulet.Sql including SqlBulkCopy integration for 10-100x faster bulk inserts. Docs
Key Features:
- SqlBulkCopy Integration: Ultra-high performance bulk inserts (50,000+ rows/sec)
- Parallel Bulk Operations: Process multiple batches in parallel
- Automatic Column Mapping: Maps DataTable columns to SQL Server table columns
- Custom Column Mappings: Support for explicit source-to-destination column mappings
- DataReader Support: Bulk insert from IDataReader sources
- Configurable Batching: Control batch size and timeout settings
PostgreSQL-specific optimizations for Rivulet.Sql including COPY command integration for 10-100x faster bulk inserts. Docs
Key Features:
- COPY Command Integration: Ultra-high performance bulk inserts using COPY
- Multiple Formats: Binary, CSV, and text formats supported
- Parallel Operations: Process multiple batches in parallel
- Streaming Import: Efficient memory usage with streaming
- Custom Delimiters: Support for CSV with custom delimiters
- Header Support: Handle CSV files with headers
MySQL-specific optimizations for Rivulet.Sql using MySqlBulkLoader (LOAD DATA LOCAL INFILE) for 10-100x faster bulk inserts. Docs
Key Features:
- MySqlBulkLoader: LOAD DATA LOCAL INFILE for maximum performance
- File-based Loading: Direct file import support
- Parallel Operations: Process multiple batches in parallel
- Custom Delimiters: Support for any field separator
- Automatic Column Mapping: Maps columns automatically
Integration between Rivulet parallel processing and Polly resilience policies. Docs
Key Features:
- Use Polly policies with Rivulet - Apply any Polly policy to parallel operations
- Convert Rivulet to Polly - Use Rivulet configuration as standalone Polly policies
- Advanced resilience patterns - Hedging, result-based retry, and more
- Battle-tested - Built on Polly's production-proven resilience library
Multi-stage pipeline composition for Rivulet with fluent API, per-stage concurrency, backpressure management between stages, and streaming support. Docs
Key Features:
- Fluent Builder API - Type-safe pipeline construction with IntelliSense support
- Per-Stage Concurrency - Different parallelism levels for each processing stage
- Backpressure Management - Automatic flow control between stages using channels
- Streaming & Buffered Modes - Memory-efficient streaming or materialized results
- Full Rivulet.Core Integration - Retries, circuit breakers, rate limiting, metrics
Parallel CSV parsing and writing with CsvHelper integration, bounded concurrency, and batching support for high-throughput data processing. Docs
Key Features:
- CsvHelper integration for robust CSV parsing
- Multi-type operations (2-5 generic type parameters)
- Memory-efficient streaming with IAsyncEnumerable
- Per-file CSV configuration (delimiters, culture, class maps)
- Progress tracking and lifecycle callbacks
- Error handling modes (FailFast, CollectAndContinue, BestEffort)
- Circuit breaker and retry support
- Ordered and unordered output options
- Async-first (
ValueTask), works withIEnumerable<T>andIAsyncEnumerable<T> - Bounded concurrency with backpressure (Channels)
- Retry policy with transient detection and configurable backoff strategies (Exponential, ExponentialJitter, DecorrelatedJitter, Linear, LinearJitter)
- Per-item timeouts, cancellation, lifecycle hooks
- Flexible error modes: FailFast, CollectAndContinue, BestEffort
- Ordered output mode for sequence-sensitive operations
var results = await urls.SelectParallelAsync(
async (url, ct) =>
{
using var resp = await http.GetAsync(url, ct);
resp.EnsureSuccessStatusCode();
return (url, (int)resp.StatusCode);
},
new ParallelOptionsRivulet {
MaxDegreeOfParallelism = 32,
MaxRetries = 3,
IsTransient = ex => ex is HttpRequestException or TaskCanceledException,
ErrorMode = ErrorMode.CollectAndContinue
});await foreach (var r in source.SelectParallelStreamAsync(
async (x, ct) => await ComputeAsync(x, ct),
new ParallelOptionsRivulet { MaxDegreeOfParallelism = 16 }))
{
// consume incrementally
}Maintain input order when sequence matters:
// Results returned in same order as input, despite parallel processing
var results = await items.SelectParallelAsync(
async (item, ct) => await ProcessAsync(item, ct),
new ParallelOptionsRivulet
{
MaxDegreeOfParallelism = 32,
OrderedOutput = true // Ensures results match input order
});
// Streaming with ordered output
await foreach (var result in source.SelectParallelStreamAsync(
async (x, ct) => await TransformAsync(x, ct),
new ParallelOptionsRivulet { OrderedOutput = true }))
{
// Results arrive in input order
}Choose from multiple backoff strategies to optimize retry behavior:
// Exponential backoff with jitter - recommended for rate-limited APIs
// Reduces thundering herd by randomizing retry delays
var results = await requests.SelectParallelAsync(
async (req, ct) => await apiClient.SendAsync(req, ct),
new ParallelOptionsRivulet
{
MaxRetries = 4,
BaseDelay = TimeSpan.FromMilliseconds(100),
BackoffStrategy = BackoffStrategy.ExponentialJitter, // Random(0, BaseDelay * 2^attempt)
IsTransient = ex => ex is HttpRequestException
});
// Decorrelated jitter - best for preventing synchronization across multiple clients
var results = await tasks.SelectParallelAsync(
async (task, ct) => await ProcessAsync(task, ct),
new ParallelOptionsRivulet
{
MaxRetries = 3,
BackoffStrategy = BackoffStrategy.DecorrelatedJitter, // Random based on previous delay
IsTransient = ex => ex is TimeoutException
});
// Linear backoff - gentler, predictable increase
var results = await items.SelectParallelAsync(
async (item, ct) => await SaveAsync(item, ct),
new ParallelOptionsRivulet
{
MaxRetries = 5,
BaseDelay = TimeSpan.FromSeconds(1),
BackoffStrategy = BackoffStrategy.Linear, // BaseDelay * attempt
IsTransient = ex => ex is InvalidOperationException
});Available strategies:
- Exponential (default):
BaseDelay * 2^(attempt-1)- Predictable exponential growth - ExponentialJitter:
Random(0, BaseDelay * 2^(attempt-1))- Reduces thundering herd - DecorrelatedJitter:
Random(BaseDelay, PreviousDelay * 3)- Prevents client synchronization - Linear:
BaseDelay * attempt- Gentler, linear growth - LinearJitter:
Random(0, BaseDelay * attempt)- Linear with randomization
Track progress with real-time metrics for long-running operations:
// Monitor ETL job progress with ETA
var records = await database.GetRecordsAsync().SelectParallelAsync(
async (record, ct) => await TransformAndLoadAsync(record, ct),
new ParallelOptionsRivulet
{
MaxDegreeOfParallelism = 20,
Progress = new ProgressOptions
{
ReportInterval = TimeSpan.FromSeconds(5),
OnProgress = progress =>
{
Console.WriteLine($"Progress: {progress.ItemsCompleted}/{progress.TotalItems}");
Console.WriteLine($"Rate: {progress.ItemsPerSecond:F1} items/sec");
Console.WriteLine($"ETA: {progress.EstimatedTimeRemaining}");
Console.WriteLine($"Errors: {progress.ErrorCount}");
return ValueTask.CompletedTask;
}
}
});
// Streaming progress (total unknown)
await foreach (var result in stream.SelectParallelStreamAsync(
async (item, ct) => await ProcessAsync(item, ct),
new ParallelOptionsRivulet
{
Progress = new ProgressOptions
{
ReportInterval = TimeSpan.FromSeconds(10),
OnProgress = progress =>
{
// No ETA or percent for streams - total is unknown
Console.WriteLine($"Processed: {progress.ItemsCompleted}");
Console.WriteLine($"Rate: {progress.ItemsPerSecond:F1} items/sec");
return ValueTask.CompletedTask;
}
}
}))
{
// Process results as they arrive
}Progress metrics:
- ItemsStarted: Total items that began processing
- ItemsCompleted: Successfully completed items
- TotalItems: Total count (known for arrays/lists, null for streams)
- ErrorCount: Failed items across all retries
- Elapsed: Time since operation started
- ItemsPerSecond: Processing rate
- EstimatedTimeRemaining: ETA (when total is known)
- PercentComplete: 0-100% (when total is known)
Process items in batches for bulk operations like database inserts, batch API calls, or file operations:
// Bulk database inserts - batch 100 records at a time
var results = await records.BatchParallelAsync(
batchSize: 100,
async (batch, ct) =>
{
// Insert entire batch in a single database call
await db.BulkInsertAsync(batch, ct);
return batch.Count;
},
new ParallelOptionsRivulet
{
MaxDegreeOfParallelism = 4, // Process 4 batches in parallel
MaxRetries = 3,
IsTransient = ex => ex is SqlException
});
// Batch API calls with timeout - flush partial batches after delay
var apiResults = await items.BatchParallelAsync(
batchSize: 50,
async (batch, ct) =>
{
// Call API with batch of items
return await apiClient.ProcessBatchAsync(batch, ct);
},
batchTimeout: TimeSpan.FromSeconds(2) // Flush batch after 2 seconds even if not full
);
// Streaming batches from async source
await foreach (var result in dataStream.BatchParallelStreamAsync(
batchSize: 100,
async (batch, ct) =>
{
await ProcessBatchAsync(batch, ct);
return batch.Count;
},
new ParallelOptionsRivulet
{
MaxDegreeOfParallelism = 8,
OrderedOutput = true, // Maintain batch order
Progress = new ProgressOptions
{
ReportInterval = TimeSpan.FromSeconds(5),
OnProgress = progress =>
{
Console.WriteLine($"Batches processed: {progress.ItemsCompleted}");
return ValueTask.CompletedTask;
}
}
}))
{
// Process batch results as they complete
Console.WriteLine($"Batch completed with {result} items");
}Key Features:
- Size-based batching: Groups items into batches of specified size
- Timeout-based flushing: Optional timeout to flush incomplete batches (async streams only)
- Parallel batch processing: Process multiple batches concurrently with bounded parallelism
- All existing features: Works with retries, error handling, progress tracking, ordered output
- Efficient for bulk operations: Reduces API calls, database round-trips, and I/O overhead
Use Cases:
- Bulk database inserts/updates/deletes
- Batch API calls to external services
- File processing in chunks
- Message queue batch processing
- ETL pipelines with staged operations
Monitor parallel operations with built-in metrics via .NET EventCounters and optional callbacks for custom monitoring systems:
// Zero-cost monitoring with EventCounters (always enabled)
// Monitor with: dotnet-counters monitor --process-id <PID> --counters Rivulet.Core
var results = await items.SelectParallelAsync(ProcessAsync, options);
// Custom metrics callback for Prometheus, DataDog, Application Insights
var options = new ParallelOptionsRivulet
{
MaxDegreeOfParallelism = 32,
Metrics = new MetricsOptions
{
SampleInterval = TimeSpan.FromSeconds(10),
OnMetricsSample = async snapshot =>
{
// Export to your monitoring system
await prometheus.RecordMetrics(new
{
active_workers = snapshot.ActiveWorkers,
items_completed = snapshot.ItemsCompleted,
throughput = snapshot.ItemsPerSecond,
error_rate = snapshot.ErrorRate,
total_retries = snapshot.TotalRetries
});
}
}
};
var results = await urls.SelectParallelAsync(
async (url, ct) => await httpClient.GetAsync(url, ct),
options);Available Metrics:
- ActiveWorkers: Current number of active worker tasks
- QueueDepth: Items waiting in the input channel queue
- ItemsStarted: Total items that began processing
- ItemsCompleted: Total items completed successfully
- TotalRetries: Cumulative retry attempts across all items
- TotalFailures: Total failed items (after all retries)
- ThrottleEvents: Backpressure events when queue is full
- ItemsPerSecond: Current throughput rate
- ErrorRate: Failure rate (TotalFailures / ItemsStarted)
- Elapsed: Time since operation started
EventCounters (zero-cost monitoring):
# Monitor in real-time with dotnet-counters
dotnet-counters monitor --process-id <PID> --counters Rivulet.Core
# Available counters:
# - items-started
# - items-completed
# - total-retries
# - total-failures
# - throttle-events
# - drain-eventsKey Features:
- Zero-cost when not monitored: EventCounters have minimal overhead
- Thread-safe: Uses lock-free Interlocked operations
- Callback isolation: Exceptions in callbacks don't break operations
- Integrates with all operators: SelectParallelAsync, SelectParallelStreamAsync, ForEachParallelAsync, BatchParallel*
Use Cases:
- Production monitoring and alerting
- Performance tuning and capacity planning
- Debugging throughput issues
- SLA compliance verification
- Auto-scaling triggers
Control the maximum rate of operations using the token bucket algorithm, perfect for respecting API rate limits or smoothing traffic bursts:
// Limit to 100 requests/sec with burst capacity of 200
var results = await apiUrls.SelectParallelAsync(
async (url, ct) => await httpClient.GetAsync(url, ct),
new ParallelOptionsRivulet
{
MaxDegreeOfParallelism = 32,
RateLimit = new RateLimitOptions
{
TokensPerSecond = 100, // Sustained rate
BurstCapacity = 200 // Allow brief bursts
}
});
// Heavy operations consume more tokens
var results = await heavyTasks.SelectParallelAsync(
async (task, ct) => await ProcessHeavyAsync(task, ct),
new ParallelOptionsRivulet
{
RateLimit = new RateLimitOptions
{
TokensPerSecond = 50,
BurstCapacity = 50,
TokensPerOperation = 5 // Each operation costs 5 tokens
}
});Key Features:
- Token bucket algorithm: Allows controlled bursts while maintaining average rate
- Configurable rates: Set tokens per second and burst capacity
- Weighted operations: Different operations can consume different token amounts
- Works with all operators: SelectParallel*, ForEachParallel*, BatchParallel*
- Combines with retries: Rate limiting applies to retry attempts too
Use Cases:
- Respect API rate limits (e.g., 1000 requests/hour)
- Smooth traffic bursts to downstream services
- Prevent resource exhaustion
- Control database connection usage
- Implement fair resource sharing
Protect your application from cascading failures when a downstream service is unhealthy. The circuit breaker monitors for failures and automatically fails fast when a threshold is reached, giving the failing service time to recover.
// Protect against a flaky API - open after 5 consecutive failures
var results = await urls.SelectParallelAsync(
async (url, ct) => await httpClient.GetAsync(url, ct),
new ParallelOptionsRivulet
{
MaxDegreeOfParallelism = 32,
CircuitBreaker = new CircuitBreakerOptions
{
FailureThreshold = 5, // Open after 5 consecutive failures
SuccessThreshold = 2, // Close after 2 consecutive successes in HalfOpen
OpenTimeout = TimeSpan.FromSeconds(30), // Test recovery after 30 seconds
OnStateChange = async (from, to) =>
{
Console.WriteLine($"Circuit {from} → {to}");
await LogCircuitStateChangeAsync(from, to);
}
}
});
// Time-based failure tracking (percentage within window)
var results = await requests.SelectParallelAsync(
async (req, ct) => await apiClient.SendAsync(req, ct),
new ParallelOptionsRivulet
{
CircuitBreaker = new CircuitBreakerOptions
{
FailureThreshold = 10, // Open if 10 failures occur...
SamplingDuration = TimeSpan.FromSeconds(60), // ...within 60 seconds
OpenTimeout = TimeSpan.FromMinutes(5)
}
});Circuit States:
- Closed: Normal operation. Operations execute normally. Failures are tracked.
- Open: Failure threshold exceeded. Operations fail immediately with
CircuitBreakerOpenExceptionwithout executing. Prevents cascading failures. - HalfOpen: After
OpenTimeoutexpires, circuit allows limited operations to test recovery. Success transitions to Closed. Failure transitions back to Open.
Key Features:
- Fail-fast protection: Prevents overwhelming failing services
- Automatic recovery testing: Transitions to HalfOpen after timeout to probe health
- Flexible failure tracking: Consecutive failures or time-window based (with
SamplingDuration) - State change callbacks: Monitor circuit transitions for alerting/logging
- Works with all operators: SelectParallel*, ForEachParallel*, BatchParallel*
Use Cases:
- Protecting downstream microservices from overload
- Preventing cascading failures in distributed systems
- Graceful degradation when dependencies are unhealthy
- Reducing latency by failing fast instead of waiting for timeouts
Automatically adjust parallelism based on real-time performance metrics. Instead of using a fixed MaxDegreeOfParallelism, adaptive concurrency dynamically scales workers up when performance is good and scales down when latency increases or errors occur.
// Auto-scale between 1-32 workers based on latency and success rate
var results = await urls.SelectParallelAsync(
async (url, ct) => await httpClient.GetAsync(url, ct),
new ParallelOptionsRivulet
{
AdaptiveConcurrency = new AdaptiveConcurrencyOptions
{
MinConcurrency = 1, // Lower bound
MaxConcurrency = 32, // Upper bound
InitialConcurrency = 8, // Starting point (optional)
TargetLatency = TimeSpan.FromMilliseconds(100), // Target p50 latency
MinSuccessRate = 0.95, // 95% success rate threshold
SampleInterval = TimeSpan.FromSeconds(1), // How often to adjust
OnConcurrencyChange = async (old, @new) =>
{
Console.WriteLine($"Concurrency: {old} → {@new}");
await metricsClient.RecordGaugeAsync("concurrency", @new);
}
}
});
// Different adjustment strategies
var results = await tasks.SelectParallelAsync(
async (task, ct) => await ProcessAsync(task, ct),
new ParallelOptionsRivulet
{
AdaptiveConcurrency = new AdaptiveConcurrencyOptions
{
MinConcurrency = 2,
MaxConcurrency = 64,
IncreaseStrategy = AdaptiveConcurrencyStrategy.Aggressive, // Faster increase
DecreaseStrategy = AdaptiveConcurrencyStrategy.Gradual, // Slower decrease
MinSuccessRate = 0.90
}
});How It Works: Uses AIMD (Additive Increase Multiplicative Decrease) algorithm similar to TCP congestion control:
- Increase: When success rate is high and latency is acceptable, add workers gradually (AIMD: +1, Aggressive: +10%)
- Decrease: When latency exceeds target or success rate drops, reduce workers sharply (AIMD/Aggressive: -50%, Gradual: -25%)
- Samples performance every
SampleIntervaland adjusts within[MinConcurrency, MaxConcurrency]bounds
Adjustment Strategies:
- AIMD (default): Additive Increase (+1), Multiplicative Decrease (-50%) - Like TCP
- Aggressive: Faster increase (+10%), same decrease (-50%) - For rapidly changing workloads
- Gradual: Same increase (+1), gentler decrease (-25%) - For stable workloads
Key Features:
- Self-tuning: Automatically finds optimal concurrency for current load
- Latency-aware: Reduces workers when operations are too slow
- Error-aware: Scales down when success rate drops below threshold
- Bounded: Always stays within configured min/max limits
- Observable: Callbacks for monitoring concurrency changes
- Works with all operators: SelectParallel*, ForEachParallel*, BatchParallel*
Use Cases:
- Variable load scenarios where optimal concurrency changes over time
- Auto-scaling to match downstream service capacity
- Preventing overload when downstream services slow down
- Maximizing throughput without manual tuning
- Handling unpredictable workload patterns
This project is licensed under the MIT License - see the LICENSE file for details.
Built with ❤️ using:
- .NET 8.0 and .NET 9.0
- System.Threading.Channels for backpressure
- xUnit for testing
- BenchmarkDotNet for performance validation
