Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions cmd.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
package main

import (
"log/slog"
"path/filepath"

"github.com/spf13/cobra"
)

var (
debug bool
)

func newCmd(args []string) *cobra.Command {
root := &cobra.Command{
Use: filepath.Base(args[0]),
Short: "Manage the opensvc collector infrastructure components.",
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
if debug {
slog.SetLogLoggerLevel(slog.LevelDebug)
}

logConfigDir()
if err := initConfig(); err != nil {
return err
Expand All @@ -20,6 +29,8 @@ func newCmd(args []string) *cobra.Command {
},
}

root.PersistentFlags().BoolVar(&debug, "debug", false, "set log level to debug")

root.AddCommand(
&cobra.Command{
Use: "api",
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/opensvc/oc3

go 1.21
go 1.22

require (
github.com/allenai/go-swaggerui v0.1.0
Expand All @@ -10,6 +10,7 @@ require (
github.com/labstack/echo/v4 v4.11.4
github.com/oapi-codegen/runtime v1.1.1
github.com/shaj13/go-guardian/v2 v2.11.5
github.com/spf13/cobra v1.8.0
github.com/spf13/viper v1.18.2
)

Expand Down Expand Up @@ -39,7 +40,6 @@ require (
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/cobra v1.8.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
Expand Down
124 changes: 107 additions & 17 deletions worker/daemon_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ type (
clusterID string
}

DBInstance struct {
svcID string
nodeID string
Frozen uint
}

dataLister interface {
objectNames() ([]string, error)
nodeNames() ([]string, error)
Expand Down Expand Up @@ -67,41 +73,66 @@ type (
byObjectID map[string]*DBObject

tableChange map[string]struct{}

byInstanceName map[string]*DBInstance
byInstanceID map[string]*DBInstance
}
)

func (t *Worker) handleDaemonStatus(nodeID string) error {
d := daemonStatus{
ctx: context.Background(),
redis: t.Redis,
db: t.DB,
nodeID: nodeID,
byNodename: make(map[string]*DBNode),
byNodeID: make(map[string]*DBNode),
ctx: context.Background(),
redis: t.Redis,
db: t.DB,
nodeID: nodeID,

byNodename: make(map[string]*DBNode),
byNodeID: make(map[string]*DBNode),

byObjectID: make(map[string]*DBObject),
byObjectName: make(map[string]*DBObject),
tableChange: make(map[string]struct{}),

byInstanceID: make(map[string]*DBInstance),
byInstanceName: make(map[string]*DBInstance),

tableChange: make(map[string]struct{}),
}
functions := []func() error{
d.dropPending,
chain := func(f ...func() error) error {
for _, f := range f {
err := f()
if err != nil {
return err
}
}
return nil
}

err := chain(d.dropPending,
d.getChanges,
d.getData,
d.dbCheckClusterIDForNodeID,
d.dbCheckClusters,
d.dbFindNodes,
d.dataToNodeFrozen,
d.dbFindServices,
d.dbFindInstance,
)
if err != nil {
return err
}
for _, f := range functions {
err := f()
if err != nil {
return err
}
}

slog.Info(fmt.Sprintf("handleDaemonStatus done: node_id: %s cluster_id: %s, cluster_name: %s changes: %s, byNodes: %#v",
d.nodeID, d.clusterID, d.clusterName, d.changes, d.byNodename))
for k, v := range d.byNodename {
slog.Info(fmt.Sprintf("found node %s: %#v", k, v))
slog.Debug(fmt.Sprintf("found db node %s: %#v", k, v))
}

for k, v := range d.byObjectID {
slog.Debug(fmt.Sprintf("found db object %s: %#v", k, v))
}

for k, v := range d.byInstanceName {
slog.Debug(fmt.Sprintf("found db instance %s: %#v", k, v))
}
return nil
}
Expand All @@ -115,7 +146,19 @@ func (d *daemonStatus) dropPending() error {

func (d *daemonStatus) getChanges() error {
s, err := d.redis.HGet(d.ctx, cache.KeyDaemonStatusChangesHash, d.nodeID).Result()
if err != nil {
if err == nil {
// TODO: fix possible race:
// worker iteration 1: pickup changes 'a'
// listener: read previous changes 'a'
// listener: merge b => set changes from 'a' to 'a', 'b'
// listener: ask for new worker iteration 2
// worker iteration 1: delete changes the 'b' => 'b' change is lost
// worker iteration 1: ... done
// worker iteration 2: pickup changes: empty instead of expected 'b'
if err := d.redis.HDel(d.ctx, cache.KeyDaemonStatusChangesHash, d.nodeID).Err(); err != nil {
return fmt.Errorf("getChanges: HDEL %s %s: %w", cache.KeyDaemonStatusChangesHash, d.nodeID, err)
}
} else {
return fmt.Errorf("getChanges: HGET %s %s: %w", cache.KeyDaemonStatusChangesHash, d.nodeID, err)
}
d.changes = strings.Fields(s)
Expand Down Expand Up @@ -306,3 +349,50 @@ func (d *daemonStatus) dbFindServices() error {
}
return nil
}

func (d *daemonStatus) dbFindInstance() error {
const querySelect = "" +
"SELECT svc_id, node_id, mon_frozen" +
" FROM svcmon" +
" WHERE svc_id IN (?"

values := []any{}
for svcID := range d.byObjectID {
values = append(values, svcID)
}
if len(values) == 0 {
return nil
}
query := querySelect
for i := 1; i < len(values); i++ {
query += ", ?"
}
query += ")"

rows, err := d.db.QueryContext(d.ctx, query, values...)
if err != nil {
return fmt.Errorf("dbFindInstance query svcIDs: [%s]: %w", values, err)
}
if rows == nil {
return fmt.Errorf("dbFindInstance query returns nil rows")
}
defer func() { _ = rows.Close() }()
for rows.Next() {
var o DBInstance
if err := rows.Scan(&o.svcID, &o.nodeID, &o.Frozen); err != nil {
return fmt.Errorf("dbFindServices scan %s: %w", d.nodeID, err)
}
if n, ok := d.byNodeID[o.nodeID]; ok {
// Only pickup instances from known nodes
if s, ok := d.byObjectID[o.svcID]; ok {
// Only pickup instances from known objects
d.byInstanceName[s.svcname+"@"+n.nodename] = &o
d.byInstanceID[s.svcID+"@"+n.nodeID] = &o
}
}
}
if err := rows.Err(); err != nil {
return fmt.Errorf("dbFindInstance query rows: %w", err)
}
return nil
}
4 changes: 3 additions & 1 deletion worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ func (t *Worker) Run() error {
time.Sleep(time.Second)
continue
}
slog.Info(fmt.Sprintf("BLPOP %s -> %s", result[0], result[1]))
begin := time.Now()
slog.Debug(fmt.Sprintf("BLPOP %s -> %s", result[0], result[1]))
switch result[0] {
case cache.KeySystem:
err = t.handleSystem(result[1])
Expand All @@ -47,6 +48,7 @@ func (t *Worker) Run() error {
if err != nil {
slog.Error(err.Error())
}
slog.Debug(fmt.Sprintf("BLPOP %s <- %s: %s", result[0], result[1], time.Now().Sub(begin)))
}
return nil
}