-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathaws_internal_connection_pooling_postgres_example.ts
143 lines (125 loc) · 5.86 KB
/
aws_internal_connection_pooling_postgres_example.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
/*
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import { AwsPGClient } from "../../pg/lib";
import { FailoverFailedError, FailoverSuccessError, TransactionResolutionUnknownError } from "../../common/lib/utils/errors";
import { InternalPooledConnectionProvider } from "../../common/lib/internal_pooled_connection_provider";
import { HostInfo } from "../../common/lib/host_info";
import { InternalPoolMapping } from "../../common/lib/utils/internal_pool_mapping";
import { ConnectionProviderManager } from "../../common/lib/connection_provider_manager";
import { WrapperProperties } from "../../common/lib/wrapper_property";
import { AwsPoolConfig } from "../../common/lib/aws_pool_config";
import { PluginManager } from "../../common/lib";
// Example connection settings passed to the AwsPGClient constructor below.
// Replace these values with the ones for your own database.
const postgresHost = "db-identifier.XYZ.us-east-2.rds.amazonaws.com";
const username = "john_smith";
const password = "employees";
const database = "database";
const port = 5432;
/**
 * Optional: only needed when the wrapper is configured to use internal connection pools.
 * This mapping is just an example - configure the pool key however your own code requires.
 *
 * The key is built from the host URL, the wrapper's user property, and the
 * custom "dbUser" property.
 */
const myPoolKeyFunc: InternalPoolMapping = {
  getPoolKey(hostInfo: HostInfo, props: Map<string, any>) {
    const wrapperUser = props.get(WrapperProperties.USER.name);
    const hostAndUser = hostInfo.url + wrapperUser;
    return hostAndUser + "/" + props.get("dbUser");
  }
};
/**
 * Set up read-write splitting with internal connection pools. The pool-key
 * mapping passed as the second provider argument is optional; see
 * UsingTheReadWriteSplittingPlugin.md for more info.
 */
const poolConfig = new AwsPoolConfig({
  maxConnections: 10,
  maxIdleConnections: 10,
  idleTimeoutMillis: 10000,
  allowExitOnIdle: true
});
const provider = new InternalPooledConnectionProvider(poolConfig, myPoolKeyFunc);

const client = new AwsPGClient({
  // Connection parameters.
  host: postgresHost,
  port,
  user: username,
  password,
  database,
  // Enable the readWriteSplitting, failover, and efm plugins.
  plugins: "readWriteSplitting,failover,efm",
  // Optional: connection provider and pool-key property value used by the internal connection pools.
  connectionProvider: provider,
  dbUser: "john_smith"
});
// Setup Step: Open connection and create tables - uncomment this section to create table and test values.
/* try {
await client.connect();
await setInitialSessionSettings(client);
await queryWithFailoverHandling(client, "CREATE TABLE bank_test (id int primary key, name varchar(40), account_balance int)");
await queryWithFailoverHandling(
client,
"INSERT INTO bank_test VALUES (0, 'Jane Doe', 200), (1, 'John Smith', 200), (2, 'Sally Smith', 200), (3, 'Joe Smith', 200)"
);
} catch (error: any) {
// Additional error handling can be added here. See transaction step for an example.
throw error;
} */
// Transaction Step: Open connection and perform transaction.
// Flow: connect, apply session settings, run one write, switch the client to
// read-only, run a few reads, then always release the client, plugin, and pool resources.
try {
  await client.connect();
  await setInitialSessionSettings(client);

  // Example query
  const result = await queryWithFailoverHandling(client, "UPDATE bank_test SET account_balance=account_balance - 100 WHERE name='Jane Doe'");
  console.log(result);

  // Internally switch to a reader connection.
  await client.setReadOnly(true);

  for (let i = 0; i < 4; i++) {
    await queryWithFailoverHandling(client, "SELECT * FROM bank_test WHERE id = " + i);
  }
} catch (error) {
  // Every branch rethrows; the branches exist to show where application-specific
  // recovery for each failure type belongs.
  if (error instanceof FailoverFailedError) {
    // User application should open a new connection, check the results of the failed transaction and re-run it if
    // needed. See:
    // https://github.com/aws/aws-advanced-nodejs-wrapper/blob/main/docs/using-the-nodejs-wrapper/using-plugins/UsingTheFailoverPlugin.md#failoverfailederror
    throw error;
  } else if (error instanceof TransactionResolutionUnknownError) {
    // User application should check the status of the failed transaction and restart it if needed. See:
    // https://github.com/aws/aws-advanced-nodejs-wrapper/blob/main/docs/using-the-nodejs-wrapper/using-plugins/UsingTheFailoverPlugin.md#transactionresolutionunknownerror
    throw error;
  } else {
    // Unexpected exception unrelated to failover. This should be handled by the user application.
    throw error;
  }
} finally {
  // The cleanup steps are nested so that a failure in one of them (for example
  // client.end() rejecting because the connection never opened) does not skip
  // the remaining ones and leave plugin resources or pooled connections open.
  try {
    await client.end();
  } finally {
    try {
      // Clean up resources used by the plugins.
      await PluginManager.releaseResources();
    } finally {
      // If configured to use internal connection pools, close them here.
      await provider.releaseResources();
    }
  }
}
/**
 * Applies the example's session-level settings to a connection.
 * Edit the statement here to match what your application needs.
 *
 * @param client - client on which the settings statement is executed.
 */
async function setInitialSessionSettings(client: AwsPGClient): Promise<void> {
  const timeZoneStatement = "SET TIME ZONE UTC";
  await client.query(timeZoneStatement);
}
/**
 * Runs a query and applies the example's failover handling.
 *
 * On FailoverSuccessError the session settings are re-applied and the query is
 * executed once more. Every other error is rethrown to the caller, including
 * errors unrelated to failover, so a failed query is never reported as a
 * successful call that returned `undefined`.
 *
 * @param client - client used to execute the query.
 * @param query - SQL text to execute.
 * @returns the result of `client.query(query)`, from the first attempt or from the retry.
 * @throws FailoverFailedError, TransactionResolutionUnknownError, or any other error raised by the query.
 */
async function queryWithFailoverHandling(client: AwsPGClient, query: string) {
  try {
    return await client.query(query);
  } catch (error) {
    if (error instanceof FailoverFailedError) {
      // Connection failed, and Node.js wrapper failed to reconnect to a new instance.
      throw error;
    } else if (error instanceof FailoverSuccessError) {
      // Query execution failed and Node.js wrapper successfully failed over to a new elected writer instance.
      // Reconfigure the connection.
      await setInitialSessionSettings(client);
      // Re-run query
      return await client.query(query);
    } else if (error instanceof TransactionResolutionUnknownError) {
      // Transaction resolution unknown. Please re-configure session state if required and try
      // restarting transaction.
      throw error;
    }
    // Not a failover-related error (for example a SQL error): propagate it
    // instead of swallowing it and implicitly returning undefined.
    throw error;
  }
}