
Do we want something like this? #1

Open · tkf opened this issue Apr 12, 2022 · 13 comments

@tkf (Owner) commented Apr 12, 2022:

This package is a proof-of-concept that could support some of the use cases discussed in JuliaData/DataFrames.jl#3030. APIs can be found here:

The rough idea is that libraries that spawn tasks wrap the "fork-join region" in within_parallel, and libraries that provide parallelizable algorithms check whether they should fall back to a serial (or less parallel) algorithm based on what is_parallelized() returns.
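For illustration, here is a minimal sketch of how the two sides might use the API (parallel_map, mysum, and parallel_sum are hypothetical names; only within_parallel and is_parallelized come from this package):

```julia
# Library that spawns tasks: wrap the fork-join region in within_parallel.
function parallel_map(f, xs)
    within_parallel() do
        chunks = Iterators.partition(xs, cld(length(xs), Threads.nthreads()))
        tasks = [Threads.@spawn map(f, c) for c in chunks]
        reduce(vcat, fetch.(tasks))
    end
end

# Library providing a parallelizable algorithm: fall back to serial when
# it is already running inside someone else's parallel region.
mysum(xs) = is_parallelized() ? sum(xs) : parallel_sum(xs)
```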

It may be better to integrate this into Julia itself (e.g., once we have JuliaLang/julia#35833), and I also have an idea to use JuliaLang/julia#39773 to provide this information more efficiently. However, until we get there, I think this is a reasonably simple package to have around and use right away.

Do people want something like this package? Is the design/direction good (enough)?

Once we decide on the rough design idea, I think it would be better to move this package to the JuliaParallel organization.

cc @jpsamaroo @vchuravy @bkamins @nalimilan

@nalimilan commented:

Thanks for starting this! I wonder whether this API gives enough information though. For DataFrames at least, knowing that we're in a context that is already parallelized isn't enough to decide whether we should spawn new tasks or not. In many places, spawning tasks is cheap so we probably always want to do it (except for thread-safety, which is out of scope here IIUC). In other places, we need to know whether all CPUs are likely to be busy or not, which could be approximated by knowing the total number of parallel tasks. Shouldn't the API provide this information? It could also allow choosing a reasonable basesize.

@tkf (Owner, Author) commented Apr 12, 2022:

> In other places, we need to know whether all CPUs are likely to be busy or not, which could be approximated by knowing the total number of parallel tasks.

This is a very dangerous route that I will avoid. The number of active worker threads is a non-deterministic quantity. Packages shouldn't depend on it.

First, if package authors depend on it, I suspect that we will have an ecosystem that cannot compute deterministic results. Given that Julia users have a wide range of concurrent programming skills, I will keep advocating that packages should compute deterministic results by default for debuggability and reproducibility. (Of course, I'm not saying it's impossible to build a robust function that computes identical results given such non-determinism.) The proposed API is designed in such a way that the package authors don't have to worry about this much.

Second, the number of active worker threads that the current worker thread happens to observe at the moment your API is called is not a very useful quantity. It is not hard to imagine a program where all worker threads are initially busy but, a few milliseconds later, all the other concurrently started tasks finish (and this happens consistently). Imagine that the invocation of your API takes 10 seconds. Clearly, deciding the number of tasks to use at the beginning of the API invocation is a bad idea.

Lastly, it is important that packages don't become the scheduler [^1]. Instead, packages should try to expose a parallel task structure that maximizes what the scheduler can do... in principle. However, we need to take a nuanced approach because the task overhead is relatively high. This is why it may make sense to simplify the "inner" parallel task structure sometimes. It is kind of like how some Julia programs pre-allocate and manually reuse memory to help the GC. As explained in the Extended help of is_parallelized, some parallel algorithms (e.g., prefix sum) have a clear reason to avoid parallelism when used inside other tasks.

All that said, I think it is reasonable to expose a "parallel region depth" as in

```julia
using ContextVariablesX

@contextvar DEPTH::Int = 0  # nesting depth of parallel regions

within_parallel(f::F) where {F} = with_context(f, DEPTH => DEPTH[] + 1)

parallel_depth() = DEPTH[]
is_parallelized() = DEPTH[] != 0
```

If packages use a divide-and-conquer approach, you can guess that there are roughly 2^parallel_depth() tasks. We could also let the within_parallel caller specify the number of parallel "tasks" and provide an API that returns the stack of specified task counts in the parent parallel regions. That gives you some more information, although it is rather hard to interpret.
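To make the divide-and-conquer case concrete, here is a hedged sketch (dac_sum and its cutoffs are made up for illustration) where the recursion stops forking once parallel_depth() indicates roughly enough tasks:

```julia
function dac_sum(xs; maxdepth = ceil(Int, log2(Threads.nthreads())) + 1)
    if length(xs) < 1024 || parallel_depth() >= maxdepth
        return sum(xs)  # serial base case: small input, or ~2^depth tasks already
    end
    mid = length(xs) ÷ 2
    within_parallel() do
        left = Threads.@spawn dac_sum(view(xs, 1:mid); maxdepth = maxdepth)
        right = dac_sum(view(xs, mid+1:length(xs)); maxdepth = maxdepth)
        fetch(left) + right
    end
end
```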

[^1]: I'm aware there are interesting algorithms where managing scheduling details in a clever way is useful. However, a large class of parallel programs can be expressed very efficiently using just the fork-join model. So, I suggest focusing on a relatively "easy" problem here.

@nalimilan commented:

I didn't mean we should track the number of active threads, but rather that each call to within_parallel could (optionally) specify into roughly how many independent pieces the work has been split. For a divide-and-conquer approach this would indeed be 2, but more generally it could be a very different number. That seems strictly better than assuming 2^parallel_depth() in all cases. For example, in DataFrames we start one task per operation, but also one task for each chunk of groups, etc. Imagine you're on a machine with 128 cores and a function spawns two tasks, each of which calls DataFrames operations. We certainly don't want to disable multithreading because is_parallelized() returns true, or limit the number of tasks to 2^1 because that's what parallel_depth() returns.

That said, I agree this is a very tricky issue. As soon as the overhead due to multithreading is significant, it's very hard to decide whether using multiple tasks is a good idea or not (we haven't been able to merge JuliaData/DataFrames.jl#2491 yet because we couldn't figure out reliable conditions under which multithreading is faster than single-threading).

@tkf (Owner, Author) commented Apr 13, 2022:

So, just to make it very concrete, it is possible to implement:

```julia
struct TaskSplitInfo
    ntasks::Union{Int,Nothing}            # caller-specified task count, if any
    parent::Union{TaskSplitInfo,Nothing}  # enclosing parallel region, if any
end

# Iterate from the current region up through its parents.
Base.iterate(info::TaskSplitInfo) = (info, info)
function Base.iterate(::TaskSplitInfo, prev::TaskSplitInfo)
    parent = prev.parent
    parent === nothing ? nothing : (parent, parent)
end
# (... omitting other mandatory iterator interfaces ...)

@contextvar TASKSPLIT::Union{TaskSplitInfo,Nothing} = nothing

# APIs:
within_parallel(f::F, ntasks = nothing) where {F} =
    with_context(f, TASKSPLIT => TaskSplitInfo(ntasks, TASKSPLIT[]))

tasksplitinfo() = TASKSPLIT[]
parallel_depth() = TASKSPLIT[] === nothing ? 0 : count(Returns(true), TASKSPLIT[])
is_parallelized() = TASKSPLIT[] !== nothing
```

However, it has several issues:

  1. Fundamentally, you can't really assume prod(info -> something(info.ntasks, 2), tasksplitinfo()) to be a lower bound on the number of existing tasks, since there's no guarantee that the task spawn tree is balanced (see the sketch after this list).

  2. A slightly more minor issue is that this requires a context variable. However, ContextVariablesX.jl does not work well with custom loggers. OTOH, the atomics-based implementation (which is the default) works with the very minimalistic API currently implemented here.
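For issue 1, the (unreliable) estimate would look something like this (estimated_ntasks is a hypothetical helper built on the sketch above):

```julia
# Multiply the caller-specified task counts along the chain of parent
# regions, defaulting to 2 where the caller did not specify ntasks.
function estimated_ntasks()
    info = tasksplitinfo()
    info === nothing && return 1
    return prod(i -> something(i.ntasks, 2), info)
end
```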

Given these two issues, I designed a very minimalistic API as suggested in the OP.

@bkamins commented Apr 13, 2022:

I think it is up to @jpsamaroo to say what is needed.
DataFrames.jl is a consumer of this API, but Dagger.jl should specify what it needs in order to work efficiently.

@jpsamaroo commented:

Based on whatever strategy Dagger thinks will be most efficient, it might choose any amount of parallelism from 1 task up to infinity (in the rare event that all tasks are expected to immediately yield and otherwise be very cheap, we might just schedule all of them at once). Given this, and assuming DataFrames (among other libraries) wants to tune its task splitting based on what we tell it, we should be able to indicate how much parallelism should be exposed per Dagger-submitted task.

For example, if we have 128 threads on a worker and Dagger has 16 total tasks (each calling into some DataFrames function) that it can schedule concurrently on this worker, Dagger would like to inform DataFrames to use about 8 concurrent tasks per top-level DataFrames invocation. So Dagger can be pretty exact about how many tasks it wants to see spawned.
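A hypothetical sketch of that arithmetic, assuming within_parallel grows the optional ntasks argument from the TaskSplitInfo sketch above:

```julia
nthreads = 128        # threads on this worker
ndagger_tasks = 16    # Dagger tasks schedulable concurrently on it
ntasks_per_call = max(1, nthreads ÷ ndagger_tasks)  # = 8

# Each Dagger task would then wrap its DataFrames call like so, and
# DataFrames would read the hint back via tasksplitinfo():
within_parallel(ntasks_per_call) do
    some_dataframes_operation()  # hypothetical DataFrames call
end
```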

If this package can provide an interface like that, I'd be happy to integrate it and start teaching the scheduler to "explore" what number of Dagger tasks is optimal for the workload (i.e. minimizes total runtime).

Relatedly: can we have a mechanism for Dagger to query DataFrames and ask how much parallelism a given operation (potentially for a given set of inputs) could expose? This would let us bound our exploration to some max amount of parallelism so that we don't spend too much time exploring solutions which undersubscribe the system. This is, of course, not strictly necessary, but it might prove useful.

@bkamins commented Apr 13, 2022:

> can we have a mechanism for Dagger to query DataFrames and ask how much parallelism a given operation (potentially for a given set of inputs) could expose?

Here the problem is Amdahl's law :). That is, the operations that can be parallelized are only a part of the execution schedule, and it is hard in general to say what portion of the load can be parallelized, because it depends on how expensive the user-supplied operation being parallelized is (which cannot be checked).
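For reference, Amdahl's law as a one-liner (p is the parallelizable fraction of the work, n the number of threads):

```julia
amdahl_speedup(p, n) = 1 / ((1 - p) + p / n)

amdahl_speedup(0.5, 128)  # ≈ 1.98: if half the schedule is serial,
                          # even 128 threads barely double throughput
```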

In short, at least currently, we unfortunately cannot reliably provide such information.

@bkamins commented Apr 13, 2022:

The second limitation is that DataFrames.jl, at least, currently takes an all-or-nothing approach. That is, it uses the standard Julia @spawn macro if you say you allow multithreading, or just @async (simplifying the story a bit) if you do not. So it is on the Julia side how many threads are actually used; DataFrames.jl does not have control over this.
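A minimal sketch of that all-or-nothing pattern (spawn_or_async and expensive_work are made-up names, not DataFrames.jl internals):

```julia
spawn_or_async(f, threaded::Bool) =
    threaded ? Threads.@spawn(f()) : @async(f())

expensive_work() = sum(rand(10^6))  # stand-in workload

t = spawn_or_async(expensive_work, true)  # true: let Julia pick a thread
wait(t)
```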

@tkf (Owner, Author) commented Apr 13, 2022:

> So Dagger can be pretty exact about how many tasks it wants to see spawned.

> can we have a mechanism for Dagger to query DataFrames and ask how much parallelism a given operation (potentially for a given set of inputs) could expose?

I think this mindset made sense when Dagger had an explicit DAG. However, it does not seem compatible with the fact that Dagger is now gearing towards a more "dynamic" task model (which is my understanding of the eager thunk) that is closer to Julia's task system, where the DAG is implicitly defined. In such a system, it is important to design the scheduler under the assumption that the workload of each task and the structure of the DAG are unknowable before execution.

@nalimilan commented:

Querying DataFrames to know how many tasks could be spawned indeed sounds relatively difficult. Maybe for now we can focus on the first issue, i.e. having Dagger indicate to DataFrames (and others) the number of desired tasks?

@jpsamaroo commented:

> I think this mindset made sense when Dagger had an explicit DAG. However, it does not seem compatible with the fact that Dagger is now gearing towards a more "dynamic" task model (which is my understanding of the eager thunk) that is closer to Julia's task system, where the DAG is implicitly defined.

The scheduler still has full knowledge of all submitted tasks and their dependencies (the DAG is just more lazily elaborated). While we may not be able to apply an optimization to the "full" DAG at any one time, we might be able to do it for a portion of the DAG that's already submitted.

> Maybe for now we can focus on the first issue, i.e. having Dagger indicate to DataFrames (and others) the number of desired tasks?

Agreed, that is the most important part. If it's going to be all-or-nothing parallelism for now, then that's fine, I'll just need to figure out how to communicate that to Dagger.

@bkamins commented Apr 13, 2022:

As an additional comment on the DataFrames.jl side: I believe the all-or-nothing approach is enough, because we already make sure not to spawn very small tasks, since the cost of spawning might then exceed the benefit. If the operation is very cheap, we will not execute it in multithreaded mode anyway, even if we are allowed to.

Therefore, the only benefit of passing the exact number of allowed threads would arise if the scheduler could compute the exact cost of a given operation. For example: it knows it has two tasks, 1 and 2, both of which parallelize well and are followed by task 3 (which can run only after both 1 and 2 have finished); task 1 is 2x cheaper, which means it can get 2x fewer CPUs. But such detailed information seems unlikely to be available to the scheduler (though maybe I am wrong here).

@tkf (Owner, Author) commented Apr 14, 2022:

> Maybe for now we can focus on the first issue, i.e. having Dagger indicate to DataFrames (and others) the number of desired tasks?

> Agreed, that is the most important part.

As I explained above, this has undesirable properties like unpredictability in computation results and performance.
