diff --git a/common/lib/host_list_provider/monitoring/monitoring_host_list_provider.ts b/common/lib/host_list_provider/monitoring/monitoring_host_list_provider.ts index fb705e5f..0a6fe443 100644 --- a/common/lib/host_list_provider/monitoring/monitoring_host_list_provider.ts +++ b/common/lib/host_list_provider/monitoring/monitoring_host_list_provider.ts @@ -27,6 +27,7 @@ import { Messages } from "../../utils/messages"; import { WrapperProperties } from "../../wrapper_property"; import { BlockingHostListProvider } from "../host_list_provider"; import { logger } from "../../../logutils"; +import { isDialectTopologyAware } from "../../utils/utils"; export class MonitoringRdsHostListProvider extends RdsHostListProvider implements BlockingHostListProvider { static readonly CACHE_CLEANUP_NANOS: bigint = BigInt(60_000_000_000); // 1 minute. @@ -76,7 +77,7 @@ export class MonitoringRdsHostListProvider extends RdsHostListProvider implement async sqlQueryForTopology(targetClient: ClientWrapper): Promise { const dialect: DatabaseDialect = this.hostListProviderService.getDialect(); - if (!this.isTopologyAwareDatabaseDialect(dialect)) { + if (!isDialectTopologyAware(dialect)) { throw new TypeError(Messages.get("RdsHostListProvider.incorrectDialect")); } return await dialect.queryForTopology(targetClient, this).then((res: any) => this.processQueryResults(res)); diff --git a/common/lib/host_list_provider/rds_host_list_provider.ts b/common/lib/host_list_provider/rds_host_list_provider.ts index 4fa17d1f..fd7353bb 100644 --- a/common/lib/host_list_provider/rds_host_list_provider.ts +++ b/common/lib/host_list_provider/rds_host_list_provider.ts @@ -149,7 +149,7 @@ export class RdsHostListProvider implements DynamicHostListProvider { async getWriterId(client: ClientWrapper): Promise { const dialect = this.hostListProviderService.getDialect(); - if (!this.isTopologyAwareDatabaseDialect(dialect)) { + if (!isDialectTopologyAware(dialect)) { throw new TypeError(Messages.get("RdsHostListProvider.incorrectDialect")); } diff --git a/common/lib/plugin_service.ts b/common/lib/plugin_service.ts index 01fe837b..07193c58 100644 --- a/common/lib/plugin_service.ts +++ b/common/lib/plugin_service.ts @@ -273,6 +273,9 @@ export class PluginService implements ErrorHandler, HostListProviderService { } private async setHostList(oldHosts: HostInfo[], newHosts: HostInfo[]) { + logger.silly("Setting host list"); + console.log(oldHosts); + console.log(newHosts); const oldHostMap: Map = new Map(oldHosts.map((e) => [e.url, e])); const newHostMap: Map = new Map(newHosts.map((e) => [e.url, e])); @@ -295,7 +298,10 @@ export class PluginService implements ErrorHandler, HostListProviderService { } }); + console.log("changes: "); + console.log(changes); if (changes.size > 0) { + logger.silly("Changes found, setting hosts"); this.hosts = newHosts ? newHosts : []; await this.pluginServiceManagerContainer.pluginManager!.notifyHostListChanged(changes); } diff --git a/common/lib/plugins/failover/failover_plugin.ts b/common/lib/plugins/failover/failover_plugin.ts index 6ba09aaa..92461751 100644 --- a/common/lib/plugins/failover/failover_plugin.ts +++ b/common/lib/plugins/failover/failover_plugin.ts @@ -415,7 +415,11 @@ export class FailoverPlugin extends AbstractConnectionPlugin { throw new AwsWrapperError(Messages.get("Failover.unableToDetermineWriter")); } + logger.silly("Failover writer complete, refreshing host list"); + await this.pluginService.refreshHostList(); + logger.silly(`Failover writer complete with writer host: ${writerHostInfo.host}`); const allowedHosts = this.pluginService.getHosts(); + console.log(allowedHosts); if (!allowedHosts.some((hostInfo: HostInfo) => hostInfo.host === writerHostInfo?.host)) { const failoverErrorMessage = Messages.get( "Failover.newWriterNotAllowed", @@ -430,7 +434,6 @@ export class FailoverPlugin extends AbstractConnectionPlugin { await this.pluginService.abortCurrentClient(); await this.pluginService.setCurrentClient(result.client, writerHostInfo); logger.debug(Messages.get("Failover.establishedConnection", this.pluginService.getCurrentHostInfo()?.host ?? "")); - await this.pluginService.refreshHostList(); this.throwFailoverSuccessError(); } catch (error: any) { if (error instanceof FailoverSuccessError) { diff --git a/common/lib/plugins/failover2/failover2_plugin.ts b/common/lib/plugins/failover2/failover2_plugin.ts index ddadb8fa..8037a4b7 100644 --- a/common/lib/plugins/failover2/failover2_plugin.ts +++ b/common/lib/plugins/failover2/failover2_plugin.ts @@ -41,6 +41,7 @@ import { HostRole } from "../../host_role"; import { CanReleaseResources } from "../../can_release_resources"; import { MonitoringRdsHostListProvider } from "../../host_list_provider/monitoring/monitoring_host_list_provider"; import { ReaderFailoverResult } from "../failover/reader_failover_result"; +import { logTopology } from "../../utils/utils"; export class Failover2Plugin extends AbstractConnectionPlugin implements CanReleaseResources { private static readonly TELEMETRY_WRITER_FAILOVER = "failover to writer instance"; @@ -119,8 +120,8 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele return ( this.enableFailoverSetting && this._rdsUrlType !== RdsUrlType.RDS_PROXY && - this.pluginService.getHosts() && - this.pluginService.getHosts().length > 0 + this.pluginService.getAllHosts() && + this.pluginService.getAllHosts().length > 0 ); } @@ -382,11 +383,22 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele this.logAndThrowError(Messages.get("Failover2.unableToFetchTopology")); } - const hosts: HostInfo[] = this.pluginService.getHosts(); + const hosts: HostInfo[] = this.pluginService.getAllHosts(); let writerCandidateClient: ClientWrapper = null; const writerCandidateHostInfo: HostInfo = hosts.find((x) => x.role === HostRole.WRITER); + const allowedHosts = this.pluginService.getHosts(); + if (!allowedHosts.some((hostInfo: HostInfo) => hostInfo.host === writerCandidateHostInfo?.host)) { + const failoverErrorMessage = Messages.get( + "Failover.newWriterNotAllowed", + writerCandidateHostInfo ? writerCandidateHostInfo.host : "", + logTopology(allowedHosts, "[Failover.newWriterNotAllowed]") + ); + logger.error(failoverErrorMessage); + throw new FailoverFailedError(failoverErrorMessage); + } + if (writerCandidateHostInfo) { try { writerCandidateClient = await this.createConnectionForHost(writerCandidateHostInfo); diff --git a/tests/integration/container/tests/autoscaling.test.ts b/tests/integration/container/tests/autoscaling.test.ts index 46818447..a3bd8d1c 100644 --- a/tests/integration/container/tests/autoscaling.test.ts +++ b/tests/integration/container/tests/autoscaling.test.ts @@ -19,7 +19,7 @@ import { DriverHelper } from "./utils/driver_helper"; import { AuroraTestUtility } from "./utils/aurora_test_utility"; import { logger } from "../../../../common/logutils"; import { TestEnvironmentFeatures } from "./utils/test_environment_features"; -import { features, instanceCount } from "./config"; +import { features, instanceCount, runTests } from "./config"; import { InternalPooledConnectionProvider } from "../../../../common/lib/internal_pooled_connection_provider"; import { AwsPoolConfig } from "../../../../common/lib/aws_pool_config"; import { TestInstanceInfo } from "./utils/test_instance_info"; @@ -28,6 +28,7 @@ import { FailoverSuccessError } from "../../../../common/lib/utils/errors"; import { PluginManager } from "../../../../common/lib"; const itIf = + runTests && !features.includes(TestEnvironmentFeatures.PERFORMANCE) && features.includes(TestEnvironmentFeatures.RUN_AUTOSCALING_TESTS_ONLY) && instanceCount >= 2 diff --git a/tests/integration/container/tests/basic_connectivity.test.ts b/tests/integration/container/tests/basic_connectivity.test.ts index 45aec100..62ed322c 100644 --- a/tests/integration/container/tests/basic_connectivity.test.ts +++ b/tests/integration/container/tests/basic_connectivity.test.ts @@ -21,12 +21,14 @@ import { AuroraTestUtility } from "./utils/aurora_test_utility"; import { logger } from "../../../../common/logutils"; import { DatabaseEngine } from "./utils/database_engine"; import { TestEnvironmentFeatures } from "./utils/test_environment_features"; -import { features } from "./config"; +import { features, runTests } from "./config"; import { DatabaseEngineDeployment } from "./utils/database_engine_deployment"; import { PluginManager } from "../../../../common/lib"; const itIf = - !features.includes(TestEnvironmentFeatures.PERFORMANCE) && !features.includes(TestEnvironmentFeatures.RUN_AUTOSCALING_TESTS_ONLY) ? it : it.skip; + runTests && !features.includes(TestEnvironmentFeatures.PERFORMANCE) && !features.includes(TestEnvironmentFeatures.RUN_AUTOSCALING_TESTS_ONLY) + ? it + : it.skip; let client: any; let auroraTestUtility: AuroraTestUtility; diff --git a/tests/integration/container/tests/config.ts b/tests/integration/container/tests/config.ts index b50208cf..f2e739d9 100644 --- a/tests/integration/container/tests/config.ts +++ b/tests/integration/container/tests/config.ts @@ -34,3 +34,4 @@ const testInfo = JSON.parse(infoJson); const request = testInfo.request; export const features = request.features; export const instanceCount = request.numOfInstances; +export const runTests = false; // TODO: remove diff --git a/tests/integration/container/tests/custom_endpoint.test.ts b/tests/integration/container/tests/custom_endpoint.test.ts index 15e82e7d..b1e9a46e 100644 --- a/tests/integration/container/tests/custom_endpoint.test.ts +++ b/tests/integration/container/tests/custom_endpoint.test.ts @@ -46,7 +46,7 @@ const itIf = const endpointId1 = `test-endpoint-1-${randomUUID()}`; const endpointId2 = `test-endpoint-2-${randomUUID()}`; -const endpointId3 = `test-endpoint-3-${randomUUID()}`; +let endpointId3: string; let endpointInfo1: DBClusterEndpoint; let endpointInfo2: DBClusterEndpoint; let endpointInfo3: DBClusterEndpoint; @@ -62,7 +62,7 @@ let currentWriter: string; let auroraTestUtility: AuroraTestUtility; -async function initDefaultConfig(host: string, port: number, connectToProxy: boolean, failoverMode: string): Promise { +async function initDefaultConfig(host: string, port: number, connectToProxy: boolean, failoverMode: string, usingFailover1: boolean): Promise { let config: any = { user: env.databaseInfo.username, host: host, @@ -76,6 +76,11 @@ async function initDefaultConfig(host: string, port: number, connectToProxy: boo telemetryTracesBackend: "OTLP", telemetryMetricsBackend: "OTLP" }; + if (usingFailover1) { + config["plugins"] = "customEndpoint,readWriteSplitting,failover"; + } else { + config["plugins"] = "customEndpoint,readWriteSplitting,failover2"; + } if (connectToProxy) { config["clusterInstanceHostPattern"] = "?." + env.proxyDatabaseInfo.instanceEndpointSuffix; } @@ -232,6 +237,8 @@ describe("custom endpoint", () => { beforeEach(async () => { await TestEnvironment.verifyClusterStatus(); currentWriter = await auroraTestUtility.getClusterWriterInstanceId(env.info.auroraClusterName); + logger.info(`Test started: ${expect.getState().currentTestName}`); + console.log("custom endpoint test before: " + currentWriter); }, 1000000); afterEach(async () => { @@ -244,12 +251,60 @@ describe("custom endpoint", () => { } await PluginManager.releaseResources(); - }); + }, 1000000); + + itIf.each([true, false])( + "test custom endpoint failover - strict reader", + async (usingFailover1: boolean) => { + endpointId3 = `test-endpoint-3-${randomUUID()}`; + await createEndpoint(env.auroraClusterName, env.instances.slice(0, 2), endpointId3, "READER"); + endpointInfo3 = await waitUntilEndpointAvailable(endpointId3); + + const config = await initDefaultConfig(endpointInfo3.Endpoint, env.databaseInfo.instanceEndpointPort, false, "strict-reader", usingFailover1); + client = initClientFunc(config); + + await client.connect(); + + const endpointMembers = endpointInfo3.StaticMembers; + const instanceId = await auroraTestUtility.queryInstanceId(client); + console.log("current test " + usingFailover1); + console.log(instanceId); + expect(endpointMembers.includes(instanceId)).toBeTruthy(); + expect(instanceId).not.toBe(currentWriter); + + // Use failover API to break connection. + await auroraTestUtility.failoverClusterAndWaitUntilWriterChanged( + currentWriter, + env.info.auroraClusterName, + instanceId === instance1 ? instance1 : instance2 + ); + + await expect(auroraTestUtility.queryInstanceId(client)).rejects.toThrow(FailoverSuccessError); + + endpointInfo3 = await waitUntilEndpointAvailable(endpointId3); + const newEndpointMembers = endpointInfo3.StaticMembers; + + const newInstanceId: string = await auroraTestUtility.queryInstanceId(client); + expect(newEndpointMembers.includes(newInstanceId)).toBeTruthy(); + + const newWriter = await auroraTestUtility.getClusterWriterInstanceId(env.info.auroraClusterName); + expect(newInstanceId).not.toBe(newWriter); + + await deleteEndpoint(rdsClient, endpointId3); + }, + 1000000 + ); - itIf( + itIf.each([true, false])( "test custom endpoint read write splitting with custom endpoint changes", - async () => { - const config = await initDefaultConfig(endpointInfo1.Endpoint, env.databaseInfo.instanceEndpointPort, false, "reader-or-writer"); + async (usingFailover1: boolean) => { + const config = await initDefaultConfig( + endpointInfo1.Endpoint, + env.databaseInfo.instanceEndpointPort, + false, + "reader-or-writer", + usingFailover1 + ); // This setting is not required for the test, but it allows us to also test re-creation of expired monitors since it // takes more than 30 seconds to modify the cluster endpoint (usually around 140s). config.customEndpointMonitorExpirationMs = 30000; @@ -323,49 +378,10 @@ describe("custom endpoint", () => { 1000000 ); - itIf( - "test custom endpoint failover - strict reader", - async () => { - await createEndpoint(env.auroraClusterName, env.instances.slice(0, 2), endpointId3, "READER"); - endpointInfo3 = await waitUntilEndpointAvailable(endpointId3); - - const config = await initDefaultConfig(endpointInfo3.Endpoint, env.databaseInfo.instanceEndpointPort, false, "strict-reader"); - client = initClientFunc(config); - - await client.connect(); - - const endpointMembers = endpointInfo3.StaticMembers; - const instanceId = await auroraTestUtility.queryInstanceId(client); - expect(endpointMembers.includes(instanceId)).toBeTruthy(); - expect(instanceId).not.toBe(currentWriter); - - // Use failover API to break connection. - await auroraTestUtility.failoverClusterAndWaitUntilWriterChanged( - currentWriter, - env.info.auroraClusterName, - instanceId === instance1 ? instance1 : instance2 - ); - - await expect(auroraTestUtility.queryInstanceId(client)).rejects.toThrow(FailoverSuccessError); - - endpointInfo3 = await waitUntilEndpointAvailable(endpointId3); - const newEndpointMembers = endpointInfo3.StaticMembers; - - const newInstanceId: string = await auroraTestUtility.queryInstanceId(client); - expect(newEndpointMembers.includes(newInstanceId)).toBeTruthy(); - - const newWriter = await auroraTestUtility.getClusterWriterInstanceId(env.info.auroraClusterName); - expect(newInstanceId).not.toBe(newWriter); - - await deleteEndpoint(rdsClient, endpointId3); - }, - 1000000 - ); - - itIf( + itIf.each([true, false])( "test custom endpoint failover - strict writer", - async () => { - const config = await initDefaultConfig(endpointInfo2.Endpoint, env.databaseInfo.instanceEndpointPort, false, "strict-writer"); + async (usingFailvoer1: boolean) => { + const config = await initDefaultConfig(endpointInfo2.Endpoint, env.databaseInfo.instanceEndpointPort, false, "strict-writer", usingFailvoer1); client = initClientFunc(config); await client.connect(); @@ -399,10 +415,16 @@ describe("custom endpoint", () => { 1000000 ); - itIf( + itIf.each([true, false])( "test custom endpoint failover - reader or writer mode", - async () => { - const config = await initDefaultConfig(endpointInfo1.Endpoint, env.databaseInfo.instanceEndpointPort, false, "reader-or-writer"); + async (usingFailover1: boolean) => { + const config = await initDefaultConfig( + endpointInfo1.Endpoint, + env.databaseInfo.instanceEndpointPort, + false, + "reader-or-writer", + usingFailover1 + ); client = initClientFunc(config); await client.connect(); diff --git a/tests/integration/container/tests/iam_authentication.test.ts b/tests/integration/container/tests/iam_authentication.test.ts index c9c00d7b..75bf360e 100644 --- a/tests/integration/container/tests/iam_authentication.test.ts +++ b/tests/integration/container/tests/iam_authentication.test.ts @@ -25,11 +25,12 @@ import { AwsMySQLClient } from "../../../../mysql/lib"; import { IamAuthenticationPlugin } from "../../../../common/lib/authentication/iam_authentication_plugin"; import { logger } from "../../../../common/logutils"; import { TestEnvironmentFeatures } from "./utils/test_environment_features"; -import { features } from "./config"; +import { features, runTests } from "./config"; import { PluginManager } from "../../../../common/lib"; import { jest } from "@jest/globals"; const itIf = + runTests && !features.includes(TestEnvironmentFeatures.PERFORMANCE) && !features.includes(TestEnvironmentFeatures.RUN_AUTOSCALING_TESTS_ONLY) && features.includes(TestEnvironmentFeatures.IAM) diff --git a/tests/integration/container/tests/read_write_splitting.test.ts b/tests/integration/container/tests/read_write_splitting.test.ts index d673dbb3..97a418e4 100644 --- a/tests/integration/container/tests/read_write_splitting.test.ts +++ b/tests/integration/container/tests/read_write_splitting.test.ts @@ -23,7 +23,7 @@ import { QueryResult } from "pg"; import { ProxyHelper } from "./utils/proxy_helper"; import { logger } from "../../../../common/logutils"; import { TestEnvironmentFeatures } from "./utils/test_environment_features"; -import { features, instanceCount } from "./config"; +import { features, instanceCount, runTests } from "./config"; import { InternalPooledConnectionProvider } from "../../../../common/lib/internal_pooled_connection_provider"; import { AwsPoolConfig } from "../../../../common/lib/aws_pool_config"; import { InternalPoolMapping } from "../../../../common/lib/utils/internal_pool_mapping"; @@ -31,6 +31,7 @@ import { HostInfo } from "../../../../common/lib/host_info"; import { PluginManager } from "../../../../common/lib"; const itIf = + runTests && !features.includes(TestEnvironmentFeatures.PERFORMANCE) && features.includes(TestEnvironmentFeatures.IAM) && !features.includes(TestEnvironmentFeatures.RUN_AUTOSCALING_TESTS_ONLY) && diff --git a/tests/integration/container/tests/session_state.test.ts b/tests/integration/container/tests/session_state.test.ts index 3ce23be8..3210593c 100644 --- a/tests/integration/container/tests/session_state.test.ts +++ b/tests/integration/container/tests/session_state.test.ts @@ -20,7 +20,7 @@ import { DriverHelper } from "./utils/driver_helper"; import { logger } from "../../../../common/logutils"; import { DatabaseEngine } from "./utils/database_engine"; import { TestEnvironmentFeatures } from "./utils/test_environment_features"; -import { features } from "./config"; +import { features, runTests } from "./config"; import { DatabaseEngineDeployment } from "./utils/database_engine_deployment"; import { PluginManager } from "../../../../common/lib"; import { AwsPGClient } from "../../../../pg/lib"; @@ -30,7 +30,9 @@ import { AwsMySQLClient } from "../../../../mysql/lib"; import { TransactionIsolationLevel } from "../../../../common/lib/utils/transaction_isolation_level"; const itIf = - !features.includes(TestEnvironmentFeatures.PERFORMANCE) && !features.includes(TestEnvironmentFeatures.RUN_AUTOSCALING_TESTS_ONLY) ? it : it.skip; + runTests && !features.includes(TestEnvironmentFeatures.PERFORMANCE) && !features.includes(TestEnvironmentFeatures.RUN_AUTOSCALING_TESTS_ONLY) + ? it + : it.skip; let client: any; @@ -74,111 +76,115 @@ class TestAwsPGClient extends AwsPGClient { } describe("session state", () => { - it.only("test update state", async () => { - const env = await TestEnvironment.getCurrent(); - const driver = DriverHelper.getDriverForDatabaseEngine(env.engine); - let initClientFunc; - switch (driver) { - case TestDriver.MYSQL: - initClientFunc = (options: any) => new TestAwsMySQLClient(options); - break; - case TestDriver.PG: - initClientFunc = (options: any) => new TestAwsPGClient(options); - break; - default: - throw new Error("invalid driver"); - } - - let props = { - user: env.databaseInfo.username, - host: env.databaseInfo.clusterEndpoint, - database: env.databaseInfo.defaultDbName, - password: env.databaseInfo.password, - port: env.databaseInfo.clusterEndpointPort - }; - props = DriverHelper.addDriverSpecificConfiguration(props, env.engine); - client = initClientFunc(props); - - const newClient = initClientFunc(props); + itIf( + "test update state", + async () => { + const env = await TestEnvironment.getCurrent(); + const driver = DriverHelper.getDriverForDatabaseEngine(env.engine); + let initClientFunc; + switch (driver) { + case TestDriver.MYSQL: + initClientFunc = (options: any) => new TestAwsMySQLClient(options); + break; + case TestDriver.PG: + initClientFunc = (options: any) => new TestAwsPGClient(options); + break; + default: + throw new Error("invalid driver"); + } - try { - await client.connect(); - await newClient.connect(); - const targetClient = client.targetClient; - const newTargetClient = newClient.targetClient; - - expect(targetClient).not.toEqual(newTargetClient); - if (driver === TestDriver.MYSQL) { - await DriverHelper.executeQuery(env.engine, client, "CREATE DATABASE IF NOT EXISTS testSessionState"); - await client.setReadOnly(true); - await client.setCatalog("testSessionState"); - await client.setTransactionIsolation(TransactionIsolationLevel.TRANSACTION_SERIALIZABLE); - await client.setAutoCommit(false); - - // Assert new client's session states are using server default values. - let readOnly = await DriverHelper.executeQuery(env.engine, newClient, "SELECT @@SESSION.transaction_read_only AS readonly"); - let catalog = await DriverHelper.executeQuery(env.engine, newClient, "SELECT DATABASE() AS catalog"); - let autoCommit = await DriverHelper.executeQuery(env.engine, newClient, "SELECT @@SESSION.autocommit AS autocommit"); - let transactionIsolation = await DriverHelper.executeQuery(env.engine, newClient, "SELECT @@SESSION.transaction_isolation AS level"); - expect(readOnly[0][0].readonly).toEqual(0); - expect(catalog[0][0].catalog).toEqual(env.databaseInfo.defaultDbName); - expect(autoCommit[0][0].autocommit).toEqual(1); - expect(transactionIsolation[0][0].level).toEqual("REPEATABLE-READ"); - - await client.getPluginService().setCurrentClient(newClient.targetClient); - - expect(client.targetClient).not.toEqual(targetClient); - expect(client.targetClient).toEqual(newTargetClient); - - // Assert new client's session states are set. - readOnly = await DriverHelper.executeQuery(env.engine, newClient, "SELECT @@SESSION.transaction_read_only AS readonly"); - catalog = await DriverHelper.executeQuery(env.engine, newClient, "SELECT DATABASE() AS catalog"); - autoCommit = await DriverHelper.executeQuery(env.engine, newClient, "SELECT @@SESSION.autocommit AS autocommit"); - transactionIsolation = await DriverHelper.executeQuery(env.engine, newClient, "SELECT @@SESSION.transaction_isolation AS level"); - expect(readOnly[0][0].readonly).toEqual(1); - expect(catalog[0][0].catalog).toEqual("testSessionState"); - expect(autoCommit[0][0].autocommit).toEqual(0); - expect(transactionIsolation[0][0].level).toEqual("SERIALIZABLE"); - - await client.setReadOnly(false); - await client.setAutoCommit(true); - await DriverHelper.executeQuery(env.engine, client, "DROP DATABASE IF EXISTS testSessionState"); - } else if (driver === TestDriver.PG) { - // End any current transaction before we can create a new test database. - await DriverHelper.executeQuery(env.engine, client, "END TRANSACTION"); - await DriverHelper.executeQuery(env.engine, client, "DROP DATABASE IF EXISTS testSessionState"); - await DriverHelper.executeQuery(env.engine, client, "CREATE DATABASE testSessionState"); - await client.setReadOnly(true); - await client.setSchema("testSessionState"); - await client.setTransactionIsolation(TransactionIsolationLevel.TRANSACTION_SERIALIZABLE); - - // Assert new client's session states are using server default values. - let readOnly = await DriverHelper.executeQuery(env.engine, newClient, "SHOW transaction_read_only"); - let schema = await DriverHelper.executeQuery(env.engine, newClient, "SHOW search_path"); - let transactionIsolation = await DriverHelper.executeQuery(env.engine, newClient, "SHOW TRANSACTION ISOLATION LEVEL"); - expect(readOnly.rows[0]["transaction_read_only"]).toEqual("off"); - expect(schema.rows[0]["search_path"]).not.toEqual("testSessionState"); - expect(transactionIsolation.rows[0]["transaction_isolation"]).toEqual("read committed"); - - await client.getPluginService().setCurrentClient(newClient.targetClient); - expect(client.targetClient).not.toEqual(targetClient); - expect(client.targetClient).toEqual(newTargetClient); - - // Assert new client's session states are set. - readOnly = await DriverHelper.executeQuery(env.engine, newClient, "SHOW transaction_read_only"); - schema = await DriverHelper.executeQuery(env.engine, newClient, "SHOW search_path"); - transactionIsolation = await DriverHelper.executeQuery(env.engine, newClient, "SHOW TRANSACTION ISOLATION LEVEL"); - expect(readOnly.rows[0]["transaction_read_only"]).toEqual("on"); - expect(schema.rows[0]["search_path"]).toEqual("testsessionstate"); - expect(transactionIsolation.rows[0]["transaction_isolation"]).toEqual("serializable"); - - await client.setReadOnly(false); - await DriverHelper.executeQuery(env.engine, client, "DROP DATABASE IF EXISTS testSessionState"); + let props = { + user: env.databaseInfo.username, + host: env.databaseInfo.clusterEndpoint, + database: env.databaseInfo.defaultDbName, + password: env.databaseInfo.password, + port: env.databaseInfo.clusterEndpointPort + }; + props = DriverHelper.addDriverSpecificConfiguration(props, env.engine); + client = initClientFunc(props); + + const newClient = initClientFunc(props); + + try { + await client.connect(); + await newClient.connect(); + const targetClient = client.targetClient; + const newTargetClient = newClient.targetClient; + + expect(targetClient).not.toEqual(newTargetClient); + if (driver === TestDriver.MYSQL) { + await DriverHelper.executeQuery(env.engine, client, "CREATE DATABASE IF NOT EXISTS testSessionState"); + await client.setReadOnly(true); + await client.setCatalog("testSessionState"); + await client.setTransactionIsolation(TransactionIsolationLevel.TRANSACTION_SERIALIZABLE); + await client.setAutoCommit(false); + + // Assert new client's session states are using server default values. + let readOnly = await DriverHelper.executeQuery(env.engine, newClient, "SELECT @@SESSION.transaction_read_only AS readonly"); + let catalog = await DriverHelper.executeQuery(env.engine, newClient, "SELECT DATABASE() AS catalog"); + let autoCommit = await DriverHelper.executeQuery(env.engine, newClient, "SELECT @@SESSION.autocommit AS autocommit"); + let transactionIsolation = await DriverHelper.executeQuery(env.engine, newClient, "SELECT @@SESSION.transaction_isolation AS level"); + expect(readOnly[0][0].readonly).toEqual(0); + expect(catalog[0][0].catalog).toEqual(env.databaseInfo.defaultDbName); + expect(autoCommit[0][0].autocommit).toEqual(1); + expect(transactionIsolation[0][0].level).toEqual("REPEATABLE-READ"); + + await client.getPluginService().setCurrentClient(newClient.targetClient); + + expect(client.targetClient).not.toEqual(targetClient); + expect(client.targetClient).toEqual(newTargetClient); + + // Assert new client's session states are set. + readOnly = await DriverHelper.executeQuery(env.engine, newClient, "SELECT @@SESSION.transaction_read_only AS readonly"); + catalog = await DriverHelper.executeQuery(env.engine, newClient, "SELECT DATABASE() AS catalog"); + autoCommit = await DriverHelper.executeQuery(env.engine, newClient, "SELECT @@SESSION.autocommit AS autocommit"); + transactionIsolation = await DriverHelper.executeQuery(env.engine, newClient, "SELECT @@SESSION.transaction_isolation AS level"); + expect(readOnly[0][0].readonly).toEqual(1); + expect(catalog[0][0].catalog).toEqual("testSessionState"); + expect(autoCommit[0][0].autocommit).toEqual(0); + expect(transactionIsolation[0][0].level).toEqual("SERIALIZABLE"); + + await client.setReadOnly(false); + await client.setAutoCommit(true); + await DriverHelper.executeQuery(env.engine, client, "DROP DATABASE IF EXISTS testSessionState"); + } else if (driver === TestDriver.PG) { + // End any current transaction before we can create a new test database. + await DriverHelper.executeQuery(env.engine, client, "END TRANSACTION"); + await DriverHelper.executeQuery(env.engine, client, "DROP DATABASE IF EXISTS testSessionState"); + await DriverHelper.executeQuery(env.engine, client, "CREATE DATABASE testSessionState"); + await client.setReadOnly(true); + await client.setSchema("testSessionState"); + await client.setTransactionIsolation(TransactionIsolationLevel.TRANSACTION_SERIALIZABLE); + + // Assert new client's session states are using server default values. + let readOnly = await DriverHelper.executeQuery(env.engine, newClient, "SHOW transaction_read_only"); + let schema = await DriverHelper.executeQuery(env.engine, newClient, "SHOW search_path"); + let transactionIsolation = await DriverHelper.executeQuery(env.engine, newClient, "SHOW TRANSACTION ISOLATION LEVEL"); + expect(readOnly.rows[0]["transaction_read_only"]).toEqual("off"); + expect(schema.rows[0]["search_path"]).not.toEqual("testSessionState"); + expect(transactionIsolation.rows[0]["transaction_isolation"]).toEqual("read committed"); + + await client.getPluginService().setCurrentClient(newClient.targetClient); + expect(client.targetClient).not.toEqual(targetClient); + expect(client.targetClient).toEqual(newTargetClient); + + // Assert new client's session states are set. + readOnly = await DriverHelper.executeQuery(env.engine, newClient, "SHOW transaction_read_only"); + schema = await DriverHelper.executeQuery(env.engine, newClient, "SHOW search_path"); + transactionIsolation = await DriverHelper.executeQuery(env.engine, newClient, "SHOW TRANSACTION ISOLATION LEVEL"); + expect(readOnly.rows[0]["transaction_read_only"]).toEqual("on"); + expect(schema.rows[0]["search_path"]).toEqual("testsessionstate"); + expect(transactionIsolation.rows[0]["transaction_isolation"]).toEqual("serializable"); + + await client.setReadOnly(false); + await DriverHelper.executeQuery(env.engine, client, "DROP DATABASE IF EXISTS testSessionState"); + } + } catch (e) { + await client.end(); + await newClient.end(); + throw e; } - } catch (e) { - await client.end(); - await newClient.end(); - throw e; - } - }, 1320000); + }, + 1320000 + ); }); diff --git a/tests/unit/failover2_plugin.test.ts b/tests/unit/failover2_plugin.test.ts index 1ffa78ad..6e14b607 100644 --- a/tests/unit/failover2_plugin.test.ts +++ b/tests/unit/failover2_plugin.test.ts @@ -132,7 +132,7 @@ describe("reader failover handler", () => { when(mockHostInfo.allAliases).thenReturn(new Set(["alias1", "alias2"])); when(mockHostInfo.getRawAvailability()).thenReturn(HostAvailability.AVAILABLE); - when(mockPluginService.getHosts()).thenReturn(hosts); + when(mockPluginService.getAllHosts()).thenReturn(hosts); when(mockPluginService.forceMonitoringRefresh(true, anything())).thenResolve(true); when(mockPluginService.connect(mockHostInfo, anything())).thenReject(test); @@ -181,7 +181,7 @@ describe("reader failover handler", () => { const hosts = [hostInfo]; when(mockHostInfo.allAliases).thenReturn(new Set(["alias1", "alias2"])); - when(mockPluginService.getHosts()).thenReturn(hosts); + when(mockPluginService.getAllHosts()).thenReturn(hosts); when(mockPluginService.forceMonitoringRefresh(true, anything())).thenResolve(true); when(mockPluginService.connect(mockHostInfo, anything())).thenResolve(null); @@ -209,7 +209,7 @@ describe("reader failover handler", () => { const hosts = [hostInfo]; when(mockHostInfo.allAliases).thenReturn(new Set(["alias1", "alias2"])); - when(mockPluginService.getHosts()).thenReturn(hosts); + when(mockPluginService.getAllHosts()).thenReturn(hosts); when(mockPluginService.forceMonitoringRefresh(true, anything())).thenResolve(true); when(mockPluginService.connect(hostInfo, anything())).thenResolve(mockClientWrapper); when(mockPluginService.getHostRole(mockClientWrapper)).thenResolve(HostRole.WRITER);