Skip to content

Commit

Permalink
feat: add AbortSignal options to retry and parallel (#262)
Browse files Browse the repository at this point in the history
  • Loading branch information
SaidbekAbdiganiev authored and radashi-bot committed Nov 12, 2024
1 parent 5f43e47 commit 0700644
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 12 deletions.
3 changes: 2 additions & 1 deletion .github/next-minor.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ The `####` headline should be short and descriptive of the new functionality. In

## New Features

####
#### Add `AbortSignal` options to retry and parallel
https://github.com/radashi-org/radashi/pull/262
33 changes: 32 additions & 1 deletion docs/async/parallel.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ since: 12.1.0

Like `_.map` but built specifically to run the async callback functions
in parallel. The first argument is a limit of how many functions should
be allowed to run at once. Returns an array of results.
be allowed to run at once.

Optionally, you can pass `AbortController.signal` as the last parameter to abort the operation if needed.

Returns an array of results.

```ts
import * as _ from 'radashi'
Expand Down Expand Up @@ -41,3 +45,30 @@ console.log(err) // => AggregateError
console.log(err.errors) // => [Error, Error, Error]
console.log(err.errors[1].message) // => No, I don't want to find user 2
```

When aborting, it will throw a single `Error` with the message `Operation aborted`.

```ts
import * as _ from 'radashi'

const userIds = [1, 2, 3, 4, 5, 6, 7, 8, 9]
const abortController = new AbortController()
const signal = abortController.signal

const [err, users] = await _.tryit(_.parallel)(
{
limit: 3,
signal: signal,
},
userIds,
async userId => {
return await api.users.find(userId)
},
)

// To abort the operation:
abortController.abort()

console.log(err) // => Error: Operation Aborted
console.log(err.message) // => "Operation Aborted"
```
22 changes: 22 additions & 0 deletions docs/async/retry.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ The `times` option defaults to `3`. The `delay` option (defaults to null) can sp

The `backoff` option is like delay but uses a function to sleep -- makes for easy exponential backoff.

The `signal` option allows you to pass an AbortSignal to abort the retry operation if needed.

```ts
import * as _ from 'radashi'

Expand All @@ -31,5 +33,25 @@ await _.retry({ times: 10 }, api.users.list) // try 10 times before failing
await _.retry({ times: 2, delay: 1000 }, api.users.list) // try 2 times with 1 second delay

// exponential backoff
await _.retry({ backoff: i => 10 ** i }, api.users.list)

// aborting the retry operation
const abortController = new AbortController()
const signal = abortController.signal

const promise = _.retry({ times: 3, delay: 1000, signal }, api.users.list)

// To abort the operation:
abortController.abort()

try {
await promise
} catch (err) {
if (err.message === 'Operation aborted') {
console.log('Retry operation was aborted')
}
}


await _.retry({ backoff: i => 10 ** i }, api.users.list) // try 3 times with 10, 100, 1000 ms delay
```
63 changes: 55 additions & 8 deletions src/async/parallel.ts
Original file line number Diff line number Diff line change
@@ -1,42 +1,88 @@
import { AggregateError, flat, fork, list, sort, tryit } from 'radashi'
import {
AggregateError,
flat,
fork,
isNumber,
list,
sort,
tryit,
} from 'radashi'

type AbortSignal = {
readonly aborted: boolean
addEventListener(type: 'abort', listener: () => void): void
removeEventListener(type: 'abort', listener: () => void): void
throwIfAborted(): void
}
type WorkItemResult<K> = {
index: number
result: K
error: any
}
export type ParallelOptions =
| {
limit: number
signal?: AbortSignal
}
| number

/**
* Executes many async functions in parallel. Returns the results from
* all functions as an array. After all functions have resolved, if
* any errors were thrown, they are rethrown in an instance of
* AggregateError.
* AggregateError. The operation can be aborted by passing optional AbortSignal,
* which will throw an Error if aborted.
*
* @see https://radashi.js.org/reference/async/parallel
* @example
* ```ts
* // Process images concurrently, resizing each image to a standard size.
* const images = await parallel(2, imageFiles, async (file) => {
* const abortController = new AbortController();
* const images = await parallel(
* {
* limit: 2,
* signal: abortController.signal,
* },
* imageFiles,
* async file => {
* return await resizeImage(file)
* })
*
* // To abort the operation:
* // abortController.abort()
* ```
* @version 12.1.0
*/
export async function parallel<T, K>(
limit: number,
options: ParallelOptions,
array: readonly T[],
func: (item: T) => Promise<K>,
): Promise<K[]> {
const work = array.map((item, index) => ({
index,
item,
}))

if (isNumber(options)) {
options = {
limit: options,
}
}

options.signal?.throwIfAborted()

// Process array items
const processor = async (res: (value: WorkItemResult<K>[]) => void) => {
const processor = async (
resolve: (value: WorkItemResult<K>[]) => void,
reject: (error: any) => void,
) => {
const results: WorkItemResult<K>[] = []
const abortListener = () => reject(new Error('This operation was aborted'))
options.signal?.addEventListener('abort', abortListener)
while (true) {
const next = work.pop()
if (!next) {
return res(results)
break
}
const [error, result] = await tryit(func)(next.item)
results.push({
Expand All @@ -45,9 +91,10 @@ export async function parallel<T, K>(
index: next.index,
})
}
options.signal?.removeEventListener('abort', abortListener)
return resolve(results)
}
// Create queues
const queues = list(1, limit).map(() => new Promise(processor))
const queues = list(1, options.limit).map(() => new Promise(processor))
// Wait for all queues to complete
const itemResults = (await Promise.all(queues)) as WorkItemResult<K>[][]
const [errors, results] = fork(
Expand Down
13 changes: 12 additions & 1 deletion src/async/retry.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import { sleep, tryit } from 'radashi'

type AbortSignal = {
throwIfAborted(): void
}

export type RetryOptions = {
times?: number
delay?: number | null
backoff?: (count: number) => number
signal?: AbortSignal
}

/**
Expand All @@ -12,9 +17,12 @@ export type RetryOptions = {
* @see https://radashi.js.org/reference/async/retry
* @example
* ```ts
* const result = await retry({ times: 3, delay: 1000 }, async () => {
* const abortController = new AbortController();
* const result = await retry({ times: 3, delay: 1000, signal: abortController.signal }, async () => {
* return await fetch('https://example.com')
* })
* // To abort the operation:
* // abortController.abort()
* ```
* @version 12.1.0
*/
Expand All @@ -25,11 +33,14 @@ export async function retry<TResponse>(
const times = options?.times ?? 3
const delay = options?.delay
const backoff = options?.backoff ?? null
const signal = options?.signal

let i = 0
while (true) {
const [err, result] = (await tryit(func)((err: any) => {
throw { _exited: err }
})) as [any, TResponse]
signal?.throwIfAborted()
if (!err) {
return result
}
Expand Down
71 changes: 71 additions & 0 deletions tests/async/parallel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,75 @@ describe('parallel', () => {
})
expect(Math.max(...tracking)).toBe(3)
})
test('aborts the operation when the signal is triggered', async () => {
const abortController = new AbortController()

setTimeout(() => abortController.abort(), 150)

const [error, results] = await _.try(async () => {
return _.parallel(
{
limit: 1,
signal: abortController.signal,
},
_.list(1, 12),
async num => {
await _.sleep(50)
return `hi_${num}`
},
)
})()

expect(results).toBeUndefined()
expect(error).toBeInstanceOf(Error)
expect(error?.message).toBe('This operation was aborted')
})
test('should throw if the abort controller aborted before first iteration has finished execution', async () => {
const abortController = new AbortController()

abortController.abort()

const [error, results] = await _.try(async () => {
return _.parallel(
{
limit: 1,
signal: abortController.signal,
},
_.list(1, 24),
async num => {
await _.sleep(50)
return `hi_${num}`
},
)
})()

expect(results).toBeUndefined()
expect(error).toBeInstanceOf(Error)
expect(error?.message).toBe('This operation was aborted')
})
test('removes abort event listener after completion', async () => {
const mockAbortSignal = {
aborted: false,
addEventListener: vi.fn(),
removeEventListener: vi.fn(),
throwIfAborted: vi.fn(),
}

await _.parallel(
{
limit: 2,
signal: mockAbortSignal,
},
_.list(1, 5),
async num => {
await new Promise(resolve => setTimeout(resolve, 10))
return `hi_${num}`
},
)

expect(mockAbortSignal.removeEventListener).toHaveBeenCalledWith(
'abort',
expect.any(Function),
)
})
})
21 changes: 20 additions & 1 deletion tests/async/retry.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// cSpell:ignore backoffs

import * as _ from 'radashi'
import type { RetryOptions } from 'radashi'
import * as _ from 'radashi'

const cast = <T = RetryOptions>(value: any): T => value

Expand Down Expand Up @@ -116,4 +116,23 @@ describe('retry', () => {
// or 2 milliseconds after.
expect(diff).toBeGreaterThanOrEqual(backoffs)
})
test('aborts the retry operation when signal is aborted', async () => {
try {
const abortController = new AbortController()
let attempt = 0
await _.retry({ signal: abortController.signal }, async () => {
attempt++
if (attempt === 2) {
abortController.abort()
}
throw 'quit again'
})
} catch (err) {
expect(err).toBeInstanceOf(Error)
expect((err as Error).message).toBe('This operation was aborted')
return
}

expect.fail('error should have been thrown')
})
})

0 comments on commit 0700644

Please sign in to comment.