diff --git a/.gitignore b/.gitignore
index 646104a8..5d635a69 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,4 +12,5 @@ build/
 _instances/
 _coverage_/
 __pycache__/
-*.py[cod]
\ No newline at end of file
+*.py[cod]
+vendor/
\ No newline at end of file
diff --git a/pkg/backup/upload.go b/pkg/backup/upload.go
index 9901b237..f58ee1d7 100644
--- a/pkg/backup/upload.go
+++ b/pkg/backup/upload.go
@@ -5,11 +5,6 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"github.com/Altinity/clickhouse-backup/pkg/clickhouse"
-	"github.com/Altinity/clickhouse-backup/pkg/custom"
-	"github.com/Altinity/clickhouse-backup/pkg/resumable"
-	"github.com/Altinity/clickhouse-backup/pkg/status"
-	"github.com/eapache/go-resiliency/retrier"
 	"io"
 	"os"
 	"path"
@@ -20,6 +15,12 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/Altinity/clickhouse-backup/pkg/clickhouse"
+	"github.com/Altinity/clickhouse-backup/pkg/custom"
+	"github.com/Altinity/clickhouse-backup/pkg/resumable"
+	"github.com/Altinity/clickhouse-backup/pkg/status"
+	"github.com/eapache/go-resiliency/retrier"
+
 	"golang.org/x/sync/errgroup"
 	"golang.org/x/sync/semaphore"
diff --git a/pkg/config/config.go b/pkg/config/config.go
index ae600df5..9273ed58 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -3,13 +3,14 @@ package config
 import (
 	"crypto/tls"
 	"fmt"
-	s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
 	"math"
 	"os"
 	"runtime"
 	"strings"
 	"time"
 
+	s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
+
 	"github.com/apex/log"
 	"github.com/kelseyhightower/envconfig"
 	"github.com/urfave/cli"
@@ -76,6 +77,9 @@ type GCSConfig struct {
 	StorageClass          string            `yaml:"storage_class" envconfig:"GCS_STORAGE_CLASS"`
 	ObjectLabels          map[string]string `yaml:"object_labels" envconfig:"GCS_OBJECT_LABELS"`
 	CustomStorageClassMap map[string]string `yaml:"custom_storage_class_map" envconfig:"GCS_CUSTOM_STORAGE_CLASS_MAP"`
+	// NOTE: ClientPoolSize should be at least twice as large as
+	// UploadConcurrency or DownloadConcurrency in each upload and download case
+	ClientPoolSize int `yaml:"client_pool_size" envconfig:"GCS_CLIENT_POOL_SIZE"`
 }
 
 // AzureBlobConfig - Azure Blob settings section
@@ -544,6 +548,7 @@ func DefaultConfig() *Config {
 			CompressionLevel:  1,
 			CompressionFormat: "tar",
 			StorageClass:      "STANDARD",
+			ClientPoolSize:    500,
 		},
 		COS: COSConfig{
 			RowURL: "",
diff --git a/pkg/storage/gcs.go b/pkg/storage/gcs.go
index 8801742c..93c71f2c 100644
--- a/pkg/storage/gcs.go
+++ b/pkg/storage/gcs.go
@@ -5,14 +5,16 @@ import (
 	"encoding/base64"
 	"errors"
 	"fmt"
-	"google.golang.org/api/iterator"
 	"io"
 	"net/http"
 	"path"
 	"strings"
 	"time"
 
+	"google.golang.org/api/iterator"
+
 	"github.com/Altinity/clickhouse-backup/pkg/config"
+	pool "github.com/jolestar/go-commons-pool/v2"
 	"google.golang.org/api/option/internaloption"
 
 	"cloud.google.com/go/storage"
@@ -23,14 +25,19 @@
 
 // GCS - presents methods for manipulate data on GCS
 type GCS struct {
-	client *storage.Client
-	Config *config.GCSConfig
+	client     *storage.Client
+	Config     *config.GCSConfig
+	clientPool *pool.ObjectPool
 }
 
 type debugGCSTransport struct {
 	base http.RoundTripper
 }
 
+type clientObject struct {
+	Client *storage.Client
+}
+
 func (w debugGCSTransport) RoundTrip(r *http.Request) (*http.Response, error) {
 	logMsg := fmt.Sprintf(">>> [GCS_REQUEST] >>> %v %v\n", r.Method, r.URL.String())
 	for h, values := range r.Header {
@@ -96,6 +103,30 @@ func (gcs *GCS) Connect(ctx context.Context) error {
 		clientOptions = append(clientOptions, option.WithHTTPClient(debugClient))
 	}
 
+	factory := pool.NewPooledObjectFactory(
+		func(context.Context) (interface{}, error) {
+			sClient, err := storage.NewClient(ctx, clientOptions...)
+			if err != nil {
+				return nil, err
+			}
+			return &clientObject{
+					Client: sClient,
+				},
+				nil
+		}, func(ctx context.Context, object *pool.PooledObject) error {
+			// destroy
+			return object.Object.(*clientObject).Client.Close()
+		}, func(ctx context.Context, object *pool.PooledObject) bool {
+			return true
+		}, func(ctx context.Context, object *pool.PooledObject) error {
+			// activate do nothing
+			return nil
+		}, func(ctx context.Context, object *pool.PooledObject) error {
+			// passivate do nothing
+			return nil
+		})
+	gcs.clientPool = pool.NewObjectPoolWithDefaultConfig(ctx, factory)
+	gcs.clientPool.Config.MaxTotal = gcs.Config.ClientPoolSize
 	gcs.client, err = storage.NewClient(ctx, clientOptions...)
 	return err
 }
@@ -105,6 +136,13 @@ func (gcs *GCS) Close(ctx context.Context) error {
 }
 
 func (gcs *GCS) Walk(ctx context.Context, gcsPath string, recursive bool, process func(ctx context.Context, r RemoteFile) error) error {
+	pClientObj, err := gcs.clientPool.BorrowObject(ctx)
+	if err != nil {
+		log.Errorf("can't get client connection from pool: %+v", err)
+		return err
+	}
+	pClient := pClientObj.(*clientObject).Client
+
 	rootPath := path.Join(gcs.Config.Path, gcsPath)
 	prefix := rootPath + "/"
 	if rootPath == "/" {
@@ -114,22 +152,25 @@ func (gcs *GCS) Walk(ctx context.Context, gcsPath string, recursive bool, proces
 	if !recursive {
 		delimiter = "/"
 	}
-	it := gcs.client.Bucket(gcs.Config.Bucket).Objects(ctx, &storage.Query{
+	it := pClient.Bucket(gcs.Config.Bucket).Objects(ctx, &storage.Query{
 		Prefix:    prefix,
 		Delimiter: delimiter,
 	})
 	for {
 		object, err := it.Next()
 		if errors.Is(err, iterator.Done) {
+			gcs.clientPool.ReturnObject(ctx, pClientObj)
 			return nil
 		}
 		if err != nil {
+			gcs.clientPool.InvalidateObject(ctx, pClientObj)
 			return err
 		}
 		if object.Prefix != "" {
 			if err := process(ctx, &gcsFile{
 				name: strings.TrimPrefix(object.Prefix, rootPath),
 			}); err != nil {
+				gcs.clientPool.InvalidateObject(ctx, pClientObj)
 				return err
 			}
 			continue
@@ -139,17 +180,26 @@ func (gcs *GCS) Walk(ctx context.Context, gcsPath string, recursive bool, proces
 			lastModified: object.Updated,
 			name:         strings.TrimPrefix(object.Name, rootPath),
 		}); err != nil {
+			gcs.clientPool.InvalidateObject(ctx, pClientObj)
 			return err
 		}
 	}
 }
 
 func (gcs *GCS) GetFileReader(ctx context.Context, key string) (io.ReadCloser, error) {
-	obj := gcs.client.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key))
+	pClientObj, err := gcs.clientPool.BorrowObject(ctx)
+	if err != nil {
+		log.Errorf("can't get client connection from pool: %+v", err)
+		return nil, err
+	}
+	pClient := pClientObj.(*clientObject).Client
+	obj := pClient.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key))
 	reader, err := obj.NewReader(ctx)
 	if err != nil {
+		gcs.clientPool.InvalidateObject(ctx, pClientObj)
 		return nil, err
 	}
+	gcs.clientPool.ReturnObject(ctx, pClientObj)
 	return reader, nil
 }
 
@@ -158,8 +208,15 @@ func (gcs *GCS) GetFileReaderWithLocalPath(ctx context.Context, key, _ string) (
 }
 
 func (gcs *GCS) PutFile(ctx context.Context, key string, r io.ReadCloser) error {
+	pClientObj, err := gcs.clientPool.BorrowObject(ctx)
+	if err != nil {
+		log.Errorf("can't get client connection from pool: %+v", err)
+		return err
+	}
+	pClient := pClientObj.(*clientObject).Client
 	key = path.Join(gcs.Config.Path, key)
-	obj := gcs.client.Bucket(gcs.Config.Bucket).Object(key)
+	obj := pClient.Bucket(gcs.Config.Bucket).Object(key)
+
 	writer := obj.NewWriter(ctx)
 	writer.StorageClass = gcs.Config.StorageClass
 	if len(gcs.Config.ObjectLabels) > 0 {
@@ -168,21 +225,32 @@ func (gcs *GCS) PutFile(ctx context.Context, key string, r io.ReadCloser) error
 	defer func() {
 		if err := writer.Close(); err != nil {
 			log.Warnf("can't close writer: %+v", err)
+			gcs.clientPool.InvalidateObject(ctx, pClientObj)
+			return
 		}
+		gcs.clientPool.ReturnObject(ctx, pClientObj)
 	}()
 	buffer := make([]byte, 512*1024)
-	_, err := io.CopyBuffer(writer, r, buffer)
+	_, err = io.CopyBuffer(writer, r, buffer)
 	return err
 }
 
 func (gcs *GCS) StatFile(ctx context.Context, key string) (RemoteFile, error) {
-	objAttr, err := gcs.client.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key)).Attrs(ctx)
+	pClientObj, err := gcs.clientPool.BorrowObject(ctx)
+	if err != nil {
+		log.Errorf("can't get client connection from pool: %+v", err)
+		return nil, err
+	}
+	pClient := pClientObj.(*clientObject).Client
+	objAttr, err := pClient.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key)).Attrs(ctx)
 	if err != nil {
 		if errors.Is(err, storage.ErrObjectNotExist) {
 			return nil, ErrNotFound
 		}
+		gcs.clientPool.InvalidateObject(ctx, pClientObj)
 		return nil, err
 	}
+	gcs.clientPool.ReturnObject(ctx, pClientObj)
 	return &gcsFile{
 		size:         objAttr.Size,
 		lastModified: objAttr.Updated,
@@ -191,8 +259,20 @@ func (gcs *GCS) StatFile(ctx context.Context, key string) (RemoteFile, error) {
 }
 
 func (gcs *GCS) deleteKey(ctx context.Context, key string) error {
-	object := gcs.client.Bucket(gcs.Config.Bucket).Object(key)
-	return object.Delete(ctx)
+	pClientObj, err := gcs.clientPool.BorrowObject(ctx)
+	if err != nil {
+		log.Errorf("can't get client connection from pool: %+v", err)
+		return err
+	}
+	pClient := pClientObj.(*clientObject).Client
+	object := pClient.Bucket(gcs.Config.Bucket).Object(key)
+	err = object.Delete(ctx)
+	if err != nil {
+		gcs.clientPool.InvalidateObject(ctx, pClientObj)
+		return err
+	}
+	gcs.clientPool.ReturnObject(ctx, pClientObj)
+	return nil
 }
 
 func (gcs *GCS) DeleteFile(ctx context.Context, key string) error {
@@ -206,17 +286,26 @@ func (gcs *GCS) DeleteFileFromObjectDiskBackup(ctx context.Context, key string)
 }
 
 func (gcs *GCS) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (int64, error) {
+	pClientObj, err := gcs.clientPool.BorrowObject(ctx)
+	if err != nil {
+		log.Errorf("can't get client connection from pool: %+v", err)
+		return 0, err
+	}
+	pClient := pClientObj.(*clientObject).Client
 	dstKey = path.Join(gcs.Config.ObjectDiskPath, dstKey)
-	src := gcs.client.Bucket(srcBucket).Object(srcKey)
-	dst := gcs.client.Bucket(gcs.Config.Bucket).Object(dstKey)
+	src := pClient.Bucket(srcBucket).Object(srcKey)
+	dst := pClient.Bucket(gcs.Config.Bucket).Object(dstKey)
 	attrs, err := src.Attrs(ctx)
 	if err != nil {
+		gcs.clientPool.InvalidateObject(ctx, pClientObj)
 		return 0, err
 	}
 	if _, err = dst.CopierFrom(src).Run(ctx); err != nil {
+		gcs.clientPool.InvalidateObject(ctx, pClientObj)
 		return 0, err
 	}
 	log.Debugf("GCS->CopyObject %s/%s -> %s/%s", srcBucket, srcKey, gcs.Config.Bucket, dstKey)
+	gcs.clientPool.ReturnObject(ctx, pClientObj)
 	return attrs.Size, nil
 }
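
Reviewer note (not part of the patch): the new `client_pool_size` option defaults to 500 and can also be set via `GCS_CLIENT_POOL_SIZE`. A minimal config sketch following the 2x sizing guidance from the `GCSConfig` comment; the `general.upload_concurrency`/`general.download_concurrency` keys and the bucket name are assumed examples from the existing schema, not part of this diff:

```yaml
# Sketch only: gcs.client_pool_size is introduced by this patch;
# the general.*_concurrency keys and bucket name are assumed examples.
general:
  upload_concurrency: 4
  download_concurrency: 4
gcs:
  bucket: my-backup-bucket
  # keep the pool at least twice the configured concurrency
  client_pool_size: 16
```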
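Every storage method in the patch repeats the same lifecycle: borrow a client from the pool, run the operation, return the object on success, and invalidate it on failure so a broken client is never handed out again. A self-contained sketch of that pattern against the go-commons-pool v2 API; `withPooledClient` and `fakeClient` are hypothetical names for illustration, not identifiers from this PR:

```go
package main

import (
	"context"
	"fmt"

	pool "github.com/jolestar/go-commons-pool/v2"
)

// fakeClient stands in for *storage.Client; illustrative only.
type fakeClient struct{ id int }

// withPooledClient is a hypothetical helper (not in the PR) that factors out
// the borrow / use / return-or-invalidate cycle each GCS method writes inline.
func withPooledClient(ctx context.Context, p *pool.ObjectPool, fn func(*fakeClient) error) error {
	obj, err := p.BorrowObject(ctx)
	if err != nil {
		return fmt.Errorf("can't get client connection from pool: %w", err)
	}
	if err := fn(obj.(*fakeClient)); err != nil {
		// Drop the possibly-broken client instead of recycling it.
		_ = p.InvalidateObject(ctx, obj)
		return err
	}
	// Hand the healthy client back for reuse.
	return p.ReturnObject(ctx, obj)
}

func main() {
	ctx := context.Background()
	nextID := 0 // simple counter; fine for this single-threaded sketch

	// The factory mirrors the one built in Connect(): create allocates a
	// client, destroy closes it, validate/activate/passivate are no-ops.
	factory := pool.NewPooledObjectFactory(
		func(context.Context) (interface{}, error) {
			nextID++
			return &fakeClient{id: nextID}, nil
		},
		func(ctx context.Context, object *pool.PooledObject) error {
			return nil // destroy: a real client would be Close()d here
		},
		func(ctx context.Context, object *pool.PooledObject) bool {
			return true // validate: always healthy in this sketch
		},
		func(ctx context.Context, object *pool.PooledObject) error {
			return nil // activate: nothing to do
		},
		func(ctx context.Context, object *pool.PooledObject) error {
			return nil // passivate: nothing to do
		},
	)

	p := pool.NewObjectPoolWithDefaultConfig(ctx, factory)
	p.Config.MaxTotal = 4 // analogous to gcs.Config.ClientPoolSize

	err := withPooledClient(ctx, p, func(c *fakeClient) error {
		fmt.Printf("using pooled client %d\n", c.id)
		return nil
	})
	fmt.Println("err:", err)
}
```

Invalidate-on-error is the important design choice: ReturnObject would recycle a client whose underlying connection may be wedged, while InvalidateObject destroys it and lets the factory create a fresh one on the next borrow.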