Skip to content

Latest commit

 

History

History
526 lines (378 loc) · 16.1 KB

File metadata and controls

526 lines (378 loc) · 16.1 KB

Pipelean Reference Documentation

Overview

functional.js is an async programming library that provides core utilities for handling asynchronous operations, error management, data transformations, and functional composition patterns in JavaScript. The library is designed with a pragmatic philosophy: avoid heavy abstractions, use eager execution, and provide clear, predictable error handling.

Key Principles

  • Pragmatic: Plain JavaScript, eager execution, sequential processing.
  • First class error handling: Multiple error strategies for different use cases

Table of Contents

Error strategies

  • failFast - Error Strategy Identifier (aliases: stopOnError, fail)
  • throw_ - Error Strategy Identifier (throws on first error)
  • failLate - Error Strategy Identifier
  • collect - Error Strategy Identifier
  • skip - Error Strategy Identifier
  • Callbacks - Error handling callbacks

Iterators

Horizontal

  • series - Stateless Sequential Execution
  • scan - Stateful Sequential Transformation
  • filter - Stateless Selection

Composition

  • pipe - Vertical Composition

Misc

  • retry - Configurable Retry Logic
  • tryCatch - Single Function Lifecycle Hooks

Error strategies

failFast

Purpose: Error strategy identifier that stops immediately on the first error.

Alias: stopOnError

Use when: Critical operations where failure means the entire pipeline is invalid.

Behavior:

  • Sets failure: {item, error, index} on first error
  • Calls onError({item, error, index, total}), then onFailure({item, error, index}) immediately
  • Stops iteration

Return Format: {results: [], errors: [], failure: {item, error, index}}

Example:

import {series, failFast} from 'pipelean'

const result = await series([1, 2, 3], async item => {
  if (item === 2) throw new Error('Error')
  return item * 2
}, {strategy: failFast})

// result = {results: [], errors: [], failure: {item: 2, error: Error(...), index: 1}}

throw_

Purpose: Error strategy identifier that throws the error immediately on the first failure. Does NOT return a structured result on failure.

Use when: "Let it crash" / fail-early patterns where the caller is expected to handle errors externally (e.g. via try/catch).

Behavior:

  • Throws the error on first failure
  • Does NOT call onFailure
  • Does NOT call onError in series

Return Format: On success: {results, errors: [], failure: false}

Example:

import {series, throw_} from 'pipelean'

try {
  const result = await series([1, 2, 3], async item => {
    if (item === 2) throw new Error('Error')
    return item * 2
  }, {strategy: throw_})
} catch (error) {
  // error = Error('Error')
}

collect

Purpose: Error strategy identifier that collects all errors and continues processing (default for series and filter).

Use when: Batch operations, logging scenarios, background tasks.

Behavior:

  • Collects all errors in errors array
  • Sets failure: false
  • Does NOT call onFailure

Return Format: {results, errors: [{item, error, index}], failure: false}

Example:

import {series, collect} from 'pipelean'

const result = await series([1, 2, 3], async item => {
  if (item === 2 || item === 4) throw new Error('Error')
  return item * 2
}, {strategy: collect})

// result = {results: [2, 6], errors: [{item: 2, error: ..., index: 1}], failure: false}

failLate

Purpose: Error strategy identifier that collects all errors and returns failure: {errors} at the end.

Use when: Application-layer needs to detect if any error occurred.

Behavior:

  • Collects all errors in errors array
  • Sets failure: {errors} after loop completes (only if errors.length > 0)
  • Calls onFailure({errors}) if failure is truthy

Return Format: {results, errors: [{item, error, index}], failure: {errors}} (if any errors occurred)

Example:

import {series, failLate} from 'pipelean'

const result = await series([1, 2, 3], async item => {
  if (item === 2 || item === 4) throw new Error('Error')
  return item * 2
}, {strategy: failLate})

// result = {results: [2, 6], errors: [{item: 2, error: ..., index: 1}], failure: {errors}}

skip

Purpose: Error strategy identifier that ignores errors entirely (no collection), but onError is still called if present.

Use when: Best-effort processing, some failures are acceptable.

Behavior:

  • Ignores errors (no collection, errors stays empty)
  • Sets failure: false
  • Does NOT call onFailure

Return Format: {results, errors: [], failure: false}

Example:

import {series, skip} from 'pipelean'

const result = await series([1, 2, 3], async item => {
  if (item === 2) throw new Error('Error')
  return item * 2
}, {strategy: skip})

// result = {results: [2, 6], errors: [], failure: false}

Callbacks

onError

Optional callback for verification/telemetry (logging, metrics).

  • Called for every handled item error in series
  • Does NOT affect control flow
  • Receives {item, error, index, total}; total is omitted when unknown
  • Use for: logging, metrics, external error reporting
await series(items, fn, {
  strategy: skip,
  onError: ({item, error}) => console.error('Error:', item.id, error.message)
})

onFailure

Optional callback for application-layer error handling (UI updates, notifications).

  • Called when failure is truthy
  • Depends on strategy:
    • failFast: called with {item, error, index}
    • failLate: called with {errors}
    • collect / skip: NOT called (failure is false)
await series(items, fn, {
  strategy: failLate,
  onFailure: (failure) => {
    if (failure.errors) {
      showToast('Some items failed')
    } else {
      showToast(`Item ${failure.item} failed: ${failure.error.message}`)
    }
  }
})

Iterators

series

Purpose: Stateless sequential execution over an iterable. Runs a function on each item, one at a time.

Type: (items, fn, opts?) => Promise<Outcome> — immediate mode Curried: (fn, opts?) => (items) => Promise<Outcome>

Parameters:

  • items: An iterable (array, async iterable, generator)
  • fn(item, index): The function to apply. Returns the mapped value, or throws, or returns undefined to drop the item.
  • opts (optional):
    • strategy: Error strategy (default: collect). failFast, collect, failLate, skip, throw_.
    • onProgress({item, result, index, total}): Called after each successful item. NOT called for errors or undefined drops.
    • onError({item, error, index, total}): Called for each handled item error. Does not affect control flow.
    • onFailure(failure): Called when failure is truthy. Receives {item, error, index} for failFast, {errors} for failLate.
    • take: Limit items processed.
    • total: Explicit planned input count for progress/error callbacks. If omitted, series uses a cheap known input size when available. If take is set, callback total is limited to Math.min(take, knownTotal). If no total is known, the total key is omitted.
    • pause: Milliseconds between successful items.
    • pauseOnErrors: Whether to also pause after errors (default: false).

Return: {results, errors, failure} where failure is false on success. Collected errors are {item, error, index}.

Usage Example:

import { series, failFast } from 'pipelean'

// Basic: double each number
const { results } = await series([1, 2, 3], x => x * 2)
// results = [2, 4, 6]

// With failFast: stop on first error
const result = await series(items, fn, { strategy: failFast })

// With pause for rate limiting
const result = await series(endpoints, fn, { pause: 500 })

// With UI progress
await series(items, saveItem, {
  onProgress: ({index, total}) => updateProgress(index + 1, total),
  onError: ({item, error}) => reportItemError(item.id, error),
})

// Curried: create a reusable transform
const double = series(x => x * 2)
const { results } = await double([1, 2, 3])

// Integration with pipe for filtering
import { pipe } from 'pipelean'
const { results } = await series(items, pipe(
  x => x.active ? x : undefined,  // drop inactive
  x => x.name,                     // extract name
))

scan

Purpose: Stateful sequential transformation - transforms each item and accumulates results.

Type: (iterable, scanner, initialValue) => scanFunction

Parameters:

  • iterable: An async iterable (array, generator, or any object implementing the iteration protocol)
  • scanner: A function with signature (accumulator, item, index) => newAccumulator
  • initialValue: The starting value for the accumulator

Return Type: A Promise that resolves to an object containing:

  • results: Array of intermediate results (or [] on failFast failure)
  • errors: Array of errors encountered (empty for failFast, skip, throw)
  • failure: false on success; {item, error, index} for failFast; {errors} for failLate
  • value: Final accumulated value when storePartialResults: false (only on success)

Key Characteristics:

  • Stateful: Each transformation depends on the previous result
  • Accumulates: Both successful results and errors for inspection
  • Index Tracking: Provides index of each item for correlation

Usage Example:

import { scan } from './functional.js'

// Track insertions in a database
const { results, errors } = await scan(
  async records,
  async (acc, record) => {
    const inserted = await db.insert(record)
    return acc + inserted  // Accumulate count
  },
  0  // Initial count
)

filter

Purpose: Stateless selection tool - filters items from an iterable based on a predicate function. Delegates to series internally: the predicate is converted to a transform that returns the original item (keep) or undefined (drop).

Type: (...args) => Promise<Outcome> | filterFunction

Parameters:

  • predicate: A function (item, index) => truthy | falsy, or a plain object pattern (converted via where())
  • items: The iterable to filter (immediate mode)
  • opts (optional): Options passed through to series

Options: Same as seriesstrategy, onError, onFailure, take, total, onProgress, pause, pauseOnErrors.

Return Type: { results, errors, failure } — same shape as series:

  • results: Original items where the predicate returned truthy
  • failure: false on success (no errors), {item, error, index} for failFast, {errors} for failLate

Key Characteristics:

  • The predicate's return value is never placed into results — only truthiness is checked, and the original item is what gets kept or dropped.
  • Pattern objects are supported: filter({active: true}, users) works via where().

Usage Example:

import { filter } from 'pipelean'

const adults = await filter(
  user => user.age >= 18,
  users,
)
// result.results = [user1, user3, ...] — original items, not predicate output

Composition

pipe

Purpose: Pipelean operation composer - chains functions left-to-right and preserves Pipelean's undefined drop signal.

Type: (...fns) => (input) => Promise<ReturnType<LastFn>>

Parameters:

  • Variadic arguments: Any number of sync or async functions to execute sequentially
  • input: The initial value passed to the first function

Return Type: A Promise that resolves to the final result.

Key Characteristics:

  • Functions execute left-to-right (first argument is applied to input)
  • Output of one function becomes input to the next
  • Supports both synchronous and asynchronous functions
  • Natural data flow from input through transformations
  • Designed for composing reusable operations passed to series() or used directly
  • Undefined Short-Circuit: If any step returns undefined, remaining steps are skipped and undefined is returned. This enables selection within a composed operation — see series drop behavior.

Usage Example:

import { pipe } from './functional.js'

const normalizeUser = pipe(
  async id => validateUserId(id),
  async id => fetchUser(id),
  user => user.active ? user : undefined,
  user => ({...user, email: user.email.toLowerCase()}),
)

const user = await normalizeUser(userId)

// Compose operations in a readable pipeline
const result = await pipe(
  async data => validate(data),
  async data => transform(data),
  async data => persist(data),
)(input)

Best Practice: Use pipe() when you need to chain operations that form a coherent data processing pipeline.

Selection in pipe (via undefined short-circuit):

import { pipe, series } from 'pipelean'

// Merge filter and transform in a single operation
const result = await series(items, pipe(
  x => x.active ? x : undefined,  // drop inactive items
  x => x.name,                     // extract name
))
// Items where active is false are skipped entirely

Misc

where

Purpose: Creates a predicate function for strict equality object matching. Primarily used with filter for pattern-based selection.

Type: (pattern) => (item) => boolean

Parameters:

  • pattern: A plain object with key-value pairs to match against.

Return Type: A predicate function.

Usage Example:

import { where, filter } from 'pipelean'

const isAdmin = where({role: 'admin'})
const admins = await filter(isAdmin, users)

// Or use pattern directly with filter (curried or immediate)
const adults = await filter({active: true}, users)

retry

Purpose: Retry async functions with configurable attempts and delays between attempts.

Type: (fn, options) => retryFunction

Parameters:

  • fn: The async function to retry (required)
  • options: Configuration object with the following properties:
    • attempts: number (default: 3) - Number of retry attempts
    • delay: number (default: 0) - Delay between retry attempts in milliseconds

Behavior:

  • Retries on each attempt until successful or exhausted
  • Throws the last error after exhausting all attempts
  • No delay before first attempt
  • Applies configured delay between subsequent attempts

Usage Example:

import { retry } from './functional.js'

// Retry with default 3 attempts and 500ms delay
const result = await retry(
  async flakyOperation() => {
    return Math.random() > 0.5 // Simulate 50% failure rate
  },
  {
    attempts: 5,
    delay: 1000
  }
)

tryCatch

Purpose: Wraps individual async functions with lifecycle hooks for comprehensive error handling and telemetry. Always catches errors — returns null on failure.

Type: (fn, options) => wrapperFunction

Parameters:

  • fn: The async function to wrap
  • options: Configuration object with the following properties:
    • onStart: () => void — Called before function execution (no arguments)
    • onSuccess: (result) => void | Promise<void> — Called on successful completion (result only)
    • onError: (error) => void — Called on error (error only)
    • onFinally: () => void — Called regardless of success/failure (no arguments)

Return Type: Returns a wrapper function with the same signature as fn. Returns null on error.

Features:

  • Automatic async/await wrapping
  • Preserves original function signature
  • Comprehensive lifecycle: onStart → onSuccess/onError → onFinally
  • Deep telemetry support for debugging

Usage Example:

import { tryCatch } from 'pipelean'

const safeFetch = tryCatch(
  async (url) => {
    const response = await fetch(url)
    return await response.json()
  },
  {
    onError: (error) => {
      console.error('Fetch failed:', error)
    }
  }
)

const data = await safeFetch('https://api.example.com')
// data is the parsed JSON on success, null on error

Related Documentation

  • README.md - Project overview and philosophy
  • guide.md - Comprehensive development guide with examples
  • examples.md - Practical usage examples for all functions