Skip to content

Commit

Permalink
Merge pull request #102 from cultureamp/acsims/modifypartitionchanges
Browse files Browse the repository at this point in the history
Small fixes to ModifyPartition plugin
  • Loading branch information
amatueva authored Feb 27, 2025
2 parents da2fcc6 + 968f625 commit 8a42f72
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 2 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Assume the following configuration:

```json
"transforms": "ModifyPartition",
"transforms.ModifyPartition.type":"com.cultureamp.kafka.connect.transforms.ModifyPartition",
"transforms.ModifyPartition.type":"com.cultureamp.kafka.connect.plugins.transforms.ModifyPartition",
"transforms.ModifyPartition.header.key": "account_id"
"transforms.ModifyPartition.number.partitions": "10"
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class ModifyPartition<R : ConnectRecord<R>> : Transformation<R> {
} else if (partitionCount!! <= 0) {
throw ConnectException("Partition count should be greater than 0")
}
val userSpecifiedPartitionKey = record.headers().lastWithName(headerKey).value() as String?
val userSpecifiedPartitionKey = record.headers().lastWithName(headerKey)?.value() as String?

if (userSpecifiedPartitionKey != null) {
val partitionNumber = Partitioner(CRC32(), partitionCount!!).partitionNumberFor(userSpecifiedPartitionKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,27 @@ class ModifyPartitionTest {
return messageHeaders
}

@Test
fun `a missing header key should not result in a null pointer exception, but a connect exception`() {
val partitionSmt = configure("header.key" to "account_id", "number.partitions" to 10)
val record = SourceRecord(
null,
null,
"test",
0,
null,
null,
null,
"",
789L,
headers()
)

assertFailsWith<ConnectException> {
partitionSmt.apply(record)
}
}

@Test
fun `calculate the partition number using the account_id header`() {
val partitionSmt = configure("header.key" to "account_id", "number.partitions" to 10)
Expand Down

0 comments on commit 8a42f72

Please sign in to comment.