diff --git a/.bandwidth/catalog-info.yaml b/.bandwidth/catalog-info.yaml
new file mode 100644
index 0000000..f95a144
--- /dev/null
+++ b/.bandwidth/catalog-info.yaml
@@ -0,0 +1,11 @@
+apiVersion: backstage.io/v1alpha1
+kind: Location
+metadata:
+  schemaVersion: v1.0.0
+  name: kinesis-kafka-connector-location
+  description: Links to additional entities in the kinesis-kafka-connector repository.
+  annotations:
+    github.com/project-slug: Bandwidth/kinesis-kafka-connector
+spec:
+  targets:
+    - ./component.yaml
diff --git a/.bandwidth/component.yaml b/.bandwidth/component.yaml
new file mode 100644
index 0000000..f8479ff
--- /dev/null
+++ b/.bandwidth/component.yaml
@@ -0,0 +1,19 @@
+apiVersion: backstage.io/v1alpha1
+kind: Component
+metadata:
+  schemaVersion: v1.0.0
+  name: kinesis-kafka-connector
+  description: Forked from awslabs/kinesis-kafka-connector
+  annotations:
+    github.com/project-slug: Bandwidth/kinesis-kafka-connector
+    organization: BW
+    costCenter: undefined
+    platformType: OCP4
+    buildPlatform: GitHub Actions
+spec:
+  type: Service
+  owner: github/band-cx
+  lifecycle: Prototype
+  dependsOn:
+    - resource:platform/GitHub Actions
+    - resource:cost-center/undefined
diff --git a/README.md b/README.md
index ae9852f..057c17e 100644
--- a/README.md
+++ b/README.md
@@ -84,7 +84,6 @@ You can build the project by running "maven package" and it will build amazon-ki
 | roleExternalID | IAM Role external-id (optional)| - |
 | roleDurationSeconds | Duration of STS assumeRole session (optional)| - |
 | usePartitionAsHashKey | Using Kafka partition key as hash key for Kinesis streams. | false |
-| totalKafkaPartitions | Total number of kafka partitions that we are reading from | 20 |
 | maxBufferedTime | Maximum amount of time (milliseconds) a record may spend being buffered before it gets sent. Records may be sent sooner than this depending on the other buffering limits. Range: [100..... 9223372036854775807] | 15000 |
 | maxConnections | Maximum number of connections to open to the backend. HTTP requests are sent in parallel over multiple connections. Range: [1...256]. | 24 |
 | rateLimit | Limits the maximum allowed put rate for a shard, as a percentage of the backend limits. | 100 |
diff --git a/config/kinesis-streams-kafka-connector.properties b/config/kinesis-streams-kafka-connector.properties
index 568bb73..5e70b61 100644
--- a/config/kinesis-streams-kafka-connector.properties
+++ b/config/kinesis-streams-kafka-connector.properties
@@ -5,7 +5,6 @@ topics=YOUR_TOPIC_NAME
 region=us-east-1
 streamName=YOUR_STREAM_NAME
 usePartitionAsHashKey=false
-totalKafkaPartitions=20
 flushSync=true
 # Use new Kinesis Producer for each Partition
 singleKinesisProducerPerPartition=true
diff --git a/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkConnector.java b/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkConnector.java
index 4bb466a..914aad8 100644
--- a/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkConnector.java
+++ b/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkConnector.java
@@ -33,9 +33,7 @@ public class AmazonKinesisSinkConnector extends SinkConnector {
 	public static final String AGGREGATION_ENABLED = "aggregation";
 
 	public static final String USE_PARTITION_AS_HASH_KEY = "usePartitionAsHashKey";
-
-	public static final String TOTAL_KAFKA_PARTITIONS = "totalKafkaPartitions";
-
+
 	public static final String FLUSH_SYNC = "flushSync";
 
 	public static final String SINGLE_KINESIS_PRODUCER_PER_PARTITION = "singleKinesisProducerPerPartition";
@@ -89,8 +87,6 @@ public class AmazonKinesisSinkConnector extends SinkConnector {
 	private String aggregation;
 
 	private String usePartitionAsHashKey;
-
-	private String totalKafkaPartitions;
 
 	private String flushSync;
 
@@ -122,7 +118,6 @@ public void start(Map<String, String> props) {
 		metricsNameSpace = props.get(METRICS_NAMESPACE);
 		aggregation = props.get(AGGREGATION_ENABLED);
 		usePartitionAsHashKey = props.get(USE_PARTITION_AS_HASH_KEY);
-		totalKafkaPartitions = props.get(TOTAL_KAFKA_PARTITIONS);
 		flushSync = props.get(FLUSH_SYNC);
 		singleKinesisProducerPerPartition = props.get(SINGLE_KINESIS_PRODUCER_PER_PARTITION);
 		pauseConsumption = props.get(PAUSE_CONSUMPTION);
@@ -215,11 +210,6 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
 			config.put(USE_PARTITION_AS_HASH_KEY, usePartitionAsHashKey);
 		else
 			config.put(USE_PARTITION_AS_HASH_KEY, "false");
-
-		if (totalKafkaPartitions != null)
-			config.put(TOTAL_KAFKA_PARTITIONS, totalKafkaPartitions);
-		else
-			config.put(TOTAL_KAFKA_PARTITIONS, "20");
 
 		if(flushSync != null)
 			config.put(FLUSH_SYNC, flushSync);
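For orientation, a minimal sketch of the copy-with-default pattern that `taskConfigs(int maxTasks)` applies to every optional property; the removed `totalKafkaPartitions` lines followed the same shape, which is why the change deletes both the constant and its "20" fallback. The class and field names here are hypothetical stand-ins, not the connector's actual code:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskConfigSketch {
    static final String USE_PARTITION_AS_HASH_KEY = "usePartitionAsHashKey";
    static String usePartitionAsHashKey; // captured from props in start(...), null when unset

    // Each task gets its own copy of the effective configuration; unset
    // optional properties are replaced by their documented defaults.
    static List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> config = new HashMap<>();
            config.put(USE_PARTITION_AS_HASH_KEY,
                    usePartitionAsHashKey != null ? usePartitionAsHashKey : "false");
            configs.add(config);
        }
        return configs;
    }
}
```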
diff --git a/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java b/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java
index 4d0309b..d7672d1 100644
--- a/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java
+++ b/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java
@@ -1,36 +1,21 @@
 package com.amazon.kinesis.kafka;
 
-import com.amazonaws.services.kinesis.AmazonKinesis;
-import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
-import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
-import com.amazonaws.services.kinesis.model.DescribeStreamResult;
-import com.amazonaws.services.kinesis.model.Shard;
-import java.math.BigInteger;
-import java.nio.charset.StandardCharsets;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
+import java.net.MalformedURLException;
+import java.net.URL;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import com.amazonaws.util.StringUtils;
 import com.google.common.util.concurrent.MoreExecutors;
-import java.util.Random;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.sink.SinkTaskContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.amazonaws.services.kinesis.producer.Attempt;
 import com.amazonaws.services.kinesis.producer.KinesisProducer;
@@ -41,6 +26,8 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class AmazonKinesisSinkTask extends SinkTask {
@@ -78,8 +65,6 @@ public class AmazonKinesisSinkTask extends SinkTask {
 
 	private boolean usePartitionAsHashKey;
 
-	private int totalKafkaPartitions;
-
 	private boolean flushSync;
 
 	private boolean singleKinesisProducerPerPartition;
@@ -100,24 +85,6 @@ public class AmazonKinesisSinkTask extends SinkTask {
 
 	private ConnectException putException;
 
-	private static final BigInteger MAX_HASH = BigInteger.valueOf(2).pow(128);
-
-	private List<Shard> kinesisShardsCache = null;
-	private final Object kinesisShardsCacheLock = new Object();
-	private ScheduledExecutorService shardCacheRefresher;
-	private static final long SHARD_CACHE_REFRESH_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes
-
-	private final Random rand = new Random();
-	private final MessageDigest md5 = getMD5();
-
-	private static MessageDigest getMD5() {
-		try {
-			return MessageDigest.getInstance("MD5");
-		} catch (NoSuchAlgorithmException e) {
-			throw new RuntimeException("Failed to get MD5 algorithm", e);
-		}
-	}
-
 	final FutureCallback<UserRecordResult> callback = new FutureCallback<UserRecordResult>() {
 		@Override
 		public void onFailure(Throwable t) {
@@ -139,20 +106,6 @@ public void onSuccess(UserRecordResult result) {
 	@Override
 	public void initialize(SinkTaskContext context) {
 		sinkTaskContext = context;
-
-		// Start scheduled cache refresh
-		shardCacheRefresher = Executors.newSingleThreadScheduledExecutor();
-		shardCacheRefresher.scheduleAtFixedRate(() -> {
-			try {
-				List<Shard> shards = getKinesisShards();
-				synchronized (kinesisShardsCacheLock) {
-					kinesisShardsCache = shards;
-				}
-				logger.info("Kinesis shard cache refreshed, {} shards loaded.", shards.size());
-			} catch (Exception e) {
-				logger.error("Error refreshing Kinesis shard cache", e);
-			}
-		}, 0, SHARD_CACHE_REFRESH_INTERVAL_MS, TimeUnit.MILLISECONDS);
 	}
 
 	@Override
@@ -292,119 +245,27 @@ private void checkForEarlierPutException() {
 		}
 	}
 
-	private List<Shard> getCachedKinesisShards() {
-		if (kinesisShardsCache != null) {
-			return kinesisShardsCache;
-		}
-
-		synchronized (kinesisShardsCacheLock) {
-			kinesisShardsCache = getKinesisShards();
-		}
-		return kinesisShardsCache;
-	}
-
 	private ListenableFuture<UserRecordResult> addUserRecord(KinesisProducer kp, String streamName, String partitionKey,
 			boolean usePartitionAsHashKey, SinkRecord sinkRecord) {
-		logger.debug("Adding user record for stream: {}, partitionKey: {}, kafkaPartition: {}",
-				streamName, partitionKey, sinkRecord.kafkaPartition());
-
-		if (!usePartitionAsHashKey) {
-			logger.debug(
-					"Not using partition as hash key for stream: {}, partitionKey: {}, kafkaPartition: {}",
-					streamName, partitionKey, sinkRecord.kafkaPartition());
-
-			return kp.addUserRecord(streamName, partitionKey,
-					DataUtility.parseValue(sinkRecord.valueSchema(), sinkRecord.value()));
-		}
-
-		List<Shard> shards = getCachedKinesisShards();
-
-		// determine which shard to send to
-		int shardIndex = selectShardWithSpread(partitionKey, shards.size(), totalKafkaPartitions);
-		Shard shard = shards.get(shardIndex);
-
-		// get a hash that will send to that shard
-		String shardHashKey = shard.getHashKeyRange().getStartingHashKey();
-
-		// send it
-		return kp.addUserRecord(streamName, partitionKey, shardHashKey,
-				DataUtility.parseValue(sinkRecord.valueSchema(), sinkRecord.value()));
-	}
-
-	/**
-	 * Handles spreading the partition key across multiple shards if there are more shards than
-	 * Kafka partitions. This helps to avoid hot-sharding when there are few Kafka partitions.
-	 * If there are fewer shards than Kafka partitions, only one shard will be used per Kafka partition.
-	 */
-	private int selectShardWithSpread(String partitionKey, int numShards, int numKafkaPartitions) {
-		// Determine how many shards we should spread across.
-		int numShardsToSpreadAcross = (int) Math.ceil((double) numShards / numKafkaPartitions);
-
-		// Pick a random integer between 0 and numShardsToSpreadAcross - 1
-		int spreadShard = rand.nextInt(numShardsToSpreadAcross);
-
-		String spreadPartitionKey = partitionKey + "-" + spreadShard;
-
-		logger.debug("Spreading partition key {} across {} shards, using spread partition key {}. (Total shards: {} and kafka partitions: {})",
-				partitionKey, numShardsToSpreadAcross, spreadPartitionKey, numShards, numKafkaPartitions);
-
-		return selectShard(spreadPartitionKey, numShards);
-	}
-
-	/**
-	 * Selects a shard index based on the provided partition key and number of shards.
-	 * It uses MD5 hashing to ensure an even distribution across shards.
-	 */
-	private int selectShard(String partitionKey, int numShards) {
-		byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
-
-		// Create a hash of the partition key
-		BigInteger hashedPartitionKey = new BigInteger(1, digest);
-
-		// Calculate how much of the hash space each shard covers
-		BigInteger hashSpacePerShard = MAX_HASH.divide(java.math.BigInteger.valueOf(numShards));
-
-		// Find where in our hash space the hashed partition key lands
-		BigInteger shardId = hashedPartitionKey.divide(hashSpacePerShard);
-		int shardIndex = shardId.intValue();
-		if (shardIndex < 0 || shardIndex >= numShards) {
-			throw new IllegalArgumentException("Shard index out of bounds: " + shardIndex + " for numShards: " + numShards);
+		// The schema wasn't getting set when running locally with Localstack.
+		Schema valueSchema = sinkRecord.valueSchema();
+		if (valueSchema == null) {
+			logger.warn("Sink Record Schema is null for record: {}:{}:{}:{}. This should only happen locally.",
+					sinkRecord.topic(), sinkRecord.kafkaPartition(), sinkRecord.key(), sinkRecord.value());
+			valueSchema = Schema.STRING_SCHEMA;
 		}
-		return shardIndex;
-	}
-
-	private List<Shard> getKinesisShards() {
-		AmazonKinesis kinesisClient = AmazonKinesisClientBuilder.standard()
-				.withRegion(regionName)
-				.build();
-		List<Shard> shards = new ArrayList<>();
-		String exclusiveStartShardId = null;
-		do {
-			DescribeStreamRequest request = new DescribeStreamRequest()
-					.withStreamName(streamName);
-			if (exclusiveStartShardId != null) {
-				request.setExclusiveStartShardId(exclusiveStartShardId);
-			}
-			DescribeStreamResult result = kinesisClient.describeStream(request);
-			shards.addAll(result.getStreamDescription().getShards());
-			if (result.getStreamDescription().getHasMoreShards()) {
-				exclusiveStartShardId = result.getStreamDescription().getShards()
-						.get(result.getStreamDescription().getShards().size() - 1)
-						.getShardId();
-			} else {
-				exclusiveStartShardId = null;
-			}
-
-		} while (exclusiveStartShardId != null);
-		kinesisClient.shutdown();
-		// sorted shards by shardId to ensure consistent ordering
-		List<Shard> sortedShards = new ArrayList<>(shards);
-		sortedShards.sort(Comparator.comparing(Shard::getShardId));
+		// If configured use kafka partition key as explicit hash key
+		// This will be useful when sending data from same partition into
+		// same shard
+		if (usePartitionAsHashKey)
+			return kp.addUserRecord(streamName, partitionKey, Integer.toString(sinkRecord.kafkaPartition()),
+					DataUtility.parseValue(valueSchema, sinkRecord.value()));
+		else
+			return kp.addUserRecord(streamName, partitionKey,
					DataUtility.parseValue(valueSchema, sinkRecord.value()));
 
-		return sortedShards;
 	}
 
 	@Override
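For the record, the deleted `selectShard(...)` mapped a key to one of N shards by treating its MD5 digest as a non-negative 128-bit integer and dividing the hash space into equal slices. A self-contained sketch of that arithmetic follows; `ShardSelectSketch` and the final clamp are illustrative additions (the removed code threw an exception on an out-of-range index instead):

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class ShardSelectSketch {
    private static final BigInteger MAX_HASH = BigInteger.valueOf(2).pow(128);

    static int selectShard(String partitionKey, int numShards) throws NoSuchAlgorithmException {
        // MD5 yields 16 bytes; the (1, ...) constructor makes the value
        // non-negative, so it is spread uniformly over [0, 2^128).
        byte[] digest = MessageDigest.getInstance("MD5")
                .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
        BigInteger hashed = new BigInteger(1, digest);

        // Each shard owns an equal slice of the hash space.
        BigInteger slice = MAX_HASH.divide(BigInteger.valueOf(numShards));
        int index = hashed.divide(slice).intValue();

        // Floor division can yield index == numShards for keys hashing near
        // the top of the range when numShards does not divide 2^128 evenly;
        // the removed code rejected that case, we clamp for the demo.
        return Math.min(index, numShards - 1);
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        System.out.println(selectShard("order-42", 8)); // stable index in [0, 8)
    }
}
```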
@@ -442,8 +303,6 @@ public void start(Map<String, String> props) {
 
 		usePartitionAsHashKey = Boolean.parseBoolean(props.get(AmazonKinesisSinkConnector.USE_PARTITION_AS_HASH_KEY));
 
-		totalKafkaPartitions = Integer.parseInt(props.get(AmazonKinesisSinkConnector.TOTAL_KAFKA_PARTITIONS));
-
 		flushSync = Boolean.parseBoolean(props.get(AmazonKinesisSinkConnector.FLUSH_SYNC));
 
 		singleKinesisProducerPerPartition = Boolean
@@ -484,9 +343,6 @@ public void close(Collection<TopicPartition> partitions) {
 
 	@Override
 	public void stop() {
-		if (shardCacheRefresher != null) {
-			shardCacheRefresher.shutdownNow();
-		}
 		// destroying kinesis producers which were not closed as part of close
 		if (singleKinesisProducerPerPartition) {
 			for (KinesisProducer kp : producerMap.values()) {
@@ -504,8 +360,23 @@ private KinesisProducer getKinesisProducer() {
 		config.setRegion(regionName);
 		config.setCredentialsProvider(IAMUtility.createCredentials(regionName, roleARN, roleExternalID, roleSessionName, roleDurationSeconds));
 		config.setMaxConnections(maxConnections);
-		if (!StringUtils.isNullOrEmpty(kinesisEndpoint))
-			config.setKinesisEndpoint(kinesisEndpoint);
+
+		if (!StringUtils.isNullOrEmpty(kinesisEndpoint)) {
+			try {
+				URL kinesisEndpointUrl = new URL(kinesisEndpoint);
+				config.setKinesisEndpoint(kinesisEndpointUrl.getHost());
+				config.setKinesisPort(kinesisEndpointUrl.getPort());
+
+				// If we are using localstack, we need to disable certificate validation
+				// as localstack uses self-signed certificates
+				if ("https".equals(kinesisEndpointUrl.getProtocol()) && "localstack".equals(
+						kinesisEndpointUrl.getHost())) {
+					config.setVerifyCertificate(false);
+				}
+			} catch (MalformedURLException e) {
+				throw new RuntimeException(e);
+			}
+		}
 		config.setAggregationEnabled(aggregation);
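The new endpoint block above feeds `java.net.URL` parts into the KPL's host/port settings. A small sketch of what that parsing produces for a Localstack-style endpoint (the URLs are illustrative); note the caveat that `getPort()` returns -1 when the endpoint string carries no explicit port, so a port-less `kinesisEndpoint` would hand -1 to `setKinesisPort`:

```java
import java.net.MalformedURLException;
import java.net.URL;

public class EndpointParseSketch {
    public static void main(String[] args) throws MalformedURLException {
        URL url = new URL("https://localstack:4566"); // illustrative endpoint
        System.out.println(url.getProtocol()); // https      -> triggers the certificate check
        System.out.println(url.getHost());     // localstack -> setKinesisEndpoint(...)
        System.out.println(url.getPort());     // 4566       -> setKinesisPort(...)

        // Caveat: with no explicit port, getPort() is -1, not a scheme default.
        System.out.println(new URL("https://localstack").getPort()); // -1
    }
}
```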