
Conversation


@jiateoh jiateoh commented Oct 8, 2025

What changes were proposed in this pull request?

Fix the TransformWithState StateServer's parseProtoMessage method to fully read the desired message by using the DataInputStream readFully API rather than read (InputStream/FilterInputStream). read may return after consuming only the data currently available and so may not return the full message; readFully (DataInputStream) keeps reading until it has filled the provided buffer.

In addition to the API documentation linked above, this StackOverflow answer also illustrates the difference between the two APIs: https://stackoverflow.com/a/25900095
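
For reference, here is a minimal sketch of the change's shape (not the actual Spark source; the length-prefix handling and the messageLen/messageBytes names are assumptions based on the description above):

    import java.io.DataInputStream

    // Sketch only: read one length-prefixed proto message from the stream.
    def parseProtoMessage(inputStream: DataInputStream): Array[Byte] = {
      val messageLen = inputStream.readInt()
      val messageBytes = new Array[Byte](messageLen)
      // Before: inputStream.read(messageBytes) returns the number of bytes
      // actually read, which can be fewer than messageLen.
      // After: readFully blocks until the buffer is completely filled
      // (or throws EOFException if the stream ends first).
      inputStream.readFully(messageBytes)
      messageBytes
    }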

Why are the changes needed?

For large state values used in the TransformWithState API, inputStream.read is not guaranteed to read messageLen bytes of data, per the InputStream contract. For large values, read can return prematurely, leaving messageBytes only partially filled and yielding an incorrect, likely unparseable proto message.

This is not a common scenario; testing indicated that the proto messages had to be fairly large to trigger the error consistently. The new test case uses 512KB strings in the state value updates.
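
To make the contract difference concrete: InputStream.read(byte[], off, len) blocks only until some input is available and returns the number of bytes read, so filling a buffer with read alone requires a loop like the sketch below (essentially what DataInputStream.readFully does internally). This is illustrative only, not code from the PR:

    import java.io.{EOFException, InputStream}

    // Sketch only: fill `buf` completely using read(), accumulating partial reads.
    def readExactly(in: InputStream, buf: Array[Byte]): Unit = {
      var offset = 0
      while (offset < buf.length) {
        val n = in.read(buf, offset, buf.length - offset)
        if (n == -1) {
          throw new EOFException(s"Stream ended after $offset of ${buf.length} bytes")
        }
        offset += n
      }
    }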

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added a new test case using 512KB strings:

  • Value state update
  • List state update with 3 (different) values (note: list state provides a multi-value update API, so this message is even larger than the other two)
  • Map state update with single key/value

Test commands:

    build/sbt -Phive -Phive-thriftserver -DskipTests package
    python/run-tests --testnames 'pyspark.sql.tests.pandas.test_pandas_transform_with_state TransformWithStateInPandasTests'
    python/run-tests --testnames 'pyspark.sql.tests.pandas.test_pandas_transform_with_state TransformWithStateInPySparkTests'

The configured data size (512KB) reliably triggers an incomplete read while still completing in a reasonable time (within 30s on my laptop). I separately tested a larger 4MB input, which took around 30 minutes and was too expensive to include in the test.
Below are sample test results from using read only (i.e., without the fix) together with a temporary check comparing the message length against the number of bytes actually read (this test code was included in an earlier commit but reverted for the final PR; a sketch of the check follows the sample output below). The check is no longer required after the readFully fix, since readFully handles this internally.

    TransformWithStateInPandasTests
        pyspark.errors.exceptions.base.PySparkRuntimeError: Error updating map state value: TESTING: Failed to read message bytes: expected 524369 bytes, but only read 261312 bytes

    TransformWithStateInPySparkTests
        pyspark.errors.exceptions.base.PySparkRuntimeError: Error updating value state: TESTING: Failed to read message bytes: expected 524336 bytes, but only read 392012 bytes
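
For completeness, a hedged sketch of what that temporary diagnostic looked like in spirit (the exact reverted test code is not reproduced here; names follow the sketch earlier in this description):

    // Sketch of the temporary diagnostic (reverted before merge): compare the
    // declared length with the bytes actually returned by a single read().
    val bytesRead = inputStream.read(messageBytes)
    if (bytesRead != messageLen) {
      throw new RuntimeException(
        s"TESTING: Failed to read message bytes: expected $messageLen bytes, " +
          s"but only read $bytesRead bytes")
    }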

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-sonnet-4-5-20250929)

@jiateoh jiateoh force-pushed the tws_readFully_fix branch 3 times, most recently from fe1a90f to 8806a06 on October 10, 2025 at 18:31
@jiateoh jiateoh changed the title [WIP][SPARK-XXXXX][PS][SS] Fix partial read bug for large state values in TransformWithStateInPySparkStateServer [SPARK-53870][PYTHON][SS] Fix partial read bug for large state values in TransformWithStateInPySparkStateServer Oct 10, 2025
@jiateoh jiateoh changed the title [SPARK-53870][PYTHON][SS] Fix partial read bug for large state values in TransformWithStateInPySparkStateServer [SPARK-53870][PYTHON][SS] Fix partial read bug for large proto messages in TransformWithStateInPySparkStateServer Oct 10, 2025
@jiateoh jiateoh changed the title [SPARK-53870][PYTHON][SS] Fix partial read bug for large proto messages in TransformWithStateInPySparkStateServer [WIP][SPARK-53870][PYTHON][SS] Fix partial read bug for large proto messages in TransformWithStateInPySparkStateServer Oct 10, 2025
@jiateoh jiateoh force-pushed the tws_readFully_fix branch 2 times, most recently from 5a53558 to ff81ddd on October 11, 2025 at 00:15
jiateoh and others added 5 commits October 10, 2025 17:54
…ateServer

Use readFully() instead of read() to ensure the entire protobuf message
is read from the input stream. The read() method may only read partial
results (experimentally 8KB), which can cause failures when processing
large state values.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
This reverts commit 3d8f9ab.
@jiateoh jiateoh changed the title [WIP][SPARK-53870][PYTHON][SS] Fix partial read bug for large proto messages in TransformWithStateInPySparkStateServer [SPARK-53870][PYTHON][SS] Fix partial read bug for large proto messages in TransformWithStateInPySparkStateServer Oct 11, 2025
@jiateoh jiateoh marked this pull request as ready for review October 11, 2025 00:57
@jiateoh jiateoh commented Oct 11, 2025

@HeartSaVioR can you help take a look? Thank you!
