This is the minimal, dependency-free version of scramjet, used since Scramjet version 3.0.0 as a base for scramjet and scramjet plugins.
Unless you are sure you need it, you will be better off using the main repo and module.
It is built upon the logic behind three well-known JavaScript array operations, namely map, filter and reduce. This means that if you've ever performed operations on an Array in JavaScript, you already know Scramjet like the back of your hand.
Scramjet uses functional programming to run transformations on your data streams in a fashion very similar to the well known event-stream node module. Most transformations are done by passing a transform function.
It's all about chaining, really. You develop your flow as a chain of calls, each returning a stream on which you can call the next method, like this:
scramjet.from(someReadableStream)       // you can construct your stream any way you like
    .map(someMapper)                    // you can map the objects in the stream
    .map(someAsyncAPICall)              // you can call an API for each item
    .filter(asynchronousFilterOperation) // you can even filter by async function
    .catch(errorHandler)                // there's built in error handling
    .until(doneCondition)               // you can stop reading the stream whenever you're done
    .toArray();                         // you can accumulate
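To make the Array analogy concrete, here is a minimal sketch of the same map, filter and reduce shape on a plain array. It does not use scramjet at all, and the `readings` data and `totalLength` name are made up for illustration.

```javascript
// Plain arrays only; no scramjet involved. All names here are illustrative.
const readings = [
    { id: 1, value: "alpha" },
    { id: 2, value: "" },
    { id: 3, value: "gamma-ray" },
];

const totalLength = readings
    .map((item) => ({ id: item.id, length: item.value.length })) // transform each item
    .filter((item) => item.length > 0)                           // drop empty values
    .reduce((sum, item) => sum + item.length, 0);                // accumulate a result

console.log(totalLength);
```

A stream flow reads the same way, except that items arrive over time and each step may be asynchronous.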
You can write your transforms in three ways:
- Synchronous
Example: a simple stream transform that outputs a stream of objects of the same id property and the length of the value string.
datastream.map(
(item) => ({id: item.id, length: item.value.length})
)
- Asynchronous using ES2017 async/await
Example: a simple stream that uses the Fetch API to get the contents of all entries in the stream
datastream.map(
async (item) => fetch(item)
)
- Asynchronous using Promises
Example: a simple stream that fetches a URL mentioned in the incoming object
datastream.map(
    (item) => new Promise((resolve, reject) => {
        request(item.url, (err, res, data) => {
            if (err)
                reject(err); // will emit an "error" event on the stream
            else
                resolve(data);
        });
    })
)
The actual logic of this transform function is as if you passed your function to the then method of a Promise resolved with the data from the input stream.
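One way to picture that "then" behaviour is the sketch below. `applyTransform` is a hypothetical helper written for this illustration, not a scramjet API; it only models how a single chunk could meet a transform, so that synchronous, async and Promise-returning functions all behave the same way and a thrown error turns into a rejection.

```javascript
// Hypothetical helper, not a scramjet API: models how one chunk meets a transform.
const applyTransform = (chunk, transform) => Promise.resolve(chunk).then(transform);

const item = { id: 7, value: "hello" };

const results = Promise.all([
    // synchronous transform
    applyTransform(item, (x) => ({ id: x.id, length: x.value.length })),
    // async function
    applyTransform(item, async (x) => x.value.toUpperCase()),
    // function returning a Promise
    applyTransform(item, (x) => new Promise((resolve) => setTimeout(() => resolve(x.id * 2), 5))),
    // a thrown error becomes a rejection, caught here for display
    applyTransform(item, () => { throw new Error("boom"); }).catch((err) => `caught: ${err.message}`),
]);

results.then((values) => console.log(values));
```

Because every variant ends up as a Promise, the caller never needs to know which style a given transform was written in.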
Here's the list of the exposed classes and methods; please review the specific documentation for details:
- exports - module exports explained.
- scramjet.DataStream - the base class for all scramjet classes.
- scramjet.BufferStream - a DataStream of Buffers.
- scramjet.StringStream - a DataStream of Strings.
- scramjet.MultiStream - an object consisting of multiple streams.
- more on plugins - a description and link.
Note that:
- Most of the methods take a callback argument that operates on the stream items.
- The callback, unless it's stated otherwise, will receive an argument with the next chunk.
- You can use async functions or return Promises wherever you like.
- Methods usually return the same class, so are chainable ↺, or are asynchronous ⇄.
The quick reference of the exposed classes:
BufferStream is a facilitation stream created for easy splitting or parsing of buffers.
It is useful for working on built-in Node.js streams from files, parsing binary formats etc.
A simple use case would be:
fs.createReadStream('pixels.rgba')
    .pipe(new BufferStream) // pipe a buffer stream into scramjet
    .breakup(4) // split into 4 byte fragments
    .parse(buffer => [
        buffer.readUInt8(0), // the output is a stream of R,G,B and Alpha
        buffer.readUInt8(1), // values from 0-255 in an array.
        buffer.readUInt8(2), // (unsigned reads; readInt8 would give -128..127)
        buffer.readUInt8(3)
    ]);
Detailed :BufferStream docs here
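The break-up-and-parse idea can be tried without any stream at all. The following is a minimal sketch using only Node.js Buffer calls; `splitPixels` is a hypothetical helper, not a scramjet method, and the sample bytes are invented. It also shows why unsigned reads matter when the values are meant to span 0-255.

```javascript
// Plain Node.js Buffer calls; splitPixels is a hypothetical helper, not a scramjet method.
function splitPixels(buffer, size = 4) {
    const pixels = [];
    // Walk the buffer in fixed-size steps, ignoring a trailing partial fragment.
    for (let offset = 0; offset + size <= buffer.length; offset += size) {
        const fragment = buffer.subarray(offset, offset + size);
        pixels.push([
            fragment.readUInt8(0), // R
            fragment.readUInt8(1), // G
            fragment.readUInt8(2), // B
            fragment.readUInt8(3), // Alpha
        ]);
    }
    return pixels;
}

const raw = Buffer.from([255, 0, 0, 255, 0, 128, 255, 64]);
const pixels = splitPixels(raw);
const signedFirstByte = raw.readInt8(0); // a signed read of 0xFF gives -1, not 255

console.log(pixels, signedFirstByte);
```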
Most popular methods:
- new BufferStream([opts]) - Creates the BufferStream.
- bufferStream.shift(chars, func) ↺ - Shifts the given number of bytes from the original stream.
- bufferStream.split(splitter) : BufferStream ↺ - Splits the buffer stream into buffer objects.
- bufferStream.breakup(number) : BufferStream ↺ - Breaks a stream up into chunks of the specified length.
- bufferStream.stringify([encoding]) : StringStream - Creates a string stream from the given buffer stream.
- bufferStream.parse(parser) : DataStream - Parses every buffer to an object.
- BufferStream:pipeline(readable) : BufferStream - Creates a pipeline of streams and returns a scramjet stream.
- BufferStream:from(stream, [options]) : BufferStream - Creates a BufferStream from anything.
DataStream is the primary stream type for Scramjet. Once you have parsed your stream, just pipe it and you can then perform calculations on the data objects streamed through your flow.
Use as:
const { DataStream } = require('scramjet');
await (DataStream.from(aStream) // create a DataStream
.map(findInFiles) // read some data asynchronously
.map(sendToAPI) // send the data somewhere
.run()); // wait until end
Detailed :DataStream docs here
Most popular methods:
- new DataStream([opts]) - Creates the DataStream.
- dataStream.map(func, [ClassType]) ↺ - Transforms stream objects into new ones, just like Array.prototype.map.
- dataStream.filter(func) ↺ - Filters objects based on the function outcome, just like Array.prototype.filter.
- dataStream.reduce(func, into) ⇄ - Reduces the stream into a given accumulator.
- dataStream.do(func) ↺ - Performs an asynchronous operation without changing or resuming the stream.
- dataStream.all(functions) ↺ - Processes a number of functions in parallel, returns a stream of arrays of results.
- dataStream.race(functions) ↺ - Processes a number of functions in parallel, returns the first resolved.
- dataStream.unorder(func) - Allows processing items without keeping order.
- dataStream.into(func, into) ↺ - Allows own implementation of stream chaining.
- dataStream.use(func) ↺ - Calls the passed method in place with the stream as first argument, returns result.
- dataStream.run() ⇄ - Consumes all stream items doing nothing. Resolves when the stream is ended.
- dataStream.tap() ↺ - Stops merging transform functions at the current place in the command chain.
- dataStream.whenRead() ⇄ - Reads a chunk from the stream and resolves the promise when read.
- dataStream.whenWrote(chunk) ⇄ - Writes a chunk to the stream and returns a Promise resolved when more chunks can be written.
- dataStream.whenEnd() ⇄ - Resolves when the stream ends, rejects on an uncaught error.
- dataStream.whenDrained() ⇄ - Returns a promise that resolves when the stream is drained.
- dataStream.whenError() ⇄ - Returns a promise that resolves (!) when the stream errors.
- dataStream.setOptions(options) ↺ - Allows resetting stream options.
- dataStream.copy(func) ↺ - Returns a copy of the stream.
- dataStream.tee(func) ↺ - Duplicates the stream.
- dataStream.each(func) ↺ - Performs an operation on every chunk, without changing the stream.
- dataStream.while(func) ↺ - Reads the stream while the function outcome is truthy.
- dataStream.until(func) ↺ - Reads the stream until the function outcome is truthy.
- dataStream.catch(callback) ↺ - Provides a way to catch errors in chained streams.
- dataStream.raise(err) ⇄ - Executes all error handlers and if none resolves, then emits an error.
- dataStream.bufferify(serializer) : BufferStream ↺ - Creates a BufferStream.
- dataStream.stringify([serializer]) : StringStream ↺ - Creates a StringStream.
- dataStream.toArray([initial]) : Array.<any> ⇄ - Aggregates the stream into a single Array.
- dataStream.toGenerator() : Generator.<Promise.<any>> - Returns an async generator.
- dataStream.toBufferStream(serializer) : BufferStream ↺ - Creates a BufferStream.
- dataStream.toStringStream([serializer]) : StringStream ↺ - Creates a StringStream.
- DataStream:from(input, [options]) : DataStream - Returns a DataStream from pretty much anything sensibly possible.
- DataStream:pipeline(readable) : DataStream - Creates a pipeline of streams and returns a scramjet stream.
- DataStream:fromArray(array, [options]) : DataStream - Creates a DataStream from an Array.
- DataStream:fromIterator(iterator, [options]) : DataStream - Creates a DataStream from an Iterator.
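To illustrate what "read until a condition" and "reduce into an accumulator" mean for a stream, here is a rough sketch built only on Node's stdlib `Readable` and async iteration. `takeUntil` and `reduceStream` are hypothetical helpers, not scramjet methods, and the assumption that the item triggering the condition is excluded is mine; check the detailed docs for the actual behaviour.

```javascript
const { Readable } = require("stream");

// Hypothetical helpers built on async iteration; these are not scramjet methods.
async function* takeUntil(source, isDone) {
    for await (const chunk of source) {
        if (await isDone(chunk)) return; // stop reading once the condition holds
        yield chunk;                     // assumption: the triggering chunk is dropped
    }
}

async function reduceStream(source, reducer, initial) {
    let accumulator = initial;
    for await (const chunk of source) {
        accumulator = await reducer(accumulator, chunk); // reducer may be sync or async
    }
    return accumulator;
}

const numbers = Readable.from([1, 2, 3, 4, 5, 6]);
const sum = reduceStream(
    takeUntil(numbers, (n) => n > 4),
    async (acc, n) => acc + n,
    0
);

sum.then((total) => console.log(total));
```

Note that the result is a Promise, which matches the ⇄ marker on the accumulating methods in the list above.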
MultiStream is an object consisting of multiple streams that can be refined or muxed.
The idea behind a MultiStream is being able to mux and demux streams when needed.
Usage:
new MultiStream([...streams])
.mux();
new MultiStream(function*(){ yield* streams; })
.map(stream => stream.filter(myFilter))
.mux();
Detailed :MultiStream docs here
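For readers unfamiliar with muxing, the sketch below merges several object streams into one using only Node's stdlib. `muxStreams` and `collect` are hypothetical helpers and this is not how scramjet implements `mux`; in particular it ignores the optional comparator, so the order of items across sources is not guaranteed.

```javascript
const { Readable, PassThrough } = require("stream");

// Hypothetical helper, not scramjet's implementation: merge object streams into one.
function muxStreams(streams) {
    const output = new PassThrough({ objectMode: true });
    let open = streams.length;
    if (open === 0) output.end();
    for (const stream of streams) {
        stream.pipe(output, { end: false }); // keep output open for the other sources
        stream.once("end", () => {
            open -= 1;
            if (open === 0) output.end();    // close only after the last source ends
        });
        stream.once("error", (err) => output.destroy(err));
    }
    return output;
}

// Gather every item of a readable stream into an array.
async function collect(stream) {
    const items = [];
    for await (const item of stream) items.push(item);
    return items;
}

const merged = collect(muxStreams([
    Readable.from(["a1", "a2"]),
    Readable.from(["b1", "b2", "b3"]),
]));

merged.then((items) => console.log(items.slice().sort()));
```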
Most popular methods:
- new MultiStream(streams, [options]) - Creates an instance of MultiStream with the specified stream list.
- multiStream.streams : Array - Array of all streams.
- multiStream.source : DataStream - Source of the MultiStream.
- multiStream.length : number - Returns the current stream length.
- multiStream.map(aFunc, rFunc) : Promise.<MultiStream> ↺ - Returns a new MultiStream with the streams returned by the transform.
- multiStream.find() : DataStream - Calls Array.prototype.find on the streams.
- multiStream.filter(func) : MultiStream ↺ - Filters the stream list and returns a new MultiStream with only the
- multiStream.mux([comparator], [ClassType]) : DataStream - Muxes the streams into a single one.
- multiStream.add(stream) - Adds a stream to the MultiStream.
- multiStream.remove(stream) - Removes a stream from the MultiStream.
- MultiStream:from(streams, [StreamClass]) : MultiStream - Constructs a MultiStream from any number of stream-likes.
StringStream is a stream of string objects for further transformation on top of DataStream.
Example:
StringStream.from(async () => (await fetch('https://example.com/data/article.txt')).text())
.lines()
.append("\r\n")
.pipe(fs.createWriteStream('./path/to/file.txt'))
Detailed :StringStream docs here
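Splitting a string stream into lines has one subtlety: chunks rarely end on a line boundary, so the unfinished tail of one chunk has to be carried into the next. A minimal sketch of that idea follows. `splitLines` is a hypothetical helper, and the `LINE_BREAK` pattern is an assumption made for this example; the actual StringStream.SPLIT_LINE regex may differ.

```javascript
// Assumed line-break pattern for this sketch; StringStream.SPLIT_LINE may differ.
const LINE_BREAK = /\r?\n/;

// Hypothetical helper: re-chunk arbitrary string chunks into whole lines.
function* splitLines(chunks, splitter = LINE_BREAK) {
    let remainder = "";
    for (const chunk of chunks) {
        const parts = (remainder + chunk).split(splitter);
        remainder = parts.pop(); // the last part may be an unfinished line
        yield* parts;
    }
    if (remainder.length > 0) yield remainder; // flush the final line
}

const chunks = ["first li", "ne\r\nsecond line\nthi", "rd line"];
const lines = [...splitLines(chunks)];

console.log(lines);
```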
Most popular methods:
- new StringStream([encoding], [options]) - Constructs the stream with the given encoding.
- stringStream.shift(bytes, func) ↺ - Shifts given length of chars from the original stream.
- stringStream.split(splitter) ↺ - Splits the string stream by the specified RegExp or string.
- stringStream.match(matcher) ↺ - Finds matches in the string stream and streams the match results.
- stringStream.toBufferStream() : BufferStream ↺ - Transforms the StringStream to BufferStream.
- stringStream.parse(parser, [StreamClass]) : DataStream ↺ - Parses every string to object.
- stringStream.toDataStream() - Alias for StringStream#parse.
- StringStream:SPLIT_LINE - A handy split by line regex to quickly get a line-by-line stream.
- StringStream:fromString(stream, encoding) : StringStream - Creates a StringStream and writes a specific string.
- StringStream:pipeline(readable, transforms) : StringStream - Creates a pipeline of streams and returns a scramjet stream.
- StringStream:from(source, [options]) : StringStream - Create StringStream from anything.
Check out the command line interface for simplified scramjet usage with scramjet-cli
$ sjr -i http://datasource.org/file.csv ./transform-module-1 ./transform-module-1 | gzip > logs.gz
As of version 2.0 Scramjet is MIT Licensed.
The project needs your help! There's lots of work to do: transforming and muxing, joining and splitting, browserifying, modularizing, documenting and issuing those issues.
If you want to help and be part of the Scramjet team, please reach out to me or to scramjetorg on GitHub, or email us: [email protected].