Ignore empty rows works before the header #6

Merged · 3 commits · Jul 18, 2023
Changes from all commits
2 changes: 1 addition & 1 deletion Project.toml
@@ -16,8 +16,8 @@ SentinelArrays = "1.4"
julia = "1.6"

[extras]
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

[targets]
test = ["Test", "Random"]
33 changes: 33 additions & 0 deletions src/ChunkedBase.jl
@@ -32,6 +32,8 @@ struct ChunkingContext
nworkers::Int
limit::Int
comment::Union{Nothing,Vector{UInt8}}
# combination of `id` and `buffer_refills` can be used to detect whether any pointers to `bytes` are still valid
buffer_refills::Base.RefValue{Int}
end
function ChunkingContext(buffersize::Integer, nworkers::Integer, limit::Integer, comment::Union{Nothing,UInt8,String,Char,Vector{UInt8}})
(4 <= buffersize <= typemax(Int32)) || throw(ArgumentError("`buffersize` argument must be larger than 4 and smaller than 2_147_483_648 bytes."))
@@ -46,6 +48,7 @@ function ChunkingContext(buffersize::Integer, nworkers::Integer, limit::Integer,
nworkers,
limit,
_comment_to_bytes(comment),
Ref(0),
)
end
function ChunkingContext(ctx::ChunkingContext)
@@ -57,6 +60,7 @@ function ChunkingContext(ctx::ChunkingContext)
ctx.nworkers,
ctx.limit,
ctx.comment,
Ref(0),
)
out.newline_positions.elements[1] = 0
return out
@@ -127,5 +131,34 @@ export parse_file_serial, parse_file_parallel, populate_result_buffer!
# TRACING # foreach(empty!, PARSER_TASKS_TIMES)
# TRACING # return nothing
# TRACING # end
# TRACING # function dump_traces(path)
# TRACING # open(path, "w") do io
# TRACING # write(io, UInt32(length(IO_TASK_TIMES)), IO_TASK_TIMES)
# TRACING # write(io, UInt32(length(LEXER_TASK_TIMES)), LEXER_TASK_TIMES)
# TRACING # write(io, UInt32(length(T1)), T1)
# TRACING # write(io, UInt32(length(T2)), T2)
# TRACING #
# TRACING # write(io, UInt32(length(PARSER_TASKS_TIMES)))
# TRACING # for x in PARSER_TASKS_TIMES
# TRACING # write(io, UInt32(length(x)), x)
# TRACING # end
# TRACING # end
# TRACING # return nothing
# TRACING # end
# TRACING # function load_traces!(path)
# TRACING # _resize!(vv, n) = length(vv) >= n ? resize!(vv, n) : append!(vv, [UInt[] for _ in 1:n-length(vv)])
# TRACING # open(path, "r") do io
# TRACING # read!(io, resize!(IO_TASK_TIMES, read(io, UInt32)))
# TRACING # read!(io, resize!(LEXER_TASK_TIMES, read(io, UInt32)))
# TRACING # read!(io, resize!(T1, read(io, UInt32)))
# TRACING # read!(io, resize!(T2, read(io, UInt32)))
# TRACING #
# TRACING # _resize!(PARSER_TASKS_TIMES, read(io, UInt32))
# TRACING # for x in PARSER_TASKS_TIMES
# TRACING # read!(io, resize!(x, read(io, UInt32)))
# TRACING # end
# TRACING # end
# TRACING # return nothing
# TRACING # end

end
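
The new `buffer_refills` counter, together with the context's `id`, is meant to let downstream code detect whether pointers previously taken into `chunking_ctx.bytes` are still valid, since the buffer is reused on every refill. A minimal sketch of such a check, where `BufferStamp` and `is_still_valid` are hypothetical names and not part of this PR:

    # Capture the buffer's identity at the moment a pointer into `bytes` is taken...
    struct BufferStamp
        id::Int
        refills::Int
    end
    BufferStamp(ctx::ChunkingContext) = BufferStamp(ctx.id, ctx.buffer_refills[])

    # ...and later check that the same buffer has not been refilled since.
    is_still_valid(stamp::BufferStamp, ctx::ChunkingContext) =
        stamp.id == ctx.id && stamp.refills == ctx.buffer_refills[]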
1 change: 0 additions & 1 deletion src/ConsumeContexts.jl
@@ -48,7 +48,6 @@ there to be `ntasks * (1 + length(parsing_ctx.schema))` units of work per chunk
See also [`consume!`](@ref), [`setup_tasks!`](@ref), [`task_done!`](@ref), [`cleanup`](@ref)
"""
function setup_tasks!(::AbstractConsumeContext, chunking_ctx::ChunkingContext, ntasks::Int)
# TRACING # chunking_ctx.id == 1 ? push!(ChunkedBase.T1, time_ns()) : push!(ChunkedBase.T2, time_ns())
set!(chunking_ctx.counter, ntasks)
return nothing
end
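
The docstring above explains that `setup_tasks!` is overloaded when a consume context expects more than one unit of work per parser task per chunk (e.g. `ntasks * (1 + length(parsing_ctx.schema))`). A hedged sketch of such an override, where `PerColumnConsumeContext` and its `ncolumns` field are made up for illustration and `ChunkedBase.set!` is assumed to be the same counter setter used by the default method above:

    # A consume context that also hands off each column as its own unit of work,
    # so every chunk carries ntasks * (1 + ncolumns) units in total.
    struct PerColumnConsumeContext <: AbstractConsumeContext
        ncolumns::Int
    end

    function ChunkedBase.setup_tasks!(ctx::PerColumnConsumeContext, chunking_ctx::ChunkingContext, ntasks::Int)
        ChunkedBase.set!(chunking_ctx.counter, ntasks * (1 + ctx.ncolumns))
        return nothing
    end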
26 changes: 15 additions & 11 deletions src/parser_parallel.jl
@@ -1,14 +1,18 @@
# What the read_and_lex_task! task submits to the many parser tasks
const SubtaskMetadata = @NamedTuple{task_start::Int32, task_end::Int32, row_num::Int, task_num::Int, use_current_context::Bool}

function submit_lexed_rows!(parsing_queue, consume_ctx, chunking_ctx, row_num)
task_size = estimate_task_size(chunking_ctx)
ntasks = cld(length(chunking_ctx.newline_positions), task_size)
# Set the expected number of parsing tasks
# TRACING # chunking_ctx.id == 1 ? push!(ChunkedBase.T1, time_ns()) : push!(ChunkedBase.T2, time_ns())
setup_tasks!(consume_ctx, chunking_ctx, ntasks)
# Send task definitions (segment of `eols` to process) to the queue
task_start = Int32(1)
task_num = 1
for task in Iterators.partition(eachindex(chunking_ctx.newline_positions), task_size)
task_end = Int32(last(task))
put!(parsing_queue, (task_start, task_end, row_num, task_num, chunking_ctx.id == 1))
put!(parsing_queue, SubtaskMetadata((task_start, task_end, row_num, task_num, chunking_ctx.id == 1)))
row_num += Int(task_end - task_start)
task_start = task_end
task_num += 1
@@ -17,12 +21,12 @@ function submit_lexed_rows!(parsing_queue, consume_ctx, chunking_ctx, row_num)
end

function read_and_lex_task!(
parsing_queue::Channel{T}, # To submit work for parser tasks (which segment of newlines to process)
lexer::Lexer, # Finding newlines
chunking_ctx::ChunkingContext, # Holds raw bytes, synchronization objects and stores newline positions
chunking_ctx_next::ChunkingContext, # double-buffering
consume_ctx::AbstractConsumeContext # user-provided data (to overload setup_tasks!)
) where {T}
parsing_queue::Channel{SubtaskMetadata}, # To submit work for parser tasks (which segment of newlines to process)
lexer::Lexer, # Finding newlines
chunking_ctx::ChunkingContext, # Holds raw bytes, synchronization objects and stores newline positions
chunking_ctx_next::ChunkingContext, # double-buffering
consume_ctx::AbstractConsumeContext # user-provided data (to overload setup_tasks!)
)
limit_eols!(chunking_ctx, 1) && return
row_num = submit_lexed_rows!(parsing_queue, consume_ctx, chunking_ctx, 1)
@inbounds while true
@@ -47,14 +51,14 @@ end

function process_and_consume_task(
worker_id::Int, # unique identifier of this task
parsing_queue::Channel{T}, # where workers receive work
parsing_queue::Channel{SubtaskMetadata}, # where workers receive work
result_buffers::Vector{<:AbstractResultBuffer}, # where we store parsed results
consume_ctx::AbstractConsumeContext, # user-provided data (what to do with the parsed results)
parsing_ctx::AbstractParsingContext, # library-provided data (to distinguish JSONL and CSV processing)
chunking_ctx::ChunkingContext, # internal data to facilitate chunking and synchronization
chunking_ctx_next::ChunkingContext, # double-buffering
::Type{CT} # compile time known data for parser
) where {T,CT}
) where {CT}
# TRACING # trace = get_parser_task_trace(worker_id)
_comment = chunking_ctx.comment
try
@@ -97,7 +101,7 @@ function parse_file_parallel(
@assert chunking_ctx.id == 1
length(result_buffers) != total_result_buffers_count(chunking_ctx) && ArgumentError("Expected $(total_result_buffers_count(chunking_ctx)) result buffers, got $(length(result_buffers)).")

parsing_queue = Channel{Tuple{Int32,Int32,Int,Int,Bool}}(Inf)
parsing_queue = Channel{SubtaskMetadata}(Inf)
if lexer.done
chunking_ctx_next = chunking_ctx
else
@@ -119,7 +123,7 @@ end
end
# Cleanup
for _ in 1:chunking_ctx.nworkers
put!(parsing_queue, (Int32(0), Int32(0), 0, 0, true))
put!(parsing_queue, SubtaskMetadata((Int32(0), Int32(0), 0, 0, true)))
end
foreach(wait, parser_tasks)
close(parsing_queue)
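
To illustrate the switch from anonymous 5-tuples to the named `SubtaskMetadata` payload: values are built from a plain tuple, as in `submit_lexed_rows!` above, and the worker side can then address fields by name instead of by position. A small self-contained sketch with made-up values:

    # The parsing queue now carries named metadata instead of a bare tuple.
    q = Channel{SubtaskMetadata}(Inf)

    # Producer side: construct the named tuple from a plain tuple.
    put!(q, SubtaskMetadata((Int32(1), Int32(10), 1, 1, true)))

    # Consumer side: fields are accessed by name.
    meta = take!(q)
    meta.task_start, meta.task_end, meta.row_num, meta.task_num, meta.use_current_context
    # The cleanup loop above uses SubtaskMetadata((Int32(0), Int32(0), 0, 0, true)) as a shutdown sentinel.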
1 change: 1 addition & 0 deletions src/parser_serial.jl
@@ -14,6 +14,7 @@ function parse_file_serial(
task_size = estimate_task_size(chunking_ctx)
task_start = Int32(1)
for task in Iterators.partition(eachindex(chunking_ctx.newline_positions), task_size)
# TRACING # chunking_ctx.id == 1 ? push!(ChunkedBase.T1, time_ns()) : push!(ChunkedBase.T2, time_ns())
setup_tasks!(consume_ctx, chunking_ctx, 1)
task_end = Int32(last(task))
newline_segment = @view(chunking_ctx.newline_positions.elements[task_start:task_end])
15 changes: 7 additions & 8 deletions src/payload.jl
@@ -14,13 +14,18 @@ end
Base.length(payload::ParsedPayload) = payload.len
last_row(payload::ParsedPayload) = payload.row_num + length(payload) - 1


#
# PayloadOrderer
#

function insertsorted!(arr::Vector{T}, x::T, by=identity) where {T}
idx = searchsortedfirst(arr, x, by=by)
insert!(arr, idx, x)
return idx
end

# Like a Channel, but when you take! a Payload from it, it will be the next one in order
mutable struct PayloadOrderer{B<:AbstractResultBuffer, C<:AbstractParsingContext} <: AbstractChannel{ParsedPayload{B,C}}
mutable struct PayloadOrderer{B, C<:AbstractParsingContext} <: AbstractChannel{ParsedPayload{B,C}}
queue::Channel{ParsedPayload{B,C}}
expected_row::Int
waititng_room::Vector{ParsedPayload{B,C}}
@@ -42,12 +47,6 @@ function _reenqueue_ordered!(queue::Channel{T}, waiting_room::Vector{T}, payload
end
end

function insertsorted!(arr::Vector{T}, x::T, by=identity) where {T}
idx = searchsortedfirst(arr, x, by=by)
insert!(arr, idx, x)
return idx
end

function _reorder!(queue::Channel{T}, waiting_room::Vector{T}, payload::T, expected_row::Int) where{T}
row = payload.row_num
if row == expected_row
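
The relocated `insertsorted!` helper is what lets the `PayloadOrderer` park out-of-order payloads until their turn comes; `_reorder!` compares each payload's `row_num` against the next expected row. A standalone sketch of the same idea with plain integers instead of `ParsedPayload`s:

    # insertsorted! keeps `arr` sorted as elements arrive in arbitrary order.
    function insertsorted!(arr::Vector{T}, x::T, by=identity) where {T}
        idx = searchsortedfirst(arr, x, by=by)
        insert!(arr, idx, x)
        return idx
    end

    waiting = Int[]
    for row in (5, 2, 9, 3)
        insertsorted!(waiting, row)
    end
    waiting == [2, 3, 5, 9]  # true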
2 changes: 2 additions & 0 deletions src/read_and_lex.jl
@@ -64,6 +64,7 @@ function read_and_lex!(lexer::Lexer, chunking_ctx::ChunkingContext, _last_newlin
end

bytes_read_in = prepare_buffer!(lexer.io, chunking_ctx.bytes, _last_newline_at)
chunking_ctx.buffer_refills[] += 1
# _last_newline_at == 0 && bytes_read_in == 0 && return nothing # only BOM / whitespace in the buffer

start_pos = _last_newline_at == 0 ? 1 : length(chunking_ctx.bytes) - _last_newline_at + 1
@@ -84,6 +85,7 @@ function initial_read!(io, chunking_ctx, skip_leading_whitespace=false)

# This is the first time we saw the buffer, we'll just fill it up
bytes_read_in = readbytesall!(io, buf, buffersize)
chunking_ctx.buffer_refills[] += 1

# bytes_read_in = _skip_over_initial_whitespace_and_bom!(io, buf, bytes_read_in)
starts_with_bom = bytes_read_in > 2 && _hasBOM(buf)
57 changes: 29 additions & 28 deletions src/read_and_lex_utils.jl
@@ -5,7 +5,7 @@ function _detect_newline(buf, pos, len)
v = view(buf, pos:len)
if isnothing(findfirst(==(UInt8('\n')), v))
if isnothing(findfirst(==(UInt8('\r')), v))
throw(ArgumentError("No newline detected. Specify the newline character explicitly via the `newline` keyword argument. Use `\n` even for CRLF."))
return UInt8('\n')
else
return UInt8('\r')
end
@@ -74,49 +74,49 @@ _startswith(s::AbstractVector{UInt8}, prefix::AbstractVector{UInt8}) = _startswi
_startswith(s, soff, prefix::Nothing) = false
_startswith(s, prefix::Nothing) = false

skip_rows_init!(lexer, chunking_ctx, rows_to_skip) = _skip_rows_init!(lexer, chunking_ctx, rows_to_skip, chunking_ctx.comment)

function _skip_rows_init!(lexer, chunking_ctx, rows_to_skip, comment)
function skip_rows_init!(lexer, chunking_ctx, rows_to_skip, ignoreemptyrows=false)
input_is_empty = length(chunking_ctx.newline_positions) == 1
lines_skipped_total = 0
input_is_empty && return lines_skipped_total
# To know where in the end-of-line buffer we are while deciding whether we can skip or
# if we need to refill the buffer because we skipped everything in it.
input_is_empty && (return 0)
skipped_rows = _skip_rows_init_fast!(lexer, chunking_ctx, rows_to_skip)
if chunking_ctx.comment !== nothing || ignoreemptyrows
skipped_rows += _skip_comments_and_empty_rows!(lexer, chunking_ctx, chunking_ctx.comment, ignoreemptyrows)
end
return skipped_rows
end

function _skip_comments_and_empty_rows!(lexer, chunking_ctx, comment, ignoreemptyrows)
rows_skipped = 0
eol_index = 1
prev_nl = @inbounds chunking_ctx.newline_positions[eol_index]
@inbounds while true
# Did we exhaust the buffer during skipping?
if eol_index == length(chunking_ctx.newline_positions)
if lexer.done
break
else
ChunkedBase.read_and_lex!(lexer, chunking_ctx)
length(chunking_ctx.newline_positions) == 1 && return rows_skipped
eol_index = 1
prev_nl = chunking_ctx.newline_positions[eol_index]
end
end

if !_startswith(chunking_ctx.bytes, chunking_ctx.newline_positions[eol_index], comment)
# Non commented row: if we still have rows to skip, we skip it, otherwise we're done
if rows_to_skip > 0
rows_to_skip -= 1
else
break
end
# Commented row: we skip it if we still have rows to skip, otherwise we move to another row.
# This means that if there are consecutive commented rows after the rows to skip, we'll
# skip them all.
else rows_to_skip > 0
rows_to_skip -= 1
nl = chunking_ctx.newline_positions[eol_index+1]

if (_startswith(chunking_ctx.bytes, prev_nl, comment) || (ignoreemptyrows && _isemptyrow(prev_nl, nl, chunking_ctx.bytes)))
eol_index += 1
rows_skipped += 1
prev_nl = nl
else # not a commented or empty row, we're done
break
end
eol_index += 1
lines_skipped_total += 1
end

# We need to shift the newline positions to the left to account for the skipped rows
shiftleft!(chunking_ctx.newline_positions, eol_index-1)
return lines_skipped_total
return rows_skipped
end

function _skip_rows_init!(lexer, chunking_ctx, rows_to_skip, comment::Nothing)
function _skip_rows_init_fast!(lexer, chunking_ctx, rows_to_skip)
# If there are more rows to skip than the number of rows in the buffer, we skip the whole buffer
while !lexer.done && rows_to_skip >= length(chunking_ctx.newline_positions) - 1
rows_to_skip -= length(chunking_ctx.newline_positions) - 1
@@ -139,15 +139,15 @@ mutable struct MmapStream <: IO
end
MmapStream(ios::IO) = MmapStream(ios, Mmap.mmap(ios, grow=false, shared=false), 1)
Base.close(m::MmapStream) = close(m.ios)
Base.eof(m::MmapStream) = m.pos == length(m.x)
Base.eof(m::MmapStream) = m.pos > length(m.x)
function readbytesall!(io::MmapStream, buf, n::Int)
bytes_to_read = min(bytesavailable(io), n)
unsafe_copyto!(pointer(buf), pointer(io.x) + io.pos - 1, bytes_to_read)
io.pos += bytes_to_read
return bytes_to_read
end
Base.bytesavailable(m::MmapStream) = length(m.x) - m.pos + 1
# Interop with GzipDecompressorStream
Base.bytesavailable(m::MmapStream) = length(m.x) - m.pos
Base.isopen(m::MmapStream) = isopen(m.ios) && !eof(m)
Base.filesize(io::MmapStream) = length(io.x)
function Base.unsafe_read(from::MmapStream, p::Ptr{UInt8}, nb::UInt)
@@ -165,6 +165,7 @@ function Base.read(io::MmapStream, ::Type{UInt8})
if avail == 0
throw(EOFError())
end
out = @inbounds io.x[io.pos]
io.pos += 1
return io.x[io.pos]
return out
end
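
To make the new skipping rule in `_skip_comments_and_empty_rows!` concrete: a row delimited by the newline positions `prev_nl` and `nl` is dropped when it starts with the comment prefix, or, when `ignoreemptyrows` is set, when there is nothing (or only a '\r') between the two newlines. A hedged sketch of that per-row predicate on a raw byte vector, where `skip_this_row` is a made-up helper and the empty-row test is an assumption about what `_isemptyrow` checks:

    function skip_this_row(bytes::Vector{UInt8}, prev_nl::Int, nl::Int, comment, ignoreemptyrows::Bool)
        is_comment = comment !== nothing &&
            nl - prev_nl - 1 >= length(comment) &&
            view(bytes, prev_nl+1:prev_nl+length(comment)) == comment
        is_empty = nl - prev_nl == 1 || (nl - prev_nl == 2 && bytes[prev_nl+1] == UInt8('\r'))
        return is_comment || (ignoreemptyrows && is_empty)
    end

    bytes = Vector{UInt8}("# note\n\nheader\n")
    # newline byte positions (with the leading sentinel 0): 0, 7, 8, 15
    skip_this_row(bytes, 0, 7, [UInt8('#')], true)   # true  -> commented row before the header is skipped
    skip_this_row(bytes, 7, 8, [UInt8('#')], true)   # true  -> empty row before the header is skipped
    skip_this_row(bytes, 8, 15, [UInt8('#')], true)  # false -> "header" is the first row kept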