[Client] lookup() in PrimaryKeyLookuper and PrefixKeyLookuper is not thread-safe #1914

@platinumhamburg

Description

Search before asking

  • I searched in the issues and found nothing similar.

Fluss version

0.7.0 (latest release)

Please describe the bug 🐞

Both PrimaryKeyLookuper and PrefixKeyLookuper use a KeyEncoder to encode primary keys and bucket keys. KeyEncoder is not guaranteed to be thread-safe, so lookup() is not thread-safe either. This is easy to overlook because lookup() returns a CompletableFuture, and that signature suggests an asynchronous function that callers may invoke concurrently.

When user code invokes lookup() concurrently, unexpected errors such as the following may occur:
java.lang.ArrayIndexOutOfBoundsException: arraycopy: last destination index 3 out of bounds for byte[2]
    at java.base/java.lang.System.arraycopy(Native Method)
    at org.apache.fluss.row.compacted.CompactedRowWriter.toBytes(CompactedRowWriter.java:108)
    at org.apache.fluss.row.encode.CompactedKeyEncoder.encodeKey(CompactedKeyEncoder.java:84)
    at org.apache.fluss.client.lookup.PrimaryKeyLookuper.lookup(PrimaryKeyLookuper.java:107)
    at org.apache.fluss.flink.source.lookup.FlinkAsyncLookupFunction.fetchResult(FlinkAsyncLookupFunction.java:143)
    at org.apache.fluss.flink.source.lookup.FlinkAsyncLookupFunction.asyncLookup(FlinkAsyncLookupFunction.java:126)
    at org.apache.flink.table.runtime.functions.table.lookup.CachingAsyncLookupFunction.asyncLookup(CachingAsyncLookupFunction.java:92)
    at org.apache.flink.table.functions.AsyncLookupFunction.eval(AsyncLookupFunction.java:52)
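
To illustrate the kind of hazard involved, here is a minimal sketch that replays one possible interleaving of two callers on a single thread, so the outcome is deterministic. ReusableEncoder is a hypothetical stand-in for a stateful encoder with a shared buffer and write position; it is not the Fluss KeyEncoder or CompactedRowWriter, and the actual failure path in Fluss may differ.

```java
import java.util.Arrays;

public class SharedEncoderHazard {

    /** Hypothetical stateful encoder (an assumption, not Fluss code). */
    static final class ReusableEncoder {
        private final byte[] buffer = new byte[8];
        private int position;

        void reset() {
            position = 0;
        }

        void writeByte(byte b) {
            buffer[position++] = b;
        }

        byte[] toBytes() {
            return Arrays.copyOf(buffer, position);
        }
    }

    /** Replays the steps of two callers sharing one encoder, interleaved by hand. */
    static byte[] interleavedEncode() {
        ReusableEncoder shared = new ReusableEncoder();

        // Caller A starts encoding the key {1, 4}.
        shared.reset();
        shared.writeByte((byte) 1);

        // Caller B starts encoding the key {2, 3} before A has finished.
        shared.reset();
        shared.writeByte((byte) 2);
        shared.writeByte((byte) 3);

        // Caller A resumes and finishes; it expects {1, 4}.
        shared.writeByte((byte) 4);
        return shared.toBytes();
    }

    public static void main(String[] args) {
        // A receives B's bytes followed by its own last byte.
        System.out.println(Arrays.toString(interleavedEncode())); // prints [2, 3, 4]
    }
}
```

With real threads the interleaving is not controlled, so the symptom can be a silently wrong key or an out-of-bounds copy like the one in the stack trace above.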

Solution

Add a synchronized block to lookup() in both PrimaryKeyLookuper and PrefixKeyLookuper to make it thread-safe.
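
A rough sketch of what such a fix could look like, not the actual patch. The Lookuper and ReusableEncoder classes below are hypothetical stand-ins for the Fluss classes, and the network request is replaced by an already completed future. The sketch assumes that only the encoding step touches shared mutable state, so only that step is guarded.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SynchronizedLookupSketch {

    /** Hypothetical encoder that reuses one buffer, so it is not thread-safe. */
    static final class ReusableEncoder {
        private final byte[] buffer = new byte[4];
        private int position;

        byte[] encode(int key) {
            position = 0;
            for (int shift = 24; shift >= 0; shift -= 8) {
                buffer[position++] = (byte) (key >>> shift); // big-endian
            }
            return Arrays.copyOf(buffer, position);
        }
    }

    /** Hypothetical lookuper; stands in for PrimaryKeyLookuper / PrefixKeyLookuper. */
    static final class Lookuper {
        private final ReusableEncoder encoder = new ReusableEncoder();
        private final Object encodeLock = new Object();

        CompletableFuture<byte[]> lookup(int key) {
            byte[] encodedKey;
            synchronized (encodeLock) {
                // Only one thread at a time may use the shared encoder.
                encodedKey = encoder.encode(key);
            }
            // A real client would send the request here; this sketch just echoes the key bytes.
            return CompletableFuture.completedFuture(encodedKey);
        }
    }

    /** Calls lookup() from several threads and checks every key round-trips intact. */
    static boolean allKeysEncodedCorrectly(int threads, int lookupsPerThread) {
        Lookuper lookuper = new Lookuper();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Boolean>> results = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                final int base = t * lookupsPerThread;
                results.add(pool.submit(() -> {
                    for (int i = 0; i < lookupsPerThread; i++) {
                        int key = base + i;
                        byte[] bytes = lookuper.lookup(key).get();
                        if (ByteBuffer.wrap(bytes).getInt() != key) {
                            return false;
                        }
                    }
                    return true;
                }));
            }
            for (Future<Boolean> result : results) {
                if (!result.get()) {
                    return false;
                }
            }
            return true;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(allKeysEncodedCorrectly(8, 1000));
    }
}
```

Guarding only the encoding step keeps the lock short and leaves the asynchronous part of lookup() outside it. Giving each thread its own encoder would avoid the lock entirely, at the cost of extra buffers; which option fits better depends on how the lookupers are shared in practice.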

Are you willing to submit a PR?

  • I'm willing to submit a PR!
