Skip to content

Commit

Permalink
[HUDI-8704] Added state upgrade implementation to StreamReadMonitoringFunction
Browse files Browse the repository at this point in the history
  • Loading branch information
voonhous committed Dec 25, 2024
1 parent 450e34a commit 3f06f2c
Showing 1 changed file with 13 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.metrics.FlinkStreamReadMetrics;
import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.state.upgrade.StateUpgradeHelper;
import org.apache.hudi.state.upgrade.StateUpgrader;
import org.apache.hudi.state.upgrade.StateVersion;
import org.apache.hudi.state.upgrade.source.StreamReadMonitoringStateUpgrader;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.StreamerUtil;

Expand Down Expand Up @@ -148,6 +152,10 @@ public void initializeState(FunctionInitializationContext context) throws Except
LOG.info("Restoring state for the class {} with table {} and base path {}.",
getClass().getSimpleName(), conf.getString(FlinkOptions.TABLE_NAME), path);

// Handle state upgrades
StateUpgrader<String> stateUpgrader = new StreamReadMonitoringStateUpgrader(metaClient, issuedInstant);
new StateUpgradeHelper<>(instantState, stateUpgrader, StateVersion.V2).upgradeState();

List<String> retrievedStates = new ArrayList<>();
for (String entry : this.instantState.get()) {
retrievedStates.add(entry);
Expand All @@ -156,27 +164,11 @@ public void initializeState(FunctionInitializationContext context) throws Except
ValidationUtils.checkArgument(retrievedStates.size() <= 2,
getClass().getSimpleName() + " retrieved invalid state.");

if (retrievedStates.size() == 1 && issuedInstant != null) {
// this is the case where we have both legacy and new state.
// the two should be mutually exclusive for the operator, thus we throw the exception.

throw new IllegalArgumentException(
"The " + getClass().getSimpleName() + " has already restored from a previous Flink version.");

} else if (retrievedStates.size() == 1) {
// for forward compatibility
this.issuedInstant = retrievedStates.get(0);
if (LOG.isDebugEnabled()) {
LOG.debug("{} retrieved an issued instant of time {} for table {} with path {}.",
getClass().getSimpleName(), issuedInstant, conf.get(FlinkOptions.TABLE_NAME), path);
}
} else if (retrievedStates.size() == 2) {
this.issuedInstant = retrievedStates.get(0);
this.issuedOffset = retrievedStates.get(1);
if (LOG.isDebugEnabled()) {
LOG.debug("{} retrieved an issued instant of time [{}, {}] for table {} with path {}.",
getClass().getSimpleName(), issuedInstant, issuedOffset, conf.get(FlinkOptions.TABLE_NAME), path);
}
this.issuedInstant = retrievedStates.get(0);
this.issuedOffset = retrievedStates.get(1);
if (LOG.isDebugEnabled()) {
LOG.debug("{} retrieved an issued instant of time [{}, {}] for table {} with path {}.",
getClass().getSimpleName(), issuedInstant, issuedOffset, conf.get(FlinkOptions.TABLE_NAME), path);
}
}
}
Expand Down

0 comments on commit 3f06f2c

Please sign in to comment.