Skip to content

Commit

Permalink
Merge pull request #727 from minguyen-jumptrading/feature/add_gcs_client_pool
Browse files Browse the repository at this point in the history

add connection pool to GCS and use a different context for upload in case it …
  • Loading branch information
Slach authored Aug 28, 2023
2 parents 0a20046 + 23c7b7f commit a35ebe2
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 19 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ build/
_instances/
_coverage_/
__pycache__/
*.py[cod]
*.py[cod]
vendor/
11 changes: 6 additions & 5 deletions pkg/backup/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/Altinity/clickhouse-backup/pkg/clickhouse"
"github.com/Altinity/clickhouse-backup/pkg/custom"
"github.com/Altinity/clickhouse-backup/pkg/resumable"
"github.com/Altinity/clickhouse-backup/pkg/status"
"github.com/eapache/go-resiliency/retrier"
"io"
"os"
"path"
Expand All @@ -20,6 +15,12 @@ import (
"sync/atomic"
"time"

"github.com/Altinity/clickhouse-backup/pkg/clickhouse"
"github.com/Altinity/clickhouse-backup/pkg/custom"
"github.com/Altinity/clickhouse-backup/pkg/resumable"
"github.com/Altinity/clickhouse-backup/pkg/status"
"github.com/eapache/go-resiliency/retrier"

"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"

Expand Down
7 changes: 6 additions & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package config
import (
"crypto/tls"
"fmt"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"math"
"os"
"runtime"
"strings"
"time"

s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"

"github.com/apex/log"
"github.com/kelseyhightower/envconfig"
"github.com/urfave/cli"
Expand Down Expand Up @@ -76,6 +77,9 @@ type GCSConfig struct {
StorageClass string `yaml:"storage_class" envconfig:"GCS_STORAGE_CLASS"`
ObjectLabels map[string]string `yaml:"object_labels" envconfig:"GCS_OBJECT_LABELS"`
CustomStorageClassMap map[string]string `yaml:"custom_storage_class_map" envconfig:"GCS_CUSTOM_STORAGE_CLASS_MAP"`
// NOTE: ClientPoolSize should be at least 2 times bigger than
// UploadConcurrency or DownloadConcurrency in each upload and download case
ClientPoolSize int `yaml:"client_pool_size" envconfig:"GCS_CLIENT_POOL_SIZE"`
}

// AzureBlobConfig - Azure Blob settings section
Expand Down Expand Up @@ -544,6 +548,7 @@ func DefaultConfig() *Config {
CompressionLevel: 1,
CompressionFormat: "tar",
StorageClass: "STANDARD",
ClientPoolSize: 500,
},
COS: COSConfig{
RowURL: "",
Expand Down
113 changes: 101 additions & 12 deletions pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ import (
"encoding/base64"
"errors"
"fmt"
"google.golang.org/api/iterator"
"io"
"net/http"
"path"
"strings"
"time"

"google.golang.org/api/iterator"

"github.com/Altinity/clickhouse-backup/pkg/config"
pool "github.com/jolestar/go-commons-pool/v2"
"google.golang.org/api/option/internaloption"

"cloud.google.com/go/storage"
Expand All @@ -23,14 +25,19 @@ import (

// GCS - presents methods for manipulate data on GCS
type GCS struct {
client *storage.Client
Config *config.GCSConfig
client *storage.Client
Config *config.GCSConfig
clientPool *pool.ObjectPool
}

// debugGCSTransport is an http.RoundTripper wrapper used when GCS debug
// logging is enabled; its RoundTrip logs each GCS HTTP request before
// delegating to the wrapped base transport.
type debugGCSTransport struct {
base http.RoundTripper
}

// clientObject wraps a *storage.Client so GCS client connections can be
// managed (borrowed, returned, invalidated) by the object pool.
type clientObject struct {
Client *storage.Client
}

func (w debugGCSTransport) RoundTrip(r *http.Request) (*http.Response, error) {
logMsg := fmt.Sprintf(">>> [GCS_REQUEST] >>> %v %v\n", r.Method, r.URL.String())
for h, values := range r.Header {
Expand Down Expand Up @@ -96,6 +103,30 @@ func (gcs *GCS) Connect(ctx context.Context) error {
clientOptions = append(clientOptions, option.WithHTTPClient(debugClient))
}

factory := pool.NewPooledObjectFactory(
func(context.Context) (interface{}, error) {
sClient, err := storage.NewClient(ctx, clientOptions...)
if err != nil {
return nil, err
}
return &clientObject{
Client: sClient,
},
nil
}, func(ctx context.Context, object *pool.PooledObject) error {
// destroy
return object.Object.(*clientObject).Client.Close()
}, func(ctx context.Context, object *pool.PooledObject) bool {
return true
}, func(ctx context.Context, object *pool.PooledObject) error {
// activate do nothing
return nil
}, func(ctx context.Context, object *pool.PooledObject) error {
// passivate do nothing
return nil
})
gcs.clientPool = pool.NewObjectPoolWithDefaultConfig(ctx, factory)
gcs.clientPool.Config.MaxTotal = gcs.Config.ClientPoolSize
gcs.client, err = storage.NewClient(ctx, clientOptions...)
return err
}
Expand All @@ -105,6 +136,13 @@ func (gcs *GCS) Close(ctx context.Context) error {
}

func (gcs *GCS) Walk(ctx context.Context, gcsPath string, recursive bool, process func(ctx context.Context, r RemoteFile) error) error {
pClientObj, err := gcs.clientPool.BorrowObject(ctx)
if err != nil {
log.Errorf("can't get client connection from pool: %+v", err)
return err
}
pClient := pClientObj.(*clientObject).Client

rootPath := path.Join(gcs.Config.Path, gcsPath)
prefix := rootPath + "/"
if rootPath == "/" {
Expand All @@ -114,22 +152,25 @@ func (gcs *GCS) Walk(ctx context.Context, gcsPath string, recursive bool, proces
if !recursive {
delimiter = "/"
}
it := gcs.client.Bucket(gcs.Config.Bucket).Objects(ctx, &storage.Query{
it := pClient.Bucket(gcs.Config.Bucket).Objects(ctx, &storage.Query{
Prefix: prefix,
Delimiter: delimiter,
})
for {
object, err := it.Next()
if errors.Is(err, iterator.Done) {
gcs.clientPool.ReturnObject(ctx, pClientObj)
return nil
}
if err != nil {
gcs.clientPool.InvalidateObject(ctx, pClientObj)
return err
}
if object.Prefix != "" {
if err := process(ctx, &gcsFile{
name: strings.TrimPrefix(object.Prefix, rootPath),
}); err != nil {
gcs.clientPool.InvalidateObject(ctx, pClientObj)
return err
}
continue
Expand All @@ -139,17 +180,26 @@ func (gcs *GCS) Walk(ctx context.Context, gcsPath string, recursive bool, proces
lastModified: object.Updated,
name: strings.TrimPrefix(object.Name, rootPath),
}); err != nil {
gcs.clientPool.InvalidateObject(ctx, pClientObj)
return err
}
}
}

func (gcs *GCS) GetFileReader(ctx context.Context, key string) (io.ReadCloser, error) {
obj := gcs.client.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key))
pClientObj, err := gcs.clientPool.BorrowObject(ctx)
if err != nil {
log.Errorf("can't get client connection from pool: %+v", err)
return nil, err
}
pClient := pClientObj.(*clientObject).Client
obj := pClient.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key))
reader, err := obj.NewReader(ctx)
if err != nil {
gcs.clientPool.InvalidateObject(ctx, pClientObj)
return nil, err
}
gcs.clientPool.ReturnObject(ctx, pClientObj)
return reader, nil
}

Expand All @@ -158,8 +208,15 @@ func (gcs *GCS) GetFileReaderWithLocalPath(ctx context.Context, key, _ string) (
}

func (gcs *GCS) PutFile(ctx context.Context, key string, r io.ReadCloser) error {
pClientObj, err := gcs.clientPool.BorrowObject(ctx)
if err != nil {
log.Errorf("can't get client connection from pool: %+v", err)
return err
}
pClient := pClientObj.(*clientObject).Client
key = path.Join(gcs.Config.Path, key)
obj := gcs.client.Bucket(gcs.Config.Bucket).Object(key)
obj := pClient.Bucket(gcs.Config.Bucket).Object(key)

writer := obj.NewWriter(ctx)
writer.StorageClass = gcs.Config.StorageClass
if len(gcs.Config.ObjectLabels) > 0 {
Expand All @@ -168,21 +225,32 @@ func (gcs *GCS) PutFile(ctx context.Context, key string, r io.ReadCloser) error
defer func() {
if err := writer.Close(); err != nil {
log.Warnf("can't close writer: %+v", err)
gcs.clientPool.InvalidateObject(ctx, pClientObj)
return
}
gcs.clientPool.ReturnObject(ctx, pClientObj)
}()
buffer := make([]byte, 512*1024)
_, err := io.CopyBuffer(writer, r, buffer)
_, err = io.CopyBuffer(writer, r, buffer)
return err
}

func (gcs *GCS) StatFile(ctx context.Context, key string) (RemoteFile, error) {
objAttr, err := gcs.client.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key)).Attrs(ctx)
pClientObj, err := gcs.clientPool.BorrowObject(ctx)
if err != nil {
log.Errorf("can't get client connection from pool: %+v", err)
return nil, err
}
pClient := pClientObj.(*clientObject).Client
objAttr, err := pClient.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key)).Attrs(ctx)
if err != nil {
if errors.Is(err, storage.ErrObjectNotExist) {
return nil, ErrNotFound
}
gcs.clientPool.InvalidateObject(ctx, pClientObj)
return nil, err
}
gcs.clientPool.ReturnObject(ctx, pClientObj)
return &gcsFile{
size: objAttr.Size,
lastModified: objAttr.Updated,
Expand All @@ -191,8 +259,20 @@ func (gcs *GCS) StatFile(ctx context.Context, key string) (RemoteFile, error) {
}

func (gcs *GCS) deleteKey(ctx context.Context, key string) error {
object := gcs.client.Bucket(gcs.Config.Bucket).Object(key)
return object.Delete(ctx)
pClientObj, err := gcs.clientPool.BorrowObject(ctx)
if err != nil {
log.Errorf("can't get client connection from pool: %+v", err)
return err
}
pClient := pClientObj.(*clientObject).Client
object := pClient.Bucket(gcs.Config.Bucket).Object(key)
err = object.Delete(ctx)
if err != nil {
gcs.clientPool.InvalidateObject(ctx, pClientObj)
return err
}
gcs.clientPool.ReturnObject(ctx, pClientObj)
return nil
}

func (gcs *GCS) DeleteFile(ctx context.Context, key string) error {
Expand All @@ -206,17 +286,26 @@ func (gcs *GCS) DeleteFileFromObjectDiskBackup(ctx context.Context, key string)
}

func (gcs *GCS) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (int64, error) {
pClientObj, err := gcs.clientPool.BorrowObject(ctx)
if err != nil {
log.Errorf("can't get client connection from pool: %+v", err)
return 0, err
}
pClient := pClientObj.(*clientObject).Client
dstKey = path.Join(gcs.Config.ObjectDiskPath, dstKey)
src := gcs.client.Bucket(srcBucket).Object(srcKey)
dst := gcs.client.Bucket(gcs.Config.Bucket).Object(dstKey)
src := pClient.Bucket(srcBucket).Object(srcKey)
dst := pClient.Bucket(gcs.Config.Bucket).Object(dstKey)
attrs, err := src.Attrs(ctx)
if err != nil {
gcs.clientPool.InvalidateObject(ctx, pClientObj)
return 0, err
}
if _, err = dst.CopierFrom(src).Run(ctx); err != nil {
gcs.clientPool.InvalidateObject(ctx, pClientObj)
return 0, err
}
log.Debugf("GCS->CopyObject %s/%s -> %s/%s", srcBucket, srcKey, gcs.Config.Bucket, dstKey)
gcs.clientPool.ReturnObject(ctx, pClientObj)
return attrs.Size, nil
}

Expand Down

0 comments on commit a35ebe2

Please sign in to comment.