funstream

iarna | ISC | 4.2.0

Funstream gives you iteratorish methods on your streams.

const fun = require('funstream')

/***** USAGE EXAMPLES *****/

// Make a stream, fun, then run some list comprehensions on it
fun(boringStream)
  .map(n => n + 1)
  .filter(n => n % 2)
  .map(n => `${n}\n`)
  .pipe(process.stdout)
// it's still a stream all the way through so each chunk is processed as it's read

// funstreams are also promises, which is particularly useful when
// collecting results from something like reduce
fun(boringStream)
  .map(n => n + 1)
  .filter(n => n % 2)
  .reduce((a, b) => a + b)
  .then(console.log)

/***** CONSTRUCTING *****/

// We can make just about anything a funstream
// readable streams…
fun(process.stdin)

// Promises of streams... which means you can immediately attach your body stream processing and it'll
// start getting data once the fetch completes.
fun(fetch('https://example.com').then(r => r.body))

// generators…
function * mygen () {
  for (let ii = 0; ii < 10000; ++ii) {
    yield ii
  }
}
fun(mygen())

// async generators
async function * myasyncgen () {
  for (let ii = 0; ii < 10000; ++ii) {
    yield ii
  }
}
fun(myasyncgen())

// and on node 10, you can iterate a funstream
for await (let value of fun(…)) {
  …
}

// arrays
fun([1, 2, 3, 4])

// even writable streams, which can be treated as promises to see when they resolve or reject
fun(writestream)
  .then(() => console.log('finished!'))
  .catch(err => console.error('stream error', err))

// fun writable streams are still streams: they continue to be writable even
// though you can use them like promises
process.stdin.pipe(fun(writestream))
  .then(() => console.log('done!'))

// fun streams can also be piped into, handy when the source stream isn't a
// standard Node.js stream, but does implement the pipe interface.
process.stdin.pipe(fun())
  .map(str => transformStr(str))
  .pipe(process.stdout)

// You can bundle up a series of transforms into a single transform stream
const mytransformStream =
  fun(stream => stream.map(str => str.toUpperCase())
                      .flatMap(v => [v, v]))

/***** ADDED METHODS *****/

// Fun functions can be sync…
.map(str => str.slice(10))

// Fun functions can be async…
.map(async str => (await transformStr(str)).slice(10))

// Fun functions can be promise returning…
.map(str => transformStr(str), {async: true})

// Fun functions can be callback using…
.map((str, cb) => transformStrCB(str, cb), {async: true})

Funstream makes object streams better.

Funstream constructors

fun(readableStream[, opts]) → FunStream

This is probably what you want.

Makes an existing stream a funstream! Has the advantage over fun() of handling error propagation for you. All funs are promises of their completion too, so you can await or .catch your stream.

opts is an optional options object. The only option currently is async, which lets you explicitly tell Funstream whether your callbacks are sync or async. If you don't include this we'll detect which you're using by looking at the number of arguments your callback takes. Because promises and sync functions take the same number of arguments, if you're using promise-returning callbacks you'll need to explicitly pass in async: true.
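The arity-based detection described above can be sketched like this. This is a hypothetical illustration, not funstream's actual code; detectAsync is an invented name:

```javascript
// Hypothetical sketch of arity-based detection: an explicit opts.async wins;
// otherwise a callback taking two arguments is assumed to be (data, cb) style.
function detectAsync (fn, opts = {}) {
  if ('async' in opts) return opts.async
  return fn.length > 1
}

detectAsync(str => str.slice(10))                        // false: sync
detectAsync((str, cb) => cb(null, str.slice(10)))        // true: callback style
detectAsync(str => Promise.resolve(str))                 // false! promises look sync
detectAsync(str => Promise.resolve(str), {async: true})  // true: explicit wins
```

The third case is exactly why promise-returning callbacks need the explicit async: true.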

fun(callback[, opts]) → FunStream

This lets you bundle a fun-stream pipeline up into a single transform stream that you might pass to something else. The callback receives a FunStream as its only argument, chain off of that as you like and return the result. The stream returned by fun() will write to that first FunStream and read from the end of your chain. (With the usual error propagation.)

fun(writableStream[, opts]) → PromiseStream

Writable streams can't be fun per se, since being fun means having iterators. What we can do is make them resolvable as promises, with an extra feature or two.

fun(array[,opts]) → FunStream

Returns a funstream that will receive entries from the array one at a time while respecting back pressure.

fun(string[,opts]) → FunStream

Returns a funstream that will receive chunks of the string one at a time while respecting back pressure.

fun(iterator[,opts]) → FunStream

Returns a funstream that will receive values from the iterator one at a time while respecting back pressure.

fun(asyncIterator[,opts]) → FunStream

Returns a funstream that will receive values from the async iterator one at a time while respecting back pressure.

fun(promise[,opts]) → FunStream

Returns a funstream that will consume the result of the promise exactly as the equivalent plain value would be. No data will be emitted until the promise resolves. If it rejects it will be propagated as an error in the usual ways.

These are special, because the returned object only lazily becomes a stream. If you treat it as a promise then no stream construction occurs.

fun([opts]) → FunStream

Make a passthrough Funstream. You can pipe into this to get access to our handy methods.

fun.with(todo[, opts]) → FunStream

For those times when you want to create a stream from nothing, or at least, from a non-stream source, fun.with provides an easy interface for doing that.

Pass it a function and you'll get a stream you can write to. You can close it off by just resolving your promise which is particularly convenient when your function is declared async.

Constructs a passthrough funstream and passes it as an argument to todo. todo is a function that returns a Promise. When the Promise resolves the stream will end.

const sleep = (ms, ...args) => new Promise(_ => setTimeout(_, ms, ...args))
const result = await fun.with(async st => {
  for (let count = 0; count < 5; ++count) {
    st.write(count)
    await sleep(100)
  }
}).list() // [ 0, 1, 2, 3, 4 ]
// demonstrating identity here, you wouldn't want to do this:
let a
let b = fun.with(async st => { a = st })
a === b // true

fun.FunStream

Exactly the same as stream.PassThrough but with fun added. fun() is mostly the same as new fun.FunStream(). (The former will use Bluebird for promises if available but fallback to system promises. The latter has no magic and just uses system promises.)

require('funstream/fun-stream').mixin

The core extension mechanism (otherwise unneeded). It adds fun to an existing class or object. Classes that have fun mixed in need to also call FunPassThrough.funInit.call(this, opts) in their constructors.

Funstream and Pipelines

Contrary to ordinary, BORING streams, we make sure errors are passed along when we chain into something. This applies when you .map or .filter but it ALSO applies when you .pipe.

PromiseStream methods

.fun.finished() → Promise

Available on Writable promise streams, the returned Promise will resolve when the stream emits a finish event. The promise will be rejected if the stream emits an error event.

If the stream emits a result event then the stream will resolve with that value.

.fun.closed() → Promise

Available on Writable promise streams, the returned Promise will resolve when the stream emits a close event. The promise will be rejected if the stream emits an error event.

NOTE: Not all streams emit a close event and if you use this on a stream that does not then it will never resolve.

.async() → this

.sync() → this

Sets the stream's async option to true and false respectively.

.async(todo) → FunStream

.sync(todo) → FunStream

Runs todo with a stream that has the appropriate async option set. The returned stream is restored to the previous setting.

fun([1,2,3])
  .filter(async n => n > 0)
  .sync(str => str.filter(n => n < 3).map(n => n * 2))

.fun.ended() → Promise

Returns a Promise that resolves when the stream emits an end event. If the stream emits an error event then it will reject.

.fun.writable() → Promise(Boolean)

With Node >= 11.4.0:

Returns a promise that resolves when the stream is writable to ensure you don't bloat out the buffers of a stream that is slow to consume data:

await stream.fun.writable()
stream.write('my chunk of data')

Under the hood, this checks the writable flag introduced in 11.4: if that's true then it just resolves; if false it attaches itself to the drain event and resolves when that happens.

With Node < 11.4.0:

Prior to 11.4, you have to track writable status yourself, but you can still use fun.writable() to be notified when the stream is ready for more data.

DANGER: Be sure that the stream is NOT writable before calling fun.writable or the promise may never resolve (because the underlying stream won't emit a drain event.)

const writable = stream.write('my chunk of data')
if (!writable) await stream.fun.writable()
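The resolve-now-or-wait-for-drain logic described above can be sketched as follows. This is an illustrative approximation, not funstream's actual code; it uses the modern writableNeedDrain flag as a stand-in for the buffer-fullness check:

```javascript
// Sketch: resolve immediately when the stream can accept more data,
// otherwise wait for a single 'drain' event.
function writableWhenReady (stream) {
  if (!stream.writableNeedDrain) return Promise.resolve(true)
  return new Promise(resolve => stream.once('drain', () => resolve(true)))
}
```

Using `once` rather than `on` matters here: the promise settles exactly once, and no stale listener is left attached after the first drain.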

FunStream methods

This is the good stuff. All callbacks can be sync or async. You can indicate this by setting the async property on the opts object either when calling the method below or when constructing the objects to start with. Values of the async property propagate down the chain, for example:

.map(…, {async: true}).map(…)

The second map callback will also be assumed to be async.

Multiple sync functions of the same type will be automatically aggregated without constructing additional streams, so:

.filter(n => n < 23).filter(n => n > 5)

The second filter call actually returns the same stream object. This does mean that if you try to fork the streams in between it won't work. Sorry.
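The aggregation amounts to composing the two predicates into a single one, which can be sketched with plain functions (illustrative only; composeFilters is an invented name):

```javascript
// Two sync filters compose into a single predicate, so only one
// stream stage is needed.
const composeFilters = (f, g) => value => f(value) && g(value)

const inRange = composeFilters(n => n < 23, n => n > 5)
const kept = [3, 6, 10, 23, 42].filter(inRange)
// kept is [6, 10]
```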

.pipe(target[, opts]) → FunStream(target)

Like an ordinary pipe, but funerer. In addition to mutating the target into a funstream, we also forward errors to it.

.head(numberOfItems) → FunStream

Will only forward the first numberOfItems downstream. The remainder are ignored. At the moment this does not end the stream after the numberOfItems limit is hit, but in future it likely will.

fun(stream)
  .head(5)
  .forEach(item => { // only sees the first five items regardless of how long the stream is.
  })

.filter(filterWith[, opts]) → FunStream

Filter the stream!

  • filterWith(data) → Boolean (can throw)
  • filterWith(data, cb) (and cb(err, shouldInclude))
  • filterWith(data) → Promise(Boolean)

If filterWith returns true, we include the value in the output stream, otherwise not.

.map(mapWith[, opts]) → FunStream

Transform the stream!

  • mapWith(data) → newData (can throw)
  • mapWith(data, cb) (and cb(err, newData))
  • mapWith(data) → Promise(newData)

data is replaced with newData from mapWith in the output stream.

.mutate(mutateWith[, opts]) → FunStream

stream.mutate(data => {…}) is sugar for stream.map(data => {…; return data})

.flat([opts]) → FunStream

Flattens arrays in the streams into object emissions! That is to say, a stream of two objects:

[1, 2, 3], [23, 42, 57]

Will become a stream of six objects:

1, 2, 3, 23, 42, 57

This is implemented as stream.flatMap(v => v, opts)
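For reference, the plain-array analogue of this behavior:

```javascript
// flatMap with the identity function flattens one level, the same way
// .flat() turns a stream of arrays into a stream of their elements.
const chunks = [[1, 2, 3], [23, 42, 57]]
const flattened = chunks.flatMap(v => v)
// flattened is [1, 2, 3, 23, 42, 57]
```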

.flatMap(mapWith[, opts]) → FunStream

Transform all the stream elements and flatten any return values. This is the equivalent of:

stream.map(…).flat()

Only without multiple phases.

.lines([opts]) → FunStream

Parse the input stream into lines, emitting one line per chunk. Newlines are removed.

.ndjson([opts]) → FunStream

.fromNdjson([opts]) → FunStream

Parse the input stream as newline delimited JSON, emitting one parsed JSON object per line. Empty lines are ignored.

.toNdjson([opts]) → FunStream

Take an input object stream and emit as newline delimited JSON. Sugar for:

stream.map(_ => JSON.stringify(_) + '\n', opts)

.sort(sortWith[, opts]) → FunStream

WARNING: This has to load all of your content into memory in order to sort it, so be sure to do your filtering or limiting (with .head) before you call this. This results in a funstream fed from the sorted array.

sortWith(a, b) → -1 | 0 | 1 – It's the usual sort comparison function. It must be synchronous as it's ultimately passed to Array.sort.

Sort a stream alphabetically:

fun(stream)
  .sort((a, b) => a.localeCompare(b))

.grab(grabWith[, opts]) → FunStream

WARNING: This has to load all of your content into memory in order to operate on it, so be sure to do your filtering or limiting (with .head) before you call this. This results in a funstream fed from the array returned by grabWith.

grabWith is a synchronous function. It takes an array as an argument, and its return value is turned back into a stream with fun(). The array is produced by reading the entire stream, so be warned.

For example, sort can be implemented as:

function sortStream (st) {
  return st.grab(v => v.sort(sortWith))
}

It makes it easy to apply array verbs to a stream that aren't otherwise supported but it does mean loading the entire stream into memory.

It's the equivalent of fun(grabWith(await stream.list()))

.list([opts]) → FunStream

Alias: .collect(opts)

Promise an array of all of the values in the stream. Lets you do things like…

const data = await fun().map(…).filter(…).list()

It's just sugar for: reduceToArray((acc, val) => acc.push(val), opts)

.concat([opts]) → FunStream

Promise a string produced by concatenating all of the values in the stream.

.json([opts]) → PromiseStream

.fromJson([opts]) → PromiseStream

Promise an object produced by JSON parsing the result of .concat(). Sugar for:

stream.concat().then(str => JSON.parse(str))

.toJson([opts]) → PromiseStream

Given a stream of objects, produces a JSON stringified array of them. Sugar for:

stream.grab(_ => JSON.stringify(_), opts)

.reduce(reduceWith[, initial[, opts]]) → FunStream

Promise the result of computing everything.

  • reduceWith(acc, value) → acc (can throw)
  • reduceWith(acc, value, cb) (and cb(err, acc))
  • reduceWith(acc, value) → Promise(acc)

Concat a stream:

fun(stream)
  .reduce((acc, value) => acc + value)
  .then(wholeThing => { … })

The return value is also a stream, so you can hang the usual event listeners off it. Reduce streams emit a result event just before finish with the final value of the accumulator in the reduce.

.reduceToArray(reduceWith[, opts]) → FunStream

Promise the result of reducing into an array. Handy when you want to push on to an array without worrying about your return value. This is sugar for:

fun(stream)
  .reduce((acc, value) => { reduceWith(acc, value) ; return acc }, [])
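The wrapping matters because Array.prototype.push returns the array's new length rather than the array itself; the plain-array version of the trick looks like this (illustrative only):

```javascript
// Call the callback for its side effect and keep the accumulator, so push's
// numeric return value never replaces the array.
const wrap = reduceWith => (acc, value) => { reduceWith(acc, value); return acc }

const result = ['a', 'b', 'c'].reduce(wrap((acc, v) => acc.push(v)), [])
// result is ['a', 'b', 'c']
```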

.reduceToObject(reduceWith[, opts]) → FunStream

Promise the result of reducing into an array. Handy when you want to build an object without worrying about your return values. This is sugar for:

fun(stream)
  .reduce((acc, value) => { reduceWith(acc, value) ; return acc }, {})

.forEach(consumeWith[, opts]) → PromiseStream

Run some code for every chunk, promise that the stream is done.

Example, print each line:

fun(stream)
  .forEach(chunk => console.log(chunk))
  .then(() => console.log('Done!'))

As with reduce streams the return value from forEach is both a promise and a stream.

Benchmarks

map: fun sync                 565 ops/s
map: fun async (cb)           454 ops/s
map: stream.Transform         403 ops/s
map: through2                 311 ops/s
map: fun async (async/await)  304 ops/s
map: fun async (new Promise)  237 ops/s

Changelog

4.2.0

  • Add stream.fun.writable() to get a promisey way of asserting a stream is ready for data
  • Add stream.toJson() which takes a series of objects and emits them as a JSON stringified array.
  • Add stream.toNdjson() which takes a series of objects and emits them as new-line delimited JSON.
  • Add stream.fromJson() which is the same as stream.json()
  • Add stream.fromNdjson() which is the same as stream.ndjson()
  • Fixed fun.finished and fun.ended, which previously would refuse to run.

4.1.1

  • Fix stream.lines() (and in turn stream.ndjson()) to support CRLF terminated lines.
  • Fix a bug where, if multiple chunks were emitted without newlines ending them and then the stream closed, they would be emitted with newlines injected between them (but as a single "line").

4.1.0

  • Add support for stream.lines() to split the stream on lines and emit one line at a time.
  • Add support for stream.ndjson() to parse the stream as newline delimited JSON and emit one object at a time.

4.0.0

  • Our recently introduced closed/finished/ended methods stomped on some stream implementations' use of them as properties. Change the API to put them in a safer place.

3.2.0

  • Fix crash when a stream object's pipe method was readonly. (For example, with minipass.)
  • Improve stream ducktyping to support minipass based streams.
  • Fix error in the reduceTo family of functions with promise-returning reductions, where we were running all the reductions in parallel rather than waiting for each to complete.
  • Guard against crashes when promises are rejected with null/undefined values.
  • Improve error messaging on invalid arguments.
  • Add support for fun(123n) working like fun(123)

3.1.0

  • Fix bug where, when mixing in promise-streams (when upgrading a regular stream to a funstream), the provision of ended versus finished methods was swapped.
  • Add support for async generators (thank you Node 10!!)
  • Add stream.mutate(), which acts like stream.map() but doesn't do anything with the return value of the callback. stream.mutate(_ => { ... }) is equiv of stream.map(_ => { ...; return _})
  • Add stream.collect() as an alias for stream.list()
  • Add fun.with(cb) constructor that passes the callback a stream that's ended when the promise returned by the callback is resolved. This makes manual writing to a stream far more convenient.
  • Add stream.json() convenience method, equiv of stream.concat().then(_ => JSON.parse(_))
  • Fix error propagation in stream.reduce().
  • Guard iterator/thenable type checking when given symbols. This previously caused a crash if you constructed a funstream from a Symbol. Weird, but it shouldn't crash.

3.0.0

I'm doing a 3.0.0 to backout the feature added in 2.3.0 where Readable streams were thenable.

This turns out to be very unfun. Specifically it means that async functions can't return fun-streams, something I very much want to do.

Because it's still sometimes nice to get a promise in these circumstances, you can ask for one in a few ways:

.finished() is available on writable and duplex streams and resolves when the stream emits finish. It is a no-op on writable streams, as they already are a promise that resolves when the stream emits finish.

.ended() is available on readable and duplex streams and resolves when the stream emits end.

.closed() is available on writable and duplex streams and resolves when the stream emits close. Note that not all streams emit close and if this is one of those then the promise will never resolve.

2.6.1

  • Revert: Patch to preserve funopts confuses itself.

2.6.0

  • Feature: Lazily convert promises to streams. If you funify a promise and use it as a promise, no stream infrastructure will be created. (This is important for promise-returning fun functions.)
  • Feature: Funify writable promise-streams as if they were promises. This means that if they resolve to a value, that value will be preserved, which means that fun().reduce().forEach() will work, for instance.
  • Feature: forEach on array-streams now has a synchronous fast-path implementation, where we ignore the stream entirely and just loop over the in memory array.
  • Tests: Many, MANY, were added. A number of files are at 100% coverage now.
  • Fix: Async reduce streams previously would finish early and give incomplete results.
  • Fix: The async reduceTo implementation previously did not work at all.
  • Fix: Reduce streams weren't copying along options from their parents. This only matters if you chain off of them as a fun-stream and not a promise.
  • Fix: .concat() will now work with streams w/ elements that can't be added to strings. (eg, Symbols)
  • Fix: If you pass an existing readable fun-stream to fun we used to just return it. We still do that if you provide the same options as the fun-stream; if you don't, we pipe into a new one with your options.
  • Fix: .grab() now returns a promise-stream.
  • Fix: Sugar like .concat() would previously fail if you'd explicitly set the mode to .async(). This is now fixed.

2.5.1

  • Readme improvements

2.5.0

  • Strings can now be fun too.
  • Also Buffers.
  • Improve stream duck typing
  • Really, truly, all things are also promises now.

2.4.0

  • Add pipe-chain bundling in the form of fun(stream => stream.pipe.chain).

2.3.2

  • Fix async flatMap

2.3.1

  • Eep actually include flat-map-stream in the artifact

2.3.0

New features!

  • All streams are promises: fun(mystream).then(…) will wait for your stream to end in the case of read streams, or finish in the case of write and duplex streams. There's no overhead to this: no promise is constructed if you don't call a promise method on the resulting objects. If you want a Bluebird (or other, non-native) promise implementation you can pass one in as an option: fun(mystream, {Promise: require('bluebird')})
  • flat: emit each object of an array as a new element of the stream
  • flatMap: transform the input and apply flat as above to the output
  • list: sugar for turning the stream into an array
  • grab: operate on the entire stream as an array while still chaining back into a stream
  • concat: returns the entire stream as a single string

Improvements!

  • forEach: sync calls have a fast-path now
  • All module loading that can be lazy, now is lazy, for faster loads and less memory use.

Bug fixes!

  • The async stream reducer was entirely broken. Now fixed.

2.2.0

  • Writable streams are now fun! Fun a writable stream and get a promise/stream hybrid. The promise part will resolve when the stream finishes, or rejects when it errors. The stream part is still a perfectly ordinary write stream with all the usual features.
  • Errors aren't fun, but they now at least cop to who was to blame for them in error.src.
  • Generators can be fun! Fun a generator to get a fun stream of values.

  • Bug fix: forEach fun now gets option choices, and thus your choice of promises.

  • More symbols are more fun. The isFun property is now Symbol(ISFUN) and no longer pollutes your namespaces.