Skip to content

Commit

Permalink
Merge pull request #590 from Altinity/debezium_2.7
Browse files Browse the repository at this point in the history
Upgraded to debezium 2.7.0 and update new function definition for MyS…
  • Loading branch information
subkanthi authored Jul 12, 2024
2 parents 1f5854f + 9fb5fc1 commit d1cb792
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 11 deletions.
4 changes: 2 additions & 2 deletions sink-connector-lightweight/dependency-reduced-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>
<version>2.5.0.Beta1</version>
<version>2.7.0.Alpha2</version>
<scope>test</scope>
<exclusions>
<exclusion>
Expand Down Expand Up @@ -306,7 +306,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<version.checkstyle.plugin>3.1.1</version.checkstyle.plugin>
<maven.compiler.target>17</maven.compiler.target>
<version.debezium>2.5.0.Beta1</version.debezium>
<version.debezium>2.7.0.Alpha2</version.debezium>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
</properties>
</project>
3 changes: 3 additions & 0 deletions sink-connector-lightweight/docker/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ database.allowPublicKeyRetrieval: "true"
# The default value of the property is initial. You can customize the way that the connector creates snapshots by changing the value of the snapshot.mode property
snapshot.mode: "initial"

# Snapshot.locking.mode Required for Debezium 2.7.0 and later. The snapshot.locking.mode configuration property specifies the mode that the connector uses to lock tables during snapshotting.
#snapshot.locking.mode: "minimal"

# offset.flush.interval.ms: The number of milliseconds to wait before flushing recent offsets to Kafka. This ensures that offsets are committed within the specified time interval.
offset.flush.interval.ms: 5000

Expand Down
5 changes: 3 additions & 2 deletions sink-connector-lightweight/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<maven.compiler.target>17</maven.compiler.target>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<version.debezium>2.5.0.Beta1</version.debezium>
<version.debezium>2.7.0.Beta2</version.debezium>
<version.junit>5.9.1</version.junit>
<version.testcontainers>1.19.1</version.testcontainers>
<version.checkstyle.plugin>3.1.1</version.checkstyle.plugin>
Expand All @@ -23,7 +23,7 @@
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
<quarkus.platform.version>2.14.0.Final</quarkus.platform.version>
<surefire-plugin.version>3.0.0-M7</surefire-plugin.version>
<sink-connector-library-version>0.0.8</sink-connector-library-version>
<sink-connector-library-version>0.0.9</sink-connector-library-version>
</properties>
<dependencyManagement>
<dependencies>
Expand Down Expand Up @@ -523,6 +523,7 @@
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication</mainClass>
</transformer>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1288,6 +1288,36 @@ public void exitPartitionFunctionList(MySqlParser.PartitionFunctionListContext p

}

@Override
public void enterPartitionSystemVersion(MySqlParser.PartitionSystemVersionContext partitionSystemVersionContext) {

}

@Override
public void exitPartitionSystemVersion(MySqlParser.PartitionSystemVersionContext partitionSystemVersionContext) {

}

@Override
public void enterPartitionSystemVersionDefinitions(MySqlParser.PartitionSystemVersionDefinitionsContext partitionSystemVersionDefinitionsContext) {

}

@Override
public void exitPartitionSystemVersionDefinitions(MySqlParser.PartitionSystemVersionDefinitionsContext partitionSystemVersionDefinitionsContext) {

}

@Override
public void enterPartitionSystemVersionDefinition(MySqlParser.PartitionSystemVersionDefinitionContext partitionSystemVersionDefinitionContext) {

}

@Override
public void exitPartitionSystemVersionDefinition(MySqlParser.PartitionSystemVersionDefinitionContext partitionSystemVersionDefinitionContext) {

}

@Override
public void enterSubPartitionFunctionHash(MySqlParser.SubPartitionFunctionHashContext subPartitionFunctionHashContext) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import com.clickhouse.data.ClickHouseDataType;
import io.debezium.antlr.DataTypeResolver;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.mysql.MySqlValueConverters;
import io.debezium.connector.mysql.jdbc.MySqlValueConverters;
import io.debezium.ddl.parser.mysql.generated.MySqlParser;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
Expand Down Expand Up @@ -34,7 +34,8 @@ public static ClickHouseDataType convert(String columnName, MySqlParser.DataType
JdbcValueConverters.DecimalMode.PRECISE,
TemporalPrecisionMode.ADAPTIVE,
JdbcValueConverters.BigIntUnsignedMode.LONG,
CommonConnectorConfig.BinaryHandlingMode.BYTES);
CommonConnectorConfig.BinaryHandlingMode.BYTES,
x ->x, CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN);


DataType dataType = initializeDataTypeResolver().resolveDataType(columnDefChild);
Expand All @@ -50,8 +51,8 @@ public static String convertToString(String columnName, int scale, int precision
JdbcValueConverters.DecimalMode.PRECISE,
TemporalPrecisionMode.ADAPTIVE,
JdbcValueConverters.BigIntUnsignedMode.LONG,
CommonConnectorConfig.BinaryHandlingMode.BYTES
);
CommonConnectorConfig.BinaryHandlingMode.BYTES,
x ->x, CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN);


String convertedDataType = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ schema.history.internal.schema.history.table.ddl="CREATE TABLE %s(`id` VARCHAR(3
schema.history.internal.schema.history.table.name="default.replicate_schema_history"
auto.create.tables= false
replacingmergetree.delete.column=_sign
metrics.enable= true
metrics.port= 8083
snapshot.mode= "initial"
replica.status.view="CREATE VIEW IF NOT EXISTS %s.show_replica_status AS SELECT now() - fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')) AS seconds_behind_source, toDateTime(fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')), 'UTC') AS utc_time, fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')) AS local_time FROM %s FINAL"
4 changes: 2 additions & 2 deletions sink-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.altinity</groupId>
<artifactId>clickhouse-kafka-sink-connector</artifactId>
<version>0.0.8</version>
<version>0.0.9</version>
<packaging>jar</packaging>
<name>ClickHouse Kafka Sink Connector</name>
<description>Sinks data from Kafka into ClickHouse</description>
Expand Down Expand Up @@ -308,7 +308,7 @@
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>2.5.0.Beta1</version>
<version>2.7.0.Beta2</version>
</dependency>

<dependency>
Expand Down

0 comments on commit d1cb792

Please sign in to comment.