-
Notifications
You must be signed in to change notification settings - Fork 0
Components: Source
Source components are responsible for generating events and/or data that should be processed.
Events and data generated by sources will trigger processor components asynchronously.
Source components are constructed using Rx.Observable.create
function from RxJS.
The default exported user function will passed to the constructor.
In the simplest instance, "Hello world!" source component would look like this:
export default (observable) => {
observable.onNext('Hello world!');
observable.onCompleted();
};
This source dispatches a string "Hello world!" and immediately notifies that it is complete.
It is possible to create endless source components.
This might be useful for cases when you might need to receive data continuously. (e.g. slack bot)
Simplest case is endless counter dispatcher with preset interval:
export default (observable) => {
let i = 0;
setInterval(() => observable.onNext(i++), 1000);
};
This source will dispatch counter every second until user manually shuts it down.
Sometimes you might need to allow to pass configuration options for the source.
This can be done by adding more parameters to the function.
Here's an example of source component that will get the text for given URL:
import request from 'superagent';
export default (url, observable) => {
request.get(url).end((err, res) => {
if (err) return observable.onError(err);
observable.onNext(res.text);
});
};
Note that observable
variable will always be the last parameter of the function.
It is also possible to use HTTP requests and WebSocket to pass additional data to sources that are already running.
The input will be mapped to the URI of the pipeline appended by /input
, e.g. http://alpha.exynize.com/pipeline/user/pipeline/input
.
There are two ways to handle this.
Simple way is to define new exported function called routeHandler
that will get incoming requests from all the transports (POST, PUT, GET requests and websocket).
This can be done like so:
// create subject to pass request to main source function
const incoming = new Rx.Subject();
// define function that will handle incoming requests
export const routeHandler = (req) => incoming.onNext(req);
// main source function
export default (observable) => {
incoming.subscribe(observable);
observable.onNext('Waiting for input!');
};
This example uses Rx.Subject
to pass the incoming request directly to source output.
More controlled way is to define new exported object called routeHandler
that will process incoming requests from any transports it defines handlers for.
This can be done like so:
// create subject to pass request to main source function
const incoming = new Rx.Subject();
// define object that will handle only incoming GET requests
export const routeHandler = {
get: (req) => incoming.onNext(req.query)
// other possible methods:
// "post" for POST requests
// "put" for PUT requests
// "ws" for WebSocket connection
};
// main source function
export default (observable) => {
incoming.subscribe(observable);
observable.onNext('Waiting for input!');
};
This example will only handle GET requests and will pass any query
that incoming request has to the source function.
Note that source observable must not be complete and source must be running to accept requests.
You can find real-world examples of source components in usecases folder, currently the following ones are available: