Skip to content

Commit 5c4793f

Browse files
authored
fix(Stripe): Routinely save state in Stripe payments workflow (#584)
* fix(Stripe): Routinely save state in Stripe payments workflow * Increase workflow timeout to 5 minutes * Convert StartToCloseTimeoutMinutes value to constant * Only update the activity timeout for FetchNext.* activities * Define activity timeout consts in models to avoid import cycle
1 parent d296a38 commit 5c4793f

File tree

8 files changed

+33
-6
lines changed

8 files changed

+33
-6
lines changed

internal/connectors/engine/workflow/context.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,26 @@ package workflow
33
import (
44
"time"
55

6+
"github.com/formancehq/payments/internal/models"
67
"go.temporal.io/sdk/temporal"
78
"go.temporal.io/sdk/workflow"
89
)
910

1011
func infiniteRetryContext(ctx workflow.Context) workflow.Context {
1112
return workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
12-
StartToCloseTimeout: 60 * time.Second,
13+
StartToCloseTimeout: models.ActivityStartToCloseTimeoutMinutesDefault * time.Minute,
14+
RetryPolicy: &temporal.RetryPolicy{
15+
InitialInterval: time.Second,
16+
BackoffCoefficient: 2,
17+
MaximumInterval: 100 * time.Second,
18+
NonRetryableErrorTypes: []string{},
19+
},
20+
})
21+
}
22+
23+
func fetchNextActivityRetryContext(ctx workflow.Context) workflow.Context {
24+
return workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
25+
StartToCloseTimeout: models.ActivityStartToCloseTimeoutMinutesLong * time.Minute,
1326
RetryPolicy: &temporal.RetryPolicy{
1427
InitialInterval: time.Second,
1528
BackoffCoefficient: 2,

internal/connectors/engine/workflow/fetch_accounts.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func (w Workflow) fetchAccounts(
5252
hasMore := true
5353
for hasMore {
5454
accountsResponse, err := activities.PluginFetchNextAccounts(
55-
infiniteRetryContext(ctx),
55+
fetchNextActivityRetryContext(ctx),
5656
fetchNextAccount.ConnectorID,
5757
fetchNextAccount.FromPayload.GetPayload(),
5858
state.State,

internal/connectors/engine/workflow/fetch_balances.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func (w Workflow) fetchBalances(
5252
hasMore := true
5353
for hasMore {
5454
balancesResponse, err := activities.PluginFetchNextBalances(
55-
infiniteRetryContext(ctx),
55+
fetchNextActivityRetryContext(ctx),
5656
fetchNextBalances.ConnectorID,
5757
fetchNextBalances.FromPayload.GetPayload(),
5858
state.State,

internal/connectors/engine/workflow/fetch_external_accounts.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func (w Workflow) fetchExternalAccounts(
5252
hasMore := true
5353
for hasMore {
5454
externalAccountsResponse, err := activities.PluginFetchNextExternalAccounts(
55-
infiniteRetryContext(ctx),
55+
fetchNextActivityRetryContext(ctx),
5656
fetchNextExternalAccount.ConnectorID,
5757
fetchNextExternalAccount.FromPayload.GetPayload(),
5858
state.State,

internal/connectors/engine/workflow/fetch_others.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func (w Workflow) fetchNextOthers(
5151
hasMore := true
5252
for hasMore {
5353
othersResponse, err := activities.PluginFetchNextOthers(
54-
infiniteRetryContext(ctx),
54+
fetchNextActivityRetryContext(ctx),
5555
fetchNextOthers.ConnectorID,
5656
fetchNextOthers.Name,
5757
fetchNextOthers.FromPayload.GetPayload(),

internal/connectors/engine/workflow/fetch_payments.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func (w Workflow) fetchNextPayments(
5252
hasMore := true
5353
for hasMore {
5454
paymentsResponse, err := activities.PluginFetchNextPayments(
55-
infiniteRetryContext(ctx),
55+
fetchNextActivityRetryContext(ctx),
5656
fetchNextPayments.ConnectorID,
5757
fetchNextPayments.FromPayload.GetPayload(),
5858
state.State,

internal/connectors/plugins/public/stripe/client/payments.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ package client
33
import (
44
"context"
55
"fmt"
6+
"time"
67

78
"github.com/formancehq/payments/internal/connectors/metrics"
9+
"github.com/formancehq/payments/internal/models"
810
"github.com/stripe/stripe-go/v79"
911
)
1012

@@ -27,11 +29,17 @@ func (c *client) GetPayments(
2729
) (results []*stripe.BalanceTransaction, _ Timeline, hasMore bool, err error) {
2830
results = make([]*stripe.BalanceTransaction, 0, int(pageSize))
2931

32+
timer := time.NewTimer((models.ActivityStartToCloseTimeoutMinutesLong - 1) * time.Minute)
33+
defer timer.Stop()
34+
3035
for {
3136
select {
3237
case <-ctx.Done():
3338
// if the app is shutting down
3439
return results, timeline, false, fmt.Errorf("context closed before first payment found")
40+
case <-timer.C:
41+
// after the timer expires let's save the state to prevent workflow timeout
42+
return results, timeline, true, nil
3543
default: //fallthrough
3644
}
3745

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package models
2+
3+
const (
4+
ActivityStartToCloseTimeoutMinutesDefault = 1
5+
ActivityStartToCloseTimeoutMinutesLong = 5
6+
)

0 commit comments

Comments
 (0)