
NullPointerException in KafkaMicroBatchStream.metrics when latestPartitionOffsets is uninitialized #55236

@lnagel

Description


Project: SPARK
Type: Bug
Component: Structured Streaming, Kafka
Affects Version: 4.1.1
Priority: Critical

KafkaMicroBatchStream.metrics() throws a NullPointerException when called before latestOffset() has populated the latestPartitionOffsets field.

This is a regression introduced in SPARK-54027 (PR #52729), which refactored the metrics method to wrap latestPartitionOffsets in Some. The original implementation (from SPARK-34854) passed the value directly and guarded it with an explicit null check, which was safe:

// Before SPARK-54027 (safe)
if (offset.nonEmpty && latestAvailablePartitionOffsets != null) {

After SPARK-54027, the instance method wraps the value in Some():

// KafkaMicroBatchStream.scala, instance metrics method
} else {
  Some(latestPartitionOffsets)  // creates Some(null) when field is uninitialized
}

And the companion object method uses .isDefined:

// KafkaMicroBatchStream.scala, companion object metrics method
if (offset.nonEmpty && latestAvailablePartitionOffsets.isDefined) {
  ...
  val offsetsBehindLatest = latestAvailablePartitionOffsets.get  // returns null
    .map(...)  // NullPointerException

Since latestPartitionOffsets is declared as private var latestPartitionOffsets: PartitionOffsetMap = _ (null), wrapping it in Some() before latestOffset() runs creates Some(null). The .isDefined check passes (Some(null).isDefined == true), then .get returns null, and calling .map() on null throws the NPE.
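The pitfall reproduces outside Spark. The following is a minimal standalone sketch, not the actual Spark source; the type alias and field name only mirror the real code:

```scala
// Minimal sketch of the Some(null) pitfall (illustrative; not the Spark source).
object SomeNullDemo extends App {
  // Stand-in for Spark's PartitionOffsetMap.
  type PartitionOffsetMap = Map[String, Long]

  // Mirrors `private var latestPartitionOffsets: PartitionOffsetMap = _`,
  // which leaves the field null until latestOffset() populates it.
  var latestPartitionOffsets: PartitionOffsetMap = null

  val wrapped: Option[PartitionOffsetMap] = Some(latestPartitionOffsets)
  assert(wrapped.isDefined) // Some(null).isDefined == true, so the guard passes

  // wrapped.get returns null; calling .map on it throws NullPointerException,
  // exactly as in the stack trace below.
  try {
    wrapped.get.map { case (tp, off) => tp -> (off + 1) }
    assert(false, "unreachable")
  } catch {
    case _: NullPointerException => println("NPE reproduced")
  }
}
```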

This crashes the streaming query during finishTrigger -> extractSourceProgress -> metrics, with no way to recover.

Stack trace:

Caused by: java.lang.NullPointerException: Cannot invoke "scala.collection.IterableOps.map(scala.Function1)" because the return value of "scala.Option.get()" is null
        at org.apache.spark.sql.kafka010.KafkaMicroBatchStream$.metrics(KafkaMicroBatchStream.scala:520)
        at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.metrics(KafkaMicroBatchStream.scala:363)
        at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.$anonfun$extractSourceProgress$2(ProgressReporter.scala:384)
        at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:496)
        at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.$anonfun$extractSourceProgress$1(ProgressReporter.scala:380)
        at scala.collection.immutable.List.map(List.scala:236)
        at scala.collection.immutable.List.map(List.scala:79)
        at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.extractSourceProgress(ProgressReporter.scala:379)
        at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.constructNewProgress(ProgressReporter.scala:348)
        at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.finishTrigger(ProgressReporter.scala:312)
        at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.finishTrigger(ProgressReporter.scala:429)
        at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:525)

Fix: change Some(latestPartitionOffsets) to Option(latestPartitionOffsets) in the instance metrics method, which converts null to None instead of Some(null).
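The difference between the two constructors is easy to verify in isolation. A short sketch of the fixed behavior (variable name illustrative):

```scala
// Option(x) null-checks its argument: None when x == null, Some(x) otherwise.
object OptionNullDemo extends App {
  var latestPartitionOffsets: Map[String, Long] = null

  // Before latestOffset() has run: Option(null) is None, so downstream
  // guards using .isDefined correctly skip the offsetsBehindLatest metric.
  assert(Option(latestPartitionOffsets).isEmpty)

  // After the field is populated, the value flows through unchanged.
  latestPartitionOffsets = Map("topic-0" -> 42L)
  assert(Option(latestPartitionOffsets) == Some(Map("topic-0" -> 42L)))
}
```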
