casng, byte streaming implementation (#462)
Byte streaming is used to upload large files that do not fit in a batching request.
This implementation avoids using the chunker package by leveraging
the streaming nature of IO buffering.
Furthermore, error handling is more robust in this implementation,
since all errors are handled while ensuring graceful cancellation when necessary.
mrahs authored Jun 13, 2023
1 parent 9007496 commit 8d8b99c
Showing 5 changed files with 544 additions and 11 deletions.
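For orientation, here is a minimal caller-side sketch of how the new WriteBytes API below might be used to stream one large file. It is illustrative only and not part of this commit: the resource-name format follows the REAPI ByteStream upload convention (the compressed-blobs/zstd form is assumed to be what toggles compression), and the github.com/google/uuid dependency, digest.NewFromFile helper, and a pre-constructed uploader are assumptions.

package example

import (
	"context"
	"fmt"
	"log"
	"os"

	"github.com/bazelbuild/remote-apis-sdks/go/pkg/casng"
	"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
	"github.com/google/uuid"
)

// uploadLargeFile streams a single large file via WriteBytes.
// It assumes u is an already-constructed *casng.BatchingUploader.
func uploadLargeFile(ctx context.Context, u *casng.BatchingUploader, instance, path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	// Digest of the file contents; the resource name embeds hash and size.
	d, err := digest.NewFromFile(path)
	if err != nil {
		return err
	}

	// Upload resource name following the REAPI ByteStream convention (assumed here).
	// The compressed-blobs/zstd form opts in to zstd compression on the wire.
	name := fmt.Sprintf("%s/uploads/%s/compressed-blobs/zstd/%s/%d", instance, uuid.New(), d.Hash, d.Size)

	stats, err := u.WriteBytes(ctx, name, f, d.Size, 0)
	if err != nil {
		return err
	}
	log.Printf("requested=%d streamed=%d cache_hits=%d", stats.BytesRequested, stats.LogicalBytesStreamed, stats.CacheHitCount)
	return nil
}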
1 change: 1 addition & 0 deletions go/pkg/casng/BUILD.bazel
@@ -5,6 +5,7 @@ go_library(
srcs = [
"batching.go",
"config.go",
"throttler.go",
"uploader.go",
],
importpath = "github.com/bazelbuild/remote-apis-sdks/go/pkg/casng",
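The new throttler.go is added to the build above but its contents are not part of this diff; the streaming paths below gate work with streamThrottle.acquire/release. A minimal sketch of what such a semaphore-style throttler could look like, assuming a buffered-channel implementation (illustrative only, not the actual file):

package casng

import "context"

// throttler limits the number of concurrent operations.
// Illustrative sketch only; the throttler.go added by this commit may differ.
type throttler struct {
	ch chan struct{}
}

func newThrottler(n int64) *throttler {
	return &throttler{ch: make(chan struct{}, n)}
}

// acquire blocks until a slot is available or ctx expires.
// It returns false if the context was done before a slot was acquired.
func (t *throttler) acquire(ctx context.Context) bool {
	select {
	case t.ch <- struct{}{}:
		return true
	case <-ctx.Done():
		return false
	}
}

// release frees a slot acquired by a previous successful acquire.
func (t *throttler) release() {
	<-t.ch
}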
198 changes: 192 additions & 6 deletions go/pkg/casng/batching.go
@@ -2,13 +2,19 @@ package casng

import (
"context"
"fmt"
"io"
"sync"
"time"

"github.com/bazelbuild/remote-apis-sdks/go/pkg/contextmd"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/errors"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/retry"
repb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
log "github.com/golang/glog"
"github.com/klauspost/compress/zstd"
bspb "google.golang.org/genproto/googleapis/bytestream"
)

// MissingBlobs queries the CAS for digests and returns a slice of the missing ones.
@@ -57,7 +63,7 @@ func (u *BatchingUploader) MissingBlobs(ctx context.Context, digests []digest.Di
req := &repb.FindMissingBlobsRequest{InstanceName: u.instanceName}
for _, batch := range batches {
req.BlobDigests = batch
errRes = u.withRetry(ctx, u.queryRPCCfg.RetryPredicate, u.queryRPCCfg.RetryPolicy, func() error {
errRes = retry.WithPolicy(ctx, u.queryRPCCfg.RetryPredicate, u.queryRPCCfg.RetryPolicy, func() error {
ctx, ctxCancel := context.WithTimeout(ctx, u.queryRPCCfg.Timeout)
defer ctxCancel()
res, errRes = u.cas.FindMissingBlobs(ctx, req)
@@ -89,15 +95,195 @@ func (u *BatchingUploader) MissingBlobs(ctx context.Context, digests []digest.Di
// ctx is used to make and cancel remote calls.
// This method does not use the uploader's context which means it is safe to call even after that context is cancelled.
//
// size is used to toggle compression as well as report some stats. It must reflect the actual number of bytes r has to give.
// Compression is turned on based on the resource name.
// size is used to report some stats. It must reflect the actual number of bytes r has to give.
// The server is notified to finalize the resource name and subsequent writes may not succeed.
// The errors returned are either from the context, ErrGRPC, ErrIO, or ErrCompression. More errors may be wrapped inside.
// If an error was returned, the returned stats may indicate that all the bytes were sent, but that does not guarantee that the server committed all of them.
func (u *BatchingUploader) WriteBytes(ctx context.Context, name string, r io.Reader, size int64, offset int64) (Stats, error) {
panic("not yet implemented")
func (u *BatchingUploader) WriteBytes(ctx context.Context, name string, r io.Reader, size, offset int64) (Stats, error) {
if !u.streamThrottle.acquire(ctx) {
return Stats{}, ctx.Err()
}
defer u.streamThrottle.release()
return u.writeBytes(ctx, name, r, size, offset, true)
}

// WriteBytesPartial is the same as WriteBytes, but does not notify the server to finalize the resource name.
func (u *BatchingUploader) WriteBytesPartial(ctx context.Context, name string, r io.Reader, size int64, offset int64) (Stats, error) {
panic("not yet implemented")
func (u *BatchingUploader) WriteBytesPartial(ctx context.Context, name string, r io.Reader, size, offset int64) (Stats, error) {
if !u.streamThrottle.acquire(ctx) {
return Stats{}, ctx.Err()
}
defer u.streamThrottle.release()
return u.writeBytes(ctx, name, r, size, offset, false)
}

func (u *uploader) writeBytes(ctx context.Context, name string, r io.Reader, size, offset int64, finish bool) (Stats, error) {
contextmd.Infof(ctx, log.Level(1), "[casng] upload.write_bytes: name=%s, size=%d, offset=%d, finish=%t", name, size, offset, finish)
defer contextmd.Infof(ctx, log.Level(1), "[casng] upload.write_bytes.done: name=%s, size=%d, offset=%d, finish=%t", name, size, offset, finish)
if log.V(3) {
startTime := time.Now()
defer func() {
log.Infof("[casng] upload.write_bytes.duration: start=%d, end=%d, name=%s, size=%d, chunk_size=%d", startTime.UnixNano(), time.Now().UnixNano(), name, size, u.ioCfg.BufferSize)
}()
}

var stats Stats
// Read raw bytes if compression is disabled.
src := r

// If compression is enabled, plug in the encoder via a pipe.
var errEnc error
var nRawBytes int64 // Track the actual number of the consumed raw bytes.
var encWg sync.WaitGroup
var withCompression bool // Used later to ensure the pipe is closed.
if IsCompressedWriteResourceName(name) {
contextmd.Infof(ctx, log.Level(1), "[casng] upload.write_bytes.compressing: name=%s, size=%d", name, size)
withCompression = true
pr, pw := io.Pipe()
// Closing pr always returns a nil error, but also sends ErrClosedPipe to pw.
defer pr.Close()
src = pr // Read compressed bytes instead of raw bytes.

enc := u.zstdEncoders.Get().(*zstd.Encoder)
defer u.zstdEncoders.Put(enc)
// (Re)initialize the encoder with this writer.
enc.Reset(pw)
// Get it going.
encWg.Add(1)
go func() {
defer encWg.Done()
// Closing pw always returns a nil error, but also sends an EOF to pr.
defer pw.Close()

// The encoder will theoretically read continuously. However, pw will block it
// while pr is not reading from the other side.
// In other words, the chunk size of the encoder's output is controlled by the reader.
nRawBytes, errEnc = enc.ReadFrom(r)
// Closing the encoder is necessary to flush remaining bytes.
errEnc = errors.Join(enc.Close(), errEnc)
if errors.Is(errEnc, io.ErrClosedPipe) {
// pr was closed first, which means the actual error is on that end.
errEnc = nil
}
}()
}

ctx, ctxCancel := context.WithCancel(ctx)
defer ctxCancel()

stream, errStream := u.byteStream.Write(ctx)
if errStream != nil {
return stats, errors.Join(ErrGRPC, errStream)
}

// buf slice is never resliced which makes it safe to use a pointer-like type.
buf := u.buffers.Get().([]byte)
defer u.buffers.Put(buf)

cacheHit := false
var err error
req := &bspb.WriteRequest{
ResourceName: name,
WriteOffset: offset,
}
for {
n, errRead := src.Read(buf)
if errRead != nil && errRead != io.EOF {
err = errors.Join(ErrIO, errRead, err)
break
}

n64 := int64(n)
stats.LogicalBytesMoved += n64 // This may be adjusted later to exclude compression. See below.
stats.EffectiveBytesMoved += n64

req.Data = buf[:n]
req.FinishWrite = finish && errRead == io.EOF
errStream := retry.WithPolicy(ctx, u.streamRPCCfg.RetryPredicate, u.streamRPCCfg.RetryPolicy, func() error {
timer := time.NewTimer(u.streamRPCCfg.Timeout)
// Ensure the timer goroutine terminates if Send does not timeout.
success := make(chan struct{})
defer close(success)
go func() {
select {
case <-timer.C:
ctxCancel() // Cancel the stream to allow Send to return.
case <-success:
}
}()
stats.TotalBytesMoved += n64
return stream.Send(req)
})
if errStream != nil && errStream != io.EOF {
err = errors.Join(ErrGRPC, errStream, err)
break
}

// The server says the content for the specified resource already exists.
if errStream == io.EOF {
cacheHit = true
break
}

req.WriteOffset += n64

// The reader is done (interrupted or completed).
if errRead == io.EOF {
break
}
}

// In case of a cache hit or an error, the pipe must be closed to terminate the encoder's goroutine
// which would have otherwise terminated after draining the reader.
if srcCloser, ok := src.(io.Closer); ok && withCompression {
if errClose := srcCloser.Close(); errClose != nil {
err = errors.Join(ErrIO, errClose, err)
}
}

// This theoretically will block until the encoder's goroutine has returned, which is the happy path.
// If the reader failed without the encoder's knowledge, closing the pipe will trigger the encoder to terminate, which is done above.
// In any case, waiting here is necessary because the encoder's goroutine currently owns errEnc and nRawBytes.
encWg.Wait()
if errEnc != nil {
err = errors.Join(ErrCompression, errEnc, err)
}

// Capture stats before processing errors.
stats.BytesRequested = size
if nRawBytes > 0 {
// Compression was turned on.
// nRawBytes may be smaller than compressed bytes (additional headers without effective compression).
stats.LogicalBytesMoved = nRawBytes
}
if cacheHit {
stats.LogicalBytesCached = size
}
stats.LogicalBytesStreamed = stats.LogicalBytesMoved
stats.LogicalBytesBatched = 0
stats.InputFileCount = 0
stats.InputDirCount = 0
stats.InputSymlinkCount = 0
if cacheHit {
stats.CacheHitCount = 1
} else {
stats.CacheMissCount = 1
}
stats.DigestCount = 0
stats.BatchedCount = 0
if err == nil {
stats.StreamedCount = 1
}

res, errClose := stream.CloseAndRecv()
if errClose != nil {
return stats, errors.Join(ErrGRPC, errClose, err)
}

// CommittedSize is based on the uncompressed size of the blob.
if !cacheHit && res.CommittedSize != size {
err = errors.Join(ErrGRPC, fmt.Errorf("committed size mismatch: got %d, want %d", res.CommittedSize, size), err)
}

return stats, err
}
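The core of this change is the pipe-based compression above: rather than using a dedicated chunker, writeBytes lets the zstd encoder push compressed bytes into the write end of an io.Pipe while the gRPC send loop pulls buffer-sized chunks from the read end, so chunking falls out of ordinary buffered reads and back-pressure from the stream naturally throttles the encoder. Below is a standalone sketch of that pattern under stated assumptions; the function name, bufSize parameter, and destination writer are illustrative and not code from this commit.

package example

import (
	"io"
	"sync"

	"github.com/klauspost/compress/zstd"
)

// compressAndStream mirrors the pipe-based pattern used by writeBytes above:
// a zstd encoder writes into one end of an io.Pipe while the consumer reads
// fixed-size chunks from the other end, one chunk per iteration.
func compressAndStream(dst io.Writer, src io.Reader, bufSize int) (rawBytes int64, err error) {
	pr, pw := io.Pipe()

	enc, err := zstd.NewWriter(pw)
	if err != nil {
		return 0, err
	}

	var encErr error
	var wg sync.WaitGroup
	wg.Add(1)
	// Encoder goroutine: drain src, flush, and close the write end so the read
	// loop below eventually observes io.EOF (or the encoder's error).
	go func() {
		defer wg.Done()
		rawBytes, encErr = enc.ReadFrom(src)
		if closeErr := enc.Close(); encErr == nil {
			encErr = closeErr
		}
		pw.CloseWithError(encErr) // A nil error closes the pipe cleanly.
	}()

	// Consumer loop: each Read yields at most bufSize compressed bytes, which
	// maps naturally onto one streaming request per iteration.
	buf := make([]byte, bufSize)
	for err == nil {
		n, readErr := pr.Read(buf)
		if n > 0 {
			if _, wErr := dst.Write(buf[:n]); wErr != nil {
				err = wErr
				break
			}
		}
		if readErr == io.EOF {
			break
		}
		if readErr != nil {
			err = readErr
		}
	}

	// Closing the read end unblocks the encoder if the loop exited early, then
	// wait for the goroutine so rawBytes and encErr are safe to read.
	pr.Close()
	wg.Wait()
	if err == nil {
		err = encErr
	}
	return rawBytes, err
}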