Skip to content

Commit

Permalink
Merge pull request #2 from JuliaData/td-more-tests
Browse files Browse the repository at this point in the history
Misc tweaks and tests
  • Loading branch information
Drvi authored Jun 20, 2023
2 parents 5562eb5 + 92a9302 commit 65d33e9
Show file tree
Hide file tree
Showing 7 changed files with 508 additions and 88 deletions.
3 changes: 2 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ julia = "1.6"

[extras]
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"

[targets]
test = ["Test"]
test = ["Test", "Random"]
2 changes: 1 addition & 1 deletion src/parser_parallel.jl
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ function process_and_consume_task(
newline_segment = @view(ctx.newline_positions.elements[task_start:task_end])
populate_result_buffer!(result_buf, newline_segment, parsing_ctx, ctx.bytes, _comment, CT)
# Defined by the user via overload on consume_ctx
consume!(consume_ctx, ParsedPayload(row_num, task_end - task_start + 1, result_buf, parsing_ctx, ctx, task_start))
consume!(consume_ctx, ParsedPayload(row_num, Int(task_end - task_start), result_buf, parsing_ctx, ctx, task_start))
task_done!(consume_ctx, ctx)
# TRACING # push!(trace, time_ns())
end
Expand Down
4 changes: 2 additions & 2 deletions src/parser_serial.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ function parse_file_serial(
consume_ctx::AbstractConsumeContext,
chunking_ctx::ChunkingContext,
result_buf::AbstractResultBuffer,
::Type{CT}
::Type{CT}=Tuple{},
) where {CT}
row_num = 1
_comment = chunking_ctx.comment
Expand All @@ -18,7 +18,7 @@ function parse_file_serial(
task_end = Int32(last(task))
newline_segment = @view(chunking_ctx.newline_positions.elements[task_start:task_end])
populate_result_buffer!(result_buf, newline_segment, parsing_ctx, chunking_ctx.bytes, _comment, CT)
consume!(consume_ctx, ParsedPayload(row_num, length(task), result_buf, parsing_ctx, chunking_ctx, task_start))
consume!(consume_ctx, ParsedPayload(row_num, length(task) - 1, result_buf, parsing_ctx, chunking_ctx, task_start))
row_num += Int(task_end - task_start)
task_start = task_end
task_done!(consume_ctx, chunking_ctx)
Expand Down
17 changes: 5 additions & 12 deletions src/payload.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,22 @@ struct ParsedPayload{B<:AbstractResultBuffer, C<:AbstractParsingContext}
eols_buffer_index::Int32
end
Base.length(payload::ParsedPayload) = payload.len
last_row(payload::ParsedPayload) = (payload.row_num + length(payload) - 1)
last_row(payload::ParsedPayload) = payload.row_num + length(payload) - 1


#
# PayloadOrderer
#

# Like a Channel, but when you take! a Payload from it, it will be the next one in order
mutable struct PayloadOrderer{B<:AbstractResultBuffer, C<:AbstractParsingContext}
mutable struct PayloadOrderer{B<:AbstractResultBuffer, C<:AbstractParsingContext} <: AbstractChannel{ParsedPayload{B,C}}
queue::Channel{ParsedPayload{B,C}}
expected_row::Int
waititng_room::Vector{ParsedPayload{B,C}}
end
PayloadOrderer(queue::Channel{ParsedPayload{B,C}}) where {B,C} = PayloadOrderer{B,C}(queue)
PayloadOrderer{B,C}(queue::Channel{ParsedPayload{B,C}}) where {B,C} = PayloadOrderer{B,C}(queue, 1, sizehint!(ParsedPayload{B,C}[], Threads.nthreads()))
PayloadOrderer{B,C}() where {B,C} = PayloadOrderer{B,C}(Channel{ParsedPayload}(Inf), 1, sizehint!(ParsedPayload{B,C}[], Threads.nthreads()))
PayloadOrderer{B,C}() where {B,C} = PayloadOrderer{B,C}(Channel{ParsedPayload{B,C}}(Inf), 1, sizehint!(ParsedPayload{B,C}[], Threads.nthreads()))

function _reenqueue_ordered!(queue::Channel{T}, waiting_room::Vector{T}, payload::T) where {T}
row = payload.row_num
Expand All @@ -54,20 +54,13 @@ function _reorder!(queue::Channel{T}, waiting_room::Vector{T}, payload::T, expec
_reenqueue_ordered!(queue, waiting_room, payload)
return false
end

insertsorted!(waiting_room, payload, x->x.row_num)

if waiting_room[1].row_num == expected_row
payload = popfirst!(waiting_room)
put!(queue, payload)
_reenqueue_ordered!(queue, waiting_room, payload)
end

return true
end

Base.put!(o::PayloadOrderer, x::ParsedPayload) = put!(o.queue, x)
Base.put!(o::PayloadOrderer{B,C}, x::ParsedPayload{B,C}) where {B,C} = put!(o.queue, x)
Base.close(o::PayloadOrderer, excp::Exception=Base.closed_exception()) = close(o.queue, excp)
Base.isopen(o::PayloadOrderer) = isopen(o.queue)

function Base.take!(o::PayloadOrderer) # Blocks until a the next payload in order arrives
payload = take!(o.queue)
Expand Down
2 changes: 1 addition & 1 deletion src/read_and_lex_utils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ function _detect_newline(buf, pos, len)
v = view(buf, pos:len)
if isnothing(findfirst(==(UInt8('\n')), v))
if isnothing(findfirst(==(UInt8('\r')), v))
throw(ArgumentError("No newline detected. Specify the newline character explicitly via the `newlinechar` keyword argument. Use `\n` even for CRLF."))
throw(ArgumentError("No newline detected. Specify the newline character explicitly via the `newline` keyword argument. Use `\n` even for CRLF."))
else
return UInt8('\r')
end
Expand Down
Loading

0 comments on commit 65d33e9

Please sign in to comment.