Skip to content

Commit

Permalink
Merge pull request #51 from 0chain/sunilmhta/gosdk-multiupload
Browse files Browse the repository at this point in the history
Adding a new API for PutMultipleObjects in ObjectLayer and implementi…
  • Loading branch information
sunilmhta authored Aug 1, 2023
2 parents e55cc17 + 8e6309d commit c2fdd01
Show file tree
Hide file tree
Showing 14 changed files with 640 additions and 68 deletions.
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ go mod tidy
go build .
export MINIO_ROOT_USER=someminiouser
export MINIO_ROOT_PASSWORD=someminiopassword
./zs3server gateway zcn --configDir /path/to/config/dir
> Note: allocation and configDir both are optional. By default configDir takes ~/.zcn as configDir and if allocation is not provided in command then it will look for allocation.txt file in configDir directory.
./minio gateway zcn --configDir /path/to/config/dir
Note: allocation and configDir both are optional. By default configDir takes ~/.zcn as configDir and if allocation is not provided in command then it will look for allocation.txt file in configDir directory.
```

> If you want to debug on local you might want to build with `-gcflags="all=-N -l"` flag to view all the objects during debugging.
## Run using docker

To build and run minio sevrer component in any machine you will need first to install docker and docker-compose
Expand Down
5 changes: 4 additions & 1 deletion cmd/api-router.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,11 @@ func registerAPIRouter(router *mux.Router) {
collectAPIStats("putobject", maxClients(gz(httpTraceHdrs(api.PutObjectExtractHandler)))))

// PutObject
router.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
router.Methods(http.MethodPut).Path("/{object}").HandlerFunc(
collectAPIStats("putobject", maxClients(gz(httpTraceHdrs(api.PutObjectHandler)))))
// PutMultipleObjects
router.Methods(http.MethodPut).HandlerFunc(
collectAPIStats("putmultipleobjects", maxClients(gz(httpTraceHdrs(api.PutMultipleObjectsHandler))))).Queries("multiupload", "true")

// DeleteObject
router.Methods(http.MethodDelete).Path("/{object:.+}").HandlerFunc(
Expand Down
10 changes: 10 additions & 0 deletions cmd/erasure-server-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2144,3 +2144,13 @@ func (z *erasureServerPools) RestoreTransitionedObject(ctx context.Context, buck

return z.serverPools[idx].RestoreTransitionedObject(ctx, bucket, object, opts)
}

func (z *erasureServerPools) PutMultipleObjects(
ctx context.Context,
bucket string,
objects []string,
r []*PutObjReader,
opts []ObjectOptions,
) (objInfo []ObjectInfo, err []error) {
return nil, []error{NotImplemented{}}
}
10 changes: 10 additions & 0 deletions cmd/fs-v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -1463,3 +1463,13 @@ func (fs *FSObjects) GetRawData(ctx context.Context, volume, file string, fn fun
}
return fn(f, "fs", fs.fsUUID, file, st.Size(), st.ModTime(), st.IsDir())
}

func (fs *FSObjects) PutMultipleObjects(
ctx context.Context,
bucket string,
objects []string,
r []*PutObjReader,
opts []ObjectOptions,
) (objInfo []ObjectInfo, err []error) {
return nil, []error{NotImplemented{}}
}
20 changes: 15 additions & 5 deletions cmd/gateway/azure/gateway-azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,12 +645,12 @@ func (a *azureObjects) DeleteBucket(ctx context.Context, bucket string, opts min
// distinguish between Azure continuation tokens and application
// supplied markers.
//
// - NextMarker in ListObjectsV1 response is constructed by
// prefixing "{minio}" to the Azure continuation token,
// e.g, "{minio}CgRvYmoz"
// - NextMarker in ListObjectsV1 response is constructed by
// prefixing "{minio}" to the Azure continuation token,
// e.g, "{minio}CgRvYmoz"
//
// - Application supplied markers are used as-is to list
// object keys that appear after it in the lexicographical order.
// - Application supplied markers are used as-is to list
// object keys that appear after it in the lexicographical order.
func (a *azureObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result minio.ListObjectsInfo, err error) {
var objects []minio.ObjectInfo
var prefixes []string
Expand Down Expand Up @@ -913,6 +913,16 @@ func (a *azureObjects) PutObject(ctx context.Context, bucket, object string, r *
return a.GetObjectInfo(ctx, bucket, object, opts)
}

func (a *azureObjects) PutMultipleObjects(
ctx context.Context,
bucket string,
objects []string,
r []*minio.PutObjReader,
opts []minio.ObjectOptions,
) (objInfo []minio.ObjectInfo, err []error) {
return nil, []error{minio.NotImplemented{}}
}

// CopyObject - Copies a blob from source container to destination container.
// Uses Azure equivalent `BlobURL.StartCopyFromURL`.
func (a *azureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
Expand Down
12 changes: 11 additions & 1 deletion cmd/gateway/gcs/gateway-gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,16 @@ func (l *gcsGateway) PutObject(ctx context.Context, bucket string, key string, r
return fromGCSAttrsToObjectInfo(w.Attrs()), nil
}

func (l *gcsGateway) PutMultipleObjects(
ctx context.Context,
bucket string,
objects []string,
r []*minio.PutObjReader,
opts []minio.ObjectOptions,
) (objInfo []minio.ObjectInfo, err []error) {
return nil, []error{minio.NotImplemented{}}
}

// CopyObject - Copies a blob from source container to destination container.
func (l *gcsGateway) CopyObject(ctx context.Context, srcBucket string, srcObject string, destBucket string, destObject string,
srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (minio.ObjectInfo, error) {
Expand Down Expand Up @@ -1159,7 +1169,7 @@ func (l *gcsGateway) GetMultipartInfo(ctx context.Context, bucket, object, uploa
return result, nil
}

// ListObjectParts returns all object parts for specified object in specified bucket
// ListObjectParts returns all object parts for specified object in specified bucket
func (l *gcsGateway) ListObjectParts(ctx context.Context, bucket string, key string, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (minio.ListPartsInfo, error) {
it := l.client.Bucket(bucket).Objects(ctx, &storage.Query{
Prefix: path.Join(gcsMinioMultipartPathV1, uploadID),
Expand Down
10 changes: 10 additions & 0 deletions cmd/gateway/hdfs/gateway-hdfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,16 @@ func (n *hdfsObjects) PutObject(ctx context.Context, bucket string, object strin
}, nil
}

func (n *hdfsObjects) PutMultipleObjects(
ctx context.Context,
bucket string,
objects []string,
r []*minio.PutObjReader,
opts []minio.ObjectOptions,
) (objInfo []minio.ObjectInfo, err []error) {
return nil, []error{minio.NotImplemented{}}
}

func (n *hdfsObjects) NewMultipartUpload(ctx context.Context, bucket string, object string, opts minio.ObjectOptions) (uploadID string, err error) {
_, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
if err != nil {
Expand Down
28 changes: 19 additions & 9 deletions cmd/gateway/s3/gateway-s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,21 +140,21 @@ func randString(n int, src rand.Source, prefix string) string {
}

// Chains all credential types, in the following order:
// - AWS env vars (i.e. AWS_ACCESS_KEY_ID)
// - AWS creds file (i.e. AWS_SHARED_CREDENTIALS_FILE or ~/.aws/credentials)
// - Static credentials provided by user (i.e. MINIO_ROOT_USER/MINIO_ACCESS_KEY)
// - AWS env vars (i.e. AWS_ACCESS_KEY_ID)
// - AWS creds file (i.e. AWS_SHARED_CREDENTIALS_FILE or ~/.aws/credentials)
// - Static credentials provided by user (i.e. MINIO_ROOT_USER/MINIO_ACCESS_KEY)
var defaultProviders = []credentials.Provider{
&credentials.EnvAWS{},
&credentials.FileAWSCredentials{},
}

// Chains all credential types, in the following order:
// - AWS env vars (i.e. AWS_ACCESS_KEY_ID)
// - AWS creds file (i.e. AWS_SHARED_CREDENTIALS_FILE or ~/.aws/credentials)
// - IAM profile based credentials. (performs an HTTP
// call to a pre-defined endpoint, only valid inside
// configured ec2 instances)
// - Static credentials provided by user (i.e. MINIO_ROOT_USER/MINIO_ACCESS_KEY)
// - AWS env vars (i.e. AWS_ACCESS_KEY_ID)
// - AWS creds file (i.e. AWS_SHARED_CREDENTIALS_FILE or ~/.aws/credentials)
// - IAM profile based credentials. (performs an HTTP
// call to a pre-defined endpoint, only valid inside
// configured ec2 instances)
// - Static credentials provided by user (i.e. MINIO_ROOT_USER/MINIO_ACCESS_KEY)
var defaultAWSCredProviders = []credentials.Provider{
&credentials.EnvAWS{},
&credentials.FileAWSCredentials{},
Expand Down Expand Up @@ -535,6 +535,16 @@ func (l *s3Objects) PutObject(ctx context.Context, bucket string, object string,
return minio.FromMinioClientObjectInfo(bucket, oi), nil
}

func (l *s3Objects) PutMultipleObjects(
ctx context.Context,
bucket string,
objects []string,
r []*minio.PutObjReader,
opts []minio.ObjectOptions,
) (objInfo []minio.ObjectInfo, err []error) {
return nil, []error{minio.NotImplemented{}}
}

// CopyObject copies an object from source bucket to a destination bucket.
func (l *s3Objects) CopyObject(ctx context.Context, srcBucket string, srcObject string, dstBucket string, dstObject string, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
if srcOpts.CheckPrecondFn != nil && srcOpts.CheckPrecondFn(srcInfo) {
Expand Down
116 changes: 116 additions & 0 deletions cmd/gateway/zcn/gateway-zcn.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import (
"context"
"errors"
"fmt"
"github.com/0chain/gosdk/constants"
"github.com/minio/minio/internal/logger"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/0chain/gosdk/zboxcore/sdk"
Expand Down Expand Up @@ -520,6 +523,119 @@ func (zob *zcnObjects) PutObject(ctx context.Context, bucket, object string, r *
return
}

func (zob *zcnObjects) PutMultipleObjects(
ctx context.Context,
bucket string,
objects []string,
r []*minio.PutObjReader,
opts []minio.ObjectOptions,
) ([]minio.ObjectInfo, []error) {
total := len(objects)
if total <= 0 {
return nil, []error{fmt.Errorf("no files to upload")}
}

if total != len(r) || total != len(opts) {
return nil, []error{fmt.Errorf("length mismatch of objects with file readers or with options")}
}

remotePaths := make([]string, total)
for i, object := range objects {
if bucket == rootBucketName {
remotePaths[i] = filepath.Join(rootPath, object)
} else {
remotePaths[i] = filepath.Join(rootPath, bucket, object)
}
}
operationRequests := make([]sdk.OperationRequest, total)
objectInfo := make([]minio.ObjectInfo, total)
cb := &statusCB{
doneCh: make(chan struct{}, 1),
errCh: make(chan error, 1),
}

var wg sync.WaitGroup
errCh := make(chan error, total)
for i := 0; i < total; i++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
var ref *sdk.ORef
ref, err := getSingleRegularRef(zob.alloc, remotePaths[idx])
if err != nil {
if !isPathNoExistError(err) {
errCh <- err
return
}
}

var isUpdate bool
if ref != nil {
isUpdate = true
}

contentType := opts[idx].UserDefined["content-type"]
if contentType == "" {
contentType = "application/octet-stream"
}

_, fileName := filepath.Split(remotePaths[idx])
fileMeta := sdk.FileMeta{
Path: "",
RemotePath: remotePaths[idx],
ActualSize: r[idx].Size(),
MimeType: contentType,
RemoteName: fileName,
}

options := []sdk.ChunkedUploadOption{
sdk.WithStatusCallback(cb),
sdk.WithEncrypt(false),
}
operationRequests[idx] = sdk.OperationRequest{
FileMeta: fileMeta,
FileReader: newMinioReader(r[idx]),
OperationType: constants.FileOperationInsert,
Opts: options,
}
if isUpdate {
operationRequests[idx].OperationType = constants.FileOperationUpdate
}
objectInfo[idx] = minio.ObjectInfo{
Bucket: bucket,
Name: objects[idx],
Size: r[idx].Size(),
ModTime: time.Now(),
}
}(i)
}
wg.Wait()
close(errCh)

var errs []error
for err := range errCh {
errs = append(errs, err)
}

if errs != nil && len(errs) > 0 {
logger.Error("error while getting file ref and creating operationRequests.")
return nil, errs
}

errn := zob.alloc.DoMultiOperation(operationRequests)
if errn != nil {
logger.Error("error in sending multioperation to gosdk: %v", errn)
return nil, []error{errn}
}

select {
case <-cb.doneCh:
case err := <-cb.errCh:
return nil, []error{err}
}

return objectInfo, nil
}
func (zob *zcnObjects) CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
var srcRemotePath, dstRemotePath string
if srcBucket == rootBucketName {
Expand Down
5 changes: 5 additions & 0 deletions cmd/object-api-interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ type ObjectLayer interface {
GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (reader *GetObjectReader, err error)
GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
PutObject(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
PutMultipleObjects(ctx context.Context, bucket string, object []string, data []*PutObjReader, opts []ObjectOptions) (objInfo []ObjectInfo, err []error)
CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error)
DeleteObject(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error)
DeleteObjects(ctx context.Context, bucket string, objects []ObjectToDelete, opts ObjectOptions) ([]DeletedObject, []error)
Expand Down Expand Up @@ -267,3 +268,7 @@ func GetObject(ctx context.Context, api ObjectLayer, bucket, object string, star
_, err = xioutil.Copy(writer, reader)
return err
}

// type ExtendedObjectLayer interface {
// PutMultipleObjects(ctx context.Context, bucket string, object []string, data []*PutObjReader, opts []ObjectOptions) (objInfo []ObjectInfo, err []error)
// }
Loading

0 comments on commit c2fdd01

Please sign in to comment.