Skip to content

Commit

Permalink
feat: Errant reporter for bad records (#7)
Browse files Browse the repository at this point in the history
Signed-off-by: Anush008 <[email protected]>
  • Loading branch information
Anush008 authored Dec 27, 2024
1 parent a780369 commit 41c39f8
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 21 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ plugins {
}

group = 'io.qdrant'
version = '1.1.2'
version = '1.2.0'
description = 'Kafka Sink Connector for Qdrant.'
java.sourceCompatibility = JavaVersion.VERSION_1_8
java.targetCompatibility = JavaVersion.VERSION_1_8
Expand Down
55 changes: 35 additions & 20 deletions src/main/java/io/qdrant/kafka/QdrantSinkTask.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package io.qdrant.kafka;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.protobuf.InvalidProtocolBufferException;
import io.qdrant.client.grpc.Points.PointStruct;
import java.util.*;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
Expand All @@ -14,10 +17,11 @@ public class QdrantSinkTask extends SinkTask {
private static final Logger log = LoggerFactory.getLogger(QdrantSinkTask.class);
private QdrantSinkConfig config;
private QdrantGrpc qdrantGrpc;
private ErrantRecordReporter reporter;

@Override
public String version() {
return "1.0.0";
return "1.2.0";
}

@Override
Expand All @@ -28,6 +32,10 @@ public void start(Map<String, String> props) {
/**
 * Initializes the task from the supplied connector properties.
 *
 * <p>Builds the {@link QdrantSinkConfig}, selects the gRPC client to use, and looks up the
 * errant record reporter from the task context.
 *
 * @param props connector properties used to construct the {@link QdrantSinkConfig}
 * @param qdrantGrpc client to use; when {@code null}, a new {@link QdrantGrpc} is created from
 *     the parsed configuration
 */
protected void start(Map<String, String> props, QdrantGrpc qdrantGrpc) {
  this.config = new QdrantSinkConfig(props);
  this.qdrantGrpc = qdrantGrpc == null ? new QdrantGrpc(config) : qdrantGrpc;
  try {
    this.reporter = context.errantRecordReporter();
  } catch (NoSuchMethodError | NoClassDefFoundError e) {
    // errantRecordReporter() only exists on Kafka Connect 2.6+. On older runtimes the call
    // fails with a linkage error; treat that the same as "no reporter configured" so the
    // task can still start and fall back to throwing on bad records.
    this.reporter = null;
  }
  if (reporter == null) {
    log.warn("Errant record reporter is not configured.");
  }
  log.info("Starting QdrantSinkTask at {}", config.getGrpcUrl());
}

Expand All @@ -36,34 +44,41 @@ public void put(Collection<SinkRecord> records) {
if (records.isEmpty()) {
return;
}
Map<String, Map<PointStruct, SinkRecord>> pointsWithRecords = new HashMap<>();

try {

Map<String, List<PointStruct>> points = new HashMap<>();
for (SinkRecord record : records) {
for (SinkRecord record : records) {
try {
if (record.value() == null) {
log.warn("Record value is null. Skipping.");
continue;
}
ValueExtractor e = new ValueExtractor(record.value());
e.validateOptions();

points
.computeIfAbsent(e.getCollectionName(), k -> new ArrayList<>())
.add(e.getPointStruct());
pointsWithRecords
.computeIfAbsent(e.getCollectionName(), k -> new HashMap<>())
.put(e.getPointStruct(), record);
} catch (InvalidProtocolBufferException | JsonProcessingException e) {
if (reporter == null) throw new DataException("Invalid sink record", e);
reporter.report(record, e);
}

points.forEach(
(collectionName, pointsList) -> {
try {
qdrantGrpc.upsert(collectionName, pointsList, null);
} catch (InterruptedException | ExecutionException e) {
throw new DataException("Qdrant server exception.", e);
}
});
} catch (Exception e) {
throw new DataException("Failed to put record.", e);
}

pointsWithRecords.forEach(
(collectionName, pointsMap) -> {
List<PointStruct> pointsList = new ArrayList<>(pointsMap.keySet());
try {
qdrantGrpc.upsert(collectionName, pointsList, null);
} catch (InterruptedException | ExecutionException e) {
pointsMap
.values()
.forEach(
record -> {
if (reporter == null)
throw new DataException("Qdrant server exception during upsert.", e);
reporter.report(record, e);
});
}
});
}

@Override
Expand Down

0 comments on commit 41c39f8

Please sign in to comment.