Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: MAP value converter for data via debezium #2156

Merged
merged 32 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
1ccdb8e
Fix: MAP value converter for debezium
priyanshi-yb Jan 6, 2025
78ba6c5
add cdc events for live
priyanshi-yb Jan 6, 2025
2f5b411
move hstore to PostgreToYBConverter
priyanshi-yb Jan 7, 2025
a952d74
try
priyanshi-yb Jan 7, 2025
7ed4582
schema log
priyanshi-yb Jan 7, 2025
1a56540
default hstore mode
priyanshi-yb Jan 8, 2025
16399c4
json hstore mode
priyanshi-yb Jan 8, 2025
891cddf
cleanup
priyanshi-yb Jan 8, 2025
d2433c3
cleanup
priyanshi-yb Jan 8, 2025
a72b633
cleanup
priyanshi-yb Jan 8, 2025
2849530
changed the hstore handling mode to json and handling it in io.debezi…
priyanshi-yb Jan 8, 2025
0a22864
try java map
priyanshi-yb Jan 8, 2025
2c9e33b
fix reports
priyanshi-yb Jan 8, 2025
73bffd0
add all datatype tests
priyanshi-yb Jan 8, 2025
b013864
fix compilation
priyanshi-yb Jan 8, 2025
69c393f
fix run test to cat only in specific case
priyanshi-yb Jan 8, 2025
dd4e212
fix
priyanshi-yb Jan 8, 2025
b894c51
Fix java hashmap approach to escape " -> \"
priyanshi-yb Jan 9, 2025
c996e46
fix
priyanshi-yb Jan 9, 2025
ffbe291
change the inserts to be proper
priyanshi-yb Jan 10, 2025
24d9530
fix expected file
priyanshi-yb Jan 12, 2025
099f1c1
review comments
priyanshi-yb Jan 15, 2025
15f04d6
fix unit test/expected file
priyanshi-yb Jan 15, 2025
d12e8ed
add tests of reporting
priyanshi-yb Jan 16, 2025
d4265dc
Merge branch 'main' into priyanshi/fix-hstore
priyanshi-yb Jan 16, 2025
d746bd1
review comments
priyanshi-yb Jan 16, 2025
4ce41f2
fix tests
priyanshi-yb Jan 16, 2025
4f779a3
fix build
priyanshi-yb Jan 16, 2025
1830dad
add --yes to import-data
priyanshi-yb Jan 16, 2025
88d0cf5
live-migration validateAfterchanges fix
priyanshi-yb Jan 16, 2025
03e2b96
fix sample-is
priyanshi-yb Jan 16, 2025
cb27941
clean up
priyanshi-yb Jan 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/workflows/pg-17-migtests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,14 @@ jobs:
# - name: "TEST: pg-basic-non-public-fall-back-test"
# run: migtests/scripts/live-migration-fallb-run-test.sh pg/basic-non-public-live-test

- name: "TEST: pg-datatypes-live-test"
if: ${{ !cancelled() && matrix.test_group == 'live_basic' }}
run: migtests/scripts/live-migration-run-test.sh pg/datatypes

- name: "TEST: pg-datatypes-fall-forward-test"
if: ${{ !cancelled() && matrix.test_group == 'live_basic' }}
run: migtests/scripts/live-migration-fallf-run-test.sh pg/datatypes

- name: "TEST: pg-datatypes-fall-back-test"
if: ${{ !cancelled() && matrix.test_group == 'live_basic' }}
run: migtests/scripts/live-migration-fallb-run-test.sh pg/datatypes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.HashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class ensures of doing any transformation of the record received from debezium
* before actually writing that record.
*/
public class DebeziumRecordTransformer implements RecordTransformer {
private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordTransformer.class);

private JsonConverter jsonConverter;
public DebeziumRecordTransformer(){
Expand Down Expand Up @@ -70,6 +74,36 @@ private String makeFieldValueSerializable(Object fieldValue, Field field){
case BYTES:
case STRUCT:
return toKafkaConnectJsonConverted(fieldValue, field);
case MAP:
StringBuilder mapString = new StringBuilder();
for (Map.Entry<String, String> entry : ((HashMap<String, String>) fieldValue).entrySet()) {
String key = entry.getKey();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add debug logs before and after our transformation for the key and value.

String val = entry.getValue();
/*
Escaping the key and value here for the double quote (")" and backslash char (\) with a backslash character as mentioned here
https://www.postgresql.org/docs/9/hstore.html#:~:text=To%20include%20a%20double%20quote%20or%20a%20backslash%20in%20a%20key%20or%20value%2C%20escape%20it%20with%20a%20backslash.

Following the order of escaping the backslash first and then the double quote becasue first escape the backslashes in the string and adding the backslash for escaping to handle case like
e.g. key - "a\"b" -> (first escaping) -> "a\\"b" -> (second escaping) -> "a\\\"b"
*/
key = key.replace("\\", "\\\\"); // escaping backslash \ -> \\ ( "a\b" -> "a\\b" ) "
val = val.replace("\\", "\\\\");
key = key.replace("\"", "\\\""); // escaping double quotes " -> \" ( "a"b" -> "a\"b" ) "
val = val.replace("\"", "\\\"");
mapString.append("\"");
mapString.append(key);
mapString.append("\"");
mapString.append(" => ");
mapString.append("\"");
mapString.append(val);
mapString.append("\"");
mapString.append(",");
Comment on lines +99 to +106
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@priyanshi-yb, do we have something like fmt.Sprintf() in Java to use here?

Copy link
Collaborator

@makalaaneesh makalaaneesh Jan 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 that would be a better. No idea why I wrote it like this initially 😀

Copy link
Contributor Author

@priyanshi-yb priyanshi-yb Jan 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is this String.format() which looks similar to fmt.Sprintf() can fix that in a follow up PR.

}
if(mapString.length() == 0) {
return "";
}
return mapString.toString().substring(0, mapString.length() - 1);

}
return fieldValue.toString();
}
Expand Down
8 changes: 8 additions & 0 deletions migtests/scripts/run-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ main() {
fi
fi

if [ "${TEST_DIR}" = "${TESTS_DIR}/pg/datatypes" ]; then
cat ${EXPORT_DIR}/data/hstore_example_data.sql
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is just for debugging? remove?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

if [ "${BETA_FAST_DATA_EXPORT}" = "1" ]; then
cat ${EXPORT_DIR}/data/schemas/source_db_exporter/hstore_example_schema.json
cat ${EXPORT_DIR}/logs/debezium-source_db_exporter.log
fi
fi

step "Fix data."
if [ -x "${TEST_DIR}/fix-data" ]
then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,12 +390,13 @@ CREATE TABLE public.locations (

CREATE TABLE image (title text, raster lo);

CREATE TABLE employees (id INT PRIMARY KEY, salary INT, data hstore);

-- IS JSON Predicate
CREATE TABLE public.json_data (
id SERIAL PRIMARY KEY,
data_column TEXT NOT NULL CHECK (data_column IS JSON)
);
CREATE TABLE employees (id INT PRIMARY KEY, salary INT);
-- create table with multirange data types

-- Create tables with primary keys directly
Expand Down
11 changes: 11 additions & 0 deletions migtests/tests/analyze-schema/expected_issues.json
Original file line number Diff line number Diff line change
Expand Up @@ -2143,5 +2143,16 @@
"GH": "https://github.com/yugabyte/yugabyte-db/issues/25575",
"DocsLink": "https://docs.yugabyte.com/preview/yugabyte-voyager/known-issues/postgresql/#postgresql-12-and-later-features",
"MinimumVersionsFixedIn": null
},
{
"IssueType": "migration_caveats",
"ObjectType": "TABLE",
"ObjectName": "employees",
"Reason": "Unsupported datatype for Live migration with fall-forward/fallback - hstore on column - data",
"SqlStatement": "CREATE TABLE employees (id INT PRIMARY KEY, salary INT, data hstore);",
"Suggestion": "",
"GH": "https://github.com/yugabyte/yb-voyager/issues/1731",
"DocsLink": "https://docs.yugabyte.com/preview/yugabyte-voyager/known-issues/postgresql/#unsupported-datatypes-by-voyager-during-live-migration",
"MinimumVersionsFixedIn": null
}
]
2 changes: 1 addition & 1 deletion migtests/tests/analyze-schema/summary.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
{
"ObjectType": "TABLE",
"TotalCount": 64,
"InvalidCount": 53,
"InvalidCount": 54,
"ObjectNames": "test_table_in_type_file, sales_data, salaries2, sales, test_1, test_2, test_non_pk_multi_column_list, test_3, test_4, test_5, test_6, test_7, test_8, test_9, order_details, public.employees4, enum_example.bugs, table_xyz, table_abc, table_1, table_test, test_interval, public.range_columns_partition_test, public.range_columns_partition_test_copy, anydata_test, anydataset_test, anytype_test, uritype_test, \"Test\", public.meeting, public.pr, public.foreign_def_test, public.users, foreign_def_test1, foreign_def_test2, unique_def_test, unique_def_test1, test_xml_type, test_xid_type, public.test_jsonb, public.inet_type, public.citext_type, public.documents, public.ts_query_table, combined_tbl, combined_tbl1, test_udt, test_arr_enum, public.locations, public.xml_data_example, image, public.json_data, employees, bigint_multirange_table, date_multirange_table, int_multirange_table, numeric_multirange_table, timestamp_multirange_table, timestamptz_multirange_table, users_unique_nulls_distinct, users_unique_nulls_not_distinct, sales_unique_nulls_not_distinct, sales_unique_nulls_not_distinct_alter, users_unique_nulls_not_distinct_index" },
{
"ObjectType": "INDEX",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
},
{
"ObjectType": "EXTENSION",
"TotalCount": 4,
"TotalCount": 5,
"InvalidCount": 0,
"ObjectNames": "citext, pgcrypto, pg_stat_statements, lo"
"ObjectNames": "citext, pgcrypto, pg_stat_statements, lo, hstore"
},
{
"ObjectType": "TYPE",
Expand Down Expand Up @@ -1107,7 +1107,7 @@
"SchemaName": "public",
"ObjectName": "combined_tbl",
"RowCount": 0,
"ColumnCount": 12,
"ColumnCount": 13,
"Reads": 0,
"Writes": 0,
"ReadsPerSecond": 0,
Expand Down Expand Up @@ -2779,6 +2779,10 @@
{
"ObjectName": "schema2.products.item (schema2.item_details)",
"SqlStatement": ""
},
{
"ObjectName": "public.combined_tbl.data (public.hstore)",
"SqlStatement": ""
}
],
"DocsLink": "https://docs.yugabyte.com/preview/yugabyte-voyager/known-issues/postgresql/#unsupported-datatypes-by-voyager-during-live-migration",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ CREATE TYPE public.address_type AS (
);

CREATE EXTENSION lo;

CREATE EXTENSION hstore;
--other misc types
create table public.combined_tbl (
id int,
Expand All @@ -220,6 +222,7 @@ create table public.combined_tbl (
address address_type,
raster lo,
arr_enum enum_kind[],
data hstore,
PRIMARY KEY (id, arr_enum)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,5 +271,44 @@
"exported_updates": 0,
"exported_deletes": 0,
"final_row_count": 2
},
{
"table_name": "public.\"hstore_example\"",
"db_type": "source",
"exported_snapshot_rows": 13,
"imported_snapshot_rows": 0,
"imported_inserts": 0,
"imported_updates": 0,
"imported_deletes": 0,
"exported_inserts": 3,
"exported_updates": 5,
"exported_deletes": 0,
"final_row_count": 16
},
{
"table_name": "public.\"hstore_example\"",
"db_type": "target",
"exported_snapshot_rows": 0,
"imported_snapshot_rows": 13,
"imported_inserts": 3,
"imported_updates": 5,
"imported_deletes": 0,
"exported_inserts": 0,
"exported_updates": 0,
"exported_deletes": 0,
"final_row_count": 16
},
{
"table_name": "public.\"hstore_example\"",
"db_type": "source-replica",
"exported_snapshot_rows": 0,
"imported_snapshot_rows": 13,
"imported_inserts": 3,
"imported_updates": 5,
"imported_deletes": 0,
"exported_inserts": 0,
"exported_updates": 0,
"exported_deletes": 0,
"final_row_count": 16
}
]
5 changes: 5 additions & 0 deletions migtests/tests/pg/datatypes/export_data_status-report.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,10 @@
"table_name": "null_and_default",
"status": "DONE",
"exported_count": 2
},
{
"exported_count": 13,
"status": "DONE",
"table_name": "hstore_example"
}
]
7 changes: 7 additions & 0 deletions migtests/tests/pg/datatypes/import_data_status-report.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@
"imported_count": 3,
"percentage_complete": 100
},
{
"table_name": "public.\"hstore_example\"",
"status": "DONE",
"total_count": 13,
"imported_count": 13,
"percentage_complete": 100
},
{
"table_name": "public.\"null_and_default\"",
"status": "DONE",
Expand Down
19 changes: 19 additions & 0 deletions migtests/tests/pg/datatypes/pg_datatypes_data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,22 @@ select * from datatypes2;
insert into null_and_default (id) VALUES (1);
insert into null_and_default VALUES(2, NULL, NULL, NULL);

INSERT INTO hstore_example (data)
VALUES
('"key1"=>"value1", "key2"=>"value2"'),
(hstore('a"b', 'd\"a')),
(NULL),
(''),
('key1 => value1, key2 => value2'),
(hstore(ARRAY['key1', 'key2'], ARRAY['value1', 'value2'])),
('key7 => value7, key8 => 123, key9 => true'),
('"paperback" => "243",
"publisher" => "postgresqltutorial.com",
"language" => "English",
"ISBN-13" => "978-1449370000",
"weight" => "11.2 ounces"'),
(hstore(ROW(1,'{"key1=value1, key2=value2"}'))),
(hstore('json_field', '{"key1=value1, key2={"key1=value1, key2=value2"}"}')), --hstore() key and values need no extra processing
('"{\"key1=value1, key2=value2\"}"=>"{\"key1=value1, key2={\"key1=value1, key2=value2\"}\"}"'), --single quotes string need to escaped properly
(hstore('"{""key1"":""value1"",""key2"":""value2""}"', '{"key1=value1, key2={"key1=value1, key2=value2"}"}')),
(hstore('"{key1:value1,key2:value2}"', '{"key1=value1, key2={"key1=value1, key2=value2"}"}'));
6 changes: 5 additions & 1 deletion migtests/tests/pg/datatypes/pg_datatypes_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,9 @@ create table datatypes2(id serial primary key, v1 json, v2 BIT(10), v3 int ARRAY
drop table if exists null_and_default;
create table null_and_default(id int PRIMARY KEY, b boolean default false, i int default 10, val varchar default 'testdefault');

create EXTENSION hstore;


CREATE TABLE hstore_example (
id SERIAL PRIMARY KEY,
data hstore
);
29 changes: 29 additions & 0 deletions migtests/tests/pg/datatypes/source_delta.sql
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,34 @@ UPDATE datatypes2
SET v1 = '{"updated": true}', v2 = B'0101010101', v5 = B'101010101010101010101010101010', v3 = ARRAY[5, 6, 7, 8], v4 = '{{"e", "f"}, {"g", "h"}}'
WHERE v1 IS NULL;

UPDATE hstore_example
SET data = data || 'key3 => value3'
WHERE id = 1;

UPDATE hstore_example
SET data = hstore('{"key1=value1, key2=value2"}', '{"key1=value1, key2={"key1=value1, key2=value2"}"}')
WHERE id = 3;

UPDATE hstore_example
SET data = '"{\"key1=value1, key2=value2\"}"=>"{\"key1=value1, key2={\"key1=value1, key2=value2\"}\"}"'
WHERE id = 7;

INSERT INTO hstore_example (data)
VALUES
('key5 => value5, key6 => value6');

INSERT INTO hstore_example (data)
VALUES
(hstore('{"key1=value1, key2=value2"}', '{"key1=value1, key2={"key1=value1, key2=value2"}"}'));

INSERT INTO hstore_example (data)
VALUES
('');

UPDATE hstore_example
SET data = NULL
WHERE id = 5;

UPDATE hstore_example
SET data = ''
WHERE id = 6;
23 changes: 23 additions & 0 deletions migtests/tests/pg/datatypes/target_delta.sql
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,26 @@ SET v1 = '{"new": "data"}', v2 = B'1111000011', v5=B'001010100101010101010101010

DELETE FROM datatypes2
WHERE 5 = ANY(v3);

-- NOT WORKING WIT H YB CDC GRPC connector as of now
-- INSERT INTO hstore_example (data)
-- VALUES
-- ('key7 => value7, key8 => value8');

-- UPDATE hstore_example
-- SET data = delete(data, 'key2')
-- WHERE id = 8;

-- DELETE FROM hstore_example WHERE data ? 'key5';

-- INSERT INTO hstore_example (data)
-- VALUES
-- (hstore('"{""key1"":""value1"",""key2"":""value2""}"', '{"key1=value1, key2={"key1=value1, key2=value2"}"}'));

-- UPDATE hstore_example
-- SET data = hstore('{"key1=value1, key2=value2"}', '{"key1=value1, key2={"key1=value1, key2=value2"}"}')
-- WHERE id = 15;

-- UPDATE hstore_example
-- SET data = '"{\"key1=value1, key2=value2\"}"=>"{\"key1=value1, key2={\"key1=value1, key2=value2\"}\"}"'
-- WHERE id = 14;
3 changes: 2 additions & 1 deletion migtests/tests/pg/datatypes/validate
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ EXPECTED_ROW_COUNT = {
'datetime_type2': 2,
'null_and_default' :2,
'decimal_types': 3,
'hstore_example': 13,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's also add data validations (for each row), for source-target, source-source-replica, target-source-replica(ignore if this is not supported).

You can use get_distinct_values_of_column_of_table

}

EXPECTED_SUM_OF_COLUMN = {
Expand Down Expand Up @@ -73,7 +74,7 @@ def migration_completed_checks_fb():
def migration_completed_checks(tgt):
table_list = tgt.get_table_names("public")
print("table_list:", table_list)
assert len(table_list) == 7
assert len(table_list) == 8

got_row_count = tgt.row_count_of_all_tables("public")
for table_name, row_count in EXPECTED_ROW_COUNT.items():
Expand Down
2 changes: 2 additions & 0 deletions migtests/tests/pg/datatypes/validateAfterChanges
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ EXPECTED_ROW_COUNT = {
'datetime_type2': 2,
'null_and_default' :2,
'decimal_types': 4,
'hstore_example': 16,
}

EXPECTED_SUM_OF_COLUMN = {
Expand Down Expand Up @@ -65,6 +66,7 @@ EXPECTED_ROW_COUNT_FF = {
'datetime_type2': 3,
'null_and_default' :2,
'decimal_types': 4,
'hstore_example': 16,
}

EXPECTED_SUM_OF_COLUMN_FF = {
Expand Down
1 change: 1 addition & 0 deletions yb-voyager/src/dbzm/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/google/uuid"
log "github.com/sirupsen/logrus"

"github.com/yugabyte/yb-voyager/yb-voyager/src/utils"
)

Expand Down
2 changes: 1 addition & 1 deletion yb-voyager/src/srcdb/yugabytedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
)

// Apart from these we also skip UDT columns and error out for array of enums as unsupported tables.
var YugabyteUnsupportedDataTypesForDbzm = []string{"BOX", "CIRCLE", "LINE", "LSEG", "PATH", "PG_LSN", "POINT", "POLYGON", "TSQUERY", "TSVECTOR", "TXID_SNAPSHOT", "GEOMETRY", "GEOGRAPHY", "RASTER"}
var YugabyteUnsupportedDataTypesForDbzm = []string{"BOX", "CIRCLE", "LINE", "LSEG", "PATH", "PG_LSN", "POINT", "POLYGON", "TSQUERY", "TSVECTOR", "TXID_SNAPSHOT", "GEOMETRY", "GEOGRAPHY", "RASTER", "HSTORE"}

type YugabyteDB struct {
source *Source
Expand Down
Loading
Loading