Project: SPARK
Type: Bug
Component: Structured Streaming, Kafka
Affects Version: 4.1.1
Priority: Critical
KafkaMicroBatchStream.metrics() throws a NullPointerException when called before latestOffset() has populated the latestPartitionOffsets field.
This is a regression introduced in SPARK-54027 (PR #52729), which refactored the metrics method to wrap latestPartitionOffsets in Some() and pass it as an Option. The original implementation (from SPARK-34854) passed the value directly and guarded it with an explicit null check, which was safe:
// Before SPARK-54027 (safe)
if (offset.nonEmpty && latestAvailablePartitionOffsets != null) {
After SPARK-54027, the instance method wraps the value in Some():
// KafkaMicroBatchStream.scala, instance metrics method
} else {
Some(latestPartitionOffsets) // creates Some(null) when field is uninitialized
}
And the companion object method uses .isDefined:
// KafkaMicroBatchStream.scala, companion object metrics method
if (offset.nonEmpty && latestAvailablePartitionOffsets.isDefined) {
...
val offsetsBehindLatest = latestAvailablePartitionOffsets.get // returns null
.map(...) // NullPointerException
Since latestPartitionOffsets is declared as private var latestPartitionOffsets: PartitionOffsetMap = _ (which initializes the field to null), wrapping it in Some() before latestOffset() has run produces Some(null). The .isDefined check passes (Some(null).isDefined == true), .get returns null, and calling .map() on that null throws the NPE.
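The pitfall can be reproduced in isolation. This is a minimal standalone sketch (the type alias and object name are hypothetical, not the actual Spark code) showing how Some() around an uninitialized field defeats an .isDefined guard:

```scala
// Minimal sketch (hypothetical standalone names, not the actual Spark code)
// reproducing the Some(null) pitfall described above.
object SomeNullPitfall extends App {
  type PartitionOffsetMap = Map[String, Long]

  // Mirrors the uninitialized field: `private var ... = _` starts out null.
  var latestPartitionOffsets: PartitionOffsetMap = null

  // What the refactored instance method effectively does:
  val wrapped: Option[PartitionOffsetMap] = Some(latestPartitionOffsets)

  // The companion object's guard passes even though the payload is null...
  assert(wrapped.isDefined)

  // ...so dereferencing the contents throws, as in the reported stack trace.
  try {
    wrapped.get.map { case (tp, off) => (tp, off + 1) }
    assert(false, "expected NullPointerException")
  } catch {
    case _: NullPointerException => println("NPE reproduced")
  }
}
```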
This crashes the streaming query during finishTrigger -> extractSourceProgress -> metrics, with no way to recover.
Stack trace:
Caused by: java.lang.NullPointerException: Cannot invoke "scala.collection.IterableOps.map(scala.Function1)" because the return value of "scala.Option.get()" is null
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream$.metrics(KafkaMicroBatchStream.scala:520)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.metrics(KafkaMicroBatchStream.scala:363)
at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.$anonfun$extractSourceProgress$2(ProgressReporter.scala:384)
at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:496)
at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.$anonfun$extractSourceProgress$1(ProgressReporter.scala:380)
at scala.collection.immutable.List.map(List.scala:236)
at scala.collection.immutable.List.map(List.scala:79)
at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.extractSourceProgress(ProgressReporter.scala:379)
at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.constructNewProgress(ProgressReporter.scala:348)
at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.finishTrigger(ProgressReporter.scala:312)
at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.finishTrigger(ProgressReporter.scala:429)
at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:525)
Fix: change Some(latestPartitionOffsets) to Option(latestPartitionOffsets) in the instance metrics method, which converts null to None instead of Some(null).
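As a quick sanity check of the proposed fix, the difference between the two constructors can be demonstrated directly: Option.apply null-checks its argument, while Some wraps whatever it is given, including null.

```scala
// Sketch of why the fix works: Option(...) null-checks its argument,
// whereas Some(...) wraps whatever it is given, including null.
object OptionVsSome extends App {
  val uninitialized: Map[String, Long] = null

  assert(Some(uninitialized).isDefined)  // Some(null): guard passes, unsafe
  assert(Option(uninitialized).isEmpty)  // Option(null) == None: guard fails safely
}
```

With Option(), the companion object's offset.nonEmpty && latestAvailablePartitionOffsets.isDefined guard evaluates to false before latestOffset() runs, so metrics() simply reports nothing instead of crashing the query.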