
Is it possible to restrict the number of threads? #2988

Closed
1991jhf opened this issue Jan 12, 2022 · 21 comments · Fixed by #3030

@1991jhf

1991jhf commented Jan 12, 2022

Is it possible to restrict the number of threads for groupby->transform/combine?
A function I want to use is not thread safe, but I do not want to run the entire Julia session on a single thread.

@bkamins bkamins added this to the 1.4 milestone Jan 12, 2022
@bkamins
Member

bkamins commented Jan 12, 2022

Currently not. However, we probably should add this feature.

CC @nalimilan

@nalimilan
Member

We had discussed this when we added multithreading support. There are two options:

  • add a keyword argument to all functions that support multithreading
  • have a global variable setting the number of threads for all DataFrames operations

We could also have both.

Something which might be useful is to allow choosing not only between single-threading and multi-threading, but also setting the number of threads, as using all threads isn't always faster. This is sometimes easy to implement, sometimes more difficult. It doesn't have to be supported immediately, but it's important to keep in mind when choosing the name of the keyword argument (so that it can take either a Boolean or an integer).
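A minimal sketch of how a single keyword could accept either form (the name `threads` and this helper are illustrative assumptions, not an actual DataFrames.jl API):

```julia
# Illustrative helper: normalize a keyword that may be a Bool
# (enable/disable multithreading) or an Int (explicit task count).
function normalize_threads(threads)
    threads === true  && return Threads.nthreads()  # use all threads
    threads === false && return 1                   # force single-threaded
    threads isa Integer && threads >= 1 && return Int(threads)
    throw(ArgumentError("threads must be a Bool or a positive integer"))
end
```

With this shape, `normalize_threads(false)` returns 1, so downstream code only ever deals with a task count.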

@bkamins
Member

bkamins commented Feb 21, 2022

@nalimilan - I understand that the plan would be to:

  • an ntasks argument in select, combine, etc. (name chosen for consistency with CSV.jl); by default the argument would take the Threads.nthreads() value, but it could also be set to ntasks=1 to avoid multi-threading; for now these two would be the only supported values;
  • we would then also implement Support multithreading in groupreduce #2491.
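In user code the proposal would presumably look like this (a sketch: the ntasks keyword was only a proposal at this point, so it is shown commented out against the then-current API):

```julia
using DataFrames

df = DataFrame(g = repeat(1:3, 4), x = 1:12)
gd = groupby(df, :g)

# Proposed: ntasks would default to Threads.nthreads();
# ntasks = 1 would force single-threaded execution for
# thread-unsafe user functions.
# combine(gd, :x => sum; ntasks = 1)   # proposed, not yet available
res = combine(gd, :x => sum)           # API at the time of writing
```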

@nalimilan
Member

Maybe we should use the same strategy as Arrow.jl:

  • ntasks::Int: number of concurrent threaded tasks to allow while writing input partitions out as arrow record batches; default is no limit; to disable multithreaded writing, pass ntasks=1

https://github.com/apache/arrow-julia/blob/52bfe1f72fa72b0b2d150efeeeb9edacd853f96f/src/write.jl#L49

The default is Inf (it could also be typemax(Int)). This better reflects what we do, as we spawn as many tasks as we can, independently of the number of threads.

@bkamins
Member

bkamins commented Feb 23, 2022

OK - but we should add a comment that currently only 1 or "unlimited" is supported.

@bkamins
Member

bkamins commented Mar 9, 2022

Following #3019, the key decision to make is whether we:

  • want to be able to disable multi-threading via a global flag
  • or want to be able to disable it only in split-apply-combine functions, while accepting it in joins and indexing

@haberdashPI

For my use cases I primarily need it in the latter case (split-apply-combine) where a user function can be called that might require some debugging (e.g. via Infiltrator). I think a global flag is also very nice, but less important to me.

@bkamins
Member

bkamins commented Mar 11, 2022

After several discussions I am now leaning towards a global switch rather than an argument passed to functions. In particular, it will then be easier to handle with DTable.

What @jpsamaroo suggested is that we could consider using https://github.com/tkf/ContextVariablesX.jl instead of a global switch.

@tkf - the question is whether you consider ContextVariablesX.jl stable/mature enough to be used (and what would be the recommended pattern here). What we need is a switch that will enable/disable multi-threading in DataFrames.jl operations.

CC @krynju

@tkf
Contributor

tkf commented Mar 12, 2022

if you consider ContextVariablesX.jl stable/mature enough

ContextVariablesX.jl is a hack, and the "X" here means "eXperimental" (not, e.g., "eXtended"). It uses the logger as a "payload", which means that with_logger can reset the context variable:

julia> @contextvar CTX = 0
Main.CTX :: ContextVar [3eaa8739-b516-401d-aa2a-f5dd85d0f89c] => 0

julia> with_context(CTX => 1) do
           @show CTX[]
           with_logger(NullLogger()) do
               @show CTX[]
           end
           @show CTX[]
       end;
CTX[] = 1
CTX[] = 0
CTX[] = 1

That said, I don't think people call with_logger frequently and so this mechanism may be OK for your use case. But we need something like JuliaLang/julia#35833 for "true" dynamically scoped configuration.

Maybe we should use the same strategy as Arrow.jl:

  • ntasks::Int: number of concurrent threaded tasks to allow while writing input partitions out as arrow record batches; default is no limit; to disable multithreaded writing, pass ntasks=1

FWIW, I don't think this pattern is great, since it does not generalize over the amount of data to be processed. I think a much more composable pattern is to specify the per-task problem size (what is called basesize in JuliaFolds). Note that this automatically disables parallel execution for sufficiently small problems.
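The basesize pattern tkf describes might be sketched like this (illustrative code, not the JuliaFolds implementation):

```julia
# Sketch: parallelism is derived from the problem size; inputs at or
# below `basesize` run sequentially, with no tasks spawned at all.
function threaded_map(f, xs::AbstractVector; basesize::Int = 1_000)
    n = length(xs)
    n <= basesize && return map(f, xs)  # small input: stay serial
    chunks = [view(xs, i:min(i + basesize - 1, n)) for i in 1:basesize:n]
    tasks = [Threads.@spawn map(f, chunk) for chunk in chunks]
    return reduce(vcat, fetch.(tasks))
end
```

Unlike a fixed ntasks, the number of tasks here grows with the data (cld(n, basesize)), so the same setting behaves sensibly for both tiny and huge inputs.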

@nalimilan
Member

After several discussions I am now leaning towards a global switch rather than an argument passed to functions. In particular, it will then be easier to handle with DTable.

@bkamins What is needed exactly with DTable?

A global variable doesn't seem like the best API if it's intended to be used when passing a function that isn't thread safe (you'd have to set it before and unset it after an operation). Though we could easily have both a global variable, and use its value as the default for the argument that relevant functions would take.

FWIW, I don't think this pattern is great, since it does not generalize over the amount of data to be processed. I think a much more composable pattern is to specify the per-task problem size (what is called basesize in JuliaFolds). Note that this automatically disables parallel execution for sufficiently small problems.

@tkf Actually we already have this kind of mechanism for some operations internally. But the logic to choose the number of tasks is relatively complex so I'm not sure we can easily expose this to users. For example, for grouped operations, we spawn one task per operation (this is cheap so it's probably always worth it), and then one task per thread (as having one task per group would have a large overhead). We could allow specifying the number of groups per task, but the total number of rows can also be a relevant criterion depending on operations.
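The per-group strategy described above (one task per thread, each covering a contiguous block of groups, rather than one task per group) can be sketched as follows (illustrative, not the actual DataFrames.jl internals):

```julia
# Sketch: run `f(g)` for each of `ngroups` groups using at most
# Threads.nthreads() tasks, each handling a contiguous block of groups.
function foreach_group_threaded(f, ngroups::Int)
    blocksize = cld(ngroups, Threads.nthreads())
    @sync for lo in 1:blocksize:ngroups
        hi = min(lo + blocksize - 1, ngroups)
        Threads.@spawn for g in lo:hi
            f(g)  # per-group work; one task covers many groups
        end
    end
    return nothing
end
```

Spawning per block rather than per group keeps the scheduling overhead bounded by the thread count even when there are millions of groups.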

For now, the idea is mainly to have a way to disable multithreading as that's the most common need.

@bkamins
Member

bkamins commented Mar 19, 2022

What is needed exactly with DTable?

Here @krynju should confirm, but to my understanding the requirement is that the call of an operation should not have to specify how many cores should be used.

Instead there should be some separate mechanism that would allow the scheduler of the calls decide if multithreading should be allowed or not (and it is OK to have just two options - do not do threading or use maximum number of threads available).

I hope it is clear, but let me try to restate it (so that @krynju can confirm):
There is a low-level scheduler of operations (Dagger) and a high-level processor of tabular operations (DTable). It is DTable that calls DataFrames.jl functions. But DTable does not decide how to run things: it only decides what to run and then passes the tasks to the Dagger scheduler. On the other hand, the Dagger scheduler knows how many threads it wants to allow some operation to use, but it does not understand the tabular API. Therefore Dagger should have a way to pass this information in a generic way. That is why @jpsamaroo suggested using contexts, as this is exactly the kind of metadata that is independent of what is run.


Having said that, I think that the proposal of @nalimilan is OK. Essentially it says (as a temporary solution until contexts land in Julia, which can take some time):

  1. We have a global atomic state for how many threads should be used by default.
  2. When DataFrames.jl is loaded, this state is set to use all available threads.
  3. There is an API that allows the user to change this default state.
  4. Additionally, some functions will still allow explicit passing of this value, without changing the global state (i.e. if we want to temporarily opt out of the default without changing it).
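Points 1-4 could be sketched roughly as follows (the names here are hypothetical, not the API that #3030 actually introduced):

```julia
# Points 1 & 2: global atomic default, set to all threads at load time.
const DEFAULT_NTASKS = Threads.Atomic{Int}(Threads.nthreads())

# Point 3: user-facing API to change the default.
set_default_ntasks!(n::Integer) =
    (Threads.atomic_xchg!(DEFAULT_NTASKS, Int(n)); nothing)

# Point 4: per-call override that leaves the global state untouched.
effective_ntasks(ntasks::Union{Nothing,Int} = nothing) =
    ntasks === nothing ? DEFAULT_NTASKS[] : ntasks
```

A function like combine would then consult effective_ntasks(ntasks) instead of a hard-coded default, so an explicit keyword always wins over the global state.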

I think it is an OK approach:

  • a generic way to change the number of threads would be to change the global state;
  • however, more fine-grained control is possible per function call for functions that allow it.

@nalimilan - have I understood what you propose correctly?

@tkf
Contributor

tkf commented Mar 20, 2022

For now, the idea is mainly to have a way to disable multithreading as that's the most common need.

I think that's a nice idea to have a high-level hinting mechanism. If that's the intention, I think it'd be great if it were clear at the API level. For this, it'd be nice if the API talks about the idea and not the implementation details (e.g., number of actual Tasks used internally).

@nalimilan
Member

OK. So DTable would need the global state to affect all multithreading, including cases where we know the operation is thread-safe and has very low overhead, so that creating as many tasks as possible is most likely a good idea (e.g. when copying columns)? Having that state disable multithreading altogether is easy to implement, but allowing it to determine the number of tasks would require adjusting the code, making it more complex in several places. That may not be a great idea, as @tkf notes. Ideally we would start as many tasks as we can and let the scheduler choose how many threads to use to run them.

@bkamins
Member

bkamins commented Mar 20, 2022

The point is that the Dagger.jl scheduler would want to opt out of multi-threading not because it is unsafe, but because it knows it runs other operations in parallel that DTable does not even have to be aware of.

@nalimilan
Member

Yeah, that's easy to support. But does it also need to specify that DataFrames should use e.g. only 2 threads?

@bkamins
Member

bkamins commented Mar 20, 2022

No. As far as @jpsamaroo explained, he thinks that for Dagger.jl it is enough to have a binary switch: either do not use threading, or use as many threads as are available.

@nalimilan
Member

See #3030 for an implementation of the global flag to disable multithreading. As a second step we will be able to add keyword arguments to some functions whose value would default to that of the global flag if we want.

@bkamins
Member

bkamins commented Mar 27, 2022

Thank you for working on this!

@jpsamaroo

My hope is that the DTable can be agnostic to how underlying libraries like DataFrames choose to parallelize internal operations, but that Dagger itself can either:

  1. Ensure that DataFrames knows that it is being executed alongside other tasks concurrently (which may or may not be table operations)
  2. Propagate some information set by the user to internal DataFrames operations to indicate that they shouldn't use parallelism

Really, we need some shared API that DataFrames and Dagger can use to indicate when one parallelizable operation is nested within another, with enough granularity to communicate how much parallelism is exposed (e.g. single-thread vs. all-threads), and a way to select the desired level of parallelism that doesn't impose a burden on user-facing APIs. My hope is that we could communicate this information via context variables, since that avoids burdening user-facing APIs, and the parallelism level can be "magically" propagated through task spawns by Julia and across workers by Dagger.
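For reference, the context-variable mechanism described here is essentially what later landed in Julia 1.11 as ScopedValues (the direction JuliaLang/julia#35833 pointed at). A sketch of the propagation, with a hypothetical switch name:

```julia
using Base.ScopedValues  # Julia 1.11+

# Hypothetical switch a library could consult instead of a kwarg.
const ALLOW_THREADING = ScopedValue(true)

# Library code reads the context, not a global or an argument.
inner_op() = ALLOW_THREADING[] ? :threaded : :serial

# A scheduler can disable threading for everything nested under it;
# the scoped value propagates automatically into spawned tasks.
result = with(ALLOW_THREADING => false) do
    fetch(Threads.@spawn inner_op())
end
```

Outside the with block the default (true) applies again, so no state has to be set and unset around each operation.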

@bkamins
Member

bkamins commented Mar 27, 2022

My hope is that we could communicate this information via context variables, since that prevents burdening user-facing APIs

Agreed, but as far as I understand this can be achieved only in the long term, and we need some temporary solution until it is possible.

Currently, in #3030, @nalimilan proposed a global variable, as discussed above. I think it is a reasonable temporary choice.
I understand that once we also have contexts, the hierarchy will be: use the context, and if it is not set, use the global variable.
Also, I think that even in the long run a global variable is easier to work with for less advanced users than setting a context.

Does it make sense to you?

@jpsamaroo

Currently, in #3030, @nalimilan proposed a global variable, as discussed above. I think it is a reasonable temporary choice.
I understand that once we also have contexts, the hierarchy will be: use the context, and if it is not set, use the global variable.
Also, I think that even in the long run a global variable is easier to work with for less advanced users than setting a context.

👍 to this! I'm just pointing out what, in my mind, is a more ideal future API, but a single global flag would definitely suffice (for most users) for now!

@bkamins bkamins linked a pull request Apr 9, 2022 that will close this issue

6 participants