Skip to content

[FLINK-39248][runtime] Fix RowDataKeySerializerSnapshot restore#27766

Merged
rkhachatryan merged 1 commit intoapache:masterfrom
rkhachatryan:f39248
Mar 13, 2026
Merged

[FLINK-39248][runtime] Fix RowDataKeySerializerSnapshot restore#27766
rkhachatryan merged 1 commit intoapache:masterfrom
rkhachatryan:f39248

Conversation

@rkhachatryan
Copy link
Contributor

@rkhachatryan rkhachatryan commented Mar 13, 2026

What is the purpose of the change

Due to the use of input stream read instead of read fully, deserialization might fail with

java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:360)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:299)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:120)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:884)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:836)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:836)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:787)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:1112)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:1072)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:887)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:683)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for SinkUpsertMaterializerV2_10f3bc77c24e457cd85e23da85a2136d_(1/1) from any of the 1 provided restore options.
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:165)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:489)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:220)
	... 12 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
	at org.apache.flink.state.rocksdb.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:511)
	at org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:566)
	at org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:100)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:478)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
	... 14 more
Caused by: java.io.StreamCorruptedException: invalid type code: 00
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1793)
	at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2618)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2469)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2284)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1762)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:540)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:498)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:472)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:467)
	at org.apache.flink.table.runtime.sequencedmultisetstate.linked.RowDataKeySerializerSnapshot.restore(RowDataKeySerializerSnapshot.java:97)
	at org.apache.flink.table.runtime.sequencedmultisetstate.linked.RowDataKeySerializerSnapshot.readSnapshot(RowDataKeySerializerSnapshot.java:71)
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:167)
	at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:136)
	at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.readSnapshot(CompositeTypeSerializerSnapshot.java:170)
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:167)
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:162)
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:123)
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:62)
	at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:162)
	at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:176)
	at org.apache.flink.state.rocksdb.restore.RocksDBIncrementalRestoreOperation.readMetaData(RocksDBIncrementalRestoreOperation.java:962)
	at org.apache.flink.state.rocksdb.restore.RocksDBIncrementalRestoreOperation.readMetaData(RocksDBIncrementalRestoreOperation.java:945)
	at org.apache.flink.state.rocksdb.restore.RocksDBIncrementalRestoreOperation.restoreBaseDBFromLocalState(RocksDBIncrementalRestoreOperation.java:746)
	at org.apache.flink.state.rocksdb.restore.RocksDBIncrementalRestoreOperation.initBaseDBFromSingleStateHandle(RocksDBIncrementalRestoreOperation.java:417)
	at org.apache.flink.state.rocksdb.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:355)
	at org.apache.flink.state.rocksdb.restore.RocksDBIncrementalRestoreOperation.lambda$restore$1(RocksDBIncrementalRestoreOperation.java:270)
	at org.apache.flink.state.rocksdb.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:931)
	at org.apache.flink.state.rocksdb.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:269)
	at org.apache.flink.state.rocksdb.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:412)
	... 19 more

Brief change log

(for example:)

  • The TaskInfo is stored in the blob store on job creation time as a persistent artifact
  • Deployments RPC transmits only the blob storage reference
  • TaskManagers retrieve the TaskInfo from the blob cache

Verifying this change

  • added RowDataKeySerializerSnapshotTest
  • verified the fix manually

Does this pull request potentially affect one of the following parts:

no to all:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

no to all:

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Copy link
Collaborator

flinkbot commented Mar 13, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@rkhachatryan rkhachatryan requested a review from 1996fanrui March 13, 2026 08:48
@rkhachatryan rkhachatryan marked this pull request as ready for review March 13, 2026 08:51
@rkhachatryan
Copy link
Contributor Author

CI failure is not related to this PR: FLINK-34447

@rkhachatryan rkhachatryan merged commit 360aa97 into apache:master Mar 13, 2026
@rkhachatryan rkhachatryan deleted the f39248 branch March 13, 2026 12:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants