diff --git a/cache/main.go b/cache/main.go index 587b871..f8b2e97 100644 --- a/cache/main.go +++ b/cache/main.go @@ -3,9 +3,7 @@ package cache var ( clientKey = "redisClient" - KeySystem = "oc3:q:system" - KeySystemHash = "oc3:h:system" - KeyGeneric = "oc3:q:generic" + KeyGeneric = "oc3:q:generic" KeyDaemonPing = "oc3:q:daemon_ping" KeyDaemonPingPending = "oc3:h:daemon_ping_pending" @@ -13,6 +11,9 @@ var ( KeyDaemonStatusChangesHash = "oc3:h:daemon_status_changes" KeyDaemonStatus = "oc3:q:daemon_status" KeyDaemonStatusPending = "oc3:h:daemon_status_pending" + KeyDaemonSystem = "oc3:q:daemon_system" + KeyDaemonSystemHash = "oc3:h:daemon_system" + KeyDaemonSystemPending = "oc3:h:daemon_system_pending" KeyPackagesHash = "oc3:h:packages" KeyPackages = "oc3:q:packages" KeyPackagesPending = "oc3:h:packages_pending" diff --git a/handlers/post_daemon_system.go b/handlers/post_daemon_system.go index 9ac5934..8c0249e 100644 --- a/handlers/post_daemon_system.go +++ b/handlers/post_daemon_system.go @@ -3,18 +3,17 @@ package handlers import ( "fmt" "io" - "log/slog" "net/http" - "github.com/go-redis/redis/v8" "github.com/labstack/echo/v4" "github.com/opensvc/oc3/cache" ) func (a *Api) PostDaemonSystem(c echo.Context) error { - key := nodeIDFromContext(c) - if key == "" { + log := getLog(c) + nodeID := nodeIDFromContext(c) + if nodeID == "" { return JSONNodeAuthProblem(c) } @@ -25,33 +24,17 @@ func (a *Api) PostDaemonSystem(c echo.Context) error { reqCtx := c.Request().Context() - s := fmt.Sprintf("HSET %s %s", cache.KeySystemHash, key) - slog.Info(s) - if _, err := a.Redis.HSet(reqCtx, cache.KeySystemHash, key, string(b)).Result(); err != nil { + s := fmt.Sprintf("HSET %s %s", cache.KeyDaemonSystemHash, nodeID) + log.Info(s) + if _, err := a.Redis.HSet(reqCtx, cache.KeyDaemonSystemHash, nodeID, string(b)).Result(); err != nil { s = fmt.Sprintf("%s: %s", s, err) - slog.Error(s) + log.Error(s) return JSONProblem(c, http.StatusInternalServerError, "", s) } - s = fmt.Sprintf("LPOS %s %s", cache.KeySystem, key) - slog.Info(s) - if _, err := a.Redis.LPos(reqCtx, cache.KeySystem, key, redis.LPosArgs{}).Result(); err != nil { - switch err { - case nil: - case redis.Nil: - s = fmt.Sprintf("LPUSH %s %s", cache.KeySystem, key) - slog.Info(s) - if _, err := a.Redis.LPush(reqCtx, cache.KeySystem, key).Result(); err != nil { - s := fmt.Sprintf("%s: %s", s, err) - slog.Error(s) - return JSONProblemf(c, http.StatusInternalServerError, "", "%s", s) - } - default: - s := fmt.Sprintf("%s: %s", s, err) - slog.Error(s) - return JSONProblemf(c, http.StatusInternalServerError, "", "%s", s) - } - + if err := a.pushNotPending(reqCtx, cache.KeyDaemonSystemPending, cache.KeyDaemonSystem, nodeID); err != nil { + log.Error(fmt.Sprintf("can't push %s %s: %s", cache.KeyDaemonSystem, nodeID, err)) + return JSONProblemf(c, http.StatusInternalServerError, "redis operation", "can't push %s %s: %s", cache.KeyDaemonSystem, nodeID, err) } return c.NoContent(http.StatusNoContent) diff --git a/worker/handler_daemon_ping.go b/worker/job_daemon_ping.go similarity index 99% rename from worker/handler_daemon_ping.go rename to worker/job_daemon_ping.go index 5f11372..e09da55 100644 --- a/worker/handler_daemon_ping.go +++ b/worker/job_daemon_ping.go @@ -109,7 +109,7 @@ func (t *Worker) handleDaemonPing(nodeID string) error { func (d *daemonPing) dropPending() error { if err := d.redis.HDel(d.ctx, cache.KeyDaemonPingPending, d.nodeID).Err(); err != nil { - return fmt.Errorf("dropPending: HDEL %s %s: %w", cache.KeyDaemonStatusPending, d.nodeID, err) + return fmt.Errorf("dropPending: HDEL %s %s: %w", cache.KeyDaemonPingPending, d.nodeID, err) } return nil } diff --git a/worker/handler_daemon_status.go b/worker/job_daemon_status.go similarity index 100% rename from worker/handler_daemon_status.go rename to worker/job_daemon_status.go diff --git a/worker/worker_system.go b/worker/job_daemon_system.go similarity index 97% rename from worker/worker_system.go rename to worker/job_daemon_system.go index 2440a0e..c4a57e8 100644 --- a/worker/worker_system.go +++ b/worker/job_daemon_system.go @@ -8,6 +8,7 @@ import ( "time" "github.com/go-redis/redis/v8" + "github.com/opensvc/oc3/cache" "github.com/opensvc/oc3/mariadb" ) @@ -363,7 +364,11 @@ func (t *Worker) handleSystem(nodeID string) error { ctx := context.Background() ctx, _ = context.WithTimeout(ctx, time.Second*5) - cmd := t.Redis.HGet(ctx, cache.KeySystemHash, nodeID) + if err := t.Redis.HDel(ctx, cache.KeyDaemonSystemPending, nodeID).Err(); err != nil { + return fmt.Errorf("dropPending: HDEL %s %s: %w", cache.KeyDaemonSystemPending, nodeID, err) + } + + cmd := t.Redis.HGet(ctx, cache.KeyDaemonSystemHash, nodeID) result, err := cmd.Result() switch err { case nil: diff --git a/worker/worker_package.go b/worker/job_package.go similarity index 100% rename from worker/worker_package.go rename to worker/job_package.go diff --git a/worker/worker.go b/worker/worker.go index dcf9ae7..fe1aafb 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -83,7 +83,7 @@ func (t *Worker) Run() error { case cache.KeyDaemonPing: workType = "daemonPing" err = t.handleDaemonPing(result[1]) - case cache.KeySystem: + case cache.KeyDaemonSystem: workType = "daemonSystem" err = t.handleSystem(result[1]) case cache.KeyDaemonStatus: