diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java index 1395c00..abb8039 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java @@ -14,8 +14,10 @@ import java.time.Instant; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static java.util.stream.Collectors.toList; @@ -62,6 +64,15 @@ public SourceRecord toSourceRecord( String shardId, String sequenceNumber) throws Exception { + // Sanitise the incoming attributes to remove any invalid Avro characters + final Map sanitisedAttributes = attributes.entrySet().stream() + .collect(Collectors.toMap( + e -> this.sanitiseAttributeName(e.getKey()), + Map.Entry::getValue, + (u, v) -> u, + LinkedHashMap::new + )); + // Leveraging offsets to store shard and sequence number with each item pushed to Kafka. // This info will only be used to update `shardRegister` and won't be used to reset state after restart Map offsets = SourceInfo.toOffset(sourceInfo); @@ -70,13 +81,13 @@ public SourceRecord toSourceRecord( // DynamoDB keys can be changed only by recreating the table if (keySchema == null) { - keys = tableDesc.getKeySchema().stream().map(KeySchemaElement::getAttributeName).collect(toList()); + keys = tableDesc.getKeySchema().stream().map(this::sanitiseAttributeName).collect(toList()); keySchema = getKeySchema(keys); } Struct keyData = new Struct(getKeySchema(keys)); for (String key : keys) { - AttributeValue attributeValue = attributes.get(key); + AttributeValue attributeValue = sanitisedAttributes.get(key); if (attributeValue.getS() != null) { keyData.put(key, attributeValue.getS()); continue; @@ -89,7 +100,7 @@ public SourceRecord toSourceRecord( Struct valueData = new Struct(valueSchema) .put(Envelope.FieldName.VERSION, sourceInfo.version) - .put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(attributes)) + .put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(sanitisedAttributes)) .put(Envelope.FieldName.SOURCE, SourceInfo.toStruct(sourceInfo)) .put(Envelope.FieldName.OPERATION, op.code()) .put(Envelope.FieldName.TIMESTAMP, arrivalTimestamp.toEpochMilli()); @@ -113,4 +124,11 @@ private Schema getKeySchema(List keys) { return keySchemaBuilder.build(); } + private String sanitiseAttributeName(KeySchemaElement element) { + return this.sanitiseAttributeName(element.getAttributeName()); + } + + private String sanitiseAttributeName(final String attributeName) { + return attributeName.replaceAll("^[^a-zA-Z_]|(? getAttributes() { return attributes; } + private Map getAttributesWithInvalidAvroCharacters() { + Map attributes = new HashMap<>(); + attributes.put("test-1234", new AttributeValue().withS("testKV1Value")); + attributes.put("1-starts-with-number", new AttributeValue().withS("2")); + attributes.put("_starts_with_underscore", new AttributeValue().withN("1")); + attributes.put("test!@£$%^", new AttributeValue().withS("testStringValue")); + + return attributes; + } + + + private SourceInfo getSourceInfo(String table) { SourceInfo sourceInfo = new SourceInfo(table, Clock.fixed(Instant.parse("2001-01-02T00:00:00Z"), ZoneId.of("UTC"))); sourceInfo.initSyncStatus = InitSyncStatus.RUNNING; @@ -191,6 +203,81 @@ public void recordAttributesAreAddedToValueData() throws Exception { ((Struct) record.value()).getString("document")); } + @Test + public void singleItemKeyIsAddedToRecordWhenKeyContainsInvalidCharacters() throws Exception { + // Arrange + List keySchema = new LinkedList<>(); + keySchema.add(new KeySchemaElement().withKeyType("S").withAttributeName("test-1234")); + + RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix-"); + + // Act + SourceRecord record = converter.toSourceRecord( + getSourceInfo(table), + Envelope.Operation.forCode("r"), + getAttributesWithInvalidAvroCharacters(), + Instant.parse("2001-01-02T00:00:00.00Z"), + "testShardID1", + "testSequenceNumberID1" + ); + + // Assert + assertEquals("test_1234", record.keySchema().fields().get(0).name()); + assertEquals(SchemaBuilder.string().build(), record.keySchema().fields().get(0).schema()); + assertEquals("testKV1Value", ((Struct) record.key()).getString("test_1234")); + } + + @Test + public void multiItemKeyIsAddedToRecordWhenKeyContainsInvalidCharacters() throws Exception { + // Arrange + List keySchema = new LinkedList<>(); + keySchema.add(new KeySchemaElement().withKeyType("S").withAttributeName("test-1234")); + keySchema.add(new KeySchemaElement().withKeyType("N").withAttributeName("1-starts-with-number")); + + RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix-"); + + // Act + SourceRecord record = converter.toSourceRecord( + getSourceInfo(table), + Envelope.Operation.forCode("r"), + getAttributesWithInvalidAvroCharacters(), + Instant.parse("2001-01-02T00:00:00.00Z"), + "testShardID1", + "testSequenceNumberID1" + ); + + // Assert + assertEquals("test_1234", record.keySchema().fields().get(0).name()); + assertEquals(SchemaBuilder.string().build(), record.keySchema().fields().get(0).schema()); + assertEquals("testKV1Value", ((Struct) record.key()).getString("test_1234")); + + assertEquals("__starts_with_number", record.keySchema().fields().get(1).name()); + assertEquals(SchemaBuilder.string().build(), record.keySchema().fields().get(1).schema()); + assertEquals("2", ((Struct) record.key()).getString("__starts_with_number")); + } + + @Test + public void recordAttributesAreAddedToValueDataWhenAttributesContainsInvalidCharacters() throws Exception { + // Arrange + RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-"); + + // Act + SourceRecord record = converter.toSourceRecord( + getSourceInfo(table), + Envelope.Operation.forCode("r"), + getAttributesWithInvalidAvroCharacters(), + Instant.parse("2001-01-02T00:00:00.00Z"), + "testShardID1", + "testSequenceNumberID1" + ); + + String expected = "{\"test_1234\":{\"s\":\"testKV1Value\"},\"_starts_with_underscore\":{\"n\":\"1\"},\"__starts_with_number\":{\"s\":\"2\"},\"test______\":{\"s\":\"testStringValue\"}}"; + + // Assert + assertEquals(expected, + ((Struct) record.value()).getString("document")); + } + @Test public void sourceInfoIsAddedToValueData() throws Exception { // Arrange