Skip to content

Commit 8e6309d

Browse files
committed
use one cb and use goroutines to call fileRef
1 parent 9eeedeb commit 8e6309d

File tree

1 file changed

+71
-54
lines changed

1 file changed

+71
-54
lines changed

cmd/gateway/zcn/gateway-zcn.go

Lines changed: 71 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"os"
1212
"path/filepath"
1313
"strings"
14+
"sync"
1415
"time"
1516

1617
"github.com/0chain/gosdk/zboxcore/sdk"
@@ -548,58 +549,77 @@ func (zob *zcnObjects) PutMultipleObjects(
548549
}
549550
operationRequests := make([]sdk.OperationRequest, total)
550551
objectInfo := make([]minio.ObjectInfo, total)
551-
cbs := make([]*statusCB, total)
552+
cb := &statusCB{
553+
doneCh: make(chan struct{}, 1),
554+
errCh: make(chan error, 1),
555+
}
556+
557+
var wg sync.WaitGroup
558+
errCh := make(chan error, total)
552559
for i := 0; i < total; i++ {
553-
var ref *sdk.ORef
554-
ref, err := getSingleRegularRef(zob.alloc, remotePaths[i])
555-
if err != nil {
556-
if !isPathNoExistError(err) {
557-
return nil, []error{err}
560+
wg.Add(1)
561+
go func(idx int) {
562+
defer wg.Done()
563+
var ref *sdk.ORef
564+
ref, err := getSingleRegularRef(zob.alloc, remotePaths[idx])
565+
if err != nil {
566+
if !isPathNoExistError(err) {
567+
errCh <- err
568+
return
569+
}
558570
}
559-
}
560571

561-
var isUpdate bool
562-
if ref != nil {
563-
isUpdate = true
564-
}
572+
var isUpdate bool
573+
if ref != nil {
574+
isUpdate = true
575+
}
565576

566-
contentType := opts[i].UserDefined["content-type"]
567-
if contentType == "" {
568-
contentType = "application/octet-stream"
569-
}
577+
contentType := opts[idx].UserDefined["content-type"]
578+
if contentType == "" {
579+
contentType = "application/octet-stream"
580+
}
570581

571-
_, fileName := filepath.Split(remotePaths[i])
572-
fileMeta := sdk.FileMeta{
573-
Path: "",
574-
RemotePath: remotePaths[i],
575-
ActualSize: r[i].Size(),
576-
MimeType: contentType,
577-
RemoteName: fileName,
578-
}
582+
_, fileName := filepath.Split(remotePaths[idx])
583+
fileMeta := sdk.FileMeta{
584+
Path: "",
585+
RemotePath: remotePaths[idx],
586+
ActualSize: r[idx].Size(),
587+
MimeType: contentType,
588+
RemoteName: fileName,
589+
}
579590

580-
cbs[i] = &statusCB{
581-
doneCh: make(chan struct{}, 1),
582-
errCh: make(chan error, 1),
583-
}
584-
options := []sdk.ChunkedUploadOption{
585-
sdk.WithStatusCallback(cbs[i]),
586-
sdk.WithEncrypt(false),
587-
}
588-
operationRequests[i] = sdk.OperationRequest{
589-
FileMeta: fileMeta,
590-
FileReader: newMinioReader(r[i]),
591-
OperationType: constants.FileOperationInsert,
592-
Opts: options,
593-
}
594-
if isUpdate {
595-
operationRequests[i].OperationType = constants.FileOperationUpdate
596-
}
597-
objectInfo[i] = minio.ObjectInfo{
598-
Bucket: bucket,
599-
Name: objects[i],
600-
Size: r[i].Size(),
601-
ModTime: time.Now(),
602-
}
591+
options := []sdk.ChunkedUploadOption{
592+
sdk.WithStatusCallback(cb),
593+
sdk.WithEncrypt(false),
594+
}
595+
operationRequests[idx] = sdk.OperationRequest{
596+
FileMeta: fileMeta,
597+
FileReader: newMinioReader(r[idx]),
598+
OperationType: constants.FileOperationInsert,
599+
Opts: options,
600+
}
601+
if isUpdate {
602+
operationRequests[idx].OperationType = constants.FileOperationUpdate
603+
}
604+
objectInfo[idx] = minio.ObjectInfo{
605+
Bucket: bucket,
606+
Name: objects[idx],
607+
Size: r[idx].Size(),
608+
ModTime: time.Now(),
609+
}
610+
}(i)
611+
}
612+
wg.Wait()
613+
close(errCh)
614+
615+
var errs []error
616+
for err := range errCh {
617+
errs = append(errs, err)
618+
}
619+
620+
if errs != nil && len(errs) > 0 {
621+
logger.Error("error while getting file ref and creating operationRequests.")
622+
return nil, errs
603623
}
604624

605625
errn := zob.alloc.DoMultiOperation(operationRequests)
@@ -608,16 +628,13 @@ func (zob *zcnObjects) PutMultipleObjects(
608628
return nil, []error{errn}
609629
}
610630

611-
var errs []error
612-
for i, cb := range cbs {
613-
select {
614-
case <-cb.doneCh:
615-
case err := <-cb.errCh:
616-
errs = append(errs, fmt.Errorf("error for object %s: %v", objects[i], err)) // store the error in case of unsuccessful upload for ith object
617-
}
631+
select {
632+
case <-cb.doneCh:
633+
case err := <-cb.errCh:
634+
return nil, []error{err}
618635
}
619636

620-
return objectInfo, errs
637+
return objectInfo, nil
621638
}
622639
func (zob *zcnObjects) CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
623640
var srcRemotePath, dstRemotePath string

0 commit comments

Comments
 (0)