Skip to content

Commit

Permalink
feat(ops): implement out of space notification #348 (#456)
Browse files Browse the repository at this point in the history
* create pkg health

* wire up health notifications to server.controller

* pkg health

* wire up notifications to server.handler

* add tests

* Convert NotificationText to json

* Remove debug logs

* Bavail for disk space check

* lint fixes

* SCA errors

* state -> status

* remove redundant comments

* remove unused ticker

* use constructor method, mock health controller for tests

* NotificationText -> Notification

* add ControllerArgs

* remove redundant build constraint

* On Error push NoData to history

* ControllerArgs -> ControllerConfig

* move dealloc to serverService

* Refactor health controller

* Solve conflicts and refactor health controller

* Clarify NotificationText implementation limitations

* Refactor health controller tests

Co-authored-by: Anton Kolesnikov <[email protected]>
  • Loading branch information
gawicks and kolesnikovae authored Oct 19, 2021
1 parent 8b7bf9d commit e33e0b3
Show file tree
Hide file tree
Showing 20 changed files with 355 additions and 36 deletions.
12 changes: 12 additions & 0 deletions pkg/cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ import (
"github.com/pyroscope-io/pyroscope/pkg/analytics"
"github.com/pyroscope-io/pyroscope/pkg/config"
"github.com/pyroscope-io/pyroscope/pkg/exporter"
"github.com/pyroscope-io/pyroscope/pkg/health"
"github.com/pyroscope-io/pyroscope/pkg/server"
"github.com/pyroscope-io/pyroscope/pkg/storage"
"github.com/pyroscope-io/pyroscope/pkg/util/bytesize"
"github.com/pyroscope-io/pyroscope/pkg/util/debug"
)

Expand All @@ -29,6 +31,7 @@ type serverService struct {
analyticsService *analytics.Service
selfProfiling *agent.ProfileSession
debugReporter *debug.Reporter
healthController *health.Controller

stopped chan struct{}
done chan struct{}
Expand All @@ -55,6 +58,12 @@ func newServerService(logger *logrus.Logger, c *config.Server) (*serverService,
return nil, fmt.Errorf("new metric exporter: %w", err)
}

diskPressure := health.DiskPressure{
Threshold: 512 * bytesize.MB,
Path: c.StoragePath,
}

svc.healthController = health.NewController(svc.logger, time.Minute, diskPressure)
svc.debugReporter = debug.NewReporter(svc.logger, svc.storage, svc.config, prometheus.DefaultRegisterer)
svc.directUpstream = direct.New(svc.storage, metricsExporter)
svc.selfProfiling, _ = agent.NewSession(agent.SessionConfig{
Expand All @@ -71,6 +80,7 @@ func newServerService(logger *logrus.Logger, c *config.Server) (*serverService,
Configuration: svc.config,
Storage: svc.storage,
MetricsExporter: metricsExporter,
Notifier: svc.healthController,
Logger: svc.logger,
MetricsRegisterer: prometheus.DefaultRegisterer,
ExportedMetricsRegistry: exportedMetricsRegistry,
Expand Down Expand Up @@ -101,6 +111,7 @@ func (svc *serverService) Start() error {
go svc.analyticsService.Start()
}

svc.healthController.Start()
svc.directUpstream.Start()
if err := svc.selfProfiling.Start(); err != nil {
svc.logger.WithError(err).Error("failed to start self-profiling")
Expand Down Expand Up @@ -134,6 +145,7 @@ func (svc *serverService) Stop() {
func (svc *serverService) stop() {
svc.controller.Drain()
svc.debugReporter.Stop()
svc.healthController.Stop()
if svc.analyticsService != nil {
svc.analyticsService.Stop()
}
Expand Down
105 changes: 105 additions & 0 deletions pkg/health/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package health

import (
"fmt"
"sync"
"time"

"github.com/sirupsen/logrus"
)

// Controller performs probes of health conditions.
type Controller struct {
m sync.RWMutex
conditions []Condition
history [][]StatusMessage
current []StatusMessage

interval time.Duration
logger *logrus.Logger

close chan struct{}
}

const historySize = 5

func NewController(logger *logrus.Logger, interval time.Duration, conditions ...Condition) *Controller {
c := Controller{
conditions: conditions,
history: make([][]StatusMessage, len(conditions)),
current: make([]StatusMessage, len(conditions)),
interval: interval,
logger: logger,
close: make(chan struct{}),
}
for i := range c.history {
c.history[i] = make([]StatusMessage, historySize)
}
return &c
}

func (c *Controller) Start() {
c.probe()
go func() {
t := time.NewTicker(c.interval)
defer t.Stop()
for {
select {
case <-c.close:
return
case <-t.C:
c.probe()
}
}
}()
}

func (c *Controller) Stop() { close(c.close) }

func (c *Controller) probe() {
c.m.Lock()
defer c.m.Unlock()
for i, condition := range c.conditions {
history := c.history[i]
copy(history, history[1:])
s, err := condition.Probe()
if err != nil {
s = StatusMessage{Message: err.Error()}
c.logger.WithError(err).
WithField("probe-name", fmt.Sprintf("%T", condition)).
Warn("failed to make probe")
}
history[len(history)-1] = s
current := s
for _, x := range history {
if x.Status > current.Status {
current = x
}
}
c.current[i] = current
}
}

func (c *Controller) Unhealthy() []StatusMessage {
c.m.RLock()
defer c.m.RUnlock()
m := make([]StatusMessage, 0, len(c.current))
for _, x := range c.current {
if x.Status > Healthy {
m = append(m, x)
}
}
return m
}

// NotificationText satisfies server.Notifier.
//
// TODO(kolesnikovae): I think we need to make UI notifications
// structured (explicit status field) and support multiple messages.
// At the moment there can be only one notification.
func (c *Controller) NotificationText() string {
if u := c.Unhealthy(); len(u) > 0 {
return u[0].Message
}
return ""
}
135 changes: 135 additions & 0 deletions pkg/health/controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package health

import (
"fmt"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/sirupsen/logrus"

"github.com/pyroscope-io/pyroscope/pkg/config"
"github.com/pyroscope-io/pyroscope/pkg/testing"
)

type mockCondition struct {
mockData []StatusMessage
name string
index int
}

func (d *mockCondition) Probe() (StatusMessage, error) {
var status = d.mockData[d.index]
status.Message = fmt.Sprintf("%s %s", status.Message, d.name)
d.index = (d.index + 1) % len(d.mockData)
return status, nil
}

var _ = Describe("health", func() {
dataHealthy := []StatusMessage{{Healthy, "Healthy"}}
dataWarning := []StatusMessage{{Warning, "Warning"}}
dataCritical := []StatusMessage{{Critical, "Critical"}}

testing.WithConfig(func(cfg **config.Config) {
Describe("Controller", func() {
It("Should support listening on multiple Conditions", func() {
defer GinkgoRecover()

condition1 := &mockCondition{name: "MockCondition1", mockData: dataHealthy}
condition2 := &mockCondition{name: "MockCondition2", mockData: dataCritical}
condition3 := &mockCondition{name: "MockCondition3", mockData: dataWarning}

healthController := NewController(logrus.New(), time.Millisecond, condition1, condition2, condition3)
healthController.Start()

notification := healthController.Unhealthy()
healthController.Stop()

Expect(notification).To(ContainElements([]StatusMessage{
{Critical, "Critical MockCondition2"},
{Warning, "Warning MockCondition3"},
}))
})

It("Should suppress 'flapping' on rapid status changes", func() {
defer GinkgoRecover()

condition := &mockCondition{mockData: []StatusMessage{
{Status: Healthy},
{Status: Healthy},
{Status: Warning},
{Status: Healthy},
{Status: Critical},
{Status: Healthy},
{Status: Critical},
{Status: Healthy},
{Status: Healthy},
{Status: Healthy},
{Status: Healthy},
}}

healthController := NewController(logrus.New(), time.Minute, condition)
healthController.Start()
Expect(healthController.Unhealthy()).To(BeEmpty())
healthController.probe()
Expect(healthController.Unhealthy()).To(BeEmpty())

healthController.probe()
requireStatus(healthController.Unhealthy(), Warning)
healthController.probe()
requireStatus(healthController.Unhealthy(), Warning)
healthController.probe()
requireStatus(healthController.Unhealthy(), Critical)
healthController.probe()
requireStatus(healthController.Unhealthy(), Critical)
healthController.probe()
requireStatus(healthController.Unhealthy(), Critical)
healthController.probe()
healthController.probe()
healthController.probe()
healthController.probe()
healthController.probe()

Expect(healthController.Unhealthy()).To(BeEmpty())
healthController.Stop()
})

It("Should return empty notification if status healthy", func() {
defer GinkgoRecover()

condition := &mockCondition{name: "MockCondition", mockData: dataHealthy}

healthController := NewController(logrus.New(), time.Millisecond, condition)
healthController.Start()

notification := healthController.Unhealthy()
healthController.Stop()

Expect(notification).To(BeEmpty())
})

It("Satisfies notifier interface", func() {
defer GinkgoRecover()

condition1 := &mockCondition{name: "MockCondition1", mockData: dataCritical}
condition2 := &mockCondition{name: "MockCondition2", mockData: dataWarning}

healthController := NewController(logrus.New(), time.Millisecond, condition1, condition2)
healthController.Start()

actualNotification := healthController.Unhealthy()
healthController.Stop()

Expect(actualNotification).To(ConsistOf([]StatusMessage{
{Critical, "Critical MockCondition1"},
{Warning, "Warning MockCondition2"},
}))
})
})
})
})

func requireStatus(s []StatusMessage, x Status) {
Expect(len(s)).To(Equal(1))
Expect(s[0].Status).To(Equal(x))
}
28 changes: 28 additions & 0 deletions pkg/health/disk_pressure.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package health

import (
"fmt"

"github.com/pyroscope-io/pyroscope/pkg/util/bytesize"
"github.com/pyroscope-io/pyroscope/pkg/util/disk"
)

type DiskPressure struct {
Threshold bytesize.ByteSize
Path string
}

func (d DiskPressure) Probe() (StatusMessage, error) {
var m StatusMessage
available, err := disk.FreeSpace(d.Path)
if err != nil {
return m, err
}
if available < d.Threshold {
m.Status = Critical
} else {
m.Status = Healthy
}
m.Message = fmt.Sprintf("Disk space is running low: %v available", available)
return m, nil
}
34 changes: 34 additions & 0 deletions pkg/health/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package health

// Condition represents an aspect of pyroscope server health.
type Condition interface {
Probe() (StatusMessage, error)
}

type StatusMessage struct {
Status
// The message is displayed to users.
Message string
}

type Status int

const (
NoData Status = iota
Healthy
Warning
Critical
)

func (s Status) String() string {
switch s {
case Healthy:
return "Healthy"
case Warning:
return "Warning"
case Critical:
return "Critical"
default:
return "Unknown"
}
}
13 changes: 13 additions & 0 deletions pkg/health/health_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package health_test

import (
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

func TestHealth(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Health Suite")
}
1 change: 1 addition & 0 deletions pkg/server/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var _ = Describe("server", func() {
Logger: logrus.New(),
MetricsRegisterer: prometheus.NewRegistry(),
ExportedMetricsRegistry: prometheus.NewRegistry(),
Notifier: mockNotifier{},
})
h, _ := c.mux()
httpServer := httptest.NewServer(h)
Expand Down
1 change: 1 addition & 0 deletions pkg/server/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ var _ = Describe("server", func() {
Logger: logrus.New(),
MetricsRegisterer: prometheus.NewRegistry(),
ExportedMetricsRegistry: prometheus.NewRegistry(),
Notifier: mockNotifier{},
})
h, _ := c.mux()
httpServer := httptest.NewServer(h)
Expand Down
Loading

0 comments on commit e33e0b3

Please sign in to comment.