Skip to content

Commit

Permalink
bulk-cdk-toolkit-extract-cdc,source-mysql-v2: speed up Debezium tests…
Browse files Browse the repository at this point in the history
… with mysql (#46341)
  • Loading branch information
postamar authored Oct 3, 2024
1 parent d0d3d40 commit a384791
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ class CdcPartitionReaderMySQLTest :
.withDebeziumName(databaseName)
.withHeartbeats(heartbeat)
.with("include.schema.changes", "false")
.with("connect.keep.alive.interval.ms", "1000")
.withDatabase("hostname", host)
.withDatabase("port", firstMappedPort.toString())
.withDatabase("user", username)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 561393ed-7e3a-4d0d-8b8b-90ded371754c
dockerImageTag: 0.0.13
dockerImageTag: 0.0.14
dockerRepository: airbyte/source-mysql-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package io.airbyte.integrations.source.mysql

import io.airbyte.cdk.ConfigErrorException
import io.airbyte.cdk.command.CdcSourceConfiguration
import io.airbyte.cdk.command.ConfigurationSpecificationSupplier
import io.airbyte.cdk.command.JdbcSourceConfiguration
import io.airbyte.cdk.command.SourceConfiguration
import io.airbyte.cdk.command.SourceConfigurationFactory
import io.airbyte.cdk.ssh.SshConnectionOptions
import io.airbyte.cdk.ssh.SshTunnelMethodConfiguration
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Factory
import jakarta.inject.Singleton
import java.net.URLDecoder
import java.nio.charset.StandardCharsets
Expand All @@ -30,9 +32,22 @@ data class MysqlSourceConfiguration(
override val resourceAcquisitionHeartbeat: Duration = Duration.ofMillis(100L),
override val checkpointTargetInterval: Duration,
override val checkPrivileges: Boolean,
override val debeziumHeartbeatInterval: Duration = Duration.ofSeconds(1),
override val debeziumHeartbeatInterval: Duration = Duration.ofSeconds(10),
val debeziumKeepAliveInterval: Duration = Duration.ofMinutes(1),
) : JdbcSourceConfiguration, CdcSourceConfiguration {
override val global = cursorConfiguration is CdcCursor

/** Required to inject [MysqlSourceConfiguration] directly. */
@Factory
private class MicronautFactory {
@Singleton
fun mysqlSourceConfig(
factory:
SourceConfigurationFactory<
MysqlSourceConfigurationJsonObject, MysqlSourceConfiguration>,
supplier: ConfigurationSpecificationSupplier<MysqlSourceConfigurationJsonObject>,
): MysqlSourceConfiguration = factory.make(supplier.get())
}
}

@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ArrayNode
import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.ConfigErrorException
import io.airbyte.cdk.command.CdcSourceConfiguration
import io.airbyte.cdk.command.JdbcSourceConfiguration
import io.airbyte.cdk.command.OpaqueStateValue
import io.airbyte.cdk.data.OffsetDateTimeCodec
import io.airbyte.cdk.discover.CommonMetaField
Expand All @@ -28,6 +26,7 @@ import io.airbyte.cdk.read.cdc.DebeziumSchemaHistory
import io.airbyte.cdk.read.cdc.DebeziumState
import io.airbyte.cdk.ssh.TunnelSession
import io.airbyte.cdk.util.Jsons
import io.airbyte.integrations.source.mysql.MysqlSourceConfiguration
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
import io.debezium.connector.mysql.MySqlConnector
Expand All @@ -53,8 +52,7 @@ import org.apache.kafka.connect.source.SourceRecord
@Singleton
class MySqlDebeziumOperations(
val jdbcConnectionFactory: JdbcConnectionFactory,
jdbcSourceConfiguration: JdbcSourceConfiguration,
cdcSourceConfiguration: CdcSourceConfiguration,
configuration: MysqlSourceConfiguration,
random: Random = Random.Default,
) : DebeziumOperations<MySqlPosition> {
private val log = KotlinLogging.logger {}
Expand Down Expand Up @@ -249,7 +247,7 @@ class MySqlDebeziumOperations(
return Jsons.objectNode().apply { set<JsonNode>(STATE, stateNode) }
}

val databaseName: String = jdbcSourceConfiguration.namespaces.first()
val databaseName: String = configuration.namespaces.first()
val serverID: Int = random.nextInt(MIN_SERVER_ID..MAX_SERVER_ID)

val commonProperties: Map<String, String> by lazy {
Expand All @@ -258,7 +256,7 @@ class MySqlDebeziumOperations(
.withDefault()
.withConnector(MySqlConnector::class.java)
.withDebeziumName(databaseName)
.withHeartbeats(cdcSourceConfiguration.debeziumHeartbeatInterval)
.withHeartbeats(configuration.debeziumHeartbeatInterval)
// This to make sure that binary data represented as a base64-encoded String.
// https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-binary-handling-mode
.with("binary.handling.mode", "base64")
Expand All @@ -271,7 +269,11 @@ class MySqlDebeziumOperations(
.with("snapshot.locking.mode", "none")
// https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-include-schema-changes
.with("include.schema.changes", "false")
.withDatabase(jdbcSourceConfiguration.jdbcProperties)
.with(
"connect.keep.alive.interval.ms",
configuration.debeziumKeepAliveInterval.toMillis().toString()
)
.withDatabase(configuration.jdbcProperties)
.withDatabase("hostname", tunnelSession.address.hostName)
.withDatabase("port", tunnelSession.address.port.toString())
.withDatabase("dbname", databaseName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,10 @@ class MysqlSourceTestConfigurationFactory :
): MysqlSourceConfiguration =
MysqlSourceConfigurationFactory()
.makeWithoutExceptionHandling(pojo)
.copy(maxConcurrency = 1, checkpointTargetInterval = Duration.ofSeconds(3))
.copy(
maxConcurrency = 1,
checkpointTargetInterval = Duration.ofSeconds(3),
debeziumHeartbeatInterval = Duration.ofMillis(100),
debeziumKeepAliveInterval = Duration.ofSeconds(1),
)
}

0 comments on commit a384791

Please sign in to comment.