[FLINK-37833] [table-runtime] Fix BatchExecExchange codegen for BINARY type by evaluating Unsafe.arrayBaseOffset(byte[].class) in TM rather than in JM #26592

Open
venkata91 wants to merge 1 commit into master
Conversation

venkata91
Contributor

@venkata91 venkata91 commented May 22, 2025

What is the purpose of the change

Fix BatchExecExchange codegen for BINARY / VARBINARY types by evaluating Unsafe.arrayBaseOffset(byte[].class) on the TaskManager (TM) side instead of the JobManager (JM) side.

Previously, CodegenUtils.hashCodeForType baked a constant ($BYTE_ARRAY_BASE_OFFSET) into the generated code, evaluated on the JM, and that value may differ from the one on the TM where the generated code actually runs. A JVM with a heap above 32 GB (or with -XX:-UseCompressedOops) disables compressed oops, which enlarges the object header, so Unsafe.arrayBaseOffset(byte[].class) returns 24 instead of 16. If, for example, the JM heap is set above 32 GB while the TM heap is below it, the JM generates code with offset 24 while the real offset on the TM is 16, and tasks that hash byte[] keys via MurmurHashUtil.hashUnsafeBytes read invalid memory locations.
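As a standalone illustration (not part of the patch; the class name below is made up), the value the two processes disagree on can be observed with a few lines of plain Java. Running it once normally and once with -XX:-UseCompressedOops reproduces the 16 vs. 24 difference outside Flink:

```java
import java.lang.reflect.Field;
import sun.misc.Unsafe;

// Standalone probe (hypothetical name, not Flink code): prints the byte[] base
// offset for the JVM it runs in.
public class ArrayBaseOffsetProbe {
    public static void main(String[] args) throws Exception {
        // sun.misc.Unsafe has no public constructor; grab the singleton via reflection.
        Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
        theUnsafe.setAccessible(true);
        Unsafe unsafe = (Unsafe) theUnsafe.get(null);

        // Typically prints 16 with compressed oops (the default for heaps below ~32 GB)
        // and 24 with -XX:-UseCompressedOops or a heap above ~32 GB.
        System.out.println("arrayBaseOffset(byte[].class) = "
                + unsafe.arrayBaseOffset(byte[].class));
    }
}
```

For example, `java ArrayBaseOffsetProbe` typically prints 16 on a default small-heap 64-bit JVM, while `java -XX:-UseCompressedOops ArrayBaseOffsetProbe` prints 24.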

Verifying this change

  • Manually verified the change by running a Flink batch job on a YARN cluster with 1 JM and 1 TM running 2 tasks, with JM memory set above 32 GB and TM memory below 32 GB, or alternatively with the -XX:-UseCompressedOops JVM flag set on the JM but not on the TM.
  • Still figuring out how to write an ITCase for this issue using the -XX:-UseCompressedOops JVM flag.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

Fix HashPartitioner codegen for BINARY/VARBINARY type by evaluating BYTE_ARRAY_BASE_OFFSET in TM instead of JM

The issue: if the JM heap is set above 32 GB while the TM heap is below 32 GB, the JVM running the JM is treated as a large-heap JVM (compressed oops disabled). This changes Unsafe behavior: UNSAFE.arrayBaseOffset(byte[].class) returns 24 on a large-heap JVM and 16 otherwise.

Because of this, tasks running on a TM whose heap falls on the other side of the 32 GB boundary from the JM (TM < 32 GB while JM > 32 GB, or vice versa) read the byte[] for MurmurHash from the wrong memory locations.
@flinkbot
Collaborator

flinkbot commented May 22, 2025

CI report:


@venkata91 venkata91 changed the title Fix HashPartitioner codegen for BINARY/VARBINARY type by evaluating B… FLINK-37833: Fix BatchExecExchange codegen for BINARY type by evaluating Unsafe.arrayBaseOffset(byte[].class) in TM rather than in JM May 22, 2025
@venkata91 venkata91 marked this pull request as ready for review May 22, 2025 23:31
@venkata91 venkata91 marked this pull request as draft May 22, 2025 23:31
@bigdataliuchuang

hello

@venkata91 venkata91 marked this pull request as ready for review May 23, 2025 21:00
@venkata91
Contributor Author

cc @becketqin @JingsongLi @twalthr for review.

@venkata91 venkata91 changed the title FLINK-37833: Fix BatchExecExchange codegen for BINARY type by evaluating Unsafe.arrayBaseOffset(byte[].class) in TM rather than in JM [FLINK-37833] [table-runtime]: Fix BatchExecExchange codegen for BINARY type by evaluating Unsafe.arrayBaseOffset(byte[].class) in TM rather than in JM May 23, 2025
@venkata91 venkata91 changed the title [FLINK-37833] [table-runtime]: Fix BatchExecExchange codegen for BINARY type by evaluating Unsafe.arrayBaseOffset(byte[].class) in TM rather than in JM [FLINK-37833] [table-runtime] Fix BatchExecExchange codegen for BINARY type by evaluating Unsafe.arrayBaseOffset(byte[].class) in TM rather than in JM May 23, 2025
Contributor

@becketqin becketqin left a comment

Thanks for the patch. It is a good catch.

I understand it might be a little tricky to write a unit test, but we may be able to do an integration test. We can do the following:

  1. start a session cluster with normally sized JM/TM heaps (< 32 GB)
  2. when launching the job from the command line, pass the additional JVM argument -XX:-UseCompressedOops to force a 24-byte object header regardless of heap size.

This way we should be able to test it.

@venkata91
Contributor Author

Thanks for reviewing the patch!

Yes, I'm working on an end-to-end test with FlinkContainers. In this case we need two different JVMs (JM and TM), one with UseCompressedOops enabled and one without it. Still working on the end-to-end test; I will post it once it is ready.
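Until that end-to-end test is ready, one way the mismatch could be wired up is sketched below, assuming the container-based harness accepts a Flink Configuration for the cluster it starts; the helper class name is invented, but env.java.opts.jobmanager and env.java.opts.taskmanager are standard Flink options:

```java
import org.apache.flink.configuration.Configuration;

// Hypothetical helper for the two-JVM scenario: only the JobManager gets
// -XX:-UseCompressedOops, so its byte[] base offset (24) differs from the
// TaskManagers' (16). How this Configuration is handed to the container-based
// test harness is left open here.
public final class MismatchedOopsConfig {

    public static Configuration create() {
        Configuration conf = new Configuration();
        // Standard Flink options for per-process JVM arguments.
        conf.setString("env.java.opts.jobmanager", "-XX:-UseCompressedOops");
        conf.setString("env.java.opts.taskmanager", "-XX:+UseCompressedOops");
        return conf;
    }
}
```

Note that these options only matter for deployments that launch separate JVM processes (YARN, standalone scripts, containers); a MiniCluster runs JM and TM in one JVM and cannot reproduce the mismatch.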

@venkata91
Contributor Author

venkata91 commented May 27, 2025

Last week I was trying to reproduce the issue locally for Flink SQL with an inner join and custom datasets, but it is not reproducible locally. Note: with remote debugging I could confirm the issue still exists; the JM codegens the HashPartitioner with arrayBaseOffset = 24 while on the TM side it is 16. What happens is the following:

  1. Bytes [16 ... 31] hold the key, with length = 16.
  2. Bytes [32 ... 39] are filled with JVM garbage that looks something like [1, 0, 0, ..., 0].
  3. The hash function reads bytes [24 ... 39]: the first 8 bytes [24 ... 31] are valid key bytes, while the remaining 8 bytes [32 ... 39] are just JVM garbage.
  4. The same pattern holds for both occurrences of the same key (in fact for all keys).
  5. As a result, the hashcode computed for both occurrences of the same key is consistent and the rows get shuffled to the same task, even though the hashcode computation is wrong.

I'm still trying to construct a dataset where the [32 ... 39] bytes differ for the same key. Note: even in our internal environment, this issue is reproducible only when nested fields projection pushdown is enabled; that optimization is implemented in our internal connector. (A small model of steps 3 to 5 is sketched below.)
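To make steps 3 to 5 concrete, here is a toy model (plain array copies standing in for the Unsafe read; all names and byte values are made up) of how a read that starts 8 bytes too far can still hash consistently and therefore stay hidden:

```java
import java.util.Arrays;

// Toy model (hypothetical values, no Unsafe involved): shows why a read that
// starts 8 bytes too far into the record can still hash *consistently* for the
// same key.
public class ShiftedReadModel {
    public static void main(String[] args) {
        byte[] occurrence1 = new byte[24];
        byte[] occurrence2 = new byte[24];

        // Bytes [0..15]: the actual 16-byte key, identical in both occurrences.
        for (int i = 0; i < 16; i++) {
            occurrence1[i] = occurrence2[i] = (byte) i;
        }
        // Bytes [16..23]: stand-in for the adjacent "JVM garbage". If it happens to
        // be the same pattern for both occurrences (e.g. [1, 0, 0, ..., 0]), the
        // shifted read still produces equal hashes and the rows land on the same task.
        occurrence1[16] = occurrence2[16] = 1;

        // A 16-byte window starting 8 bytes late models hashing from base offset 24
        // when the data actually starts at offset 16.
        int h1 = Arrays.hashCode(Arrays.copyOfRange(occurrence1, 8, 24));
        int h2 = Arrays.hashCode(Arrays.copyOfRange(occurrence2, 8, 24));
        System.out.println(h1 == h2); // true: wrong bytes, but consistently wrong
    }
}
```

Once the trailing bytes differ between two occurrences of the same key, the hashes diverge and the rows are shuffled to different tasks, which is exactly the kind of dataset being constructed above.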
