diff --git a/go/pkg/casng/BUILD.bazel b/go/pkg/casng/BUILD.bazel index ea6f0e2fd..4896a81ea 100644 --- a/go/pkg/casng/BUILD.bazel +++ b/go/pkg/casng/BUILD.bazel @@ -5,6 +5,8 @@ go_library( srcs = [ "batching.go", "config.go", + "pubsub.go", + "streaming_query.go", "throttler.go", "uploader.go", ], @@ -32,6 +34,8 @@ go_test( name = "cas_test", srcs = [ "batching_query_test.go", + "batching_write_bytes_test.go", + "streaming_query_test.go", "uploader_test.go", ], data = glob(["testdata/**"]), diff --git a/go/pkg/casng/batching.go b/go/pkg/casng/batching.go index 9b7736a7a..9ac467a77 100644 --- a/go/pkg/casng/batching.go +++ b/go/pkg/casng/batching.go @@ -95,8 +95,8 @@ func (u *BatchingUploader) MissingBlobs(ctx context.Context, digests []digest.Di // ctx is used to make and cancel remote calls. // This method does not use the uploader's context which means it is safe to call even after that context is cancelled. // -// Compression is turned on based the resource name. -// size is used to report some stats. It must reflect the actual number of bytes r has to give. +// Compression is turned on based on the resource name. +// size is used to report stats. It must reflect the actual number of bytes r has to give. // The server is notified to finalize the resource name and subsequent writes may not succeed. // The errors returned are either from the context, ErrGRPC, ErrIO, or ErrCompression. More errors may be wrapped inside. // If an error was returned, the returned stats may indicate that all the bytes were sent, but that does not guarantee that the server committed all of them. @@ -214,17 +214,17 @@ func (u *uploader) writeBytes(ctx context.Context, name string, r io.Reader, siz stats.TotalBytesMoved += n64 return stream.Send(req) }) - if errStream != nil && errStream != io.EOF { - err = errors.Join(ErrGRPC, errStream, err) - break - } - // The server says the content for the specified resource already exists. if errStream == io.EOF { cacheHit = true break } + if errStream != nil { + err = errors.Join(ErrGRPC, errStream, err) + break + } + req.WriteOffset += n64 // The reader is done (interrupted or completed). @@ -260,17 +260,11 @@ func (u *uploader) writeBytes(ctx context.Context, name string, r io.Reader, siz stats.LogicalBytesCached = size } stats.LogicalBytesStreamed = stats.LogicalBytesMoved - stats.LogicalBytesBatched = 0 - stats.InputFileCount = 0 - stats.InputDirCount = 0 - stats.InputSymlinkCount = 0 if cacheHit { stats.CacheHitCount = 1 } else { stats.CacheMissCount = 1 } - stats.DigestCount = 0 - stats.BatchedCount = 0 if err == nil { stats.StreamedCount = 1 } diff --git a/go/pkg/casng/config.go b/go/pkg/casng/config.go index 4194aaec0..ed6a7ccd6 100644 --- a/go/pkg/casng/config.go +++ b/go/pkg/casng/config.go @@ -168,7 +168,7 @@ type Stats struct { // EffectiveBytesMoved is the total number of bytes moved over the wire, excluding retries. // This may not be accurate since a gRPC call may be interrupted in which case this number may be higher than the real one. // For failures, this is reported as 0. - // It may be higher than BytesRequested (compression headers), but never higher than BytesMoved. + // It may be higher than BytesRequested (compression headers), but never higher than TotalBytesMoved. EffectiveBytesMoved int64 // LogicalBytesCached is the total number of bytes not moved over the wire due to caching (either remotely or locally). 
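(Editorial note on the writeBytes change above, not part of the patch: the reordered checks encode the ByteStream convention that an io.EOF from Send means the server has already committed the resource, so it must be classified as a cache hit before the generic gRPC error branch. The sketch below is illustrative only; handleSendResult is a hypothetical helper, while ErrGRPC is the sentinel the package already defines.)

```go
package casng

import (
	"errors"
	"io"
)

// handleSendResult classifies the outcome of a single ByteStream Send the same way
// the reordered writeBytes loop does: io.EOF first (the server already has the
// content, i.e. a cache hit), then any other error as a gRPC failure.
func handleSendResult(errStream error) (cacheHit bool, err error) {
	if errStream == io.EOF {
		return true, nil
	}
	if errStream != nil {
		return false, errors.Join(ErrGRPC, errStream)
	}
	return false, nil
}
```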
diff --git a/go/pkg/casng/pubsub.go b/go/pkg/casng/pubsub.go new file mode 100644 index 000000000..f6aef8a44 --- /dev/null +++ b/go/pkg/casng/pubsub.go @@ -0,0 +1,209 @@ +package casng + +import ( + "context" + "sync" + "time" + + log "github.com/golang/glog" + "github.com/pborman/uuid" ) + +// tag identifies a pubsub channel for routing purposes. +// Producers tag messages and consumers subscribe to tags. +type tag string + +// pubsub provides a simple pubsub implementation to route messages and wait for them. +type pubsub struct { + subs map[tag]chan any + mu sync.RWMutex + timeout time.Duration + // A signalling channel that gets a message every time the broker hits 0 subscriptions. + // Unlike sync.WaitGroup, this allows the broker to accept more subs while a client is waiting for a signal. + done chan struct{} +} + +// sub returns a routing tag and a channel to the subscriber to read messages from. +// +// Only messages associated with the returned tag are sent on the returned channel. +// This allows the subscriber to send a tagged message (request) that propagates across the system and eventually +// receive related messages (responses) from publishers on the returned channel. +// +// The subscriber must continue draining the returned channel until it's closed. +// The returned channel is unbuffered and closed only when unsub is called with the returned tag. +// +// To properly terminate the subscription, the subscriber must wait until all expected responses are received +// on the returned channel before unsubscribing. +// Once unsubscribed, any tagged messages for this subscription are dropped. +func (ps *pubsub) sub() (tag, <-chan any) { + ps.mu.Lock() + defer ps.mu.Unlock() + + t := tag(uuid.New()) + subscriber := make(chan any) + ps.subs[t] = subscriber + + log.V(3).Infof("[casng] pubsub.sub: tag=%s", t) + return t, subscriber +} + +// unsub removes the subscription for tag, if any, and closes the corresponding channel. +func (ps *pubsub) unsub(t tag) { + ps.mu.Lock() + defer ps.mu.Unlock() + subscriber, ok := ps.subs[t] + if !ok { + return + } + delete(ps.subs, t) + close(subscriber) + if len(ps.subs) == 0 { + close(ps.done) + ps.done = make(chan struct{}) + } + log.V(3).Infof("[casng] pubsub.unsub: tag=%s", t) +} + +// pub is a blocking call that fans out a response to all specified (by tag) subscribers concurrently. + // + // Returns when all active subscribers have received their copies or timed out. + // Inactive subscribers (expired by cancelling their context) are skipped (their copies are dropped). + // + // A busy subscriber does not block others from receiving their copies, but will delay this call by up to the specified timeout on the broker. + // + // A pool of workers, one per tag, is used. Each worker attempts to deliver the message to the corresponding subscriber. + // The pool size is a function of the client's concurrency. + // E.g. if 500 workers are publishing messages, with an average of 10 clients per message, the pool size will be 5,000. + // The maximum theoretical pool size for n publishers publishing every message to m subscribers is nm. + // However, the expected average case is few clients per message so the pool size should be close to the concurrency limit. +func (ps *pubsub) pub(m any, tags ...tag) { + _ = ps.pubN(m, len(tags), tags...) +} +
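(Editorial aside, not part of the patch: a minimal round trip through the broker defined above, assuming the snippet compiles inside package casng next to pubsub.go. The function name and payload are hypothetical.)

```go
package casng

import "time"

// exampleRoundTrip shows the intended sub/pub/unsub lifecycle: a requester
// subscribes to obtain a tag, a worker publishes a message routed by that tag,
// and the requester unsubscribes once it has received everything it expects.
func exampleRoundTrip() string {
	ps := newPubSub(time.Second)

	t, ch := ps.sub()
	go ps.pub("response-payload", t) // Worker side: route one message to tag t.

	msg := <-ch // Requester side: drain the unbuffered channel.
	ps.unsub(t) // Closes ch; later messages for t are dropped by the broker.
	return msg.(string)
}
```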
+// mpub (multi-publish) delivers the "once" message to a single subscriber, then delivers the "rest" message to the rest of the subscribers. +// It's useful for cases where the message holds shared information that should not be duplicated among subscribers, such as stats. +func (ps *pubsub) mpub(once any, rest any, tags ...tag) { + t := ps.pubOnce(once, tags...) + _ = ps.pubN(rest, len(tags)-1, excludeTag(tags, t)...) +} + +// pubOnce is like pub, but delivers the message to a single subscriber. +// The tag of the subscriber that got the message is returned. +func (ps *pubsub) pubOnce(m any, tags ...tag) tag { + received := ps.pubN(m, 1, tags...) + if len(received) == 0 { + return "" + } + return received[0] +} + +// pubN is like pub, but delivers the message to no more than n subscribers. The tags of the subscribers that got the message are returned. +func (ps *pubsub) pubN(m any, n int, tags ...tag) []tag { + if log.V(3) { + startTime := time.Now() + defer func() { + log.Infof("[casng] pubsub.duration: start=%d, end=%d", startTime.UnixNano(), time.Now().UnixNano()) + }() + } + if len(tags) == 0 { + log.Warning("[casng] pubsub.pub: called without tags, dropping message") + log.V(4).Infof("[casng] pubsub.pub: called without tags for msg=%v", m) + return nil + } + if n <= 0 { + log.Warningf("[casng] pubsub.pub: nothing published because n=%d", n) + return nil + } + + ps.mu.RLock() + defer ps.mu.RUnlock() + + log.V(4).Infof("[casng] pubsub.pub.msg: type=%[1]T, value=%[1]v", m) + ctx, ctxCancel := context.WithTimeout(context.Background(), ps.timeout) + defer ctxCancel() + + // Optimize for the usual case of a single receiver. + if len(tags) == 1 { + t := tags[0] + s := ps.subs[t] + select { + case s <- m: + case <-ctx.Done(): + log.Errorf("pubsub timeout for %s", t) + return nil + } + return tags + } + + wg := sync.WaitGroup{} + received := make([]tag, 0, len(tags)) + r := make(chan tag) + go func() { + for t := range r { + received = append(received, t) + } + wg.Done() + }() + for _, t := range tags { + s := ps.subs[t] + wg.Add(1) + go func(t tag) { + defer wg.Done() + select { + case s <- m: + r <- t + case <-ctx.Done(): + log.Errorf("pubsub timeout for %s", t) + } + }(t) + } + + // Wait for subscribers. + wg.Wait() + + // Wait for the aggregator. + wg.Add(1) + close(r) + wg.Wait() + return received +} + +// wait blocks until all existing subscribers unsubscribe. +// The signal is a snapshot. The broker may get more subscribers after returning from this call. +func (ps *pubsub) wait() { + <-ps.done +} + +// len returns the number of active subscribers. +func (ps *pubsub) len() int { + ps.mu.RLock() + defer ps.mu.RUnlock() + return len(ps.subs) +} + +// newPubSub initializes a new instance where subscribers must receive messages within timeout. +func newPubSub(timeout time.Duration) *pubsub { + return &pubsub{ + subs: make(map[tag]chan any), + timeout: timeout, + done: make(chan struct{}), + } +} + +// excludeTag is used by mpub to filter out the tag that received the "once" message. +func excludeTag(tags []tag, et tag) []tag { + if len(tags) == 0 { + return []tag{} + } + ts := make([]tag, 0, len(tags)-1) + // Only exclude the tag once. + excluded := false + for _, t := range tags { + if !excluded && t == et { + excluded = true + continue + } + ts = append(ts, t) + } + return ts +}
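(Editorial aside, not part of the patch: a sketch of the mpub semantics defined above, assuming it compiles inside package casng. The payload types and function name are hypothetical; the point is that the stats-carrying copy is delivered exactly once while every requester still gets a plain response.)

```go
package casng

// notifyWithStats fans out a query result to all requesters, attaching the
// shared stats to only one of them so aggregate counters are not double-counted.
func notifyWithStats(ps *pubsub, tags []tag) {
	type response struct{ missing bool }
	type responseWithStats struct {
		response
		totalCalls int
	}
	ps.mpub(
		responseWithStats{response: response{missing: true}, totalCalls: 1}, // "once" copy.
		response{missing: true}, // "rest" copy for everyone else.
		tags...,
	)
}
```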
diff --git a/go/pkg/casng/streaming_query.go b/go/pkg/casng/streaming_query.go new file mode 100644 index 000000000..4a996b695 --- /dev/null +++ b/go/pkg/casng/streaming_query.go @@ -0,0 +1,323 @@ +// The query processor provides a streaming interface to query the CAS for digests. +// +// Multiple concurrent clients can use the same uploader instance at the same time. +// The processor bundles requests from multiple concurrent clients to amortize the cost of querying +// the batching API. That is, it attempts to bundle the maximum possible number of digests in a single gRPC call. +// +// This is done using 3 factors: the size (bytes) limit, the items limit, and a time limit. +// If any of these limits is reached, the processor will dispatch the call and start a new bundle. +// This means that a request can be delayed by the processor (not including network and server time) up to the time limit. +// However, in high throughput sessions, the processor will dispatch calls sooner. +// +// To properly manage multiple concurrent clients while providing the bundling behaviour, the processor becomes a serialization point. +// That is, all requests go through a single channel. To minimize blocking and leverage server concurrency, the processor loop +// is optimized for high throughput and it launches gRPC calls concurrently. +// In other words, it's a many-one-many relationship, where many clients send to one processor which sends to many workers. +// +// To avoid forcing another serialization point through the processor, each worker notifies relevant clients of the results +// it acquired from the server. In this case, it's a one-many relationship, where one worker sends to many clients. +// +// All in all, the design implements a many-one-many-one-many pipeline. +// Many clients send to one processor, which sends to many workers; each worker sends to many clients. +// +// Each client is provided with a channel they send their requests on. The handler of that channel marks each request +// with a unique tag and forwards it to the processor. +// +// The processor receives multiple requests, each potentially with a different tag. +// Each worker receives a bundle of requests that may contain multiple tags. +// +// To facilitate the routing between workers and clients, a simple pubsub implementation is used. +// Each instance, a broker, manages routing messages between multiple subscribers (clients) and multiple publishers (workers). +// Each client gets their own channel on which they receive messages marked for them. +// Each publisher specifies which clients the messages should be routed to. +// The broker attempts at-most-once delivery. +// +// The client handler manages the pubsub subscription by waiting until a matching number of responses has been received, after which +// it cancels the subscription. +package casng + +import ( + "context" + "time" + + "github.com/bazelbuild/remote-apis-sdks/go/pkg/contextmd" + "github.com/bazelbuild/remote-apis-sdks/go/pkg/digest" + "github.com/bazelbuild/remote-apis-sdks/go/pkg/errors" + "github.com/bazelbuild/remote-apis-sdks/go/pkg/retry" + repb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" + log "github.com/golang/glog" + "google.golang.org/protobuf/proto" +) + +// MissingBlobsResponse represents a query result for a single digest. +// +// If Err is not nil, Missing is false. +type MissingBlobsResponse struct { + Digest digest.Digest + Missing bool + Err error +} + +// missingBlobRequest associates a digest with its requester's context. +type missingBlobRequest struct { + digest digest.Digest + tag tag + ctx context.Context +}
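(Editorial aside, not part of the patch: a simplified skeleton of the three-limit dispatch policy described in the file comment above — flush on a byte limit, an item limit, or a timer, whichever is hit first. Everything here is a placeholder; the real implementation is queryProcessor below, which additionally routes responses through the pubsub broker.)

```go
package casng

import "time"

// bundleLoop accumulates items until a size, count, or time threshold is reached,
// then hands the bundle to flush and starts a new one.
func bundleLoop(in <-chan int, bytesLimit, itemsLimit int, bundleTimeout time.Duration, sizeOf func(int) int, flush func([]int)) {
	var bundle []int
	size := 0
	dispatch := func() {
		if len(bundle) == 0 {
			return
		}
		flush(bundle)
		bundle, size = nil, 0
	}
	ticker := time.NewTicker(bundleTimeout)
	defer ticker.Stop()
	for {
		select {
		case item, ok := <-in:
			if !ok {
				dispatch() // Flush the tail before terminating.
				return
			}
			if size+sizeOf(item) >= bytesLimit {
				dispatch() // Byte threshold reached; start a new bundle.
			}
			bundle = append(bundle, item)
			size += sizeOf(item)
			if len(bundle) >= itemsLimit {
				dispatch() // Item count threshold reached.
			}
		case <-ticker.C:
			dispatch() // Time limit reached; bounds the latency of a sparse stream.
		}
	}
}
```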
+// missingBlobRequestBundle is a set of digests, each associated with multiple tags (requesters). +// It is used for unified requests when multiple concurrent requesters share seats in the same bundle. +type missingBlobRequestBundle = map[digest.Digest][]tag + +// MissingBlobs is a non-blocking call that queries the CAS for incoming digests. +// +// This method is useful when digests are calculated and dispatched on the fly. +// For a large list of known digests, consider using the batching uploader. +// +// To properly stop this call, close in and cancel ctx, then wait for the returned channel to close. +// The channel in must be closed as a termination signal. Cancelling ctx is not enough. +// The uploader's context is used to make remote calls using metadata from ctx. +// Metadata unification assumes all requests share the same correlated invocation ID. +// +// The digests are unified (aggregated/bundled) based on ItemsLimit, BytesLimit, and BundleTimeout of the gRPC config. +// The returned channel is unbuffered and will be closed after the input channel is closed and all sent requests get their corresponding responses. +// This could indicate completion or cancellation (in case the context was canceled). +// Slow consumption speed on the returned channel affects the consumption speed on in. +// +// This method must not be called after cancelling the uploader's context. +func (u *StreamingUploader) MissingBlobs(ctx context.Context, in <-chan digest.Digest) <-chan MissingBlobsResponse { + pipeIn := make(chan missingBlobRequest) + out := u.missingBlobsPipe(pipeIn) + u.clientSenderWg.Add(1) + go func() { + defer u.clientSenderWg.Done() + defer close(pipeIn) + for d := range in { + pipeIn <- missingBlobRequest{digest: d, ctx: ctx} + } + }() + return out +} + +// missingBlobsPipe is a shared implementation between batching and streaming interfaces. +func (u *uploader) missingBlobsPipe(in <-chan missingBlobRequest) <-chan MissingBlobsResponse { + ch := make(chan MissingBlobsResponse) + + // If this was called after the uploader was terminated, short-circuit and return. + select { + case <-u.ctx.Done(): + go func() { + defer close(ch) + res := MissingBlobsResponse{Err: ErrTerminatedUploader} + for req := range in { + res.Digest = req.digest + ch <- res + } + }() + return ch + default: + } + + tag, resCh := u.queryPubSub.sub() + pendingCh := make(chan int) + + // Sender. It terminates when in is closed, at which point it sends 0 as a termination signal to the counter. + u.querySenderWg.Add(1) + go func() { + defer u.querySenderWg.Done() + + log.V(1).Info("[casng] query.streamer.sender.start") + defer log.V(1).Info("[casng] query.streamer.sender.stop") + + for r := range in { + r.tag = tag + u.queryCh <- r + pendingCh <- 1 + } + pendingCh <- 0 + }() + + // Receiver. It terminates when resCh is closed, at which point it closes the returned channel. + u.receiverWg.Add(1) + go func() { + defer u.receiverWg.Done() + defer close(ch) + + log.V(1).Info("[casng] query.streamer.receiver.start") + defer log.V(1).Info("[casng] query.streamer.receiver.stop") + + // Continue to drain until the broker closes the channel. + for { + r, ok := <-resCh + if !ok { + return + } + ch <- r.(MissingBlobsResponse) + pendingCh <- -1 + } + }() + + // Counter. It terminates when count hits 0 after receiving a done signal from the sender. + // Upon termination, it sends a signal to pubsub to terminate the subscription, which closes resCh.
+ u.workerWg.Add(1) + go func() { + defer u.workerWg.Done() + defer u.queryPubSub.unsub(tag) + + log.V(1).Info("[casng] query.streamer.counter.start") + defer log.V(1).Info("[casng] query.streamer.counter.stop") + + pending := 0 + done := false + for x := range pendingCh { + if x == 0 { + done = true + } + pending += x + // If the sender is done and all the requests are done, let the receiver and the broker terminate. + if pending == 0 && done { + return + } + } + }() + + return ch +} + +// queryProcessor is the fan-in handler that manages the bundling and dispatching of incoming requests. +func (u *uploader) queryProcessor() { + log.V(1).Info("[casng] query.processor.start") + defer log.V(1).Info("[casng] query.processor.stop") + + bundle := make(missingBlobRequestBundle) + ctx := u.ctx // context with unified metadata. + bundleSize := u.queryRequestBaseSize + + handle := func() { + if len(bundle) < 1 { + return + } + // Block the entire processor if the concurrency limit is reached. + startTime := time.Now() + if !u.queryThrottler.acquire(u.ctx) { + // Ensure responses are dispatched before aborting. + for d := range bundle { + u.queryPubSub.pub(MissingBlobsResponse{ + Digest: d, + Err: u.ctx.Err(), + }, bundle[d]...) + } + return + } + log.V(3).Infof("[casng] query.throttle.duration: start=%d, end=%d", startTime.UnixNano(), time.Now().UnixNano()) + + u.workerWg.Add(1) + go func(ctx context.Context, b missingBlobRequestBundle) { + defer u.workerWg.Done() + defer u.queryThrottler.release() + u.callMissingBlobs(ctx, b) + }(ctx, bundle) + + bundle = make(missingBlobRequestBundle) + bundleSize = u.queryRequestBaseSize + ctx = u.ctx + } + + bundleTicker := time.NewTicker(u.queryRPCCfg.BundleTimeout) + defer bundleTicker.Stop() + for { + select { + case req, ok := <-u.queryCh: + if !ok { + return + } + startTime := time.Now() + + log.V(3).Infof("[casng] query.processor.req: digest=%s, tag=%s, bundle=%d", req.digest, req.tag, len(bundle)) + dSize := proto.Size(req.digest.ToProto()) + + // Check oversized items. + if u.queryRequestBaseSize+dSize > u.queryRPCCfg.BytesLimit { + u.queryPubSub.pub(MissingBlobsResponse{ + Digest: req.digest, + Err: ErrOversizedItem, + }, req.tag) + // Covers waiting on subscribers. + log.V(3).Infof("[casng] query.pub.duration: start=%d, end=%d", startTime.UnixNano(), time.Now().UnixNano()) + continue + } + + // Check size threshold. + if bundleSize+dSize >= u.queryRPCCfg.BytesLimit { + handle() + } + + // Duplicate tags are allowed to ensure the requester can match the number of responses to the number of requests. + bundle[req.digest] = append(bundle[req.digest], req.tag) + bundleSize += dSize + ctx, _ = contextmd.FromContexts(ctx, req.ctx) // ignore non-essential error. + + // Check length threshold. + if len(bundle) >= u.queryRPCCfg.ItemsLimit { + handle() + } + case <-bundleTicker.C: + handle() + } + } +} + +// callMissingBlobs calls the gRPC endpoint and notifies requesters of the results. +// It assumes ownership of the bundle argument. 
+func (u *uploader) callMissingBlobs(ctx context.Context, bundle missingBlobRequestBundle) { + digests := make([]*repb.Digest, 0, len(bundle)) + for d := range bundle { + digests = append(digests, d.ToProto()) + } + + req := &repb.FindMissingBlobsRequest{ + InstanceName: u.instanceName, + BlobDigests: digests, + } + + var res *repb.FindMissingBlobsResponse + var err error + startTime := time.Now() + err = retry.WithPolicy(ctx, u.queryRPCCfg.RetryPredicate, u.queryRPCCfg.RetryPolicy, func() error { + ctx, ctxCancel := context.WithTimeout(ctx, u.queryRPCCfg.Timeout) + defer ctxCancel() + res, err = u.cas.FindMissingBlobs(ctx, req) + return err + }) + log.V(3).Infof("[casng] query.grpc.duration: start=%d, end=%d", startTime.UnixNano(), time.Now().UnixNano()) + + var missing []*repb.Digest + if res != nil { + missing = res.MissingBlobDigests + } + if err != nil { + err = errors.Join(ErrGRPC, err) + missing = digests + } + + startTime = time.Now() + // Report missing. + for _, dpb := range missing { + d := digest.NewFromProtoUnvalidated(dpb) + u.queryPubSub.pub(MissingBlobsResponse{ + Digest: d, + Missing: err == nil, + Err: err, + }, bundle[d]...) + delete(bundle, d) + } + + // Report non-missing. + for d := range bundle { + u.queryPubSub.pub(MissingBlobsResponse{ + Digest: d, + Missing: false, + }, bundle[d]...) + } + log.V(3).Infof("[casng] query.pub.duration: start=%d, end=%d", startTime.UnixNano(), time.Now().UnixNano()) +} diff --git a/go/pkg/casng/streaming_query_test.go b/go/pkg/casng/streaming_query_test.go new file mode 100644 index 000000000..6fef5d93c --- /dev/null +++ b/go/pkg/casng/streaming_query_test.go @@ -0,0 +1,53 @@ +package casng_test + +import ( + "context" + "testing" + + "github.com/bazelbuild/remote-apis-sdks/go/pkg/casng" + "github.com/bazelbuild/remote-apis-sdks/go/pkg/digest" + repb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" + "google.golang.org/grpc" +) + +func TestMissingBlobs_StreamingAbort(t *testing.T) { + fCas := &fakeCAS{findMissingBlobs: func(_ context.Context, _ *repb.FindMissingBlobsRequest, _ ...grpc.CallOption) (*repb.FindMissingBlobsResponse, error) { + return &repb.FindMissingBlobsResponse{}, nil + }} + ctx, ctxCancel := context.WithCancel(context.Background()) + ctxCancel() + _, err := casng.NewStreamingUploader(ctx, fCas, &fakeByteStreamClient{}, "", defaultRPCCfg, defaultRPCCfg, defaultRPCCfg, defaultIOCfg) + if err != nil { + t.Fatalf("error creating batching uploader: %v", err) + } +} + +func TestMissingBlobs_Streaming(t *testing.T) { + fCas := &fakeCAS{findMissingBlobs: func(_ context.Context, _ *repb.FindMissingBlobsRequest, _ ...grpc.CallOption) (*repb.FindMissingBlobsResponse, error) { + return &repb.FindMissingBlobsResponse{}, nil + }} + ctx, ctxCancel := context.WithCancel(context.Background()) + u, err := casng.NewStreamingUploader(ctx, fCas, &fakeByteStreamClient{}, "", defaultRPCCfg, defaultRPCCfg, defaultRPCCfg, defaultIOCfg) + if err != nil { + t.Fatalf("error creating batching uploader: %v", err) + } + reqChan := make(chan digest.Digest) + ch := u.MissingBlobs(ctx, reqChan) + + go func() { + for i := 0; i < 1000; i++ { + reqChan <- digest.Digest{Hash: "a"} + } + close(reqChan) + }() + + defer ctxCancel() + for r := range ch { + if r.Err != nil { + t.Errorf("unexpected error: %v", r.Err) + } + if r.Missing { + t.Errorf("unexpected missing: %s", r.Digest.Hash) + } + } +} diff --git a/go/pkg/casng/uploader.go b/go/pkg/casng/uploader.go index aa77e3297..062b1444e 100644 --- a/go/pkg/casng/uploader.go +++ 
b/go/pkg/casng/uploader.go @@ -81,6 +81,7 @@ import ( "fmt" "strings" "sync" + "time" "github.com/bazelbuild/remote-apis-sdks/go/pkg/digest" repb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" @@ -121,7 +122,7 @@ func MakeCompressedWriteResourceName(instanceName, hash string, size int64) stri return fmt.Sprintf("%s/uploads/%s/compressed-blobs/zstd/%s/%d", instanceName, uuid.New(), hash, size) } -// IsCompressedWriteResourceName returns true if the name was generated with MakeCompressedWriteResourceName. +// IsCompressedWriteResourceName returns true if the name was generated using MakeCompressedWriteResourceName. func IsCompressedWriteResourceName(name string) bool { return strings.Contains(name, "compressed-blobs/zstd") } @@ -147,6 +148,7 @@ type uploader struct { streamRPCCfg GRPCConfig // gRPC throttling controls. + queryThrottler *throttler // Controls concurrent calls to the query API. streamThrottle *throttler // Controls concurrent calls to the byte streaming API. // IO controls. @@ -171,13 +173,15 @@ type uploader struct { uploadRequestItemBaseSize int // Concurrency controls. - clientSenderWg sync.WaitGroup // Batching API producers. - querySenderWg sync.WaitGroup // Query streaming API producers. - uploadSenderWg sync.WaitGroup // Upload streaming API producers. - processorWg sync.WaitGroup // Internal routers. - receiverWg sync.WaitGroup // Consumers. - workerWg sync.WaitGroup // Short-lived intermediate producers/consumers. - walkerWg sync.WaitGroup // Tracks all walkers. + clientSenderWg sync.WaitGroup // Batching API producers. + querySenderWg sync.WaitGroup // Query streaming API producers. + processorWg sync.WaitGroup // Internal routers. + receiverWg sync.WaitGroup // Consumers. + workerWg sync.WaitGroup // Short-lived intermediate producers/consumers. + walkerWg sync.WaitGroup // Tracks all walkers. + queryCh chan missingBlobRequest // Fan-in channel for query requests. + queryPubSub *pubsub // Fan-out broker for query responses. + uploadPubSub *pubsub // Fan-out broker for upload responses. // ctx is used to make unified calls and terminate saturated throttlers and in-flight workers. ctx context.Context @@ -248,6 +252,7 @@ func newUploader( batchRPCCfg: uploadCfg, streamRPCCfg: streamCfg, + queryThrottler: newThrottler(int64(queryCfg.ConcurrentCallsLimit)), streamThrottle: newThrottler(int64(streamCfg.ConcurrentCallsLimit)), ioCfg: ioCfg, @@ -266,11 +271,59 @@ func newUploader( }, }, + queryCh: make(chan missingBlobRequest), + queryPubSub: newPubSub(time.Second), + queryRequestBaseSize: proto.Size(&repb.FindMissingBlobsRequest{InstanceName: instanceName, BlobDigests: []*repb.Digest{}}), uploadRequestBaseSize: proto.Size(&repb.BatchUpdateBlobsRequest{InstanceName: instanceName, Requests: []*repb.BatchUpdateBlobsRequest_Request{}}), uploadRequestItemBaseSize: proto.Size(&repb.BatchUpdateBlobsRequest_Request{Digest: digest.NewFromBlob([]byte("abc")).ToProto(), Data: []byte{}}), } log.V(1).Infof("[casng] uploader.new: cfg_query=%+v, cfg_batch=%+v, cfg_stream=%+v, cfg_io=%+v", queryCfg, uploadCfg, streamCfg, ioCfg) + u.processorWg.Add(1) + go func() { + u.queryProcessor() + u.processorWg.Done() + }() + + go u.close() return u, nil } + +func (u *uploader) close() { + // The context must be cancelled first. + <-u.ctx.Done() + + startTime := time.Now() + + // 1st, batching API senders should stop producing requests. + // These senders are terminated by the user. 
+ log.V(1).Infof("[casng] uploader: waiting for client senders") + u.clientSenderWg.Wait() + + // 2nd, streaming API upload senders should stop producing queries and requests. + + // 3rd, streaming API query senders should stop producing queries. + // This propagates from the uploader's pipe, hence the uploader must stop first. + log.V(1).Infof("[casng] uploader: waiting for query senders") + u.querySenderWg.Wait() + close(u.queryCh) // Terminates the query processor. + + // 4th, internal routers should flush all remaining requests. + log.V(1).Infof("[casng] uploader: waiting for processors") + u.processorWg.Wait() + + // 5th, internal brokers should flush all remaining messages. + log.V(1).Infof("[casng] uploader: waiting for brokers") + u.queryPubSub.wait() + + // 6th, receivers should have drained their channels by now. + log.V(1).Infof("[casng] uploader: waiting for receivers") + u.receiverWg.Wait() + + // 7th, workers should have terminated by now. + log.V(1).Infof("[casng] uploader: waiting for workers") + u.workerWg.Wait() + + log.V(3).Infof("[casng] upload.close.duration: start=%d, end=%d", startTime.UnixNano(), time.Now().UnixNano()) +}
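(Editorial closing note, not part of the patch: an end-to-end usage sketch of the streaming query API added in this change, modeled on the MissingBlobs doc comment and the streaming test above. queryAll, ds, and the printing are illustrative; constructing the uploader via NewStreamingUploader is elided.)

```go
package casng_test

import (
	"context"
	"fmt"

	"github.com/bazelbuild/remote-apis-sdks/go/pkg/casng"
	"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
)

// queryAll streams digests into MissingBlobs and drains the responses.
// One response is produced per request; closing in is the termination signal.
func queryAll(ctx context.Context, u *casng.StreamingUploader, ds []digest.Digest) error {
	in := make(chan digest.Digest)
	out := u.MissingBlobs(ctx, in)

	// Producer: push digests as they become available, then close in.
	go func() {
		defer close(in)
		for _, d := range ds {
			in <- d
		}
	}()

	// Consumer: keep draining out until the uploader closes it, even after an error,
	// since the channel only closes once every request has a response.
	var firstErr error
	for r := range out {
		switch {
		case r.Err != nil:
			if firstErr == nil {
				firstErr = r.Err
			}
		case r.Missing:
			fmt.Printf("missing blob: %s\n", r.Digest.Hash)
		}
	}
	return firstErr
}
```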