Skip to content

Commit

Permalink
Merge pull request #396 from klowdo/eventstore-snapshot
Browse files Browse the repository at this point in the history
Eventstore snapshot
  • Loading branch information
klowdo authored Dec 8, 2022
2 parents a7a482f + 4f802e8 commit c851bec
Show file tree
Hide file tree
Showing 16 changed files with 783 additions and 38 deletions.
11 changes: 10 additions & 1 deletion aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"sync"
"time"

"github.com/looplab/eventhorizon/uuid"
)
Expand Down Expand Up @@ -58,6 +59,13 @@ type AggregateStore interface {
Save(context.Context, Aggregate) error
}

// SnapshotStrategy determines if a snapshot should be taken or not.
type SnapshotStrategy interface {
ShouldTakeSnapshot(lastSnapshotVersion int,
lastSnapshotTimestamp time.Time,
event Event) bool
}

var (
// ErrAggregateNotFound is when no aggregate can be found.
ErrAggregateNotFound = errors.New("aggregate not found")
Expand Down Expand Up @@ -148,7 +156,8 @@ func (e *AggregateError) Cause() error {
// used to create concrete aggregate types when loading from the database.
//
// An example would be:
// RegisterAggregate(func(id UUID) Aggregate { return &MyAggregate{id} })
//
// RegisterAggregate(func(id UUID) Aggregate { return &MyAggregate{id} })
func RegisterAggregate(factory func(uuid.UUID) Aggregate) {
// Check that the created aggregate matches the registered type.
// TODO: Explore the use of reflect/gob for creating concrete types without
Expand Down
91 changes: 87 additions & 4 deletions aggregatestore/events/aggregatestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"time"

eh "github.com/looplab/eventhorizon"
"github.com/looplab/eventhorizon/uuid"
Expand All @@ -27,7 +28,10 @@ import (
// uses an event store for loading and saving events used to build the aggregate
// and an event handler to handle resulting events.
type AggregateStore struct {
store eh.EventStore
store eh.EventStore
snapshotStore eh.SnapshotStore
isSnapshotStore bool
snapshotStrategy eh.SnapshotStrategy
}

var (
Expand All @@ -39,10 +43,10 @@ var (
ErrMismatchedEventType = errors.New("mismatched event type and aggregate type")
)

// NewAggregateStore creates a aggregate store with an event store and an event
// NewAggregateStore creates an aggregate store with an event store and an event
// handler that will handle resulting events (for example by publishing them
// on an event bus).
func NewAggregateStore(store eh.EventStore) (*AggregateStore, error) {
func NewAggregateStore(store eh.EventStore, options ...Option) (*AggregateStore, error) {
if store == nil {
return nil, ErrInvalidEventStore
}
Expand All @@ -51,9 +55,31 @@ func NewAggregateStore(store eh.EventStore) (*AggregateStore, error) {
store: store,
}

d.snapshotStrategy = &NoSnapshotStrategy{}

for _, option := range options {
if err := option(d); err != nil {
return nil, fmt.Errorf("error while applying option: %w", err)
}
}

d.snapshotStore, d.isSnapshotStore = store.(eh.SnapshotStore)

return d, nil
}

// Option is an option setter used to configure creation.
type Option func(*AggregateStore) error

// WithSnapshotStrategy add the strategy to use when determining if a snapshot should be taken
func WithSnapshotStrategy(s eh.SnapshotStrategy) Option {
return func(as *AggregateStore) error {
as.snapshotStrategy = s

return nil
}
}

// Load implements the Load method of the eventhorizon.AggregateStore interface.
// It loads an aggregate from the event store by creating a new aggregate of the
// type with the ID and then applies all events to it, thus making it the most
Expand All @@ -79,7 +105,26 @@ func (r *AggregateStore) Load(ctx context.Context, aggregateType eh.AggregateTyp
}
}

events, err := r.store.Load(ctx, a.EntityID())
fromVersion := 1

if sa, ok := a.(eh.Snapshotable); ok && r.isSnapshotStore {
snapshot, err := r.snapshotStore.LoadSnapshot(ctx, id)
if err != nil {
return nil, &eh.AggregateStoreError{
Err: err,
Op: eh.AggregateStoreOpLoad,
AggregateType: aggregateType,
AggregateID: id,
}
}

if snapshot != nil {
sa.ApplySnapshot(snapshot)
fromVersion = snapshot.Version + 1
}
}

events, err := r.store.LoadFrom(ctx, a.EntityID(), fromVersion)
if err != nil && !errors.Is(err, eh.ErrAggregateNotFound) {
return nil, &eh.AggregateStoreError{
Err: err,
Expand Down Expand Up @@ -142,6 +187,44 @@ func (r *AggregateStore) Save(ctx context.Context, agg eh.Aggregate) error {
}
}

return r.takeSnapshot(ctx, agg, events[len(events)-1])
}

func (r *AggregateStore) takeSnapshot(ctx context.Context, agg eh.Aggregate, lastEvent eh.Event) error {
a, ok := agg.(eh.Snapshotable)
if !ok || !r.isSnapshotStore {
return nil
}

s, err := r.snapshotStore.LoadSnapshot(ctx, agg.EntityID())
if err != nil {
return &eh.AggregateStoreError{
Err: err,
Op: eh.AggregateStoreOpSave,
AggregateType: agg.AggregateType(),
AggregateID: agg.EntityID(),
}
}

version := 0
timestamp := time.Now()

if s != nil {
version = s.Version
timestamp = s.Timestamp
}

if res := r.snapshotStrategy.ShouldTakeSnapshot(version, timestamp, lastEvent); res {
if err = r.snapshotStore.SaveSnapshot(ctx, agg.EntityID(), *a.CreateSnapshot()); err != nil {
return &eh.AggregateStoreError{
Err: err,
Op: eh.AggregateStoreOpSave,
AggregateType: agg.AggregateType(),
AggregateID: agg.EntityID(),
}
}
}

return nil
}

Expand Down
61 changes: 60 additions & 1 deletion aggregatestore/events/aggregatestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ package events
import (
"context"
"errors"
"fmt"
"reflect"
"testing"
"time"

eh "github.com/looplab/eventhorizon"
"github.com/looplab/eventhorizon/mocks"
"github.com/looplab/eventhorizon/uuid"
"github.com/stretchr/testify/assert"
)

func TestNewAggregateStore(t *testing.T) {
Expand Down Expand Up @@ -253,6 +255,46 @@ func TestAggregateStore_SaveEvents(t *testing.T) {
agg.err = nil
}

func TestAggregateStore_TakeSnapshot(t *testing.T) {
eventStore := &mocks.EventStore{
Events: make([]eh.Event, 0),
}

store, err := NewAggregateStore(eventStore, WithSnapshotStrategy(NewEveryNumberEventSnapshotStrategy(2)))
if err != nil {
t.Fatal("there should be no error:", err)
}

ctx := context.Background()

id := uuid.New()
agg := NewTestAggregateOther(id)

timestamp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)

for i := 0; i < 3; i++ {
agg.AppendEvent(mocks.EventType, &mocks.EventData{Content: fmt.Sprintf("event%d", i)}, timestamp)

if err := store.Save(ctx, agg); err != nil {
t.Error("should not be an error")
}
}

assert.NotNil(t, eventStore.Snapshot, "snapshot should be taken")

agg2, err := store.Load(ctx, agg.AggregateType(), agg.EntityID())
if err != nil {
t.Error("should not be an error")
}

a, ok := agg2.(*TestAggregateOther)
if !ok {
t.Error("wrong aggregate type")
}

assert.Equal(t, 1, a.appliedEvents)
}

func TestAggregateStore_AggregateNotRegistered(t *testing.T) {
store, _ := createStore(t)

Expand Down Expand Up @@ -296,7 +338,8 @@ const TestAggregateOtherType eh.AggregateType = "TestAggregateOther"

type TestAggregateOther struct {
*AggregateBase
err error
err error
appliedEvents int
}

var _ = VersionedAggregate(&TestAggregateOther{})
Expand All @@ -316,5 +359,21 @@ func (a *TestAggregateOther) ApplyEvent(ctx context.Context, event eh.Event) err
return a.err
}

a.appliedEvents++

return nil
}

func (a *TestAggregateOther) CreateSnapshot() *eh.Snapshot {
return &eh.Snapshot{
Version: a.AggregateVersion(),
Timestamp: time.Now(),
AggregateType: TestAggregateType,
State: a,
}
}

func (a *TestAggregateOther) ApplySnapshot(snapshot *eh.Snapshot) {
agg := snapshot.State.(*TestAggregateOther)
a.id = agg.id
}
65 changes: 65 additions & 0 deletions aggregatestore/events/snapshotstrategy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) 2021 - The Event Horizon authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package events

import (
"time"

eh "github.com/looplab/eventhorizon"
)

// NoSnapshotStrategy no snapshot should be taken.
type NoSnapshotStrategy struct {
}

func (s *NoSnapshotStrategy) ShouldTakeSnapshot(_ int,
_ time.Time,
_ eh.Event) bool {
return false
}

// EveryNumberEventSnapshotStrategy use to take a snapshot every n number of events.
type EveryNumberEventSnapshotStrategy struct {
snapshotThreshold int
}

func NewEveryNumberEventSnapshotStrategy(threshold int) *EveryNumberEventSnapshotStrategy {
return &EveryNumberEventSnapshotStrategy{
snapshotThreshold: threshold,
}
}

func (s *EveryNumberEventSnapshotStrategy) ShouldTakeSnapshot(lastSnapshotVersion int,
_ time.Time,
event eh.Event) bool {
return event.Version()-lastSnapshotVersion >= s.snapshotThreshold
}

// PeriodSnapshotStrategy use to take a snapshot every time a period has elapsed, for example every hour.
type PeriodSnapshotStrategy struct {
snapshotThreshold time.Duration
}

func NewPeriodSnapshotStrategy(threshold time.Duration) *PeriodSnapshotStrategy {
return &PeriodSnapshotStrategy{
snapshotThreshold: threshold,
}
}

func (s *PeriodSnapshotStrategy) ShouldTakeSnapshot(_ int,
lastSnapshotTimestamp time.Time,
event eh.Event) bool {
return event.Timestamp().Sub(lastSnapshotTimestamp) >= s.snapshotThreshold
}
Loading

0 comments on commit c851bec

Please sign in to comment.