
[Data] Adding streaming capability for ray.data.Dataset.unique #51207

Open
marcmk6 opened this issue Mar 10, 2025 · 3 comments
Labels
data (Ray Data-related issues), enhancement (Request for new feature and/or capability), triage (Needs triage (eg: priority, bug/not-bug, and owning component))

Comments

@marcmk6

marcmk6 commented Mar 10, 2025

Description

The current doc indicates that ray.data.Dataset.unique is a blocking operation: "This operation requires all inputs to be materialized in object store for it to execute."
But I presume it's conceptually possible to implement a streaming version: keep a record of "seen" values and drop an entry when its value is already in the "seen" collection.
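
A minimal single-process sketch of that idea (plain Python for illustration, not Ray Data's actual implementation):

def streaming_unique(rows):
    # Keep only the set of distinct values seen so far; emit each row the
    # first time its value appears and drop later duplicates.
    seen = set()
    for row in rows:
        if row not in seen:
            seen.add(row)
            yield row

print(list(streaming_unique([1, 2, 1, 3])))  # [1, 2, 3]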

Use case

A streaming unique function will be very useful when the amount of data is too large to be materialized.

marcmk6 added the enhancement and triage labels on Mar 10, 2025
jcotant1 added the data label on Mar 10, 2025
@wingkitlee0
Contributor

This operation requires all inputs to be materialized in object store for it to execute.

I believe the wording can be clearer. It does not require all the inputs to be materialized at the same time. The same wording actually appears in the docs of other global aggregations.

keep a record of "seen" values and drop an entry when its value is already in the "seen" collection

It is doing that right now, in parallel.

streaming

A true streaming op would give you partial results before everything is done. I don't think that's easy to do without some internal partitioning (?)
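
As a rough illustration of that partitioning idea (the structure and names below are hypothetical, not Ray Data internals): if values are hash-partitioned, each partition's "seen" set is authoritative for its slice of the key space, so first occurrences can be emitted as soon as they arrive.

def partitioned_streaming_unique(rows, num_partitions=4):
    # Each partition owns a disjoint slice of the key space; in a distributed
    # setting each "seen" set could live on a different worker. Here everything
    # runs in one process purely for illustration.
    seen = [set() for _ in range(num_partitions)]
    for row in rows:
        p = hash(row) % num_partitions
        if row not in seen[p]:
            seen[p].add(row)
            yield row  # a partial result, emitted before the input is exhausted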

@marcmk6
Author

marcmk6 commented Mar 11, 2025

It does not require all the inputs to be materialized at the same time.

Could you elaborate? Do you mean the current implementation effectively materializes all the entries of the unique result (rather than materializing every entry of the input dataset, duplicates included, before applying unique)?
I think this can still be very costly if the dataset is large.

The current implementation, although parallel underneath, is still blocking: none of the data entries will go to the next stage before the unique operation is done for the whole input dataset. Is this right?

I believe filter is a streaming operation, so I presume the ideal behavior for unique would be like "filtering/dropping already-seen entries in a streaming way". See the sketch below.
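
For comparison, a minimal sketch of how filter can stream (a hypothetical example, not taken from the docs): it makes an independent per-row decision, so rows can flow to the next stage immediately, whereas unique needs state shared across the whole dataset.

import ray

ds = ray.data.range(10)  # rows like {"id": 0}, {"id": 1}, ...
# The predicate needs no global state, so rows stream through.
print(ds.filter(lambda row: row["id"] % 2 == 0).take_all())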

@wingkitlee0
Contributor

Sorry, I may have been thinking of something that is not turned on by default: https://docs.ray.io/en/latest/data/shuffling-data.html#enabling-push-based-shuffle
All aggregate functions do a partial aggregation first (say, per block) in parallel, and then the partial results are combined into the final result.

is still blocking: none of the data entries will go into next stage before the unique operation is done for the input dataset.

Correct. However, the current API returns a list (to the head node). So it is blocking by design (as an all-to-all operation).
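
For reference, a minimal example of the current API shape (a sketch; output order is not guaranteed):

import ray

ds = ray.data.from_items([{"id": 1}, {"id": 2}, {"id": 1}])
# Dataset.unique returns a plain Python list on the driver, so the whole
# dataset must be consumed before anything comes back.
print(ds.unique("id"))  # e.g. [1, 2]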

I think your suggestion is a parallel version of the following (which may not be easily parallelizable without a shuffle?):

def sequential_unique(data):
    # Track values already seen and keep only the first occurrence of each.
    seen = set()
    for x in data:
        if x not in seen:
            seen.add(x)
    return list(seen)
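
For contrast, a minimal sketch of the per-block partial aggregation described above (hypothetical helper names; in Ray Data the per-block step would run as parallel tasks):

from functools import reduce

def unique_per_block(block):
    # "Map" phase: dedup within one block; blocks can be processed in parallel.
    return set(block)

def parallel_unique(blocks):
    # "Reduce" phase: merge the partial sets into the final result on one node.
    partials = [unique_per_block(b) for b in blocks]
    return list(reduce(lambda a, b: a | b, partials, set()))

blocks = [[1, 2, 2], [2, 3], [3, 4, 1]]
print(sorted(parallel_unique(blocks)))  # [1, 2, 3, 4]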
