Description
Search before asking
- I searched in the issues and found nothing similar.
Fluss version
0.7.0 (latest release)
Please describe the bug 🐞
Both PrimaryKeyLookuper and PrefixKeyLookuper use a KeyEncoder to encode primary keys and bucket keys. KeyEncoder is not guaranteed to be thread-safe, so lookup() is not thread-safe either, even though it returns a CompletableFuture and its signature therefore suggests an asynchronous function that can be called concurrently.
When user code invokes lookup() concurrently, unexpected errors such as the following can occur:
java.lang.ArrayIndexOutOfBoundsException: arraycopy: last destination index 3 out of bounds for byte[2]
    at java.base/java.lang.System.arraycopy(Native Method)
    at org.apache.fluss.row.compacted.CompactedRowWriter.toBytes(CompactedRowWriter.java:108)
    at org.apache.fluss.row.encode.CompactedKeyEncoder.encodeKey(CompactedKeyEncoder.java:84)
    at org.apache.fluss.client.lookup.PrimaryKeyLookuper.lookup(PrimaryKeyLookuper.java:107)
    at org.apache.fluss.flink.source.lookup.FlinkAsyncLookupFunction.fetchResult(FlinkAsyncLookupFunction.java:143)
    at org.apache.fluss.flink.source.lookup.FlinkAsyncLookupFunction.asyncLookup(FlinkAsyncLookupFunction.java:126)
    at org.apache.flink.table.runtime.functions.table.lookup.CachingAsyncLookupFunction.asyncLookup(CachingAsyncLookupFunction.java:92)
    at org.apache.flink.table.functions.AsyncLookupFunction.eval(AsyncLookupFunction.java:52)
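To illustrate why a shared, stateful encoder is a problem, here is a minimal sketch. ToyWriter is a hypothetical stand-in, not Fluss's CompactedRowWriter, and the interleaving of two lookup calls is replayed by hand on a single thread so the result is deterministic. It shows one possible symptom (a caller reading another caller's key); the exception above is plausibly another symptom of the same kind of shared state, but this sketch does not reproduce it.

```java
import java.util.Arrays;

class SharedEncoderRace {
    /** Hypothetical reusable writer with mutable state; NOT Fluss's actual class. */
    static final class ToyWriter {
        private final byte[] buffer = new byte[16];
        private int position;

        void reset() {
            position = 0;
        }

        void writeByte(byte b) {
            buffer[position++] = b;
        }

        byte[] toBytes() {
            return Arrays.copyOf(buffer, position);
        }
    }

    /**
     * Replays, on one thread, an interleaving that two concurrent lookup()
     * calls could produce when they share a single writer.
     */
    static byte[] interleavedEncode() {
        ToyWriter shared = new ToyWriter();
        shared.reset();              // call A starts encoding
        shared.writeByte((byte) 1);  // call A writes its key (1)
        shared.reset();              // call B starts and clears A's progress
        shared.writeByte((byte) 2);  // call B writes its key (2) over A's byte
        return shared.toBytes();     // call A finishes and reads B's key
    }

    public static void main(String[] args) {
        // Call A wanted [1] but gets B's bytes instead.
        System.out.println(Arrays.toString(interleavedEncode())); // prints [2]
    }
}
```

With real threads the interleaving is not controlled, so the outcome varies from run to run: a wrong key, a corrupted key, or an exception when the buffer position changes in the middle of a copy.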
Solution
Add a synchronized block to the lookup() methods of PrimaryKeyLookuper and PrefixKeyLookuper to ensure thread safety.
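A rough sketch of what this could look like, under stated assumptions: ToyKeyEncoder and ToyLookuper are hypothetical stand-ins rather than the Fluss classes, and the lookup completes immediately instead of sending a request. Here only the encoding step is guarded; whether the real change should guard just the encoder call or the whole lookup() body is left open.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SynchronizedLookuperSketch {
    /** Hypothetical encoder that reuses one buffer, so it is not thread-safe by itself. */
    static final class ToyKeyEncoder {
        private final byte[] buffer = new byte[4];
        private int position;

        byte[] encodeKey(int key) {
            position = 0;
            for (int shift = 24; shift >= 0; shift -= 8) {
                buffer[position++] = (byte) (key >>> shift);
            }
            return Arrays.copyOf(buffer, position);
        }
    }

    /** Hypothetical lookuper that serializes access to the shared encoder. */
    static final class ToyLookuper {
        private final ToyKeyEncoder encoder = new ToyKeyEncoder();

        CompletableFuture<byte[]> lookup(int key) {
            byte[] encoded;
            synchronized (encoder) {
                // Only one thread at a time may touch the encoder's buffer.
                encoded = encoder.encodeKey(key);
            }
            // A real client would send the encoded key to a server here.
            return CompletableFuture.completedFuture(encoded);
        }
    }

    static int decode(byte[] b) {
        return ((b[0] & 0xFF) << 24) | ((b[1] & 0xFF) << 16)
                | ((b[2] & 0xFF) << 8) | (b[3] & 0xFF);
    }

    /** Calls lookup() from several threads and checks every key round-trips. */
    static boolean allKeysRoundTrip(int threads, int keysPerThread) {
        ToyLookuper lookuper = new ToyLookuper();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<CompletableFuture<Boolean>> results = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                final int base = t * keysPerThread;
                results.add(CompletableFuture.supplyAsync(() -> {
                    for (int i = 0; i < keysPerThread; i++) {
                        int key = base + i;
                        if (decode(lookuper.lookup(key).join()) != key) {
                            return false;
                        }
                    }
                    return true;
                }, pool));
            }
            for (CompletableFuture<Boolean> r : results) {
                if (!r.join()) {
                    return false;
                }
            }
            return true;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(allKeysRoundTrip(4, 10_000));
    }
}
```

An alternative that avoids lock contention would be one encoder per thread (for example via ThreadLocal), at the cost of extra allocations; the issue itself proposes only the synchronized approach.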
Are you willing to submit a PR?
- I'm willing to submit a PR!