diff --git a/cmd.go b/cmd.go index 196f77c..00e096a 100644 --- a/cmd.go +++ b/cmd.go @@ -1,16 +1,25 @@ package main import ( + "log/slog" "path/filepath" "github.com/spf13/cobra" ) +var ( + debug bool +) + func newCmd(args []string) *cobra.Command { root := &cobra.Command{ Use: filepath.Base(args[0]), Short: "Manage the opensvc collector infrastructure components.", PersistentPreRunE: func(cmd *cobra.Command, args []string) error { + if debug { + slog.SetLogLoggerLevel(slog.LevelDebug) + } + logConfigDir() if err := initConfig(); err != nil { return err @@ -20,6 +29,8 @@ func newCmd(args []string) *cobra.Command { }, } + root.PersistentFlags().BoolVar(&debug, "debug", false, "set log level to debug") + root.AddCommand( &cobra.Command{ Use: "api", diff --git a/go.mod b/go.mod index 1738d19..9f56e73 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/opensvc/oc3 -go 1.21 +go 1.22 require ( github.com/allenai/go-swaggerui v0.1.0 @@ -10,6 +10,7 @@ require ( github.com/labstack/echo/v4 v4.11.4 github.com/oapi-codegen/runtime v1.1.1 github.com/shaj13/go-guardian/v2 v2.11.5 + github.com/spf13/cobra v1.8.0 github.com/spf13/viper v1.18.2 ) @@ -39,7 +40,6 @@ require ( github.com/sourcegraph/conc v0.3.0 // indirect github.com/spf13/afero v1.11.0 // indirect github.com/spf13/cast v1.6.0 // indirect - github.com/spf13/cobra v1.8.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/subosito/gotenv v1.6.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect diff --git a/worker/daemon_status.go b/worker/daemon_status.go index 85b074c..c5c3de4 100644 --- a/worker/daemon_status.go +++ b/worker/daemon_status.go @@ -27,6 +27,12 @@ type ( clusterID string } + DBInstance struct { + svcID string + nodeID string + Frozen uint + } + dataLister interface { objectNames() ([]string, error) nodeNames() ([]string, error) @@ -67,23 +73,41 @@ type ( byObjectID map[string]*DBObject tableChange map[string]struct{} + + byInstanceName map[string]*DBInstance + 
byInstanceID map[string]*DBInstance } ) func (t *Worker) handleDaemonStatus(nodeID string) error { d := daemonStatus{ - ctx: context.Background(), - redis: t.Redis, - db: t.DB, - nodeID: nodeID, - byNodename: make(map[string]*DBNode), - byNodeID: make(map[string]*DBNode), + ctx: context.Background(), + redis: t.Redis, + db: t.DB, + nodeID: nodeID, + + byNodename: make(map[string]*DBNode), + byNodeID: make(map[string]*DBNode), + byObjectID: make(map[string]*DBObject), byObjectName: make(map[string]*DBObject), - tableChange: make(map[string]struct{}), + + byInstanceID: make(map[string]*DBInstance), + byInstanceName: make(map[string]*DBInstance), + + tableChange: make(map[string]struct{}), } - functions := []func() error{ - d.dropPending, + chain := func(f ...func() error) error { + for _, f := range f { + err := f() + if err != nil { + return err + } + } + return nil + } + + err := chain(d.dropPending, d.getChanges, d.getData, d.dbCheckClusterIDForNodeID, @@ -91,17 +115,24 @@ func (t *Worker) handleDaemonStatus(nodeID string) error { d.dbFindNodes, d.dataToNodeFrozen, d.dbFindServices, + d.dbFindInstance, + ) + if err != nil { + return err } - for _, f := range functions { - err := f() - if err != nil { - return err - } - } + slog.Info(fmt.Sprintf("handleDaemonStatus done: node_id: %s cluster_id: %s, cluster_name: %s changes: %s, byNodes: %#v", d.nodeID, d.clusterID, d.clusterName, d.changes, d.byNodename)) for k, v := range d.byNodename { - slog.Info(fmt.Sprintf("found node %s: %#v", k, v)) + slog.Debug(fmt.Sprintf("found db node %s: %#v", k, v)) + } + + for k, v := range d.byObjectID { + slog.Debug(fmt.Sprintf("found db object %s: %#v", k, v)) + } + + for k, v := range d.byInstanceName { + slog.Debug(fmt.Sprintf("found db instance %s: %#v", k, v)) } return nil } @@ -115,7 +146,19 @@ func (d *daemonStatus) dropPending() error { func (d *daemonStatus) getChanges() error { s, err := d.redis.HGet(d.ctx, cache.KeyDaemonStatusChangesHash, d.nodeID).Result() - if err != 
nil { + if err == nil { + // TODO: fix possible race: + // worker iteration 1: picks up changes 'a' + // listener: reads previous changes 'a' + // listener: merges 'b' => sets changes from 'a' to 'a', 'b' + // listener: asks for new worker iteration 2 + // worker iteration 1: deletes the changes, including 'b' => the 'b' change is lost + // worker iteration 1: ... done + // worker iteration 2: picks up changes: empty instead of the expected 'b' + if err := d.redis.HDel(d.ctx, cache.KeyDaemonStatusChangesHash, d.nodeID).Err(); err != nil { + return fmt.Errorf("getChanges: HDEL %s %s: %w", cache.KeyDaemonStatusChangesHash, d.nodeID, err) + } + } else { return fmt.Errorf("getChanges: HGET %s %s: %w", cache.KeyDaemonStatusChangesHash, d.nodeID, err) } d.changes = strings.Fields(s) @@ -306,3 +349,50 @@ func (d *daemonStatus) dbFindServices() error { } return nil } + +func (d *daemonStatus) dbFindInstance() error { + const querySelect = "" + + "SELECT svc_id, node_id, mon_frozen" + + " FROM svcmon" + + " WHERE svc_id IN (?" + + values := []any{} + for svcID := range d.byObjectID { + values = append(values, svcID) + } + if len(values) == 0 { + return nil + } + query := querySelect + for i := 1; i < len(values); i++ { + query += ", ?" + } + query += ")" + + rows, err := d.db.QueryContext(d.ctx, query, values...) 
+ if err != nil { + return fmt.Errorf("dbFindInstance query svcIDs: [%s]: %w", values, err) + } + if rows == nil { + return fmt.Errorf("dbFindInstance query returns nil rows") + } + defer func() { _ = rows.Close() }() + for rows.Next() { + var o DBInstance + if err := rows.Scan(&o.svcID, &o.nodeID, &o.Frozen); err != nil { + return fmt.Errorf("dbFindServices scan %s: %w", d.nodeID, err) + } + if n, ok := d.byNodeID[o.nodeID]; ok { + // Only pickup instances from known nodes + if s, ok := d.byObjectID[o.svcID]; ok { + // Only pickup instances from known objects + d.byInstanceName[s.svcname+"@"+n.nodename] = &o + d.byInstanceID[s.svcID+"@"+n.nodeID] = &o + } + } + } + if err := rows.Err(); err != nil { + return fmt.Errorf("dbFindInstance query rows: %w", err) + } + return nil +} diff --git a/worker/worker.go b/worker/worker.go index 81b4652..03b3da7 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -35,7 +35,8 @@ func (t *Worker) Run() error { time.Sleep(time.Second) continue } - slog.Info(fmt.Sprintf("BLPOP %s -> %s", result[0], result[1])) + begin := time.Now() + slog.Debug(fmt.Sprintf("BLPOP %s -> %s", result[0], result[1])) switch result[0] { case cache.KeySystem: err = t.handleSystem(result[1]) @@ -47,6 +48,7 @@ func (t *Worker) Run() error { if err != nil { slog.Error(err.Error()) } + slog.Debug(fmt.Sprintf("BLPOP %s <- %s: %s", result[0], result[1], time.Now().Sub(begin))) } return nil }