-
-
Notifications
You must be signed in to change notification settings - Fork 717
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use Task class instead of tuple #8797
Conversation
distributed/client.py
Outdated
from dask.task_spec import Task | ||
|
||
dsk.update( | ||
{ | ||
key: (apply, self.func, (tuple, list(args)), kwargs2) | ||
key: Task(key, self.func, args, kwargs2) | ||
# (apply, self.func, (tuple, list(args)), kwargs2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't necessary but an example of how this migration would look like
distributed/scheduler.py
Outdated
prefix_name = ts.prefix.name | ||
count = self.task_prefix_count[prefix_name] - 1 | ||
tp_count = self.task_prefix_count | ||
tp_count_global = self.scheduler._task_prefix_count_global | ||
if count: | ||
self.task_prefix_count[ts.prefix.name] = count | ||
tp_count[prefix_name] = count | ||
else: | ||
del self.task_prefix_count[ts.prefix.name] | ||
del tp_count[prefix_name] | ||
|
||
count = self.scheduler._task_prefix_count_global[ts.prefix.name] - 1 | ||
count = tp_count_global[prefix_name] - 1 | ||
if count: | ||
self.scheduler._task_prefix_count_global[ts.prefix.name] = count | ||
tp_count_global[prefix_name] = count | ||
else: | ||
del self.scheduler._task_prefix_count_global[ts.prefix.name] | ||
del tp_count_global[prefix_name] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is an unrelated perf fix
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious, how noticeable is this? Apart from that, let's move this to a separate PR to keep this focused on the major change you introduce here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
distributed/scheduler.py
Outdated
dsk = convert_old_style_dsk(dsk) | ||
# TODO: This isn't working yet as expected | ||
dependencies = dict(DependenciesMapping(dsk)) | ||
|
||
return dsk, dependencies, annotations_by_type |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
most/all of this complexity is now either gone entirely or hidden in the class
Unit Test ResultsSee test report for an extended history of previous test failures. This is useful for diagnosing flaky tests. 25 files ± 0 25 suites ±0 10h 21m 22s ⏱️ + 1m 27s For more details on these failures, see this check. Results for commit 65ed5d5. ± Comparison against base commit 48509b3. This pull request removes 11 and adds 2 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There may be implications for some of the dashboard components, the "pew pew pew" plot comes to mind. I see this is still a draft, let me know when it's in a reviewable state and I'll look over the dashboard code to see if anything needs changing there 🙂.
I'd actually be surprised if that was affected since we don't change the scheduler internal metadata (like dependencies, transfers, where the tasks are executed...). But who knows. I'll probably stumble over 50 small weird things trying to get CI green :) |
99a2ec5
to
bb0324a
Compare
b01c705
to
bb7d38e
Compare
507fcb6
to
45b42a5
Compare
eb30895
to
2ecaa5c
Compare
distributed/scheduler.py
Outdated
task_state_created = time() | ||
metrics.update( | ||
{ | ||
"start": start, | ||
"duration_materialization": materialization_done - start, | ||
"duration_ordering": materialization_done - ordering_done, | ||
"duration_state_initialization": ordering_done - task_state_created, | ||
"duration_total": task_state_created - start, | ||
} | ||
) | ||
evt_msg = { | ||
"action": "update-graph", | ||
"stimulus_id": stimulus_id, | ||
"metrics": metrics, | ||
"status": "OK", | ||
} | ||
self.log_event(["all", client, "update-graph"], evt_msg) | ||
logger.debug("Task state created. %i new tasks", len(self.tasks) - before) | ||
except Exception as e: | ||
evt_msg = { | ||
"action": "update-graph", | ||
"stimulus_id": stimulus_id, | ||
"status": "error", | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an unrelated change but it shouldn't be too disruptive for the review
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be useful to log (partial) metrics on exception? Also, should we add the exception to the log event?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is very useful and I don't want to add the exception to this. This is primarily supposed to be a stream of metrics and I don't like adding Exception objects or large strings to it. I also find the logging and proper exception handling mechanism should be sufficient.
If this is a contentious topic I will remove this change from the PR
distributed/scheduler.py
Outdated
logger.debug("Materialization done. Got %i tasks.", len(dsk)) | ||
dependents = reverse_dict(dependencies) | ||
dsk = resolve_aliases(dsk, keys, dependents) | ||
dependencies = dict(DependenciesMapping(dsk)) | ||
logger.debug("Removing aliases. %i left", len(dsk)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is new... it removes all those
"key": "other-key"
references that we currently schedule as real tasks. Particularly fusion adds these kinds of redirects. I should probably factor this out to a dedicated PR. The impact can be quite substantial in graph size reduction.
919ff4f
to
9974bb0
Compare
# FIXME: There should be no need to fully materialize and copy this but some | ||
# sections in the scheduler are mutating it. | ||
dependencies = {k: set(v) for k, v in DependenciesMapping(dsk3).items()} | ||
return dsk3, dependencies, annotations_by_type |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see #8842
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recommend reviewers to start with the dask/dask PR.
This PR does not include (m)any intentional changes other than adjusting the code to the new spec. There are one or two things that change behavior, I flagged them explicitly
prefix = ts.prefix | ||
duration: float = prefix.duration_average | ||
if duration >= 0: | ||
return duration | ||
|
||
s = self.unknown_durations.get(ts.prefix.name) | ||
s = self.unknown_durations.get(prefix.name) | ||
if s is None: | ||
self.unknown_durations[ts.prefix.name] = s = set() | ||
self.unknown_durations[prefix.name] = s = set() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is unrelated (but I won't create another PR for these three lines)
processes=False, | ||
asynchronous=True, | ||
scheduler_sync_interval="1ms", | ||
dashboard_address=":0", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these dashboard changes are also unrelated. Appologies. If it actually helps I will factor it out but those tests are typically disjoint from actual changes so I hope the review process is not too difficult
This change allows the tests to run in parallel
# This is removing weird references like "x-foo": "foo" which often make up | ||
# a substantial part of the graph | ||
# This also performs culling! | ||
dsk3 = resolve_aliases(dsk2, keys, dependents) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an actual change in behavior! This will reduce graphs sizes substantially for graphs that went through linear fusion
distributed/tests/test_client.py
Outdated
@@ -8402,6 +8237,9 @@ async def test_release_persisted_collection(c, s, a, b): | |||
await c.compute(arr) | |||
|
|||
|
|||
@pytest.mark.skip( | |||
reason="Deadlocks likely related to future serialization and ref counting" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add an issue for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that should've been fixed by #8827
I'll remove the skip
distributed/scheduler.py
Outdated
metrics.update( | ||
{ | ||
"start": start, | ||
"duration_materialization": materialization_done - start, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I suggest that we start following Prometheus variable naming conventions here to make our lives easier in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you suggest the appropriate names? I'm not sure what the correct way is
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"duration_materialization": materialization_done - start, | |
"materialization_duration_seconds": materialization_done - start, |
distributed/scheduler.py
Outdated
task_state_created = time() | ||
metrics.update( | ||
{ | ||
"start": start, | ||
"duration_materialization": materialization_done - start, | ||
"duration_ordering": materialization_done - ordering_done, | ||
"duration_state_initialization": ordering_done - task_state_created, | ||
"duration_total": task_state_created - start, | ||
} | ||
) | ||
evt_msg = { | ||
"action": "update-graph", | ||
"stimulus_id": stimulus_id, | ||
"metrics": metrics, | ||
"status": "OK", | ||
} | ||
self.log_event(["all", client, "update-graph"], evt_msg) | ||
logger.debug("Task state created. %i new tasks", len(self.tasks) - before) | ||
except Exception as e: | ||
evt_msg = { | ||
"action": "update-graph", | ||
"stimulus_id": stimulus_id, | ||
"status": "error", | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be useful to log (partial) metrics on exception? Also, should we add the exception to the log event?
distributed/scheduler.py
Outdated
"stimulus_id": stimulus_id, | ||
"status": "error", | ||
} | ||
self.log_event(["all", client, "update-graph"], evt_msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a particular reason you prefer a dedicated update-graph
topic instead of something like a scheduler
topic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No particular reason. I used this in a test but I will change it. I will also drop the all
topic (feels a bit like an anti pattern)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that dropping the all
pattern is a user-facing breaking change. That being said, I'm all for redesigning our topics, etc., this might just require some changes for downstream users, e.g., for Coiled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then again, we've also renamed the action, so these changes are breaking already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think this is breaking and I'm fine with it. This is overall a pretty burried feature and I doubt (m)any users will notice.
needs dask/dask#11429 |
Also this dask/dask#11431 |
The mindeps builds are sad. Everything else seems unrelated |
distributed/scheduler.py
Outdated
seen: set[Key] = set() | ||
sadd = seen.add | ||
for k in list(keys): | ||
work = {k} | ||
wpop = work.pop | ||
wupdate = work.update | ||
while work: | ||
d = wpop() | ||
if d in seen: | ||
continue | ||
sadd(d) | ||
if d not in dsk: | ||
if d not in self.tasks: | ||
lost_keys.add(d) | ||
lost_keys.add(k) | ||
logger.info("User asked for computation on lost data, %s", k) | ||
dependencies.pop(d, None) | ||
keys.discard(k) | ||
continue | ||
wupdate(dsk[d].dependencies) | ||
return lost_keys |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I rewrote this section. I had issues with it and just barely understand the old code (and ran into multiple bugs in the recent past)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code generally looks good to me. CI test results are confusing, it seems like they're out of sync with the actual test jobs?
I've added a bunch of suggestions for Prometheus-style metric naming within update_graph.
distributed/scheduler.py
Outdated
"new-tasks": len(new_tasks), | ||
"key-collisions": colliding_task_count, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prometheus naming prefers underscores over hyphens
"new-tasks": len(new_tasks), | |
"key-collisions": colliding_task_count, | |
"new_tasks": len(new_tasks), | |
"key_collisions": colliding_task_count, |
distributed/scheduler.py
Outdated
metrics.update( | ||
{ | ||
"start": start, | ||
"duration_materialization": materialization_done - start, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"duration_materialization": materialization_done - start, | |
"materialization_duration_seconds": materialization_done - start, |
distributed/scheduler.py
Outdated
"duration_ordering": materialization_done - ordering_done, | ||
"duration_state_initialization": ordering_done - task_state_created, | ||
"duration_total": task_state_created - start, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prometheus naming convention (roughly): (_total if accumulating)
"duration_ordering": materialization_done - ordering_done, | |
"duration_state_initialization": ordering_done - task_state_created, | |
"duration_total": task_state_created - start, | |
"ordering_duration_seconds": materialization_done - ordering_done, | |
"state_initialization_duration_seconds": ordering_done - task_state_created, | |
"duration_seconds": task_state_created - start, |
(I don't have a great suggestion for the e2e duration)
distributed/scheduler.py
Outdated
task_state_created = time() | ||
metrics.update( | ||
{ | ||
"start": start, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"start": start, | |
"start_timestamp_seconds": start, |
Update: |
I've been trying to reproduce but without any luck so far |
I think the test_merge failures are actually unrelated. The exception is
def validate_data(self, data: pd.DataFrame) -> None:
> if set(data.columns) != set(self.meta.columns):
E AttributeError: 'tuple' object has no attribute 'columns' which indicates that |
yeah, so the exception is pretty much what I expected if not isinstance(data, pd.DataFrame):
> raise TypeError(f"Expected {data=} to be a DataFrame, got {type(data)}.")
E TypeError: Expected data=('assign-3d7cfa7cea412465799bea6cfac1b512', 1) to be a DataFrame, got <class 'tuple'>. |
Ah, this is the build with dask-expr disabled. Now I can reproduce! |
shuffle_transfer, | ||
(self.name_input, i), | ||
TaskRef((self.name_input, i)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test_merge
tests w/ dask-expr enabled never take this code path. That's interesting but not incredibly surprising.
|
dask/dask#11445 is hopefully the last one |
Is there anything left to do here? |
This is an early version that will close dask/dask#9969
It introduces a new
Task
class (name is subject to change) and a couple of other related subclasses that should replace the tuple as a representation of runnable tasks.The benefits of this are outlined in dask/dask#9969 but are primarily focused to reduce overhead during serialization and parsing of results. An important result is also that we can trivially cache functions (and arguments if we wish) to avoid problems like #8767 where users are erroneously providing expensive to pickle functions (which also happens frequently in our own code and/or downstream projects like xarray)
This approach allows us to convert the legacy dsk graph to the new representation with full backwards compatibility. Old graphs can be migrated and new ones written directly using this new representation which will ultimately reduce overhead.
I will follow up with measurements shortly.
Sibling PR in dask dask/dask#11248