[ KAFKA-17049 ] fix Incremental rebalances assign too many tasks for the same connector together #16486

Status: Open · wants to merge 8 commits into base: trunk
@@ -657,13 +657,15 @@ private Map<String, ConnectorsAndTasks> performLoadBalancingRevocations(
"connector",
configured.connectors().size(),
workers,
-                WorkerLoad::connectors
+                WorkerLoad::connectors,
+                Function.identity()
);
Map<String, Set<ConnectorTaskId>> taskRevocations = loadBalancingRevocations(
"task",
configured.tasks().size(),
workers,
-                WorkerLoad::tasks
+                WorkerLoad::tasks,
+                ConnectorTaskId::connector
);

connectorRevocations.forEach((worker, revoked) ->
@@ -680,7 +682,8 @@ private <E> Map<String, Set<E>> loadBalancingRevocations(
String allocatedResourceName,
int totalToAllocate,
Collection<WorkerLoad> workers,
-            Function<WorkerLoad, Collection<E>> workerAllocation
+            Function<WorkerLoad, Collection<E>> workerAllocation,
+            Function<E, String> allocationGrouper
) {
int totalWorkers = workers.size();
// The minimum instances of this resource that should be assigned to each worker
@@ -736,7 +739,7 @@ private <E> Map<String, Set<E>> loadBalancingRevocations(
Set<E> revokedFromWorker = new LinkedHashSet<>();
result.put(worker.worker(), revokedFromWorker);

-            Iterator<E> currentWorkerAllocation = workerAllocation.apply(worker).iterator();
+            Iterator<E> currentWorkerAllocation = new BalancedIterator<E>(workerAllocation.apply(worker), allocationGrouper);
// Revoke resources from the worker until it isn't allocated any more than it should be
for (int numRevoked = 0; currentAllocationSizeForWorker - numRevoked > maxAllocationForWorker; numRevoked++) {
if (!currentWorkerAllocation.hasNext()) {
@@ -793,6 +796,43 @@ protected void assignConnectors(List<WorkerLoad> workerAssignment, Collection<St
}
}

static class BalancedIterator<E> implements Iterator<E> {

private final Map<String, Iterator<E>> grouped;
private final List<String> keys;

private int k;

public BalancedIterator(Collection<E> collection, Function<E, String> allocationGrouper) {
this.k = 0;
this.grouped = collection.stream().collect(Collectors.groupingBy(
allocationGrouper,
Collectors.collectingAndThen(
Collectors.toList(),
List::iterator
)
));
this.keys = new ArrayList<>(grouped.keySet());
@gharris1727 (Contributor), Jul 1, 2024:

Is it possible for this key set to coincide across different workers? For example, if connector "a" always showed up first in the listing.

If so, then connector "a" would be the first thing revoked from all of the existing workers, and "a" could be concentrated on the new worker.
}

@Override
public boolean hasNext() {
return grouped.values().stream().anyMatch(Iterator::hasNext);
}

@Override
public E next() {
            // Scan at most one full cycle over the groups; if every group
            // iterator is exhausted, fall through and return null.
            int start = k;
            while (!this.keys.isEmpty() && k < start + this.keys.size()) {
Iterator<E> iterator = grouped.get(this.keys.get(k % this.keys.size()));
k++;
if (iterator.hasNext()) {
return iterator.next();
}
}
return null;
}
}
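To make the revocation ordering concrete, here is a standalone sketch (hypothetical class and method names, not code from this PR) of the group-aware round-robin behaviour that BalancedIterator implements: items are bucketed by a group key, then drained one element per bucket in turn, so revocations interleave across connectors instead of draining one connector's tasks first.

```java
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical demo of group-aware round-robin iteration.
public class BalancedIterDemo {

    static <E> List<E> interleaveByGroup(Collection<E> items, Function<E, String> grouper) {
        // LinkedHashMap keeps the bucket order deterministic for the demo
        Map<String, Iterator<E>> grouped = items.stream().collect(Collectors.groupingBy(
                grouper,
                LinkedHashMap::new,
                Collectors.collectingAndThen(Collectors.toList(), List::iterator)));
        List<String> keys = new ArrayList<>(grouped.keySet());
        List<E> out = new ArrayList<>();
        int k = 0;
        while (out.size() < items.size()) {
            // Visit buckets cyclically, skipping exhausted ones
            Iterator<E> it = grouped.get(keys.get(k % keys.size()));
            k++;
            if (it.hasNext()) {
                out.add(it.next());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> tasks = Arrays.asList("c1-0", "c1-1", "c1-2", "c2-0", "c2-1");
        // group by connector name (the prefix before the dash)
        System.out.println(interleaveByGroup(tasks, t -> t.split("-")[0]));
        // prints [c1-0, c2-0, c1-1, c2-1, c1-2]
    }
}
```

With a plain iterator the order would be [c1-0, c1-1, c1-2, c2-0, c2-1], so a revocation of three tasks would take all of c1's tasks; the interleaved order spreads the revocation across both connectors.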

/**
* Perform a round-robin assignment of tasks to workers with existing worker load. This
* assignment tries to balance the load between workers, by assigning tasks to workers that
@@ -802,10 +842,9 @@ protected void assignConnectors(List<WorkerLoad> workerAssignment, Collection<St
* @param tasks the tasks to be assigned
*/
protected void assignTasks(List<WorkerLoad> workerAssignment, Collection<ConnectorTaskId> tasks) {
workerAssignment.sort(WorkerLoad.taskComparator());
WorkerLoad first = workerAssignment.get(0);

-        Iterator<ConnectorTaskId> load = tasks.iterator();
+        Iterator<ConnectorTaskId> load = new BalancedIterator<>(tasks, ConnectorTaskId::connector);
while (load.hasNext()) {
int firstLoad = first.tasksSize();
int upTo = IntStream.range(0, workerAssignment.size())
@@ -131,6 +131,46 @@ public void testTaskAssignmentWhenWorkerJoins() {
assertEmptyAssignment();
}

@Test
public void testConnectorWellBalancedOnScaleOut() {
// Customize assignor for this test case
time = new MockTime();
initAssignor();
connectors.clear();

// Add first connector
addNewConnector("connector1", 12);
performStandardRebalance();
assertDelay(0);

// add second connector
addNewConnector("connector2", 12);
performStandardRebalance();
assertDelay(0);

// add second worker
addNewEmptyWorkers("worker2");
performStandardRebalance();
performStandardRebalance();
assertDelay(0);

// add third worker
addNewEmptyWorkers("worker3");
performStandardRebalance();
performStandardRebalance();
assertDelay(0);

// assert the connectors are well balanced
// over the workers
assertEquals(3, memberAssignments.size());
memberAssignments.forEach((k, v) -> {
Contributor:
I added an assertion like this (but generic) to assertBalancedAllocation and saw that there are other unit tests which leave the tasks for individual connectors unbalanced.

For example, testTaskAssignmentWhenLeaderBounces has this allocation at one point:

"worker2" -> "{ connectorIds=[connector2], taskIds=[connector1-1, connector2-0, connector2-3, connector2-2]}"
"worker3" -> "{ connectorIds=[connector1], taskIds=[connector1-2, connector2-1, connector1-0, connector1-3]}"

I think this means that we also need to adjust the allocation strategy with a similar strategy to the one you've implemented for revocation.

Otherwise the scale-up scenario is the only one fixed, and there could be other situations that cause unbalance.

Author:
thanks, I will have a look at it

Author:
I refactored everything in BalancedIterator and added it to assignTasks

Contributor:
I think there are still some scenarios that would lead to unbalanced per-connector task allocations across the cluster. We may need to perform an additional revocation to get to the state we want.

Demonstration

This is one possible scenario that could force us to perform an additional revocation in order to preserve balance in the allocation of total tasks per worker, and tasks per connector per worker.

Initial state

We start with three workers (W1, W2, and W3) and four connectors. Connector C1 is configured with 1 task, connector C2 with 6 tasks, connector C3 with 2 tasks, and connector C4 with 1 task.

The exact distribution of tasks across workers is:

W1: C1 C2 C2 C4
W2: C2 C2 C3
W3: C2 C2 C3

RIP W3

Then, worker W3 dies.

Unallocated: C2 C2 C3
W1: C1 C2 C2 C4
W2: C2 C2 C3

We now have to choose how to re-allocate the tasks from W3 across our two remaining workers.

Option 1: Allocate one C3 task to W1, and two C2 tasks to W2

W1: C1 C2 C2 C4 | C3
W2: C2 C2 C3    | C2 C2

This is unbalanced for C2's tasks (2 on W1, 4 on W2)

Option 2: Allocate one C2 task to each worker, and one C3 task to W2

W1: C1 C2 C2 C4 | C2
W2: C2 C2 C3    | C2 C3

This is unbalanced for C3's tasks (0 on W1, 2 on W2)

Option 3: Allocate one C2 task to each worker, and one C3 task to W1

W1: C1 C2 C2 C4 | C2 C3
W2: C2 C2 C3    | C2

This is unbalanced for total task counts (6 on W1, 4 on W2)

Option 4: Revocation + re-allocation

First, we revoke a C1 task from W1:

W1:    C2 C2 C4
W2: C2 C2 C3

Then we allocate one C2 task to each worker, one C3 task to W1, and one C1 task to W2:

W1:    C2 C2 C4 | C2 C3
W2: C2 C2 C3    | C2 C1

This leads to a balanced allocation in total tasks and tasks per connector across workers.
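As a quick sanity check of that claim, the Option 4 end state can be verified programmatically (hypothetical helper code, not part of the PR): both workers should hold the same total number of tasks within one, and each connector's count should differ by at most one between the workers.

```java
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical check of the Option 4 allocation described above.
public class Option4Check {

    static boolean balanced(List<String> w1, List<String> w2) {
        // global balance: total task counts differ by at most 1
        if (Math.abs(w1.size() - w2.size()) > 1) {
            return false;
        }
        Map<String, Long> c1 = countByConnector(w1);
        Map<String, Long> c2 = countByConnector(w2);
        Set<String> connectors = new HashSet<>(c1.keySet());
        connectors.addAll(c2.keySet());
        // local balance: per-connector counts differ by at most 1
        return connectors.stream().allMatch(conn ->
                Math.abs(c1.getOrDefault(conn, 0L) - c2.getOrDefault(conn, 0L)) <= 1);
    }

    static Map<String, Long> countByConnector(List<String> tasks) {
        return tasks.stream().collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        // Option 4 end state: W1 keeps C2 C2 C4 and gains C2 C3;
        // W2 keeps C2 C2 C3 and gains C2 C1.
        List<String> w1 = Arrays.asList("C2", "C2", "C4", "C2", "C3");
        List<String> w2 = Arrays.asList("C2", "C2", "C3", "C2", "C1");
        System.out.println(balanced(w1, w2)); // prints "true"
    }
}
```

Running the same check against Option 1 (W1 gains only C3, W2 gains two C2 tasks) returns false, since C2 ends up with 2 tasks on W1 and 4 on W2.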

Test case

The following test case can be copy+pasted into the IncrementalCooperativeAssignorTest suite and should demonstrate the above scenario:

@Test
public void testForceUnbalancedPerConnectorTaskAllocation() {
    // Customize assignor for this test case
    time = new MockTime();
    initAssignor();

    final String c1 = "connector1";
    final String c2 = "connector2";
    final String c3 = "connector3";
    final String c4 = "connector4";

    final String w1 = "worker1";
    final String w2 = "worker2";
    final String w3 = "worker3";

    connectors.clear();
    addNewConnector(c1, 1);
    addNewConnector(c2, 6);
    addNewConnector(c3, 2);
    addNewConnector(c4, 1);

    removeWorkers(w1);
    int c1Tasks = -1;
    int c2Tasks = -1;
    int c3Tasks = -1;
    int c4Tasks = -1;
    addNewWorker(
            w1,
            Arrays.asList(c1, c4),
            Arrays.asList(
                    new ConnectorTaskId(c1, ++c1Tasks),
                    new ConnectorTaskId(c2, ++c2Tasks),
                    new ConnectorTaskId(c2, ++c2Tasks),
                    new ConnectorTaskId(c4, ++c4Tasks)
            )
    );
    addNewWorker(
            w2,
            Arrays.asList(c2),
            Arrays.asList(
                    new ConnectorTaskId(c2, ++c2Tasks),
                    new ConnectorTaskId(c2, ++c2Tasks),
                    new ConnectorTaskId(c3, ++c3Tasks)
            )
    );
    addNewWorker(
            w3,
            Arrays.asList(c3),
            Arrays.asList(
                    new ConnectorTaskId(c2, ++c2Tasks),
                    new ConnectorTaskId(c2, ++c2Tasks),
                    new ConnectorTaskId(c3, ++c3Tasks)
            )
    );

    // Initial rebalance: should result in no change in allocation
    performStandardRebalance();
    assertDelay(0);
    assertBalancedAndCompleteAllocation();

    // RIP W3
    // This round should result in no assignments since we need to wait for the delay to expire
    removeWorkers(w3);
    performStandardRebalance();
    assertDelay(rebalanceDelay);

    // After the delay has expired, W3's connectors and total tasks should be reallocated
    // evenly across the cluster
    time.sleep(rebalanceDelay);
    performStandardRebalance();
    assertDelay(0);
    assertBalancedAndCompleteAllocation();

    // However, the allocation of tasks per connector is not even;
    // at least one of the following assertions should fail
    for (String connector : Arrays.asList(c1, c2, c3, c4)) {
        long w1Tasks = memberAssignments.get(w1).tasks().stream()
                .filter(taskId -> connector.equals(taskId.connector()))
                .count();
        long w2Tasks = memberAssignments.get(w2).tasks().stream()
                .filter(taskId -> connector.equals(taskId.connector()))
                .count();
        long allocationDiff = Math.abs(w1Tasks - w2Tasks);
        assertTrue(
                allocationDiff <= 1,
                "Tasks are not allocated evenly across remaining workers for connector " + connector
                        + "; worker 1 was assigned " + w1Tasks + " tasks and worker 2 was assigned " + w2Tasks + " tasks"
        );
    }
}

Conclusion

Even if we do implement the logic to perform the additional revocation for the scenario above, I don't know if it's a good idea. More task revocations will lead to more task downtime and more rebalances (which can cause temporary unavailability for portions of the cluster's REST API). Those are not dealbreakers, but when coupled with the fact that connector loads may still vary greatly within a cluster, it's unclear that this improvement would buy us much, especially in cases where there are more workers than the number of tasks per connector.

@yazgoo @gharris1727 what are your thoughts?

Contributor:
Wow, thanks for finding that test case and rebalance situation! I suspected this wouldn't be so easy :)

This is a tradeoff between:

  • Job continuity (not being interrupted while running)
  • Job availability (running instead of being unassigned)
  • Global balance (workers each run approximately the same number of jobs)
  • Local balance (connectors have approximately the same number of jobs on each worker)

The existing algorithm prioritizes job continuity before the rebalance timeout expires, and global balance afterwards. I would probably prefer to follow this pattern for the local balance, and rely on the rebalance timeout to limit the impact on job continuity. I think local balance is a good property to have, especially because we don't have resource-aware-scheduling.

If we were to prioritize job continuity over local balance and not perform these additional revocations, what happens on an upgrade to a cluster with a very poor existing balance? I think if the rolling upgrade was performed within the rebalance timeout, the existing balance could remain after the upgrade.

Perhaps the workers could end up in a state like this:

W1: C1 C3 C4
W2: C2 C2 C2
W3: C2 C2 C2

After W3 dies, but we prioritize global balance + continuity over local balance:

W1: C1 C3 C4 C2 C2
W2: C2 C2 C2 C2

After W3 rejoins, it could end up in the original state, or like this, depending on the connector-name-ordering:

W1: C1 C3 C2
W2: C2 C2 C2
W3: C2 C2 C4

I think W2 will maintain that homogeneous allocation until it is offline for more than the rebalance timeout.

Collaborator:
@gharris1727, just wanted to ask why you think local balance is a good property to have. It is somewhat hard to predict how tasks will behave, but with global balancing (as described above) we already try to ensure that all workers run more or less the same number of tasks. We would still guarantee that with this new algorithm, but considering the heterogeneity of Connect tasks and how load patterns can change over time for tasks of the same or different connectors, what advantage do we get from achieving local balance?

I am just asking because, from what I have seen, the best assignment in terms of running tasks right now might not hold true a few hours later if load patterns change, considering we don't have resource-aware scheduling. Just wanted to know your thoughts on this.

Contributor:
@vamossagar12 Local balance is a good property to have in situations where tasks of the same connector have similar resource requirements. For example, if connector A is memory-heavy, it makes sense to try to distribute it around the cluster evenly to avoid exhausting the memory capacity of any one worker.

A locally-balanced cluster should "average" the resource requirements of these heterogeneous connectors.

If the tasks of a connector are very heterogeneous (such as task 0 being high-load and all others being low-load) the local-balance strategy will be ineffective. I don't expect that it will be harmful, but I don't have any evidence for this.

Do you have an alternative property that you think we should use instead?

Collaborator:
@gharris1727, thanks for the explanation. Yes, I agree that when the tasks have similar resource requirements, local balancing might yield better results. Having said that, it might not hold true for all the connectors deployed on a given Connect cluster, so we might end up optimising for something which may or may not be true for all of them. I also don't think it would be harmful to always opt for local balancing, but what might still happen is that resource-heavy tasks from two different heterogeneous connectors (say, task 0 of connectors c1 and c2) get scheduled on the same worker, which puts load on that worker. This may or may not happen with the global balancing strategy.

The main point I am trying to make here is that the benefits of local balancing versus global balancing are hard to quantify and really depend on the connectors. So if we go ahead and make the changes suggested in this PR, we might solve the local balancing problem but could, in theory, introduce imbalances for clusters that are better served by global balancing alone (connectors running heterogeneous tasks). Maybe we keep both strategies and let users choose based on the kind of connectors they run (this might need a KIP), since they are best placed to make that decision. Do you think that would be overkill?

BTW, I am not at all against the changes in this PR. I just feel that in the absence of any quantifiable way of assigning tasks (like metrics or load usage), we could pivot either way, and the overall benefits might even out depending on the nature of the tasks and how their behaviour changes over time.

Contributor:
Even I don't think it would be harmful to always opt for Local balancing but what might still end up happening is that resource-heavy tasks from 2 different heterogenous connectors (let's say task-0 of 2 connectors c1 and c2)
get scheduled on the same worker which puts load on the worker.

I agree that this is problematic, but it isn't a problem that we can currently solve in resource-unaware scheduling. Before we have resource-aware-scheduling, we should take steps to improve the algorithm that we do have.

The main point I am trying to make here is that the benefits of Local balancing v/s Global balancing is hard to quantify and really depends upon the connectors.

It isn't global vs local balancing, it's global + local balancing. Nobody is proposing that we move away from global balancing, that will still be in place. Additionally, the "local balancing" is not a totally new property. If you look at the assignTasks algorithm when run against an empty cluster, such as in testAssignTasksWhenBalanced(), it produces a locally-balanced assignment. The thing addressed here is that after successive generations, the assignment is no longer locally-balanced like it would be if there was just one round of assignments.

Maybe, we keep both strategies and let users choose based on the kind of connectors they run (might need a KIP) since they are best placed to make that decision based on the connectors they run. Do you think that would be an overkill?

Certainly. If we're introducing a KIP to change the rebalance protocol, I would want it to solve the problem holistically instead of just adding a configuration to use a different revocation algorithm. This is a change within the bounds of backwards-compatibility, because the revocation and balance algorithm was never officially specified.

Map<String, List<ConnectorTaskId>> countsByConnector = v.tasks().stream().collect(Collectors.groupingBy(ConnectorTaskId::connector));
assertEquals(2, countsByConnector.size());
countsByConnector.forEach((k2, v2) -> assertEquals(4, v2.size()));
});
assertBalancedAndCompleteAllocation();
}

@Test
public void testAssignmentsWhenWorkersJoinAfterRevocations() {
// Customize assignor for this test case