TypeScript port of Monix reactive streams module. All credits goes to monix authors!
This library is Work in Progress, api may change until version 1.0.0
.
Install rx-stream
library:
npm install --save rx-stream
Create an Observable
and observe it's events:
import Observable from 'rx-stream';
import {Future} from 'funfix';
Observable.range(0, 10)
.mapFuture(n => Future.pure(n).delayResult(1000))
.foreach((e) => {
console.log('got item', e);
})
For synchronous sources, in order to use takeUntil
and onErrorRestart
- need to add
asyncBoundary (ex: bufferWithPressure
), otherwise event loop may never reach takeUntil
let failed = false;
Observable.loop()
.map((n): number => {
// will throw here
if (n == 3 && !failed) {
failed = true;
throw new Error('something went wrong');
}
return n;
})
.bufferWithPressure(10) // this will break synchronous loop, to "make room" async events (sigTrigger)
.onErrorRestartUnlimited()
.takeUntil(sigTrigger)
More usage examples and documentation will come closer to version 1.0
All code in this repository is licensed under the Apache License, Version 2.0. See LICENCE.