[Data] Fix ActorPool scaling to avoid scaling down when the input queue is empty #53009


Merged

merged 30 commits into ray-project:master on May 19, 2025

Conversation

alexeykudinkin (Contributor)

Why are these changes needed?

This is a follow-up to #52806, which inadvertently modified the autoscaling condition, allowing downscaling to happen before all inputs have completed.

Changes

  1. Fixed ActorPool to avoid downscaling when the number of enqueued blocks is 0 but not all inputs have been received yet (downscaling is only allowed once all inputs are done).
  2. Unified and streamlined the ActorPool scaling handling into a single method.
  3. Updated tests.
  4. Tidied up.

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Rebased `get_actor_info` to use it;
Tidying up

Signed-off-by: Alexey Kudinkin <[email protected]>
Removed unnecessary methods;

Signed-off-by: Alexey Kudinkin <[email protected]>
Tidying up

Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
@alexeykudinkin requested a review from a team as a code owner May 14, 2025 21:07
@alexeykudinkin added the `go` (add ONLY when ready to merge, run all tests) label May 14, 2025
@alexeykudinkin requested a review from bveeramani May 14, 2025 21:10
Signed-off-by: Alexey Kudinkin <[email protected]>
Tidying up;

Signed-off-by: Alexey Kudinkin <[email protected]>
-        if op.completed() or (op_state.total_enqueued_input_bundles() == 0):
-            return False
+        if op.completed() or (
+            op._inputs_complete and op_state.total_enqueued_input_bundles() == 0
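The intent of the change, paraphrased as a standalone predicate (the helper name below is hypothetical; the accessors are the ones shown in the diff): an empty input queue only counts as drained once the operator's inputs are complete, so the pool is no longer shrunk while more bundles may still arrive.

    # Minimal sketch, not the actual autoscaler code.
    def _input_queue_is_drained(op, op_state) -> bool:
        # Only treat an empty queue as "no more work" if all inputs have
        # already been received (or the operator has finished entirely).
        return op.completed() or (
            op._inputs_complete and op_state.total_enqueued_input_bundles() == 0
        )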
Signed-off-by: Alexey Kudinkin <[email protected]>
util = self._calculate_actor_pool_util(actor_pool)
return util < self._actor_pool_scaling_down_threshold
if util >= self._actor_pool_scaling_up_threshold:
    return _AutoscalingAction.SCALE_UP
Member:

In the current implementation, we don't scale up if the previous scale up hasn't finished. I think this diff might remove that behavior?
https://github.com/anyscale/rayturbo/blob/e156a9f9424ec1e499e93892d1a33d4bfb8599a1/python/ray/anyscale/data/autoscaler/anyscale_autoscaler.py#L357-L360
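For context, a minimal sketch of the kind of guard being referred to (this is not the linked implementation; it only uses the num_pending_actors accessor that appears in the log statement elsewhere in this PR, and the function name is an assumption):

    # Hypothetical sketch of a "previous scale-up still in flight" check.
    def _previous_scale_up_finished(actor_pool) -> bool:
        # Don't request more actors while ones from the last scale-up
        # are still pending (i.e., not yet running).
        return actor_pool.num_pending_actors() == 0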

Comment on lines 173 to 180
@dataclass
class _ActorInfo:
    """Breakdown of the state of the actors used by the ``PhysicalOperator``"""

    running: int
    pending: int
    restarting: int

Member:

Nit: I felt kinda confused by these names. _ActorInfo makes me think of a single actor rather than all of the actors for an operator, and running etc. seemed like bools rather than counts.

Maybe something like ActorStateCounts or OperatorActorStats and num_running etc. might be clearer.

Contributor:

+1

Contributor Author:

I was calling it OpActorInfo initially, but it's now used inside the ActorPool. Will rename it to _ActorPoolInfo
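Roughly, a sketch of the rename (fields taken from the hunk above; the docstring wording is an assumption):

    from dataclasses import dataclass

    @dataclass
    class _ActorPoolInfo:
        """Breakdown of the states of the actors in the actor pool."""

        running: int
        pending: int
        restarting: int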

Comment on lines 575 to 581
logger.info(
    f"Scaling up actor pool by {num_actors} ("
    f"running={self.num_running_actors()}, "
    f"pending={self.num_pending_actors()}, "
    f"restarting={self.num_restarting_actors()})"
)

Member:

We autoscale actors as much as twice a second, so I'm worried these info logs might get spammy. Would debug make sense instead?

Contributor Author:

There actually should be no bouncing back and forth in the autoscaler (it just doesn't really make sense)

 - AP has high utilization (actor-wise), but
 - Op is throttled, or
 - There are still free slots available

Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
)
elif util <= self._actor_pool_scaling_down_threshold:
    if not actor_pool.can_scale_down():
        return _AutoscalingAction.NO_OP, "debounced"
Contributor:

nit, the method name can_scale_down doesn't suggest it's because of debouncing

Contributor Author:

Updated

@@ -760,20 +798,20 @@ def num_free_slots(self) -> int:
        for running_actor in self._running_actors.values()
    )

-    def _kill_inactive_actor(self) -> bool:
+    def _release_inactive_actor(self) -> bool:
Contributor:

nit: I prefer using the word "remove", because both "kill" and "release" are implementation details that can change in the future.
Also, please update the comments as well.

return (
    f"{base} (running={info.running}, restarting={info.restarting}, "
    f"pending={info.pending})"
)
Contributor:

Implement this as _ActorInfo.__str__, so the logs in scale_up/scale_down can use it as well.
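A minimal sketch of that suggestion, assuming the counts shown earlier and the _ActorPoolInfo rename discussed above:

    from dataclasses import dataclass

    @dataclass
    class _ActorPoolInfo:
        running: int
        pending: int
        restarting: int

        def __str__(self) -> str:
            # Single place that formats the counts, reusable by the
            # scale_up/scale_down log messages.
            return (
                f"running={self.running}, restarting={self.restarting}, "
                f"pending={self.pending}"
            )

A log line could then simply interpolate the object, e.g. f"Scaling up actor pool by {num_actors} ({info})".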

@@ -574,18 +577,53 @@ def current_in_flight_tasks(self) -> int:
        for actor_state in self._running_actors.values()
    )

-    def scale_up(self, num_actors: int) -> int:
+    def can_scale_down(self):
Contributor:

why not put the debouncing logic in the autoscaler?
It's also part of the autoscaling policy.

Contributor Author:

Went back and forth on it, but decided to keep it in the AP, since it owns the logic of actually scaling up and tracks the most recent actions.
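For illustration, one way a debounce owned by the actor pool could look (the field name, interval constant, and clock source are assumptions; the thread only establishes that the pool tracks its most recent scaling actions):

    import time

    class ActorPoolDebounceSketch:
        """Illustrative only: suppress a scale-down that follows too soon
        after the most recent scaling action."""

        _SCALE_DOWN_DEBOUNCE_S = 10.0  # hypothetical interval

        def __init__(self) -> None:
            self._last_scaling_ts = 0.0

        def _record_scaling_action(self) -> None:
            self._last_scaling_ts = time.monotonic()

        def can_scale_down(self) -> bool:
            return (
                time.monotonic() - self._last_scaling_ts
                >= self._SCALE_DOWN_DEBOUNCE_S
            )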


def scale_up(self, num_actors: int, *, reason: Optional[str] = None) -> int:
    logger.info(
        f"Scaling up actor pool by {num_actors} ("
Contributor:

Suggested change:
-        f"Scaling up actor pool by {num_actors} ("
+        f"Scaling up actor pool by {num_actors}, current actor counts: ("

Contributor Author:

It also includes the reason, so the message needs to be more generic.

        self,
        actor_pool: AutoscalingActorPool,
        op: "PhysicalOperator",
        op_state: "OpState",
-    ):
+    ) -> Tuple[_AutoscalingAction, Optional[str]]:
Contributor:

nit, wrap action, number, and reason as one class, and move it to autoscaling_actor_pool.py?

Contributor Author:

Planning to take this up in a follow-up PR.
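For illustration, one possible shape for that follow-up (all names are hypothetical; the thread only establishes that the action, the actor delta, and the reason would travel together):

    from dataclasses import dataclass
    from typing import Optional

    @dataclass
    class AutoscalingDecisionSketch:
        """Hypothetical wrapper bundling the chosen action, the number of
        actors to add or remove, and a human-readable reason for logging."""

        action: "_AutoscalingAction"  # enum from the autoscaler module
        delta: int
        reason: Optional[str] = None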

Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
@richardliaw richardliaw merged commit 05f3517 into ray-project:master May 19, 2025
5 checks passed
kenmcheng pushed a commit to kenmcheng/ray that referenced this pull request May 27, 2025
Labels: community-backlog, go (add ONLY when ready to merge, run all tests)