Skip to content

Commit

Permalink
introduce casng package (#454)
Browse files Browse the repository at this point in the history
This PR includes one method and some of the scaffolding for other
methods that will be posted later.
  • Loading branch information
mrahs authored Jun 12, 2023
1 parent 93d910e commit 9007496
Show file tree
Hide file tree
Showing 8 changed files with 683 additions and 30 deletions.
1 change: 0 additions & 1 deletion WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ load("//:go_deps.bzl", "remote_apis_sdks_go_deps")
# gazelle:repository_macro go_deps.bzl%remote_apis_sdks_go_deps
remote_apis_sdks_go_deps()


# protobuf.
http_archive(
name = "rules_proto",
Expand Down
1 change: 0 additions & 1 deletion go/pkg/cas/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go_library(
name = "cas",
srcs = [
"client.go",
"config.go",
"ioutil.go",
"upload.go",
],
Expand Down
53 changes: 53 additions & 0 deletions go/pkg/casng/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "casng",
srcs = [
"batching.go",
"config.go",
"uploader.go",
],
importpath = "github.com/bazelbuild/remote-apis-sdks/go/pkg/casng",
visibility = ["//visibility:public"],
deps = [
"//go/pkg/contextmd",
"//go/pkg/digest",
"//go/pkg/errors",
"//go/pkg/io/impath",
"//go/pkg/io/walker",
"//go/pkg/retry",
"//go/pkg/symlinkopts",
"@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@com_github_golang_glog//:go_default_library",
"@com_github_klauspost_compress//zstd:go_default_library",
"@com_github_pborman_uuid//:go_default_library",
"@go_googleapis//google/bytestream:bytestream_go_proto",
"@org_golang_google_grpc//status:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
],
)

go_test(
name = "cas_test",
srcs = [
"batching_query_test.go",
"uploader_test.go",
],
data = glob(["testdata/**"]),
deps = [
":casng",
"//go/pkg/digest",
"//go/pkg/errors",
"//go/pkg/io/impath",
"//go/pkg/retry",
"//go/pkg/symlinkopts",
"@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@com_github_golang_glog//:go_default_library",
"@com_github_google_go_cmp//cmp:go_default_library",
"@go_googleapis//google/bytestream:bytestream_go_proto",
"@go_googleapis//google/rpc:status_go_proto",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes:go_default_library",
"@org_golang_google_grpc//status:go_default_library",
],
)
103 changes: 103 additions & 0 deletions go/pkg/casng/batching.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package casng

import (
"context"
"io"

"github.com/bazelbuild/remote-apis-sdks/go/pkg/contextmd"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/errors"
repb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
log "github.com/golang/glog"
)

// MissingBlobs queries the CAS for digests and returns a slice of the missing ones.
//
// This method is useful when a large number of digests is already known. For other use cases, consider the streaming uploader.
// This method does not use internal processors and does not use the uploader's context. It is safe to use even if the uploader's context is cancelled.
//
// The digests are batched based on ItemLimits of the gRPC config. BytesLimit and BundleTimeout are not used in this method.
// Errors from a batch do not affect other batches, but all digests from such bad batches will be reported as missing by this call.
// In other words, if an error is returned, any digest that is not in the returned slice is not missing.
// If no error is returned, the returned slice contains all the missing digests.
func (u *BatchingUploader) MissingBlobs(ctx context.Context, digests []digest.Digest) ([]digest.Digest, error) {
contextmd.Infof(ctx, log.Level(1), "[casng] batch.query: len=%d", len(digests))
if len(digests) == 0 {
return nil, nil
}

// Deduplicate and split into batches.
var batches [][]*repb.Digest
var batch []*repb.Digest
dgSet := make(map[digest.Digest]struct{})
for _, d := range digests {
if _, ok := dgSet[d]; ok {
continue
}
dgSet[d] = struct{}{}
batch = append(batch, d.ToProto())
if len(batch) >= u.queryRPCCfg.ItemsLimit {
batches = append(batches, batch)
batch = nil
}
}
if len(batch) > 0 {
batches = append(batches, batch)
}
if len(batches) == 0 {
return nil, nil
}
contextmd.Infof(ctx, log.Level(1), "[casng] batch.query.deduped: len=%d", len(dgSet))

// Call remote.
missing := make([]digest.Digest, 0, len(dgSet))
var err error
var res *repb.FindMissingBlobsResponse
var errRes error
req := &repb.FindMissingBlobsRequest{InstanceName: u.instanceName}
for _, batch := range batches {
req.BlobDigests = batch
errRes = u.withRetry(ctx, u.queryRPCCfg.RetryPredicate, u.queryRPCCfg.RetryPolicy, func() error {
ctx, ctxCancel := context.WithTimeout(ctx, u.queryRPCCfg.Timeout)
defer ctxCancel()
res, errRes = u.cas.FindMissingBlobs(ctx, req)
return errRes
})
if res == nil {
res = &repb.FindMissingBlobsResponse{}
}
if errRes != nil {
err = errors.Join(errRes, err)
res.MissingBlobDigests = batch
}
for _, d := range res.MissingBlobDigests {
missing = append(missing, digest.NewFromProtoUnvalidated(d))
}
}
contextmd.Infof(ctx, log.Level(1), "[casng] batch.query.done: missing=%d", len(missing))

if err != nil {
err = errors.Join(ErrGRPC, err)
}
return missing, err
}

// WriteBytes uploads all the bytes of r directly to the resource name starting remotely at offset.
//
// r must return io.EOF to terminate the call.
//
// ctx is used to make and cancel remote calls.
// This method does not use the uploader's context which means it is safe to call even after that context is cancelled.
//
// size is used to toggle compression as well as report some stats. It must reflect the actual number of bytes r has to give.
// The server is notified to finalize the resource name and subsequent writes may not succeed.
// The errors returned are either from the context, ErrGRPC, ErrIO, or ErrCompression. More errors may be wrapped inside.
// If an error was returned, the returned stats may indicate that all the bytes were sent, but that does not guarantee that the server committed all of them.
func (u *BatchingUploader) WriteBytes(ctx context.Context, name string, r io.Reader, size int64, offset int64) (Stats, error) {
panic("not yet implemented")
}

// WriteBytesPartial is the same as WriteBytes, but does not notify the server to finalize the resource name.
func (u *BatchingUploader) WriteBytesPartial(ctx context.Context, name string, r io.Reader, size int64, offset int64) (Stats, error) {
panic("not yet implemented")
}
102 changes: 102 additions & 0 deletions go/pkg/casng/batching_query_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Using a different package name to strictly exclude types defined here from the original package.
package casng_test

import (
"context"
"fmt"
"sort"
"testing"

"github.com/bazelbuild/remote-apis-sdks/go/pkg/casng"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/errors"
repb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
log "github.com/golang/glog"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
)

func TestQuery_Batching(t *testing.T) {
errSend := fmt.Errorf("send error")
tests := []struct {
name string
digests []digest.Digest
cas *fakeCAS
wantErr error
wantDigests []digest.Digest
}{
{"empty_request", nil, &fakeCAS{}, nil, nil},
{
"no_missing",
[]digest.Digest{{Hash: "a"}, {Hash: "b"}},
&fakeCAS{findMissingBlobs: func(_ context.Context, _ *repb.FindMissingBlobsRequest, _ ...grpc.CallOption) (*repb.FindMissingBlobsResponse, error) {
return &repb.FindMissingBlobsResponse{}, nil
}},
nil,
[]digest.Digest{},
},
{
"all_missing",
[]digest.Digest{{Hash: "a"}, {Hash: "b"}},
&fakeCAS{findMissingBlobs: func(_ context.Context, req *repb.FindMissingBlobsRequest, _ ...grpc.CallOption) (*repb.FindMissingBlobsResponse, error) {
return &repb.FindMissingBlobsResponse{MissingBlobDigests: req.BlobDigests}, nil
}},
nil,
[]digest.Digest{{Hash: "a"}, {Hash: "b"}},
},
{
"some_missing",
[]digest.Digest{{Hash: "a"}, {Hash: "b"}},
&fakeCAS{findMissingBlobs: func(_ context.Context, req *repb.FindMissingBlobsRequest, _ ...grpc.CallOption) (*repb.FindMissingBlobsResponse, error) {
return &repb.FindMissingBlobsResponse{MissingBlobDigests: []*repb.Digest{{Hash: "a"}}}, nil
}},
nil,
[]digest.Digest{{Hash: "a"}},
},
{
"error_call",
[]digest.Digest{{Hash: "a"}, {Hash: "b"}},
&fakeCAS{findMissingBlobs: func(_ context.Context, req *repb.FindMissingBlobsRequest, _ ...grpc.CallOption) (*repb.FindMissingBlobsResponse, error) {
return &repb.FindMissingBlobsResponse{}, errSend
}},
errSend,
[]digest.Digest{{Hash: "a"}, {Hash: "b"}},
},
{
"two_batches",
[]digest.Digest{{Hash: "a"}, {Hash: "b"}, {Hash: "c"}},
&fakeCAS{findMissingBlobs: func(_ context.Context, req *repb.FindMissingBlobsRequest, _ ...grpc.CallOption) (*repb.FindMissingBlobsResponse, error) {
return &repb.FindMissingBlobsResponse{MissingBlobDigests: req.BlobDigests}, nil
}},
nil,
[]digest.Digest{{Hash: "a"}, {Hash: "b"}, {Hash: "c"}},
},
}

for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
log.Infof("test: %s", test.name)
ctx, ctxCancel := context.WithCancel(context.Background())
defer ctxCancel()
u, err := casng.NewBatchingUploader(ctx, test.cas, &fakeByteStreamClient{}, "", defaultRPCCfg, defaultRPCCfg, defaultRPCCfg, defaultIOCfg)
if err != nil {
t.Fatalf("error creating batching uploader: %v", err)
}
missing, err := u.MissingBlobs(ctx, test.digests)
if test.wantErr == nil && err != nil {
t.Errorf("MissingBlobs failed: %v", err)
}
if test.wantErr != nil && !errors.Is(err, test.wantErr) {
t.Errorf("error mismatch: want %v, got %v", test.wantErr, err)
}
sort.Sort(byHash(test.wantDigests))
sort.Sort(byHash(missing))
if diff := cmp.Diff(test.wantDigests, missing); diff != "" {
t.Errorf("missing mismatch, (-want +got): %s", diff)
}
})
}
log.Flush()
}
Loading

0 comments on commit 9007496

Please sign in to comment.