@@ -0,0 +1,41 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Remove deprecated code
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.ConsumerMessage#Committable.commitScaladsl")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.ConsumerMessage#Committable.commitJavadsl")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.Metadata.createGetCommittedOffset")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.kafka.Metadata$CommittedOffset")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.kafka.Metadata$CommittedOffset$")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.kafka.Metadata$GetCommittedOffset")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.kafka.Metadata$GetCommittedOffset$")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.ProducerSettings.producerFactory")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.RestrictedConsumer.committed")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.javadsl.MetadataClient.getCommittedOffset")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.javadsl.Producer.flowWithContext")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.javadsl.Producer.flexiFlow")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.javadsl.Producer.flow")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.javadsl.Producer.committableSink")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.javadsl.Producer.plainSink")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.javadsl.SendProducer.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.scaladsl.MetadataClient.getCommittedOffset")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.scaladsl.Producer.flowWithContext")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.scaladsl.Producer.flexiFlow")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.scaladsl.Producer.flow")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.scaladsl.Producer.committableSink")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.scaladsl.Producer.plainSink")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.scaladsl.SendProducer.apply")
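
The file above is a new MiMa (Migration Manager) filter set: each `ProblemFilters.exclude` entry tells sbt-mima-plugin to suppress the binary-compatibility error that deleting a deprecated member would otherwise raise. As a rough sketch, the same exclusions could also be declared inline in `build.sbt` (the artifact coordinates and version below are illustrative, not taken from this PR):

```scala
// build.sbt (sketch); requires sbt-mima-plugin to be enabled.
import com.typesafe.tools.mima.core._

// Compare against the previous release; the version is a placeholder.
mimaPreviousArtifacts := Set("org.apache.pekko" %% "pekko-connectors-kafka" % "1.0.0")

// Suppress the intentional breaks, mirroring two entries from the filter file.
mimaBinaryIssueFilters ++= Seq(
  ProblemFilters.exclude[DirectMissingMethodProblem](
    "org.apache.pekko.kafka.ConsumerMessage#Committable.commitScaladsl"),
  ProblemFilters.exclude[MissingClassProblem](
    "org.apache.pekko.kafka.Metadata$GetCommittedOffset"))
```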
@@ -57,15 +57,6 @@ object ConsumerMessage {
* or a number of offsets aggregated as [[CommittableOffsetBatch]].
*/
@DoNotInherit trait Committable {
@deprecated("use `Committer.flow` or `Committer.sink` instead of direct usage", "Alpakka Kafka 2.0.0")
def commitScaladsl(): Future[Done]

/**
* @deprecated use `Committer.flow` or `Committer.sink` instead of direct usage, since Alpakka Kafka 2.0.0
*/
@java.lang.Deprecated
@deprecated("use `Committer.flow` or `Committer.sink` instead of direct usage", "Alpakka Kafka 2.0.0")
def commitJavadsl(): CompletionStage[Done]

@InternalApi
private[kafka] def commitInternal(): Future[Done]
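The removed `commitScaladsl`/`commitJavadsl` had been deprecated in favour of the `Committer` stages. A minimal sketch of the replacement pattern, assuming placeholder topic, group id, and bootstrap address:

```scala
// Sketch: commit offsets through Committer.sink instead of calling
// commitScaladsl() on each CommittableOffset.
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.kafka.{ CommitterSettings, ConsumerSettings, Subscriptions }
import org.apache.pekko.kafka.scaladsl.{ Committer, Consumer }

implicit val system: ActorSystem = ActorSystem("example")

val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("example-group")

val done =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics("example-topic"))
    .map { msg =>
      // ... process msg.record here ...
      msg.committableOffset
    }
    .runWith(Committer.sink(CommitterSettings(system)))
```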
24 changes: 0 additions & 24 deletions core/src/main/scala/org/apache/pekko/kafka/Metadata.scala
@@ -176,30 +176,6 @@ object Metadata {
def createGetOffsetForTimes(timestampsToSearch: java.util.Map[TopicPartition, java.lang.Long]): GetOffsetsForTimes =
GetOffsetsForTimes(timestampsToSearch.asScala.view.mapValues(_.toLong).toMap)

/**
* [[org.apache.kafka.clients.consumer.KafkaConsumer#committed()]]
*/
@deprecated("use `GetCommittedOffsets`", "Alpakka Kafka 2.0.3")
final case class GetCommittedOffset(partition: TopicPartition) extends Request with NoSerializationVerificationNeeded

@deprecated("use `CommittedOffsets`", "Alpakka Kafka 2.0.3")
final case class CommittedOffset(response: Try[OffsetAndMetadata], requestedPartition: TopicPartition)
extends Response
with NoSerializationVerificationNeeded {

/**
* Java API
*/
def getResponse: Optional[OffsetAndMetadata] = Optional.ofNullable(response.toOption.orNull)
}

/**
* Java API:
* [[org.apache.kafka.clients.consumer.KafkaConsumer#committed()]]
*/
@deprecated("use `createGetCommittedOffsets`", "Alpakka Kafka 2.0.3")
def createGetCommittedOffset(partition: TopicPartition): GetCommittedOffset = GetCommittedOffset(partition)

/**
* [[org.apache.kafka.clients.consumer.KafkaConsumer#committed()]]
*/
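As the deprecation messages indicate, the singular `GetCommittedOffset`/`CommittedOffset` pair is superseded by the plural `GetCommittedOffsets`/`CommittedOffsets` messages. A sketch of the replacement, assuming a consumer actor reference and a placeholder partition:

```scala
// Sketch: ask the consumer actor for the committed offsets of a set of
// partitions using the plural Metadata messages.
import org.apache.kafka.common.TopicPartition
import org.apache.pekko.actor.ActorRef
import org.apache.pekko.kafka.Metadata
import org.apache.pekko.pattern.ask
import org.apache.pekko.util.Timeout

import scala.concurrent.Future
import scala.concurrent.duration._

implicit val timeout: Timeout = 5.seconds

def committedOffsets(consumerActor: ActorRef, tp: TopicPartition): Future[Metadata.CommittedOffsets] =
  (consumerActor ? Metadata.GetCommittedOffsets(Set(tp))).mapTo[Metadata.CommittedOffsets]
```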
@@ -229,11 +229,6 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
val enrichAsync: Option[ProducerSettings[K, V] => Future[ProducerSettings[K, V]]],
val producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]]) {

@deprecated(
"Use createKafkaProducer(), createKafkaProducerAsync(), or createKafkaProducerCompletionStage() to get a new KafkaProducer",
"Alpakka Kafka 2.0.0")
def producerFactory: ProducerSettings[K, V] => Producer[K, V] = _ => createKafkaProducer()

/**
* An id string to pass to the server when making requests. The purpose of this is to be able to track the source
* of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.
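The removed `producerFactory` alias is replaced by the explicit factory methods named in its deprecation message. A sketch, with a placeholder bootstrap address:

```scala
// Sketch: create the KafkaProducer from the settings directly.
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.kafka.ProducerSettings

implicit val system: ActorSystem = ActorSystem("example")

val settings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

val producer: Producer[String, String] = settings.createKafkaProducer()
// Non-blocking alternative:
// settings.createKafkaProducerAsync()(system.dispatcher)
```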
@@ -43,12 +43,6 @@ final class RestrictedConsumer(consumer: Consumer[_, _], duration: java.time.Dur
def commitSync(offsets: java.util.Map[TopicPartition, OffsetAndMetadata]): Unit =
consumer.commitSync(offsets, duration)

/**
* See [[org.apache.kafka.clients.consumer.KafkaConsumer#committed(TopicPartition,java.time.Duration)]]
*/
@deprecated("use `committed(java.util.Set[TopicPartition])`", "Alpakka Kafka 2.0.5")
def committed(tp: TopicPartition): OffsetAndMetadata = consumer.committed(tp, duration)

/**
* See [[org.apache.kafka.clients.consumer.KafkaConsumer#committed(java.util.Set[TopicPartition],java.time.Duration)]]
*/
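The single-partition `committed(TopicPartition)` gives way to the `java.util.Set` overload. A sketch of reading one partition through it; the helper itself is illustrative, not part of the API:

```scala
// Sketch: fetch a single partition's committed offset via the Set-based
// committed(...), which replaces the removed single-partition overload.
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.pekko.kafka.RestrictedConsumer

def committedOffset(consumer: RestrictedConsumer, tp: TopicPartition): OffsetAndMetadata =
  consumer.committed(java.util.Set.of(tp)).get(tp) // may be null if nothing was committed
```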
@@ -42,12 +42,12 @@ sealed trait Subscription {
*/
sealed trait ManualSubscription extends Subscription {

/** @deprecated Manual subscriptions do never rebalance, since Alpakka Kafka 1.0-RC1 */
@deprecated("Manual subscription does never rebalance", "Alpakka Kafka 1.0-RC1")
/** @deprecated Manual subscriptions never rebalance, since Alpakka Kafka 1.0-RC1 */
@deprecated("Manual subscription never rebalances", "Alpakka Kafka 1.0-RC1")
def rebalanceListener: Option[ActorRef] = None

/** @deprecated Manual subscriptions do never rebalance, since Alpakka Kafka 1.0-RC1 */
@deprecated("Manual subscription does never rebalance", "Alpakka Kafka 1.0-RC1")
/** @deprecated Manual subscriptions never rebalance, since Alpakka Kafka 1.0-RC1 */
@deprecated("Manual subscription never rebalances", "Alpakka Kafka 1.0-RC1")
def withRebalanceListener(ref: ActorRef): ManualSubscription
}

@@ -713,15 +713,6 @@ import scala.util.control.NonFatal
.filterNot(_._2 == null)
.toMap
})

case req: Metadata.GetCommittedOffset @nowarn("cat=deprecation") =>
@nowarn("cat=deprecation") val resp = Metadata.CommittedOffset(
Try {
@nowarn("cat=deprecation") val offset = consumer.committed(req.partition, settings.getMetadataRequestTimeout)
offset
},
req.partition)
resp
}

private def stopFromMessage(msg: StopLike) = msg match {
@@ -142,8 +142,6 @@ private[kafka] trait OffsetContextBuilder[K, V]
override val partitionOffset: ConsumerMessage.PartitionOffset,
override val metadata: String)(
val committer: KafkaAsyncConsumerCommitterRef) extends CommittableOffsetMetadata {
override def commitScaladsl(): Future[Done] = commitInternal()
override def commitJavadsl(): CompletionStage[Done] = commitInternal().asJava
override def commitInternal(): Future[Done] = KafkaAsyncConsumerCommitterRef.commit(this)
override val batchSize: Long = 1
}
@@ -223,8 +221,6 @@ private[kafka] final class CommittableOffsetBatchImpl(
override def toString: String =
s"CommittableOffsetBatch(batchSize=$batchSize, ${offsets.mkString(", ")})"

override def commitScaladsl(): Future[Done] = commitInternal()

override def commitInternal(): Future[Done] = KafkaAsyncConsumerCommitterRef.commit(this)

override def tellCommit(): CommittableOffsetBatch = tellCommitWithPriority(emergency = false)
@@ -242,8 +238,6 @@ private[kafka] final class CommittableOffsetBatchImpl(
new CommittableOffsetBatchImpl(newOffsets, newCommitters, newOffsets.size.toLong)
}

override def commitJavadsl(): CompletionStage[Done] = commitInternal().asJava

/**
* @return true if the batch contains no commits.
*/
@@ -75,12 +75,6 @@ class MetadataClient private (metadataClient: pekko.kafka.scaladsl.MetadataClien
}(ExecutionContext.parasitic)
.asJava

@deprecated("use `getCommittedOffsets`", "Alpakka Kafka 2.0.3")
def getCommittedOffset(partition: TopicPartition): CompletionStage[OffsetAndMetadata] =
metadataClient
.getCommittedOffset(partition)
.asJava

def getCommittedOffsets(
partitions: java.util.Set[TopicPartition]): CompletionStage[java.util.Map[TopicPartition, OffsetAndMetadata]] =
metadataClient
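Similarly, `MetadataClient.getCommittedOffset` is superseded by the plural `getCommittedOffsets`. A sketch against the scaladsl client, with placeholder settings and partition:

```scala
// Sketch: query committed offsets for a set of partitions.
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.kafka.ConsumerSettings
import org.apache.pekko.kafka.scaladsl.MetadataClient
import org.apache.pekko.util.Timeout

import scala.concurrent.Future
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("example")
import system.dispatcher

val timeout: Timeout = 5.seconds

val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("example-group")

val metadataClient = MetadataClient.create(consumerSettings, timeout)

val offsets: Future[Map[TopicPartition, OffsetAndMetadata]] =
  metadataClient.getCommittedOffsets(Set(new TopicPartition("example-topic", 0)))
```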
169 changes: 0 additions & 169 deletions core/src/main/scala/org/apache/pekko/kafka/javadsl/Producer.scala
@@ -45,78 +45,6 @@ object Producer {
.mapMaterializedValue(_.asJava)
.asJava

/**
* Create a sink for publishing records to Kafka topics.
*
* The [[org.apache.kafka.clients.producer.ProducerRecord Kafka ProducerRecord]] contains the topic name to which the record is being sent, an optional
* partition number, and an optional key and value.
*
* Supports sharing a Kafka Producer instance.
*
* @deprecated Pass in external or shared producer using `ProducerSettings.withProducerFactory` or `ProducerSettings.withProducer`, since Alpakka Kafka 2.0.0
*/
@Deprecated
def plainSink[K, V](
settings: ProducerSettings[K, V],
producer: org.apache.kafka.clients.producer.Producer[K, V]): Sink[ProducerRecord[K, V], CompletionStage[Done]] =
plainSink(settings.withProducer(producer))
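
All removed `(settings, producer)` overloads in this file share one replacement: attach the shared producer to the settings with `ProducerSettings.withProducer` (or `withProducerFactory`) and call the single-argument variant. A sketch with placeholder names:

```scala
// Sketch: share one Kafka producer across streams by attaching it to the
// settings, then use the single-argument plainSink.
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.kafka.ProducerSettings
import org.apache.pekko.kafka.scaladsl.Producer
import org.apache.pekko.stream.scaladsl.Source

implicit val system: ActorSystem = ActorSystem("example")

val settings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

// The producer created here can be shared by several streams.
val sharedProducer = settings.createKafkaProducer()

Source(1 to 10)
  .map(n => new ProducerRecord[String, String]("example-topic", n.toString, s"value-$n"))
  .runWith(Producer.plainSink(settings.withProducer(sharedProducer)))
```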

/**
* Create a sink that is aware of the [[ConsumerMessage.Committable committable offset]]
* from a [[Consumer.committableSource]]. It will commit the consumer offset when the message has
* been published successfully to the topic.
*
* It publishes records to Kafka topics conditionally:
*
* - [[pekko.kafka.ProducerMessage.Message Message]] publishes a single message to its topic, and commits the offset
*
* - [[pekko.kafka.ProducerMessage.MultiMessage MultiMessage]] publishes all messages in its `records` field, and commits the offset
*
* - [[pekko.kafka.ProducerMessage.PassThroughMessage PassThroughMessage]] does not publish anything, but commits the offset
*
* Note that there is a risk that something fails after publishing but before
* committing, so it is "at-least once delivery" semantics.
*
* @deprecated use `committableSink(ProducerSettings, CommitterSettings)` instead, since Alpakka Kafka 2.0.0
*/
@Deprecated
def committableSink[K, V, IN <: Envelope[K, V, ConsumerMessage.Committable]](
settings: ProducerSettings[K, V]): Sink[IN, CompletionStage[Done]] = {
@nowarn("cat=deprecation")
val sink: Sink[IN, CompletionStage[Done]] = scaladsl.Producer
.committableSink(settings)
.mapMaterializedValue(_.asJava)
.asJava
sink
}

/**
* Create a sink that is aware of the [[ConsumerMessage.Committable committable offset]]
* from a [[Consumer.committableSource]]. It will commit the consumer offset when the message has
* been published successfully to the topic.
*
* It publishes records to Kafka topics conditionally:
*
* - [[pekko.kafka.ProducerMessage.Message Message]] publishes a single message to its topic, and commits the offset
*
* - [[pekko.kafka.ProducerMessage.MultiMessage MultiMessage]] publishes all messages in its `records` field, and commits the offset
*
* - [[pekko.kafka.ProducerMessage.PassThroughMessage PassThroughMessage]] does not publish anything, but commits the offset
*
* Note that there is always a risk that something fails after publishing but before
* committing, so it is "at-least once delivery" semantics.
*
* Supports sharing a Kafka Producer instance.
*
* @deprecated use `committableSink(ProducerSettings, CommitterSettings)` instead, since Alpakka Kafka 2.0.0
*/
@Deprecated
def committableSink[K, V](
settings: ProducerSettings[K, V],
producer: org.apache.kafka.clients.producer.Producer[K, V])
: Sink[Envelope[K, V, ConsumerMessage.Committable], CompletionStage[Done]] =
committableSink(settings.withProducer(producer))
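
The removed committable sinks are replaced by the two-argument `committableSink(ProducerSettings, CommitterSettings)`, which commits through a batching committer stage and keeps the same at-least-once semantics. A sketch with placeholder topics:

```scala
// Sketch: consume, produce, and commit with the replacement
// committableSink(ProducerSettings, CommitterSettings).
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ StringDeserializer, StringSerializer }
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.kafka._
import org.apache.pekko.kafka.scaladsl.{ Consumer, Producer }

implicit val system: ActorSystem = ActorSystem("example")

val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("example-group")

val producerSettings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

Consumer
  .committableSource(consumerSettings, Subscriptions.topics("source-topic"))
  .map { msg =>
    // Republish each record and carry its offset along as pass-through.
    ProducerMessage.single(
      new ProducerRecord[String, String]("target-topic", msg.record.key, msg.record.value),
      msg.committableOffset)
  }
  .runWith(Producer.committableSink(producerSettings, CommitterSettings(system)))
```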

/**
* Create a sink that is aware of the [[ConsumerMessage.Committable committable offset]]
* from a [[Consumer.committableSource]]. The offsets are batched and committed regularly.
@@ -164,28 +92,6 @@ object Producer {
override def apply(p: japi.Pair[IN, C]) = p.first.withPassThrough(p.second)
})

/**
* Create a flow to publish records to Kafka topics and then pass it on.
*
* The records must be wrapped in a [[pekko.kafka.ProducerMessage.Message Message]] and continue in the stream as [[pekko.kafka.ProducerMessage.Result Result]].
*
* The messages support the possibility to pass through arbitrary data, which can for example be a [[ConsumerMessage.CommittableOffset CommittableOffset]]
* or [[ConsumerMessage.CommittableOffsetBatch CommittableOffsetBatch]] that can
* be committed later in the flow.
*
* @deprecated use `flexiFlow` instead, since 0.21
*/
@Deprecated
def flow[K, V, PassThrough](
settings: ProducerSettings[K, V]): Flow[Message[K, V, PassThrough], Result[K, V, PassThrough], NotUsed] = {
@nowarn("cat=deprecation")
val flow = scaladsl.Producer
.flow(settings)
.asJava
.asInstanceOf[Flow[Message[K, V, PassThrough], Result[K, V, PassThrough], NotUsed]]
flow
}
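
`flow`, deprecated since 0.21, handled only `Message`; its replacement `flexiFlow` accepts the whole `Envelope` hierarchy (`Message`, `MultiMessage`, `PassThroughMessage`). A sketch of the replacement, with placeholder names:

```scala
// Sketch: flexiFlow replaces flow and also accepts MultiMessage and
// PassThroughMessage envelopes, not just single Messages.
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.kafka.{ ProducerMessage, ProducerSettings }
import org.apache.pekko.kafka.scaladsl.Producer
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

implicit val system: ActorSystem = ActorSystem("example")

val producerSettings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

Source(1 to 3)
  .map(n => ProducerMessage.single(new ProducerRecord[String, String]("example-topic", s"value-$n")))
  .via(Producer.flexiFlow(producerSettings))
  .runWith(Sink.foreach(result => println(result.passThrough)))
```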

/**
* Create a flow to conditionally publish records to Kafka topics and then pass it on.
*
@@ -230,79 +136,4 @@ object Producer {
settings: ProducerSettings[K, V]): FlowWithContext[Envelope[K, V, NotUsed], C, Results[K, V, C], C, NotUsed] =
scaladsl.Producer.flowWithContext(settings).asJava

/**
* Create a flow to publish records to Kafka topics and then pass it on.
*
* The records must be wrapped in a [[pekko.kafka.ProducerMessage.Message Message]] and continue in the stream as [[pekko.kafka.ProducerMessage.Result Result]].
*
* The messages support the possibility to pass through arbitrary data, which can for example be a [[ConsumerMessage.CommittableOffset CommittableOffset]]
* or [[ConsumerMessage.CommittableOffsetBatch CommittableOffsetBatch]] that can
* be committed later in the flow.
*
* Supports sharing a Kafka Producer instance.
*
* @deprecated use `flexiFlow` instead, since 0.21
*/
@Deprecated
def flow[K, V, PassThrough](
settings: ProducerSettings[K, V],
producer: org.apache.kafka.clients.producer.Producer[K, V])
: Flow[Message[K, V, PassThrough], Result[K, V, PassThrough], NotUsed] =
flow(settings.withProducer(producer))

/**
* Create a flow to conditionally publish records to Kafka topics and then pass it on.
*
* It publishes records to Kafka topics conditionally:
*
* - [[pekko.kafka.ProducerMessage.Message Message]] publishes a single message to its topic, and continues in the stream as [[pekko.kafka.ProducerMessage.Result Result]]
*
* - [[pekko.kafka.ProducerMessage.MultiMessage MultiMessage]] publishes all messages in its `records` field, and continues in the stream as [[pekko.kafka.ProducerMessage.MultiResult MultiResult]]
*
* - [[pekko.kafka.ProducerMessage.PassThroughMessage PassThroughMessage]] does not publish anything, and continues in the stream as [[pekko.kafka.ProducerMessage.PassThroughResult PassThroughResult]]
*
* The messages support the possibility to pass through arbitrary data, which can for example be a [[ConsumerMessage.CommittableOffset CommittableOffset]]
* or [[ConsumerMessage.CommittableOffsetBatch CommittableOffsetBatch]] that can
* be committed later in the flow.
*
* Supports sharing a Kafka Producer instance.
*
* @deprecated Pass in external or shared producer using `ProducerSettings.withProducerFactory` or `ProducerSettings.withProducer`, since Alpakka Kafka 2.0.0
*/
@Deprecated
def flexiFlow[K, V, PassThrough](
settings: ProducerSettings[K, V],
producer: org.apache.kafka.clients.producer.Producer[K, V])
: Flow[Envelope[K, V, PassThrough], Results[K, V, PassThrough], NotUsed] =
flexiFlow(settings.withProducer(producer))

/**
* API MAY CHANGE
*
* Create a flow to conditionally publish records to Kafka topics and then pass it on.
*
* It publishes records to Kafka topics conditionally:
*
* - [[pekko.kafka.ProducerMessage.Message Message]] publishes a single message to its topic, and continues in the stream as [[pekko.kafka.ProducerMessage.Result Result]]
*
* - [[pekko.kafka.ProducerMessage.MultiMessage MultiMessage]] publishes all messages in its `records` field, and continues in the stream as [[pekko.kafka.ProducerMessage.MultiResult MultiResult]]
*
* - [[pekko.kafka.ProducerMessage.PassThroughMessage PassThroughMessage]] does not publish anything, and continues in the stream as [[pekko.kafka.ProducerMessage.PassThroughResult PassThroughResult]]
*
* This flow is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html).
*
* Supports sharing a Kafka Producer instance.
*
* @tparam C the flow context type
*
* @deprecated Pass in external or shared producer using `ProducerSettings.withProducerFactory` or `ProducerSettings.withProducer`, since Alpakka Kafka 2.0.0
*/
@Deprecated
@ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/880")
def flowWithContext[K, V, C](
settings: ProducerSettings[K, V],
producer: org.apache.kafka.clients.producer.Producer[K, V])
: FlowWithContext[Envelope[K, V, NotUsed], C, Results[K, V, C], C, NotUsed] =
flowWithContext(settings.withProducer(producer))

}
@@ -30,11 +30,6 @@ import scala.jdk.FutureConverters._
*/
final class SendProducer[K, V] private (underlying: scaladsl.SendProducer[K, V]) {

// kept for bin-compatibility
@deprecated("use the variant with ClassicActorSystemProvider instead", "Alpakka Kafka 2.0.5")
private[kafka] def this(settings: ProducerSettings[K, V], system: ActorSystem) =
this(scaladsl.SendProducer(settings)(system))

/**
* Utility class for producing to Kafka without using Apache Pekko Streams.
* @param settings producer settings used to create or access the [[org.apache.kafka.clients.producer.Producer]]
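The removed package-private constructor was kept only for binary compatibility; construction now goes through the variants that accept a `ClassicActorSystemProvider`. A sketch using the scaladsl factory, with placeholder names:

```scala
// Sketch: create a SendProducer with an ActorSystem (a
// ClassicActorSystemProvider) in implicit scope, then send one record.
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.kafka.ProducerSettings
import org.apache.pekko.kafka.scaladsl.SendProducer

implicit val system: ActorSystem = ActorSystem("example")

val settings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

val sendProducer = SendProducer(settings)
val result = sendProducer.send(new ProducerRecord("example-topic", "key", "value"))
// result: Future[RecordMetadata]
```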