General purpose 4th generation reactive library for Golang

DusanKasan/cesium

Cesium

** This library is a work in progress **

This is a port of Project Reactor into Go. It provides reactive data streams with asynchronous pull backpressure and operator fusion. Its aim is to be as close to the proposed Java API as possible, altering it slightly where needed for it to make sense in Go.

For more information see generated godocs about:

More thorough documentation to come after most/all operators are implemented.

Naming

The explanations here assume knowledge of the Observer pattern.

  • Publisher is an observable that supports asynchronous backpressure in the form of its Request(int64) method. It will only ever emit the requested number of emissions (this does not apply to the closing emissions Complete and Error). Its emission footprint is ( [Next](0-N) Complete|Error ).
  • Flux is a Publisher specific to this library that has access to the operators described here. It can be switched to unbounded mode, which basically transforms it into an observable, by calling Request(math.MaxInt64) on its Subscription.
  • Mono is a Publisher specific to this library whose emission footprint is ( [Next](0-1) Complete|Error ). It also has the operators described here. It can be switched to unbounded mode, which basically transforms it into an observable, by calling RequestUnbounded(), which is really just a proxy to Request(math.MaxInt64) on its Subscription.
  • Subscription is the result of subscribing a Subscriber to a Publisher. It provides the means to control pull backpressure via the Request(int64) method, which instructs the Publisher to emit the specified number of items. It also serves as a means of cancelling emissions via its Cancel() method, which causes the Publisher to stop emitting and shut down.
  • Subscriber is an observer that receives a Subscription object upon subscription. This is achieved via the Subscriber's OnSubscribe(Subscription) method.
  • Observer in the scope of this library means a Subscriber that does not control its subscription.
  • Scheduler and its only method, Schedule(func(Canceller)) Cancellable, is where everything is executed. It allows us to execute different things on different threads. The returned Cancellable can be Cancel()ed. Once it is cancelled, the Canceller's IsCanceled() method returns true. This is a small hindrance, and it is done this way because goroutines can only be cancelled from the inside.
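The cooperative cancellation described in the Scheduler bullet can be sketched as follows. This is a minimal illustration, not the library's implementation: the interface shapes are taken from the text above, while cancelFlag, goroutineScheduler, Wait and countUntilCancelled are hypothetical names introduced only for this example.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// Canceller is handed to the scheduled task so it can poll for cancellation.
type Canceller interface {
	IsCanceled() bool
}

// Cancellable is returned to the caller so it can request cancellation.
type Cancellable interface {
	Cancel()
}

// cancelFlag (hypothetical) implements both sides with one atomic integer.
type cancelFlag struct{ state int32 }

func (f *cancelFlag) Cancel()          { atomic.StoreInt32(&f.state, 1) }
func (f *cancelFlag) IsCanceled() bool { return atomic.LoadInt32(&f.state) == 1 }

// goroutineScheduler (hypothetical) runs every task on its own goroutine.
type goroutineScheduler struct{ wg sync.WaitGroup }

// Schedule starts the task and returns the handle used to cancel it.
// The goroutine cannot be stopped from outside; the task has to check
// IsCanceled() itself and return.
func (s *goroutineScheduler) Schedule(task func(Canceller)) Cancellable {
	f := &cancelFlag{}
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		task(f)
	}()
	return f
}

// Wait blocks until every scheduled task has returned.
func (s *goroutineScheduler) Wait() { s.wg.Wait() }

// countUntilCancelled schedules a loop, cancels it once it has started,
// and returns how many iterations ran before the task noticed.
func countUntilCancelled() int {
	s := &goroutineScheduler{}
	started := make(chan struct{})
	count := 0
	handle := s.Schedule(func(c Canceller) {
		for !c.IsCanceled() {
			count++
			if count == 1 {
				close(started)
			}
			runtime.Gosched() // let other goroutines run between checks
		}
	})
	<-started
	handle.Cancel()
	s.Wait() // after Wait, reading count is safe
	return count
}

func main() {
	fmt.Println("task ran before cancellation:", countUntilCancelled() >= 1)
}
```

The task only stops because it polls IsCanceled() on every iteration; a task that never checks the flag would keep running after Cancel().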

Simple examples

Subscribing an observer

An Observer itself does not support backpressure, so we have two options: apply an operator that requests emissions from the Publisher automatically, or control this behavior via the returned Subscription object. In this example we do the latter.

subscription := flux.FromSlice(
    []cesium.T{1, 2, 3},
).Map(func(t cesium.T) cesium.T {
    return t.(int) + 1
}).Filter(func(t cesium.T) bool {
    return t.(int) < 4
}).Subscribe(PrintObserver())

subscription.Request(1)
// PrintObserver will asynchronously print 2

subscription.Request(1)
// PrintObserver will asynchronously print 3

subscription.Request(1)
// PrintObserver will receive the Complete signal
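To see why the third Request(1) produces Complete rather than a value, here is a minimal synchronous sketch of a request-driven pipeline. It does not use cesium; pipeline and runRequests are hypothetical names, and the real library emits asynchronously through a Scheduler. The assumption illustrated is that items rejected by the filter do not count against the requested amount.

```go
package main

import (
	"fmt"
	"strings"
)

// T mirrors the empty-interface element type used in the example above.
type T interface{}

// pipeline (hypothetical) stands in for FromSlice(...).Map(...).Filter(...).
type pipeline struct {
	items      []T
	pos        int
	done       bool
	mapper     func(T) T
	keep       func(T) bool
	onNext     func(T)
	onComplete func()
}

// Request emits up to n items that pass the filter. Filtered-out items
// are skipped without consuming demand. Complete is signalled once the
// source is exhausted, whether or not demand remains.
func (p *pipeline) Request(n int64) {
	for n > 0 && p.pos < len(p.items) {
		v := p.mapper(p.items[p.pos])
		p.pos++
		if p.keep(v) {
			p.onNext(v)
			n--
		}
	}
	if p.pos == len(p.items) && !p.done {
		p.done = true
		p.onComplete()
	}
}

// runRequests replays the three Request(1) calls and records the signals.
func runRequests() string {
	var signals []string
	p := &pipeline{
		items:      []T{1, 2, 3},
		mapper:     func(t T) T { return t.(int) + 1 },
		keep:       func(t T) bool { return t.(int) < 4 },
		onNext:     func(t T) { signals = append(signals, fmt.Sprintf("next:%v", t)) },
		onComplete: func() { signals = append(signals, "complete") },
	}
	p.Request(1) // 1 maps to 2, which passes the filter
	p.Request(1) // 2 maps to 3, which passes the filter
	p.Request(1) // 3 maps to 4, which is filtered out; the source is now empty
	return strings.Join(signals, " ")
}

func main() {
	fmt.Println(runRequests()) // prints "next:2 next:3 complete"
}
```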

Subscribing a Subscriber

// This subscriber provides a Request(int64) method so we can control it
s := ControlledSubscriber()

flux.FromSlice(
    []cesium.T{1, 2, 3},
).Map(func(t cesium.T) cesium.T {
    return t.(int) + 1
}).Filter(func(t cesium.T) bool {
    return t.(int) < 3
}).Subscribe(s) // s will receive Subscription

s.Request(2) // s requests up to 2 emissions; only the value 2 passes the filter, then Complete follows
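How a subscriber can expose Request(int64) itself is sketched below, under the assumption that it simply keeps the Subscription it is given in OnSubscribe. The interfaces follow the shapes described in the Naming section, but the exact cesium signatures may differ; controlledSubscriber, sliceSubscription, subscribeSlice and receivedAfterTwo are hypothetical, and the source here is synchronous with no operators applied.

```go
package main

import "fmt"

// T mirrors the empty-interface element type used in the examples.
type T interface{}

// Subscription and Subscriber are assumed shapes, not the library's API.
type Subscription interface {
	Request(n int64)
	Cancel()
}

type Subscriber interface {
	OnSubscribe(Subscription)
	OnNext(T)
	OnComplete()
	OnError(error)
}

// controlledSubscriber (hypothetical) stores its Subscription and
// forwards Request calls to it.
type controlledSubscriber struct {
	sub       Subscription
	received  []T
	completed bool
}

func (c *controlledSubscriber) OnSubscribe(s Subscription) { c.sub = s }
func (c *controlledSubscriber) OnNext(t T)                 { c.received = append(c.received, t) }
func (c *controlledSubscriber) OnComplete()                { c.completed = true }
func (c *controlledSubscriber) OnError(error)              {}

// Request forwards the demand to the stored Subscription, if any.
func (c *controlledSubscriber) Request(n int64) {
	if c.sub != nil {
		c.sub.Request(n)
	}
}

// sliceSubscription (hypothetical) emits slice items on demand.
type sliceSubscription struct {
	items  []T
	pos    int
	done   bool
	target Subscriber
}

func (s *sliceSubscription) Request(n int64) {
	for ; n > 0 && s.pos < len(s.items) && !s.done; n-- {
		v := s.items[s.pos]
		s.pos++
		s.target.OnNext(v)
	}
	if s.pos == len(s.items) && !s.done {
		s.done = true
		s.target.OnComplete()
	}
}

// Cancel stops all further emissions, including Complete.
func (s *sliceSubscription) Cancel() { s.done = true }

// subscribeSlice hands the subscriber its Subscription; nothing is
// emitted until the subscriber requests it.
func subscribeSlice(items []T, sub Subscriber) {
	sub.OnSubscribe(&sliceSubscription{items: items, target: sub})
}

// receivedAfterTwo requests two of three items and reports the state.
func receivedAfterTwo() (int, bool) {
	s := &controlledSubscriber{}
	subscribeSlice([]T{1, 2, 3}, s)
	s.Request(2)
	return len(s.received), s.completed
}

func main() {
	n, done := receivedAfterTwo()
	fmt.Println(n, done) // prints "2 false"
}
```

Because one item is still pending, the subscriber has not completed; a further Request(1) would deliver the last item and then Complete.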

Switching to unbounded mode

subscription := flux.FromSlice(
    []cesium.T{1, 2, 3},
).Map(func(t cesium.T) cesium.T {
    return t.(int) + 1
}).Filter(func(t cesium.T) bool {
    return t.(int) < 4
}).Subscribe(PrintObserver())

subscription.RequestUnbounded() // Publisher will not wait for Request() to emit
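Unbounded mode relies on math.MaxInt64 acting as a sentinel for "no limit". The Reactive Streams specification allows a demand of 2^63-1 to be treated as effectively unbounded, which implies that accumulated demand should saturate rather than overflow. The sketch below shows one way to do that bookkeeping; whether cesium tracks demand this way is an assumption, and addDemand, isUnbounded and the unbounded constant are hypothetical names.

```go
package main

import (
	"fmt"
	"math"
)

// unbounded is the sentinel demand value used for unbounded mode.
const unbounded int64 = math.MaxInt64

// addDemand adds n to the outstanding demand and saturates at unbounded
// instead of overflowing. Non-positive requests are ignored here for
// brevity; Reactive Streams requires signalling an error for them.
func addDemand(current, n int64) int64 {
	if n <= 0 {
		return current
	}
	if current > unbounded-n {
		return unbounded
	}
	return current + n
}

// isUnbounded reports whether the publisher may stop counting demand.
func isUnbounded(demand int64) bool {
	return demand == unbounded
}

func main() {
	fmt.Println(addDemand(1, 2))                          // prints 3
	fmt.Println(isUnbounded(addDemand(0, unbounded)))     // prints true
	fmt.Println(isUnbounded(addDemand(unbounded-1, 10)))  // prints true
}
```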

Operator implementation progress

Operators listed according to Reactor docs

Factories

  • Just
  • Mono.JustOrEmpty
  • Mono.FromSupplier
  • FromSlice
  • FromChannel
  • Mono.FromCallable
  • Empty
  • Never
  • Error
  • Defer
  • Using
  • Flux.Generate
  • Create
  • Interval

Transforming

  • Map(func(T) T)
  • Cast
  • FlatMap
  • Handle(func(T, SynchronousSink))
  • Flux.FlatMapSequential
  • Mono.FlatMapMany
  • Flux.ToSlice
    • Maybe ToList (LinkedList would be better to handle large datasets)
  • Flux.ToSortedSlice
  • Flux.ToMap
  • Flux.ToChannel
  • Flux.Count()
  • Flux.Reduce(func(T, T) T)
  • Flux.Scan(func(T, T) T)
  • Flux.All(func(T) bool)
  • Flux.Any(func(T) bool)
  • Flux.HasElements()
  • Flux.HasElement(T) Flux
  • Flux.Concat(Publisher) Flux
  • ConcatWith(Publisher) Flux
  • Flux.ConcatDelayError
  • Flux.MergeSequential
  • Flux.Merge
  • MergeWith
  • Zip
  • ZipWith
  • Mono.And
  • Mono.When
  • Flux.CombineLatest
  • First (implement before Or)
  • Or
  • SwitchMap
  • SwitchOnNext
  • Repeat
  • SwitchIfEmpty
  • IgnoreElements
  • Then
  • ThenEmpty
  • ThenMany
  • Mono.DelayUntilOther
  • Mono.DelayUntil
  • Expand
  • ExpandDeep

Peeking

  • DoOnNext(func(T))
  • Flux.DoOnComplete
  • Mono.DoOnSuccess
  • DoOnError(func(error))
  • DoOnCancel(func())
  • DoOnSubscribe(func(Subscription))
  • DoOnRequest
  • DoOnTerminate
  • DoAfterTerminate
  • DoFinally(func())
  • Log(log.Logger)
  • DoOnEach
  • Materialize
  • Dematerialize

Filtering

  • Filter
  • FilterWhen
  • OfType
  • Flux.Distinct
  • Flux.DistinctUntilChanged
  • Flux.Take
  • Flux.TakeInPeriod
  • Flux.Next
  • Flux.LimitRequest
  • Flux.TakeUntil
  • Flux.TakeUntilOther
  • Flux.TakeWhile
  • Flux.ElementAt
  • Flux.TakeLast
  • Flux.Last
  • Flux.LastOrDefault
  • Flux.Skip
  • Flux.SkipPeriod
  • Flux.SkipLast
  • Flux.SkipUntil
  • Flux.SkipUntilOther
  • Flux.SkipWhile
  • Flux.Sample
  • Flux.SampleFirst
  • Flux.SampleUsingOther
  • Flux.SampleTimeout
  • Flux.SingleOrDefault
  • Flux.SingleOrEmpty

Handling errors

  • Timeout
  • OnErrorReturn
  • OnErrorResume
  • OnErrorMap
  • Retry
  • RetryWhen
  • Flux.OnBackpressureError
  • Flux.OnBackpressureBuffer
  • Flux.OnBackpressureDrop
  • Flux.OnBackpressureLatest

Working with time

  • Elapsed
  • Timestamp
  • Timeout
  • Interval
  • Mono.Delay
  • Mono.DelayElement
  • Flux.DelayElements
  • DelaySubscription

Splitting a Flux

  • Flux.Window
  • Flux.WindowPeriod
  • Flux.WindowTimeout
  • Flux.WindowUntil
  • Flux.WindowWhile
  • Flux.WindowUsingOther
  • Flux.WindowWhen
  • Flux.Buffer
  • Flux.BufferPeriod
  • Flux.BufferTimeout
  • Flux.BufferUntil
  • Flux.BufferWhile
  • Flux.BufferWhen
  • Flux.BufferUsingOther
  • Flux.GroupBy

Synchronizing

  • Flux.BlockFirst
  • Flux.BlockFirstTimeout
  • Flux.BlockLast
  • Flux.BlockLastTimeout
  • Mono.Block
  • Mono.BlockTimeout

TODO

  • Add periodic and delayed scheduling (schedule periodic, schedule after) to schedulers, and add the ability to inject a virtual clock (this will be useful in tests)
  • How to split up tests for normal and scalar flux/mono?
  • Fix locking for flatMaps
  • Move most docs to godoc, except some examples and "how to choose an operator"
  • NoneSignal() ?
  • Performance benchmarks
