casng, query processor (#463)
The query processor provides a streaming interface to query the CAS for digests.

Multiple concurrent clients can use the same uploader instance at the same time.
The processor bundles requests from multiple concurrent clients to amortize the cost of querying
the batching API. That is, it attempts to bundle the maximum possible number of digests in a single gRPC call.

This is done using three factors: a size (bytes) limit, an items limit, and a time limit.
If any of these limits is reached, the processor dispatches the call and starts a new bundle.
This means that a request can be delayed by the processor (not including network and server time) by up to the time limit.
However, in high-throughput sessions, the processor dispatches calls sooner.
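
A minimal sketch of that bundling loop, assuming illustrative names, limits, and a dispatch callback (the actual implementation lives in streaming_query.go and may differ):

package sketch // sketch only, not the casng implementation

import "time"

// bundleItem is a hypothetical unit of work counted against the limits.
type bundleItem struct {
    digest string // digest to query
    bytes  int64  // serialized request size, counted against the size limit
}

// runProcessor accumulates items until the size limit, the items limit, or the
// time limit is reached, then hands the bundle to dispatch as one call.
func runProcessor(in <-chan bundleItem, dispatch func([]bundleItem)) {
    const (
        itemsLimit = 1000                  // assumed items limit
        bytesLimit = 4 << 20               // assumed size (bytes) limit
        timeLimit  = 10 * time.Millisecond // assumed time limit
    )
    var bundle []bundleItem
    var size int64
    timer := time.NewTimer(timeLimit)
    flush := func() {
        if len(bundle) > 0 {
            go dispatch(bundle) // the gRPC call is launched concurrently
            bundle, size = nil, 0
        }
        timer.Reset(timeLimit)
    }
    for {
        select {
        case item, ok := <-in:
            if !ok { // input closed; flush the remainder and stop
                flush()
                return
            }
            bundle = append(bundle, item)
            size += item.bytes
            if len(bundle) >= itemsLimit || size >= bytesLimit {
                flush()
            }
        case <-timer.C:
            flush()
        }
    }
}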

To properly manage multiple concurrent clients while providing the bundling behaviour, the processor becomes a serialization point.
That is, all requests go through a single channel. To minimize blocking and leverage server concurrency, the processor loop
is optimized for high throughput and launches gRPC calls concurrently.
In other words, it's a many-one-many relationship, where many clients send to one processor, which sends to many workers.

To avoid forcing another serialization point through the processor, each worker notifies relevant clients of the results
it acquired from the server. In this case, it's a one-many relationship, where one worker sends to many clients.

All in all, the design implements a many-one-many-one-many pipeline.
Many clients send to one processor, which sends to many workers; each worker sends to many clients.
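
Since one gRPC response may answer requests from several clients, each worker uses the pubsub broker added in this commit to fan its result out to every client whose requests were bundled into that call. A minimal sketch, assuming a hypothetical fanOut helper inside the casng package:

// fanOut routes one response to all clients, identified by their tags, whose
// requests were answered by the same gRPC call. Each subscriber receives its
// own copy of the message.
func fanOut(ps *pubsub, resp any, clientTags []tag) {
    ps.pub(resp, clientTags...)
}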

Each client is provided with a channel to send their requests on. The handler of that channel marks each request
with a unique tag and forwards it to the processor.

The processor receives multiple requests, each potentially with a different tag.
Each worker receives a bundle of requests that may contain multiple tags.
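
For illustration, a forwarded request might pair the client's query with its routing tag; the struct and helper below are assumptions (only the tag type comes from pubsub.go in this commit):

// taggedRequest is what the processor would receive: the client's query plus
// the routing tag issued to that client by the pubsub broker.
type taggedRequest struct {
    digest string
    tag    tag
}

// forward stamps every request from one client with that client's tag before
// sending it on the shared processor channel.
func forward(t tag, clientIn <-chan string, processorIn chan<- taggedRequest) {
    for d := range clientIn {
        processorIn <- taggedRequest{digest: d, tag: t}
    }
}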

To facilitate the routing between workers and clients, a simple pubsub implementation is used.
Each instance, a broker, manages routing messages between multiple subscribers (clients) and multiple publishers (workers).
Each client gets their own channel on which they receive messages marked for them.
Each publisher specifies which clients the messages should be routed to.
The broker attempts at-most-once delivery.

The client handler manages the pubsub subscription by waiting until a matching number of responses has been received, after which
it cancels the subscription.
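
A sketch of that lifecycle using the pubsub API added in this commit (the expected count and the placeholder messages are illustrative; this would sit inside the casng package):

// clientLifecycleSketch subscribes, counts responses, and unsubscribes once
// the expected number has arrived.
func clientLifecycleSketch() {
    ps := newPubSub(time.Minute)
    t, ch := ps.sub()

    const expected = 3 // number of responses this client is waiting for
    done := make(chan struct{})
    go func() {
        n := 0
        for range ch { // keep draining until unsub closes the channel
            n++
            if n == expected {
                ps.unsub(t) // closes ch, which terminates this loop
            }
        }
        close(done)
    }()

    // Workers route responses to this client by its tag.
    for i := 0; i < expected; i++ {
        ps.pub(i, t)
    }
    <-done
}
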
mrahs authored Jun 16, 2023
1 parent 8d8b99c commit cdd7fa5
Showing 7 changed files with 658 additions and 22 deletions.
4 changes: 4 additions & 0 deletions go/pkg/casng/BUILD.bazel
@@ -5,6 +5,8 @@ go_library(
srcs = [
"batching.go",
"config.go",
"pubsub.go",
"streaming_query.go",
"throttler.go",
"uploader.go",
],
@@ -32,6 +34,8 @@ go_test(
name = "cas_test",
srcs = [
"batching_query_test.go",
"batching_write_bytes_test.go",
"streaming_query_test.go",
"uploader_test.go",
],
data = glob(["testdata/**"]),
20 changes: 7 additions & 13 deletions go/pkg/casng/batching.go
@@ -95,8 +95,8 @@ func (u *BatchingUploader) MissingBlobs(ctx context.Context, digests []digest.Di
// ctx is used to make and cancel remote calls.
// This method does not use the uploader's context which means it is safe to call even after that context is cancelled.
//
// Compression is turned on based the resource name.
// size is used to report some stats. It must reflect the actual number of bytes r has to give.
// Compression is turned on based on the resource name.
// size is used to report stats. It must reflect the actual number of bytes r has to give.
// The server is notified to finalize the resource name and subsequent writes may not succeed.
// The errors returned are either from the context, ErrGRPC, ErrIO, or ErrCompression. More errors may be wrapped inside.
// If an error was returned, the returned stats may indicate that all the bytes were sent, but that does not guarantee that the server committed all of them.
@@ -214,17 +214,17 @@ func (u *uploader) writeBytes(ctx context.Context, name string, r io.Reader, siz
stats.TotalBytesMoved += n64
return stream.Send(req)
})
if errStream != nil && errStream != io.EOF {
err = errors.Join(ErrGRPC, errStream, err)
break
}

// The server says the content for the specified resource already exists.
if errStream == io.EOF {
cacheHit = true
break
}

if errStream != nil {
err = errors.Join(ErrGRPC, errStream, err)
break
}

req.WriteOffset += n64

// The reader is done (interrupted or completed).
@@ -260,17 +260,11 @@ func (u *uploader) writeBytes(ctx context.Context, name string, r io.Reader, siz
stats.LogicalBytesCached = size
}
stats.LogicalBytesStreamed = stats.LogicalBytesMoved
stats.LogicalBytesBatched = 0
stats.InputFileCount = 0
stats.InputDirCount = 0
stats.InputSymlinkCount = 0
if cacheHit {
stats.CacheHitCount = 1
} else {
stats.CacheMissCount = 1
}
stats.DigestCount = 0
stats.BatchedCount = 0
if err == nil {
stats.StreamedCount = 1
}
2 changes: 1 addition & 1 deletion go/pkg/casng/config.go
@@ -168,7 +168,7 @@ type Stats struct {
// EffectiveBytesMoved is the total number of bytes moved over the wire, excluding retries.
// This may not be accurate since a gRPC call may be interrupted in which case this number may be higher than the real one.
// For failures, this is reported as 0.
// It may be higher than BytesRequested (compression headers), but never higher than BytesMoved.
// It may be higher than BytesRequested (compression headers), but never higher than TotalBytesMoved.
EffectiveBytesMoved int64

// LogicalBytesCached is the total number of bytes not moved over the wire due to caching (either remotely or locally).
209 changes: 209 additions & 0 deletions go/pkg/casng/pubsub.go
@@ -0,0 +1,209 @@
package casng

import (
"context"
"sync"
"time"

log "github.com/golang/glog"
"github.com/pborman/uuid"
)

// tag identifies a pubsub channel for routing purposes.
// Producers tag messages and consumers subscribe to tags.
type tag string

// pubsub provides a simple pubsub implementation to route messages and wait for them.
type pubsub struct {
subs map[tag]chan any
mu sync.RWMutex
timeout time.Duration
// A signalling channel that gets a message every time the broker hits 0 subscriptions.
// Unlike sync.WaitGroup, this allows the broker to accept more subs while a client is waiting for the signal.
done chan struct{}
}

// sub returns a routing tag and a channel to the subscriber to read messages from.
//
// Only messages associated with the returned tag are sent on the returned channel.
// This allows the subscriber to send a tagged message (request) that propagates across the system and eventually
// receive related messages (responses) from publishers on the returned channel.
//
// The subscriber must continue draining the returned channel until it's closed.
// The returned channel is unbuffered and closed only when unsub is called with the returned tag.
//
// To properly terminate the subscription, the subscriber must wait until all expected responses are received
// on the returned channel before unsubscribing.
// Once unsubscribed, any tagged messages for this subscription are dropped.
func (ps *pubsub) sub() (tag, <-chan any) {
ps.mu.Lock()
defer ps.mu.Unlock()

t := tag(uuid.New())
subscriber := make(chan any)
ps.subs[t] = subscriber

log.V(3).Infof("[casng] pubsub.sub: tag=%s", t)
return t, subscriber
}

// unsub removes the subscription for tag, if any, and closes the corresponding channel.
func (ps *pubsub) unsub(t tag) {
ps.mu.Lock()
defer ps.mu.Unlock()
subscriber, ok := ps.subs[t]
if !ok {
return
}
delete(ps.subs, t)
close(subscriber)
if len(ps.subs) == 0 {
close(ps.done)
ps.done = make(chan struct{})
}
log.V(3).Infof("[casng] pubsub.unsub: tag=%s", t)
}

// pub is a blocking call that fans out a response to all specified (by tag) subscribers concurrently.
//
// Returns when all active subscribers have received their copies or timed out.
// Inactive subscribers (expired by cancelling their context) are skipped (their copies are dropped).
//
// A busy subscriber does not block others from receiving their copies, but will delay this call by up to the specified timeout on the broker.
//
// A pool of workers, one per tag, is used. Each worker attempts to deliver the message to the corresponding subscriber.
// The pool size is a function of the client's concurrency.
// E.g. if 500 workers are publishing messages, with an average of 10 clients per message, the pool size will be 5,000.
// The maximum theoretical pool size for n publishers publishing every message to m subscribers is nm.
// However, the expected average case is a few clients per message, so the pool size should be close to the concurrency limit.
func (ps *pubsub) pub(m any, tags ...tag) {
_ = ps.pubN(m, len(tags), tags...)
}

// mpub (multi-publish) delivers the "once" message to a single subscriber then delivers the "rest" message to the rest of the subscribers.
// It's useful for cases where the message holds shared information that should not be duplicated among subscribers, such as stats.
func (ps *pubsub) mpub(once any, rest any, tags ...tag) {
t := ps.pubOnce(once, tags...)
_ = ps.pubN(rest, len(tags)-1, excludeTag(tags, t)...)
}

// pubOnce is like pub, but delivers the message to a single subscriber.
// The tag of the subscriber that got the message is returned.
func (ps *pubsub) pubOnce(m any, tags ...tag) tag {
received := ps.pubN(m, 1, tags...)
if len(received) == 0 {
return ""
}
return received[0]
}

// pubN is like pub, but delivers the message to no more than n subscribers. The tags of the subscribers that got the message are returned.
func (ps *pubsub) pubN(m any, n int, tags ...tag) []tag {
if log.V(3) {
startTime := time.Now()
defer func() {
log.Infof("[casng] pubsub.duration: start=%d, end=%d", startTime.UnixNano(), time.Now().UnixNano())
}()
}
if len(tags) == 0 {
log.Warning("[casng] pubsub.pub: called without tags, dropping message")
log.V(4).Infof("[casng] pubsub.pub: called without tags for msg=%v", m)
return nil
}
if n <= 0 {
log.Warningf("[casng] pubsub.pub: nothing published because n=%d", n)
return nil
}

ps.mu.RLock()
defer ps.mu.RUnlock()

log.V(4).Infof("[casng] pubsub.pub.msg: type=%[1]T, value=%[1]v", m)
ctx, ctxCancel := context.WithTimeout(context.Background(), ps.timeout)
defer ctxCancel()

// Optimize for the usual case of a single receiver.
if len(tags) == 1 {
t := tags[0]
s := ps.subs[t]
select {
case s <- m:
case <-ctx.Done():
log.Errorf("pubsub timeout for %s", t)
return nil
}
return tags
}

wg := sync.WaitGroup{}
received := make([]tag, 0, len(tags))
r := make(chan tag)
go func() {
for t := range r {
received = append(received, t)
}
wg.Done()
}()
for _, t := range tags {
s := ps.subs[t]
wg.Add(1)
go func(t tag) {
defer wg.Done()
select {
case s <- m:
r <- t
case <-ctx.Done():
log.Errorf("pubsub timeout for %s", t)
}
}(t)
}

// Wait for subscribers.
wg.Wait()

// Wait for the aggregator.
wg.Add(1)
close(r)
wg.Wait()
return received
}

// wait blocks until all existing subscribers unsubscribe.
// The signal is a snapshot. The broker may get more subscribers after returning from this call.
func (ps *pubsub) wait() {
<-ps.done
}

// len returns the number of active subscribers.
func (ps *pubsub) len() int {
ps.mu.RLock()
defer ps.mu.RUnlock()
return len(ps.subs)
}

// newPubSub initializes a new instance where subscribers must receive messages within timeout.
func newPubSub(timeout time.Duration) *pubsub {
return &pubsub{
subs: make(map[tag]chan any),
timeout: timeout,
done: make(chan struct{}),
}
}

// excludeTag is used by mpub to filter out the tag that received the "once" message.
func excludeTag(tags []tag, et tag) []tag {
if len(tags) == 0 {
return []tag{}
}
ts := make([]tag, 0, len(tags)-1)
// Only exclude the tag once.
excluded := false
for _, t := range tags {
if !excluded && t == et {
excluded = true
continue
}
ts = append(ts, t)
}
return ts
}