Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

job_daemon_system fixes #27

Merged
merged 8 commits into from
May 14, 2024
79 changes: 73 additions & 6 deletions mariadb/insert_or_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"log/slog"
"slices"
"strings"
)
Expand All @@ -15,6 +16,9 @@ type (
Mappings Mappings
Data any

// Log enable query logging
Log bool

names []string
placeholders []string
updates []string
Expand All @@ -25,20 +29,81 @@ type (
func (t *InsertOrUpdate) load() error {
switch v := t.Data.(type) {
case map[string]any:
return t.loadLines([]any{v})
return t.loadMap(v)
case []any:
return t.loadLines(v)
return t.loadSlice(v)
default:
return fmt.Errorf("unsupported data format")
}
}

func (t *InsertOrUpdate) loadLines(data []any) error {
func (t *InsertOrUpdate) loadMap(data map[string]any) error {
var placeholders []string

if len(data) == 0 {
return nil
}

for _, mapping := range t.Mappings {
if mapping.To == "" {
return fmt.Errorf("invalid mapping definition (To is empty): %#v", mapping)
}
var key string
var value any
if mapping.From != "" {
key = mapping.From
} else if mapping.To != "" {
key = mapping.To
} else {
return fmt.Errorf("unsupported mapping definition: %#v", mapping)
}

if v, ok := data[key]; !ok {
if mapping.Optional {
continue
} else {
return fmt.Errorf("key '%s' not found", key)
}
} else {
t.names = append(t.names, mapping.To)
if !slices.Contains(t.Keys, mapping.To) {
t.updates = append(t.updates, fmt.Sprintf("%s = VALUES(%s)", mapping.To, mapping.To))
}
value = v
}

if mapping.Get != nil {
if v, err := mapping.Get(value); err != nil {
return err
} else {
value = v
}
}
if mapping.Modify != nil {
if placeholder, values, err := mapping.Modify(value); err != nil {
return err
} else {
placeholders = append(placeholders, placeholder)
t.values = append(t.values, values...)
}
} else {
placeholders = append(placeholders, "?")
t.values = append(t.values, value)
}
}
t.placeholders = append(t.placeholders, fmt.Sprintf("(%s)", strings.Join(placeholders, ", ")))
return nil
}

func (t *InsertOrUpdate) loadSlice(data []any) error {
if len(data) == 0 {
return nil
}

for _, mapping := range t.Mappings {
if mapping.To == "" {
return fmt.Errorf("invalid mapping definition (To is empty): %#v", mapping)
}
t.names = append(t.names, mapping.To)
if !slices.Contains(t.Keys, mapping.To) {
t.updates = append(t.updates, fmt.Sprintf("%s = VALUES(%s)", mapping.To, mapping.To))
Expand Down Expand Up @@ -100,9 +165,11 @@ func (t *InsertOrUpdate) QueryContext(ctx context.Context, db *sql.DB) (*sql.Row
if len(t.values) == 0 {
return nil, nil
}
sql := t.SQL()
//slog.Debug(fmt.Sprint(sql, t.values))
return db.QueryContext(ctx, sql, t.values...)
query := t.SQL()
if t.Log {
slog.Info(fmt.Sprintf("InsertOrUpdate.QueryContext table: %s SQL: %s VALUES:%#v", t.Table, query, t.values))
}
return db.QueryContext(ctx, query, t.values...)
}

func (t *InsertOrUpdate) SQL() string {
Expand Down
68 changes: 56 additions & 12 deletions mariadb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mariadb
import (
"fmt"
"strings"
"time"
)

type (
Expand All @@ -22,22 +23,65 @@ type (

// Modify modifies the placeholder and value (ex: datetimes rfc change)
Modify func(v any) (string, []any, error)

// Optional may be set to true to ignore missing key during load data map
// during InsertOrUpdate.QueryContext calls. It has no effect when loaded data is not
// a map[string] any.
Optional bool
}
)

func ModifyDatetime(v any) (placeholder string, values []any, err error) {
s := fmt.Sprint(v)
if i := strings.Index(s, "+"); i > 0 {
placeholder = "CONVERT_TZ(?, ?, \"SYSTEM\")"
values = append(values, s[:i], s[i:])
// ModifyDatetime returns placeholder for time.Time like objects
func ModifyDatetime(a any) (placeholder string, values []any, err error) {
switch v := a.(type) {
case string:
// TODO: use default time.Parse instead to append time.Time value ?
s := fmt.Sprint(v)
if len(s) < 11 {
// 2024-04-02
placeholder = "?"
values = append(values, s)
return
}
if i := strings.LastIndex(s, "+"); i > 0 {
placeholder = "CONVERT_TZ(?, ?, \"SYSTEM\")"
values = append(values, s[:i], s[i:])
return
}
if i := strings.LastIndex(s, "-"); i > 0 {
placeholder = "CONVERT_TZ(?, ?, \"SYSTEM\")"
values = append(values, s[:i], s[i:])
return
}
placeholder = "?"
values = append(values, s)
return
}
if i := strings.Index(s, "-"); i > 0 {
placeholder = "CONVERT_TZ(?, ?, \"SYSTEM\")"
values = append(values, s[:i], s[i:])
case time.Time:
placeholder = "?"
values = append(values, v)
return
default:
err = fmt.Errorf("ModifyDatetime can't analyse %v", a)
return
}
placeholder = "?"
values = append(values, s)
return
}

func ModifierMaxLen(maxLen int) func(a any) (placeholder string, values []any, err error) {
return func(a any) (placeholder string, values []any, err error) {
switch v := a.(type) {
case string:
var value string
if len(v) > maxLen {
value = v[:maxLen]
} else {
value = v
}
placeholder = "?"
values = append(values, value)
return
default:
err = fmt.Errorf("ModifyStringLen can't analyse %v", a)
return
}
}
}
17 changes: 9 additions & 8 deletions mariadb/now.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,24 @@ import (
"context"
"database/sql"
"fmt"
"time"
)

func Now(ctx context.Context, db *sql.DB) (string, error) {
func Now(ctx context.Context, db *sql.DB) (time.Time, error) {
var t time.Time
rows, err := db.QueryContext(ctx, "SELECT NOW()")
if err != nil {
return "", err
return t, err
}
if rows == nil {
return "", fmt.Errorf("no result rows for SELECT NOW()")
return t, fmt.Errorf("no result rows for SELECT NOW()")
}
defer rows.Close()
if !rows.Next() {
return "", fmt.Errorf("no result rows next for SELECT NOW()")
return t, fmt.Errorf("no result rows next for SELECT NOW()")
}
var s string
if err := rows.Scan(&s); err != nil {
return "", err
if err := rows.Scan(&t); err != nil {
return t, err
}
return s, nil
return t, nil
}
44 changes: 22 additions & 22 deletions worker/job_daemon_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/opensvc/oc3/mariadb"
)

func (t *Worker) handleSystemTargets(ctx context.Context, nodeID string, i any, now string) error {
func (t *Worker) handleSystemTargets(ctx context.Context, nodeID string, i any, now time.Time) error {
data, ok := i.([]any)
if !ok {
slog.Warn("unsupported system targets data format")
Expand Down Expand Up @@ -55,7 +55,7 @@ func (t *Worker) handleSystemTargets(ctx context.Context, nodeID string, i any,
return nil
}

func (t *Worker) handleSystemHBA(ctx context.Context, nodeID string, i any, now string) error {
func (t *Worker) handleSystemHBA(ctx context.Context, nodeID string, i any, now time.Time) error {
data, ok := i.([]any)
if !ok {
slog.Warn("unsupported system hba data format")
Expand Down Expand Up @@ -97,7 +97,7 @@ func (t *Worker) handleSystemHBA(ctx context.Context, nodeID string, i any, now
return nil
}

func (t *Worker) handleSystemLAN(ctx context.Context, nodeID string, i any, now string) error {
func (t *Worker) handleSystemLAN(ctx context.Context, nodeID string, i any, now time.Time) error {
var l []any
data, ok := i.(map[string]any)
if !ok {
Expand Down Expand Up @@ -152,7 +152,7 @@ func (t *Worker) handleSystemLAN(ctx context.Context, nodeID string, i any, now
return nil
}

func (t *Worker) handleSystemGroups(ctx context.Context, nodeID string, i any, now string) error {
func (t *Worker) handleSystemGroups(ctx context.Context, nodeID string, i any, now time.Time) error {
data, ok := i.([]any)
if !ok {
slog.Warn("unsupported system groups data format")
Expand Down Expand Up @@ -195,7 +195,7 @@ func (t *Worker) handleSystemGroups(ctx context.Context, nodeID string, i any, n
return nil
}

func (t *Worker) handleSystemUsers(ctx context.Context, nodeID string, i any, now string) error {
func (t *Worker) handleSystemUsers(ctx context.Context, nodeID string, i any, now time.Time) error {
data, ok := i.([]any)
if !ok {
slog.Warn("unsupported system users data format")
Expand Down Expand Up @@ -238,7 +238,7 @@ func (t *Worker) handleSystemUsers(ctx context.Context, nodeID string, i any, no
return nil
}

func (t *Worker) handleSystemHardware(ctx context.Context, nodeID string, i any, now string) error {
func (t *Worker) handleSystemHardware(ctx context.Context, nodeID string, i any, now time.Time) error {
data, ok := i.([]any)
if !ok {
slog.Warn("unsupported system hardware data format")
Expand All @@ -263,7 +263,7 @@ func (t *Worker) handleSystemHardware(ctx context.Context, nodeID string, i any,
mariadb.Mapping{To: "hw_type", From: "type"},
mariadb.Mapping{To: "hw_path", From: "path"},
mariadb.Mapping{To: "hw_class", From: "class"},
mariadb.Mapping{To: "hw_description", From: "description"},
mariadb.Mapping{To: "hw_description", From: "description", Modify: mariadb.ModifierMaxLen(128)},
mariadb.Mapping{To: "hw_driver", From: "driver"},
mariadb.Mapping{To: "updated"},
},
Expand All @@ -284,7 +284,7 @@ func (t *Worker) handleSystemHardware(ctx context.Context, nodeID string, i any,
return nil
}

func (t *Worker) handleSystemProperties(ctx context.Context, nodeID string, i any, now string) error {
func (t *Worker) handleSystemProperties(ctx context.Context, nodeID string, i any, now time.Time) error {
data, ok := i.(map[string]any)
if !ok {
slog.Warn("unsupported system properties format")
Expand All @@ -309,10 +309,10 @@ func (t *Worker) handleSystemProperties(ctx context.Context, nodeID string, i an
request := mariadb.InsertOrUpdate{
Table: "nodes",
Mappings: mariadb.Mappings{
mariadb.Mapping{To: "asset_env", Get: get},
mariadb.Mapping{To: "asset_env", Get: get, Optional: true},
mariadb.Mapping{To: "bios_version", Get: get},
mariadb.Mapping{To: "cluster_id", Get: get},
mariadb.Mapping{To: "connect_to", Get: get},
mariadb.Mapping{To: "connect_to", Get: get, Optional: true},
mariadb.Mapping{To: "cpu_cores", Get: get},
mariadb.Mapping{To: "cpu_dies", Get: get},
mariadb.Mapping{To: "cpu_freq", Get: get},
Expand All @@ -322,14 +322,14 @@ func (t *Worker) handleSystemProperties(ctx context.Context, nodeID string, i an
mariadb.Mapping{To: "fqdn", Get: get},
mariadb.Mapping{To: "last_boot", Get: get, Modify: mariadb.ModifyDatetime},
mariadb.Mapping{To: "listener_port", Get: get},
mariadb.Mapping{To: "loc_addr", Get: get},
mariadb.Mapping{To: "loc_building", Get: get},
mariadb.Mapping{To: "loc_city", Get: get},
mariadb.Mapping{To: "loc_country", Get: get},
mariadb.Mapping{To: "loc_floor", Get: get},
mariadb.Mapping{To: "loc_rack", Get: get},
mariadb.Mapping{To: "loc_room", Get: get},
mariadb.Mapping{To: "loc_zip", Get: get},
mariadb.Mapping{To: "loc_addr", Get: get, Optional: true},
mariadb.Mapping{To: "loc_building", Get: get, Optional: true},
mariadb.Mapping{To: "loc_city", Get: get, Optional: true},
mariadb.Mapping{To: "loc_country", Get: get, Optional: true},
mariadb.Mapping{To: "loc_floor", Get: get, Optional: true},
mariadb.Mapping{To: "loc_rack", Get: get, Optional: true},
mariadb.Mapping{To: "loc_room", Get: get, Optional: true},
mariadb.Mapping{To: "loc_zip", Get: get, Optional: true},
mariadb.Mapping{To: "manufacturer", Get: get},
mariadb.Mapping{To: "mem_banks", Get: get},
mariadb.Mapping{To: "mem_bytes", Get: get},
Expand All @@ -342,14 +342,14 @@ func (t *Worker) handleSystemProperties(ctx context.Context, nodeID string, i an
mariadb.Mapping{To: "os_kernel", Get: get},
mariadb.Mapping{To: "os_name", Get: get},
mariadb.Mapping{To: "os_vendor", Get: get},
mariadb.Mapping{To: "sec_zone", Get: get},
mariadb.Mapping{To: "sec_zone", Get: get, Optional: true},
mariadb.Mapping{To: "serial", Get: get},
mariadb.Mapping{To: "sp_version", Get: get},
mariadb.Mapping{To: "team_integ", Get: get},
mariadb.Mapping{To: "team_support", Get: get},
mariadb.Mapping{To: "team_integ", Get: get, Optional: true},
mariadb.Mapping{To: "team_support", Get: get, Optional: true},
mariadb.Mapping{To: "tz", Get: get},
mariadb.Mapping{To: "updated", Get: get},
mariadb.Mapping{To: "version", Get: get},
mariadb.Mapping{To: "version", Get: get, Modify: mariadb.ModifierMaxLen(20)},
},
Keys: []string{"node_id"},
Data: data,
Expand Down
Loading