Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,22 @@ public interface StateTag<StateT extends State> extends Serializable {
/** An identifier for the state cell that this tag references. */
String getId();

/**
* Returns the full state identifier including the system/user prefix.
*
* <p>This is used to distinguish between system-defined and user-defined state tags and prevent
* collisions in state tables when tags have the same raw ID but different prefixes.
*/
default String getIdWithPrefix() {
StringBuilder sb = new StringBuilder();
try {
appendTo(sb);
} catch (IOException e) {
throw new RuntimeException("Failed to get prefixed ID", e);
}
return sb.toString();
}

/** The specification for the state stored in the referenced cell. */
StateSpec<StateT> getSpec();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ public class StateTags {
new Equivalence<StateTag>() {
@Override
protected boolean doEquivalent(StateTag a, StateTag b) {
return a.getId().equals(b.getId());
return a.getIdWithPrefix().equals(b.getIdWithPrefix());
}

@Override
protected int doHash(StateTag stateTag) {
return stateTag.getId().hashCode();
return stateTag.getIdWithPrefix().hashCode();
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,52 @@ public void testCombiningValueWithContextEquality() {
StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1),
StateTags.convertToBagTagInternal((StateTag) barCoder1Max));
}

@Test
public void testSystemAndUserTagsWithSameIdDoNotCollide() {
StateTag<?> userTag = StateTags.value("collision", StringUtf8Coder.of());
StateTag<?> systemTag =
StateTags.makeSystemTagInternal(StateTags.value("collision", StringUtf8Coder.of()));

// Same raw ID, different prefixed IDs.
assertEquals(userTag.getId(), systemTag.getId());
assertNotEquals(userTag.getIdWithPrefix(), systemTag.getIdWithPrefix());

// Tags are equal by ID, but state tables use prefixed IDs to prevent collision.
assertEquals(userTag, systemTag);
}

@Test
public void testIdWithPrefixForUserTag() {
StateTag<?> userTag = StateTags.value("test", StringUtf8Coder.of());
String prefixedId = userTag.getIdWithPrefix();

assertEquals('u', prefixedId.charAt(0));
assertEquals("utest", prefixedId);
}

@Test
public void testIdWithPrefixForSystemTag() {
StateTag<?> systemTag =
StateTags.makeSystemTagInternal(StateTags.value("test", StringUtf8Coder.of()));
String prefixedId = systemTag.getIdWithPrefix();

assertEquals('s', prefixedId.charAt(0));
assertEquals("stest", prefixedId);
}

@Test
public void testIdEquivalenceWithPrefix() {
StateTag<?> userTag1 = StateTags.value("collision", StringUtf8Coder.of());
StateTag<?> userTag2 = StateTags.value("collision", StringUtf8Coder.of());
StateTag<?> systemTag =
StateTags.makeSystemTagInternal(StateTags.value("collision", StringUtf8Coder.of()));

// Same ID and prefix.
assertEquals(StateTags.ID_EQUIVALENCE.wrap(userTag1), StateTags.ID_EQUIVALENCE.wrap(userTag2));

// Same ID, different prefix.
assertNotEquals(
StateTags.ID_EQUIVALENCE.wrap(userTag1), StateTags.ID_EQUIVALENCE.wrap(systemTag));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,7 @@ abstract static class StateTableKey {
public abstract String getId();

public static StateTableKey create(StateNamespace namespace, StateTag<?> stateTag) {
// TODO(https://github.com/apache/beam/issues/36753): stateTag.getId() returns only the
// string tag without system/user prefix. This could cause a collision between system and
// user tag with the same id. Consider adding the prefix to state table key.
return new AutoValue_CachingStateTable_StateTableKey(namespace, stateTag.getId());
return new AutoValue_CachingStateTable_StateTableKey(namespace, stateTag.getIdWithPrefix());
}
}

Expand Down
Loading