diff --git a/Project.toml b/Project.toml
index 7ac3bd4..d35f1f9 100644
--- a/Project.toml
+++ b/Project.toml
@@ -16,8 +16,8 @@ SentinelArrays = "1.4"
 julia = "1.6"
 
 [extras]
-Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
 Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
+Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
 
 [targets]
 test = ["Test", "Random"]
diff --git a/src/ChunkedBase.jl b/src/ChunkedBase.jl
index 03cd9a1..ec0c140 100644
--- a/src/ChunkedBase.jl
+++ b/src/ChunkedBase.jl
@@ -32,6 +32,8 @@ struct ChunkingContext
     nworkers::Int
     limit::Int
    comment::Union{Nothing,Vector{UInt8}}
+    # combination of `id` and `buffer_refills` can be used to detect if any pointers to `bytes` are still valid
+    buffer_refills::Base.RefValue{Int}
 end
 function ChunkingContext(buffersize::Integer, nworkers::Integer, limit::Integer, comment::Union{Nothing,UInt8,String,Char,Vector{UInt8}})
     (4 <= buffersize <= typemax(Int32)) || throw(ArgumentError("`buffersize` argument must be larger than 4 and smaller than 2_147_483_648 bytes."))
@@ -46,6 +48,7 @@ function ChunkingContext(buffersize::Integer, nworkers::Integer, limit::Integer,
         nworkers,
         limit,
         _comment_to_bytes(comment),
+        Ref(0),
     )
 end
 function ChunkingContext(ctx::ChunkingContext)
@@ -57,6 +60,7 @@ function ChunkingContext(ctx::ChunkingContext)
         ctx.nworkers,
         ctx.limit,
         ctx.comment,
+        Ref(0),
     )
     out.newline_positions.elements[1] = 0
     return out
@@ -127,5 +131,34 @@ export parse_file_serial, parse_file_parallel, populate_result_buffer!
 # TRACING #     foreach(empty!, PARSER_TASKS_TIMES)
 # TRACING #     return nothing
 # TRACING # end
+# TRACING # function dump_traces(path)
+# TRACING #     open(path, "w") do io
+# TRACING #         write(io, UInt32(length(IO_TASK_TIMES)), IO_TASK_TIMES)
+# TRACING #         write(io, UInt32(length(LEXER_TASK_TIMES)), LEXER_TASK_TIMES)
+# TRACING #         write(io, UInt32(length(T1)), T1)
+# TRACING #         write(io, UInt32(length(T2)), T2)
+# TRACING #
+# TRACING #         write(io, UInt32(length(PARSER_TASKS_TIMES)))
+# TRACING #         for x in PARSER_TASKS_TIMES
+# TRACING #             write(io, UInt32(length(x)), x)
+# TRACING #         end
+# TRACING #     end
+# TRACING #     return nothing
+# TRACING # end
+# TRACING # function load_traces!(path)
+# TRACING #     _resize!(vv, n) = length(vv) >= n ? resize!(vv, n) : append!(vv, [UInt[] for _ in 1:n-length(vv)])
+# TRACING #     open(path, "r") do io
+# TRACING #         read!(io, resize!(IO_TASK_TIMES, read(io, UInt32)))
+# TRACING #         read!(io, resize!(LEXER_TASK_TIMES, read(io, UInt32)))
+# TRACING #         read!(io, resize!(T1, read(io, UInt32)))
+# TRACING #         read!(io, resize!(T2, read(io, UInt32)))
+# TRACING #
+# TRACING #         _resize!(PARSER_TASKS_TIMES, read(io, UInt32))
+# TRACING #         for x in PARSER_TASKS_TIMES
+# TRACING #             read!(io, resize!(x, read(io, UInt32)))
+# TRACING #         end
+# TRACING #     end
+# TRACING #     return nothing
+# TRACING # end
 
 end
diff --git a/src/ConsumeContexts.jl b/src/ConsumeContexts.jl
index c6b561a..66e7d3b 100644
--- a/src/ConsumeContexts.jl
+++ b/src/ConsumeContexts.jl
@@ -48,7 +48,6 @@ there to be `ntasks * (1 + length(parsing_ctx.schema))` units of work per chunk
 See also [`consume!`](@ref), [`setup_tasks!`](@ref), [`task_done!`](@ref), [`cleanup`](@ref)
 """
 function setup_tasks!(::AbstractConsumeContext, chunking_ctx::ChunkingContext, ntasks::Int)
-    # TRACING # chunking_ctx.id == 1 ? push!(ChunkedBase.T1, time_ns()) : push!(ChunkedBase.T2, time_ns())
     set!(chunking_ctx.counter, ntasks)
     return nothing
 end
diff --git a/src/parser_parallel.jl b/src/parser_parallel.jl
index 5ade96d..87bd1d7 100644
--- a/src/parser_parallel.jl
+++ b/src/parser_parallel.jl
@@ -1,14 +1,18 @@
+# What the read_and_lex_task! task submits to the many parser tasks
+const SubtaskMetadata = @NamedTuple{task_start::Int32, task_end::Int32, row_num::Int, task_num::Int, use_current_context::Bool}
+
 function submit_lexed_rows!(parsing_queue, consume_ctx, chunking_ctx, row_num)
     task_size = estimate_task_size(chunking_ctx)
     ntasks = cld(length(chunking_ctx.newline_positions), task_size)
     # Set the expected number of parsing tasks
+    # TRACING # chunking_ctx.id == 1 ? push!(ChunkedBase.T1, time_ns()) : push!(ChunkedBase.T2, time_ns())
     setup_tasks!(consume_ctx, chunking_ctx, ntasks)
     # Send task definitions (segment of `eols` to process) to the queue
     task_start = Int32(1)
     task_num = 1
     for task in Iterators.partition(eachindex(chunking_ctx.newline_positions), task_size)
         task_end = Int32(last(task))
-        put!(parsing_queue, (task_start, task_end, row_num, task_num, chunking_ctx.id == 1))
+        put!(parsing_queue, SubtaskMetadata((task_start, task_end, row_num, task_num, chunking_ctx.id == 1)))
         row_num += Int(task_end - task_start)
         task_start = task_end
         task_num += 1
@@ -17,12 +21,12 @@ function submit_lexed_rows!(parsing_queue, consume_ctx, chunking_ctx, row_num)
 end
 
 function read_and_lex_task!(
-    parsing_queue::Channel{T},          # To submit work for parser tasks (which segment of newlines to process)
-    lexer::Lexer,                       # Finding newlines
-    chunking_ctx::ChunkingContext,      # Holds raw bytes, synchronization objects and stores newline positions
-    chunking_ctx_next::ChunkingContext, # double-buffering
-    consume_ctx::AbstractConsumeContext # user-provided data (to overload setup_tasks!)
-) where {T}
+    parsing_queue::Channel{SubtaskMetadata}, # To submit work for parser tasks (which segment of newlines to process)
+    lexer::Lexer,                            # Finding newlines
+    chunking_ctx::ChunkingContext,           # Holds raw bytes, synchronization objects and stores newline positions
+    chunking_ctx_next::ChunkingContext,      # double-buffering
+    consume_ctx::AbstractConsumeContext      # user-provided data (to overload setup_tasks!)
+)
     limit_eols!(chunking_ctx, 1) && return
     row_num = submit_lexed_rows!(parsing_queue, consume_ctx, chunking_ctx, 1)
     @inbounds while true
@@ -47,14 +51,14 @@ end
 
 function process_and_consume_task(
     worker_id::Int, # unique identifier of this task
-    parsing_queue::Channel{T}, # where workers receive work
+    parsing_queue::Channel{SubtaskMetadata}, # where workers receive work
     result_buffers::Vector{<:AbstractResultBuffer}, # where we store parsed results
     consume_ctx::AbstractConsumeContext, # user-provided data (what to to with the parsed results)
     parsing_ctx::AbstractParsingContext, # library-provided data (to distinguish JSONL and CSV processing)
     chunking_ctx::ChunkingContext, # internal data to facilitate chunking and synchronization
     chunking_ctx_next::ChunkingContext, # double-buffering
     ::Type{CT} # compile time known data for parser
-) where {T,CT}
+) where {CT}
     # TRACING # trace = get_parser_task_trace(worker_id)
     _comment = chunking_ctx.comment
     try
@@ -97,7 +101,7 @@ function parse_file_parallel(
 
     @assert chunking_ctx.id == 1
     length(result_buffers) != total_result_buffers_count(chunking_ctx) && ArgumentError("Expected $(total_result_buffers_count(chunking_ctx)) result buffers, got $(length(result_buffers)).")
-    parsing_queue = Channel{Tuple{Int32,Int32,Int,Int,Bool}}(Inf)
+    parsing_queue = Channel{SubtaskMetadata}(Inf)
     if lexer.done
         chunking_ctx_next = chunking_ctx
     else
@@ -119,7 +123,7 @@
     end
     # Cleanup
     for _ in 1:chunking_ctx.nworkers
-        put!(parsing_queue, (Int32(0), Int32(0), 0, 0, true))
+        put!(parsing_queue, SubtaskMetadata((Int32(0), Int32(0), 0, 0, true)))
     end
     foreach(wait, parser_tasks)
     close(parsing_queue)
diff --git a/src/parser_serial.jl b/src/parser_serial.jl
index 29bbfa9..0c8b421 100644
--- a/src/parser_serial.jl
+++ b/src/parser_serial.jl
@@ -14,6 +14,7 @@ function parse_file_serial(
     task_size = estimate_task_size(chunking_ctx)
     task_start = Int32(1)
     for task in Iterators.partition(eachindex(chunking_ctx.newline_positions), task_size)
+        # TRACING # chunking_ctx.id == 1 ? push!(ChunkedBase.T1, time_ns()) : push!(ChunkedBase.T2, time_ns())
         setup_tasks!(consume_ctx, chunking_ctx, 1)
         task_end = Int32(last(task))
         newline_segment = @view(chunking_ctx.newline_positions.elements[task_start:task_end])
diff --git a/src/payload.jl b/src/payload.jl
index 6c4ac39..e41003b 100644
--- a/src/payload.jl
+++ b/src/payload.jl
@@ -14,13 +14,18 @@ end
 Base.length(payload::ParsedPayload) = payload.len
 last_row(payload::ParsedPayload) = payload.row_num + length(payload) - 1
 
-
 #
 # PayloadOrderer
 #
 
+function insertsorted!(arr::Vector{T}, x::T, by=identity) where {T}
+    idx = searchsortedfirst(arr, x, by=by)
+    insert!(arr, idx, x)
+    return idx
+end
+
 # Like a Channel, but when you take! a Payload from it, it will be the next one in order
-mutable struct PayloadOrderer{B<:AbstractResultBuffer, C<:AbstractParsingContext} <: AbstractChannel{ParsedPayload{B,C}}
+mutable struct PayloadOrderer{B, C<:AbstractParsingContext} <: AbstractChannel{ParsedPayload{B,C}}
     queue::Channel{ParsedPayload{B,C}}
     expected_row::Int
     waititng_room::Vector{ParsedPayload{B,C}}
@@ -42,12 +47,6 @@ function _reenqueue_ordered!(queue::Channel{T}, waiting_room::Vector{T}, payload
     end
 end
 
-function insertsorted!(arr::Vector{T}, x::T, by=identity) where {T}
-    idx = searchsortedfirst(arr, x, by=by)
-    insert!(arr, idx, x)
-    return idx
-end
-
 function _reorder!(queue::Channel{T}, waiting_room::Vector{T}, payload::T, expected_row::Int) where{T}
     row = payload.row_num
     if row == expected_row
diff --git a/src/read_and_lex.jl b/src/read_and_lex.jl
index b83b585..bee9caa 100644
--- a/src/read_and_lex.jl
+++ b/src/read_and_lex.jl
@@ -64,6 +64,7 @@ function read_and_lex!(lexer::Lexer, chunking_ctx::ChunkingContext, _last_newlin
     end
 
     bytes_read_in = prepare_buffer!(lexer.io, chunking_ctx.bytes, _last_newline_at)
+    chunking_ctx.buffer_refills[] += 1
 
     # _last_newline_at == 0 && bytes_read_in == 0 && return nothing # only BOM / whitespace in the buffer
     start_pos = _last_newline_at == 0 ? 1 : length(chunking_ctx.bytes) - _last_newline_at + 1
@@ -84,6 +85,7 @@ function initial_read!(io, chunking_ctx, skip_leading_whitespace=false)
 
     # This is the first time we saw the buffer, we'll just fill it up
     bytes_read_in = readbytesall!(io, buf, buffersize)
+    chunking_ctx.buffer_refills[] += 1
 
     # bytes_read_in = _skip_over_initial_whitespace_and_bom!(io, buf, bytes_read_in)
     starts_with_bom = bytes_read_in > 2 && _hasBOM(buf)
diff --git a/src/read_and_lex_utils.jl b/src/read_and_lex_utils.jl
index 8c1197d..515b3a0 100644
--- a/src/read_and_lex_utils.jl
+++ b/src/read_and_lex_utils.jl
@@ -5,7 +5,7 @@ function _detect_newline(buf, pos, len)
     v = view(buf, pos:len)
     if isnothing(findfirst(==(UInt8('\n')), v))
         if isnothing(findfirst(==(UInt8('\r')), v))
-            throw(ArgumentError("No newline detected. Specify the newline character explicitly via the `newline` keyword argument. Use `\n` even for CRLF."))
+            return UInt8('\n')
         else
             return UInt8('\r')
         end
@@ -74,15 +74,20 @@ _startswith(s::AbstractVector{UInt8}, prefix::AbstractVector{UInt8}) = _startswi
 _startswith(s, soff, prefix::Nothing) = false
 _startswith(s, prefix::Nothing) = false
 
-skip_rows_init!(lexer, chunking_ctx, rows_to_skip) = _skip_rows_init!(lexer, chunking_ctx, rows_to_skip, chunking_ctx.comment)
-
-function _skip_rows_init!(lexer, chunking_ctx, rows_to_skip, comment)
+function skip_rows_init!(lexer, chunking_ctx, rows_to_skip, ignoreemptyrows=false)
     input_is_empty = length(chunking_ctx.newline_positions) == 1
-    lines_skipped_total = 0
-    input_is_empty && return lines_skipped_total
-    # To know where in the end-of-line buffer we are while deciding whether we can skip or
-    # if we need to refill the buffer because we skipped everything in it.
+    input_is_empty && (return 0)
+    skipped_rows = _skip_rows_init_fast!(lexer, chunking_ctx, rows_to_skip)
+    if chunking_ctx.comment !== nothing || ignoreemptyrows
+        skipped_rows += _skip_comments_and_empty_rows!(lexer, chunking_ctx, chunking_ctx.comment, ignoreemptyrows)
+    end
+    return skipped_rows
+end
+
+function _skip_comments_and_empty_rows!(lexer, chunking_ctx, comment, ignoreemptyrows)
+    rows_skipped = 0
     eol_index = 1
+    prev_nl = @inbounds chunking_ctx.newline_positions[eol_index]
     @inbounds while true
         # Did we exhaust the buffer during skipping?
         if eol_index == length(chunking_ctx.newline_positions)
@@ -90,33 +95,28 @@ function _skip_rows_init!(lexer, chunking_ctx, rows_to_skip, comment)
                 break
             else
                 ChunkedBase.read_and_lex!(lexer, chunking_ctx)
+                length(chunking_ctx.newline_positions) == 1 && return rows_skipped
                 eol_index = 1
+                prev_nl = chunking_ctx.newline_positions[eol_index]
             end
         end
-
-        if !_startswith(chunking_ctx.bytes, chunking_ctx.newline_positions[eol_index], comment)
-            # Non commented row: if we still have rows to skip, we skip it, otherwise we're done
-            if rows_to_skip > 0
-                rows_to_skip -= 1
-            else
-                break
-            end
-        # Commented row: we skip it if we still have rows to skip, otherwise we move to another row.
-        # This means that if there are consecutive commented rows after the rows to skip, we'll
-        # skip them all.
-        else rows_to_skip > 0
-            rows_to_skip -= 1
+        nl = chunking_ctx.newline_positions[eol_index+1]
+
+        if (_startswith(chunking_ctx.bytes, prev_nl, comment) || (ignoreemptyrows && _isemptyrow(prev_nl, nl, chunking_ctx.bytes)))
+            eol_index += 1
+            rows_skipped += 1
+            prev_nl = nl
+        else # not a commented or empty row, we're done
+            break
         end
-        eol_index += 1
-        lines_skipped_total += 1
     end
     # We need to shift the newline positions to the left to account for the skipped rows
     shiftleft!(chunking_ctx.newline_positions, eol_index-1)
-    return lines_skipped_total
+    return rows_skipped
 end
 
-function _skip_rows_init!(lexer, chunking_ctx, rows_to_skip, comment::Nothing)
+function _skip_rows_init_fast!(lexer, chunking_ctx, rows_to_skip)
     # If there are more rows to skip than the number of rows in the buffer, we skip the whole buffer
     while !lexer.done && rows_to_skip >= length(chunking_ctx.newline_positions) - 1
         rows_to_skip -= length(chunking_ctx.newline_positions) - 1
@@ -139,15 +139,15 @@ mutable struct MmapStream <: IO
 end
 MmapStream(ios::IO) = MmapStream(ios, Mmap.mmap(ios, grow=false, shared=false), 1)
 Base.close(m::MmapStream) = close(m.ios)
-Base.eof(m::MmapStream) = m.pos == length(m.x)
+Base.eof(m::MmapStream) = m.pos > length(m.x)
 function readbytesall!(io::MmapStream, buf, n::Int)
     bytes_to_read = min(bytesavailable(io), n)
     unsafe_copyto!(pointer(buf), pointer(io.x) + io.pos - 1, bytes_to_read)
     io.pos += bytes_to_read
     return bytes_to_read
 end
+Base.bytesavailable(m::MmapStream) = length(m.x) - m.pos + 1
 # Interop with GzipDecompressorStream
-Base.bytesavailable(m::MmapStream) = length(m.x) - m.pos
 Base.isopen(m::MmapStream) = isopen(m.ios) && !eof(m)
 Base.filesize(io::MmapStream) = length(io.x)
 function Base.unsafe_read(from::MmapStream, p::Ptr{UInt8}, nb::UInt)
@@ -165,6 +165,7 @@ function Base.read(io::MmapStream, ::Type{UInt8})
     if avail == 0
         throw(EOFError())
     end
+    out = @inbounds io.x[io.pos]
     io.pos += 1
-    return io.x[io.pos]
+    return out
 end
diff --git a/test/e2e_tests.jl b/test/e2e_tests.jl
index b2ed82b..dfc1832 100644
--- a/test/e2e_tests.jl
+++ b/test/e2e_tests.jl
@@ -2,13 +2,12 @@ using ChunkedBase
 using SentinelArrays: BufferedVector
 using Test
 using Random
-
+using CodecZlibNG
 
 # ParsingContext ###########################################################################
 
 struct TestParsingContext <: AbstractParsingContext end
 
-
 # ResultBuffer #############################################################################
 
 struct TestResultBuffer <: AbstractResultBuffer
@@ -26,7 +25,6 @@ function ChunkedBase.populate_result_buffer!(result_buf::TestResultBuffer, newli
     return nothing
 end
 
-
 # ConsumeContext 1 ########################################################################
 ### TestConsumeContext #####################################################################
 struct TestConsumeContext{B,C} <: AbstractConsumeContext
@@ -118,6 +116,18 @@ function test_parallel(io; buffersize=8*1024, nworkers=2, limit=0, comment=nothi
 end
 
 @testset "e2e newline finder" begin
+    q, counter = test_serial(IOBuffer(""))
+    @test take!(q).results.newline_positions == Int32[0]
+    ChunkedBase.dec!(counter)
+    @test counter.n == 0
+    @test Base.n_avail(q.queue) == 0
+
+    q, counter = test_parallel(IOBuffer(""))
+    @test take!(q).results.newline_positions == Int32[0]
+    ChunkedBase.dec!(counter)
+    @test counter.n == 0
+    @test Base.n_avail(q.queue) == 0
+
     q, counter = test_serial(IOBuffer("123456"))
     @test take!(q).results.newline_positions == Int32[0, 7]
     ChunkedBase.dec!(counter)
@@ -261,6 +271,49 @@ end
     @test_throws ChunkedBase.UnmatchedQuoteError test_parallel(IOBuffer("\"ab"), buffersize=4, lexer_args=('\\', '"', '"'))
 end
 
+@testset "MmapStream" begin
+    mktemp() do path, io
+        write(io, "123\n45\n67\n123\n45\n67")
+        flush(io)
+        seekstart(io)
+        q, counter = test_parallel(ChunkedBase.MmapStream(io), buffersize=4, skipto=5)
+        @test take!(q).results.newline_positions == Int32[0, 3]
+        ChunkedBase.dec!(counter)
+        @test counter.n == 0
+        @test Base.n_avail(q.queue) == 0
+    end
+
+    mktemp() do path, io
+        compr_io = GzipCompressorStream(io)
+        write(compr_io, "123\n45\n67\n123\n45\n67")
+        close(compr_io)
+        q, counter = test_parallel(GzipDecompressorStream(ChunkedBase.MmapStream(open(path, "r"))), buffersize=4, skipto=5)
+        @test take!(q).results.newline_positions == Int32[0, 3]
+        ChunkedBase.dec!(counter)
+        @test counter.n == 0
+        @test Base.n_avail(q.queue) == 0
+    end
+
+    mktemp() do path, io
+        write(io, "123\n")
+        flush(io)
+        seekstart(io)
+        mm = ChunkedBase.MmapStream(io)
+        @test read(mm, UInt8) == UInt8('1')
+        @test read(mm, UInt8) == UInt8('2')
+        @test filesize(mm) == 4
+        @test isopen(mm)
+        @test !eof(mm)
+        @test read(mm, UInt8) == UInt8('3')
+        @test read(mm, UInt8) == UInt8('\n')
+        @test eof(mm)
+        @test isopen(mm.ios)
+        @test !isopen(mm)
+        close(mm)
+        @test !isopen(mm.ios)
+    end
+end
+
 @testset "Exception handling" begin
     @testset "consume!" begin
         @testset "serial" begin
diff --git a/test/runtests.jl b/test/runtests.jl
index ff829b4..dd743c2 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -3,6 +3,7 @@ using ChunkedBase
 using SentinelArrays.BufferedVectors
 using NewlineLexers
 
+@testset "ChunkedBase" begin
 @testset "API basics" begin
 
     @testset "defaults" begin
@@ -57,7 +58,6 @@ end
     end
 end
 
-
 @testset "handle_file_end" begin
     # Lexer{Nothing,Nothing,Nothing} cannot end on a string
     l = NewlineLexers.Lexer(IOBuffer(), nothing)
@@ -128,7 +128,7 @@ end
     function _get_ctx(; last_newline_at, newlines_num, buffersize, nworkers)
         eols = zeros(Int32, newlines_num)
         eols[end] = last_newline_at
-        return ChunkingContext(1, ChunkedBase.TaskCounter(), BufferedVector(eols, newlines_num), zeros(UInt8, buffersize), nworkers, 0, nothing)
+        return ChunkingContext(1, ChunkedBase.TaskCounter(), BufferedVector(eols, newlines_num), zeros(UInt8, buffersize), nworkers, 0, nothing, Ref(0))
     end
     # Empty input (only 0 as end of line) -> return 1
     ctx = _get_ctx(; last_newline_at=0, newlines_num=1, buffersize=2*16*1024, nworkers=4)
@@ -144,7 +144,7 @@
 
     ctx = _get_ctx(; last_newline_at=100000, newlines_num=100000, buffersize=100000, nworkers=3)
     @test ChunkedBase.estimate_task_size(ctx) == 33334
-    # Each task should be at least 16KiB (ChunkedBase.MIN_TASK_SIZE_IN_BYTES) worht of data to work on
+    # Each task should be at least 16KiB (ChunkedBase.MIN_TASK_SIZE_IN_BYTES) worth of data to work on
     ctx = _get_ctx(; last_newline_at=100000, newlines_num=100000, buffersize=100000, nworkers=10)
     @test ChunkedBase.estimate_task_size(ctx) == 16*1024
 
@@ -290,88 +290,123 @@ end
 end
 
 @testset "skip_rows_init!" begin
-    function test_skip_rows_init(data, buffersize, rows_to_skip, expected_num_skipped, comment=nothing)
+    function test_skip_rows_init(data, buffersize, rows_to_skip, expected_num_skipped, comment=nothing, ignoreemptyrows=false)
         lexer = NewlineLexers.Lexer(IOBuffer(data), nothing, UInt8('\n'))
         ctx = ChunkingContext(buffersize, 1, 0, comment)
         ChunkedBase.read_and_lex!(lexer, ctx, 0)
-        @test ChunkedBase.skip_rows_init!(lexer, ctx, rows_to_skip) == expected_num_skipped
+        @test ChunkedBase.skip_rows_init!(lexer, ctx, rows_to_skip, ignoreemptyrows) == expected_num_skipped
         return ctx
     end
 
     for buffersize in (4, 8)
-        ctx = test_skip_rows_init("aaaa", buffersize, 0, 0)
-        @test ctx.newline_positions == [0, 5]
-        ctx = test_skip_rows_init("aaaa", buffersize, 1, 1)
-        @test ctx.newline_positions == [5]
-        ctx = test_skip_rows_init("aaaa", buffersize, 2, 1)
-        @test ctx.newline_positions == [5]
+        for ignoreemptyrows in (true, false)
+            for comment in ('#', nothing)
+                ctx = test_skip_rows_init("aaaa", buffersize, 0, 0, comment, ignoreemptyrows)
+                @test ctx.newline_positions == [0, 5]
+                ctx = test_skip_rows_init("aaaa", buffersize, 1, 1, comment, ignoreemptyrows)
+                @test ctx.newline_positions == [5]
+                ctx = test_skip_rows_init("aaaa", buffersize, 2, 1, comment, ignoreemptyrows)
+                @test ctx.newline_positions == [5]
+            end
+        end
     end
     for buffersize in (5, 10)
-        ctx = test_skip_rows_init("aaaa\n", buffersize, 1, 1)
-        @test ctx.newline_positions == [5]
-        ctx = test_skip_rows_init("aaaa\n", buffersize, 2, 1)
-        @test ctx.newline_positions == [5]
+        for ignoreemptyrows in (true, false)
+            for comment in ('#', nothing)
+                ctx = test_skip_rows_init("aaaa\n", buffersize, 1, 1, comment, ignoreemptyrows)
+                @test ctx.newline_positions == [5]
+                ctx = test_skip_rows_init("aaaa\n", buffersize, 2, 1, comment, ignoreemptyrows)
+                @test ctx.newline_positions == [5]
+            end
+        end
     end
     for buffersize in (9, 10)
-        ctx = test_skip_rows_init("aaaa\nbbbb", buffersize, 0, 0)
-        @test ctx.newline_positions == [0, 5, 10]
-        ctx = test_skip_rows_init("aaaa\nbbbb", buffersize, 1, 1)
-        @test ctx.newline_positions == [5, 10]
-        ctx = test_skip_rows_init("aaaa\nbbbb", buffersize, 2, 2)
-        @test ctx.newline_positions == [10]
+        for ignoreemptyrows in (true, false)
+            for comment in ('#', nothing)
+                ctx = test_skip_rows_init("aaaa\nbbbb", buffersize, 0, 0, comment, ignoreemptyrows)
+                @test ctx.newline_positions == [0, 5, 10]
+                ctx = test_skip_rows_init("aaaa\nbbbb", buffersize, 1, 1, comment, ignoreemptyrows)
+                @test ctx.newline_positions == [5, 10]
+                ctx = test_skip_rows_init("aaaa\nbbbb", buffersize, 2, 2, comment, ignoreemptyrows)
+                @test ctx.newline_positions == [10]
+            end
+        end
     end
     for buffersize in (10, 15)
-        ctx = test_skip_rows_init("aaaa\nbbbb\n", buffersize, 0, 0)
-        @test ctx.newline_positions == [0, 5, 10]
-        ctx = test_skip_rows_init("aaaa\nbbbb\n", buffersize, 1, 1)
-        @test ctx.newline_positions == [5, 10]
-        ctx = test_skip_rows_init("aaaa\nbbbb\n", buffersize, 2, 2)
-        @test ctx.newline_positions == [10]
-        ctx = test_skip_rows_init("aaaa\nbbbb\n", buffersize, 3, 2)
-        @test ctx.newline_positions == [10]
+        for ignoreemptyrows in (true, false)
+            for comment in ('#', nothing)
+                ctx = test_skip_rows_init("aaaa\nbbbb\n", buffersize, 0, 0, comment, ignoreemptyrows)
+                @test ctx.newline_positions == [0, 5, 10]
+                ctx = test_skip_rows_init("aaaa\nbbbb\n", buffersize, 1, 1, comment, ignoreemptyrows)
+                @test ctx.newline_positions == [5, 10]
+                ctx = test_skip_rows_init("aaaa\nbbbb\n", buffersize, 2, 2, comment, ignoreemptyrows)
+                @test ctx.newline_positions == [10]
+                ctx = test_skip_rows_init("aaaa\nbbbb\n", buffersize, 3, 2, comment, ignoreemptyrows)
+                @test ctx.newline_positions == [10]
+            end
+        end
     end
     for buffersize in (9, 10)
-        ctx = test_skip_rows_init("aaaa\nbbbb", buffersize, 0, 0, '#')
-        @test ctx.newline_positions == [0, 5, 10]
-        ctx = test_skip_rows_init("#aaa\nbbbb", buffersize, 1, 1, '#')
-        @test ctx.newline_positions == [5, 10]
-        ctx = test_skip_rows_init("#aaa\n#bbb", buffersize, 1, 2, '#')
-        @test ctx.newline_positions == [10]
-        ctx = test_skip_rows_init("#aaa\nbbbb", buffersize, 2, 2, '#')
-        @test ctx.newline_positions == [10]
-        ctx = test_skip_rows_init("#aaa\n#bbb", buffersize, 2, 2, '#')
-        @test ctx.newline_positions == [10]
+        for ignoreemptyrows in (true, false)
+            ctx = test_skip_rows_init("aaaa\nbbbb", buffersize, 0, 0, '#', ignoreemptyrows)
+            @test ctx.newline_positions == [0, 5, 10]
+            ctx = test_skip_rows_init("#aaa\nbbbb", buffersize, 1, 1, '#', ignoreemptyrows)
+            @test ctx.newline_positions == [5, 10]
+            ctx = test_skip_rows_init("#aaa\n#bbb", buffersize, 1, 2, '#', ignoreemptyrows)
+            @test ctx.newline_positions == [10]
+            ctx = test_skip_rows_init("#aaa\nbbbb", buffersize, 2, 2, '#', ignoreemptyrows)
+            @test ctx.newline_positions == [10]
+            ctx = test_skip_rows_init("#aaa\n#bbb", buffersize, 2, 2, '#', ignoreemptyrows)
+            @test ctx.newline_positions == [10]
+        end
     end
     for buffersize in (10, 15)
-        ctx = test_skip_rows_init("aaaa\nbbbb\n", buffersize, 0, 0, '#')
-        @test ctx.newline_positions == [0, 5, 10]
-        ctx = test_skip_rows_init("aaaa\n#bbb\n", buffersize, 1, 2, '#')
-        @test ctx.newline_positions == [10]
-        ctx = test_skip_rows_init("aaaa\n#bbb\n", buffersize, 2, 2, '#')
-        @test ctx.newline_positions == [10]
-        ctx = test_skip_rows_init("aaaa\n#bbb\n", buffersize, 3, 2, '#')
-        @test ctx.newline_positions == [10]
+        for ignoreemptyrows in (true, false)
+            ctx = test_skip_rows_init("aaaa\nbbbb\n", buffersize, 0, 0, '#', ignoreemptyrows)
+            @test ctx.newline_positions == [0, 5, 10]
+            ctx = test_skip_rows_init("aaaa\n#bbb\n", buffersize, 1, 2, '#', ignoreemptyrows)
+            @test ctx.newline_positions == [10]
+            ctx = test_skip_rows_init("aaaa\n#bbb\n", buffersize, 2, 2, '#', ignoreemptyrows)
+            @test ctx.newline_positions == [10]
+            ctx = test_skip_rows_init("aaaa\n#bbb\n", buffersize, 3, 2, '#', ignoreemptyrows)
+            @test ctx.newline_positions == [10]
+        end
    end
    for buffersize in (4, 8)
-        ctx = test_skip_rows_init("#aaa", buffersize, 0, 1, '#')
-        @test ctx.newline_positions == [5]
-        ctx = test_skip_rows_init("#aaa", buffersize, 1, 1, '#')
-        @test ctx.newline_positions == [5]
-        ctx = test_skip_rows_init("#aaa", buffersize, 2, 1, '#')
-        @test ctx.newline_positions == [5]
+        for ignoreemptyrows in (true, false)
+            ctx = test_skip_rows_init("#aaa", buffersize, 0, 1, '#', ignoreemptyrows)
+            @test ctx.newline_positions == [5]
+            ctx = test_skip_rows_init("#aaa", buffersize, 1, 1, '#', ignoreemptyrows)
+            @test ctx.newline_positions == [5]
+            ctx = test_skip_rows_init("#aaa", buffersize, 2, 1, '#', ignoreemptyrows)
+            @test ctx.newline_positions == [5]
+        end
     end
     for buffersize in (5, 10)
-        ctx = test_skip_rows_init("#aaa\n", buffersize, 1, 1, '#')
-        @test ctx.newline_positions == [5]
-        ctx = test_skip_rows_init("#aaa\n", buffersize, 2, 1, '#')
-        @test ctx.newline_positions == [5]
+        for ignoreemptyrows in (true, false)
+            ctx = test_skip_rows_init("#aaa\n", buffersize, 1, 1, '#', ignoreemptyrows)
+            @test ctx.newline_positions == [5]
+            ctx = test_skip_rows_init("#aaa\n", buffersize, 2, 1, '#', ignoreemptyrows)
+            @test ctx.newline_positions == [5]
+        end
+    end
+
+    for comment in (nothing, '#')
+        ctx = test_skip_rows_init("123\n\n\n\n\n\n\n\n1\n", 4, 1, 7, comment, true)
+        @test ctx.newline_positions == [0, 2]
     end
-end
+    ctx = test_skip_rows_init("123\n\n#1\n\n#1\n1\n", 4, 1, 4, '#', true)
+    @test ctx.newline_positions == [0, 2]
+    ctx = test_skip_rows_init("123\n\n\n\n\n", 4, 1, 0, '#', false)
+    @test ctx.newline_positions == [0,1,2,3,4]
+    ctx = test_skip_rows_init("123\n\n\n\n\n", 4, 1, 4, '#', true)
+    @test ctx.newline_positions == [4]
+end
 
 @testset "prepare_buffer!" begin
     buf = zeros(UInt8, 10)
@@ -471,7 +506,6 @@ end
     @test_throws ChunkedBase.NoValidRowsInBufferError ChunkedBase.initial_lex!(lexer, ctx, 4)
 end
 
-
 @testset "_detect_newline" begin
     s = b"\n"
     @test ChunkedBase._detect_newline(s, 1, length(s)) == UInt8('\n')
@@ -482,7 +516,8 @@ end
     s = b"\r"
     @test ChunkedBase._detect_newline(s, 1, length(s)) == UInt8('\r')
 
-    @test_throws ArgumentError ChunkedBase._detect_newline(b"a", 1, 1)
+    @test ChunkedBase._detect_newline(b"a", 1, 1) == UInt8('\n')
+    @test ChunkedBase._detect_newline(b"", 1, 0) == UInt8('\n') # empty file
 
     s = b"a,b,c\ne,f,g\r"
@@ -504,8 +539,29 @@ end
     @test ChunkedBase._detect_newline(s, 5, 8) == UInt8('\r')
 end
 
+@testset "_input_to_io" begin
+    (path, io) = mktemp()
+    should_close, ret_io = ChunkedBase._input_to_io(io, false)
+    @test !should_close
+    @test io === ret_io
+
+    should_close, ret_io = ChunkedBase._input_to_io(io, true)
+    @test !should_close
+    @test io === ret_io
+
+    should_close, ret_io = ChunkedBase._input_to_io(path, false)
+    @test should_close
+    @test ret_io isa IOStream
+
+    should_close, ret_io = ChunkedBase._input_to_io(path, true)
+    @test should_close
+    @test ret_io isa ChunkedBase.MmapStream
+end
+
 include("e2e_tests.jl")
+end
+
 #=
 using Coverage
 using ChunkedBase