Skip to content

Commit

Permalink
Add Stream.TransactAsyncEx (#194)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Feb 18, 2020
1 parent 0abc883 commit dc3967a
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 56 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@ The `Unreleased` section name is replaced by the expected version of next releas
## [Unreleased]

### Added

- `Stream.TransactAsyncEx`, exposing the `Core.ISyncContext` at conclusion of the sync operation, affording the ability to examine the post-state `Version` etc. (This paves the way for exposing [`SessionToken`](https://github.com/jet/equinox/issues/192) at a later point without a breaking change) [#194](https://github.com/jet/equinox/pull/194)

### Changed

- `Stream.QueryEx` to supply `Core.ISyncContext` in lieu of only exposing `Version` (to align with `TransactAsyncEx`) [#194](https://github.com/jet/equinox/pull/194)

### Removed
### Fixed

Expand Down
4 changes: 2 additions & 2 deletions samples/Tutorial/FulfilmentCenter.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ module FulfilmentCenter =
stream.Query id
let queryEx fc (projection : Fold.State -> 't) : Async<int64*'t> =
let stream = resolve fc
stream.QueryEx(fun v s -> v, projection s)
stream.QueryEx(fun c -> c.Version, projection c.State)

member __.UpdateName(id, value) = execute id (Register value)
member __.UpdateAddress(id, value) = execute id (UpdateAddress value)
Expand Down Expand Up @@ -181,4 +181,4 @@ module FulfilmentCenterSummary =
stream.Query(Option.map (fun s -> s.state))

member __.Update(id, version, value) = execute id (Update (version,value))
member __.TryRead id : Async<Summary option> = read id
member __.TryRead id : Async<Summary option> = read id
2 changes: 1 addition & 1 deletion src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1350,4 +1350,4 @@ module Events =

/// Obtains the `index` from the current write Position
let getNextIndex (ctx: Context) (streamName: string) : Async<int64> =
ctx.Sync(ctx.CreateStream streamName) |> stripPosition
ctx.Sync(ctx.CreateStream streamName) |> stripPosition
2 changes: 1 addition & 1 deletion src/Equinox.EventStore/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ module Token =
let (*private*) ofCompactionEventNumber compactedEventNumberOption unstoredEventsPending batchSize streamName streamVersion : StreamToken =
let batchCapacityLimit = batchCapacityLimit compactedEventNumberOption unstoredEventsPending batchSize streamVersion
create compactedEventNumberOption (Some batchCapacityLimit) streamName streamVersion

/// Assume we have not seen any compaction events; use the batchSize and version to infer headroom
let ofUncompactedVersion batchSize streamName streamVersion : StreamToken =
ofCompactionEventNumber None 0 batchSize streamName streamVersion
Expand Down
2 changes: 1 addition & 1 deletion src/Equinox.MemoryStore/MemoryStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,4 @@ type Resolver<'event, 'state, 'Format, 'context>(store : VolatileStore<'Format>,

/// Resolve from a Memento being used in a Continuation [based on position and state typically from Stream.CreateMemento]
member __.FromMemento(Token.Unpack stream as streamToken, state, ?context) =
Stream.ofMemento (streamToken,state) (resolveStream stream.streamName context)
Stream.ofMemento (streamToken,state) (resolveStream stream.streamName context)
2 changes: 1 addition & 1 deletion src/Equinox.SqlStreamStore/SqlStreamStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -562,4 +562,4 @@ type ConnectorBase([<O; D(null)>]?readRetryPolicy, [<O; D(null)>]?writeRetryPoli
member __.Establish(appName) : Async<Connection> = async {
let! store = __.Connect()
return Connection(readConnection=store, writeConnection=store, ?readRetryPolicy=readRetryPolicy, ?writeRetryPolicy=writeRetryPolicy)
}
}
73 changes: 41 additions & 32 deletions src/Equinox/Equinox.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,41 +10,50 @@ type MaxResyncsExhaustedException(count) =
/// Central Application-facing API. Wraps the handling of decision or query flows in a manner that is store agnostic
type Stream<'event, 'state>
( log, stream : IStream<'event, 'state>, maxAttempts : int,
[<Optional; DefaultParameterValue(null)>] ?mkAttemptsExhaustedException,
[<Optional; DefaultParameterValue(null)>] ?createAttemptsExhaustedException,
[<Optional; DefaultParameterValue(null)>] ?resyncPolicy) =

let transact f =
let resyncPolicy = defaultArg resyncPolicy (fun _log _attemptNumber f -> async { return! f })
let transact decide mapResult =
let resyncPolicy = defaultArg resyncPolicy (fun _log _attemptNumber resyncF -> async { return! resyncF })
let throwMaxResyncsExhaustedException attempts = MaxResyncsExhaustedException attempts
let handleResyncsExceeded = defaultArg mkAttemptsExhaustedException throwMaxResyncsExhaustedException
Flow.transact (maxAttempts, resyncPolicy, handleResyncsExceeded) (stream, log) f

/// 0. Invoke the supplied `interpret` function with the present state
/// 1. Attempt to sync the accumulated events to the stream
/// Tries up to `maxAttempts` times in the case of a conflict, throwing `MaxResyncsExhaustedException` to signal failure.
member __.Transact(interpret : 'state -> 'event list) : Async<unit> = transact (fun state -> async { return (), interpret state })

/// 0. Invoke the supplied `decide` function with the present state
/// 1. Attempt to sync the accumulated events to the stream
/// 2. Yield result
/// Tries up to `maxAttempts` times in the case of a conflict, throwing `MaxResyncsExhaustedException` to signal failure.
member __.Transact(decide : 'state -> 'result * 'event list) : Async<'result> = transact (fun state -> async { return decide state })

/// 0. Invoke the supplied _Async_ `decide` function with the present state
/// 1. Attempt to sync the accumulated events to the stream
/// 2. Yield result
/// Tries up to `maxAttempts` times in the case of a conflict, throwing `MaxResyncsExhaustedException` to signal failure.
member __.TransactAsync(decide : 'state -> Async<'result * 'event list>) : Async<'result> = transact decide

/// Project from the folded `State` without executing a decision flow as `Decide` does
member __.Query(projection : 'state -> 'view) : Async<'view> = Flow.query(stream, log, fun syncState -> projection syncState.State)

/// Project from the folded `State` (with the current version of the stream supplied for context) without executing a decision flow as `Decide` does
member __.QueryEx(projection : int64 -> 'state -> 'view) : Async<'view> = Flow.query(stream, log, fun syncState -> projection syncState.Version syncState.State)

/// Low-level helper to allow one to obtain a reference to a stream and state pair (including the position) in order to pass it as a continuation within the application
/// Such a memento is then held within the application and passed in lieu of a StreamId to the StreamResolver in order to avoid having to reload state
member __.CreateMemento() : Async<StreamToken * 'state> = Flow.query(stream, log, fun syncState -> syncState.Memento)
let handleResyncsExceeded = defaultArg createAttemptsExhaustedException throwMaxResyncsExhaustedException
Flow.transact (maxAttempts, resyncPolicy, handleResyncsExceeded) (stream, log) decide mapResult

/// 0. Invoke the supplied <c>interpret</c> function with the present state
/// 1a. (if events yielded) Attempt to sync the yielded events events to the stream
/// 1b. Tries up to <c>maxAttempts</c> times in the case of a conflict, throwing <c>MaxResyncsExhaustedException</c> to signal failure.
member __.Transact(interpret : 'state -> 'event list) : Async<unit> =
transact (fun state -> async { return (), interpret state }) (fun () _context -> ())

/// 0. Invoke the supplied <c>decide</c> function with the present state, holding the <c>'result</c>
/// 1a. (if events yielded) Attempt to sync the yielded events events to the stream
/// 1b. Tries up to <c>maxAttempts</c> times in the case of a conflict, throwing <c>MaxResyncsExhaustedException</c> to signal failure.
/// 2. Yield result
member __.Transact(decide : 'state -> 'result * 'event list) : Async<'result> =
transact (fun state -> async { return decide state }) (fun result _context -> result)

/// 0. Invoke the supplied <c>_Async_</c> <c>decide</c> function with the present state, holding the <c>'result</c>
/// 1a. (if events yielded) Attempt to sync the yielded events events to the stream
/// 1b. Tries up to <c>maxAttempts</c> times in the case of a conflict, throwing <c>MaxResyncsExhaustedException</c> to signal failure.
/// 2. Yield result
member __.TransactAsync(decide : 'state -> Async<'result * 'event list>) : Async<'result> =
transact decide (fun result _context -> result)

/// 0. Invoke the supplied <c>_Async_</c> <c>decide</c> function with the present state, holding the <c>'result</c>
/// 1a. (if events yielded) Attempt to sync the yielded events events to the stream
/// 1b. Tries up to <c>maxAttempts</c> times in the case of a conflict, throwing <c>MaxResyncsExhaustedException</c> to signal failure.
/// 2. Uses <c>mapResult</c> to render the final outcome from the <c>'result</c> and/or the final <c>ISyncContext</c>
/// 3. Yields the outcome
member __.TransactAsyncEx(decide : 'state -> Async<'result * 'event list>, mapResult : 'result -> ISyncContext<'state> -> 'resultEx) : Async<'resultEx> =
transact decide mapResult

/// Project from the folded <c>'state</c>, without executing a decision flow as <c>Transact</c> does
member __.Query(projection : 'state -> 'view) : Async<'view> =
Flow.query (stream, log, fun syncState -> projection (syncState :> ISyncContext<'state>).State)

/// Project from the stream's <c>'state<c> (including extended context), without executing a decision flow as <c>Transact<c> does
member __.QueryEx(projection : ISyncContext<'state> -> 'view) : Async<'view> =
Flow.query (stream, log, projection)

/// Store-agnostic <c>Context.Resolve</c> Options
type ResolveOption =
Expand Down
50 changes: 32 additions & 18 deletions src/Equinox/Flow.fs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,25 @@ type SyncResult<'state> =
/// Store-agnostic interface representing interactions a Flow can have with the state of a given event stream. Not intended for direct use by consumer code.
type IStream<'event, 'state> =
/// Obtain the state from the target stream
abstract Load : log: ILogger
-> Async<StreamToken * 'state>
abstract Load : log: ILogger -> Async<StreamToken * 'state>

/// Given the supplied `token` [and related `originState`], attempt to move to state `state'` by appending the supplied `events` to the underlying stream
/// SyncResult.Written: implies the state is now the value represented by the Result's value
/// SyncResult.Conflict: implies the `events` were not synced; if desired the consumer can use the included resync workflow in order to retry
abstract TrySync : log: ILogger * token: StreamToken * originState: 'state * events: 'event list -> Async<SyncResult<'state>>

/// Exposed by TransactEx / QueryEx, providing access to extended state information for cases where that's required
type ISyncContext<'state> =

/// Represents a Checkpoint position on a Stream's timeline; Can be used to manage continuations via a Resolver's FromMemento method
abstract member CreateMemento : unit -> StreamToken * 'state

/// Exposes the underlying Store's internal Version/Index (which, depending on the Codec, may or may not be reflected in the last event presented)
abstract member Version : int64

/// The present State of the stream within the context of this Flow
abstract member State : 'state

/// Internal implementation of the Store agnostic load + run/render. See Equinox.fs for App-facing APIs.
module internal Flow =

Expand All @@ -37,11 +48,7 @@ module internal Flow =
trySync : ILogger * StreamToken * 'state * 'event list -> Async<SyncResult<'state>>) =
let mutable tokenAndState = originState

member __.Memento = tokenAndState
member __.State = snd __.Memento
member __.Version = (fst __.Memento).version

member __.TryOr(log, events, handleFailureResync : (Async<StreamToken*'state> -> Async<bool>)) : Async<bool> = async {
let trySyncOr log events (handleFailureResync : Async<StreamToken*'state> -> Async<bool>) : Async<bool> = async {
let! res = let token, state = tokenAndState in trySync (log,token,state,events)
match res with
| SyncResult.Conflict resync ->
Expand All @@ -50,12 +57,19 @@ module internal Flow =
tokenAndState <- token', streamState'
return true }

interface ISyncContext<'state> with
member __.CreateMemento() = tokenAndState
member __.State = snd tokenAndState
member __.Version = (fst tokenAndState).version

member __.TryWithoutResync(log : ILogger, events) : Async<bool> =
trySyncOr log events (fun _resync -> async { return false })
member __.TryOrResync(runResync, attemptNumber: int, log : ILogger, events) : Async<bool> =
let resyncInPreparationForRetry resync = async {
let! streamState' = runResync log attemptNumber resync
tokenAndState <- streamState'
return false }
__.TryOr(log, events, resyncInPreparationForRetry)
trySyncOr log events resyncInPreparationForRetry

/// Process a command, ensuring a consistent final state is established on the stream.
/// 1. make a decision predicated on the known state
Expand All @@ -65,41 +79,41 @@ module internal Flow =
let run (log : ILogger) (maxSyncAttempts : int, resyncRetryPolicy, createMaxAttemptsExhaustedException)
(syncState : SyncState<'event, 'state>)
(decide : 'state -> Async<'result * 'event list>)
: Async<'result> =
(mapResult : 'result -> SyncState<'event, 'state> -> 'resultEx)
: Async<'resultEx> =

if maxSyncAttempts < 1 then raise <| System.ArgumentOutOfRangeException("maxSyncAttempts", maxSyncAttempts, "should be >= 1")

/// Run a decision cycle - decide what events should be appended given the presented state
let rec loop attempt : Async<'result> = async {
let rec loop attempt : Async<'resultEx> = async {
let log = if attempt = 1 then log else log.ForContext("syncAttempt", attempt)
let! result, events = decide syncState.State
let! result, events = decide (syncState :> ISyncContext<'state>).State
if List.isEmpty events then
log.Debug "No events generated"
return result
return mapResult result syncState
elif attempt = maxSyncAttempts then
// Special case: on final attempt, we won't be `resync`ing; we're giving up
let! committed = syncState.TryOr(log, events, fun _resync -> async { return false })

let! committed = syncState.TryWithoutResync(log, events)
if not committed then
log.Debug "Max Sync Attempts exceeded"
return raise (createMaxAttemptsExhaustedException attempt)
else
return result
return mapResult result syncState
else
let! committed = syncState.TryOrResync(resyncRetryPolicy, attempt, log, events)
if not committed then
log.Debug "Resyncing and retrying"
return! loop (attempt + 1)
else
return result }
return mapResult result syncState }

/// Commence, processing based on the incoming state
loop 1

let transact (maxAttempts,resyncRetryPolicy,createMaxAttemptsExhaustedException) (stream : IStream<_, _>, log) decide : Async<'result> = async {
let transact (maxAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) (stream : IStream<_, _>, log) decide mapResult : Async<'result> = async {
let! streamState = stream.Load log
let syncState = SyncState(streamState, stream.TrySync)
return! run log (maxAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) syncState decide }
return! run log (maxAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) syncState decide mapResult }

let query (stream : IStream<'event, 'state>, log : ILogger, project: SyncState<'event, 'state> -> 'result) : Async<'result> = async {
let! streamState = stream.Load log
Expand Down

0 comments on commit dc3967a

Please sign in to comment.