Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start work on supporting filtering while parsing #656

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 141 additions & 65 deletions src/file.jl
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ function File(source;
tasks::Integer=Threads.nthreads(),
select=nothing,
drop=nothing,
filter=nothing,
# parsing options
missingstrings=String[],
missingstring="",
Expand Down Expand Up @@ -233,14 +234,15 @@ function File(source;
refs = Vector{RefPool}(undef, ncols)
if threaded === true
# multithreaded
finalrows, columns = multithreadparse(types, flags, buf, datapos, len, options, coloptions, rowsguess, datarow - 1, pool, refs, ncols, typemap, h.categorical, customtypes, limit, tasks, debug)
finalrows, columns = multithreadparse(types, flags, buf, datapos, len, options, coloptions, rowsguess, datarow - 1, pool, refs, ncols, typemap, h.categorical, customtypes, limit, filter, h.names, tasks, debug)
else
if limit < rowsguess
rowsguess = limit
end
columns = allocate(rowsguess, ncols, types, flags)
codes = filter === nothing ? EMPTY_CODES : zeros(Int16, ncols)
t = Base.time()
finalrows, pos = parsefilechunk!(Val(transpose), ncols, typemap, columns, buf, datapos, len, limit, positions, pool, refs, rowsguess, datarow - 1, types, flags, debug, options, coloptions, customtypes)
finalrows, pos = parsefilechunk!(Val(transpose), ncols, typemap, columns, buf, datapos, len, limit, positions, pool, refs, rowsguess, datarow - 1, types, flags, filter, h.names, codes, debug, options, coloptions, customtypes)
debug && println("time for initial parsing: $(Base.time() - t)")
# cleanup our columns if needed
for i = 1:ncols
Expand Down Expand Up @@ -292,6 +294,7 @@ end

# Shared, module-level empty vectors used as placeholder arguments so call sites
# do not have to allocate a fresh empty array each time.

# Passed by `multithreadparse` as the `positions` argument of `parsefilechunk!`.
const EMPTY_INT_ARRAY = Int64[]
# NOTE(review): no use of this constant is visible in this excerpt; presumably an
# empty ref-recode table for pooled columns -- confirm against the rest of the file.
const EMPTY_REFRECODE = UInt32[]
# Stand-in for the per-column parse `codes` vector when no `filter` was supplied
# (with a filter, callers allocate `zeros(Int16, ncols)` instead).
const EMPTY_CODES = Int16[]

# after multithreaded parsing, we need to synchronize pooled refs from different chunks of the file
# we pick one chunk as "source of truth", then adjust other chunks as needed
Expand Down Expand Up @@ -379,12 +382,12 @@ function makeandsetpooled!(columns, i, column, refs, flags, categorical)
return
end

function multithreadparse(types, flags, buf, datapos, len, options, coloptions, rowsguess, datarow, pool, refs, ncols, typemap, categorical, customtypes, limit, N, debug)
function multithreadparse(types, flags, buf, datapos, len, options, coloptions, rowsguess, datarow, pool, refs, ncols, typemap, categorical, customtypes, limit, filter, names, N, debug)
# when limiting w/ multithreaded parsing, we try to guess about where in the file the limit row # will be
# then adjust our final file len to the end of that row
# we add some cushion so we hopefully get the limit row correctly w/o shooting past too far and needing to resize! down
# but we also don't guarantee limit will be exact w/ multithreaded parsing
if limit < rowsguess
if limit < rowsguess && filter === nothing
newlen = [0, ceil(Int64, (limit / (rowsguess * 0.8)) * len), 0]
findrowstarts!(buf, len, options, newlen, ncols)
len = newlen[2] - 1
Expand All @@ -411,7 +414,8 @@ function multithreadparse(types, flags, buf, datapos, len, options, coloptions,
task_types = copy(types)
task_columns = allocate(rowchunkguess, ncols, task_types, task_flags)
pertaskcolumns[i] = task_columns
task_rows, task_pos = parsefilechunk!(Val(false), ncols, typemap, task_columns, buf, task_pos, task_len, typemax(Int64), EMPTY_INT_ARRAY, pool, task_refs, rowchunkguess, datarow + (rowchunkguess * (i - 1)), task_types, task_flags, debug, options, coloptions, customtypes)
codes = filter === nothing ? EMPTY_CODES : zeros(Int16, ncols)
task_rows, task_pos = parsefilechunk!(Val(false), ncols, typemap, task_columns, buf, task_pos, task_len, typemax(Int64), EMPTY_INT_ARRAY, pool, task_refs, rowchunkguess, datarow + (rowchunkguess * (i - 1)), task_types, task_flags, filter, names, codes, debug, options, coloptions, customtypes)
rows[i] = task_rows
# promote column types/flags across task chunks
for col = 1:ncols
Expand Down Expand Up @@ -509,13 +513,13 @@ end # @static if VERSION >= v"1.3-DEV"
return limit < finalrows ? limit : finalrows, finalcolumns
end

function parsefilechunk!(TR::Val{transpose}, ncols, typemap, columns, buf, pos, len, limit, positions, pool, refs, rowsguess, rowoffset, types, flags, debug, options::Parsers.Options{ignorerepeated}, coloptions, ::Type{customtypes}) where {transpose, ignorerepeated, customtypes}
function parsefilechunk!(TR::Val{transpose}, ncols, typemap, columns, buf, pos, len, limit, positions, pool, refs, rowsguess, rowoffset, types, flags, filter, names, codes, debug, options, coloptions, ::Type{customtypes}) where {transpose, customtypes}
row = 0
startpos = pos
if pos <= len && len > 0
while row < limit
row += 1
pos = parserow(row, TR, ncols, typemap, columns, startpos, buf, pos, len, positions, pool, refs, rowsguess, rowoffset, types, flags, debug, options, coloptions, customtypes)
pos = parserow(row, TR, ncols, typemap, columns, startpos, buf, pos, len, positions, pool, refs, rowsguess, rowoffset, types, flags, filter, names, codes, debug, options, coloptions, customtypes)
(pos > len || row == limit) && break
# if our initial row estimate was too few, we need to reallocate our columns to read the rest of the file
if row + 1 > rowsguess
Expand All @@ -537,7 +541,7 @@ function parsefilechunk!(TR::Val{transpose}, ncols, typemap, columns, buf, pos,
return row, pos
end

@noinline function promotetostring!(col, TR::Val{transpose}, ncols, typemap, columns, buf, pos, len, limit, positions, pool, refs, rowsguess, rowoffset, types, origflags, debug, options::Parsers.Options{ignorerepeated}, coloptions, ::Type{customtypes}) where {transpose, ignorerepeated, customtypes}
@noinline function promotetostring!(col, TR::Val{transpose}, ncols, typemap, columns, buf, pos, len, limit, positions, pool, refs, rowsguess, rowoffset, types, origflags, debug, options, coloptions, ::Type{customtypes}) where {transpose, customtypes}
flags = copy(origflags)
for i = 1:ncols
if i == col
Expand All @@ -550,10 +554,11 @@ end
end
row = 0
startpos = pos
names = Symbol[]
if pos <= len && len > 0
while row < limit
row += 1
pos = parserow(row, TR, ncols, typemap, columns, startpos, buf, pos, len, positions, pool, refs, rowsguess, rowoffset, types, flags, debug, options, coloptions, customtypes)
pos = parserow(row, TR, ncols, typemap, columns, startpos, buf, pos, len, positions, pool, refs, rowsguess, rowoffset, types, flags, nothing, names, EMPTY_CODES, debug, options, coloptions, customtypes)
pos > len && break
end
end
Expand Down Expand Up @@ -594,70 +599,141 @@ end
end
end

@inline function parserow(row, TR::Val{transpose}, ncols, typemap, columns, startpos, buf, pos::A, len, positions, pool, refs, rowsguess, rowoffset, types, flags, debug, options::B, coloptions::C, ::Type{customtypes}) where {transpose, A, B, C, customtypes}
for col = 1:ncols
if transpose
@inbounds pos = positions[col]
end
@inbounds flag = flags[col]
@inbounds column = columns[col]
@inbounds opts = coloptions === nothing ? options : coloptions[col]
# @show typeof(column)
if willdrop(flag) || (user(flag) && column isa MissingVector)
pos, code = parsemissing!(buf, pos, len, opts, row, rowoffset, col)
elseif !typedetected(flag)
pos, code = detect(columns, buf, pos, len, opts, row, rowoffset, col, typemap, pool, refs, debug, types, flags, rowsguess)
elseif column isa SVec{Int64}
pos, code = parseint!(flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa SVec{Float64}
pos, code = parsevalue!(Float64, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa SVec2{String}
pos, code = parsestring2!(flag, column, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa SVec{Date}
pos, code = parsevalue!(Date, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa SVec{DateTime}
pos, code = parsevalue!(DateTime, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa SVec{Time}
pos, code = parsevalue!(Time, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa Vector{Union{Missing, Bool}}
pos, code = parsevalue!(Bool, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa Vector{UInt32}
pos, code = parsepooled!(flag, column, columns, buf, pos, len, opts, row, rowoffset, col, rowsguess, pool, refs, types, flags)
elseif column isa Vector{PosLen}
pos, code = parsestring!(flag, column, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif customtypes !== Tuple{}
pos, code = parsecustom!(customtypes, flag, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
else
error("bad array type: $(typeof(column))")
end
if promote_to_string(code)
# debug && println("promoting col = $col to string")
promotetostring!(col, TR, ncols, typemap, columns, buf, startpos, len, row, positions, pool, refs, rowsguess, rowoffset, types, flags, debug, options, coloptions, customtypes)
end
if transpose
@inbounds positions[col] = pos
else
if col < ncols
if Parsers.newline(code) || pos > len
options.silencewarnings || notenoughcolumns(col, ncols, rowoffset + row)
for j = (col + 1):ncols
@inbounds flags[j] |= ANYMISSING
@inbounds types[j] = Union{Missing, types[j]}
end
break # from for col = 1:ncols
end
@inline function parserow(row, TR::Val{transpose}, ncols, typemap, columns, startpos, buf, pos, len, positions, pool, refs, rowsguess, rowoffset, types, flags, filter, names, codes, debug, options, coloptions, ::Type{customtypes}) where {transpose, customtypes}
while pos <= len
rowstart = pos
rowcode = None
for col = 1:ncols
if transpose
@inbounds pos = positions[col]
end
@inbounds flag = flags[col]
@inbounds column = columns[col]
@inbounds opts = coloptions === nothing ? options : coloptions[col]
# @show typeof(column)
if willdrop(flag) || (user(flag) && column isa MissingVector)
pos, code = parsemissing!(buf, pos, len, opts, row, rowoffset, col)
elseif !typedetected(flag)
pos, code = detect(columns, buf, pos, len, opts, row, rowoffset, col, typemap, pool, refs, debug, types, flags, rowsguess)
elseif column isa SVec{Int64}
pos, code = parseint!(flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa SVec{Float64}
pos, code = parsevalue!(Float64, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa SVec2{String}
pos, code = parsestring2!(flag, column, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa SVec{Date}
pos, code = parsevalue!(Date, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa SVec{DateTime}
pos, code = parsevalue!(DateTime, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa SVec{Time}
pos, code = parsevalue!(Time, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa Vector{Union{Missing, Bool}}
pos, code = parsevalue!(Bool, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa Vector{UInt32}
pos, code = parsepooled!(flag, column, columns, buf, pos, len, opts, row, rowoffset, col, rowsguess, pool, refs, types, flags)
elseif column isa Vector{PosLen}
pos, code = parsestring!(flag, column, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif customtypes !== Tuple{}
pos, code = parsecustom!(customtypes, flag, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
else
if pos <= len && !Parsers.newline(code)
options.silencewarnings || toomanycolumns(ncols, rowoffset + row)
# ignore the rest of the line
pos = skiptorow(buf, pos, len, options.oq, options.e, options.cq, 1, 2)
error("bad array type: $(typeof(column))")
end
if promote_to_string(code)
# debug && println("promoting col = $col to string")
promotetostring!(col, TR, ncols, typemap, columns, buf, startpos, len, row, positions, pool, refs, rowsguess, rowoffset, types, flags, debug, options, coloptions, customtypes)
end
if filter !== nothing
@inbounds codes[col] = code
end
if transpose
@inbounds positions[col] = pos
else
if col < ncols
if Parsers.newline(code) || pos > len
rowcode = NotEnoughColumns
options.silencewarnings || notenoughcolumns(col, ncols, rowoffset + row)
for j = (col + 1):ncols
@inbounds flags[j] |= ANYMISSING
@inbounds types[j] = Union{Missing, types[j]}
end
break # from for col = 1:ncols
end
else
if col < ncols
if Parsers.newline(code) || pos > len
rowcode = NotEnoughColumns
options.silencewarnings || notenoughcolumns(col, ncols, rowoffset + row)
for j = (col + 1):ncols
@inbounds flags[j] |= ANYMISSING
@inbounds types[j] = Union{Missing, types[j]}
end
break # from for col = 1:ncols
end
else
if pos <= len && !Parsers.newline(code)
rowcode = TooManyColumns
options.silencewarnings || toomanycolumns(ncols, rowoffset + row)
# ignore the rest of the line
pos = skiptorow(buf, pos, len, options.oq, options.e, options.cq, 1, 2)
end
end
end
end
end
if filter === nothing
break
elseif filter(ParsingRow(names, types, flags, rowcode, codes, row, columns, buf, rowstart, pos - rowstart))
break
end
end
return pos
end

# Row object handed to the user-supplied `filter` function by `parserow` after
# every column of the current row has been parsed. It wraps the parser's own
# working state for that row rather than a copy of the parsed values.
# NOTE(review): fields are read through `getfield` by the accessors in this file,
# presumably because `Tables.AbstractRow` routes `row.name` to column lookup -- confirm.
struct ParsingRow <: Tables.AbstractRow
# column names (callers pass the header's `names`)
names::Vector{Symbol}
# per-column types as tracked by the parser
types::Vector{Type}
# per-column flag bytes as tracked by the parser
flags::Vector{UInt8}
# row-level status set by `parserow`: `None`, `NotEnoughColumns` or `TooManyColumns`
code::RowErrorCode
# per-column parse codes recorded for this row (only filled when a filter is given)
codes::Vector{Int16}
# row number being parsed; used as the index into each column vector
row::Int64
# the column vectors the parser is writing into
columns::Vector{AbstractVector}
# the byte buffer being parsed
buf::AbstractVector{UInt8}
pos::Int64 # byte position in `buf` where this row starts
len::Int64 # number of bytes spanned by this row (end position minus start position)
end

# Internal field accessors for `ParsingRow`. Fields are read with `getfield`
# instead of dot syntax.
# NOTE(review): presumably because `Tables.AbstractRow` overloads property access
# to mean column lookup -- confirm against the Tables.jl interface.

# Column names of the row.
getnames(r::ParsingRow) = getfield(r, :names)

# Column vector (not the row's value) at position `col`; indexing is bounds-checked.
getcolumn(r::ParsingRow, col::Int) = getfield(r, :columns)[col]

# Column vector (not the row's value) for the column named `col`.
# Throws an `ArgumentError` naming the missing column when `col` is not one of
# the row's names; without the explicit check, `findfirst` would return
# `nothing` and the indexing below would fail with an opaque
# "invalid index: nothing" error.
function getcolumn(r::ParsingRow, col::Symbol)
    idx = findfirst(==(col), getnames(r))
    if idx === nothing
        throw(ArgumentError("column name :$col not found in row"))
    end
    return getfield(r, :columns)[idx]
end

# Per-column types as tracked by the parser.
gettypes(r::ParsingRow) = getfield(r, :types)
# Per-column flag bytes as tracked by the parser.
getflags(r::ParsingRow) = getfield(r, :flags)
# Row-level status code.
getcode(r::ParsingRow) = getfield(r, :code)
# Per-column parse codes recorded for this row.
getcodes(r::ParsingRow) = getfield(r, :codes)
# Row number being parsed (index into each column vector).
getrow(r::ParsingRow) = getfield(r, :row)
# Byte buffer being parsed.
getbuf(r::ParsingRow) = getfield(r, :buf)
# Byte position in the buffer where the row starts.
getpos(r::ParsingRow) = getfield(r, :pos)
# Number of bytes spanned by the row.
getlen(r::ParsingRow) = getfield(r, :len)

# Tables.jl row interface: report the column names carried by the row.
function Tables.columnnames(r::ParsingRow)
    return getfield(r, :names)
end

# Tables.jl row interface (typed form): value of column number `i` for the row
# currently being parsed. The type `T` and name `nm` are accepted but unused.
@inline function Tables.getcolumn(row::ParsingRow, ::Type{T}, i::Int, nm::Symbol) where {T}
    values = getcolumn(row, i)
    rownumber = getrow(row)
    # only the lookup of the row inside the column skips the bounds check
    return @inbounds values[rownumber]
end

# Tables.jl row interface (by name): value of the column named `col` for the row
# currently being parsed.
@inline function Tables.getcolumn(row::ParsingRow, col::Symbol)
    values = getcolumn(row, col)
    rownumber = getrow(row)
    # only the lookup of the row inside the column skips the bounds check
    return @inbounds values[rownumber]
end

# Tables.jl row interface (by position): value of column number `col` for the row
# currently being parsed.
@inline function Tables.getcolumn(row::ParsingRow, col::Int)
    values = getcolumn(row, col)
    rownumber = getrow(row)
    # only the lookup of the row inside the column skips the bounds check
    return @inbounds values[rownumber]
end

@inline function poslen(code, pos, len)
pos = Core.bitcast(UInt64, pos) << 20
pos |= ifelse(Parsers.sentinel(code), MISSING_BIT, UInt64(0))
Expand Down
Loading