diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderTest.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderTest.kt index d96c591e7ca5..fcb56e451f91 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderTest.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderTest.kt @@ -387,6 +387,7 @@ class CdcPartitionReaderMySQLTest : .withDebeziumName(databaseName) .withHeartbeats(heartbeat) .with("include.schema.changes", "false") + .with("connect.keep.alive.interval.ms", "1000") .withDatabase("hostname", host) .withDatabase("port", firstMappedPort.toString()) .withDatabase("user", username) diff --git a/airbyte-integrations/connectors/source-mysql-v2/metadata.yaml b/airbyte-integrations/connectors/source-mysql-v2/metadata.yaml index 395f79cb3e06..5eb38ff4168b 100644 --- a/airbyte-integrations/connectors/source-mysql-v2/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql-v2/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 561393ed-7e3a-4d0d-8b8b-90ded371754c - dockerImageTag: 0.0.13 + dockerImageTag: 0.0.14 dockerRepository: airbyte/source-mysql-v2 documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql-v2 diff --git a/airbyte-integrations/connectors/source-mysql-v2/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfiguration.kt b/airbyte-integrations/connectors/source-mysql-v2/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfiguration.kt index ee5968563872..547234ab19c9 100644 --- a/airbyte-integrations/connectors/source-mysql-v2/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfiguration.kt +++ b/airbyte-integrations/connectors/source-mysql-v2/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfiguration.kt @@ -3,12 +3,14 @@ package io.airbyte.integrations.source.mysql import io.airbyte.cdk.ConfigErrorException import io.airbyte.cdk.command.CdcSourceConfiguration +import io.airbyte.cdk.command.ConfigurationSpecificationSupplier import io.airbyte.cdk.command.JdbcSourceConfiguration import io.airbyte.cdk.command.SourceConfiguration import io.airbyte.cdk.command.SourceConfigurationFactory import io.airbyte.cdk.ssh.SshConnectionOptions import io.airbyte.cdk.ssh.SshTunnelMethodConfiguration import io.github.oshai.kotlinlogging.KotlinLogging +import io.micronaut.context.annotation.Factory import jakarta.inject.Singleton import java.net.URLDecoder import java.nio.charset.StandardCharsets @@ -30,9 +32,22 @@ data class MysqlSourceConfiguration( override val resourceAcquisitionHeartbeat: Duration = Duration.ofMillis(100L), override val checkpointTargetInterval: Duration, override val checkPrivileges: Boolean, - override val debeziumHeartbeatInterval: Duration = Duration.ofSeconds(1), + override val debeziumHeartbeatInterval: Duration = Duration.ofSeconds(10), + val debeziumKeepAliveInterval: Duration = Duration.ofMinutes(1), ) : JdbcSourceConfiguration, CdcSourceConfiguration { override val global = cursorConfiguration is CdcCursor + + /** Required to inject [MysqlSourceConfiguration] directly. */ + @Factory + private class MicronautFactory { + @Singleton + fun mysqlSourceConfig( + factory: + SourceConfigurationFactory< + MysqlSourceConfigurationJsonObject, MysqlSourceConfiguration>, + supplier: ConfigurationSpecificationSupplier, + ): MysqlSourceConfiguration = factory.make(supplier.get()) + } } @Singleton diff --git a/airbyte-integrations/connectors/source-mysql-v2/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/MySqlDebeziumOperations.kt b/airbyte-integrations/connectors/source-mysql-v2/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/MySqlDebeziumOperations.kt index 29becb5dade7..706595636740 100644 --- a/airbyte-integrations/connectors/source-mysql-v2/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/MySqlDebeziumOperations.kt +++ b/airbyte-integrations/connectors/source-mysql-v2/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/MySqlDebeziumOperations.kt @@ -8,8 +8,6 @@ import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.node.ArrayNode import com.fasterxml.jackson.databind.node.ObjectNode import io.airbyte.cdk.ConfigErrorException -import io.airbyte.cdk.command.CdcSourceConfiguration -import io.airbyte.cdk.command.JdbcSourceConfiguration import io.airbyte.cdk.command.OpaqueStateValue import io.airbyte.cdk.data.OffsetDateTimeCodec import io.airbyte.cdk.discover.CommonMetaField @@ -28,6 +26,7 @@ import io.airbyte.cdk.read.cdc.DebeziumSchemaHistory import io.airbyte.cdk.read.cdc.DebeziumState import io.airbyte.cdk.ssh.TunnelSession import io.airbyte.cdk.util.Jsons +import io.airbyte.integrations.source.mysql.MysqlSourceConfiguration import io.airbyte.protocol.models.v0.AirbyteRecordMessage import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta import io.debezium.connector.mysql.MySqlConnector @@ -53,8 +52,7 @@ import org.apache.kafka.connect.source.SourceRecord @Singleton class MySqlDebeziumOperations( val jdbcConnectionFactory: JdbcConnectionFactory, - jdbcSourceConfiguration: JdbcSourceConfiguration, - cdcSourceConfiguration: CdcSourceConfiguration, + configuration: MysqlSourceConfiguration, random: Random = Random.Default, ) : DebeziumOperations { private val log = KotlinLogging.logger {} @@ -249,7 +247,7 @@ class MySqlDebeziumOperations( return Jsons.objectNode().apply { set(STATE, stateNode) } } - val databaseName: String = jdbcSourceConfiguration.namespaces.first() + val databaseName: String = configuration.namespaces.first() val serverID: Int = random.nextInt(MIN_SERVER_ID..MAX_SERVER_ID) val commonProperties: Map by lazy { @@ -258,7 +256,7 @@ class MySqlDebeziumOperations( .withDefault() .withConnector(MySqlConnector::class.java) .withDebeziumName(databaseName) - .withHeartbeats(cdcSourceConfiguration.debeziumHeartbeatInterval) + .withHeartbeats(configuration.debeziumHeartbeatInterval) // This to make sure that binary data represented as a base64-encoded String. // https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-binary-handling-mode .with("binary.handling.mode", "base64") @@ -271,7 +269,11 @@ class MySqlDebeziumOperations( .with("snapshot.locking.mode", "none") // https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-include-schema-changes .with("include.schema.changes", "false") - .withDatabase(jdbcSourceConfiguration.jdbcProperties) + .with( + "connect.keep.alive.interval.ms", + configuration.debeziumKeepAliveInterval.toMillis().toString() + ) + .withDatabase(configuration.jdbcProperties) .withDatabase("hostname", tunnelSession.address.hostName) .withDatabase("port", tunnelSession.address.port.toString()) .withDatabase("dbname", databaseName) diff --git a/airbyte-integrations/connectors/source-mysql-v2/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceTestConfigurationFactory.kt b/airbyte-integrations/connectors/source-mysql-v2/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceTestConfigurationFactory.kt index 43bdc7183e1d..723817bc701b 100644 --- a/airbyte-integrations/connectors/source-mysql-v2/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceTestConfigurationFactory.kt +++ b/airbyte-integrations/connectors/source-mysql-v2/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceTestConfigurationFactory.kt @@ -18,5 +18,10 @@ class MysqlSourceTestConfigurationFactory : ): MysqlSourceConfiguration = MysqlSourceConfigurationFactory() .makeWithoutExceptionHandling(pojo) - .copy(maxConcurrency = 1, checkpointTargetInterval = Duration.ofSeconds(3)) + .copy( + maxConcurrency = 1, + checkpointTargetInterval = Duration.ofSeconds(3), + debeziumHeartbeatInterval = Duration.ofMillis(100), + debeziumKeepAliveInterval = Duration.ofSeconds(1), + ) }