Skip to content

Commit

Permalink
adding pipeline behaviors to mediatr and its tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mehdihadeli committed Aug 5, 2022
1 parent 77c2050 commit 8dc1be3
Show file tree
Hide file tree
Showing 7 changed files with 301 additions and 23 deletions.
9 changes: 9 additions & 0 deletions examples/cqrs/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
gettingProductByIdDtos "github.com/mehdihadeli/mediatr/examples/cqrs/internal/products/features/getting_product_by_id/dtos"
"github.com/mehdihadeli/mediatr/examples/cqrs/internal/products/features/getting_product_by_id/queries"
"github.com/mehdihadeli/mediatr/examples/cqrs/internal/products/repository"
"github.com/mehdihadeli/mediatr/examples/cqrs/internal/shared/behaviours"
"log"
"os"
"os/signal"
Expand Down Expand Up @@ -53,6 +54,14 @@ func main() {
log.Fatal(err)
}

//////////////////////////////////////////////////////////////////////////////////////////////
// Register request handlers pipeline to the mediatr
loggerPipeline := &behaviours.RequestLoggerBehaviour{}
err = mediatr.RegisterRequestPipelineBehaviors(loggerPipeline)
if err != nil {
log.Fatal(err)
}

//////////////////////////////////////////////////////////////////////////////////////////////
// Controllers setup
controller := productApi.NewProductsController(echo)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package behaviours

import (
"context"
"github.com/mehdihadeli/mediatr"
"log"
)

type RequestLoggerBehaviour struct {
}

func (r *RequestLoggerBehaviour) Handle(ctx context.Context, request interface{}, next mediatr.RequestHandlerFunc) (interface{}, error) {
log.Printf("logging some stuff before handling the request")

response, err := next()
if err != nil {
return nil, err
}

log.Println("logging some stuff after handling the request")

return response, nil
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/KyleBanks/depth v1.2.1 // indirect
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/ahmetb/go-linq/v3 v3.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.19.6 // indirect
Expand All @@ -33,6 +34,7 @@ require (
github.com/stretchr/objx v0.4.0 // indirect
github.com/stretchr/testify v1.8.0 // indirect
github.com/swaggo/files v0.0.0-20220610200504-28940afbdbfe // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.1 // indirect
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbt
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/agiledragon/gomonkey/v2 v2.3.1/go.mod h1:ap1AmDzcVOAz1YpeJ3TCzIgstoaWLA6jbbgxfB4w2iY=
github.com/ahmetb/go-linq/v3 v3.2.0 h1:BEuMfp+b59io8g5wYzNoFe9pWPalRklhlhbiU3hYZDE=
github.com/ahmetb/go-linq/v3 v3.2.0/go.mod h1:haQ3JfOeWK8HpVxMtHHEMPVgBKiYyQ+f1/kLZh/cj9U=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -89,6 +91,8 @@ github.com/swaggo/files v0.0.0-20220610200504-28940afbdbfe/go.mod h1:lKJPbtWzJ9J
github.com/swaggo/swag v1.8.1/go.mod h1:ugemnJsPZm/kRwFUnzBlbHRd0JY9zE1M4F+uy2pAaPQ=
github.com/swaggo/swag v1.8.3 h1:3pZSSCQ//gAH88lfmxM3Cd1+JCsxV8Md6f36b9hrZ5s=
github.com/swaggo/swag v1.8.3/go.mod h1:jMLeXOOmYyjk8PvHTsXBdrubsNd9gUJTTCzL5iBnseg=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
Expand Down
87 changes: 77 additions & 10 deletions mediatr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package mediatr

import (
"context"
"github.com/ahmetb/go-linq/v3"
"github.com/goccy/go-reflect"

"github.com/pkg/errors"
)

type RequestHandlerFunc func() (interface{}, error)

type RequestHandler[TRequest any, TResponse any] interface {
Handle(ctx context.Context, request TRequest) (TResponse, error)
}
Expand All @@ -15,8 +17,13 @@ type NotificationHandler[TNotification any] interface {
Handle(ctx context.Context, notification TNotification) error
}

type PipelineBehavior interface {
Handle(ctx context.Context, request interface{}, next RequestHandlerFunc) (interface{}, error)
}

var requestHandlersRegistrations = map[reflect.Type]interface{}{}
var notificationHandlersRegistrations = map[reflect.Type][]interface{}{}
var pipelineBehaviours []interface{}

type Unit struct{}

Expand All @@ -36,8 +43,19 @@ func RegisterRequestHandler[TRequest any, TResponse any](handler RequestHandler[
return nil
}

// RegisterRequestBehavior TODO
func RegisterRequestBehavior(b interface{}) error {
// RegisterRequestPipelineBehaviors register the request behaviors to mediatr registry.
func RegisterRequestPipelineBehaviors(behaviours ...PipelineBehavior) error {
for _, behavior := range behaviours {
behaviorType := reflect.TypeOf(behavior)

existsPipe := existsPipeType(behaviorType)
if existsPipe {
return errors.Errorf("registered behavior already exists in the registry.")
}

pipelineBehaviours = append(pipelineBehaviours, behavior)
}

return nil
}

Expand Down Expand Up @@ -75,22 +93,51 @@ func RegisterNotificationHandlers[TEvent any](handlers ...NotificationHandler[TE

// Send the request to its corresponding request handler.
func Send[TRequest any, TResponse any](ctx context.Context, request TRequest) (TResponse, error) {

requestType := reflect.TypeOf(request)

var response TResponse
handler, ok := requestHandlersRegistrations[requestType]
if !ok {
return *new(TResponse), errors.Errorf("no handlers for command %T", request)
return *new(TResponse), errors.Errorf("no handlers for request %T", request)
}

handlerValue, ok := handler.(RequestHandler[TRequest, TResponse])
if !ok {
return *new(TResponse), errors.Errorf("handler for command %T is not a Handler", request)
return *new(TResponse), errors.Errorf("handler for request %T is not a Handler", request)
}

response, err := handlerValue.Handle(ctx, request)
if err != nil {
return *new(TResponse), errors.Wrap(err, "error handling request")
if len(pipelineBehaviours) > 0 {
var reversPipes = reversOrder(pipelineBehaviours)

var lastHandler RequestHandlerFunc = func() (interface{}, error) {
return handlerValue.Handle(ctx, request)
}

aggregateResult := linq.From(reversPipes).AggregateWithSeed(lastHandler, func(next interface{}, pipe interface{}) interface{} {
pipeValue := pipe.(PipelineBehavior)
nexValue := next.(RequestHandlerFunc)

var handlerFunc RequestHandlerFunc = func() (interface{}, error) {
return pipeValue.Handle(ctx, request, nexValue)
}

return handlerFunc
})

v := aggregateResult.(RequestHandlerFunc)
response, err := v()

if err != nil {
return *new(TResponse), errors.Wrap(err, "error handling request")
}

return response.(TResponse), nil
} else {
res, err := handlerValue.Handle(ctx, request)
if err != nil {
return *new(TResponse), errors.Wrap(err, "error handling request")
}

response = res
}

return response, nil
Expand Down Expand Up @@ -119,3 +166,23 @@ func Publish[TNotification any](ctx context.Context, notification TNotification)

return nil
}

func reversOrder(values []interface{}) []interface{} {
var reverseValues []interface{}

for i := len(values) - 1; i >= 0; i-- {
reverseValues = append(reverseValues, values[i])
}

return reverseValues
}

func existsPipeType(p reflect.Type) bool {
for _, pipe := range pipelineBehaviours {
if reflect.TypeOf(pipe) == p {
return true
}
}

return false
}
Loading

0 comments on commit 8dc1be3

Please sign in to comment.