From d6dee8c4cd2a63eb7cdb6cbdda1d9953104098df Mon Sep 17 00:00:00 2001
From: Nantia Makrynioti
Date: Thu, 16 Nov 2023 15:50:24 +0100
Subject: [PATCH] Add a MultipartUploadStream IO object (#46)

* WIP MultipartUploadStream
* more debugging and testing
* added test writing large bytes to S3 with MultipleUploadStream
* cleanup, added comments and a test with Azurite
* WIP - spawn multiple tasks for uploading
* WIP - upload with multiple threads
* 2nd attempt - upload with multiple threads
* fixed Azure test, S3 test still fails with error 400
* Julia scheduling error debugging
* fixed S3 test failures
* added error flag to avoid deadlocks
* put ntasks increment behind lock
* put back tests
* cleanup
* fixed julia 1.6 incompatibility issue with @atomic
* fixed type, cleanup
* fixed initialization for Julia 1.6
* fixed type again
* another attempt for Julia 1.6
* atomic_add! for Julia 1.6
* Tomas' changes
* cleanup
* addressed feedback
* replaced acquire macro with acquire function for Julia 1.6 compatibility
* small refactoring of MultipartUploadStream based on feedback
* added tests for failures
* alternative syntax, fixed semaphore, cleanup
* added specialized constructors
* comments and cleanup
* more comments and cleanup
---
 src/CloudStore.jl |   3 +-
 src/object.jl     | 206 ++++++++++++++++++++++++++++++++++++++++++++++
 test/runtests.jl  | 119 ++++++++++++++++++++++++++
 3 files changed, 327 insertions(+), 1 deletion(-)

diff --git a/src/CloudStore.jl b/src/CloudStore.jl
index 131d89a..41d8907 100644
--- a/src/CloudStore.jl
+++ b/src/CloudStore.jl
@@ -6,7 +6,8 @@ import CloudBase: AWS, Azure, CloudTest # for specific clouds
 
 module API
 
-export Object, PrefetchedDownloadStream, ResponseBodyType, RequestBodyType
+export Object, PrefetchedDownloadStream, ResponseBodyType, RequestBodyType,
+    MultipartUploadStream
 
 using HTTP, CodecZlib, CodecZlibNG, Mmap
 import WorkerUtilities: OrderedSynchronizer
diff --git a/src/object.jl b/src/object.jl
index 34bfc43..2608c9d 100644
--- a/src/object.jl
+++ b/src/object.jl
@@ -372,3 +372,209 @@ function Base.read(io::PrefetchedDownloadStream, ::Type{UInt8})
     end
     return b
 end
+
+function _upload_task(io; kw...)
+    try
+        (part_n, upload_buffer) = take!(io.upload_queue)
+        # upload the part
+        parteTag, wb = uploadPart(io.store, io.url, upload_buffer, part_n, io.uploadState; io.credentials, kw...)
+        Base.release(io.sem)
+        # add part eTag to our collection of eTags
+        Base.@lock io.cond_wait begin
+            if length(io.eTags) < part_n
+                resize!(io.eTags, part_n)
+            end
+            io.eTags[part_n] = parteTag
+            io.ntasks -= 1
+            notify(io.cond_wait)
+        end
+    catch e
+        isopen(io.upload_queue) && close(io.upload_queue, e)
+        Base.@lock io.cond_wait begin
+            io.exc = e
+            notify(io.cond_wait, e, all=true, error=true)
+        end
+    end
+    return nothing
+end
+
+"""
+This is an *experimental* API.
+
+    MultipartUploadStream <: IO
+    MultipartUploadStream(args...; kwargs...) -> MultipartUploadStream
+
+An in-memory IO stream that uploads chunks to a URL in blob storage.
+
+For every data chunk we call write(io, data;) to write it to a channel. We spawn one task
+per chunk that reads the data from this channel and uploads it as a distinct part of the
+same remote object in blob storage.
+We expect the chunks to be written in order.
+For cases where there is no need to upload data in parts or the data size is too small, `put`
+can be used instead.
+
+# Arguments
+* `store::AbstractStore`: The S3 Bucket / Azure Container object
+* `key::String`: S3 key / Azure blob resource name
+
+# Keywords
+* `credentials::Union{CloudCredentials, Nothing}=nothing`: Credentials object used in HTTP
+  requests
+* `concurrent_writes_to_channel::Int=(4 * Threads.nthreads())`: the max number of chunks in
+  flight. Defaults to 4 times the number of threads. We use this value to initialize a
+  semaphore to perform throttling in case writing to the channel is much faster than
+  uploading to blob storage, i.e. `write` will block once this limit is reached.
+* `kwargs...`: HTTP keyword arguments are forwarded to the underlying HTTP requests.
+
+## Examples
+```
+# Get an IO stream for a remote CSV file `test.csv` living in your S3 bucket
+io = MultipartUploadStream(bucket, "test.csv"; credentials)
+
+# Write a chunk of data (Vector{UInt8}) to the stream
+write(io, data;)
+
+# Wait for all chunks to be uploaded
+wait(io)
+
+# Close the stream
+close(io; credentials)
+
+# Alternative syntax that encapsulates all these steps
+MultipartUploadStream(bucket, "test.csv"; credentials) do io
+    write(io, data;)
+end
+```
+
+## Note on upload size
+```
+Some cloud storage providers might have a lower limit on the size of the uploaded object.
+For example, it seems that S3 requires a minimum upload size of 5MB per part:
+https://github.com/minio/minio/issues/11076.
+We haven't found a similar setting for Azure.
+For such cases where the size of the data is too small, one can use `put` instead.
+```
+"""
+mutable struct MultipartUploadStream{T <: AbstractStore} <: IO
+    store::T
+    url::String
+    credentials::Union{Nothing, AWS.Credentials, Azure.Credentials}
+    uploadState::Union{String, Nothing}
+    eTags::Vector{String}
+    upload_queue::Channel{Tuple{Int, Vector{UInt8}}}
+    cond_wait::Threads.Condition
+    cur_part_id::Int
+    ntasks::Int
+    exc::Union{Exception, Nothing}
+    sem::Base.Semaphore
+
+    function MultipartUploadStream(
+        store::AWS.Bucket,
+        key::String;
+        credentials::Union{Nothing, AWS.Credentials}=nothing,
+        concurrent_writes_to_channel::Int=(4 * Threads.nthreads()),
+        kw...
+    )
+        url = makeURL(store, key)
+        uploadState = API.startMultipartUpload(store, key; credentials, kw...)
+        io = new{AWS.Bucket}(
+            store,
+            url,
+            credentials,
+            uploadState,
+            Vector{String}(),
+            Channel{Tuple{Int, Vector{UInt8}}}(Inf),
+            Threads.Condition(),
+            0,
+            0,
+            nothing,
+            Base.Semaphore(concurrent_writes_to_channel)
+        )
+        return io
+    end
+
+    function MultipartUploadStream(
+        store::Azure.Container,
+        key::String;
+        credentials::Union{Nothing, Azure.Credentials}=nothing,
+        concurrent_writes_to_channel::Int=(4 * Threads.nthreads()),
+        kw...
+    )
+        url = makeURL(store, key)
+        uploadState = API.startMultipartUpload(store, key; credentials, kw...)
+        io = new{Azure.Container}(
+            store,
+            url,
+            credentials,
+            uploadState,
+            Vector{String}(),
+            Channel{Tuple{Int, Vector{UInt8}}}(Inf),
+            Threads.Condition(),
+            0,
+            0,
+            nothing,
+            Base.Semaphore(concurrent_writes_to_channel)
+        )
+        return io
+    end
+
+    # Alternative syntax that applies the function `f` to the result of
+    # `MultipartUploadStream(args...; kwargs...)`, waits for all parts to be uploaded and
+    # closes the stream.
+    function MultipartUploadStream(f::Function, args...; kw...)
+        io = MultipartUploadStream(args...; kw...)
+        try
+            f(io)
+            wait(io)
+            close(io; kw...)
+        catch e
+            # todo, we need a function here to signal abort to S3/Blobs. We don't have that
+            # yet in CloudStore.jl
+            rethrow()
+        end
+    end
+end
+
+# Writes a data chunk to the channel and spawns a task to upload it as a separate part.
+function Base.write(io::MultipartUploadStream, bytes::Vector{UInt8}; kw...)
+    local part_n
+    Base.@lock io.cond_wait begin
+        io.ntasks += 1
+        io.cur_part_id += 1
+        part_n = io.cur_part_id
+        notify(io.cond_wait)
+    end
+    Base.acquire(io.sem)
+    # We expect the data chunks to be written in order in the channel.
+    put!(io.upload_queue, (part_n, bytes))
+    Threads.@spawn _upload_task($io; $(kw)...)
+    return nothing
+end
+
+# Waits for all parts to be uploaded.
+function Base.wait(io::MultipartUploadStream)
+    try
+        Base.@lock io.cond_wait begin
+            while true
+                !isnothing(io.exc) && throw(io.exc)
+                io.ntasks == 0 && break
+                wait(io.cond_wait)
+            end
+        end
+    catch e
+        rethrow()
+    end
+end
+
+# When there are no more data chunks to upload, this function closes the channel and sends
+# a POST request with a single id for the entire upload.
+function Base.close(io::MultipartUploadStream; kw...)
+    try
+        close(io.upload_queue)
+        return API.completeMultipartUpload(io.store, io.url, io.eTags, io.uploadState; kw...)
+    catch e
+        io.exc = e
+        rethrow()
+    end
+end
diff --git a/test/runtests.jl b/test/runtests.jl
index 2eccd4c..b8712f5 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -793,5 +793,124 @@ end
     end
 end
 
+# When using Minio, the minimum upload size per part is 5MB according to
+# S3 specifications: https://github.com/minio/minio/issues/11076
+# I couldn't find a minimum upload size for Azure blob storage.
+@testset "CloudStore.MultipartUploadStream write large bytes - S3" begin
+    Minio.with(; debug=true) do conf
+        credentials, bucket = conf
+        multicsv = "1,2,3,4,5,6,7,8,9,1\n"^1000000; # 20MB
+
+        N = 5500000
+        mus_obj = CloudStore.MultipartUploadStream(bucket, "test.csv"; credentials)
+
+        i = 1
+        while i < sizeof(multicsv)
+            nb = i + N > length(multicsv) ? length(multicsv)-i+1 : N
+            buf = Vector{UInt8}(undef, nb)
+            copyto!(buf, 1, codeunits(multicsv), i, nb)
+            CloudStore.write(mus_obj, buf;)
+            i += N
+        end
+
+        CloudStore.wait(mus_obj)
+        CloudStore.close(mus_obj; credentials)
+        obj = CloudStore.Object(bucket, "test.csv"; credentials)
+        @test length(obj) == sizeof(multicsv)
+    end
+end
+
+@testset "CloudStore.MultipartUploadStream failure due to too small upload size - S3" begin
+    Minio.with(; debug=true) do conf
+        credentials, bucket = conf
+        multicsv = "1,2,3,4,5,6,7,8,9,1\n"^1000000; # 20MB
+
+        N = 55000
+        mus_obj = CloudStore.MultipartUploadStream(bucket, "test.csv"; credentials)
+        try
+            i = 1
+            nb = i + N > length(multicsv) ? length(multicsv)-i+1 : N
+            buf = Vector{UInt8}(undef, nb)
+            copyto!(buf, 1, codeunits(multicsv), i, nb)
+            CloudStore.write(mus_obj, buf;)
+            CloudStore.wait(mus_obj)
+            CloudStore.close(mus_obj; credentials) # This should fail
+        catch e
+            @test isnothing(mus_obj.exc) == false
+        end
+    end
+end
+
+@testset "CloudStore.MultipartUploadStream failure due to changed url - S3" begin
+    Minio.with(; debug=true) do conf
+        credentials, bucket = conf
+        multicsv = "1,2,3,4,5,6,7,8,9,1\n"^1000000; # 20MB
+
+        N = 5500000
+        mus_obj = CloudStore.MultipartUploadStream(bucket, "test.csv"; credentials)
+        try
+            i = 1
+            nb = i + N > length(multicsv) ? length(multicsv)-i+1 : N
+            buf = Vector{UInt8}(undef, nb)
+            copyto!(buf, 1, codeunits(multicsv), i, nb)
+            # Changing the url after the MultipartUploadStream object was created
+            mus_obj.url = "http://127.0.0.1:23252/jl-minio-22377/test_nantia.csv"
+            CloudStore.write(mus_obj, buf;) # This should fail
+            CloudStore.wait(mus_obj)
+        catch e
+            @test isnothing(mus_obj.exc) == false
+        end
+    end
+end
+
+@testset "CloudStore.MultipartUploadStream write large bytes - Azure" begin
+    Azurite.with(; debug=true) do conf
+        credentials, bucket = conf
+        multicsv = "1,2,3,4,5,6,7,8,9,1\n"^1000000; # 20MB
+
+        N = 2000000
+        mus_obj = CloudStore.MultipartUploadStream(bucket, "test.csv"; credentials)
+
+        i = 1
+        while i < sizeof(multicsv)
+            nb = i + N > length(multicsv) ? length(multicsv)-i+1 : N
+            buf = Vector{UInt8}(undef, nb)
+            copyto!(buf, 1, codeunits(multicsv), i, nb)
+            CloudStore.write(mus_obj, buf;)
+            i += N
+        end
+
+        CloudStore.wait(mus_obj)
+        CloudStore.close(mus_obj; credentials)
+        obj = CloudStore.Object(bucket, "test.csv"; credentials)
+        @test length(obj) == sizeof(multicsv)
+    end
+end
+
+@testset "CloudStore.MultipartUploadStream test alternative syntax - Azure" begin
+    Azurite.with(; debug=true) do conf
+        credentials, bucket = conf
+        multicsv = "1,2,3,4,5,6,7,8,9,1\n"^1000000; # 20MB
+
+        N = 2000000
+        function uploading_loop(multicsv, batch_size, mus_obj)
+            i = 1
+            while i < sizeof(multicsv)
+                nb = i + batch_size > length(multicsv) ? length(multicsv)-i+1 : batch_size
+                buf = Vector{UInt8}(undef, nb)
+                copyto!(buf, 1, codeunits(multicsv), i, nb)
+                CloudStore.write(mus_obj, buf;)
+                i += batch_size
+            end
+        end
+
+        CloudStore.MultipartUploadStream(bucket, "test.csv"; credentials) do mus_obj
+            uploading_loop(multicsv, N, mus_obj)
+        end
+
+        obj = CloudStore.Object(bucket, "test.csv"; credentials)
+        @test length(obj) == sizeof(multicsv)
+    end
+end
 end # @testset "CloudStore.jl"
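
For reference, below is a minimal end-to-end usage sketch of the new `MultipartUploadStream` API introduced by this patch; it is not part of the diff above. It assumes a local Minio instance started via `CloudBase.CloudTest` (as the S3 tests do), and the `using CloudBase.CloudTest: Minio` import, the `example.csv` key, and the 8MB chunk size are illustrative assumptions rather than requirements of the API.

```julia
using CloudStore
using CloudBase.CloudTest: Minio  # assumed import, mirroring the test setup

# Start a local Minio instance and stream a ~20MB payload in 8MB chunks;
# each `write` enqueues one part, which a spawned task uploads in the background.
Minio.with() do conf
    credentials, bucket = conf
    data = codeunits("1,2,3,4,5,6,7,8,9,1\n"^1000000)  # ~20MB of bytes
    chunk = 8_000_000  # keep parts above S3's 5MB per-part minimum

    CloudStore.MultipartUploadStream(bucket, "example.csv"; credentials) do io
        i = 1
        while i <= length(data)
            nb = min(chunk, length(data) - i + 1)
            CloudStore.write(io, data[i:i+nb-1])  # Vector{UInt8} chunk, written in order
            i += nb
        end
    end  # the do-block form waits for all parts and completes the upload

    obj = CloudStore.Object(bucket, "example.csv"; credentials)
    @assert length(obj) == length(data)
end
```

The do-block form shown here waits for all parts and completes the multipart upload on exit; the explicit `write`/`wait`/`close` sequence from the docstring works equally well when more control over the stream's lifetime is needed.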