Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/override retry policy #85

Merged
merged 5 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ Each execution is independent of all others, meaning that you can concurrently c
- `overrides?`: An object to override the behavior of certain states:
- `taskResourceLocalHandlers?`: An [object that overrides](/docs/feature-support.md#task-state-resource-override) the resource of the specified `Task` states to run a local function.
- `waitTimeOverrides?`: An [object that overrides](/docs/feature-support.md#wait-state-duration-override) the wait duration of the specified `Wait` states. The specified override duration should be in milliseconds.
- `retryIntervalOverrides?`: An [object that overrides](/docs/feature-support.md#retry-field-interval-override) the pause duration of the specified state's `Retry` field. The specified override duration should be a number in milliseconds; or an array of numbers, where each number represents milliseconds.
- `noThrowOnAbort?`: If this option is set to `true`, aborting the execution will simply return `null` as result instead of throwing.
- `context?`: An object that will be used as the [Context Object](https://docs.aws.amazon.com/step-functions/latest/dg/input-output-contextobject.html) for the execution. If not passed, the Context Object will default to an empty object. This option is useful to mock the Context Object in case your definition references it in a JSONPath.

Expand Down
114 changes: 114 additions & 0 deletions __tests__/StateExecutor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,120 @@ describe('State Executor', () => {
expect(sleepFnMock).toHaveBeenNthCalledWith(3, expect.numberBetween(0, 4000), abortSignal);
});

test('should wait for the specified amount of milliseconds if retry interval override option is set and is a single number', async () => {
const stateDefinition: TaskState = {
Type: 'Task',
Resource: 'mock-arn',
Retry: [
{
ErrorEquals: ['CustomError'],
IntervalSeconds: 3,
MaxDelaySeconds: 8,
},
],
End: true,
};
const input = {};
const context = {};
const abortSignal = new AbortController().signal;
let retryCount = 0;

const stateExecutor = new StateExecutor('TaskState', stateDefinition);
const { stateResult } = await stateExecutor.execute(input, context, {
abortSignal,
eventLogger: new EventLogger(),
stateMachineOptions: undefined,
runOptions: {
overrides: {
taskResourceLocalHandlers: {
TaskState: async () => {
if (retryCount < defaultMaxRetries) {
retryCount++;
throw new CustomError('Task state failed');
}

return 1;
},
},
retryIntervalOverrides: {
TaskState: 100,
},
},
},
});

expect(stateResult).toBe(1);
expect(sleepFnMock).toHaveBeenNthCalledWith(1, 100, abortSignal);
expect(sleepFnMock).toHaveBeenNthCalledWith(2, 100, abortSignal);
expect(sleepFnMock).toHaveBeenNthCalledWith(3, 100, abortSignal);
});

test('should wait for the specified amount of milliseconds if retry interval override option is set and is an array', async () => {
const stateDefinition: TaskState = {
Type: 'Task',
Resource: 'mock-arn',
Retry: [
{
ErrorEquals: ['CustomError'],
IntervalSeconds: 3,
MaxDelaySeconds: 8,
},
{
ErrorEquals: ['SyntaxError'],
IntervalSeconds: 3,
MaxDelaySeconds: 8,
},
{
ErrorEquals: ['RangeError'],
IntervalSeconds: 3,
MaxDelaySeconds: 8,
},
],
End: true,
};
const input = {};
const context = {};
const abortSignal = new AbortController().signal;
let retryCount = 0;

const stateExecutor = new StateExecutor('TaskState', stateDefinition);
const { stateResult } = await stateExecutor.execute(input, context, {
abortSignal,
eventLogger: new EventLogger(),
stateMachineOptions: undefined,
runOptions: {
overrides: {
taskResourceLocalHandlers: {
TaskState: async () => {
if (retryCount === 0) {
retryCount++;
throw new CustomError('Task state failed');
}
if (retryCount === 1) {
retryCount++;
throw new SyntaxError('Task state failed');
}
if (retryCount === 2) {
retryCount++;
throw new RangeError('Task state failed');
}

return 1;
},
},
retryIntervalOverrides: {
TaskState: [50, 125, 250],
},
},
},
});

expect(stateResult).toBe(1);
expect(sleepFnMock).toHaveBeenNthCalledWith(1, 50, abortSignal);
expect(sleepFnMock).toHaveBeenNthCalledWith(2, 125, abortSignal);
expect(sleepFnMock).toHaveBeenNthCalledWith(3, 250, abortSignal);
});

describe('Task state', () => {
test('should retry state if retrier specifies `States.TaskFailed` error name', async () => {
const stateDefinition: TaskState = {
Expand Down
14 changes: 14 additions & 0 deletions docs/feature-support.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- [Non-spec features](#non-spec-features)
- [Task resource override](#task-state-resource-override)
- [Wait duration override](#wait-state-duration-override)
- [Retry interval override](#retry-field-interval-override)
- [Abort a running execution](#abort-a-running-execution)
- [AWS config for Lambda functions](#providing-aws-credentials-and-region-to-execute-lambda-functions-specified-in-task-states)
- [Execution event logs](#execution-event-logs)
Expand Down Expand Up @@ -152,6 +153,19 @@ Nonetheless, if you want to override the duration of the wait period of a `Wait`

An example usage of this feature can be found [here](/examples/wait-state-local-override.js).

### `Retry` field interval override

As defined by the spec, retriers in a `Retry` field will pause the execution for a certain amount of time before retrying the failed `Task`/`Parallel`/`Map` state, based on the `IntervalSeconds`, `BackoffRate`, `MaxDelaySeconds`, and `JitterStrategy` fields.

If you wish to override the duration of this pause, you can do so by specifying the `overrides.retryIntervalOverrides` option of the [`StateMachine.run`](/README.md#statemachineruninput-options) method. This option expects an object that maps state names to numbers or arrays:

- If you pass a number as value, this value represents the number of milliseconds that the overridden `Retry` field will pause the execution for. This applies to all retriers in the array.
- If you pass an array as value, each value in the array should be a number and it represents how many milliseconds to pause the execution for the retrier at that index, i.e., the value at index 0 applies for the retrier at index 0, the value at index 1 applies for the retrier at index 1, and so forth. If don't want to override a retrier, simply set the value to `-1`, to indicate that the retrier at that index should not be overridden.

In a similar fashion to the [`Wait` state duration override](#wait-state-duration-override), you can pass a duration of `0` to indicate that the overridden retrier should not pause the execution at all before retrying the failed state.

An example usage of this feature can be found [here](/examples/retry-interval-local-override.js).

### Abort a running execution

If for some reason you need to abort an execution in progress, you can do so by calling the `abort` method that is part of the object returned by the `StateMachine.run` method.
Expand Down
1 change: 1 addition & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ Here you can find some useful examples demonstrating specific usages of AWS Loca
- [disable-jsonpath-validation](./disable-jsonpath-validation.js): Disable JSONPath validation when instantiating a new `StateMachine` object.
- [disable-state-machine-validation](./disable-state-machine-validation.js): Completely disable validation of the state machine definition when instantiating a new `StateMachine` object.
- [execution-event-logs](./execution-event-logs.js): Pulling the log of events produced by an execution as it runs and printing them.
- [retry-interval-local-override](./retry-interval-local-override.js): Override the pause duration of a `Retry` policy so that instead of pausing for the duration calculated by the `IntervalSeconds`, `BackoffRate`, `MaxDelaySeconds`, and `JitterStrategy` fields, it pauses for a specified number of milliseconds.
- [task-state-local-override](./task-state-local-override.js): Override the default action for a `Task` state, so that instead of invoking the Lambda specified in the `Resource` field, it runs a local function. This allows running state machines completely locally.
- [wait-state-local-override](./wait-state-local-override.js): Override the wait duration of a `Wait` state so that instead of waiting the duration specified in the `Seconds`, `Timestamp`, `SecondsPath`, `TimestampPath` fields, it waits for a specified number of milliseconds.
63 changes: 63 additions & 0 deletions examples/retry-interval-local-override.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { StateMachine } from 'aws-local-stepfunctions';

const machineDefinition = {
StartAt: 'RetryTask',
States: {
RetryTask: {
// A state of type `Task` with a `Retry` policy that defines 3 retriers:
// one for `SyntaxError`, another one for `RangeError`, and the last one for all errors.
Type: 'Task',
Resource: 'arn:aws:lambda:us-east-1:123456789012:function:RetryTask',
Retry: [
{
ErrorEquals: ['SyntaxError'],
},
{
ErrorEquals: ['RangeError'],
},
{
ErrorEquals: ['States.ALL'],
},
],
End: true,
},
},
};

const stateMachine = new StateMachine(machineDefinition);
const myInput = { num1: 10, num2: 20 };
const execution = stateMachine.run(myInput, {
overrides: {
taskResourceLocalHandlers: {
RetryTask: (input) => {
const randNum = Math.random();
if (randNum < 0.3) {
throw new SyntaxError('Syntax error');
} else if (randNum > 0.3 && randNum < 0.6) {
throw new RangeError('Range error');
} else {
throw new Error('Unknown error');
}
},
},
// Property `retryIntervalOverrides` lets you override the pause duration of a `Retry` policy before the state is retried again,
// by specifying the Task/Parallel/Map state name as the key and the duration override as value (represented in milliseconds):
retryIntervalOverrides: {
RetryTask: 50, // pause for 50 milliseconds before retrying the `RetryTask` state, instead of the default 1, 2, and 4 seconds.

// Alternatively, you can also pass an array to specify the override for each retrier.
// Pass the duration overrides in the same order as the retriers you've defined for the `Retry` field array.
// If you don't want to override a retrier, pass -1 to indicate that the retrier at that index should not be overridden.
// Uncomment the following line to test it out:
// RetryTask: [-1, 500, 0], // don't override the `SyntaxError` retrier, pause 500ms for the `RangeError` retrier, and don't pause at all for the `States.ALL` retrier
},
},
});

for await (const eventLog of execution.eventLogs) {
console.log(eventLog);
console.log('--------------------------------------------------');
}

const result = await execution.result;
console.log(result);
26 changes: 20 additions & 6 deletions src/stateMachine/StateExecutor.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { AllStates } from '../typings/AllStates';
import type { ActionResult } from '../typings/StateActions';
import type { RetryResult, CatchResult, StateHandlers } from '../typings/StateExecutor';
import type { ExecuteOptions } from '../typings/StateMachineImplementation';
import type { ExecuteOptions, RetryIntervalOverrides } from '../typings/StateMachineImplementation';
import type { ErrorOutput } from '../typings/ErrorHandling';
import type { JSONValue } from '../typings/JSONValue';
import type { TaskState } from '../typings/TaskState';
Expand Down Expand Up @@ -142,7 +142,10 @@ export class StateExecutor {
);

// Handle `Retry` logic
const { shouldRetry, waitTimeBeforeRetry, retrierIndex } = this.shouldRetry(error as RuntimeError);
const { shouldRetry, waitTimeBeforeRetry, retrierIndex } = this.shouldRetry(
error as RuntimeError,
options.runOptions?.overrides?.retryIntervalOverrides
);
if (shouldRetry) {
const stateDefinition = this.stateDefinition as TaskState | MapState | ParallelState;

Expand Down Expand Up @@ -221,22 +224,33 @@ export class StateExecutor {
/**
* Decide whether this state should be retried, according to the `Retry` field.
*/
private shouldRetry(error: RuntimeError): RetryResult {
private shouldRetry(error: RuntimeError, retryIntervalOverrides?: RetryIntervalOverrides): RetryResult {
if (!('Retry' in this.stateDefinition)) {
return { shouldRetry: false };
}

for (let i = 0; i < this.stateDefinition.Retry.length; i++) {
const retrier = this.stateDefinition.Retry[i];

let intervalOverride = null;
if (retryIntervalOverrides?.[this.stateName] !== undefined) {
const override = retryIntervalOverrides[this.stateName];
if (typeof override === 'number') {
intervalOverride = override / 1000;
} else if (override[i] !== undefined && override[i] >= 0) {
intervalOverride = override[i] / 1000;
}
}

const jitterStrategy = retrier.JitterStrategy ?? DEFAULT_JITTER_STRATEGY;
const maxAttempts = retrier.MaxAttempts ?? DEFAULT_MAX_ATTEMPTS;
const intervalSeconds = retrier.IntervalSeconds ?? DEFAULT_INTERVAL_SECONDS;
const backoffRate = retrier.BackoffRate ?? DEFAULT_BACKOFF_RATE;
const waitInterval = intervalSeconds * Math.pow(backoffRate, this.retrierAttempts[i]);
const waitInterval = intervalOverride ?? intervalSeconds * Math.pow(backoffRate, this.retrierAttempts[i]);
const retryable = error.isRetryable ?? true;

let waitTimeBeforeRetry = clamp(waitInterval, 1, retrier.MaxDelaySeconds) * 1000;
if (jitterStrategy === 'FULL') {
let waitTimeBeforeRetry = clamp(waitInterval, 0, retrier.MaxDelaySeconds) * 1000;
if (jitterStrategy === 'FULL' && intervalOverride === null) {
waitTimeBeforeRetry = getRandomNumber(0, waitTimeBeforeRetry);
}

Expand Down
10 changes: 10 additions & 0 deletions src/typings/StateMachineImplementation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ export type WaitStateTimeOverride = {
[waitStateName: string]: number;
};

export type RetryIntervalOverrides = {
[retryableStateName: string]: number | number[];
};

interface Overrides {
/**
* Pass an object to this option to override a `Task` state to run a local function,
Expand All @@ -24,6 +28,12 @@ interface Overrides {
* instead of pausing for the duration specified by the `Seconds`, `Timestamp`, `SecondsPath`, or `TimestampPath` fields.
*/
waitTimeOverrides?: WaitStateTimeOverride;

/**
* Pass an object to this option to override the duration in milliseconds a retrier in a `Retry` field waits before retrying the state,
* instead of pausing for the duration calculated by the `IntervalSeconds`, `BackoffRate`, `MaxDelaySeconds`, and `JitterStrategy` fields.
*/
retryIntervalOverrides?: RetryIntervalOverrides;
}

export interface ValidationOptions {
Expand Down