From 6081b5e608fbdc66f17f6878f30d7e1a7f3dc659 Mon Sep 17 00:00:00 2001
From: Tomáš Drvoštěp
Date: Tue, 18 Jul 2023 16:29:35 +0200
Subject: [PATCH] More tracing tweaks

---
 src/ChunkedBase.jl     | 13 ++++++++++++-
 src/_traces.jl         |  4 +++-
 src/parser_parallel.jl |  4 ++--
 3 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/src/ChunkedBase.jl b/src/ChunkedBase.jl
index ec0c140..e31b120 100644
--- a/src/ChunkedBase.jl
+++ b/src/ChunkedBase.jl
@@ -115,11 +115,13 @@ export Lexer
 export parse_file_serial, parse_file_parallel, populate_result_buffer!
 
 # TRACING # const PARSER_TASKS_TIMES = [UInt[]]
+# TRACING # const CONSUMER_TASKS_TIMES = [UInt[]]
 # TRACING # const IO_TASK_TIMES = UInt[]
 # TRACING # const LEXER_TASK_TIMES = UInt[]
 # TRACING # const T1 = UInt[]
 # TRACING # const T2 = UInt[]
 # TRACING # get_parser_task_trace(i) = PARSER_TASKS_TIMES[i]
+# TRACING # get_consumer_task_trace(i) = CONSUMER_TASKS_TIMES[i]
 # TRACING # function clear_traces!(nworkers::Int=Threads.nthreads())
 # TRACING #     for _ in (length(PARSER_TASKS_TIMES)+1:nworkers)
 # TRACING #         push!(PARSER_TASKS_TIMES, UInt[])
@@ -129,6 +131,7 @@ export parse_file_serial, parse_file_parallel, populate_result_buffer!
 # TRACING #     empty!(T1)
 # TRACING #     empty!(T2)
 # TRACING #     foreach(empty!, PARSER_TASKS_TIMES)
+# TRACING #     foreach(empty!, CONSUMER_TASKS_TIMES)
 # TRACING #     return nothing
 # TRACING # end
 # TRACING # function dump_traces(path)
@@ -142,11 +145,15 @@ export parse_file_serial, parse_file_parallel, populate_result_buffer!
 # TRACING #         for x in PARSER_TASKS_TIMES
 # TRACING #             write(io, UInt32(length(x)), x)
 # TRACING #         end
+# TRACING #         write(io, UInt32(length(CONSUMER_TASKS_TIMES)))
+# TRACING #         for x in CONSUMER_TASKS_TIMES
+# TRACING #             write(io, UInt32(length(x)), x)
+# TRACING #         end
 # TRACING #     end
 # TRACING #     return nothing
 # TRACING # end
+# TRACING # _resize!(vv, n) = length(vv) >= n ? resize!(vv, n) : append!(vv, [UInt[] for _ in 1:n-length(vv)])
 # TRACING # function load_traces!(path)
-# TRACING #     _resize!(vv, n) = length(vv) >= n ? resize!(vv, n) : append!(vv, [UInt[] for _ in 1:n-length(vv)])
 # TRACING #     open(path, "r") do io
 # TRACING #         read!(io, resize!(IO_TASK_TIMES, read(io, UInt32)))
 # TRACING #         read!(io, resize!(LEXER_TASK_TIMES, read(io, UInt32)))
@@ -157,6 +164,10 @@ export parse_file_serial, parse_file_parallel, populate_result_buffer!
 # TRACING #         for x in PARSER_TASKS_TIMES
 # TRACING #             read!(io, resize!(x, read(io, UInt32)))
 # TRACING #         end
+# TRACING #         _resize!(CONSUMER_TASKS_TIMES, read(io, UInt32))
+# TRACING #         for x in CONSUMER_TASKS_TIMES
+# TRACING #             read!(io, resize!(x, read(io, UInt32)))
+# TRACING #         end
 # TRACING #     end
 # TRACING #     return nothing
 # TRACING # end

diff --git a/src/_traces.jl b/src/_traces.jl
index c4afc56..72a4a74 100644
--- a/src/_traces.jl
+++ b/src/_traces.jl
@@ -6,12 +6,14 @@ function plot_traces()
     io_task = copy(ChunkedBase.IO_TASK_TIMES)
     lexer_task = copy(ChunkedBase.LEXER_TASK_TIMES)
     parser_tasks = filter(x->length(x)>0, ChunkedBase.PARSER_TASKS_TIMES)
+    consume_tasks = filter(x->length(x)>0, ChunkedBase.CONSUMER_TASKS_TIMES)
 
     start = Int(mapreduce(first, min, parser_tasks, init=min(io_task[1], lexer_task[1])))
 
     lexer_timing = map(x->(x - start) / (1e9), lexer_task)
     io_timing = map(x->(x - start) / (1e9), io_task)
     pa_timings = map.(x->(x - start) / (1e9), parser_tasks)
+    co_timings = map.(x->(x - start) / (1e9), consume_tasks)
     t1_timing = map(x->(x - start) / (1e9), t1)
     t2_timing = map(x->(x - start) / (1e9), t2)
 
@@ -19,7 +21,7 @@ function plot_traces()
     ends = 2:2:length(io_timing)
     GLMakie.scatter!(io_timing[ends], fill(1, length(ends)))
 
-    for (i, timing) in enumerate(vcat([lexer_timing, t1_timing, t2_timing, Float64[]], pa_timings))
+    for (i, timing) in enumerate(vcat([lexer_timing, t1_timing, t2_timing, Float64[]], pa_timings, [Float64[]], co_timings))
         GLMakie.linesegments!(timing, fill(i+1, length(timing)))
         ends = 2:2:length(timing)
         GLMakie.scatter!(timing[ends], fill(i+1, length(ends)))

diff --git a/src/parser_parallel.jl b/src/parser_parallel.jl
index 87bd1d7..02f48ba 100644
--- a/src/parser_parallel.jl
+++ b/src/parser_parallel.jl
@@ -70,7 +70,7 @@ function process_and_consume_task(
             # consume!, not separating the result buffers per chunk could lead to data corruption if
             # the results from the 2nd chunk are ready before the 1st chunk is consumed.
             result_buf = result_buffers[task_num + (use_current_context ? 0 : tasks_per_chunk(chunking_ctx))]
-            # TRACING # push!(trace, time_ns())
+            # TRACING # push!(trace, time_ns())
             ctx = ifelse(use_current_context, chunking_ctx, chunking_ctx_next)
             # Defined by the library using ChunkedBase via overload on the specific AbstractResultBuffer and AbstractParsingContext
             newline_segment = @view(ctx.newline_positions.elements[task_start:task_end])
@@ -78,7 +78,7 @@ function process_and_consume_task(
             # Defined by the user via overload on consume_ctx
             consume!(consume_ctx, ParsedPayload(row_num, Int(task_end - task_start), result_buf, parsing_ctx, ctx, task_start))
             task_done!(consume_ctx, ctx)
-            # TRACING # push!(trace, time_ns())
+            # TRACING # push!(trace, time_ns())
         end
     catch e
         ce = CapturedException(e, catch_backtrace())
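
Note on the `_resize!` helper hoisted to module scope above: its semantics are easy to misread, since it truncates the vector-of-vectors when it is longer than `n` but pads it with fresh empty vectors when it is shorter. Hoisting it out of `load_traces!` also makes it read as a reusable utility rather than a closure local to that function. A standalone sketch (the driver lines below are illustrative, not part of the patch):

    # Copy of the helper from the patch above.
    _resize!(vv, n) = length(vv) >= n ? resize!(vv, n) : append!(vv, [UInt[] for _ in 1:n-length(vv)])

    vv = [UInt[1], UInt[2], UInt[3]]
    _resize!(vv, 2)  # longer than n: truncated to 2 inner vectors
    _resize!(vv, 4)  # shorter than n: padded with fresh empty UInt[] vectors
    @assert length(vv) == 4 && vv[1] == UInt[1] && isempty(vv[3]) && isempty(vv[4])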
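
For reviewers unfamiliar with the tracing machinery: the `# TRACING #` lines are dormant and only take effect once the prefix is stripped from the sources. A rough sketch of the round-trip this patch extends to consumer tasks, assuming such an uncommented build (the file name is arbitrary, and the idea that user `consume!` overloads record into `get_consumer_task_trace(i)`, mirroring the parser-side accessor, is my reading rather than something the patch states):

    using ChunkedBase

    # Grows PARSER_TASKS_TIMES up to one vector per worker and empties all
    # trace buffers. Note the consumer vectors are emptied but not grown here;
    # load_traces! sizes them on demand via _resize!.
    ChunkedBase.clear_traces!(Threads.nthreads())

    # ... run parse_file_parallel here; parser tasks push paired enter/exit
    # timestamps, and a custom consume! overload could push its own pairs
    # into ChunkedBase.get_consumer_task_trace(i) ...

    # Round-trip through the binary format extended by this patch: each global
    # trace is a UInt32 length followed by raw UInt timestamps, then come
    # length-prefixed blocks for parser tasks and (new) consumer tasks.
    ChunkedBase.dump_traces("traces.bin")
    ChunkedBase.clear_traces!()
    ChunkedBase.load_traces!("traces.bin")

With traces loaded, including `src/_traces.jl` and calling `plot_traces()` now also draws the consumer-task rows after the parser-task rows, separated by the empty `[Float64[]]` spacer row added to the `vcat` above.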