Skip to content

Commit

Permalink
Use a control loop
Browse files Browse the repository at this point in the history
Signed-off-by: Dimitar Dimitrov <[email protected]>
  • Loading branch information
dimitarvdimitrov committed Jun 14, 2024
1 parent adb7ef0 commit 63b5370
Show file tree
Hide file tree
Showing 5 changed files with 225 additions and 118 deletions.
251 changes: 167 additions & 84 deletions pkg/storegateway/indexheader/lazy_binary_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package indexheader

import (
"context"
"fmt"
"os"
"path/filepath"
"sync"
Expand Down Expand Up @@ -78,18 +79,48 @@ type LazyBinaryReader struct {
lazyLoadingGate gate.Gate
ctx context.Context

readerMx sync.RWMutex
reader Reader
readerErr error
readerInUse sync.WaitGroup // Only increased when readerMx is held.
readerFactory func() (Reader, error)
loadedReaderC chan readerRequest
loadReqC chan loadRequest
unloadReqC chan unloadRequest

// Keep track of the last time it was used.
usedAt *atomic.Int64

readerFactory func() (Reader, error)

blockID ulid.ULID
}

type readerRequest struct {
response chan loadedReader
}

// loadedReader represents an attempt to load a Reader. If the attempt failed, then err is set and reader is nil.
// If the attempt succeeded, then err is nil, and inUse and reader are set.
// If the attempt succeeded, then inUse must be signalled when the reader is no longer in use.
type loadedReader struct {
reader Reader
inUse *sync.WaitGroup

err error
}

// loadRequest is a request to load a binary reader.
type loadRequest struct {
// done will be closed when the load is complete.
done chan struct{}
}

// unloadRequest is a request to unload a binary reader.
type unloadRequest struct {
// response will receive a single error with the result of the unload operation.
// response will not be closed.
response chan error
// idleSinceNanos is the unix nano timestamp of the last time this reader was used.
// If idleSinceNanos is 0, the check on the last usage is skipped.
idleSinceNanos int64
}

// NewLazyBinaryReader makes a new LazyBinaryReader. If the index-header does not exist
// on the local disk at dir location, this function will build it downloading required
// sections from the full index stored in the bucket. However, this function doesn't load
Expand Down Expand Up @@ -123,17 +154,24 @@ func NewLazyBinaryReader(
level.Debug(logger).Log("msg", "built index-header file", "path", path, "elapsed", time.Since(start))
}

return &LazyBinaryReader{
reader := &LazyBinaryReader{
logger: logger,
filepath: path,
metrics: metrics,
usedAt: atomic.NewInt64(time.Now().UnixNano()),
usedAt: atomic.NewInt64(0),
onClosed: onClosed,
readerFactory: readerFactory,
blockID: id,
lazyLoadingGate: lazyLoadingGate,
ctx: ctx,
}, nil

loadedReaderC: make(chan readerRequest),
loadReqC: make(chan loadRequest),
unloadReqC: make(chan unloadRequest),
}
// TODO use a proper service lifecycler here to shut down this goroutine
go reader.controlLoop()
return reader, nil
}

// Close implements Reader. It unloads the index-header from memory (releasing the mmap
Expand Down Expand Up @@ -231,124 +269,169 @@ func (r *LazyBinaryReader) EagerLoad(ctx context.Context) {
// Returns the reader, wait group that should be used to signal that usage of reader is finished, and an error on failure.
// Must be called without lock.
func (r *LazyBinaryReader) getOrLoadReader(ctx context.Context) (Reader, *sync.WaitGroup, error) {
r.readerMx.RLock()
defer r.readerMx.RUnlock()

// Nothing to do if we already tried loading it.
if r.reader != nil {
r.usedAt.Store(time.Now().UnixNano())
r.readerInUse.Add(1)

return r.reader, &r.readerInUse, nil
}
if r.readerErr != nil {
return nil, nil, r.readerErr
loadedR := r.getReader(ctx)
if loadedR.reader != nil {
return loadedR.reader, loadedR.inUse, nil
}

// Release the read lock, so that loadReader can take write lock. Take the read lock again once done.
r.readerMx.RUnlock()
err := r.loadReader()
// Re-acquire read lock.
r.readerMx.RLock()
promise := r.triggerLoadReader(ctx)
loadedR = r.waitLoadedReader(ctx, promise)
if loadedR.reader == nil && loadedR.err == nil {
// There's a small chance that the reader was unloaded while we were waiting for it to load,
// so we make sure to catch this edge case.
return nil, nil, errUnloadedWhileLoading
}
return loadedR.reader, loadedR.inUse, loadedR.err
}

if err != nil {
return nil, nil, err
func (r *LazyBinaryReader) triggerLoadReader(ctx context.Context) chan struct{} {
loadReq := loadRequest{done: make(chan struct{})}
select {
case r.loadReqC <- loadReq:
return loadReq.done
case <-ctx.Done():
return nil
}
// Between the write lock release and the subsequent read lock, the unload() may have run,
// so we make sure to catch this edge case.
if r.reader == nil {
return nil, nil, errUnloadedWhileLoading
}

func (r *LazyBinaryReader) waitLoadedReader(ctx context.Context, loadDone chan struct{}) loadedReader {
select {
case <-loadDone:
return r.getReader(ctx)
case <-ctx.Done():
return loadedReader{err: context.Cause(ctx)}
}
}

r.usedAt.Store(time.Now().UnixNano())
r.readerInUse.Add(1)
return r.reader, &r.readerInUse, nil
func (r *LazyBinaryReader) getReader(ctx context.Context) loadedReader {
readerReq := readerRequest{response: make(chan loadedReader)}
select {
case r.loadedReaderC <- readerReq:
select {
case loadedR := <-readerReq.response:
return loadedR
case <-ctx.Done():
// We will get a response on the channel, and if it's a loaded reader we need to signal that we're no longer using it.
// This should be rare, so spinning up a goroutine shouldn't be too expensive.
go r.waitAndCloseReader(readerReq)
return loadedReader{err: context.Cause(ctx)}
}
case <-ctx.Done():
return loadedReader{err: context.Cause(ctx)}
}
}

// loadReader is called from getOrLoadReader, without any locks.
func (r *LazyBinaryReader) loadReader() error {
func (r *LazyBinaryReader) loadReader() (Reader, error) {
// lazyLoadingGate implementation: blocks load if too many are happening at once.
// It's important to get permit from the Gate when NOT holding the read-lock, otherwise we risk that multiple goroutines
// that enter `load()` will deadlock themselves. (If Start() allows one goroutine to continue, but blocks another one,
// then goroutine that continues would not be able to get Write lock.)
err := r.lazyLoadingGate.Start(r.ctx)
if err != nil {
return errors.Wrapf(err, "failed to wait for turn")
return nil, errors.Wrapf(err, "failed to wait for turn")
}
defer r.lazyLoadingGate.Done()

r.readerMx.Lock()
defer r.readerMx.Unlock()

// Ensure none else tried to load it in the meanwhile.
if r.reader != nil {
return nil
}
if r.readerErr != nil {
return r.readerErr
}

level.Debug(r.logger).Log("msg", "lazy loading index-header file", "path", r.filepath)
r.metrics.loadCount.Inc()
startTime := time.Now()

reader, err := r.readerFactory()
if err != nil {
r.metrics.loadFailedCount.Inc()
r.readerErr = err
return errors.Wrapf(err, "lazy load index-header file at %s", r.filepath)
return nil, errors.Wrapf(err, "lazy load index-header file at %s", r.filepath)
}

r.reader = reader
elapsed := time.Since(startTime)

level.Debug(r.logger).Log("msg", "lazy loaded index-header file", "path", r.filepath, "elapsed", elapsed)
r.metrics.loadDuration.Observe(elapsed.Seconds())

return nil
return reader, nil
}

// unloadIfIdleSince closes underlying BinaryReader if the reader is idle since given time (as unix nano). If idleSince is 0,
// the check on the last usage is skipped. Calling this function on a already unloaded reader is a no-op.
func (r *LazyBinaryReader) unloadIfIdleSince(ts int64) error {
r.readerMx.Lock()
defer r.readerMx.Unlock()

// Nothing to do if already unloaded.
if r.reader == nil {
return nil
}

// Do not unloadIfIdleSince if not idle.
if ts > 0 && r.usedAt.Load() > ts {
return errNotIdle
func (r *LazyBinaryReader) waitAndCloseReader(req readerRequest) {
resp := <-req.response
if resp.reader != nil {
resp.inUse.Done()
}
}

// Wait until all users finished using current reader.
r.readerInUse.Wait()

r.metrics.unloadCount.Inc()
if err := r.reader.Close(); err != nil {
r.metrics.unloadFailedCount.Inc()
return err
// unloadIfIdleSince closes underlying BinaryReader if the reader is idle since given time (as unix nano). If idleSince is 0,
// the check on the last usage is skipped. Calling this function on a already unloaded reader is a no-op.
func (r *LazyBinaryReader) unloadIfIdleSince(tsNano int64) error {
req := unloadRequest{
// The channel is unbuffered because we will read the response. It should be buffered if we can give up before reading from it
response: make(chan error),
idleSinceNanos: tsNano,
}

r.reader = nil
return nil
r.unloadReqC <- req
return <-req.response
}

// isIdleSince returns true if the reader is idle since given time (as unix nano).
func (r *LazyBinaryReader) isIdleSince(ts int64) bool {
if r.usedAt.Load() > ts {
return false
func (r *LazyBinaryReader) controlLoop() {
var loaded loadedReader
for {
select {
case readerReq := <-r.loadedReaderC:
if loaded.reader != nil {
loaded.inUse.Add(1)
r.usedAt.Store(time.Now().UnixNano())
}
readerReq.response <- loaded

case loadReq := <-r.loadReqC:
if loaded.reader == nil && loaded.err == nil {
loaded = loadedReader{}
loaded.reader, loaded.err = r.loadReader()
if loaded.reader != nil {
loaded.inUse = &sync.WaitGroup{}
r.usedAt.Store(time.Now().UnixNano())
}
}
close(loadReq.done)

case unloadPromise := <-r.unloadReqC:
if loaded.reader == nil {
// Nothing to do if already unloaded.
unloadPromise.response <- nil
continue
}

// Do not unloadIfIdleSince if not idle.
if ts := unloadPromise.idleSinceNanos; ts > 0 && r.usedAt.Load() > ts {
unloadPromise.response <- errNotIdle
continue
}

// Wait until all users finished using current reader.
loaded.inUse.Wait()

r.metrics.unloadCount.Inc()
if err := loaded.reader.Close(); err != nil {
r.metrics.unloadFailedCount.Inc()
unloadPromise.response <- fmt.Errorf("closing bianry reader: %w", err)
continue
}

loaded = loadedReader{}
r.usedAt.Store(0)
unloadPromise.response <- nil
}
}
}

// A reader can be considered idle only if it's loaded.
r.readerMx.RLock()
loaded := r.reader != nil
r.readerMx.RUnlock()
// IsIdleSince returns true if the reader is idle since given time (as unix nano).
func (r *LazyBinaryReader) IsIdleSince(ts int64) bool {
lastUse := r.LoadedLastUse()
return lastUse != 0 && lastUse <= ts
}

return loaded
// LoadedLastUse returns 0 if the reader is not loaded.
// LoadedLastUse returns a timestamp in nanoseconds of the last time this reader was used.
func (r *LazyBinaryReader) LoadedLastUse() int64 {
return r.usedAt.Load()
}

type lazySymbolsReader struct {
Expand Down
Loading

0 comments on commit 63b5370

Please sign in to comment.