From 4a73a9b5b40443b6691e39422f39c0d4e5f13389 Mon Sep 17 00:00:00 2001 From: ShashidharM0118 Date: Sun, 28 Dec 2025 17:14:23 +0530 Subject: [PATCH] fix: prevent state table collision by including system/user prefix --- .../apache/beam/runners/core/StateTag.java | 16 +++++++ .../apache/beam/runners/core/StateTags.java | 4 +- .../beam/runners/core/StateTagTest.java | 48 +++++++++++++++++++ .../windmill/state/CachingStateTable.java | 5 +- 4 files changed, 67 insertions(+), 6 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java index 0106f95ed748..e895e116303d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java @@ -55,6 +55,22 @@ public interface StateTag extends Serializable { /** An identifier for the state cell that this tag references. */ String getId(); + /** + * Returns the full state identifier including the system/user prefix. + * + *

<p>This is used to distinguish between system-defined and user-defined state tags and prevent + * collisions in state tables when tags have the same raw ID but different prefixes. + */ + default String getIdWithPrefix() { + StringBuilder sb = new StringBuilder(); + try { + appendTo(sb); + } catch (IOException e) { + throw new RuntimeException("Failed to get prefixed ID", e); + } + return sb.toString(); + } + /** The specification for the state stored in the referenced cell. */ StateSpec getSpec(); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java index ba5478be6c77..4248a91de130 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java @@ -50,12 +50,12 @@ public class StateTags { new Equivalence() { @Override protected boolean doEquivalent(StateTag a, StateTag b) { - return a.getId().equals(b.getId()); + return a.getIdWithPrefix().equals(b.getIdWithPrefix()); } @Override protected int doHash(StateTag stateTag) { - return stateTag.getId().hashCode(); + return stateTag.getIdWithPrefix().hashCode(); } }; diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java index 78f79dd5dbfa..4246ee5cc117 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java @@ -192,4 +192,52 @@ public void testCombiningValueWithContextEquality() { StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1), StateTags.convertToBagTagInternal((StateTag) barCoder1Max)); } + + @Test + public void testSystemAndUserTagsWithSameIdDoNotCollide() { + StateTag userTag = StateTags.value("collision", StringUtf8Coder.of()); + StateTag systemTag = +
StateTags.makeSystemTagInternal(StateTags.value("collision", StringUtf8Coder.of())); + + // Same raw ID, different prefixed IDs. + assertEquals(userTag.getId(), systemTag.getId()); + assertNotEquals(userTag.getIdWithPrefix(), systemTag.getIdWithPrefix()); + + // Tags are equal by ID, but state tables use prefixed IDs to prevent collision. + assertEquals(userTag, systemTag); + } + + @Test + public void testIdWithPrefixForUserTag() { + StateTag userTag = StateTags.value("test", StringUtf8Coder.of()); + String prefixedId = userTag.getIdWithPrefix(); + + assertEquals('u', prefixedId.charAt(0)); + assertEquals("utest", prefixedId); + } + + @Test + public void testIdWithPrefixForSystemTag() { + StateTag systemTag = + StateTags.makeSystemTagInternal(StateTags.value("test", StringUtf8Coder.of())); + String prefixedId = systemTag.getIdWithPrefix(); + + assertEquals('s', prefixedId.charAt(0)); + assertEquals("stest", prefixedId); + } + + @Test + public void testIdEquivalenceWithPrefix() { + StateTag userTag1 = StateTags.value("collision", StringUtf8Coder.of()); + StateTag userTag2 = StateTags.value("collision", StringUtf8Coder.of()); + StateTag systemTag = + StateTags.makeSystemTagInternal(StateTags.value("collision", StringUtf8Coder.of())); + + // Same ID and prefix. + assertEquals(StateTags.ID_EQUIVALENCE.wrap(userTag1), StateTags.ID_EQUIVALENCE.wrap(userTag2)); + + // Same ID, different prefix. 
+ assertNotEquals( + StateTags.ID_EQUIVALENCE.wrap(userTag1), StateTags.ID_EQUIVALENCE.wrap(systemTag)); + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java index 5144089f9ef6..09936ef78aa7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java @@ -274,10 +274,7 @@ abstract static class StateTableKey { public abstract String getId(); public static StateTableKey create(StateNamespace namespace, StateTag stateTag) { - // TODO(https://github.com/apache/beam/issues/36753): stateTag.getId() returns only the - // string tag without system/user prefix. This could cause a collision between system and - // user tag with the same id. Consider adding the prefix to state table key. - return new AutoValue_CachingStateTable_StateTableKey(namespace, stateTag.getId()); + return new AutoValue_CachingStateTable_StateTableKey(namespace, stateTag.getIdWithPrefix()); } }