scramjetorg/scramjet-core

Scramjet core

This is the minimal, dependency-free version of scramjet, used since Scramjet version 3.0.0 as the base for scramjet and scramjet plugins.

Unless you are sure you need this package, you are better off using the main repo and module.

It is built upon the logic behind three well-known JavaScript array operations, namely map, filter and reduce. This means that if you've ever performed operations on an Array in JavaScript, you already know Scramjet like the back of your hand.
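For reference, here is a minimal sketch of those three operations on a plain Array. It makes no scramjet calls, and the `orders` data is made up for illustration; the stream methods shown later in this README are meant to read much the same way.

```javascript
// Plain Array operations; no scramjet calls are used here.
// The `orders` data is hypothetical.
const orders = [
    { id: 1, total: 40 },
    { id: 2, total: 5 },
    { id: 3, total: 25 },
];

const largeOrderSum = orders
    .filter((order) => order.total >= 20)     // keep only orders of 20 or more
    .map((order) => order.total)              // project each order to its total
    .reduce((sum, total) => sum + total, 0);  // fold the totals into one number

console.log(largeOrderSum); // 65
```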

Usage

Scramjet uses functional programming to run transformations on your data streams, in a fashion very similar to the well-known event-stream node module. Most transformations are done by passing a transform function.

It's all about chaining, really: you build your flow as a chain of method calls, where each call returns an object you can call the next method on, like this:

    scramjet.from(someReadableStream)           // you can construct your stream any way you like
        .map(someMapper)                        // you can map the objects in the stream
        .map(someAsyncAPICall)                  // you can call an API for each item
        .filter(asynchronousFilterOperation)    // you can even filter by async function
        .catch(errorHandler)                    // there's built in error handling
        .until(doneCondition)                   // you can stop reading the stream whenever you're done
        .toArray();                             // you can accumulate

You can write your transforms in three ways:

  1. Synchronous

Example: a simple stream transform that outputs objects with the same id property and the length of the value string.

    datastream.map(
        (item) => ({id: item.id, length: item.value.length})
    )

  2. Asynchronous using ES2017 async/await

Example: a simple stream that uses the Fetch API to fetch every entry in the stream

    datastream.map(
        async (item) => fetch(item)
    )

  3. Asynchronous using Promises

Example: a simple stream that fetches a URL mentioned in the incoming object

   datastream.map(
       (item) => new Promise((resolve, reject) => {
           request(item.url, (err, res, data) => {
               if (err)
                   reject(err); // will emit an "error" event on the stream
               else
                   resolve(data);
           });
       })
   )

The actual logic of this transform function is as if you passed your function to the then method of a Promise resolved with the data from the input stream.
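The `then` analogy can be sketched with plain Promises. The `applyTransform` helper below is hypothetical and is not scramjet's implementation; it only shows why synchronous functions, async functions and throwing functions can all be handled the same way.

```javascript
// Hypothetical helper, not part of scramjet: applies `transform` to one chunk
// the way a Promise chain would.
function applyTransform(chunk, transform) {
    return Promise.resolve(chunk).then(transform);
}

const syncTransform = (item) => item.value.length;               // returns a plain value
const asyncTransform = async (item) => item.value.toUpperCase(); // returns a Promise
const failingTransform = () => { throw new Error('bad chunk'); }; // becomes a rejection

const chunk = { id: 1, value: 'abc' };

const results = Promise.all([
    applyTransform(chunk, syncTransform),
    applyTransform(chunk, asyncTransform),
    applyTransform(chunk, failingTransform).catch((err) => `caught: ${err.message}`),
]);

results.then((values) => console.log(values)); // [ 3, 'ABC', 'caught: bad chunk' ]
```

Because `then` accepts both plain return values and Promises, and turns a thrown error into a rejection, the same transform signature covers all three ways of writing transforms listed above.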

API Docs

Here's the list of the exposed classes and methods. Please review the specific documentation for details.

Note that:

  • Most of the methods take a callback argument that operates on the stream items.
  • The callback, unless it's stated otherwise, will receive an argument with the next chunk.
  • You can use async functions or return Promises wherever you like.
  • Methods usually either return the same class, so they are chainable, or are asynchronous.
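To make these notes concrete, here is a rough sketch of a chainable pipeline built on async generators. `TinyFlow` is a hypothetical class written for this example only; it is not how scramjet is implemented and it omits error handling and backpressure.

```javascript
// Hypothetical mini-pipeline for illustration; scramjet's real classes differ.
class TinyFlow {
    constructor(iterable) {
        this.iterable = iterable;
    }

    // Returns a new TinyFlow, which is what makes the calls chainable.
    map(callback) {
        const source = this.iterable;
        return new TinyFlow((async function* () {
            // `await` accepts plain values and Promises alike.
            for await (const chunk of source) yield await callback(chunk);
        })());
    }

    filter(callback) {
        const source = this.iterable;
        return new TinyFlow((async function* () {
            for await (const chunk of source) {
                if (await callback(chunk)) yield chunk;
            }
        })());
    }

    // Terminal method: asynchronous, so it returns a Promise instead of a flow.
    async toArray() {
        const out = [];
        for await (const chunk of this.iterable) out.push(chunk);
        return out;
    }
}

const flowResult = new TinyFlow([1, 2, 3, 4])
    .map((n) => n * 10)                        // synchronous callback
    .filter(async (n) => n > 10)               // async callback
    .map((n) => Promise.resolve(`item-${n}`))  // callback returning a Promise
    .toArray();

flowResult.then((items) => console.log(items)); // [ 'item-20', 'item-30', 'item-40' ]
```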

The quick reference of the exposed classes:

:BufferStream

A helper stream created for easy splitting or parsing of buffers.

Useful for working on built-in Node.js streams from files, parsing binary formats etc.

A simple use case would be:

    const fs = require('fs');
    const { BufferStream } = require('scramjet');

    fs.createReadStream('pixels.rgba')
        .pipe(new BufferStream())       // pipe the file stream into a BufferStream
        .breakup(4)                     // split into 4 byte fragments
        .parse(buffer => [
            buffer.readUInt8(0),        // the output is a stream of R,G,B and Alpha
            buffer.readUInt8(1),        // values from 0-255 in an array;
            buffer.readUInt8(2),        // readInt8 would return -128..127 instead
            buffer.readUInt8(3)
        ]);
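What the split-then-parse step amounts to can be sketched with Node's built-in `Buffer` alone. The `breakupBuffer` helper and the sample bytes are assumptions made for this example and are not scramjet code; unlike a stream, the helper works on one in-memory buffer and simply drops an incomplete trailing fragment.

```javascript
// Stand-in for the file contents: two RGBA pixels, 4 bytes each (made-up data).
const pixels = Buffer.from([255, 0, 0, 255, 0, 128, 255, 64]);

// Hypothetical helper: split a buffer into fixed-size fragments.
// An incomplete trailing fragment is dropped in this sketch.
function breakupBuffer(buffer, size) {
    const fragments = [];
    for (let offset = 0; offset + size <= buffer.length; offset += size) {
        fragments.push(buffer.subarray(offset, offset + size));
    }
    return fragments;
}

// Parse each 4 byte fragment into [R, G, B, A] using unsigned reads (0-255).
const rgba = breakupBuffer(pixels, 4).map((fragment) => [
    fragment.readUInt8(0),
    fragment.readUInt8(1),
    fragment.readUInt8(2),
    fragment.readUInt8(3),
]);

console.log(rgba);                // [ [ 255, 0, 0, 255 ], [ 0, 128, 255, 64 ] ]
console.log(pixels.readInt8(0));  // -1, a signed read of the same byte
```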

Detailed :BufferStream docs here

:DataStream

DataStream is the primary stream type for Scramjet. Once you have parsed your stream, just pipe it into a DataStream and you can then perform calculations on the data objects streamed through your flow.

Use as:

const { DataStream } = require('scramjet');

await (DataStream.from(aStream) // create a DataStream
    .map(findInFiles)           // read some data asynchronously
    .map(sendToAPI)             // send the data somewhere
    .run());                    // wait until end

Detailed :DataStream docs here

:MultiStream

An object consisting of multiple streams that can be refined or muxed.

The idea behind a MultiStream is being able to mux and demux streams when needed.

Usage:

new MultiStream([...streams])
 .mux();

new MultiStream(function*(){ yield* streams; })
 .map(stream => stream.filter(myFilter))
 .mux();
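As a rough illustration of what muxing means, the sketch below merges several object streams into one using only Node's built-in `stream` module. `muxStreams` and `collect` are hypothetical helpers, not scramjet's implementation, and the order in which items from different inputs interleave is not guaranteed.

```javascript
const { PassThrough, Readable } = require('stream');

// Hypothetical helper, not scramjet's implementation: merges several object
// streams into one and ends the output once every input has ended.
function muxStreams(streams) {
    const output = new PassThrough({ objectMode: true });
    let open = streams.length;
    if (open === 0) output.end();
    for (const stream of streams) {
        stream.pipe(output, { end: false });   // keep output open for the others
        stream.once('end', () => {
            open -= 1;
            if (open === 0) output.end();
        });
        stream.once('error', (err) => output.destroy(err));
    }
    return output;
}

// Hypothetical helper: gather every item of a stream into an array.
async function collect(stream) {
    const items = [];
    for await (const item of stream) items.push(item);
    return items;
}

const muxedItems = collect(muxStreams([
    Readable.from(['a1', 'a2']),
    Readable.from(['b1', 'b2', 'b3']),
]));

// Sorted copy, because the interleaving order may vary between runs.
muxedItems.then((items) => console.log([...items].sort()));
```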

Detailed :MultiStream docs here

:StringStream

A stream of string objects for further transformation on top of DataStream.

Example:

StringStream.from(async () => (await fetch('https://example.com/data/article.txt')).text())
    .lines()
    .append("\r\n")
    .pipe(fs.createWriteStream('./path/to/file.txt'))
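One detail worth keeping in mind when splitting a stream of text into lines is that a line can span two chunks. The sketch below shows the idea in plain JavaScript; `splitLines` is a hypothetical helper working on an array of chunks, not scramjet's implementation.

```javascript
// Hypothetical helper: splits incoming text chunks into lines, carrying an
// unfinished line over to the next chunk.
function* splitLines(chunks) {
    let rest = '';
    for (const chunk of chunks) {
        const parts = (rest + chunk).split(/\r?\n/);
        rest = parts.pop();        // the last part may be an incomplete line
        yield* parts;
    }
    if (rest !== '') yield rest;   // flush whatever is left at the end
}

// Made-up chunks whose boundaries fall in the middle of lines.
const textChunks = ['first li', 'ne\nsecond line\nthi', 'rd line'];

const lines = [...splitLines(textChunks)];
const withLineEndings = lines.map((line) => line + '\r\n').join('');

console.log(lines); // [ 'first line', 'second line', 'third line' ]
```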

Detailed :StringStream docs here

CLI

Check out the command line interface, scramjet-cli, for simplified scramjet usage:

$ sjr -i http://datasource.org/file.csv ./transform-module-1 ./transform-module-1 | gzip > logs.gz

License and contributions

As of version 2.0 Scramjet is MIT Licensed.

Help wanted

The project needs your help! There's lots of work to do: transforming and muxing, joining and splitting, browserifying, modularizing, documenting and issuing those issues.

If you want to help and be part of the Scramjet team, please reach out to me (scramjetorg on GitHub) or email us: [email protected].