diff --git a/CHANGELOG.md b/CHANGELOG.md index 6fabc754416..6c331f65020 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -151,7 +151,8 @@ ### Tools * [FEATURE] `splitblocks`: add new tool to split blocks larger than a specified duration into multiple blocks. #9517, #9779 -* [ENHANCEMENT] `copyblocks`: Added `--skip-no-compact-block-duration-check`, which defaults to `false`, to simplify targeting blocks that are not awaiting compaction. #9439 +* [ENHANCEMENT] `copyblocks`: add `--skip-no-compact-block-duration-check`, which defaults to `false`, to simplify targeting blocks that are not awaiting compaction. #9439 +* [ENHANCEMENT] `copyblocks`: add `--user-mapping` to support copying blocks between users. #10110 * [ENHANCEMENT] `kafkatool`: add SASL plain authentication support. The following new CLI flags have been added: #9584 * `--kafka-sasl-username` * `--kafka-sasl-password` diff --git a/tools/copyblocks/README.md b/tools/copyblocks/README.md index 3e6d14b764a..8f0fe49a936 100644 --- a/tools/copyblocks/README.md +++ b/tools/copyblocks/README.md @@ -13,6 +13,7 @@ The currently supported services are Amazon Simple Storage Service (S3 and S3-co - Include or exclude users from having blocks copied (`--enabled-users` and `--disabled-users`) - Configurable minimum block duration (`--min-block-duration`) and (`--skip-no-compact-block-duration-check`) to target blocks that are not awaiting compaction - Configurable time range (`--min-time` and `--max-time`) to only copy blocks inclusively within a provided range +- Copy blocks between users with `--user-mapping`. For instance, `--user-mapping="user1:user2,user3:user4"` maps source blocks from `user1` to `user2` and source blocks from `user3` to `user4`. If you don't provide a mapping for a user, it is assumed to be identical to the source user. - Log what would be copied without actually copying anything with `--dry-run` ## Running diff --git a/tools/copyblocks/main.go b/tools/copyblocks/main.go index a72f05efea6..d6a2bc22783 100644 --- a/tools/copyblocks/main.go +++ b/tools/copyblocks/main.go @@ -16,6 +16,7 @@ import ( "os" "os/signal" "path/filepath" + "slices" "strings" "syscall" "time" @@ -24,6 +25,7 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/concurrency" "github.com/grafana/dskit/flagext" + "github.com/grafana/dskit/tenant" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -44,6 +46,7 @@ type config struct { copyPeriod time.Duration enabledUsers flagext.StringSliceCSV disabledUsers flagext.StringSliceCSV + userMapping flagext.StringSliceCSV dryRun bool skipNoCompactBlockDurationCheck bool httpListen string @@ -59,8 +62,9 @@ func (c *config) registerFlags(f *flag.FlagSet) { f.DurationVar(&c.copyPeriod, "copy-period", 0, "How often to repeat the copy. If set to 0, copy is done once, and the program stops. Otherwise, the program keeps running and copying blocks until it is terminated.") f.Var(&c.enabledUsers, "enabled-users", "If not empty, only blocks for these users are copied.") f.Var(&c.disabledUsers, "disabled-users", "If not empty, blocks for these users are not copied.") + f.Var(&c.userMapping, "user-mapping", "A comma-separated list of (source user):(destination user). If a user is not mapped then its destination user is assumed to be identical.") f.BoolVar(&c.dryRun, "dry-run", false, "Don't perform any copy; only log what would happen.") - f.BoolVar(&c.skipNoCompactBlockDurationCheck, "skip-no-compact-block-duration-check", false, "If set, blocks marked as no-compact are not checked against min-block-duration") + f.BoolVar(&c.skipNoCompactBlockDurationCheck, "skip-no-compact-block-duration-check", false, "If set, blocks marked as no-compact are not checked against min-block-duration.") f.StringVar(&c.httpListen, "http-listen-address", ":8080", "HTTP listen address.") } @@ -80,6 +84,27 @@ func (c *config) validate() error { return nil } +func (c *config) parseUserMapping() (map[string]string, error) { + m := make(map[string]string, len(c.userMapping)) + for _, mapping := range c.userMapping { + splitMapping := strings.Split(mapping, ":") + if len(splitMapping) != 2 || slices.Contains(splitMapping, "") { + return nil, fmt.Errorf("invalid user mapping: %s", mapping) + } + for _, id := range splitMapping { + if err := tenant.ValidTenantID(id); err != nil { + return nil, err + } + } + source := splitMapping[0] + if _, ok := m[source]; ok { + return nil, fmt.Errorf("multiple user mappings for source user: %s", source) + } + m[source] = splitMapping[1] + } + return m, nil +} + type metrics struct { copyCyclesSucceeded prometheus.Counter copyCyclesFailed prometheus.Counter @@ -126,6 +151,12 @@ func main() { os.Exit(1) } + userMapping, err := cfg.parseUserMapping() + if err != nil { + fmt.Fprintln(os.Stderr, err.Error()) + os.Exit(1) + } + logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) @@ -144,7 +175,7 @@ func main() { } }() - success := runCopy(ctx, cfg, logger, m) + success := runCopy(ctx, cfg, userMapping, logger, m) if cfg.copyPeriod <= 0 { if success { os.Exit(0) @@ -158,14 +189,14 @@ func main() { for ctx.Err() == nil { select { case <-t.C: - _ = runCopy(ctx, cfg, logger, m) + _ = runCopy(ctx, cfg, userMapping, logger, m) case <-ctx.Done(): } } } -func runCopy(ctx context.Context, cfg config, logger log.Logger, m *metrics) bool { - err := copyBlocks(ctx, cfg, logger, m) +func runCopy(ctx context.Context, cfg config, userMapping map[string]string, logger log.Logger, m *metrics) bool { + err := copyBlocks(ctx, cfg, userMapping, logger, m) if err != nil { m.copyCyclesFailed.Inc() level.Error(logger).Log("msg", "failed to copy blocks", "err", err, "dryRun", cfg.dryRun) @@ -177,7 +208,7 @@ func runCopy(ctx context.Context, cfg config, logger log.Logger, m *metrics) boo return true } -func copyBlocks(ctx context.Context, cfg config, logger log.Logger, m *metrics) error { +func copyBlocks(ctx context.Context, cfg config, userMapping map[string]string, logger log.Logger, m *metrics) error { sourceBucket, destBucket, copyFunc, err := cfg.copyConfig.ToBuckets(ctx) if err != nil { return err @@ -197,23 +228,28 @@ func copyBlocks(ctx context.Context, cfg config, logger log.Logger, m *metrics) disabledUsers[u] = struct{}{} } - return concurrency.ForEachUser(ctx, tenants, cfg.tenantConcurrency, func(ctx context.Context, tenantID string) error { - if !isAllowedUser(enabledUsers, disabledUsers, tenantID) { + return concurrency.ForEachUser(ctx, tenants, cfg.tenantConcurrency, func(ctx context.Context, sourceTenantID string) error { + if !isAllowedUser(enabledUsers, disabledUsers, sourceTenantID) { return nil } - logger := log.With(logger, "tenantID", tenantID) + destinationTenantID, ok := userMapping[sourceTenantID] + if !ok { + destinationTenantID = sourceTenantID + } - blocks, err := listBlocksForTenant(ctx, sourceBucket, tenantID) + logger := log.With(logger, "sourceTenantID", sourceTenantID, "destinationTenantID", destinationTenantID) + + blocks, err := listBlocksForTenant(ctx, sourceBucket, sourceTenantID) if err != nil { level.Error(logger).Log("msg", "failed to list blocks for tenant", "err", err) - return errors.Wrapf(err, "failed to list blocks for tenant %v", tenantID) + return errors.Wrapf(err, "failed to list blocks for tenant %v", sourceTenantID) } - markers, err := listBlockMarkersForTenant(ctx, sourceBucket, tenantID, destBucket.Name()) + markers, err := listBlockMarkersForTenant(ctx, sourceBucket, sourceTenantID, destBucket.Name()) if err != nil { level.Error(logger).Log("msg", "failed to list blocks markers for tenant", "err", err) - return errors.Wrapf(err, "failed to list block markers for tenant %v", tenantID) + return errors.Wrapf(err, "failed to list block markers for tenant %v", sourceTenantID) } var blockIDs []string @@ -243,7 +279,7 @@ func copyBlocks(ctx context.Context, cfg config, logger log.Logger, m *metrics) return nil } - blockMeta, err := loadMetaJSONFile(ctx, sourceBucket, tenantID, blockID) + blockMeta, err := loadMetaJSONFile(ctx, sourceBucket, sourceTenantID, blockID) if err != nil { level.Error(logger).Log("msg", "skipping block, failed to read meta.json file", "err", err) return err @@ -287,7 +323,7 @@ func copyBlocks(ctx context.Context, cfg config, logger log.Logger, m *metrics) level.Info(logger).Log("msg", "copying block") - err = copySingleBlock(ctx, tenantID, blockID, markers[blockID], sourceBucket, copyFunc) + err = copySingleBlock(ctx, sourceTenantID, destinationTenantID, blockID, markers[blockID], sourceBucket, copyFunc) if err != nil { m.blocksCopyFailed.Inc() level.Error(logger).Log("msg", "failed to copy block", "err", err) @@ -297,7 +333,9 @@ func copyBlocks(ctx context.Context, cfg config, logger log.Logger, m *metrics) m.blocksCopied.Inc() level.Info(logger).Log("msg", "block copied successfully") - err = uploadCopiedMarkerFile(ctx, sourceBucket, tenantID, blockID, destBucket.Name()) + // Note that only the blockID and destination bucket are considered in the copy marker. + // If multiple tenants in the same destination bucket are copied to from the same source tenant the markers will currently clash. + err = uploadCopiedMarkerFile(ctx, sourceBucket, sourceTenantID, blockID, destBucket.Name()) if err != nil { level.Error(logger).Log("msg", "failed to upload copied-marker file for block", "block", blockID.String(), "err", err) return err @@ -324,13 +362,13 @@ func isAllowedUser(enabled map[string]struct{}, disabled map[string]struct{}, te } // This method copies files within single TSDB block to a destination bucket. -func copySingleBlock(ctx context.Context, tenantID string, blockID ulid.ULID, markers blockMarkers, srcBkt objtools.Bucket, copyFunc objtools.CopyFunc) error { +func copySingleBlock(ctx context.Context, sourceTenantID, destinationTenantID string, blockID ulid.ULID, markers blockMarkers, srcBkt objtools.Bucket, copyFunc objtools.CopyFunc) error { result, err := srcBkt.List(ctx, objtools.ListOptions{ - Prefix: tenantID + objtools.Delim + blockID.String(), + Prefix: sourceTenantID + objtools.Delim + blockID.String(), Recursive: true, }) if err != nil { - return errors.Wrapf(err, "copySingleBlock: failed to list block files for %v/%v", tenantID, blockID.String()) + return errors.Wrapf(err, "copySingleBlock: failed to list block files for %v/%v", sourceTenantID, blockID.String()) } paths := result.ToNames() @@ -346,11 +384,21 @@ func copySingleBlock(ctx context.Context, tenantID string, blockID ulid.ULID, ma // Copy global markers too (skipping deletion mark because deleted blocks are not copied by this tool). if markers.noCompact { - paths = append(paths, tenantID+objtools.Delim+block.NoCompactMarkFilepath(blockID)) + paths = append(paths, sourceTenantID+objtools.Delim+block.NoCompactMarkFilepath(blockID)) } + isCrossTenant := sourceTenantID != destinationTenantID + for _, fullPath := range paths { - err := copyFunc(ctx, fullPath, objtools.CopyOptions{}) + options := objtools.CopyOptions{} + if isCrossTenant { + after, found := strings.CutPrefix(fullPath, sourceTenantID) + if !found { + return fmt.Errorf("unexpected object path that does not begin with sourceTenantID: path=%s, sourceTenantID=%s", fullPath, sourceTenantID) + } + options.DestinationObjectName = destinationTenantID + after + } + err := copyFunc(ctx, fullPath, options) if err != nil { return errors.Wrapf(err, "copySingleBlock: failed to copy %v", fullPath) }