Even more tweaks
Drvi committed Nov 8, 2023
1 parent 478b63c commit 18dd946
Showing 2 changed files with 5 additions and 3 deletions.
README.md (2 changes: 1 addition, 1 deletion)
@@ -11,7 +11,7 @@ The process looks something like this, with the coordinator task at the bottom:
|:--:|
| *The coordinator synchronizes with workers using a counter behind a mutex (there is one per buffer). It splits the newlines into N segments, distributes them, and increments the counter by N. After the coordinator distributes work, it starts to process the second chunk of data, while the first buffer is being worked on. There is a handoff happening between the two buffers -- we need to copy the bytes after the last newline from the first buffer to the second. Each worker decrements the counter after its `consume!` is done, and the coordinator will wait for the counter to reach 0 before it overwrites the buffer with new data.* |
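
A minimal Julia sketch of the counter-per-buffer idea described in the caption (the names here are made up for illustration; this is not the package's actual synchronization code):

```julia
# Illustration only: a counter guarded by a lock, one per buffer. The coordinator
# bumps it by N when it distributes N segments of newlines; each worker decrements
# it once its `consume!` is done; the coordinator waits for 0 before overwriting
# the buffer with new data.
mutable struct BufferCounter
    cond::Threads.Condition
    n::Int
end
BufferCounter() = BufferCounter(Threads.Condition(), 0)

function increment!(c::BufferCounter, by::Integer)
    lock(c.cond)
    try
        c.n += by
    finally
        unlock(c.cond)
    end
end

function decrement!(c::BufferCounter)
    lock(c.cond)
    try
        c.n -= 1
        c.n == 0 && notify(c.cond)  # wake the coordinator once all workers are done
    finally
        unlock(c.cond)
    end
end

function wait_until_free(c::BufferCounter)
    lock(c.cond)
    try
        while c.n != 0
            wait(c.cond)  # releases the lock while waiting, reacquires afterwards
        end
    finally
        unlock(c.cond)
    end
end
```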

-Packages like `ChunkedCSV.jl` and `ChunkedJSON.jl` hook into this structure by defining their own `populate_result_buffer!` methods that parse the records they were assigned into their custom `Result` buffers which are then handed to the `consume!` method (e.g. to be inserted into a database).
+Packages like `ChunkedCSV.jl` and `ChunkedJSONL.jl` hook into this structure by defining their own `populate_result_buffer!` methods that parse the records they were assigned into their custom `Result` buffers which are then handed to the `consume!` method (e.g. to be inserted into a database).
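
As a hedged illustration of that division of labor (type names and argument lists below are simplified assumptions, not the package's verified API), a downstream package supplies something along these lines:

```julia
# Hedged sketch of the hook-in pattern; consult the package docstrings for the
# real signatures.

# A downstream package defines its own result buffer type...
struct RecordCountResult
    nrecords::Base.RefValue{Int}
end
RecordCountResult() = RecordCountResult(Ref(0))

# ...a `populate_result_buffer!`-style method that "parses" the rows assigned to
# this task (here we merely count newline-delimited records)...
function populate_result_buffer!(result::RecordCountResult,
                                 newline_positions::AbstractVector{<:Integer},
                                 bytes::AbstractVector{UInt8})
    result.nrecords[] = max(length(newline_positions) - 1, 0)
    return result
end

# ...and a `consume!`-style method that acts on the parsed buffer, e.g. inserts
# it into a database; here it just prints a summary.
function consume!(result::RecordCountResult)
    println("consumed ", result.nrecords[], " records")
end

# Usage sketch (in the real package these methods are driven by the parsing loop,
# not called by hand):
bytes = Vector{UInt8}(codeunits("hello\nworld\n"))
newlines = Int32[0, 6, 12]  # assumed convention: a 0 sentinel plus the newline positions
buf = RecordCountResult()
populate_result_buffer!(buf, newlines, bytes)
consume!(buf)  # prints "consumed 2 records"
```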

The main entry point of this package is the `parse_file_parallel` function, which accepts several "context" arguments, each controlling a different aspect of the process:
src/ChunkingContext.jl (6 changes: 4 additions, 2 deletions)
@@ -1,6 +1,6 @@
# When splitting the work among multiple tasks, we aim for each task to have at least this
-# many bytes of input to work on.
-# This is to avoid having too many tasks with too little work to do.
+# many bytes of input to work on (even if it means that some tasks will have nothing to process)
+# This is to avoid overhead from too many task switches.
# TODO: make this configurable and find a good default (the current 16 KiB is a guess)
const MIN_TASK_SIZE_IN_BYTES = 16 * 1024
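
To make that comment concrete, here is a hedged sketch of how such a floor could be applied when deciding how many tasks to split a chunk across (the helper name and exact policy are illustrative assumptions, not the package's code):

```julia
# Illustrative only: split a chunk into at most `nworkers` tasks, but never so
# many that a task would get less than `min_task_size` bytes of input.
function estimate_task_count(bytes_in_chunk::Int, nworkers::Int; min_task_size::Int=16 * 1024)
    return clamp(div(bytes_in_chunk, min_task_size), 1, nworkers)
end

estimate_task_count(8 * 1024, 8)      # 1  (too little data to be worth splitting)
estimate_task_count(1024 * 1024, 8)   # 8  (enough data for every worker)
```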

@@ -39,6 +39,8 @@ The `id` field is necessary because we internally create a secondary `ChunkingCo
- The `newline_positions` field is used to store the newline positions in the input.
- The `bytes` field is used to store the raw bytes ingested from the input.
- `comment` can be used to skip the *initial* comment lines in `skip_rows_init!`. This value is also passed to `populate_result_buffer!` so the user can handle commented rows in the middle of the file during parsing (`_startswith` could be used to do the check).
+- The `buffersize` should be large enough to fit the longest row in the input, otherwise the lexer will fail.
+- The `buffersize` should be chosen such that each of the `nworkers` tasks has enough bytes to work on. Using 1 MiB per task seems to work reasonably well in practice.
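
A small, hedged sizing example for the two constraints above (the numbers are placeholders; the ~1 MiB/task figure is the rule of thumb from the docstring):

```julia
# Illustrative sizing only; pick numbers that match your input and machine.
nworkers = 4
longest_row_bytes = 64 * 1024                 # assumed upper bound on row length
buffersize = max(nworkers * 1024 * 1024,      # ~1 MiB of input per worker task
                 longest_row_bytes)           # and never smaller than the longest row
```
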
# See also:
- [`parse_file_parallel`](@ref), [`parse_file_serial`](@ref)
