Skip to content

Commit

Permalink
Merge pull request #368 from maxekman/fix-client-closes
Browse files Browse the repository at this point in the history
Fix / Only close DB clients that are owned
  • Loading branch information
maxekman authored Dec 1, 2021
2 parents b6067b2 + 415dab3 commit f84d75b
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 39 deletions.
28 changes: 23 additions & 5 deletions eventstore/mongodb/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,22 @@ import (
// as values.
type EventStore struct {
client *mongo.Client
clientOwnership clientOwnership
db *mongo.Database
aggregates *mongo.Collection
eventHandlerAfterSave eh.EventHandler
eventHandlerInTX eh.EventHandler
}

type clientOwnership int

const (
internalClient clientOwnership = iota
externalClient
)

// NewEventStore creates a new EventStore with a MongoDB URI: `mongodb://hostname`.
func NewEventStore(uri, db string, options ...Option) (*EventStore, error) {
func NewEventStore(uri, dbName string, options ...Option) (*EventStore, error) {
opts := mongoOptions.Client().ApplyURI(uri)
opts.SetWriteConcern(writeconcern.New(writeconcern.WMajority()))
opts.SetReadConcern(readconcern.Majority())
Expand All @@ -56,21 +64,26 @@ func NewEventStore(uri, db string, options ...Option) (*EventStore, error) {
return nil, fmt.Errorf("could not connect to DB: %w", err)
}

return NewEventStoreWithClient(client, db, options...)
return newEventStoreWithClient(client, internalClient, dbName, options...)
}

// NewEventStoreWithClient creates a new EventStore with a client.
func NewEventStoreWithClient(client *mongo.Client, dbName string, options ...Option) (*EventStore, error) {
return newEventStoreWithClient(client, externalClient, dbName, options...)
}

func newEventStoreWithClient(client *mongo.Client, clientOwnership clientOwnership, dbName string, options ...Option) (*EventStore, error) {
if client == nil {
return nil, fmt.Errorf("missing DB client")
}

db := client.Database(dbName)

s := &EventStore{
client: client,
db: db,
aggregates: db.Collection("events"),
client: client,
clientOwnership: clientOwnership,
db: db,
aggregates: db.Collection("events"),
}

for _, option := range options {
Expand Down Expand Up @@ -376,6 +389,11 @@ func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error)

// Close implements the Close method of the eventhorizon.EventStore interface.
func (s *EventStore) Close() error {
if s.clientOwnership == externalClient {
// Don't close a client we don't own.
return nil
}

return s.client.Disconnect(context.Background())
}

Expand Down
28 changes: 23 additions & 5 deletions eventstore/mongodb_v2/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,21 @@ import (
// keep tracks of the global position of events, stored as metadata.
type EventStore struct {
client *mongo.Client
clientOwnership clientOwnership
db *mongo.Database
events *mongo.Collection
streams *mongo.Collection
eventHandlerAfterSave eh.EventHandler
eventHandlerInTX eh.EventHandler
}

type clientOwnership int

const (
internalClient clientOwnership = iota
externalClient
)

// NewEventStore creates a new EventStore with a MongoDB URI: `mongodb://hostname`.
func NewEventStore(uri, dbName string, options ...Option) (*EventStore, error) {
opts := mongoOptions.Client().ApplyURI(uri)
Expand All @@ -57,21 +65,26 @@ func NewEventStore(uri, dbName string, options ...Option) (*EventStore, error) {
return nil, fmt.Errorf("could not connect to DB: %w", err)
}

return NewEventStoreWithClient(client, dbName, options...)
return newEventStoreWithClient(client, internalClient, dbName, options...)
}

// NewEventStoreWithClient creates a new EventStore with a client.
func NewEventStoreWithClient(client *mongo.Client, dbName string, options ...Option) (*EventStore, error) {
return newEventStoreWithClient(client, externalClient, dbName, options...)
}

func newEventStoreWithClient(client *mongo.Client, clientOwnership clientOwnership, dbName string, options ...Option) (*EventStore, error) {
if client == nil {
return nil, fmt.Errorf("missing DB client")
}

db := client.Database(dbName)
s := &EventStore{
client: client,
db: db,
events: db.Collection("events"),
streams: db.Collection("streams"),
client: client,
clientOwnership: clientOwnership,
db: db,
events: db.Collection("events"),
streams: db.Collection("streams"),
}

for _, option := range options {
Expand Down Expand Up @@ -453,6 +466,11 @@ func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error)

// Close implements the Close method of the eventhorizon.EventStore interface.
func (s *EventStore) Close() error {
if s.clientOwnership == externalClient {
// Don't close a client we don't own.
return nil
}

return s.client.Disconnect(context.Background())
}

Expand Down
66 changes: 42 additions & 24 deletions outbox/mongodb/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,28 +33,36 @@ var (

// Outbox implements an eventhorizon.Outbox for MongoDB.
type Outbox struct {
client *mongo.Client
outbox *mongo.Collection
handlers []*matcherHandler
handlersByType map[eh.EventHandlerType]*matcherHandler
handlersMu sync.RWMutex
errCh chan error
watchToken string
resumeToken bson.Raw
processingMu sync.Mutex
cctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
codec eh.EventCodec
client *mongo.Client
clientOwnership clientOwnership
outbox *mongo.Collection
handlers []*matcherHandler
handlersByType map[eh.EventHandlerType]*matcherHandler
handlersMu sync.RWMutex
errCh chan error
watchToken string
resumeToken bson.Raw
processingMu sync.Mutex
cctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
codec eh.EventCodec
}

type clientOwnership int

const (
internalClient clientOwnership = iota
externalClient
)

type matcherHandler struct {
eh.EventMatcher
eh.EventHandler
}

// NewOutbox creates a new Outbox with a MongoDB URI: `mongodb://hostname`.
func NewOutbox(uri, db string, options ...Option) (*Outbox, error) {
func NewOutbox(uri, dbName string, options ...Option) (*Outbox, error) {
opts := mongoOptions.Client().ApplyURI(uri)
opts.SetWriteConcern(writeconcern.New(writeconcern.WMajority()))
opts.SetReadConcern(readconcern.Majority())
Expand All @@ -65,25 +73,30 @@ func NewOutbox(uri, db string, options ...Option) (*Outbox, error) {
return nil, fmt.Errorf("could not connect to DB: %w", err)
}

return NewOutboxWithClient(client, db, options...)
return newOutboxWithClient(client, internalClient, dbName, options...)
}

// NewOutboxWithClient creates a new Outbox with a client.
func NewOutboxWithClient(client *mongo.Client, db string, options ...Option) (*Outbox, error) {
func NewOutboxWithClient(client *mongo.Client, dbName string, options ...Option) (*Outbox, error) {
return newOutboxWithClient(client, externalClient, dbName, options...)
}

func newOutboxWithClient(client *mongo.Client, clientOwnership clientOwnership, dbName string, options ...Option) (*Outbox, error) {
if client == nil {
return nil, fmt.Errorf("missing DB client")
}

ctx, cancel := context.WithCancel(context.Background())

o := &Outbox{
client: client,
outbox: client.Database(db).Collection("outbox"),
handlersByType: map[eh.EventHandlerType]*matcherHandler{},
errCh: make(chan error, 100),
cctx: ctx,
cancel: cancel,
codec: &bsonCodec.EventCodec{},
client: client,
clientOwnership: clientOwnership,
outbox: client.Database(dbName).Collection("outbox"),
handlersByType: map[eh.EventHandlerType]*matcherHandler{},
errCh: make(chan error, 100),
cctx: ctx,
cancel: cancel,
codec: &bsonCodec.EventCodec{},
}

for _, option := range options {
Expand Down Expand Up @@ -216,7 +229,12 @@ func (o *Outbox) Close() error {
o.cancel()
o.wg.Wait()

return nil
if o.clientOwnership == externalClient {
// Don't close a client we don't own.
return nil
}

return o.client.Disconnect(context.Background())
}

// Errors implements the Errors method of the eventhorizon.EventBus interface.
Expand Down
28 changes: 23 additions & 5 deletions repo/mongodb/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,21 @@ var (
// Repo implements an MongoDB repository for entities.
type Repo struct {
client *mongo.Client
clientOwnership clientOwnership
entities *mongo.Collection
newEntity func() eh.Entity
connectionCheck bool
}

type clientOwnership int

const (
internalClient clientOwnership = iota
externalClient
)

// NewRepo creates a new Repo.
func NewRepo(uri, db, collection string, options ...Option) (*Repo, error) {
func NewRepo(uri, dbName, collection string, options ...Option) (*Repo, error) {
opts := mongoOptions.Client().ApplyURI(uri)
opts.SetWriteConcern(writeconcern.New(writeconcern.WMajority()))
opts.SetReadConcern(readconcern.Majority())
Expand All @@ -61,18 +69,23 @@ func NewRepo(uri, db, collection string, options ...Option) (*Repo, error) {
return nil, fmt.Errorf("could not connect to DB: %w", err)
}

return NewRepoWithClient(client, db, collection, options...)
return newRepoWithClient(client, internalClient, dbName, collection, options...)
}

// NewRepoWithClient creates a new Repo with a client.
func NewRepoWithClient(client *mongo.Client, db, collection string, options ...Option) (*Repo, error) {
func NewRepoWithClient(client *mongo.Client, dbName, collection string, options ...Option) (*Repo, error) {
return newRepoWithClient(client, externalClient, dbName, collection, options...)
}

func newRepoWithClient(client *mongo.Client, clientOwnership clientOwnership, dbName, collection string, options ...Option) (*Repo, error) {
if client == nil {
return nil, fmt.Errorf("missing DB client")
}

r := &Repo{
client: client,
entities: client.Database(db).Collection(collection),
client: client,
clientOwnership: clientOwnership,
entities: client.Database(dbName).Collection(collection),
}

for _, option := range options {
Expand Down Expand Up @@ -415,5 +428,10 @@ func (r *Repo) Clear(ctx context.Context) error {

// Close implements the Close method of the eventhorizon.WriteRepo interface.
func (r *Repo) Close() error {
if r.clientOwnership == externalClient {
// Don't close a client we don't own.
return nil
}

return r.client.Disconnect(context.Background())
}

0 comments on commit f84d75b

Please sign in to comment.