Skip to content

[SPARK-57438][SS] Fix NullPointerException in Kafka source metrics when latest partition offsets are unavailable#56526

Open
yadavay-amzn wants to merge 1 commit into
apache:masterfrom
yadavay-amzn:fix/SPARK-57438-kafka-metrics-npe
Open

[SPARK-57438][SS] Fix NullPointerException in Kafka source metrics when latest partition offsets are unavailable#56526
yadavay-amzn wants to merge 1 commit into
apache:masterfrom
yadavay-amzn:fix/SPARK-57438-kafka-metrics-npe

Conversation

@yadavay-amzn

@yadavay-amzn yadavay-amzn commented Jun 15, 2026

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Fix a NullPointerException in the Kafka micro-batch source when reporting custom metrics before the latest partition offsets are known.

Two changes in KafkaMicroBatchStream.scala:

  1. Call site (root cause): in the non-real-time-mode branch of the instance metrics(), Some(latestPartitionOffsets) is changed to Option(latestPartitionOffsets). latestPartitionOffsets is a var initialized to null and only populated by latestOffset(); if metrics() runs first, the old code produced Some(null).
  2. Defense-in-depth: the companion KafkaMicroBatchStream.metrics(...) (a package-visible method called directly in tests) now collapses Some(null) to None via flatMap(Option(_)) before checking isDefined/get, so the public method cannot NPE on a degenerate input.

Why are the changes needed?

latestPartitionOffsets is private var ... = _ (null) until latestOffset() is invoked during batch planning. metrics() can be called earlier (e.g. progress reporting before the first batch completes), so the non-RTM branch wrapped null as Some(null). The companion metrics then evaluated latestAvailablePartitionOffsets.isDefined as true, called .get (returning null), and invoked .map on it:

java.lang.NullPointerException: Cannot invoke "scala.collection.IterableOps.map(scala.Function1)"
  because the return value of "scala.Option.get()" is null
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream$.metrics(KafkaMicroBatchStream.scala:520)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.metrics(KafkaMicroBatchStream.scala:363)
    ...

This was introduced by #52729 (SPARK-54027, Kafka RTM support). With Option(latestPartitionOffsets), a null becomes None and the metrics computation is correctly skipped (empty map).

Does this PR introduce any user-facing change?

No behavioral change for well-formed inputs. It only prevents the streaming query from crashing when metrics are requested before latest partition offsets are available; in that case an empty metrics map is returned (as intended).

How was this patch tested?

Added a test in KafkaMicroBatchSourceSuite that calls the companion metrics(...) with Some(null) for the latest available partition offsets and asserts an empty map is returned. Verified it fails (NPE at KafkaMicroBatchStream.scala:520) without the fix and passes with it; existing custom-metrics tests still pass.

Credit

Thanks to Thomas Newton, who reported this issue (SPARK-57438), diagnosed the root cause (identifying #52729 and the Some(null) at the non-RTM branch), and proposed the Some(...)Option(...) call-site patch that this PR is based on.

Was this patch authored or co-authored using generative AI tooling?

No.

@Tom-Newton Tom-Newton left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the speedy fix

}
} else {
Some(latestPartitionOffsets)
Option(latestPartitionOffsets)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to write a test to assert that this prevents the Some(null)?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants