Commit

Tweak code organization, add comments and docstrings
Drvi committed Nov 6, 2023
1 parent 77901b8 commit 1406a84
Showing 12 changed files with 271 additions and 118 deletions.
12 changes: 6 additions & 6 deletions README.md
@@ -28,11 +28,11 @@ function parse_file_parallel(
) where {CT}
```
Let's break it down:
* `lexer`s responsibility is finding newlines in the ingested chunks of data. Newlines serve as record delimiters and knowing their positions allows us to split work safely among multiple workers, which are spawned internally.
* `parsing_ctx` provides a way for the user to dispatch on custom `populate_result_buffer!` overload and to forward configurations to it. `populate_result_buffer!` is where we take the records identified by the `lexer` and parse them into `result_buffers`.
* `consume_ctx` after the data has been parsed, we can "consume" it (e.g. insert them into a database), `consume_ctx` allows the user to dispatch on their `consume!` method and hold any necessary state.
* `chunking_ctx` is used to facilitate IO and work synchronization. Through this struct the user controls the size of the chunks and number spawned tasks that carry out the parsing and consuming. If there is enough data in the input, a secondary `chunking_ctx` is created internally to facilitate the double-buffering described above.
* `result_buffers` an array of buffers results from `populate_result_buffer!` and that are passed to `consume!`. This allows the user to have multiple result formats for the with `parsing_ctx` e.g. row oriented vs column oriented buffers.
* `lexer` controls how we find newlines in the ingested chunks of data. Newlines serve as record delimiters and knowing their positions allows us to split work safely among multiple workers, which are spawned internally. `Lexer`s are defined in the `NewlineLexers.jl` package.
* `parsing_ctx` controls how we parse the data. It allows the user to dispatch on a custom `populate_result_buffer!` overload and to forward configurations to it. `populate_result_buffer!` is where we take the records identified by the `lexer` and parse them into `result_buffers`.
* `consume_ctx` controls how the parsed results are consumed (e.g. inserted into a database, appended to a `DataFrame`, ...). `consume_ctx` allows the user to dispatch on their `consume!` method and hold any state necessary for consuming. This happens immediately after `populate_result_buffer!`.
* `chunking_ctx` controls how the work on individual chunks of data is scheduled. It contains buffers for input bytes and found newlines. Through this struct the user controls the size of the chunks and the number of spawned tasks that carry out the parsing and consuming. If there is enough data in the input, a secondary `chunking_ctx` is created internally to facilitate the double-buffering described above.
* `result_buffers` controls in which format the results are stored. These result buffers hold results from `populate_result_buffer!` and are passed to `consume!`. This allows the user to have multiple result formats for the same `parsing_ctx`, e.g. row-oriented vs. column-oriented buffers.

See the docstring of `populate_result_buffer!` and `consume!` for more information about how to integrate with them.
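For orientation, here is a minimal wiring sketch of how these pieces are assumed to fit together. `MyParsingCtx`, `MyConsumeCtx` and `MyResultBuffer` are hypothetical user-defined subtypes, and both the `Lexer` constructor call and the positional argument order of `parse_file_parallel` are assumptions based on the list above, not verified API:
```julia
# Hypothetical sketch -- the subtypes below are placeholders, and the `Lexer`
# constructor / argument order of `parse_file_parallel` are assumptions.
io = open("data.csv")
lexer = NewlineLexers.Lexer(io, nothing, UInt8('\n'))           # assumed: unquoted input, '\n' newlines
chunking_ctx = ChunkingContext(8 * 1024 * 1024, 4, 0, nothing)  # 8 MiB buffer, 4 workers, no row limit, no comment prefix
result_buffers = [MyResultBuffer() for _ in 1:total_result_buffers_count(chunking_ctx)]
parse_file_parallel(lexer, MyParsingCtx(), MyConsumeCtx(), chunking_ctx, result_buffers)
close(io)
```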

@@ -102,7 +102,7 @@ end
Let's run in on some data:
```julia
julia> io = IOBuffer((("x" ^ 4095) * "\n") ^ 64); # 256KiB
julia> print_newlines(io, 2^16, 4);
julia> print_newlines(io, 64*1024, 4);
# [ Info: Newlines in chunk (id:(1, 1)): [16384, 20480, 24576, 28672, 32768, 36864]
# [ Info: Newlines in chunk (id:(1, 1)): [0, 4096, 8192, 12288, 16384]
# [ Info: Newlines in chunk (id:(1, 1)): [36864, 40960, 45056, 49152, 53248, 57344]
Binary file added docs/diagrams/chunked_base.png
114 changes: 29 additions & 85 deletions src/ChunkedBase.jl
@@ -6,103 +6,39 @@ using CodecZlibNG
using NewlineLexers
using SentinelArrays.BufferedVectors

const MIN_TASK_SIZE_IN_BYTES = 16 * 1024
# The means through which the user can provide their own parsing logic.
include("ParsingContexts.jl")

# populate_result_buffer!(result_buf::AbstractResultBuffer, newlines::AbstractVector{Int32}, parsing_ctx::AbstractParsingContext, comment::Union{Nothing,Vector{UInt8}}=nothing, ::Type{CT}=Tuple{}) where {CT}
function populate_result_buffer! end

abstract type AbstractParsingContext end

# Synchronization mechanism -- after we lexed all rows, we split them in N tasks and TaskCounter
# in ChunkingContext will block the io/lexer to overwrite the current chunk unless workers
# report back N times that they are done with their tasks.
# A counter based synchronization primitive used to coordinate the parsing/consuming tasks.
include("TaskCounters.jl")
using .TaskCounters

_comment_to_bytes(x::AbstractString) = Vector{UInt8}(x)
_comment_to_bytes(x::Char) = _comment_to_bytes(ncodeunits(x) > 1 ? string(x) : UInt8(x))
_comment_to_bytes(x::UInt8) = [x]
_comment_to_bytes(x::Vector{UInt8}) = x
_comment_to_bytes(::Nothing) = nothing
struct ChunkingContext
id::Int
counter::TaskCounter
newline_positions::BufferedVector{Int32}
bytes::Vector{UInt8}
nworkers::Int
limit::Int
comment::Union{Nothing,Vector{UInt8}}
# combination on `id` and `buffer_refills` can be used to detect if any pointers to `bytes` are still valid
buffer_refills::Base.RefValue{Int}
end
function ChunkingContext(buffersize::Integer, nworkers::Integer, limit::Integer, comment::Union{Nothing,UInt8,String,Char,Vector{UInt8}})
(4 <= buffersize <= typemax(Int32)) || throw(ArgumentError("`buffersize` argument must be larger than 4 and smaller than 2_147_483_648 bytes."))
(0 < nworkers < 256) || throw(ArgumentError("`nworkers` argument must be larger than 0 and smaller than 256."))
(0 <= limit <= typemax(Int)) || throw(ArgumentError("`limit` argument must be positive and smaller than 9_223_372_036_854_775_808."))
# TRACING # clear_traces!(nworkers)
return ChunkingContext(
1,
TaskCounter(),
BufferedVector{Int32}(Int32[0], 1),
Vector{UInt8}(undef, buffersize),
nworkers,
limit,
_comment_to_bytes(comment),
Ref(0),
)
end
function ChunkingContext(ctx::ChunkingContext)
out = ChunkingContext(
ctx.id + 1,
TaskCounter(),
BufferedVector{Int32}(Vector{Int32}(undef, max(1, length(ctx.newline_positions))), 1),
similar(ctx.bytes),
ctx.nworkers,
ctx.limit,
ctx.comment,
Ref(0),
)
out.newline_positions.elements[1] = 0
return out
end
tasks_per_chunk(ctx::ChunkingContext) = ctx.nworkers
total_result_buffers_count(ctx::ChunkingContext) = 2tasks_per_chunk(ctx)
last_newline_at(ctx::ChunkingContext) = Int(last(ctx.newline_positions))
function should_use_parallel(ctx::ChunkingContext, _force)
return !(
_force === :serial ||
((_force !== :parallel) && (Threads.nthreads() == 1 || ctx.nworkers == 1 || last_newline_at(ctx) < MIN_TASK_SIZE_IN_BYTES))
)
end

# We split the detected newlines equally among thr nworkers parsing tasks, but each
# unit of work should contain at least 16 KiB of raw bytes (MIN_TASK_SIZE_IN_BYTES).
function estimate_task_size(ctx::ChunkingContext)
eols = ctx.newline_positions
length(eols) == 1 && return 1 # empty file
bytes_to_parse = last(eols)
rows = length(eols) # actually rows + 1
buffersize = length(ctx.bytes)
# There are 2*nworkers result buffers total, but there are nworkers tasks per chunk
prorated_maxtasks = ceil(Int, tasks_per_chunk(ctx) * (bytes_to_parse / buffersize))
# Lower bound is 2 because length(eols) == 2 => 1 row
# bump min rows if average row is much smaller than MIN_TASK_SIZE_IN_BYTES
min_rows = max(2, cld(MIN_TASK_SIZE_IN_BYTES, cld(bytes_to_parse, rows)))
return min(max(min_rows, cld(rows, prorated_maxtasks)), rows)
end

abstract type AbstractResultBuffer end
# The `ChunkingContext` is used to keep track of the current chunk of data being processed
# and to coordinate with the parsing/consuming tasks. Two `ChunkingContext` objects are used
# to double-buffer the input.
include("ChunkingContext.jl")

# The `ParsedPayload` is used to pass the results of parsing to the `consume!` method. All relevant
# information about the current chunk of data being processed is passed to `consume!` via `ParsedPayload`.
# In case the parsed results need to be consumed in order, the `PayloadOrderer` can be used to
# sort the payloads.
include("payload.jl")

# Consuming is the means through which the parsed results are used by the end user.
include("ConsumeContexts.jl")
using .ConsumeContexts

# By lexing the input in advance we can detect that we got to an inconsistent state
# (e.g. an unmatched quote at the end of the file) in which case we throw one of these exceptions.
include("exceptions.jl")

# Utilities for the coordinator task to handle the input and the newline positions.
include("read_and_lex_utils.jl")
include("read_and_lex.jl")
include("ConsumeContexts.jl")
using .ConsumeContexts

include("parser_serial.jl")
# The main entrypoints of the library.
include("parser_parallel.jl")
include("parser_serial.jl")

export ChunkingContext, tasks_per_chunk, total_result_buffers_count
export AbstractParsingContext
@@ -114,6 +50,14 @@ export SkipContext
export Lexer
export parse_file_serial, parse_file_parallel, populate_result_buffer!


# By uncommenting all `# TRACING #` comments in the package, e.g. by turning them into `#= ... =#`,
# you'll enable low-overhead tracing capability.
# Before parsing, call `clear_traces!` to reset the traces, then
#   include("_traces.jl") # once
#   plot_traces()         # to plot the traces
# TODO: Port this over to be macro-based, with plotting being provided by a package extension

# TRACING # const PARSER_TASKS_TIMES = [UInt[]]
# TRACING # const CONSUMER_TASKS_TIMES = [UInt[]]
# TRACING # const IO_TASK_TIMES = UInt[]
82 changes: 82 additions & 0 deletions src/ChunkingContext.jl
@@ -0,0 +1,82 @@
# When splitting the work among multiple tasks, each task should have at least this many bytes of input
# This is to avoid having too many tasks with too little work to do.
# TODO: make this configurable and find a good default (the current 16 KiB is a guess)
const MIN_TASK_SIZE_IN_BYTES = 16 * 1024

_comment_to_bytes(x::AbstractString) = Vector{UInt8}(x)
_comment_to_bytes(x::Char) = _comment_to_bytes(ncodeunits(x) > 1 ? string(x) : UInt8(x))
_comment_to_bytes(x::UInt8) = [x]
_comment_to_bytes(x::Vector{UInt8}) = x
_comment_to_bytes(::Nothing) = nothing

# Holds a byte buffer and newline positions for a single chunk of the input file.
# The newline positions are used to split the chunk into tasks for parallel parsing.
struct ChunkingContext
id::Int # id of the chunking context (1 or 2)
counter::TaskCounter # synchronization mechanism to coordinate parsing
newline_positions::BufferedVector{Int32} # positions of newlines in the bytes
bytes::Vector{UInt8} # raw bytes ingested from the input
nworkers::Int # number of worker tasks
limit::Int # maximum number of rows to parse, see `limit_eols!`
# byte prefix to skip, used in `skip_rows_init!` and handed to `populate_result_buffer!`
# for the user to handle consistently (`_startswith` could be used to do the check)
comment::Union{Nothing,Vector{UInt8}}
# number of times we refilled the buffer, can be combined with `id` to uniquely identify a chunk
buffer_refills::Base.RefValue{Int}
end
function ChunkingContext(buffersize::Integer, nworkers::Integer, limit::Integer, comment::Union{Nothing,UInt8,String,Char,Vector{UInt8}})
(4 <= buffersize <= typemax(Int32)) || throw(ArgumentError("`buffersize` argument must be larger than 4 and smaller than 2_147_483_648 bytes."))
(0 < nworkers < 256) || throw(ArgumentError("`nworkers` argument must be larger than 0 and smaller than 256."))
(0 <= limit <= typemax(Int)) || throw(ArgumentError("`limit` argument must be positive and smaller than 9_223_372_036_854_775_808."))
# TRACING # clear_traces!(nworkers)
return ChunkingContext(
1,
TaskCounter(),
BufferedVector{Int32}(Int32[0], 1),
Vector{UInt8}(undef, buffersize),
nworkers,
limit,
_comment_to_bytes(comment),
Ref(0),
)
end
# Convenience for double-buffering
function ChunkingContext(ctx::ChunkingContext)
out = ChunkingContext(
ctx.id + 1,
TaskCounter(),
BufferedVector{Int32}(Vector{Int32}(undef, max(1, length(ctx.newline_positions))), 1),
similar(ctx.bytes),
ctx.nworkers,
ctx.limit,
ctx.comment,
Ref(0),
)
out.newline_positions.elements[1] = 0
return out
end
tasks_per_chunk(ctx::ChunkingContext) = ctx.nworkers
total_result_buffers_count(ctx::ChunkingContext) = 2tasks_per_chunk(ctx)
last_newline_at(ctx::ChunkingContext) = Int(last(ctx.newline_positions))
function should_use_parallel(ctx::ChunkingContext, _force)
return !(
_force === :serial ||
((_force !== :parallel) && (Threads.nthreads() == 1 || ctx.nworkers == 1 || last_newline_at(ctx) < MIN_TASK_SIZE_IN_BYTES))
)
end

# We split the detected newlines equally among the nworkers parsing tasks, but each
# unit of work should contain at least 16 KiB of raw bytes (MIN_TASK_SIZE_IN_BYTES).
function estimate_task_size(ctx::ChunkingContext)
eols = ctx.newline_positions
length(eols) == 1 && return 1 # empty file
bytes_to_parse = last(eols)
rows = length(eols) # actually rows + 1
buffersize = length(ctx.bytes)
# There are 2*nworkers result buffers total, but there are nworkers tasks per chunk
prorated_maxtasks = ceil(Int, tasks_per_chunk(ctx) * (bytes_to_parse / buffersize))
# Lower bound is 2 because length(eols) == 2 => 1 row
# bump min rows if average row is much smaller than MIN_TASK_SIZE_IN_BYTES
min_rows = max(2, cld(MIN_TASK_SIZE_IN_BYTES, cld(bytes_to_parse, rows)))
return min(max(min_rows, cld(rows, prorated_maxtasks)), rows)
end
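For intuition, the arithmetic above can be re-derived standalone with assumed concrete numbers (a completely full 4 MiB buffer, 4 workers, ~256-byte rows); this snippet only mirrors the formula and does not call into the package:
```julia
# Standalone re-derivation of the task-size estimate under assumed inputs (not package code).
buffersize     = 4 * 1024 * 1024   # assumed 4 MiB chunk buffer
nworkers       = 4                 # assumed number of worker tasks
bytes_to_parse = buffersize        # assume the chunk is completely full
rows           = 16_385            # ~256-byte rows: 16_384 newlines plus the leading sentinel at position 0
min_task_bytes = 16 * 1024         # MIN_TASK_SIZE_IN_BYTES

prorated_maxtasks = ceil(Int, nworkers * (bytes_to_parse / buffersize))    # 4
min_rows  = max(2, cld(min_task_bytes, cld(bytes_to_parse, rows)))         # max(2, cld(16384, 256)) == 64
task_size = min(max(min_rows, cld(rows, prorated_maxtasks)), rows)         # min(max(64, 4097), 16385) == 4097
```
Each of the roughly four tasks would therefore get about 4097 rows, i.e. ~1 MiB of input, comfortably above the 16 KiB minimum.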
15 changes: 9 additions & 6 deletions src/ConsumeContexts.jl
@@ -7,14 +7,17 @@ using ..ChunkedBase: set!, dec!
export AbstractConsumeContext, SkipContext
export setup_tasks!, consume!, task_done!, sync_tasks, cleanup

# End users should subtype this to create custom consume contexts which are then
# used in `parse_file_parallel` and `parse_file_serial`, to dispatch on their
# `consume!` method.
abstract type AbstractConsumeContext end

"""
consume!(consume_ctx::AbstractConsumeContext, payload::ParsedPayload{<:AbstractResultBuffer, <:AbstractParsingContext})
Override with your `AbstractConsumeContext` to provide custom logic for processing the parsed results in `AbstractResultBuffer`.
The method is called from multiple tasks in parallel, just after each corresponding `task_buf` has been populated.
`task_buf` is only filled once per chunk.
`task_buf` is only filled once per chunk and is only accessed by one task at a time.
See also [`consume!`](@ref), [`setup_tasks!`](@ref), [`cleanup`](@ref), [`AbstractResultBuffer`](@ref)
"""
@@ -34,11 +37,11 @@ is considered to be entirely processed.
This function is called just after we're done detecting newline positions in the current
chunk of data and we are about to submit partitions of the detected newlines to the parse/consume tasks.
`ntasks` is between 1 and two times the `nworkers` agrument to `parse_file`, depeneding on
the size of the input. Most of the time, the value is `2*nworkers` is used, but for smaller
buffer sizes, smaller files or when handling the last bytes of the file, `ntasks` will be
smaller as we try to ensure the minimal average tasks size if terms of bytes of input is at least
$(Base.format_bytes(MIN_TASK_SIZE_IN_BYTES)). For `:serial` parsing mode, `ntasks` is always 1.
`ntasks` is between 1 and the `nworkers` argument to `parse_file`, depending on the size of the input.
Most of the time the value `nworkers` is used, but for smaller buffer sizes, smaller files or
when handling the last bytes of the file, `ntasks` will be smaller as we try to ensure the minimal
average task size in terms of bytes of input is at least $(Base.format_bytes(MIN_TASK_SIZE_IN_BYTES)).
For `:serial` parsing mode, `ntasks` is always 1.
You should override this method when you further subdivide the amount of concurrent work on the chunk,
e.g. when you want to process each column separately in `@spawn` tasks, in which case you'd expect
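As a minimal, hedged illustration of the dispatch pattern described above: the sketch below only counts how many payloads it sees, so it does not rely on any assumed `ParsedPayload` fields. Because `consume!` is called from multiple tasks in parallel, the counter is an atomic. The type name is hypothetical and this is not the package's own consume context:
```julia
# Hypothetical sketch of a user-defined consume context; it merely counts consumed payloads.
struct CountingConsumeContext <: AbstractConsumeContext
    seen::Threads.Atomic{Int}
end
CountingConsumeContext() = CountingConsumeContext(Threads.Atomic{Int}(0))

# `consume!` may run on several tasks at once, hence the atomic increment.
function ChunkedBase.consume!(ctx::CountingConsumeContext, payload::ChunkedBase.ParsedPayload)
    Threads.atomic_add!(ctx.seen, 1)
    return nothing
end
```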
52 changes: 52 additions & 0 deletions src/ParsingContexts.jl
@@ -0,0 +1,52 @@
"""
populate_result_buffer!(
result_buf::AbstractResultBuffer,
newline_segment::AbstractVector{Int32},
parsing_ctx::AbstractParsingContext,
bytes::Vector{UInt8},
comment::Union{Nothing,Vector{UInt8}}=nothing,
::Type{CT}=Tuple{}
) where {CT}
Override with your `AbstractParsingContext` to provide custom logic for parsing the input `bytes`
between the newline positions in `newline_segment` into `result_buf`.
The method is called from multiple tasks in parallel, each having a different `newline_segment`,
some sharing the same `bytes`. The `result_buf` is only accessed by one task at a time.
# Arguments:
* `result_buf`: a user-provided object which is meant to store the parsing results from this function
* `newline_segment`: a vector of newline positions in `bytes` which delimit the rows of the input.
* `parsing_ctx`: a user-provided object which is used to dispatch to this method and carry parsing specific config
* `bytes`: the raw bytes ingested from the input
* `comment`: the comment prefix to skip, if any
* `CT`: an optional, compile-time known object which was passed to `parse_file_parallel` / `parse_file_serial`
# Notes:
Each consecutive pair of `newline_segment` values defines an exclusive range of bytes in `bytes` which
constitutes a single row.
The range needs to be treated as exclusive because we add a fictional newline at the beginning of the chunk
at position 0 and past the end of the file if it doesn't end on a newline.
A safe way of processing each row would be e.g.:
```
start_index = first(newline_segment)
for i in 2:length(newline_segment)
end_index = newline_segment[i]
row_bytes = view(bytes, start_index+1:end_index-1) # +/- 1 is needed!
# ... actually populate the result_buf
start_index = end_index
end
```
"""
function populate_result_buffer! end

# Users should subtype this to create custom parsing contexts which are then
# used in `parse_file_parallel` / `parse_file_serial`, to dispatch on their
# `populate_result_buffer!` method.
abstract type AbstractParsingContext end
# Users should subtype this to create custom result buffer objects to store the
# parsed results in `populate_result_buffer!`.
abstract type AbstractResultBuffer end
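To make the contract above concrete, here is a hedged sketch of a custom parsing context and result buffer that simply collect every row of the chunk as a `String`, following the exclusive-range loop from the docstring. The type names are hypothetical and comment-prefix handling is omitted for brevity; this is not the package's own implementation:
```julia
# Hypothetical sketch of user-side subtypes; collects each row of a chunk as a String.
struct RowCollectingContext <: AbstractParsingContext end

struct RowCollectingBuffer <: AbstractResultBuffer
    rows::Vector{String}
end
RowCollectingBuffer() = RowCollectingBuffer(String[])

function ChunkedBase.populate_result_buffer!(
    result_buf::RowCollectingBuffer,
    newline_segment::AbstractVector{Int32},
    ::RowCollectingContext,
    bytes::Vector{UInt8},
    comment::Union{Nothing,Vector{UInt8}}=nothing,  # comment handling omitted in this sketch
    ::Type{CT}=Tuple{},
) where {CT}
    empty!(result_buf.rows)                  # the buffer is (re)filled once per chunk
    start_index = first(newline_segment)
    for i in 2:length(newline_segment)
        end_index = newline_segment[i]
        row_bytes = view(bytes, start_index+1:end_index-1)  # exclusive range, see the docstring above
        push!(result_buf.rows, String(row_bytes))
        start_index = end_index
    end
    return nothing
end
```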
13 changes: 7 additions & 6 deletions src/_traces.jl
@@ -1,15 +1,16 @@
using GLMakie, ChunkedBase

function plot_traces()
t1 = copy(ChunkedBase.T1)
t2 = copy(ChunkedBase.T2)
io_task = copy(ChunkedBase.IO_TASK_TIMES)
lexer_task = copy(ChunkedBase.LEXER_TASK_TIMES)
parser_tasks = filter(x->length(x)>0, ChunkedBase.PARSER_TASKS_TIMES)
consume_tasks = filter(x->length(x)>0, ChunkedBase.CONSUMER_TASKS_TIMES)
t1 = copy(ChunkedBase.T1) # total parse/consume time for the first byte buffer by all workers
t2 = copy(ChunkedBase.T2) # total parse/consume time for the second byte buffer by all workers
io_task = copy(ChunkedBase.IO_TASK_TIMES) # time spent in IO
lexer_task = copy(ChunkedBase.LEXER_TASK_TIMES) # time spent in lexer
parser_tasks = filter(x->length(x)>0, ChunkedBase.PARSER_TASKS_TIMES) # individual parse/consume times for each worker
consume_tasks = filter(x->length(x)>0, ChunkedBase.CONSUMER_TASKS_TIMES) # optional extensions for consumes that spawn tasks

start = Int(mapreduce(first, min, parser_tasks, init=min(io_task[1], lexer_task[1])))

# convert to seconds, subtract start time
lexer_timing = map(x->(x - start) / (1e9), lexer_task)
io_timing = map(x->(x - start) / (1e9), io_task)
pa_timings = map.(x->(x - start) / (1e9), parser_tasks)