Skip to content

Commit

Permalink
Remove unchunking evalScan and apply logging to chunks for performance
Browse files Browse the repository at this point in the history
  • Loading branch information
bcarter97 committed Feb 19, 2024
1 parent 4de6a8d commit ad4363f
Showing 1 changed file with 33 additions and 13 deletions.
46 changes: 33 additions & 13 deletions src/main/scala/uk/sky/fs2/kafka/topicloader/TopicLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ object TopicLoader extends TopicLoader {
private[topicloader] given [K, V]: Show[ConsumerRecord[K, V]] =
Show.show(cr => s"${cr.topic}-${cr.partition}")

private case class PartitionLast(topicPartition: TopicPartition, offset: Long)

private case class HighestOffsetsWithRecord[K, V](
partitionOffsets: Map[TopicPartition, Long],
consumerRecord: Option[ConsumerRecord[K, V]] = none[ConsumerRecord[K, V]]
consumerRecord: Option[ConsumerRecord[K, V]] = none[ConsumerRecord[K, V]],
partitionLast: Option[PartitionLast] = none[PartitionLast]
)

private object WithRecord {
Expand Down Expand Up @@ -97,15 +100,21 @@ trait TopicLoader {

private def filterBelowHighestOffset[F[_] : Monad : LoggerFactory, K, V](
logOffsets: NonEmptyMap[TopicPartition, LogOffsets]
): Pipe[F, ConsumerRecord[K, V], ConsumerRecord[K, V]] = {
): Pipe[F, ConsumerRecord[K, V], ConsumerRecord[K, V]] = stream => {
val logger = LoggerFactory[F].getLogger

val nonEmptyOffsets =
logOffsets.toSortedMap.filter((_, o) => o.highest > o.lowest)

val allHighestOffsets: HighestOffsetsWithRecord[K, V] =
HighestOffsetsWithRecord[K, V](nonEmptyOffsets.map((p, o) => p -> (o.highest - 1)))

_.evalScan(allHighestOffsets)(emitRecordRemovingConsumedPartition[F, K, V])
stream
.scan(allHighestOffsets)(emitRecordRemovingConsumedPartition[K, V])
.takeWhile(_.partitionOffsets.nonEmpty, takeFailure = true)
.evalTapChunk(_.partitionLast.traverse { last =>
logger.warn(s"Finished loading data from ${last.topicPartition.show} at offset ${last.offset}")
})
.collect { case WithRecord(r) => r }
}

Expand Down Expand Up @@ -148,23 +157,34 @@ trait TopicLoader {
partitionInfo <- topics.toList.flatTraverse(consumer.partitionsFor)
} yield partitionInfo.map(pi => TopicPartition(pi.topic, pi.partition)).toSet

private def emitRecordRemovingConsumedPartition[F[_] : Monad : LoggerFactory, K, V](
private def emitRecordRemovingConsumedPartition[K, V](
t: HighestOffsetsWithRecord[K, V],
r: ConsumerRecord[K, V]
): F[HighestOffsetsWithRecord[K, V]] = {
val logger = LoggerFactory[F].getLogger

): HighestOffsetsWithRecord[K, V] = {
val partitionHighest: Option[Long] = t.partitionOffsets.get(TopicPartition(r.topic, r.partition))

val reachedHighest: OptionT[F, TopicPartition] = for {
offset <- OptionT.fromOption[F](partitionHighest)
highest <- OptionT.fromOption[F](if (r.offset >= offset) TopicPartition(r.topic, r.partition).some else None)
_ <- OptionT.liftF(logger.warn(s"Finished loading data from ${r.show} at offset ${r.offset}"))
val reachedHighest: Option[TopicPartition] = for {
offset <- partitionHighest
highest <- Option.when(r.offset >= offset)(TopicPartition(r.topic, r.partition))
} yield highest

val updatedHighests = reachedHighest.fold(t.partitionOffsets)(highest => t.partitionOffsets - highest)
val emittableRecord = partitionHighest.collect { case h if r.offset <= h => r }
updatedHighests.map(HighestOffsetsWithRecord(_, emittableRecord))

reachedHighest match {
case Some(highest) =>
HighestOffsetsWithRecord(
partitionOffsets = t.partitionOffsets - highest,
consumerRecord = emittableRecord,
partitionLast = PartitionLast(highest, r.offset).some
)

case None =>
HighestOffsetsWithRecord(
partitionOffsets = t.partitionOffsets,
consumerRecord = emittableRecord,
partitionLast = none[PartitionLast]
)
}
}

}

0 comments on commit ad4363f

Please sign in to comment.