Skip to content

Commit b7c8ad4

Browse files
authoredApr 9, 2024··
Merge pull request #16 from cgalibern/dev
[worker] push daemon status
2 parents a2afdb9 + 82f535b commit b7c8ad4

12 files changed

+730
-55
lines changed
 

‎conf.go

+2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ func initConfig() error {
3737
viper.SetDefault("redis.address", "localhost:6379")
3838
viper.SetDefault("redis.password", "")
3939
viper.SetDefault("feeder.tx", true)
40+
viper.SetDefault("websocket.key", "magix123")
41+
viper.SetDefault("websocket.url", "http://127.0.0.1:8889")
4042

4143
// config file
4244
viper.SetConfigName("config")

‎oc2websocket/main.go

+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Package oc2websocket provides T that can publish events to
2+
// opensvc collector v2 websocket publisher.
3+
package oc2websocket
4+
5+
import (
6+
"crypto/hmac"
7+
"crypto/md5"
8+
"encoding/hex"
9+
"encoding/json"
10+
"io"
11+
"net/http"
12+
"net/url"
13+
14+
"github.com/google/uuid"
15+
)
16+
17+
type (
18+
event struct {
19+
UUID uuid.UUID `json:"uuid"`
20+
Data []any `json:"data"`
21+
}
22+
23+
T struct {
24+
// URL is the url of opensvc collector v2 websocket publisher
25+
Url string
26+
// Key is the sign key for pushed messages
27+
Key []byte
28+
}
29+
)
30+
31+
func (s *T) pub(e *event) error {
32+
b, err := json.Marshal(e)
33+
if err != nil {
34+
return err
35+
}
36+
h := hmac.New(md5.New, s.Key)
37+
if _, err := h.Write(b); err != nil {
38+
return err
39+
}
40+
sum := h.Sum(nil)
41+
signature := hex.EncodeToString(sum)
42+
43+
params := url.Values{}
44+
params.Add("message", string(b))
45+
params.Add("signature", signature)
46+
params.Add("group", "generic")
47+
resp, err := http.PostForm(s.Url, params)
48+
if err != nil {
49+
return err
50+
}
51+
defer func() {
52+
_ = resp.Body.Close()
53+
}()
54+
55+
if _, err := io.ReadAll(resp.Body); err != nil {
56+
return err
57+
}
58+
return nil
59+
}
60+
61+
// EventPublish publish a new event to opensvc collector v2 websocket publisher
62+
func (s *T) EventPublish(evName string, data map[string]any) error {
63+
if data == nil {
64+
data = make(map[string]any)
65+
}
66+
data["event"] = evName
67+
data["version"] = "3.0.0"
68+
ev := &event{Data: []any{data}}
69+
ev.UUID, _ = uuid.NewUUID()
70+
return s.pub(ev)
71+
}

‎work.go

+5
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"github.com/spf13/viper"
55

6+
"github.com/opensvc/oc3/oc2websocket"
67
"github.com/opensvc/oc3/worker"
78
)
89

@@ -16,6 +17,10 @@ func work(queues []string) error {
1617
DB: db,
1718
Queues: queues,
1819
WithTx: viper.GetBool("feeder.tx"),
20+
Ev: &oc2websocket.T{
21+
Url: viper.GetString("websocket.url"),
22+
Key: []byte(viper.GetString("websocket.key")),
23+
},
1924
}
2025
if err != nil {
2126
return err

‎worker/daemon_data_v2.go

+17-11
Original file line numberDiff line numberDiff line change
@@ -130,17 +130,17 @@ func (d *daemonDataV2) objectStatus(objectName string) *DBObjStatus {
130130
if i, ok := mapTo(d.data, "services", objectName); ok && i != nil {
131131
if o, ok := i.(map[string]any); ok {
132132
oStatus := &DBObjStatus{
133-
availStatus: "n/a",
134-
status: "n/a",
135-
placement: "n/a",
136-
frozen: "n/a",
137-
provisioned: "n/a",
133+
availStatus: "n/a",
134+
overallStatus: "n/a",
135+
placement: "n/a",
136+
frozen: "n/a",
137+
provisioned: "n/a",
138138
}
139139
if s, ok := o["avail"].(string); ok {
140140
oStatus.availStatus = s
141141
}
142142
if s, ok := o["overall"].(string); ok {
143-
oStatus.status = s
143+
oStatus.overallStatus = s
144144
}
145145
if s, ok := o["placement"].(string); ok {
146146
oStatus.placement = s
@@ -162,7 +162,7 @@ func (d *daemonDataV2) objectStatus(objectName string) *DBObjStatus {
162162
}
163163

164164
func mapToA(m map[string]any, defaultValue any, k ...string) any {
165-
if v, ok := mapTo(m, k...); ok {
165+
if v, ok := mapTo(m, k...); ok && v != nil {
166166
return v
167167
} else {
168168
return defaultValue
@@ -182,11 +182,11 @@ func mapToBoolS(m map[string]any, defaultValue bool, k ...string) string {
182182
}
183183
}
184184

185-
func mapToS(m map[string]any, defaultValue any, k ...string) string {
185+
func mapToS(m map[string]any, defaultValue string, k ...string) string {
186186
return mapToA(m, defaultValue, k...).(string)
187187
}
188188

189-
func mapToMap(m map[string]any, defaultValue any, k ...string) map[string]any {
189+
func mapToMap(m map[string]any, defaultValue map[string]any, k ...string) map[string]any {
190190
return mapToA(m, defaultValue, k...).(map[string]any)
191191
}
192192

@@ -213,9 +213,15 @@ func (d *daemonDataV2) InstanceStatus(objectName string, nodename string) *insta
213213
instanceStatus.encap = mapToMap(a, nilMap, "encap")
214214
instanceStatus.resources = mapToMap(a, nilMap, "resources")
215215

216-
switch mapToA(a, 0, "frozen").(type) {
216+
switch v := mapToA(a, 0, "frozen").(type) {
217217
case int:
218-
instanceStatus.monFrozen = 0
218+
if v > 0 {
219+
instanceStatus.monFrozen = 1
220+
}
221+
case float64:
222+
if v > 0 {
223+
instanceStatus.monFrozen = 1
224+
}
219225
default:
220226
instanceStatus.monFrozen = 1
221227
}

‎worker/daemon_status.go

+145-36
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ import (
44
"context"
55
"database/sql"
66
"encoding/json"
7+
"errors"
78
"fmt"
89
"log/slog"
10+
"slices"
911
"strings"
1012
"time"
1113

@@ -25,10 +27,13 @@ type (
2527
}
2628

2729
DBObject struct {
28-
svcname string
29-
svcID string
30-
clusterID string
31-
availStatus string
30+
svcname string
31+
svcID string
32+
clusterID string
33+
34+
DBObjStatus
35+
36+
env string
3237
}
3338

3439
DBInstance struct {
@@ -72,6 +77,7 @@ type (
7277
redis *redis.Client
7378
db DBOperater
7479
oDb *opensvcDB
80+
ev EventPublisher
7581

7682
nodeID string
7783
clusterID string
@@ -80,8 +86,9 @@ type (
8086
nodeEnv string
8187
callerNode *DBNode
8288

83-
changes map[string]struct{}
84-
rawData []byte
89+
changes map[string]struct{}
90+
rawChanges string
91+
rawData []byte
8592

8693
data dataProvider
8794

@@ -97,17 +104,31 @@ type (
97104
byInstanceName map[string]*DBInstance
98105
byInstanceID map[string]*DBInstance
99106
}
107+
108+
InstanceID struct {
109+
nodeID string
110+
svcID string
111+
}
100112
)
101113

114+
func (n *DBNode) String() string {
115+
return fmt.Sprintf("node: {nodename: %s, node_id: %s, cluster_id: %s, app: %s}", n.nodename, n.nodeID, n.clusterID, n.app)
116+
}
117+
118+
func (i *InstanceID) String() string {
119+
return fmt.Sprintf("instance id: %s@%s", i.svcID, i.nodeID)
120+
}
121+
102122
func (t *Worker) handleDaemonStatus(nodeID string) error {
103123
defer logDurationInfo(fmt.Sprintf("handleDaemonStatus %s with tx %v", nodeID, t.WithTx), time.Now())
104-
slog.Info(fmt.Sprintf("handleDaemonStatus node_id: %s", nodeID))
124+
slog.Info(fmt.Sprintf("handleDaemonStatus starting for node_id %s", nodeID))
105125
ctx := context.Background()
106126

107127
d := daemonStatus{
108128
ctx: ctx,
109129
redis: t.Redis,
110130
nodeID: nodeID,
131+
ev: t.Ev,
111132

112133
changes: make(map[string]struct{}),
113134

@@ -156,6 +177,9 @@ func (t *Worker) handleDaemonStatus(nodeID string) error {
156177
d.dbFindInstance,
157178
d.dbUpdateServices,
158179
d.dbUpdateInstance,
180+
d.dbPurgeInstance,
181+
d.dbPurgeService,
182+
d.pushFromTableChanges,
159183
)
160184
if err != nil {
161185
if tx, ok := d.db.(DBTxer); ok {
@@ -171,8 +195,7 @@ func (t *Worker) handleDaemonStatus(nodeID string) error {
171195
return fmt.Errorf("handleDaemonStatus commit: %w", err)
172196
}
173197
}
174-
slog.Info(fmt.Sprintf("handleDaemonStatus done: node_id: %s cluster_id: %s, cluster_name: %s",
175-
d.nodeID, d.clusterID, d.clusterName))
198+
slog.Info(fmt.Sprintf("handleDaemonStatus done for %s", d.byNodeID[d.nodeID]))
176199
for k, v := range d.byNodename {
177200
slog.Debug(fmt.Sprintf("found db node %s: %#v", k, v))
178201
}
@@ -213,6 +236,7 @@ func (d *daemonStatus) getChanges() error {
213236
} else if err != redis.Nil {
214237
return fmt.Errorf("getChanges: HGET %s %s: %w", cache.KeyDaemonStatusChangesHash, d.nodeID, err)
215238
}
239+
d.rawChanges = s
216240
for _, change := range strings.Fields(s) {
217241
d.changes[change] = struct{}{}
218242
}
@@ -347,6 +371,7 @@ func (d *daemonStatus) dbFindNodes() error {
347371
for nodename := range d.byNodename {
348372
d.nodes = append(d.nodes, nodename)
349373
}
374+
slog.Info(fmt.Sprintf("handleDaemonStatus run details: %s changes: [%s]", callerNode, d.rawChanges))
350375
return nil
351376
}
352377

@@ -373,7 +398,7 @@ func (d *daemonStatus) dataToNodeFrozen() error {
373398
func (d *daemonStatus) dbFindServices() error {
374399
defer logDuration("dbFindServices", time.Now())
375400
const queryFindServicesInfo = "" +
376-
"SELECT svcname, svc_id, cluster_id, svc_availstatus" +
401+
"SELECT svcname, svc_id, cluster_id, svc_availstatus, svc_env, svc_status, svc_placement, svc_provisioned" +
377402
" FROM services" +
378403
" WHERE cluster_id = ? AND svcname IN (?"
379404
objectNames, err := d.data.objectNames()
@@ -405,12 +430,13 @@ func (d *daemonStatus) dbFindServices() error {
405430
}
406431
defer func() { _ = rows.Close() }()
407432
for rows.Next() {
408-
var o DBObject
409-
if err := rows.Scan(&o.svcname, &o.svcID, &o.clusterID, &o.availStatus); err != nil {
433+
o := DBObject{DBObjStatus: DBObjStatus{}}
434+
if err := rows.Scan(&o.svcname, &o.svcID, &o.clusterID, &o.availStatus, &o.env, &o.overallStatus, &o.placement, &o.provisioned); err != nil {
410435
return fmt.Errorf("dbFindServices scan %s: %w", d.nodeID, err)
411436
}
412437
d.byObjectName[o.svcname] = &o
413438
d.byObjectID[o.svcID] = &o
439+
slog.Debug(fmt.Sprintf("dbFindServices %s (%s)", o.svcname, o.svcID))
414440
}
415441
if err := rows.Err(); err != nil {
416442
return fmt.Errorf("dbFindServices FindClusterNodesInfo %s: %w", d.nodeID, err)
@@ -449,14 +475,16 @@ func (d *daemonStatus) dbFindInstance() error {
449475
for rows.Next() {
450476
var o DBInstance
451477
if err := rows.Scan(&o.svcID, &o.nodeID, &o.Frozen); err != nil {
452-
return fmt.Errorf("dbFindServices scan %s: %w", d.nodeID, err)
478+
return fmt.Errorf("dbFindInstance scan %s: %w", d.nodeID, err)
453479
}
454480
if n, ok := d.byNodeID[o.nodeID]; ok {
455481
// Only pickup instances from known nodes
456482
if s, ok := d.byObjectID[o.svcID]; ok {
457483
// Only pickup instances from known objects
458484
d.byInstanceName[s.svcname+"@"+n.nodename] = &o
459485
d.byInstanceID[s.svcID+"@"+n.nodeID] = &o
486+
slog.Debug(fmt.Sprintf("dbFindInstance found %s@%s (%s@%s)",
487+
s.svcname, n.nodename, s.svcID, n.nodeID))
460488
}
461489
}
462490
}
@@ -521,6 +549,11 @@ func (d *daemonStatus) dbUpdateServices() error {
521549
if err := d.oDb.updateObjectStatus(d.ctx, objectID, oStatus); err != nil {
522550
return fmt.Errorf("dbUpdateServices can't update object %s %s: %w", objectName, objectID, err)
523551
}
552+
if d.byObjectID[objectID].availStatus != oStatus.availStatus {
553+
slog.Debug(fmt.Sprintf("dbUpdateServices %s avail status %s -> %s", objectName, d.byObjectID[objectID].availStatus, oStatus.availStatus))
554+
}
555+
// refresh local cache
556+
d.byObjectID[objectID].DBObjStatus = *oStatus
524557
}
525558
}
526559
}
@@ -572,19 +605,21 @@ func (d *daemonStatus) dbUpdateInstance() error {
572605
return fmt.Errorf("dbUpdateInstance delete resources %s@%s: %w", objID, nodeID, err)
573606
}
574607
} else {
575-
if err := d.instanceStatusUpdate(objID, nodeID, iStatus); err != nil {
576-
return fmt.Errorf("dbUpdateInstance update status %s@%s (%s@%s): %w", objID, nodeID, objectName, nodename, err)
608+
// set iStatus svcID and nodeID for db update
609+
iStatus.svcID = objID
610+
iStatus.nodeID = nodeID
611+
if err := d.instanceStatusUpdate(objectName, nodename, iStatus); err != nil {
612+
return fmt.Errorf("dbUpdateInstance update status %s@%s (%s@%s): %w", objectName, nodename, objID, nodeID, err)
577613
}
578614
resourceObsoleteAt := time.Now()
579-
if err := d.instanceResourceUpdate(objID, nodeID, iStatus); err != nil {
580-
return fmt.Errorf("dbUpdateInstance update resource %s@%s (%s@%s): %w", objID, nodeID, objectName, nodename, err)
615+
if err := d.instanceResourceUpdate(objectName, nodename, iStatus); err != nil {
616+
return fmt.Errorf("dbUpdateInstance update resource %s@%s (%s@%s): %w", objectName, nodename, objID, nodeID, err)
581617
}
582618
slog.Debug(fmt.Sprintf("dbUpdateInstance deleting obsolete resources %s@%s", objectName, nodename))
583619
if err := d.oDb.instanceResourcesDeleteObsolete(d.ctx, objID, nodeID, resourceObsoleteAt); err != nil {
584620
return fmt.Errorf("dbUpdateInstance delete obsolete resources %s@%s: %w", objID, nodeID, err)
585621
}
586622
}
587-
// TODO: update update_dash: service_frozen, service_not_on_primary, svcmon_not_updated
588623
} else {
589624
if iStatus.resources == nil {
590625
// scaler or wrapper, for example
@@ -609,54 +644,128 @@ func (d *daemonStatus) dbUpdateInstance() error {
609644
}
610645
}
611646
// TODO: update_container_node_fields
612-
slog.Debug(fmt.Sprintf("dbUpdateInstance skip encap update %s@%s", objectName, nodename))
613647
}
648+
if err := d.oDb.dashboardInstanceFrozenUpdate(d.ctx, objID, nodeID, obj.env, iStatus.monFrozen > 0); err != nil {
649+
return fmt.Errorf("dbUpdateInstance update dashboard instance frozen %s@%s (%s@%s): %w", objectName, nodename, objID, nodeID, err)
650+
}
651+
if err := d.oDb.dashboardDeleteInstanceNotUpdated(d.ctx, objID, nodeID); err != nil {
652+
return fmt.Errorf("dbUpdateInstance update dashboard instance not updated %s@%s (%s@%s): %w", objectName, nodename, objID, nodeID, err)
653+
}
654+
// TODO: verify if we need a placement non optimal alert for object/instance
655+
// om2 has: monitor.services.'<path>'.placement = non-optimal
656+
// om3 has: cluster.object.<path>.placement_state = non-optimal
657+
// cluster.node.<node>.instance.<path>.monitor.is_ha_leader
658+
// cluster.node.<node>.instance.<path>.monitor.is_leader
659+
// collector v2 calls update_dash_service_not_on_primary (broken since no DEFAULT.autostart_node values)
614660
}
615661
if len(instanceMonitorStates) == 1 && instanceMonitorStates["idle"] {
616-
// TODO: update dashboard service unavailable
617-
// TODO: update dashboard service_placement
618-
// TODO: update dashboard service_available_but_degraded
619-
// TODO: update dashboard flex_instances_started
620-
// TODO: update dashboardsflex_cpu)
662+
var remove bool
663+
664+
remove = slices.Contains([]string{"up", "n/a"}, obj.availStatus)
665+
if err := d.updateDashboardObject(obj, remove, NewDashboardObjectUnavailable); err != nil {
666+
return fmt.Errorf("dbUpdateInstance on %s (%s): %w", objID, objectName, err)
667+
}
668+
669+
remove = slices.Contains([]string{"optimal", "n/a"}, obj.placement)
670+
if err := d.updateDashboardObject(obj, remove, NewDashboardObjectPlacement); err != nil {
671+
return fmt.Errorf("dbUpdateInstance on %s (%s): %w", objID, objectName, err)
672+
}
673+
674+
remove = slices.Contains([]string{"up", "n/a"}, obj.availStatus) && slices.Contains([]string{"up", "n/a"}, obj.overallStatus)
675+
if err := d.updateDashboardObject(obj, remove, NewDashboardObjectDegraded); err != nil {
676+
return fmt.Errorf("dbUpdateInstance on %s (%s): %w", objID, objectName, err)
677+
}
678+
// TODO: update_dash_flex_instances_started
679+
// TODO: update_dash_flex_cpu
621680
}
622681
}
623682

624-
// TODO: purge deleted data for instance (svcmon, dashboard, dashboard_events, svcdisks, resmon, checks_live,
625-
// comp_status, action_queue, resinfo, saves)
626-
//
627-
// TODO: purge deleted data for service (services, svcactions, drpservices, svcmon_log, resmon_log, svcmon_log_ack,
628-
// checks_settings, comp_log, comp_log_daily, comp_rulesets_services, comp_modulesets_services, log,
629-
// action_queue, svc_tags, form_output_results, svcmon_log_last, resmon_log_last)
630683
return nil
631684
}
632685

633-
func (d *daemonStatus) instanceResourceUpdate(objID string, nodeID string, iStatus *instanceStatus) error {
686+
func (d *daemonStatus) instanceResourceUpdate(objName string, nodename string, iStatus *instanceStatus) error {
634687
for _, res := range iStatus.InstanceResources() {
635-
slog.Debug(fmt.Sprintf("updating instance resource %s@%s %s", objID, nodeID, res.rid))
688+
slog.Debug(fmt.Sprintf("updating instance resource %s@%s %s (%s@%s)", objName, nodename, res.rid, iStatus.svcID, iStatus.nodeID))
636689
if err := d.oDb.instanceResourceUpdate(d.ctx, res); err != nil {
637690
return fmt.Errorf("update resource %s: %w", res.rid, err)
638691
}
639-
slog.Debug(fmt.Sprintf("updating instance resource log %s@%s %s", objID, nodeID, res.rid))
692+
slog.Debug(fmt.Sprintf("updating instance resource log %s@%s %s (%s@%s)", objName, nodename, res.rid, iStatus.svcID, iStatus.nodeID))
640693
if err := d.oDb.instanceResourceLogUpdate(d.ctx, res); err != nil {
641694
return fmt.Errorf("update resource log %s: %w", res.rid, err)
642695
}
643696
}
644697
return nil
645698
}
646699

647-
func (d *daemonStatus) instanceStatusUpdate(objID string, nodeID string, iStatus *instanceStatus) error {
648-
slog.Debug(fmt.Sprintf("updating instance status %s@%s", objID, nodeID))
700+
func (d *daemonStatus) instanceStatusUpdate(objName string, nodename string, iStatus *instanceStatus) error {
701+
slog.Debug(fmt.Sprintf("updating instance status %s@%s (%s@%s)", objName, nodename, iStatus.svcID, iStatus.nodeID))
649702
if err := d.oDb.instanceStatusUpdate(d.ctx, &iStatus.DBInstanceStatus); err != nil {
650703
return fmt.Errorf("update instance status: %w", err)
651704
}
652-
slog.Debug(fmt.Sprintf("instanceStatusUpdate updating status log %s@%s", objID, nodeID))
705+
slog.Debug(fmt.Sprintf("instanceStatusUpdate updating status log %s@%s (%s@%s)", objName, nodename, iStatus.svcID, iStatus.nodeID))
653706
err := d.oDb.instanceStatusLogUpdate(d.ctx, &iStatus.DBInstanceStatus)
654707
if err != nil {
655708
return fmt.Errorf("update instance status log: %w", err)
656709
}
657710
return nil
658711
}
659712

713+
func (d *daemonStatus) dbPurgeInstance() error {
714+
defer logDuration("dbPurgeInstance", time.Now())
715+
var nodeIDs, objectNames []string
716+
for objectName := range d.byObjectName {
717+
objectNames = append(objectNames, objectName)
718+
}
719+
for nodeID := range d.byNodeID {
720+
nodeIDs = append(nodeIDs, nodeID)
721+
}
722+
instanceIDs, err := d.oDb.getOrphanInstances(d.ctx, nodeIDs, objectNames)
723+
if err != nil {
724+
return fmt.Errorf("dbPurgeInstance: getOrphanInstances: %w", err)
725+
}
726+
for _, instanceID := range instanceIDs {
727+
if err1 := d.oDb.purgeInstances(d.ctx, instanceID); err1 != nil {
728+
err = errors.Join(err, fmt.Errorf("purge instance %v: %w", instanceID, err1))
729+
}
730+
}
731+
if err != nil {
732+
return fmt.Errorf("dbPurgeInstance: %w", err)
733+
}
734+
return nil
735+
}
736+
737+
func (d *daemonStatus) dbPurgeService() error {
738+
defer logDuration("dbPurgeService", time.Now())
739+
objectIDs, err := d.oDb.objectIDWithPurgeTag(d.ctx, d.clusterID)
740+
if err != nil {
741+
err = fmt.Errorf("dbPurgeService: objectIDWithPurgeTag: %w", err)
742+
return err
743+
}
744+
for _, objectID := range objectIDs {
745+
if err1 := d.oDb.purgeObject(d.ctx, objectID); err1 != nil {
746+
err = errors.Join(err, fmt.Errorf("purge object %s: %w", objectID, err1))
747+
}
748+
}
749+
if err != nil {
750+
return fmt.Errorf("dbPurgeService: %w", err)
751+
}
752+
return nil
753+
}
754+
755+
func (d *daemonStatus) pushFromTableChanges() error {
756+
defer logDuration("pushFromTableChanges", time.Now())
757+
for _, tableName := range d.oDb.tableChanges() {
758+
slog.Debug(fmt.Sprintf("pushFromTableChanges %s", tableName))
759+
if err := d.oDb.updateTableModified(d.ctx, tableName); err != nil {
760+
return fmt.Errorf("pushFromTableChanges: %w", err)
761+
}
762+
if err := d.ev.EventPublish(tableName+"_change", nil); err != nil {
763+
return fmt.Errorf("EventPublish send %s: %w", tableName, err)
764+
}
765+
}
766+
return nil
767+
}
768+
660769
func logDuration(s string, begin time.Time) {
661770
slog.Debug(fmt.Sprintf("STAT: %s elapse: %s", s, time.Now().Sub(begin)))
662771
}

‎worker/dashboard.go

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package worker
2+
3+
import (
4+
"fmt"
5+
"time"
6+
)
7+
8+
type (
9+
dashboarder interface {
10+
Type() string
11+
Fmt() string
12+
Dict() string
13+
Severity() int
14+
}
15+
16+
dashboarderCreate func(o *DBObject) dashboarder
17+
)
18+
19+
func (d *daemonStatus) updateDashboardObject(obj *DBObject, doDelete bool, f dashboarderCreate) error {
20+
objID := obj.svcID
21+
dash := f(obj)
22+
fmtErr := func(err error) error {
23+
if err != nil {
24+
return fmt.Errorf("updateDashboardObject '%s': %w", dash.Type(), err)
25+
}
26+
return nil
27+
}
28+
29+
if doDelete {
30+
return fmtErr(d.oDb.dashboardDeleteObjectWithType(d.ctx, objID, dash.Type()))
31+
}
32+
33+
inAckPeriod, err := d.oDb.ObjectInAckUnavailabilityPeriod(d.ctx, objID)
34+
if err != nil {
35+
return err
36+
}
37+
dashType := dash.Type()
38+
if inAckPeriod {
39+
return fmtErr(d.oDb.dashboardDeleteObjectWithType(d.ctx, objID, dashType))
40+
} else {
41+
now := time.Now()
42+
dash := Dashboard{
43+
ObjectID: objID,
44+
Type: dashType,
45+
Fmt: dash.Fmt(),
46+
Dict: dash.Dict(),
47+
Env: obj.env,
48+
Severity: dash.Severity(),
49+
Created: now,
50+
Updated: now,
51+
}
52+
return fmtErr(d.oDb.dashboardUpdateObject(d.ctx, &dash))
53+
}
54+
}

‎worker/dashboard_object_degraded.go

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package worker
2+
3+
import (
4+
"fmt"
5+
)
6+
7+
type (
8+
DashboardObjectDegraded struct {
9+
obj *DBObject
10+
}
11+
)
12+
13+
func NewDashboardObjectDegraded(o *DBObject) dashboarder {
14+
return &DashboardObjectDegraded{obj: o}
15+
}
16+
17+
func (d *DashboardObjectDegraded) Type() string {
18+
return "service available but degraded"
19+
}
20+
21+
func (d *DashboardObjectDegraded) Fmt() string {
22+
return fmt.Sprintf("current overall status: %s", d.obj.overallStatus)
23+
}
24+
25+
func (d *DashboardObjectDegraded) Dict() string {
26+
return fmt.Sprintf("{\"s\": \"%s\"}", d.obj.overallStatus)
27+
}
28+
29+
func (d *DashboardObjectDegraded) Severity() int {
30+
switch d.obj.env {
31+
case "PRD":
32+
return 3
33+
default:
34+
return 2
35+
}
36+
}

‎worker/dashboard_object_placement.go

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package worker
2+
3+
import "fmt"
4+
5+
type (
6+
DashboardObjectPlacement struct {
7+
obj *DBObject
8+
}
9+
)
10+
11+
func NewDashboardObjectPlacement(o *DBObject) dashboarder {
12+
return &DashboardObjectPlacement{obj: o}
13+
}
14+
15+
func (d *DashboardObjectPlacement) Type() string {
16+
return "service placement"
17+
}
18+
19+
func (d *DashboardObjectPlacement) Fmt() string {
20+
return fmt.Sprintf("%s", d.obj.placement)
21+
}
22+
23+
func (d *DashboardObjectPlacement) Dict() string {
24+
return fmt.Sprintf("{\"placement\": \"%s\"}", d.obj.placement)
25+
}
26+
27+
func (d *DashboardObjectPlacement) Severity() int {
28+
return 1
29+
}
+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package worker
2+
3+
import (
4+
"fmt"
5+
)
6+
7+
type (
8+
DashboardObjectUnavailable struct {
9+
obj *DBObject
10+
}
11+
)
12+
13+
func NewDashboardObjectUnavailable(o *DBObject) dashboarder {
14+
return &DashboardObjectUnavailable{obj: o}
15+
}
16+
17+
func (d *DashboardObjectUnavailable) Type() string {
18+
return "service unavailable"
19+
}
20+
21+
func (d *DashboardObjectUnavailable) Fmt() string {
22+
return fmt.Sprintf("current availability status: %s", d.obj.availStatus)
23+
}
24+
25+
func (d *DashboardObjectUnavailable) Dict() string {
26+
return fmt.Sprintf("{\"s\": \"%s\"}", d.obj.availStatus)
27+
}
28+
29+
func (d *DashboardObjectUnavailable) Severity() int {
30+
switch d.obj.env {
31+
case "PRD":
32+
return 4
33+
default:
34+
return 3
35+
}
36+
}

‎worker/db.go

+180-8
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,11 @@ type (
114114
}
115115

116116
DBObjStatus struct {
117-
availStatus string
118-
status string
119-
placement string
120-
frozen string
121-
provisioned string
117+
availStatus string
118+
overallStatus string
119+
placement string
120+
frozen string
121+
provisioned string
122122
}
123123

124124
// opensvcDB implements opensvc db functions
@@ -306,7 +306,7 @@ func (oDb *opensvcDB) updateObjectStatus(ctx context.Context, svcID string, o *D
306306
" , `svc_provisioned` = ?" +
307307
" , `svc_status_updated` = NOW()" +
308308
" WHERE `svc_id`= ? "
309-
if _, err := oDb.db.ExecContext(ctx, query, o.availStatus, o.status, o.placement, o.frozen, o.provisioned, svcID); err != nil {
309+
if _, err := oDb.db.ExecContext(ctx, query, o.availStatus, o.overallStatus, o.placement, o.frozen, o.provisioned, svcID); err != nil {
310310
return fmt.Errorf("can't update service status %s: %w", svcID, err)
311311
}
312312
oDb.tableChange("services")
@@ -500,9 +500,9 @@ func (oDb *opensvcDB) getAppFromNodeAndCandidateApp(ctx context.Context, candida
500500
}
501501

502502
func (oDb *opensvcDB) objectFromID(ctx context.Context, svcID string) (*DBObject, error) {
503-
const query = "SELECT svcname, svc_id, cluster_id, svc_availstatus FROM services WHERE svc_id = ?"
503+
const query = "SELECT svcname, svc_id, cluster_id, svc_availstatus, svc_status, svc_placement, svc_provisioned FROM services WHERE svc_id = ?"
504504
var o DBObject
505-
err := oDb.db.QueryRowContext(ctx, query, svcID).Scan(&o.svcname, &o.svcID, &o.clusterID, &o.availStatus)
505+
err := oDb.db.QueryRowContext(ctx, query, svcID).Scan(&o.svcname, &o.svcID, &o.clusterID, &o.availStatus, &o.overallStatus, &o.placement, &o.provisioned)
506506
switch {
507507
case errors.Is(err, sql.ErrNoRows):
508508
return nil, nil
@@ -936,3 +936,175 @@ func (oDb *opensvcDB) deleteByInstanceID(ctx context.Context, tableName string,
936936
}
937937
return nil
938938
}
939+
940+
// getOrphanInstances returns list of InstanceID defined on svcmon for nodeIDs but where
941+
// the associated services.svcname is not into objectNames
942+
//
943+
// SELECT `svcmon`.`svc_id`, `svcmon`.`node_id`
944+
// FROM `services`, `svcmon`
945+
// WHERE
946+
// `svcmon`.`node_id` IN ('nodeID1','nodeID2')
947+
// AND
948+
// `svcmon`.`svc_id` = `services`.`svc_id`
949+
// AND
950+
// `services`.`svcname` NOT IN ('obj1','obj2',...)
951+
func (oDb *opensvcDB) getOrphanInstances(ctx context.Context, nodeIDs, objectNames []string) (instanceIDs []InstanceID, err error) {
952+
var (
953+
query = "SELECT `svcmon`.`svc_id`, `svcmon`.`node_id` FROM `services`, `svcmon`"
954+
955+
queryArgs []any
956+
957+
rows *sql.Rows
958+
)
959+
960+
if len(nodeIDs) == 0 || len(objectNames) == 0 {
961+
return
962+
}
963+
query += " WHERE `svcmon`.`node_id` IN (?"
964+
queryArgs = append(queryArgs, nodeIDs[0])
965+
for i := 1; i < len(nodeIDs); i++ {
966+
query += ", ?"
967+
queryArgs = append(queryArgs, nodeIDs[i])
968+
}
969+
query += " ) AND `svcmon`.`svc_id` = `services`.`svc_id`"
970+
query += " AND `services`.`svcname` NOT IN ( ?"
971+
queryArgs = append(queryArgs, objectNames[0])
972+
for i := 1; i < len(objectNames); i++ {
973+
query += ", ?"
974+
queryArgs = append(queryArgs, objectNames[i])
975+
}
976+
query += " )"
977+
rows, err = oDb.db.QueryContext(ctx, query, queryArgs...)
978+
if err != nil {
979+
return
980+
}
981+
defer func() { _ = rows.Close() }()
982+
for rows.Next() {
983+
var instanceID InstanceID
984+
if err = rows.Scan(&instanceID.svcID, &instanceID.nodeID); err != nil {
985+
return
986+
}
987+
instanceIDs = append(instanceIDs, instanceID)
988+
}
989+
err = rows.Err()
990+
return
991+
}
992+
993+
func (oDb *opensvcDB) purgeInstances(ctx context.Context, id InstanceID) error {
994+
const (
995+
where = "WHERE `svc_id` = ? and `node_id` = ?"
996+
)
997+
998+
var (
999+
tables = []string{
1000+
"svcmon", "dashboard", "dashboard_events", "svcdisks", "resmon",
1001+
"checks_live", "comp_status", "action_queue", "resinfo", "saves",
1002+
}
1003+
1004+
err error
1005+
)
1006+
slog.Debug(fmt.Sprintf("purging instance %s", id))
1007+
for _, tableName := range tables {
1008+
request := fmt.Sprintf("DELETE FROM %s WHERE `svc_id` = ? and `node_id` = ?", tableName)
1009+
result, err1 := oDb.db.ExecContext(ctx, request, id.svcID, id.nodeID)
1010+
if err1 != nil {
1011+
err = errors.Join(err, fmt.Errorf("delete from %s: %w", tableName, err1))
1012+
continue
1013+
}
1014+
if rowAffected, err1 := result.RowsAffected(); err1 != nil {
1015+
err = errors.Join(err, fmt.Errorf("count delete from %s: %w", tableName, err1))
1016+
} else if rowAffected > 0 {
1017+
slog.Debug(fmt.Sprintf("purged table %s instance %s", tableName, id))
1018+
oDb.tableChange(tableName)
1019+
}
1020+
}
1021+
return err
1022+
}
1023+
1024+
func (oDb *opensvcDB) objectIDWithPurgeTag(ctx context.Context, clusterID string) (objectIDs []string, err error) {
1025+
const (
1026+
query = "" +
1027+
"SELECT `svc_tags`.`svc_id`" +
1028+
" FROM `tags`, `services`, `svc_tags`" +
1029+
" LEFT JOIN `svcmon` ON `svc_tags`.`svc_id` = `svcmon`.`svc_id`" +
1030+
" WHERE" +
1031+
" `services`.`svc_id`=`svc_tags`.`svc_id`" +
1032+
" AND `services`.`cluster_id` = ?" +
1033+
" AND `tags`.`tag_id` = `svc_tags`.`tag_id`" +
1034+
" AND `tags`.`tag_name` = '@purge'" +
1035+
" AND `svcmon`.`id` IS NULL"
1036+
)
1037+
var (
1038+
rows *sql.Rows
1039+
)
1040+
rows, err = oDb.db.QueryContext(ctx, query, clusterID)
1041+
if err != nil {
1042+
return
1043+
}
1044+
defer func() { _ = rows.Close() }()
1045+
for rows.Next() {
1046+
var ID string
1047+
if err = rows.Scan(&ID); err != nil {
1048+
return
1049+
}
1050+
objectIDs = append(objectIDs, ID)
1051+
}
1052+
err = rows.Err()
1053+
return
1054+
}
1055+
1056+
func (oDb *opensvcDB) purgeObject(ctx context.Context, id string) error {
1057+
const (
1058+
where = "WHERE `svc_id` = ?"
1059+
)
1060+
1061+
var (
1062+
tables = []string{
1063+
"services", "svcactions", "drpservices", "svcmon_log", "resmon_log",
1064+
"svcmon_log_ack", "checks_settings", "comp_log", "comp_log_daily",
1065+
"comp_rulesets_services", "comp_modulesets_services", "log",
1066+
"action_queue", "svc_tags", "form_output_results", "svcmon_log_last",
1067+
"resmon_log_last",
1068+
}
1069+
1070+
err error
1071+
)
1072+
slog.Debug(fmt.Sprintf("purging object %s", id))
1073+
for _, tableName := range tables {
1074+
request := fmt.Sprintf("DELETE FROM %s WHERE `svc_id` = ?", tableName)
1075+
result, err1 := oDb.db.ExecContext(ctx, request, id)
1076+
if err1 != nil {
1077+
err = errors.Join(err, fmt.Errorf("delete from %s: %w", tableName, err1))
1078+
continue
1079+
}
1080+
if rowAffected, err1 := result.RowsAffected(); err1 != nil {
1081+
err = errors.Join(err, fmt.Errorf("count delete from %s: %w", tableName, err1))
1082+
} else if rowAffected > 0 {
1083+
slog.Debug(fmt.Sprintf("purged table %s object %s", tableName, id))
1084+
oDb.tableChange(tableName)
1085+
}
1086+
}
1087+
return err
1088+
}
1089+
1090+
func (oDb *opensvcDB) tableChanges() []string {
1091+
var r []string
1092+
for s := range oDb.tChanges {
1093+
r = append(r, s)
1094+
}
1095+
return r
1096+
}
1097+
1098+
func (oDb *opensvcDB) updateTableModified(ctx context.Context, tableName string) error {
1099+
defer logDuration("updateTableModified", time.Now())
1100+
const (
1101+
query = "" +
1102+
"INSERT INTO `table_modified` VALUES (NULL, ?, NOW())" +
1103+
" ON DUPLICATE KEY UPDATE `table_modified` = NOW()"
1104+
)
1105+
_, err := oDb.db.ExecContext(ctx, query, tableName)
1106+
if err != nil {
1107+
return fmt.Errorf("updateTableModified %s: %w", tableName, err)
1108+
}
1109+
return nil
1110+
}

‎worker/db_dashboard.go

+150
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package worker
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"fmt"
7+
"time"
8+
)
9+
10+
type (
11+
Dashboard struct {
12+
ID int64
13+
ObjectID string
14+
NodeID string
15+
Type string
16+
Fmt string
17+
Dict string
18+
Severity int
19+
Env string
20+
Instance string
21+
Created time.Time
22+
Updated time.Time
23+
}
24+
)
25+
26+
// dashboardInstanceFrozenUpdate update or remove the "service frozen" alerts for instance
27+
func (oDb *opensvcDB) dashboardInstanceFrozenUpdate(ctx context.Context, objectID, nodeID string, objectEnv string, frozen bool) error {
28+
defer logDuration("dashboardInstanceFrozenUpdate", time.Now())
29+
const (
30+
queryThawed = `
31+
DELETE FROM dashboard
32+
WHERE
33+
dash_type = 'service frozen'
34+
AND svc_id = ?
35+
AND node_id = ?
36+
`
37+
queryFrozen = `
38+
INSERT INTO dashboard
39+
SET
40+
dash_type = 'service frozen', svc_id = ?, node_id = ?,
41+
dash_severity = 1, dash_fmt='', dash_dict='',
42+
dash_created = NOW(), dash_updated = NOW(), dash_env = ?
43+
ON DUPLICATE KEY UPDATE
44+
dash_severity = 1, dash_fmt = '', dash_dict = '',
45+
dash_updated = NOW(), dash_env = ?`
46+
)
47+
var (
48+
err error
49+
result sql.Result
50+
)
51+
switch frozen {
52+
case true:
53+
result, err = oDb.db.ExecContext(ctx, queryFrozen, objectID, nodeID, objectEnv, objectEnv)
54+
if err != nil {
55+
return fmt.Errorf("update dashboard 'service frozen' for %s@%s: %w", objectID, nodeID, err)
56+
}
57+
case false:
58+
result, err = oDb.db.ExecContext(ctx, queryThawed, objectID, nodeID)
59+
if err != nil {
60+
return fmt.Errorf("delete dashboard 'service frozen' for %s@%s: %w", objectID, nodeID, err)
61+
}
62+
}
63+
if count, err := result.RowsAffected(); err != nil {
64+
return fmt.Errorf("count dashboard 'service frozen' for %s@%s: %w", objectID, nodeID, err)
65+
} else if count > 0 {
66+
oDb.tableChange("dashboard")
67+
}
68+
return nil
69+
}
70+
71+
// dashboardDeleteInstanceNotUpdated delete "instance status not updated" alerts.
72+
func (oDb *opensvcDB) dashboardDeleteInstanceNotUpdated(ctx context.Context, objectID, nodeID string) error {
73+
defer logDuration("dashboardDeleteInstanceNotUpdated", time.Now())
74+
const (
75+
query = `DELETE FROM dashboard WHERE svc_id = ? AND node_id = ? AND dash_type = 'instance status not updated'`
76+
)
77+
if result, err := oDb.db.ExecContext(ctx, query, objectID, nodeID); err != nil {
78+
return err
79+
} else if count, err := result.RowsAffected(); err != nil {
80+
return err
81+
} else if count > 0 {
82+
oDb.tableChange("dashboard")
83+
}
84+
return nil
85+
}
86+
87+
// dashboardDeleteObjectWithType delete from dashboard where svc_id and dash_type match
88+
func (oDb *opensvcDB) dashboardDeleteObjectWithType(ctx context.Context, objectID, dashType string) error {
89+
defer logDuration("dashboardDeleteObjectWithType: "+dashType, time.Now())
90+
const (
91+
query = `DELETE FROM dashboard WHERE svc_id = ? AND dash_type = ?`
92+
)
93+
if result, err := oDb.db.ExecContext(ctx, query, objectID, dashType); err != nil {
94+
return fmt.Errorf("dashboardDeleteObjectWithType %s: %w", dashType, err)
95+
} else if count, err := result.RowsAffected(); err != nil {
96+
return fmt.Errorf("dashboardDeleteObjectWithType %s: %w", dashType, err)
97+
} else if count > 0 {
98+
oDb.tableChange("dashboard")
99+
}
100+
return nil
101+
}
102+
103+
// ObjectInAckUnavailabilityPeriod returns true if objectID is in acknowledge unavailability period.
104+
func (oDb *opensvcDB) ObjectInAckUnavailabilityPeriod(ctx context.Context, objectID string) (ok bool, err error) {
105+
defer logDuration("ObjectInAckUnavailabilityPeriod", time.Now())
106+
const (
107+
query = `SELECT COUNT(*) FROM svcmon_log_ack WHERE svc_id = ? AND mon_begin <= NOW() AND mon_end >= NOW()`
108+
)
109+
var count uint64
110+
err = oDb.db.QueryRowContext(ctx, query, objectID).Scan(&count)
111+
if err != nil {
112+
err = fmt.Errorf("ObjectInAckUnavailabilityPeriod: %w", err)
113+
}
114+
return count > 0, err
115+
}
116+
117+
// dashboardUpdateObject delete "service unavailable" alerts.
118+
func (oDb *opensvcDB) dashboardUpdateObject(ctx context.Context, d *Dashboard) error {
119+
defer logDuration("dashboardUpdateObject", time.Now())
120+
const (
121+
query = `INSERT INTO dashboard
122+
SET
123+
svc_id = ?,
124+
dash_type = ?,
125+
dash_fmt = ?,
126+
dash_severity = ?,
127+
dash_dict = ?,
128+
dash_created = NOW(),
129+
dash_updated = NOW(),
130+
dash_env = ?
131+
ON DUPLICATE KEY UPDATE
132+
dash_fmt = ?,
133+
dash_severity = ?,
134+
dash_dict = ?,
135+
dash_updated = NOW(),
136+
dash_env = ?
137+
`
138+
)
139+
result, err := oDb.db.ExecContext(ctx, query,
140+
d.ObjectID, d.Type, d.Fmt, d.Severity, d.Dict, d.Env,
141+
d.Fmt, d.Severity, d.Dict, d.Env)
142+
if err != nil {
143+
return fmt.Errorf("dashboardUpdateObject: %w", err)
144+
} else if count, err := result.RowsAffected(); err != nil {
145+
return fmt.Errorf("dashboardUpdateObject: %w", err)
146+
} else if count > 0 {
147+
oDb.tableChange("dashboard")
148+
}
149+
return nil
150+
}

‎worker/worker.go

+5
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ type (
1919
DB *sql.DB
2020
Queues []string
2121
WithTx bool
22+
Ev EventPublisher
23+
}
24+
25+
EventPublisher interface {
26+
EventPublish(eventName string, data map[string]any) error
2227
}
2328
)
2429

0 commit comments

Comments
 (0)
Please sign in to comment.