diff --git a/examples/cqrs/cmd/main.go b/examples/cqrs/cmd/main.go index 295b28e..d4c3d2f 100644 --- a/examples/cqrs/cmd/main.go +++ b/examples/cqrs/cmd/main.go @@ -11,6 +11,7 @@ import ( gettingProductByIdDtos "github.com/mehdihadeli/mediatr/examples/cqrs/internal/products/features/getting_product_by_id/dtos" "github.com/mehdihadeli/mediatr/examples/cqrs/internal/products/features/getting_product_by_id/queries" "github.com/mehdihadeli/mediatr/examples/cqrs/internal/products/repository" + "github.com/mehdihadeli/mediatr/examples/cqrs/internal/shared/behaviours" "log" "os" "os/signal" @@ -53,6 +54,14 @@ func main() { log.Fatal(err) } + ////////////////////////////////////////////////////////////////////////////////////////////// + // Register request handlers pipeline to the mediatr + loggerPipeline := &behaviours.RequestLoggerBehaviour{} + err = mediatr.RegisterRequestPipelineBehaviors(loggerPipeline) + if err != nil { + log.Fatal(err) + } + ////////////////////////////////////////////////////////////////////////////////////////////// // Controllers setup controller := productApi.NewProductsController(echo) diff --git a/examples/cqrs/internal/shared/behaviours/request_logger_behaviour.go b/examples/cqrs/internal/shared/behaviours/request_logger_behaviour.go new file mode 100644 index 0000000..d7e3ece --- /dev/null +++ b/examples/cqrs/internal/shared/behaviours/request_logger_behaviour.go @@ -0,0 +1,23 @@ +package behaviours + +import ( + "context" + "github.com/mehdihadeli/mediatr" + "log" +) + +type RequestLoggerBehaviour struct { +} + +func (r *RequestLoggerBehaviour) Handle(ctx context.Context, request interface{}, next mediatr.RequestHandlerFunc) (interface{}, error) { + log.Printf("logging some stuff before handling the request") + + response, err := next() + if err != nil { + return nil, err + } + + log.Println("logging some stuff after handling the request") + + return response, nil +} diff --git a/go.mod b/go.mod index 07460aa..16364c4 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/KyleBanks/depth v1.2.1 // indirect github.com/PuerkitoBio/purell v1.1.1 // indirect github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect + github.com/ahmetb/go-linq/v3 v3.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.19.6 // indirect @@ -33,6 +34,7 @@ require ( github.com/stretchr/objx v0.4.0 // indirect github.com/stretchr/testify v1.8.0 // indirect github.com/swaggo/files v0.0.0-20220610200504-28940afbdbfe // indirect + github.com/tidwall/pretty v1.2.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.2.1 // indirect golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect diff --git a/go.sum b/go.sum index 21a9c27..e9b4cc8 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbt github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/agiledragon/gomonkey/v2 v2.3.1/go.mod h1:ap1AmDzcVOAz1YpeJ3TCzIgstoaWLA6jbbgxfB4w2iY= +github.com/ahmetb/go-linq/v3 v3.2.0 h1:BEuMfp+b59io8g5wYzNoFe9pWPalRklhlhbiU3hYZDE= +github.com/ahmetb/go-linq/v3 v3.2.0/go.mod h1:haQ3JfOeWK8HpVxMtHHEMPVgBKiYyQ+f1/kLZh/cj9U= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -89,6 +91,8 @@ github.com/swaggo/files v0.0.0-20220610200504-28940afbdbfe/go.mod h1:lKJPbtWzJ9J github.com/swaggo/swag v1.8.1/go.mod h1:ugemnJsPZm/kRwFUnzBlbHRd0JY9zE1M4F+uy2pAaPQ= github.com/swaggo/swag v1.8.3 h1:3pZSSCQ//gAH88lfmxM3Cd1+JCsxV8Md6f36b9hrZ5s= github.com/swaggo/swag v1.8.3/go.mod h1:jMLeXOOmYyjk8PvHTsXBdrubsNd9gUJTTCzL5iBnseg= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= diff --git a/mediatr.go b/mediatr.go index b2afee3..77ae211 100644 --- a/mediatr.go +++ b/mediatr.go @@ -2,11 +2,13 @@ package mediatr import ( "context" + "github.com/ahmetb/go-linq/v3" "github.com/goccy/go-reflect" - "github.com/pkg/errors" ) +type RequestHandlerFunc func() (interface{}, error) + type RequestHandler[TRequest any, TResponse any] interface { Handle(ctx context.Context, request TRequest) (TResponse, error) } @@ -15,8 +17,13 @@ type NotificationHandler[TNotification any] interface { Handle(ctx context.Context, notification TNotification) error } +type PipelineBehavior interface { + Handle(ctx context.Context, request interface{}, next RequestHandlerFunc) (interface{}, error) +} + var requestHandlersRegistrations = map[reflect.Type]interface{}{} var notificationHandlersRegistrations = map[reflect.Type][]interface{}{} +var pipelineBehaviours []interface{} type Unit struct{} @@ -36,8 +43,19 @@ func RegisterRequestHandler[TRequest any, TResponse any](handler RequestHandler[ return nil } -// RegisterRequestBehavior TODO -func RegisterRequestBehavior(b interface{}) error { +// RegisterRequestPipelineBehaviors register the request behaviors to mediatr registry. +func RegisterRequestPipelineBehaviors(behaviours ...PipelineBehavior) error { + for _, behavior := range behaviours { + behaviorType := reflect.TypeOf(behavior) + + existsPipe := existsPipeType(behaviorType) + if existsPipe { + return errors.Errorf("registered behavior already exists in the registry.") + } + + pipelineBehaviours = append(pipelineBehaviours, behavior) + } + return nil } @@ -75,22 +93,51 @@ func RegisterNotificationHandlers[TEvent any](handlers ...NotificationHandler[TE // Send the request to its corresponding request handler. func Send[TRequest any, TResponse any](ctx context.Context, request TRequest) (TResponse, error) { - requestType := reflect.TypeOf(request) - + var response TResponse handler, ok := requestHandlersRegistrations[requestType] if !ok { - return *new(TResponse), errors.Errorf("no handlers for command %T", request) + return *new(TResponse), errors.Errorf("no handlers for request %T", request) } handlerValue, ok := handler.(RequestHandler[TRequest, TResponse]) if !ok { - return *new(TResponse), errors.Errorf("handler for command %T is not a Handler", request) + return *new(TResponse), errors.Errorf("handler for request %T is not a Handler", request) } - response, err := handlerValue.Handle(ctx, request) - if err != nil { - return *new(TResponse), errors.Wrap(err, "error handling request") + if len(pipelineBehaviours) > 0 { + var reversPipes = reversOrder(pipelineBehaviours) + + var lastHandler RequestHandlerFunc = func() (interface{}, error) { + return handlerValue.Handle(ctx, request) + } + + aggregateResult := linq.From(reversPipes).AggregateWithSeed(lastHandler, func(next interface{}, pipe interface{}) interface{} { + pipeValue := pipe.(PipelineBehavior) + nexValue := next.(RequestHandlerFunc) + + var handlerFunc RequestHandlerFunc = func() (interface{}, error) { + return pipeValue.Handle(ctx, request, nexValue) + } + + return handlerFunc + }) + + v := aggregateResult.(RequestHandlerFunc) + response, err := v() + + if err != nil { + return *new(TResponse), errors.Wrap(err, "error handling request") + } + + return response.(TResponse), nil + } else { + res, err := handlerValue.Handle(ctx, request) + if err != nil { + return *new(TResponse), errors.Wrap(err, "error handling request") + } + + response = res } return response, nil @@ -119,3 +166,23 @@ func Publish[TNotification any](ctx context.Context, notification TNotification) return nil } + +func reversOrder(values []interface{}) []interface{} { + var reverseValues []interface{} + + for i := len(values) - 1; i >= 0; i-- { + reverseValues = append(reverseValues, values[i]) + } + + return reverseValues +} + +func existsPipeType(p reflect.Type) bool { + for _, pipe := range pipelineBehaviours { + if reflect.TypeOf(pipe) == p { + return true + } + } + + return false +} diff --git a/mediatr_test.go b/mediatr_test.go index 8c4904b..9b0e49a 100644 --- a/mediatr_test.go +++ b/mediatr_test.go @@ -3,11 +3,14 @@ package mediatr import ( "context" "fmt" + "github.com/goccy/go-reflect" "github.com/pkg/errors" "github.com/stretchr/testify/assert" "testing" ) +var testData []string + func Test_RegisterRequestHandler_Should_Return_Error_If_Handler_Already_Registered(t *testing.T) { expectedErr := fmt.Sprintf("registered handler already exists in the registry for message %s", "*mediatr.RequestTest") handler1 := &RequestTestHandler{} @@ -17,6 +20,9 @@ func Test_RegisterRequestHandler_Should_Return_Error_If_Handler_Already_Register assert.Nil(t, err1) assert.Containsf(t, err2.Error(), expectedErr, "expected error containing %q, got %s", expectedErr, err2) + + count := len(requestHandlersRegistrations) + assert.Equal(t, 1, count) } func Test_RegisterRequestHandler_Should_Register_All_Handlers(t *testing.T) { @@ -25,8 +31,17 @@ func Test_RegisterRequestHandler_Should_Register_All_Handlers(t *testing.T) { err1 := RegisterRequestHandler[*RequestTest, *ResponseTest](handler1) err2 := RegisterRequestHandler[*RequestTest2, *ResponseTest2](handler2) - assert.Nil(t, err1) - assert.Nil(t, err2) + if err1 != nil { + t.Errorf("error registering request handler: %s", err1) + } + + if err2 != nil { + t.Errorf("error registering request handler: %s", err2) + } + + count := len(requestHandlersRegistrations) + assert.Equal(t, 2, count) + } func Test_Send_Should_Throw_Error_If_No_Handler_Registered(t *testing.T) { @@ -46,16 +61,39 @@ func Test_Send_Should_Return_Error_If_Handler_Returns_Error(t *testing.T) { assert.Containsf(t, err.Error(), expectedErr, "expected error containing %q, got %s", expectedErr, err) } -func Test_Send_Should_Dispatch_Request_To_Handler_And_Get_Response(t *testing.T) { +func Test_Send_Should_Dispatch_Request_To_Handler_And_Get_Response_Without_Pipeline(t *testing.T) { + handler := &RequestTestHandler{} + errRegister := RegisterRequestHandler[*RequestTest, *ResponseTest](handler) + if errRegister != nil { + t.Error(errRegister) + } + + response, err := Send[*RequestTest, *ResponseTest](context.Background(), &RequestTest{Data: "test"}) + assert.Nil(t, err) + assert.IsType(t, &ResponseTest{}, response) + assert.Equal(t, "test", response.Data) +} + +func Test_Send_Should_Dispatch_Request_To_Handler_And_Get_Response_With_Pipeline(t *testing.T) { + pip1 := &PipelineBehaviourTest{} + pip2 := &PipelineBehaviourTest2{} + err := RegisterRequestPipelineBehaviors(pip1, pip2) + if err != nil { + t.Errorf("error registering request pipeline behaviors: %s", err) + } + handler := &RequestTestHandler{} errRegister := RegisterRequestHandler[*RequestTest, *ResponseTest](handler) if errRegister != nil { t.Error(errRegister) } + response, err := Send[*RequestTest, *ResponseTest](context.Background(), &RequestTest{Data: "test"}) assert.Nil(t, err) assert.IsType(t, &ResponseTest{}, response) assert.Equal(t, "test", response.Data) + assert.Contains(t, testData, "PipelineBehaviourTest") + assert.Contains(t, testData, "PipelineBehaviourTest2") } func Test_RegisterNotificationHandler_Should_Register_Multiple_Handler_For_Notification(t *testing.T) { @@ -64,8 +102,15 @@ func Test_RegisterNotificationHandler_Should_Register_Multiple_Handler_For_Notif err1 := RegisterNotificationHandler[*NotificationTest](handler1) err2 := RegisterNotificationHandler[*NotificationTest](handler2) - assert.Nil(t, err1) - assert.Nil(t, err2) + if err1 != nil { + t.Errorf("error registering notification handler: %s", err1) + } + if err2 != nil { + t.Errorf("error registering notification handler: %s", err2) + } + + count := len(notificationHandlersRegistrations[reflect.TypeOf(&NotificationTest{})]) + assert.Equal(t, 2, count) } func Test_RegisterNotificationHandlers_Should_Register_Multiple_Handler_For_Notification(t *testing.T) { @@ -74,7 +119,12 @@ func Test_RegisterNotificationHandlers_Should_Register_Multiple_Handler_For_Noti handler3 := &NotificationTestHandler4{} err := RegisterNotificationHandlers[*NotificationTest](handler1, handler2, handler3) - assert.Nil(t, err) + if err != nil { + t.Errorf("error registering notification handlers: %s", err) + } + + count := len(notificationHandlersRegistrations[reflect.TypeOf(&NotificationTest{})]) + assert.Equal(t, 3, count) } func Test_Publish_Should_Throw_Error_If_No_Handler_Registered(t *testing.T) { @@ -104,8 +154,35 @@ func Test_Publish_Should_Dispatch_Notification_To_All_Handlers_Without_Any_Respo if errRegister != nil { t.Error(errRegister) } - err := Publish[*NotificationTest](context.Background(), &NotificationTest{}) + + notification := &NotificationTest{} + err := Publish[*NotificationTest](context.Background(), notification) assert.Nil(t, err) + assert.True(t, notification.Processed) +} + +func Test_Register_Behaviours_Should_Register_Behaviours_In_The_Registry_Correctly(t *testing.T) { + pip1 := &PipelineBehaviourTest{} + pip2 := &PipelineBehaviourTest2{} + + err := RegisterRequestPipelineBehaviors(pip1, pip2) + if err != nil { + t.Errorf("error registering behaviours: %s", err) + } + + count := len(pipelineBehaviours) + assert.Equal(t, 2, count) +} + +func Test_Register_Duplicate_Behaviours_Should_Throw_Error(t *testing.T) { + pip1 := &PipelineBehaviourTest{} + pip2 := &PipelineBehaviourTest{} + err := RegisterRequestPipelineBehaviors(pip1, pip2) + if err == nil { + t.Errorf("expected error, got nil") + } + + assert.Contains(t, err.Error(), "registered behavior already exists in the registry") } /////////////////////////////////////////////////////////////////////////////////////////////// @@ -121,6 +198,9 @@ type RequestTestHandler struct { } func (c *RequestTestHandler) Handle(ctx context.Context, request *RequestTest) (*ResponseTest, error) { + fmt.Println("RequestTestHandler.Handled") + testData = append(testData, "RequestTestHandler") + return &ResponseTest{Data: request.Data}, nil } @@ -137,6 +217,9 @@ type RequestTestHandler2 struct { } func (c *RequestTestHandler2) Handle(ctx context.Context, request *RequestTest2) (*ResponseTest2, error) { + fmt.Println("RequestTestHandler2.Handled") + testData = append(testData, "RequestTestHandler2") + return &ResponseTest2{Data: request.Data}, nil } @@ -150,25 +233,35 @@ func (c *RequestTestHandler3) Handle(ctx context.Context, request *RequestTest2) /////////////////////////////////////////////////////////////////////////////////////////////// type NotificationTest struct { - Data string + Data string + Processed bool } type NotificationTestHandler struct { } func (c *NotificationTestHandler) Handle(ctx context.Context, notification *NotificationTest) error { + notification.Processed = true + fmt.Println("NotificationTestHandler.Handled") + testData = append(testData, "NotificationTestHandler") + return nil } /////////////////////////////////////////////////////////////////////////////////////////////// type NotificationTest2 struct { - Data string + Data string + Processed bool } type NotificationTestHandler2 struct { } func (c *NotificationTestHandler2) Handle(ctx context.Context, notification *NotificationTest2) error { + notification.Processed = true + fmt.Println("NotificationTestHandler2.Handled") + testData = append(testData, "NotificationTestHandler2") + return nil } @@ -186,5 +279,41 @@ type NotificationTestHandler4 struct { } func (c *NotificationTestHandler4) Handle(ctx context.Context, notification *NotificationTest) error { + notification.Processed = true + fmt.Println("NotificationTestHandler4.Handled") + testData = append(testData, "NotificationTestHandler4") + return nil } + +/////////////////////////////////////////////////////////////////////////////////////////////// +type PipelineBehaviourTest struct { +} + +func (c *PipelineBehaviourTest) Handle(ctx context.Context, request interface{}, next RequestHandlerFunc) (interface{}, error) { + fmt.Println("PipelineBehaviourTest.Handled") + testData = append(testData, "PipelineBehaviourTest") + + res, err := next() + if err != nil { + return nil, err + } + + return res, nil +} + +/////////////////////////////////////////////////////////////////////////////////////////////// +type PipelineBehaviourTest2 struct { +} + +func (c *PipelineBehaviourTest2) Handle(ctx context.Context, request interface{}, next RequestHandlerFunc) (interface{}, error) { + fmt.Println("PipelineBehaviourTest2.Handled") + testData = append(testData, "PipelineBehaviourTest2") + + res, err := next() + if err != nil { + return nil, err + } + + return res, nil +} diff --git a/readme.md b/readme.md index 8247474..2173da2 100644 --- a/readme.md +++ b/readme.md @@ -1,12 +1,12 @@ -
- +
> This package is a `Mediator Pattern` implementation in golang, and inspired by great [jbogard/mediatr](https://github.com/jbogard/mediatr) library in .Net. @@ -261,4 +261,48 @@ mediatr.Publish[*events.ProductCreatedEvent](ctx, productCreatedEvent) ``` ## Using Pipeline Behaviors -TODO +Sometimes we need to add some cross-cutting concerns before after running our request handlers like logging, metrics, circuit breaker, retry, etc. In this case we can use `PipelineBehavior`. It is actually is like a middleware or [decorator pattern](https://refactoring.guru/design-patterns/decorator). + +These behaviors will execute before or after running our request handlers with calling `Send` method for a request on the mediatr. + +### Creating Pipeline Behavior +For creating a pipeline behaviour we should implement the `PipelineBehavior` interface: + +``` go +type PipelineBehavior interface { + Handle(ctx context.Context, request interface{}, next RequestHandlerFunc) (interface{}, error) +} +``` +The `request` parameter is the request object passed in through `Send` method of mediatr, while the `next` parameter is a continuation for the next action in the behavior chain and its type is `RequestHandlerFunc`. + +Here is an example of a pipeline behavior: + +```go +type RequestLoggerBehaviour struct { +} + +func (r *RequestLoggerBehaviour) Handle(ctx context.Context, request interface{}, next mediatr.RequestHandlerFunc) (interface{}, error) { + log.Printf("logging some stuff before handling the request") + + response, err := next() + if err != nil { + return nil, err + } + + log.Println("logging some stuff after handling the request") + + return response, nil +} +``` +In our defined behavior, we need to call `next` parameter that call next action in the behavior chain, if there aren't any other behaviours `next` will call our `actual request handler` and return the response. We can do something before of after of calling next action in the behavior chain. + +### Registering Pipeline Behavior to the MediatR + +For registering our pipeline behavior to the MediatR, we should use `RegisterPipelineBehaviors` method: + +```go +loggerPipeline := &behaviours.RequestLoggerBehaviour{} +err = mediatr.RegisterRequestPipelineBehaviors(loggerPipeline) +``` + +