Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Cache topic resolution in TopicIds set #17285

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
165ee41
MINOR: Cache topic resolution in TopicIds set
dajac Jul 5, 2024
a59e8c5
MINOR: fix failed cases in FeatureCommandTest (#17287)
FrankYang0529 Sep 26, 2024
03b4cdc
KAFKA-16792: Enable consumer unit tests that fail to fetch offsets on…
FrankYang0529 Sep 26, 2024
48c022c
KAFKA-14562 [1/2]: Implement epoch bump after every transaction (#16719)
jolshan Sep 26, 2024
655cddd
KAFKA-17608, KAFKA-17604, KAFKA-16963; KRaft controller crashes when …
ahuang98 Sep 26, 2024
536dfbb
KAFKA-17621; Reduce logging verbosity on ConsumerGroupHeartbeat path …
dajac Sep 26, 2024
25b048e
KAFKA-17563 Move `RequestConvertToJson` to server module (#17223)
xijiu Sep 26, 2024
0d8485a
KAFKA-17154: New consumer subscribe may join group without a call to …
FrankYang0529 Sep 26, 2024
5aaf623
KAFKA-16683 Extract security-related helpers from scala.TestUtils to …
FrankYang0529 Sep 26, 2024
813d79c
MINOR: Remove duplicate method (#17291)
lianetm Sep 26, 2024
99e9f81
KAFKA-17277: [2/2] Add feature dependency command to the storage and …
rreddy-22 Sep 26, 2024
0589e93
KAFKA-17488: Cleanup (test) code for Kafka Streams "metric version" (…
fonsdant Sep 27, 2024
fe5056e
KAFKA-16331: remove KafkaClientSupplier from StreamsProducer (#17259)
mjsax Sep 27, 2024
2059889
KAFKA-17628 New workflows for automating run approvals (#17290)
mumrah Sep 27, 2024
3b555b8
KAFKA-17626: Move common fetch related classes from storage to server…
apoorvmittal10 Sep 27, 2024
2df3ff0
MINOR: Set default group.consumer.migration.policy to BIDIRECTIONAL (…
dongnuo123 Sep 27, 2024
945023d
MINOR: Fix a race and add JMH bench for HdrHistogram (#17221)
dimitarndimitrov Sep 27, 2024
f0a2d43
KAFKA-17581: AsyncKafkaConsumer can't unsubscribe invalid topics (#17…
FrankYang0529 Sep 27, 2024
f438bb4
KAFKA-17619: Remove zk type and instance from ClusterTest (#17284)
FrankYang0529 Sep 27, 2024
b905a9f
KAFKA-14572: Migrate EmbeddedKafkaCluster used by Streams integration…
OmniaGM Sep 27, 2024
d9254d5
KAFKA-17636 Fix missing SCRAM bootstrap records (#17305)
mumrah Sep 28, 2024
0b97fa6
MINOR Don't run tests for Draft PRs (#17299)
mumrah Sep 28, 2024
1f9c7ae
KAFKA-6197: Update Streams API and Javadoc references in documentatio…
devanshikhatsuriya Sep 28, 2024
e94ed0f
fixup: add toString implementation to TopicResolvers
squah-confluent Sep 28, 2024
7281eb2
fixup: make BaseTopicResolver private
squah-confluent Sep 28, 2024
02653fc
Revert bad merge
squah-confluent Sep 30, 2024
f5d98e0
Merge remote-tracking branch 'origin/trunk' into squah-cache-topicids…
squah-confluent Sep 30, 2024
7e718fb
Merge pull request #135 from confluentinc/squah-cache-topicids-topic-…
squah-confluent Sep 30, 2024
3c71192
fixup: remove AbstractTopicResolver, inline equals and hashCode into …
squah-confluent Oct 2, 2024
43a881f
fixup: add comment about lifetime of CachedTopicResolver
squah-confluent Oct 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -327,13 +327,14 @@ public TargetAssignmentBuilder<T> removeMember(
*/
public TargetAssignmentResult build() throws PartitionAssignorException {
Map<String, MemberSubscriptionAndAssignmentImpl> memberSpecs = new HashMap<>();
TopicIds.TopicResolver topicResolver = new TopicIds.CachedTopicResolver(topicsImage);

// Prepare the member spec for all members.
members.forEach((memberId, member) ->
memberSpecs.put(memberId, createMemberSubscriptionAndAssignment(
member,
targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
topicsImage
topicResolver
))
);

Expand All @@ -355,7 +356,7 @@ public TargetAssignmentResult build() throws PartitionAssignorException {
memberSpecs.put(memberId, createMemberSubscriptionAndAssignment(
updatedMemberOrNull,
assignment,
topicsImage
topicResolver
));
}
});
Expand Down Expand Up @@ -420,12 +421,12 @@ private Assignment newMemberAssignment(
static <T extends ModernGroupMember> MemberSubscriptionAndAssignmentImpl createMemberSubscriptionAndAssignment(
T member,
Assignment memberAssignment,
TopicsImage topicsImage
TopicIds.TopicResolver topicResolver
) {
return new MemberSubscriptionAndAssignmentImpl(
Optional.ofNullable(member.rackId()),
Optional.ofNullable(member.instanceId()),
new TopicIds(member.subscribedTopicNames(), topicsImage),
new TopicIds(member.subscribedTopicNames(), topicResolver),
memberAssignment
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,169 @@
import org.apache.kafka.image.TopicsImage;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;

/**
* TopicIds is initialized with topic names (String) but exposes a Set of topic ids (Uuid) to the
* user and performs the conversion lazily with TopicsImage.
* user and performs the conversion lazily with a TopicResolver backed by a TopicsImage.
*/
public class TopicIds implements Set<Uuid> {
/**
* Converts between topic ids (Uuids) and topic names (Strings).
*/
public interface TopicResolver {
/**
* @return The TopicsImage used by the resolver.
*/
TopicsImage image();

/**
* Converts a topic id to a topic name.
*
* @param id The topic id.
* @return The topic name for the given topic id, or null if the topic does not exist.
*/
String name(Uuid id);

/**
* Converts a topic name to a topic id.
*
* @param name The topic name.
* @return The topic id for the given topic name, or null if the topic does not exist.
*/
Uuid id(String name);

/**
* Clears any cached data.
*
* Used for benchmarking purposes.
*/
void clear();
}

/**
* A base implementation of TopicResolver.
*
* Provides an implementation of equals and hashCode based on the underlying TopicsImage.
*/
public abstract static class BaseTopicResolver implements TopicResolver {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should we keep it private?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it doesn't need to be public.

protected final TopicsImage image;

public BaseTopicResolver(
TopicsImage image
) {
this.image = Objects.requireNonNull(image);
}

@Override
public final TopicsImage image() {
return image;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || !(o instanceof TopicResolver)) return false;

TopicResolver that = (TopicResolver) o;

return Objects.equals(image, that.image());
}

@Override
public int hashCode() {
return image.hashCode();
}
}

/**
* A TopicResolver without any caching.
*/
public static class DefaultTopicResolver extends BaseTopicResolver {
public DefaultTopicResolver(
TopicsImage image
) {
super(image);
}

@Override
public String name(Uuid id) {
TopicImage topic = image.getTopic(id);
if (topic == null) return null;
return topic.name();
}

@Override
public Uuid id(String name) {
TopicImage topic = image.getTopic(name);
if (topic == null) return null;
return topic.id();
}

@Override
public void clear() {}
}

/**
* A TopicResolver that caches results.
*/
public static class CachedTopicResolver extends BaseTopicResolver {
private final Map<String, Uuid> topicIds = new HashMap<>();
private final Map<Uuid, String> topicNames = new HashMap<>();

public CachedTopicResolver(
TopicsImage image
) {
super(image);
}

@Override
public String name(Uuid id) {
return topicNames.computeIfAbsent(id, __ -> {
TopicImage topic = image.getTopic(id);
if (topic == null) return null;
return topic.name();
});
}

@Override
public Uuid id(String name) {
return topicIds.computeIfAbsent(name, __ -> {
TopicImage topic = image.getTopic(name);
if (topic == null) return null;
return topic.id();
});
}

@Override
public void clear() {
this.topicNames.clear();
this.topicIds.clear();
}
}

private final Set<String> topicNames;
private final TopicsImage image;
private final TopicResolver resolver;

public TopicIds(
Set<String> topicNames,
TopicsImage image
) {
this.topicNames = Objects.requireNonNull(topicNames);
this.image = Objects.requireNonNull(image);
this.resolver = new DefaultTopicResolver(image);
}

public TopicIds(
Set<String> topicNames,
TopicResolver resolver
) {
this.topicNames = Objects.requireNonNull(topicNames);
this.resolver = Objects.requireNonNull(resolver);
}

@Override
Expand All @@ -56,24 +200,24 @@ public boolean isEmpty() {
public boolean contains(Object o) {
if (o instanceof Uuid) {
Uuid topicId = (Uuid) o;
TopicImage topicImage = image.getTopic(topicId);
if (topicImage == null) return false;
return topicNames.contains(topicImage.name());
String topicName = resolver.name(topicId);
if (topicName == null) return false;
return topicNames.contains(topicName);
}
return false;
}

private static class TopicIdIterator implements Iterator<Uuid> {
final Iterator<String> iterator;
final TopicsImage image;
final TopicResolver resolver;
private Uuid next = null;

private TopicIdIterator(
Iterator<String> iterator,
TopicsImage image
TopicResolver resolver
) {
this.iterator = Objects.requireNonNull(iterator);
this.image = Objects.requireNonNull(image);
this.resolver = Objects.requireNonNull(resolver);
}

@Override
Expand All @@ -85,9 +229,9 @@ public boolean hasNext() {
return false;
}
String next = iterator.next();
TopicImage topicImage = image.getTopic(next);
if (topicImage != null) {
result = topicImage.id();
Uuid topicId = resolver.id(next);
if (topicId != null) {
result = topicId;
}
} while (result == null);
next = result;
Expand All @@ -105,7 +249,7 @@ public Uuid next() {

@Override
public Iterator<Uuid> iterator() {
return new TopicIdIterator(topicNames.iterator(), image);
return new TopicIdIterator(topicNames.iterator(), resolver);
}

@Override
Expand Down Expand Up @@ -164,20 +308,20 @@ public boolean equals(Object o) {
TopicIds uuids = (TopicIds) o;

if (!Objects.equals(topicNames, uuids.topicNames)) return false;
return Objects.equals(image, uuids.image);
return Objects.equals(resolver, uuids.resolver);
}

@Override
public int hashCode() {
int result = topicNames.hashCode();
result = 31 * result + image.hashCode();
result = 31 * result + resolver.hashCode();
return result;
}

@Override
public String toString() {
return "TopicIds(topicNames=" + topicNames +
", image=" + image +
", resolver=" + resolver +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: If we toString it here, should we add toString methods to the resolvers too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add a toString implementation that returns <class name>(image=<image>).

')';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ public void prepareMemberAssignment(

public TargetAssignmentBuilder.TargetAssignmentResult build() {
TopicsImage topicsImage = topicsImageBuilder.build().topics();
TopicIds.TopicResolver topicResolver = new TopicIds.CachedTopicResolver(topicsImage);
// Prepare expected member specs.
Map<String, MemberSubscriptionAndAssignmentImpl> memberSubscriptions = new HashMap<>();

Expand All @@ -169,7 +170,7 @@ public TargetAssignmentBuilder.TargetAssignmentResult build() {
memberSubscriptions.put(memberId, createMemberSubscriptionAndAssignment(
member,
targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
topicsImage
topicResolver
))
);

Expand All @@ -192,7 +193,7 @@ public TargetAssignmentBuilder.TargetAssignmentResult build() {
memberSubscriptions.put(memberId, createMemberSubscriptionAndAssignment(
updatedMemberOrNull,
assignment,
topicsImage
topicResolver
));
}
});
Expand Down Expand Up @@ -263,6 +264,7 @@ public void testCreateMemberSubscriptionSpecImpl() {
.addTopic(barTopicId, "bar", 5)
.build()
.topics();
TopicIds.TopicResolver topicResolver = new TopicIds.DefaultTopicResolver(topicsImage);

ConsumerGroupMember member = new ConsumerGroupMember.Builder("member-id")
.setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
Expand All @@ -278,7 +280,7 @@ public void testCreateMemberSubscriptionSpecImpl() {
MemberSubscription subscriptionSpec = createMemberSubscriptionAndAssignment(
member,
assignment,
topicsImage
topicResolver
);

assertEquals(new MemberSubscriptionAndAssignmentImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ public void testTopicNamesCannotBeNull() {

@Test
public void testTopicsImageCannotBeNull() {
assertThrows(NullPointerException.class, () -> new TopicIds(Collections.emptySet(), null));
assertThrows(NullPointerException.class, () -> new TopicIds(Collections.emptySet(), (TopicsImage) null));
}

@Test
public void testTopicResolverCannotBeNull() {
assertThrows(NullPointerException.class, () -> new TopicIds(Collections.emptySet(), (TopicIds.TopicResolver) null));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,13 @@ public static TopicsImage createTopicsImage(Map<String, TopicMetadata> subscript
*
* @param members The ConsumerGroupMembers.
* @param subscriptionType  The group's subscription type.
* @param topicsImage The TopicsImage to use.
* @param topicResolver The TopicResolver to use.
* @return The new GroupSpec.
*/
public static GroupSpec createGroupSpec(
Map<String, ConsumerGroupMember> members,
SubscriptionType subscriptionType,
TopicsImage topicsImage
TopicIds.TopicResolver topicResolver
) {
Map<String, MemberSubscriptionAndAssignmentImpl> memberSpecs = new HashMap<>();

Expand All @@ -177,7 +177,7 @@ public static GroupSpec createGroupSpec(
memberSpecs.put(memberId, new MemberSubscriptionAndAssignmentImpl(
Optional.ofNullable(member.rackId()),
Optional.ofNullable(member.instanceId()),
new TopicIds(member.subscribedTopicNames(), topicsImage),
new TopicIds(member.subscribedTopicNames(), topicResolver),
new Assignment(member.assignedPartitions())
));
}
Expand Down
Loading
Loading