Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support independent activities for pinned workflows #6957

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

ShahabT
Copy link
Collaborator

@ShahabT ShahabT commented Dec 9, 2024

What changed?

Now, a pinned workflow can start activities in task queues belonging to different deployments. Such activities are versioned independently from the workflow and will start on their own task queue's current deployment.

Why?

Previously, independent activities for pinned workflows were not supported and the workflow would get stuck if attempting to start an activity on a task queue not belonging to the same deployment and the workflow task queue.

How did you test it?

Added test.

Potential risks

The independence check happens when the task is scheduled and then when the spooled task is being dispatched. If the independence check have a different result for the spooled task, we re-spool it in the new backlog.

Re-spooling tasks in a different queue is a new behavior, but since it should happen at most once for a task (from default queue to pinned queue) and only in rare edge cases dealing with late arriving pollers, it seems safe.

Documentation

None so far.

Is hotfix candidate?

No.

@ShahabT ShahabT requested a review from a team as a code owner December 9, 2024 09:27
@ShahabT ShahabT requested a review from dnr December 9, 2024 09:31
…ivity

# Conflicts:
#	service/matching/task_queue_partition_manager.go
}
// Finish the task because now it is copied to the other backlog. It should be considered
// invalid because a poller did not receive the task.
task.finish(nil, false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we return after this line? or continue to try to dispatch on syncMatchQueue?

wfBehavior := directive.GetBehavior()
deployment := directive.GetDeployment()

perTypeUserData, perTypeUserDataChanged, err := pm.getPerTypeUserData()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would still call it

Suggested change
perTypeUserData, perTypeUserDataChanged, err := pm.getPerTypeUserData()
perTypeUserData, userDataChanged, err := pm.getPerTypeUserData()

the channel broadcast is still when any user data changes, right?

@@ -172,13 +172,23 @@ func recordActivityTaskStarted(
// TODO (shahab): support independent activities. Independent activities do not need all
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be removed now?

if scheduledDeployment == nil {
// TODO: remove this once the ScheduledDeployment field is removed from proto
scheduledDeployment = req.GetScheduledDeployment()
}
if !scheduledDeployment.Equal(wfDeployment) {
// This must be an AT scheduled before the workflow transitions to the current
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WFT?

// we skip this check until a v1.28 if scheduledBehavior is unspecified.
// TODO (shahab): remove this line after v1.27 is released.
scheduledBehavior != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED {
// This must be an AT scheduled before the workflow changes behavior. Matching can drop it.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WFT?

// TODO (shahab): remove this line after v1.27 is released.
scheduledBehavior != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED {
// This must be an AT scheduled before the workflow changes behavior. Matching can drop it.
return nil, false, serviceerrors.NewObsoleteMatchingTask("wrong directive behavior")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could the error message be a little more descriptive? "wrong directive behavior" and "wrong directive deployment" are pretty obscure

@@ -167,10 +167,24 @@ func Invoke(
return nil, serviceerrors.NewObsoleteMatchingTask("wrong task queue type")
}

wfBehavior := mutableState.GetEffectiveVersioningBehavior()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any opportunity to share code between this and recordactivitytaskstarted? actually I'm not even sure that would make this easier to understand, maybe it's simpler to just do it inline in both. (especially after future cleanups.) just wondering.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants