Skip to content

Commit

Permalink
Serve GetTree requests out of the local cache via BatchReadBlobs (#7319)
Browse files Browse the repository at this point in the history
  • Loading branch information
iain-macdonald authored Aug 28, 2024
1 parent b22e5f1 commit e9d6970
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ go_library(
"//server/real_environment",
"//server/remote_cache/digest",
"//server/util/log",
"//server/util/proto",
"//server/util/rpcutil",
"//server/util/status",
"@org_golang_google_grpc//codes",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package content_addressable_storage_server_proxy
import (
"context"
"fmt"
"io"

"github.com/buildbuddy-io/buildbuddy/server/environment"
"github.com/buildbuddy-io/buildbuddy/server/real_environment"
"github.com/buildbuddy-io/buildbuddy/server/remote_cache/digest"
"github.com/buildbuddy-io/buildbuddy/server/util/log"
"github.com/buildbuddy-io/buildbuddy/server/util/proto"
"github.com/buildbuddy-io/buildbuddy/server/util/rpcutil"
"github.com/buildbuddy-io/buildbuddy/server/util/status"

"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -156,22 +157,44 @@ func (s *CASServerProxy) batchReadBlobsRemote(ctx context.Context, readReq *repb
}

func (s *CASServerProxy) GetTree(req *repb.GetTreeRequest, stream repb.ContentAddressableStorage_GetTreeServer) error {
// TODO(iain): cache these
remoteStream, err := s.remote.GetTree(stream.Context(), req)
if err != nil {
return err
}
for {
rsp, err := remoteStream.Recv()
if err == io.EOF {
break
ctx := stream.Context()
resp := repb.GetTreeResponse{}
respSizeBytes := 0
for dirsToGet := []*repb.Digest{req.RootDigest}; len(dirsToGet) > 0; {
brbreq := repb.BatchReadBlobsRequest{
InstanceName: req.InstanceName,
Digests: dirsToGet,
DigestFunction: req.DigestFunction,
}
brbresps, err := s.BatchReadBlobs(ctx, &brbreq)
if err != nil {
return err
}
if err = stream.Send(rsp); err != nil {
return err

dirsToGet = []*repb.Digest{}
for _, brbresp := range brbresps.Responses {
dir := &repb.Directory{}
if err := proto.Unmarshal(brbresp.Data, dir); err != nil {
return err
}

// Flush to the stream if adding the dir will make resp bigger than
// the maximum gRPC frame size.
dirSizeBytes := proto.Size(dir)
if int64(respSizeBytes+dirSizeBytes) > rpcutil.GRPCMaxSizeBytes {
if err := stream.Send(&resp); err != nil {
return err
}
resp = repb.GetTreeResponse{}
respSizeBytes = 0
}

resp.Directories = append(resp.Directories, dir)
respSizeBytes += dirSizeBytes
for _, subDir := range dir.Directories {
dirsToGet = append(dirsToGet, subDir.Digest)
}
}
}
return nil
return stream.Send(&resp)
}
Original file line number Diff line number Diff line change
Expand Up @@ -276,39 +276,111 @@ func makeTree(ctx context.Context, client bspb.ByteStreamClient, t *testing.T) (

func TestGetTree(t *testing.T) {
ctx := context.Background()
conn, _, requestCounter := runRemoteCASS(ctx, testenv.GetTestEnv(t), t)
conn, unaryRequests, streamRequests := runRemoteCASS(ctx, testenv.GetTestEnv(t), t)
casClient := repb.NewContentAddressableStorageClient(conn)
bsClient := bspb.NewByteStreamClient(conn)
proxyConn := runCASProxy(ctx, conn, testenv.GetTestEnv(t), t)
casProxy := repb.NewContentAddressableStorageClient(proxyConn)
bsProxy := bspb.NewByteStreamClient(proxyConn)

// Read some random digest that's not there
// Unknown tree.
digest := &repb.Digest{
Hash: strings.Repeat("a", 64),
SizeBytes: 15,
}

_, err := casClient.GetTree(ctx, &repb.GetTreeRequest{RootDigest: digest})
require.NoError(t, err)
_, err = casProxy.GetTree(ctx, &repb.GetTreeRequest{RootDigest: digest})
require.NoError(t, err)
require.Equal(t, int32(1), requestCounter.Load())
require.Equal(t, int32(0), unaryRequests.Load())
require.Equal(t, int32(1), streamRequests.Load())

// Full tree written to the remote.
rootDigest, files := makeTree(ctx, bsClient, t)
treeFiles := cas.ReadTree(ctx, t, casClient, "", rootDigest)
require.ElementsMatch(t, files, treeFiles)
requestCounter.Store(0)
unaryRequests.Store(0)
streamRequests.Store(0)
treeFiles = cas.ReadTree(ctx, t, casProxy, "", rootDigest)
require.ElementsMatch(t, files, treeFiles)
// The tree has 4 levels, so expect 4 unary requests.
require.Equal(t, int32(4), unaryRequests.Load())
require.Equal(t, int32(0), streamRequests.Load())
unaryRequests.Store(0)
streamRequests.Store(0)
treeFiles = cas.ReadTree(ctx, t, casProxy, "", rootDigest)
require.ElementsMatch(t, files, treeFiles)
require.Equal(t, int32(1), requestCounter.Load())
require.Equal(t, int32(0), unaryRequests.Load())
require.Equal(t, int32(0), streamRequests.Load())

// Full tree written to the proxy.
rootDigest, files = makeTree(ctx, bsProxy, t)
treeFiles = cas.ReadTree(ctx, t, casClient, "", rootDigest)
require.ElementsMatch(t, files, treeFiles)
requestCounter.Store(0)
unaryRequests.Store(0)
streamRequests.Store(0)
treeFiles = cas.ReadTree(ctx, t, casProxy, "", rootDigest)
require.ElementsMatch(t, files, treeFiles)
require.Equal(t, int32(0), unaryRequests.Load())
require.Equal(t, int32(0), streamRequests.Load())

// Write two subtrees to the proxy and a root node to the remote.
firstTreeRoot, firstTreeFiles := makeTree(ctx, bsProxy, t)
secondTreeRoot, secondTreeFiles := makeTree(ctx, bsProxy, t)
root := &repb.Directory{
Directories: []*repb.DirectoryNode{
&repb.DirectoryNode{
Name: "first",
Digest: firstTreeRoot,
},
&repb.DirectoryNode{
Name: "second",
Digest: secondTreeRoot,
},
},
}
rootDigest, err = cachetools.UploadProto(ctx, bsClient, "", repb.DigestFunction_SHA256, root)
files = []string{"first", "second"}
files = append(files, firstTreeFiles...)
files = append(files, secondTreeFiles...)
require.NoError(t, err)
treeFiles = cas.ReadTree(ctx, t, casClient, "", rootDigest)
require.ElementsMatch(t, files, treeFiles)
unaryRequests.Store(0)
streamRequests.Store(0)
treeFiles = cas.ReadTree(ctx, t, casProxy, "", rootDigest)
require.ElementsMatch(t, files, treeFiles)
// Only the root node should be read from the remote.
require.Equal(t, int32(1), unaryRequests.Load())
require.Equal(t, int32(0), streamRequests.Load())

// Write two subtrees to the remote and a root node to the proxy.
firstTreeRoot, firstTreeFiles = makeTree(ctx, bsClient, t)
secondTreeRoot, secondTreeFiles = makeTree(ctx, bsClient, t)
root = &repb.Directory{
Directories: []*repb.DirectoryNode{
&repb.DirectoryNode{
Name: "first",
Digest: firstTreeRoot,
},
&repb.DirectoryNode{
Name: "second",
Digest: secondTreeRoot,
},
},
}
rootDigest, err = cachetools.UploadProto(ctx, bsProxy, "", repb.DigestFunction_SHA256, root)
files = []string{"first", "second"}
files = append(files, firstTreeFiles...)
files = append(files, secondTreeFiles...)
require.NoError(t, err)
treeFiles = cas.ReadTree(ctx, t, casClient, "", rootDigest)
require.ElementsMatch(t, files, treeFiles)
unaryRequests.Store(0)
streamRequests.Store(0)
treeFiles = cas.ReadTree(ctx, t, casProxy, "", rootDigest)
require.ElementsMatch(t, files, treeFiles)
// TODO(iain): change this to 0 once tree caching support is added
require.Equal(t, int32(1), requestCounter.Load())
// The subtrees but not root should be read from the remote.
require.Equal(t, int32(4), unaryRequests.Load())
require.Equal(t, int32(0), streamRequests.Load())
}
1 change: 1 addition & 0 deletions enterprise/server/remote_execution/dirtools/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"//server/util/fastcopy",
"//server/util/log",
"//server/util/proto",
"//server/util/rpcutil",
"//server/util/status",
"//third_party/singleflight",
"@org_golang_google_genproto_googleapis_bytestream//:bytestream",
Expand Down
7 changes: 3 additions & 4 deletions enterprise/server/remote_execution/dirtools/dirtools.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/buildbuddy-io/buildbuddy/server/util/fastcopy"
"github.com/buildbuddy-io/buildbuddy/server/util/log"
"github.com/buildbuddy-io/buildbuddy/server/util/proto"
"github.com/buildbuddy-io/buildbuddy/server/util/rpcutil"
"github.com/buildbuddy-io/buildbuddy/server/util/status"
"github.com/buildbuddy-io/buildbuddy/third_party/singleflight"
"golang.org/x/sync/errgroup"
Expand All @@ -30,8 +31,6 @@ import (
bspb "google.golang.org/genproto/googleapis/bytestream"
)

const gRPCMaxSize = int64(4000000)

var (
enableDownloadCompresssion = flag.Bool("cache.client.enable_download_compression", true, "If true, enable compression of downloads from remote caches")
)
Expand Down Expand Up @@ -778,7 +777,7 @@ func (ff *BatchFileFetcher) FetchFiles(filesToFetch FileMap, opts *DownloadTreeO
// fit in the batch call, so we'll have to bytestream
// it.
size := f.d.GetSizeBytes()
if size > gRPCMaxSize || ff.env.GetContentAddressableStorageClient() == nil {
if size > rpcutil.GRPCMaxSizeBytes || ff.env.GetContentAddressableStorageClient() == nil {
eg.Go(func() error {
return ff.bytestreamReadFiles(ctx, ff.instanceName, f.d, f.fps, opts)
})
Expand All @@ -788,7 +787,7 @@ func (ff *BatchFileFetcher) FetchFiles(filesToFetch FileMap, opts *DownloadTreeO
// If the digest would push our current batch request
// size over the gRPC max, dispatch the request and
// start a new one.
if currentBatchRequestSize+size > gRPCMaxSize {
if currentBatchRequestSize+size > rpcutil.GRPCMaxSizeBytes {
reqCopy := req
eg.Go(func() error {
return ff.batchDownloadFiles(ctx, reqCopy, filesToFetch, opts)
Expand Down
5 changes: 2 additions & 3 deletions server/remote_cache/cachetools/cachetools.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (

const (
uploadBufSizeBytes = 1000000 // 1MB
gRPCMaxSize = int64(4000000)
maxCompressionBufSize = int64(4000000)
)

Expand Down Expand Up @@ -610,7 +609,7 @@ func (ul *BatchCASUploader) Upload(d *repb.Digest, rsc io.ReadSeekCloser) error
compressor = repb.Compressor_ZSTD
}

if d.GetSizeBytes() > gRPCMaxSize {
if d.GetSizeBytes() > rpcutil.GRPCMaxSizeBytes {
resourceName := digest.NewResourceName(d, ul.instanceName, rspb.CacheType_CAS, ul.digestFunction)
resourceName.SetCompressor(compressor)

Expand All @@ -637,7 +636,7 @@ func (ul *BatchCASUploader) Upload(d *repb.Digest, rsc io.ReadSeekCloser) error
b = compression.CompressZstd(nil, b)
}
additionalSize := int64(len(b))
if ul.unsentBatchSize+additionalSize > gRPCMaxSize {
if ul.unsentBatchSize+additionalSize > rpcutil.GRPCMaxSizeBytes {
ul.flushCurrentBatch()
}
ul.unsentBatchReq.Requests = append(ul.unsentBatchReq.Requests, &repb.BatchUpdateBlobsRequest_Request{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"//server/util/log",
"//server/util/prefix",
"//server/util/proto",
"//server/util/rpcutil",
"//server/util/status",
"@com_github_prometheus_client_golang//prometheus",
"@org_golang_google_genproto_googleapis_rpc//status",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/buildbuddy-io/buildbuddy/server/util/log"
"github.com/buildbuddy-io/buildbuddy/server/util/prefix"
"github.com/buildbuddy-io/buildbuddy/server/util/proto"
"github.com/buildbuddy-io/buildbuddy/server/util/rpcutil"
"github.com/buildbuddy-io/buildbuddy/server/util/status"

"github.com/prometheus/client_golang/prometheus"
Expand All @@ -38,10 +39,7 @@ import (
gstatus "google.golang.org/grpc/status"
)

const (
gRPCMaxSize = int64(4194304 - 2000)
TreeCacheRemoteInstanceName = "_bb_treecache_"
)
const TreeCacheRemoteInstanceName = "_bb_treecache_"

var (
enableTreeCaching = flag.Bool("cache.enable_tree_caching", true, "If true, cache GetTree responses (full and partial)")
Expand Down Expand Up @@ -532,7 +530,7 @@ func (s *ContentAddressableStorageServer) GetTree(req *repb.GetTreeRequest, stre
rn := digest.ResourceNameFromProto(dirWithDigest.ResourceName)
d := rn.GetDigest()

if rspSizeBytes+d.GetSizeBytes() > gRPCMaxSize {
if rspSizeBytes+d.GetSizeBytes() > rpcutil.GRPCMaxSizeBytes {
if err := stream.Send(rsp); err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions server/util/rpcutil/rpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"github.com/buildbuddy-io/buildbuddy/server/util/proto"
)

const GRPCMaxSizeBytes = int64(4 * 1000 * 1000)

type StreamMsg[T proto.Message] struct {
Data T
Error error
Expand Down

0 comments on commit e9d6970

Please sign in to comment.