Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use mongo evenstore with replaced mongo driver to DB #29

Merged
merged 2 commits into from
Jul 12, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 33 additions & 31 deletions cqrs/eventstore/mongodb/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import (
"math/rand"
"time"

"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"

"github.com/go-ocf/cqrs/event"
"github.com/go-ocf/cqrs/eventstore"
Expand All @@ -39,17 +41,19 @@ func init() {

// EventStore implements an EventStore for MongoDB.
type EventStore struct {
es *cqrsMongodb.EventStore
session *mgo.Session
config Config
es *cqrsMongodb.EventStore
client *mongo.Client
config Config

uniqueIdIsInitialized uint64
}

type Config struct {
Host string `envconfig:"MONGO_HOST" default:localhost:27017"`
DatabaseName string `envconfig:"MONGO_DATABASE" default:"eventStore"`
BatchSize int `envconfig:"MONGO_BATCH_SIZE" default:"128"`
Host string `envconfig:"MONGO_HOST" default:localhost:27017"`
DatabaseName string `envconfig:"MONGO_DATABASE" default:"eventStore"`
BatchSize int `envconfig:"MONGO_BATCH_SIZE" default:"16"`
MaxPoolSize uint16 `envconfig:"MONGO_MAX_POOL_SIZE" default:"16"`
MaxConnIdleTime time.Duration `envconfig:"MONGO_MAX_CONN_IDLE_TIME" default:"240s"`
}

//String return string representation of Config
Expand All @@ -60,22 +64,26 @@ func (c Config) String() string {

//NewEventStore create a event store from configuration
func NewEventStore(config Config, goroutinePoolGo eventstore.GoroutinePoolGoFunc) (*EventStore, error) {
session, err := mgo.Dial(config.Host)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://"+config.Host).SetMaxPoolSize(config.MaxPoolSize).SetMaxConnIdleTime(config.MaxConnIdleTime))
if err != nil {
return nil, fmt.Errorf("cannot dial to DB: %v", err)
return nil, fmt.Errorf("could not dial database: %v", err)
}
err = client.Ping(ctx, readpref.Primary())
if err != nil {
return nil, fmt.Errorf("could not dial database: %v", err)
}

session.SetMode(mgo.Strong, true)
session.SetSafe(&mgo.Safe{W: 1})

es, err := cqrsMongodb.NewEventStoreWithSession(session, config.DatabaseName, "events", config.BatchSize, goroutinePoolGo, cqrsUtils.Marshal, cqrsUtils.Unmarshal, log.Debugf)
es, err := cqrsMongodb.NewEventStoreWithClient(ctx, client, config.DatabaseName, "events", config.BatchSize, goroutinePoolGo, cqrsUtils.Marshal, cqrsUtils.Unmarshal, log.Debugf)
if err != nil {
return nil, err
}
return &EventStore{
es: es,
session: session,
config: config,
es: es,
client: client,
config: config,
}, nil
}

Expand All @@ -98,13 +106,11 @@ func (s *EventStore) LoadFromSnapshot(ctx context.Context, queries []eventstore.
// Clear clears the event storage.
func (s *EventStore) Clear(ctx context.Context) error {
err1 := s.es.Clear(ctx)
sess := s.session.Copy()
defer sess.Close()
err2 := sess.DB(s.es.DBName()).C(instanceIdsCollection).DropCollection()
err2 := s.client.Database(s.es.DBName()).Collection(instanceIdsCollection).Drop(ctx)
if err1 != nil {
return fmt.Errorf("cannot clear events: %v", err1)
}
if err2 != nil && err2 != mgo.ErrNotFound {
if err2 != nil && err2 != mongo.ErrNoDocuments {
return fmt.Errorf("cannot clear sequence number: %v", err2)
}
return nil
Expand All @@ -117,8 +123,6 @@ type seqRecord struct {

// GetInstanceId returns int64 that is unique
func (s *EventStore) GetInstanceId(ctx context.Context, aggregateId string) (int64, error) {
sess := s.session.Copy()
defer sess.Close()
var newInstanceId uint32
for {
newInstanceId = rand.Uint32()
Expand All @@ -128,8 +132,8 @@ func (s *EventStore) GetInstanceId(ctx context.Context, aggregateId string) (int
InstanceId: int64(newInstanceId),
}

if err := sess.DB(s.es.DBName()).C(instanceIdsCollection).Insert(r); err != nil {
if mgo.IsDup(err) {
if _, err := s.client.Database(s.es.DBName()).Collection(instanceIdsCollection).InsertOne(ctx, r); err != nil {
if cqrsMongodb.IsDup(err) {
rand.Seed(time.Now().UTC().UnixNano())
} else {
return -1, fmt.Errorf("cannot generate instance id: %v", err)
Expand All @@ -143,15 +147,13 @@ func (s *EventStore) GetInstanceId(ctx context.Context, aggregateId string) (int
}

func (s *EventStore) RemoveInstanceId(ctx context.Context, instanceId int64) error {
sess := s.session.Copy()
defer sess.Close()
if err := sess.DB(s.es.DBName()).C(instanceIdsCollection).Remove(bson.M{"_id": instanceId}); err != nil {
return fmt.Errorf("cannot removce instance id: %v", err)
if _, err := s.client.Database(s.es.DBName()).Collection(instanceIdsCollection).DeleteOne(ctx, bson.M{"_id": instanceId}); err != nil {
return fmt.Errorf("cannot remove instance id: %v", err)
}
return nil
}

// Close closes the database session.
func (s *EventStore) Close() {
s.es.Close()
func (s *EventStore) Close(ctx context.Context) error {
return s.es.Close(ctx)
}
2 changes: 1 addition & 1 deletion cqrs/eventstore/mongodb/eventstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestInstanceId(t *testing.T) {
}, nil)
defer func() {
store.Clear(ctx)
store.Close()
store.Close(ctx)
}()
assert.NoError(t, err)

Expand Down