
Commit

Fix some typos
Drvi committed Nov 7, 2023
1 parent bea8154 commit b8bcd76
Showing 5 changed files with 17 additions and 17 deletions.
18 changes: 9 additions & 9 deletions README.md
@@ -1,19 +1,19 @@
# ChunkedBase.jl

- The package handles ingestion of data chunks and the distribution & synchronization of work that happens on these chunks in parallel. It came to existence while refactoring the `ChunkedCSV.jl` and `ChunkedJSON.jl` packages and was designed to be extended by packages like these. It is a package used to write parser packages.
+ The package handles the ingestion of data chunks and the distribution & synchronization of work that happens on these chunks in parallel. It came into existence while refactoring the `ChunkedCSV.jl` and `ChunkedJSON.jl` packages and was designed to be extended by packages like these. It is a package used to write parser packages.

Specifically, `ChunkedBase.jl` spawns one task which handles IO and behaves as a coordinator, and a configurable number of worker tasks.
- Both CSV and JSONL are textual formats which delimit records by newlines which makes newlines an ideal point to distribute work. One the coordinator task we ingest bytes into preallocated buffer and then use `NewlineLexers.jl` package to quickly find newlines in it. In turn these newlines are distributed among worker tasks and the coordinator immediately starts working a secondary buffer, while the first one is being processed by the workers.
+ Both CSV and JSONL are textual formats that delimit records by newlines, which makes newlines an ideal point to distribute work. On the coordinator task we ingest bytes into a preallocated buffer and then use the `NewlineLexers.jl` package to quickly find newlines in it. In turn, these newlines are distributed among worker tasks and the coordinator immediately starts working on a secondary buffer, while the first one is being processed by the workers.

The process looks something like this, with the coordinator task at the bottom:

| ![Diagram](/docs/diagrams/chunked_base.png) |
|:--:|
- | *The coordinator synchronizes with workers using a counter behind a mutex (there is one per buffer). It splits the newlines into N segments, distributes them and increments the counter by N. After the coordinator distributes work, it starts to process the second chunk of data, while the first buffer is being worked on. There is handoff happening between the two buffers -- we need to copy the bytes after the last newline from the first buffer to the second. Each worker decrements after the `consume!` is done, and the coordinator will wait for the counter to reach 0 before it overwrites the buffer with new data.* |
+ | *The coordinator synchronizes with workers using a counter behind a mutex (there is one per buffer). It splits the newlines into N segments, distributes them, and increments the counter by N. After the coordinator distributes work, it starts to process the second chunk of data, while the first buffer is being worked on. There is a handoff happening between the two buffers -- we need to copy the bytes after the last newline from the first buffer to the second. Each worker decrements after the `consume!` is done, and the coordinator will wait for the counter to reach 0 before it overwrites the buffer with new data.* |
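
The counter handoff described in the caption could be sketched roughly as follows (illustrative names only; the real coordination lives in the package's `TaskCounters` module):

```julia
# Illustrative only: a counter behind a lock, as described in the caption above.
mutable struct ChunkCounter
    cond::Threads.Condition  # condition variable with its own internal lock
    count::Int
end
ChunkCounter() = ChunkCounter(Threads.Condition(), 0)

# Coordinator: bump the counter by N before handing out N newline segments.
function increment!(c::ChunkCounter, n::Integer)
    lock(c.cond) do
        c.count += n
    end
end

# Worker: decrement once `consume!` is done; wake the coordinator when we hit zero.
function decrement!(c::ChunkCounter)
    lock(c.cond) do
        c.count -= 1
        c.count == 0 && notify(c.cond)
    end
end

# Coordinator: block until every worker has finished, then the buffer can be reused.
function wait_for_zero!(c::ChunkCounter)
    lock(c.cond) do
        while c.count != 0
            wait(c.cond)
        end
    end
end
```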

- Packages like `ChunkedCSV.jl` and `ChunkedJSON.jl` hook into this structure by defining their own `populate_result_buffer!` methods that parse the records they were assigned into their custom `Result` buffers which are then handed to the `consume!` method (e.g. to be inserted to database).
+ Packages like `ChunkedCSV.jl` and `ChunkedJSON.jl` hook into this structure by defining their own `populate_result_buffer!` methods that parse the records they were assigned into their custom `Result` buffers which are then handed to the `consume!` method (e.g. to be inserted into a database).

- The main entry point of this package is the `parse_file_parallel` function, which accepts a number of "context" arguments, each controlling different aspect of the process:
+ The main entry point of this package is the `parse_file_parallel` function, which accepts several "context" arguments, each controlling a different aspect of the process:
```julia
function parse_file_parallel(
lexer::NewlineLexers.Lexer,
@@ -27,8 +27,8 @@ function parse_file_parallel(
Let's break it down:
* `lexer` controls how we find newlines in the ingested chunks of data. Newlines serve as record delimiters and knowing their positions allows us to split work safely among multiple workers, which are spawned internally. `Lexer`s are defined in the `NewlineLexers.jl` package.
* `parsing_ctx` controls how we parse the data. It allows the user to dispatch on custom `populate_result_buffer!` overload and to forward configurations to it. `populate_result_buffer!` is where we take the records identified by the `lexer` and parse them into `result_buffers`.
- * `consume_ctx` controls how the parsed results are consumed (e.g. inserted them into a database, appended to a `DataFrame`...). `consume_ctx` allows the user to dispatch on their `consume!` method and hold any state necessary for consuming. This happens immediately after `populate_result_buffer!`.
- * `chunking_ctx` controls how the work on individual chunks of data is scheduled. It contains buffers for input bytes and found newlines. Through this struct the user controls the size of the chunks and number spawned tasks that carry out the parsing and consuming. If there is enough data in the input, a secondary `chunking_ctx` is created internally to facilitate the double-buffering described above.
+ * `consume_ctx` controls how the parsed results are consumed (e.g. inserted into a database, appended to a `DataFrame`...). `consume_ctx` allows the user to dispatch on their `consume!` method and hold any state necessary for consumption. This happens immediately after `populate_result_buffer!`.
+ * `chunking_ctx` controls how the work on individual chunks of data is scheduled. It contains buffers for input bytes and found newlines. Through this struct the user controls the size of the chunks and the number of spawned tasks that carry out the parsing and consuming. If there is enough data in the input, a secondary `chunking_ctx` is created internally to facilitate the double-buffering described above.
* `result_buffers` controls in which format the results are stored. These result buffers hold results from `populate_result_buffer!` and are passed to `consume!`. This allows the user to have multiple result formats for the same `parsing_ctx`, e.g. row-oriented vs column-oriented buffers.

See the docstrings of `populate_result_buffer!` and `consume!` for more information about how to integrate with them.
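
To make the hand-off between these hooks concrete, an integration could look roughly like the sketch below; the argument order of `populate_result_buffer!`, the `consume!` signature, the `AbstractConsumeContext` supertype name, and the payload layout are assumptions for illustration, not the package's verbatim API:

```julia
using ChunkedBase

# Hypothetical result buffer: one vector of raw lines per worker task.
struct LinesResult
    lines::Vector{String}
end

# Dispatch-only contexts; a real package would carry parser config / DB handles here.
struct MyParsingContext <: ChunkedBase.AbstractParsingContext end
struct MyConsumeContext <: ChunkedBase.AbstractConsumeContext end  # supertype name assumed

function ChunkedBase.populate_result_buffer!(
    result_buf::LinesResult,
    newline_segment,
    parsing_ctx::MyParsingContext,
    bytes,
    comment=nothing,
    ::Type{CT}=Tuple{},
) where {CT}
    empty!(result_buf.lines)
    for i in 2:length(newline_segment)
        # each consecutive pair of newline positions delimits one row (exclusive range)
        push!(result_buf.lines, String(bytes[newline_segment[i-1]+1:newline_segment[i]-1]))
    end
    return nothing
end

# Hypothetical consume!: print rows instead of, say, inserting them into a database.
function ChunkedBase.consume!(consume_ctx::MyConsumeContext, payload)
    for line in payload.results.lines  # `payload.results` is an assumed field name
        println(line)
    end
    return nothing
end
```

A row-oriented buffer like this is the simplest case; a column-oriented buffer would instead push each parsed field into its own typed vector while keeping the same two hooks.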
@@ -117,6 +117,6 @@ julia> print_newlines(io, 64*1024, 4);
# [ Info: Newlines in chunk (id:(2, 2)): [36864, 40960, 45056, 49152, 53248, 57344]
# [ Info: Newlines in chunk (id:(2, 2)): [57344, 61440, 65536]
```
- Behind the scenes, `ChunkedBase.jl` was using two 64KiB buffers, finding newlines in them, and splitting the found newlines among 4 tasks. We can see that each of the buffers (identified by the first number in `id` tuple) was refilled two times (refill number is the second element of the tuple).
- The way we setup our data, there should be one newline every 4KiB of input, so we'd expect 16 newlines per chunk, but we could see that there are 20 numbers reported per chunk -- this is because the first element of the newline segment we send to the tasks is either 0 or the end of the previous sub-chunk, so we get 4 duplicated elements.
+ Behind the scenes, `ChunkedBase.jl` was using two 64KiB buffers, finding newlines in them, and splitting the found newlines among 4 tasks. We can see that each of the buffers (identified by the first number in the `id` tuple) was refilled two times (the refill number is the second element of the tuple).
+ The way we set up our data, there should be one newline every 4KiB of input, so we'd expect 16 newlines per chunk, but we can see that there are 20 numbers reported per chunk -- this is because the first element of the newline segment we send to the tasks is either 0 or the end of the previous sub-chunk, so we get 4 duplicated elements.
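
A quick, purely illustrative way to see where the 4 extra numbers per chunk come from (the real split need not be this even):

```julia
# 16 newlines per 64KiB chunk (one every 4KiB), plus the fictional newline at position 0.
newlines = [0; 4096:4096:65536]            # 17 positions
ntasks   = 4
per_task = length(newlines) ÷ ntasks       # 4 "real" newlines per task here
segments = [newlines[(i-1)*per_task+1:min(i*per_task+1, end)] for i in 1:ntasks]
sum(length, segments)                      # 20: the leading 0 plus 3 shared segment boundaries
```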

2 changes: 1 addition & 1 deletion src/ChunkedBase.jl
@@ -9,7 +9,7 @@ using SentinelArrays.BufferedVectors
# The means through which the user can provide their own parsing logic.
include("ParsingContexts.jl")

- # A counter based synchronization primitive used to coordinate the parsing/consuming tasks.
+ # A counter-based synchronization primitive that is used to coordinate the parsing/consuming tasks.
include("TaskCounters.jl")
using .TaskCounters

8 changes: 4 additions & 4 deletions src/ParsingContexts.jl
@@ -8,24 +8,24 @@
::Type{CT}=Tuple{}
) where {CT}
- Override with your `AbstractParsingContext` to provide a custom logic for parsing the input bytes
+ Override with your `AbstractParsingContext` to provide custom logic for parsing the input bytes
in `parsing_ctx.bytes` between the newline positions in `newline_segment` into `result_buf`.
The method is called from multiple tasks in parallel, each having a different `newline_segment`,
some sharing the same `parsing_ctx.bytes`. The `result_buf` is only accessed by one task at a time.
# Arguments:
* `result_buf`: a user-provided object which is meant to store the parsing results from this function
* `newline_segment`: a vector of newline positions in `bytes` which delimit the rows of the input.
- * `parsing_ctx`: a user-provided object which is used to dispatch to this method and carry parsing specific config
+ * `parsing_ctx`: a user-provided object that is used to dispatch to this method and carry parsing-specific config
* `bytes`: the raw bytes ingested from the input
* `comment`: the comment prefix to skip, if any
* `CT`: an optional, compile-time known object which was passed to `parse_file_parallel` / `parse_file_serial`
# Notes:
- Each consecutive pair of `newline_segment` values defines and exclusive range of bytes in `bytes` which
+ Each consecutive pair of `newline_segment` values defines an exclusive range of bytes in `bytes` which
constitutes a single row.
- The range needs to be treated as exclusive because we add a fictional newline at the beginning at the chunk
+ The range needs to be treated as exclusive because we add a fictional newline at the beginning of the chunk
at position 0 and past the end of the file if it doesn't end on a newline.
A safe way of processing each row would be e.g.:
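
A sketch of such a loop (illustrative; not the verbatim example from the docstring, which is collapsed in this view):

```julia
# Sketch: iterate consecutive newline positions; each pair delimits one row exclusively.
for i in 2:length(newline_segment)
    prev_nl = newline_segment[i-1]
    next_nl = newline_segment[i]
    row_bytes = @view bytes[prev_nl+1:next_nl-1]  # the newline bytes themselves are excluded
    # ... parse `row_bytes` and store the result in `result_buf` here ...
end
```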
4 changes: 2 additions & 2 deletions src/parser_parallel.jl
@@ -116,7 +116,7 @@ end
type of the lexer affects whether the search is quote-aware or not.
* `parsing_ctx`: a user-provided object which is passed to `populate_result_buffer!`
* `consume_ctx`: a user-provided object which is passed to `consume!`
- * `chunking_ctx`: an internal object which is used to keep track of the current chunk of data being processed
+ * `chunking_ctx`: an internal object that is used to keep track of the current chunk of data being processed
* `result_buffers`: a vector of user-provided objects which are used to store the parsed results
* `CT`: an optional, compile-time known type which is passed to `populate_result_buffer!`
@@ -128,7 +128,7 @@ end
# Notes:
* The `chunking_ctx` is assumed to be filled with data whose newline positions are already detected, e.g.
by calling `read_and_lex!` with the `lexer` object on it.
- * If the input is bigger than `chunking_ctx.bytes`, secondary `chunking_ctx` object will be used to
+ * If the input is bigger than `chunking_ctx.bytes`, a secondary `chunking_ctx` object will be used to
double-buffer the input, which will allocate a new buffer of the same size as `chunking_ctx.bytes`.
* This function spawns `chunking_ctx.nworkers` + 1 tasks.
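
Putting these notes together, a typical call sequence might look roughly like the sketch below, reusing the hypothetical types from the earlier sketch; the `Lexer` and `ChunkingContext` constructor forms, the `read_and_lex!` argument order, and the number of result buffers are all assumptions for illustration:

```julia
using ChunkedBase, NewlineLexers

io = open("data.jsonl")
lexer = NewlineLexers.Lexer(io, nothing, '\n')           # (io, no quoting, newline char) -- form assumed
chunking_ctx = ChunkedBase.ChunkingContext(64 * 1024, 4, 0, nothing)  # buffersize, nworkers, limit, comment -- assumed
ChunkedBase.read_and_lex!(lexer, chunking_ctx)           # fill the first chunk and find its newlines
result_buffers = [LinesResult(String[]) for _ in 1:4]    # one buffer per worker task, purely illustrative
parse_file_parallel(lexer, MyParsingContext(), MyConsumeContext(), chunking_ctx, result_buffers)
close(io)
```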
2 changes: 1 addition & 1 deletion src/read_and_lex_utils.jl
@@ -20,7 +20,7 @@ function _hasBOM(bytes::Vector{UInt8})
return @inbounds bytes[1] == 0xef && bytes[2] == 0xbb && bytes[3] == 0xbf
end

- # Either we were looking for `\n` as a newline and then an empty row has
+ # Either we were looking for `\n` as a newline char and then an empty row has
# two newline positions next to each other or there is a single byte between them
# and it's `\r`. Or we were looking for `\r` in which case we only consider two
# neighboring `\r` as an empty row.
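
A sketch of that rule as a predicate (hypothetical name, illustrative only):

```julia
# Illustrative sketch of the empty-row rule described in the comment above.
function looks_like_empty_row(bytes::Vector{UInt8}, prev_nl::Int, next_nl::Int, newline::UInt8)
    if newline == UInt8('\n')
        # adjacent `\n\n`, or a lone `\r` between the two newlines (a `\r\n` line ending)
        return next_nl - prev_nl == 1 ||
               (next_nl - prev_nl == 2 && bytes[prev_nl+1] == UInt8('\r'))
    else
        # when lexing on `\r`, only two neighboring `\r` count as an empty row
        return next_nl - prev_nl == 1
    end
end
```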
