"Ignore empty rows" now works before the header
and other cleanups and tests
Drvi committed Jul 17, 2023
1 parent a734a34 commit a35e652
Showing 7 changed files with 249 additions and 109 deletions.
2 changes: 1 addition & 1 deletion Project.toml
@@ -16,8 +16,8 @@ SentinelArrays = "1.4"
julia = "1.6"

[extras]
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

[targets]
test = ["Test", "Random"]
28 changes: 28 additions & 0 deletions src/ChunkedBase.jl
@@ -127,5 +127,33 @@ export parse_file_serial, parse_file_parallel, populate_result_buffer!
# TRACING # foreach(empty!, PARSER_TASKS_TIMES)
# TRACING # return nothing
# TRACING # end
# TRACING # function dump_traces(path)
# TRACING # open(path, "w") do io
# TRACING # write(io, UInt32(length(IO_TASK_TIMES)), IO_TASK_TIMES)
# TRACING # write(io, UInt32(length(LEXER_TASK_TIMES)), LEXER_TASK_TIMES)
# TRACING # write(io, UInt32(length(T1)), T1)
# TRACING # write(io, UInt32(length(T2)), T2)
# TRACING #
# TRACING # write(io, UInt32(length(PARSER_TASKS_TIMES)))
# TRACING # for x in PARSER_TASKS_TIMES
# TRACING # write(io, UInt32(length(x)), x)
# TRACING # end
# TRACING # end
# TRACING # return nothing
# TRACING # end
# TRACING # function load_traces!(path)
# TRACING # open(path, "r") do io
# TRACING # read!(io, resize!(IO_TASK_TIMES, read(io, UInt32)))
# TRACING # read!(io, resize!(LEXER_TASK_TIMES, read(io, UInt32)))
# TRACING # read!(io, resize!(T1, read(io, UInt32)))
# TRACING # read!(io, resize!(T2, read(io, UInt32)))
# TRACING #
# TRACING # resize!(PARSER_TASKS_TIMES, read(io, UInt32))
# TRACING # for x in PARSER_TASKS_TIMES
# TRACING # read!(io, resize!(x, read(io, UInt32)))
# TRACING # end
# TRACING # end
# TRACING # return nothing
# TRACING # end
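
Note: the commented-out helpers above serialize each trace vector as a UInt32 length followed by the raw elements. A minimal, self-contained sketch of that length-prefixed round trip, assuming UInt64 timestamps such as those returned by `time_ns()` (the element type of the real trace vectors is not visible in this diff):

# Hypothetical round trip of one length-prefixed vector, mirroring dump_traces / load_traces!
io = IOBuffer()
times = UInt64[0x01, 0x02, 0x03]
write(io, UInt32(length(times)), times)    # the count first, then the raw elements
seekstart(io)
out = UInt64[]
read!(io, resize!(out, read(io, UInt32)))  # read the count, resize, then fill in place
@assert out == times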

end
25 changes: 14 additions & 11 deletions src/parser_parallel.jl
@@ -1,3 +1,6 @@
# What the read_and_lex_task! task submits to the many parser tasks
const SubtaskMetadata = @NamedTuple{task_start::Int32, task_end::Int32, row_num::Int, task_num::Int, use_current_context::Bool}
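
As a side note, a `@NamedTuple` type can be constructed directly from a plain tuple in field order, which is how the queue entries below are built. An illustrative sketch with made-up values:

# Hypothetical metadata for a subtask covering newlines 1:128, starting at absolute row 1:
meta = SubtaskMetadata((Int32(1), Int32(128), 1, 1, true))
meta.task_start, meta.task_end   # indices into chunking_ctx.newline_positions
meta.use_current_context         # presumably selects chunking_ctx vs chunking_ctx_next in the worker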

function submit_lexed_rows!(parsing_queue, consume_ctx, chunking_ctx, row_num)
task_size = estimate_task_size(chunking_ctx)
ntasks = cld(length(chunking_ctx.newline_positions), task_size)
@@ -8,7 +11,7 @@ function submit_lexed_rows!(parsing_queue, consume_ctx, chunking_ctx, row_num)
task_num = 1
for task in Iterators.partition(eachindex(chunking_ctx.newline_positions), task_size)
task_end = Int32(last(task))
put!(parsing_queue, (task_start, task_end, row_num, task_num, chunking_ctx.id == 1))
put!(parsing_queue, SubtaskMetadata((task_start, task_end, row_num, task_num, chunking_ctx.id == 1)))
row_num += Int(task_end - task_start)
task_start = task_end
task_num += 1
@@ -17,12 +20,12 @@ function submit_lexed_rows!(parsing_queue, consume_ctx, chunking_ctx, row_num)
end

function read_and_lex_task!(
parsing_queue::Channel{T}, # To submit work for parser tasks (which segment of newlines to process)
lexer::Lexer, # Finding newlines
chunking_ctx::ChunkingContext, # Holds raw bytes, synchronization objects and stores newline positions
chunking_ctx_next::ChunkingContext, # double-buffering
consume_ctx::AbstractConsumeContext # user-provided data (to overload setup_tasks!)
) where {T}
parsing_queue::Channel{SubtaskMetadata}, # To submit work for parser tasks (which segment of newlines to process)
lexer::Lexer, # Finding newlines
chunking_ctx::ChunkingContext, # Holds raw bytes, synchronization objects and stores newline positions
chunking_ctx_next::ChunkingContext, # double-buffering
consume_ctx::AbstractConsumeContext # user-provided data (to overload setup_tasks!)
)
limit_eols!(chunking_ctx, 1) && return
row_num = submit_lexed_rows!(parsing_queue, consume_ctx, chunking_ctx, 1)
@inbounds while true
@@ -47,14 +50,14 @@ end

function process_and_consume_task(
worker_id::Int, # unique identifier of this task
parsing_queue::Channel{T}, # where workers receive work
parsing_queue::Channel{SubtaskMetadata}, # where workers receive work
result_buffers::Vector{<:AbstractResultBuffer}, # where we store parsed results
consume_ctx::AbstractConsumeContext, # user-provided data (what to to with the parsed results)
parsing_ctx::AbstractParsingContext, # library-provided data (to distinguish JSONL and CSV processing)
chunking_ctx::ChunkingContext, # internal data to facilitate chunking and synchronization
chunking_ctx_next::ChunkingContext, # double-buffering
::Type{CT} # compile time known data for parser
) where {T,CT}
) where {CT}
# TRACING # trace = get_parser_task_trace(worker_id)
_comment = chunking_ctx.comment
try
Expand Down Expand Up @@ -97,7 +100,7 @@ function parse_file_parallel(
@assert chunking_ctx.id == 1
length(result_buffers) != total_result_buffers_count(chunking_ctx) && throw(ArgumentError("Expected $(total_result_buffers_count(chunking_ctx)) result buffers, got $(length(result_buffers))."))

parsing_queue = Channel{Tuple{Int32,Int32,Int,Int,Bool}}(Inf)
parsing_queue = Channel{SubtaskMetadata}(Inf)
if lexer.done
chunking_ctx_next = chunking_ctx
else
@@ -119,7 +122,7 @@ end
end
# Cleanup
for _ in 1:chunking_ctx.nworkers
put!(parsing_queue, (Int32(0), Int32(0), 0, 0, true))
put!(parsing_queue, SubtaskMetadata((Int32(0), Int32(0), 0, 0, true)))
end
foreach(wait, parser_tasks)
close(parsing_queue)
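
The zeroed entries act as shutdown sentinels: one per worker, so every parser task wakes up one last time and can exit its loop. A hedged, self-contained sketch of that handshake (the real exit check lives in process_and_consume_task and is not fully shown in this diff):

# Hypothetical illustration of the poison-pill pattern used above.
queue = Channel{SubtaskMetadata}(Inf)
put!(queue, SubtaskMetadata((Int32(1), Int32(5), 1, 1, true)))   # one unit of real work
put!(queue, SubtaskMetadata((Int32(0), Int32(0), 0, 0, true)))   # zeroed sentinel
while true
    meta = take!(queue)
    meta.task_end == Int32(0) && break   # assumed sentinel check; a real worker would exit here
    # a real worker would parse rows meta.task_start:meta.task_end and hand the buffer to consume!
end
close(queue)
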
15 changes: 7 additions & 8 deletions src/payload.jl
@@ -14,13 +14,18 @@ end
Base.length(payload::ParsedPayload) = payload.len
last_row(payload::ParsedPayload) = payload.row_num + length(payload) - 1


#
# PayloadOrderer
#

function insertsorted!(arr::Vector{T}, x::T, by=identity) where {T}
idx = searchsortedfirst(arr, x, by=by)
insert!(arr, idx, x)
return idx
end
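
A small usage sketch of this helper (made-up values); the PayloadOrderer below presumably applies it with a `by` key on `row_num` to park out-of-order payloads until their turn comes:

v = [1, 3, 7]
i = insertsorted!(v, 5)      # v == [1, 3, 5, 7]
@assert i == 3 && issorted(v)
insertsorted!(v, 0)          # prepends: v == [0, 1, 3, 5, 7]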

# Like a Channel, but when you take! a Payload from it, it will be the next one in order
mutable struct PayloadOrderer{B<:AbstractResultBuffer, C<:AbstractParsingContext} <: AbstractChannel{ParsedPayload{B,C}}
mutable struct PayloadOrderer{B, C<:AbstractParsingContext} <: AbstractChannel{ParsedPayload{B,C}}
queue::Channel{ParsedPayload{B,C}}
expected_row::Int
waititng_room::Vector{ParsedPayload{B,C}}
@@ -42,12 +47,6 @@ function _reenqueue_ordered!(queue::Channel{T}, waiting_room::Vector{T}, payload
end
end

function insertsorted!(arr::Vector{T}, x::T, by=identity) where {T}
idx = searchsortedfirst(arr, x, by=by)
insert!(arr, idx, x)
return idx
end

function _reorder!(queue::Channel{T}, waiting_room::Vector{T}, payload::T, expected_row::Int) where{T}
row = payload.row_num
if row == expected_row
57 changes: 29 additions & 28 deletions src/read_and_lex_utils.jl
@@ -5,7 +5,7 @@ function _detect_newline(buf, pos, len)
v = view(buf, pos:len)
if isnothing(findfirst(==(UInt8('\n')), v))
if isnothing(findfirst(==(UInt8('\r')), v))
throw(ArgumentError("No newline detected. Specify the newline character explicitly via the `newline` keyword argument. Use `\n` even for CRLF."))
return UInt8('\n')
else
return UInt8('\r')
end
@@ -74,49 +74,49 @@ _startswith(s::AbstractVector{UInt8}, prefix::AbstractVector{UInt8}) = _startswi
_startswith(s, soff, prefix::Nothing) = false
_startswith(s, prefix::Nothing) = false

function skip_rows_init!(lexer, chunking_ctx, rows_to_skip, ignoreemptyrows=false)
    input_is_empty = length(chunking_ctx.newline_positions) == 1
    input_is_empty && (return 0)
    skipped_rows = _skip_rows_init_fast!(lexer, chunking_ctx, rows_to_skip)
    if chunking_ctx.comment !== nothing || ignoreemptyrows
        skipped_rows += _skip_comments_and_empty_rows!(lexer, chunking_ctx, chunking_ctx.comment, ignoreemptyrows)
    end
    return skipped_rows
end

function _skip_comments_and_empty_rows!(lexer, chunking_ctx, comment, ignoreemptyrows)
    rows_skipped = 0
    # eol_index tracks where in the newline buffer we are while deciding whether we can keep
    # skipping, or whether we need to refill the buffer because we skipped everything in it.
    eol_index = 1
    prev_nl = @inbounds chunking_ctx.newline_positions[eol_index]
    @inbounds while true
        # Did we exhaust the buffer during skipping?
        if eol_index == length(chunking_ctx.newline_positions)
            if lexer.done
                break
            else
                ChunkedBase.read_and_lex!(lexer, chunking_ctx)
                length(chunking_ctx.newline_positions) == 1 && return rows_skipped
                eol_index = 1
                prev_nl = chunking_ctx.newline_positions[eol_index]
            end
        end

        nl = chunking_ctx.newline_positions[eol_index+1]
        if (_startswith(chunking_ctx.bytes, prev_nl, comment) || (ignoreemptyrows && _isemptyrow(prev_nl, nl, chunking_ctx.bytes)))
            # Commented or empty row before the data: skip it and move on to the next one.
            eol_index += 1
            rows_skipped += 1
            prev_nl = nl
        else # not a commented or empty row, we're done
            break
        end
    end

    # We need to shift the newline positions to the left to account for the skipped rows
    shiftleft!(chunking_ctx.newline_positions, eol_index-1)
    return rows_skipped
end
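
`_isemptyrow` is not part of this hunk; presumably it treats a row as empty when the two surrounding newlines are adjacent, or separated only by a carriage return. A sketch under that assumption (the helper name below is hypothetical):

_isemptyrow_sketch(prev_nl, nl, bytes) =
    nl == prev_nl + 1 || (nl == prev_nl + 2 && @inbounds(bytes[prev_nl + 1]) == UInt8('\r'))

_isemptyrow_sketch(1, 2, UInt8['\n', '\n'])        # true: "\n\n"
_isemptyrow_sketch(1, 3, UInt8['\n', '\r', '\n'])  # true: "\n\r\n"
_isemptyrow_sketch(1, 4, codeunits("\nab\n"))      # false: the row has content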

function _skip_rows_init_fast!(lexer, chunking_ctx, rows_to_skip)
# If there are more rows to skip than the number of rows in the buffer, we skip the whole buffer
while !lexer.done && rows_to_skip >= length(chunking_ctx.newline_positions) - 1
rows_to_skip -= length(chunking_ctx.newline_positions) - 1
@@ -139,15 +139,15 @@ mutable struct MmapStream <: IO
end
MmapStream(ios::IO) = MmapStream(ios, Mmap.mmap(ios, grow=false, shared=false), 1)
Base.close(m::MmapStream) = close(m.ios)
Base.eof(m::MmapStream) = m.pos == length(m.x)
Base.eof(m::MmapStream) = m.pos > length(m.x)
function readbytesall!(io::MmapStream, buf, n::Int)
bytes_to_read = min(bytesavailable(io), n)
unsafe_copyto!(pointer(buf), pointer(io.x) + io.pos - 1, bytes_to_read)
io.pos += bytes_to_read
return bytes_to_read
end
Base.bytesavailable(m::MmapStream) = length(m.x) - m.pos + 1
# Interop with GzipDecompressorStream
Base.bytesavailable(m::MmapStream) = length(m.x) - m.pos
Base.isopen(m::MmapStream) = isopen(m.ios) && !eof(m)
Base.filesize(io::MmapStream) = length(io.x)
function Base.unsafe_read(from::MmapStream, p::Ptr{UInt8}, nb::UInt)
@@ -165,6 +165,7 @@ function Base.read(io::MmapStream, ::Type{UInt8})
if avail == 0
throw(EOFError())
end
out = @inbounds io.x[io.pos]
io.pos += 1
return io.x[io.pos]
return out
end
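
The eof/read tweaks above hinge on `pos` being the 1-based index of the next unread byte. A plain-array illustration of that invariant (no mmap involved, just the bookkeeping):

let x = codeunits("abc\n"), pos = 1, out = UInt8[]
    while pos <= length(x)    # i.e. !eof; the corrected test is `pos > length(x)`
        push!(out, x[pos])    # grab the byte *before* advancing the position
        pos += 1
    end
    @assert out == collect(x) && pos == length(x) + 1
end
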
59 changes: 56 additions & 3 deletions test/e2e_tests.jl
@@ -2,13 +2,12 @@ using ChunkedBase
using SentinelArrays: BufferedVector
using Test
using Random

using CodecZlibNG

# ParsingContext ###########################################################################

struct TestParsingContext <: AbstractParsingContext end


# ResultBuffer #############################################################################

struct TestResultBuffer <: AbstractResultBuffer
@@ -26,7 +25,6 @@ function ChunkedBase.populate_result_buffer!(result_buf::TestResultBuffer, newli
return nothing
end


# ConsumeContext 1 ########################################################################
### TestConsumeContext #####################################################################
struct TestConsumeContext{B,C} <: AbstractConsumeContext
@@ -118,6 +116,18 @@ function test_parallel(io; buffersize=8*1024, nworkers=2, limit=0, comment=nothi
end

@testset "e2e newline finder" begin
q, counter = test_serial(IOBuffer(""))
@test take!(q).results.newline_positions == Int32[0]
ChunkedBase.dec!(counter)
@test counter.n == 0
@test Base.n_avail(q.queue) == 0

q, counter = test_parallel(IOBuffer(""))
@test take!(q).results.newline_positions == Int32[0]
ChunkedBase.dec!(counter)
@test counter.n == 0
@test Base.n_avail(q.queue) == 0

q, counter = test_serial(IOBuffer("123456"))
@test take!(q).results.newline_positions == Int32[0, 7]
ChunkedBase.dec!(counter)
@@ -261,6 +271,49 @@ end
@test_throws ChunkedBase.UnmatchedQuoteError test_parallel(IOBuffer("\"ab"), buffersize=4, lexer_args=('\\', '"', '"'))
end

@testset "MmapStream" begin
mktemp() do path, io
write(io, "123\n45\n67\n123\n45\n67")
flush(io)
seekstart(io)
q, counter = test_parallel(ChunkedBase.MmapStream(io), buffersize=4, skipto=5)
@test take!(q).results.newline_positions == Int32[0, 3]
ChunkedBase.dec!(counter)
@test counter.n == 0
@test Base.n_avail(q.queue) == 0
end

mktemp() do path, io
compr_io = GzipCompressorStream(io)
write(compr_io, "123\n45\n67\n123\n45\n67")
close(compr_io)
q, counter = test_parallel(GzipDecompressorStream(ChunkedBase.MmapStream(open(path, "r"))), buffersize=4, skipto=5)
@test take!(q).results.newline_positions == Int32[0, 3]
ChunkedBase.dec!(counter)
@test counter.n == 0
@test Base.n_avail(q.queue) == 0
end

mktemp() do path, io
write(io, "123\n")
flush(io)
seekstart(io)
mm = ChunkedBase.MmapStream(io)
@test read(mm, UInt8) == UInt8('1')
@test read(mm, UInt8) == UInt8('2')
@test filesize(mm) == 4
@test isopen(mm)
@test !eof(mm)
@test read(mm, UInt8) == UInt8('3')
@test read(mm, UInt8) == UInt8('\n')
@test eof(mm)
@test isopen(mm.ios)
@test !isopen(mm)
close(mm)
@test !isopen(mm.ios)
end
end

@testset "Exception handling" begin
@testset "consume!" begin
@testset "serial" begin
