Skip to content

Commit 4839989

Browse files
author
Attila Tóth
committed
remove(metadatareader): topic strategy support
This feature is planned to be put into a different PR.
1 parent 293f99e commit 4839989

15 files changed

+34
-913
lines changed

README.md

Lines changed: 0 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -327,53 +327,6 @@ all of them.
327327
complete backlog is read at once.</td>
328328
</tr>
329329

330-
<tr>
331-
<td>
332-
`forwardStrategy`
333-
</td>
334-
<td>
335-
`simple`, `large-first` or `proportional`
336-
</td>
337-
<td>`simple`</td>
338-
<td>Streaming query</td>
339-
<td>If `maxEntriesPerTrigger` is set, this parameter controls
340-
which forwarding strategy is in use during the read of multiple
341-
topics.
342-
<li>
343-
`simple` just divides the allowed number of entries equally
344-
between all topics, regardless of their backlog size
345-
</li>
346-
<li>
347-
`large-first` will load the largest topic backlogs first,
348-
as the maximum number of allowed entries allows
349-
</li>
350-
<li>
351-
`proportional` will forward all topics proportional to the
352-
topic backlog/overall backlog ratio
353-
</li>
354-
</td>
355-
</tr>
356-
357-
<tr>
358-
<td>
359-
`ensureEntriesPerTopic`
360-
</td>
361-
<td>Number to forward each topic with during a micro-batch.</td>
362-
<td>0</td>
363-
<td>Streaming query</td>
364-
<td>If multiple topics are read, and the maximum number of
365-
entries is also specified, always forward all topics with the
366-
amount of entries specified here. Using this, users can ensure that topics
367-
with considerably smaller backlogs than others are also forwarded
368-
and read. Note that:
369-
<li>If this number is higher than the maximum allowed entries divided
370-
by the number of topics, then this value is taken into account, overriding
371-
the maximum number of entries per micro-batch.
372-
</li>
373-
<li>This parameter has an effect only for forwarding strategies
374-
`large-first` and `proportional`.</li>
375-
</td>
376-
</tr>
377330
<tr>
378331
<td>
379332
`allowDifferentTopicSchemas`
@@ -400,7 +353,6 @@ taken into account during operation.
400353
</td>
401354
</tr>
402355

403-
404356
</table>
405357

406358
#### Authentication

src/main/scala/org/apache/spark/sql/pulsar/PulsarMetadataReader.scala

Lines changed: 21 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -261,30 +261,14 @@ private[pulsar] case class PulsarMetadataReader(
261261
}.toMap)
262262
}
263263

264-
265-
def forwardOffset(actualOffset: Map[String, MessageId],
266-
strategy: String,
267-
numberOfEntriesToForward: Long,
268-
ensureEntriesPerTopic: Long): SpecificPulsarOffset = {
264+
def fetchNextOffsetWithMaxEntries(actualOffset: Map[String, MessageId],
265+
numberOfEntries: Long): SpecificPulsarOffset = {
269266
getTopicPartitions()
270267

271268
// Collect internal stats for all topics
272269
val topicStats = topicPartitions.map( topic => {
273-
val internalStats = admin.topics().getInternalStats(topic)
274-
val topicActualMessageId = actualOffset.getOrElse(topic, MessageId.earliest)
275-
topic -> TopicState(internalStats,
276-
PulsarSourceUtils.getLedgerId(topicActualMessageId),
277-
PulsarSourceUtils.getEntryId(topicActualMessageId))
278-
} ).toMap
279-
280-
val forwarder = strategy match {
281-
case PulsarOptions.ProportionalForwardStrategy =>
282-
new ProportionalForwardStrategy(numberOfEntriesToForward, ensureEntriesPerTopic)
283-
case PulsarOptions.LargeFirstForwardStrategy =>
284-
new LargeFirstForwardStrategy(numberOfEntriesToForward, ensureEntriesPerTopic)
285-
case _ =>
286-
new LinearForwardStrategy(numberOfEntriesToForward)
287-
}
270+
topic -> admin.topics().getInternalStats(topic)
271+
} ).toMap.asJava
288272

289273
SpecificPulsarOffset(topicPartitions.map { topic =>
290274
topic -> PulsarSourceUtils.seekableLatestMid {
@@ -298,39 +282,41 @@ private[pulsar] case class PulsarMetadataReader(
298282
// Get the partition index
299283
val partitionIndex = PulsarSourceUtils.getPartitionIndex(topicActualMessageId)
300284
// Cache topic internal stats
301-
val internalStats = topicStats.get(topic).get.internalStat
285+
val internalStats = topicStats.get(topic)
302286
// Calculate the amount of messages we will pull in
303-
val numberOfEntriesPerTopic = forwarder.forward(topicStats)(topic)
304-
// Get a future message ID which corresponds
305-
// to the maximum number of messages
287+
val numberOfEntriesPerTopic = numberOfEntries / topics.size
288+
// Get a next message ID which respects
289+
// the maximum number of messages
306290
val (nextLedgerId, nextEntryId) = TopicInternalStatsUtils.forwardMessageId(
307291
internalStats,
308292
actualLedgerId,
309293
actualEntryId,
310294
numberOfEntriesPerTopic)
311-
// Build a message id
312-
val forwardedMessageId =
313-
DefaultImplementation.newMessageId(nextLedgerId, nextEntryId, partitionIndex)
295+
// Build the next message ID
296+
val nextMessageId =
297+
DefaultImplementation
298+
.getDefaultImplementation
299+
.newMessageId(nextLedgerId, nextEntryId, partitionIndex)
314300
// Log state
315-
val forwardedEntry = TopicInternalStatsUtils.numOfEntriesUntil(
301+
val entryCountUntilNextMessageId = TopicInternalStatsUtils.numOfEntriesUntil(
316302
internalStats, nextLedgerId, nextEntryId)
317303
val entryCount = internalStats.numberOfEntries
318-
val progress = f"${forwardedEntry.toFloat / entryCount.toFloat}%1.3f"
319-
val logMessage = s"Pulsar Connector forward on topic. " +
320-
s"[$numberOfEntriesPerTopic/$numberOfEntriesToForward]" +
304+
val progress = f"${entryCountUntilNextMessageId.toFloat / entryCount.toFloat}%1.3f"
305+
val logMessage = s"Pulsar Connector offset step forward. " +
306+
s"[$numberOfEntriesPerTopic/$numberOfEntries]" +
321307
s"${topic.reverse.take(30).reverse} $topicActualMessageId -> " +
322-
s"$forwardedMessageId ($forwardedEntry/$entryCount) [$progress]"
308+
s"$nextMessageId ($entryCountUntilNextMessageId/$entryCount) [$progress]"
323309
log.debug(logMessage)
324310
// Return the message ID
325-
forwardedMessageId
311+
nextMessageId
326312
} catch {
327313
case e: PulsarAdminException if e.getStatusCode == 404 =>
328314
MessageId.earliest
329315
case e: Throwable =>
330316
throw new RuntimeException(
331317
s"Failed to get forwarded messageId for ${TopicName.get(topic).toString} " +
332-
s"(tried to forward ${forwarder.forward(topicStats)(topic)} messages " +
333-
s"starting from `$topicActualMessageId` using strategy $strategy)", e)
318+
s"(tried to forward ${numberOfEntries} messages " +
319+
s"starting from `$topicActualMessageId`)", e)
334320
}
335321

336322
}

src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,6 @@ private[pulsar] object PulsarOptions {
3838
val TopicOptionKeys: Set[String] = Set(TopicSingle, TopicMulti, TopicPattern)
3939

4040
val MaxEntriesPerTrigger = "maxentriespertrigger"
41-
val EnsureEntriesPerTopic = "ensureentriespertopic"
42-
val ForwardStrategy = "forwardstrategy"
43-
val ProportionalForwardStrategy = "proportional"
44-
val LargeFirstForwardStrategy = "large-first"
4541

4642
val ServiceUrlOptionKey: String = "service.url"
4743
val AdminUrlOptionKey: String = "admin.url"

src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,7 @@ private[pulsar] class PulsarProvider
114114
failOnDataLoss(caseInsensitiveParams),
115115
subscriptionNamePrefix,
116116
jsonOptions,
117-
maxEntriesPerTrigger(caseInsensitiveParams),
118-
minEntriesPerTopic(caseInsensitiveParams),
119-
forwardStrategy(caseInsensitiveParams)
117+
maxEntriesPerTrigger(caseInsensitiveParams)
120118
)
121119
}
122120

@@ -402,12 +400,6 @@ private[pulsar] object PulsarProvider extends Logging {
402400
private def maxEntriesPerTrigger(caseInsensitiveParams: Map[String, String]): Long =
403401
caseInsensitiveParams.getOrElse(MaxEntriesPerTrigger, "-1").toLong
404402

405-
private def minEntriesPerTopic(caseInsensitiveParams: Map[String, String]): Long =
406-
caseInsensitiveParams.getOrElse(EnsureEntriesPerTopic, "0").toLong
407-
408-
private def forwardStrategy(caseInsensitiveParams: Map[String, String]): String =
409-
caseInsensitiveParams.getOrElse(ForwardStrategy, "simple")
410-
411403
private def validateGeneralOptions(
412404
caseInsensitiveParams: Map[String, String]): Map[String, String] = {
413405
if (!caseInsensitiveParams.contains(ServiceUrlOptionKey)) {

src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,7 @@ private[pulsar] class PulsarSource(
3737
failOnDataLoss: Boolean,
3838
subscriptionNamePrefix: String,
3939
jsonOptions: JSONOptionsInRead,
40-
maxEntriesPerTrigger: Long,
41-
ensureEntriesPerTopic: Long,
42-
forwardStrategy: String)
40+
maxEntriesPerTrigger: Long)
4341
extends Source
4442
with Logging {
4543

@@ -68,11 +66,11 @@ private[pulsar] class PulsarSource(
6866
} else {
6967
currentTopicOffsets match {
7068
case Some(value) =>
71-
metadataReader.forwardOffset(value,
72-
forwardStrategy, maxEntriesPerTrigger, ensureEntriesPerTopic)
69+
metadataReader.fetchNextOffsetWithMaxEntries(value,
70+
maxEntriesPerTrigger)
7371
case _ =>
74-
metadataReader.forwardOffset(initialTopicOffsets.topicOffsets,
75-
forwardStrategy, maxEntriesPerTrigger, ensureEntriesPerTopic)
72+
metadataReader.fetchNextOffsetWithMaxEntries(initialTopicOffsets.topicOffsets,
73+
maxEntriesPerTrigger)
7674
}
7775
}
7876
logDebug(s"GetOffset: ${nextOffsets.topicOffsets.toSeq.map(_.toString).sorted}")

src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LargeFirstForwardStrategy.scala

Lines changed: 0 additions & 96 deletions
This file was deleted.

src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LinearForwardStrategy.scala

Lines changed: 0 additions & 40 deletions
This file was deleted.

0 commit comments

Comments
 (0)