Skip to content
This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Feature/issue28 test kpl properties #30

Merged
merged 17 commits into from
Sep 11, 2017
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -410,22 +410,21 @@ kpa ! Send(producerEvent) //Send without a callback confirmation

<a name="usage-usage-producer-pure-scala-based-implementation-simple-wrapper-around-kpl"></a>
### Pure Scala based implementation (simple wrapper around KPL)
*Note that throttling will be unavailable using this method.*
*Note that future throttling will be unavailable using this method.*

```scala
import java.util.UUID
import com.amazonaws.services.kinesis.producer.{UserRecordFailedException, UserRecordResult}
import com.weightwatchers.reactive.kinesis.producer.KinesisProducer
import com.weightwatchers.reactive.kinesis.producer.ProducerConf
import com.typesafe.config._
import com.weightwatchers.reactive.kinesis.models._
import com.weightwatchers.reactive.kinesis.producer.KinesisProducerKPL
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global //Not for production

val kinesisConfig: Config = ConfigFactory.load().getConfig("kinesis")
val producerConfig: Config = kinesisConfig.getConfig("some-producer")
val streamName: String = producerConfig.getString("stream-name")

val kpl = KinesisProducerKPL(kinesisConfig.getConfig("kpl"), streamName)
val kpl = KinesisProducer(ProducerConf(kinesisConfig, "some-producer"))

val producerEvent = ProducerEvent(UUID.randomUUID.toString, "{Some Payload}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor.{
SendSuccessful,
SendWithCallback
}
import com.weightwatchers.reactive.kinesis.producer.{KinesisProducerActor, KinesisProducerKPL}
import com.weightwatchers.reactive.kinesis.producer.{KinesisProducerActor, KinesisProducer}

import scala.collection.mutable
import scala.concurrent.ExecutionContext.Implicits.global
Expand Down Expand Up @@ -62,7 +62,7 @@ class SimpleKinesisProducer(kConfig: Config) extends Actor with LazyLogging {
//We're creating the producer the hard way to get access to the underlying KPL
val kpaProps = KinesisProducerActor.props(kinesisConfig, "testProducer")
val kpa = context.actorOf(kpaProps)
val kinesisProducerKPL = kpaProps.args.head.asInstanceOf[KinesisProducerKPL]
val kinesisProducerKPL = kpaProps.args.head.asInstanceOf[KinesisProducer]

/* producer without actor:
val producerConfig = kinesisConfig.getConfig("testProducer")
Expand Down
6 changes: 3 additions & 3 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,9 @@ kinesis {

# Sets the threading model that the native process will use.
# Enum:
# ThreadingModel.PER_REQUEST: Tells the native process to create a thread for each request.
# ThreadingModel.POOLED: Tells the native process to use a thread pool. The size of the pool can be controlled by ThreadPoolSize
# Default = ThreadingModel.PER_REQUEST
# PER_REQUEST: Tells the native process to create a thread for each request.
# POOLED: Tells the native process to use a thread pool. The size of the pool can be controlled by ThreadPoolSize
# Default = PER_REQUEST
# ThreadingModel =

# Sets the maximum number of threads that the native process' thread pool will be configured with.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,64 +29,7 @@ import com.weightwatchers.reactive.kinesis.utils.{FutureUtils, TypesafeConfigExt

import scala.concurrent.{ExecutionContextExecutor, Future}

trait KinesisProducer {

/**
* Adds a message to the next batch to be sent to the configured stream.
*
* @return On success: Future{UserRecordResult}
* On failure: Future.failed(...): Any Throwable related to put.
* @see Callee `com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord`
* @see UserRecordResult
* @see KinesisProducerConfiguration#setRecordTtl(long)
* @see UserRecordFailedException
*/
def addUserRecord(event: ProducerEvent)(
implicit ec: ExecutionContextExecutor
): Future[UserRecordResult]

/**
* Get the number of unfinished records currently being processed. The
* records could either be waiting to be sent to the child process, or have
* reached the child process and are being worked on.
*
* <p>
* This is equal to the number of futures returned from [[addUserRecord]]
* that have not finished.
*
* This is useful for applying backpressure and throttling the number of concurrent Futures.
*
* @return The number of unfinished records currently being processed.
*/
def outstandingRecordsCount(): Int

/**
* Firstly, blocks whilst all all records are complete (either succeeding or failing).
*
* <p>
*
* The includes whilst any retries are performed. Depending on
* your configuration of record TTL and request timeout, this can
* potentially take a long time if the library is having trouble delivering
* records to the backend, for example due to network problems.
*
* <p>
*
* Finally the [[KinesisProducer]] is destroyed, preventing further use.
*
* @throws com.amazonaws.services.kinesis.producer.DaemonException if the child process is dead //TODO - handle this better?
* @see [[AWSKinesisProducer]]
*/
def stop(): Unit

/**
* @return true if the [[KinesisProducer]] has been stopped & destroyed.
*/
def destroyed(): Boolean

}

object KinesisProducerKPL extends LazyLogging {
object KinesisProducer extends LazyLogging {

/**
* The config passed is expected to contain the AWS KPL properties at the top level.
Expand All @@ -100,9 +43,11 @@ object KinesisProducerKPL extends LazyLogging {
* @param credentialsProvider A specific CredentialsProvider. The KCL defaults to DefaultAWSCredentialsProviderChain.
* @return an instantiated [[KinesisProducer]]
*/
@deprecated("Use KinesisProducer(producerConf: ProducerConf) instead", "v0.5.7")
def apply(kplConfig: Config,
streamName: String,
credentialsProvider: Option[AWSCredentialsProvider] = None): KinesisProducer = {

import TypesafeConfigExtensions._

// We directly load our properties into the KPL as a Java `Properties` object
Expand All @@ -117,7 +62,32 @@ object KinesisProducerKPL extends LazyLogging {
KinesisProducerConfiguration.fromProperties(kplProps)
credentialsProvider.foreach(kplLibConfiguration.setCredentialsProvider)

new KinesisProducerKPL(new AWSKinesisProducer(kplLibConfiguration), streamName)
new KinesisProducer(new AWSKinesisProducer(kplLibConfiguration), streamName)
}

/**
* The config passed is expected to contain the AWS KPL properties at the top level.
*
* @param producerConf An instance of [[ProducerConf]] which contains all required configuration for the KPL.
* @return an instantiated [[KinesisProducer]]
*/
def apply(producerConf: ProducerConf): KinesisProducer = {
apply(producerConf.kplLibConfiguration, producerConf.streamName)
}

/**
* The [[KinesisProducerConfiguration]] argument is passed directly to the KPL library.
* This constructor makes no use of the Typesafe config.
*
* @see `src/it/resources/reference.conf` for a more detailed example.
* @param kplConfig An instance of the underlying [[KinesisProducerConfiguration]] to be passed
* directly to the library.
* @param streamName Th name of the Kinesis stream, which must exist.
* @return an instantiated [[KinesisProducer]]
*/
def apply(kplConfig: KinesisProducerConfiguration, streamName: String): KinesisProducer = {
//TODO add logging
new KinesisProducer(new AWSKinesisProducer(kplConfig), streamName)
}
}

Expand All @@ -126,9 +96,7 @@ object KinesisProducerKPL extends LazyLogging {
*
* To create an instance of this class, we recommend using the apply method to instantiate from config.
*/
class KinesisProducerKPL(kinesis: AWSKinesisProducer, streamName: String)
extends LazyLogging
with KinesisProducer {
class KinesisProducer(kinesis: AWSKinesisProducer, streamName: String) extends LazyLogging {

val underlying = kinesis
private var _destroyed = false
Expand All @@ -137,9 +105,16 @@ class KinesisProducerKPL(kinesis: AWSKinesisProducer, streamName: String)
//TODO seems difficult to get access to stream specific operations from producer

/**
* @see [[KinesisProducer]].addUserRecord
* Adds a message to the next batch to be sent to the configured stream.
*
* @return On success: Future{UserRecordResult}
* On failure: Future.failed(...): Any Throwable related to put.
* @see Callee `com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord`
* @see UserRecordResult
* @see KinesisProducerConfiguration#setRecordTtl(long)
* @see UserRecordFailedException
*/
override def addUserRecord(
def addUserRecord(
event: ProducerEvent
)(implicit ec: ExecutionContextExecutor): Future[UserRecordResult] = {
assert(!_destroyed, "Kinesis has been destroyed, no longer accepting messages") //TODO specific exception?
Expand All @@ -148,24 +123,47 @@ class KinesisProducerKPL(kinesis: AWSKinesisProducer, streamName: String)
}

/**
* @see [[KinesisProducer]].outstandingRecordsCount()
* Get the number of unfinished records currently being processed. The
* records could either be waiting to be sent to the child process, or have
* reached the child process and are being worked on.
*
* <p>
* This is equal to the number of futures returned from [[addUserRecord]]
* that have not finished.
*
* This is useful for applying backpressure and throttling the number of concurrent Futures.
*
* @return The number of unfinished records currently being processed.
*/
override def outstandingRecordsCount(): Int = {
def outstandingRecordsCount(): Int = {
kinesis.getOutstandingRecordsCount
}

/**
* @see [[KinesisProducer]].stop()sbt publish
* Firstly, blocks whilst all all records are complete (either succeeding or failing).
*
* <p>
*
* The includes whilst any retries are performed. Depending on
* your configuration of record TTL and request timeout, this can
* potentially take a long time if the library is having trouble delivering
* records to the backend, for example due to network problems.
*
* <p>
*
* Finally the [[KinesisProducer]] is destroyed, preventing further use.
*
* @throws com.amazonaws.services.kinesis.producer.DaemonException if the child process is dead //TODO - handle this better?
* @see [[AWSKinesisProducer]]
*/
override def stop(): Unit = {
def stop(): Unit = {
kinesis.flushSync() //This blocks until all records are flushed
kinesis.destroy()
_destroyed = true
}

/**
* @see [[KinesisProducer]]destroyed()
* @return true if the [[KinesisProducer]] has been stopped & destroyed.
*/
override def destroyed(): Boolean = _destroyed
def destroyed(): Boolean = _destroyed
}
Loading