Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pekko migration #17

Merged
merged 32 commits into from
Feb 5, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
b9734a4
first migration
Seetaramayya Feb 1, 2023
60af2ba
temp fix in build.sbt
Seetaramayya Feb 1, 2023
2145a27
migrated pekko kafka
Seetaramayya Feb 1, 2023
7ac492e
package folders adjusted for core module (main)
Seetaramayya Feb 1, 2023
896e10b
package folders adjusted for tests/test
Seetaramayya Feb 1, 2023
362a76d
package folders adjusted for tests/it
Seetaramayya Feb 1, 2023
19d04ff
package folders adjusted for tests/main
Seetaramayya Feb 1, 2023
967910c
package folders adjusted for testkit (main)
Seetaramayya Feb 1, 2023
9cb099a
package folders adjusted for benchmarks (it)
Seetaramayya Feb 1, 2023
cf331f4
package folders adjusted for benchmarks (main)
Seetaramayya Feb 1, 2023
bb70254
package folders adjusted for cluster-sharding (main)
Seetaramayya Feb 1, 2023
4886eca
package folders adjusted for testkit java classes
Seetaramayya Feb 1, 2023
0e504bb
Some more akka import adjustments in scala classes
Seetaramayya Feb 1, 2023
18e4355
Some more akka import adjustments in java classes
Seetaramayya Feb 1, 2023
a6d98ad
- Finally, replaced akka word with org.apache.pekko everywhere in .sc…
Seetaramayya Feb 1, 2023
1c01fcb
Finally, replaced akka word with org.apache.pekko everywhere in .java…
Seetaramayya Feb 1, 2023
06c30e0
adjusted logback as well
Seetaramayya Feb 1, 2023
f865a74
PR comments
Seetaramayya Feb 1, 2023
e6d23a8
SCM and contributor data adjusted
Seetaramayya Feb 2, 2023
9d71592
formatted files
Seetaramayya Feb 2, 2023
f834557
fixed the documentation build
Seetaramayya Feb 4, 2023
a5a58e3
fixed pekko discovery check
Seetaramayya Feb 4, 2023
30d9337
Fixed another unit test
Seetaramayya Feb 4, 2023
db626e7
one more test fix
Seetaramayya Feb 4, 2023
e641032
Renamed package names in logback
Seetaramayya Feb 4, 2023
f2780a8
Fixed all unit tests (build should be green)
Seetaramayya Feb 4, 2023
8345031
Renamed Alpakka... -> PekkoConnectors...
Seetaramayya Feb 4, 2023
82d4b75
Renamed Alpakka... -> PekkoConnectors...
Seetaramayya Feb 4, 2023
4bbd401
Removed unnecessary TODO
Seetaramayya Feb 4, 2023
107a927
Removed dependency on lightbend stream-alpakka-csv
Seetaramayya Feb 4, 2023
4bfdac4
- Using package configuration mapping instead of default scaladoc.bas…
Seetaramayya Feb 5, 2023
68bdfe4
All doc links in build.sbt are adjusted
Seetaramayya Feb 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ optIn.configStyleArguments = false
danglingParentheses.preset = false
spaces.inImportCurlyBraces = true
rewrite.neverInfix.excludeFilters = [
at
and
min
max
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
Seetaramayya marked this conversation as resolved.
Show resolved Hide resolved
package org.apache.pekko.kafka.benchmarks

import akka.kafka.benchmarks.BenchmarksBase.{ topic_100_100, topic_100_5000 }
import akka.kafka.benchmarks.Timed.runPerfTest
import akka.kafka.benchmarks.app.RunTestCommand
import org.apache.pekko.kafka.benchmarks.BenchmarksBase.{ topic_100_100, topic_100_5000 }
import org.apache.pekko.kafka.benchmarks.Timed.runPerfTest
import org.apache.pekko.kafka.benchmarks.app.RunTestCommand

/**
* Compares the `CommittingProducerSinkStage` with the composed implementation of `Producer.flexiFlow` and `Committer.sink`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import akka.kafka.benchmarks.BenchmarksBase.{ topic_1000_100, topic_1000_5000, topic_1000_5000_8 }
import akka.kafka.benchmarks.Timed.runPerfTest
import akka.kafka.benchmarks.app.RunTestCommand
import org.apache.pekko.kafka.benchmarks.BenchmarksBase.{ topic_1000_100, topic_1000_5000, topic_1000_5000_8 }
import org.apache.pekko.kafka.benchmarks.Timed.runPerfTest
import org.apache.pekko.kafka.benchmarks.app.RunTestCommand

class ApacheKafkaBatchedConsumer extends BenchmarksBase() {
it should "bench with small messages" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks

import akka.kafka.benchmarks.BenchmarksBase._
import akka.kafka.benchmarks.InflightMetrics._
import akka.kafka.benchmarks.PerfFixtureHelpers.FilledTopic
import akka.kafka.benchmarks.Timed.{ runPerfTest, runPerfTestInflightMetrics }
import akka.kafka.benchmarks.app.RunTestCommand
import akka.kafka.testkit.KafkaTestkitTestcontainersSettings
import akka.kafka.testkit.scaladsl.TestcontainersKafkaLike
package org.apache.pekko.kafka.benchmarks

import org.apache.pekko.kafka.benchmarks.BenchmarksBase._
import org.apache.pekko.kafka.benchmarks.InflightMetrics._
import org.apache.pekko.kafka.benchmarks.PerfFixtureHelpers.FilledTopic
import org.apache.pekko.kafka.benchmarks.Timed.{ runPerfTest, runPerfTestInflightMetrics }
import org.apache.pekko.kafka.benchmarks.app.RunTestCommand
import org.apache.pekko.kafka.testkit.KafkaTestkitTestcontainersSettings
import org.apache.pekko.kafka.testkit.scaladsl.TestcontainersKafkaLike
import com.typesafe.config.Config

object BenchmarksBase {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import akka.kafka.benchmarks.Timed.runPerfTest
import akka.kafka.benchmarks.app.RunTestCommand
import org.apache.pekko.kafka.benchmarks.Timed.runPerfTest
import org.apache.pekko.kafka.benchmarks.app.RunTestCommand

import BenchmarksBase._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import akka.kafka.benchmarks.BenchmarksBase.{ topic_2000_100, topic_2000_500, topic_2000_5000, topic_2000_5000_8 }
import akka.kafka.benchmarks.Timed.runPerfTest
import akka.kafka.benchmarks.app.RunTestCommand
import org.apache.pekko.kafka.benchmarks.BenchmarksBase.{
topic_2000_100,
topic_2000_500,
topic_2000_5000,
topic_2000_5000_8
}
import org.apache.pekko.kafka.benchmarks.Timed.runPerfTest
import org.apache.pekko.kafka.benchmarks.app.RunTestCommand

class ApacheKafkaPlainProducer extends BenchmarksBase() {
private val prefix = "apache-kafka-plain-producer"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import akka.kafka.testkit.scaladsl.ScalatestKafkaSpec
import org.apache.pekko.kafka.testkit.scaladsl.ScalatestKafkaSpec
import org.scalatest.concurrent.{ Eventually, ScalaFutures }
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.should.Matchers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import akka.kafka.benchmarks.BenchmarksBase.{ topic_100_100, topic_100_5000 }
import akka.kafka.benchmarks.Timed.runPerfTest
import akka.kafka.benchmarks.app.RunTestCommand
import org.apache.pekko.kafka.benchmarks.BenchmarksBase.{ topic_100_100, topic_100_5000 }
import org.apache.pekko.kafka.benchmarks.Timed.runPerfTest
import org.apache.pekko.kafka.benchmarks.app.RunTestCommand
import scala.concurrent.duration._

class ApacheKafkaTransactions extends BenchmarksBase() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.{ Committable, CommittableMessage }
import akka.kafka.ProducerMessage.Envelope
import akka.kafka.benchmarks.app.RunTestCommand
import akka.kafka.scaladsl.Consumer.{ Control, DrainingControl }
import akka.kafka.scaladsl.{ Committer, Consumer, Producer }
import akka.kafka._
import akka.stream.Materializer
import akka.stream.scaladsl.{ Keep, Sink, Source }
package org.apache.pekko.kafka.benchmarks

import org.apache.pekko.Done
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.kafka.ConsumerMessage.{ Committable, CommittableMessage }
import org.apache.pekko.kafka.ProducerMessage.Envelope
import org.apache.pekko.kafka.benchmarks.app.RunTestCommand
import org.apache.pekko.kafka.scaladsl.Consumer.{ Control, DrainingControl }
import org.apache.pekko.kafka.scaladsl.{ Committer, Consumer, Producer }
import org.apache.pekko.kafka._
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.{ Keep, Sink, Source }
import com.codahale.metrics.Meter
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.consumer.ConsumerConfig
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright (C) 2014 - 2016 Softwaremill <https://softwaremill.com>
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package org.apache.pekko.kafka.benchmarks

import akka.stream.alpakka.csv.scaladsl.CsvQuotingStyle
import org.apache.pekko.util.ByteString

import java.nio.charset.{ Charset, StandardCharsets }
import scala.collection.immutable

// TODO: This needs to be deleted after migrating alpakka to pekko.
pjfanning marked this conversation as resolved.
Show resolved Hide resolved
// This is just temporary base to see everything compiles and tests will pass without any issue
private[benchmarks] class CsvFormatter(delimiter: Char,
quoteChar: Char,
escapeChar: Char,
endOfLine: String,
quotingStyle: CsvQuotingStyle,
charset: Charset = StandardCharsets.UTF_8) {

private[this] val charsetName = charset.name()

private[this] val delimiterBs = ByteString(String.valueOf(delimiter), charsetName)
private[this] val quoteBs = ByteString(String.valueOf(quoteChar), charsetName)
private[this] val duplicatedQuote = ByteString(String.valueOf(Array(quoteChar, quoteChar)), charsetName)
private[this] val duplicatedEscape = ByteString(String.valueOf(Array(escapeChar, escapeChar)), charsetName)
private[this] val endOfLineBs = ByteString(endOfLine, charsetName)

def toCsv(fields: immutable.Iterable[Any]): ByteString =
if (fields.nonEmpty) nonEmptyToCsv(fields)
else endOfLineBs

private def nonEmptyToCsv(fields: immutable.Iterable[Any]) = {
val builder = ByteString.createBuilder

def splitAndDuplicateQuotesAndEscapes(field: String, splitAt: Int) = {

@inline def indexOfQuoteOrEscape(lastIndex: Int) = {
var index = lastIndex
var found = -1
while (index < field.length && found == -1) {
val char = field(index)
if (char == quoteChar || char == escapeChar) found = index
index += 1
}
found
}

var lastIndex = 0
var index = splitAt
while (index > -1) {
builder ++= ByteString.apply(field.substring(lastIndex, index), charsetName)
val char = field.charAt(index)
if (char == quoteChar) {
builder ++= duplicatedQuote
} else {
builder ++= duplicatedEscape
}
lastIndex = index + 1
index = indexOfQuoteOrEscape(lastIndex)
}
if (lastIndex < field.length) {
builder ++= ByteString(field.substring(lastIndex), charsetName)
}
}

def append(field: String) = {
val (quoteIt, splitAt) = requiresQuotesOrSplit(field)
if (quoteIt || quotingStyle == CsvQuotingStyle.Always) {
builder ++= quoteBs
if (splitAt != -1) {
splitAndDuplicateQuotesAndEscapes(field, splitAt)
} else {
builder ++= ByteString(field, charsetName)
}
builder ++= quoteBs
} else {
builder ++= ByteString(field, charsetName)
}
}

val iterator = fields.iterator
var hasNext = iterator.hasNext
while (hasNext) {
val next = iterator.next()
if (next != null) {
append(next.toString)
}
hasNext = iterator.hasNext
if (hasNext) {
builder ++= delimiterBs
}
}
builder ++= endOfLineBs
builder.result()
}

private def requiresQuotesOrSplit(field: String): (Boolean, Int) = {
var quotes = CsvQuotingStyle.Always == quotingStyle
var split = -1
var index = 0
while (index < field.length && !(quotes && split != -1)) {
val char = field(index)
if (char == `quoteChar` || char == `escapeChar`) {
quotes = true
split = index
} else if (char == '\r' || char == '\n' || char == `delimiter`) {
quotes = true
}
index += 1
}
(quotes, split)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import akka.kafka.benchmarks.app.RunTestCommand
import org.apache.pekko.kafka.benchmarks.app.RunTestCommand

case class FixtureGen[F](command: RunTestCommand, generate: Int => F)
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import java.lang.management.{ BufferPoolMXBean, ManagementFactory, MemoryType }

import akka.NotUsed
import akka.actor.Cancellable
import akka.kafka.scaladsl.Consumer.Control
import akka.stream.Materializer
import akka.stream.scaladsl.{ Keep, Sink, Source }
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.Cancellable
import org.apache.pekko.kafka.scaladsl.Consumer.Control
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.{ Keep, Sink, Source }
import com.codahale.metrics.{ Histogram, MetricRegistry }
import javax.management.remote.{ JMXConnectorFactory, JMXServiceURL }
import javax.management.{ Attribute, MBeanServerConnection, ObjectName }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import java.time.Duration
import java.util
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import akka.kafka.benchmarks.app.RunTestCommand
import org.apache.pekko.kafka.benchmarks.app.RunTestCommand
import org.apache.kafka.clients.consumer.{ ConsumerConfig, KafkaConsumer }
import org.apache.kafka.common.serialization.{ ByteArrayDeserializer, StringDeserializer }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import com.codahale.metrics.Meter
import com.typesafe.scalalogging.LazyLogging
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import akka.kafka.benchmarks.PerfFixtureHelpers.FilledTopic
import akka.kafka.benchmarks.app.RunTestCommand
import org.apache.pekko.kafka.benchmarks.PerfFixtureHelpers.FilledTopic
import org.apache.pekko.kafka.benchmarks.app.RunTestCommand
import org.apache.kafka.clients.producer.KafkaProducer

case class KafkaProducerTestFixture(topic: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import akka.kafka.benchmarks.KafkaConsumerBenchmarks.pollTimeoutMs
import org.apache.pekko.kafka.benchmarks.KafkaConsumerBenchmarks.pollTimeoutMs
import com.codahale.metrics.Meter
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.consumer._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import java.util.Locale

import akka.kafka.benchmarks.app.RunTestCommand
import org.apache.pekko.kafka.benchmarks.app.RunTestCommand
import org.apache.kafka.clients.consumer.{ ConsumerConfig, KafkaConsumer }
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig }
import org.apache.kafka.common.IsolationLevel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import java.time.Duration
import java.util
Expand Down
Loading