Commit 17e7634: More PR feedback
Drvi committed Nov 10, 2023 (1 parent: f9e4a7a)
Showing 5 changed files with 58 additions and 55 deletions.
README.md (4 changes: 2 additions & 2 deletions)
@@ -31,7 +31,7 @@ function parse_file_parallel(
parsing_ctx::AbstractParsingContext, # user-defined
consume_ctx::AbstractConsumeContext, # user-defined
chunking_ctx::ChunkingContext,
- result_buffers::Vector{<:AbstractResultBuffer}, #user-defined
+ result_buffers::Vector{<:AbstractResultBuffer}, # user-defined
::Type{CT}=Tuple{} # ignore this for now
) where {CT}
```
@@ -40,7 +40,7 @@ Let's break it down:
* `parsing_ctx` controls how we parse the data. It allows the user to dispatch on a custom `populate_result_buffer!` overload and to forward configurations to it. `populate_result_buffer!` is where we take the records identified by the `lexer` and parse them into `result_buffers`.
* `consume_ctx` controls how the parsed results are consumed (e.g. inserted into a database, appended to a `DataFrame`...). `consume_ctx` allows the user to dispatch on their `consume!` method and hold any state necessary for consumption. This happens immediately after `populate_result_buffer!`.
* `chunking_ctx` controls how the work on individual chunks of data is scheduled. It contains buffers for input bytes and found newlines. Through this struct the user controls the size of the chunks and the number of spawned tasks that carry out the parsing and consuming. If there is enough data in the input, a secondary `chunking_ctx` is created internally to facilitate the double-buffering described above.
- * `result_buffers` controls in which format the results are stored. These result buffers hold results from `populate_result_buffer!` and are passed to `consume!`. This allows the user to have multiple result formats for the with `parsing_ctx` e.g. row oriented vs column oriented buffers.
+ * `result_buffers` controls in which format the results are stored. These result buffers hold results from `populate_result_buffer!` and are passed to `consume!`. This allows the user to have multiple result formats for the same `parsing_ctx` e.g. row-oriented vs column-oriented buffers.

There is also `parse_file_serial` which doesn't spawn any tasks and just calls `populate_result_buffer!` and `consume!` sequentially without double-buffering. This can be useful for debugging or for small files.
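To make the breakdown above concrete, here is a minimal sketch of the three user-defined pieces and how they would be handed to `parse_file_parallel`. The type names, the `String`-based row storage, and the assumption that the abstract types are exported from a module called `ChunkedBase` are all illustrative, not part of the package API.

```julia
using ChunkedBase

# Illustrative subtypes of the three user-facing abstract types (names are made up):
struct MyParsingContext <: AbstractParsingContext end   # carries parsing config, used for dispatch
struct MyConsumeContext <: AbstractConsumeContext end   # carries consumption state, used for dispatch
struct MyResultBuffer <: AbstractResultBuffer           # where populate_result_buffer! puts parsed rows
    rows::Vector{String}
end

# With `lexer`, `chunking_ctx` and `result_buffers` set up (see the docstrings below),
# the call would then look like:
# parse_file_parallel(lexer, MyParsingContext(), MyConsumeContext(), chunking_ctx, result_buffers)
```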

src/ConsumeContexts.jl (1 change: 1 addition & 0 deletions)
@@ -9,6 +9,7 @@ export setup_tasks!, consume!, task_done!, sync_tasks, cleanup

"""
AbstractConsumeContext
End users should subtype this to create custom consume contexts which are then
used in `parse_file_parallel` and `parse_file_serial`, to dispatch on their
`consume!` method.
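As a rough illustration of the dispatch described above, a sketch of a custom consume context follows. The exact `consume!` signature, and in particular what the second (payload) argument contains, is an assumption here; the package's own docstrings are authoritative. The module name `ChunkedBase` and the `PrintingConsumeContext` type are likewise illustrative.

```julia
using ChunkedBase

struct PrintingConsumeContext <: AbstractConsumeContext end

# Assumed shape: consume! gets the consume context plus a payload holding the
# result buffer that populate_result_buffer! just filled for one chunk segment.
function ChunkedBase.consume!(consume_ctx::PrintingConsumeContext, payload)
    # A real implementation would append to a DataFrame, insert into a database,
    # write to disk, etc.; here we only acknowledge that a chunk was parsed.
    @info "consumed one chunk of parsed results"
    return nothing
end
```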
src/ParsingContexts.jl (52 changes: 26 additions & 26 deletions)
@@ -8,38 +8,38 @@
::Type{CT}=Tuple{}
) where {CT}
Override with your `AbstractParsingContext` to provide custom logic for parsing the input bytes
in `parsing_ctx.bytes` between the newline positions in `newline_segment` into `result_buf`.
The method is called from multiple tasks in parallel, each having a different `newline_segment`,
some sharing the same `parsing_ctx.bytes`. The `result_buf` is only accessed by one task at a time.

# Arguments:
* `result_buf`: a user-provided object which is meant to store the parsing results from this function
* `newline_segment`: a vector of newline positions in `bytes` which delimit the rows of the input
* `parsing_ctx`: a user-provided object that is used to dispatch to this method and carry parsing specific config
* `bytes`: the raw bytes ingested from the input
* `comment`: the comment prefix to skip, if any
* `CT`: an optional, compile-time known object which was passed to `parse_file_parallel` / `parse_file_serial`

# Notes:
Each consecutive pair of `newline_segment` values defines an exclusive range of bytes in `bytes` which
constitutes a single row.

The range needs to be treated as exclusive because we add a fictional newline at the beginning of the chunk
at position 0 and past the end of the file if it doesn't end on a newline.
A safe way of processing each row would be e.g.:

```
start_index = first(newline_segment)
for i in 2:length(newline_segment)
    end_index = newline_segment[i]
    row_bytes = view(bytes, start_index+1:end_index-1) # +/- 1 is needed!

    # ... actually populate the result_buf

    start_index = end_index
end
```
"""
function populate_result_buffer! end
"""
Expand Down
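Building on the row loop from the docstring above, here is a sketch of what an overload could look like, reusing the illustrative `MyParsingContext` / `MyResultBuffer` types from the README sketch and "parsing" each row into a plain `String`. Anything not named in the docstring (the element type of `newline_segment`, the comment handling, the module name `ChunkedBase`) is an assumption.

```julia
function ChunkedBase.populate_result_buffer!(
    result_buf::MyResultBuffer,
    newline_segment::AbstractVector,
    parsing_ctx::MyParsingContext,
    bytes::Vector{UInt8},
    comment::Union{Nothing,Vector{UInt8}}=nothing,
    ::Type{CT}=Tuple{},
) where {CT}
    empty!(result_buf.rows)
    start_index = first(newline_segment)
    for i in 2:length(newline_segment)
        end_index = newline_segment[i]
        # Exclusive range, hence the +/- 1: the positions point at the newlines
        # themselves (or at the fictional ones added at the chunk edges).
        row_bytes = view(bytes, start_index+1:end_index-1)
        # Comment skipping is omitted for brevity; a real overload would check
        # `comment` against the start of `row_bytes` here.
        isempty(row_bytes) || push!(result_buf.rows, String(row_bytes))
        start_index = end_index
    end
    return nothing
end
```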
src/parser_parallel.jl (50 changes: 26 additions & 24 deletions)
@@ -105,34 +105,36 @@ end
::Type{CT}=Tuple{}
) where {CT} -> Nothing
Parse the file in `lexer.io` in parallel using `chunking_ctx.nworkers` tasks. The user must provide
a `populate_result_buffer!` method which is used to parse the ingested data in `chunking_ctx.bytes` into the
`result_buffers`, using the newline positions in `chunking_ctx.newline_positions` as row boundaries.
The `consume!` method is called after each `populate_result_buffer!` call, so the user can process
the parsed results in parallel. No `result_buffer` is accessed by more than one task at a time.

# Arguments:
* `lexer`: a `NewlineLexers.Lexer` object which is used to find newline positions in the input. The
  type of the lexer affects whether the search is quote-aware or not.
* `parsing_ctx`: a user-provided object which is passed to `populate_result_buffer!`
* `consume_ctx`: a user-provided object which is passed to `consume!`
* `chunking_ctx`: an internal object that is used to keep track of the current chunk of data being processed
* `result_buffers`: a vector of user-provided objects which are used to store the parsed results
* `CT`: an optional, compile-time known type which is passed to `populate_result_buffer!`.
  This is a bit of niche functionality required by ChunkedCSV, which needs to know about
  "custom types" at compile time in order to unroll the parsing loop on them.

# Exceptions:
* `UnmatchedQuoteError`: if the input ends with an unmatched quote
* `NoValidRowsInBufferError`: if not a single newline was found in the input buffer
* `CapturedException`: if an exception was thrown in one of the parser/consumer tasks

# Notes:
* You can initialize the `chunking_ctx` yourself using `read_and_lex!` or with `initial_read!` + `initial_lex!`,
  which gives you the opportunity to sniff the first chunk of data before you call `parse_file_parallel`.
* If the input is bigger than `chunking_ctx.bytes`, a secondary `chunking_ctx` object will be used to
  double-buffer the input, which will allocate a new buffer of the same size as `chunking_ctx.bytes`.
* This function spawns `chunking_ctx.nworkers` + 1 tasks.

See also [`populate_result_buffer!`](@ref), [`consume!`](@ref), [`parse_file_serial`](@ref).
"""
function parse_file_parallel(
lexer::Lexer,
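Finally, a hedged end-to-end sketch tying the pieces together. The `Lexer` and `ChunkingContext` constructor arguments, the `read_and_lex!` argument order, and the number of result buffers are all assumptions made for illustration; the docstrings above and in the package are the source of truth.

```julia
using ChunkedBase, NewlineLexers

io = IOBuffer("name,score\nalice,1\nbob,2\n")
# Assumed constructor: IO, open-quote char (nothing => quote-unaware), newline byte.
lexer = NewlineLexers.Lexer(io, nothing, UInt8('\n'))
# Assumed argument order: buffersize, nworkers, limit, comment.
chunking_ctx = ChunkingContext(2^20, 4, 0, nothing)
# Ingest the first chunk and record newline positions before parsing starts.
read_and_lex!(lexer, chunking_ctx)

# One buffer per worker task is a plausible minimum; the package may expect more
# so that both halves of the double-buffer can stay busy.
result_buffers = [MyResultBuffer(String[]) for _ in 1:chunking_ctx.nworkers]

parse_file_parallel(lexer, MyParsingContext(), MyConsumeContext(), chunking_ctx, result_buffers)
```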
src/parser_serial.jl (6 changes: 3 additions & 3 deletions)
@@ -8,10 +8,10 @@
::Type{CT}=Tuple{},
) where {CT}
The serial analog of `parse_file_parallel` which doesn't spawn any tasks,
useful for debugging and processing very small files.

See also [`populate_result_buffer!`](@ref), [`consume!`](@ref), [`parse_file_parallel`](@ref).
"""
function parse_file_serial(
lexer::Lexer,
