
Commit dee5426

Graceful stop (#240)
* wip
* wip
* fix
* updates to state
* ungraceful stops in test suite
* wip
* test hook fixes
* fix test suite
* schema migration and test updates
* tests and db close race condition
* test fixes and docs
* fixes and tests
* test and docs
* added output to archive op + docs
* readme update [skip ci]
* fixes
* added timeout to error message
* fix max timeout for batchSize
* added early abort to worker loop delay during stop
* update types and docs [skip ci]
* new test related to GH issue
* update npm ignore
* updated ignore rules
* update default retention policy in sql
1 parent 810a00f commit dee5426

53 files changed

+7409
-1736
lines changed

.gitignore

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
node_modules
2-
coverage/
3-
*.tgz
4-
.nyc_output/
5-
.vscode
2+
.nyc_output
3+
.vscode
4+
*.tgz

.npmignore

Lines changed: 6 additions & 6 deletions
@@ -1,8 +1,8 @@
1-
test/
2-
coverage/
1+
.github
2+
.nyc_output
3+
.vscode
4+
docs
5+
test
6+
.editorconfig
37
.travis.yml
48
*.tgz
5-
.nyc_output/
6-
docs/
7-
.editorconfig
8-
.github/

.travis.yml

Lines changed: 1 addition & 1 deletion
@@ -6,9 +6,9 @@ services:
66
- postgresql
77
language: node_js
88
node_js:
9+
- "16"
910
- "14"
1011
- "12"
11-
- "10"
1212
before_script:
1313
- psql -c 'create database pgboss' -U postgres
1414
- psql -c 'create extension pgcrypto' -d pgboss -U postgres

CHANGELOG.md

Lines changed: 32 additions & 0 deletions
@@ -1,5 +1,37 @@
11
# Changes
22

3+
## 6.0.0 :tada:
4+
5+
- CHANGE: `stop()` has been enhanced with a graceful stop feature that will signal and monitor any polling subscriptions (workers using `subscribe()` or `onComplete()`) before closing the internal connection pool and stopping maintenance operations. The default options, `{ graceful = true, timeout = 30000 }`, will wait up to 30s before shutting down.
6+
- NEW: Added a `stopped` event that will be emitted after `stop()` when all workers have completed active jobs, or when the timeout is met, whichever is sooner.
7+
- NEW: Added a `wip` event that will emit as jobs are both fetched and completed per instance. If no work is being done, no events will be emitted. This will emit at most once every 2 seconds for monitoring purposes.
8+
- NEW: Added the `output` jsonb column to storage tables to store result or error data along with the original job, which were previously only available via completion jobs. This has the added benefit of storing any errors or results from completion jobs themselves, which were previously discarded.
9+
- NEW: `getJobById(id)` can now be used to fetch a job from either primary or archive storage by id. This may be helpful when you have the job id and need to inspect `output`.
10+
- NEW: Added new function, `publishSingleton()`, similar to `publishOnce()`, but throttles publish to only allow 1 job in the queue at a time, allowing a job to be queued even if 1 or more jobs are currently active.
11+
- CHANGE: `subscribe()` and `onComplete()` now resolve with a unique id that will be visible in `wip` along with additional metadata about the subscription.
12+
- CHANGE: `subscribe()` and `onComplete()` now abort promise execution client-side based on the job's expiration.
13+
- CHANGE: `unsubscribe()` and `offComplete()` now accept an object as an argument to allow removing a specific subscription by id.
14+
- MAJOR: The `onComplete` publish option is now defaulted to `false`, which breaks backward compatibility for automatic creation of completion jobs. To restore the previous behavior of completion jobs being created by default, you should set `onComplete` to `true` in your constructor options.
15+
- MAJOR: The default retention policy has been reduced from 30 to 14 days. This can still be customized as an option in the constructor.
16+
- MAJOR: Node 10 is EOL. Node 12 is now the minimum supported version.
17+
- MAJOR: Added a new index to the primary job table, `job_fetch`, to improve fetch time performance as the job table size increases. Depending on how many jobs you have in your job table, creating this index may delay `start()` promise resolution. If this is a concern, you can get the SQL via `getMigrationPlans()` and create the index out of band. The `CREATE INDEX` command includes an `IF NOT EXISTS` which will skip this step in the migration if already created.
18+
19+
For example, once you have installed this package version, you can print the output of `getMigrationPlans()` from the Node REPL.
20+
21+
```shell
22+
23+
$ node
24+
Welcome to Node.js v14.16.1.
25+
Type ".help" for more information.
26+
> console.log(require('./node_modules/pg-boss').getMigrationPlans())
27+
28+
BEGIN;
29+
...
30+
CREATE INDEX IF NOT EXISTS job_fetch ...
31+
...
32+
COMMIT;
33+
```
34+
335
## 5.2.3
436

537
- Dependency PR from dependabot

README.md

Lines changed: 1 addition & 1 deletion
@@ -50,7 +50,7 @@ This will likely cater the most to teams already familiar with the simplicity of
5050
* Automatic maintenance operations to manage table growth
5151

5252
## Requirements
53-
* Node 10 or higher
53+
* Node 12 or higher
5454
* PostgreSQL 9.5 or higher
5555

5656
## Installation

docs/configuration.md

Lines changed: 3 additions & 4 deletions
@@ -74,9 +74,6 @@ Alternatively, the following options can be set as properties in an object.
7474
This option may be beneficial if you'd like to use an existing database service
7575
with its own connection pool.
7676
77-
For example, you may be relying on the cluster module on
78-
a web server, and you'd like to limit the growth of total connections as much as possible.
79-
8077
* **schema** - string, defaults to "pgboss"
8178
8279
Database schema that contains all required storage objects. Only alphanumeric and underscore allowed, length: <= 50 characters
@@ -259,7 +256,7 @@ For example, if you set the `singletonMinutes` to 1, then submit 2 jobs within a
259256
Setting `singletonNextSlot` to true will cause the job to be scheduled to run after the current time slot if and when a job is throttled. This option is set to true, for example, when calling the convenience function `publishDebounced()`.
260257
261258
### Completion jobs
262-
* **onComplete**, bool (Default: true)
259+
* **onComplete**, bool (Default: false)
263260
264261
When a job completes, a completion job will be created in the queue, copying the same retention policy as the job, for the purpose of `onComplete()` or `fetchCompleted()`. If completion jobs are not used, they will be archived according to the retention policy. If the queue in question has a very high volume, this can be set to `false` to bypass creating the completion job. This can also be set in the constructor as a default for all calls to `publish()`.
265262
@@ -288,6 +285,8 @@ When a job completes, a completion job will be created in the queue, copying the
288285
| createdon | string, timestamp |
289286
| completedon | string, timestamp |
290287
| keepuntil | string, timestamp |
288+
| oncomplete | bool |
289+
| output | object |
291290
292291
## Subscribe options
293292

docs/images/job-states.png

-7.41 KB

docs/usage.md

Lines changed: 76 additions & 27 deletions
@@ -11,6 +11,8 @@
1111
- [Events](#events)
1212
- [`error`](#error)
1313
- [`monitor-states`](#monitor-states)
14+
- [`wip`](#wip)
15+
- [`stopped`](#stopped)
1416
- [Static functions](#static-functions)
1517
- [`string getConstructionPlans(schema)`](#string-getconstructionplansschema)
1618
- [`string getMigrationPlans(schema, version)`](#string-getmigrationplansschema-version)
@@ -25,13 +27,14 @@
2527
- [`publish(request)`](#publishrequest)
2628
- [`publishAfter(name, data, options, seconds | ISO date string | Date)`](#publishaftername-data-options-seconds--iso-date-string--date)
2729
- [`publishOnce(name, data, options, key)`](#publishoncename-data-options-key)
30+
- [`publishSingleton(name, data, options)`](#publishsingletonname-data-options)
2831
- [`publishThrottled(name, data, options, seconds [, key])`](#publishthrottledname-data-options-seconds--key)
2932
- [`publishDebounced(name, data, options, seconds [, key])`](#publishdebouncedname-data-options-seconds--key)
3033
- [`subscribe()`](#subscribe)
3134
- [`subscribe(name [, options], handler)`](#subscribename--options-handler)
3235
- [`onComplete(name [, options], handler)`](#oncompletename--options-handler)
33-
- [`unsubscribe(name)`](#unsubscribename)
34-
- [`offComplete(name)`](#offcompletename)
36+
- [`unsubscribe(value)`](#unsubscribevalue)
37+
- [`offComplete(value)`](#offcompletevalue)
3538
- [`fetch()`](#fetch)
3639
- [`fetch(name)`](#fetchname)
3740
- [`fetch(name, batchSize, [, options])`](#fetchname-batchsize--options)
@@ -47,6 +50,7 @@
4750
- [`fail(id [, data])`](#failid--data)
4851
- [`fail([ids])`](#failids)
4952
- [`getQueueSize(name [, options])`](#getqueuesizename--options)
53+
- [`getJobById(id)`](#getjobbyidid)
5054
- [`deleteQueue(name)`](#deletequeuename)
5155
- [`deleteAllQueues()`](#deleteallqueues)
5256
- [`clearStorage()`](#clearstorage)
@@ -105,25 +109,27 @@ If you need to interact with pg-boss outside of Node.js, such as other clients o
105109
The following command is the definition of the primary job table. For manual job creation, the only required column is `name`. All other columns are nullable or have sensible defaults.
106110

107111
```sql
108-
CREATE TABLE ${schema}.job (
109-
id uuid primary key not null default gen_random_uuid(),
110-
name text not null,
111-
priority integer not null default(0),
112-
data jsonb,
113-
state ${schema}.job_state not null default('${states.created}'),
114-
retryLimit integer not null default(0),
115-
retryCount integer not null default(0),
116-
retryDelay integer not null default(0),
117-
retryBackoff boolean not null default false,
118-
startAfter timestamp with time zone not null default now(),
119-
startedOn timestamp with time zone,
120-
singletonKey text,
121-
singletonOn timestamp without time zone,
122-
expireIn interval not null default interval '15 minutes',
123-
createdOn timestamp with time zone not null default now(),
124-
completedOn timestamp with time zone,
125-
keepUntil timestamp with time zone NOT NULL default now() + interval '30 days'
126-
)
112+
CREATE TABLE ${schema}.job (
113+
id uuid primary key not null default gen_random_uuid(),
114+
name text not null,
115+
priority integer not null default(0),
116+
data jsonb,
117+
state ${schema}.job_state not null default('${states.created}'),
118+
retryLimit integer not null default(0),
119+
retryCount integer not null default(0),
120+
retryDelay integer not null default(0),
121+
retryBackoff boolean not null default false,
122+
startAfter timestamp with time zone not null default now(),
123+
startedOn timestamp with time zone,
124+
singletonKey text,
125+
singletonOn timestamp without time zone,
126+
expireIn interval not null default interval '15 minutes',
127+
createdOn timestamp with time zone not null default now(),
128+
completedOn timestamp with time zone,
129+
keepUntil timestamp with time zone NOT NULL default now() + interval '14 days',
130+
on_complete boolean not null default true,
131+
output jsonb
132+
)
127133
```
128134

129135
# Events
@@ -187,6 +193,32 @@ The payload of the event is an object with a key per queue and state, such as th
187193
"all": 4694
188194
}
189195
```
196+
## `wip`
197+
198+
Emitted at most once every 2 seconds when polling subscriptions are active and jobs are entering or leaving active state. The payload is an array that represents each worker in this instance of pg-boss. If you want to monitor queue activity across all instances, use `monitor-states`.
199+
200+
```js
201+
[
202+
{
203+
id: 'fc738fb0-1de5-4947-b138-40d6a790749e',
204+
name: 'my-queue',
205+
options: { newJobCheckInterval: 2000 },
206+
state: 'active',
207+
count: 1,
208+
createdOn: 1620149137015,
209+
lastFetchedOn: 1620149137015,
210+
lastJobStartedOn: 1620149137015,
211+
lastJobEndedOn: null,
212+
lastJobDuration: 343,
213+
lastError: null,
214+
lastErrorOn: null
215+
}
216+
]
217+
```
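
As a rough illustration of consuming this payload, the sketch below reduces a `wip` array to a small summary. The `summarizeWip` helper and its output shape are hypothetical, and it assumes `count` is the number of jobs a worker currently holds, which the text above does not state explicitly.

```js
// Hypothetical helper: condense a `wip` payload into a per-instance summary.
// Field names (name, state, count, lastError) follow the example payload above.
function summarizeWip (workers) {
  const summary = { jobCount: 0, byState: {}, failing: [] }

  for (const worker of workers) {
    // Assumption: `count` is the number of jobs this worker currently holds
    summary.jobCount += worker.count
    summary.byState[worker.state] = (summary.byState[worker.state] || 0) + 1
    if (worker.lastError) summary.failing.push(worker.name)
  }

  return summary
}

// Possible wiring, assuming `boss` is a started pg-boss instance:
// boss.on('wip', workers => console.log(summarizeWip(workers)))
```

Keeping the summary logic in a plain function makes it easy to exercise without a database connection.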
218+
219+
## `stopped`
220+
221+
Emitted after `stop()` once all subscription workers have completed their work and maintenance has been shut down.
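
A minimal sketch of waiting for this event during shutdown follows. The `stopGracefully` wrapper is a hypothetical name, `boss` is assumed to be a started pg-boss instance, and the `graceful` and `timeout` options are the ones named in the 6.0.0 changelog entry.

```js
// Hypothetical wrapper: request a graceful stop and resolve once `stopped` fires.
function stopGracefully (boss, timeout = 30000) {
  return new Promise((resolve, reject) => {
    // Register the listener before calling stop() so the event cannot be missed
    boss.once('stopped', resolve)
    // A failure of stop() itself rejects the returned promise
    Promise.resolve(boss.stop({ graceful: true, timeout })).catch(reject)
  })
}

// Possible usage in a service entry point (assumption, not part of pg-boss):
// process.on('SIGTERM', () => stopGracefully(boss).then(() => process.exit(0)))
```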
190222

191223
# Static functions
192224

@@ -332,7 +364,13 @@ This is a convenience version of `publish()` with the `startAfter` option assign
332364

333365
### `publishOnce(name, data, options, key)`
334366

335-
Publish a job with a unique key to make sure it isn't processed more than once. Any other jobs published during this archive interval with the same queue name and key will be rejected.
367+
Publish a job with a unique key to only allow 1 job to be in created, retry, or active state at a time.
368+
369+
This is a convenience version of `publish()` with the `singletonKey` option assigned.
370+
371+
### `publishSingleton(name, data, options)`
372+
373+
Publish a job but only allow 1 job to be in created or retry state at a time.
336374

337375
This is a convenience version of `publish()` with the `singletonKey` option assigned.
338376

@@ -352,7 +390,7 @@ This is a convenience version of `publish()` with the `singletonSeconds`, `singl
352390

353391
**returns: Promise**
354392

355-
Polls the database by a queue name or a pattern and executes the provided callback function when jobs are found. The promise resolves once a subscription has been created.
393+
Polls the database by a queue name or a pattern and executes the provided callback function when jobs are found. The promise resolves with the unique id of the subscription once it has been created. You can monitor the state of subscriptions using the `wip` event.
356394

357395
Queue patterns use the `*` character to match 0 or more characters. For example, a job from queue `status-report-12345` would be fetched with pattern `status-report-*` or even `stat*5`.
358396

@@ -461,13 +499,20 @@ The following is an example data object from the job retrieved in the onComplete
461499
}
462500
```
463501

464-
## `unsubscribe(name)`
502+
## `unsubscribe(value)`
503+
504+
Removes a subscription by name or id and stops polling.
505+
506+
**Arguments**
507+
- value: string or object
465508

466-
Removes a subscription by name and stops polling.
509+
If a string, removes all subscriptions found matching the name. If an object, only the subscription with a matching `id` will be removed.
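
The object form can be sketched as below: the id resolved by `subscribe()` is kept and later passed back to `unsubscribe()`. The `subscribeRemovable` helper is hypothetical, and `boss`, `queue`, and `handler` are placeholders for a started pg-boss instance, a queue name, and a job handler.

```js
// Hypothetical helper: subscribe, then hand back a function that removes
// only this subscription, leaving other subscriptions on the same queue running.
async function subscribeRemovable (boss, queue, handler) {
  // subscribe() resolves with the unique id of the new subscription
  const id = await boss.subscribe(queue, handler)
  // Passing an object with `id` targets this one subscription
  return () => boss.unsubscribe({ id })
}
```

Calling `boss.unsubscribe(queue)` with the name instead would remove every subscription matching that name.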
467510

468-
### `offComplete(name)`
511+
### `offComplete(value)`
469512

470-
Same as `unsubscribe()`, but removes an `onComplete()` subscription.
513+
Similar to `unsubscribe()`, but removes an `onComplete()` subscription.
514+
515+
**
471516

472517
## `fetch()`
473518

@@ -638,6 +683,10 @@ As an example, the following options object include active jobs along with creat
638683
}
639684
```
640685

686+
## `getJobById(id)`
687+
688+
Retrieves a job with all metadata by id in either the primary or archive storage.
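
One possible use is reading the `output` column of a finished job. In this sketch `getJobOutput` is a hypothetical helper, `boss` is assumed to be a started pg-boss instance, and a job that cannot be found is assumed to resolve to a falsy value.

```js
// Hypothetical helper: return the stored result or error data for a job id.
async function getJobOutput (boss, id) {
  const job = await boss.getJobById(id)
  // Assumption: a missing job resolves to a falsy value such as null
  return job ? job.output : null
}
```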
689+
641690
## `deleteQueue(name)`
642691

643692
Deletes all pending jobs in the specified queue from the active job table. All jobs in the archive table are retained.
