diff --git a/metrics/collector.go b/metrics/collector.go index a711562a9..c77e0c2b0 100644 --- a/metrics/collector.go +++ b/metrics/collector.go @@ -11,7 +11,6 @@ import ( "github.com/cybozu-go/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" - v3 "go.etcd.io/etcd/client/v3" ) type logger struct{} @@ -40,10 +39,13 @@ type metricGroup struct { // This abstraction is for mock test. type storage interface { IsSabakanDisabled(context.Context) (bool, error) + GetRebootsEntries(ctx context.Context) ([]*cke.RebootQueueEntry, error) + GetCluster(ctx context.Context) (*cke.Cluster, error) } // NewCollector returns a new prometheus.Collector. -func NewCollector(client *v3.Client) prometheus.Collector { +func NewCollector(storage storage) prometheus.Collector { + return &collector{ metrics: map[string]metricGroup{ "leader": { @@ -55,7 +57,7 @@ func NewCollector(client *v3.Client) prometheus.Collector { isAvailable: isOperationPhaseAvailable, }, "reboot": { - collectors: []prometheus.Collector{rebootQueueEntries, rebootQueueItems, nodeRebootStatus}, + collectors: []prometheus.Collector{nodeMetricsCollector{storage}}, isAvailable: isRebootAvailable, }, "sabakan_integration": { @@ -63,7 +65,7 @@ func NewCollector(client *v3.Client) prometheus.Collector { isAvailable: isSabakanIntegrationAvailable, }, }, - storage: &cke.Storage{Client: client}, + storage: storage, } } @@ -120,3 +122,68 @@ func (c collector) Collect(ch chan<- prometheus.Metric) { } wg.Wait() } + +// nodeMetricsCollector implements prometheus.Collector interface. +type nodeMetricsCollector struct { + storage storage +} + +var _ prometheus.Collector = &nodeMetricsCollector{} + +func (c nodeMetricsCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- rebootQueueEntries + ch <- rebootQueueItems + ch <- nodeRebootStatus +} + +func (c nodeMetricsCollector) Collect(ch chan<- prometheus.Metric) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + rqEntries, err := c.storage.GetRebootsEntries(ctx) + if err != nil { + log.Error("failed to get reboots entries", map[string]interface{}{ + log.FnError: err, + }) + return + } + + cluster, err := c.storage.GetCluster(ctx) + if err != nil { + log.Error("failed to get cluster", map[string]interface{}{ + log.FnError: err, + }) + return + } + itemCounts := cke.CountRebootQueueEntries(rqEntries) + nodeStatus := cke.BuildNodeRebootStatus(cluster.Nodes, rqEntries) + + ch <- prometheus.MustNewConstMetric( + rebootQueueEntries, + prometheus.GaugeValue, + float64(len(rqEntries)), + ) + for status, count := range itemCounts { + ch <- prometheus.MustNewConstMetric( + rebootQueueItems, + prometheus.GaugeValue, + float64(count), + status, + ) + } + for node, statuses := range nodeStatus { + for status, matches := range statuses { + value := float64(0) + if matches { + value = 1 + } + ch <- prometheus.MustNewConstMetric( + nodeRebootStatus, + prometheus.GaugeValue, + value, + node, + status, + ) + } + } +} diff --git a/metrics/metrics.go b/metrics/metrics.go index 7aa387ed5..4737a00a7 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -33,29 +33,25 @@ var operationPhaseTimestampSeconds = prometheus.NewGauge( }, ) -var rebootQueueEntries = prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: namespace, - Name: "reboot_queue_entries", - Help: "The number of reboot queue entries remaining.", - }, +var rebootQueueEntries = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "", "reboot_queue_entries"), + "The number of reboot queue entries remaining.", + nil, + nil, ) -var rebootQueueItems = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: namespace, - Name: "reboot_queue_items", - Help: "The number of reboot queue entries remaining per status.", - }, +var rebootQueueItems = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "", "reboot_queue_items"), + "The number of reboot queue entries remaining per status.", []string{"status"}, + nil, ) -var nodeRebootStatus = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: namespace, - Name: "node_reboot_status", - Help: "The reboot status of a node.", - }, []string{"node", "status"}, +var nodeRebootStatus = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "", "node_reboot_status"), + "The reboot status of a node.", + []string{"node", "status"}, + nil, ) var sabakanIntegrationSuccessful = prometheus.NewGauge( diff --git a/metrics/updater.go b/metrics/updater.go index 1579732ce..e805b04d9 100644 --- a/metrics/updater.go +++ b/metrics/updater.go @@ -39,36 +39,6 @@ func isOperationPhaseAvailable(_ context.Context, _ storage) (bool, error) { return isLeader, nil } -// UpdateRebootQueueEntries updates "reboot_queue_entries". -func UpdateRebootQueueEntries(numEntries int) { - rebootQueueEntries.Set(float64(numEntries)) -} - -// UpdateRebootQueueItems updates "reboot_queue_items". -func UpdateRebootQueueItems(counts map[string]int) { - for status, count := range counts { - rebootQueueItems.With(map[string]string{ - "status": status, - }).Set(float64(count)) - } -} - -// UpdateNodeRebootStatus updates "node_reboot_status". -func UpdateNodeRebootStatus(nodeStatus map[string]map[string]bool) { - for node, statuses := range nodeStatus { - for status, matches := range statuses { - value := float64(0) - if matches { - value = 1 - } - nodeRebootStatus.With(map[string]string{ - "node": node, - "status": status, - }).Set(value) - } - } -} - func isRebootAvailable(_ context.Context, _ storage) (bool, error) { return isLeader, nil } diff --git a/metrics/updater_test.go b/metrics/updater_test.go index 4a67f5b83..d824c3511 100644 --- a/metrics/updater_test.go +++ b/metrics/updater_test.go @@ -43,13 +43,13 @@ type updateOperationPhaseTestCase struct { type updateRebootQueueEntriesTestCase struct { name string - input int + input []*cke.RebootQueueEntry expected float64 } type updateRebootQueueItemsTestCase struct { name string - input map[string]int + input []*cke.RebootQueueEntry expected map[string]float64 } @@ -252,17 +252,22 @@ func testUpdateRebootQueueEntries(t *testing.T) { testCases := []updateRebootQueueEntriesTestCase{ { name: "zero", - input: 0, + input: nil, expected: 0, }, { - name: "one", - input: 1, + name: "one", + input: []*cke.RebootQueueEntry{ + {Status: cke.RebootStatusQueued}, + }, expected: 1, }, { - name: "two", - input: 2, + name: "two", + input: []*cke.RebootQueueEntry{ + {Status: cke.RebootStatusQueued}, + {Status: cke.RebootStatusRebooting}, + }, expected: 2, }, } @@ -271,11 +276,10 @@ func testUpdateRebootQueueEntries(t *testing.T) { ctx := context.Background() defer ctx.Done() - collector, _ := newTestCollector() + collector, storage := newTestCollector() + storage.setRebootsEntries(tt.input) handler := GetHandler(collector) - UpdateRebootQueueEntries(tt.input) - w := httptest.NewRecorder() req := httptest.NewRequest("GET", "/metrics", nil) handler.ServeHTTP(w, req) @@ -309,28 +313,44 @@ func testUpdateRebootQueueItems(t *testing.T) { testCases := []updateRebootQueueItemsTestCase{ { name: "zero", - input: map[string]int{ - "queued": 1, - "draining": 2, - "rebooting": 3, + input: []*cke.RebootQueueEntry{ + {Status: cke.RebootStatusQueued}, + {Status: cke.RebootStatusDraining}, + {Status: cke.RebootStatusDraining}, + {Status: cke.RebootStatusRebooting}, + {Status: cke.RebootStatusRebooting}, + {Status: cke.RebootStatusRebooting}, }, expected: map[string]float64{ "queued": 1.0, "draining": 2.0, "rebooting": 3.0, + "cancelled": 0.0, }, }, { name: "one", - input: map[string]int{ - "queued": 4, - "draining": 5, - "cancelled": 6, + input: []*cke.RebootQueueEntry{ + {Status: cke.RebootStatusQueued}, + {Status: cke.RebootStatusQueued}, + {Status: cke.RebootStatusQueued}, + {Status: cke.RebootStatusQueued}, + {Status: cke.RebootStatusDraining}, + {Status: cke.RebootStatusDraining}, + {Status: cke.RebootStatusDraining}, + {Status: cke.RebootStatusDraining}, + {Status: cke.RebootStatusDraining}, + {Status: cke.RebootStatusCancelled}, + {Status: cke.RebootStatusCancelled}, + {Status: cke.RebootStatusCancelled}, + {Status: cke.RebootStatusCancelled}, + {Status: cke.RebootStatusCancelled}, + {Status: cke.RebootStatusCancelled}, }, expected: map[string]float64{ "queued": 4.0, "draining": 5.0, - "rebooting": 3.0, + "rebooting": 0.0, "cancelled": 6.0, }, }, @@ -340,11 +360,10 @@ func testUpdateRebootQueueItems(t *testing.T) { ctx := context.Background() defer ctx.Done() - collector, _ := newTestCollector() + collector, storage := newTestCollector() + storage.setRebootsEntries(tt.input) handler := GetHandler(collector) - UpdateRebootQueueItems(tt.input) - w := httptest.NewRecorder() req := httptest.NewRequest("GET", "/metrics", nil) handler.ServeHTTP(w, req) @@ -374,7 +393,25 @@ func testUpdateRebootQueueItems(t *testing.T) { } func testUpdateNodeRebootStatus(t *testing.T) { - input := map[string]map[string]bool{ + inputCluster := &cke.Cluster{ + Nodes: []*cke.Node{ + { + Address: "192.168.1.11", + Hostname: "node1", + }, + { + Address: "192.168.1.12", + Hostname: "node2", + }, + }, + } + inputEntries := []*cke.RebootQueueEntry{ + { + Node: "192.168.1.11", + Status: cke.RebootStatusRebooting, + }, + } + expected := map[string]map[string]bool{ "node1": { "queued": false, "draining": false, @@ -388,16 +425,15 @@ func testUpdateNodeRebootStatus(t *testing.T) { "cancelled": false, }, } - expected := input ctx := context.Background() defer ctx.Done() - collector, _ := newTestCollector() + collector, storage := newTestCollector() + storage.setCluster(inputCluster) + storage.setRebootsEntries(inputEntries) handler := GetHandler(collector) - UpdateNodeRebootStatus(input) - w := httptest.NewRecorder() req := httptest.NewRequest("GET", "/metrics", nil) handler.ServeHTTP(w, req) @@ -579,14 +615,17 @@ func testUpdateSabakanIntegration(t *testing.T) { } func newTestCollector() (prometheus.Collector, *testStorage) { - c := NewCollector(nil) - s := &testStorage{} - c.(*collector).storage = s + s := &testStorage{ + cluster: new(cke.Cluster), + } + c := NewCollector(s) return c, s } type testStorage struct { sabakanEnabled bool + rebootEntries []*cke.RebootQueueEntry + cluster *cke.Cluster } func (s *testStorage) enableSabakan(flag bool) { @@ -597,6 +636,22 @@ func (s *testStorage) IsSabakanDisabled(_ context.Context) (bool, error) { return !s.sabakanEnabled, nil } +func (s *testStorage) setRebootsEntries(entries []*cke.RebootQueueEntry) { + s.rebootEntries = entries +} + +func (s *testStorage) GetRebootsEntries(ctx context.Context) ([]*cke.RebootQueueEntry, error) { + return s.rebootEntries, nil +} + +func (s *testStorage) setCluster(cluster *cke.Cluster) { + s.cluster = cluster +} + +func (s *testStorage) GetCluster(ctx context.Context) (*cke.Cluster, error) { + return s.cluster, nil +} + func labelToMap(labelPair []*dto.LabelPair) map[string]string { res := make(map[string]string) for _, l := range labelPair { diff --git a/mtest/reboot_test.go b/mtest/reboot_test.go index 827952def..30488232e 100644 --- a/mtest/reboot_test.go +++ b/mtest/reboot_test.go @@ -94,7 +94,6 @@ func testRebootOperations() { // - RebootDrainTimeoutOp // - RebootUncordonOp // - RebootDequeueOp - // - RebootRecalcMetricsOp cluster := getCluster() for i := 0; i < 3; i++ { diff --git a/op/reboot.go b/op/reboot.go index 2a9533072..f86ef94e6 100644 --- a/op/reboot.go +++ b/op/reboot.go @@ -9,7 +9,6 @@ import ( "time" "github.com/cybozu-go/cke" - "github.com/cybozu-go/cke/metrics" "github.com/cybozu-go/log" "github.com/cybozu-go/well" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -524,64 +523,6 @@ func (c rebootDequeueCommand) Command() cke.Command { } } -// - -type rebootRecalcMetricsOp struct { - finished bool -} - -// RebootRecalcMetricsOp returns an Operator to racalc metrics. -func RebootRecalcMetricsOp() cke.Operator { - return &rebootRecalcMetricsOp{} -} - -func (o *rebootRecalcMetricsOp) Name() string { - return "reboot-recalc-metrics" -} - -func (o *rebootRecalcMetricsOp) NextCommand() cke.Commander { - if o.finished { - return nil - } - - o.finished = true - return rebootRecalcMetricsCommand{} -} - -func (o *rebootRecalcMetricsOp) Targets() []string { - return []string{} -} - -type rebootRecalcMetricsCommand struct { -} - -func (c rebootRecalcMetricsCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error { - rqEntries, err := inf.Storage().GetRebootsEntries(ctx) - if err != nil { - return err - } - cluster, err := inf.Storage().GetCluster(ctx) - if err != nil { - return err - } - - metrics.UpdateRebootQueueEntries(len(rqEntries)) - itemCounts := cke.CountRebootQueueEntries(rqEntries) - metrics.UpdateRebootQueueItems(itemCounts) - nodeStatus := cke.BuildNodeRebootStatus(cluster.Nodes, rqEntries) - metrics.UpdateNodeRebootStatus(nodeStatus) - - return nil -} - -func (c rebootRecalcMetricsCommand) Command() cke.Command { - return cke.Command{ - Name: "rebootRecalcMetricsCommand", - } -} - -// - func listProtectedNamespaces(ctx context.Context, cs *kubernetes.Clientset, ls *metav1.LabelSelector) (map[string]bool, error) { selector, err := metav1.LabelSelectorAsSelector(ls) if err != nil { diff --git a/pkg/cke/main.go b/pkg/cke/main.go index a572de4d1..a0d97edf8 100644 --- a/pkg/cke/main.go +++ b/pkg/cke/main.go @@ -126,7 +126,8 @@ func main() { // API server mux := http.NewServeMux() // Metrics - collector := metrics.NewCollector(etcd) + storage := &cke.Storage{Client: etcd} + collector := metrics.NewCollector(storage) metricsHandler := metrics.GetHandler(collector) mux.Handle("/metrics", metricsHandler) // REST API diff --git a/server/control.go b/server/control.go index e216134b3..c76e52292 100644 --- a/server/control.go +++ b/server/control.go @@ -314,9 +314,6 @@ func (c Controller) runOnce(ctx context.Context, leaderKey string, tick <-chan t if err != nil { return err } - metrics.UpdateRebootQueueEntries(len(rqEntries)) - itemCounts := cke.CountRebootQueueEntries(rqEntries) - metrics.UpdateRebootQueueItems(itemCounts) rqEntries = cke.DedupRebootQueueEntries(rqEntries) if len(rqEntries) > 0 { diff --git a/server/strategy.go b/server/strategy.go index b7d2f1072..f6e8fc80a 100644 --- a/server/strategy.go +++ b/server/strategy.go @@ -719,7 +719,6 @@ func rebootOps(c *cke.Cluster, constraints *cke.Constraints, rebootArgs DecideOp } if len(ops) > 0 { phaseReboot = true - ops = append(ops, op.RebootRecalcMetricsOp()) } return ops, phaseReboot diff --git a/server/strategy_test.go b/server/strategy_test.go index e10dfe09d..6a517c4d3 100644 --- a/server/strategy_test.go +++ b/server/strategy_test.go @@ -1207,7 +1207,7 @@ func TestDecideOps(t *testing.T) { d.Status.Kubernetes.MasterEndpointSlice.Endpoints[2].Conditions.Ready = &endpointReady d.Status.Kubernetes.EtcdEndpointSlice.Endpoints[2].Conditions.Ready = &endpointReady }), - ExpectedOps: []opData{{"reboot-drain-start", 1}, {"reboot-recalc-metrics", 0}}, + ExpectedOps: []opData{{"reboot-drain-start", 1}}, }, { Name: "EndpointsWithCancelledRebootEntry", @@ -1224,7 +1224,7 @@ func TestDecideOps(t *testing.T) { Status: cke.RebootStatusCancelled, }, }), - ExpectedOps: []opData{{"reboot-dequeue", 1}, {"reboot-recalc-metrics", 0}}, + ExpectedOps: []opData{{"reboot-dequeue", 1}}, }, { Name: "UserResourceAdd", @@ -2065,7 +2065,6 @@ func TestDecideOps(t *testing.T) { }), ExpectedOps: []opData{ {"reboot-drain-start", 1}, - {"reboot-recalc-metrics", 0}, }, }, { @@ -2102,7 +2101,6 @@ func TestDecideOps(t *testing.T) { }), ExpectedOps: []opData{ {"reboot-reboot", 1}, - {"reboot-recalc-metrics", 0}, }, }, { @@ -2122,7 +2120,6 @@ func TestDecideOps(t *testing.T) { }), ExpectedOps: []opData{ {"reboot-drain-timeout", 1}, - {"reboot-recalc-metrics", 0}, }, }, { @@ -2142,7 +2139,6 @@ func TestDecideOps(t *testing.T) { }), ExpectedOps: []opData{ {"reboot-dequeue", 1}, - {"reboot-recalc-metrics", 0}, }, }, { @@ -2181,7 +2177,6 @@ func TestDecideOps(t *testing.T) { }), ExpectedOps: []opData{ {"reboot-dequeue", 1}, - {"reboot-recalc-metrics", 0}, }, }, }