Skip to content

Commit

Permalink
blockbuilder: fix panic when closing tsdb after failing to upload blo…
Browse files Browse the repository at this point in the history
…ck (#10391)

Signed-off-by: Vladimir Varankin <[email protected]>
  • Loading branch information
narqo authored Jan 9, 2025
1 parent 27f0faf commit e7d8d36
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 17 deletions.
30 changes: 13 additions & 17 deletions pkg/blockbuilder/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,30 +316,26 @@ type blockUploader func(_ context.Context, tenantID, dbDir string, blockIDs []st
// All the DBs are closed and directories cleared irrespective of success or failure of this function.
func (b *TSDBBuilder) CompactAndUpload(ctx context.Context, uploadBlocks blockUploader) (_ int, err error) {
var (
doneDBsMu sync.Mutex
doneDBs = make(map[*userTSDB]bool)
closedDBsMu sync.Mutex
closedDBs = make(map[*userTSDB]bool)
)

b.tsdbsMu.Lock()
defer func() {
b.tsdbsMu.Unlock()

var merr multierror.MultiError
merr.Add(err)
// If some TSDB was not compacted or uploaded, it will be re-tried in the next cycle, so we remove it here.
// If some TSDB was not compacted or uploaded, it will be re-tried in the next cycle, so we always remove it here.
for _, db := range b.tsdbs {
if doneDBs[db] {
continue
if !closedDBs[db] {
merr.Add(db.Close())
}
dbDir := db.Dir()
merr.Add(db.Close())
merr.Add(os.RemoveAll(dbDir))
merr.Add(os.RemoveAll(db.Dir()))
}

err = merr.Err()

// Clear the map so that it can be released from the memory. Not setting to nil in case we want to reuse the TSDBBuilder.
clear(b.tsdbs)
b.tsdbsMu.Unlock()

err = merr.Err()
}()

level.Info(b.logger).Log("msg", "compacting and uploading blocks", "num_tsdb", len(b.tsdbs))
Expand Down Expand Up @@ -384,14 +380,14 @@ func (b *TSDBBuilder) CompactAndUpload(ctx context.Context, uploadBlocks blockUp
return err
}

closedDBsMu.Lock()
closedDBs[db] = true
closedDBsMu.Unlock()

if err := uploadBlocks(ctx, tenant.tenantID, dbDir, blockIDs); err != nil {
return err
}

doneDBsMu.Lock()
doneDBs[db] = true
doneDBsMu.Unlock()

// Clear the DB from the disk. Don't need it anymore.
return os.RemoveAll(dbDir)
})
Expand Down
24 changes: 24 additions & 0 deletions pkg/blockbuilder/tsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,30 @@ func TestTSDBBuilder(t *testing.T) {
}
}

func TestTSDBBuilder_CompactAndUpload_fail(t *testing.T) {
overrides, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
metrics := newTSDBBBuilderMetrics(prometheus.NewPedanticRegistry())
builder := NewTSDBBuilder(log.NewNopLogger(), t.TempDir(), mimir_tsdb.BlocksStorageConfig{}, overrides, metrics, 0)
t.Cleanup(func() {
require.NoError(t, builder.Close())
})

userID := strconv.Itoa(rand.Int())
tenant := tsdbTenant{
partitionID: 0,
tenantID: userID,
}
_, err = builder.getOrCreateTSDB(tenant)
require.NoError(t, err)

errUploadFailed := fmt.Errorf("upload failed")
_, err = builder.CompactAndUpload(context.Background(), func(_ context.Context, _, _ string, _ []string) error {
return errUploadFailed
})
require.ErrorIs(t, err, errUploadFailed)
}

func compareQuery(t *testing.T, db *tsdb.DB, expSamples []mimirpb.Sample, expHistograms []mimirpb.Histogram, matchers ...*labels.Matcher) {
querier, err := db.Querier(math.MinInt64, math.MaxInt64)
require.NoError(t, err)
Expand Down

0 comments on commit e7d8d36

Please sign in to comment.