Skip to content

Commit

Permalink
Pekko migration (#17)
Browse files Browse the repository at this point in the history
* first migration

* temp fix in build.sbt

* migrated pekko kafka

* package folders adjusted for core module (main)

* package folders adjusted for tests/test

* package folders adjusted for tests/it

* package folders adjusted for tests/main

* package folders adjusted for testkit (main)

* package folders adjusted for benchmarks (it)

* package folders adjusted for benchmarks (main)

* package folders adjusted for cluster-sharding (main)

* package folders adjusted for testkit java classes

* Some more akka import adjustments in scala classes

* Some more akka import adjustments in java classes

* - Finally, replaced akka word with org.apache.pekko everywhere in .scala files
- Added temporary copy & paste class from alpakka (till migration completes)

* Finally, replaced akka word with org.apache.pekko everywhere in .java files

* adjusted logback as well

* PR comments

* SCM and contributor data adjusted

* formatted files

* fixed the documentation build

* fixed pekko discovery check

* Fixed another unit test

* one more test fix

* Renamed package names in logback

* Fixed all unit tests (build should be green)

* Renamed Alpakka... -> PekkoConnectors...

* Renamed Alpakka... -> PekkoConnectors...

* Removed unnecessary TODO

* Removed dependency on lightbend stream-alpakka-csv

* - Using package configuration mapping instead of default scaladoc.base_url configuration
- Adjusted pekko extref links

* All doc links in build.sbt are adjusted
  • Loading branch information
Seetaramayya authored Feb 5, 2023
1 parent 52b0a4e commit a98a7bd
Show file tree
Hide file tree
Showing 203 changed files with 2,144 additions and 1,961 deletions.
1 change: 1 addition & 0 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ optIn.configStyleArguments = false
danglingParentheses.preset = false
spaces.inImportCurlyBraces = true
rewrite.neverInfix.excludeFilters = [
at
and
min
max
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import akka.kafka.benchmarks.BenchmarksBase.{ topic_1000_100, topic_1000_5000, topic_1000_5000_8 }
import akka.kafka.benchmarks.Timed.runPerfTest
import akka.kafka.benchmarks.app.RunTestCommand
import org.apache.pekko.kafka.benchmarks.BenchmarksBase.{ topic_1000_100, topic_1000_5000, topic_1000_5000_8 }
import org.apache.pekko.kafka.benchmarks.Timed.runPerfTest
import org.apache.pekko.kafka.benchmarks.app.RunTestCommand

class ApacheKafkaBatchedConsumer extends BenchmarksBase() {
it should "bench with small messages" in {
Expand Down Expand Up @@ -36,7 +36,7 @@ class ApacheKafkaBatchedConsumer extends BenchmarksBase() {
}
}

class AlpakkaKafkaBatchedConsumer extends BenchmarksBase() {
class PekkoConnectorsKafkaBatchedConsumer extends BenchmarksBase() {

it should "bench with small messages" in {
val cmd = RunTestCommand("alpakka-kafka-batched-consumer", bootstrapServers, topic_1000_100)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks

import akka.kafka.benchmarks.BenchmarksBase._
import akka.kafka.benchmarks.InflightMetrics._
import akka.kafka.benchmarks.PerfFixtureHelpers.FilledTopic
import akka.kafka.benchmarks.Timed.{ runPerfTest, runPerfTestInflightMetrics }
import akka.kafka.benchmarks.app.RunTestCommand
import akka.kafka.testkit.KafkaTestkitTestcontainersSettings
import akka.kafka.testkit.scaladsl.TestcontainersKafkaLike
package org.apache.pekko.kafka.benchmarks

import org.apache.pekko.kafka.benchmarks.BenchmarksBase._
import org.apache.pekko.kafka.benchmarks.InflightMetrics._
import org.apache.pekko.kafka.benchmarks.PerfFixtureHelpers.FilledTopic
import org.apache.pekko.kafka.benchmarks.Timed.{ runPerfTest, runPerfTestInflightMetrics }
import org.apache.pekko.kafka.benchmarks.app.RunTestCommand
import org.apache.pekko.kafka.testkit.KafkaTestkitTestcontainersSettings
import org.apache.pekko.kafka.testkit.scaladsl.TestcontainersKafkaLike
import com.typesafe.config.Config

object BenchmarksBase {
Expand Down Expand Up @@ -61,7 +61,7 @@ class ApacheKafkaConsumerNokafka extends BenchmarksBase() {
}
}

class AlpakkaKafkaConsumerNokafka extends BenchmarksBase() {
class PekkoConnectorsKafkaConsumerNokafka extends BenchmarksBase() {
it should "bench" in {
val cmd = RunTestCommand("alpakka-kafka-plain-consumer-nokafka", bootstrapServers, topic_2000_100)
runPerfTest(cmd,
Expand All @@ -83,7 +83,7 @@ class ApacheKafkaPlainConsumer extends BenchmarksBase() {
}
}

class AlpakkaKafkaPlainConsumer extends BenchmarksBase() {
class PekkoConnectorsKafkaPlainConsumer extends BenchmarksBase() {
it should "bench" in {
val cmd = RunTestCommand("alpakka-kafka-plain-consumer", bootstrapServers, topic_2000_100)
runPerfTest(cmd, ReactiveKafkaConsumerFixtures.plainSources(cmd), ReactiveKafkaConsumerBenchmarks.consumePlain)
Expand Down Expand Up @@ -127,7 +127,7 @@ class ApacheKafkaAtMostOnceConsumer extends BenchmarksBase() {
}
}

class AlpakkaKafkaAtMostOnceConsumer extends BenchmarksBase() {
class PekkoConnectorsKafkaAtMostOnceConsumer extends BenchmarksBase() {
it should "bench" in {
val cmd = RunTestCommand("alpakka-kafka-at-most-once-consumer", bootstrapServers, topic_50_100)
runPerfTest(cmd,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import akka.kafka.benchmarks.Timed.runPerfTest
import akka.kafka.benchmarks.app.RunTestCommand
import org.apache.pekko.kafka.benchmarks.Timed.runPerfTest
import org.apache.pekko.kafka.benchmarks.app.RunTestCommand

import BenchmarksBase._

Expand Down Expand Up @@ -38,7 +38,7 @@ class RawKafkaCommitEveryPollConsumer extends BenchmarksBase() {
// }
}

class AlpakkaCommitAndForgetConsumer extends BenchmarksBase() {
class PekkoConnectorsCommitAndForgetConsumer extends BenchmarksBase() {
val prefix = "alpakka-kafka-commit-and-forget-"

it should "bench with small messages" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,45 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import akka.kafka.benchmarks.BenchmarksBase.{ topic_100_100, topic_100_5000 }
import akka.kafka.benchmarks.Timed.runPerfTest
import akka.kafka.benchmarks.app.RunTestCommand
import org.apache.pekko.kafka.benchmarks.BenchmarksBase.{ topic_100_100, topic_100_5000 }
import org.apache.pekko.kafka.benchmarks.Timed.runPerfTest
import org.apache.pekko.kafka.benchmarks.app.RunTestCommand

/**
* Compares the `CommittingProducerSinkStage` with the composed implementation of `Producer.flexiFlow` and `Committer.sink`.
*/
class AlpakkaCommittableProducer extends BenchmarksBase() {
class PekkoConnectorsCommittableProducer extends BenchmarksBase() {
it should "bench composed sink with 100b messages" in {
val cmd = RunTestCommand("alpakka-committable-producer-composed", bootstrapServers, topic_100_100)
runPerfTest(
cmd,
AlpakkaCommittableSinkFixtures.composedSink(cmd),
AlpakkaCommittableSinkBenchmarks.run)
PekkoConnectorsCommittableSinkFixtures.composedSink(cmd),
PekkoConnectorsCommittableSinkBenchmarks.run)
}

it should "bench composed sink with 5000b messages" in {
val cmd = RunTestCommand("alpakka-committable-producer-composed-5000b", bootstrapServers, topic_100_5000)
runPerfTest(
cmd,
AlpakkaCommittableSinkFixtures.composedSink(cmd),
AlpakkaCommittableSinkBenchmarks.run)
PekkoConnectorsCommittableSinkFixtures.composedSink(cmd),
PekkoConnectorsCommittableSinkBenchmarks.run)
}

it should "bench `Producer.committableSink` with 100b messages" in {
val cmd = RunTestCommand("alpakka-committable-producer", bootstrapServers, topic_100_100)
runPerfTest(
cmd,
AlpakkaCommittableSinkFixtures.producerSink(cmd),
AlpakkaCommittableSinkBenchmarks.run)
PekkoConnectorsCommittableSinkFixtures.producerSink(cmd),
PekkoConnectorsCommittableSinkBenchmarks.run)
}

it should "bench `Producer.committableSink` with 5000b messages" in {
val cmd = RunTestCommand("alpakka-committable-producer-5000b", bootstrapServers, topic_100_5000)
runPerfTest(
cmd,
AlpakkaCommittableSinkFixtures.producerSink(cmd),
AlpakkaCommittableSinkBenchmarks.run)
PekkoConnectorsCommittableSinkFixtures.producerSink(cmd),
PekkoConnectorsCommittableSinkBenchmarks.run)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import akka.kafka.benchmarks.BenchmarksBase.{ topic_2000_100, topic_2000_500, topic_2000_5000, topic_2000_5000_8 }
import akka.kafka.benchmarks.Timed.runPerfTest
import akka.kafka.benchmarks.app.RunTestCommand
import org.apache.pekko.kafka.benchmarks.BenchmarksBase.{
topic_2000_100,
topic_2000_500,
topic_2000_5000,
topic_2000_5000_8
}
import org.apache.pekko.kafka.benchmarks.Timed.runPerfTest
import org.apache.pekko.kafka.benchmarks.app.RunTestCommand

class ApacheKafkaPlainProducer extends BenchmarksBase() {
private val prefix = "apache-kafka-plain-producer"
Expand All @@ -34,7 +39,7 @@ class ApacheKafkaPlainProducer extends BenchmarksBase() {
}
}

class AlpakkaKafkaPlainProducer extends BenchmarksBase() {
class PekkoConnectorsKafkaPlainProducer extends BenchmarksBase() {
private val prefix = "alpakka-kafka-plain-producer"

it should "bench with small messages" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import akka.kafka.testkit.scaladsl.ScalatestKafkaSpec
import org.apache.pekko.kafka.testkit.scaladsl.ScalatestKafkaSpec
import org.scalatest.concurrent.{ Eventually, ScalaFutures }
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.should.Matchers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import akka.kafka.benchmarks.BenchmarksBase.{ topic_100_100, topic_100_5000 }
import akka.kafka.benchmarks.Timed.runPerfTest
import akka.kafka.benchmarks.app.RunTestCommand
import org.apache.pekko.kafka.benchmarks.BenchmarksBase.{ topic_100_100, topic_100_5000 }
import org.apache.pekko.kafka.benchmarks.Timed.runPerfTest
import org.apache.pekko.kafka.benchmarks.app.RunTestCommand
import scala.concurrent.duration._

class ApacheKafkaTransactions extends BenchmarksBase() {
Expand All @@ -26,7 +26,7 @@ class ApacheKafkaTransactions extends BenchmarksBase() {
}
}

class AlpakkaKafkaTransactions extends BenchmarksBase() {
class PekkoConnectorsKafkaTransactions extends BenchmarksBase() {
it should "bench with small messages" in {
val cmd = RunTestCommand("alpakka-kafka-transactions", bootstrapServers, topic_100_100)
runPerfTest(
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
</appender>

<logger name="org.apache" level="WARN"/>
<logger name="akka" level="WARN"/>
<logger name="akka.kafka.benchmarks" level="INFO"/>
<logger name="org.apache.pekko" level="WARN"/>
<logger name="org.apache.pekko.kafka.benchmarks" level="INFO"/>
<logger name="org.apache.kafka.common.utils.AppInfoParser" level="ERROR"/>
<logger name="org.apache.kafka.clients.NetworkClient" level="ERROR"/>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright (C) 2014 - 2016 Softwaremill <https://softwaremill.com>
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package org.apache.pekko.kafka.benchmarks

import org.apache.pekko.util.ByteString

import java.nio.charset.{ Charset, StandardCharsets }
import scala.collection.immutable

private[benchmarks] sealed trait CsvQuotingStyle

object CsvQuotingStyle {

/** Quote only fields requiring quotes */
case object Required extends CsvQuotingStyle

/** Quote all fields */
case object Always extends CsvQuotingStyle
}

// TODO: This needs to be deleted after migrating alpakka to pekko.
// This is just temporary base to see everything compiles and tests will pass without any issue
private[benchmarks] class CsvFormatter(delimiter: Char,
quoteChar: Char,
escapeChar: Char,
endOfLine: String,
quotingStyle: CsvQuotingStyle,
charset: Charset = StandardCharsets.UTF_8) {

private[this] val charsetName = charset.name()

private[this] val delimiterBs = ByteString(String.valueOf(delimiter), charsetName)
private[this] val quoteBs = ByteString(String.valueOf(quoteChar), charsetName)
private[this] val duplicatedQuote = ByteString(String.valueOf(Array(quoteChar, quoteChar)), charsetName)
private[this] val duplicatedEscape = ByteString(String.valueOf(Array(escapeChar, escapeChar)), charsetName)
private[this] val endOfLineBs = ByteString(endOfLine, charsetName)

def toCsv(fields: immutable.Iterable[Any]): ByteString =
if (fields.nonEmpty) nonEmptyToCsv(fields)
else endOfLineBs

private def nonEmptyToCsv(fields: immutable.Iterable[Any]) = {
val builder = ByteString.createBuilder

def splitAndDuplicateQuotesAndEscapes(field: String, splitAt: Int) = {

@inline def indexOfQuoteOrEscape(lastIndex: Int) = {
var index = lastIndex
var found = -1
while (index < field.length && found == -1) {
val char = field(index)
if (char == quoteChar || char == escapeChar) found = index
index += 1
}
found
}

var lastIndex = 0
var index = splitAt
while (index > -1) {
builder ++= ByteString.apply(field.substring(lastIndex, index), charsetName)
val char = field.charAt(index)
if (char == quoteChar) {
builder ++= duplicatedQuote
} else {
builder ++= duplicatedEscape
}
lastIndex = index + 1
index = indexOfQuoteOrEscape(lastIndex)
}
if (lastIndex < field.length) {
builder ++= ByteString(field.substring(lastIndex), charsetName)
}
}

def append(field: String) = {
val (quoteIt, splitAt) = requiresQuotesOrSplit(field)
if (quoteIt || quotingStyle == CsvQuotingStyle.Always) {
builder ++= quoteBs
if (splitAt != -1) {
splitAndDuplicateQuotesAndEscapes(field, splitAt)
} else {
builder ++= ByteString(field, charsetName)
}
builder ++= quoteBs
} else {
builder ++= ByteString(field, charsetName)
}
}

val iterator = fields.iterator
var hasNext = iterator.hasNext
while (hasNext) {
val next = iterator.next()
if (next != null) {
append(next.toString)
}
hasNext = iterator.hasNext
if (hasNext) {
builder ++= delimiterBs
}
}
builder ++= endOfLineBs
builder.result()
}

private def requiresQuotesOrSplit(field: String): (Boolean, Int) = {
var quotes = CsvQuotingStyle.Always == quotingStyle
var split = -1
var index = 0
while (index < field.length && !(quotes && split != -1)) {
val char = field(index)
if (char == `quoteChar` || char == `escapeChar`) {
quotes = true
split = index
} else if (char == '\r' || char == '\n' || char == `delimiter`) {
quotes = true
}
index += 1
}
(quotes, split)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import akka.kafka.benchmarks.app.RunTestCommand
import org.apache.pekko.kafka.benchmarks.app.RunTestCommand

case class FixtureGen[F](command: RunTestCommand, generate: Int => F)
Loading

0 comments on commit a98a7bd

Please sign in to comment.