From 2ab8db20827edaa9d957f3d377e4f0abaa4415cb Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Wed, 11 Dec 2024 15:43:17 -0500 Subject: [PATCH 1/3] Added mapping of time with timezone postgres datatype to clickhouse string and added integration test. --- .../embedded/PostgresPgoutputMultipleSchemaIT.java | 10 +++++++++- .../src/test/resources/init_postgres.sql | 9 +++++++++ .../connector/converters/ClickHouseDataTypeMapper.java | 2 +- 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresPgoutputMultipleSchemaIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresPgoutputMultipleSchemaIT.java index 3fda8fbb8..ba5a8166d 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresPgoutputMultipleSchemaIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresPgoutputMultipleSchemaIT.java @@ -65,7 +65,7 @@ public Properties getProperties() throws Exception { properties.put("slot.retry.delay.ms", "5000" ); properties.put("database.allowPublicKeyRetrieval", "true" ); properties.put("schema.include.list", "public,public2"); - properties.put("table.include.list", "public.tm,public2.tm2,public.people" ); + properties.put("table.include.list", "public.tm,public2.tm2,public.people,public2.table_time_with_timezone" ); properties.put("column.exclude.list", "public.people.full_name_mat"); return properties; } @@ -136,6 +136,14 @@ public void testMultipleSchemaReplication() throws Exception { } Assert.assertTrue(tm2Count == 1); + // valdate table_with_timezone + int tableWithTimezoneCount = 0; + ResultSet chRsTz = writer.getConnection().prepareStatement("select count(*) from public.table_with_timezone").executeQuery(); + while(chRsTz.next()) { + tableWithTimezoneCount = chRsTz.getInt(1); + } + Assert.assertTrue(tableWithTimezoneCount == 1); + // Create a connection to postgresql and create a new table. Connection postgresConn2 = ITCommon.connectToPostgreSQL(postgreSQLContainer); postgresConn2.createStatement().execute("CREATE TABLE public.people( height_cm numeric PRIMARY KEY, height_in numeric GENERATED ALWAYS AS (height_cm / 2.54) STORED)"); diff --git a/sink-connector-lightweight/src/test/resources/init_postgres.sql b/sink-connector-lightweight/src/test/resources/init_postgres.sql index 79a07d63e..29fd1207c 100644 --- a/sink-connector-lightweight/src/test/resources/init_postgres.sql +++ b/sink-connector-lightweight/src/test/resources/init_postgres.sql @@ -165,3 +165,12 @@ INSERT INTO protocol_test VALUES ('1778432', '21481203', 'Edward V prisoners Pe create schema public2; set schema 'public2'; CREATE TABLE "tm2" (id uuid DEFAULT gen_random_uuid() NOT NULL PRIMARY KEY, secid uuid, acc_id uuid); + +CREATE TABLE public2.table_time_with_timezone ( + id SERIAL PRIMARY KEY, + event_name TEXT NOT NULL, + event_time TIME WITH TIME ZONE NOT NULL +); +INSERT INTO public2.table_time_with_timezone (event_name, event_time) +VALUES +('Meeting', '14:30:00-05:00'); diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseDataTypeMapper.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseDataTypeMapper.java index 415895659..3cda79698 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseDataTypeMapper.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseDataTypeMapper.java @@ -85,9 +85,9 @@ public class ClickHouseDataTypeMapper { // Timestamp -> ZonedTimeStamp -> DateTime dataTypesMap.put(new MutablePair<>(Schema.Type.STRING, ZonedTimestamp.SCHEMA_NAME), ClickHouseDataType.DateTime64); + dataTypesMap.put(new MutablePair<>(Schema.Type.STRING, ZonedTime.SCHEMA_NAME.toLowerCase()), ClickHouseDataType.String); dataTypesMap.put(new MutablePair<>(Schema.Type.STRING, Enum.LOGICAL_NAME), ClickHouseDataType.String); - dataTypesMap.put(new MutablePair<>(Schema.Type.STRING, Json.LOGICAL_NAME), ClickHouseDataType.String); dataTypesMap.put(new MutablePair<>(Schema.INT32_SCHEMA.type(), Year.SCHEMA_NAME), ClickHouseDataType.Int32); From 55542c14abce3ec93ef6bcaeb3b6c21e26038198 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Wed, 11 Dec 2024 15:52:35 -0500 Subject: [PATCH 2/3] Added mapping of time with timezone postgres datatype to clickhouse string and added integration test. --- .../debezium/embedded/PostgresPgoutputMultipleSchemaIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresPgoutputMultipleSchemaIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresPgoutputMultipleSchemaIT.java index ba5a8166d..9542ff4e8 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresPgoutputMultipleSchemaIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresPgoutputMultipleSchemaIT.java @@ -138,7 +138,7 @@ public void testMultipleSchemaReplication() throws Exception { // valdate table_with_timezone int tableWithTimezoneCount = 0; - ResultSet chRsTz = writer.getConnection().prepareStatement("select count(*) from public.table_with_timezone").executeQuery(); + ResultSet chRsTz = writer.getConnection().prepareStatement("select count(*) from table_time_with_timezone final").executeQuery(); while(chRsTz.next()) { tableWithTimezoneCount = chRsTz.getInt(1); } From 02316082fe8217536ae9b3ee55029067cb3c06de Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Thu, 19 Dec 2024 21:20:24 -0500 Subject: [PATCH 3/3] Add test insert to validate character varying fields with brackets. --- sink-connector-lightweight/src/test/resources/init_postgres.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sink-connector-lightweight/src/test/resources/init_postgres.sql b/sink-connector-lightweight/src/test/resources/init_postgres.sql index 29fd1207c..28a8de618 100644 --- a/sink-connector-lightweight/src/test/resources/init_postgres.sql +++ b/sink-connector-lightweight/src/test/resources/init_postgres.sql @@ -27,7 +27,7 @@ INSERT INTO public.tm VALUES ( '9cb52b2a-8ef2-4987-8856-c79a1b2c2f71', '9cb52b2a-8ef2-4987-8856-c79a1b2c2f72', '9cb52b2a-8ef2-4987-8856-c79a1b2c2f72', -'IDR', +'IDR 888884444524 (BK:BK_MTN_MOMO_PULL)', 't', 200000.00000, '2022-10-16 16:53:15.01957',