Skip to content

Commit

Permalink
RePause All Partitions After Rebalance if user paused any and request…
Browse files Browse the repository at this point in the history
…ed (#363)

* RePause All Partitions After Rebalance if user paused any and requested

* RePause All Partitions After Rebalance if user paused any and requested

* RePause All Partitions:fix pausedByUser and unit test

* RePause All Partitions:code review feedback

---------

Co-authored-by: jyang1 <[email protected]>
  • Loading branch information
Gin2022Null and jinyangsdk committed Oct 19, 2023
1 parent 1a44d5c commit f535b9c
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 33 deletions.
3 changes: 3 additions & 0 deletions src/docs/asciidoc/api-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,9 @@ When group management is used, assignment listeners are invoked whenever partiti
to the consumer after a rebalance operation. When manual assignment is used, assignment listeners
are invoked when the consumer is started. Assignment listeners can be used to seek to particular offsets
in the assigned partitions so that messages are consumed from the specified offset.
When a user pauses topics/partitions before rebalancing, the behavior depends on the value of
`pauseAllAfterRebalance`. If it is set to `false`, the paused topics/partitions will remain paused after the rebalance.
However, if it is set to `true`, all assigned topics/partitions will be paused after the rebalance.

When group management is used, revocation listeners are invoked whenever partitions are revoked
from a consumer after a rebalance operation. When manual assignment is used, revocation listeners
Expand Down
111 changes: 85 additions & 26 deletions src/main/java/reactor/kafka/receiver/ImmutableReceiverOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class ImmutableReceiverOptions<K, V> implements ReceiverOptions<K, V> {
private final Pattern subscribePattern;
private final Supplier<Scheduler> schedulerSupplier;
private final ConsumerListener consumerListener;
private final boolean pauseAllAfterRebalance;

ImmutableReceiverOptions() {
this(new HashMap<>());
Expand Down Expand Up @@ -105,6 +106,7 @@ class ImmutableReceiverOptions<K, V> implements ReceiverOptions<K, V> {
this.properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
schedulerSupplier = Schedulers::immediate;
consumerListener = null;
pauseAllAfterRebalance = false;
}

ImmutableReceiverOptions(
Expand All @@ -127,7 +129,8 @@ class ImmutableReceiverOptions<K, V> implements ReceiverOptions<K, V> {
Collection<TopicPartition> partitions,
Pattern pattern,
Supplier<Scheduler> supplier,
ConsumerListener consumerListener
ConsumerListener consumerListener,
boolean pauseAllAfterRebalance
) {
this.properties = new HashMap<>(properties);
this.assignListeners = new ArrayList<>(assignListeners);
Expand All @@ -149,6 +152,7 @@ class ImmutableReceiverOptions<K, V> implements ReceiverOptions<K, V> {
this.subscribePattern = pattern;
this.schedulerSupplier = supplier;
this.consumerListener = consumerListener;
this.pauseAllAfterRebalance = pauseAllAfterRebalance;
}

@Override
Expand Down Expand Up @@ -189,7 +193,8 @@ public ReceiverOptions<K, V> consumerProperty(String name, Object newValue) {
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand All @@ -215,7 +220,8 @@ public ReceiverOptions<K, V> withKeyDeserializer(Deserializer<K> keyDeserializer
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -246,7 +252,8 @@ public ReceiverOptions<K, V> withValueDeserializer(Deserializer<V> valueDeserial
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -285,7 +292,8 @@ public ReceiverOptions<K, V> pollTimeout(Duration timeout) {
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -319,7 +327,8 @@ public ReceiverOptions<K, V> closeTimeout(Duration timeout) {
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -350,7 +359,8 @@ public ReceiverOptions<K, V> addAssignListener(Consumer<Collection<ReceiverParti
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -381,7 +391,8 @@ public ReceiverOptions<K, V> addRevokeListener(Consumer<Collection<ReceiverParti
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand All @@ -407,7 +418,8 @@ public ReceiverOptions<K, V> clearAssignListeners() {
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand All @@ -433,7 +445,8 @@ public ReceiverOptions<K, V> clearRevokeListeners() {
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -469,7 +482,8 @@ public ReceiverOptions<K, V> subscription(Collection<String> topics) {
null,
null,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand All @@ -495,7 +509,8 @@ public ReceiverOptions<K, V> subscription(Pattern pattern) {
null,
Objects.requireNonNull(pattern),
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand All @@ -521,7 +536,8 @@ public ReceiverOptions<K, V> assignment(Collection<TopicPartition> partitions) {
Objects.requireNonNull(partitions),
null,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -582,7 +598,8 @@ public ReceiverOptions<K, V> commitInterval(Duration commitInterval) {
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -616,7 +633,8 @@ public ReceiverOptions<K, V> commitBatchSize(int commitBatchSize) {
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -650,7 +668,8 @@ public ReceiverOptions<K, V> atmostOnceCommitAheadSize(int commitAheadSize) {
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -684,7 +703,8 @@ public ReceiverOptions<K, V> maxCommitAttempts(int maxAttempts) {
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -715,7 +735,8 @@ public ReceiverOptions<K, V> maxDeferredCommits(int maxDeferred) {
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -746,10 +767,43 @@ public ReceiverOptions<K, V> maxDelayRebalance(Duration maxDelay) {
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

@Override
public boolean pauseAllAfterRebalance() {
return this.pauseAllAfterRebalance;
}

@Override
public ReceiverOptions<K, V> pauseAllAfterRebalance(boolean pauseAll) {
return new ImmutableReceiverOptions<>(
properties,
assignListeners,
revokeListeners,
keyDeserializer,
valueDeserializer,
pollTimeout,
closeTimeout,
commitInterval,
commitBatchSize,
atmostOnceCommitAheadSize,
maxCommitAttempts,
commitRetryInterval,
maxDeferredCommits,
maxDelayRebalance,
commitIntervalDuringDelay,
subscribeTopics,
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener,
pauseAll
);
}

@Override
public long commitIntervalDuringDelay() {
return this.commitIntervalDuringDelay;
Expand All @@ -776,7 +830,8 @@ public ReceiverOptions<K, V> commitIntervalDuringDelay(long interval) {
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -807,7 +862,8 @@ public ReceiverOptions<K, V> schedulerSupplier(Supplier<Scheduler> schedulerSupp
assignTopicPartitions,
subscribePattern,
Objects.requireNonNull(schedulerSupplier),
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand All @@ -833,7 +889,8 @@ public ReceiverOptions<K, V> commitRetryInterval(Duration commitRetryInterval) {
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand All @@ -858,7 +915,8 @@ public ReceiverOptions<K, V> consumerListener(@Nullable ConsumerListener consume
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -912,8 +970,8 @@ public int hashCode() {
subscribeTopics,
assignTopicPartitions,
subscribePattern,
consumerListener
);
consumerListener) +
(this.pauseAllAfterRebalance ? 1 : 0);
}

@Override
Expand All @@ -940,7 +998,8 @@ public boolean equals(Object object) {
&& Objects.equals(subscribeTopics, that.subscribeTopics)
&& Objects.equals(assignTopicPartitions, that.assignTopicPartitions)
&& Objects.equals(subscribePattern, that.subscribePattern)
&& Objects.equals(consumerListener, that.consumerListener);
&& Objects.equals(consumerListener, that.consumerListener)
&& pauseAllAfterRebalance == that.pauseAllAfterRebalance;
}
return false;
}
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/reactor/kafka/receiver/ReceiverOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,16 @@ default ReceiverOptions<K, V> maxDelayRebalance(Duration maxDelay) {
return this;
}

/**
* When true, pause all partitions on assignment after rebalance,
* if any partitions were paused by User before the rebalance. Default false
* @param pauseAll
* @return
*/
default ReceiverOptions<K, V> pauseAllAfterRebalance(boolean pauseAll) {
return this;
}

/**
* Set how often to commit offsets, in milliseconds, while a rebalance is being
* delayed. Default 100ms.
Expand Down Expand Up @@ -461,6 +471,16 @@ default Duration maxDelayRebalance() {
return Duration.ofSeconds(60);
}

/**
* When true, pause all partitions on assignment after rebalance,
* if any partitions were paused by User before the rebalance.
* Default false
* @return
*/
default boolean pauseAllAfterRebalance() {
return false;
}

/**
* Get how often to commit offsets, in milliseconds, while a rebalance is being
* delayed. Default 100ms.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,22 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
if (!pausedByUser.isEmpty()) {
List<TopicPartition> toRepause = new ArrayList<>();
Iterator<TopicPartition> iterator = pausedByUser.iterator();
while (iterator.hasNext()) {
TopicPartition next = iterator.next();
if (partitions.contains(next)) {
toRepause.add(next);
} else {
iterator.remove();
if (receiverOptions.pauseAllAfterRebalance()) {
// pause all partitions if any partitions are paused by user before rebalance
log.debug("User requested re-pausing all assignments");
toRepause.addAll(partitions);
pausedByUser.clear();
pausedByUser.addAll(partitions);

} else {
Iterator<TopicPartition> iterator = pausedByUser.iterator();
while (iterator.hasNext()) {
TopicPartition next = iterator.next();
if (partitions.contains(next)) {
toRepause.add(next);
} else {
iterator.remove();
}
}
}
if (!repausedAll && !toRepause.isEmpty()) {
Expand Down
Loading

0 comments on commit f535b9c

Please sign in to comment.