Skip to content

Commit

Permalink
[fix][client] The partitionedProducer maxPendingMessages always is 0 (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd authored Nov 13, 2024
1 parent 04c80f1 commit 0f934f2
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
Expand Down Expand Up @@ -1340,6 +1341,49 @@ public void testProducerQueueFullBlocking() throws Exception {
setup();
}

@Test
public void testProducerQueueFullBlockingWithPartitionedTopic() throws Exception {
final String topicName = "persistent://prop/ns-abc/topic-xyzx2";
admin.topics().createPartitionedTopic(topicName, 2);

@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();

// 1. Producer connect
PartitionedProducerImpl<byte[]> producer = (PartitionedProducerImpl<byte[]>) client.newProducer()
.topic(topicName)
.maxPendingMessages(1)
.blockIfQueueFull(true)
.sendTimeout(1, TimeUnit.SECONDS)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

// 2. Stop broker
cleanup();

// 2. producer publish messages
long startTime = System.nanoTime();
producer.sendAsync("msg".getBytes());

// Verify thread was not blocked
long delayNs = System.nanoTime() - startTime;
assertTrue(delayNs < TimeUnit.SECONDS.toNanos(1));

// Next send operation must block, until all the messages in the queue expire
startTime = System.nanoTime();
producer.sendAsync("msg".getBytes());
delayNs = System.nanoTime() - startTime;
assertTrue(delayNs > TimeUnit.MILLISECONDS.toNanos(500));
assertTrue(delayNs < TimeUnit.MILLISECONDS.toNanos(1500));

// 4. producer disconnect
producer.close();

// 5. Restart broker
setup();
}

@Test
public void testProducerQueueFullNonBlocking() throws Exception {
final String topicName = "persistent://prop/ns-abc/topic-xyzx";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.client.impl;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES;
import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.netty.util.Timeout;
Expand Down Expand Up @@ -84,9 +86,16 @@ public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerCo
: null;

// MaxPendingMessagesAcrossPartitions doesn't support partial partition such as SinglePartition correctly
int maxPendingMessages = Math.min(conf.getMaxPendingMessages(),
conf.getMaxPendingMessagesAcrossPartitions() / numPartitions);
conf.setMaxPendingMessages(maxPendingMessages);
int maxPendingMessages = conf.getMaxPendingMessages();
int maxPendingMessagesAcrossPartitions = conf.getMaxPendingMessagesAcrossPartitions();
if (maxPendingMessagesAcrossPartitions != DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS) {
int maxPendingMsgsForOnePartition = maxPendingMessagesAcrossPartitions / numPartitions;
maxPendingMessages = (maxPendingMessages == DEFAULT_MAX_PENDING_MESSAGES)
? maxPendingMsgsForOnePartition
: Math.min(maxPendingMessages, maxPendingMsgsForOnePartition);
conf.setMaxPendingMessages(maxPendingMessages);
}


final List<Integer> indexList;
if (conf.isLazyStartPartitionedProducers()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,44 @@ public void testGetNumOfPartitions() throws Exception {
assertEquals(producerImpl.getNumOfPartitions(), 0);
}

@Test
public void testMaxPendingQueueSize() throws Exception {
String topicName = "test-max-pending-queue-size";
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("pulsar://localhost:6650");
conf.setStatsIntervalSeconds(100);

ThreadFactory threadFactory = new DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
@Cleanup("shutdownGracefully")
EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), false, threadFactory);

@Cleanup
PulsarClientImpl clientImpl = new PulsarClientImpl(conf, eventLoopGroup);

// Test set maxPendingMessage to 10
ProducerConfigurationData producerConfData = new ProducerConfigurationData();
producerConfData.setMessageRoutingMode(MessageRoutingMode.CustomPartition);
producerConfData.setCustomMessageRouter(new CustomMessageRouter());
producerConfData.setMaxPendingMessages(10);
PartitionedProducerImpl partitionedProducerImpl = new PartitionedProducerImpl(
clientImpl, topicName, producerConfData, 1, null, null, null);
assertEquals(partitionedProducerImpl.getConfiguration().getMaxPendingMessages(), 10);

// Test set MaxPendingMessagesAcrossPartitions=5
producerConfData.setMaxPendingMessages(ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES);
producerConfData.setMaxPendingMessagesAcrossPartitions(5);
partitionedProducerImpl = new PartitionedProducerImpl(
clientImpl, topicName, producerConfData, 1, null, null, null);
assertEquals(partitionedProducerImpl.getConfiguration().getMaxPendingMessages(), 5);

// Test set maxPendingMessage=10 and MaxPendingMessagesAcrossPartitions=10 with 2 partitions
producerConfData.setMaxPendingMessages(10);
producerConfData.setMaxPendingMessagesAcrossPartitions(10);
partitionedProducerImpl = new PartitionedProducerImpl(
clientImpl, topicName, producerConfData, 2, null, null, null);
assertEquals(partitionedProducerImpl.getConfiguration().getMaxPendingMessages(), 5);
}


@Test
public void testOnTopicsExtended() throws Exception {
Expand Down

0 comments on commit 0f934f2

Please sign in to comment.