Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
zoetrope committed Sep 7, 2023
1 parent c798e29 commit fb07a25
Show file tree
Hide file tree
Showing 10 changed files with 89 additions and 127 deletions.
65 changes: 63 additions & 2 deletions metrics/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,14 @@ type metricGroup struct {
// This abstraction is for mock test.
type storage interface {
// IsSabakanDisabled reports whether the sabakan integration is disabled.
IsSabakanDisabled(context.Context) (bool, error)
// GetRebootsEntries returns the reboot queue entries read from storage.
GetRebootsEntries(ctx context.Context) ([]*cke.RebootQueueEntry, error)
// GetCluster returns the stored *cke.Cluster; nodeMetricsCollector reads
// its Nodes field to build per-node reboot status.
GetCluster(ctx context.Context) (*cke.Cluster, error)
}

// NewCollector returns a new prometheus.Collector.
func NewCollector(client *v3.Client) prometheus.Collector {

storage := &cke.Storage{Client: client}
return &collector{
metrics: map[string]metricGroup{
"leader": {
Expand All @@ -55,15 +59,15 @@ func NewCollector(client *v3.Client) prometheus.Collector {
isAvailable: isOperationPhaseAvailable,
},
"reboot": {
collectors: []prometheus.Collector{rebootQueueEntries, rebootQueueItems, nodeRebootStatus},
collectors: []prometheus.Collector{nodeMetricsCollector{storage}},
isAvailable: isRebootAvailable,
},
"sabakan_integration": {
collectors: []prometheus.Collector{sabakanIntegrationSuccessful, sabakanIntegrationTimestampSeconds, sabakanWorkers, sabakanUnusedMachines},
isAvailable: isSabakanIntegrationAvailable,
},
},
storage: &cke.Storage{Client: client},
storage: storage,
}
}

Expand Down Expand Up @@ -120,3 +124,60 @@ func (c collector) Collect(ch chan<- prometheus.Metric) {
}
wg.Wait()
}

// nodeMetricsCollector is a prometheus.Collector that builds the reboot queue
// and node reboot status metrics from storage each time Collect is called.
type nodeMetricsCollector struct {
// storage is where the reboot queue entries and the cluster are read from.
storage storage
}

// Compile-time check that nodeMetricsCollector satisfies prometheus.Collector.
var _ prometheus.Collector = &nodeMetricsCollector{}

// Describe implements prometheus.Collector by sending the descriptors of the
// three reboot-related metrics this collector produces.
func (c nodeMetricsCollector) Describe(ch chan<- *prometheus.Desc) {
	descs := []*prometheus.Desc{
		rebootQueueEntries,
		rebootQueueItems,
		nodeRebootStatus,
	}
	for _, desc := range descs {
		ch <- desc
	}
}

// Collect implements prometheus.Collector. It reads the reboot queue entries
// and the cluster from storage and sends:
//   - rebootQueueEntries: the number of reboot queue entries,
//   - rebootQueueItems: the number of entries per status,
//   - nodeRebootStatus: 1 or 0 for every (node, status) pair.
//
// If either storage read fails, nothing is sent for this scrape. If storage
// returns a nil cluster without an error, the queue metrics are still sent
// and only the per-node metrics are skipped.
func (c nodeMetricsCollector) Collect(ch chan<- prometheus.Metric) {
	// Bound the storage reads with a 3 second deadline.
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// Both reads happen before anything is sent, so a storage error never
	// produces a partial set of metrics.
	rqEntries, err := c.storage.GetRebootsEntries(ctx)
	if err != nil {
		return
	}
	cluster, err := c.storage.GetCluster(ctx)
	if err != nil {
		return
	}

	ch <- prometheus.MustNewConstMetric(
		rebootQueueEntries,
		prometheus.GaugeValue,
		float64(len(rqEntries)),
	)
	for status, count := range cke.CountRebootQueueEntries(rqEntries) {
		ch <- prometheus.MustNewConstMetric(
			rebootQueueItems,
			prometheus.GaugeValue,
			float64(count),
			status,
		)
	}

	// A storage implementation may return (nil, nil); dereferencing
	// cluster.Nodes in that case would panic in the middle of a scrape.
	if cluster == nil {
		return
	}
	for node, statuses := range cke.BuildNodeRebootStatus(cluster.Nodes, rqEntries) {
		for status, matches := range statuses {
			value := float64(0)
			if matches {
				value = 1
			}
			ch <- prometheus.MustNewConstMetric(
				nodeRebootStatus,
				prometheus.GaugeValue,
				value,
				node,
				status,
			)
		}
	}
}
32 changes: 14 additions & 18 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,29 +33,25 @@ var operationPhaseTimestampSeconds = prometheus.NewGauge(
},
)

var rebootQueueEntries = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "reboot_queue_entries",
Help: "The number of reboot queue entries remaining.",
},
// rebootQueueEntries describes the gauge reporting the number of reboot queue
// entries remaining. The fully-qualified name is built with the package
// namespace so the exported name keeps the prefix that the former GaugeOpts
// definition (Namespace: namespace) produced.
var rebootQueueEntries = prometheus.NewDesc(
	prometheus.BuildFQName(namespace, "", "reboot_queue_entries"),
	"The number of reboot queue entries remaining.",
	nil,
	nil,
)

var rebootQueueItems = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "reboot_queue_items",
Help: "The number of reboot queue entries remaining per status.",
},
// rebootQueueItems describes the gauge reporting the number of reboot queue
// entries remaining per status; it carries one variable label, "status".
// The fully-qualified name is built with the package namespace so the
// exported name keeps the prefix that the former GaugeOpts definition
// (Namespace: namespace) produced.
var rebootQueueItems = prometheus.NewDesc(
	prometheus.BuildFQName(namespace, "", "reboot_queue_items"),
	"The number of reboot queue entries remaining per status.",
	[]string{"status"},
	nil,
)

var nodeRebootStatus = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "node_reboot_status",
Help: "The reboot status of a node.",
}, []string{"node", "status"},
// nodeRebootStatus describes the gauge reporting the reboot status of a node;
// it carries two variable labels, "node" and "status", in that order. The
// fully-qualified name is built with the package namespace so the exported
// name keeps the prefix that the former GaugeOpts definition
// (Namespace: namespace) produced.
var nodeRebootStatus = prometheus.NewDesc(
	prometheus.BuildFQName(namespace, "", "node_reboot_status"),
	"The reboot status of a node.",
	[]string{"node", "status"},
	nil,
)

var sabakanIntegrationSuccessful = prometheus.NewGauge(
Expand Down
30 changes: 0 additions & 30 deletions metrics/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,36 +39,6 @@ func isOperationPhaseAvailable(_ context.Context, _ storage) (bool, error) {
return isLeader, nil
}

// UpdateRebootQueueEntries sets the "reboot_queue_entries" gauge to numEntries.
func UpdateRebootQueueEntries(numEntries int) {
	remaining := float64(numEntries)
	rebootQueueEntries.Set(remaining)
}

// UpdateRebootQueueItems sets the "reboot_queue_items" gauge for every status
// key in counts to the associated count.
func UpdateRebootQueueItems(counts map[string]int) {
	for status, n := range counts {
		labels := map[string]string{
			"status": status,
		}
		gauge := rebootQueueItems.With(labels)
		gauge.Set(float64(n))
	}
}

// UpdateNodeRebootStatus sets the "node_reboot_status" gauge for every
// (node, status) pair in nodeStatus: 1 when the pair maps to true, 0 otherwise.
func UpdateNodeRebootStatus(nodeStatus map[string]map[string]bool) {
	for node, statuses := range nodeStatus {
		for status, matches := range statuses {
			labels := map[string]string{
				"node":   node,
				"status": status,
			}
			gauge := nodeRebootStatus.With(labels)
			if matches {
				gauge.Set(1)
			} else {
				gauge.Set(0)
			}
		}
	}
}

// isRebootAvailable reports whether the "reboot" metric group should be
// exposed. It ignores both arguments and returns the package-level isLeader
// flag with a nil error.
func isRebootAvailable(_ context.Context, _ storage) (bool, error) {
return isLeader, nil
}
Expand Down
14 changes: 8 additions & 6 deletions metrics/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,6 @@ func testUpdateRebootQueueEntries(t *testing.T) {
collector, _ := newTestCollector()
handler := GetHandler(collector)

UpdateRebootQueueEntries(tt.input)

w := httptest.NewRecorder()
req := httptest.NewRequest("GET", "/metrics", nil)
handler.ServeHTTP(w, req)
Expand Down Expand Up @@ -343,8 +341,6 @@ func testUpdateRebootQueueItems(t *testing.T) {
collector, _ := newTestCollector()
handler := GetHandler(collector)

UpdateRebootQueueItems(tt.input)

w := httptest.NewRecorder()
req := httptest.NewRequest("GET", "/metrics", nil)
handler.ServeHTTP(w, req)
Expand Down Expand Up @@ -396,8 +392,6 @@ func testUpdateNodeRebootStatus(t *testing.T) {
collector, _ := newTestCollector()
handler := GetHandler(collector)

UpdateNodeRebootStatus(input)

w := httptest.NewRecorder()
req := httptest.NewRequest("GET", "/metrics", nil)
handler.ServeHTTP(w, req)
Expand Down Expand Up @@ -597,6 +591,14 @@ func (s *testStorage) IsSabakanDisabled(_ context.Context) (bool, error) {
return !s.sabakanEnabled, nil
}

// GetRebootsEntries is a stub for the storage interface; it always returns no
// entries and no error.
func (s *testStorage) GetRebootsEntries(ctx context.Context) ([]*cke.RebootQueueEntry, error) {
return nil, nil
}

// GetCluster is a stub for the storage interface. It returns an empty,
// non-nil cluster so that callers which read fields of the result (such as
// cluster.Nodes) without a nil check do not panic.
func (s *testStorage) GetCluster(ctx context.Context) (*cke.Cluster, error) {
	return &cke.Cluster{}, nil
}

func labelToMap(labelPair []*dto.LabelPair) map[string]string {
res := make(map[string]string)
for _, l := range labelPair {
Expand Down
1 change: 0 additions & 1 deletion mtest/reboot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ func testRebootOperations() {
// - RebootDrainTimeoutOp
// - RebootUncordonOp
// - RebootDequeueOp
// - RebootRecalcMetricsOp

cluster := getCluster()
for i := 0; i < 3; i++ {
Expand Down
59 changes: 0 additions & 59 deletions op/reboot.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"github.com/cybozu-go/cke"
"github.com/cybozu-go/cke/metrics"
"github.com/cybozu-go/log"
"github.com/cybozu-go/well"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -524,64 +523,6 @@ func (c rebootDequeueCommand) Command() cke.Command {
}
}

//

// rebootRecalcMetricsOp is the operator behind RebootRecalcMetricsOp; it
// issues a single rebootRecalcMetricsCommand.
type rebootRecalcMetricsOp struct {
// finished is set once NextCommand has handed out the command.
finished bool
}

// RebootRecalcMetricsOp returns an Operator to recalculate reboot metrics.
func RebootRecalcMetricsOp() cke.Operator {
return &rebootRecalcMetricsOp{}
}

// Name returns the operator name, "reboot-recalc-metrics".
func (o *rebootRecalcMetricsOp) Name() string {
return "reboot-recalc-metrics"
}

// NextCommand returns a rebootRecalcMetricsCommand on the first call and nil
// on every later call.
func (o *rebootRecalcMetricsOp) NextCommand() cke.Commander {
	if !o.finished {
		o.finished = true
		return rebootRecalcMetricsCommand{}
	}
	return nil
}

// Targets returns an empty, non-nil slice.
func (o *rebootRecalcMetricsOp) Targets() []string {
return []string{}
}

// rebootRecalcMetricsCommand is a stateless command whose Run refreshes the
// reboot metrics from storage.
type rebootRecalcMetricsCommand struct {
}

// Run reads the reboot queue entries and the cluster from the infrastructure's
// storage and pushes three values into the metrics package: the number of
// queue entries, the per-status entry counts, and the per-node reboot status.
// It returns the first storage error encountered, in which case no metric is
// updated.
func (c rebootRecalcMetricsCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error {
	entries, err := inf.Storage().GetRebootsEntries(ctx)
	if err != nil {
		return err
	}

	cl, err := inf.Storage().GetCluster(ctx)
	if err != nil {
		return err
	}

	// The update order matches the original: entries, items, node status.
	metrics.UpdateRebootQueueEntries(len(entries))
	metrics.UpdateRebootQueueItems(cke.CountRebootQueueEntries(entries))
	metrics.UpdateNodeRebootStatus(cke.BuildNodeRebootStatus(cl.Nodes, entries))

	return nil
}

// Command returns the cke.Command describing this command; only Name is set.
func (c rebootRecalcMetricsCommand) Command() cke.Command {
return cke.Command{
Name: "rebootRecalcMetricsCommand",
}
}

//

func listProtectedNamespaces(ctx context.Context, cs *kubernetes.Clientset, ls *metav1.LabelSelector) (map[string]bool, error) {
selector, err := metav1.LabelSelectorAsSelector(ls)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions sabakan/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ func MachineToNode(m *Machine, tmpl *cke.Node) *cke.Node {
n.Labels["cke.cybozu.com/rack"] = strconv.Itoa(m.Spec.Rack)
n.Labels["cke.cybozu.com/index-in-rack"] = strconv.Itoa(m.Spec.IndexInRack)
n.Labels["cke.cybozu.com/role"] = m.Spec.Role
n.Labels["cke.cybozu.com/retire-date"] = m.Spec.RetireDate.Format("2006-01")
n.Labels["cke.cybozu.com/register-date"] = m.Spec.RegisterDate.Format("2006-01")
n.Labels["node-role.kubernetes.io/"+m.Spec.Role] = "true"
if n.ControlPlane {
n.Labels["node-role.kubernetes.io/master"] = "true"
Expand Down
3 changes: 0 additions & 3 deletions server/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,6 @@ func (c Controller) runOnce(ctx context.Context, leaderKey string, tick <-chan t
if err != nil {
return err
}
metrics.UpdateRebootQueueEntries(len(rqEntries))
itemCounts := cke.CountRebootQueueEntries(rqEntries)
metrics.UpdateRebootQueueItems(itemCounts)
rqEntries = cke.DedupRebootQueueEntries(rqEntries)

if len(rqEntries) > 0 {
Expand Down
1 change: 0 additions & 1 deletion server/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,6 @@ func rebootOps(c *cke.Cluster, constraints *cke.Constraints, rebootArgs DecideOp
}
if len(ops) > 0 {
phaseReboot = true
ops = append(ops, op.RebootRecalcMetricsOp())
}

return ops, phaseReboot
Expand Down
9 changes: 2 additions & 7 deletions server/strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1207,7 +1207,7 @@ func TestDecideOps(t *testing.T) {
d.Status.Kubernetes.MasterEndpointSlice.Endpoints[2].Conditions.Ready = &endpointReady
d.Status.Kubernetes.EtcdEndpointSlice.Endpoints[2].Conditions.Ready = &endpointReady
}),
ExpectedOps: []opData{{"reboot-drain-start", 1}, {"reboot-recalc-metrics", 0}},
ExpectedOps: []opData{{"reboot-drain-start", 1}},
},
{
Name: "EndpointsWithCancelledRebootEntry",
Expand All @@ -1224,7 +1224,7 @@ func TestDecideOps(t *testing.T) {
Status: cke.RebootStatusCancelled,
},
}),
ExpectedOps: []opData{{"reboot-dequeue", 1}, {"reboot-recalc-metrics", 0}},
ExpectedOps: []opData{{"reboot-dequeue", 1}},
},
{
Name: "UserResourceAdd",
Expand Down Expand Up @@ -2065,7 +2065,6 @@ func TestDecideOps(t *testing.T) {
}),
ExpectedOps: []opData{
{"reboot-drain-start", 1},
{"reboot-recalc-metrics", 0},
},
},
{
Expand Down Expand Up @@ -2102,7 +2101,6 @@ func TestDecideOps(t *testing.T) {
}),
ExpectedOps: []opData{
{"reboot-reboot", 1},
{"reboot-recalc-metrics", 0},
},
},
{
Expand All @@ -2122,7 +2120,6 @@ func TestDecideOps(t *testing.T) {
}),
ExpectedOps: []opData{
{"reboot-drain-timeout", 1},
{"reboot-recalc-metrics", 0},
},
},
{
Expand All @@ -2142,7 +2139,6 @@ func TestDecideOps(t *testing.T) {
}),
ExpectedOps: []opData{
{"reboot-dequeue", 1},
{"reboot-recalc-metrics", 0},
},
},
{
Expand Down Expand Up @@ -2181,7 +2177,6 @@ func TestDecideOps(t *testing.T) {
}),
ExpectedOps: []opData{
{"reboot-dequeue", 1},
{"reboot-recalc-metrics", 0},
},
},
}
Expand Down

0 comments on commit fb07a25

Please sign in to comment.