[FLINK-37833] [table-runtime] Fix BatchExecExchange codegen for BINARY type by evaluating Unsafe.arrayBaseOffset(byte[].class) in TM rather than in JM #26592
…YTE_ARRAY_BASE_OFFSET in TM instead of JM
If JM memory is set above 32G while TM memory is set below 32G, the JVM treats the JM process (> 32G) as a large-heap JVM, and this changes Unsafe behavior. For example, UNSAFE.arrayBaseOffset(byte[].class) returns 24 on a large-heap JVM and 16 otherwise. As a result, tasks that run on a TM whose heap class differs from the JM's (TM < 32G while JM > 32G, or vice versa) read the byte[] for MurmurHash from the wrong memory locations.
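The offset described above can be checked directly on any JVM. The following is a minimal sketch, not code from this patch: the class name `ArrayBaseOffsetProbe` is hypothetical, and it assumes a HotSpot-style JVM where `sun.misc.Unsafe` is still reachable through its `theUnsafe` field. Running it once with JM-like and once with TM-like JVM options shows whether the two processes would disagree.

```java
import java.lang.reflect.Field;
import sun.misc.Unsafe;

// Hypothetical probe class (not part of Flink): prints the byte[] base offset
// of the JVM it runs in.
public class ArrayBaseOffsetProbe {

    /** Returns Unsafe.arrayBaseOffset(byte[].class) for the current JVM. */
    static int byteArrayBaseOffset() {
        try {
            // sun.misc.Unsafe has no public constructor; the singleton is held
            // in the private static field "theUnsafe", so it is read reflectively.
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            Unsafe unsafe = (Unsafe) field.get(null);
            return unsafe.arrayBaseOffset(byte[].class);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("sun.misc.Unsafe is not accessible", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("byte[] base offset = " + byteArrayBaseOffset());
    }
}
```

The printed value depends on the JVM version and flags, so no expected output is given here.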
hello
cc @becketqin @JingsongLi @twalthr for review.
Thanks for the patch. It is a good catch.
I understand it might be a little tricky to unit test, but we may be able to do an integration test. We can do the following:
- start a session cluster with a normal-size JM/TM (< 32GB)
- when launching the job on the command line, use an additional JVM arg of `-XX:-UseCompressedOops` to force a 24-byte object header with JVM memory > 4 GB.

This way we should be able to test it.
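For a test along these lines, it may help to confirm inside each process that the flag actually took effect. The sketch below is an assumption-laden helper, not part of the patch: the class name `CompressedOopsCheck` is hypothetical, and it relies on the HotSpot-specific `HotSpotDiagnosticMXBean`, so it falls back to `"unavailable"` on JVMs that do not expose these options. It also reads `UseCompressedClassPointers`, because on some JDK versions that flag can stay enabled independently of compressed oops, in which case the array base offset may not change as expected.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.lang.management.ManagementFactory;

// Hypothetical helper (not part of Flink): reports the pointer-compression
// flags of the JVM it runs in.
public class CompressedOopsCheck {

    /** Reads a HotSpot VM flag by name, or returns "unavailable" if it is not exposed. */
    static String vmFlag(String name) {
        try {
            HotSpotDiagnosticMXBean bean =
                    ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
            return bean.getVMOption(name).getValue();
        } catch (RuntimeException e) {
            // Non-HotSpot JVMs or unknown flag names end up here.
            return "unavailable";
        }
    }

    public static void main(String[] args) {
        System.out.println("UseCompressedOops = " + vmFlag("UseCompressedOops"));
        System.out.println("UseCompressedClassPointers = " + vmFlag("UseCompressedClassPointers"));
    }
}
```

Logging these values on both the JM and the TM next to `Unsafe.arrayBaseOffset(byte[].class)` would show whether the test setup really produces two different offsets.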
Thanks for reviewing the patch! Yes, I'm working on an e2e test with …
Last week, I tried to reproduce the issue locally with Flink SQL, using an inner join and custom datasets, but it does not reproduce locally. Note: with remote debugging I could confirm that the issue still exists; basically, the JM codegens …
I'm still trying to construct a dataset in which bytes [32 ... 39] differ for the same key. Note: even in our internal environment, this issue is reproducible only when …
What is the purpose of the change
Fix BatchExecExchange codegen for `BINARY` / `VARBINARY` types by evaluating `Unsafe.arrayBaseOffset(byte[].class)` on the TaskManager (TM) side instead of the JobManager (JM) side.

Previously, `CodegenUtils.hashCodeForType` used a constant (`$BYTE_ARRAY_BASE_OFFSET`) derived on the JM, which may differ from the value on the TM, especially when JM and TM use different heap sizes (e.g., JM > 32G and TM < 32G). This mismatch can lead to incorrect memory access because `Unsafe.arrayBaseOffset(byte[].class)` returns 24 on large heap JVMs and 16 on smaller ones. As a result, tasks using `MurmurHashUtil.hashUnsafeBytes` to read `byte[]` could access invalid memory locations.

In other words, a JVM process with more than 32G of memory is treated as a large heap JVM, and this changes Unsafe behavior.
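The difference between the two approaches can be illustrated with a small code-generation sketch. This is not the actual Flink code generator: the class `OffsetCodegenSketch`, both method names, and the emitted strings (including the holder name `OffsetHolder`) are hypothetical and only show the idea. The first variant embeds the number computed on the generating JVM (the JM); the second emits a symbolic reference that is resolved on the JVM that runs the generated code (the TM).

```java
// Hypothetical illustration (not Flink's CodegenUtils): two ways a code
// generator could emit the offset argument of a hash call.
public class OffsetCodegenSketch {

    /**
     * Problematic variant: the offset was evaluated on the generating JVM and is
     * baked into the generated source as a literal.
     */
    static String generateWithBakedConstant(int offsetOnGeneratingJvm) {
        return "hashUnsafeBytes(bytes, " + offsetOnGeneratingJvm + ", bytes.length)";
    }

    /**
     * Safer variant: the generated source refers to a static field by name, so
     * the value is looked up on whichever JVM compiles and runs the code.
     */
    static String generateWithRuntimeLookup() {
        return "hashUnsafeBytes(bytes, OffsetHolder.BYTE_ARRAY_BASE_OFFSET, bytes.length)";
    }

    public static void main(String[] args) {
        // Pretend the generating JVM reported 24 while the executing JVM uses 16.
        System.out.println(generateWithBakedConstant(24));
        System.out.println(generateWithRuntimeLookup());
    }
}
```

With the baked literal, a TM whose real offset is 16 would still be told to start reading at 24; with the symbolic reference, each JVM uses its own value.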
Verifying this change
… `-XX:-UseCompressedOops` JVM flag in JM and not in TM.
… `-XX:-UseCompressedOops` JVM flag

Does this pull request potentially affect one of the following parts:
… `@Public(Evolving)`: (yes / no)

Documentation