Skip to content

Commit

Permalink
[fix][broker] Broker is failing to create non-durable sub if topic is…
Browse files Browse the repository at this point in the history
… fenced (apache#23579)
  • Loading branch information
rdhabalia authored Nov 11, 2024
1 parent 137df29 commit 7822dca
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -3930,7 +3930,8 @@ private void checkManagedLedgerIsOpen() throws ManagedLedgerException {
}
}

synchronized void setFenced() {
@VisibleForTesting
public synchronized void setFenced() {
log.info("{} Moving to Fenced state", name);
STATE_UPDATER.set(this, State.Fenced);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,11 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
} else if (ex.getCause() instanceof BrokerServiceException.SubscriptionFencedException
&& isCompactionSubscription(subscriptionName)) {
log.warn("[{}] Failed to create compaction subscription: {}", topic, ex.getMessage());
} else if (ex.getCause() instanceof ManagedLedgerFencedException) {
// If the topic has been fenced, we cannot continue using it. We need to close and reopen
log.warn("[{}][{}] has been fenced. closing the topic {}", topic, subscriptionName,
ex.getMessage());
close();
} else {
log.error("[{}] Failed to create subscription: {}", topic, subscriptionName, ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4979,4 +4979,31 @@ private int compareMessageIds(MessageIdImpl messageId1, MessageIdImpl messageId2
return 0;
}
}

@Test
public void testFencedLedger() throws Exception {
log.info("-- Starting {} test --", methodName);

final String topic = "persistent://my-property/my-ns/fencedLedger";

@Cleanup
PulsarClient newPulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();

@Cleanup
Producer<byte[]> producer = newPulsarClient.newProducer().topic(topic).enableBatching(false).create();

final int numMessages = 5;
for (int i = 0; i < numMessages; i++) {
producer.newMessage().value(("value-" + i).getBytes(UTF_8)).eventTime((i + 1) * 100L).sendAsync();
}
producer.flush();

PersistentTopic pTopic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
ManagedLedgerImpl ml = (ManagedLedgerImpl) pTopic.getManagedLedger();
ml.setFenced();

Reader<byte[]> reader = newPulsarClient.newReader().topic(topic).startMessageId(MessageId.earliest)
.createAsync().get(5, TimeUnit.SECONDS);
assertNotNull(reader);
}
}

0 comments on commit 7822dca

Please sign in to comment.