diff --git a/README.md b/README.md
index e5aa95a..3ac490b 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,146 @@
 # ChunkedBase.jl
+
+The package handles the ingestion of data chunks and the distribution & synchronization of work that happens on these chunks in parallel. It came into existence while refactoring the [`ChunkedCSV.jl`](https://github.com/RelationalAI/ChunkedCSV.jl) and [`ChunkedJSONL.jl`](https://github.com/RelationalAI/ChunkedJSONL.jl) packages and was designed to be extended by packages like these. In short, it is a package for writing parser packages.
+
+## Overview
+
+The purpose of the package is to provide a framework for creating parsers for textual formats, where each record is separated by a newline character. Examples of such formats include CSV and JSONLines. The package is designed to operate on data chunks and to reuse preallocated buffers for each of these chunks. It simplifies the process of ingesting data, locating newlines, distributing work among multiple tasks, and synchronizing them. To implement a parsing package, one only needs to define how to *parse* the records and how to *consume* the parsed results.
+
+Parsing refers to taking the raw input bytes and the newline positions made available to the user, and using a package like `Parsers.jl` or `JSON3.jl`, or some custom parsing code, to produce records of Julia types that can be stored in user-defined result buffers, which can be customized e.g. for row-oriented or column-oriented storage.
+
+Consuming refers to taking the parsed results and doing something with them, such as inserting them into a database, appending them to a `DataFrame`, or writing them to a file.
+
+## How it works
+
+Internally, `ChunkedBase.jl` uses a coordinator task and a set number of worker tasks to process data. The coordinator task is responsible for handling IO operations and for orchestrating the worker tasks. It reads bytes from the input into a buffer and uses the [`NewlineLexers.jl`](https://github.com/JuliaData/NewlineLexers.jl) package to identify newlines in the data. Since the data is delimited by newlines, these newlines are used to split up the work among the worker tasks.
+
+The coordinator task alternates between two buffers, processing one while the workers process the other. This ensures that there is always data available for the workers to consume, which maximizes throughput across all tasks. We call this technique double-buffering.
+
+The process looks something like this, with the coordinator task at the bottom:
+
+| ![Diagram](/docs/diagrams/chunked_base.png) |
+|:--:|
+| *The coordinator uses a counter behind a mutex to synchronize with the workers. There is one such counter per buffer, and the counter is incremented by N after the coordinator splits the newlines into N segments and distributes them. After distributing the work, the coordinator starts to process the second chunk of data while the first buffer is still being worked on. A handoff occurs between the two buffers, where the bytes after the last newline from the first buffer are copied to the second. Each worker decrements the counter after completing the `consume!` operation, and the coordinator waits for the counter to reach 0 before overwriting the buffer with new data.* |
+
+Packages like `ChunkedCSV.jl` and `ChunkedJSONL.jl` hook into this structure by defining their own `populate_result_buffer!` methods that parse the records they were assigned into their custom `Result` buffers, which are then handed to the `consume!` method (e.g. to be inserted into a database).
+
+The main entry point of this package is the `parse_file_parallel` function, which accepts several "context" arguments, each controlling a different aspect of the process:
+```julia
+function parse_file_parallel(
+    lexer::Lexer,
+    parsing_ctx::AbstractParsingContext, # user-defined
+    consume_ctx::AbstractConsumeContext, # user-defined
+    chunking_ctx::ChunkingContext,
+    result_buffers::Vector{<:AbstractResultBuffer}, # user-defined
+    ::Type{CT}=Tuple{} # ignore this for now
+) where {CT}
+```
+Let's break it down:
+* `lexer` controls how we find newlines in the ingested chunks of data. Newlines serve as record delimiters, and knowing their positions allows us to split work safely among multiple workers, which are spawned internally. `Lexer`s are defined in the `NewlineLexers.jl` package.
+* `parsing_ctx` controls how we parse the data. It allows the user to dispatch on a custom `populate_result_buffer!` overload and to forward configurations to it. `populate_result_buffer!` is where we take the records identified by the `lexer` and parse them into `result_buffers`.
+* `consume_ctx` controls how the parsed results are consumed (e.g. inserted into a database, appended to a `DataFrame`...). `consume_ctx` allows the user to dispatch on their `consume!` method and hold any state necessary for consumption. This happens immediately after `populate_result_buffer!`.
+* `chunking_ctx` controls how the work on individual chunks of data is scheduled. It contains buffers for input bytes and found newlines. Through this struct, the user controls the size of the chunks and the number of spawned tasks that carry out the parsing and consuming. If there is enough data in the input, a secondary `chunking_ctx` is created internally to facilitate the double-buffering described above.
+* `result_buffers` controls in which format the results are stored. These result buffers hold results from `populate_result_buffer!` and are passed to `consume!`. This allows the user to have multiple result formats for the same `parsing_ctx`, e.g. row-oriented vs column-oriented buffers.
+
+There is also `parse_file_serial`, which doesn't spawn any tasks and just calls `populate_result_buffer!` and `consume!` sequentially, without double-buffering. This can be useful for debugging or for small files.
+
+See the docstrings of `populate_result_buffer!` and `consume!` for more information about how to integrate with them.
+
+## Example: Examining the results of the Lexer
+
+Since `ChunkedBase.jl` handles newline detection automatically, a very simple chunked processor could just show the newline positions to the user. Of course, this is not very useful apart from demonstrating how to use this package.
+
+```julia
+using ChunkedBase
+
+# Our chunked processor doesn't require any settings nor does it need to maintain
+# any additional state, so we'll define our `ParsingContext` and `ConsumeContext` only
+# for dispatch reasons.
+struct ParsingContext <: AbstractParsingContext end
+struct ConsumeContext <: AbstractConsumeContext end
+# Our buffer will hold the newline positions in the current chunk
+struct ResultBuffer <: AbstractResultBuffer
+    newlines_in_chunk::Vector{Int}
+end
+
+# Our overload copies `newlines_segment` to our result buffer
+function ChunkedBase.populate_result_buffer!(
+    result_buf::ResultBuffer,
+    newlines_segment::AbstractVector{Int32},
+    ::ParsingContext,
+    ::Vector{UInt8},
+    ::Union{Nothing,Vector{UInt8}}=nothing,
+    ::Type=Tuple{}
+)
+    resize!(result_buf.newlines_in_chunk, length(newlines_segment))
+    result_buf.newlines_in_chunk .= newlines_segment
+    return nothing
+end
+
+# We consume the result buffer by simply printing it
+function ChunkedBase.consume!(::ConsumeContext, payload::ParsedPayload)
+    # The ParsedPayload wraps the result buffer and the other contexts
+    chunking_ctx = payload.chunking_ctx
+    result_buffer = payload.results
+
+    # To demonstrate how double-buffering works, print the buffer id and the refill number
+    chunk_info = (chunking_ctx.id, chunking_ctx.buffer_refills[])
+    @info "Newlines in chunk (id:$chunk_info): $(result_buffer.newlines_in_chunk)"
+    return nothing
+end
+```
+
+Now that we've defined all overloads and contexts, we can define our user-facing function:
+
+```julia
+# Prints relative positions of `\n` in each ingested chunk of size `buffersize`,
+# using `nworkers` spawned tasks.
+function print_newlines(io, buffersize, nworkers)
+    lexer = Lexer(io, nothing, '\n')
+    parsing_ctx = ParsingContext()
+    consume_ctx = ConsumeContext()
+    chunking_ctx = ChunkingContext(buffersize, nworkers)
+    # ChunkedBase.jl requires 2 result buffers per worker task; we'd get an error otherwise
+    result_buffers = [ResultBuffer(Int[]) for _ in 1:2nworkers]
+
+    parse_file_parallel(lexer, parsing_ctx, consume_ctx, chunking_ctx, result_buffers)
+    return nothing
+end
+```
+Let's run it on some data:
+```julia
+julia> io = IOBuffer((("x" ^ 4095) * "\n") ^ 64); # 256KiB
+julia> print_newlines(io, 64 * 1024, 4);
+# [ Info: Newlines in chunk (id:(1, 1)): [16384, 20480, 24576, 28672, 32768, 36864]
+# [ Info: Newlines in chunk (id:(1, 1)): [0, 4096, 8192, 12288, 16384]
+# [ Info: Newlines in chunk (id:(1, 1)): [36864, 40960, 45056, 49152, 53248, 57344]
+# [ Info: Newlines in chunk (id:(1, 1)): [57344, 61440, 65536]
+# [ Info: Newlines in chunk (id:(2, 1)): [16384, 20480, 24576, 28672, 32768, 36864]
+# [ Info: Newlines in chunk (id:(2, 1)): [0, 4096, 8192, 12288, 16384]
+# [ Info: Newlines in chunk (id:(2, 1)): [36864, 40960, 45056, 49152, 53248, 57344]
+# [ Info: Newlines in chunk (id:(2, 1)): [57344, 61440, 65536]
+# [ Info: Newlines in chunk (id:(1, 2)): [0, 4096, 8192, 12288, 16384]
+# [ Info: Newlines in chunk (id:(1, 2)): [36864, 40960, 45056, 49152, 53248, 57344]
+# [ Info: Newlines in chunk (id:(1, 2)): [16384, 20480, 24576, 28672, 32768, 36864]
+# [ Info: Newlines in chunk (id:(1, 2)): [57344, 61440, 65536]
+# [ Info: Newlines in chunk (id:(2, 2)): [0, 4096, 8192, 12288, 16384]
+# [ Info: Newlines in chunk (id:(2, 2)): [16384, 20480, 24576, 28672, 32768, 36864]
+# [ Info: Newlines in chunk (id:(2, 2)): [36864, 40960, 45056, 49152, 53248, 57344]
+# [ Info: Newlines in chunk (id:(2, 2)): [57344, 61440, 65536]
+```
+Behind the scenes, `ChunkedBase.jl` was using two 64KiB buffers, finding newlines in them, and splitting the found newlines among 4 tasks. We can see that each of the buffers (identified by the first number in the `id` tuple) was refilled twice (the refill number is the second element of the tuple).
+The way we set up our data, there should be one newline every 4KiB of input, so we'd expect 16 newlines per chunk, but we can see that 20 numbers are reported per chunk -- this is because each newline segment we send to the tasks starts with the last newline position from the previous segment (or 0 for the first segment), so we get 4 duplicated elements in this case.
+
+## Advanced usage
+
+### Customizing work coordination
+
+Internally, the coordination of work happens through the `setup_tasks!`, `task_done!` and `sync_tasks` functions, which are defined in the `ConsumeContexts.jl` file. `setup_tasks!` sets the counter for the number of units of work that are expected to happen on the current chunk. `task_done!` is called by the worker tasks when they are done with their work, which will decrement the counter. `sync_tasks` is called by the coordinator task to wait for all workers to finish their work, i.e. for the counter to reach 0. These functions are exposed to the user through the `AbstractConsumeContext` interface, so they can be overloaded to implement custom synchronization strategies. For example, if the user wants to send the parsed result buffers to external tasks from their `consume!` method and wait for *them* to be done with their work, they can overload `setup_tasks!` to increase the expected number of units of work and then call `task_done!` from the external tasks when they are done. This way, the coordinator task will wait for the external tasks to finish their work before refilling the buffer with new data. See the docstrings of these functions for more information.
+
+### Sniffing the beginning of the file
+
+Sometimes we want to skip over the first couple of lines of a file, e.g. when they contain comments or metadata, or we might want to set up our `AbstractParsingContext` with some information that is available at the beginning of the file (like the header names of a CSV).
+
+To do this, we can fill our `ChunkingContext` with `read_and_lex!`, which will read the first `buffersize` bytes from the input and lex them so that we can inspect the newlines and decide what to do with the file. We can use `skip_rows_init!` to skip over rows that we don't want to parse, and then use `parse_file_parallel` to parse the rest of the file, so our `populate_result_buffer!` method can focus on the "cleaner" part of the file.
+
+Instead of `read_and_lex!`, one could also call `initial_read!` and `initial_lex!` separately. This gives you the opportunity to detect which newline type is used in the file and set up your lexer accordingly.
diff --git a/docs/diagrams/chunked_base.png b/docs/diagrams/chunked_base.png
new file mode 100644
index 0000000..aeae42a
Binary files /dev/null and b/docs/diagrams/chunked_base.png differ
diff --git a/src/ChunkedBase.jl b/src/ChunkedBase.jl
index e31b120..4a790b4 100644
--- a/src/ChunkedBase.jl
+++ b/src/ChunkedBase.jl
@@ -6,103 +6,39 @@ using CodecZlibNG
 using NewlineLexers
 using SentinelArrays.BufferedVectors
-const MIN_TASK_SIZE_IN_BYTES = 16 * 1024
+# The means through which the user can provide their own parsing logic.
+include("ParsingContexts.jl") -# populate_result_buffer!(result_buf::AbstractResultBuffer, newlines::AbstractVector{Int32}, parsing_ctx::AbstractParsingContext, comment::Union{Nothing,Vector{UInt8}}=nothing, ::Type{CT}=Tuple{}) where {CT} -function populate_result_buffer! end - -abstract type AbstractParsingContext end - -# Synchronization mechanism -- after we lexed all rows, we split them in N tasks and TaskCounter -# in ChunkingContext will block the io/lexer to overwrite the current chunk unless workers -# report back N times that they are done with their tasks. +# A counter-based synchronization primitive that is used to coordinate the parsing/consuming tasks. include("TaskCounters.jl") using .TaskCounters -_comment_to_bytes(x::AbstractString) = Vector{UInt8}(x) -_comment_to_bytes(x::Char) = _comment_to_bytes(ncodeunits(x) > 1 ? string(x) : UInt8(x)) -_comment_to_bytes(x::UInt8) = [x] -_comment_to_bytes(x::Vector{UInt8}) = x -_comment_to_bytes(::Nothing) = nothing -struct ChunkingContext - id::Int - counter::TaskCounter - newline_positions::BufferedVector{Int32} - bytes::Vector{UInt8} - nworkers::Int - limit::Int - comment::Union{Nothing,Vector{UInt8}} - # combination on `id` and `buffer_refills` can be used to detect if any pointers to `bytes` are still valid - buffer_refills::Base.RefValue{Int} -end -function ChunkingContext(buffersize::Integer, nworkers::Integer, limit::Integer, comment::Union{Nothing,UInt8,String,Char,Vector{UInt8}}) - (4 <= buffersize <= typemax(Int32)) || throw(ArgumentError("`buffersize` argument must be larger than 4 and smaller than 2_147_483_648 bytes.")) - (0 < nworkers < 256) || throw(ArgumentError("`nworkers` argument must be larger than 0 and smaller than 256.")) - (0 <= limit <= typemax(Int)) || throw(ArgumentError("`limit` argument must be positive and smaller than 9_223_372_036_854_775_808.")) - # TRACING # clear_traces!(nworkers) - return ChunkingContext( - 1, - TaskCounter(), - BufferedVector{Int32}(Int32[0], 1), - Vector{UInt8}(undef, buffersize), - nworkers, - limit, - _comment_to_bytes(comment), - Ref(0), - ) -end -function ChunkingContext(ctx::ChunkingContext) - out = ChunkingContext( - ctx.id + 1, - TaskCounter(), - BufferedVector{Int32}(Vector{Int32}(undef, max(1, length(ctx.newline_positions))), 1), - similar(ctx.bytes), - ctx.nworkers, - ctx.limit, - ctx.comment, - Ref(0), - ) - out.newline_positions.elements[1] = 0 - return out -end -tasks_per_chunk(ctx::ChunkingContext) = ctx.nworkers -total_result_buffers_count(ctx::ChunkingContext) = 2tasks_per_chunk(ctx) -last_newline_at(ctx::ChunkingContext) = Int(last(ctx.newline_positions)) -function should_use_parallel(ctx::ChunkingContext, _force) - return !( - _force === :serial || - ((_force !== :parallel) && (Threads.nthreads() == 1 || ctx.nworkers == 1 || last_newline_at(ctx) < MIN_TASK_SIZE_IN_BYTES)) - ) -end - -# We split the detected newlines equally among thr nworkers parsing tasks, but each -# unit of work should contain at least 16 KiB of raw bytes (MIN_TASK_SIZE_IN_BYTES). 
-function estimate_task_size(ctx::ChunkingContext) - eols = ctx.newline_positions - length(eols) == 1 && return 1 # empty file - bytes_to_parse = last(eols) - rows = length(eols) # actually rows + 1 - buffersize = length(ctx.bytes) - # There are 2*nworkers result buffers total, but there are nworkers tasks per chunk - prorated_maxtasks = ceil(Int, tasks_per_chunk(ctx) * (bytes_to_parse / buffersize)) - # Lower bound is 2 because length(eols) == 2 => 1 row - # bump min rows if average row is much smaller than MIN_TASK_SIZE_IN_BYTES - min_rows = max(2, cld(MIN_TASK_SIZE_IN_BYTES, cld(bytes_to_parse, rows))) - return min(max(min_rows, cld(rows, prorated_maxtasks)), rows) -end - -abstract type AbstractResultBuffer end +# The `ChunkingContext` is used to keep track of the current chunk of data being processed +# and to coordinate with the parsing/consuming tasks. Two `ChunkingContext` objects are used +# to double-buffer the input. +include("ChunkingContext.jl") +# The `ParsedPayload` is used to pass the results of parsing to the `consume!` method. All relevant +# information about the current chunk of data being processed is passed to `consume!` via `ParsedPayload`. +# In case the parsed results need to be consumed in order, the `PayloadOrderer` can be used to +# sort the payloads. include("payload.jl") +# Consuming is the means through which the parsed results are used by the end user. +include("ConsumeContexts.jl") +using .ConsumeContexts + +# By lexing the input in advance we can detect that we got to an inconsistent state +# (e.g. an unmatched quote at the end of the file) in which case we throw one of these exceptions. include("exceptions.jl") + +# Utilities for the coordinator task to handle the input and the newline positions. include("read_and_lex_utils.jl") include("read_and_lex.jl") -include("ConsumeContexts.jl") -using .ConsumeContexts -include("parser_serial.jl") +# The main entrypoints of the library. include("parser_parallel.jl") +include("parser_serial.jl") export ChunkingContext, tasks_per_chunk, total_result_buffers_count export AbstractParsingContext @@ -114,6 +50,14 @@ export SkipContext export Lexer export parse_file_serial, parse_file_parallel, populate_result_buffer! + +# By uncommenting all `# TRACING #` in the package, e.g. by turning them to `#= ... =#`, +# you'll enable low-overhead tracing capability. +# Before parsing, call `clear_traces!` to reset the traces, then call +# include("_tracing.jl") # once +# plot_traces() # to plot the traces +# TODO(#9): Port this over to be macro-based, with plotting being provided by a package extension + # TRACING # const PARSER_TASKS_TIMES = [UInt[]] # TRACING # const CONSUMER_TASKS_TIMES = [UInt[]] # TRACING # const IO_TASK_TIMES = UInt[] diff --git a/src/ChunkingContext.jl b/src/ChunkingContext.jl new file mode 100644 index 0000000..0c29ad8 --- /dev/null +++ b/src/ChunkingContext.jl @@ -0,0 +1,119 @@ +# When splitting the work among multiple tasks, we aim for each task to have at least this +# many bytes of input to work on (even if it means that some tasks will have nothing to process) +# This is to avoid overhead from too many task switches. +# TODO(#10): make this configurable and find a good default (the current 16 KiB is a guess) +const MIN_TASK_SIZE_IN_BYTES = 16 * 1024 + +_comment_to_bytes(x::AbstractString) = Vector{UInt8}(x) +_comment_to_bytes(x::Char) = _comment_to_bytes(ncodeunits(x) > 1 ? 
string(x) : UInt8(x))
+_comment_to_bytes(x::UInt8) = [x]
+_comment_to_bytes(x::Vector{UInt8}) = x
+_comment_to_bytes(::Nothing) = nothing
+
+"""
+    ChunkingContext(
+        buffersize::Integer,
+        nworkers::Integer,
+        limit::Integer,
+        comment::Union{Nothing,UInt8,String,Char,Vector{UInt8}}
+    ) -> ChunkingContext
+
+A context object used to coordinate parallel parsing of a single file, chunk by chunk.
+
+The user can use this object to specify the size of the byte buffer(s) to allocate, the
+number of worker tasks to spawn, and the maximum number of rows to parse in `parse_file_parallel`.
+
+# Arguments:
+- `buffersize`: the size of the byte buffer to allocate.
+  If the input is bigger than `buffersize`, a secondary `ChunkingContext` object will be used to
+  double-buffer the input, which will allocate a new buffer of the same size as `buffersize`.
+- `nworkers`: the number of worker tasks that should be spawned in `parse_file_parallel`.
+- `limit`: the maximum number of rows to parse, see `limit_eols!`; by default, no limit is set.
+- `comment`: the comment prefix to skip, if any. By default, no comment prefix is skipped.
+
+# Notes:
+- One can use the `id` and `buffer_refills` fields to uniquely identify a chunk of input.
+The `id` field is necessary because we internally create a secondary `ChunkingContext` object, with
+`id` equal to the `id` of the original `ChunkingContext` + 1.
+- The `counter` field is used to synchronize the parser/consumer tasks.
+- The `newline_positions` field is used to store the newline positions in the input.
+- The `bytes` field is used to store the raw bytes ingested from the input.
+- `comment` can be used to skip the *initial* comment lines in `skip_rows_init!`. This value is also passed to `populate_result_buffer!` for the user to handle commented rows in the middle of the file during parsing (`_startswith` could be used to do the check).
+- The `buffersize` should be large enough to fit the longest row in the input, otherwise the lexer will fail.
+- The `buffersize` should be chosen such that each of the `nworkers` tasks has enough bytes to work on. Using 1MiB per task seems to work reasonably well in practice.
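+
+# Example:
+A minimal construction sketch (the argument values below are illustrative, not defaults):
+```
+# 8MiB buffer, 4 worker tasks, no row limit, skip initial rows starting with '#'
+chunking_ctx = ChunkingContext(8 * 1024 * 1024, 4, 0, '#')
+```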
+
+# See also:
+- [`parse_file_parallel`](@ref), [`parse_file_serial`](@ref)
+"""
+struct ChunkingContext
+    id::Int # id of the chunking context (1 or 2)
+    counter::TaskCounter # synchronization mechanism to coordinate parsing
+    newline_positions::BufferedVector{Int32} # positions of newlines in the bytes
+    bytes::Vector{UInt8} # raw bytes ingested from the input
+    nworkers::Int # number of worker tasks
+    limit::Int # maximum number of rows to parse, see `limit_eols!`
+    # byte prefix to skip, used in `skip_rows_init!` and handed to `populate_result_buffer!`
+    # for the user to handle consistently (`_startswith` could be used to do the check)
+    comment::Union{Nothing,Vector{UInt8}}
+    # number of times we refilled the buffer, can be combined with `id` to uniquely identify a chunk
+    buffer_refills::Base.RefValue{Int}
+end
+function ChunkingContext(buffersize::Integer, nworkers::Integer, limit::Integer=0, comment::Union{Nothing,UInt8,String,Char,Vector{UInt8}}=nothing)
+    (4 <= buffersize <= typemax(Int32)) || throw(ArgumentError("`buffersize` argument must be larger than 4 and smaller than 2_147_483_648 bytes."))
+    (0 < nworkers < 256) || throw(ArgumentError("`nworkers` argument must be larger than 0 and smaller than 256."))
+    (0 <= limit <= typemax(Int)) || throw(ArgumentError("`limit` argument must be positive and smaller than 9_223_372_036_854_775_808."))
+    # TRACING # clear_traces!(nworkers)
+    return ChunkingContext(
+        1,
+        TaskCounter(),
+        BufferedVector{Int32}(Int32[0], 1),
+        Vector{UInt8}(undef, buffersize),
+        nworkers,
+        limit,
+        _comment_to_bytes(comment),
+        Ref(0),
+    )
+end
+# Convenience constructor for double-buffering
+function ChunkingContext(ctx::ChunkingContext)
+    out = ChunkingContext(
+        ctx.id + 1,
+        TaskCounter(),
+        BufferedVector{Int32}(Vector{Int32}(undef, max(1, length(ctx.newline_positions))), 1),
+        similar(ctx.bytes),
+        ctx.nworkers,
+        ctx.limit,
+        ctx.comment,
+        Ref(0),
+    )
+    out.newline_positions.elements[1] = 0
+    return out
+end
+tasks_per_chunk(ctx::ChunkingContext) = ctx.nworkers
+total_result_buffers_count(ctx::ChunkingContext) = 2tasks_per_chunk(ctx)
+last_newline_at(ctx::ChunkingContext) = Int(last(ctx.newline_positions))
+function should_use_parallel(ctx::ChunkingContext, _force)
+    return !(
+        _force === :serial ||
+        ((_force !== :parallel) && (Threads.nthreads() == 1 || ctx.nworkers == 1 || last_newline_at(ctx) < MIN_TASK_SIZE_IN_BYTES))
+    )
+end
+
+# Instead of splitting the newlines among `nworkers` tasks equally, we try to ensure that each task
+# has at least `MIN_TASK_SIZE_IN_BYTES` bytes of input to work on. For smaller inputs or for
+# the last, trailing bytes of a bigger file, there won't be enough newlines to utilize each
+# of the `nworkers` tasks properly, so we'll send fewer, larger chunks of work to
+# the parsing queue.
+function estimate_task_size(ctx::ChunkingContext)
+    eols = ctx.newline_positions
+    length(eols) == 1 && return 1 # empty file
+    bytes_to_parse = last(eols)
+    rows = length(eols) # actually rows + 1
+    buffersize = length(ctx.bytes)
+    # There are `2*nworkers` result buffers total, but there are `nworkers` tasks per chunk
+    prorated_maxtasks = ceil(Int, tasks_per_chunk(ctx) * (bytes_to_parse / buffersize))
+    # Lower bound is 2 because length(eols) == 2 => 1 row
+    # bump min rows if average row is much smaller than MIN_TASK_SIZE_IN_BYTES
+    min_rows = max(2, cld(MIN_TASK_SIZE_IN_BYTES, cld(bytes_to_parse, rows)))
+    return min(max(min_rows, cld(rows, prorated_maxtasks)), rows)
+end
diff --git a/src/ConsumeContexts.jl b/src/ConsumeContexts.jl
index 66e7d3b..977a3fc 100644
--- a/src/ConsumeContexts.jl
+++ b/src/ConsumeContexts.jl
@@ -7,6 +7,17 @@ using ..ChunkedBase: set!, dec!
 export AbstractConsumeContext, SkipContext
 export setup_tasks!, consume!, task_done!, sync_tasks, cleanup
 
+"""
+    AbstractConsumeContext
+
+End users should subtype this to create custom consume contexts which are then
+used in `parse_file_parallel` and `parse_file_serial`, to dispatch on their
+`consume!` method.
+
+# See also:
+- [`parse_file_parallel`](@ref), [`parse_file_serial`](@ref), [`populate_result_buffer!`](@ref)
+- [`consume!`](@ref), [`setup_tasks!`](@ref), [`cleanup`](@ref), [`AbstractResultBuffer`](@ref)
+"""
 abstract type AbstractConsumeContext end
 
 """
@@ -14,7 +25,7 @@ abstract type AbstractConsumeContext end
 Override with your `AbstractConsumeContext` to provide a custom logic for processing the parsed
 results in `AbstractResultBuffer`. The method is called from multiple tasks in parallel, just after
 each corresponding `task_buf` has been populated.
-`task_buf` is only filled once per chunk.
+`task_buf` is only filled once per chunk and is only accessed by one task at a time.
 
 See also [`consume!`](@ref), [`setup_tasks!`](@ref), [`setup_tasks!`](@ref), [`cleanup`](@ref), [`AbstractResultBuffer`](@ref)
 """
@@ -34,11 +45,11 @@ is considered to be entirely processed. This function is called just after the
 we're done detecting newline positions in the current chunk of data and we are about to
 submit partitions of the detected newlines to the parse/consume tasks.
 
-`ntasks` is between 1 and two times the `nworkers` agrument to `parse_file`, depeneding on
-the size of the input. Most of the time, the value is `2*nworkers` is used, but for smaller
-buffer sizes, smaller files or when handling the last bytes of the file, `ntasks` will be
-smaller as we try to ensure the minimal average tasks size if terms of bytes of input is at least
-$(Base.format_bytes(MIN_TASK_SIZE_IN_BYTES)). For `:serial` parsing mode, `ntasks` is always 1.
+`ntasks` is between 1 and the `nworkers` argument to `parse_file`, depending on the size of the input.
+Most of the time, the value `nworkers` is used, but for smaller buffer sizes, smaller files or
+when handling the last bytes of the file, `ntasks` will be smaller as we try to ensure that the minimal
+average task size in terms of bytes of input is at least $(Base.format_bytes(MIN_TASK_SIZE_IN_BYTES)).
+For `:serial` parsing mode, `ntasks` is always 1.
 
 You should override this method when you further subdivide the amount of concurrent work on the chunk, e.g.
when you want to process each column separately in `@spawn` tasks, in which case you'd expect
@@ -67,7 +78,7 @@ function task_done!(::AbstractConsumeContext, chunking_ctx::ChunkingContext)
     return nothing
 end
 
-# TODO: If we want to support schema inference, this would be a good place to sync a `Vector{TaskResultBuffer}` belonging to current `parsing_ctx`
+# TODO(#11): If we want to support schema inference, this would be a good place to sync a `Vector{TaskResultBuffer}` belonging to current `parsing_ctx`
 """
     sync_tasks(consume_ctx::AbstractConsumeContext, chunking_ctx::ChunkingContext)
diff --git a/src/ParsingContexts.jl b/src/ParsingContexts.jl
new file mode 100644
index 0000000..ad7c95c
--- /dev/null
+++ b/src/ParsingContexts.jl
@@ -0,0 +1,65 @@
+"""
+    populate_result_buffer!(
+        result_buf::AbstractResultBuffer,
+        newline_segment::AbstractVector{Int32},
+        parsing_ctx::AbstractParsingContext,
+        bytes::Vector{UInt8},
+        comment::Union{Nothing,Vector{UInt8}}=nothing,
+        ::Type{CT}=Tuple{}
+    ) where {CT}
+
+Override with your `AbstractParsingContext` to provide custom logic for parsing the input bytes
+in `bytes` between the newline positions in `newline_segment` into `result_buf`.
+The method is called from multiple tasks in parallel, each having a different `newline_segment`,
+some sharing the same `bytes`. The `result_buf` is only accessed by one task at a time.
+
+# Arguments:
+* `result_buf`: a user-provided object which is meant to store the parsing results from this function
+* `newline_segment`: a vector of newline positions in `bytes` which delimit the rows of the input.
+* `parsing_ctx`: a user-provided object that is used to dispatch to this method and carry parsing-specific config
+* `bytes`: the raw bytes ingested from the input
+* `comment`: the comment prefix to skip, if any
+* `CT`: an optional, compile-time known type which was passed to `parse_file_parallel` / `parse_file_serial`
+
+# Notes:
+Each consecutive pair of `newline_segment` values defines an exclusive range of bytes in `bytes` which
+constitutes a single row.
+
+The range needs to be treated as exclusive because we add a fictional newline at the beginning of the chunk
+(at position 0) and past the end of the file if it doesn't end on a newline.
+A safe way of processing each row would be e.g.:
+
+```
+start_index = first(newline_segment)
+for i in 2:length(newline_segment)
+    end_index = newline_segment[i]
+    row_bytes = view(bytes, start_index+1:end_index-1) # +/- 1 is needed!
+
+    # ... actually populate the result_buf
+
+    start_index = end_index
+end
+```
+"""
+function populate_result_buffer! end
+"""
+    AbstractParsingContext
+
+Users should subtype this to create custom parsing contexts, which are
+then used in `parse_file_parallel` / `parse_file_serial`, to dispatch on their
+`populate_result_buffer!` method.
+
+# See also:
+- [`parse_file_parallel`](@ref), [`parse_file_serial`](@ref), [`populate_result_buffer!`](@ref)
+"""
+abstract type AbstractParsingContext end
+"""
+    AbstractResultBuffer
+
+Users should subtype this to create custom result buffer objects to store the
+parsed results in `populate_result_buffer!`.
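+
+# Example:
+A schematic sketch of a user-defined subtype (the hypothetical `MyRowBuffer` stores one `String` per
+parsed record; the actual layout is entirely up to the user):
+```
+struct MyRowBuffer <: AbstractResultBuffer
+    rows::Vector{String} # one entry per parsed record
+end
+```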
+
+# See also:
+- [`parse_file_parallel`](@ref), [`parse_file_serial`](@ref), [`populate_result_buffer!`](@ref)
+"""
+abstract type AbstractResultBuffer end
diff --git a/src/_traces.jl b/src/_traces.jl
index 72a4a74..5d09403 100644
--- a/src/_traces.jl
+++ b/src/_traces.jl
@@ -1,15 +1,16 @@
 using GLMakie, ChunkedBase
 
 function plot_traces()
-    t1 = copy(ChunkedBase.T1)
-    t2 = copy(ChunkedBase.T2)
-    io_task = copy(ChunkedBase.IO_TASK_TIMES)
-    lexer_task = copy(ChunkedBase.LEXER_TASK_TIMES)
-    parser_tasks = filter(x->length(x)>0, ChunkedBase.PARSER_TASKS_TIMES)
-    consume_tasks = filter(x->length(x)>0, ChunkedBase.CONSUMER_TASKS_TIMES)
+    t1 = copy(ChunkedBase.T1) # total parse/consume time for the first byte buffer by all workers
+    t2 = copy(ChunkedBase.T2) # total parse/consume time for the second byte buffer by all workers
+    io_task = copy(ChunkedBase.IO_TASK_TIMES) # time spent in IO
+    lexer_task = copy(ChunkedBase.LEXER_TASK_TIMES) # time spent in lexer
+    parser_tasks = filter(x->length(x)>0, ChunkedBase.PARSER_TASKS_TIMES) # individual parse/consume times for each worker
+    consume_tasks = filter(x->length(x)>0, ChunkedBase.CONSUMER_TASKS_TIMES) # optional, filled by consumers that spawn their own tasks
 
     start = Int(mapreduce(first, min, parser_tasks, init=min(io_task[1], lexer_task[1])))
+    # convert to seconds, subtract start time
     lexer_timing = map(x->(x - start) / (1e9), lexer_task)
     io_timing = map(x->(x - start) / (1e9), io_task)
     pa_timings = map.(x->(x - start) / (1e9), parser_tasks)
diff --git a/src/exceptions.jl b/src/exceptions.jl
index e360032..9f51945 100644
--- a/src/exceptions.jl
+++ b/src/exceptions.jl
@@ -1,8 +1,8 @@
 abstract type FatalLexingError <: Exception end
 Base.showerror(io::IO, e::FatalLexingError) = print(io, e.msg)
 
-# TODO: Add some data to help debug the problematic file, like the first row with an escape character
-# and/or the quote character.
+# TODO(#12): Add some data to help debug the problematic file, like the first row with an escape character
+# and/or the quote character.
 struct NoValidRowsInBufferError <: FatalLexingError
     msg::String
     buffersize::Int
diff --git a/src/parser_parallel.jl b/src/parser_parallel.jl
index 02f48ba..0e6b010 100644
--- a/src/parser_parallel.jl
+++ b/src/parser_parallel.jl
@@ -1,4 +1,9 @@
-# What the read_and_lex_task! task submits to the many parser tasks
+# What the `read_and_lex_task!` task submits to the parser tasks via a Channel
+# * task_start: first index in the newline_positions buffer the parser task should process
+# * task_end: last index in the newline_positions buffer the parser task should process
+# * row_num: the (global) row number of the first newline in the chunk
+# * task_num: the index of the task (1-based)
+# * use_current_context: whether to use the current or the next chunking context (double-buffering)
 const SubtaskMetadata = @NamedTuple{task_start::Int32, task_end::Int32, row_num::Int, task_num::Int, use_current_context::Bool}
 
 function submit_lexed_rows!(parsing_queue, consume_ctx, chunking_ctx, row_num)
@@ -90,6 +95,47 @@ function process_and_consume_task(
     end
 end
 
+"""
+    parse_file_parallel(
+        lexer::Lexer,
+        parsing_ctx::AbstractParsingContext,
+        consume_ctx::AbstractConsumeContext,
+        chunking_ctx::ChunkingContext,
+        result_buffers::Vector{<:AbstractResultBuffer},
+        ::Type{CT}=Tuple{}
+    ) where {CT} -> Nothing
+
+Parse the file in `lexer.io` in parallel using `chunking_ctx.nworkers` tasks. The user must provide
+a `populate_result_buffer!` method which parses the ingested data in `chunking_ctx.bytes`, using the
+newline positions in `chunking_ctx.newline_positions` as row boundaries, into the `result_buffers`.
+The `consume!` method is called after each `populate_result_buffer!` call, so the user can process
+the parsed results in parallel. No `result_buffer` is accessed by more than one task at a time.
+
+# Arguments:
+* `lexer`: a `NewlineLexers.Lexer` object which is used to find newline positions in the input. The
+type of the lexer affects whether the search is quote-aware or not.
+* `parsing_ctx`: a user-provided object which is passed to `populate_result_buffer!`
+* `consume_ctx`: a user-provided object which is passed to `consume!`
+* `chunking_ctx`: an internal object that is used to keep track of the current chunk of data being processed
+* `result_buffers`: a vector of user-provided objects which are used to store the parsed results
+* `CT`: an optional, compile-time known type which is passed to `populate_result_buffer!`.
+This is a bit of niche functionality required by ChunkedCSV, which needs to know about
+"custom types" at compile time in order to unroll the parsing loop on them.
+
+# Exceptions:
+* `UnmatchedQuoteError`: if the input ends with an unmatched quote
+* `NoValidRowsInBufferError`: if not a single newline was found in the input buffer
+* `CapturedException`: if an exception was thrown in one of the parser/consumer tasks
+
+# Notes:
+* You can initialize the `chunking_ctx` yourself using `read_and_lex!` or with `initial_read!` + `initial_lex!`,
+which gives you the opportunity to sniff the first chunk of data before you call `parse_file_parallel`.
+* If the input is bigger than `chunking_ctx.bytes`, a secondary `chunking_ctx` object will be used to
+double-buffer the input, which will allocate a new buffer of the same size as `chunking_ctx.bytes`.
+* This function spawns `chunking_ctx.nworkers` + 1 tasks.
+
+See also [`populate_result_buffer!`](@ref), [`consume!`](@ref), [`parse_file_serial`](@ref).
+"""
 function parse_file_parallel(
     lexer::Lexer,
     parsing_ctx::AbstractParsingContext,
@@ -100,6 +146,10 @@ function parse_file_parallel(
 ) where {CT}
     @assert chunking_ctx.id == 1
     length(result_buffers) != total_result_buffers_count(chunking_ctx) && ArgumentError("Expected $(total_result_buffers_count(chunking_ctx)) result buffers, got $(length(result_buffers)).")
+    # In case we were given an uninitialized chunking_ctx, we need to fill it first
+    if chunking_ctx.buffer_refills[] == 0 && !lexer.done
+        read_and_lex!(lexer, chunking_ctx)
+    end
     parsing_queue = Channel{SubtaskMetadata}(Inf)
 
     if lexer.done
diff --git a/src/parser_serial.jl b/src/parser_serial.jl
index 0c8b421..677a3ff 100644
--- a/src/parser_serial.jl
+++ b/src/parser_serial.jl
@@ -1,3 +1,18 @@
+"""
+    parse_file_serial(
+        lexer::Lexer,
+        parsing_ctx::AbstractParsingContext,
+        consume_ctx::AbstractConsumeContext,
+        chunking_ctx::ChunkingContext,
+        result_buf::AbstractResultBuffer,
+        ::Type{CT}=Tuple{},
+    ) where {CT}
+
+The serial analog of `parse_file_parallel`, which doesn't spawn any tasks;
+useful for debugging and for processing very small files.
+
+See also [`populate_result_buffer!`](@ref), [`consume!`](@ref), [`parse_file_parallel`](@ref).
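+
+# Example:
+A minimal sketch of a call, assuming hypothetical user-defined `MyParsingContext`,
+`MyConsumeContext` and `MyResultBuffer` subtypes and a `lexer` built with `NewlineLexers.Lexer`:
+```
+chunking_ctx = ChunkingContext(1024 * 1024, 1) # 1MiB buffer (illustrative size)
+parse_file_serial(lexer, MyParsingContext(), MyConsumeContext(), chunking_ctx, MyResultBuffer())
+```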
+""" function parse_file_serial( lexer::Lexer, parsing_ctx::AbstractParsingContext, @@ -6,6 +21,10 @@ function parse_file_serial( result_buf::AbstractResultBuffer, ::Type{CT}=Tuple{}, ) where {CT} + # In case we were given an uninitialized chunking_ctx, we need to fill it first + if chunking_ctx.buffer_refills[] == 0 && !lexer.done + read_and_lex!(lexer, chunking_ctx) + end row_num = 1 _comment = chunking_ctx.comment try diff --git a/src/payload.jl b/src/payload.jl index e41003b..c62a52f 100644 --- a/src/payload.jl +++ b/src/payload.jl @@ -2,14 +2,29 @@ # ParsedPayload # -# What we send to the consume! method +""" + ParsedPayload{B, C<:AbstractParsingContext} + +A payload of parsed results, which is passed to `consume!` after each `populate_result_buffer!` call. + +# Fields: +- `row_num::Int`: row number of the first row in the payload +- `len::Int`: number of rows in the payload +- `results::B`: parsed result buffer which was populated by `populate_result_buffer!` +- `parsing_ctx::C`: library-provided data (to distinguish JSONL and CSV processing) +- `chunking_ctx::ChunkingContext`: contains the raw bytes, synchronization objects and newline positions +- `eols_buffer_index::Int32`: The start index of the newline positions in `chunking_ctx.newline_positions` that this payload corresponds to. + +# See also: +- [`consume!`](@ref), [`AbstractParsingContext`](@ref), [`ChunkingContext`](@ref), [`AbstractResultBuffer`](@ref), [`PayloadOrderer`](@ref) +""" struct ParsedPayload{B, C<:AbstractParsingContext} - row_num::Int - len::Int - results::B - parsing_ctx::C - chunking_ctx::ChunkingContext - eols_buffer_index::Int32 + row_num::Int # row number of the first row in the payload + len::Int # number of rows in the payload + results::B # parsed result buffer which was populated by `populate_result_buffer!` + parsing_ctx::C # library-provided data (to distinguish JSONL and CSV processing) + chunking_ctx::ChunkingContext # internal data to facilitate chunking and synchronization + eols_buffer_index::Int32 # The start index of the newline positions in `chunking_ctx.newline_positions` that this payload corresponds to. end Base.length(payload::ParsedPayload) = payload.len last_row(payload::ParsedPayload) = payload.row_num + length(payload) - 1 @@ -24,7 +39,52 @@ function insertsorted!(arr::Vector{T}, x::T, by=identity) where {T} return idx end -# Like a Channel, but when you take! a Payload from it, it will be the next one in order + +""" + PayloadOrderer{B, C<:AbstractParsingContext} <: AbstractChannel{ParsedPayload{B,C}} + +A channel-like object that ensures that the payloads are consumed in order. + +To use the `PayloadOrderer` you should create your own `AbstractConsumeContext` that contains it +and override the `consume!` to only `put!` payloads in the `PayloadOrderer` and `take!` payloads +from it using a separate task. For example: + +``` +struct MyConsumeContext <: AbstractConsumeContext + orderer::PayloadOrderer{MyResultBuffer, MyParsingContext} +end + +# Forward the payloads, which will arrive in random order, to the orderer +function ChunkedBase.consume!(consume_ctx::MyConsumeContext, payload::ParsedPayload) + put!(consume_ctx.orderer, payload) +end + +# Expect `ChunkedBase.task_done!` to be called twice per payload. +# By default, we'd do it once after every `consume!` +# But we'll add another one in the task that takes from the orderer. +# This will make sure that the current chunk won't ger recycled after our task is done with it. 
+function ChunkedBase.setup_tasks!(::MyConsumeContext, chunking_ctx::ChunkingContext, ntasks::Int) + set!(chunking_ctx.counter, 2*ntasks) +end + +consume_ctx = MyConsumeContext(PayloadOrderer{MyResultBuffer, MyParsingContext}()) + +# Our task that needs to process the payloads in order +@spawn begin + while true + payload = take!(consume_ctx.orderer) + do_something_that_requires_ordered_results(payload) + task_done!(consume_ctx, payload.chunking_ctx) + end +end + +parse_file_parallel(lexer, parsing_ctx, consume_ctx, chunking_ctx, result_buf) +``` +NOTE: It is not safe to call `take!` from multiple tasks on a `PayloadOrderer`. + +# See also: +- [`ParsedPayload`](@ref) +""" mutable struct PayloadOrderer{B, C<:AbstractParsingContext} <: AbstractChannel{ParsedPayload{B,C}} queue::Channel{ParsedPayload{B,C}} expected_row::Int @@ -41,6 +101,7 @@ function _reenqueue_ordered!(queue::Channel{T}, waiting_room::Vector{T}, payload payload = first(waiting_room) if payload.row_num == (nrows + row) put!(queue, popfirst!(waiting_room)) + row = payload.row_num else break end @@ -51,10 +112,10 @@ function _reorder!(queue::Channel{T}, waiting_room::Vector{T}, payload::T, expec row = payload.row_num if row == expected_row _reenqueue_ordered!(queue, waiting_room, payload) - return false + return false # we don't need to keep searching, we found the next payload in order end insertsorted!(waiting_room, payload, x->x.row_num) - return true + return true # we need to keep searching the next payload in order end Base.put!(o::PayloadOrderer{B,C}, x::ParsedPayload{B,C}) where {B,C} = put!(o.queue, x) diff --git a/src/read_and_lex.jl b/src/read_and_lex.jl index bee9caa..4c51067 100644 --- a/src/read_and_lex.jl +++ b/src/read_and_lex.jl @@ -12,7 +12,7 @@ function readbytesall!(io::IO, buf, n::Integer) end function prepare_buffer!(io::IO, buf::Vector{UInt8}, last_newline_at::Int) - ptr = pointer(buf) + ptr = pointer(buf) # `buf` is rooted in ChunkingContext, so we shouldn't need to GC.@preserve it buffersize = length(buf) @inbounds if last_newline_at == 0 # This is the first time we saw the buffer, we'll fill it up and skip leading BOM @@ -34,8 +34,10 @@ function prepare_buffer!(io::IO, buf::Vector{UInt8}, last_newline_at::Int) end function check_any_valid_rows(lexer, chunking_ctx) + # we always prepend a 0 to newline_positions as a fake newline from previous chunk + # thus there must be at least 2 elements in newline_positions to have a valid row eols = chunking_ctx.newline_positions - if (length(eols) == 0 || (length(eols) == 1 && first(eols) == 0)) && !eof(lexer.io) # TODO: check done instead of eof? + if (length(eols) == 0 || (length(eols) == 1 && first(eols) == 0)) && !eof(lexer.io) close(lexer.io) throw(NoValidRowsInBufferError(length(chunking_ctx.bytes))) end @@ -52,12 +54,14 @@ function handle_file_end!(lexer::Lexer, eols, end_pos) end end - +# Fill the chunking_ctx.bytes buffer with bytes from the input and find newlines in it +# The trailing bytes between the last newline and the end of the buffer are copied to the +# beginning of the buffer and the rest of the buffer is refilled. 
+function read_and_lex!(lexer::Lexer, chunking_ctx::ChunkingContext, _last_newline_at=last_newline_at(chunking_ctx))
     @assert !lexer.done
     empty!(chunking_ctx.newline_positions)
-    push!(chunking_ctx.newline_positions, Int32(0))
+    push!(chunking_ctx.newline_positions, Int32(0)) # fake newline from previous chunk
     if eof(lexer.io) # Catches the empty input case
         lexer.done = true
         return nothing
@@ -65,7 +69,6 @@ function read_and_lex!(lexer::Lexer, chunking_ctx::ChunkingContext, _last_newlin
     bytes_read_in = prepare_buffer!(lexer.io, chunking_ctx.bytes, _last_newline_at)
     chunking_ctx.buffer_refills[] += 1
 
-    # _last_newline_at == 0 && bytes_read_in == 0 && return nothing # only BOM / whitespace in the buffer
     start_pos = _last_newline_at == 0 ? 1 : length(chunking_ctx.bytes) - _last_newline_at + 1
     end_pos = start_pos + bytes_read_in - 1
 
@@ -78,6 +81,8 @@ function read_and_lex!(lexer::Lexer, chunking_ctx::ChunkingContext, _last_newlin
     return nothing
 end
 
+# Separate initial read and lex function for packages that sniff the file first (e.g. to detect the newline character).
+# This function should be paired with `initial_lex!` to properly initialize the chunking context.
 function initial_read!(io, chunking_ctx, skip_leading_whitespace=false)
     # First ingestion of raw bytes from io
     buf = chunking_ctx.bytes
@@ -87,7 +92,6 @@ function initial_read!(io, chunking_ctx, skip_leading_whitespace=false)
     bytes_read_in = readbytesall!(io, buf, buffersize)
     chunking_ctx.buffer_refills[] += 1
 
-    # bytes_read_in = _skip_over_initial_whitespace_and_bom!(io, buf, bytes_read_in)
     starts_with_bom = bytes_read_in > 2 && _hasBOM(buf)
 
     if skip_leading_whitespace
@@ -107,7 +111,7 @@ function initial_read!(io, chunking_ctx, skip_leading_whitespace=false)
 
     # We found a non-space byte -- we'll left-shift the spaces before it so that the buffer
     # begins with a valid value and we'll try to refill the rest of the buffer.
-    # If first_valid_byte was already at the beginnig of the buffer, we don't have to do
+    # If first_valid_byte was already at the beginning of the buffer, we don't have to do
    # anything.
     skip_over = first_valid_byte - 1
     if skip_over > 0
diff --git a/src/read_and_lex_utils.jl b/src/read_and_lex_utils.jl
index 515b3a0..12c4da0 100644
--- a/src/read_and_lex_utils.jl
+++ b/src/read_and_lex_utils.jl
@@ -1,3 +1,5 @@
+# If we don't find any `\n` but we do find a `\r`, we assume the file uses `\r` as a newline,
+# `\n` otherwise.
 function _detect_newline(buf, pos, len)
     len == 0 && return UInt8('\n') # empty file
     @assert 1 <= pos <= len <= length(buf)
@@ -18,6 +20,10 @@ function _hasBOM(bytes::Vector{UInt8})
     return @inbounds bytes[1] == 0xef && bytes[2] == 0xbb && bytes[3] == 0xbf
 end
 
+# Either we were looking for `\n` as a newline char, in which case an empty row has
+# two newline positions next to each other or there is a single byte between them
+# and it's `\r`. Or we were looking for `\r`, in which case we only consider two
+# neighboring `\r` as an empty row.
 function _isemptyrow(prev_nl, next_nl, bytes)
     return prev_nl + 1 == next_nl || (prev_nl + 2 == next_nl && @inbounds(bytes[prev_nl+1]) == UInt8('\r'))
 end
@@ -32,10 +38,7 @@ _input_to_io(input::IO, use_mmap::Bool) = false, input
 function _input_to_io(input::String, use_mmap::Bool)
     ios = open(input, "r")
     if !eof(ios) && peek(ios, UInt16) == 0x8b1f
-        # TODO: GzipDecompressorStream doesn't respect MmapStream reaching EOF for some reason
-        # io = CodecZlibNG.GzipDecompressorStream(use_mmap ?
MmapStream(ios) : ios)
-        use_mmap && @warn "`use_mmap=true` is currently unsupported when reading gzipped files, using file io."
-        io = CodecZlibNG.GzipDecompressorStream(ios)
+        io = CodecZlibNG.GzipDecompressorStream(use_mmap ? MmapStream(ios) : ios)
     elseif use_mmap
         io = MmapStream(ios)
     else
@@ -48,6 +51,7 @@ end
 # limit_eols!
 #
 
+# We can set a limit in our `chunking_ctx` to cap the number of rows we parse
 function limit_eols!(chunking_ctx::ChunkingContext, row_num::Int)
     _limit = chunking_ctx.limit
     _limit == 0 && return false
@@ -71,9 +75,17 @@ function _startswith(s::AbstractVector{UInt8}, soff::Integer, prefix::AbstractVe
     return true
 end
 _startswith(s::AbstractVector{UInt8}, prefix::AbstractVector{UInt8}) = _startswith(s, 0, prefix)
+# `nothing` for a comment prefix means we are not skipping commented rows
 _startswith(s, soff, prefix::Nothing) = false
 _startswith(s, prefix::Nothing) = false
 
+# Skip rows at the beginning of the file, optionally skipping empty rows as well.
+# If the input has fewer lines than `rows_to_skip`, we skip the whole input.
+# If we exhaust the buffer during skipping, we refill it and continue skipping.
+# If we specify a comment in `chunking_ctx`, we skip commented rows as well.
+# If we specify `ignoreemptyrows=true`, we skip empty rows as well.
+# If we're skipping empty rows and/or comments, we might skip more rows than `rows_to_skip`;
+# we'll continue skipping until we find a non-empty, non-commented row.
 function skip_rows_init!(lexer, chunking_ctx, rows_to_skip, ignoreemptyrows=false)
     input_is_empty = length(chunking_ctx.newline_positions) == 1
     input_is_empty && (return 0)
@@ -141,6 +153,7 @@ MmapStream(ios::IO) = MmapStream(ios, Mmap.mmap(ios, grow=false, shared=false),
 Base.close(m::MmapStream) = close(m.ios)
 Base.eof(m::MmapStream) = m.pos > length(m.x)
 function readbytesall!(io::MmapStream, buf, n::Int)
+    # `io` and `buf` are rooted in ChunkingContext, so we don't need to preserve them
     bytes_to_read = min(bytesavailable(io), n)
     unsafe_copyto!(pointer(buf), pointer(io.x) + io.pos - 1, bytes_to_read)
     io.pos += bytes_to_read
diff --git a/test/e2e_tests.jl b/test/e2e_tests.jl
index dfc1832..4cb2a85 100644
--- a/test/e2e_tests.jl
+++ b/test/e2e_tests.jl
@@ -35,17 +35,17 @@ ChunkedBase.task_done!(::TestConsumeContext, ::ChunkingContext) = nothing
 
 ### TestThrowingContext ####################################################################
 
 struct TestThrowingContext <: AbstractConsumeContext
-    tasks::Vector{Task}
-    conds::Vector{TaskCounter}
+    tasks::Base.IdSet{Task}
+    conds::Base.IdSet{TaskCounter}
     throw_row::Int
 end
-TestThrowingContext(throw_row) = TestThrowingContext(Task[], ChunkedBase.TaskCounter[], throw_row)
+TestThrowingContext(throw_row) = TestThrowingContext(Base.IdSet{Task}(), Base.IdSet{TaskCounter}(), throw_row)
 function ChunkedBase.consume!(ctx::TestThrowingContext, payload::ParsedPayload)
     t = current_task()
     c = payload.chunking_ctx.counter
-    c in ctx.conds || push!(ctx.conds, c)
-    t in ctx.tasks || push!(ctx.tasks, t)
+    push!(ctx.conds, c)
+    push!(ctx.tasks, t)
     payload.row_num >= ctx.throw_row && error("These contexts are for throwing, and that's all what they do")
     sleep(0.01) # trying to get the task off a fast path to claim everything from the parsing queue
     return nothing
@@ -328,8 +328,8 @@ end
            )
        end
        @assert !isempty(throw_ctx.tasks)
-        @test throw_ctx.tasks[1] === current_task()
-        @test throw_ctx.conds[1].exception isa ErrorException
+        @test only(throw_ctx.tasks) === current_task()
+        @test only(throw_ctx.conds).exception isa
ErrorException end @testset "parallel" begin @@ -348,8 +348,8 @@ end sleep(0.2) @test length(throw_ctx.tasks) == nworkers @test all(istaskdone, throw_ctx.tasks) - @test throw_ctx.conds[1].exception isa CapturedException - @test throw_ctx.conds[1].exception.ex.msg == "These contexts are for throwing, and that's all what they do" + @test first(throw_ctx.conds).exception isa CapturedException + @test first(throw_ctx.conds).exception.ex.msg == "These contexts are for throwing, and that's all what they do" end end @@ -366,10 +366,9 @@ end ) end @assert !isempty(throw_ctx.tasks) - @test throw_ctx.tasks[1] === current_task() - @test throw_ctx.conds[1].exception isa ErrorException + @test only(throw_ctx.tasks) === current_task() + @test only(throw_ctx.conds).exception isa ErrorException end - @testset "parallel" begin throw_ctx = TestThrowingContext(typemax(Int)) # Only capture tasks, let IO do the throwing nworkers = min(3, Threads.nthreads()) @@ -385,10 +384,13 @@ end sleep(0.2) @test length(throw_ctx.tasks) == min(3, Threads.nthreads()) @test all(istaskdone, throw_ctx.tasks) - @test throw_ctx.conds[1].exception isa CapturedException - @test throw_ctx.conds[1].exception.ex.task.result.msg == "That should be enough data for everyone" - @test throw_ctx.conds[2].exception isa CapturedException - @test throw_ctx.conds[2].exception.ex.task.result.msg == "That should be enough data for everyone" + conds = collect(throw_ctx.conds) + cond = pop!(conds) + @test cond.exception isa CapturedException + @test cond.exception.ex.task.result.msg == "That should be enough data for everyone" + cond = pop!(conds) + @test cond.exception isa CapturedException + @test cond.exception.ex.task.result.msg == "That should be enough data for everyone" end end end diff --git a/test/runtests.jl b/test/runtests.jl index dd743c2..b108b38 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -459,6 +459,7 @@ end @testset "initial_lex!" begin lexer = NewlineLexers.Lexer(IOBuffer("1"), nothing, UInt8('\n')) ctx = ChunkingContext(4, 1, 0, nothing) + ctx.bytes .= false @assert lexer.done == false @assert ctx.newline_positions == [0] ChunkedBase.initial_lex!(lexer, ctx, 0) @@ -466,6 +467,7 @@ end lexer = NewlineLexers.Lexer(IOBuffer("1"), nothing, UInt8('\n')) ctx = ChunkingContext(4, 1, 0, nothing) + ctx.bytes .= false @assert lexer.done == false @assert ctx.newline_positions == [0] seekend(lexer.io) @@ -489,6 +491,7 @@ end lexer = NewlineLexers.Lexer(IOBuffer("1"), nothing, UInt8('\n')) ctx = ChunkingContext(4, 1, 0, nothing) + ctx.bytes .= false ctx.newline_positions.elements[1] = 1 @test_throws AssertionError ChunkedBase.initial_lex!(lexer, ctx, 0) @test_throws AssertionError ChunkedBase.initial_lex!(lexer, ctx, 5) @@ -496,6 +499,7 @@ end lexer = NewlineLexers.Lexer(IOBuffer("1"), nothing, UInt8('\n')) lexer.done = true ctx = ChunkingContext(4, 1, 0, nothing) + ctx.bytes .= false @test_throws AssertionError ChunkedBase.initial_lex!(lexer, ctx, 0) @test_throws AssertionError ChunkedBase.initial_lex!(lexer, ctx, 5)