
Parallel computing not only for Distributor, but also for DataFrame-related preprocessing #728

Open
qinxuye opened this issue Jul 1, 2020 · 4 comments

qinxuye commented Jul 1, 2020

Hi all, thank you for your excellent work on tsfresh. I am from the team behind the Mars project, a parallel and distributed framework that scales numpy, pandas, sklearn, and Python functions.

Our users are very interested in scaling tsfresh to massive datasets. As far as I can tell, tsfresh provides a customizable Distributor to scale computation out to a cluster, and we have implemented one in the Mars project.

However (maybe this is a silly question), I found that functions like extract_features accept a pandas DataFrame and similar in-memory structures, so all of the preprocessing is bound to local memory and cannot be scaled out to a cluster. This is a handicap that stops our users from leveraging tsfresh on massive data that cannot be held in memory.

Thus I am wondering: if we want to scale the preprocessing out to a cluster, perhaps by leveraging Mars or a similar library, what would we actually need to do? IMHO, we would probably need to support multiple backends that can process DataFrame-like data in the tsfresh project, but that would certainly take quite a bit of effort.

Thank you in advance for any pointers; I would be happy to discuss the possible work going forward.

nils-braun commented Jul 4, 2020

Hello @qinxuye!
Thank you very much for opening this issue, and thank you as well for bringing Mars into the discussion.
First of all, let me congratulate you on this very nice package. From the first look I had, it seems quite well written and very well documented.

We are very happy to support you and the community with additional bindings. Users with "too much data for local memory" currently use techniques described, e.g., in a recent blog post I wrote: basically, the idea boils down to the fact that tsfresh works on each time series independently.
In a recent commit, we introduced dask and pyspark bindings. They both work in the same way: preprocessing is handled by the user in dask/pyspark until the data is in a form that can easily be grouped by id and time series kind (= dimension). Then the feature extraction is added to the computational graph for each of these time series chunks, and the distribution is handled by the respective framework.

I am not 100% sure whether this can be copied directly, but it might be a good first try. Is there a way in Mars to group by and then apply an arbitrary Python function that only receives the data of a single group (like, e.g., dask's groupby(...).apply(...) does)?
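To make the pattern concrete, here is a minimal pandas-only sketch of the "group by id and kind, then extract features per chunk" idea described above. The feature calculators here are toy stand-ins for tsfresh's real ones, and the column names are illustrative, not the actual tsfresh API; in dask the same logic would sit behind `groupby(...).apply(...)` so the framework can schedule each group independently:

```python
import pandas as pd

# Toy long-format data: one row per (id, kind, time, value) observation,
# the kind of shape the dask/pyspark bindings expect after preprocessing.
df = pd.DataFrame({
    "id":    [1, 1, 1, 2, 2, 2],
    "kind":  ["temp"] * 6,
    "time":  [0, 1, 2, 0, 1, 2],
    "value": [1.0, 2.0, 3.0, 4.0, 6.0, 8.0],
})

def extract_chunk_features(chunk: pd.DataFrame) -> pd.Series:
    # Stand-in for per-chunk feature extraction: each call only sees one
    # (id, kind) group, so a framework is free to schedule calls in parallel.
    values = chunk.sort_values("time")["value"]
    return pd.Series({
        "mean": values.mean(),
        "abs_energy": (values ** 2).sum(),
    })

# Locally this runs serially; a distributed framework would attach the same
# per-group call to its computational graph and handle the distribution.
features = df.groupby(["id", "kind"])[["time", "value"]].apply(extract_chunk_features)
print(features)
```

The key property is that `extract_chunk_features` is a pure function of a single group, which is exactly what makes the feature extraction embarrassingly parallel once the grouping is done.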

As far as I understand, the real power of Mars comes from building up the full operation graph instead of relying on "black box" Python functions (which would, e.g., also allow it to run on GPUs). Of course, that would be a very rewarding goal. However, I am not sure whether all of our feature calculators could be rewritten to be usable with Mars in this way, or whether that makes sense at all.

Happy to hear your thoughts and thanks again for bringing up this discussion item.

@nils-braun nils-braun self-assigned this Jul 4, 2020
rpanai commented Aug 28, 2020

Sorry to jump in. I think I have a similar issue (see this question). In my case, my data is already partitioned in such a way that every file contains a fixed number of time series and every time series belongs to exactly one file. So it seems to me that the problem should be embarrassingly parallel, yet when I distribute it with dask I'm not able to use all the threads/cores on each worker. I even wrote a MultithreadingDistributor following the documentation, but it didn't work.

nils-braun commented Aug 28, 2020

Thanks for the heads-up @rpanai (I would have missed your question otherwise). I answered directly on SO, but if your question is really tsfresh-centric (and not a more general one about dask), feel free to also discuss it in the tsfresh repo!

Just for reference: the solution could be to not use tsfresh's multiprocessing capabilities at all, but to chunk up the work for dask into the smallest possible pieces.
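A minimal stdlib sketch of that suggestion: the outer framework owns all parallelism and each task is one small chunk, while the per-chunk extraction itself stays single-threaded. Here a `ThreadPoolExecutor` stands in for the dask scheduler, and `extract_on_chunk` is a hypothetical stand-in for a call like tsfresh's feature extraction with its own parallelism disabled:

```python
from concurrent.futures import ThreadPoolExecutor  # dask would play this role

# Pretend each element is one small chunk, e.g. one time series from one file.
chunks = [list(range(n)) for n in (3, 5, 7, 9)]

def extract_on_chunk(series):
    # In real code this would run the feature extraction single-threaded
    # (e.g. with n_jobs=0) so the task does not spawn its own workers and
    # compete with the outer scheduler for cores.
    return {"length": len(series), "total": sum(series)}

# The outer pool distributes the many small tasks; with enough chunks per
# worker, every thread/core stays busy without nested parallelism.
with ThreadPoolExecutor() as pool:
    results = list(pool.map(extract_on_chunk, chunks))
print(results)
```

The design point is avoiding nested parallelism: one scheduler (dask), many small tasks, no worker pools inside worker pools.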

rpanai commented Sep 3, 2020

@nils-braun I'm not sure whether it's better to continue the discussion here, in another issue, or on SO; please let me know. The question I have is indeed tsfresh-centric, but I think it would be interesting to generalize it to multithreading within a dask worker.
