rxbp is a Python library that integrates backpressure into the Observable via Flowables.
- Observable Pattern: built on the reactive programming model.
- Backpressure: enables memory-safe handling of fast data producers and slow consumers.
- Continuation certificate: ensures that the execution of a Flowable completes, avoiding any continuation deadlock.
- RxPY compatibility: interoperates with RxPY, bridging classic observables and backpressure-aware Flowables.
- Favor usability - Favor an implementation that is simple, safe, and user-friendly, while accepting some computational overhead.
You can install rxbp using pip:
pip install rxbp
import rxbp
source = rxbp.from_iterable(("Alpha", "Beta", "Gamma", "Delta", "Epsilon"))
flowable = (
source
.map(lambda s: len(s))
.filter(lambda i: i >= 5)
.tap(on_next=lambda v: print(f'Received {v}'))
)
# execute the flowable
rxbp.run(flowable)
connectable
- create a Flowable whose source must be specified by theconnections
argument when calling therun
functioncount
- create a Flowable emitting 0, 1, 2, ...create
- creates a Flowable from a ContinuationMonadempty
- create a Flowable emitting no itemserror
- create a Flowable emitting an exceptionfrom_iterable
(orfrom_
) - create a Flowable that emits each element of an iterablefrom_value
(orreturn_
) - create a Flowable that emits a single elementfrom_rx
- wrap a rx.Observable and exposes it as a Flowable, relaying signals in a backpressure-aware manner.interval
- create a Flowable emitting an item after every time intervalrepeat
- create a Flowable that repeats the given elementschedule_on
- schedule task on a dedicated schedulersleep
(ordelay
) - schedule task on a dedicated scheduler after a relative time
accumulate
- apply an accumulator function over a Flowable sequence and returns each intermediate result.batch
- gathers items into batches of provided sizeconcat_map
- apply a function to each item emitted by the source and flatten the results sequentiallydefault_if_empty
- emits a given value if the source completes without emitting anythingfilter
- emit only those items for which the given predicate holdsfirst
- emit the first element onlyflat_map
- apply a function to each item emitted by the source and flattens the resultlast
- emit last itemmap
- map each element emitted by the source by applying the given functionreduce
- apply an accumulator function over a Flowable sequence and emits a single elementrepeat
- returns a Flowable that will resubscribe to the source when the source completesrepeat_first
- return a Flowable that repeats the first element it receives from the source forever (until disposed).skip
- skip the first n itemsskip_while
- skip first items while the given predicate holdstake
- take the first n itemstake_while
- take items while the given predicate holdstap
- used to perform side-effects for notifications from the source Flowableto_list
- create a new Flowable that collects the items from the source sequence, and emits a single itemzip_with_index
- zip each item emitted by the source with the enumerated index
merge
- merge the items of the Flowable sequences into a single Flowablezip
- Create a new Flowable from two Flowables by combining their item in pairs in a strict sequence
share
- share a Flowable to possibly multiple subscribers
to_rx
- create a rx Observable from a Observable
A Flowable can be created from an RxPY Observable using the rxbp.from_rx
function.
Likewise, a Flowable can be converted to an RxPY Observable using the rxbp.to_rx
function.
The example below demonstrates the two conversion:
import reactivex as rx
import rxbp
rx_source = rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
# convert Observable to Flowable
source = rxbp.from_rx(rx_source)
flowable = (
source
.map(lambda s: len(s))
.filter(lambda i: i >= 5)
)
# convert Flowable to Observable
rxbp.to_rx(flowable).subscribe(lambda v: print(f"Received {v}"))
Below are some references related to this project:
- continuationmonad is a Python library that implements stack-safe continuations based on schedulers.
- RxPY is rx extension for Python implementing the Observable pattern (without backpressure).