Skip to content

Commit

Permalink
Merge pull request #753 from minguyen-jumptrading/feature/fixhangingp…
Browse files Browse the repository at this point in the history
…roblem

Fix backup hanging.
  • Loading branch information
Slach authored Oct 3, 2023
2 parents 22488e4 + 5660d00 commit 02ac91d
Showing 1 changed file with 21 additions and 50 deletions.
71 changes: 21 additions & 50 deletions pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package storage
import (
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -103,7 +102,7 @@ func (gcs *GCS) Connect(ctx context.Context) error {
clientOptions = append(clientOptions, option.WithHTTPClient(debugClient))
}

factory := pool.NewPooledObjectFactory(
factory := pool.NewPooledObjectFactorySimple(
func(context.Context) (interface{}, error) {
sClient, err := storage.NewClient(ctx, clientOptions...)
if err != nil {
Expand All @@ -113,17 +112,6 @@ func (gcs *GCS) Connect(ctx context.Context) error {
Client: sClient,
},
nil
}, func(ctx context.Context, object *pool.PooledObject) error {
// destroy
return object.Object.(*clientObject).Client.Close()
}, func(ctx context.Context, object *pool.PooledObject) bool {
return true
}, func(ctx context.Context, object *pool.PooledObject) error {
// activate do nothing
return nil
}, func(ctx context.Context, object *pool.PooledObject) error {
// passivate do nothing
return nil
})
gcs.clientPool = pool.NewObjectPoolWithDefaultConfig(ctx, factory)
gcs.clientPool.Config.MaxTotal = gcs.Config.ClientPoolSize
Expand All @@ -136,13 +124,6 @@ func (gcs *GCS) Close(ctx context.Context) error {
}

func (gcs *GCS) Walk(ctx context.Context, gcsPath string, recursive bool, process func(ctx context.Context, r RemoteFile) error) error {
pClientObj, err := gcs.clientPool.BorrowObject(ctx)
if err != nil {
log.Errorf("can't get client connection from pool: %+v", err)
return err
}
pClient := pClientObj.(*clientObject).Client

rootPath := path.Join(gcs.Config.Path, gcsPath)
prefix := rootPath + "/"
if rootPath == "/" {
Expand All @@ -152,35 +133,32 @@ func (gcs *GCS) Walk(ctx context.Context, gcsPath string, recursive bool, proces
if !recursive {
delimiter = "/"
}
it := pClient.Bucket(gcs.Config.Bucket).Objects(ctx, &storage.Query{
it := gcs.client.Bucket(gcs.Config.Bucket).Objects(ctx, &storage.Query{
Prefix: prefix,
Delimiter: delimiter,
})
for {
object, err := it.Next()
if errors.Is(err, iterator.Done) {
gcs.clientPool.ReturnObject(ctx, pClientObj)
return nil
}
if err != nil {
gcs.clientPool.InvalidateObject(ctx, pClientObj)
return err
}
if object.Prefix != "" {
switch err {
case nil:
if object.Prefix != "" {
if err := process(ctx, &gcsFile{
name: strings.TrimPrefix(object.Prefix, rootPath),
}); err != nil {
return err
}
continue
}
if err := process(ctx, &gcsFile{
name: strings.TrimPrefix(object.Prefix, rootPath),
size: object.Size,
lastModified: object.Updated,
name: strings.TrimPrefix(object.Name, rootPath),
}); err != nil {
gcs.clientPool.InvalidateObject(ctx, pClientObj)
return err
}
continue
}
if err := process(ctx, &gcsFile{
size: object.Size,
lastModified: object.Updated,
name: strings.TrimPrefix(object.Name, rootPath),
}); err != nil {
gcs.clientPool.InvalidateObject(ctx, pClientObj)
case iterator.Done:
return nil
default:
return err
}
}
Expand Down Expand Up @@ -226,6 +204,7 @@ func (gcs *GCS) PutFile(ctx context.Context, key string, r io.ReadCloser) error
if err := writer.Close(); err != nil {
log.Warnf("can't close writer: %+v", err)
gcs.clientPool.InvalidateObject(ctx, pClientObj)

return
}
gcs.clientPool.ReturnObject(ctx, pClientObj)
Expand All @@ -236,21 +215,13 @@ func (gcs *GCS) PutFile(ctx context.Context, key string, r io.ReadCloser) error
}

func (gcs *GCS) StatFile(ctx context.Context, key string) (RemoteFile, error) {
pClientObj, err := gcs.clientPool.BorrowObject(ctx)
objAttr, err := gcs.client.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key)).Attrs(ctx)
if err != nil {
log.Errorf("can't get client connection from pool: %+v", err)
return nil, err
}
pClient := pClientObj.(*clientObject).Client
objAttr, err := pClient.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key)).Attrs(ctx)
if err != nil {
if errors.Is(err, storage.ErrObjectNotExist) {
if err == storage.ErrObjectNotExist {
return nil, ErrNotFound
}
gcs.clientPool.InvalidateObject(ctx, pClientObj)
return nil, err
}
gcs.clientPool.ReturnObject(ctx, pClientObj)
return &gcsFile{
size: objAttr.Size,
lastModified: objAttr.Updated,
Expand Down

0 comments on commit 02ac91d

Please sign in to comment.