Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-12829: Remove deprecated Topology#addProcessor of old Processor API #17190

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from

Conversation

pegasas
Copy link
Contributor

@pegasas pegasas commented Sep 13, 2024

More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.

Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@mjsax mjsax added the streams label Sep 28, 2024
@mjsax mjsax changed the title KAFKA-12829: Remove deprecated Topology#addProcessor of old Processor… KAFKA-12829: Remove deprecated Topology#addProcessor of old Processor API Sep 28, 2024
Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. Overall LGTM. I think we can remove more unused code though.

topology
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
.addSource("source2", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_2)
.addProcessor("processor1", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OldAPIStatefulProcessor should be unused now and can be removed, too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, remove these deprecated API.

private Topology createStatefulTopology(final String storeName) {
return topology
.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
.addProcessor("processor", define(new OldAPIStatefulProcessor(storeName)), "source")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

define() should be unused now and can be removed, too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String());
return topology
.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
.addProcessor("processor", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defineWithStoresOldAPI() should be unused now and can be removed, too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@pegasas
Copy link
Contributor Author

pegasas commented Oct 6, 2024

image
passing CI on local.

@@ -312,79 +312,6 @@ public void testDrivingSimpleTopologyWithDroppingPartitioner() {
assertTrue(outputTopic1.isEmpty());
}

@Test
public void testDrivingStatefulTopology() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we removing this test? Seems we should keep it. It's not marked as @Deprecated and does not say "old API" in it's name either, nor is their a comment that it would test old API.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pegasas -- i don't see any reply on this comment (same for the other 3 below)

}

@Test
public void testDrivingConnectedStateStoreTopology() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pegasas -- i don't see any reply on this comment (2ed one)

@@ -1776,27 +1229,6 @@ public void process(final Record<String, String> record) {
}
}

@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we should not remove this (it's not @Deprecated) and comment says Need to be migrated, but rather rewrite to for the new processor API? Or is it unused now?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pegasas -- i don't see any reply on this comment (3re one)

return () -> processor;
}

@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pegasas -- i don't see any reply on this comment (last one)

@mjsax
Copy link
Member

mjsax commented Oct 24, 2024

@pegasas -- Any updates on this PR?

@mjsax
Copy link
Member

mjsax commented Oct 30, 2024

@pegasas We are slowly approaching AK 4.0 release deadline. Would be great to push this over the finish line soon.

@pegasas
Copy link
Contributor Author

pegasas commented Oct 30, 2024

@pegasas We are slowly approaching AK 4.0 release deadline. Would be great to push this over the finish line soon.

Sorry for the delay due to busy last month.
Has replied your comments.

@mjsax
Copy link
Member

mjsax commented Nov 2, 2024

@pegasas -- I don't see any replies to the latest comment, nor an update to the PR?

@pegasas
Copy link
Contributor Author

pegasas commented Nov 2, 2024

@pegasas -- I don't see any replies to the latest comment, nor an update to the PR?

yes, I've replied to previous comments.
It seems these changes are expected.

@mjsax
Copy link
Member

mjsax commented Nov 15, 2024

@pegasas -- I am still somewhat confused. Did you see my latest in-line tagging?

These are still open questions that I think we need to resolve before we can merge this PR. And AK 4.0 release deadline is approaching, so I would like to finish this :)

@pegasas
Copy link
Contributor Author

pegasas commented Nov 18, 2024

@pegasas -- I am still somewhat confused. Did you see my latest in-line tagging?

These are still open questions that I think we need to resolve before we can merge this PR. And AK 4.0 release deadline is approaching, so I would like to finish this :)

Hi, @mjsax ,

as line318 shows,

driver = new TopologyTestDriver(createStatefulTopology(storeName), props);

which createStatefulTopology would be deprecated.

https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java#L1571-L1578

It seems my comments are disappeared several times. it is weird.

@mjsax
Copy link
Member

mjsax commented Nov 18, 2024

I believe the annotation and comment is wrong. While it does test the old API, there is no corresponding test for the new API (or did I miss it?), and thus, it should have be @SuppressWarning("deprecation") (instead of @Deprecated) and the comment should say // Old PAPI. Needs to be migrated. similar to the other cases for which I did leave a comment.

We should not remove these 4 tests, but rather rewrite all 4 to use the new PAPI, to avoid reducing test coverage. For the other test you remove in this PR, eg, testDrivingConnectedStateStoreInDifferentProcessorsTopologyWithOldAPI, there is already equivalent tests for the new PAPI (testDrivingConnectedStateStoreInDifferentProcessorsTopology) so it's ok to remove the "duplicate" test using the old API.

Does this make sense?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants