Skip to content

Commit

Permalink
More tracing tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
Drvi committed Jul 18, 2023
1 parent dec42da commit 6081b5e
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 4 deletions.
13 changes: 12 additions & 1 deletion src/ChunkedBase.jl
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,13 @@ export Lexer
export parse_file_serial, parse_file_parallel, populate_result_buffer!

# TRACING # const PARSER_TASKS_TIMES = [UInt[]]
# TRACING # const CONSUMER_TASKS_TIMES = [UInt[]]
# TRACING # const IO_TASK_TIMES = UInt[]
# TRACING # const LEXER_TASK_TIMES = UInt[]
# TRACING # const T1 = UInt[]
# TRACING # const T2 = UInt[]
# TRACING # get_parser_task_trace(i) = PARSER_TASKS_TIMES[i]
# TRACING # get_consumer_task_trace(i) = CONSUMER_TASKS_TIMES[i]
# TRACING # function clear_traces!(nworkers::Int=Threads.nthreads())
# TRACING # for _ in (length(PARSER_TASKS_TIMES)+1:nworkers)
# TRACING # push!(PARSER_TASKS_TIMES, UInt[])
Expand All @@ -129,6 +131,7 @@ export parse_file_serial, parse_file_parallel, populate_result_buffer!
# TRACING # empty!(T1)
# TRACING # empty!(T2)
# TRACING # foreach(empty!, PARSER_TASKS_TIMES)
# TRACING # foreach(empty!, CONSUMER_TASKS_TIMES)
# TRACING # return nothing
# TRACING # end
# TRACING # function dump_traces(path)
Expand All @@ -142,11 +145,15 @@ export parse_file_serial, parse_file_parallel, populate_result_buffer!
# TRACING # for x in PARSER_TASKS_TIMES
# TRACING # write(io, UInt32(length(x)), x)
# TRACING # end
# TRACING # write(io, UInt32(length(CONSUMER_TASKS_TIMES)))
# TRACING # for x in CONSUMER_TASKS_TIMES
# TRACING # write(io, UInt32(length(x)), x)
# TRACING # end
# TRACING # end
# TRACING # return nothing
# TRACING # end
# TRACING # _resize!(vv, n) = length(vv) >= n ? resize!(vv, n) : append!(vv, [UInt[] for _ in 1:n-length(vv)])
# TRACING # function load_traces!(path)
# TRACING # _resize!(vv, n) = length(vv) >= n ? resize!(vv, n) : append!(vv, [UInt[] for _ in 1:n-length(vv)])
# TRACING # open(path, "r") do io
# TRACING # read!(io, resize!(IO_TASK_TIMES, read(io, UInt32)))
# TRACING # read!(io, resize!(LEXER_TASK_TIMES, read(io, UInt32)))
Expand All @@ -157,6 +164,10 @@ export parse_file_serial, parse_file_parallel, populate_result_buffer!
# TRACING # for x in PARSER_TASKS_TIMES
# TRACING # read!(io, resize!(x, read(io, UInt32)))
# TRACING # end
# TRACING # _resize!(CONSUMER_TASKS_TIMES, read(io, UInt32))
# TRACING # for x in CONSUMER_TASKS_TIMES
# TRACING # read!(io, resize!(x, read(io, UInt32)))
# TRACING # end
# TRACING # end
# TRACING # return nothing
# TRACING # end
Expand Down
4 changes: 3 additions & 1 deletion src/_traces.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,22 @@ function plot_traces()
io_task = copy(ChunkedBase.IO_TASK_TIMES)
lexer_task = copy(ChunkedBase.LEXER_TASK_TIMES)
parser_tasks = filter(x->length(x)>0, ChunkedBase.PARSER_TASKS_TIMES)
consume_tasks = filter(x->length(x)>0, ChunkedBase.CONSUMER_TASKS_TIMES)

start = Int(mapreduce(first, min, parser_tasks, init=min(io_task[1], lexer_task[1])))

lexer_timing = map(x->(x - start) / (1e9), lexer_task)
io_timing = map(x->(x - start) / (1e9), io_task)
pa_timings = map.(x->(x - start) / (1e9), parser_tasks)
co_timings = map.(x->(x - start) / (1e9), consume_tasks)
t1_timing = map(x->(x - start) / (1e9), t1)
t2_timing = map(x->(x - start) / (1e9), t2)

fig = GLMakie.linesegments(io_timing, fill(1, length(io_timing)))
ends = 2:2:length(io_timing)
GLMakie.scatter!(io_timing[ends], fill(1, length(ends)))

for (i, timing) in enumerate(vcat([lexer_timing, t1_timing, t2_timing, Float64[]], pa_timings))
for (i, timing) in enumerate(vcat([lexer_timing, t1_timing, t2_timing, Float64[]], pa_timings, [Float64[]], co_timings))
GLMakie.linesegments!(timing, fill(i+1, length(timing)))
ends = 2:2:length(timing)
GLMakie.scatter!(timing[ends], fill(i+1, length(ends)))
Expand Down
4 changes: 2 additions & 2 deletions src/parser_parallel.jl
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,15 @@ function process_and_consume_task(
# consume!, not separating the result buffers per chunk could lead to data corruption if
# the results from the 2nd chunk are ready before the 1st chunk is consumed.
result_buf = result_buffers[task_num + (use_current_context ? 0 : tasks_per_chunk(chunking_ctx))]
# TRACING # push!(trace, time_ns())
# TRACING # push!(trace, time_ns())
ctx = ifelse(use_current_context, chunking_ctx, chunking_ctx_next)
# Defined by the library using ChunkedBase via overload on the specific AbstractResultBuffer and AbstractParsingContext
newline_segment = @view(ctx.newline_positions.elements[task_start:task_end])
populate_result_buffer!(result_buf, newline_segment, parsing_ctx, ctx.bytes, _comment, CT)
# Defined by the user via overload on consume_ctx
consume!(consume_ctx, ParsedPayload(row_num, Int(task_end - task_start), result_buf, parsing_ctx, ctx, task_start))
task_done!(consume_ctx, ctx)
# TRACING # push!(trace, time_ns())
# TRACING # push!(trace, time_ns())
end
catch e
ce = CapturedException(e, catch_backtrace())
Expand Down

0 comments on commit 6081b5e

Please sign in to comment.