Pad the byte buffer to make SIMD parsing easier
Drvi committed Jun 24, 2024
1 parent a992d4c commit 86accb9
Showing 6 changed files with 62 additions and 39 deletions.
6 changes: 3 additions & 3 deletions README.md
@@ -4,15 +4,15 @@ The package handles the ingestion of data chunks and the distribution & synchron

## Overview

The purpose of the package is to provide a framework for creating parsers for textual formats, where each record is separated by a newline character. Examples of such formats include CSV and JSONLines. The package is designed to operate on data chunks and to reuse preallocated buffers for each of these chunks. It simplifies the process of ingesting data, locating newlines, distributing work among multiple tasks, and synchronizing them. To implement a parsing package, one only needs to define how to *parse* the records and how to *consume* the parsed results.

Parsing refers to taking the raw input bytes and the positions of newlines that will be available to the user, and using a package like `Parsers.jl` or `JSON3.jl`, or some custom parsing code, to produce records of Julia types that can be stored in user-defined result buffers, which can be customized, e.g., for row-oriented or column-oriented storage.

Consuming refers to taking the parsed results and doing something with them, such as inserting them into a database, appending them to a `DataFrame`, or writing them to a file.

## How it works

Internally, `ChunkedBase.jl` uses a coordinator task and a set number of worker tasks to process data. The coordinator task is responsible for handling IO operations and acts as a coordinator for the worker tasks. It reads bytes from an input buffer and uses the [`NewlineLexers.jl`](https://github.com/JuliaData/NewlineLexers.jl) package to identify newlines in the data. Since the data is delimited by newlines, these newlines are used to split up the work among the worker tasks.

The coordinator task alternates between two buffers, processing one while the workers process the other. This ensures that there is always data available for the workers to consume, which maximizes throughput across all tasks. We call this technique double-buffering.

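As a rough mental model, the alternation can be sketched like this (hypothetical helper names; the real implementation synchronizes through task counters rather than waiting on whole tasks):

```julia
# Minimal double-buffering sketch: two byte buffers alternate, so the coordinator
# can refill one buffer while worker tasks are still parsing the other.
function double_buffered_loop(io, process_chunk!; buffersize = 64 * 1024)
    buffers = (Vector{UInt8}(undef, buffersize), Vector{UInt8}(undef, buffersize))
    pending = Any[nothing, nothing]          # task currently working on each buffer, if any
    current = 1
    while !eof(io)
        # a buffer may only be refilled once the task parsing it has finished
        pending[current] === nothing || wait(pending[current])
        buf = buffers[current]
        nread = readbytes!(io, buf, buffersize)                         # coordinator refills
        pending[current] = Threads.@spawn process_chunk!($buf, $nread)  # workers parse concurrently
        current = 3 - current                                           # switch buffers
    end
    foreach(t -> t === nothing || wait(t), pending)                     # drain outstanding work
end
```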
@@ -128,7 +128,7 @@ julia> print_newlines(io, 64 * 1024, 4);
# [ Info: Newlines in chunk (id:(2, 2)): [36864, 40960, 45056, 49152, 53248, 57344]
# [ Info: Newlines in chunk (id:(2, 2)): [57344, 61440, 65536]
```
Behind the scenes, `ChunkedBase.jl` was using two 64KiB buffers, finding newlines in them, and splitting the found newlines among 4 tasks. We can see that each of the buffers (identified by the first number in the `id` tuple) was refilled two times (the refill number is the second element of the tuple).
The way we set up our data, there should be one newline every 4KiB of input, so we'd expect 16 newlines per chunk, but we can see that 20 positions are reported per chunk -- this is because each newline segment we send to the tasks starts with the last newline position from the previous segment (or 0 for the first segment), so we get 4 duplicated elements in this case.
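For the example above, the reported counts can be double-checked with a quick back-of-the-envelope calculation (not package code):

```julia
buffersize  = 64 * 1024      # bytes per chunk
record_size = 4 * 1024       # the example writes one newline every 4KiB
ntasks      = 4              # newline positions are split among 4 tasks

newlines_per_chunk = buffersize ÷ record_size      # 16 real newlines per chunk
# each task's segment repeats the previous segment's last position (or 0),
# so every one of the 4 segments carries one extra leading element:
reported_per_chunk = newlines_per_chunk + ntasks
@assert reported_per_chunk == 20
```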

## Advanced usage
19 changes: 12 additions & 7 deletions src/ChunkingContext.jl
@@ -3,7 +3,7 @@
# This is to avoid overhead from too many task switches.
# TODO(#10): make this configurable and find a good default (the current 16 KiB is a guess)
const MIN_TASK_SIZE_IN_BYTES = 16 * 1024

const BUFFER_PADDING_BYTES = 64
_comment_to_bytes(x::AbstractString) = Vector{UInt8}(x)
_comment_to_bytes(x::Char) = _comment_to_bytes(ncodeunits(x) > 1 ? string(x) : UInt8(x))
_comment_to_bytes(x::UInt8) = [x]
@@ -24,17 +24,18 @@ The user can use this object to specify the size of the byte buffer(s) to alloca
number of worker tasks to spawn and the maximum number of rows to parse in `parse_file_parallel`.
# Arguments:
- `buffersize`: the size of the byte buffer to allocate .
- `buffersize`: the size of the byte buffer to allocate.
If the input is bigger than `buffersize`, a secondary `ChunkingContext` object will be used to
double-buffer the input, which will allocate a new buffer of the same size as `buffersize`.
The buffer will also have $BUFFER_PADDING_BYTES zeroed out padding bytes at the end to make SIMD parsing easier.
- `nworkers`: the number of worker tasks that should be spawned in `parse_file_parallel`.
- `limit`: the maximum number of rows to parse, see `limit_eols!`, by default no limit is set.
- `comment`: the comment prefix to skip, if any. By default no comment prefix is skipped.
# Notes:
- One can use the `id` and `buffer_refills` fields to uniquely identify a chunk of input.
The `id` field is necessary because we internally create a secondary `ChunkingContext` object, with
`id` equal to the `id` of the original `ChunkingContext` + 1.
- The `counter` field is used to synchronize the parser/consumer tasks.
- The `newline_positions` field is used to store the newline positions in the input.
- The `bytes` field is used to store the raw bytes ingested from the input.
@@ -63,11 +64,13 @@ function ChunkingContext(buffersize::Integer, nworkers::Integer, limit::Integer=
(0 < nworkers < 256) || throw(ArgumentError("`nworkers` argument must be larger than 0 and smaller than 256."))
(0 <= limit <= typemax(Int)) || throw(ArgumentError("`limit` argument must be positive and smaller than 9_223_372_036_854_775_808."))
# TRACING # clear_traces!(nworkers)
buf = Vector{UInt8}(undef, buffersize + BUFFER_PADDING_BYTES)
buf[end-BUFFER_PADDING_BYTES+1:end] .= UInt8(0)
return ChunkingContext(
1,
TaskCounter(),
BufferedVector{Int32}(Int32[0], 1),
Vector{UInt8}(undef, buffersize),
buf,
nworkers,
limit,
_comment_to_bytes(comment),
@@ -76,16 +79,18 @@ function ChunkingContext(buffersize::Integer, nworkers::Integer, limit::Integer=
end
# Convenience for double-buffering
function ChunkingContext(ctx::ChunkingContext)
buf = similar(ctx.bytes)
out = ChunkingContext(
ctx.id + 1,
TaskCounter(),
BufferedVector{Int32}(Vector{Int32}(undef, max(1, length(ctx.newline_positions))), 1),
similar(ctx.bytes),
buf,
ctx.nworkers,
ctx.limit,
ctx.comment,
Ref(0),
)
buf[end-BUFFER_PADDING_BYTES+1:end] .= UInt8(0)
out.newline_positions.elements[1] = 0
return out
end
@@ -109,7 +114,7 @@ function estimate_task_size(ctx::ChunkingContext)
length(eols) == 1 && return 1 # empty file
bytes_to_parse = last(eols)
rows = length(eols) # actually rows + 1
buffersize = length(ctx.bytes)
buffersize = length(ctx.bytes) - BUFFER_PADDING_BYTES
# There are `2*nworkers` result buffers total, but there are `nworkers` tasks per chunk
prorated_maxtasks = ceil(Int, tasks_per_chunk(ctx) * (bytes_to_parse / buffersize))
# Lower bound is 2 because length(eols) == 2 => 1 row
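Throughout this file the effective buffer size is now `length(bytes) - BUFFER_PADDING_BYTES`. The reason for the zeroed padding is that a block-wise (SIMD) scan can always read a full 64-byte window without running past the allocation, even when the last window straddles the end of the data. A scalar stand-in for that idea (a sketch only, not the actual NewlineLexers.jl kernel):

```julia
# Because the buffer carries 64 zero bytes past `datalen`, a scan that always processes
# full 64-byte blocks stays in bounds, and the zero padding cannot be mistaken for a
# newline (0x0a), so no scalar tail loop or end-of-buffer bounds check is needed.
function count_newlines_blockwise(buf::Vector{UInt8}, datalen::Int)
    n, i = 0, 1
    while i <= datalen
        # a real SIMD kernel would load buf[i:i+63] as a single vector register here
        @inbounds for j in i:(i + 63)
            n += buf[j] == UInt8('\n')
        end
        i += 64
    end
    return n
end

buf = zeros(UInt8, 100 + 64)             # 100 data bytes plus 64 padding bytes
buf[10] = buf[90] = UInt8('\n')
@assert count_newlines_blockwise(buf, 100) == 2
```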
2 changes: 1 addition & 1 deletion src/exceptions.jl
@@ -11,7 +11,7 @@ struct NoValidRowsInBufferError <: FatalLexingError
return new(
string(
"Parsing job failed on lexing newlines. There was no valid newline in the ",
"entire buffer of $(Base.format_bytes(buffersize)). ",
"entire buffer of $(Base.format_bytes(buffersize - BUFFER_PADDING_BYTES)). ",
"This could happen if your buffer is too small, or if your quote or escape ",
"characters are not setup correctly.",
),
45 changes: 27 additions & 18 deletions src/parser_parallel.jl
@@ -40,7 +40,13 @@ function read_and_lex_task!(
_last_newline_at = last_newline_at(chunking_ctx)
# We need to copy the trailing bytes from the previous buffer to the next one
# so when we shift and refill in `read_and_lex!`, we can continue where we left off.
unsafe_copyto!(chunking_ctx_next.bytes, _last_newline_at, chunking_ctx.bytes, _last_newline_at, length(chunking_ctx.bytes) - _last_newline_at + 1)
unsafe_copyto!(
chunking_ctx_next.bytes,
_last_newline_at,
chunking_ctx.bytes,
_last_newline_at,
length(chunking_ctx.bytes) - _last_newline_at + 1 - BUFFER_PADDING_BYTES
)
read_and_lex!(lexer, chunking_ctx_next, _last_newline_at)
limit_eols!(chunking_ctx_next, row_num) && break
row_num = submit_lexed_rows!(parsing_queue, consume_ctx, chunking_ctx_next, row_num)
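The length passed to the `unsafe_copyto!` above can be sanity-checked with small numbers: since the buffer is now `buffersize + BUFFER_PADDING_BYTES` bytes long, subtracting the padding copies exactly the data bytes from the last newline to the end of the data region (hypothetical numbers below):

```julia
buffersize      = 16                        # hypothetical data region size
padding         = 4                         # stands in for BUFFER_PADDING_BYTES
last_newline_at = 13
total_len       = buffersize + padding      # length(chunking_ctx.bytes)
ncopy           = total_len - last_newline_at + 1 - padding
@assert ncopy == buffersize - last_newline_at + 1 == 4   # copies positions 13:16, padding excluded
```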
@@ -65,25 +71,27 @@ function process_and_consume_task(
::Type{CT} # compile time known data for parser
) where {CT}
# TRACING # trace = get_parser_task_trace(worker_id)

_comment = chunking_ctx.comment
try
@inbounds while true
(task_start, task_end, row_num, task_num, use_current_context) = take!(parsing_queue)
iszero(task_end) && break # zero is a signal to stop
# We prepared 2 * nworkers result buffers, as there are might 2 chunks in flight and
# since the user might provide their own consume! methods which won't block like the default
# consume!, not separating the result buffers per chunk could lead to data corruption if
# the results from the 2nd chunk are ready before the 1st chunk is consumed.
result_buf = result_buffers[task_num + (use_current_context ? 0 : tasks_per_chunk(chunking_ctx))]
# TRACING # push!(trace, time_ns())
ctx = ifelse(use_current_context, chunking_ctx, chunking_ctx_next)
# Defined by the library using ChunkedBase via overload on the specific AbstractResultBuffer and AbstractParsingContext
newline_segment = @view(ctx.newline_positions.elements[task_start:task_end])
populate_result_buffer!(result_buf, newline_segment, parsing_ctx, ctx.bytes, _comment, CT)
# Defined by the user via overload on consume_ctx
consume!(consume_ctx, ParsedPayload(row_num, Int(task_end - task_start), result_buf, parsing_ctx, ctx, task_start))
task_done!(consume_ctx, ctx)
# TRACING # push!(trace, time_ns())
while true
let (task_start, task_end, row_num, task_num, use_current_context) = take!(parsing_queue)::SubtaskMetadata
iszero(task_end) && break # zero is a signal to stop
# We prepared 2 * nworkers result buffers, as there are might 2 chunks in flight and
# since the user might provide their own consume! methods which won't block like the default
# consume!, not separating the result buffers per chunk could lead to data corruption if
# the results from the 2nd chunk are ready before the 1st chunk is consumed.
result_buf = result_buffers[task_num + (use_current_context ? 0 : tasks_per_chunk(chunking_ctx))]
# TRACING # push!(trace, time_ns())
ctx = ifelse(use_current_context, chunking_ctx, chunking_ctx_next)
# Defined by the library using ChunkedBase via overload on the specific AbstractResultBuffer and AbstractParsingContext
newline_segment = @view(ctx.newline_positions.elements[task_start:task_end])
populate_result_buffer!(result_buf, newline_segment, parsing_ctx, ctx.bytes, _comment, CT)
# Defined by the user via overload on consume_ctx
consume!(consume_ctx, ParsedPayload(row_num, Int(task_end - task_start), result_buf, parsing_ctx, ctx, task_start))
task_done!(consume_ctx, ctx)
# TRACING # push!(trace, time_ns())
end
end
catch e
ce = CapturedException(e, catch_backtrace())
@@ -155,6 +163,7 @@ function parse_file_parallel(
if lexer.done
chunking_ctx_next = chunking_ctx
else
# Allocate a new buffer if there is more data to read
chunking_ctx_next = ChunkingContext(chunking_ctx)
end
parser_tasks = sizehint!(Task[], chunking_ctx.nworkers)
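The reworked `process_and_consume_task` loop above pulls each work item inside a `let` block with a type assertion on `take!`. A generic, self-contained version of that pattern (hypothetical names and element type, not the package's `SubtaskMetadata`):

```julia
# Asserting the element type of take! gives the compiler a concrete type inside the loop
# body even when the channel itself is loosely typed, and the let block keeps each work
# item's bindings local to a single iteration.
function drain(queue::Channel, process)
    while true
        let item = take!(queue)::Tuple{Int, Int}
            first(item) == 0 && break        # a zero first field is the stop signal
            process(item)
        end
    end
end

q = Channel{Any}(8)
put!(q, (1, 10)); put!(q, (2, 20)); put!(q, (0, 0))   # (0, 0) tells the worker to stop
drain(q, item -> @info "processing" item)
```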
8 changes: 4 additions & 4 deletions src/read_and_lex.jl
@@ -13,7 +13,7 @@ end

function prepare_buffer!(io::IO, buf::Vector{UInt8}, last_newline_at::Int)
ptr = pointer(buf) # `buf` is rooted in ChunkingContext, so we shouldn't need to GC.@preserve it
buffersize = length(buf)
buffersize = length(buf) - BUFFER_PADDING_BYTES
@inbounds if last_newline_at == 0
# This is the first time we saw the buffer, we'll fill it up and skip leading BOM
bytes_read_in = readbytesall!(io, buf, buffersize)
Expand All @@ -25,7 +25,7 @@ function prepare_buffer!(io::IO, buf::Vector{UInt8}, last_newline_at::Int)
# We'll keep the bytes that are past the last newline, shifting them to the left
# and refill the rest of the buffer.
unsafe_copyto!(ptr, ptr + last_newline_at, buffersize - last_newline_at)
bytes_read_in = @inbounds readbytesall!(io, @view(buf[buffersize - last_newline_at + 1:end]), last_newline_at)
bytes_read_in = @inbounds readbytesall!(io, @view(buf[buffersize - last_newline_at + 1:buffersize]), last_newline_at)
else
# Last chunk was consumed entirely
bytes_read_in = readbytesall!(io, buf, buffersize)
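A simplified, self-contained version of this shift-and-refill step (a sketch that skips the BOM handling and uses `copyto!`/`readbytes!` in place of the pointer copy and `readbytesall!`):

```julia
# Keep the bytes past the last newline, move them to the front of the data region,
# then refill only the data region -- the trailing padding bytes stay untouched.
function refill!(io::IO, buf::Vector{UInt8}, last_newline_at::Int, padding::Int)
    buffersize = length(buf) - padding
    ntail = buffersize - last_newline_at                  # bytes past the last newline
    copyto!(buf, 1, buf, last_newline_at + 1, ntail)      # shift them to the front
    return readbytes!(io, view(buf, ntail+1:buffersize), buffersize - ntail)
end

buf = Vector{UInt8}(codeunits("abc\ndef?"))               # 8 data bytes ...
append!(buf, zeros(UInt8, 4))                             # ... plus 4 padding bytes
io = IOBuffer("ghijk")
nread = refill!(io, buf, 4, 4)                            # the last newline was at index 4
@assert String(buf[1:4 + nread]) == "def?ghij"            # tail kept, data region refilled
```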
@@ -70,7 +70,7 @@ function read_and_lex!(lexer::Lexer, chunking_ctx::ChunkingContext, _last_newlin
bytes_read_in = prepare_buffer!(lexer.io, chunking_ctx.bytes, _last_newline_at)
chunking_ctx.buffer_refills[] += 1

start_pos = _last_newline_at == 0 ? 1 : length(chunking_ctx.bytes) - _last_newline_at + 1
start_pos = _last_newline_at == 0 ? 1 : length(chunking_ctx.bytes) - BUFFER_PADDING_BYTES - _last_newline_at + 1
end_pos = start_pos + bytes_read_in - 1
# TRACING # push!(LEXER_TASK_TIMES, time_ns())
find_newlines!(lexer, chunking_ctx.bytes, chunking_ctx.newline_positions, start_pos, end_pos)
@@ -86,7 +86,7 @@ end
function initial_read!(io, chunking_ctx, skip_leading_whitespace=false)
# First ingestion of raw bytes from io
buf = chunking_ctx.bytes
buffersize = length(buf)
buffersize = length(buf) - BUFFER_PADDING_BYTES

# This is the first time we saw the buffer, we'll just fill it up
bytes_read_in = readbytesall!(io, buf, buffersize)
21 changes: 15 additions & 6 deletions test/runtests.jl
@@ -128,7 +128,8 @@ end
function _get_ctx(; last_newline_at, newlines_num, buffersize, nworkers)
eols = zeros(Int32, newlines_num)
eols[end] = last_newline_at
return ChunkingContext(1, ChunkedBase.TaskCounter(), BufferedVector(eols, newlines_num), zeros(UInt8, buffersize), nworkers, 0, nothing, Ref(0))
bytes = zeros(UInt8, buffersize + ChunkedBase.BUFFER_PADDING_BYTES)
return ChunkingContext(1, ChunkedBase.TaskCounter(), BufferedVector(eols, newlines_num), bytes, nworkers, 0, nothing, Ref(0))
end
# Empty input (only 0 as end of line) -> return 1
ctx = _get_ctx(; last_newline_at=0, newlines_num=1, buffersize=2*16*1024, nworkers=4)
@@ -409,14 +410,16 @@ end
end

@testset "prepare_buffer!" begin
buf = zeros(UInt8, 10)
_get_buffer(n) = zeros(UInt8, n + ChunkedBase.BUFFER_PADDING_BYTES)
buf = _get_buffer(10)
io = IOBuffer("xxx12")
skip(io, 3)
@test ChunkedBase.prepare_buffer!(io, buf, 10) == 2
@test buf[1] == UInt8('1')
@test buf[2] == UInt8('2')
@test all(buf[11:end] .== 0)

buf = zeros(UInt8, 10)
buf = _get_buffer(10)
buf[9] = 0x09
buf[10] = 0x0a
io = IOBuffer("xxx12")
Expand All @@ -426,26 +429,32 @@ end
@test buf[2] == 0x0a
@test buf[3] == UInt8('1')
@test buf[4] == UInt8('2')
@test all(buf[11:end] .== 0)

buf = zeros(UInt8, 1)

buf = _get_buffer(1)
io = IOBuffer("xxx12")
skip(io, 3)
@test ChunkedBase.prepare_buffer!(io, buf, 0) == 1
@test buf[1] == 0x31
@test all(buf[2:end] .== 0)


buf = zeros(UInt8, 2)
buf = _get_buffer(2)
io = IOBuffer("xxx12")
skip(io, 3)
@test ChunkedBase.prepare_buffer!(io, buf, 0) == 2
@test buf[1] == 0x31
@test buf[2] == 0x32
@test all(buf[3:end] .== 0)

buf = zeros(UInt8, 2)
buf = _get_buffer(2)
io = IOBuffer("xxx123")
skip(io, 3)
@test ChunkedBase.prepare_buffer!(io, buf, 0) == 2
@test buf[1] == 0x31
@test buf[2] == 0x32
@test all(buf[3:end] .== 0)
end

@testset "_isemptyrow" begin