diff --git a/test/e2e_tests.jl b/test/e2e_tests.jl
index dfc1832..607aac3 100644
--- a/test/e2e_tests.jl
+++ b/test/e2e_tests.jl
@@ -318,14 +318,11 @@ end
 @testset "consume!" begin
     @testset "serial" begin
         throw_ctx = TestThrowingContext(2)
+        lexer = Lexer(IOBuffer("[1,2]\n[3,4]"), nothing, '\n')
+        chunking_ctx = ChunkingContext(6, 1, 0, nothing)
+        ChunkedBase.read_and_lex!(lexer, chunking_ctx)
         @test_throws ErrorException("These contexts are for throwing, and that's all what they do") begin
-            parse_file_serial(
-                Lexer(IOBuffer("[1,2]\n[3,4]"), nothing, '\n'),
-                TestParsingContext(),
-                throw_ctx,
-                ChunkingContext(6, 1, 0, nothing),
-                make_buf(1),
-            )
+            parse_file_serial(lexer, TestParsingContext(), throw_ctx, chunking_ctx, make_buf(1))
         end
         @assert !isempty(throw_ctx.tasks)
         @test throw_ctx.tasks[1] === current_task()
@@ -335,14 +332,13 @@ end
     @testset "parallel" begin
         # 1500 rows should be enough to get each of the 3 task at least one consume!
         throw_ctx = TestThrowingContext(1500)
+        lexer = Lexer(IOBuffer(("[1,2]\n[3,4]\n" ^ 800)), nothing, '\n') # 1600 rows total
         nworkers = min(3, Threads.nthreads())
+        chunking_ctx = ChunkingContext(12, nworkers, 0, nothing)
+        ChunkedBase.read_and_lex!(lexer, chunking_ctx)
         @test_throws TaskFailedException begin
             parse_file_parallel(
-                Lexer(IOBuffer(("[1,2]\n[3,4]\n" ^ 800)), nothing, '\n'), # 1600 rows total
-                TestParsingContext(),
-                throw_ctx,
-                ChunkingContext(12, nworkers, 0, nothing),
-                [make_buf(1) for _ in 1:(2*nworkers)],
+                lexer, TestParsingContext(), throw_ctx, chunking_ctx, [make_buf(1) for _ in 1:(2*nworkers)]
             )
         end
         sleep(0.2)
@@ -356,14 +352,12 @@ end
 @testset "io" begin
     @testset "serial" begin
         throw_ctx = TestThrowingContext(typemax(Int)) # Only capture tasks, let IO do the throwing
+        lexer = Lexer(ThrowingIO(("[1,2]\n[3,4]\n" ^ 10)), nothing, '\n') # 20 rows total
+        chunking_ctx = ChunkingContext(6, 1, 0, nothing)
+
+        ChunkedBase.read_and_lex!(lexer, chunking_ctx)
         @test_throws ErrorException("That should be enough data for everyone") begin
-            parse_file_serial(
-                Lexer(ThrowingIO(("[1,2]\n[3,4]\n" ^ 10)), nothing, '\n'), # 20 rows total
-                TestParsingContext(),
-                throw_ctx,
-                ChunkingContext(6, 1, 0, nothing),
-                make_buf(1),
-            )
+            parse_file_serial(lexer, TestParsingContext(), throw_ctx, chunking_ctx, make_buf(1))
         end
         @assert !isempty(throw_ctx.tasks)
         @test throw_ctx.tasks[1] === current_task()
@@ -371,15 +365,15 @@ end
     end
     @testset "parallel" begin
-        throw_ctx = TestThrowingContext(typemax(Int)) # Only capture tasks, let IO do the throwing
         nworkers = min(3, Threads.nthreads())
+        throw_ctx = TestThrowingContext(typemax(Int)) # Only capture tasks, let IO do the throwing
+        lexer = Lexer(ThrowingIO(("[1,2]\n[3,4]\n" ^ 800)), nothing, '\n') # 1600 rows total
+        chunking_ctx = ChunkingContext(12, nworkers, 0, nothing)
+
+        ChunkedBase.read_and_lex!(lexer, chunking_ctx)
         @test_throws TaskFailedException begin
             parse_file_parallel(
-                Lexer(ThrowingIO(("[1,2]\n[3,4]\n" ^ 800)), nothing, '\n'), # 1600 rows total
-                TestParsingContext(),
-                throw_ctx,
-                ChunkingContext(12, nworkers, 0, nothing),
-                [make_buf(1) for _ in 1:(2*nworkers)],
+                lexer, TestParsingContext(), throw_ctx, chunking_ctx, [make_buf(1) for _ in 1:(2*nworkers)]
             )
         end
         sleep(0.2)