forked from viant/afsc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
copy.go
82 lines (77 loc) · 2.25 KB
/
copy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package gs
import (
"context"
"fmt"
"github.com/viant/afs/file"
"github.com/viant/afs/option"
"github.com/viant/afs/storage"
gstorage "google.golang.org/api/storage/v1"
"path"
"strings"
)
const copySizeThreshold = 100 * 1024 * 1024
func (s *storager) Copy(ctx context.Context, sourcePath, destBucket, destPath string, options ...storage.Option) error {
return s.copy(ctx, sourcePath, destBucket, destPath, options)
}
func (s *storager) copy(ctx context.Context, sourcePath, destBucket, destPath string, options []storage.Option) error {
sourcePath = strings.Trim(sourcePath, "/")
destPath = strings.Trim(destPath, "/")
objectInfo, err := s.get(ctx, sourcePath, options)
if isNotFound(err) {
objectOpt := &option.ObjectKind{}
if _, ok := option.Assign(options, &objectOpt); ok && objectOpt.File {
return err
}
infoList, err := s.List(ctx, sourcePath)
if err != nil {
return err
}
if len(infoList) == 0 {
return fmt.Errorf("%v: not found", sourcePath)
}
for i := 1; i < len(infoList); i++ {
name := infoList[i].Name()
if err = s.Move(ctx, path.Join(sourcePath, name), destBucket, path.Join(destPath, name)); err != nil {
return err
}
}
return nil
}
if err != nil {
return err
}
info, ok := objectInfo.(*file.Info)
if !ok {
return fmt.Errorf("unable copy, expected: %T, but had: %v", info, objectInfo)
}
object, _ := info.Source.(*gstorage.Object)
object.Name = destPath
if info.Size() < copySizeThreshold {
call := s.Objects.Copy(s.bucket, sourcePath, destBucket, destPath, object)
call.Context(ctx)
s.setGeneration(func(generation int64) {
call.IfGenerationMatch(generation)
}, func(generation int64) {
call.IfGenerationNotMatch(generation)
}, options)
return runWithRetries(ctx, func() error {
_, err = call.Do()
return err
}, s)
}
call := s.Objects.Rewrite(s.bucket, sourcePath, destBucket, destPath, object)
call.Context(ctx)
s.setGeneration(func(generation int64) {
call.IfGenerationMatch(generation)
}, func(generation int64) {
call.IfGenerationNotMatch(generation)
}, options)
return runWithRetries(ctx, func() error {
output, err := call.Do()
for err == nil && output.RewriteToken != "" {
call.RewriteToken(output.RewriteToken)
output, err = call.Do()
}
return err
}, s)
}