Skip to content

Commit

Permalink
Fix/send poisonous messages to dlq (#15)
Browse files Browse the repository at this point in the history
* Handle poisonous kafka messages

* Update to 1.0.7
  • Loading branch information
buinauskas authored Oct 23, 2023
1 parent acc9ab7 commit a7ee4fd
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 4 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ This connector has not yet been published to Confluent Hub. To install it, downl
install it using `confluent-hub` command line tool.

```sh
wget https://github.com/vinted/kafka-connect-vespa/releases/download/v1.0.6/vinted-kafka-connect-vespa-1.0.6-SNAPSHOT.zip -O /tmp/vinted-kafka-connect-vespa-1.0.6-SNAPSHOT.zip -q
wget https://github.com/vinted/kafka-connect-vespa/releases/download/v1.0.7/vinted-kafka-connect-vespa-1.0.7-SNAPSHOT.zip -O /tmp/vinted-kafka-connect-vespa-1.0.7-SNAPSHOT.zip -q
```

```sh
confluent-hub install --no-prompt /tmp/vinted-kafka-connect-vespa-1.0.6-SNAPSHOT.zip
confluent-hub install --no-prompt /tmp/vinted-kafka-connect-vespa-1.0.7-SNAPSHOT.zip
```

### Operational modes
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.vinted.kafka.connect.vespa</groupId>
<artifactId>kafka-connect-vespa</artifactId>
<version>1.0.6-SNAPSHOT</version>
<version>1.0.7-SNAPSHOT</version>
<name>kafka-connect-vespa</name>
<description>The Vespa Sink Connector is used to write data from Kafka to a Vespa search engine.</description>
<url>https://github.com/vinted/kafka-connect-vespa</url>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ private static boolean isMalformed(Throwable throwable) {
.findFirst()
.orElse(throwable);

return rootCause.toString().toLowerCase().contains("status 400")
String rootCauseString = rootCause.toString().toLowerCase();

return rootCauseString.contains("status 400")
|| rootCauseString.contains("string field value contains illegal code point")
|| rootCause instanceof OperationParseException
|| rootCause instanceof JsonParseException;
}
Expand Down

0 comments on commit a7ee4fd

Please sign in to comment.