More docs

Drvi committed Nov 9, 2023
1 parent 18dd946 commit 8981ec9
Showing 6 changed files with 87 additions and 20 deletions.
13 changes: 5 additions & 8 deletions README.md
@@ -16,7 +16,7 @@ Packages like `ChunkedCSV.jl` and `ChunkedJSONL.jl` hook into this structure by
The main entry point of this package is the `parse_file_parallel` function, which accepts several "context" arguments, each controlling a different aspect of the process:
```julia
function parse_file_parallel(
lexer::NewlineLexers.Lexer,
lexer::Lexer,
parsing_ctx::AbstractParsingContext,
consume_ctx::AbstractConsumeContext,
chunking_ctx::ChunkingContext,
@@ -39,7 +39,6 @@ Since `ChunkedBase.jl` handles newline detection automatically, a very simple ch

```julia
using ChunkedBase
using NewlineLexers: Lexer

# Our chunked processor doesn't require any settings nor does it need to maintain
# any additional state, so we'll define our `ParsingContext` and `ConsumeContext` only
@@ -86,20 +85,18 @@ function print_newlines(io, buffersize, nworkers)
lexer = Lexer(io, nothing, '\n')
parsing_ctx = ParsingContext()
consume_ctx = ConsumeContext()
chunking_ctx = ChunkingContext(buffersize, nworkers, 0, nothing)
chunking_ctx = ChunkingContext(buffersize, nworkers)
    # ChunkedBase.jl requires 2 result buffers per worker task; we'd get an error otherwise
result_buffers = [ResultBuffer(Int[]) for _ in 1:2nworkers]
# We need to fill the byte buffer and find the newlines in it before we hand
# the `consume_ctx` to the function
ChunkedBase.read_and_lex!(lexer, chunking_ctx)

parse_file_parallel(lexer, parsing_ctx, consume_ctx, chunking_ctx, result_buffers)
return nothing
end
```
Let's run it on some data:
```julia
julia> io = IOBuffer((("x" ^ 4095) * "\n") ^ 64); # 256KiB
julia> print_newlines(io, 64*1024, 4);
julia> print_newlines(io, 64 * 1024, 4);
# [ Info: Newlines in chunk (id:(1, 1)): [16384, 20480, 24576, 28672, 32768, 36864]
# [ Info: Newlines in chunk (id:(1, 1)): [0, 4096, 8192, 12288, 16384]
# [ Info: Newlines in chunk (id:(1, 1)): [36864, 40960, 45056, 49152, 53248, 57344]
@@ -117,6 +114,6 @@ julia> print_newlines(io, 64*1024, 4);
# [ Info: Newlines in chunk (id:(2, 2)): [36864, 40960, 45056, 49152, 53248, 57344]
# [ Info: Newlines in chunk (id:(2, 2)): [57344, 61440, 65536]
```
Behind the scenes, `ChunkedBase.jl` was using two 64KiB buffers, finding newlines in them, and splitting the found newlines among 4 tasks. We can see that each of the buffers (identified by the first number in the `id` tuple) was refilled two times (the refill number is the second element of the tuple).
The way we set up our data, there should be one newline every 4KiB of input, so we'd expect 16 newlines per chunk, but we can see that there are 20 numbers reported per chunk -- each newline segment we send to the tasks starts with the last newline position from the previous segment (or 0 for the first segment), so we get 4 duplicated elements in this case.
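To make the arithmetic concrete, here is a small standalone sketch of that splitting behaviour (the splitting logic below is an illustration only, not the actual ChunkedBase internals):

```julia
# Each task's segment of newline positions starts with the last position of the
# previous segment (or 0 for the first one), so boundary positions are reported twice.
newlines = [0; 4096:4096:65536]                # the leading 0 plus 16 newline positions in a 64KiB chunk
ntasks = 4
per_task = cld(length(newlines) - 1, ntasks)   # 4 "real" newlines per task
segments = [newlines[(i - 1) * per_task + 1 : min(i * per_task + 1, lastindex(newlines))] for i in 1:ntasks]
@assert sum(length, segments) == 20            # 16 newlines + the leading 0 + 3 boundary repeats
```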

10 changes: 5 additions & 5 deletions src/ChunkingContext.jl
@@ -27,9 +27,9 @@ number of worker tasks to spawn and the maximum number of rows to parse in `pars
- `buffersize`: the size of the byte buffer to allocate.
If the input is bigger than `buffersize`, a secondary `ChunkingContext` object will be used to
double-buffer the input, which will allocate a new buffer of the same size as `buffersize`.
- `nworkers`: the number of worker tasks that should be spawned in `parse_file_parallel`
- `limit`: the maximum number of rows to parse, see `limit_eols!`
- `comment`: the comment prefix to skip, if any
- `nworkers`: the number of worker tasks that should be spawned in `parse_file_parallel`.
- `nworkers`: the number of worker tasks that should be spawned in `parse_file_parallel`.
- `limit`: the maximum number of rows to parse, see `limit_eols!`; by default, no limit is set.
- `comment`: the comment prefix to skip, if any. By default, no comment prefix is skipped.
# Notes:
- One can use the `id` and `buffer_refills` fields to uniquely identify a chunk of input.
@@ -58,7 +58,7 @@ struct ChunkingContext
# number of times we refilled the buffer, can be combined with `id` to uniquely identify a chunk
buffer_refills::Base.RefValue{Int}
end
function ChunkingContext(buffersize::Integer, nworkers::Integer, limit::Integer, comment::Union{Nothing,UInt8,String,Char,Vector{UInt8}})
function ChunkingContext(buffersize::Integer, nworkers::Integer, limit::Integer=0, comment::Union{Nothing,UInt8,String,Char,Vector{UInt8}}=nothing)
(4 <= buffersize <= typemax(Int32)) || throw(ArgumentError("`buffersize` argument must be larger than 4 and smaller than 2_147_483_648 bytes."))
(0 < nworkers < 256) || throw(ArgumentError("`nworkers` argument must be larger than 0 and smaller than 256."))
(0 <= limit <= typemax(Int)) || throw(ArgumentError("`limit` argument must be positive and smaller than 9_223_372_036_854_775_808."))
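Given the new default arguments, the two calls below should produce equivalently configured contexts (a minimal sketch based only on the positional signature shown in this hunk):

```julia
ctx_a = ChunkingContext(64 * 1024, 4)              # limit and comment fall back to 0 and nothing
ctx_b = ChunkingContext(64 * 1024, 4, 0, nothing)  # the same configuration, spelled out
```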
@@ -101,7 +101,7 @@ end

# Instead of splitting the newlines among `nworker` tasks equally, we try to ensure that each task
# has at least `MIN_TASK_SIZE_IN_BYTES` bytes of input to work on. For smaller inputs or for
# the last, trialing bytes of a bigger file, there won't be enough newlines to utilize each
# the last, trailing bytes of a bigger file, there won't be enough newlines to utilize each
# of the `nworkers` tasks properly, so we'll send out fewer chunks of work that are bigger to
# the parsing queue.
function estimate_task_size(ctx::ChunkingContext)
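A rough standalone sketch of that sizing heuristic (hypothetical constant and helper, not the body of `estimate_task_size` itself):

```julia
# Hand each task at least some minimum number of bytes of input, even if that means
# sending fewer, larger chunks of work to the parsing queue.
const SKETCH_MIN_TASK_SIZE_IN_BYTES = 16 * 1024    # assumed value, for illustration only
function sketch_task_size(bytes_in_chunk::Int, newlines_in_chunk::Int, nworkers::Int)
    bytes_per_row = cld(bytes_in_chunk, max(newlines_in_chunk, 1))
    min_rows_per_task = cld(SKETCH_MIN_TASK_SIZE_IN_BYTES, bytes_per_row)
    even_split = cld(newlines_in_chunk, nworkers)
    return max(min_rows_per_task, even_split)      # rows of work handed to each task
end
```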
8 changes: 6 additions & 2 deletions src/parser_parallel.jl
@@ -126,8 +126,8 @@ end
* `CapturedException`: if an exception was thrown in one of the parser/consumer tasks
# Notes:
* The `chunking_ctx` is assumed to be filled with data whose newline positions are already detected, e.g.
by calling `read_and_lex!` with the `lexer` object on it.
* You can initialize the `chunking_ctx` yourself using `read_and_lex!` or with `initial_read!` + `initial_lex!`
which gives you the opportunity to sniff the first chunk of data before you call `parse_file_parallel`.
* If the input is bigger than `chunking_ctx.bytes`, a secondary `chunking_ctx` object will be used to
double-buffer the input, which will allocate a new buffer of the same size as `chunking_ctx.bytes`.
* This function spawns `chunking_ctx.nworkers` + 1 tasks.
@@ -144,6 +144,10 @@ function parse_file_parallel(
) where {CT}
@assert chunking_ctx.id == 1
    length(result_buffers) != total_result_buffers_count(chunking_ctx) && throw(ArgumentError("Expected $(total_result_buffers_count(chunking_ctx)) result buffers, got $(length(result_buffers))."))
# In case we were given an uninitialized chunking_ctx, we need to fill it first
if chunking_ctx.buffer_refills[] == 0 && !lexer.done
read_and_lex!(lexer, chunking_ctx)
end

parsing_queue = Channel{SubtaskMetadata}(Inf)
if lexer.done
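In practice this means the explicit `ChunkedBase.read_and_lex!` call removed from the README example is no longer required; a sketch reusing the hypothetical `ParsingContext`, `ConsumeContext` and `ResultBuffer` types from that example:

```julia
io = IOBuffer((("x" ^ 4095) * "\n") ^ 64)
lexer = Lexer(io, nothing, '\n')
chunking_ctx = ChunkingContext(64 * 1024, 4)             # limit and comment left at their defaults
result_buffers = [ResultBuffer(Int[]) for _ in 1:2 * 4]  # still 2 result buffers per worker task
# No manual ChunkedBase.read_and_lex!(lexer, chunking_ctx) needed; the check above
# fills an uninitialized chunking_ctx on entry.
parse_file_parallel(lexer, ParsingContext(), ConsumeContext(), chunking_ctx, result_buffers)
```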
4 changes: 4 additions & 0 deletions src/parser_serial.jl
@@ -21,6 +21,10 @@ function parse_file_serial(
result_buf::AbstractResultBuffer,
::Type{CT}=Tuple{},
) where {CT}
# In case we were given an uninitialized chunking_ctx, we need to fill it first
if chunking_ctx.buffer_refills[] == 0 && !lexer.done
read_and_lex!(lexer, chunking_ctx)
end
row_num = 1
_comment = chunking_ctx.comment
try
67 changes: 63 additions & 4 deletions src/payload.jl
@@ -2,14 +2,29 @@
# ParsedPayload
#

# What we send to the consume! method
"""
ParsedPayload{B, C<:AbstractParsingContext}
A payload of parsed results, which is passed to `consume!` after each `populate_result_buffer!` call.
# Fields:
- `row_num::Int`: row number of the first row in the payload
- `len::Int`: number of rows in the payload
- `results::B`: parsed result buffer
- `parsing_ctx::C`: library-provided data (to distinguish JSONL and CSV processing)
- `chunking_ctx::ChunkingContext`: contains the raw bytes, synchronization objects and newline positions
- `eols_buffer_index::Int32`: The start index of the newline positions in `chunking_ctx.newline_positions` that this payload corresponds to.
# See also:
- [`consume!`](@ref), [`AbstractParsingContext`](@ref), [`ChunkingContext`](@ref), [`AbstractResultBuffer`](@ref), [`PayloadOrderer`](@ref)
"""
struct ParsedPayload{B, C<:AbstractParsingContext}
row_num::Int # row number of the first row in the payload
len::Int # number of rows in the payload
results::B # parsed result buffer
parsing_ctx::C # library-provided data (to distinguish JSONL and CSV processing)
chunking_ctx::ChunkingContext # internal data to facilitate chunking and synchronization
eols_buffer_index::Int32 # index of the [e]nd-[o]f-[l]ine[s] buffer in the chunking_ctx
eols_buffer_index::Int32 # The start index of the newline positions in `chunking_ctx.newline_positions` that this payload corresponds to.
end
Base.length(payload::ParsedPayload) = payload.len
last_row(payload::ParsedPayload) = payload.row_num + length(payload) - 1
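A toy check of the row bookkeeping above (hypothetical numbers, independent of the real structs):

```julia
row_num, len = 101, 50      # a payload starting at row 101 that holds 50 rows
row_num + len - 1 == 150    # true; this is exactly what last_row computes
```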
@@ -24,8 +39,52 @@ function insertsorted!(arr::Vector{T}, x::T, by=identity) where {T}
return idx
end

# Like a Channel, but when you take! a Payload from it, it will be the next one in order
# take! is not threadsafe

"""
PayloadOrderer{B, C<:AbstractParsingContext} <: AbstractChannel{ParsedPayload{B,C}}
A channel-like object that ensures that the payloads are consumed in order.
To use the `PayloadOrderer` you should create your own `AbstractConsumeContext` that contains it
and override `consume!` so that it only `put!`s payloads into the `PayloadOrderer`; then `take!` payloads
from it using a separate task. For example:
```
struct MyConsumeContext <: AbstractConsumeContext
orderer::PayloadOrderer{MyResultBuffer, MyParsingContext}
end
# Forward the payloads, which will arrive in random order, to the orderer
function ChunkedBase.consume!(consume_ctx::MyConsumeContext, payload::ParsedPayload)
put!(consume_ctx.orderer, payload)
end
# Expect `ChunkedBase.task_done!` to be called twice per payload.
# By default, it would be called once after every `consume!`,
# but we'll add another call in the task that takes from the orderer.
# This makes sure that the current chunk won't get recycled until our task is done with it.
function ChunkedBase.setup_tasks!(::MyConsumeContext, chunking_ctx::ChunkingContext, ntasks::Int)
set!(chunking_ctx.counter, 2*ntasks)
end
consume_ctx = MyConsumeContext(PayloadOrderer{MyResultBuffer, MyParsingContext}())
# Our task that needs to process the payloads in order
@spawn begin
while true
payload = take!(consume_ctx.orderer)
do_something_that_requires_ordered_results(payload)
task_done!(consume_ctx, payload.chunking_ctx)
end
end
parse_file_parallel(lexer, parsing_ctx, consume_ctx, chunking_ctx, result_buf)
```
NOTE: It is not safe to call `take!` from multiple tasks on a `PayloadOrderer`.
# See also:
- [`ParsedPayload`](@ref)
"""
mutable struct PayloadOrderer{B, C<:AbstractParsingContext} <: AbstractChannel{ParsedPayload{B,C}}
queue::Channel{ParsedPayload{B,C}}
expected_row::Int
5 changes: 4 additions & 1 deletion src/read_and_lex.jl
@@ -54,7 +54,9 @@ function handle_file_end!(lexer::Lexer, eols, end_pos)
end
end


# Fill the chunking_ctx.bytes buffer with bytes from the input and find newlines in it
# The trailing bytes between the last newline and the end of the buffer are copied to the
# beginning of the buffer and the rest of the buffer is refilled.
function read_and_lex!(lexer::Lexer, chunking_ctx::ChunkingContext, _last_newline_at=last_newline_at(chunking_ctx))
@assert !lexer.done

@@ -80,6 +82,7 @@ end
end
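The carry-over step described in the comment on `read_and_lex!` above can be shown in isolation (a toy sketch, not the real buffer management):

```julia
# Bytes after the last newline ("cc" here) are moved to the front of the buffer so the
# next read from the input can append after them.
buf = Vector{UInt8}("aaaa\nbbbb\ncc")
last_nl = findlast(==(UInt8('\n')), buf)        # position 10
ntrailing = length(buf) - last_nl               # 2 trailing bytes without a newline yet
copyto!(buf, 1, buf, last_nl + 1, ntrailing)    # buf now starts with "cc"
# a subsequent read would refill buf starting at index ntrailing + 1
```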

# Separate initial read and lex function for packages that sniff the file first (e.g. to detect the newline character)
# This function should be paired with `initial_lex!` to properly initialize the chunking context
function initial_read!(io, chunking_ctx, skip_leading_whitespace=false)
# First ingestion of raw bytes from io
buf = chunking_ctx.bytes
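Putting the sniffing workflow from the `parse_file_parallel` notes together, a heavily hedged sketch; the `initial_lex!` call signature and the `sniff_newline` helper are assumptions for illustration rather than documented API, and `parsing_ctx`, `consume_ctx` and `result_buffers` are set up as in the README example:

```julia
io = open("data.csv")                          # hypothetical input
chunking_ctx = ChunkingContext(64 * 1024, 4)
initial_read!(io, chunking_ctx)                # fill the byte buffer once
newline = sniff_newline(chunking_ctx.bytes)    # hypothetical user-side sniffing step
lexer = Lexer(io, nothing, newline)
initial_lex!(lexer, chunking_ctx)              # assumed signature; detects the newline positions
parse_file_parallel(lexer, parsing_ctx, consume_ctx, chunking_ctx, result_buffers)
```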
