Skip to content

Commit 6c07beb

Browse files
authored
UploadStream for Azure Client (#27)
1 parent 39615d4 commit 6c07beb

6 files changed

Lines changed: 278 additions & 49 deletions

File tree

azurebs/client/client.go

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,17 @@ type AzBlobstore struct {
1515
storageClient StorageClient
1616
}
1717

18+
// Single blob put threshold is 32MB
19+
const singleBlobPutThreshold = int64(32 * 1024 * 1024)
20+
21+
func getFileSize(source *os.File) (int64, error) {
22+
fileInfo, err := source.Stat()
23+
if err != nil {
24+
return 0, fmt.Errorf("failed to get file stat: %w", err)
25+
}
26+
return fileInfo.Size(), nil
27+
}
28+
1829
func New(storageClient StorageClient) (AzBlobstore, error) {
1930
return AzBlobstore{storageClient: storageClient}, nil
2031
}
@@ -30,24 +41,36 @@ func (client *AzBlobstore) Put(sourceFilePath string, dest string) error {
3041
return err
3142
}
3243
defer source.Close() //nolint:errcheck
33-
34-
md5, err := client.storageClient.Upload(source, dest)
44+
fileSize, err := getFileSize(source)
3545
if err != nil {
36-
return fmt.Errorf("upload failure: %w", err)
46+
return err
3747
}
48+
if fileSize <= singleBlobPutThreshold {
49+
md5, err := client.storageClient.Upload(source, dest)
50+
if err != nil {
51+
return fmt.Errorf("upload failure: %w", err)
52+
}
3853

39-
if !bytes.Equal(sourceMD5, md5) {
40-
slog.Error("Upload failed due to MD5 mismatch, deleting blob", "blob", dest, "expected_md5", fmt.Sprintf("%x", sourceMD5), "received_md5", fmt.Sprintf("%x", md5))
54+
if !bytes.Equal(sourceMD5, md5) {
55+
slog.Error("Upload failed due to MD5 mismatch, deleting blob", "blob", dest, "expected_md5", fmt.Sprintf("%x", sourceMD5), "received_md5", fmt.Sprintf("%x", md5))
4156

42-
err := client.storageClient.Delete(dest)
43-
if err != nil {
44-
slog.Error("Failed to delete blob after MD5 mismatch", "blob", dest, "error", err)
57+
err := client.storageClient.Delete(dest)
58+
if err != nil {
59+
slog.Error("Failed to delete blob after MD5 mismatch", "blob", dest, "error", err)
4560

61+
}
62+
return fmt.Errorf("MD5 mismatch: expected %x, got %x", sourceMD5, md5)
63+
}
64+
65+
slog.Debug("MD5 verification passed", "blob", dest, "md5", fmt.Sprintf("%x", md5))
66+
67+
} else {
68+
err := client.storageClient.UploadStream(source, dest)
69+
if err != nil {
70+
return fmt.Errorf("upload failure: %w", err)
4671
}
47-
return fmt.Errorf("MD5 mismatch: expected %x, got %x", sourceMD5, md5)
4872
}
4973

50-
slog.Debug("MD5 verification passed", "blob", dest, "md5", fmt.Sprintf("%x", md5))
5174
return nil
5275
}
5376

azurebs/client/client_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package client_test
22

33
import (
4+
"bytes"
45
"errors"
56
"os"
67
"runtime"
@@ -32,6 +33,29 @@ var _ = Describe("Client", func() {
3233
Expect(dest).To(Equal("target/blob"))
3334
})
3435

36+
It("uploads a file with UploadStream", func() {
37+
storageClient := clientfakes.FakeStorageClient{}
38+
39+
azBlobstore, err := client.New(&storageClient)
40+
Expect(err).ToNot(HaveOccurred())
41+
42+
file, _ := os.CreateTemp("", "tmpfile-test-upload") //nolint:errcheck
43+
defer os.Remove(file.Name()) //nolint:errcheck
44+
45+
contentSize := 1024 * 1024 * 64 // 64MB
46+
47+
content := bytes.Repeat([]byte("x"), contentSize)
48+
_, _ = file.Write(content) //nolint:errcheck
49+
50+
azBlobstore.Put(file.Name(), "target/blob") //nolint:errcheck
51+
52+
Expect(storageClient.UploadStreamCallCount()).To(Equal(1))
53+
source, dest := storageClient.UploadStreamArgsForCall(0)
54+
55+
Expect(source).To(BeAssignableToTypeOf((*os.File)(nil)))
56+
Expect(dest).To(Equal("target/blob"))
57+
})
58+
3559
It("skips the upload if the md5 cannot be calculated from the file", func() {
3660
storageClient := clientfakes.FakeStorageClient{}
3761

azurebs/client/clientfakes/fake_storage_client.go

Lines changed: 74 additions & 20 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

azurebs/client/storage_client.go

Lines changed: 78 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ type StorageClient interface {
3131
dest string,
3232
) ([]byte, error)
3333

34+
UploadStream(
35+
source io.ReadSeekCloser,
36+
dest string,
37+
) error
38+
3439
Download(
3540
source string,
3641
dest *os.File,
@@ -68,6 +73,36 @@ type StorageClient interface {
6873
EnsureContainerExists() error
6974
}
7075

76+
// 4 MB of block size
77+
const blockSize = int64(4 * 1024 * 1024)
78+
79+
// number of go routines
80+
const maxConcurrency = 5
81+
82+
func createContext(dsc DefaultStorageClient) (context.Context, context.CancelFunc, error) {
83+
var ctx context.Context
84+
var cancel context.CancelFunc
85+
86+
if dsc.storageConfig.Timeout != "" {
87+
timeoutInt, err := strconv.Atoi(dsc.storageConfig.Timeout)
88+
timeout := time.Duration(timeoutInt) * time.Second
89+
if timeout < 1 && err == nil {
90+
slog.Info("Invalid time, need at least 1 second", "timeout", dsc.storageConfig.Timeout)
91+
return nil, nil, fmt.Errorf("invalid time: %w", err)
92+
}
93+
if err != nil {
94+
slog.Info("Invalid timeout format, need seconds as number e.g. 30s", "timeout", dsc.storageConfig.Timeout)
95+
return nil, nil, fmt.Errorf("invalid timeout format: %w", err)
96+
}
97+
ctx, cancel = context.WithTimeout(context.Background(), timeout)
98+
} else {
99+
ctx, cancel = context.WithCancel(context.Background())
100+
}
101+
102+
return ctx, cancel, nil
103+
104+
}
105+
71106
type DefaultStorageClient struct {
72107
credential *azblob.SharedKeyCredential
73108
serviceURL string
@@ -91,33 +126,23 @@ func (dsc DefaultStorageClient) Upload(
91126
) ([]byte, error) {
92127
blobURL := fmt.Sprintf("%s/%s", dsc.serviceURL, dest)
93128

94-
var ctx context.Context
95-
var cancel context.CancelFunc
96-
97129
if dsc.storageConfig.Timeout != "" {
98-
timeoutInt, err := strconv.Atoi(dsc.storageConfig.Timeout)
99-
timeout := time.Duration(timeoutInt) * time.Second
100-
if timeout < 1 && err == nil {
101-
slog.Info("Invalid time, need at least 1 second", "timeout", dsc.storageConfig.Timeout)
102-
return nil, fmt.Errorf("invalid time: %w", err)
103-
}
104-
if err != nil {
105-
slog.Info("Invalid timeout format, need seconds as number e.g. 30s", "timeout", dsc.storageConfig.Timeout)
106-
return nil, fmt.Errorf("invalid timeout format: %w", err)
107-
}
108-
slog.Info("Uploading blob to container", "container", dsc.storageConfig.ContainerName, "blob", dest, "url", blobURL, "timeout", timeout.String())
109-
110-
ctx, cancel = context.WithTimeout(context.Background(), timeout)
130+
slog.Info("Uploading blob to container", "container", dsc.storageConfig.ContainerName, "blob", dest, "url", blobURL, "timeout", dsc.storageConfig.Timeout)
111131
} else {
112132
slog.Info("Uploading blob to container", "container", dsc.storageConfig.ContainerName, "blob", dest, "url", blobURL)
113-
ctx, cancel = context.WithCancel(context.Background())
133+
}
134+
135+
ctx, cancel, err := createContext(dsc)
136+
if err != nil {
137+
return nil, err
114138
}
115139
defer cancel()
116140

117141
client, err := blockblob.NewClientWithSharedKeyCredential(blobURL, dsc.credential, nil)
118142
if err != nil {
119143
return nil, err
120144
}
145+
121146
uploadResponse, err := client.Upload(ctx, source, nil)
122147
if err != nil {
123148
if dsc.storageConfig.Timeout != "" && errors.Is(err, context.DeadlineExceeded) {
@@ -127,7 +152,42 @@ func (dsc DefaultStorageClient) Upload(
127152
}
128153

129154
slog.Info("Successfully uploaded blob", "container", dsc.storageConfig.ContainerName, "blob", dest)
130-
return uploadResponse.ContentMD5, err
155+
return uploadResponse.ContentMD5, nil
156+
}
157+
158+
func (dsc DefaultStorageClient) UploadStream(
159+
source io.ReadSeekCloser,
160+
dest string,
161+
) error {
162+
blobURL := fmt.Sprintf("%s/%s", dsc.serviceURL, dest)
163+
164+
if dsc.storageConfig.Timeout != "" {
165+
slog.Info("UploadStreaming blob to container", "container", dsc.storageConfig.ContainerName, "blob", dest, "url", blobURL, "timeout", dsc.storageConfig.Timeout)
166+
} else {
167+
slog.Info("UploadStreaming blob to container", "container", dsc.storageConfig.ContainerName, "blob", dest, "url", blobURL)
168+
}
169+
170+
ctx, cancel, err := createContext(dsc)
171+
if err != nil {
172+
return err
173+
}
174+
defer cancel()
175+
176+
client, err := blockblob.NewClientWithSharedKeyCredential(blobURL, dsc.credential, nil)
177+
if err != nil {
178+
return err
179+
}
180+
181+
_, err = client.UploadStream(ctx, source, &azblob.UploadStreamOptions{BlockSize: blockSize, Concurrency: maxConcurrency})
182+
if err != nil {
183+
if dsc.storageConfig.Timeout != "" && errors.Is(err, context.DeadlineExceeded) {
184+
return fmt.Errorf("upload failed: timeout of %s reached while uploading %s", dsc.storageConfig.Timeout, dest)
185+
}
186+
return fmt.Errorf("upload failure: %w", err)
187+
}
188+
189+
slog.Info("Successfully uploaded blob", "container", dsc.storageConfig.ContainerName, "blob", dest)
190+
return nil
131191
}
132192

133193
func (dsc DefaultStorageClient) Download(

0 commit comments

Comments
 (0)