Skip to content

Commit

Permalink
Use affinity router for non-CI tasks (#7476)
Browse files Browse the repository at this point in the history
  • Loading branch information
bduffany authored Sep 17, 2024
1 parent 06eb946 commit efc043a
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 116 deletions.
33 changes: 13 additions & 20 deletions enterprise/server/scheduling/task_router/task_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ const (
// router for routable tasks. This is intentionally less than the number of
// probes per task (for load balancing purposes).
defaultPreferredNodeLimit = 1
// The preferred node limit for workflows.
// The preferred node limit for ci_runner tasks.
// This is set higher than the default limit since we strongly prefer
// workflow tasks to hit a node with a warm bazel workspace, but it is
// these tasks to hit a node with a warm bazel workspace, but it is
// set less than the number of probes so that we can autoscale the workflow
// executor pool effectively.
workflowsPreferredNodeLimit = 1
ciRunnerPreferredNodeLimit = 1
)

type taskRouter struct {
Expand Down Expand Up @@ -71,7 +71,7 @@ func New(env environment.Env) (interfaces.TaskRouter, error) {
if rdb == nil {
return nil, status.FailedPreconditionError("Redis is required for task router")
}
strategies := []Router{runnerRecycler{}, affinityRouter{}}
strategies := []Router{ciRunnerRouter{}, affinityRouter{}}
return &taskRouter{
env: env,
rdb: rdb,
Expand Down Expand Up @@ -268,22 +268,19 @@ type Router interface {
RoutingInfo(params routingParams) (int, []string, error)
}

// The runnerRecycler is a router that attempts to "recycle" warm execution
// nodes when possible.
type runnerRecycler struct{}
// The ciRunnerRouter routes ci_runner tasks according to git branch
// information.
type ciRunnerRouter struct{}

func (runnerRecycler) Applies(params routingParams) bool {
return platform.IsTrue(platform.FindValue(params.cmd.GetPlatform(), platform.RecycleRunnerPropertyName))
func (ciRunnerRouter) Applies(params routingParams) bool {
return platform.IsCICommand(params.cmd) && platform.IsTrue(platform.FindValue(params.cmd.GetPlatform(), platform.RecycleRunnerPropertyName))
}

func (runnerRecycler) preferredNodeLimit(params routingParams) int {
if isWorkflow(params.cmd) {
return workflowsPreferredNodeLimit
}
return defaultPreferredNodeLimit
func (ciRunnerRouter) preferredNodeLimit(params routingParams) int {
return ciRunnerPreferredNodeLimit
}

func (runnerRecycler) routingKeys(params routingParams) ([]string, error) {
func (ciRunnerRouter) routingKeys(params routingParams) ([]string, error) {
parts := []string{"task_route", params.groupID}
keys := make([]string, 0)

Expand Down Expand Up @@ -326,16 +323,12 @@ func (runnerRecycler) routingKeys(params routingParams) ([]string, error) {
return keys, nil
}

func (s runnerRecycler) RoutingInfo(params routingParams) (int, []string, error) {
func (s ciRunnerRouter) RoutingInfo(params routingParams) (int, []string, error) {
nodeLimit := s.preferredNodeLimit(params)
keys, err := s.routingKeys(params)
return nodeLimit, keys, err
}

// isWorkflow reports whether cmd's platform carries a non-empty value for
// the workflow ID platform property.
func isWorkflow(cmd *repb.Command) bool {
	workflowID := platform.FindValue(cmd.GetPlatform(), platform.WorkflowIDPropertyName)
	return workflowID != ""
}

// affinityRouter generates Redis routing keys based on:
// - remoteInstanceName
// - groupID
Expand Down
89 changes: 1 addition & 88 deletions enterprise/server/scheduling/task_router/task_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,55 +70,6 @@ func TestTaskRouter_RankNodes_Workflows_ReturnsLatestRunnerThatExecutedWorkflow(
requireNonSequential(t, ranked[2:])
}

// TestTaskRouter_RankNodes_DefaultNodeLimit_ReturnsOnlyLatestNodeMarkedComplete
// checks that, for a command whose only platform property is
// "recycle-runner", RankNodes puts the executor most recently passed to
// MarkComplete in first position, and that after a second MarkComplete only
// that first-ranked node is reported as preferred.
func TestTaskRouter_RankNodes_DefaultNodeLimit_ReturnsOnlyLatestNodeMarkedComplete(t *testing.T) {
// Mark a routable task complete by executor 1.

env := newTestEnv(t)
router := newTaskRouter(t, env)
ctx := withAuthUser(t, context.Background(), env, "US1")
// The command carries no workflow ID property, only "recycle-runner".
cmd := &repb.Command{
Platform: &repb.Platform{
Properties: []*repb.Platform_Property{
{Name: "recycle-runner", Value: "true"},
},
},
}
instanceName := "test-instance"

router.MarkComplete(ctx, cmd, instanceName, executorHostID1)

// Candidate set of 100 nodes; the same slice is ranked twice below.
// NOTE(review): presumably this includes executorHostID1 and
// executorHostID2 — confirm against sequentiallyNumberedNodes.
nodes := sequentiallyNumberedNodes(100)

// Task should now be routed to executor 1.

ranked := router.RankNodes(ctx, cmd, instanceName, nodes)

requireSameExecutionNodes(t, nodes, ranked)
require.Equal(t, executorHostID1, ranked[0].GetExecutionNode().GetExecutorHostId())
// NOTE(review): presumably asserts that the remaining nodes were shuffled
// rather than left in their original order — confirm against the helper.
requireNonSequential(t, ranked[1:])

// Mark the same task complete by executor 2 as well.

router.MarkComplete(ctx, cmd, instanceName, executorHostID2)

ranked = router.RankNodes(ctx, cmd, instanceName, nodes)

// Task should now be routed to executor 2, but executor 1 should be ranked
// randomly, since we only store up to 1 recent executor for non-workflow
// tasks.

requireSameExecutionNodes(t, nodes, ranked)
require.Equal(t, executorHostID2, ranked[0].GetExecutionNode().GetExecutorHostId())
require.True(t, ranked[0].IsPreferred())

// NOTE(review): presumably checks that executor 1 does not consistently
// land at rank index 1 across repeated rankings — confirm against the helper.
requireNotAlwaysRanked(1, executorHostID1, t, router, ctx, cmd, instanceName)
requireNonSequential(t, ranked[1:])

// No node other than the first-ranked one may be marked preferred. The bound
// of 100 matches the number of nodes created above.
for i := 1; i < 100; i++ {
require.False(t, ranked[i].IsPreferred())
}
}

func TestTaskRouter_RankNodes_RoutesByHostID(t *testing.T) {
// Mark a routable task complete by executor 1.

Expand All @@ -131,6 +82,7 @@ func TestTaskRouter_RankNodes_RoutesByHostID(t *testing.T) {
{Name: "recycle-runner", Value: "true"},
},
},
OutputPaths: []string{"foo.out"},
}
instanceName := "test-instance"

Expand Down Expand Up @@ -310,45 +262,6 @@ func TestTaskRouter_RankNodes_AffinityRoutingDisabled(t *testing.T) {
requireNotAlwaysRanked(0, executorHostID1, t, router, ctx, cmd, instanceName)
}

// TestTaskRouter_RankNodes_RunnerRecyclingTakesPrecedence checks that when a
// command sets both the "recycle-runner" and "affinity-routing" platform
// properties, ranking follows the executor recorded for the most recent
// recycle-runner completion rather than the one recorded for the command's
// own output paths.
func TestTaskRouter_RankNodes_RunnerRecyclingTakesPrecedence(t *testing.T) {
env := newTestEnv(t)
router := newTaskRouter(t, env)
ctx := withAuthUser(t, context.Background(), env, "US1")
// oaCmd declares an output path in addition to both platform properties.
// NOTE(review): "oa" presumably stands for "output affinity" — confirm.
oaCmd := &repb.Command{
Platform: &repb.Platform{
Properties: []*repb.Platform_Property{
{Name: "recycle-runner", Value: "true"},
{Name: "affinity-routing", Value: "true"},
},
},
OutputPaths: []string{"/bazel-out/foo.a"},
}
instanceName := "test-instance"

// Record executor 1 as having completed the command with output paths.
router.MarkComplete(ctx, oaCmd, instanceName, executorHostID1)

// rrCmd has the same platform properties as oaCmd but no output paths.
// NOTE(review): "rr" presumably stands for "runner recycling" — confirm.
rrCmd := &repb.Command{
Platform: &repb.Platform{
Properties: []*repb.Platform_Property{
{Name: "recycle-runner", Value: "true"},
{Name: "affinity-routing", Value: "true"},
},
},
}

// Record executor 2 as having completed the command without output paths.
router.MarkComplete(ctx, rrCmd, instanceName, executorHostID2)

nodes := sequentiallyNumberedNodes(100)

// Task should be routed to executor 2, because the runner recycling
// routing should take priority.
ranked := router.RankNodes(ctx, oaCmd, instanceName, nodes)

requireSameExecutionNodes(t, nodes, ranked)
require.Equal(t, executorHostID2, ranked[0].GetExecutionNode().GetExecutorHostId())
// NOTE(review): presumably asserts that the remaining nodes were shuffled
// rather than left in their original order — confirm against the helper.
requireNonSequential(t, ranked[1:])
}

func TestTaskRouter_RankNodes_JustShufflesIfCommandIsNotAvailable(t *testing.T) {
env := newTestEnv(t)
router := newTaskRouter(t, env)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,9 @@ func TestSimpleCommand_RunnerReuse_MultipleExecutors_RoutesCommandToSameExecutor
opts := &rbetest.ExecuteOpts{APIKey: rbe.APIKey1}

cmd := rbe.Execute(&repb.Command{
Arguments: []string{"touch", "foo.txt"},
Platform: platform,
Arguments: []string{"touch", "foo.txt"},
OutputPaths: []string{"foo.txt"},
Platform: platform,
}, opts)
res := cmd.Wait()

Expand All @@ -484,8 +485,9 @@ func TestSimpleCommand_RunnerReuse_MultipleExecutors_RoutesCommandToSameExecutor
rbetest.WaitForAnyPooledRunner(t, ctx)

cmd = rbe.Execute(&repb.Command{
Arguments: []string{"stat", "foo.txt"},
Platform: platform,
Arguments: []string{"stat", "foo.txt"},
OutputPaths: []string{"foo.txt"},
Platform: platform,
}, opts)
res = cmd.Wait()

Expand Down Expand Up @@ -520,8 +522,9 @@ func TestSimpleCommand_RunnerReuse_PoolSelectionViaHeader_RoutesCommandToSameExe
}

cmd := rbe.Execute(&repb.Command{
Arguments: []string{"touch", "foo.txt"},
Platform: platform,
Arguments: []string{"touch", "foo.txt"},
OutputPaths: []string{"foo.txt"},
Platform: platform,
}, opts)
res := cmd.Wait()

Expand All @@ -530,8 +533,9 @@ func TestSimpleCommand_RunnerReuse_PoolSelectionViaHeader_RoutesCommandToSameExe
rbetest.WaitForAnyPooledRunner(t, ctx)

cmd = rbe.Execute(&repb.Command{
Arguments: []string{"stat", "foo.txt"},
Platform: platform,
Arguments: []string{"stat", "foo.txt"},
OutputPaths: []string{"foo.txt"},
Platform: platform,
}, opts)
res = cmd.Wait()

Expand Down

0 comments on commit efc043a

Please sign in to comment.