diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6de797902f..0dbdb484e7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,6 +35,7 @@ jobs: - uses: actions/cache@v1 env: cache-name: cache-artifacts + JULIA_NUM_THREADS: 2 with: path: ~/.julia/artifacts key: ${{ runner.os }}-test-${{ env.cache-name }}-${{ hashFiles('**/Project.toml') }} diff --git a/NEWS.md b/NEWS.md index a5cdf5a2fd..8580fcea0a 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,12 @@ +# DataFrames v1.0 Release Notes + +## New functionalities + +* `combine`, `select` and `transform` with `GroupedDataFrame` now accept + a `nthreads` argument which enables multithreading for some optimized + grouped reductions ([#2491](https://github.com/JuliaData/DataFrames.jl/pull/2491)). + + # DataFrames v0.22 Release Notes ## Breaking changes diff --git a/src/DataFrames.jl b/src/DataFrames.jl index c3e7ac701b..e596b4aa03 100644 --- a/src/DataFrames.jl +++ b/src/DataFrames.jl @@ -3,7 +3,7 @@ module DataFrames using Statistics, Printf, REPL using Reexport, SortingAlgorithms, Compat, Unicode, PooledArrays, CategoricalArrays @reexport using Missings, InvertedIndices -using Base.Sort, Base.Order, Base.Iterators +using Base.Sort, Base.Order, Base.Iterators, Base.Threads using TableTraits, IteratorInterfaceExtensions import LinearAlgebra: norm using Markdown diff --git a/src/abstractdataframe/selection.jl b/src/abstractdataframe/selection.jl index df58ac2cea..41cb38bc84 100644 --- a/src/abstractdataframe/selection.jl +++ b/src/abstractdataframe/selection.jl @@ -644,9 +644,10 @@ end select(df::AbstractDataFrame, args...; copycols::Bool=true, renamecols::Bool=true) select(args::Callable, df::DataFrame; renamecols::Bool=true) select(gd::GroupedDataFrame, args...; copycols::Bool=true, keepkeys::Bool=true, - ungroup::Bool=true, renamecols::Bool=true) + ungroup::Bool=true, renamecols::Bool=true, nthreads::Integer=1) select(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true, - 
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + keepkeys::Bool=true, ungroup::Bool=true, + renamecols::Bool=true, nthreads::Integer=1) Create a new data frame that contains columns from `df` or `gd` specified by `args` and return it. The result is guaranteed to have the same number of rows @@ -664,6 +665,9 @@ $TRANSFORMATION_COMMON_RULES data frame. - `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data frame or a `GroupedDataFrame`. +- `nthreads::Integer=1` : the number of CPU threads to use. Passing a value higher than 1 + currently has an effect only for some optimized grouped reductions. Values higher than + `Threads.nthreads()` will be replaced with that value. # Examples ```jldoctest @@ -858,9 +862,11 @@ end transform(df::AbstractDataFrame, args...; copycols::Bool=true, renamecols::Bool=true) transform(f::Callable, df::DataFrame; renamecols::Bool=true) transform(gd::GroupedDataFrame, args...; copycols::Bool=true, - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + keepkeys::Bool=true, ungroup::Bool=true, + renamecols::Bool=true, nthreads::Integer=1) transform(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true, - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + keepkeys::Bool=true, ungroup::Bool=true, + renamecols::Bool=true, nthreads::Integer=1) Create a new data frame that contains columns from `df` or `gd` plus columns specified by `args` and return it. The result is guaranteed to have the same @@ -877,6 +883,9 @@ $TRANSFORMATION_COMMON_RULES data frame. - `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data frame or a `GroupedDataFrame`. +- `nthreads::Integer=1` : the number of CPU threads to use. Passing a value higher than 1 + currently has an effect only for some optimized grouped reductions. Values higher than + `Threads.nthreads()` will be replaced with that value. 
Note that when the first argument is a `GroupedDataFrame`, `keepkeys=false` is needed to be able to return a different value for the grouping column: @@ -924,9 +933,11 @@ end combine(df::AbstractDataFrame, args...; renamecols::Bool=true) combine(f::Callable, df::AbstractDataFrame; renamecols::Bool=true) combine(gd::GroupedDataFrame, args...; - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + keepkeys::Bool=true, ungroup::Bool=true, + renamecols::Bool=true, nthreads::Integer=1) combine(f::Base.Callable, gd::GroupedDataFrame; - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + keepkeys::Bool=true, ungroup::Bool=true, + renamecols::Bool=true, nthreads::Integer=1) Create a new data frame that contains columns from `df` or `gd` specified by `args` and return it. The result can have any number of rows that is determined @@ -941,6 +952,9 @@ $TRANSFORMATION_COMMON_RULES data frame. - `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data frame or a `GroupedDataFrame`. +- `nthreads::Integer=1` : the number of CPU threads to use. Passing a value higher than 1 + currently has an effect only for some optimized grouped reductions. Values higher than + `Threads.nthreads()` will be replaced with that value. 
# Examples ```jldoctest diff --git a/src/groupeddataframe/fastaggregates.jl b/src/groupeddataframe/fastaggregates.jl index 4bce0e6c5f..55ba845460 100644 --- a/src/groupeddataframe/fastaggregates.jl +++ b/src/groupeddataframe/fastaggregates.jl @@ -157,24 +157,72 @@ function copyto_widen!(res::AbstractVector{T}, x::AbstractVector) where T end function groupreduce!(res::AbstractVector, f, op, condf, adjust, checkempty::Bool, - incol::AbstractVector, gd::GroupedDataFrame) + incol::AbstractVector, gd::GroupedDataFrame, nthreads::Integer) n = length(gd) + groups = gd.groups if adjust !== nothing || checkempty counts = zeros(Int, n) end - groups = gd.groups - @inbounds for i in eachindex(incol, groups) - gix = groups[i] - x = incol[i] - if gix > 0 && (condf === nothing || condf(x)) - # this check should be optimized out if U is not Any - if eltype(res) === Any && !isassigned(res, gix) - res[gix] = f(x, gix) - else - res[gix] = op(res[gix], f(x, gix)) + nt = min(nthreads, Threads.nthreads()) + if nt <= 1 || axes(incol) != axes(groups) + @inbounds for i in eachindex(incol, groups) + gix = groups[i] + x = incol[i] + if gix > 0 && (condf === nothing || condf(x)) + # this check should be optimized out if U is not Any + if eltype(res) === Any && !isassigned(res, gix) + res[gix] = f(x, gix) + else + res[gix] = op(res[gix], f(x, gix)) + end + if adjust !== nothing || checkempty + counts[gix] += 1 + end end + end + else + res_vec = Vector{typeof(res)}(undef, nt) + # needs to be always allocated to fix type instability with @threads + counts_vec = Vector{Vector{Int}}(undef, nt) + res_vec[1] = res + if adjust !== nothing || checkempty + counts_vec[1] = counts + end + for i in 2:nt + res_vec[i] = copy(res) + if adjust !== nothing || checkempty + counts_vec[i] = zeros(Int, n) + end + end + Threads.@threads for tid in 1:nt + res′ = res_vec[tid] if adjust !== nothing || checkempty - counts[gix] += 1 + counts′ = counts_vec[tid] + end + start = 1 + ((tid - 1) * length(groups)) ÷ nt + stop 
= (tid * length(groups)) ÷ nt + @inbounds for i in start:stop + gix = groups[i] + x = incol[i] + if gix > 0 && (condf === nothing || condf(x)) + # this check should be optimized out if U is not Any + if eltype(res′) === Any && !isassigned(res′, gix) + res′[gix] = f(x, gix) + else + res′[gix] = op(res′[gix], f(x, gix)) + end + if adjust !== nothing || checkempty + counts′[gix] += 1 + end + end + end + end + for i in 2:length(res_vec) + res .= op.(res, res_vec[i]) + end + if adjust !== nothing || checkempty + for i in 2:length(counts_vec) + counts .+= counts_vec[i] end end end @@ -218,17 +266,20 @@ end # function barrier works around type instability of groupreduce_init due to applicable groupreduce(f, op, condf, adjust, checkempty::Bool, - incol::AbstractVector, gd::GroupedDataFrame) = + incol::AbstractVector, gd::GroupedDataFrame, + nthreads::Integer) = groupreduce!(groupreduce_init(op, condf, adjust, incol, gd), - f, op, condf, adjust, checkempty, incol, gd) + f, op, condf, adjust, checkempty, incol, gd, nthreads) # Avoids the overhead due to Missing when computing reduction groupreduce(f, op, condf::typeof(!ismissing), adjust, checkempty::Bool, - incol::AbstractVector, gd::GroupedDataFrame) = + incol::AbstractVector, gd::GroupedDataFrame, + nthreads::Integer) = groupreduce!(disallowmissing(groupreduce_init(op, condf, adjust, incol, gd)), - f, op, condf, adjust, checkempty, incol, gd) + f, op, condf, adjust, checkempty, incol, gd, nthreads) -(r::Reduce)(incol::AbstractVector, gd::GroupedDataFrame) = - groupreduce((x, i) -> x, r.op, r.condf, r.adjust, r.checkempty, incol, gd) +(r::Reduce)(incol::AbstractVector, gd::GroupedDataFrame; + nthreads::Integer=1) = + groupreduce((x, i) -> x, r.op, r.condf, r.adjust, r.checkempty, incol, gd, nthreads) # this definition is missing in Julia 1.0 LTS and is required by aggregation for var # TODO: remove this when we drop 1.0 support @@ -236,8 +287,10 @@ if VERSION < v"1.1" Base.zero(::Type{Missing}) = missing end -function 
(agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFrame) - means = groupreduce((x, i) -> x, Base.add_sum, agg.condf, /, false, incol, gd) +function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFrame; + nthreads::Integer=1) + means = groupreduce((x, i) -> x, Base.add_sum, agg.condf, /, false, + incol, gd, nthreads) # !ismissing check is purely an optimization to avoid a copy later if eltype(means) >: Missing && agg.condf !== !ismissing T = Union{Missing, real(eltype(means))} @@ -247,11 +300,12 @@ function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFra res = zeros(T, length(gd)) return groupreduce!(res, (x, i) -> @inbounds(abs2(x - means[i])), +, agg.condf, (x, l) -> l <= 1 ? oftype(x / (l-1), NaN) : x / (l-1), - false, incol, gd) + false, incol, gd, nthreads) end -function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFrame) - outcol = Aggregate(var, agg.condf)(incol, gd) +function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFrame; + nthreads::Integer=1) + outcol = Aggregate(var, agg.condf)(incol, gd; nthreads=nthreads) if eltype(outcol) <: Union{Missing, Rational} return sqrt.(outcol) else @@ -259,20 +313,25 @@ function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFra end end -for f in (first, last) - function (agg::Aggregate{typeof(f)})(incol::AbstractVector, gd::GroupedDataFrame) - n = length(gd) - outcol = similar(incol, n) - fillfirst!(agg.condf, outcol, incol, gd, rev=agg.f === last) - if isconcretetype(eltype(outcol)) - return outcol - else - return copyto_widen!(Tables.allocatecolumn(typeof(first(outcol)), n), outcol) +for f in (:first, :last) + # Without using @eval the presence of a keyword argument triggers a Julia bug + @eval begin + function (agg::Aggregate{typeof($f)})(incol::AbstractVector, gd::GroupedDataFrame; + nthreads::Integer=1) + n = length(gd) + outcol = similar(incol, n) + fillfirst!(agg.condf, outcol, incol, 
gd, rev=agg.f === last) + if isconcretetype(eltype(outcol)) + return outcol + else + return copyto_widen!(Tables.allocatecolumn(typeof(first(outcol)), n), outcol) + end end end end -function (agg::Aggregate{typeof(length)})(incol::AbstractVector, gd::GroupedDataFrame) +function (agg::Aggregate{typeof(length)})(incol::AbstractVector, gd::GroupedDataFrame; + nthreads::Integer=1) if getfield(gd, :idx) === nothing lens = zeros(Int, length(gd)) @inbounds for gix in gd.groups diff --git a/src/groupeddataframe/splitapplycombine.jl b/src/groupeddataframe/splitapplycombine.jl index cc59e79153..41eb912f39 100644 --- a/src/groupeddataframe/splitapplycombine.jl +++ b/src/groupeddataframe/splitapplycombine.jl @@ -22,7 +22,7 @@ function _combine_prepare(gd::GroupedDataFrame, @nospecialize(cs::Union{Pair, Base.Callable, ColumnIndex, MultiColumnIndex}...); keepkeys::Bool, ungroup::Bool, copycols::Bool, - keeprows::Bool, renamecols::Bool) + keeprows::Bool, renamecols::Bool, nthreads::Integer) if !ungroup && !keepkeys throw(ArgumentError("keepkeys=false when ungroup=false is not allowed")) end @@ -63,7 +63,8 @@ function _combine_prepare(gd::GroupedDataFrame, # if optional_transform[i] is true then the transformation will be skipped # if earlier column with a column with the same name was created - idx, valscat = _combine(gd, cs_norm, optional_transform, copycols, keeprows, renamecols) + idx, valscat = _combine(gd, cs_norm, optional_transform, + copycols, keeprows, renamecols, nthreads) !keepkeys && ungroup && return valscat @@ -194,13 +195,14 @@ function _combine_process_agg(@nospecialize(cs_i::Pair{Int, <:Pair{<:Function, S gd::GroupedDataFrame, seen_cols::Dict{Symbol, Tuple{Bool, Int}}, trans_res::Vector{TransformationResult}, - idx_agg::Union{Nothing, AbstractVector{Int}}) + idx_agg::Union{Nothing, AbstractVector{Int}}, + nthreads::Integer) @assert isagg(cs_i, gd) @assert !optional_i out_col_name = last(last(cs_i)) incol = parentdf[!, first(cs_i)] agg = 
check_aggregate(first(last(cs_i)), incol) - outcol = agg(incol, gd) + outcol = agg(incol, gd; nthreads=nthreads) if haskey(seen_cols, out_col_name) optional, loc = seen_cols[out_col_name] @@ -485,7 +487,7 @@ end function _combine(gd::GroupedDataFrame, @nospecialize(cs_norm::Vector{Any}), optional_transform::Vector{Bool}, - copycols::Bool, keeprows::Bool, renamecols::Bool) + copycols::Bool, keeprows::Bool, renamecols::Bool, nthreads::Integer) if isempty(cs_norm) if keeprows && nrow(parent(gd)) > 0 && minimum(gd.groups) == 0 throw(ArgumentError("select and transform do not support " * @@ -529,7 +531,8 @@ function _combine(gd::GroupedDataFrame, optional_i = optional_transform[i] if length(gd) > 0 && isagg(cs_i, gd) - _combine_process_agg(cs_i, optional_i, parentdf, gd, seen_cols, trans_res, idx_agg) + _combine_process_agg(cs_i, optional_i, parentdf, gd, + seen_cols, trans_res, idx_agg, nthreads) elseif keeprows && cs_i isa Pair && first(last(cs_i)) === identity && !(first(cs_i) isa AsTable) && (last(last(cs_i)) isa Symbol) # this is a fast path used when we pass a column or rename a column in select or transform @@ -620,82 +623,95 @@ function _combine(gd::GroupedDataFrame, end function combine(f::Base.Callable, gd::GroupedDataFrame; - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, + nthreads::Integer=1) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a GroupedDataFrame")) end - return combine(gd, f, keepkeys=keepkeys, ungroup=ungroup, renamecols=renamecols) + return combine(gd, f, keepkeys=keepkeys, ungroup=ungroup, renamecols=renamecols, + nthreads=nthreads) end combine(f::Pair, gd::GroupedDataFrame; - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) = + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, + nthreads::Integer=1) = throw(ArgumentError("First argument must be a transformation if the second 
argument is a GroupedDataFrame. " * "You can pass a `Pair` as the second argument of the transformation. If you want the return " * "value to be processed as having multiple columns add `=> AsTable` suffix to the pair.")) combine(gd::GroupedDataFrame, cs::Union{Pair, Base.Callable, ColumnIndex, MultiColumnIndex}...; - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) = + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, + nthreads::Integer=1) = _combine_prepare(gd, cs..., keepkeys=keepkeys, ungroup=ungroup, - copycols=true, keeprows=false, renamecols=renamecols) + copycols=true, keeprows=false, renamecols=renamecols, + nthreads=nthreads) function select(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true, - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, + nthreads::Integer=1) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame")) end - return select(gd, f, copycols=copycols, keepkeys=keepkeys, ungroup=ungroup) + return select(gd, f, copycols=copycols, keepkeys=keepkeys, ungroup=ungroup, + nthreads=nthreads) end select(gd::GroupedDataFrame, args...; copycols::Bool=true, keepkeys::Bool=true, - ungroup::Bool=true, renamecols::Bool=true) = + ungroup::Bool=true, renamecols::Bool=true, nthreads::Integer=1) = _combine_prepare(gd, args..., copycols=copycols, keepkeys=keepkeys, - ungroup=ungroup, keeprows=true, renamecols=renamecols) + ungroup=ungroup, keeprows=true, renamecols=renamecols, + nthreads=nthreads) function transform(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true, - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, + nthreads::Integer=1) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame")) end - return 
transform(gd, f, copycols=copycols, keepkeys=keepkeys, ungroup=ungroup) + return transform(gd, f, copycols=copycols, keepkeys=keepkeys, ungroup=ungroup, + nthreads=nthreads) end function transform(gd::GroupedDataFrame, args...; copycols::Bool=true, - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, + nthreads::Integer=1) res = select(gd, :, args..., copycols=copycols, keepkeys=keepkeys, - ungroup=ungroup, renamecols=renamecols) + ungroup=ungroup, renamecols=renamecols, nthreads=nthreads) # res can be a GroupedDataFrame based on DataFrame or a DataFrame, # so parent always gives a data frame select!(parent(res), propertynames(parent(gd)), :) return res end -function select!(f::Base.Callable, gd::GroupedDataFrame; ungroup::Bool=true, renamecols::Bool=true) +function select!(f::Base.Callable, gd::GroupedDataFrame; + ungroup::Bool=true, renamecols::Bool=true, nthreads::Integer=1) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame")) end - return select!(gd, f, ungroup=ungroup) + return select!(gd, f, ungroup=ungroup, nthreads=nthreads) end function select!(gd::GroupedDataFrame{DataFrame}, args...; - ungroup::Bool=true, renamecols::Bool=true) - newdf = select(gd, args..., copycols=false, renamecols=renamecols) + ungroup::Bool=true, renamecols::Bool=true, nthreads::Integer=1) + newdf = select(gd, args..., copycols=false, renamecols=renamecols, nthreads=nthreads) df = parent(gd) _replace_columns!(df, newdf) return ungroup ? 
df : gd end -function transform!(f::Base.Callable, gd::GroupedDataFrame; ungroup::Bool=true, renamecols::Bool=true) +function transform!(f::Base.Callable, gd::GroupedDataFrame; + ungroup::Bool=true, renamecols::Bool=true, nthreads::Integer=1) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame")) end - return transform!(gd, f, ungroup=ungroup) + return transform!(gd, f, ungroup=ungroup, nthreads=nthreads) end function transform!(gd::GroupedDataFrame{DataFrame}, args...; - ungroup::Bool=true, renamecols::Bool=true) - newdf = select(gd, :, args..., copycols=false, renamecols=renamecols) + ungroup::Bool=true, renamecols::Bool=true, nthreads::Integer=1) + newdf = select(gd, :, args..., copycols=false, renamecols=renamecols, nthreads=nthreads) df = parent(gd) select!(newdf, propertynames(df), :) _replace_columns!(df, newdf) diff --git a/test/grouping.jl b/test/grouping.jl index 27d05e7f49..70c1d51e50 100644 --- a/test/grouping.jl +++ b/test/grouping.jl @@ -98,6 +98,22 @@ function groupby_checked(df::AbstractDataFrame, keys, args...; kwargs...) return ogd end +function combine_checked(gd::GroupedDataFrame, args...; kwargs...) + res1 = combine(gd, args...; nthreads=1, kwargs...) + res2 = combine(gd, args...; nthreads=2, kwargs...) 
+ @test names(res1) == names(res2) + for (c1, c2) in zip(eachcol(res1), eachcol(res2)) + if eltype(c1) <: Union{AbstractFloat, Missing} + @test ismissing.(c1) == ismissing.(c2) + @test isapprox(collect(skipmissing(c1)), collect(skipmissing(c2)), + nans=true) + else + @test c1 ≅ c2 + end + end + res1 +end + @testset "parent" begin df = DataFrame(a = [1, 1, 2, 2], b = [5, 6, 7, 8]) gd = groupby_checked(df, :a) @@ -911,7 +927,7 @@ Base.isless(::TestType, ::TestType) = false gd = groupby_checked(df, :a, skipmissing=skip, sort=sort) indices && @test gd.idx !== nothing # Trigger computation of indices - res = combine(gd, :x1 => f => :y) + res = combine_checked(gd, :x1 => f => :y) expected = combine(gd, :x1 => (x -> f(x)) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) @@ -921,7 +937,7 @@ Base.isless(::TestType, ::TestType) = false df.x3 = Vector{T}(df.x1) gd = groupby_checked(df, :a, skipmissing=skip, sort=sort) indices && @test gd.idx !== nothing # Trigger computation of indices - res = combine(gd, :x3 => f => :y) + res = combine_checked(gd, :x3 => f => :y) expected = combine(gd, :x3 => (x -> f(x)) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) @@ -933,11 +949,11 @@ Base.isless(::TestType, ::TestType) = false df.x3[1] = missing gd = groupby_checked(df, :a, skipmissing=skip, sort=sort) indices && @test gd.idx !== nothing # Trigger computation of indices - res = combine(gd, :x3 => f => :y) + res = combine_checked(gd, :x3 => f => :y) expected = combine(gd, :x3 => (x -> f(x)) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) - res = combine(gd, :x3 => f∘skipmissing => :y) + res = combine_checked(gd, :x3 => f∘skipmissing => :y) expected = combine(gd, :x3 => (x -> f(collect(skipmissing(x)))) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) @@ -949,7 +965,7 @@ Base.isless(::TestType, ::TestType) = false if f in (maximum, minimum, first, last) @test_throws ArgumentError combine(gd, :x3 => 
f∘skipmissing => :y) else - res = combine(gd, :x3 => f∘skipmissing => :y) + res = combine_checked(gd, :x3 => f∘skipmissing => :y) expected = combine(gd, :x3 => (x -> f(collect(skipmissing(x)))) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) @@ -960,7 +976,7 @@ Base.isless(::TestType, ::TestType) = false gd = groupby_checked(df, :a, skipmissing=skip, sort=sort) indices && @test gd.idx !== nothing # Trigger computation of indices - res = combine(gd, :x2 => f => :y) + res = combine_checked(gd, :x2 => f => :y) expected = combine(gd, :x2 => (x -> f(x)) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) @@ -973,14 +989,14 @@ Base.isless(::TestType, ::TestType) = false m && (df.x3[1] = missing) gd = groupby_checked(df, :a, skipmissing=skip, sort=sort) indices && @test gd.idx !== nothing # Trigger computation of indices - res = combine(gd, :x3 => f => :y) + res = combine_checked(gd, :x3 => f => :y) expected = combine(gd, :x3 => (x -> f(x)) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) f === length && continue - res = combine(gd, :x3 => f∘skipmissing => :y) + res = combine_checked(gd, :x3 => f∘skipmissing => :y) expected = combine(gd, :x3 => (x -> f(collect(skipmissing(x)))) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) @@ -989,7 +1005,7 @@ Base.isless(::TestType, ::TestType) = false @test_throws ArgumentError combine(gd, :x3 => f∘skipmissing => :y) end end - @test combine(gd, :x1 => maximum => :y, :x2 => sum => :z) ≅ + @test combine_checked(gd, :x1 => maximum => :y, :x2 => sum => :z) ≅ combine(gd, :x1 => (x -> maximum(x)) => :y, :x2 => (x -> sum(x)) => :z) # Test floating point corner cases @@ -1000,7 +1016,7 @@ Base.isless(::TestType, ::TestType) = false gd = groupby_checked(df, :a, skipmissing=skip, sort=sort) indices && @test gd.idx !== nothing # Trigger computation of indices - res = combine(gd, :x1 => f => :y) + res = combine_checked(gd, :x1 => f => :y) expected = combine(gd, :x1 
=> (x -> f(x)) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) @@ -1011,18 +1027,18 @@ Base.isless(::TestType, ::TestType) = false df.x3[1] = missing gd = groupby_checked(df, :a, skipmissing=skip, sort=sort) indices && @test gd.idx !== nothing # Trigger computation of indices - res = combine(gd, :x3 => f => :y) + res = combine_checked(gd, :x3 => f => :y) expected = combine(gd, :x3 => (x -> f(x)) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) - res = combine(gd, :x3 => f∘skipmissing => :y) + res = combine_checked(gd, :x3 => f∘skipmissing => :y) expected = combine(gd, :x3 => (x -> f(collect(skipmissing(x)))) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) end df = DataFrame(x = [1, 1, 2, 2], y = Any[1, 2.0, 3.0, 4.0]) - res = combine(groupby_checked(df, :x), :y => maximum => :z) + res = combine_checked(groupby_checked(df, :x), :y => maximum => :z) @test res.z isa Vector{Float64} @test res.z == combine(groupby_checked(df, :x), :y => (x -> maximum(x)) => :z).z @@ -1031,7 +1047,7 @@ Base.isless(::TestType, ::TestType) = false gd = groupby_checked(df, :x, skipmissing=skip, sort=sort) indices && @test gd.idx !== nothing # Trigger computation of indices for f in (maximum, minimum) - res = combine(gd, :y => maximum => :z) + res = combine_checked(gd, :y => maximum => :z) @test res.z isa Vector{Any} @test res.z == combine(gd, :y => (x -> maximum(x)) => :z).z end @@ -2328,6 +2344,13 @@ end @test parent(gdf2).y ≅ df.y @test parent(gdf2).g === df.g + # Test that nthreads argument is accepted + # Correctness tests are run by combine_checked + @test select(gdf, :x => sum, nthreads=2) ≅ + select(gdf, :x => sum) + @test transform(gdf, :x => sum, nthreads=2) ≅ + transform(gdf, :x => sum) + gdf = groupby_checked(df, :g, sort=dosort, skipmissing=true) @test_throws ArgumentError select(gdf, :x => sum) @test_throws ArgumentError select(gdf, :x => sum, ungroup=false) @@ -2405,6 +2428,15 @@ end @test dfc.x_first == [1, 2, 
2, 4] @test propertynames(dfc) == [:g, :x, :y, :x_first] + # Test that nthreads argument is accepted + # Correctness tests are run by combine_checked + dfc = copy(df) + gdf = groupby_checked(dfc, :g, sort=dosort, skipmissing=false) + @test select(gdf, :x => sum) ≅ select!(gdf, :x => sum, nthreads=2) + dfc = copy(df) + gdf = groupby_checked(dfc, :g, sort=dosort, skipmissing=false) + @test transform(gdf, :x => sum) ≅ transform!(gdf, :x => sum, nthreads=2) + dfc = copy(df) gdf = groupby_checked(dfc, :g, sort=dosort, skipmissing=true) @test_throws ArgumentError select!(gdf, :x => sum) @@ -2564,43 +2596,67 @@ end @testset "corner cases of group_reduce" begin df = DataFrame(g=[1, 1, 1, 2, 2, 2], x=Any[1, 1, 1, 1.5, 1.5, 1.5]) gdf = groupby_checked(df, :g) - @test combine(gdf, :x => sum) == DataFrame(g=1:2, x_sum=[3.0, 4.5]) - - @test combine(gdf, :x => sum∘skipmissing) == DataFrame(g=1:2, x_sum_skipmissing=[3.0, 4.5]) - @test combine(gdf, :x => mean∘skipmissing) == DataFrame(g=1:2, x_mean_skipmissing=[1.0, 1.5]) - @test combine(gdf, :x => var∘skipmissing) == DataFrame(g=1:2, x_var_skipmissing=[0.0, 0.0]) - @test combine(gdf, :x => mean) == DataFrame(g=1:2, x_mean=[1.0, 1.5]) - @test combine(gdf, :x => var) == DataFrame(g=1:2, x_var=[0.0, 0.0]) + @test combine_checked(gdf, :x => sum) == DataFrame(g=1:2, x_sum=[3.0, 4.5]) + + @test combine_checked(gdf, :x => sum∘skipmissing) == + DataFrame(g=1:2, x_sum_skipmissing=[3.0, 4.5]) + @test combine_checked(gdf, :x => mean∘skipmissing) == + DataFrame(g=1:2, x_mean_skipmissing=[1.0, 1.5]) + @test combine_checked(gdf, :x => var∘skipmissing) == + DataFrame(g=1:2, x_var_skipmissing=[0.0, 0.0]) + @test combine_checked(gdf, :x => mean) == + DataFrame(g=1:2, x_mean=[1.0, 1.5]) + @test combine_checked(gdf, :x => var) == + DataFrame(g=1:2, x_var=[0.0, 0.0]) df = DataFrame(g=[1, 1, 1, 2, 2, 2], x=Any[1, 1, 1, 1, 1, missing]) gdf = groupby_checked(df, :g) - @test combine(gdf, :x => sum∘skipmissing) == DataFrame(g=1:2, x_sum_skipmissing=[3, 
2]) - @test combine(gdf, :x => mean∘skipmissing) == DataFrame(g=1:2, x_mean_skipmissing=[1.0, 1.0]) - @test combine(gdf, :x => var∘skipmissing) == DataFrame(g=1:2, x_var_skipmissing=[0.0, 0.0]) - @test combine(gdf, :x => sum) ≅ DataFrame(g=1:2, x_sum=[3, missing]) - @test combine(gdf, :x => mean) ≅ DataFrame(g=1:2, x_mean=[1.0, missing]) - @test combine(gdf, :x => var) ≅ DataFrame(g=1:2, x_var=[0.0, missing]) + @test combine_checked(gdf, :x => sum∘skipmissing) == + DataFrame(g=1:2, x_sum_skipmissing=[3, 2]) + @test combine_checked(gdf, :x => mean∘skipmissing) == + DataFrame(g=1:2, x_mean_skipmissing=[1.0, 1.0]) + @test combine_checked(gdf, :x => var∘skipmissing) == + DataFrame(g=1:2, x_var_skipmissing=[0.0, 0.0]) + @test combine_checked(gdf, :x => sum) ≅ + DataFrame(g=1:2, x_sum=[3, missing]) + @test combine_checked(gdf, :x => mean) ≅ + DataFrame(g=1:2, x_mean=[1.0, missing]) + @test combine_checked(gdf, :x => var) ≅ + DataFrame(g=1:2, x_var=[0.0, missing]) df = DataFrame(g=[1, 1, 1, 2, 2, 2], x=Union{Real, Missing}[1, 1, 1, 1, 1, missing]) gdf = groupby_checked(df, :g) - @test combine(gdf, :x => sum∘skipmissing) == DataFrame(g=1:2, x_sum_skipmissing=[3, 2]) - @test combine(gdf, :x => mean∘skipmissing) == DataFrame(g=1:2, x_mean_skipmissing=[1.0, 1.0]) - @test combine(gdf, :x => var∘skipmissing) == DataFrame(g=1:2, x_var_skipmissing=[0.0, 0.0]) - @test combine(gdf, :x => sum) ≅ DataFrame(g=1:2, x_sum=[3, missing]) - @test combine(gdf, :x => mean) ≅ DataFrame(g=1:2, x_mean=[1.0, missing]) - @test combine(gdf, :x => var) ≅ DataFrame(g=1:2, x_var=[0.0, missing]) + @test combine_checked(gdf, :x => sum∘skipmissing) == + DataFrame(g=1:2, x_sum_skipmissing=[3, 2]) + @test combine_checked(gdf, :x => mean∘skipmissing) == + DataFrame(g=1:2, x_mean_skipmissing=[1.0, 1.0]) + @test combine_checked(gdf, :x => var∘skipmissing) == + DataFrame(g=1:2, x_var_skipmissing=[0.0, 0.0]) + @test combine_checked(gdf, :x => sum) ≅ + DataFrame(g=1:2, x_sum=[3, missing]) + @test 
combine_checked(gdf, :x => mean) ≅ + DataFrame(g=1:2, x_mean=[1.0, missing]) + @test combine_checked(gdf, :x => var) ≅ + DataFrame(g=1:2, x_var=[0.0, missing]) Random.seed!(1) df = DataFrame(g = rand(1:2, 1000), x1 = rand(Int, 1000)) df.x2 = big.(df.x1) gdf = groupby_checked(df, :g) - res = combine(gdf, :x1 => sum, :x2 => sum, :x1 => x -> sum(x), :x2 => x -> sum(x)) + res = combine_checked(gdf, + :x1 => sum, :x2 => sum, + :x1 => x -> sum(x), + :x2 => x -> sum(x)) @test res.x1_sum == res.x1_function @test res.x2_sum == res.x2_function @test res.x1_sum != res.x2_sum # we are large enough to be sure we differ - res = combine(gdf, :x1 => mean, :x2 => mean, :x1 => x -> mean(x), :x2 => x -> mean(x)) + res = combine_checked(gdf, + :x1 => mean, + :x2 => mean, + :x1 => x -> mean(x), + :x2 => x -> mean(x)) if VERSION >= v"1.5" @test res.x1_mean ≈ res.x1_function else @@ -2612,71 +2668,87 @@ end # make sure we do correct promotions in corner case similar to Base df = DataFrame(g=[1, 1, 1, 1, 1, 1], x=Real[1, 1, big(typemax(Int)), 1, 1, 1]) gdf = groupby_checked(df, :g) - @test combine(gdf, :x => sum)[1, 2] == sum(df.x) - @test eltype(combine(gdf, :x => sum)[!, 2]) === BigInt + @test combine_checked(gdf, :x => sum)[1, 2] == sum(df.x) + @test eltype(combine_checked(gdf, :x => sum)[!, 2]) === BigInt df = DataFrame(g=[1, 1, 1, 1, 1, 1], x=Real[1, 1, typemax(Int), 1, 1, 1]) gdf = groupby_checked(df, :g) - @test combine(gdf, :x => sum)[1, 2] == sum(df.x) - @test eltype(combine(gdf, :x => sum)[!, 2]) === Int + @test combine_checked(gdf, :x => sum)[1, 2] == sum(df.x) + @test eltype(combine_checked(gdf, :x => sum)[!, 2]) === Int df = DataFrame(g=[1, 1, 1, 1, 1, 1], x=fill(missing, 6)) gdf = groupby_checked(df, :g) - @test combine(gdf, :x => sum)[1, 2] isa Missing - @test eltype(combine(gdf, :x => sum)[!, 2]) === Missing + res = combine_checked(gdf, :x => sum) + @test res[1, 2] isa Missing + @test eltype(res[!, 2]) === Missing @test_throws MethodError combine(gdf, :x => 
sum∘skipmissing) df = DataFrame(g=[1, 1, 1, 1, 1, 1], x=convert(Vector{Union{Real, Missing}}, fill(missing, 6))) gdf = groupby_checked(df, :g) - @test combine(gdf, :x => sum)[1, 2] isa Missing - @test eltype(combine(gdf, :x => sum)[!, 2]) === Missing - @test combine(gdf, :x => sum∘skipmissing) == DataFrame(g=1, x_sum_skipmissing=0) - @test eltype(combine(gdf, :x => sum∘skipmissing)[!, 2]) === Int + @test combine_checked(gdf, :x => sum)[1, 2] isa Missing + @test eltype(combine_checked(gdf, :x => sum)[!, 2]) === Missing + @test combine_checked(gdf, :x => sum∘skipmissing) == DataFrame(g=1, x_sum_skipmissing=0) + @test eltype(combine_checked(gdf, :x => sum∘skipmissing)[!, 2]) === Int df = DataFrame(g=[1, 1, 1, 1, 1, 1], x=convert(Vector{Union{Int, Missing}}, fill(missing, 6))) gdf = groupby_checked(df, :g) - @test combine(gdf, :x => sum)[1, 2] isa Missing - @test eltype(combine(gdf, :x => sum)[!, 2]) === Missing - @test combine(gdf, :x => sum∘skipmissing)[1, 2] == 0 - @test eltype(combine(gdf, :x => sum∘skipmissing)[!, 2]) === Int + @test combine_checked(gdf, :x => sum)[1, 2] isa Missing + @test eltype(combine_checked(gdf, :x => sum)[!, 2]) === Missing + @test combine_checked(gdf, :x => sum∘skipmissing)[1, 2] == 0 + @test eltype(combine_checked(gdf, :x => sum∘skipmissing)[!, 2]) === Int df = DataFrame(g=[1, 1, 1, 1, 1, 1], x=convert(Vector{Any}, fill(missing, 6))) gdf = groupby_checked(df, :g) - @test combine(gdf, :x => sum)[1, 2] isa Missing - @test eltype(combine(gdf, :x => sum)[!, 2]) === Missing + res = combine_checked(gdf, :x => sum) + @test res[1, 2] isa Missing + @test eltype(res[!, 2]) === Missing @test_throws MethodError combine(gdf, :x => sum∘skipmissing) # these questions can go to a final exam in "mastering combine" class df = DataFrame(g=[1, 2, 3], x=["a", "b", "c"]) gdf = groupby_checked(df, :g) - @test combine(gdf, :x => sum => :a, :x => prod => :b) == - combine(gdf, :x => (x -> sum(x)) => :a, :x => (x -> prod(x)) => :b) + @test combine_checked(gdf, :x 
=> sum => :a, :x => prod => :b) == + combine_checked(gdf, :x => (x -> sum(x)) => :a, :x => (x -> prod(x)) => :b) + df = DataFrame(g=[1, 2, 3], x=Any["a", "b", "c"]) gdf = groupby_checked(df, :g) - @test combine(gdf, :x => sum => :a, :x => prod => :b) == - combine(gdf, :x => (x -> sum(x)) => :a, :x => (x -> prod(x)) => :b) + @test combine_checked(gdf, :x => sum => :a, :x => prod => :b) == + combine_checked(gdf, :x => (x -> sum(x)) => :a, :x => (x -> prod(x)) => :b) + df = DataFrame(g=[1, 1], x=[missing, "a"]) gdf = groupby_checked(df, :g) - @test combine(gdf, :x => sum∘skipmissing => :a, :x => prod∘skipmissing => :b) == - combine(gdf, :x => (x -> sum(skipmissing(x))) => :a, :x => (x -> prod(skipmissing(x))) => :b) + @test combine_checked(gdf, + :x => sum∘skipmissing => :a, + :x => prod∘skipmissing => :b) == + combine_checked(gdf, + :x => (x -> sum(skipmissing(x))) => :a, + :x => (x -> prod(skipmissing(x))) => :b) + df = DataFrame(g=[1, 1], x=Any[missing, "a"]) gdf = groupby_checked(df, :g) - @test combine(gdf, :x => sum∘skipmissing => :a, :x => prod∘skipmissing => :b) == - combine(gdf, :x => (x -> sum(skipmissing(x))) => :a, :x => (x -> prod(skipmissing(x))) => :b) + @test combine_checked(gdf, + :x => sum∘skipmissing => :a, + :x => prod∘skipmissing => :b) == + combine_checked(gdf, + :x => (x -> sum(skipmissing(x))) => :a, + :x => (x -> prod(skipmissing(x))) => :b) df = DataFrame(g=[1, 2], x=Any[nothing, "a"]) gdf = groupby_checked(df, :g) - df2 = combine(gdf, :x => sum => :a, :x => prod => :b) + df2 = combine_checked(gdf, :x => sum => :a, :x => prod => :b) @test df2 == DataFrame(g=[1, 2], a=[nothing, "a"], b=[nothing, "a"]) @test eltype(df2.a) === eltype(df2.b) === Union{Nothing, String} + df = DataFrame(g=[1, 2], x=Any[1, 1.0]) gdf = groupby_checked(df, :g) - df2 = combine(gdf, :x => sum => :a, :x => prod => :b) + df2 = combine_checked(gdf, :x => sum => :a, :x => prod => :b) @test df2 == DataFrame(g=[1, 2], a=ones(2), b=ones(2)) @test eltype(df2.a) === eltype(df2.b) === 
Float64 + df = DataFrame(g=[1, 2], x=[1, "1"]) gdf = groupby_checked(df, :g) - df2 = combine(gdf, :x => sum => :a, :x => prod => :b) + df2 = combine_checked(gdf, :x => sum => :a, :x => prod => :b) @test df2 == DataFrame(g=[1, 2], a=[1, "1"], b=[1, "1"]) @test eltype(df2.a) === eltype(df2.b) === Any + df = DataFrame(g=[1, 1, 2], x=[UInt8(1), UInt8(1), missing]) gdf = groupby_checked(df, :g) - df2 = combine(gdf, :x => sum => :a, :x => prod => :b) + df2 = combine_checked(gdf, :x => sum => :a, :x => prod => :b) @test df2 ≅ DataFrame(g=[1, 2], a=[2, missing], b=[1, missing]) @test eltype(df2.a) === eltype(df2.b) === Union{UInt, Missing} end