Skip to content

Commit 2660d7d

Browse files
committed
replace int fields manual lock with atomic lock and remove extra clone
1 parent 7a070b5 commit 2660d7d

10 files changed

+41
-80
lines changed

feature/s3/transfermanager/api_op_DownloadDirectory.go

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"path/filepath"
1111
"strings"
1212
"sync"
13+
"sync/atomic"
1314

1415
"github.com/aws/aws-sdk-go-v2/aws"
1516
"github.com/aws/aws-sdk-go-v2/service/s3"
@@ -87,12 +88,12 @@ type DownloadDirectoryOutput struct {
8788
// Total number of objects successfully downloaded
8889
// this value might not be the real number of success if user passed a customized
8990
// failure policy in input
90-
ObjectsDownloaded int
91+
ObjectsDownloaded int64
9192

9293
// Total number of objects failed to download
9394
// this value might not be the real number of failure if user passed a customized
9495
// failure policy in input
95-
ObjectsFailed int
96+
ObjectsFailed int64
9697
}
9798

9899
type objectEntry struct {
@@ -132,8 +133,8 @@ type directoryDownloader struct {
132133
in *DownloadDirectoryInput
133134
failurePolicy DownloadDirectoryFailurePolicy
134135

135-
objectsDownloaded int
136-
objectsFailed int
136+
objectsDownloaded int64
137+
objectsFailed int64
137138

138139
err error
139140

@@ -272,7 +273,7 @@ func (d *directoryDownloader) downloadObject(ctx context.Context, ch chan object
272273
if err != nil {
273274
d.setErr(fmt.Errorf("error when heading info of object %s: %v", data.key, err))
274275
} else {
275-
d.incrObjectsFailed(1)
276+
atomic.AddInt64(&d.objectsFailed, 1)
276277
}
277278
continue
278279
}
@@ -298,31 +299,17 @@ func (d *directoryDownloader) downloadObject(ctx context.Context, ch chan object
298299
if err != nil {
299300
d.setErr(fmt.Errorf("error when getting object and writing to local file %s: %v", data.path, err))
300301
} else {
301-
d.incrObjectsFailed(1)
302+
atomic.AddInt64(&d.objectsFailed, 1)
302303
}
303304
os.Remove(data.path)
304305
continue
305306
}
306307

307-
d.incrObjectsDownloaded(1)
308+
atomic.AddInt64(&d.objectsDownloaded, 1)
308309
d.emitter.ObjectsTransferred(ctx, n)
309310
}
310311
}
311312

312-
func (d *directoryDownloader) incrObjectsDownloaded(n int) {
313-
d.mu.Lock()
314-
defer d.mu.Unlock()
315-
316-
d.objectsDownloaded += n
317-
}
318-
319-
func (d *directoryDownloader) incrObjectsFailed(n int) {
320-
d.mu.Lock()
321-
defer d.mu.Unlock()
322-
323-
d.objectsFailed += n
324-
}
325-
326313
func (d *directoryDownloader) setErr(err error) {
327314
d.mu.Lock()
328315
defer d.mu.Unlock()

feature/s3/transfermanager/api_op_DownloadObject.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"strconv"
1111
"strings"
1212
"sync"
13+
"sync/atomic"
1314
"time"
1415

1516
"github.com/aws/aws-sdk-go-v2/aws"
@@ -518,7 +519,7 @@ func (o *DownloadObjectOutput) mapFromGetObjectOutput(out *s3.GetObjectOutput, c
518519
o.TagCount = out.TagCount
519520
o.VersionID = out.VersionId
520521
o.WebsiteRedirectLocation = out.WebsiteRedirectLocation
521-
o.ResultMetadata = out.ResultMetadata.Clone()
522+
o.ResultMetadata = out.ResultMetadata
522523
}
523524

524525
// DownloadObject downloads an object from S3, intelligently splitting large
@@ -793,7 +794,7 @@ func (d *downloader) tryDownloadChunk(ctx context.Context, params *s3.GetObjectI
793794
return nil, &errReadingBody{err: err}
794795
}
795796

796-
d.incrWritten(n)
797+
atomic.AddInt64(&d.written, n)
797798
d.emitter.BytesTransferred(ctx, n)
798799
return out, nil
799800
}

feature/s3/transfermanager/api_op_GetObject.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"io"
77
"strconv"
88
"strings"
9+
"sync/atomic"
910
"time"
1011

1112
"github.com/aws/aws-sdk-go-v2/aws"
@@ -531,7 +532,7 @@ func (o *GetObjectOutput) mapFromGetObjectOutput(out *s3.GetObjectOutput, checks
531532
o.TagCount = out.TagCount
532533
o.VersionID = out.VersionId
533534
o.WebsiteRedirectLocation = out.WebsiteRedirectLocation
534-
o.ResultMetadata = out.ResultMetadata.Clone()
535+
o.ResultMetadata = out.ResultMetadata
535536
}
536537

537538
func (o *GetObjectOutput) mapFromHeadObjectOutput(out *s3.HeadObjectOutput, checksumMode types.ChecksumMode, enableChecksumValidation bool, body *concurrentReader) {
@@ -579,7 +580,7 @@ func (o *GetObjectOutput) mapFromHeadObjectOutput(out *s3.HeadObjectOutput, chec
579580
o.TagCount = out.TagCount
580581
o.VersionID = out.VersionId
581582
o.WebsiteRedirectLocation = out.WebsiteRedirectLocation
582-
o.ResultMetadata = out.ResultMetadata.Clone()
583+
o.ResultMetadata = out.ResultMetadata
583584
}
584585

585586
// GetObject downloads an object from S3, intelligently splitting large
@@ -648,7 +649,7 @@ func (g *getter) get(ctx context.Context) (out *GetObjectOutput, err error) {
648649
capacity := sectionParts
649650
r.sectionParts = sectionParts
650651
r.partSize = partSize
651-
r.setCapacity(min(capacity, partsCount))
652+
atomic.StoreInt32(&r.capacity, min(capacity, partsCount))
652653
r.partsCount = partsCount
653654
} else {
654655
out, err := g.options.S3.HeadObject(ctx, &s3.HeadObjectInput{
@@ -672,7 +673,7 @@ func (g *getter) get(ctx context.Context) (out *GetObjectOutput, err error) {
672673
sectionParts := int32(max(1, g.options.GetBufferSize/g.options.PartSizeBytes))
673674
capacity := min(sectionParts, partsCount)
674675
r.partSize = g.options.PartSizeBytes
675-
r.setCapacity(capacity)
676+
atomic.StoreInt32(&r.capacity, capacity)
676677
r.partsCount = partsCount
677678
r.sectionParts = sectionParts
678679
r.totalBytes = total

feature/s3/transfermanager/api_op_UploadDirectory.go

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"os"
77
"path/filepath"
88
"sync"
9+
"sync/atomic"
910

1011
"github.com/aws/aws-sdk-go-v2/aws"
1112
)
@@ -89,12 +90,12 @@ type UploadDirectoryOutput struct {
8990
// Total number of objects successfully uploaded
9091
// this value might not be the real number of success if user passed a customized
9192
// failure policy in input
92-
ObjectsUploaded int
93+
ObjectsUploaded int64
9394

9495
// Total number of objects failed to upload
9596
// this value might not be the real number of failure if the user passed a customized
9697
// failure policy in input
97-
ObjectsFailed int
98+
ObjectsFailed int64
9899
}
99100

100101
// UploadDirectory traverses a local directory recursively/non-recursively and intelligently
@@ -127,8 +128,8 @@ type directoryUploader struct {
127128
in *UploadDirectoryInput
128129
failurePolicy UploadDirectoryFailurePolicy
129130

130-
filesUploaded int
131-
filesFailed int
131+
filesUploaded int64
132+
filesFailed int64
132133
traversed map[string]interface{}
133134

134135
err error
@@ -378,33 +379,19 @@ func (u *directoryUploader) uploadFile(ctx context.Context, ch chan fileEntry) {
378379
u.setErr(fmt.Errorf("error when uploading file %s: %v", data.path, err))
379380
} else {
380381
// this failed object is ignored, just increase the failure count
381-
u.incrFilesFailed(1)
382+
atomic.AddInt64(&u.filesFailed, 1)
382383
}
383384
continue
384385
}
385386

386387
u.progressOnce.Do(func() {
387388
u.emitter.Start(ctx, u.in)
388389
})
389-
u.incrFilesUploaded(1)
390+
atomic.AddInt64(&u.filesUploaded, 1)
390391
u.emitter.ObjectsTransferred(ctx, aws.ToInt64(out.ContentLength))
391392
}
392393
}
393394

394-
func (u *directoryUploader) incrFilesUploaded(n int) {
395-
u.mu.Lock()
396-
defer u.mu.Unlock()
397-
398-
u.filesUploaded += n
399-
}
400-
401-
func (u *directoryUploader) incrFilesFailed(n int) {
402-
u.mu.Lock()
403-
defer u.mu.Unlock()
404-
405-
u.filesFailed += n
406-
}
407-
408395
func (u *directoryUploader) setErr(err error) {
409396
u.mu.Lock()
410397
defer u.mu.Unlock()

feature/s3/transfermanager/api_op_UploadObject.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -738,7 +738,7 @@ func (o *UploadObjectOutput) mapFromPutObjectOutput(out *s3.PutObjectOutput, buc
738738
o.SSEKMSKeyID = out.SSEKMSKeyId
739739
o.ServerSideEncryption = types.ServerSideEncryption(out.ServerSideEncryption)
740740
o.VersionID = out.VersionId
741-
o.ResultMetadata = out.ResultMetadata.Clone()
741+
o.ResultMetadata = out.ResultMetadata
742742
}
743743

744744
func (o *UploadObjectOutput) mapFromCompleteMultipartUploadOutput(out *s3.CompleteMultipartUploadOutput, bucket, uploadID *string, contentLength int64, completedParts completedParts) {
@@ -761,7 +761,7 @@ func (o *UploadObjectOutput) mapFromCompleteMultipartUploadOutput(out *s3.Comple
761761
o.SSEKMSKeyID = out.SSEKMSKeyId
762762
o.ServerSideEncryption = types.ServerSideEncryption(out.ServerSideEncryption)
763763
o.VersionID = out.VersionId
764-
o.ResultMetadata = out.ResultMetadata.Clone()
764+
o.ResultMetadata = out.ResultMetadata
765765
}
766766

767767
// UploadObject uploads an object to S3, intelligently buffering large

feature/s3/transfermanager/concurrent_reader.go

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/aws/aws-sdk-go-v2/service/s3"
1111
"io"
1212
"sync"
13+
"sync/atomic"
1314
)
1415

1516
// concurrentReader receives object parts from working goroutines, composes those chunks in order and read
@@ -81,7 +82,7 @@ func (r *concurrentReader) Read(p []byte) (int, error) {
8182
break
8283
}
8384

84-
if r.index == r.getCapacity() {
85+
if r.index == atomic.LoadInt32(&r.capacity) {
8586
continue
8687
}
8788

@@ -202,7 +203,7 @@ func (r *concurrentReader) read(p []byte) (int, error) {
202203

203204
partSize := r.partSize
204205
minIndex := int32(r.written / partSize)
205-
maxIndex := min(int32((r.written+int64(cap(p))-1)/partSize), r.getCapacity()-1)
206+
maxIndex := min(int32((r.written+int64(cap(p))-1)/partSize), atomic.LoadInt32(&r.capacity)-1)
206207
for i := minIndex; i <= maxIndex; i++ {
207208
if e := r.getErr(); e != nil && e != io.EOF {
208209
r.clean()
@@ -223,9 +224,9 @@ func (r *concurrentReader) read(p []byte) (int, error) {
223224
if c.cur >= c.length {
224225
r.readCount++
225226
delete(r.buf, i)
226-
if r.readCount == r.getCapacity() {
227-
capacity := min(r.getCapacity()+r.sectionParts, r.partsCount)
228-
r.setCapacity(capacity)
227+
if r.readCount == atomic.LoadInt32(&r.capacity) {
228+
capacity := min(atomic.LoadInt32(&r.capacity)+r.sectionParts, r.partsCount)
229+
atomic.StoreInt32(&r.capacity, capacity)
229230
}
230231
if r.readCount >= r.partsCount {
231232
r.setErr(io.EOF)
@@ -234,7 +235,7 @@ func (r *concurrentReader) read(p []byte) (int, error) {
234235
}
235236
}
236237

237-
for r.receiveCount < r.getCapacity() {
238+
for r.receiveCount < atomic.LoadInt32(&r.capacity) {
238239
if e := r.getErr(); e != nil && e != io.EOF {
239240
r.clean()
240241
return written, e
@@ -263,9 +264,9 @@ func (r *concurrentReader) read(p []byte) (int, error) {
263264
r.buf[oc.index] = &oc
264265
} else {
265266
r.readCount++
266-
if r.readCount == r.getCapacity() {
267-
capacity := min(r.getCapacity()+r.sectionParts, r.partsCount)
268-
r.setCapacity(capacity)
267+
if r.readCount == atomic.LoadInt32(&r.capacity) {
268+
capacity := min(atomic.LoadInt32(&r.capacity)+r.sectionParts, r.partsCount)
269+
atomic.StoreInt32(&r.capacity, capacity)
269270
}
270271
if r.readCount >= r.partsCount {
271272
r.setErr(io.EOF)
@@ -276,20 +277,6 @@ func (r *concurrentReader) read(p []byte) (int, error) {
276277
return written, r.getErr()
277278
}
278279

279-
func (r *concurrentReader) setCapacity(n int32) {
280-
r.m.Lock()
281-
defer r.m.Unlock()
282-
283-
r.capacity = n
284-
}
285-
286-
func (r *concurrentReader) getCapacity() int32 {
287-
r.m.Lock()
288-
defer r.m.Unlock()
289-
290-
return r.capacity
291-
}
292-
293280
func (r *concurrentReader) setDone(done bool) {
294281
r.m.Lock()
295282
defer r.m.Unlock()

feature/s3/transfermanager/download_directory_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ func TestDownloadDirectory(t *testing.T) {
5959
expectKeys []string
6060
expectFiles []string
6161
expectErr string
62-
expectObjectsDownloaded int
63-
expectObjectsFailed int
62+
expectObjectsDownloaded int64
63+
expectObjectsFailed int64
6464
listenerValidationFn func(*testing.T, *mockDirectoryListener, any, any, error)
6565
}{
6666
"single object": {

feature/s3/transfermanager/mapping_reference_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -689,7 +689,6 @@ func mockValue(v reflect.Value, field string, legacyTypes map[string]interface{}
689689
if !ok {
690690
panic(fmt.Sprintf("need to handle %v for field %s", v.Type().Elem().Kind(), field))
691691
}
692-
//indirV := reflect.Indirect(reflect.ValueOf(&vv))
693692
v.Set(reflect.ValueOf(vv))
694693
default:
695694
panic(fmt.Sprintf("need to handle %v", v.Type().Elem().Kind()))

feature/s3/transfermanager/setup_integ_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ type uploadDirectoryTestData struct {
389389
Source string
390390
Recursive bool
391391
KeyPrefix string
392-
ExpectFilesUploaded int
392+
ExpectFilesUploaded int64
393393
ExpectKeys []string
394394
ExpectError string
395395
}
@@ -478,7 +478,7 @@ func testUploadDirectory(t *testing.T, bucket string, testData uploadDirectoryTe
478478
type downloadDirectoryTestData struct {
479479
ObjectsSize map[string]int64
480480
KeyPrefix string
481-
ExpectObjectsDownloaded int
481+
ExpectObjectsDownloaded int64
482482
ExpectFiles []string
483483
ExpectError string
484484
}

feature/s3/transfermanager/upload_directory_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,14 @@ func TestUploadDirectory(t *testing.T) {
4949
recursive bool
5050
keyPrefix string
5151
filter FileFilter
52-
s3Delimiter string
5352
callback PutRequestCallback
5453
failurePolicy UploadDirectoryFailurePolicy
5554
putobjectFunc func(*s3testing.TransferManagerLoggingClient, *s3.PutObjectInput) (*s3.PutObjectOutput, error)
5655
preprocessFunc func(string) (func() error, error)
5756
expectKeys []string
5857
expectErr string
59-
expectFilesUploaded int
60-
expectFilesFailed int
58+
expectFilesUploaded int64
59+
expectFilesFailed int64
6160
listenerValidationFn func(*testing.T, *mockDirectoryListener, any, any, error)
6261
}{
6362
"single file recursively": {

0 commit comments

Comments
 (0)