Skip to content

Commit

Permalink
Extends cqrs about goroutine pool go (#18)
Browse files Browse the repository at this point in the history
* extends cqrs about goroutine pool go function

* implement generic projection for resource/device

* generalize name of args in mock eventstore

* fix after codereview
  • Loading branch information
jkralik committed May 13, 2019
1 parent 16d9db5 commit cbb344b
Show file tree
Hide file tree
Showing 11 changed files with 251 additions and 45 deletions.
32 changes: 13 additions & 19 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@
name = "github.com/nats-io/go-nats"
version = "1.7.2"

[[constraint]]
name = "github.com/panjf2000/ants"
version = "1.0.0"

[[constraint]]
name = "github.com/stretchr/testify"
version = "1.3.0"
Expand Down
5 changes: 2 additions & 3 deletions cqrs/eventbus/kafka/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ import (
)

type Config struct {
Endpoints []string `envconfig:"KAFKA_ENDPOINTS" default:"localhost:9092"`
ErrFunc cqrsKafka.ErrFunc //used by subscriber to report error in internal goroutine
Endpoints []string `envconfig:"KAFKA_ENDPOINTS" default:"localhost:9092"`
}

//String return string representation of Config
Expand All @@ -24,7 +23,7 @@ type Publisher struct {
*cqrsKafka.Publisher
}

//NewPublisher create new Publisher with configuration, proto marshaller and unmarshaller
// NewPublisher creates new publisher with configuration and proto marshaller.
func NewPublisher(config Config) (*Publisher, error) {
saramaConfig := sarama.NewConfig()
saramaConfig.Producer.Flush.MaxMessages = 1
Expand Down
7 changes: 4 additions & 3 deletions cqrs/eventbus/kafka/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kafka

import (
"github.com/Shopify/sarama"
cqrsEventBus "github.com/go-ocf/cqrs/eventbus"
cqrsKafka "github.com/go-ocf/cqrs/eventbus/kafka"
cqrsUtils "github.com/go-ocf/kit/cqrs"
)
Expand All @@ -10,10 +11,10 @@ type Subscriber struct {
*cqrsKafka.Subscriber
}

//NewPublisher create new Publisher with configuration, proto marshaller and unmarshaller
func NewSubscriber(config Config) (*Subscriber, error) {
// NewSubscriber create new subscriber with configuration and proto unmarshaller.
func NewSubscriber(config Config, goroutinePoolGo cqrsEventBus.GoroutinePoolGoFunc, errFunc cqrsEventBus.ErrFunc) (*Subscriber, error) {
saramaConfig := sarama.NewConfig()
s, err := cqrsKafka.NewSubscriber(config.Endpoints, saramaConfig, cqrsUtils.Unmarshal, config.ErrFunc)
s, err := cqrsKafka.NewSubscriber(config.Endpoints, saramaConfig, cqrsUtils.Unmarshal, goroutinePoolGo, errFunc)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cqrs/eventbus/nats/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type Publisher struct {
*cqrsNats.Publisher
}

// NewPublisher creates new Publisher with configuration proto marshaller.
// NewPublisher creates new publisher with proto marshaller.
func NewPublisher(config Config) (*Publisher, error) {
p, err := cqrsNats.NewPublisher(config.URL, cqrsUtils.Marshal, config.Options...)
if err != nil {
Expand Down
10 changes: 4 additions & 6 deletions cqrs/eventbus/nats/subscriber.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
package nats

import (
cqrsEventBus "github.com/go-ocf/cqrs/eventbus"
cqrsNats "github.com/go-ocf/cqrs/eventbus/nats"
cqrsUtils "github.com/go-ocf/kit/cqrs"
"github.com/go-ocf/kit/log"
)

type Subscriber struct {
*cqrsNats.Subscriber
}

// NewSubscriber create new Subscriber with proto unmarshaller.
func NewSubscriber(config Config) (*Subscriber, error) {
s, err := cqrsNats.NewSubscriber(config.URL, cqrsUtils.Unmarshal, func(err error) {
log.Errorf("%v", err)
}, config.Options...)
// NewSubscriber create new subscriber with proto unmarshaller.
func NewSubscriber(config Config, goroutinePoolGo cqrsEventBus.GoroutinePoolGoFunc, errFunc cqrsEventBus.ErrFunc) (*Subscriber, error) {
s, err := cqrsNats.NewSubscriber(config.URL, cqrsUtils.Unmarshal, goroutinePoolGo, errFunc, config.Options...)
if err != nil {
return nil, err
}
Expand Down
5 changes: 2 additions & 3 deletions cqrs/eventstore/mongodb/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/panjf2000/ants"

"github.com/go-ocf/cqrs/event"
"github.com/go-ocf/cqrs/eventstore"
Expand Down Expand Up @@ -60,7 +59,7 @@ func (c Config) String() string {
}

//NewEventStore create a event store from configuration
func NewEventStore(config Config, pool *ants.Pool) (*EventStore, error) {
func NewEventStore(config Config, goroutinePoolGo eventstore.GoroutinePoolGoFunc) (*EventStore, error) {
session, err := mgo.Dial(config.Host)
if err != nil {
return nil, fmt.Errorf("cannot dial to DB: %v", err)
Expand All @@ -69,7 +68,7 @@ func NewEventStore(config Config, pool *ants.Pool) (*EventStore, error) {
session.SetMode(mgo.Strong, true)
session.SetSafe(&mgo.Safe{W: 1})

es, err := cqrsMongodb.NewEventStoreWithSession(session, config.DatabaseName, "events", config.BatchSize, pool, cqrsUtils.Marshal, cqrsUtils.Unmarshal, log.Debugf)
es, err := cqrsMongodb.NewEventStoreWithSession(session, config.DatabaseName, "events", config.BatchSize, goroutinePoolGo, cqrsUtils.Marshal, cqrsUtils.Unmarshal, log.Debugf)
if err != nil {
return nil, err
}
Expand Down
16 changes: 10 additions & 6 deletions cqrs/eventstore/test/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package test
import (
"context"
"errors"
"fmt"

"github.com/go-ocf/cqrs/event"
"github.com/go-ocf/cqrs/eventstore"
Expand Down Expand Up @@ -92,6 +93,9 @@ func (s *MockEventStore) LoadFromSnapshot(ctx context.Context, queries []eventst
for _, q := range queriesInt {
ret = append(ret, q)
}
if len(ret) == 0 {
return fmt.Errorf("cannot load events: not found")
}

return s.LoadFromVersion(ctx, ret, eventHandler)
}
Expand All @@ -114,7 +118,7 @@ func (i *iter) Err() error {
return nil
}

func (s *MockEventStore) GetInstanceId(ctx context.Context, deviceId, resourceId string) (int64, error) {
func (s *MockEventStore) GetInstanceId(ctx context.Context, groupId, aggregateId string) (int64, error) {
return -1, errors.New("not supported")
}
func (s *MockEventStore) RemoveInstanceId(ctx context.Context, instanceId int64) error {
Expand All @@ -125,16 +129,16 @@ func NewMockEventStore() *MockEventStore {
return &MockEventStore{make(map[string]map[string][]event.EventUnmarshaler)}
}

func (e *MockEventStore) Append(deviceId, resourceId string, ev event.EventUnmarshaler) {
func (e *MockEventStore) Append(groupId, aggregateId string, ev event.EventUnmarshaler) {
var m map[string][]event.EventUnmarshaler
var ok bool
if m, ok = e.events[deviceId]; !ok {
if m, ok = e.events[groupId]; !ok {
m = make(map[string][]event.EventUnmarshaler)
e.events[deviceId] = m
e.events[groupId] = m
}
var r []event.EventUnmarshaler
if r, ok = m[resourceId]; !ok {
if r, ok = m[aggregateId]; !ok {
r = make([]event.EventUnmarshaler, 0, 10)
}
m[resourceId] = append(r, ev)
m[aggregateId] = append(r, ev)
}
98 changes: 98 additions & 0 deletions cqrs/projection/projection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package projection

import (
"context"
"fmt"

"github.com/go-ocf/cqrs"
"github.com/go-ocf/cqrs/eventbus"
"github.com/go-ocf/cqrs/eventstore"
"github.com/go-ocf/kit/log"
)

type Projection struct {
cqrsProjection *cqrs.Projection

topicManager *TopicManager
refCountMap *RefCountMap
}

func NewProjection(ctx context.Context, name string, store eventstore.EventStore, subscriber eventbus.Subscriber, factoryModel eventstore.FactoryModelFunc, getTopics GetTopicsFunc) (*Projection, error) {
cqrsProjection, err := cqrs.NewProjection(ctx, store, name, subscriber, factoryModel, log.Debugf)
if err != nil {
return nil, fmt.Errorf("cannot create Projection: %v", err)
}
return &Projection{
cqrsProjection: cqrsProjection,
topicManager: NewTopicManager(getTopics),
refCountMap: NewRefCountMap(),
}, nil
}

func (p *Projection) ForceUpdate(ctx context.Context, registrationId string, query []eventstore.SnapshotQuery) error {
_, err := p.refCountMap.Inc(registrationId, false)
if err != nil {
return fmt.Errorf("cannot force update projection: %v", err)
}

err = p.cqrsProjection.Project(ctx, query)
if err != nil {
return fmt.Errorf("cannot force update projection: %v", err)
}
_, err = p.refCountMap.Dec(registrationId)
if err != nil {
return fmt.Errorf("cannot force update projection: %v", err)
}
return nil
}

func (p *Projection) Models(query []eventstore.SnapshotQuery) []eventstore.Model {
return p.cqrsProjection.Models(query)
}

func (p *Projection) Register(ctx context.Context, registrationId string, query []eventstore.SnapshotQuery) (loaded bool, err error) {
created, err := p.refCountMap.Inc(registrationId, true)
if err != nil {
return false, fmt.Errorf("cannot register device: %v", err)
}
if !created {
return false, nil
}

err = p.cqrsProjection.Project(ctx, query)
if err != nil {
return false, fmt.Errorf("cannot register device: %v", err)
}

topics, updateSubscriber := p.topicManager.Add(registrationId)

if updateSubscriber {
err := p.cqrsProjection.SubscribeTo(topics)
if err != nil {
p.refCountMap.Dec(registrationId)
return false, fmt.Errorf("cannot register device: %v", err)
}
}
return true, nil
}

func (p *Projection) Unregister(registrationId string) error {
deleted, err := p.refCountMap.Dec(registrationId)
if err != nil {
return fmt.Errorf("cannot unregister device from projection: %v", err)
}
if !deleted {
return nil
}

topics, updateSubscriber := p.topicManager.Remove(registrationId)

if updateSubscriber {
err := p.cqrsProjection.SubscribeTo(topics)
if err != nil {
log.Errorf("cannot change topics for projection: %v", err)
}
}

return nil
}
Loading

0 comments on commit cbb344b

Please sign in to comment.