Skip to content

Commit

Permalink
worflows: activity retry policy (#644)
Browse files Browse the repository at this point in the history
* worflows: activity retry policy

Signed-off-by: Fabian Martinez <[email protected]>

* adjust name

Signed-off-by: Fabian Martinez <[email protected]>

* fix build

Signed-off-by: Fabian Martinez <[email protected]>

* add tests

Signed-off-by: Fabian Martinez <[email protected]>

* register activity

Signed-off-by: Fabian Martinez <[email protected]>

---------

Signed-off-by: Fabian Martinez <[email protected]>
Co-authored-by: Mike Nguyen <[email protected]>
  • Loading branch information
famarting and mikeee authored Nov 14, 2024
1 parent 59acca4 commit c12c959
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 8 deletions.
2 changes: 1 addition & 1 deletion examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/golang/protobuf v1.5.4 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/marusama/semaphore/v2 v2.5.0 // indirect
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d // indirect
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 // indirect
github.com/xhit/go-str2duration/v2 v2.1.0 // indirect
go.opentelemetry.io/otel v1.27.0 // indirect
go.opentelemetry.io/otel/metric v1.27.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM=
github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ=
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d h1:Phnx8/wPd9BM6RPIjlqNl8kAaFjtU+Sdw9CzmZd8Wsw=
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 h1:I1yeX4tWqOdBzpRzSbY1TnHU2NI25Pdu6OXUm39emm0=
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
Expand Down
2 changes: 2 additions & 0 deletions examples/workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ expected_stdout_lines:
- '== APP == Worker initialized'
- '== APP == TestWorkflow registered'
- '== APP == TestActivity registered'
- '== APP == FailActivity registered'
- '== APP == runner started'
- '== APP == workflow started with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9'
- '== APP == workflow paused'
- '== APP == workflow resumed'
- '== APP == stage: 1'
- '== APP == workflow event raised'
- '== APP == stage: 2'
- '== APP == fail activity executions: 3'
- '== APP == workflow status: COMPLETED'
- '== APP == workflow purged'
- '== APP == stage: 2'
Expand Down
30 changes: 30 additions & 0 deletions examples/workflow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package main

import (
"context"
"errors"
"fmt"
"log"
"time"
Expand All @@ -24,6 +25,7 @@ import (
)

var stage = 0
var failActivityTries = 0

func main() {
w, err := workflow.NewWorker()
Expand All @@ -43,6 +45,11 @@ func main() {
}
fmt.Println("TestActivity registered")

if err := w.RegisterActivity(FailActivity); err != nil {
log.Fatal(err)
}
fmt.Println("FailActivity registered")

// Start workflow runner
if err := w.Start(); err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -112,6 +119,15 @@ func main() {

fmt.Printf("stage: %d\n", stage)

waitCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
_, err = wfClient.WaitForWorkflowCompletion(waitCtx, instanceID)
cancel()
if err != nil {
log.Fatalf("failed to wait for workflow: %v", err)
}

fmt.Printf("fail activity executions: %d\n", failActivityTries)

respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
Expand Down Expand Up @@ -186,6 +202,15 @@ func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
return nil, err
}

if err := ctx.CallActivity(FailActivity, workflow.ActivityRetryPolicy(workflow.RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 100 * time.Millisecond,
BackoffCoefficient: 2,
MaxRetryInterval: 1 * time.Second,
})).Await(nil); err == nil {
return nil, fmt.Errorf("unexpected no error executing fail activity")
}

return output, nil
}

Expand All @@ -199,3 +224,8 @@ func TestActivity(ctx workflow.ActivityContext) (any, error) {

return fmt.Sprintf("Stage: %d", stage), nil
}

func FailActivity(ctx workflow.ActivityContext) (any, error) {
failActivityTries += 1
return nil, errors.New("dummy activity error")
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/go-chi/chi/v5 v5.1.0
github.com/golang/mock v1.6.0
github.com/google/uuid v1.6.0
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428
github.com/stretchr/testify v1.9.0
google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.34.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM=
github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ=
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d h1:Phnx8/wPd9BM6RPIjlqNl8kAaFjtU+Sdw9CzmZd8Wsw=
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 h1:I1yeX4tWqOdBzpRzSbY1TnHU2NI25Pdu6OXUm39emm0=
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
Expand Down
32 changes: 31 additions & 1 deletion workflow/activity_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package workflow
import (
"context"
"encoding/json"
"time"

"google.golang.org/protobuf/types/known/wrapperspb"

Expand All @@ -38,7 +39,16 @@ func (wfac *ActivityContext) Context() context.Context {
type callActivityOption func(*callActivityOptions) error

type callActivityOptions struct {
rawInput *wrapperspb.StringValue
rawInput *wrapperspb.StringValue
retryPolicy *RetryPolicy
}

type RetryPolicy struct {
MaxAttempts int
InitialRetryInterval time.Duration
BackoffCoefficient float64
MaxRetryInterval time.Duration
RetryTimeout time.Duration
}

// ActivityInput is an option to pass a JSON-serializable input
Expand All @@ -61,6 +71,26 @@ func ActivityRawInput(input string) callActivityOption {
}
}

func ActivityRetryPolicy(policy RetryPolicy) callActivityOption {
return func(opts *callActivityOptions) error {
opts.retryPolicy = &policy
return nil
}
}

func (opts *callActivityOptions) getRetryPolicy() *task.ActivityRetryPolicy {
if opts.retryPolicy == nil {
return nil
}
return &task.ActivityRetryPolicy{
MaxAttempts: opts.retryPolicy.MaxAttempts,
InitialRetryInterval: opts.retryPolicy.InitialRetryInterval,
BackoffCoefficient: opts.retryPolicy.BackoffCoefficient,
MaxRetryInterval: opts.retryPolicy.MaxRetryInterval,
RetryTimeout: opts.retryPolicy.RetryTimeout,
}
}

func marshalData(input any) ([]byte, error) {
if input == nil {
return nil, nil
Expand Down
22 changes: 22 additions & 0 deletions workflow/activity_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"encoding/json"
"fmt"
"testing"
"time"

"github.com/microsoft/durabletask-go/task"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -69,6 +71,26 @@ func TestCallActivityOptions(t *testing.T) {
opts := returnCallActivityOptions(ActivityRawInput("test"))
assert.Equal(t, "test", opts.rawInput.GetValue())
})

t.Run("activity retry policy - set", func(t *testing.T) {
opts := returnCallActivityOptions(ActivityRetryPolicy(RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 100 * time.Millisecond,
BackoffCoefficient: 2,
MaxRetryInterval: 2 * time.Second,
}))
assert.Equal(t, &task.ActivityRetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 100 * time.Millisecond,
BackoffCoefficient: 2,
MaxRetryInterval: 2 * time.Second,
}, opts.getRetryPolicy())
})

t.Run("activity retry policy - empty", func(t *testing.T) {
opts := returnCallActivityOptions()
assert.Empty(t, opts.getRetryPolicy())
})
}

func returnCallActivityOptions(opts ...callActivityOption) callActivityOptions {
Expand Down
2 changes: 1 addition & 1 deletion workflow/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (wfc *WorkflowContext) CallActivity(activity interface{}, opts ...callActiv
}
}

return wfc.orchestrationContext.CallActivity(activity, task.WithRawActivityInput(options.rawInput.GetValue()))
return wfc.orchestrationContext.CallActivity(activity, task.WithRawActivityInput(options.rawInput.GetValue()), task.WithRetryPolicy(options.getRetryPolicy()))
}

// CallChildWorkflow returns a completable task for a given workflow.
Expand Down

0 comments on commit c12c959

Please sign in to comment.