From 81fc82713f5ef18c1b342660e1db08e4d0a96d58 Mon Sep 17 00:00:00 2001 From: Luis De Anda Date: Fri, 8 Dec 2023 22:07:54 -0600 Subject: [PATCH 1/5] Add `retryIntervalOverrides` option to override the duration of all or specific retriers in a `Retry` field --- src/stateMachine/StateExecutor.ts | 26 +++++++++++++++++------ src/typings/StateMachineImplementation.ts | 10 +++++++++ 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/src/stateMachine/StateExecutor.ts b/src/stateMachine/StateExecutor.ts index 3dd55c6..7c625f8 100644 --- a/src/stateMachine/StateExecutor.ts +++ b/src/stateMachine/StateExecutor.ts @@ -1,7 +1,7 @@ import type { AllStates } from '../typings/AllStates'; import type { ActionResult } from '../typings/StateActions'; import type { RetryResult, CatchResult, StateHandlers } from '../typings/StateExecutor'; -import type { ExecuteOptions } from '../typings/StateMachineImplementation'; +import type { ExecuteOptions, RetryIntervalOverrides } from '../typings/StateMachineImplementation'; import type { ErrorOutput } from '../typings/ErrorHandling'; import type { JSONValue } from '../typings/JSONValue'; import type { TaskState } from '../typings/TaskState'; @@ -142,7 +142,10 @@ export class StateExecutor { ); // Handle `Retry` logic - const { shouldRetry, waitTimeBeforeRetry, retrierIndex } = this.shouldRetry(error as RuntimeError); + const { shouldRetry, waitTimeBeforeRetry, retrierIndex } = this.shouldRetry( + error as RuntimeError, + options.runOptions?.overrides?.retryIntervalOverrides + ); if (shouldRetry) { const stateDefinition = this.stateDefinition as TaskState | MapState | ParallelState; @@ -221,22 +224,33 @@ export class StateExecutor { /** * Decide whether this state should be retried, according to the `Retry` field. 
*/ - private shouldRetry(error: RuntimeError): RetryResult { + private shouldRetry(error: RuntimeError, retryIntervalOverrides?: RetryIntervalOverrides): RetryResult { if (!('Retry' in this.stateDefinition)) { return { shouldRetry: false }; } for (let i = 0; i < this.stateDefinition.Retry.length; i++) { const retrier = this.stateDefinition.Retry[i]; + + let intervalOverride = null; + if (retryIntervalOverrides?.[this.stateName] !== undefined) { + const override = retryIntervalOverrides[this.stateName]; + if (typeof override === 'number') { + intervalOverride = override / 1000; + } else if (override[i] !== undefined && override[i] >= 0) { + intervalOverride = override[i] / 1000; + } + } + const jitterStrategy = retrier.JitterStrategy ?? DEFAULT_JITTER_STRATEGY; const maxAttempts = retrier.MaxAttempts ?? DEFAULT_MAX_ATTEMPTS; const intervalSeconds = retrier.IntervalSeconds ?? DEFAULT_INTERVAL_SECONDS; const backoffRate = retrier.BackoffRate ?? DEFAULT_BACKOFF_RATE; - const waitInterval = intervalSeconds * Math.pow(backoffRate, this.retrierAttempts[i]); + const waitInterval = intervalOverride ?? intervalSeconds * Math.pow(backoffRate, this.retrierAttempts[i]); const retryable = error.isRetryable ?? 
true; - let waitTimeBeforeRetry = clamp(waitInterval, 1, retrier.MaxDelaySeconds) * 1000; - if (jitterStrategy === 'FULL') { + let waitTimeBeforeRetry = clamp(waitInterval, 0, retrier.MaxDelaySeconds) * 1000; + if (jitterStrategy === 'FULL' && intervalOverride === null) { waitTimeBeforeRetry = getRandomNumber(0, waitTimeBeforeRetry); } diff --git a/src/typings/StateMachineImplementation.ts b/src/typings/StateMachineImplementation.ts index fc79c32..0647466 100644 --- a/src/typings/StateMachineImplementation.ts +++ b/src/typings/StateMachineImplementation.ts @@ -12,6 +12,10 @@ export type WaitStateTimeOverride = { [waitStateName: string]: number; }; +export type RetryIntervalOverrides = { + [retryableStateName: string]: number | number[]; +}; + interface Overrides { /** * Pass an object to this option to override a `Task` state to run a local function, @@ -24,6 +28,12 @@ interface Overrides { * instead of pausing for the duration specified by the `Seconds`, `Timestamp`, `SecondsPath`, or `TimestampPath` fields. */ waitTimeOverrides?: WaitStateTimeOverride; + + /** + * Pass an object to this option to override the duration in milliseconds a retrier in a `Retry` field waits before retrying the state, + * instead of pausing for the duration calculated by the `IntervalSeconds`, `BackoffRate`, `MaxDelaySeconds`, and `JitterStrategy` fields. 
+ */ + retryIntervalOverrides?: RetryIntervalOverrides; } export interface ValidationOptions { From 17e9abab4877e26493a355c6388fd2c1f90a108d Mon Sep 17 00:00:00 2001 From: Luis De Anda Date: Fri, 8 Dec 2023 22:18:32 -0600 Subject: [PATCH 2/5] Add tests for `retryIntervalOverrides` option --- __tests__/StateExecutor.test.ts | 114 ++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) diff --git a/__tests__/StateExecutor.test.ts b/__tests__/StateExecutor.test.ts index a5d6eb7..8dbdd3a 100644 --- a/__tests__/StateExecutor.test.ts +++ b/__tests__/StateExecutor.test.ts @@ -280,6 +280,120 @@ describe('State Executor', () => { expect(sleepFnMock).toHaveBeenNthCalledWith(3, expect.numberBetween(0, 4000), abortSignal); }); + test('should wait for the specified amount of milliseconds if retry interval override option is set and is a single number', async () => { + const stateDefinition: TaskState = { + Type: 'Task', + Resource: 'mock-arn', + Retry: [ + { + ErrorEquals: ['CustomError'], + IntervalSeconds: 3, + MaxDelaySeconds: 8, + }, + ], + End: true, + }; + const input = {}; + const context = {}; + const abortSignal = new AbortController().signal; + let retryCount = 0; + + const stateExecutor = new StateExecutor('TaskState', stateDefinition); + const { stateResult } = await stateExecutor.execute(input, context, { + abortSignal, + eventLogger: new EventLogger(), + stateMachineOptions: undefined, + runOptions: { + overrides: { + taskResourceLocalHandlers: { + TaskState: async () => { + if (retryCount < defaultMaxRetries) { + retryCount++; + throw new CustomError('Task state failed'); + } + + return 1; + }, + }, + retryIntervalOverrides: { + TaskState: 100, + }, + }, + }, + }); + + expect(stateResult).toBe(1); + expect(sleepFnMock).toHaveBeenNthCalledWith(1, 100, abortSignal); + expect(sleepFnMock).toHaveBeenNthCalledWith(2, 100, abortSignal); + expect(sleepFnMock).toHaveBeenNthCalledWith(3, 100, abortSignal); + }); + + test('should wait for the specified amount of 
milliseconds if retry interval override option is set and is an array', async () => { + const stateDefinition: TaskState = { + Type: 'Task', + Resource: 'mock-arn', + Retry: [ + { + ErrorEquals: ['CustomError'], + IntervalSeconds: 3, + MaxDelaySeconds: 8, + }, + { + ErrorEquals: ['SyntaxError'], + IntervalSeconds: 3, + MaxDelaySeconds: 8, + }, + { + ErrorEquals: ['RangeError'], + IntervalSeconds: 3, + MaxDelaySeconds: 8, + }, + ], + End: true, + }; + const input = {}; + const context = {}; + const abortSignal = new AbortController().signal; + let retryCount = 0; + + const stateExecutor = new StateExecutor('TaskState', stateDefinition); + const { stateResult } = await stateExecutor.execute(input, context, { + abortSignal, + eventLogger: new EventLogger(), + stateMachineOptions: undefined, + runOptions: { + overrides: { + taskResourceLocalHandlers: { + TaskState: async () => { + if (retryCount === 0) { + retryCount++; + throw new CustomError('Task state failed'); + } + if (retryCount === 1) { + retryCount++; + throw new SyntaxError('Task state failed'); + } + if (retryCount === 2) { + retryCount++; + throw new RangeError('Task state failed'); + } + + return 1; + }, + }, + retryIntervalOverrides: { + TaskState: [50, 125, 250], + }, + }, + }, + }); + + expect(stateResult).toBe(1); + expect(sleepFnMock).toHaveBeenNthCalledWith(1, 50, abortSignal); + expect(sleepFnMock).toHaveBeenNthCalledWith(2, 125, abortSignal); + expect(sleepFnMock).toHaveBeenNthCalledWith(3, 250, abortSignal); + }); + describe('Task state', () => { test('should retry state if retrier specifies `States.TaskFailed` error name', async () => { const stateDefinition: TaskState = { From a42832ff68d7e24da9c30c14272fe54712741860 Mon Sep 17 00:00:00 2001 From: Luis De Anda Date: Tue, 12 Dec 2023 19:05:00 -0600 Subject: [PATCH 3/5] Add example for retry interval override --- examples/README.md | 1 + examples/retry-interval-local-override.js | 63 +++++++++++++++++++++++ 2 files changed, 64 insertions(+) create 
mode 100644 examples/retry-interval-local-override.js diff --git a/examples/README.md b/examples/README.md index da9170c..0208d4a 100644 --- a/examples/README.md +++ b/examples/README.md @@ -13,5 +13,6 @@ Here you can find some useful examples demonstrating specific usages of AWS Loca - [disable-jsonpath-validation](./disable-jsonpath-validation.js): Disable JSONPath validation when instantiating a new `StateMachine` object. - [disable-state-machine-validation](./disable-state-machine-validation.js): Completely disable validation of the state machine definition when instantiating a new `StateMachine` object. - [execution-event-logs](./execution-event-logs.js): Pulling the log of events produced by an execution as it runs and printing them. +- [retry-interval-local-override](./retry-interval-local-override.js): Override the pause duration of a `Retry` policy so that instead of pausing for the duration calculated by the `IntervalSeconds`, `BackoffRate`, `MaxDelaySeconds`, and `JitterStrategy` fields, it pauses for a specified number of milliseconds. - [task-state-local-override](./task-state-local-override.js): Override the default action for a `Task` state, so that instead of invoking the Lambda specified in the `Resource` field, it runs a local function. This allows running state machines completely locally. - [wait-state-local-override](./wait-state-local-override.js): Override the wait duration of a `Wait` state so that instead of waiting the duration specified in the `Seconds`, `Timestamp`, `SecondsPath`, `TimestampPath` fields, it waits for a specified number of milliseconds. 
diff --git a/examples/retry-interval-local-override.js b/examples/retry-interval-local-override.js new file mode 100644 index 0000000..35802bb --- /dev/null +++ b/examples/retry-interval-local-override.js @@ -0,0 +1,63 @@ +import { StateMachine } from 'aws-local-stepfunctions'; + +const machineDefinition = { + StartAt: 'RetryTask', + States: { + RetryTask: { + // A state of type `Task` with a `Retry` policy that defines 3 retriers: + // one for `SyntaxError`, another one for `RangeError`, and the last one for all errors. + Type: 'Task', + Resource: 'arn:aws:lambda:us-east-1:123456789012:function:RetryTask', + Retry: [ + { + ErrorEquals: ['SyntaxError'], + }, + { + ErrorEquals: ['RangeError'], + }, + { + ErrorEquals: ['States.ALL'], + }, + ], + End: true, + }, + }, +}; + +const stateMachine = new StateMachine(machineDefinition); +const myInput = { num1: 10, num2: 20 }; +const execution = stateMachine.run(myInput, { + overrides: { + taskResourceLocalHandlers: { + RetryTask: (input) => { + const randNum = Math.random(); + if (randNum < 0.3) { + throw new SyntaxError('Syntax error'); + } else if (randNum > 0.3 && randNum < 0.6) { + throw new RangeError('Range error'); + } else { + throw new Error('Unknown error'); + } + }, + }, + // Property `retryIntervalOverrides` lets you override the pause duration of a `Retry` policy before the state is retried again, + // by specifying the Task/Parallel/Map state name as the key and the duration override as value (represented in milliseconds): + retryIntervalOverrides: { + RetryTask: 50, // pause for 50 milliseconds before retrying the `RetryTask` state, instead of the default 1, 2, and 4 seconds. + + // Alternatively, you can also pass an array to specify the override for each retrier. + // Pass the duration overrides in the same order as the retriers you've defined for the `Retry` field array. + // If you don't want to override a retrier, pass -1 to indicate that the retrier at that index should not be overridden. 
+ // Uncomment the following line to test it out: + // RetryTask: [-1, 500, 0], // don't override the `SyntaxError` retrier, pause 500ms for the `RangeError` retrier, and don't pause at all for the `States.ALL` retrier + }, + }, +}); + +for await (const eventLog of execution.eventLogs) { + console.log(eventLog); + console.log('--------------------------------------------------'); +} + +const result = await execution.result; +console.log(result); From b7033d289ad384d1cec6e0c4c8b92569c0f24785 Mon Sep 17 00:00:00 2001 From: Luis De Anda Date: Tue, 12 Dec 2023 19:26:35 -0600 Subject: [PATCH 4/5] Document non-spec feature for retry interval override --- docs/feature-support.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/docs/feature-support.md b/docs/feature-support.md index fd27c0c..ce20486 100644 --- a/docs/feature-support.md +++ b/docs/feature-support.md @@ -8,6 +8,7 @@ - [Non-spec features](#non-spec-features) - [Task resource override](#task-state-resource-override) - [Wait duration override](#wait-state-duration-override) + - [Retry interval override](#retry-field-interval-override) - [Abort a running execution](#abort-a-running-execution) - [AWS config for Lambda functions](#providing-aws-credentials-and-region-to-execute-lambda-functions-specified-in-task-states) - [Execution event logs](#execution-event-logs) @@ -152,6 +153,19 @@ Nonetheless, if you want to override the duration of the wait period of a `Wait` An example usage of this feature can be found [here](/examples/wait-state-local-override.js). +### `Retry` field interval override + +As defined by the spec, retriers in a `Retry` field will pause the execution for a certain amount of time before retrying the failed `Task`/`Parallel`/`Map` state, based on the `IntervalSeconds`, `BackoffRate`, `MaxDelaySeconds`, and `JitterStrategy` fields. 
+ +If you wish to override the duration of this pause, you can do so by specifying the `overrides.retryIntervalOverrides` option of the [`StateMachine.run`](/README.md#statemachineruninput-options) method. This option expects an object that maps state names to numbers or arrays: + +- If you pass a number as value, this value represents the number of milliseconds that the overridden `Retry` field will pause the execution for. This applies to all retriers in the array. +- If you pass an array as value, each value in the array should be a number and it represents how many milliseconds to pause the execution for the retrier at that index, i.e., the value at index 0 applies for the retrier at index 0, the value at index 1 applies for the retrier at index 1, and so forth. If you don't want to override a retrier, simply set the value to `-1`, to indicate that the retrier at that index should not be overridden. + +In a similar fashion to the [`Wait` state duration override](#wait-state-duration-override), you can pass a duration of `0` to indicate that the overridden retrier should not pause the execution at all before retrying the failed state. + +An example usage of this feature can be found [here](/examples/retry-interval-local-override.js). + ### Abort a running execution If for some reason you need to abort an execution in progress, you can do so by calling the `abort` method that is part of the object returned by the `StateMachine.run` method. 
From 329cb546d05eb54b104cfbf54b3d8d042a85ebec Mon Sep 17 00:00:00 2001 From: Luis De Anda Date: Tue, 12 Dec 2023 19:27:31 -0600 Subject: [PATCH 5/5] Document `retryIntervalOverrides` option for `run` method --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 59f469e..dbfa786 100644 --- a/README.md +++ b/README.md @@ -160,6 +160,7 @@ Each execution is independent of all others, meaning that you can concurrently c - `overrides?`: An object to override the behavior of certain states: - `taskResourceLocalHandlers?`: An [object that overrides](/docs/feature-support.md#task-state-resource-override) the resource of the specified `Task` states to run a local function. - `waitTimeOverrides?`: An [object that overrides](/docs/feature-support.md#wait-state-duration-override) the wait duration of the specified `Wait` states. The specified override duration should be in milliseconds. + - `retryIntervalOverrides?`: An [object that overrides](/docs/feature-support.md#retry-field-interval-override) the pause duration of the specified state's `Retry` field. The specified override duration should be a number in milliseconds; or an array of numbers, where each number represents milliseconds. - `noThrowOnAbort?`: If this option is set to `true`, aborting the execution will simply return `null` as result instead of throwing. - `context?`: An object that will be used as the [Context Object](https://docs.aws.amazon.com/step-functions/latest/dg/input-output-contextobject.html) for the execution. If not passed, the Context Object will default to an empty object. This option is useful to mock the Context Object in case your definition references it in a JSONPath.