Skip to content

Commit

Permalink
Support multithreading in groupreduce
Browse files Browse the repository at this point in the history
Keep the default to a single thread until we find a reliable way of
predicting a reasonably optimal number of threads.
  • Loading branch information
nalimilan committed Oct 17, 2020
1 parent 8ea2edf commit bc4dc71
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 63 deletions.
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ arch:
notifications:
email: false

env:
- JULIA_NUM_THREADS=2

after_success:
- julia -e 'using Pkg; Pkg.add("Coverage"); using Coverage;
Coveralls.submit(Coveralls.process_folder())'
Expand Down
3 changes: 3 additions & 0 deletions src/DataFrames.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ using Base.Sort, Base.Order, Base.Iterators
using TableTraits, IteratorInterfaceExtensions
import LinearAlgebra: norm
using Markdown
using Base.Threads

import DataAPI,
DataAPI.All,
Expand Down Expand Up @@ -87,6 +88,8 @@ else
export only
end

const NTHREADS = 1

include("other/utils.jl")
include("other/index.jl")

Expand Down
168 changes: 119 additions & 49 deletions src/groupeddataframe/splitapplycombine.jl
Original file line number Diff line number Diff line change
Expand Up @@ -453,18 +453,22 @@ julia> combine(gd, :, AsTable(Not(:a)) => sum, renamecols=false)
```
"""
function combine(f::Base.Callable, gd::GroupedDataFrame;
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true)
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true,
nthreads::Int=NTHREADS)
return combine_helper(f, gd, keepkeys=keepkeys, ungroup=ungroup,
copycols=true, keeprows=false, renamecols=renamecols)
copycols=true, keeprows=false, renamecols=renamecols,
nthreads=nthreads)
end

combine(f::typeof(nrow), gd::GroupedDataFrame;
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) =
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true,
nthreads::Int=NTHREADS) =
combine(gd, [nrow => :nrow], keepkeys=keepkeys, ungroup=ungroup,
renamecols=renamecols)

function combine(p::Pair, gd::GroupedDataFrame;
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true)
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true,
nthreads::Int=NTHREADS)
# move handling of aggregate to specialized combine
p_from, p_to = p

Expand All @@ -484,20 +488,24 @@ function combine(p::Pair, gd::GroupedDataFrame;
cs = p_from
end
return combine_helper(cs => p_to, gd, keepkeys=keepkeys, ungroup=ungroup,
copycols=true, keeprows=false, renamecols=renamecols)
copycols=true, keeprows=false, renamecols=renamecols,
nthreads=nthreads)
end

combine(gd::GroupedDataFrame,
cs::Union{Pair, typeof(nrow), ColumnIndex, MultiColumnIndex}...;
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) =
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true,
nthreads::Int=NTHREADS) =
_combine_prepare(gd, cs..., keepkeys=keepkeys, ungroup=ungroup,
copycols=true, keeprows=false, renamecols=renamecols)
copycols=true, keeprows=false, renamecols=renamecols,
nthreads=nthreads)

function _combine_prepare(gd::GroupedDataFrame,
@nospecialize(cs::Union{Pair, typeof(nrow),
ColumnIndex, MultiColumnIndex}...);
keepkeys::Bool, ungroup::Bool, copycols::Bool,
keeprows::Bool, renamecols::Bool)
keeprows::Bool, renamecols::Bool,
nthreads::Int)
cs_vec = []
for p in cs
if p === nrow
Expand Down Expand Up @@ -570,7 +578,8 @@ function _combine_prepare(gd::GroupedDataFrame,
f = Pair[first(x) => first(last(x)) for x in cs_norm]
nms = Symbol[last(last(x)) for x in cs_norm]
return combine_helper(f, gd, nms, keepkeys=keepkeys, ungroup=ungroup,
copycols=copycols, keeprows=keeprows, renamecols=renamecols)
copycols=copycols, keeprows=keeprows, renamecols=renamecols,
nthreads=nthreads)
end

function gen_groups(idx::Vector{Int})
Expand All @@ -590,11 +599,12 @@ end
function combine_helper(f, gd::GroupedDataFrame,
nms::Union{AbstractVector{Symbol},Nothing}=nothing;
keepkeys::Bool, ungroup::Bool,
copycols::Bool, keeprows::Bool, renamecols::Bool)
copycols::Bool, keeprows::Bool, renamecols::Bool,
nthreads::Int)
if !ungroup && !keepkeys
throw(ArgumentError("keepkeys=false when ungroup=false is not allowed"))
end
idx, valscat = _combine(f, gd, nms, copycols, keeprows, renamecols)
idx, valscat = _combine(f, gd, nms, copycols, keeprows, renamecols, nthreads)
!keepkeys && ungroup && return valscat
keys = groupcols(gd)
for key in keys
Expand Down Expand Up @@ -985,24 +995,72 @@ function copyto_widen!(res::AbstractVector{T}, x::AbstractVector) where T
end

function groupreduce!(res::AbstractVector, f, op, condf, adjust, checkempty::Bool,
incol::AbstractVector, gd::GroupedDataFrame)
incol::AbstractVector, gd::GroupedDataFrame; nthreads::Int)
n = length(gd)
groups = gd.groups
if adjust !== nothing || checkempty
counts = zeros(Int, n)
end
groups = gd.groups
@inbounds for i in eachindex(incol, groups)
gix = groups[i]
x = incol[i]
if gix > 0 && (condf === nothing || condf(x))
# this check should be optimized out if U is not Any
if eltype(res) === Any && !isassigned(res, gix)
res[gix] = f(x, gix)
else
res[gix] = op(res[gix], f(x, gix))
nt = min(nthreads, Threads.nthreads())
if nt <= 1 || axes(incol) != axes(groups)
@inbounds for i in eachindex(incol, groups)
gix = groups[i]
x = incol[i]
if gix > 0 && (condf === nothing || condf(x))
# this check should be optimized out if U is not Any
if eltype(res) === Any && !isassigned(res, gix)
res[gix] = f(x, gix)
else
res[gix] = op(res[gix], f(x, gix))
end
if adjust !== nothing || checkempty
counts[gix] += 1
end
end
end
else
res_vec = Vector{typeof(res)}(undef, nt)
# needs to be always allocated to fix type instability with @threads
counts_vec = Vector{Vector{Int}}(undef, nt)
res_vec[1] = res
if adjust !== nothing || checkempty
counts_vec[1] = counts
end
for i in 2:nt
res_vec[i] = copy(res)
if adjust !== nothing || checkempty
counts[gix] += 1
counts_vec[i] = zeros(Int, n)
end
end
Threads.@threads for tid in 1:nt
res′ = res_vec[tid]
if adjust !== nothing || checkempty
counts′ = counts_vec[tid]
end
start = 1 + ((tid - 1) * length(groups)) ÷ nt
stop = (tid * length(groups)) ÷ nt
@inbounds for i in start:stop
gix = groups[i]
x = incol[i]
if gix > 0 && (condf === nothing || condf(x))
# this check should be optimized out if U is not Any
if eltype(res′) === Any && !isassigned(res′, gix)
res′[gix] = f(x, gix)
else
res′[gix] = op(res′[gix], f(x, gix))
end
if adjust !== nothing || checkempty
counts′[gix] += 1
end
end
end
end
for i in 2:length(res_vec)
res .= op.(res, res_vec[i])
end
if adjust !== nothing || checkempty
for i in 2:length(counts_vec)
counts .+= counts_vec[i]
end
end
end
Expand Down Expand Up @@ -1042,26 +1100,31 @@ end

# function barrier works around type instability of groupreduce_init due to applicable
groupreduce(f, op, condf, adjust, checkempty::Bool,
incol::AbstractVector, gd::GroupedDataFrame) =
incol::AbstractVector, gd::GroupedDataFrame;
nthreads::Int) =
groupreduce!(groupreduce_init(op, condf, adjust, incol, gd),
f, op, condf, adjust, checkempty, incol, gd)
f, op, condf, adjust, checkempty, incol, gd, nthreads=nthreads)
# Avoids the overhead due to Missing when computing reduction
groupreduce(f, op, condf::typeof(!ismissing), adjust, checkempty::Bool,
incol::AbstractVector, gd::GroupedDataFrame) =
incol::AbstractVector, gd::GroupedDataFrame;
nthreads::Int) =
groupreduce!(disallowmissing(groupreduce_init(op, condf, adjust, incol, gd)),
f, op, condf, adjust, checkempty, incol, gd)
f, op, condf, adjust, checkempty, incol, gd, nthreads=nthreads)

(r::Reduce)(incol::AbstractVector, gd::GroupedDataFrame) =
groupreduce((x, i) -> x, r.op, r.condf, r.adjust, r.checkempty, incol, gd)
(r::Reduce)(incol::AbstractVector, gd::GroupedDataFrame; nthreads::Int=NTHREADS) =
groupreduce((x, i) -> x, r.op, r.condf, r.adjust, r.checkempty, incol, gd,
nthreads=nthreads)

# this definition is missing in Julia 1.0 LTS and is required by aggregation for var
# TODO: remove this when we drop 1.0 support
if VERSION < v"1.1"
Base.zero(::Type{Missing}) = missing
end

function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFrame)
means = groupreduce((x, i) -> x, Base.add_sum, agg.condf, /, false, incol, gd)
function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFrame;
nthreads::Int=NTHREADS)
means = groupreduce((x, i) -> x, Base.add_sum, agg.condf, /, false, incol, gd,
nthreads=nthreads)
# !ismissing check is purely an optimization to avoid a copy later
if eltype(means) >: Missing && agg.condf !== !ismissing
T = Union{Missing, real(eltype(means))}
Expand All @@ -1071,10 +1134,11 @@ function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFra
res = zeros(T, length(gd))
return groupreduce!(res, (x, i) -> @inbounds(abs2(x - means[i])), +, agg.condf,
(x, l) -> l <= 1 ? oftype(x / (l-1), NaN) : x / (l-1),
false, incol, gd)
false, incol, gd, nthreads=nthreads)
end

function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFrame)
function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFrame;
nthreads::Int=NTHREADS)
outcol = Aggregate(var, agg.condf)(incol, gd)
if eltype(outcol) <: Union{Missing, Rational}
return sqrt.(outcol)
Expand All @@ -1083,20 +1147,25 @@ function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFra
end
end

for f in (first, last)
function (agg::Aggregate{typeof(f)})(incol::AbstractVector, gd::GroupedDataFrame)
n = length(gd)
outcol = similar(incol, n)
fillfirst!(agg.condf, outcol, incol, gd, rev=agg.f === last)
if isconcretetype(eltype(outcol))
return outcol
else
return copyto_widen!(Tables.allocatecolumn(typeof(first(outcol)), n), outcol)
for f in (:first, :last)
# Without using @eval the presence of a keyword argument triggers a Julia bug
@eval begin
function (agg::Aggregate{typeof($f)})(incol::AbstractVector, gd::GroupedDataFrame;
nthreads::Int=NTHREADS)
n = length(gd)
outcol = similar(incol, n)
fillfirst!(agg.condf, outcol, incol, gd, rev=agg.f === last)
if isconcretetype(eltype(outcol))
return outcol
else
return copyto_widen!(Tables.allocatecolumn(typeof(first(outcol)), n), outcol)
end
end
end
end

function (agg::Aggregate{typeof(length)})(incol::AbstractVector, gd::GroupedDataFrame)
function (agg::Aggregate{typeof(length)})(incol::AbstractVector, gd::GroupedDataFrame;
nthreads::Int=NTHREADS)
if getfield(gd, :idx) === nothing
lens = zeros(Int, length(gd))
@inbounds for gix in gd.groups
Expand Down Expand Up @@ -1143,7 +1212,7 @@ end

function _combine(f::AbstractVector{<:Pair},
gd::GroupedDataFrame, nms::AbstractVector{Symbol},
copycols::Bool, keeprows::Bool, renamecols::Bool)
copycols::Bool, keeprows::Bool, renamecols::Bool, nthreads::Int)
# here f should be normalized and in a form of source_cols => fun
@assert all(x -> first(x) isa Union{Int, AbstractVector{Int}, AsTable}, f)
@assert all(x -> last(x) isa Base.Callable, f)
Expand Down Expand Up @@ -1185,7 +1254,7 @@ function _combine(f::AbstractVector{<:Pair},
if length(gd) > 0 && isagg(p, gd)
incol = parentdf[!, source_cols]
agg = check_aggregate(last(p), incol)
outcol = agg(incol, gd)
outcol = agg(incol, gd, nthreads=nthreads)
res[i] = idx_agg, outcol
elseif keeprows && fun === identity && !(source_cols isa AsTable)
@assert source_cols isa Union{Int, AbstractVector{Int}}
Expand Down Expand Up @@ -1283,7 +1352,7 @@ function _combine(f::AbstractVector{<:Pair},
end

function _combine(fun::Base.Callable, gd::GroupedDataFrame, ::Nothing,
copycols::Bool, keeprows::Bool, renamecols::Bool)
copycols::Bool, keeprows::Bool, renamecols::Bool, nthreads::Int)
@assert copycols && !keeprows
# use `similar` as `gd` might have been subsetted
firstres = length(gd) > 0 ? fun(gd[1]) : fun(similar(parent(gd), 0))
Expand All @@ -1293,7 +1362,7 @@ function _combine(fun::Base.Callable, gd::GroupedDataFrame, ::Nothing,
end

function _combine(p::Pair, gd::GroupedDataFrame, ::Nothing,
copycols::Bool, keeprows::Bool, renamecols::Bool)
copycols::Bool, keeprows::Bool, renamecols::Bool, nthreads::Int)
# here p should not be normalized as we allow tabular return value from fun
# map and combine should not dispatch here if p is isagg
@assert copycols && !keeprows
Expand Down Expand Up @@ -1708,9 +1777,10 @@ julia> select(gd, :, AsTable(Not(:a)) => sum, renamecols=false)
```
"""
select(gd::GroupedDataFrame, args...; copycols::Bool=true, keepkeys::Bool=true,
ungroup::Bool=true, renamecols::Bool=true) =
ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=NTHREADS) =
_combine_prepare(gd, args..., copycols=copycols, keepkeys=keepkeys,
ungroup=ungroup, keeprows=true, renamecols=renamecols)
ungroup=ungroup, keeprows=true, renamecols=renamecols,
nthreads=NTHREADS)

"""
transform(gd::GroupedDataFrame, args...;
Expand Down
Loading

0 comments on commit bc4dc71

Please sign in to comment.