Skip to content
This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Commit

Permalink
added ConsumerConfig test plus missing consumer fields
Browse files Browse the repository at this point in the history
  • Loading branch information
markglh committed Sep 8, 2017
1 parent 2997479 commit 9f60a4c
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 56 deletions.
25 changes: 21 additions & 4 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ kinesis {
# Default: 443
# Minimum: 1
# Maximum (inclusive): 65535
#KinesisPort = 443
# KinesisPort = 443

# If true, throttled puts are not retried. The records that got throttled
# will be failed immediately upon receiving the throttling error. This is
Expand Down Expand Up @@ -527,11 +527,11 @@ kinesis {
# http://developer.amazonwebservices.com/connect/entry.jspa?externalID=3912
#
# Default: null
#kinesisEndpoint = https://kinesis
#kinesisEndpoint = "https://kinesis"

# DynamoDB endpoint
# Default: null
#DynamoDBEndpoint =
#dynamoDBEndpoint = "https://dynamo"

# Don't call processRecords() on the record processor for empty record lists.
# Enables applications flush/checkpoint (if they have some data "in progress but don't get new data for while)
Expand Down Expand Up @@ -582,6 +582,13 @@ kinesis {
# Default: Operation, ShardId
#metricsEnabledDimensions = Operation, ShardId

# Sets the max size of the thread pool that will be used to renew leases.
# Setting this to low may starve the lease renewal process, and cause the worker to lose leases at a higher rate.
#
# Min: 2
# Default: 20
#maxLeaseRenewalThreads=20


# The max number of leases (shards) this worker should process.
# This can be useful to avoid overloading (and thrashing) a worker when a host has resource constraints
Expand Down Expand Up @@ -628,7 +635,17 @@ kinesis {

# TableName name of the lease table in DynamoDB
# Default = <applicationName>
#TableName =
#tableName =

# A timeout when dispatching records to the client MultiLang record processor.
# If the record processor doesn't respond within the timeout the parent Java process will be terminated.
# This is a temporary fix to handle cases where the KCL becomes blocked while waiting for a client record processor.
# Setting this can cause the KCL to exit suddenly,
# before using this ensure that you have an automated restart for your application
#
# Default: no timeout
#timeoutInSeconds =

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,52 +59,157 @@ class KinesisConsumerSpec

val kinesisConfig = ConfigFactory
.parseString("""
|kinesis {
|
| application-name = "TestSpec"
|
| testConsumer-1 {
| stream-name = "test-kinesis-reliability"
|
| worker {
| batchTimeoutSeconds = 1234
| gracefulShutdownHook = false
| shutdownTimeoutSeconds = 2
| }
|
| checkpointer {
| backoffMillis = 4321
| }
|
| kcl {
| AWSCredentialsProvider = EnvironmentVariableCredentialsProvider
| regionName = us-east-1
| KinesisEndpoint = "CustomKinesisEndpoint"
| DynamoDBEndpoint = "CustomDynamoDBEndpoint"
| SkipShardSyncAtStartupIfLeasesExist = true
| TableName = "TableName"
| }
| }
|
| testConsumer-2 {
| stream-name = "some-other-stream"
|
| worker {
| failedMessageRetries = 3
| gracefulShutdownHook = false
| }
|
| checkpointer {
| backoffMillis = 111
| }
|
| kcl {
| AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
| regionName = us-east-2
| }
| }
|}
""".stripMargin)
|kinesis {
|
| application-name = "TestSpec"
|
| testConsumer-1 {
| stream-name = "test-kinesis-reliability"
|
| worker {
| batchTimeoutSeconds = 1234
| gracefulShutdownHook = false
| shutdownTimeoutSeconds = 2
| }
|
| checkpointer {
| backoffMillis = 4321
| }
|
| kcl {
| AWSCredentialsProvider = EnvironmentVariableCredentialsProvider
| regionName = us-east-1
| KinesisEndpoint = "CustomKinesisEndpoint"
| DynamoDBEndpoint = "CustomDynamoDBEndpoint"
| SkipShardSyncAtStartupIfLeasesExist = true
| TableName = "TableName"
| }
| }
|
| testConsumer-2 {
| stream-name = "some-other-stream"
|
| worker {
| failedMessageRetries = 3
| gracefulShutdownHook = false
| }
|
| checkpointer {
| backoffMillis = 111
| }
|
| kcl {
| AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
| regionName = us-east-2
| }
| }
|
| testConsumer-3 {
| stream-name = "some-other-stream"
|
| worker {
| batchTimeoutSeconds = 1234
| gracefulShutdownHook = false
| shutdownTimeoutSeconds = 2
| }
|
| checkpointer {
| backoffMillis = 4321
| }
|
| kcl {
| AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
|
| regionName = us-east-2
|
| # Default: LATEST
| initialPositionInStream = TRIM_HORIZON
|
| # Default = 10000
| maxRecords = 20000
|
| # Default = 1000
| idleTimeBetweenReadsInMillis = 1234
|
| # Default: 10000
| failoverTimeMillis = 11000
|
| # Default: 60000
| shardSyncIntervalMillis = 70000
|
| # Default: true
| cleanupLeasesUponShardCompletion = false
|
| # Default: true
| validateSequenceNumberBeforeCheckpointing = false
|
| # Default: null
| kinesisEndpoint = "https://kinesis"
|
| # Default: null
| dynamoDBEndpoint = "https://dynamo"
|
| # Default: false
| callProcessRecordsEvenForEmptyRecordList = true
|
| # Default: 10000
| parentShardPollIntervalMillis = 40000
|
| # Default: 500
| taskBackoffTimeMillis = 600
|
| # Default: 10000
| metricsBufferTimeMillis = 10001
|
|
| # Default: 10000
| metricsMaxQueueSize = 10009
|
|
| # Default: DETAILED
| metricsLevel = NONE
|
|
| # Default: Operation, ShardId
| metricsEnabledDimensions = Operation
|
|
| # Default: 2147483647 (Integer.MAX_VALUE)
| maxLeasesForWorker = 11111111
|
|
| # Default: 1
| maxLeasesToStealAtOneTime = 2
|
|
| # Default: 10
| initialLeaseTableReadCapacity = 15
|
|
| # Default: 10
| initialLeaseTableWriteCapacity = 14
|
| # Default: false
| skipShardSyncAtStartupIfLeasesExist=true
|
|
| # Default: <applicationName>
| userAgent = testy123
|
| # Default = <applicationName>
| tableName = meh
|
| # Default: 20
| maxLeaseRenewalThreads=9
|
|
| # Default: no timeout
| timeoutInSeconds = 10
| }
|
| }
|}
""".stripMargin)
.getConfig("kinesis")
.withFallback(defaultKinesisConfig)

Expand All @@ -120,7 +225,7 @@ class KinesisConsumerSpec
.createProcessor() shouldBe a[ConsumerProcessingManager]
}

def assertConsumer1(): Assertion = {
def assertConsumer1Config(): Assertion = {
val consumerConf = ConsumerConf(kinesisConfig, "testConsumer-1")

consumerConf.workerConf.batchTimeout should be(1234.seconds)
Expand Down Expand Up @@ -157,11 +262,78 @@ class KinesisConsumerSpec
}

"Should parse the Config into a ConsumerConf for a single consumer" in {
assertConsumer1()
assertConsumer1Config()
}

"Should parse consumer 3 the Config into a ConsumerConf, setting all properties in the KinesisClientLibConfiguration" in {
//This will fail when fields are added or renamed in the KCL

// Some setters don't match the field names.
val confToFieldConversions = Map(
"skipShardSyncAtStartupIfLeasesExist" -> "skipShardSyncAtWorkerInitializationIfLeasesExist"
)

val fieldsToSkip = List(
"useragent", //this gets nested internally
"streamname",
"timestampatinitialpositioninstream",
"commonclientconfig",
"shardprioritizationstrategy",
"kinesisclientconfig",
"dynamodbclientconfig",
"cloudwatchclientconfig",
"credentialsprovider", //these must be tested individually
"applicationname"
)

val consumerConf = ConsumerConf(kinesisConfig, "testConsumer-3")

consumerConf.workerConf.batchTimeout should be(1234.seconds)
consumerConf.workerConf.failedMessageRetries should be(1)
consumerConf.workerConf.failureTolerancePercentage should be(0.25)
consumerConf.workerConf.shutdownHook should be(false)
consumerConf.workerConf.shutdownTimeout should be(Timeout(2.seconds))
consumerConf.checkpointerConf.backoff should be(4321.millis)
consumerConf.checkpointerConf.interval should be(2000.millis) //reference default
consumerConf.checkpointerConf.notificationDelay should be(1000.millis) //reference default
consumerConf.dispatcher should be(Some("kinesis.akka.default-dispatcher"))
consumerConf.kclConfiguration.getApplicationName should be(
"TestSpec-some-other-stream"
)

val kclConfig = kinesisConfig.getConfig("testConsumer-3.kcl")
val kclLibConfiguration = consumerConf.kclConfiguration

//We're dealing with Java classes so using Java reflection is cleaner here
//Start with the setters to prevent picking up all the unrelated private fields, stripping the "with"
val configKeys = kclLibConfiguration.getClass.getDeclaredMethods
.filter(_.getName.startsWith("with"))
.map(_.getName.drop(4))
.map(field => field.head.toLower + field.tail)
.filterNot(
field => //java.lang.reflect.Modifier.isStatic(field.getModifiers)
fieldsToSkip.contains(field.toLowerCase)
)

configKeys foreach { configKey =>
val field =
kclLibConfiguration.getClass.getDeclaredField(
confToFieldConversions.getOrElse(configKey, configKey)
)
field.setAccessible(true)

withClue(
s"Property `$configKey` was not as expected when asserting the KCL configuration: "
) {
kclConfig.hasPath(configKey) should be(true)
field.get(kclLibConfiguration).toString should include(kclConfig.getString(configKey))
}
}

}

"Should parse the Config into multiple ConsumerConf objects for multiple consumers" in {
assertConsumer1()
assertConsumer1Config()

val consumerConf2 = ConsumerConf(kinesisConfig, "testConsumer-2")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class ProducerConfSpec
val defaultKinesisConfig =
ConfigFactory.parseFile(new File("src/main/resources/reference.conf")).getConfig("kinesis")

val kinesisConfig2 = ConfigFactory
val kinesisConfig = ConfigFactory
.parseString(
"""
|kinesis {
Expand Down Expand Up @@ -211,14 +211,14 @@ class ProducerConfSpec
"The ProducerConf" - {

"Should parse the Config into a ProducerConf, setting all properties in the KinesisProducerConfiguration" in {
val producerConf = ProducerConf(kinesisConfig2, "testProducer")
val producerConf = ProducerConf(kinesisConfig, "testProducer")

producerConf.streamName should be("core-test-kinesis-producer")
producerConf.dispatcher should be(Some("kinesis.akka.custom-dispatcher"))
producerConf.throttlingConf.get.maxOutstandingRequests should be(50000)
producerConf.throttlingConf.get.retryDuration should be(100.millis)

val kplConfig = kinesisConfig2.getConfig("testProducer.kpl")
val kplConfig = kinesisConfig.getConfig("testProducer.kpl")

val kplLibConfiguration = producerConf.kplLibConfiguration
kplLibConfiguration.isAggregationEnabled should be(false)
Expand Down

0 comments on commit 9f60a4c

Please sign in to comment.