Skip to content

Commit fc625a4

Browse files
authored
Remove some deprecated code (#369)
* remove deprecated methods * compile issues * more * Create remove-deprecated-code.excludes * scalafmt * remove old imports
1 parent 10282b5 commit fc625a4

File tree

20 files changed

+47
-473
lines changed

20 files changed

+47
-473
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# Remove deprecated code
19+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.ConsumerMessage#Committable.commitScaladsl")
20+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.ConsumerMessage#Committable.commitJavadsl")
21+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.Metadata.createGetCommittedOffset")
22+
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.kafka.Metadata$CommittedOffset")
23+
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.kafka.Metadata$CommittedOffset$")
24+
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.kafka.Metadata$GetCommittedOffset")
25+
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.kafka.Metadata$GetCommittedOffset$")
26+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.ProducerSettings.producerFactory")
27+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.RestrictedConsumer.committed")
28+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.javadsl.MetadataClient.getCommittedOffset")
29+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.javadsl.Producer.flowWithContext")
30+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.javadsl.Producer.flexiFlow")
31+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.javadsl.Producer.flow")
32+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.javadsl.Producer.committableSink")
33+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.javadsl.Producer.plainSink")
34+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.javadsl.SendProducer.this")
35+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.scaladsl.MetadataClient.getCommittedOffset")
36+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.scaladsl.Producer.flowWithContext")
37+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.scaladsl.Producer.flexiFlow")
38+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.scaladsl.Producer.flow")
39+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.scaladsl.Producer.committableSink")
40+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.scaladsl.Producer.plainSink")
41+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.scaladsl.SendProducer.apply")

core/src/main/scala/org/apache/pekko/kafka/ConsumerMessage.scala

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,6 @@ object ConsumerMessage {
5757
* or a number of offsets aggregated as [[CommittableOffsetBatch]].
5858
*/
5959
@DoNotInherit trait Committable {
60-
@deprecated("use `Committer.flow` or `Committer.sink` instead of direct usage", "Alpakka Kafka 2.0.0")
61-
def commitScaladsl(): Future[Done]
62-
63-
/**
64-
* @deprecated use `Committer.flow` or `Committer.sink` instead of direct usage, since Alpakka Kafka 2.0.0
65-
*/
66-
@java.lang.Deprecated
67-
@deprecated("use `Committer.flow` or `Committer.sink` instead of direct usage", "Alpakka Kafka 2.0.0")
68-
def commitJavadsl(): CompletionStage[Done]
6960

7061
@InternalApi
7162
private[kafka] def commitInternal(): Future[Done]

core/src/main/scala/org/apache/pekko/kafka/Metadata.scala

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -176,30 +176,6 @@ object Metadata {
176176
def createGetOffsetForTimes(timestampsToSearch: java.util.Map[TopicPartition, java.lang.Long]): GetOffsetsForTimes =
177177
GetOffsetsForTimes(timestampsToSearch.asScala.view.mapValues(_.toLong).toMap)
178178

179-
/**
180-
* [[org.apache.kafka.clients.consumer.KafkaConsumer#committed()]]
181-
*/
182-
@deprecated("use `GetCommittedOffsets`", "Alpakka Kafka 2.0.3")
183-
final case class GetCommittedOffset(partition: TopicPartition) extends Request with NoSerializationVerificationNeeded
184-
185-
@deprecated("use `CommittedOffsets`", "Alpakka Kafka 2.0.3")
186-
final case class CommittedOffset(response: Try[OffsetAndMetadata], requestedPartition: TopicPartition)
187-
extends Response
188-
with NoSerializationVerificationNeeded {
189-
190-
/**
191-
* Java API
192-
*/
193-
def getResponse: Optional[OffsetAndMetadata] = Optional.ofNullable(response.toOption.orNull)
194-
}
195-
196-
/**
197-
* Java API:
198-
* [[org.apache.kafka.clients.consumer.KafkaConsumer#committed()]]
199-
*/
200-
@deprecated("use `createGetCommittedOffsets`", "Alpakka Kafka 2.0.3")
201-
def createGetCommittedOffset(partition: TopicPartition): GetCommittedOffset = GetCommittedOffset(partition)
202-
203179
/**
204180
* [[org.apache.kafka.clients.consumer.KafkaConsumer#committed()]]
205181
*/

core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -229,11 +229,6 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
229229
val enrichAsync: Option[ProducerSettings[K, V] => Future[ProducerSettings[K, V]]],
230230
val producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]]) {
231231

232-
@deprecated(
233-
"Use createKafkaProducer(), createKafkaProducerAsync(), or createKafkaProducerCompletionStage() to get a new KafkaProducer",
234-
"Alpakka Kafka 2.0.0")
235-
def producerFactory: ProducerSettings[K, V] => Producer[K, V] = _ => createKafkaProducer()
236-
237232
/**
238233
* An id string to pass to the server when making requests. The purpose of this is to be able to track the source
239234
* of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.

core/src/main/scala/org/apache/pekko/kafka/RestrictedConsumer.scala

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,6 @@ final class RestrictedConsumer(consumer: Consumer[_, _], duration: java.time.Dur
4343
def commitSync(offsets: java.util.Map[TopicPartition, OffsetAndMetadata]): Unit =
4444
consumer.commitSync(offsets, duration)
4545

46-
/**
47-
* See [[org.apache.kafka.clients.consumer.KafkaConsumer#committed(TopicPartition,java.time.Duration)]]
48-
*/
49-
@deprecated("use `committed(java.util.Set[TopicPartition])`", "Alpakka Kafka 2.0.5")
50-
def committed(tp: TopicPartition): OffsetAndMetadata = consumer.committed(tp, duration)
51-
5246
/**
5347
* See [[org.apache.kafka.clients.consumer.KafkaConsumer#committed(java.util.Set[TopicPartition],java.time.Duration)]]
5448
*/

core/src/main/scala/org/apache/pekko/kafka/Subscriptions.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,12 @@ sealed trait Subscription {
4242
*/
4343
sealed trait ManualSubscription extends Subscription {
4444

45-
/** @deprecated Manual subscriptions do never rebalance, since Alpakka Kafka 1.0-RC1 */
46-
@deprecated("Manual subscription does never rebalance", "Alpakka Kafka 1.0-RC1")
45+
/** @deprecated Manual subscriptions never rebalances, since Alpakka Kafka 1.0-RC1 */
46+
@deprecated("Manual subscription never rebalances", "Alpakka Kafka 1.0-RC1")
4747
def rebalanceListener: Option[ActorRef] = None
4848

49-
/** @deprecated Manual subscriptions do never rebalance, since Alpakka Kafka 1.0-RC1 */
50-
@deprecated("Manual subscription does never rebalance", "Alpakka Kafka 1.0-RC1")
49+
/** @deprecated Manual subscriptions never rebalances, since Alpakka Kafka 1.0-RC1 */
50+
@deprecated("Manual subscription never rebalances", "Alpakka Kafka 1.0-RC1")
5151
def withRebalanceListener(ref: ActorRef): ManualSubscription
5252
}
5353

core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -713,15 +713,6 @@ import scala.util.control.NonFatal
713713
.filterNot(_._2 == null)
714714
.toMap
715715
})
716-
717-
case req: Metadata.GetCommittedOffset @nowarn("cat=deprecation") =>
718-
@nowarn("cat=deprecation") val resp = Metadata.CommittedOffset(
719-
Try {
720-
@nowarn("cat=deprecation") val offset = consumer.committed(req.partition, settings.getMetadataRequestTimeout)
721-
offset
722-
},
723-
req.partition)
724-
resp
725716
}
726717

727718
private def stopFromMessage(msg: StopLike) = msg match {

core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,6 @@ private[kafka] trait OffsetContextBuilder[K, V]
142142
override val partitionOffset: ConsumerMessage.PartitionOffset,
143143
override val metadata: String)(
144144
val committer: KafkaAsyncConsumerCommitterRef) extends CommittableOffsetMetadata {
145-
override def commitScaladsl(): Future[Done] = commitInternal()
146-
override def commitJavadsl(): CompletionStage[Done] = commitInternal().asJava
147145
override def commitInternal(): Future[Done] = KafkaAsyncConsumerCommitterRef.commit(this)
148146
override val batchSize: Long = 1
149147
}
@@ -223,8 +221,6 @@ private[kafka] final class CommittableOffsetBatchImpl(
223221
override def toString: String =
224222
s"CommittableOffsetBatch(batchSize=$batchSize, ${offsets.mkString(", ")})"
225223

226-
override def commitScaladsl(): Future[Done] = commitInternal()
227-
228224
override def commitInternal(): Future[Done] = KafkaAsyncConsumerCommitterRef.commit(this)
229225

230226
override def tellCommit(): CommittableOffsetBatch = tellCommitWithPriority(emergency = false)
@@ -242,8 +238,6 @@ private[kafka] final class CommittableOffsetBatchImpl(
242238
new CommittableOffsetBatchImpl(newOffsets, newCommitters, newOffsets.size.toLong)
243239
}
244240

245-
override def commitJavadsl(): CompletionStage[Done] = commitInternal().asJava
246-
247241
/**
248242
* @return true if the batch contains no commits.
249243
*/

core/src/main/scala/org/apache/pekko/kafka/javadsl/MetadataClient.scala

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,6 @@ class MetadataClient private (metadataClient: pekko.kafka.scaladsl.MetadataClien
7575
}(ExecutionContext.parasitic)
7676
.asJava
7777

78-
@deprecated("use `getCommittedOffsets`", "Alpakka Kafka 2.0.3")
79-
def getCommittedOffset(partition: TopicPartition): CompletionStage[OffsetAndMetadata] =
80-
metadataClient
81-
.getCommittedOffset(partition)
82-
.asJava
83-
8478
def getCommittedOffsets(
8579
partitions: java.util.Set[TopicPartition]): CompletionStage[java.util.Map[TopicPartition, OffsetAndMetadata]] =
8680
metadataClient

core/src/main/scala/org/apache/pekko/kafka/javadsl/Producer.scala

Lines changed: 0 additions & 169 deletions
Original file line numberDiff line numberDiff line change
@@ -45,78 +45,6 @@ object Producer {
4545
.mapMaterializedValue(_.asJava)
4646
.asJava
4747

48-
/**
49-
* Create a sink for publishing records to Kafka topics.
50-
*
51-
* The [[org.apache.kafka.clients.producer.ProducerRecord Kafka ProducerRecord]] contains the topic name to which the record is being sent, an optional
52-
* partition number, and an optional key and value.
53-
*
54-
* Supports sharing a Kafka Producer instance.
55-
*
56-
* @deprecated Pass in external or shared producer using `ProducerSettings.withProducerFactory` or `ProducerSettings.withProducer`, since Alpakka Kafka 2.0.0
57-
*/
58-
@Deprecated
59-
def plainSink[K, V](
60-
settings: ProducerSettings[K, V],
61-
producer: org.apache.kafka.clients.producer.Producer[K, V]): Sink[ProducerRecord[K, V], CompletionStage[Done]] =
62-
plainSink(settings.withProducer(producer))
63-
64-
/**
65-
* Create a sink that is aware of the [[ConsumerMessage.Committable committable offset]]
66-
* from a [[Consumer.committableSource]]. It will commit the consumer offset when the message has
67-
* been published successfully to the topic.
68-
*
69-
* It publishes records to Kafka topics conditionally:
70-
*
71-
* - [[pekko.kafka.ProducerMessage.Message Message]] publishes a single message to its topic, and commits the offset
72-
*
73-
* - [[pekko.kafka.ProducerMessage.MultiMessage MultiMessage]] publishes all messages in its `records` field, and commits the offset
74-
*
75-
* - [[pekko.kafka.ProducerMessage.PassThroughMessage PassThroughMessage]] does not publish anything, but commits the offset
76-
*
77-
* Note that there is a risk that something fails after publishing but before
78-
* committing, so it is "at-least once delivery" semantics.
79-
*
80-
* @deprecated use `committableSink(ProducerSettings, CommitterSettings)` instead, since Alpakka Kafka 2.0.0
81-
*/
82-
@Deprecated
83-
def committableSink[K, V, IN <: Envelope[K, V, ConsumerMessage.Committable]](
84-
settings: ProducerSettings[K, V]): Sink[IN, CompletionStage[Done]] = {
85-
@nowarn("cat=deprecation")
86-
val sink: Sink[IN, CompletionStage[Done]] = scaladsl.Producer
87-
.committableSink(settings)
88-
.mapMaterializedValue(_.asJava)
89-
.asJava
90-
sink
91-
}
92-
93-
/**
94-
* Create a sink that is aware of the [[ConsumerMessage.Committable committable offset]]
95-
* from a [[Consumer.committableSource]]. It will commit the consumer offset when the message has
96-
* been published successfully to the topic.
97-
*
98-
* It publishes records to Kafka topics conditionally:
99-
*
100-
* - [[pekko.kafka.ProducerMessage.Message Message]] publishes a single message to its topic, and commits the offset
101-
*
102-
* - [[pekko.kafka.ProducerMessage.MultiMessage MultiMessage]] publishes all messages in its `records` field, and commits the offset
103-
*
104-
* - [[pekko.kafka.ProducerMessage.PassThroughMessage PassThroughMessage]] does not publish anything, but commits the offset
105-
*
106-
* Note that there is always a risk that something fails after publishing but before
107-
* committing, so it is "at-least once delivery" semantics.
108-
*
109-
* Supports sharing a Kafka Producer instance.
110-
*
111-
* @deprecated use `committableSink(ProducerSettings, CommitterSettings)` instead, since Alpakka Kafka 2.0.0
112-
*/
113-
@Deprecated
114-
def committableSink[K, V](
115-
settings: ProducerSettings[K, V],
116-
producer: org.apache.kafka.clients.producer.Producer[K, V])
117-
: Sink[Envelope[K, V, ConsumerMessage.Committable], CompletionStage[Done]] =
118-
committableSink(settings.withProducer(producer))
119-
12048
/**
12149
* Create a sink that is aware of the [[ConsumerMessage.Committable committable offset]]
12250
* from a [[Consumer.committableSource]]. The offsets are batched and committed regularly.
@@ -164,28 +92,6 @@ object Producer {
16492
override def apply(p: japi.Pair[IN, C]) = p.first.withPassThrough(p.second)
16593
})
16694

167-
/**
168-
* Create a flow to publish records to Kafka topics and then pass it on.
169-
*
170-
* The records must be wrapped in a [[pekko.kafka.ProducerMessage.Message Message]] and continue in the stream as [[pekko.kafka.ProducerMessage.Result Result]].
171-
*
172-
* The messages support the possibility to pass through arbitrary data, which can for example be a [[ConsumerMessage.CommittableOffset CommittableOffset]]
173-
* or [[ConsumerMessage.CommittableOffsetBatch CommittableOffsetBatch]] that can
174-
* be committed later in the flow.
175-
*
176-
* @deprecated use `flexiFlow` instead, since 0.21
177-
*/
178-
@Deprecated
179-
def flow[K, V, PassThrough](
180-
settings: ProducerSettings[K, V]): Flow[Message[K, V, PassThrough], Result[K, V, PassThrough], NotUsed] = {
181-
@nowarn("cat=deprecation")
182-
val flow = scaladsl.Producer
183-
.flow(settings)
184-
.asJava
185-
.asInstanceOf[Flow[Message[K, V, PassThrough], Result[K, V, PassThrough], NotUsed]]
186-
flow
187-
}
188-
18995
/**
19096
* Create a flow to conditionally publish records to Kafka topics and then pass it on.
19197
*
@@ -230,79 +136,4 @@ object Producer {
230136
settings: ProducerSettings[K, V]): FlowWithContext[Envelope[K, V, NotUsed], C, Results[K, V, C], C, NotUsed] =
231137
scaladsl.Producer.flowWithContext(settings).asJava
232138

233-
/**
234-
* Create a flow to publish records to Kafka topics and then pass it on.
235-
*
236-
* The records must be wrapped in a [[pekko.kafka.ProducerMessage.Message Message]] and continue in the stream as [[pekko.kafka.ProducerMessage.Result Result]].
237-
*
238-
* The messages support the possibility to pass through arbitrary data, which can for example be a [[ConsumerMessage.CommittableOffset CommittableOffset]]
239-
* or [[ConsumerMessage.CommittableOffsetBatch CommittableOffsetBatch]] that can
240-
* be committed later in the flow.
241-
*
242-
* Supports sharing a Kafka Producer instance.
243-
*
244-
* @deprecated use `flexiFlow` instead, since 0.21
245-
*/
246-
@Deprecated
247-
def flow[K, V, PassThrough](
248-
settings: ProducerSettings[K, V],
249-
producer: org.apache.kafka.clients.producer.Producer[K, V])
250-
: Flow[Message[K, V, PassThrough], Result[K, V, PassThrough], NotUsed] =
251-
flow(settings.withProducer(producer))
252-
253-
/**
254-
* Create a flow to conditionally publish records to Kafka topics and then pass it on.
255-
*
256-
* It publishes records to Kafka topics conditionally:
257-
*
258-
* - [[pekko.kafka.ProducerMessage.Message Message]] publishes a single message to its topic, and continues in the stream as [[pekko.kafka.ProducerMessage.Result Result]]
259-
*
260-
* - [[pekko.kafka.ProducerMessage.MultiMessage MultiMessage]] publishes all messages in its `records` field, and continues in the stream as [[pekko.kafka.ProducerMessage.MultiResult MultiResult]]
261-
*
262-
* - [[pekko.kafka.ProducerMessage.PassThroughMessage PassThroughMessage]] does not publish anything, and continues in the stream as [[pekko.kafka.ProducerMessage.PassThroughResult PassThroughResult]]
263-
*
264-
* The messages support the possibility to pass through arbitrary data, which can for example be a [[ConsumerMessage.CommittableOffset CommittableOffset]]
265-
* or [[ConsumerMessage.CommittableOffsetBatch CommittableOffsetBatch]] that can
266-
* be committed later in the flow.
267-
*
268-
* Supports sharing a Kafka Producer instance.
269-
*
270-
* @deprecated Pass in external or shared producer using `ProducerSettings.withProducerFactory` or `ProducerSettings.withProducer`, since Alpakka Kafka 2.0.0
271-
*/
272-
@Deprecated
273-
def flexiFlow[K, V, PassThrough](
274-
settings: ProducerSettings[K, V],
275-
producer: org.apache.kafka.clients.producer.Producer[K, V])
276-
: Flow[Envelope[K, V, PassThrough], Results[K, V, PassThrough], NotUsed] =
277-
flexiFlow(settings.withProducer(producer))
278-
279-
/**
280-
* API MAY CHANGE
281-
*
282-
* Create a flow to conditionally publish records to Kafka topics and then pass it on.
283-
*
284-
* It publishes records to Kafka topics conditionally:
285-
*
286-
* - [[pekko.kafka.ProducerMessage.Message Message]] publishes a single message to its topic, and continues in the stream as [[pekko.kafka.ProducerMessage.Result Result]]
287-
*
288-
* - [[pekko.kafka.ProducerMessage.MultiMessage MultiMessage]] publishes all messages in its `records` field, and continues in the stream as [[pekko.kafka.ProducerMessage.MultiResult MultiResult]]
289-
*
290-
* - [[pekko.kafka.ProducerMessage.PassThroughMessage PassThroughMessage]] does not publish anything, and continues in the stream as [[pekko.kafka.ProducerMessage.PassThroughResult PassThroughResult]]
291-
*
292-
* This flow is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html).
293-
*
294-
* Supports sharing a Kafka Producer instance.
295-
*
296-
* @tparam C the flow context type
297-
*
298-
* @deprecated Pass in external or shared producer using `ProducerSettings.withProducerFactory` or `ProducerSettings.withProducer`, since Alpakka Kafka 2.0.0
299-
*/
300-
@Deprecated
301-
@ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/880")
302-
def flowWithContext[K, V, C](
303-
settings: ProducerSettings[K, V],
304-
producer: org.apache.kafka.clients.producer.Producer[K, V])
305-
: FlowWithContext[Envelope[K, V, NotUsed], C, Results[K, V, C], C, NotUsed] =
306-
flowWithContext(settings.withProducer(producer))
307-
308139
}

0 commit comments

Comments
 (0)