Merge pull request #6 from JuliaData/td-more-cleanup-and-tests
Ignoring empty rows now works before the header as well.
Drvi committed Jul 18, 2023
2 parents a734a34 + 417de1c commit dec42da
Showing 10 changed files with 260 additions and 112 deletions.
2 changes: 1 addition & 1 deletion Project.toml
@@ -16,8 +16,8 @@ SentinelArrays = "1.4"
julia = "1.6"

[extras]
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

[targets]
test = ["Test", "Random"]
33 changes: 33 additions & 0 deletions src/ChunkedBase.jl
@@ -32,6 +32,8 @@ struct ChunkingContext
nworkers::Int
limit::Int
comment::Union{Nothing,Vector{UInt8}}
# combination of `id` and `buffer_refills` can be used to detect if any pointers to `bytes` are still valid
buffer_refills::Base.RefValue{Int}
end
function ChunkingContext(buffersize::Integer, nworkers::Integer, limit::Integer, comment::Union{Nothing,UInt8,String,Char,Vector{UInt8}})
(4 <= buffersize <= typemax(Int32)) || throw(ArgumentError("`buffersize` argument must be larger than 4 and smaller than 2_147_483_648 bytes."))
@@ -46,6 +48,7 @@ function ChunkingContext(buffersize::Integer, nworkers::Integer, limit::Integer,
nworkers,
limit,
_comment_to_bytes(comment),
Ref(0),
)
end
function ChunkingContext(ctx::ChunkingContext)
@@ -57,6 +60,7 @@ function ChunkingContext(ctx::ChunkingContext)
ctx.nworkers,
ctx.limit,
ctx.comment,
Ref(0),
)
out.newline_positions.elements[1] = 0
return out
@@ -127,5 +131,34 @@ export parse_file_serial, parse_file_parallel, populate_result_buffer!
# TRACING # foreach(empty!, PARSER_TASKS_TIMES)
# TRACING # return nothing
# TRACING # end
# TRACING # function dump_traces(path)
# TRACING # open(path, "w") do io
# TRACING # write(io, UInt32(length(IO_TASK_TIMES)), IO_TASK_TIMES)
# TRACING # write(io, UInt32(length(LEXER_TASK_TIMES)), LEXER_TASK_TIMES)
# TRACING # write(io, UInt32(length(T1)), T1)
# TRACING # write(io, UInt32(length(T2)), T2)
# TRACING #
# TRACING # write(io, UInt32(length(PARSER_TASKS_TIMES)))
# TRACING # for x in PARSER_TASKS_TIMES
# TRACING # write(io, UInt32(length(x)), x)
# TRACING # end
# TRACING # end
# TRACING # return nothing
# TRACING # end
# TRACING # function load_traces!(path)
# TRACING # _resize!(vv, n) = length(vv) >= n ? resize!(vv, n) : append!(vv, [UInt[] for _ in 1:n-length(vv)])
# TRACING # open(path, "r") do io
# TRACING # read!(io, resize!(IO_TASK_TIMES, read(io, UInt32)))
# TRACING # read!(io, resize!(LEXER_TASK_TIMES, read(io, UInt32)))
# TRACING # read!(io, resize!(T1, read(io, UInt32)))
# TRACING # read!(io, resize!(T2, read(io, UInt32)))
# TRACING #
# TRACING # _resize!(PARSER_TASKS_TIMES, read(io, UInt32))
# TRACING # for x in PARSER_TASKS_TIMES
# TRACING # read!(io, resize!(x, read(io, UInt32)))
# TRACING # end
# TRACING # end
# TRACING # return nothing
# TRACING # end

end
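
For orientation, a note on the new `buffer_refills` counter added above: together with the context's `id`, it lets downstream code detect whether a pointer or view captured into `bytes` is still valid. The snippet below is a hedged sketch of that check and is not part of the commit; `BufferSnapshot`, `snapshot`, and `isstale` are made-up names.

# Sketch only: staleness detection enabled by `id` + `buffer_refills`.
struct BufferSnapshot
    ctx_id::Int
    refills::Int
end

# Record which buffer generation a pointer/view into `ctx.bytes` belongs to.
snapshot(ctx) = BufferSnapshot(ctx.id, ctx.buffer_refills[])

# The captured bytes are suspect once the same buffer has been refilled,
# or if we are looking at a different chunking context altogether.
isstale(snap::BufferSnapshot, ctx) =
    snap.ctx_id != ctx.id || snap.refills != ctx.buffer_refills[]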
1 change: 0 additions & 1 deletion src/ConsumeContexts.jl
@@ -48,7 +48,6 @@ there to be `ntasks * (1 + length(parsing_ctx.schema))` units of work per chunk
See also [`consume!`](@ref), [`setup_tasks!`](@ref), [`task_done!`](@ref), [`cleanup`](@ref)
"""
function setup_tasks!(::AbstractConsumeContext, chunking_ctx::ChunkingContext, ntasks::Int)
# TRACING # chunking_ctx.id == 1 ? push!(ChunkedBase.T1, time_ns()) : push!(ChunkedBase.T2, time_ns())
set!(chunking_ctx.counter, ntasks)
return nothing
end
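
The default `setup_tasks!` shown above sets the chunk's counter to the number of parsing tasks. As a hedged illustration of the docstring's note about expecting more units of work per chunk, a downstream consume context could override the hook roughly as follows. This is a sketch, not a real API: `MyConsumeContext` and its `ncolumns` field are assumptions, the hooks are assumed to live in the `ChunkedBase` namespace, and `ChunkedBase.set!` is the internal counter helper visible in the diff above.

# Sketch only: expect one extra unit of work per column, mirroring the
# `ntasks * (1 + length(parsing_ctx.schema))` bookkeeping mentioned in the docstring.
using ChunkedBase

struct MyConsumeContext <: ChunkedBase.AbstractConsumeContext
    ncolumns::Int
end

function ChunkedBase.setup_tasks!(ctx::MyConsumeContext, chunking_ctx::ChunkedBase.ChunkingContext, ntasks::Int)
    ChunkedBase.set!(chunking_ctx.counter, ntasks * (1 + ctx.ncolumns))
    return nothing
end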
26 changes: 15 additions & 11 deletions src/parser_parallel.jl
@@ -1,14 +1,18 @@
# What the read_and_lex_task! task submits to the many parser tasks
const SubtaskMetadata = @NamedTuple{task_start::Int32, task_end::Int32, row_num::Int, task_num::Int, use_current_context::Bool}

function submit_lexed_rows!(parsing_queue, consume_ctx, chunking_ctx, row_num)
task_size = estimate_task_size(chunking_ctx)
ntasks = cld(length(chunking_ctx.newline_positions), task_size)
# Set the expected number of parsing tasks
# TRACING # chunking_ctx.id == 1 ? push!(ChunkedBase.T1, time_ns()) : push!(ChunkedBase.T2, time_ns())
setup_tasks!(consume_ctx, chunking_ctx, ntasks)
# Send task definitions (segment of `eols` to process) to the queue
task_start = Int32(1)
task_num = 1
for task in Iterators.partition(eachindex(chunking_ctx.newline_positions), task_size)
task_end = Int32(last(task))
put!(parsing_queue, (task_start, task_end, row_num, task_num, chunking_ctx.id == 1))
put!(parsing_queue, SubtaskMetadata((task_start, task_end, row_num, task_num, chunking_ctx.id == 1)))
row_num += Int(task_end - task_start)
task_start = task_end
task_num += 1
@@ -17,12 +21,12 @@ function submit_lexed_rows!(parsing_queue, consume_ctx, chunking_ctx, row_num)
end
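
For intuition about the partitioning above (not part of the diff): `estimate_task_size` yields a segment length, and the recorded newline positions are split into `cld(n, task_size)` segments, one queue item per segment. A worked example with assumed numbers:

# Assumed numbers, purely illustrative of the arithmetic in submit_lexed_rows!.
newline_count = 10_001                        # length(chunking_ctx.newline_positions)
task_size     = 2_500                         # a value estimate_task_size might return
ntasks        = cld(newline_count, task_size)                       # == 5
segments      = collect(Iterators.partition(1:newline_count, task_size))
length(segments)                                                    # == 5, one parser task each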

function read_and_lex_task!(
parsing_queue::Channel{T}, # To submit work for parser tasks (which segment of newlines to process)
lexer::Lexer, # Finding newlines
chunking_ctx::ChunkingContext, # Holds raw bytes, synchronization objects and stores newline positions
chunking_ctx_next::ChunkingContext, # double-buffering
consume_ctx::AbstractConsumeContext # user-provided data (to overload setup_tasks!)
) where {T}
parsing_queue::Channel{SubtaskMetadata}, # To submit work for parser tasks (which segment of newlines to process)
lexer::Lexer, # Finding newlines
chunking_ctx::ChunkingContext, # Holds raw bytes, synchronization objects and stores newline positions
chunking_ctx_next::ChunkingContext, # double-buffering
consume_ctx::AbstractConsumeContext # user-provided data (to overload setup_tasks!)
)
limit_eols!(chunking_ctx, 1) && return
row_num = submit_lexed_rows!(parsing_queue, consume_ctx, chunking_ctx, 1)
@inbounds while true
@@ -47,14 +51,14 @@ end

function process_and_consume_task(
worker_id::Int, # unique identifier of this task
parsing_queue::Channel{T}, # where workers receive work
parsing_queue::Channel{SubtaskMetadata}, # where workers receive work
result_buffers::Vector{<:AbstractResultBuffer}, # where we store parsed results
consume_ctx::AbstractConsumeContext, # user-provided data (what to do with the parsed results)
parsing_ctx::AbstractParsingContext, # library-provided data (to distinguish JSONL and CSV processing)
chunking_ctx::ChunkingContext, # internal data to facilitate chunking and synchronization
chunking_ctx_next::ChunkingContext, # double-buffering
::Type{CT} # compile time known data for parser
) where {T,CT}
) where {CT}
# TRACING # trace = get_parser_task_trace(worker_id)
_comment = chunking_ctx.comment
try
@@ -97,7 +101,7 @@ function parse_file_parallel(
@assert chunking_ctx.id == 1
length(result_buffers) != total_result_buffers_count(chunking_ctx) && throw(ArgumentError("Expected $(total_result_buffers_count(chunking_ctx)) result buffers, got $(length(result_buffers))."))

parsing_queue = Channel{Tuple{Int32,Int32,Int,Int,Bool}}(Inf)
parsing_queue = Channel{SubtaskMetadata}(Inf)
if lexer.done
chunking_ctx_next = chunking_ctx
else
@@ -119,7 +123,7 @@ end
end
# Cleanup
for _ in 1:chunking_ctx.nworkers
put!(parsing_queue, (Int32(0), Int32(0), 0, 0, true))
put!(parsing_queue, SubtaskMetadata((Int32(0), Int32(0), 0, 0, true)))
end
foreach(wait, parser_tasks)
close(parsing_queue)
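
To make the new queue protocol concrete, here is a small stand-alone sketch (not from the commit) of `SubtaskMetadata` items flowing through a `Channel`; the all-zero item simply mirrors the values pushed in the cleanup loop above, without claiming how workers interpret it.

# Stand-alone sketch of the work-item protocol used by the parallel parser.
const SubtaskMetadata = @NamedTuple{task_start::Int32, task_end::Int32, row_num::Int, task_num::Int, use_current_context::Bool}

queue = Channel{SubtaskMetadata}(Inf)

# Producer side: one segment of newline positions to parse.
put!(queue, SubtaskMetadata((Int32(1), Int32(128), 1, 1, true)))
# An all-zero item with the same shape as the ones sent during cleanup above.
put!(queue, SubtaskMetadata((Int32(0), Int32(0), 0, 0, true)))

meta = take!(queue)
println("parse rows starting at $(meta.row_num), newlines $(meta.task_start):$(meta.task_end)")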
1 change: 1 addition & 0 deletions src/parser_serial.jl
@@ -14,6 +14,7 @@ function parse_file_serial(
task_size = estimate_task_size(chunking_ctx)
task_start = Int32(1)
for task in Iterators.partition(eachindex(chunking_ctx.newline_positions), task_size)
# TRACING # chunking_ctx.id == 1 ? push!(ChunkedBase.T1, time_ns()) : push!(ChunkedBase.T2, time_ns())
setup_tasks!(consume_ctx, chunking_ctx, 1)
task_end = Int32(last(task))
newline_segment = @view(chunking_ctx.newline_positions.elements[task_start:task_end])
15 changes: 7 additions & 8 deletions src/payload.jl
@@ -14,13 +14,18 @@ end
Base.length(payload::ParsedPayload) = payload.len
last_row(payload::ParsedPayload) = payload.row_num + length(payload) - 1


#
# PayloadOrderer
#

function insertsorted!(arr::Vector{T}, x::T, by=identity) where {T}
idx = searchsortedfirst(arr, x, by=by)
insert!(arr, idx, x)
return idx
end

# Like a Channel, but when you take! a Payload from it, it will be the next one in order
mutable struct PayloadOrderer{B<:AbstractResultBuffer, C<:AbstractParsingContext} <: AbstractChannel{ParsedPayload{B,C}}
mutable struct PayloadOrderer{B, C<:AbstractParsingContext} <: AbstractChannel{ParsedPayload{B,C}}
queue::Channel{ParsedPayload{B,C}}
expected_row::Int
waititng_room::Vector{ParsedPayload{B,C}}
@@ -42,12 +47,6 @@ function _reenqueue_ordered!(queue::Channel{T}, waiting_room::Vector{T}, payload
end
end

function insertsorted!(arr::Vector{T}, x::T, by=identity) where {T}
idx = searchsortedfirst(arr, x, by=by)
insert!(arr, idx, x)
return idx
end

function _reorder!(queue::Channel{T}, waiting_room::Vector{T}, payload::T, expected_row::Int) where{T}
row = payload.row_num
if row == expected_row
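
A quick usage note for the relocated `insertsorted!` helper: it keeps a vector sorted in place and returns the index at which the element was inserted. The values below are made up for illustration and are not part of the commit.

# Usage sketch for insertsorted! as defined earlier in this file's diff.
function insertsorted!(arr::Vector{T}, x::T, by=identity) where {T}
    idx = searchsortedfirst(arr, x, by=by)
    insert!(arr, idx, x)
    return idx
end

arr = [1, 3, 7]
insertsorted!(arr, 5)   # arr == [1, 3, 5, 7], returns 3
insertsorted!(arr, 2)   # arr == [1, 2, 3, 5, 7], returns 2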
2 changes: 2 additions & 0 deletions src/read_and_lex.jl
@@ -64,6 +64,7 @@ function read_and_lex!(lexer::Lexer, chunking_ctx::ChunkingContext, _last_newlin
end

bytes_read_in = prepare_buffer!(lexer.io, chunking_ctx.bytes, _last_newline_at)
chunking_ctx.buffer_refills[] += 1
# _last_newline_at == 0 && bytes_read_in == 0 && return nothing # only BOM / whitespace in the buffer

start_pos = _last_newline_at == 0 ? 1 : length(chunking_ctx.bytes) - _last_newline_at + 1
@@ -84,6 +85,7 @@ function initial_read!(io, chunking_ctx, skip_leading_whitespace=false)

# This is the first time we saw the buffer, we'll just fill it up
bytes_read_in = readbytesall!(io, buf, buffersize)
chunking_ctx.buffer_refills[] += 1

# bytes_read_in = _skip_over_initial_whitespace_and_bom!(io, buf, bytes_read_in)
starts_with_bom = bytes_read_in > 2 && _hasBOM(buf)
57 changes: 29 additions & 28 deletions src/read_and_lex_utils.jl
@@ -5,7 +5,7 @@ function _detect_newline(buf, pos, len)
v = view(buf, pos:len)
if isnothing(findfirst(==(UInt8('\n')), v))
if isnothing(findfirst(==(UInt8('\r')), v))
throw(ArgumentError("No newline detected. Specify the newline character explicitly via the `newline` keyword argument. Use `\n` even for CRLF."))
return UInt8('\n')
else
return UInt8('\r')
end
@@ -74,49 +74,49 @@ _startswith(s::AbstractVector{UInt8}, prefix::AbstractVector{UInt8}) = _startswi
_startswith(s, soff, prefix::Nothing) = false
_startswith(s, prefix::Nothing) = false

skip_rows_init!(lexer, chunking_ctx, rows_to_skip) = _skip_rows_init!(lexer, chunking_ctx, rows_to_skip, chunking_ctx.comment)

function _skip_rows_init!(lexer, chunking_ctx, rows_to_skip, comment)
function skip_rows_init!(lexer, chunking_ctx, rows_to_skip, ignoreemptyrows=false)
input_is_empty = length(chunking_ctx.newline_positions) == 1
lines_skipped_total = 0
input_is_empty && return lines_skipped_total
# To know where in the end-of-line buffer we are while deciding whether we can skip or
# if we need to refill the buffer because we skipped everything in it.
input_is_empty && (return 0)
skipped_rows = _skip_rows_init_fast!(lexer, chunking_ctx, rows_to_skip)
if chunking_ctx.comment !== nothing || ignoreemptyrows
skipped_rows += _skip_comments_and_empty_rows!(lexer, chunking_ctx, chunking_ctx.comment, ignoreemptyrows)
end
return skipped_rows
end

function _skip_comments_and_empty_rows!(lexer, chunking_ctx, comment, ignoreemptyrows)
rows_skipped = 0
eol_index = 1
prev_nl = @inbounds chunking_ctx.newline_positions[eol_index]
@inbounds while true
# Did we exhaust the buffer during skipping?
if eol_index == length(chunking_ctx.newline_positions)
if lexer.done
break
else
ChunkedBase.read_and_lex!(lexer, chunking_ctx)
length(chunking_ctx.newline_positions) == 1 && return rows_skipped
eol_index = 1
prev_nl = chunking_ctx.newline_positions[eol_index]
end
end

if !_startswith(chunking_ctx.bytes, chunking_ctx.newline_positions[eol_index], comment)
# Non commented row: if we still have rows to skip, we skip it, otherwise we're done
if rows_to_skip > 0
rows_to_skip -= 1
else
break
end
# Commented row: we skip it if we still have rows to skip, otherwise we move to another row.
# This means that if there are consecutive commented rows after the rows to skip, we'll
# skip them all.
else rows_to_skip > 0
rows_to_skip -= 1
nl = chunking_ctx.newline_positions[eol_index+1]

if (_startswith(chunking_ctx.bytes, prev_nl, comment) || (ignoreemptyrows && _isemptyrow(prev_nl, nl, chunking_ctx.bytes)))
eol_index += 1
rows_skipped += 1
prev_nl = nl
else # not a commented or empty row, we're done
break
end
eol_index += 1
lines_skipped_total += 1
end

# We need to shift the newline positions to the left to account for the skipped rows
shiftleft!(chunking_ctx.newline_positions, eol_index-1)
return lines_skipped_total
return rows_skipped
end

function _skip_rows_init!(lexer, chunking_ctx, rows_to_skip, comment::Nothing)
function _skip_rows_init_fast!(lexer, chunking_ctx, rows_to_skip)
# If there are more rows to skip than the number of rows in the buffer, we skip the whole buffer
while !lexer.done && rows_to_skip >= length(chunking_ctx.newline_positions) - 1
rows_to_skip -= length(chunking_ctx.newline_positions) - 1
@@ -139,15 +139,15 @@ mutable struct MmapStream <: IO
end
MmapStream(ios::IO) = MmapStream(ios, Mmap.mmap(ios, grow=false, shared=false), 1)
Base.close(m::MmapStream) = close(m.ios)
Base.eof(m::MmapStream) = m.pos == length(m.x)
Base.eof(m::MmapStream) = m.pos > length(m.x)
function readbytesall!(io::MmapStream, buf, n::Int)
bytes_to_read = min(bytesavailable(io), n)
unsafe_copyto!(pointer(buf), pointer(io.x) + io.pos - 1, bytes_to_read)
io.pos += bytes_to_read
return bytes_to_read
end
Base.bytesavailable(m::MmapStream) = length(m.x) - m.pos + 1
# Interop with GzipDecompressorStream
Base.bytesavailable(m::MmapStream) = length(m.x) - m.pos
Base.isopen(m::MmapStream) = isopen(m.ios) && !eof(m)
Base.filesize(io::MmapStream) = length(io.x)
function Base.unsafe_read(from::MmapStream, p::Ptr{UInt8}, nb::UInt)
@@ -165,6 +165,7 @@ function Base.read(io::MmapStream, ::Type{UInt8})
if avail == 0
throw(EOFError())
end
out = @inbounds io.x[io.pos]
io.pos += 1
return io.x[io.pos]
return out
end
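
Finally, a note on the `MmapStream` adjustments above (illustration, not from the commit): the new `read` path treats `pos` as a 1-based cursor pointing at the next unread byte, reading `x[pos]` before advancing. Under that convention the end-of-stream and remaining-byte checks fall out as in the toy model below; the numbers are assumed.

# Toy model of the 1-based cursor convention used by MmapStream above.
x   = UInt8[0x61, 0x62, 0x63]              # three mapped bytes
pos = 1                                    # next unread byte

remaining(x, pos) = length(x) - pos + 1    # bytes still available
at_eof(x, pos)    = pos > length(x)        # nothing left to read

@assert remaining(x, pos) == 3 && !at_eof(x, pos)
pos += 3                                   # consume everything
@assert remaining(x, pos) == 0 && at_eof(x, pos)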
