Add a MultipartUploadStream IO object (#46)
* WIP MultipartUploadStream

* more debugging and testing

* added test writing large bytes to S3 with MultipartUploadStream

* cleanup, added comments and a test with Azurite

* WIP - spawn multiple tasks for uploading

* WIP - upload with multiple threads

* 2nd attempt - upload with multiple threads

* fixed Azure test, S3 test still fails with error 400

* Julia scheduling error debugging

* fixed S3 test failures

* added error flag to avoid deadlocks

* put ntasks increment behind lock

* put back tests

* cleanup

* fixed julia 1.6 incompatibility issue with @atomic

* fixed type, cleanup

* fixed initialization for Julia 1.6

* fixed type again

* another attempt for Julia 1.6

* atomic_add! for Julia 1.6

* Tomas' changes

* cleanup

* addressed feedback

* replaced acquire macro with acquire function for Julia 1.6 compatibility

* small refactoring of MultipartUploadStream based on feedback

* added tests for failures

* alternative syntax, fixed semaphore, cleanup

* added specialized constructors

* comments and cleanup

* more comments and cleanup
nantiamak authored Nov 16, 2023
1 parent 328b427 commit d6dee8c
Showing 3 changed files with 327 additions and 1 deletion.
3 changes: 2 additions & 1 deletion src/CloudStore.jl
@@ -6,7 +6,8 @@ import CloudBase: AWS, Azure, CloudTest
# for specific clouds
module API

export Object, PrefetchedDownloadStream, ResponseBodyType, RequestBodyType
export Object, PrefetchedDownloadStream, ResponseBodyType, RequestBodyType,
MultipartUploadStream

using HTTP, CodecZlib, CodecZlibNG, Mmap
import WorkerUtilities: OrderedSynchronizer
206 changes: 206 additions & 0 deletions src/object.jl
@@ -372,3 +372,209 @@ function Base.read(io::PrefetchedDownloadStream, ::Type{UInt8})
end
return b
end

function _upload_task(io; kw...)
try
(part_n, upload_buffer) = take!(io.upload_queue)
# upload the part
parteTag, wb = uploadPart(io.store, io.url, upload_buffer, part_n, io.uploadState; io.credentials, kw...)
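# release the semaphore slot acquired in `write`, so another chunk can be queued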
Base.release(io.sem)
# add part eTag to our collection of eTags
Base.@lock io.cond_wait begin
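# parts may finish out of order, so grow eTags as needed to index by part number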
if length(io.eTags) < part_n
resize!(io.eTags, part_n)
end
io.eTags[part_n] = parteTag
io.ntasks -= 1
notify(io.cond_wait)
end
catch e
isopen(io.upload_queue) && close(io.upload_queue, e)
Base.@lock io.cond_wait begin
io.exc = e
notify(io.cond_wait, e, all=true, error=true)
end
end
return nothing
end

"""
This is an *experimental* API.

    MultipartUploadStream <: IO
    MultipartUploadStream(args...; kwargs...) -> MultipartUploadStream

An in-memory IO stream that uploads chunks to a URL in blob storage.
For every data chunk we call `write(io, data)` to push it to a channel, and we spawn one
task per chunk that takes the data from this channel and uploads it as a distinct part of
the same remote object. The chunks are expected to be written in order.
For cases where there is no need to upload the data in parts, or the data size is too small,
`put` can be used instead.
# Arguments
* `store::AbstractStore`: The S3 Bucket / Azure Container object
* `key::String`: S3 key / Azure blob resource name
# Keywords
* `credentials::Union{CloudCredentials, Nothing}=nothing`: Credentials object used in HTTP
requests
* `concurrent_writes_to_channel::Int=(4 * Threads.nthreads())`: the maximum number of
chunks in flight, defaulting to 4 times the number of threads. This value initializes a
semaphore used for throttling when writing to the channel is much faster than uploading
to blob storage, i.e. `write` will block once this limit is reached.
* `kwargs...`: HTTP keyword arguments forwarded to the underlying HTTP requests.
## Examples
```
# Create an upload stream for a remote CSV file `test.csv` in your S3 bucket
io = MultipartUploadStream(bucket, "test.csv"; credentials)
# Write a chunk of data (Vector{UInt8}) to the stream
write(io, data)
# Wait for all chunks to be uploaded
wait(io)
# Close the stream
close(io; credentials)
# Alternative syntax that encapsulates all these steps
MultipartUploadStream(bucket, "test.csv"; credentials) do io
write(io, data)
end
```
## Note on upload size
Some cloud storage providers impose a lower limit on the size of each uploaded part.
For example, S3 seems to require a minimum part size of 5MB:
https://github.com/minio/minio/issues/11076.
We haven't found a similar limit for Azure.
For cases where the data is too small, one can use `put` instead.
"""
mutable struct MultipartUploadStream{T <: AbstractStore} <: IO
store::T
url::String
credentials::Union{Nothing, AWS.Credentials, Azure.Credentials}
uploadState::Union{String, Nothing}
eTags::Vector{String}
upload_queue::Channel{Tuple{Int, Vector{UInt8}}}
cond_wait::Threads.Condition
cur_part_id::Int
ntasks::Int
exc::Union{Exception, Nothing}
sem::Base.Semaphore

function MultipartUploadStream(
store::AWS.Bucket,
key::String;
credentials::Union{Nothing, AWS.Credentials}=nothing,
concurrent_writes_to_channel::Int=(4 * Threads.nthreads()),
kw...
)
url = makeURL(store, key)
uploadState = API.startMultipartUpload(store, key; credentials, kw...)
io = new{AWS.Bucket}(
store,
url,
credentials,
uploadState,
Vector{String}(),
Channel{Tuple{Int, Vector{UInt8}}}(Inf),
Threads.Condition(),
0,
0,
nothing,
Base.Semaphore(concurrent_writes_to_channel)
)
return io
end

function MultipartUploadStream(
store::Azure.Container,
key::String;
credentials::Union{Nothing, Azure.Credentials}=nothing,
concurrent_writes_to_channel::Int=(4 * Threads.nthreads()),
kw...
)
url = makeURL(store, key)
uploadState = API.startMultipartUpload(store, key; credentials, kw...)
io = new{Azure.Container}(
store,
url,
credentials,
uploadState,
Vector{String}(),
Channel{Tuple{Int, Vector{UInt8}}}(Inf),
Threads.Condition(),
0,
0,
nothing,
Base.Semaphore(concurrent_writes_to_channel)
)
return io
end

# Alternative syntax that applies the function `f` to the result of
# `MultipartUploadStream(args...; kwargs...)`, waits for all parts to be uploaded, and
# closes the stream.
function MultipartUploadStream(f::Function, args...; kw...)
io = MultipartUploadStream(args...; kw...)
try
f(io)
wait(io)
close(io; kw...)
catch e
# todo, we need a function here to signal abort to S3/Blobs. We don't have that
# yet in CloudStore.jl
rethrow()
end
end
end

# Writes a data chunk to the channel and spawns a task to upload it.
function Base.write(io::MultipartUploadStream, bytes::Vector{UInt8}; kw...)
local part_n
Base.@lock io.cond_wait begin
io.ntasks += 1
io.cur_part_id += 1
part_n = io.cur_part_id
notify(io.cond_wait)
end
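# Throttle: block if the maximum number of in-flight chunks has been reached;
# the semaphore is released in `_upload_task` once a part finishes uploading.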
Base.acquire(io.sem)
# We expect the data chunks to be written in order in the channel.
put!(io.upload_queue, (part_n, bytes))
Threads.@spawn _upload_task($io; $(kw)...)
return nothing
end

# Waits for all parts to be uploaded
function Base.wait(io::MultipartUploadStream)
try
Base.@lock io.cond_wait begin
while true
!isnothing(io.exc) && throw(io.exc)
io.ntasks == 0 && break
wait(io.cond_wait)
end
end
catch e
rethrow()
end
end

# When there are no more data chunks to upload, this function closes the channel and
# completes the multipart upload, combining the uploaded parts into the final object.
function Base.close(io::MultipartUploadStream; kw...)
try
close(io.upload_queue)
return API.completeMultipartUpload(io.store, io.url, io.eTags, io.uploadState; kw...)
catch e
io.exc = e
rethrow()
end
end
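Putting the pieces of `src/object.jl` together, here is a minimal end-to-end sketch of the chunked-upload pattern that the docstring and the tests below exercise. It assumes `bucket`, `credentials`, and a `data::Vector{UInt8}` already exist, and the 8MB `part_size` is an arbitrary choice above S3's apparent 5MB minimum part size:

```
io = CloudStore.MultipartUploadStream(bucket, "large.bin"; credentials)
part_size = 8 * 1024 * 1024   # keep each part above S3's ~5MB minimum
i = 1
while i <= sizeof(data)
    nb = min(part_size, sizeof(data) - i + 1)
    buf = Vector{UInt8}(undef, nb)
    copyto!(buf, 1, data, i, nb)   # copy the next chunk out of `data`
    write(io, buf)                 # queue the chunk; blocks once too many parts are in flight
    i += nb
end
wait(io)                # block until every part has been uploaded
close(io; credentials)  # complete the multipart upload
```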
119 changes: 119 additions & 0 deletions test/runtests.jl
@@ -793,5 +793,124 @@ end
end
end

# When using Minio, the minimum upload size per part is 5MB according to
# S3 specifications: https://github.com/minio/minio/issues/11076
# I couldn't find a minimum upload size for Azure blob storage.
@testset "CloudStore.MultipartUploadStream write large bytes - S3" begin
Minio.with(; debug=true) do conf
credentials, bucket = conf
multicsv = "1,2,3,4,5,6,7,8,9,1\n"^1000000; # 20MB

N = 5500000
mus_obj = CloudStore.MultipartUploadStream(bucket, "test.csv"; credentials)

i = 1
while i < sizeof(multicsv)
nb = i + N > length(multicsv) ? length(multicsv)-i+1 : N
buf = Vector{UInt8}(undef, nb)
copyto!(buf, 1, codeunits(multicsv), i, nb)
CloudStore.write(mus_obj, buf;)
i += N
end

CloudStore.wait(mus_obj)
CloudStore.close(mus_obj; credentials)
obj = CloudStore.Object(bucket, "test.csv"; credentials)
@test length(obj) == sizeof(multicsv)
end
end

@testset "CloudStore.MultipartUploadStream failure due to too small upload size - S3" begin
Minio.with(; debug=true) do conf
credentials, bucket = conf
multicsv = "1,2,3,4,5,6,7,8,9,1\n"^1000000; # 20MB

N = 55000
mus_obj = CloudStore.MultipartUploadStream(bucket, "test.csv"; credentials)
try
i = 1
nb = i + N > length(multicsv) ? length(multicsv)-i+1 : N
buf = Vector{UInt8}(undef, nb)
copyto!(buf, 1, codeunits(multicsv), i, nb)
CloudStore.write(mus_obj, buf;)
CloudStore.wait(mus_obj)
CloudStore.close(mus_obj; credentials) # This should fail
catch e
@test isnothing(mus_obj.exc) == false
end
end
end

@testset "CloudStore.MultipartUploadStream failure due to changed url - S3" begin
Minio.with(; debug=true) do conf
credentials, bucket = conf
multicsv = "1,2,3,4,5,6,7,8,9,1\n"^1000000; # 20MB

N = 5500000
mus_obj = CloudStore.MultipartUploadStream(bucket, "test.csv"; credentials)
try
i = 1
nb = i + N > length(multicsv) ? length(multicsv)-i+1 : N
buf = Vector{UInt8}(undef, nb)
copyto!(buf, 1, codeunits(multicsv), i, nb)
# Changing the url after the MultipartUploadStream object was created
mus_obj.url = "http://127.0.0.1:23252/jl-minio-22377/test_nantia.csv"
CloudStore.write(mus_obj, buf;) # This should fail
CloudStore.wait(mus_obj)
catch e
@test isnothing(mus_obj.exc) == false
end
end
end

@testset "CloudStore.MultipartUploadStream write large bytes - Azure" begin
Azurite.with(; debug=true) do conf
credentials, bucket = conf
multicsv = "1,2,3,4,5,6,7,8,9,1\n"^1000000; # 20MB

N = 2000000
mus_obj = CloudStore.MultipartUploadStream(bucket, "test.csv"; credentials)

i = 1
while i < sizeof(multicsv)
nb = i + N > length(multicsv) ? length(multicsv)-i+1 : N
buf = Vector{UInt8}(undef, nb)
copyto!(buf, 1, codeunits(multicsv), i, nb)
CloudStore.write(mus_obj, buf;)
i += N
end

CloudStore.wait(mus_obj)
CloudStore.close(mus_obj; credentials)
obj = CloudStore.Object(bucket, "test.csv"; credentials)
@test length(obj) == sizeof(multicsv)
end
end

@testset "CloudStore.MultipartUploadStream test alternative syntax - Azure" begin
Azurite.with(; debug=true) do conf
credentials, bucket = conf
multicsv = "1,2,3,4,5,6,7,8,9,1\n"^1000000; # 20MB

N = 2000000
function uploading_loop(multicsv, batch_size, mus_obj)
i = 1
while i < sizeof(multicsv)
nb = i + batch_size > length(multicsv) ? length(multicsv)-i+1 : batch_size
buf = Vector{UInt8}(undef, nb)
copyto!(buf, 1, codeunits(multicsv), i, nb)
CloudStore.write(mus_obj, buf;)
i += batch_size
end
end

CloudStore.MultipartUploadStream(bucket, "test.csv"; credentials) do mus_obj
uploading_loop(multicsv, N, mus_obj)
end

obj = CloudStore.Object(bucket, "test.csv"; credentials)
@test length(obj) == sizeof(multicsv)
end
end

end # @testset "CloudStore.jl"
