diff --git a/temporal-sdk/src/main/java/io/temporal/common/InitialVersioningBehavior.java b/temporal-sdk/src/main/java/io/temporal/common/InitialVersioningBehavior.java new file mode 100644 index 000000000..70a2a975d --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/InitialVersioningBehavior.java @@ -0,0 +1,15 @@ +package io.temporal.common; + +/** + * Specifies the versioning behavior for the first task of a new workflow run started via + * continue-as-new. + */ +@Experimental +public enum InitialVersioningBehavior { + /** + * Start the new run with {@link VersioningBehavior#AUTO_UPGRADE} behavior for the first task, + * upgrading to the latest version. After the first workflow task completes, the workflow uses + * whatever versioning behavior is specified in the workflow code. + */ + AUTO_UPGRADE +} diff --git a/temporal-sdk/src/main/java/io/temporal/common/SuggestContinueAsNewReason.java b/temporal-sdk/src/main/java/io/temporal/common/SuggestContinueAsNewReason.java new file mode 100644 index 000000000..3b1709a20 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/SuggestContinueAsNewReason.java @@ -0,0 +1,15 @@ +package io.temporal.common; + +/** + * Reason(s) why the server suggests a workflow should continue-as-new. Multiple reasons can be true + * at the same time. + */ +@Experimental +public enum SuggestContinueAsNewReason { + /** Workflow history size is getting too large. */ + HISTORY_SIZE_TOO_LARGE, + /** Workflow history has too many events. */ + TOO_MANY_HISTORY_EVENTS, + /** Workflow's count of completed plus in-flight updates is too large. */ + TOO_MANY_UPDATES +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java index 19a488e77..982737dbe 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java @@ -8,12 +8,14 @@ import io.temporal.api.sdk.v1.UserMetadata; import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse; import io.temporal.common.RetryOptions; +import io.temporal.common.SuggestContinueAsNewReason; import io.temporal.internal.common.SdkFlag; import io.temporal.internal.statemachines.*; import io.temporal.workflow.Functions; import io.temporal.workflow.Functions.Func; import io.temporal.workflow.Functions.Func1; import java.time.Duration; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Random; @@ -357,6 +359,17 @@ Integer getVersion( */ boolean isContinueAsNewSuggested(); + /** + * @return the reasons why continue-as-new is suggested, or an empty list if not suggested. This + * value changes during the lifetime of a Workflow Execution. + */ + List getSuggestContinueAsNewReasons(); + + /** + * @return true if the target worker deployment version has changed for this workflow. + */ + boolean isTargetWorkerDeploymentVersionChanged(); + /** * @return true if cancellation of the workflow is requested. */ diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java index dd1844a31..cf704544b 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java @@ -10,6 +10,7 @@ import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes; import io.temporal.api.sdk.v1.UserMetadata; import io.temporal.common.RetryOptions; +import io.temporal.common.SuggestContinueAsNewReason; import io.temporal.failure.CanceledFailure; import io.temporal.internal.common.ProtobufTimeUtils; import io.temporal.internal.common.SdkFlag; @@ -416,6 +417,16 @@ public boolean isContinueAsNewSuggested() { return workflowStateMachines.isContinueAsNewSuggested(); } + @Override + public List getSuggestContinueAsNewReasons() { + return workflowStateMachines.getSuggestContinueAsNewReasons(); + } + + @Override + public boolean isTargetWorkerDeploymentVersionChanged() { + return workflowStateMachines.isTargetWorkerDeploymentVersionChanged(); + } + /* * MUTABLE STATE OPERATIONS */ diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java index 2f2c716f2..a39b7920b 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java @@ -20,6 +20,7 @@ import io.temporal.api.protocol.v1.Message; import io.temporal.api.sdk.v1.UserMetadata; import io.temporal.api.workflowservice.v1.GetSystemInfoResponse; +import io.temporal.common.SuggestContinueAsNewReason; import io.temporal.failure.CanceledFailure; import io.temporal.internal.common.*; import io.temporal.internal.history.LocalActivityMarkerUtils; @@ -88,6 +89,10 @@ enum HandleEventStatus { private boolean isContinueAsNewSuggested; + private List suggestContinueAsNewReasons = Collections.emptyList(); + + private boolean isTargetWorkerDeploymentVersionChanged; + /** * EventId of the last event seen by these state machines. Events earlier than this one will be * discarded. @@ -276,6 +281,14 @@ public boolean isContinueAsNewSuggested() { return isContinueAsNewSuggested; } + public List getSuggestContinueAsNewReasons() { + return suggestContinueAsNewReasons; + } + + public boolean isTargetWorkerDeploymentVersionChanged() { + return isTargetWorkerDeploymentVersionChanged; + } + public void setReplaying(boolean replaying) { this.replaying = replaying; } @@ -1493,7 +1506,9 @@ public void workflowTaskStarted( long currentTimeMillis, boolean nonProcessedWorkflowTask, long historySize, - boolean isContinueAsNewSuggested) { + boolean isContinueAsNewSuggested, + List suggestContinueAsNewReasons, + boolean isTargetWorkerDeploymentVersionChanged) { setCurrentTimeMillis(currentTimeMillis); for (CancellableCommand cancellableCommand : commands) { cancellableCommand.handleWorkflowTaskStarted(); @@ -1509,6 +1524,9 @@ public void workflowTaskStarted( WorkflowStateMachines.this.lastWFTStartedEventId = startedEventId; WorkflowStateMachines.this.historySize = historySize; WorkflowStateMachines.this.isContinueAsNewSuggested = isContinueAsNewSuggested; + WorkflowStateMachines.this.suggestContinueAsNewReasons = suggestContinueAsNewReasons; + WorkflowStateMachines.this.isTargetWorkerDeploymentVersionChanged = + isTargetWorkerDeploymentVersionChanged; eventLoop(); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowTaskStateMachine.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowTaskStateMachine.java index f7f1ae88c..6be5e2432 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowTaskStateMachine.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowTaskStateMachine.java @@ -4,6 +4,10 @@ import io.temporal.api.enums.v1.EventType; import io.temporal.api.enums.v1.WorkflowTaskFailedCause; import io.temporal.api.history.v1.WorkflowTaskFailedEventAttributes; +import io.temporal.common.SuggestContinueAsNewReason; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Objects; final class WorkflowTaskStateMachine @@ -32,7 +36,9 @@ void workflowTaskStarted( long currentTimeMillis, boolean nonProcessedWorkflowTask, long historySize, - boolean isContinueAsNewSuggested); + boolean isContinueAsNewSuggested, + List suggestContinueAsNewReasons, + boolean isTargetWorkerDeploymentVersionChanged); void updateRunId(String currentRunId); } @@ -46,6 +52,8 @@ void workflowTaskStarted( private long startedEventId; private long historySize; private boolean isContinueAsNewSuggested; + private List suggestContinueAsNewReasons = Collections.emptyList(); + private boolean isTargetWorkerDeploymentVersionChanged; public static WorkflowTaskStateMachine newInstance( long workflowTaskStartedEventId, Listener listener) { @@ -103,6 +111,15 @@ private void handleStarted() { historySize = currentEvent.getWorkflowTaskStartedEventAttributes().getHistorySizeBytes(); isContinueAsNewSuggested = currentEvent.getWorkflowTaskStartedEventAttributes().getSuggestContinueAsNew(); + suggestContinueAsNewReasons = + convertSuggestContinueAsNewReasons( + currentEvent + .getWorkflowTaskStartedEventAttributes() + .getSuggestContinueAsNewReasonsList()); + isTargetWorkerDeploymentVersionChanged = + currentEvent + .getWorkflowTaskStartedEventAttributes() + .getTargetWorkerDeploymentVersionChanged(); // The last started event in the history. So no completed is expected. if (currentEvent.getEventId() >= workflowTaskStartedEventId && !hasNextEvent) { @@ -121,7 +138,33 @@ private void handleCompleted() { eventTimeOfTheLastWorkflowStartTask, lastTaskInHistory, historySize, - isContinueAsNewSuggested); + isContinueAsNewSuggested, + suggestContinueAsNewReasons, + isTargetWorkerDeploymentVersionChanged); + } + + private static List convertSuggestContinueAsNewReasons( + List protoReasons) { + if (protoReasons.isEmpty()) { + return Collections.emptyList(); + } + List reasons = new ArrayList<>(protoReasons.size()); + for (io.temporal.api.enums.v1.SuggestContinueAsNewReason proto : protoReasons) { + switch (proto) { + case SUGGEST_CONTINUE_AS_NEW_REASON_HISTORY_SIZE_TOO_LARGE: + reasons.add(SuggestContinueAsNewReason.HISTORY_SIZE_TOO_LARGE); + break; + case SUGGEST_CONTINUE_AS_NEW_REASON_TOO_MANY_HISTORY_EVENTS: + reasons.add(SuggestContinueAsNewReason.TOO_MANY_HISTORY_EVENTS); + break; + case SUGGEST_CONTINUE_AS_NEW_REASON_TOO_MANY_UPDATES: + reasons.add(SuggestContinueAsNewReason.TOO_MANY_UPDATES); + break; + default: + break; + } + } + return Collections.unmodifiableList(reasons); } private void handleFailed() { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index 7a956dc4b..1c90397e0 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -1415,6 +1415,15 @@ public void continueAsNew(ContinueAsNewInput input) { .determineUseCompatibleFlag( replayContext.getTaskQueue().equals(options.getTaskQueue()))); } + if (options.getInitialVersioningBehavior() != null) { + switch (options.getInitialVersioningBehavior()) { + case AUTO_UPGRADE: + attributes.setInitialVersioningBehavior( + io.temporal.api.enums.v1.ContinueAsNewVersioningBehavior + .CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_AUTO_UPGRADE); + break; + } + } } if (options == null && replayContext.getRetryOptions() != null) { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInfoImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInfoImpl.java index 469cd6a13..b30f13d45 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInfoImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInfoImpl.java @@ -4,10 +4,12 @@ import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.common.Priority; import io.temporal.common.RetryOptions; +import io.temporal.common.SuggestContinueAsNewReason; import io.temporal.internal.common.ProtoConverters; import io.temporal.internal.replay.ReplayWorkflowContext; import io.temporal.workflow.WorkflowInfo; import java.time.Duration; +import java.util.List; import java.util.Optional; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -147,6 +149,16 @@ public boolean isContinueAsNewSuggested() { return context.isContinueAsNewSuggested(); } + @Override + public List getSuggestContinueAsNewReasons() { + return context.getSuggestContinueAsNewReasons(); + } + + @Override + public boolean isTargetWorkerDeploymentVersionChanged() { + return context.isTargetWorkerDeploymentVersionChanged(); + } + @Override public Optional getCurrentBuildId() { return context.getCurrentBuildId(); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java index e179e12db..0c9238e64 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java @@ -282,6 +282,7 @@ public Throwable wrapFailure(NexusTask task, Throwable failure) { "Failure processing nexus response: " + response.getRequest().toString(), failure); } + @SuppressWarnings("deprecation") // Uses hasOperationError()/getOperationError() for compat private void handleNexusTask(NexusTask task, Scope metricsScope) { PollNexusTaskQueueResponseOrBuilder pollResponse = task.getResponse(); ByteString taskToken = pollResponse.getTaskToken(); @@ -374,6 +375,8 @@ private void logExceptionDuringResultReporting( } } + @SuppressWarnings( + "deprecation") // Uses setOperationError() for backward compat with old servers private Response getResponseForOldServer(Response response) { Response.Builder b = response.toBuilder(); Failure failure = response.getStartOperation().getFailure(); diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/ContinueAsNewOptions.java b/temporal-sdk/src/main/java/io/temporal/workflow/ContinueAsNewOptions.java index d30850045..ab9e2b7c5 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/ContinueAsNewOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/ContinueAsNewOptions.java @@ -45,6 +45,8 @@ public static final class Builder { @SuppressWarnings("deprecation") private VersioningIntent versioningIntent; + private InitialVersioningBehavior initialVersioningBehavior; + private Builder() {} private Builder(ContinueAsNewOptions options) { @@ -60,6 +62,7 @@ private Builder(ContinueAsNewOptions options) { this.typedSearchAttributes = options.getTypedSearchAttributes(); this.contextPropagators = options.getContextPropagators(); this.versioningIntent = options.versioningIntent; + this.initialVersioningBehavior = options.initialVersioningBehavior; } public Builder setWorkflowRunTimeout(Duration workflowRunTimeout) { @@ -131,6 +134,18 @@ public Builder setVersioningIntent(VersioningIntent versioningIntent) { return this; } + /** + * Specifies the versioning behavior for the first task of the new workflow run. For example, + * set to AUTO_UPGRADE to upgrade to the latest version on continue-as-new instead of inheriting + * the pinned version from the previous run. + */ + @Experimental + public Builder setInitialVersioningBehavior( + InitialVersioningBehavior initialVersioningBehavior) { + this.initialVersioningBehavior = initialVersioningBehavior; + return this; + } + public ContinueAsNewOptions build() { return new ContinueAsNewOptions( workflowRunTimeout, @@ -141,7 +156,8 @@ public ContinueAsNewOptions build() { searchAttributes, typedSearchAttributes, contextPropagators, - versioningIntent); + versioningIntent, + initialVersioningBehavior); } } @@ -157,6 +173,8 @@ public ContinueAsNewOptions build() { @SuppressWarnings("deprecation") private final @Nullable VersioningIntent versioningIntent; + private final @Nullable InitialVersioningBehavior initialVersioningBehavior; + public ContinueAsNewOptions( @Nullable Duration workflowRunTimeout, @Nullable String taskQueue, @@ -166,7 +184,8 @@ public ContinueAsNewOptions( @Nullable Map searchAttributes, @Nullable SearchAttributes typedSearchAttributes, @Nullable List contextPropagators, - @SuppressWarnings("deprecation") @Nullable VersioningIntent versioningIntent) { + @SuppressWarnings("deprecation") @Nullable VersioningIntent versioningIntent, + @Nullable InitialVersioningBehavior initialVersioningBehavior) { this.workflowRunTimeout = workflowRunTimeout; this.taskQueue = taskQueue; this.retryOptions = retryOptions; @@ -176,6 +195,7 @@ public ContinueAsNewOptions( this.typedSearchAttributes = typedSearchAttributes; this.contextPropagators = contextPropagators; this.versioningIntent = versioningIntent; + this.initialVersioningBehavior = initialVersioningBehavior; } public @Nullable Duration getWorkflowRunTimeout() { @@ -223,4 +243,13 @@ public RetryOptions getRetryOptions() { public @Nullable VersioningIntent getVersioningIntent() { return versioningIntent; } + + /** + * @return the initial versioning behavior for the first task of the new workflow run, or null if + * unset. + */ + @Experimental + public @Nullable InitialVersioningBehavior getInitialVersioningBehavior() { + return initialVersioningBehavior; + } } diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/WorkflowInfo.java b/temporal-sdk/src/main/java/io/temporal/workflow/WorkflowInfo.java index 04a997a53..e49929e18 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/WorkflowInfo.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/WorkflowInfo.java @@ -4,7 +4,9 @@ import io.temporal.common.Experimental; import io.temporal.common.Priority; import io.temporal.common.RetryOptions; +import io.temporal.common.SuggestContinueAsNewReason; import java.time.Duration; +import java.util.List; import java.util.Optional; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -161,6 +163,22 @@ public interface WorkflowInfo { */ boolean isContinueAsNewSuggested(); + /** + * @return the reasons why continue-as-new is suggested, or an empty list if not suggested. This + * value changes during the lifetime of a Workflow Execution. + */ + @Experimental + List getSuggestContinueAsNewReasons(); + + /** + * @return true if the target worker deployment version has changed for this workflow since the + * last workflow task. This is only relevant for workflows using the PINNED versioning + * behavior. When true, the workflow may want to continue-as-new with {@link + * ContinueAsNewOptions.Builder#setInitialVersioningBehavior} set to AUTO_UPGRADE. + */ + @Experimental + boolean isTargetWorkerDeploymentVersionChanged(); + /** * @return The Build ID of the worker which executed the current Workflow Task. May be empty the * task was completed by a worker without a Build ID. If this worker is the one executing this diff --git a/temporal-sdk/src/test/java/io/temporal/worker/WorkerVersioningTest.java b/temporal-sdk/src/test/java/io/temporal/worker/WorkerVersioningTest.java index a51f0fe1b..305d60335 100644 --- a/temporal-sdk/src/test/java/io/temporal/worker/WorkerVersioningTest.java +++ b/temporal-sdk/src/test/java/io/temporal/worker/WorkerVersioningTest.java @@ -10,6 +10,7 @@ import io.temporal.api.workflowservice.v1.*; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowStub; +import io.temporal.common.InitialVersioningBehavior; import io.temporal.common.VersioningBehavior; import io.temporal.common.VersioningOverride; import io.temporal.common.WorkerDeploymentVersion; @@ -431,6 +432,86 @@ public void testWorkflowsCanUseVersioningOverride() { == PINNED_OVERRIDE_BEHAVIOR_PINNED)); } + @WorkflowInterface + public interface ContinueAsNewVersionUpgradeWorkflow { + @WorkflowMethod + String execute(int attempt); + } + + public static class TestWorkerVersioningCanV1 implements ContinueAsNewVersionUpgradeWorkflow { + @Override + @WorkflowVersioningBehavior(VersioningBehavior.PINNED) + public String execute(int attempt) { + if (attempt > 0) { + return "v1.0"; + } + while (!Workflow.getInfo().isTargetWorkerDeploymentVersionChanged()) { + Workflow.sleep(java.time.Duration.ofMillis(10)); + } + ContinueAsNewOptions options = + ContinueAsNewOptions.newBuilder() + .setInitialVersioningBehavior(InitialVersioningBehavior.AUTO_UPGRADE) + .build(); + ContinueAsNewVersionUpgradeWorkflow next = + Workflow.newContinueAsNewStub(ContinueAsNewVersionUpgradeWorkflow.class, options); + next.execute(attempt + 1); + throw new RuntimeException("unreachable"); + } + } + + public static class TestWorkerVersioningCanV2 implements ContinueAsNewVersionUpgradeWorkflow { + @Override + @WorkflowVersioningBehavior(VersioningBehavior.PINNED) + public String execute(int attempt) { + return "v2.0"; + } + } + + @Test + public void testContinueAsNewWithVersionUpgrade() { + assumeTrue("Test Server doesn't support versioning", SDKTestWorkflowRule.useExternalService); + + WorkerDeploymentVersion v1 = + new WorkerDeploymentVersion(testWorkflowRule.getDeploymentName(), "1.0"); + WorkerDeploymentVersion v2 = + new WorkerDeploymentVersion(testWorkflowRule.getDeploymentName(), "2.0"); + + Worker w1 = testWorkflowRule.newWorkerWithBuildID("1.0"); + w1.registerWorkflowImplementationTypes(TestWorkerVersioningCanV1.class); + w1.start(); + + Worker w2 = testWorkflowRule.newWorkerWithBuildID("2.0"); + w2.registerWorkflowImplementationTypes(TestWorkerVersioningCanV2.class); + w2.start(); + + // Set v1 as current + DescribeWorkerDeploymentResponse d1 = waitUntilWorkerDeploymentVisible(v1); + setCurrentVersion(v1, d1.getConflictToken()); + waitForRoutingConfigPropagation(v1); + + // Start workflow on v1 + ContinueAsNewVersionUpgradeWorkflow wf = + testWorkflowRule.newWorkflowStubTimeoutOptions( + ContinueAsNewVersionUpgradeWorkflow.class, "can-version-upgrade"); + WorkflowExecution we = WorkflowClient.start(wf::execute, 0); + + // Verify workflow is running on v1 + waitForWorkflowRunningOnVersion(we.getWorkflowId(), "1.0"); + + // Set v2 as current — triggers targetWorkerDeploymentVersionChanged + DescribeWorkerDeploymentResponse d2 = waitUntilWorkerDeploymentVisible(v2); + setCurrentVersion(v2, d2.getConflictToken()); + waitForRoutingConfigPropagation(v2); + + // V1 workflow should detect version change, CAN with AUTO_UPGRADE, v2 returns "v2.0" + String result = + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub(we.getWorkflowId()) + .getResult(String.class); + Assert.assertEquals("v2.0", result); + } + @SuppressWarnings("deprecation") private DescribeWorkerDeploymentResponse waitUntilWorkerDeploymentVisible( WorkerDeploymentVersion v) { @@ -499,4 +580,64 @@ private SetWorkerDeploymentRampingVersionResponse setRampingVersion( .setPercentage(percent) .build()); } + + @SuppressWarnings("deprecation") + private void waitForRoutingConfigPropagation(WorkerDeploymentVersion v) { + Eventually.assertEventually( + Duration.ofSeconds(15), + () -> { + DescribeWorkerDeploymentResponse resp = + testWorkflowRule + .getWorkflowClient() + .getWorkflowServiceStubs() + .blockingStub() + .describeWorkerDeployment( + DescribeWorkerDeploymentRequest.newBuilder() + .setNamespace(testWorkflowRule.getTestEnvironment().getNamespace()) + .setDeploymentName(v.getDeploymentName()) + .build()); + Assert.assertEquals( + v.getBuildId(), + resp.getWorkerDeploymentInfo() + .getRoutingConfig() + .getCurrentDeploymentVersion() + .getBuildId()); + // Check routing config update is not in progress + int state = resp.getWorkerDeploymentInfo().getRoutingConfigUpdateStateValue(); + Assert.assertNotEquals( + io.temporal.api.enums.v1.RoutingConfigUpdateState + .ROUTING_CONFIG_UPDATE_STATE_IN_PROGRESS_VALUE, + state); + }); + } + + private void waitForWorkflowRunningOnVersion(String workflowId, String expectedBuildId) { + Eventually.assertEventually( + Duration.ofSeconds(15), + () -> { + io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse resp = + testWorkflowRule + .getWorkflowClient() + .getWorkflowServiceStubs() + .blockingStub() + .describeWorkflowExecution( + io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest + .newBuilder() + .setNamespace(testWorkflowRule.getTestEnvironment().getNamespace()) + .setExecution( + io.temporal.api.common.v1.WorkflowExecution.newBuilder() + .setWorkflowId(workflowId) + .build()) + .build()); + Assert.assertEquals( + io.temporal.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING, + resp.getWorkflowExecutionInfo().getStatus()); + Assert.assertEquals( + expectedBuildId, + resp.getWorkflowExecutionInfo() + .getVersioningInfo() + .getDeploymentVersion() + .getBuildId()); + }); + } } diff --git a/temporal-serviceclient/src/main/proto b/temporal-serviceclient/src/main/proto index 188e309de..db5f59306 160000 --- a/temporal-serviceclient/src/main/proto +++ b/temporal-serviceclient/src/main/proto @@ -1 +1 @@ -Subproject commit 188e309dee0acb3e3c84363d2d9f11be32df3bb8 +Subproject commit db5f593060ee579460e7f4e06d79b5e4f3af2387 diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java index 7183ade76..c95a7dd46 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java @@ -1023,6 +1023,7 @@ private static Failure wrapNexusOperationFailure(Failure cause) { .build(); } + @SuppressWarnings("deprecation") // Uses hasOperationError()/getOperationError() for compat @Override public void respondNexusTaskCompleted( RespondNexusTaskCompletedRequest request, diff --git a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java index 26b269199..e913a3822 100644 --- a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java +++ b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java @@ -846,6 +846,7 @@ public void testNexusOperationStartToCloseTimeout() { } } + @SuppressWarnings("deprecation") @Test public void testNexusOperationError() { Response unsuccessfulResp = @@ -1158,6 +1159,7 @@ public void testNexusOperationCancelRequestAcknowledgeSchedulesWorkflowTask() { } } + @SuppressWarnings("deprecation") @Test(timeout = 30000) public void testNexusOperationCanceledErrorWithCauseChain() { // Verifies that a canceled OperationError with a JSON-encoded failure chain properly diff --git a/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java b/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java index 9f7aa44b6..f89e61c64 100644 --- a/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java +++ b/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java @@ -8,6 +8,7 @@ import io.temporal.api.failure.v1.Failure; import io.temporal.api.sdk.v1.UserMetadata; import io.temporal.common.RetryOptions; +import io.temporal.common.SuggestContinueAsNewReason; import io.temporal.common.converter.DefaultDataConverter; import io.temporal.failure.CanceledFailure; import io.temporal.internal.common.SdkFlag; @@ -379,5 +380,15 @@ public long getHistorySize() { public boolean isContinueAsNewSuggested() { return false; } + + @Override + public List getSuggestContinueAsNewReasons() { + return Collections.emptyList(); + } + + @Override + public boolean isTargetWorkerDeploymentVersionChanged() { + return false; + } } }