[Data] Fix ActorPool scaling to avoid scaling down when the input queue is empty #53009
Conversation
Rebased `get_actor_info` to use it; Tidying up Signed-off-by: Alexey Kudinkin <[email protected]>
Removed unnecessary methods; Signed-off-by: Alexey Kudinkin <[email protected]>
Tidying up Signed-off-by: Alexey Kudinkin <[email protected]>
…are dispatched Signed-off-by: Alexey Kudinkin <[email protected]>
Tidying up; Signed-off-by: Alexey Kudinkin <[email protected]>
if op.completed() or (op_state.total_enqueued_input_bundles() == 0):
    return False
if op.completed() or (
    op._inputs_complete and op_state.total_enqueued_input_bundles() == 0
@bveeramani this
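For illustration, here is a minimal, self-contained sketch of the fixed condition; `FakeOp`, `FakeOpState`, and `input_queue_drained` are hypothetical stand-ins that only mirror the attributes used in the hunk above, not Ray's actual `PhysicalOperator`/`OpState` classes:

```python
from dataclasses import dataclass


@dataclass
class FakeOpState:
    num_enqueued: int

    def total_enqueued_input_bundles(self) -> int:
        return self.num_enqueued


@dataclass
class FakeOp:
    _inputs_complete: bool
    _is_completed: bool = False

    def completed(self) -> bool:
        return self._is_completed


def input_queue_drained(op: FakeOp, op_state: FakeOpState) -> bool:
    # Mirrors the fixed condition: the queue only counts as drained once all
    # inputs have arrived AND nothing is left enqueued.
    return op.completed() or (
        op._inputs_complete and op_state.total_enqueued_input_bundles() == 0
    )


# The queue is momentarily empty but upstream is still producing inputs:
# the fixed condition does not treat this as drained, so the pool is not
# scaled down prematurely.
assert not input_queue_drained(FakeOp(_inputs_complete=False), FakeOpState(0))
# All inputs have arrived and the queue is empty: downscaling is allowed.
assert input_queue_drained(FakeOp(_inputs_complete=True), FakeOpState(0))
```

The point of the extra `op._inputs_complete` check is that a momentarily empty queue no longer counts as "done" while upstream operators are still producing inputs.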
python/ray/data/_internal/execution/autoscaler/default_autoscaler.py (outdated; resolved)
util = self._calculate_actor_pool_util(actor_pool)
return util < self._actor_pool_scaling_down_threshold
if util >= self._actor_pool_scaling_up_threshold:
    return _AutoscalingAction.SCALE_UP
In the current implementation, we don't scale up if the previous scale up hasn't finished. I think this diff might remove that behavior?
https://github.com/anyscale/rayturbo/blob/e156a9f9424ec1e499e93892d1a33d4bfb8599a1/python/ray/anyscale/data/autoscaler/anyscale_autoscaler.py#L357-L360
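For context, a utilization-threshold policy of the shape shown in this hunk, combined with the guard the reviewer describes (skip further scale-up while previously requested actors are still pending), could look roughly like the sketch below; the names and threshold values are illustrative, not Ray's actual API:

```python
from enum import Enum


class Action(Enum):  # illustrative stand-in for _AutoscalingAction
    SCALE_UP = "scale_up"
    SCALE_DOWN = "scale_down"
    NO_OP = "no_op"


def decide_action(
    util: float,
    num_pending: int,
    scale_up_threshold: float = 0.8,
    scale_down_threshold: float = 0.5,
) -> Action:
    if util >= scale_up_threshold:
        # The guard the reviewer refers to: don't request more actors while a
        # previous scale-up is still in flight (pending actors not yet started).
        if num_pending > 0:
            return Action.NO_OP
        return Action.SCALE_UP
    if util <= scale_down_threshold:
        return Action.SCALE_DOWN
    return Action.NO_OP


assert decide_action(util=0.9, num_pending=2) is Action.NO_OP
assert decide_action(util=0.9, num_pending=0) is Action.SCALE_UP
assert decide_action(util=0.3, num_pending=0) is Action.SCALE_DOWN
```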
@dataclass
class _ActorInfo:
    """Breakdown of the state of the actors used by the ``PhysicalOperator``"""

    running: int
    pending: int
    restarting: int
Nit: I found these names a bit confusing. `_ActorInfo` makes me think of a single actor rather than all of the actors for an operator, and `running` etc. read like bools rather than counts. Maybe something like `ActorStateCounts` or `OperatorActorStats` with `num_running` etc. would be clearer.
+1
I was calling it OpActorInfo initially, but it's now used inside the ActorPool. Will rename it to _ActorPoolInfo
logger.info(
    f"Scaling up actor pool by {num_actors} ("
    f"running={self.num_running_actors()}, "
    f"pending={self.num_pending_actors()}, "
    f"restarting={self.num_restarting_actors()})"
)
We autoscale actors as much as twice a second, so I'm worried these info logs might get spammy. Would debug make sense instead?
There actually shouldn't be any bouncing back and forth in the autoscaler (it just doesn't really make sense).
- AP has high utilization (actor-wise), but
- Op is throttled, or
- There are still free slots available
Signed-off-by: Alexey Kudinkin <[email protected]>
)
elif util <= self._actor_pool_scaling_down_threshold:
    if not actor_pool.can_scale_down():
        return _AutoscalingAction.NO_OP, "debounced"
Nit: the method name `can_scale_down` doesn't suggest it's because of debouncing.
Updated
@@ -760,20 +798,20 @@ def num_free_slots(self) -> int:
        for running_actor in self._running_actors.values()
    )

    def _kill_inactive_actor(self) -> bool:
    def _release_inactive_actor(self) -> bool:
Nit: I prefer using the word "remove", because both "kill" and "release" are implementation details that can change in the future. Also, please update the comments as well.
return (
    f"{base} (running={info.running}, restarting={info.restarting}, "
    f"pending={info.pending})"
)
Implement this as `_ActorInfo.__str__`, so the logs in `scale_up`/`scale_down` can use it as well.
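A sketch of that suggestion, folded together with the `_ActorPoolInfo` rename from the earlier thread (the exact class, field order, and call sites in the PR may differ):

```python
from dataclasses import dataclass


@dataclass
class _ActorPoolInfo:
    """Per-state counts of the actors managed by the actor pool."""

    running: int
    pending: int
    restarting: int

    def __str__(self) -> str:
        return (
            f"running={self.running}, restarting={self.restarting}, "
            f"pending={self.pending}"
        )


info = _ActorPoolInfo(running=4, pending=1, restarting=0)
# Both the scale-up and scale-down log messages can reuse the same formatting:
print(f"Scaling up actor pool by 2 ({info})")
# -> Scaling up actor pool by 2 (running=4, restarting=0, pending=1)
```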
@@ -574,18 +577,53 @@ def current_in_flight_tasks(self) -> int:
        for actor_state in self._running_actors.values()
    )

    def scale_up(self, num_actors: int) -> int:
    def can_scale_down(self):
Why not put the debouncing logic in the autoscaler? It's also part of the autoscaling policy.
Went back and forth on it, but decided to keep it in the ActorPool, since it owns the logic of actually scaling and tracks the most recent actions.
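For illustration, a time-based debounce kept inside the pool might look like the sketch below; the 30-second window and the field names are assumptions for the example, not the values used in the PR:

```python
import time


class ActorPoolDebounceSketch:
    # Illustrative debounce window; the real value in the PR may differ.
    _SCALE_DOWN_DEBOUNCE_S = 30.0

    def __init__(self) -> None:
        self._last_scale_up_time: float = 0.0

    def scale_up(self, num_actors: int) -> int:
        # Record when the pool last grew, so an immediately following
        # low-utilization reading doesn't shrink it right back.
        self._last_scale_up_time = time.monotonic()
        return num_actors

    def can_scale_down(self) -> bool:
        # Refuse to scale down until the debounce window since the last
        # scale-up has elapsed.
        return (
            time.monotonic() - self._last_scale_up_time
            >= self._SCALE_DOWN_DEBOUNCE_S
        )


pool = ActorPoolDebounceSketch()
pool.scale_up(2)
assert not pool.can_scale_down()  # right after scaling up: debounced
```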
    def scale_up(self, num_actors: int, *, reason: Optional[str] = None) -> int:
        logger.info(
            f"Scaling up actor pool by {num_actors} ("
f"Scaling up actor pool by {num_actors} (" | |
f"Scaling up actor pool by {num_actors}, current actor counts: (" |
It also includes `reason`, so the message needs to be more generic.
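For illustration only, a message that carries both the per-state counts and an optional reason could be assembled along these lines (`format_scale_up_message` is a hypothetical helper, not part of the PR):

```python
from typing import Optional


def format_scale_up_message(
    num_actors: int, info: str, reason: Optional[str] = None
) -> str:
    # `info` is the stringified per-state actor counts (see the __str__ sketch above).
    msg = f"Scaling up actor pool by {num_actors} ({info})"
    if reason is not None:
        msg += f" (reason: {reason})"
    return msg


print(
    format_scale_up_message(
        2, "running=4, restarting=0, pending=1", reason="utilization above threshold"
    )
)
```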
    self,
    actor_pool: AutoscalingActorPool,
    op: "PhysicalOperator",
    op_state: "OpState",
):
) -> Tuple[_AutoscalingAction, Optional[str]]:
Nit: wrap the action, number, and reason into one class, and move it to `autoscaling_actor_pool.py`?
Planning to take this up in a follow-up.
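For reference, the wrapper the reviewer suggests could take roughly this shape (hypothetical; `AutoscalingDecision` does not exist in the PR, and `_AutoscalingAction` is re-declared here only to keep the sketch self-contained):

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class _AutoscalingAction(Enum):
    SCALE_UP = "scale_up"
    SCALE_DOWN = "scale_down"
    NO_OP = "no_op"


@dataclass(frozen=True)
class AutoscalingDecision:
    """Bundles the chosen action, the change in actor count, and a reason."""

    action: _AutoscalingAction
    num_actors: int = 0
    reason: Optional[str] = None


decision = AutoscalingDecision(_AutoscalingAction.NO_OP, reason="debounced")
print(decision)
```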
Why are these changes needed?
This is a follow-up to #52806, which inadvertently modified the autoscaling condition, allowing downscaling to happen before all inputs have completed.
Changes
Related issue number
Checks
- I've signed off every commit (`git commit -s`) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
- If I've added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.