Skip to content

Commit 6f15faf

Browse files
committed
early return if readLimitLeft == 0
1 parent 355d08e commit 6f15faf

File tree

1 file changed

+6
-0
lines changed

1 file changed

+6
-0
lines changed

src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ private[pulsar] case class PulsarHelper(
6262
private var topics: Seq[String] = _
6363
private var topicPartitions: Seq[String] = _
6464

65+
// We can do this because pulsarAdmin will only be called if latestOffset is called
66+
// and there should be an exception thrown in PulsarProvider if maxBytes is set,
67+
// and maxBytes is not set
6568
private lazy val pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(adminUrl.get).build()
6669

6770
override def close(): Unit = {
@@ -273,6 +276,9 @@ private[pulsar] case class PulsarHelper(
273276
var messageId = startMessageId
274277
var readLimitLeft = readLimit
275278
ledgers.filter(_.entries != 0).sortBy(_.ledgerId).foreach { ledger =>
279+
if (readLimitLeft == 0) {
280+
return messageId
281+
}
276282
val avgBytesPerEntries = ledger.size / ledger.entries
277283
// approximation of bytes left in ledger to deal with case
278284
// where we are at the middle of the ledger

0 commit comments

Comments
 (0)