Skip to content

Commit

Permalink
Change DLQ reporting (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
buinauskas authored Oct 24, 2023
1 parent a7ee4fd commit 8d35955
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 4 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ This connector has not yet been published to Confluent Hub. To install it, downl
install it using `confluent-hub` command line tool.

```sh
wget https://github.com/vinted/kafka-connect-vespa/releases/download/v1.0.7/vinted-kafka-connect-vespa-1.0.7-SNAPSHOT.zip -O /tmp/vinted-kafka-connect-vespa-1.0.7-SNAPSHOT.zip -q
wget https://github.com/vinted/kafka-connect-vespa/releases/download/v1.0.8/vinted-kafka-connect-vespa-1.0.8-SNAPSHOT.zip -O /tmp/vinted-kafka-connect-vespa-1.0.8-SNAPSHOT.zip -q
```

```sh
confluent-hub install --no-prompt /tmp/vinted-kafka-connect-vespa-1.0.7-SNAPSHOT.zip
confluent-hub install --no-prompt /tmp/vinted-kafka-connect-vespa-1.0.8-SNAPSHOT.zip
```

### Operational modes
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.vinted.kafka.connect.vespa</groupId>
<artifactId>kafka-connect-vespa</artifactId>
<version>1.0.7-SNAPSHOT</version>
<version>1.0.8-SNAPSHOT</version>
<name>kafka-connect-vespa</name>
<description>The Vespa Sink Connector is used to write data from Kafka to a Vespa search engine.</description>
<url>https://github.com/vinted/kafka-connect-vespa</url>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ public CompletableFuture<Result> handle(SinkRecord record, CompletableFuture<Res

future.whenComplete((result, throwable) -> {
if (result == null) {
reporter.report(record, throwable);
// An exception occurred while indexing the document

if (!isMalformed(throwable)) {
log.error(errorMessage(record), throwable);

promise.completeExceptionally(throwable);
} else {
reporter.report(record, throwable);

switch (config.behaviorOnMalformedDoc) {
case IGNORE:
log.info(ignoreMessage(record), throwable);
Expand All @@ -53,8 +55,12 @@ public CompletableFuture<Result> handle(SinkRecord record, CompletableFuture<Res
}
}
} else if (result.type() == Result.Type.success) {
// Document was indexed successfully
promise.complete(result);
} else {
// Document could not be indexed. Vespa responded with an error message.
// This usually is a condition not met error.

Throwable resultThrowable = new Throwable(result.toString());

reporter.report(record, resultThrowable);
Expand Down

0 comments on commit 8d35955

Please sign in to comment.