-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix: MAP value converter for data via debezium #2156
Changes from 25 commits
1ccdb8e
78ba6c5
2f5b411
a952d74
7ed4582
1a56540
16399c4
891cddf
d2433c3
a72b633
2849530
0a22864
2c9e33b
73bffd0
b013864
69c393f
dd4e212
b894c51
c996e46
ffbe291
24d9530
099f1c1
15f04d6
d12e8ed
d4265dc
d746bd1
4ce41f2
4f779a3
1830dad
88d0cf5
03e2b96
cb27941
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,12 +16,16 @@ | |
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.Map; | ||
import java.util.HashMap; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
/** | ||
* This class ensures of doing any transformation of the record received from debezium | ||
* before actually writing that record. | ||
*/ | ||
public class DebeziumRecordTransformer implements RecordTransformer { | ||
private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordTransformer.class); | ||
|
||
private JsonConverter jsonConverter; | ||
public DebeziumRecordTransformer(){ | ||
|
@@ -70,6 +74,36 @@ private String makeFieldValueSerializable(Object fieldValue, Field field){ | |
case BYTES: | ||
case STRUCT: | ||
return toKafkaConnectJsonConverted(fieldValue, field); | ||
case MAP: | ||
StringBuilder mapString = new StringBuilder(); | ||
for (Map.Entry<String, String> entry : ((HashMap<String, String>) fieldValue).entrySet()) { | ||
String key = entry.getKey(); | ||
String val = entry.getValue(); | ||
/* | ||
Escaping the key and value here for the double quote (")" and backslash char (\) with a backslash character as mentioned here | ||
https://www.postgresql.org/docs/9/hstore.html#:~:text=To%20include%20a%20double%20quote%20or%20a%20backslash%20in%20a%20key%20or%20value%2C%20escape%20it%20with%20a%20backslash. | ||
|
||
Following the order of escaping the backslash first and then the double quote becasue first escape the backslashes in the string and adding the backslash for escaping to handle case like | ||
e.g. key - "a\"b" -> (first escaping) -> "a\\"b" -> (second escaping) -> "a\\\"b" | ||
*/ | ||
key = key.replace("\\", "\\\\"); // escaping backslash \ -> \\ ( "a\b" -> "a\\b" ) " | ||
val = val.replace("\\", "\\\\"); | ||
key = key.replace("\"", "\\\""); // escaping double quotes " -> \" ( "a"b" -> "a\"b" ) " | ||
val = val.replace("\"", "\\\""); | ||
mapString.append("\""); | ||
mapString.append(key); | ||
mapString.append("\""); | ||
mapString.append(" => "); | ||
mapString.append("\""); | ||
mapString.append(val); | ||
mapString.append("\""); | ||
mapString.append(","); | ||
Comment on lines
+99
to
+106
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @priyanshi-yb, do we have something like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 that would be a better. No idea why I wrote it like this initially 😀 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is this |
||
} | ||
if(mapString.length() == 0) { | ||
return ""; | ||
} | ||
return mapString.toString().substring(0, mapString.length() - 1); | ||
|
||
} | ||
return fieldValue.toString(); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -136,6 +136,14 @@ main() { | |
fi | ||
fi | ||
|
||
if [ "${TEST_DIR}" = "${TESTS_DIR}/pg/datatypes" ]; then | ||
cat ${EXPORT_DIR}/data/hstore_example_data.sql | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is just for debugging? remove? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes |
||
if [ "${BETA_FAST_DATA_EXPORT}" = "1" ]; then | ||
cat ${EXPORT_DIR}/data/schemas/source_db_exporter/hstore_example_schema.json | ||
cat ${EXPORT_DIR}/logs/debezium-source_db_exporter.log | ||
fi | ||
fi | ||
|
||
step "Fix data." | ||
if [ -x "${TEST_DIR}/fix-data" ] | ||
then | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ EXPECTED_ROW_COUNT = { | |
'datetime_type2': 2, | ||
'null_and_default' :2, | ||
'decimal_types': 3, | ||
'hstore_example': 13, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's also add data validations (for each row), for source-target, source-source-replica, target-source-replica(ignore if this is not supported). You can use |
||
} | ||
|
||
EXPECTED_SUM_OF_COLUMN = { | ||
|
@@ -73,7 +74,7 @@ def migration_completed_checks_fb(): | |
def migration_completed_checks(tgt): | ||
table_list = tgt.get_table_names("public") | ||
print("table_list:", table_list) | ||
assert len(table_list) == 7 | ||
assert len(table_list) == 8 | ||
|
||
got_row_count = tgt.row_count_of_all_tables("public") | ||
for table_name, row_count in EXPECTED_ROW_COUNT.items(): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's add debug logs before and after our transformation for the key and value.