KAFKA-12823 Remove Deprecated method KStream#through (#16761)
Implements KIP-1087

Reviewers: Matthias J. Sax, Lucas Brutschy, Anna Sophie Blee-Goldman
ardada2468 authored Sep 28, 2024
1 parent 235cafa commit 0569603
Showing 9 changed files with 5 additions and 245 deletions.
1 change: 0 additions & 1 deletion core/src/main/scala/kafka/tools/StreamsResetter.java
Original file line number Diff line number Diff line change
@@ -16,7 +16,6 @@
*/
package kafka.tools;


import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

17 changes: 1 addition & 16 deletions docs/streams/developer-guide/app-reset-tool.html
Original file line number Diff line number Diff line change
@@ -36,12 +36,11 @@
<p>You can reset an application and force it to reprocess its data from scratch by using the application reset tool.
This can be useful for development and testing, or when fixing bugs.</p>
<p>The application reset tool handles the Kafka Streams <a class="reference internal" href="manage-topics.html#streams-developer-guide-topics-user"><span class="std std-ref">user topics</span></a> (input,
output, and intermediate topics) and <a class="reference internal" href="manage-topics.html#streams-developer-guide-topics-internal"><span class="std std-ref">internal topics</span></a> differently
and output) and <a class="reference internal" href="manage-topics.html#streams-developer-guide-topics-internal"><span class="std std-ref">internal topics</span></a> differently
when resetting the application.</p>
<p>Here&#8217;s what the application reset tool does for each topic type:</p>
<ul class="simple">
<li>Input topics: Reset offsets to specified position (by default to the beginning of the topic).</li>
<li>Intermediate topics: Skip to the end of the topic, i.e., set the application&#8217;s committed consumer offsets for all partitions to each partition&#8217;s <code class="docutils literal"><span class="pre">logSize</span></code> (for consumer group <code class="docutils literal"><span class="pre">application.id</span></code>).</li>
<li>Internal topics: Delete the internal topic (this automatically deletes any committed offsets).</li>
</ul>
<p>The application reset tool does not:</p>
@@ -61,16 +60,6 @@
</li>
<li><p class="first">Use this tool with care and double-check its parameters: If you provide wrong parameter values (e.g., typos in <code class="docutils literal"><span class="pre">application.id</span></code>) or specify parameters inconsistently (e.g., specify the wrong input topics for the application), this tool might invalidate the application&#8217;s state or even impact other applications, consumer groups, or your Kafka topics.</p>
</li>
<li><p class="first">You should manually delete and re-create any intermediate topics before running the application reset tool. This will free up disk space in Kafka brokers.</p>
</li>
<li><p class="first">You should delete and recreate intermediate topics before running the application reset tool, unless the following applies:</p>
<blockquote>
<div><ul class="simple">
<li>You have external downstream consumers for the application&#8217;s intermediate topics.</li>
<li>You are in a development environment where manually deleting and re-creating intermediate topics is unnecessary.</li>
</ul>
</div></blockquote>
</li>
</ul>
</dd>
</dl>
@@ -106,10 +95,6 @@ <h2>Step 1: Run the application reset tool<a class="headerlink" href="#step-1-ru
topics. For these topics, the tool will
reset the offset to the earliest
available offset.
--intermediate-topics &lt;String: list&gt; Comma-separated list of intermediate user
topics (topics used in the through()
method). For these topics, the tool
will skip to the end.
--internal-topics &lt;String: list&gt; Comma-separated list of internal topics
to delete. Must be a subset of the
internal topics marked for deletion by
Original file line number Diff line number Diff line change
@@ -797,46 +797,6 @@ <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V,
*/
KStream<K, V> merge(final KStream<K, V> stream, final Named named);

/**
* Materialize this stream to a topic and creates a new {@code KStream} from the topic using default serializers,
* deserializers, and producer's default partitioning strategy.
* The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
* started).
* <p>
* This is similar to calling {@link #to(String) #to(someTopicName)} and
* {@link StreamsBuilder#stream(String) StreamsBuilder#stream(someTopicName)}.
* Note that {@code through()} uses a hard coded {@link org.apache.kafka.streams.processor.FailOnInvalidTimestamp
* timestamp extractor} and does not allow to customize it, to ensure correct timestamp propagation.
*
* @param topic the topic name
* @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
* @deprecated since 2.6; use {@link #repartition()} instead
*/
// TODO: when removed, update `StreamsResetter` description of --intermediate-topics
@Deprecated
KStream<K, V> through(final String topic);

/**
* Materialize this stream to a topic and creates a new {@code KStream} from the topic using the
* {@link Produced} instance for configuration of the {@link Serde key serde}, {@link Serde value serde},
* and {@link StreamPartitioner}.
* The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
* started).
* <p>
* This is similar to calling {@link #to(String, Produced) to(someTopic, Produced.with(keySerde, valueSerde)}
* and {@link StreamsBuilder#stream(String, Consumed) StreamsBuilder#stream(someTopicName, Consumed.with(keySerde, valueSerde))}.
* Note that {@code through()} uses a hard coded {@link org.apache.kafka.streams.processor.FailOnInvalidTimestamp
* timestamp extractor} and does not allow to customize it, to ensure correct timestamp propagation.
*
* @param topic the topic name
* @param produced the options to use when producing to the topic
* @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
* @deprecated since 2.6; use {@link #repartition(Repartitioned)} instead
*/
@Deprecated
KStream<K, V> through(final String topic,
final Produced<K, V> produced);
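
For callers migrating off the removed methods, here is a minimal sketch of the two alternatives named in the deleted Javadoc: repartition() as the recommended replacement, and to() followed by StreamsBuilder#stream() when a manually created topic is still needed. Topic names, serdes, and the class name below are illustrative, not part of this change.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;

public class ThroughMigrationSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> stream =
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // Replacement recommended by the deprecation notice: repartition() materializes the
        // stream to an auto-generated, Streams-managed repartition topic and reads it back.
        final KStream<String, String> repartitioned =
            stream.repartition(Repartitioned.with(Serdes.String(), Serdes.String()));
        repartitioned.to("output-a", Produced.with(Serdes.String(), Serdes.String()));

        // If a manually created topic is still required, the removed through() was documented
        // as equivalent to to() plus StreamsBuilder#stream(), with a pinned
        // FailOnInvalidTimestamp extractor (mirroring the removed KStreamImpl code).
        stream.to("user-managed-topic", Produced.with(Serdes.String(), Serdes.String()));
        final KStream<String, String> reread = builder.stream(
            "user-managed-topic",
            Consumed.with(Serdes.String(), Serdes.String())
                .withTimestampExtractor(new FailOnInvalidTimestamp()));
        reread.to("output-b", Produced.with(Serdes.String(), Serdes.String()));

        builder.build(); // builds the topology only; no KafkaStreams instance is started here
    }
}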

/**
* Materialize this stream to an auto-generated repartition topic and create a new {@code KStream}
* from the auto-generated topic using default serializers, deserializers, and producer's default partitioning strategy.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.kafka.streams.kstream.internals.graph.StreamToTableNode;
import org.apache.kafka.streams.kstream.internals.graph.UnoptimizableRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.UnoptimizableRepartitionNode.UnoptimizableRepartitionNodeBuilder;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
@@ -498,39 +497,6 @@ private KStream<K, V> merge(final InternalStreamsBuilder builder,
builder);
}

@Deprecated
@Override
public KStream<K, V> through(final String topic) {
return through(topic, Produced.with(keySerde, valueSerde, null));
}

@Deprecated
@Override
public KStream<K, V> through(final String topic,
final Produced<K, V> produced) {
Objects.requireNonNull(topic, "topic can't be null");
Objects.requireNonNull(produced, "produced can't be null");

final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced);
if (producedInternal.keySerde() == null) {
producedInternal.withKeySerde(keySerde);
}
if (producedInternal.valueSerde() == null) {
producedInternal.withValueSerde(valueSerde);
}
to(topic, producedInternal);

return builder.stream(
Collections.singleton(topic),
new ConsumedInternal<>(
producedInternal.keySerde(),
producedInternal.valueSerde(),
new FailOnInvalidTimestamp(),
null
)
);
}

@Override
public KStream<K, V> repartition() {
return doRepartition(Repartitioned.as(null));
Original file line number Diff line number Diff line change
@@ -332,28 +332,6 @@ public void shouldProcessingFromSinkTopic() {
processorSupplier.theCapturedProcessor().processed());
}

@Deprecated
@Test
public void shouldProcessViaThroughTopic() {
final KStream<String, String> source = builder.stream("topic-source");
final KStream<String, String> through = source.through("topic-sink");

final MockApiProcessorSupplier<String, String, Void, Void> sourceProcessorSupplier = new MockApiProcessorSupplier<>();
source.process(sourceProcessorSupplier);

final MockApiProcessorSupplier<String, String, Void, Void> throughProcessorSupplier = new MockApiProcessorSupplier<>();
through.process(throughProcessorSupplier);

try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic("topic-source", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
inputTopic.pipeInput("A", "aa");
}

assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)), sourceProcessorSupplier.theCapturedProcessor().processed());
assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)), throughProcessorSupplier.theCapturedProcessor().processed());
}

@Test
public void shouldProcessViaRepartitionTopic() {
final KStream<String, String> source = builder.stream("topic-source");
Original file line number Diff line number Diff line change
@@ -515,33 +515,6 @@ public void shouldNotAllowNullNamedOnMerge() {
assertThat(exception.getMessage(), equalTo("named can't be null"));
}

@Deprecated // specifically testing the deprecated variant
@Test
public void shouldNotAllowNullTopicOnThrough() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.through(null));
assertThat(exception.getMessage(), equalTo("topic can't be null"));
}

@Deprecated // specifically testing the deprecated variant
@Test
public void shouldNotAllowNullTopicOnThroughWithProduced() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.through(null, Produced.as("through")));
assertThat(exception.getMessage(), equalTo("topic can't be null"));
}

@Deprecated // specifically testing the deprecated variant
@Test
public void shouldNotAllowNullProducedOnThrough() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.through("topic", null));
assertThat(exception.getMessage(), equalTo("produced can't be null"));
}

@Test
public void shouldNotAllowNullTopicOnTo() {
final NullPointerException exception = assertThrows(
@@ -1277,10 +1250,6 @@ public void shouldPreserveSerdesForOperators() {
assertNull(((AbstractStream) stream1.merge(stream1)).keySerde());
assertNull(((AbstractStream) stream1.merge(stream1)).valueSerde());

assertEquals(((AbstractStream) stream1.through("topic-3")).keySerde(), consumedInternal.keySerde());
assertEquals(((AbstractStream) stream1.through("topic-3")).valueSerde(), consumedInternal.valueSerde());
assertEquals(((AbstractStream) stream1.through("topic-3", Produced.with(mySerde, mySerde))).keySerde(), mySerde);
assertEquals(((AbstractStream) stream1.through("topic-3", Produced.with(mySerde, mySerde))).valueSerde(), mySerde);

assertEquals(((AbstractStream) stream1.repartition()).keySerde(), consumedInternal.keySerde());
assertEquals(((AbstractStream) stream1.repartition()).valueSerde(), consumedInternal.valueSerde());
@@ -1329,24 +1298,6 @@ public void shouldPreserveSerdesForOperators() {
assertNull(((AbstractStream) stream1.leftJoin(table2, selector, joiner)).valueSerde());
}

@Deprecated
@Test
public void shouldUseRecordMetadataTimestampExtractorWithThrough() {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> stream1 = builder.stream(Arrays.asList("topic-1", "topic-2"), stringConsumed);
final KStream<String, String> stream2 = builder.stream(Arrays.asList("topic-3", "topic-4"), stringConsumed);

stream1.to("topic-5");
stream2.through("topic-6");

final ProcessorTopology processorTopology = TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").buildTopology();
assertThat(processorTopology.source("topic-6").timestampExtractor(), instanceOf(FailOnInvalidTimestamp.class));
assertNull(processorTopology.source("topic-4").timestampExtractor());
assertNull(processorTopology.source("topic-3").timestampExtractor());
assertNull(processorTopology.source("topic-2").timestampExtractor());
assertNull(processorTopology.source("topic-1").timestampExtractor());
}

@Test
public void shouldUseRecordMetadataTimestampExtractorWithRepartition() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -1364,22 +1315,6 @@ public void shouldUseRecordMetadataTimestampExtractorWithRepartition() {
assertNull(processorTopology.source("topic-1").timestampExtractor());
}

@Deprecated
@Test
public void shouldSendDataThroughTopicUsingProduced() {
final StreamsBuilder builder = new StreamsBuilder();
final String input = "topic";
final KStream<String, String> stream = builder.stream(input, stringConsumed);
stream.through("through-topic", Produced.with(Serdes.String(), Serdes.String())).process(processorSupplier);

try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(input, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
inputTopic.pipeInput("a", "b");
}
assertThat(processorSupplier.theCapturedProcessor().processed(), equalTo(Collections.singletonList(new KeyValueTimestamp<>("a", "b", 0))));
}

@Test
public void shouldSendDataThroughRepartitionTopicUsingRepartitioned() {
final StreamsBuilder builder = new StreamsBuilder();
Original file line number Diff line number Diff line change
@@ -195,19 +195,6 @@ public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChang
assertEquals(2, getCountOfRepartitionTopicsFound(noOptimization.describe().toString()));
}

// no need to optimize as user has already performed the repartitioning manually
@Deprecated
@Test
public void shouldNotOptimizeWhenAThroughOperationIsDone() {
final Topology attemptedOptimize = getTopologyWithThroughOperation(StreamsConfig.OPTIMIZE);
final Topology noOptimization = getTopologyWithThroughOperation(StreamsConfig.NO_OPTIMIZATION);

assertEquals(attemptedOptimize.describe().toString(), noOptimization.describe().toString());
assertEquals(0, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString()));
assertEquals(0, getCountOfRepartitionTopicsFound(noOptimization.describe().toString()));

}

@Test
public void shouldOptimizeSeveralMergeNodesWithCommonKeyChangingParent() {
final StreamsBuilder streamsBuilder = new StreamsBuilder();
@@ -256,23 +243,6 @@ private Topology getTopologyWithChangingValuesAfterChangingKey(final String opti

}

@Deprecated // specifically testing the deprecated variant
private Topology getTopologyWithThroughOperation(final String optimizeConfig) {

final StreamsBuilder builder = new StreamsBuilder();
final Properties properties = new Properties();
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizeConfig);

final KStream<String, String> inputStream = builder.stream("input");
final KStream<String, String> mappedKeyStream = inputStream.selectKey((k, v) -> k + v).through("through-topic");

mappedKeyStream.groupByKey().count().toStream().to("output");
mappedKeyStream.groupByKey().windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(5000))).count().toStream().to("windowed-output");

return builder.build(properties);

}

private Topology getTopologyWithRepartitionOperation(final String optimizeConfig) {
final StreamsBuilder builder = new StreamsBuilder();
final Properties properties = new Properties();
Original file line number Diff line number Diff line change
@@ -349,40 +349,6 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
def split(named: Named): BranchedKStream[K, V] =
new BranchedKStream(inner.split(named))

/**
* Materialize this stream to a topic and creates a new [[KStream]] from the topic using the `Produced` instance for
* configuration of the `Serde key serde`, `Serde value serde`, and `StreamPartitioner`
* <p>
* The user can either supply the `Produced` instance as an implicit in scope or they can also provide implicit
* key and value serdes that will be converted to a `Produced` instance implicitly.
* <p>
* {{{
* Example:
*
* // brings implicit serdes in scope
* import Serdes._
*
* //..
* val clicksPerRegion: KStream[String, Long] = //..
*
* // Implicit serdes in scope will generate an implicit Produced instance, which
* // will be passed automatically to the call of through below
* clicksPerRegion.through(topic)
*
* // Similarly you can create an implicit Produced and it will be passed implicitly
* // to the through call
* }}}
*
* @param topic the topic name
* @param produced the instance of Produced that gives the serdes and `StreamPartitioner`
* @return a [[KStream]] that contains the exact same (and potentially repartitioned) records as this [[KStream]]
* @see `org.apache.kafka.streams.kstream.KStream#through`
* @deprecated use `repartition()` instead
*/
@deprecated("use `repartition()` instead", "2.6.0")
def through(topic: String)(implicit produced: Produced[K, V]): KStream[K, V] =
new KStream(inner.through(topic, produced))

/**
* Materialize this stream to a topic and creates a new [[KStream]] from the topic using the `Repartitioned` instance
* for configuration of the `Serde key serde`, `Serde value serde`, `StreamPartitioner`, number of partitions, and