prom-utils

smartprocure | 2.5k | ISC | 0.17.0

Promise utilities: rate limiting, queueing/batching, defer, etc.

async, promise, delay, rate

Prom-Utils

Promise utilities designed for handling asynchronous operations and controlling throughput in JavaScript/TypeScript applications.

Installation

npm install prom-utils

API Reference

rateLimit

Limits the concurrency of promises. This can be used to control how many requests are made to a server or API at once.

Note: Rejections are swallowed internally to prevent UnhandledPromiseRejection errors when a promise rejects before the limit is reached. Handle errors on a per-promise basis, for example by attaching a catch handler to each promise before passing it to add.

Parameters

  • limit: number - Maximum number of concurrent promises (set to Infinity to disable)
  • options: RateLimitOptions & ThroughputLimiterOptions - Configuration options

Options

The rateLimit function accepts two sets of options. The first is specific to rateLimit:

interface RateLimitOptions {
    /**
     * Maximum throughput allowed (items/period). With the default period
     * of 1000 ms this is items/sec.
     */
    maxItemsPerPeriod?: number
}

Since rateLimit internally uses throughputLimiter, it also accepts all options from ThroughputLimiterOptions. Below are the options for ThroughputLimiterOptions with the defaults used for rateLimit.

interface ThroughputLimiterOptions {
    /**
     * The period of time in ms to track the rate. Set to 60000 for 1 minute.
     * Defaults to 1000, which is units/sec.
     */
    period?: number
    /**
     * The minimum number of throttle invocations prior to checking the rate.
     * Use this to allow for short bursts without throttling.
     * Should be 1 or more. Defaults to 1.
     */
    minWindowLength?: number
    /**
     * The maximum number of throttle invocations to hold in memory.
     * Should be 1 or more. Defaults to maxItemsPerPeriod.
     */
    maxWindowLength?: number
    /**
     * Expire throttle invocations after this many ms.
     * Defaults to the period.
     */
    expireAfter?: number
    /**
     * The timeframe to use for calculating the rate.
     * Defaults to getTimeframeUsingPeriod.
     */
    getTimeframe?: GetTimeframe
}

Returns

{
    /**
     * Add a promise. Waits for one promise to resolve if limit is met or for
     * throughput to drop below threshold if `maxItemsPerPeriod` is set.
     * Optionally, set `bypass` to true to bypass async waiting.
     */
    add: (prom: Promise<T>, options?: AddOptions) => Promise<void>
    /**
     * Wait for all promises to resolve
     */
    finish: () => Promise<void>
    /**
     * Number of pending promises.
     */
    length: number
    /**
     * Get current rate statistics
     */
    getStats: () => {
        itemsPerPeriod: number
    }
}

Example

const limiter = rateLimit(5, { maxItemsPerPeriod: 75, period: 60000 }) // 5 concurrent, max 75 per minute

for (const url of urls) {
    // Will wait for one promise to finish if limit is reached
    await limiter.add(fetch(url))
}
// Wait for unresolved promises to resolve
await limiter.finish()
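
To make the concurrency behavior concrete, here is a minimal, self-contained sketch of the general technique: track pending promises and wait for one to settle once the cap is reached. It is not the prom-utils implementation; the name sketchConcurrencyLimit is hypothetical, and throughput limiting (maxItemsPerPeriod) is left out.

```typescript
// Hypothetical helper, NOT the prom-utils implementation: caps in-flight promises.
function sketchConcurrencyLimit(limit: number) {
    const pending = new Set<Promise<unknown>>()
    return {
        // Track a promise; if the cap is reached, wait for any one to settle.
        async add(prom: Promise<unknown>): Promise<void> {
            // Swallow rejections on the tracked copy so a failure cannot
            // surface as an unhandled rejection inside the limiter.
            const tracked: Promise<unknown> = prom
                .catch(() => undefined)
                .finally(() => pending.delete(tracked))
            pending.add(tracked)
            if (pending.size >= limit) {
                await Promise.race(pending)
            }
        },
        // Wait for everything still in flight.
        async finish(): Promise<void> {
            await Promise.all(pending)
        },
        // Number of promises that have not settled yet.
        get length(): number {
            return pending.size
        },
    }
}
```

Because the caller creates the promise before passing it to add, the work has already started by the time add is called; the limiter only decides how long the caller waits before starting the next one.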

batchQueue

Batches calls via a local queue. This can be used to accumulate values before writing to a database or making API calls.

Parameters

  • fn: (arr: A[]) => B - Function to call with batched items
  • options: QueueOptions - Configuration options

Options

interface QueueOptions {
    /**
     * Wait for the batch to reach this number of elements before flushing the queue.
     * Defaults to 500.
     */
    batchSize?: number
    /**
     * Wait for the batch to reach this size in bytes before flushing the queue.
     */
    batchBytes?: number
    /**
     * Wait this long in ms before flushing the queue.
     */
    timeout?: number
    /**
     * Maximum throughput allowed (items/sec).
     * Defaults to Infinity.
     */
    maxItemsPerSec?: number
    /**
     * Maximum throughput allowed (bytes/sec).
     * Defaults to Infinity.
     */
    maxBytesPerSec?: number
}

Returns

{
  /**
   * Call fn with the items in the queue.
   */
  flush: () => Promise<void>
  /**
   * Add an item to the queue. When a queue condition is met flush will be called.
   */
  enqueue: (item: A) => Promise<void>
  /**
   * The last result returned from calling fn.
   */
  lastResult?: Awaited<B>
  /**
   * The cause for the last automatic queue flush. Will be one of:
   * timeout, batchSize, or batchBytes.
   */
  lastFlush?: LastFlush
  /**
   * Get the current throughput rates.
   */
  getStats: () => QueueStats
  /**
   * Length of the queue.
   */
  length: number
}

Example

const writeToDatabase = async (records) => {
    // database write logic here
    return { success: true }
}

const queue = batchQueue(writeToDatabase, {
    batchSize: 250,
    timeout: 5000, // also flush after 5 seconds
    maxItemsPerSec: 1000, // limit to 1000 items per second
})

for (const record of records) {
    await queue.enqueue(record)
}

// Call fn with remaining queued items
await queue.flush()

// Check statistics
console.log(queue.getStats())
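
The batchSize trigger and the final manual flush can be illustrated with a small self-contained sketch. This is an assumption-laden illustration of size-based batching in general, not the library's code; sketchBatchQueue is a hypothetical name, and timeout, batchBytes, and throughput limits are omitted.

```typescript
// Hypothetical helper, NOT the prom-utils implementation: size-triggered batching.
function sketchBatchQueue<A>(fn: (arr: A[]) => unknown, batchSize = 500) {
    let queue: A[] = []
    // Call fn with whatever is queued, then start a fresh batch.
    const flush = async (): Promise<void> => {
        if (queue.length === 0) return
        const batch = queue
        queue = [] // reset before calling fn so new items go to the next batch
        await fn(batch)
    }
    // Add an item; flush automatically once the batch is full.
    const enqueue = async (item: A): Promise<void> => {
        queue.push(item)
        if (queue.length >= batchSize) await flush()
    }
    return {
        enqueue,
        flush,
        get length(): number {
            return queue.length
        },
    }
}
```

With a batchSize of 2 and five enqueued items, fn runs twice automatically and the fifth item stays queued until flush is called, which is why the example above ends with an explicit flush.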

batchQueueParallel

Batches calls via a local queue, like batchQueue, but is designed to be safe for concurrent access. Use it to accumulate values before writing to a database or making API calls when enqueue is called from multiple concurrent contexts.

Note: Unlike batchQueue, this function does not support timeout-based flushing or throughput limiting options. It only supports batchSize and batchBytes triggers.

Parameters

  • fn: (arr: A[]) => unknown - Function to call with batched items
  • options: QueueOptionsParallel - Configuration options

Options

interface QueueOptionsParallel {
    /**
     * Wait for the batch to reach this number of elements before flushing the queue.
     * Defaults to 500.
     */
    batchSize?: number
    /**
     * Wait for the batch to reach this size in bytes before flushing the queue.
     */
    batchBytes?: number
}

Returns

{
  /**
   * Call fn with the items in the queue.
   */
  flush: () => void
  /**
   * Add an item to the queue. When a queue condition is met flush will be called.
   */
  enqueue: (item: A) => void
  /**
   * Length of the queue.
   */
  length: number
}

Example

const writeToDatabase = (records) => {
    // database write logic here
    console.log(`Writing ${records.length} records`)
}

const queue = batchQueueParallel(writeToDatabase, {
    batchSize: 250,
    batchBytes: 1024 * 1024, // 1MB
})

// Safe to call from multiple concurrent contexts
await Promise.all(
    records.map(async (record) => {
        // This is safe to call concurrently
        queue.enqueue(record)
    })
)

// Call fn with remaining queued items
queue.flush()
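
The batchBytes trigger can be sketched in the same spirit. The size estimate below (UTF-8 length of the JSON form) is an assumption made for illustration; the library relies on its own dependency for measuring size, and sketchByteBatcher and approxBytes are hypothetical names.

```typescript
const encoder = new TextEncoder()

// Assumption: approximate an item's size as the UTF-8 length of its JSON form.
const approxBytes = (value: unknown): number =>
    encoder.encode(JSON.stringify(value)).length

// Hypothetical helper, NOT the prom-utils implementation: byte-triggered batching.
function sketchByteBatcher<A>(fn: (arr: A[]) => void, batchBytes: number) {
    let queue: A[] = []
    let bytes = 0
    // Synchronous flush: hand the current batch to fn and reset the counters.
    const flush = (): void => {
        if (queue.length === 0) return
        const batch = queue
        queue = []
        bytes = 0
        fn(batch)
    }
    // Flush once the accumulated size reaches or exceeds batchBytes.
    const enqueue = (item: A): void => {
        queue.push(item)
        bytes += approxBytes(item)
        if (bytes >= batchBytes) flush()
    }
    return { enqueue, flush }
}
```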

throughputLimiter

Limits throughput by sleeping until the rate (units/period) is less than the maximum limit. Units and period are intentionally abstract since they could represent requests/min, bytes/sec, etc.

Parameters

  • maxUnitsPerPeriod: number - Maximum units allowed per period
  • options: ThroughputLimiterOptions - Configuration options

Options

interface ThroughputLimiterOptions {
    /**
     * The period of time in ms to track the rate. Set to 60000 for 1 minute.
     * Defaults to 1000, which is units/sec.
     */
    period?: number
    /**
     * The minimum number of throttle invocations prior to checking the rate.
     * Use this to allow for short bursts without throttling.
     * Should be 1 or more. Defaults to 1.
     */
    minWindowLength?: number
    /**
     * The maximum number of throttle invocations to hold in memory.
     * Should be 1 or more. Defaults to 3.
     */
    maxWindowLength?: number
    /**
     * Expire throttle invocations after this many ms.
     * Defaults to Infinity.
     */
    expireAfter?: number
    /**
     * The timeframe to use for calculating the rate.
     * Two built-in options: getTimeframeUsingElapsed or getTimeframeUsingPeriod.
     * Defaults to getTimeframeUsingElapsed.
     */
    getTimeframe?: GetTimeframe
}

Returns

{
  /**
   * Get the current rate (units/period).
   */
  getCurrentRate: () => number
  /**
   * Sleep until the rate is below the maximum.
   */
  throttle: () => Promise<void>
  /**
   * Add units to the sliding window.
   */
  append: (numUnits: number) => void
  /**
   * Throttle first, then append.
   */
  throttleAndAppend: (numUnits: number) => Promise<void>
  /**
   * Append first, then throttle.
   */
  appendAndThrottle: (numUnits: number) => Promise<void>
}

Example

// Limit to at most 1000 items/sec
const limiter = throughputLimiter(1000)

for (const batch of batches) {
    // Will wait until the rate is < maxUnitsPerPeriod
    await limiter.throttleAndAppend(batch.length)
    console.log('Current rate: %d items/sec', limiter.getCurrentRate())
}
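
As a rough illustration of "sleeping until the rate drops", the sketch below computes how long a caller would need to wait for a sliding window of recorded units. It assumes the rate is measured over the time elapsed since the oldest entry in the window, which is one plausible reading of getTimeframeUsingElapsed and is not confirmed here. WindowEntry and sketchSleepTime are hypothetical names.

```typescript
// Hypothetical shape of one recorded invocation in the sliding window.
interface WindowEntry {
    timestamp: number // ms
    numUnits: number
}

// Hypothetical helper, NOT the prom-utils implementation.
// Returns the ms to sleep so that totalUnits / elapsed no longer
// exceeds maxUnitsPerPeriod / period.
function sketchSleepTime(
    window: WindowEntry[],
    now: number,
    maxUnitsPerPeriod: number,
    period = 1000
): number {
    const first = window[0]
    if (!first) return 0 // nothing recorded yet, no need to wait
    const totalUnits = window.reduce((sum, entry) => sum + entry.numUnits, 0)
    const elapsed = now - first.timestamp
    // Time that must have passed for totalUnits to fit within the limit.
    const requiredElapsed = (totalUnits / maxUnitsPerPeriod) * period
    return Math.max(0, requiredElapsed - elapsed)
}
```

For example, 1200 units recorded over 400 ms against a limit of 1000 units/sec would need 1200 ms to fit, so the caller sleeps for the remaining 800 ms.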

pausable

Creates a mechanism to pause and resume a loop. After pause is called, maybeBlock returns a promise that resolves when resume is called. When not paused, maybeBlock returns undefined, so awaiting it does not block the loop.

Parameters

  • timeout?: number - Optional timeout in ms to auto-resume

Returns

{
  /**
   * Pause execution when maybeBlock is called.
   */
  pause: () => void
  /**
   * Resume execution.
   */
  resume: () => void
  /**
   * Call in your loop to potentially block execution.
   */
  maybeBlock: () => Promise<void> | undefined
  /**
   * Whether currently paused.
   */
  isPaused: boolean
}

Example

const shouldProcess = pausable()

// In some event handler or condition
onSomeCondition(() => shouldProcess.pause())
onSomeOtherCondition(() => shouldProcess.resume())

// In your processing loop
for (const record of records) {
    await shouldProcess.maybeBlock()
    await processRecord(record)
}
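
One way such a pause/resume gate could be built is sketched below. It is a simplified illustration, not the library's code: sketchPausable is a hypothetical name and the optional auto-resume timeout is omitted.

```typescript
// Hypothetical helper, NOT the prom-utils implementation: a pause/resume gate.
function sketchPausable() {
    let blocker: Promise<void> | undefined
    let release: (() => void) | undefined
    return {
        // Create the promise that maybeBlock will hand out while paused.
        pause(): void {
            if (blocker) return // already paused
            blocker = new Promise<void>((resolve) => {
                release = resolve
            })
        },
        // Resolve the outstanding promise and clear the paused state.
        resume(): void {
            release?.()
            blocker = undefined
            release = undefined
        },
        // Returns undefined when running, or the blocking promise when paused.
        maybeBlock(): Promise<void> | undefined {
            return blocker
        },
        get isPaused(): boolean {
            return blocker !== undefined
        },
    }
}
```

Awaiting undefined resolves almost immediately, which is why the loop can call await maybeBlock() on every iteration.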

defer

Creates a deferred promise that resolves when done is called.

Returns

{
  /**
   * Resolves the promise when called.
   */
  done: () => void
  /**
   * Promise that resolves when done() is called.
   */
  promise: Promise<void>
}

Example

const delay = (milliseconds: number) => {
    const deferred = defer()
    setTimeout(deferred.done, milliseconds)
    return deferred.promise
}

// Use the delay function
await delay(1000) // Wait 1 second
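
For readers unfamiliar with the deferred pattern, a minimal sketch of how an object with this shape can be constructed follows. sketchDefer is a hypothetical name and this is not necessarily how the library implements defer.

```typescript
// Hypothetical helper, NOT the prom-utils implementation: the deferred pattern.
function sketchDefer() {
    // Placeholder that is replaced synchronously by the executor below.
    let done: () => void = () => {}
    const promise = new Promise<void>((resolve) => {
        done = resolve // expose resolve so it can be called from outside
    })
    return { done, promise }
}
```

The Promise executor runs synchronously, so done already points at resolve by the time the object is returned.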

sleep

Sleep for a specified time before resolving the promise.

Parameters

  • time?: number - Time to sleep in ms, defaults to 0

Returns

  • Promise<void> - Resolves after the specified time

Example

// Sleep for one second
await sleep(1000)

pacemaker

Calls a heartbeat function at regular intervals until a promise resolves or rejects.

Parameters

  • heartbeatFn: () => void - Function to call at intervals
  • promise: Promise<T> - Promise to wait for
  • interval?: number - Interval in ms, defaults to 1000

Returns

  • The value from the resolved promise

Example

const heartbeatFn = () => {
    console.log('Still processing...')
}

const result = await pacemaker(heartbeatFn, longRunningOperation())
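
The heartbeat behavior can be approximated with setInterval and a finally block, as in the sketch below. This is an illustration under the assumption that the interval is simply cleared once the promise settles; sketchPacemaker is a hypothetical name, not the library's code.

```typescript
// Hypothetical helper, NOT the prom-utils implementation.
async function sketchPacemaker<T>(
    heartbeatFn: () => void,
    promise: Promise<T>,
    interval = 1000
): Promise<T> {
    // Start the heartbeat, then stop it whether the promise resolves or rejects.
    const timer = setInterval(heartbeatFn, interval)
    try {
        return await promise
    } finally {
        clearInterval(timer)
    }
}
```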

waitUntil

Waits until a predicate function returns true or a timeout expires.

Parameters

  • pred: () => Promise<boolean> | boolean - Predicate function
  • options: WaitOptions - Configuration options

Options

interface WaitOptions {
    /**
     * Wait this long in ms before rejecting. Defaults to 5000 ms.
     */
    timeout?: number
    /**
     * Check the predicate with this frequency. Defaults to 50 ms.
     */
    checkFrequency?: number
}

Returns

  • Promise<void> - Resolves when the predicate returns true, rejects if timeout expires

Example

import { TimeoutError, waitUntil } from 'prom-utils'

let isReady = false
setTimeout(() => {
    isReady = true
}, 2000)

try {
    await waitUntil(() => isReady, { timeout: 5000 })
    console.log('Ready!')
} catch (error) {
    if (error instanceof TimeoutError) {
        console.log('Timed out waiting for ready state')
    } else {
        throw error
    }
}
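
The polling behavior described by timeout and checkFrequency can be sketched as a simple loop. The code below is an illustration, not the library's implementation; sketchWaitUntil and SketchTimeoutError are hypothetical stand-ins for waitUntil and TimeoutError.

```typescript
// Hypothetical stand-in for the library's TimeoutError.
class SketchTimeoutError extends Error {
    constructor(message: string) {
        super(message)
        this.name = 'SketchTimeoutError'
    }
}

// Hypothetical helper, NOT the prom-utils implementation: poll until true or timeout.
async function sketchWaitUntil(
    pred: () => Promise<boolean> | boolean,
    { timeout = 5000, checkFrequency = 50 } = {}
): Promise<void> {
    const deadline = Date.now() + timeout
    while (!(await pred())) {
        if (Date.now() >= deadline) {
            throw new SketchTimeoutError(`Did not complete in ${timeout} ms`)
        }
        // Wait before checking the predicate again.
        await new Promise<void>((resolve) => setTimeout(resolve, checkFrequency))
    }
}
```

A smaller checkFrequency reacts faster at the cost of calling the predicate more often, which matters when the predicate itself is expensive or asynchronous.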

raceTimeout

Returns the value of a promise if it resolves before a timeout, otherwise returns the exported TIMEOUT symbol.

Parameters

  • prom: Promise<A> - Promise to race
  • timeout: number - Timeout in ms

Returns

  • Promise<A | typeof TIMEOUT> - Either the promise result or TIMEOUT symbol

Example

import { raceTimeout, TIMEOUT } from 'prom-utils'

const winner = await raceTimeout(someLongOperation(), 5000)

if (winner === TIMEOUT) {
    console.log('Operation timed out')
} else {
    console.log('Operation completed with result:', winner)
}
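
Returning a sentinel instead of rejecting is commonly built on Promise.race. The sketch below shows that general technique; sketchRaceTimeout and SKETCH_TIMEOUT are hypothetical names and the library's internals may differ.

```typescript
// Hypothetical sentinel, standing in for the library's exported TIMEOUT symbol.
const SKETCH_TIMEOUT = Symbol('timeout')

// Hypothetical helper, NOT the prom-utils implementation.
async function sketchRaceTimeout<A>(
    prom: Promise<A>,
    timeout: number
): Promise<A | typeof SKETCH_TIMEOUT> {
    let timer: ReturnType<typeof setTimeout> | undefined
    // Resolves (never rejects) with the sentinel once the timeout elapses.
    const timedOut = new Promise<typeof SKETCH_TIMEOUT>((resolve) => {
        timer = setTimeout(() => resolve(SKETCH_TIMEOUT), timeout)
    })
    try {
        return await Promise.race([prom, timedOut])
    } finally {
        clearTimeout(timer) // avoid leaving a timer behind when prom wins
    }
}
```

Note that losing the race does not cancel the original promise; it keeps running, and only its result is ignored.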

Error Handling

The library exports two error classes:

  • OptionsError - Thrown when invalid options are provided
  • TimeoutError - Thrown when an operation times out

Example:

import { TimeoutError, waitUntil } from 'prom-utils'

try {
    await waitUntil(() => false, { timeout: 100 })
} catch (error) {
    if (error instanceof TimeoutError) {
        console.log('Timed out:', error.message)
    } else {
        throw error
    }
}

Changelog

0.17.0

  • Added batchQueueParallel - A concurrent-safe version of batchQueue that supports batchSize and batchBytes options but not timeout or throughput limiting.

0.16.0

  • Dependencies updated.

0.15.0

  • rateLimit
    • Added maxItemsPerPeriod option along with the options from throughputLimiter.
    • Added bypass option to add.
  • throughputLimiter
    • Added minWindowLength, period, expireAfter, and getTimeframe options.
    • The windowLength option is now maxWindowLength.
    • Separated throttle and append. These two convenience methods were also created: throttleAndAppend and appendAndThrottle.

0.14.0

  • batchQueue - Added lastFlush.

0.13.0

  • Added debug statements for pausable.
  • Bumped dependencies.

0.12.1

  • Fixed type for raceTimeout.

0.12.0

  • Added raceTimeout.

0.11.0

  • Fixed: rateLimit should remove an item if it has rejected.
  • rateLimit - .finish swallows exceptions.
  • rateLimit - Added length getter.
  • batchQueue - Added length getter.
  • BREAKING: pausable.proceed changed to pausable.maybeBlock.

0.10.0

  • Added throughputLimiter - Limit throughput (items/sec).
  • Added sleep - Sleep for time ms before resolving the Promise.
  • batchQueue - Added maxItemsPerSec and maxBytesPerSec options for throughput throttling.
  • Changed testing library from jest to vitest.
  • Bumped packages.

0.9.0

  • Better types - removed all use of any.
  • rateLimit - Changed the return type of the finish method to Promise<void>.

0.8.0

  • Added waitUntil - Wait until the predicate returns truthy or the timeout expires.
  • Changed function signature to use generics for batchQueue.

0.7.0

  • Changed dependency for determining size in bytes of a value for batchQueue.

0.6.1

  • rateLimit - Catch promise exceptions which will be re-thrown in .add or .finish.

0.6.0

  • Added pacemaker.

0.5.0

  • Changed debug code to be more logical.

0.4.0

  • Added batchBytes option to batchQueue which triggers a queue flush if the total size of the queue is greater than or equal to batchBytes.

0.3.0

  • Added defer.
  • Added pausable.

0.2.0

  • batchQueue now accepts an option object which includes batchSize and timeout.
  • If the timeout option is passed, a timer is set when the first item is enqueued. If the timeout occurs before batchSize is reached, flush is called.

0.1.0

  • Initial release.