Skip to content

Commit

Permalink
add error handling and connection pool close to gcs.go
Browse files Browse the repository at this point in the history
  • Loading branch information
Slach committed Oct 18, 2023
1 parent 8f6fe6b commit 237ded8
Showing 1 changed file with 30 additions and 12 deletions.
42 changes: 30 additions & 12 deletions pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func (gcs *GCS) Connect(ctx context.Context) error {
}

func (gcs *GCS) Close(ctx context.Context) error {
gcs.clientPool.Close(ctx)
return gcs.client.Close()
}

Expand Down Expand Up @@ -174,10 +175,14 @@ func (gcs *GCS) GetFileReader(ctx context.Context, key string) (io.ReadCloser, e
obj := pClient.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key))
reader, err := obj.NewReader(ctx)
if err != nil {
gcs.clientPool.InvalidateObject(ctx, pClientObj)
if pErr := gcs.clientPool.InvalidateObject(ctx, pClientObj); pErr != nil {
log.Warnf("GetFileReader: gcs.clientPool.InvalidateObject error: %v ", pErr)
}
return nil, err
}
gcs.clientPool.ReturnObject(ctx, pClientObj)
if pErr := gcs.clientPool.ReturnObject(ctx, pClientObj); pErr != nil {
log.Warnf("GetFileReader: gcs.clientPool.ReturnObject error: %v ", pErr)
}
return reader, nil
}

Expand All @@ -202,14 +207,17 @@ func (gcs *GCS) PutFile(ctx context.Context, key string, r io.ReadCloser) error
}
defer func() {
if err := writer.Close(); err != nil {
log.Warnf("can't close writer: %+v", err)
gcs.clientPool.InvalidateObject(ctx, pClientObj)

log.Warnf("gcs.PutFile: can't close writer: %+v", err)
if err = gcs.clientPool.InvalidateObject(ctx, pClientObj); err != nil {
log.Warnf("gcs.PutFile: gcs.clientPool.InvalidateObject error: %+v", err)
}
return
}
gcs.clientPool.ReturnObject(ctx, pClientObj)
if err = gcs.clientPool.ReturnObject(ctx, pClientObj); err != nil {
log.Warnf("gcs.PutFile: gcs.clientPool.ReturnObject error: %+v", err)
}
}()
buffer := make([]byte, 512*1024)
buffer := make([]byte, 128*1024)
_, err = io.CopyBuffer(writer, r, buffer)
return err
}
Expand Down Expand Up @@ -239,10 +247,14 @@ func (gcs *GCS) deleteKey(ctx context.Context, key string) error {
object := pClient.Bucket(gcs.Config.Bucket).Object(key)
err = object.Delete(ctx)
if err != nil {
gcs.clientPool.InvalidateObject(ctx, pClientObj)
if pErr := gcs.clientPool.InvalidateObject(ctx, pClientObj); pErr != nil {
log.Warnf("gcs.deleteKey: gcs.clientPool.InvalidateObject error: %+v", pErr)
}
return err
}
gcs.clientPool.ReturnObject(ctx, pClientObj)
if pErr := gcs.clientPool.ReturnObject(ctx, pClientObj); pErr != nil {
log.Warnf("gcs.deleteKey: gcs.clientPool.ReturnObject error: %+v", pErr)
}
return nil
}

Expand All @@ -268,15 +280,21 @@ func (gcs *GCS) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string
dst := pClient.Bucket(gcs.Config.Bucket).Object(dstKey)
attrs, err := src.Attrs(ctx)
if err != nil {
gcs.clientPool.InvalidateObject(ctx, pClientObj)
if pErr := gcs.clientPool.InvalidateObject(ctx, pClientObj); pErr != nil {
log.Warnf("gcs.CopyObject: gcs.clientPool.InvalidateObject error: %+v", pErr)
}
return 0, err
}
if _, err = dst.CopierFrom(src).Run(ctx); err != nil {
gcs.clientPool.InvalidateObject(ctx, pClientObj)
if pErr := gcs.clientPool.InvalidateObject(ctx, pClientObj); pErr != nil {
log.Warnf("gcs.CopyObject: gcs.clientPool.InvalidateObject error: %+v", pErr)
}
return 0, err
}
log.Debugf("GCS->CopyObject %s/%s -> %s/%s", srcBucket, srcKey, gcs.Config.Bucket, dstKey)
gcs.clientPool.ReturnObject(ctx, pClientObj)
if pErr := gcs.clientPool.ReturnObject(ctx, pClientObj); pErr != nil {
log.Warnf("gcs.CopyObject: gcs.clientPool.ReturnObject error: %+v", pErr)
}
return attrs.Size, nil
}

Expand Down

0 comments on commit 237ded8

Please sign in to comment.