11 changes: 11 additions & 0 deletions .bandwidth/catalog-info.yaml
@@ -0,0 +1,11 @@
apiVersion: backstage.io/v1alpha1
kind: Location
metadata:
schemaVersion: v1.0.0
name: kinesis-kafka-connector-location
description: Links to additional entities in the kinesis-kafka-connector repository.
annotations:
github.com/project-slug: Bandwidth/kinesis-kafka-connector
spec:
targets:
- ./component.yaml
19 changes: 19 additions & 0 deletions .bandwidth/component.yaml
@@ -0,0 +1,19 @@
apiVersion: backstage.io/v1alpha1
kind: Component
metadata:
schemaVersion: v1.0.0
name: kinesis-kafka-connector
description: Forked from awslabs/kinesis-kafka-connector
annotations:
github.com/project-slug: Bandwidth/kinesis-kafka-connector
organization: BW
costCenter: undefined
platformType: OCP4
buildPlatform: GitHub Actions
spec:
type: Service
owner: github/band-cx
lifecycle: Prototype
dependsOn:
- resource:platform/GitHub Actions
- resource:cost-center/undefined
1 change: 0 additions & 1 deletion README.md
@@ -84,7 +84,6 @@ You can build the project by running "maven package" and it will build amazon-ki
| roleExternalID | IAM Role external-id (optional)| - |
| roleDurationSeconds | Duration of STS assumeRole session (optional)| - |
| usePartitionAsHashKey | Using Kafka partition key as hash key for Kinesis streams. | false |
| totalKafkaPartitions | Total number of kafka partitions that we are reading from | 20 |
| maxBufferedTime | Maximum amount of time (milliseconds) a record may spend being buffered before it gets sent. Records may be sent sooner than this depending on the other buffering limits. Range: [100...9223372036854775807] | 15000 |
| maxConnections | Maximum number of connections to open to the backend. HTTP requests are sent in parallel over multiple connections. Range: [1...256]. | 24 |
| rateLimit | Limits the maximum allowed put rate for a shard, as a percentage of the backend limits. | 100 |
1 change: 0 additions & 1 deletion config/kinesis-streams-kafka-connector.properties
@@ -5,7 +5,6 @@ topics=YOUR_TOPIC_NAME
region=us-east-1
streamName=YOUR_STREAM_NAME
usePartitionAsHashKey=false
totalKafkaPartitions=20
flushSync=true
# Use new Kinesis Producer for each Partition
singleKinesisProducerPerPartition=true
src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkConnector.java
@@ -33,9 +33,7 @@ public class AmazonKinesisSinkConnector extends SinkConnector {
public static final String AGGREGATION_ENABLED = "aggregation";

public static final String USE_PARTITION_AS_HASH_KEY = "usePartitionAsHashKey";

public static final String TOTAL_KAFKA_PARTITIONS = "totalKafkaPartitions";


public static final String FLUSH_SYNC = "flushSync";

public static final String SINGLE_KINESIS_PRODUCER_PER_PARTITION = "singleKinesisProducerPerPartition";
@@ -89,8 +87,6 @@ public class AmazonKinesisSinkConnector extends SinkConnector {
private String aggregation;

private String usePartitionAsHashKey;

private String totalKafkaPartitions;

private String flushSync;

@@ -122,7 +118,6 @@ public void start(Map<String, String> props) {
metricsNameSpace = props.get(METRICS_NAMESPACE);
aggregation = props.get(AGGREGATION_ENABLED);
usePartitionAsHashKey = props.get(USE_PARTITION_AS_HASH_KEY);
totalKafkaPartitions = props.get(TOTAL_KAFKA_PARTITIONS);
flushSync = props.get(FLUSH_SYNC);
singleKinesisProducerPerPartition = props.get(SINGLE_KINESIS_PRODUCER_PER_PARTITION);
pauseConsumption = props.get(PAUSE_CONSUMPTION);
@@ -215,11 +210,6 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
config.put(USE_PARTITION_AS_HASH_KEY, usePartitionAsHashKey);
else
config.put(USE_PARTITION_AS_HASH_KEY, "false");

if (totalKafkaPartitions != null)
config.put(TOTAL_KAFKA_PARTITIONS, totalKafkaPartitions);
else
config.put(TOTAL_KAFKA_PARTITIONS, "20");

if(flushSync != null)
config.put(FLUSH_SYNC, flushSync);
203 changes: 37 additions & 166 deletions src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java
@@ -1,36 +1,21 @@
package com.amazon.kinesis.kafka;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.Shard;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.amazonaws.util.StringUtils;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.kinesis.producer.Attempt;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
@@ -41,6 +26,8 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AmazonKinesisSinkTask extends SinkTask {

@@ -78,8 +65,6 @@ public class AmazonKinesisSinkTask extends SinkTask {

private boolean usePartitionAsHashKey;

private int totalKafkaPartitions;

private boolean flushSync;

private boolean singleKinesisProducerPerPartition;
@@ -100,24 +85,6 @@ public class AmazonKinesisSinkTask extends SinkTask {

private ConnectException putException;

private static final BigInteger MAX_HASH = BigInteger.valueOf(2).pow(128);

private List<Shard> kinesisShardsCache = null;
private final Object kinesisShardsCacheLock = new Object();
private ScheduledExecutorService shardCacheRefresher;
private static final long SHARD_CACHE_REFRESH_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes

private final Random rand = new Random();
private final MessageDigest md5 = getMD5();

private static MessageDigest getMD5() {
try {
return MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("Failed to get MD5 algorithm", e);
}
}

final FutureCallback<UserRecordResult> callback = new FutureCallback<UserRecordResult>() {
@Override
public void onFailure(Throwable t) {
@@ -139,20 +106,6 @@ public void onSuccess(UserRecordResult result) {
@Override
public void initialize(SinkTaskContext context) {
sinkTaskContext = context;

// Start scheduled cache refresh
shardCacheRefresher = Executors.newSingleThreadScheduledExecutor();
shardCacheRefresher.scheduleAtFixedRate(() -> {
try {
List<Shard> shards = getKinesisShards();
synchronized (kinesisShardsCacheLock) {
kinesisShardsCache = shards;
}
logger.info("Kinesis shard cache refreshed, {} shards loaded.", shards.size());
} catch (Exception e) {
logger.error("Error refreshing Kinesis shard cache", e);
}
}, 0, SHARD_CACHE_REFRESH_INTERVAL_MS, TimeUnit.MILLISECONDS);
}

@Override
@@ -292,119 +245,27 @@ private void checkForEarlierPutException() {
}
}

private List<Shard> getCachedKinesisShards() {
if (kinesisShardsCache != null) {
return kinesisShardsCache;
}

synchronized (kinesisShardsCacheLock) {
kinesisShardsCache = getKinesisShards();
}
return kinesisShardsCache;
}


private ListenableFuture<UserRecordResult> addUserRecord(KinesisProducer kp, String streamName, String partitionKey,
boolean usePartitionAsHashKey, SinkRecord sinkRecord) {

logger.debug("Adding user record for stream: {}, partitionKey: {}, kafkaPartition: {}",
streamName, partitionKey, sinkRecord.kafkaPartition());

if (!usePartitionAsHashKey) {
logger.debug(
"Not using partition as hash key for stream: {}, partitionKey: {}, kafkaPartition: {}",
streamName, partitionKey, sinkRecord.kafkaPartition());

return kp.addUserRecord(streamName, partitionKey,
DataUtility.parseValue(sinkRecord.valueSchema(), sinkRecord.value()));
}

List<Shard> shards = getCachedKinesisShards();

// determine which shard to send to
int shardIndex = selectShardWithSpread(partitionKey, shards.size(), totalKafkaPartitions);
Shard shard = shards.get(shardIndex);

// get a hash that will send to that shard
String shardHashKey = shard.getHashKeyRange().getStartingHashKey();

// send it
return kp.addUserRecord(streamName, partitionKey, shardHashKey,
DataUtility.parseValue(sinkRecord.valueSchema(), sinkRecord.value()));
}

/**
* Handles spreading the partition key across multiple shards if there are more shards than
* Kafka partitions. This helps to avoid hot-sharding when there are few Kafka partitions.
* If there are fewer shards than Kafka partitions, only one shard will be used per Kafka partition.
*/
private int selectShardWithSpread(String partitionKey, int numShards, int numKafkaPartitions) {
// Determine how many shards we should spread across.
int numShardsToSpreadAcross = (int) Math.ceil((double) numShards / numKafkaPartitions);

// Pick a random integer between 0 and numShardsToSpreadAcross - 1
int spreadShard = rand.nextInt(numShardsToSpreadAcross);

String spreadPartitionKey = partitionKey + "-" + spreadShard;

logger.debug("Spreading partition key {} across {} shards, using spread partition key {}. (Total shards: {} and kafka partitions: {})",
partitionKey, numShardsToSpreadAcross, spreadPartitionKey, numShards, numKafkaPartitions);

return selectShard(spreadPartitionKey, numShards);
}

/**
* Selects a shard index based on the provided partition key and number of shards.
* It uses MD5 hashing to ensure an even distribution across shards.
*/
private int selectShard(String partitionKey, int numShards) {
byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));

// Create a hash of the partition key
BigInteger hashedPartitionKey = new BigInteger(1, digest);

// Calculate how much of the hash space each shard covers
BigInteger hashSpacePerShard = MAX_HASH.divide(java.math.BigInteger.valueOf(numShards));

// Find where in our hash space the hashed partition key lands
BigInteger shardId = hashedPartitionKey.divide(hashSpacePerShard);
int shardIndex = shardId.intValue();
if (shardIndex < 0 || shardIndex >= numShards) {
throw new IllegalArgumentException("Shard index out of bounds: " + shardIndex + " for numShards: " + numShards);
// The schema wasn't getting set when running locally with Localstack.
Schema valueSchema = sinkRecord.valueSchema();
if (valueSchema == null) {
logger.warn("Sink Record Schema is null for record: {}:{}:{}:{}. This only should happen locally.", sinkRecord.topic(), sinkRecord.kafkaPartition(),
sinkRecord.key(), sinkRecord.value());
valueSchema = Schema.STRING_SCHEMA;
}

return shardIndex;
}

private List<Shard> getKinesisShards() {
AmazonKinesis kinesisClient = AmazonKinesisClientBuilder.standard()
.withRegion(regionName)
.build();
List<Shard> shards = new ArrayList<>();
String exclusiveStartShardId = null;
do {
DescribeStreamRequest request = new DescribeStreamRequest()
.withStreamName(streamName);
if (exclusiveStartShardId != null) {
request.setExclusiveStartShardId(exclusiveStartShardId);
}
DescribeStreamResult result = kinesisClient.describeStream(request);
shards.addAll(result.getStreamDescription().getShards());
if (result.getStreamDescription().getHasMoreShards()) {
exclusiveStartShardId = result.getStreamDescription().getShards()
.get(result.getStreamDescription().getShards().size() - 1)
.getShardId();
} else {
exclusiveStartShardId = null;
}

} while (exclusiveStartShardId != null);
kinesisClient.shutdown();
// sorted shards by shardId to ensure consistent ordering
List<Shard> sortedShards = new ArrayList<>(shards);
sortedShards.sort(Comparator.comparing(Shard::getShardId));
// If configured use kafka partition key as explicit hash key
// This will be useful when sending data from same partition into
// same shard
if (usePartitionAsHashKey)
return kp.addUserRecord(streamName, partitionKey, Integer.toString(sinkRecord.kafkaPartition()),
DataUtility.parseValue(valueSchema, sinkRecord.value()));
else
return kp.addUserRecord(streamName, partitionKey,
DataUtility.parseValue(valueSchema, sinkRecord.value()));

return sortedShards;
}

@Override
@@ -442,8 +303,6 @@ public void start(Map<String, String> props) {

usePartitionAsHashKey = Boolean.parseBoolean(props.get(AmazonKinesisSinkConnector.USE_PARTITION_AS_HASH_KEY));

totalKafkaPartitions = Integer.parseInt(props.get(AmazonKinesisSinkConnector.TOTAL_KAFKA_PARTITIONS));

flushSync = Boolean.parseBoolean(props.get(AmazonKinesisSinkConnector.FLUSH_SYNC));

singleKinesisProducerPerPartition = Boolean
@@ -484,9 +343,6 @@ public void close(Collection<TopicPartition> partitions) {

@Override
public void stop() {
if (shardCacheRefresher != null) {
shardCacheRefresher.shutdownNow();
}
// destroying kinesis producers which were not closed as part of close
if (singleKinesisProducerPerPartition) {
for (KinesisProducer kp : producerMap.values()) {
@@ -504,8 +360,23 @@ private KinesisProducer getKinesisProducer() {
config.setRegion(regionName);
config.setCredentialsProvider(IAMUtility.createCredentials(regionName, roleARN, roleExternalID, roleSessionName, roleDurationSeconds));
config.setMaxConnections(maxConnections);
if (!StringUtils.isNullOrEmpty(kinesisEndpoint))
config.setKinesisEndpoint(kinesisEndpoint);

if (!StringUtils.isNullOrEmpty(kinesisEndpoint)) {
try {
URL kinesisEndpointUrl = new URL(kinesisEndpoint);
config.setKinesisEndpoint(kinesisEndpointUrl.getHost());
config.setKinesisPort(kinesisEndpointUrl.getPort());

// If we are using localstack, we need to disable certificate validation
// as localstack uses self-signed certificates
if ("https".equals(kinesisEndpointUrl.getProtocol()) && "localstack".equals(
kinesisEndpointUrl.getHost())) {
config.setVerifyCertificate(false);
}
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
}

config.setAggregationEnabled(aggregation);

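
Note on the endpoint override added above: the new code splits kinesisEndpoint into host and port and disables certificate verification only when the endpoint is https and its host is literally "localstack". For local testing the connector properties would therefore point at something like the snippet below; the hostname and port are illustrative assumptions (4566 is LocalStack's default edge port), not values taken from this change.

# Hypothetical local override for testing against LocalStack; adjust host/port to your setup
kinesisEndpoint=https://localstack:4566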