diff --git a/pkg/experiment/compactor/compaction_worker.go b/pkg/experiment/compactor/compaction_worker.go index 2acf15a418..7522bfd85d 100644 --- a/pkg/experiment/compactor/compaction_worker.go +++ b/pkg/experiment/compactor/compaction_worker.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "path/filepath" + "runtime" "strconv" "strings" "sync" @@ -19,6 +20,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + _ "go.uber.org/automaxprocs" "golang.org/x/sync/errgroup" metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" @@ -47,17 +49,17 @@ type Worker struct { } type Config struct { - JobConcurrency util.ConcurrencyLimit `yaml:"job_capacity"` - JobPollInterval time.Duration `yaml:"job_poll_interval"` - SmallObjectSize int `yaml:"small_object_size_bytes"` - TempDir string `yaml:"temp_dir"` - RequestTimeout time.Duration `yaml:"request_timeout"` + JobConcurrency int `yaml:"job_capacity"` + JobPollInterval time.Duration `yaml:"job_poll_interval"` + SmallObjectSize int `yaml:"small_object_size_bytes"` + TempDir string `yaml:"temp_dir"` + RequestTimeout time.Duration `yaml:"request_timeout"` } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { const prefix = "compaction-worker." tempdir := filepath.Join(os.TempDir(), "pyroscope-compactor") - f.TextVar(&cfg.JobConcurrency, prefix+"job-concurrency", util.GoMaxProcsConcurrencyLimit(), "Number of concurrent jobs compaction worker will run. Defaults to the number of CPU cores.") + f.IntVar(&cfg.JobConcurrency, prefix+"job-concurrency", 0, "Number of concurrent jobs compaction worker will run. Defaults to the number of CPU cores.") f.DurationVar(&cfg.JobPollInterval, prefix+"job-poll-interval", 5*time.Second, "Interval between job requests") f.DurationVar(&cfg.RequestTimeout, prefix+"request-timeout", 5*time.Second, "Job request timeout.") f.IntVar(&cfg.SmallObjectSize, prefix+"small-object-size-bytes", 8<<20, "Size of the object that can be loaded in memory.") @@ -95,7 +97,10 @@ func New( storage: storage, metrics: newMetrics(reg), } - w.threads = int(config.JobConcurrency) + w.threads = config.JobConcurrency + if w.threads < 1 { + w.threads = runtime.GOMAXPROCS(-1) + } w.queue = make(chan *compactionJob, 2*w.threads) w.jobs = make(map[string]*compactionJob, 2*w.threads) w.capacity.Store(int32(w.threads)) diff --git a/pkg/util/concurrency.go b/pkg/util/concurrency.go deleted file mode 100644 index a7edff0453..0000000000 --- a/pkg/util/concurrency.go +++ /dev/null @@ -1,48 +0,0 @@ -package util - -import ( - "runtime" - "strconv" - - _ "go.uber.org/automaxprocs" -) - -type ConcurrencyLimit int - -func GoMaxProcsConcurrencyLimit() *ConcurrencyLimit { - lim := ConcurrencyLimit(runtime.GOMAXPROCS(-1)) - return &lim -} - -func (c *ConcurrencyLimit) String() string { - if *c == 0 { - return "auto" - } - return strconv.Itoa(int(*c)) -} - -func (c *ConcurrencyLimit) Set(v string) (err error) { - var p int - if v == "" || v == "auto" { - p = runtime.GOMAXPROCS(-1) - } else { - p, err = strconv.Atoi(v) - } - if err != nil { - return err - } - if p < 1 { - *c = 1 - return - } - *c = ConcurrencyLimit(p) - return nil -} - -func (c *ConcurrencyLimit) UnmarshalText(text []byte) error { - return c.Set(string(text)) -} - -func (c *ConcurrencyLimit) MarshalText() ([]byte, error) { - return []byte(c.String()), nil -} diff --git a/tools/doc-generator/parse/parser.go b/tools/doc-generator/parse/parser.go index 775d2bc059..43b7af4df3 100644 --- a/tools/doc-generator/parse/parser.go +++ b/tools/doc-generator/parse/parser.go @@ -13,6 +13,7 @@ import ( "strings" "time" "unicode" + "unsafe" "github.com/go-kit/log" "github.com/grafana/dskit/flagext" @@ -115,8 +116,16 @@ func Flags(cfg flagext.Registerer, logger log.Logger) map[uintptr]*flag.Flag { return } - ptr := reflect.ValueOf(f.Value).Pointer() - flags[ptr] = f + val := reflect.ValueOf(f.Value) + if val.Kind() == reflect.Ptr { + flags[val.Pointer()] = f + } else if val.CanAddr() { + flags[val.UnsafeAddr()] = f + } else if val.CanInterface() { + flags[uintptr(unsafe.Pointer(&f.Value))] = f + } else { + panic("cannot get pointer to flag value") + } }) return flags