diff --git a/pkg/cli/server.go b/pkg/cli/server.go index 4f31392f55..a0895ad1a8 100644 --- a/pkg/cli/server.go +++ b/pkg/cli/server.go @@ -15,8 +15,10 @@ import ( "github.com/pyroscope-io/pyroscope/pkg/analytics" "github.com/pyroscope-io/pyroscope/pkg/config" "github.com/pyroscope-io/pyroscope/pkg/exporter" + "github.com/pyroscope-io/pyroscope/pkg/health" "github.com/pyroscope-io/pyroscope/pkg/server" "github.com/pyroscope-io/pyroscope/pkg/storage" + "github.com/pyroscope-io/pyroscope/pkg/util/bytesize" "github.com/pyroscope-io/pyroscope/pkg/util/debug" ) @@ -29,6 +31,7 @@ type serverService struct { analyticsService *analytics.Service selfProfiling *agent.ProfileSession debugReporter *debug.Reporter + healthController *health.Controller stopped chan struct{} done chan struct{} @@ -55,6 +58,12 @@ func newServerService(logger *logrus.Logger, c *config.Server) (*serverService, return nil, fmt.Errorf("new metric exporter: %w", err) } + diskPressure := health.DiskPressure{ + Threshold: 512 * bytesize.MB, + Path: c.StoragePath, + } + + svc.healthController = health.NewController(svc.logger, time.Minute, diskPressure) svc.debugReporter = debug.NewReporter(svc.logger, svc.storage, svc.config, prometheus.DefaultRegisterer) svc.directUpstream = direct.New(svc.storage, metricsExporter) svc.selfProfiling, _ = agent.NewSession(agent.SessionConfig{ @@ -71,6 +80,7 @@ func newServerService(logger *logrus.Logger, c *config.Server) (*serverService, Configuration: svc.config, Storage: svc.storage, MetricsExporter: metricsExporter, + Notifier: svc.healthController, Logger: svc.logger, MetricsRegisterer: prometheus.DefaultRegisterer, ExportedMetricsRegistry: exportedMetricsRegistry, @@ -101,6 +111,7 @@ func (svc *serverService) Start() error { go svc.analyticsService.Start() } + svc.healthController.Start() svc.directUpstream.Start() if err := svc.selfProfiling.Start(); err != nil { svc.logger.WithError(err).Error("failed to start self-profiling") @@ -134,6 +145,7 @@ func (svc *serverService) Stop() { func (svc *serverService) stop() { svc.controller.Drain() svc.debugReporter.Stop() + svc.healthController.Stop() if svc.analyticsService != nil { svc.analyticsService.Stop() } diff --git a/pkg/health/controller.go b/pkg/health/controller.go new file mode 100644 index 0000000000..29befac4f6 --- /dev/null +++ b/pkg/health/controller.go @@ -0,0 +1,105 @@ +package health + +import ( + "fmt" + "sync" + "time" + + "github.com/sirupsen/logrus" +) + +// Controller performs probes of health conditions. +type Controller struct { + m sync.RWMutex + conditions []Condition + history [][]StatusMessage + current []StatusMessage + + interval time.Duration + logger *logrus.Logger + + close chan struct{} +} + +const historySize = 5 + +func NewController(logger *logrus.Logger, interval time.Duration, conditions ...Condition) *Controller { + c := Controller{ + conditions: conditions, + history: make([][]StatusMessage, len(conditions)), + current: make([]StatusMessage, len(conditions)), + interval: interval, + logger: logger, + close: make(chan struct{}), + } + for i := range c.history { + c.history[i] = make([]StatusMessage, historySize) + } + return &c +} + +func (c *Controller) Start() { + c.probe() + go func() { + t := time.NewTicker(c.interval) + defer t.Stop() + for { + select { + case <-c.close: + return + case <-t.C: + c.probe() + } + } + }() +} + +func (c *Controller) Stop() { close(c.close) } + +func (c *Controller) probe() { + c.m.Lock() + defer c.m.Unlock() + for i, condition := range c.conditions { + history := c.history[i] + copy(history, history[1:]) + s, err := condition.Probe() + if err != nil { + s = StatusMessage{Message: err.Error()} + c.logger.WithError(err). + WithField("probe-name", fmt.Sprintf("%T", condition)). + Warn("failed to make probe") + } + history[len(history)-1] = s + current := s + for _, x := range history { + if x.Status > current.Status { + current = x + } + } + c.current[i] = current + } +} + +func (c *Controller) Unhealthy() []StatusMessage { + c.m.RLock() + defer c.m.RUnlock() + m := make([]StatusMessage, 0, len(c.current)) + for _, x := range c.current { + if x.Status > Healthy { + m = append(m, x) + } + } + return m +} + +// NotificationText satisfies server.Notifier. +// +// TODO(kolesnikovae): I think we need to make UI notifications +// structured (explicit status field) and support multiple messages. +// At the moment there can be only one notification. +func (c *Controller) NotificationText() string { + if u := c.Unhealthy(); len(u) > 0 { + return u[0].Message + } + return "" +} diff --git a/pkg/health/controller_test.go b/pkg/health/controller_test.go new file mode 100644 index 0000000000..ab107d6e1b --- /dev/null +++ b/pkg/health/controller_test.go @@ -0,0 +1,135 @@ +package health + +import ( + "fmt" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/sirupsen/logrus" + + "github.com/pyroscope-io/pyroscope/pkg/config" + "github.com/pyroscope-io/pyroscope/pkg/testing" +) + +type mockCondition struct { + mockData []StatusMessage + name string + index int +} + +func (d *mockCondition) Probe() (StatusMessage, error) { + var status = d.mockData[d.index] + status.Message = fmt.Sprintf("%s %s", status.Message, d.name) + d.index = (d.index + 1) % len(d.mockData) + return status, nil +} + +var _ = Describe("health", func() { + dataHealthy := []StatusMessage{{Healthy, "Healthy"}} + dataWarning := []StatusMessage{{Warning, "Warning"}} + dataCritical := []StatusMessage{{Critical, "Critical"}} + + testing.WithConfig(func(cfg **config.Config) { + Describe("Controller", func() { + It("Should support listening on multiple Conditions", func() { + defer GinkgoRecover() + + condition1 := &mockCondition{name: "MockCondition1", mockData: dataHealthy} + condition2 := &mockCondition{name: "MockCondition2", mockData: dataCritical} + condition3 := &mockCondition{name: "MockCondition3", mockData: dataWarning} + + healthController := NewController(logrus.New(), time.Millisecond, condition1, condition2, condition3) + healthController.Start() + + notification := healthController.Unhealthy() + healthController.Stop() + + Expect(notification).To(ContainElements([]StatusMessage{ + {Critical, "Critical MockCondition2"}, + {Warning, "Warning MockCondition3"}, + })) + }) + + It("Should suppress 'flapping' on rapid status changes", func() { + defer GinkgoRecover() + + condition := &mockCondition{mockData: []StatusMessage{ + {Status: Healthy}, + {Status: Healthy}, + {Status: Warning}, + {Status: Healthy}, + {Status: Critical}, + {Status: Healthy}, + {Status: Critical}, + {Status: Healthy}, + {Status: Healthy}, + {Status: Healthy}, + {Status: Healthy}, + }} + + healthController := NewController(logrus.New(), time.Minute, condition) + healthController.Start() + Expect(healthController.Unhealthy()).To(BeEmpty()) + healthController.probe() + Expect(healthController.Unhealthy()).To(BeEmpty()) + + healthController.probe() + requireStatus(healthController.Unhealthy(), Warning) + healthController.probe() + requireStatus(healthController.Unhealthy(), Warning) + healthController.probe() + requireStatus(healthController.Unhealthy(), Critical) + healthController.probe() + requireStatus(healthController.Unhealthy(), Critical) + healthController.probe() + requireStatus(healthController.Unhealthy(), Critical) + healthController.probe() + healthController.probe() + healthController.probe() + healthController.probe() + healthController.probe() + + Expect(healthController.Unhealthy()).To(BeEmpty()) + healthController.Stop() + }) + + It("Should return empty notification if status healthy", func() { + defer GinkgoRecover() + + condition := &mockCondition{name: "MockCondition", mockData: dataHealthy} + + healthController := NewController(logrus.New(), time.Millisecond, condition) + healthController.Start() + + notification := healthController.Unhealthy() + healthController.Stop() + + Expect(notification).To(BeEmpty()) + }) + + It("Satisfies notifier interface", func() { + defer GinkgoRecover() + + condition1 := &mockCondition{name: "MockCondition1", mockData: dataCritical} + condition2 := &mockCondition{name: "MockCondition2", mockData: dataWarning} + + healthController := NewController(logrus.New(), time.Millisecond, condition1, condition2) + healthController.Start() + + actualNotification := healthController.Unhealthy() + healthController.Stop() + + Expect(actualNotification).To(ConsistOf([]StatusMessage{ + {Critical, "Critical MockCondition1"}, + {Warning, "Warning MockCondition2"}, + })) + }) + }) + }) +}) + +func requireStatus(s []StatusMessage, x Status) { + Expect(len(s)).To(Equal(1)) + Expect(s[0].Status).To(Equal(x)) +} diff --git a/pkg/health/disk_pressure.go b/pkg/health/disk_pressure.go new file mode 100644 index 0000000000..199659e9af --- /dev/null +++ b/pkg/health/disk_pressure.go @@ -0,0 +1,28 @@ +package health + +import ( + "fmt" + + "github.com/pyroscope-io/pyroscope/pkg/util/bytesize" + "github.com/pyroscope-io/pyroscope/pkg/util/disk" +) + +type DiskPressure struct { + Threshold bytesize.ByteSize + Path string +} + +func (d DiskPressure) Probe() (StatusMessage, error) { + var m StatusMessage + available, err := disk.FreeSpace(d.Path) + if err != nil { + return m, err + } + if available < d.Threshold { + m.Status = Critical + } else { + m.Status = Healthy + } + m.Message = fmt.Sprintf("Disk space is running low: %v available", available) + return m, nil +} diff --git a/pkg/health/health.go b/pkg/health/health.go new file mode 100644 index 0000000000..7d455498d8 --- /dev/null +++ b/pkg/health/health.go @@ -0,0 +1,34 @@ +package health + +// Condition represents an aspect of pyroscope server health. +type Condition interface { + Probe() (StatusMessage, error) +} + +type StatusMessage struct { + Status + // The message is displayed to users. + Message string +} + +type Status int + +const ( + NoData Status = iota + Healthy + Warning + Critical +) + +func (s Status) String() string { + switch s { + case Healthy: + return "Healthy" + case Warning: + return "Warning" + case Critical: + return "Critical" + default: + return "Unknown" + } +} diff --git a/pkg/health/health_suite_test.go b/pkg/health/health_suite_test.go new file mode 100644 index 0000000000..a6cf7ce593 --- /dev/null +++ b/pkg/health/health_suite_test.go @@ -0,0 +1,13 @@ +package health_test + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestHealth(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Health Suite") +} diff --git a/pkg/server/build_test.go b/pkg/server/build_test.go index 21dfd7dce4..d92d8c7dbe 100644 --- a/pkg/server/build_test.go +++ b/pkg/server/build_test.go @@ -37,6 +37,7 @@ var _ = Describe("server", func() { Logger: logrus.New(), MetricsRegisterer: prometheus.NewRegistry(), ExportedMetricsRegistry: prometheus.NewRegistry(), + Notifier: mockNotifier{}, }) h, _ := c.mux() httpServer := httptest.NewServer(h) diff --git a/pkg/server/config_test.go b/pkg/server/config_test.go index 8d6d49d0e4..21e5f82245 100644 --- a/pkg/server/config_test.go +++ b/pkg/server/config_test.go @@ -36,6 +36,7 @@ var _ = Describe("server", func() { Logger: logrus.New(), MetricsRegisterer: prometheus.NewRegistry(), ExportedMetricsRegistry: prometheus.NewRegistry(), + Notifier: mockNotifier{}, }) h, _ := c.mux() httpServer := httptest.NewServer(h) diff --git a/pkg/server/controller.go b/pkg/server/controller.go index 36c43b1e56..67372d9a58 100644 --- a/pkg/server/controller.go +++ b/pkg/server/controller.go @@ -46,6 +46,7 @@ type Controller struct { storage *storage.Storage log *logrus.Logger httpServer *http.Server + notifier Notifier metricsMdw middleware.Middleware dir http.FileSystem @@ -66,6 +67,7 @@ type Config struct { Configuration *config.Server *logrus.Logger *storage.Storage + Notifier // The registerer is used for exposing server metrics. MetricsRegisterer prometheus.Registerer @@ -75,12 +77,20 @@ type Config struct { storage.MetricsExporter } +type Notifier interface { + // NotificationText returns message that will be displayed to user + // on index page load. The message should point user to a critical problem. + // TODO(kolesnikovae): we should poll for notifications (or subscribe). + NotificationText() string +} + func New(c Config) (*Controller, error) { ctrl := Controller{ config: c.Configuration, log: c.Logger, storage: c.Storage, exporter: c.MetricsExporter, + notifier: c.Notifier, stats: make(map[string]int), appStats: mustNewHLL(), diff --git a/pkg/server/controller_gzip_test.go b/pkg/server/controller_gzip_test.go index 7f0c142cba..7c032afcaa 100644 --- a/pkg/server/controller_gzip_test.go +++ b/pkg/server/controller_gzip_test.go @@ -49,6 +49,7 @@ var _ = Describe("server", func() { Logger: logrus.New(), MetricsRegisterer: prometheus.NewRegistry(), ExportedMetricsRegistry: prometheus.NewRegistry(), + Notifier: mockNotifier{}, }) c.dir = http.Dir(tempAssetDir.Path) h, _ := c.getHandler() diff --git a/pkg/server/controller_https_test.go b/pkg/server/controller_https_test.go index 49eefe5449..dc2527ccb6 100644 --- a/pkg/server/controller_https_test.go +++ b/pkg/server/controller_https_test.go @@ -42,6 +42,7 @@ var _ = Describe("server", func() { Logger: logrus.New(), MetricsRegisterer: prometheus.NewRegistry(), ExportedMetricsRegistry: prometheus.NewRegistry(), + Notifier: mockNotifier{}, }) c.dir = http.Dir(testDataDir) @@ -80,6 +81,7 @@ var _ = Describe("server", func() { Logger: logrus.New(), MetricsRegisterer: prometheus.NewRegistry(), ExportedMetricsRegistry: prometheus.NewRegistry(), + Notifier: mockNotifier{}, }) c.dir = http.Dir(testDataDir) diff --git a/pkg/server/handler.go b/pkg/server/handler.go index fa8b83bee6..f2c5fec4d6 100644 --- a/pkg/server/handler.go +++ b/pkg/server/handler.go @@ -270,16 +270,11 @@ func (ctrl *Controller) renderIndexPage(w http.ResponseWriter, _ *http.Request) "LatestVersionInfo": updates.LatestVersionJSON(), "ExtraMetadata": extraMetadataStr, "BaseURL": ctrl.config.BaseURL, - "NotificationText": ctrl.NotificationText(), + "NotificationText": ctrl.notifier.NotificationText(), "IsAuthRequired": strconv.FormatBool(ctrl.isAuthRequired()), }) } -func (*Controller) NotificationText() string { - // TODO: implement backend support for alert text - return "" -} - func mustExecute(t *template.Template, w io.Writer, v interface{}) { if err := t.Execute(w, v); err != nil { panic(err) diff --git a/pkg/server/http.go b/pkg/server/http.go index 0eca353a32..a41f43861c 100644 --- a/pkg/server/http.go +++ b/pkg/server/http.go @@ -14,11 +14,6 @@ func (ctrl *Controller) addRoutes(mux *http.ServeMux, routes []route, } } -// the metrics middleware needs to be explicit passed -// since it requires access to the pattern string -// otherwise it would infer route from the url, which would explode the cardinality -type metricsMiddleware func(name string) func(http.HandlerFunc) http.HandlerFunc - func chain(f http.HandlerFunc, middleware ...func(http.HandlerFunc) http.HandlerFunc) http.HandlerFunc { if len(middleware) == 0 { return f diff --git a/pkg/server/ingest_test.go b/pkg/server/ingest_test.go index 7a2ced1b38..bd4ea6a8f8 100644 --- a/pkg/server/ingest_test.go +++ b/pkg/server/ingest_test.go @@ -49,6 +49,7 @@ var _ = Describe("server", func() { Logger: logrus.New(), MetricsRegisterer: prometheus.NewRegistry(), ExportedMetricsRegistry: prometheus.NewRegistry(), + Notifier: mockNotifier{}, }) h, _ := c.mux() httpServer := httptest.NewServer(h) diff --git a/pkg/server/mocks.go b/pkg/server/mocks.go new file mode 100644 index 0000000000..f51565890d --- /dev/null +++ b/pkg/server/mocks.go @@ -0,0 +1,5 @@ +package server + +type mockNotifier struct{} + +func (mockNotifier) NotificationText() string { return "" } diff --git a/pkg/server/render_test.go b/pkg/server/render_test.go index 7bb05765d6..731b3a64ea 100644 --- a/pkg/server/render_test.go +++ b/pkg/server/render_test.go @@ -76,6 +76,7 @@ var _ = Describe("server", func() { Logger: logrus.New(), MetricsRegisterer: prometheus.NewRegistry(), ExportedMetricsRegistry: prometheus.NewRegistry(), + Notifier: mockNotifier{}, }) h, _ := c.mux() httpServer = httptest.NewServer(h) diff --git a/pkg/storage/local.go b/pkg/storage/local.go index 37d663acbb..4c97edda88 100644 --- a/pkg/storage/local.go +++ b/pkg/storage/local.go @@ -84,9 +84,6 @@ func (s *Storage) CollectLocalProfiles() error { func (s *Storage) PutLocal(po *PutInput) error { logrus.Debug("PutLocal") - if err := s.performFreeSpaceCheck(); err != nil { - return err - } name := fmt.Sprintf("%d-%s.profile", po.StartTime.Unix(), po.Key.AppName()) diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 1e33b00553..5dacd6d577 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -27,7 +27,6 @@ import ( "github.com/pyroscope-io/pyroscope/pkg/storage/segment" "github.com/pyroscope-io/pyroscope/pkg/storage/tree" "github.com/pyroscope-io/pyroscope/pkg/util/bytesize" - "github.com/pyroscope-io/pyroscope/pkg/util/disk" "github.com/pyroscope-io/pyroscope/pkg/util/slices" ) @@ -213,17 +212,11 @@ type PutInput struct { AggregationType string } -var OutOfSpaceThreshold = 512 * bytesize.MB - func (s *Storage) Put(pi *PutInput) error { // TODO: This is a pretty broad lock. We should find a way to make these locks more selective. s.putMutex.Lock() defer s.putMutex.Unlock() - if err := s.performFreeSpaceCheck(); err != nil { - return err - } - if pi.StartTime.Before(s.lifetimeBasedRetentionThreshold()) { return errRetention } @@ -705,13 +698,3 @@ func (s *Storage) lifetimeBasedRetentionThreshold() time.Time { } return t } - -func (s *Storage) performFreeSpaceCheck() error { - freeSpace, err := disk.FreeSpace(s.config.StoragePath) - if err == nil { - if freeSpace < OutOfSpaceThreshold { - return errOutOfSpace - } - } - return nil -} diff --git a/pkg/util/bytesize/bytesize.go b/pkg/util/bytesize/bytesize.go index a699693070..b41c61a7e3 100644 --- a/pkg/util/bytesize/bytesize.go +++ b/pkg/util/bytesize/bytesize.go @@ -10,17 +10,17 @@ import ( type ByteSize int64 -var Byte ByteSize = 1 +const Byte ByteSize = 1 + +const ( + // TODO: fix units - SI and IEC standards swapped. -var ( KB = 1024 * Byte MB = 1024 * KB GB = 1024 * MB TB = 1024 * GB PB = 1024 * TB -) -var ( KiB = 1000 * Byte MiB = 1000 * KiB GiB = 1000 * MiB diff --git a/pkg/util/disk/usage_unix.go b/pkg/util/disk/usage_unix.go index 6ee51a7d21..ef257220a1 100644 --- a/pkg/util/disk/usage_unix.go +++ b/pkg/util/disk/usage_unix.go @@ -15,5 +15,5 @@ func FreeSpace(storagePath string) (bytesize.ByteSize, error) { return 0, err } - return bytesize.ByteSize(fs.Bfree) * bytesize.ByteSize(fs.Bsize), nil + return bytesize.ByteSize(fs.Bavail) * bytesize.ByteSize(fs.Bsize), nil }