Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extends cqrs about goroutine pool go #18

Merged
merged 4 commits into from
May 13, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 11 additions & 19 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 1 addition & 5 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
name = "github.com/globalsign/mgo"

[[constraint]]
branch = "master"
branch = "goroutinePoolHandler"
name = "github.com/go-ocf/cqrs"

[[constraint]]
Expand All @@ -49,10 +49,6 @@
name = "github.com/nats-io/go-nats"
version = "1.7.2"

[[constraint]]
name = "github.com/panjf2000/ants"
version = "1.0.0"

[[constraint]]
name = "github.com/stretchr/testify"
version = "1.3.0"
Expand Down
5 changes: 2 additions & 3 deletions cqrs/eventbus/kafka/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ import (
)

type Config struct {
Endpoints []string `envconfig:"KAFKA_ENDPOINTS" default:"localhost:9092"`
ErrFunc cqrsKafka.ErrFunc //used by subscriber to report error in internal goroutine
Endpoints []string `envconfig:"KAFKA_ENDPOINTS" default:"localhost:9092"`
}

//String return string representation of Config
Expand All @@ -24,7 +23,7 @@ type Publisher struct {
*cqrsKafka.Publisher
}

//NewPublisher create new Publisher with configuration, proto marshaller and unmarshaller
// NewPublisher creates new publisher with configuration and proto marshaller.
func NewPublisher(config Config) (*Publisher, error) {
saramaConfig := sarama.NewConfig()
saramaConfig.Producer.Flush.MaxMessages = 1
Expand Down
7 changes: 4 additions & 3 deletions cqrs/eventbus/kafka/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kafka

import (
jkralik marked this conversation as resolved.
Show resolved Hide resolved
"github.com/Shopify/sarama"
cqrsEventBus "github.com/go-ocf/cqrs/eventbus"
cqrsKafka "github.com/go-ocf/cqrs/eventbus/kafka"
cqrsUtils "github.com/go-ocf/kit/cqrs"
)
Expand All @@ -10,10 +11,10 @@ type Subscriber struct {
*cqrsKafka.Subscriber
}

//NewPublisher create new Publisher with configuration, proto marshaller and unmarshaller
func NewSubscriber(config Config) (*Subscriber, error) {
// NewSubscriber create new subscriber with configuration and proto unmarshaller.
func NewSubscriber(config Config, goroutinePoolGo cqrsEventBus.GoroutinePoolGoFunc, errFunc cqrsEventBus.ErrFunc) (*Subscriber, error) {
saramaConfig := sarama.NewConfig()
s, err := cqrsKafka.NewSubscriber(config.Endpoints, saramaConfig, cqrsUtils.Unmarshal, config.ErrFunc)
s, err := cqrsKafka.NewSubscriber(config.Endpoints, saramaConfig, cqrsUtils.Unmarshal, goroutinePoolGo, errFunc)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cqrs/eventbus/nats/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type Publisher struct {
*cqrsNats.Publisher
}

// NewPublisher creates new Publisher with configuration proto marshaller.
// NewPublisher creates new publisher with proto marshaller.
func NewPublisher(config Config) (*Publisher, error) {
p, err := cqrsNats.NewPublisher(config.URL, cqrsUtils.Marshal, config.Options...)
if err != nil {
Expand Down
10 changes: 4 additions & 6 deletions cqrs/eventbus/nats/subscriber.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
package nats

import (
cqrsEventBus "github.com/go-ocf/cqrs/eventbus"
cqrsNats "github.com/go-ocf/cqrs/eventbus/nats"
cqrsUtils "github.com/go-ocf/kit/cqrs"
"github.com/go-ocf/kit/log"
)

type Subscriber struct {
*cqrsNats.Subscriber
}

// NewSubscriber create new Subscriber with proto unmarshaller.
func NewSubscriber(config Config) (*Subscriber, error) {
s, err := cqrsNats.NewSubscriber(config.URL, cqrsUtils.Unmarshal, func(err error) {
log.Errorf("%v", err)
}, config.Options...)
// NewSubscriber create new subscriber with proto unmarshaller.
func NewSubscriber(config Config, goroutinePoolGo cqrsEventBus.GoroutinePoolGoFunc, errFunc cqrsEventBus.ErrFunc) (*Subscriber, error) {
s, err := cqrsNats.NewSubscriber(config.URL, cqrsUtils.Unmarshal, goroutinePoolGo, errFunc, config.Options...)
if err != nil {
return nil, err
}
Expand Down
5 changes: 2 additions & 3 deletions cqrs/eventstore/mongodb/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/panjf2000/ants"

"github.com/go-ocf/cqrs/event"
"github.com/go-ocf/cqrs/eventstore"
Expand Down Expand Up @@ -60,7 +59,7 @@ func (c Config) String() string {
}

//NewEventStore create a event store from configuration
func NewEventStore(config Config, pool *ants.Pool) (*EventStore, error) {
func NewEventStore(config Config, goroutinePoolGo eventstore.GoroutinePoolGoFunc) (*EventStore, error) {
session, err := mgo.Dial(config.Host)
if err != nil {
return nil, fmt.Errorf("cannot dial to DB: %v", err)
Expand All @@ -69,7 +68,7 @@ func NewEventStore(config Config, pool *ants.Pool) (*EventStore, error) {
session.SetMode(mgo.Strong, true)
session.SetSafe(&mgo.Safe{W: 1})

es, err := cqrsMongodb.NewEventStoreWithSession(session, config.DatabaseName, "events", config.BatchSize, pool, cqrsUtils.Marshal, cqrsUtils.Unmarshal, log.Debugf)
es, err := cqrsMongodb.NewEventStoreWithSession(session, config.DatabaseName, "events", config.BatchSize, goroutinePoolGo, cqrsUtils.Marshal, cqrsUtils.Unmarshal, log.Debugf)
if err != nil {
return nil, err
}
Expand Down
16 changes: 10 additions & 6 deletions cqrs/eventstore/test/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package test
import (
"context"
"errors"
"fmt"

"github.com/go-ocf/cqrs/event"
"github.com/go-ocf/cqrs/eventstore"
Expand Down Expand Up @@ -92,6 +93,9 @@ func (s *MockEventStore) LoadFromSnapshot(ctx context.Context, queries []eventst
for _, q := range queriesInt {
ret = append(ret, q)
}
if len(ret) == 0 {
return fmt.Errorf("cannot load events: not found")
}

return s.LoadFromVersion(ctx, ret, eventHandler)
}
Expand All @@ -114,7 +118,7 @@ func (i *iter) Err() error {
return nil
}

func (s *MockEventStore) GetInstanceId(ctx context.Context, deviceId, resourceId string) (int64, error) {
func (s *MockEventStore) GetInstanceId(ctx context.Context, groupId, aggregateId string) (int64, error) {
return -1, errors.New("not supported")
}
func (s *MockEventStore) RemoveInstanceId(ctx context.Context, instanceId int64) error {
Expand All @@ -125,16 +129,16 @@ func NewMockEventStore() *MockEventStore {
return &MockEventStore{make(map[string]map[string][]event.EventUnmarshaler)}
}

func (e *MockEventStore) Append(deviceId, resourceId string, ev event.EventUnmarshaler) {
func (e *MockEventStore) Append(groupId, aggregateId string, ev event.EventUnmarshaler) {
var m map[string][]event.EventUnmarshaler
var ok bool
if m, ok = e.events[deviceId]; !ok {
if m, ok = e.events[groupId]; !ok {
m = make(map[string][]event.EventUnmarshaler)
e.events[deviceId] = m
e.events[groupId] = m
}
var r []event.EventUnmarshaler
if r, ok = m[resourceId]; !ok {
if r, ok = m[aggregateId]; !ok {
r = make([]event.EventUnmarshaler, 0, 10)
}
m[resourceId] = append(r, ev)
m[aggregateId] = append(r, ev)
}
Loading