
[KAFKA-17049] Fix: incremental rebalances assign too many tasks for the same connector together #16486

Open · wants to merge 7 commits into base: trunk

Conversation

@yazgoo commented Jun 28, 2024:

see https://issues.apache.org/jira/browse/KAFKA-17049

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Example

  • 🟥: task for connector 1
  • 🟩: task for connector 2
  • 🟦: task for connector 3

Let's say we want to revoke 4 tasks from a worker with this task list (currentWorkerAllocation in the original code):

🟥 🟥 🟥 🟥 🟩 🟩 🟩 🟩 🟦 🟦 🟦 🟦

Right now, the first 4 tasks are taken:

🟥 🟥 🟥 🟥

In our example, the first four tasks all belong to the same connector, so the revocation is unfair.

In the end this leads to an unbalanced distribution of tasks.

What this PR suggests is a fairer revocation, like so:

🟥 🟩 🟦 🟥

New algorithm detail

To do this, we first group the tasks by connector:

  • 🟥 🟥 🟥 🟥
  • 🟩 🟩 🟩 🟩
  • 🟦 🟦 🟦 🟦

Then we pick the first item of each list, one by one. If we iterate:

| iteration | state | result |
| --- | --- | --- |
| 0 | 🟥 🟥 🟥 🟥 / 🟩 🟩 🟩 🟩 / 🟦 🟦 🟦 🟦 | |
| 1 | 🟥 🟥 🟥 / 🟩 🟩 🟩 🟩 / 🟦 🟦 🟦 🟦 | 🟥 |
| 2 | 🟥 🟥 🟥 / 🟩 🟩 🟩 / 🟦 🟦 🟦 🟦 | 🟥 🟩 |
| 3 | 🟥 🟥 🟥 / 🟩 🟩 🟩 / 🟦 🟦 🟦 | 🟥 🟩 🟦 |
| 4 | 🟥 🟥 / 🟩 🟩 🟩 / 🟦 🟦 🟦 | 🟥 🟩 🟦 🟥 |
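For illustration, here is a minimal Java sketch of this round-robin picking (just the idea, not the PR's actual BalancedIterator implementation; the pickBalanced helper and its parameters are hypothetical):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class RoundRobinRevocationSketch {

    // Picks `count` tasks to revoke, cycling over the connectors so that no
    // single connector loses all of its tasks before the others lose any.
    static <T> List<T> pickBalanced(List<T> tasks, Function<T, String> connectorOf, int count) {
        // Group the worker's tasks by connector, preserving encounter order.
        Map<String, Deque<T>> grouped = new LinkedHashMap<>();
        for (T task : tasks) {
            grouped.computeIfAbsent(connectorOf.apply(task), k -> new ArrayDeque<>()).add(task);
        }

        List<T> picked = new ArrayList<>(count);
        while (picked.size() < count) {
            boolean tookAny = false;
            // One pass: take at most one task from each connector group.
            for (Deque<T> group : grouped.values()) {
                if (picked.size() == count) {
                    break;
                }
                if (!group.isEmpty()) {
                    picked.add(group.poll());
                    tookAny = true;
                }
            }
            if (!tookAny) {
                break; // fewer tasks than requested; stop instead of looping forever
            }
        }
        return picked;
    }
}

With the worker above, pickBalanced(tasks, ConnectorTaskId::connector, 4) would yield 🟥 🟩 🟦 🟥 rather than 🟥 🟥 🟥 🟥.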

@gharris1727 (Contributor) left a comment:

Thanks @yazgoo for the bug report and fix! I found another source of unbalance that I didn't see before, which should also be addressed here.

performStandardRebalance();
performStandardRebalance();
assertEquals(3, memberAssignments.size());
memberAssignments.forEach((k, v) -> {
Contributor:

I added an assertion like this (but generic) to assertBalancedAllocation and saw that there are other unit tests which leave the tasks for individual connectors unbalanced.

For example, testTaskAssignmentWhenLeaderBounces has this allocation at one point:

"worker2" -> "{ connectorIds=[connector2], taskIds=[connector1-1, connector2-0, connector2-3, connector2-2]}"
"worker3" -> "{ connectorIds=[connector1], taskIds=[connector1-2, connector2-1, connector1-0, connector1-3]}"

I think this means we also need to adjust the allocation logic with a strategy similar to the one you've implemented for revocation.

Otherwise the scale-up scenario is the only one fixed, and there could be other situations that cause unbalance.
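For illustration, a generic assertion along these lines might look roughly like the following sketch (not the exact code that was added to assertBalancedAllocation; it assumes the memberAssignments map, ConnectorTaskId, and the JUnit/stream imports already used in this test suite):

// For every connector, per-worker task counts should differ by at most one.
Set<String> connectorNames = memberAssignments.values().stream()
        .flatMap(assignment -> assignment.tasks().stream())
        .map(ConnectorTaskId::connector)
        .collect(Collectors.toSet());
for (String connector : connectorNames) {
    List<Long> counts = memberAssignments.values().stream()
            .map(assignment -> assignment.tasks().stream()
                    .filter(taskId -> connector.equals(taskId.connector()))
                    .count())
            .collect(Collectors.toList());
    long spread = Collections.max(counts) - Collections.min(counts);
    assertTrue(spread <= 1,
            "Tasks for " + connector + " are unevenly allocated across workers: " + counts);
}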

Author:

thanks, I will have a look at it

Author:

I refactored everything in BalancedIterator and added it to assignTasks

Contributor:

I think there are still some scenarios that would lead to unbalanced per-connector task allocations across the cluster. We may need to perform an additional revocation to get to the state we want.

Demonstration

This is one possible scenario that could force us to perform an additional revocation in order to preserve balance in the allocation of total tasks per worker, and tasks per connector per worker.

Initial state

We start with three workers (W1, W2, and W3) and four connectors. Connector C1 is configured with 1 task, connector C2 with 6 tasks, connector C3 with 2 tasks, and connector C4 with 1 task.

The exact distribution of tasks across workers is:

W1: C1 C2 C2 C4
W2: C2 C2 C3
W3: C2 C2 C3

RIP W3

Then, worker W3 dies.

Unallocated: C2 C2 C3
W1: C1 C2 C2 C4
W2: C2 C2 C3

We now have to choose how to re-allocate the tasks from W3 across our two remaining workers.

Option 1: Allocate one C3 task to W1, and two C2 tasks to W2

W1: C1 C2 C2 C4 | C3
W2: C2 C2 C3    | C2 C2

This is unbalanced for C2's tasks (2 on W1, 4 on W2)

Option 2: Allocate one C2 task to each worker, and one C3 task to W2

W1: C1 C2 C2 C4 | C2
W2: C2 C2 C3    | C2 C3

This is unbalanced for C3's tasks (0 on W1, 2 on W2)

Option 3: Allocate one C2 task to each worker, and one C3 task to W1

W1: C1 C2 C2 C4 | C2 C3
W2: C2 C2 C3    | C2

This is unbalanced for total task counts (6 on W1, 4 on W2)

Option 4: Revocation + re-allocation

First, we revoke a C1 task from W1:

W1:    C2 C2 C4
W2: C2 C2 C3

Then we allocate one C2 task to each worker, one C3 task to W1, and one C1 task to W2:

W1:    C2 C2 C4 | C2 C3
W2: C2 C2 C3    | C2 C1

This leads to a balanced allocation in total tasks and tasks per connector across workers.

Test case

The following test case can be copy+pasted into the IncrementalCooperativeAssignorTest suite and should demonstrate the above scenario:

@Test
public void testForceUnbalancedPerConnectorTaskAllocation() {
    // Customize assignor for this test case
    time = new MockTime();
    initAssignor();

    final String c1 = "connector1";
    final String c2 = "connector2";
    final String c3 = "connector3";
    final String c4 = "connector4";

    final String w1 = "worker1";
    final String w2 = "worker2";
    final String w3 = "worker3";

    connectors.clear();
    addNewConnector(c1, 1);
    addNewConnector(c2, 6);
    addNewConnector(c3, 2);
    addNewConnector(c4, 1);

    removeWorkers(w1);
    int c1Tasks = -1;
    int c2Tasks = -1;
    int c3Tasks = -1;
    int c4Tasks = -1;
    addNewWorker(
            w1,
            Arrays.asList(c1, c4),
            Arrays.asList(
                    new ConnectorTaskId(c1, ++c1Tasks),
                    new ConnectorTaskId(c2, ++c2Tasks),
                    new ConnectorTaskId(c2, ++c2Tasks),
                    new ConnectorTaskId(c4, ++c4Tasks)
            )
    );
    addNewWorker(
            w2,
            Arrays.asList(c2),
            Arrays.asList(
                    new ConnectorTaskId(c2, ++c2Tasks),
                    new ConnectorTaskId(c2, ++c2Tasks),
                    new ConnectorTaskId(c3, ++c3Tasks)
            )
    );
    addNewWorker(
            w3,
            Arrays.asList(c3),
            Arrays.asList(
                    new ConnectorTaskId(c2, ++c2Tasks),
                    new ConnectorTaskId(c2, ++c2Tasks),
                    new ConnectorTaskId(c3, ++c3Tasks)
            )
    );

    // Initial rebalance: should result in no change in allocation
    performStandardRebalance();
    assertDelay(0);
    assertBalancedAndCompleteAllocation();

    // RIP W3
    // This round should result in no assignments since we need to wait for the delay to expire
    removeWorkers(w3);
    performStandardRebalance();
    assertDelay(rebalanceDelay);

    // After the delay has expired, W3's connectors and total tasks should be reallocated
    // evenly across the cluster
    time.sleep(rebalanceDelay);
    performStandardRebalance();
    assertDelay(0);
    assertBalancedAndCompleteAllocation();

    // However, the allocation of tasks per connector is not even;
    // at least one of the following assertions should fail
    for (String connector : Arrays.asList(c1, c2, c3, c4)) {
        long w1Tasks = memberAssignments.get(w1).tasks().stream()
                .filter(taskId -> connector.equals(taskId.connector()))
                .count();
        long w2Tasks = memberAssignments.get(w2).tasks().stream()
                .filter(taskId -> connector.equals(taskId.connector()))
                .count();
        long allocationDiff = Math.abs(w1Tasks - w2Tasks);
        assertTrue(
                allocationDiff <= 1,
                "Tasks are not allocated evenly across remaining workers for connector " + connector
                        + "; worker 1 was assigned " + w1Tasks + " tasks and worker 2 was assigned " + w2Tasks + " tasks"
        );
    }
}

Conclusion

Even if we do implement the logic to perform the additional revocation for the scenario above, I don't know if it's a good idea. More task revocations will lead to more task downtime and more rebalances (which can cause temporary unavailability for portions of the cluster's REST API). Those are not dealbreakers, but when coupled with the fact that connector loads may still vary greatly within a cluster, it's unclear that this improvement would buy us much, especially in cases where there are more workers than the number of tasks per connector.

@yazgoo @gharris1727 what are your thoughts?

Contributor:

Wow, thanks for finding that test case and rebalance situation! I suspected this wouldn't be so easy :)

This is a tradeoff between:

  • Job continuity (not being interrupted while running)
  • Job availability (running instead of being unassigned)
  • Global balance (workers each run approximately the same number of jobs)
  • Local balance (connectors have approximately the same number of jobs on each worker)

The existing algorithm prioritizes job continuity before the rebalance timeout expires, and global balance afterwards. I would probably prefer to follow this pattern for the local balance, and rely on the rebalance timeout to limit the impact on job continuity. I think local balance is a good property to have, especially because we don't have resource-aware-scheduling.

If we were to prioritize job continuity over local balance and not perform these additional revocations, what happens on an upgrade to a cluster with a very poor existing balance? I think if the rolling upgrade was performed within the rebalance timeout, the existing balance could remain after the upgrade.

Perhaps the workers could end up in a state like this:

W1: C1 C3 C4
W2: C2 C2 C2
W3: C2 C2 C2

After W3 dies, but we prioritize global balance + continuity over local balance:

W1: C1 C3 C4 C2 C2
W2: C2 C2 C2 C2

After W3 rejoins, it could end up in the original state, or like this, depending on the connector-name-ordering:

W1: C1 C3 C2
W2: C2 C2 C2
W3: C2 C2 C4

I think W2 will maintain that homogeneous allocation until it is offline for more than the rebalance timeout.

Collaborator:

@gharris1727, just wanted to know why you think local balance is a good property to have. I think it is slightly hard to predict how the tasks are going to behave, but with global balancing (as described above) we are trying to ensure that all workers run more or less an equal number of tasks. We would still guarantee that with this new algorithm, but considering the heterogeneity of Connect tasks and how load patterns can change over time for tasks of the same or different connectors, what advantage do we get by achieving local balance?

I am just asking because, from what I have seen, the best assignment in terms of the tasks running right now might not hold true a few hours later if load patterns change, given that we don't have resource-aware scheduling. Just wanted to know your thoughts on this.

Contributor:

@vamossagar12 Local balance is a good property to have in situations where tasks of the same connector have similar resource requirements. For example, if connector A is memory-heavy, it makes sense to try to distribute it around the cluster evenly to avoid exhausting the memory capacity of any one worker.

A locally-balanced cluster should "average" the resource requirements of these heterogeneous connectors.

If the tasks of a connector are very heterogeneous (such as task 0 being high-load and all others being low-load) the local-balance strategy will be ineffective. I don't expect that it will be harmful, but I don't have any evidence for this.

Do you have an alternative property that you think we should use instead?

Collaborator:

@gharris1727, thanks for the explanation. Yes, I agree that when tasks have similar resource requirements, local balancing might yield better results. Having said that, this might not hold true for all the connectors deployed on a given Connect cluster, so we might end up optimising for something that may or may not be true for all of them. I also don't think it would be harmful to always opt for local balancing, but what might still happen is that resource-heavy tasks from two different heterogeneous connectors (say, task 0 of connectors c1 and c2) get scheduled on the same worker, which puts load on that worker. This may or may not happen with the global balancing strategy.

The main point I am trying to make is that the benefit of local balancing versus global balancing is hard to quantify and really depends on the connectors. So if we go ahead with the changes suggested in this PR, we might solve the local balancing problem but could, in theory, introduce imbalances for clusters that need global balancing (connectors running heterogeneous tasks). Maybe we keep both strategies and let users choose based on the kind of connectors they run (which might need a KIP), since they are best placed to make that decision. Do you think that would be overkill?

BTW, I am not at all against the changes in this PR. I just feel that, in the absence of any quantifiable way of assigning tasks (such as metrics or load usage), we can pivot either way, and the overall benefit might get evened out depending on the nature of the tasks and how their behaviour changes over time.

Contributor:

> I also don't think it would be harmful to always opt for local balancing, but what might still happen is that resource-heavy tasks from two different heterogeneous connectors (say, task 0 of connectors c1 and c2) get scheduled on the same worker, which puts load on that worker.

I agree that this is problematic, but it isn't a problem that we can currently solve with resource-unaware scheduling. Until we have resource-aware scheduling, we should take steps to improve the algorithm that we do have.

> The main point I am trying to make is that the benefit of local balancing versus global balancing is hard to quantify and really depends on the connectors.

It isn't global vs. local balancing, it's global + local balancing. Nobody is proposing that we move away from global balancing; that will still be in place. Additionally, "local balancing" is not a totally new property: if you look at the assignTasks algorithm when run against an empty cluster, such as in testAssignTasksWhenBalanced(), it produces a locally-balanced assignment. The thing addressed here is that after successive generations, the assignment is no longer locally-balanced like it would be after just one round of assignments.

> Maybe we keep both strategies and let users choose based on the kind of connectors they run (which might need a KIP), since they are best placed to make that decision. Do you think that would be overkill?

Certainly. If we're introducing a KIP to change the rebalance protocol, I would want it to solve the problem holistically instead of just adding a configuration to use a different revocation algorithm. This is a change within the bounds of backwards-compatibility, because the revocation and balance algorithm was never officially specified.

@C0urante (Contributor) left a comment:

Thanks for the fix, @yazgoo. I left a few small stylistic comments that should be easy to address, but I've also noticed there are some test failures in the IncrementalCooperativeAssignorTest suite. You can see them in our CI here; can you take a look at those?


@Override
public E next() {
for (; k < k + this.keys.size(); ) {
Contributor:

  1. Can't this be a while loop instead of a for loop?
  2. Is the condition correct? Technically the loop will terminate eventually, but only if either this.keys is empty, or overflow occurs and the value of k + this.keys.size() becomes negative.
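For reference, one possible shape that addresses both points, sketched against the fields visible in the diff (assuming grouped maps each connector key to an Iterator<E> and k tracks the current position; neither is fully shown here):

@Override
public E next() {
    // Visit each connector key at most once per call, starting from the current
    // position, and return the next element from the first non-exhausted group.
    int visited = 0;
    while (visited < this.keys.size()) {
        if (this.k >= this.keys.size()) {
            this.k = 0; // wrap around instead of letting k grow without bound
        }
        Iterator<E> group = this.grouped.get(this.keys.get(this.k));
        this.k++;
        visited++;
        if (group.hasNext()) {
            return group.next();
        }
    }
    throw new NoSuchElementException();
}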

Comment on lines +136 to +147
connectors.clear();
addNewConnector("connector1", 12);
performStandardRebalance();
addNewConnector("connector2", 12);
performStandardRebalance();
addNewEmptyWorkers("worker2");
performStandardRebalance();
performStandardRebalance();
addNewEmptyWorkers("worker3");
performStandardRebalance();
performStandardRebalance();
assertEquals(3, memberAssignments.size());
Contributor:

Can you try to follow the style in the other tests, where each rebalance is split out into its own separate group of lines, and each has a comment explaining what should take place during that rebalance?

These tests get pretty hard to read if it's just a wall of adding connectors and rebalancing.
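For illustration, the same sequence broken into per-rebalance groups might read something like this (the comments are assumptions about what each rebalance is expected to do, following the incremental protocol's revoke-then-reassign pattern):

connectors.clear();

// Single worker, single connector: all 12 tasks should be assigned to worker1
addNewConnector("connector1", 12);
performStandardRebalance();

// Second connector added: its 12 tasks should also land on worker1
addNewConnector("connector2", 12);
performStandardRebalance();

// worker2 joins: the first rebalance should revoke tasks from worker1,
// the second should assign the revoked tasks to worker2
addNewEmptyWorkers("worker2");
performStandardRebalance();
performStandardRebalance();

// worker3 joins: again, one rebalance to revoke and one to reassign
addNewEmptyWorkers("worker3");
performStandardRebalance();
performStandardRebalance();

assertEquals(3, memberAssignments.size());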

Contributor:

I'll take the blame for this one, I just spat out a unit test following the end-to-end reproduction case. Sorry @yazgoo that you're cleaning up after me :^)

yazgoo and others added 3 commits on July 1, 2024:

  • …/distributed/IncrementalCooperativeAssignor.java (Co-authored-by: Chris Egerton <[email protected]>)
  • …/distributed/IncrementalCooperativeAssignor.java (Co-authored-by: Chris Egerton <[email protected]>)

List::iterator
)
));
this.keys = new ArrayList<>(grouped.keySet());
@gharris1727 (Contributor) commented on Jul 1, 2024:

Is it possible for this key set to coincide across different workers? For example, if connector "a" always showed up first in the listing.

If so, then connector "a" would be the first thing revoked from all of the existing workers, and "a" could be concentrated on the new worker.
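Purely as an illustration (not something this PR currently does), one way to avoid a fixed revocation order would be to vary the starting key per worker, e.g. by rotating the key list based on the worker's id (workerId here is hypothetical):

// Hypothetical mitigation: rotate the connector key order differently per worker
// so revocations do not all start from the same connector on every worker.
List<String> keys = new ArrayList<>(grouped.keySet());
if (!keys.isEmpty()) {
    Collections.rotate(keys, Math.floorMod(workerId.hashCode(), keys.size()));
}
this.keys = keys;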
