From de92892d8c64f2be68c6c347ae2036a9b013a2c3 Mon Sep 17 00:00:00 2001
From: Ettore Di Giacinto
Date: Mon, 11 May 2026 08:24:53 +0000
Subject: [PATCH 1/2] fix(distributed): cascade-clean stale node_models on drain and filter routing by healthy status
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Stale node_models rows (state="loaded") were surviving past the healthy state of their owning node, causing /embeddings (and other inference paths) to dispatch to a backend whose process was gone or drained.

The downstream symptom in a live cluster was pgvector rejecting inserts with "vector cannot have more than 16000 dimensions (SQLSTATE 54000)": the misbehaving backend silently returned a malformed (oversized) tensor, and the Models page showed the model as "running" with no associated node (a stale entry) even though the node was no longer visible in the Nodes view.

Three changes here; a follow-up commit additionally makes the per-model probe consecutive-miss aware:

- MarkDraining now cascade-deletes node_models rows for the affected node, mirroring MarkOffline. Drains are explicit operator actions — the box has been intentionally taken out of rotation — so clearing the rows stops the Models UI from misreporting and prevents the routing layer from picking those rows if scheduling logic is ever relaxed. In-flight requests already hold their gRPC client through Route() and finish normally; the only observable effect is a non-fatal IncrementInFlight warning, acceptable for a drain. MarkUnhealthy is deliberately left status-only: it fires from managers_distributed / reconciler on a single nats.ErrNoResponders with no retry, so a transient NATS hiccup must not nuke every loaded model and force a full reload on recovery.

- FindAndLockNodeWithModel's inner JOIN now filters on backend_nodes.status = healthy in addition to node_models.state = loaded. The previous version relied on the second node-fetch step to reject non-healthy nodes, but a concurrent reader could still pick the same stale row in that window. Belt-and-braces.

- DistributedConfig.PerModelHealthCheck renamed to DisablePerModelHealthCheck and inverted at the call site so per-model gRPC probing is on by default. The probe (made consecutive-miss aware in the follow-up commit) independently health-checks each model's gRPC address and removes stale node_models rows when the backend has crashed even though the worker's node-level heartbeat is still arriving.

Migration: the field had no CLI flag, env var binding, or YAML key in tree (only the bare struct field), so there is no user-facing migration. Anything constructing DistributedConfig in code needs to drop the assignment (the default now does the right thing) or invert it.
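For code that builds DistributedConfig directly, the migration is mechanical. A minimal sketch (the variable name and construction site are illustrative; only the field names come from this change):

    // Before this change the probe had to be switched on explicitly:
    //   dc := config.DistributedConfig{PerModelHealthCheck: true}
    // After it, the zero value already enables the probe:
    dc := config.DistributedConfig{}
    // and opting out is now the explicit case:
    dc.DisablePerModelHealthCheck = true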
Assisted-by: Claude:claude-opus-4-7 go-vet go-test golangci-lint Signed-off-by: Ettore Di Giacinto --- core/application/distributed.go | 2 +- core/config/distributed_config.go | 10 ++++++++- core/services/nodes/registry.go | 37 +++++++++++++++++++++++++++---- 3 files changed, 43 insertions(+), 6 deletions(-) diff --git a/core/application/distributed.go b/core/application/distributed.go index 2a77d5b3c659..7662992dfd90 100644 --- a/core/application/distributed.go +++ b/core/application/distributed.go @@ -169,7 +169,7 @@ func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB, configLoade cfg.Distributed.HealthCheckIntervalOrDefault(), cfg.Distributed.StaleNodeThresholdOrDefault(), routerAuthToken, - cfg.Distributed.PerModelHealthCheck, + !cfg.Distributed.DisablePerModelHealthCheck, ) // Initialize job store diff --git a/core/config/distributed_config.go b/core/config/distributed_config.go index 8fc7f6518d35..0b77d1ffde36 100644 --- a/core/config/distributed_config.go +++ b/core/config/distributed_config.go @@ -31,7 +31,15 @@ type DistributedConfig struct { DrainTimeout time.Duration // Time to wait for in-flight requests during drain (default 30s) HealthCheckInterval time.Duration // Health monitor check interval (default 15s) StaleNodeThreshold time.Duration // Time before a node is considered stale (default 60s) - PerModelHealthCheck bool // Enable per-model backend health checking (default false) + // DisablePerModelHealthCheck turns off the health monitor's per-model + // gRPC probe. When the probe is enabled (the default), the monitor pings + // each model's gRPC address and removes stale node_models rows whose backend has + // crashed even though the worker's node-level heartbeat is still arriving. + // Without per-model probing, /embeddings and /completions can be dispatched + // to a backend that silently returns garbage (see also the cascading + // model-row cleanup on MarkOffline / MarkDraining). + DisablePerModelHealthCheck bool + MCPCIJobTimeout time.Duration // MCP CI job execution timeout (default 10m) MaxUploadSize int64 // Maximum upload body size in bytes (default 50 GB) diff --git a/core/services/nodes/registry.go b/core/services/nodes/registry.go index 6f73c84b3e94..8849cf6e2501 100644 --- a/core/services/nodes/registry.go +++ b/core/services/nodes/registry.go @@ -546,7 +546,13 @@ func (r *NodeRegistry) GetByName(ctx context.Context, name string) (*BackendNode return &node, nil } -// MarkUnhealthy sets a node status to unhealthy. +// MarkUnhealthy sets a node status to unhealthy. Deliberately status-only: +// callers fire this on transient triggers (a single nats.ErrNoResponders from +// managers_distributed / reconciler) where the next heartbeat is expected to +// flip the node back to healthy, and cascade-deleting node_models here would +// force a full model reload on every brief NATS hiccup. Stale rows are reaped +// by the per-model health probe (on by default; see HealthMonitor) and by +// MarkOffline when the heartbeat really has gone away. func (r *NodeRegistry) MarkUnhealthy(ctx context.Context, nodeID string) error { return r.setStatus(ctx, nodeID, StatusUnhealthy) } @@ -556,9 +562,23 @@ func (r *NodeRegistry) MarkHealthy(ctx context.Context, nodeID string) error { return r.setStatus(ctx, nodeID, StatusHealthy) } -// MarkDraining sets a node status to draining (no new requests). +// MarkDraining sets a node status to draining (no new requests) and clears its +// model records. 
Routing already filters out non-healthy nodes, so removing +// the rows on drain doesn't change new-request behavior — but it does stop the +// Models UI from showing the node's models as "running" while the box has been +// taken out of rotation, and it prevents stale rows from being selected if +// (re)scheduling logic gets relaxed elsewhere. In-flight requests already hold +// their gRPC client through Route() and will finish normally; the only +// observable effect is that the per-call IncrementInFlight bookkeeping logs a +// non-fatal warning, which is acceptable for a drain. func (r *NodeRegistry) MarkDraining(ctx context.Context, nodeID string) error { - return r.setStatus(ctx, nodeID, StatusDraining) + if err := r.setStatus(ctx, nodeID, StatusDraining); err != nil { + return err + } + if err := r.db.WithContext(ctx).Where("node_id = ?", nodeID).Delete(&NodeModel{}).Error; err != nil { + xlog.Warn("Failed to clear model records on draining", "node", nodeID, "error", err) + } + return nil } // FindStaleNodes returns nodes that haven't sent a heartbeat within the given threshold. @@ -673,9 +693,18 @@ func (r *NodeRegistry) FindAndLockNodeWithModel(ctx context.Context, modelName s // to moderate concurrency where requests don't overlap) collapses to // "biggest GPU wins every time" and one node ends up taking nearly all // the load while replicas on other nodes sit idle. + // Filter on backend_nodes.status = healthy in the inner JOIN itself, + // not only in the later node-fetch step. The previous version picked + // a (node_id, replica) pair purely on node_models state, then bailed + // out when the second query couldn't find a healthy node row — but + // any concurrent reader of node_models could still pick the same + // stale row in the same window, and other helpers that mirror this + // JOIN need the same invariant. Belt-and-braces: status filter here + // AND the status-checked node fetch below. q := tx.Clauses(clause.Locking{Strength: "UPDATE"}). Joins("JOIN backend_nodes ON backend_nodes.id = node_models.node_id"). - Where("node_models.model_name = ? AND node_models.state = ?", modelName, "loaded") + Where("node_models.model_name = ? AND node_models.state = ? AND backend_nodes.status = ?", + modelName, "loaded", StatusHealthy) if len(candidateNodeIDs) > 0 { q = q.Where("node_models.node_id IN ?", candidateNodeIDs) } From 9029ec2cd26b9685941a95f73814ae8b7440dc97 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Mon, 11 May 2026 08:25:07 +0000 Subject: [PATCH 2/2] fix(distributed): require consecutive misses before per-model probe removes a row MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The per-model gRPC probe used to remove a node_models row on a single failed health check. With the per-model probe now on by default, that made any 5-second gRPC blip (network jitter, a long-running request hogging the worker's gRPC server thread, brief GC pause) trigger a full reload of the affected model — too eager for production. Require perModelMissThreshold (3) consecutive failed probes before removal. At the default 15s tick a model must be unreachable for ~45s before reap; a single successful probe in between resets the streak. Per-(node, model, replica) state tracked under a mutex on the monitor. If the removal call itself fails, the miss counter is left in place so the next tick retries rather than starting the streak over. 
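Reduced to just the bookkeeping, the intended behaviour is roughly the sketch below (the standalone helper shape and its name are illustrative; in the patch the logic lives inline in doCheckAll):

    // recordProbe returns true once a (node, model, replica) tuple has missed
    // perModelMissThreshold consecutive probes; any successful probe wipes the
    // streak. The caller deletes the counter only after RemoveNodeModel
    // succeeds, so a failed removal is retried on the next tick.
    func recordProbe(mu *sync.Mutex, misses map[modelKey]int, key modelKey, healthy bool) bool {
        mu.Lock()
        defer mu.Unlock()
        if healthy {
            delete(misses, key) // success resets the streak
            return false
        }
        misses[key]++
        return misses[key] >= perModelMissThreshold
    }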
Tests: - removes stale model via per-model health check after consecutive failures (replaces the single-shot expectation) - preserves model row when an intermittent failure is followed by a success (covers the reset-on-success path and verifies the counter reset by failing twice more without crossing threshold) - newTestHealthMonitor initializes the misses map so direct-construct test helpers don't nil-map-panic in the probe path Assisted-by: Claude:claude-opus-4-7 go-vet go-test golangci-lint Signed-off-by: Ettore Di Giacinto --- core/services/nodes/health.go | 67 ++++++++++++++++++++++--- core/services/nodes/health_mock_test.go | 1 + core/services/nodes/health_test.go | 42 +++++++++++++++- 3 files changed, 101 insertions(+), 9 deletions(-) diff --git a/core/services/nodes/health.go b/core/services/nodes/health.go index ad570fd8192d..ffe1cfa0e2e5 100644 --- a/core/services/nodes/health.go +++ b/core/services/nodes/health.go @@ -12,6 +12,24 @@ import ( "gorm.io/gorm" ) +// perModelMissThreshold is the number of consecutive failed gRPC probes +// against a model's backend before the model is removed from the registry. +// A single failure can be transient (network blip, brief GC pause on the +// worker, a long-running request hogging the gRPC server thread); requiring +// N consecutive misses avoids deleting healthy rows over noise. At the +// default 15s tick this means a model has to be unreachable for ~45s before +// it gets reaped. +const perModelMissThreshold = 3 + +// modelKey identifies a specific (node, model, replica) tuple. We track miss +// counts per tuple because the same model name can be loaded on multiple +// replicas on the same node. +type modelKey struct { + NodeID string + ModelName string + ReplicaIndex int +} + // HealthMonitor periodically checks the health of registered backend nodes. type HealthMonitor struct { registry NodeHealthStore @@ -21,6 +39,8 @@ type HealthMonitor struct { autoOffline bool // mark stale nodes as offline (preserves approval status) clientFactory BackendClientFactory // creates gRPC backend clients perModelHealthCheck bool // check each model's backend process individually + missesMu sync.Mutex + misses map[modelKey]int // consecutive failed-probe counts; reset on success or model removal cancel context.CancelFunc cancelMu sync.Mutex } @@ -46,6 +66,7 @@ func NewHealthMonitor(registry NodeHealthStore, db *gorm.DB, checkInterval, stal autoOffline: true, clientFactory: factory, perModelHealthCheck: perModelHealthCheck, + misses: make(map[modelKey]int), } } @@ -152,9 +173,11 @@ func (hm *HealthMonitor) doCheckAll(ctx context.Context) { } } - // Per-model backend health check (opt-in): probe each model's gRPC address - // and remove stale model records. This does NOT affect the node's status — - // a crashed backend process is a model-level issue, not a node-level one. + // Per-model backend health check: probe each model's gRPC address and + // remove stale model records. This does NOT affect the node's status — + // a crashed backend process is a model-level issue, not a node-level + // one. A model is only removed after perModelMissThreshold consecutive + // failed probes so a single network/GC blip doesn't force a reload. 
if hm.perModelHealthCheck { models, _ := hm.registry.GetNodeModels(ctx, node.ID) for _, m := range models { @@ -163,15 +186,43 @@ func (hm *HealthMonitor) doCheckAll(ctx context.Context) { } mClient := hm.clientFactory.NewClient(m.Address, false) mCheckCtx, mCancel := context.WithTimeout(ctx, 5*time.Second) - if ok, _ := mClient.HealthCheck(mCheckCtx); !ok { - xlog.Warn("Model backend unhealthy, removing from registry", - "node", node.ID, "model", m.ModelName, "replica", m.ReplicaIndex, "address", m.Address) - hm.registry.RemoveNodeModel(ctx, node.ID, m.ModelName, m.ReplicaIndex) - } + ok, _ := mClient.HealthCheck(mCheckCtx) mCancel() if closer, ok := mClient.(io.Closer); ok { closer.Close() } + + key := modelKey{NodeID: node.ID, ModelName: m.ModelName, ReplicaIndex: m.ReplicaIndex} + hm.missesMu.Lock() + if ok { + // Probe succeeded — wipe any previous miss streak. + delete(hm.misses, key) + hm.missesMu.Unlock() + continue + } + hm.misses[key]++ + misses := hm.misses[key] + hm.missesMu.Unlock() + + if misses < perModelMissThreshold { + xlog.Debug("Model backend probe failed, awaiting threshold before removal", + "node", node.ID, "model", m.ModelName, "replica", m.ReplicaIndex, + "address", m.Address, "misses", misses, "threshold", perModelMissThreshold) + continue + } + xlog.Warn("Model backend unhealthy after consecutive misses, removing from registry", + "node", node.ID, "model", m.ModelName, "replica", m.ReplicaIndex, + "address", m.Address, "misses", misses) + if err := hm.registry.RemoveNodeModel(ctx, node.ID, m.ModelName, m.ReplicaIndex); err != nil { + xlog.Warn("Failed to remove unhealthy model from registry", + "node", node.ID, "model", m.ModelName, "replica", m.ReplicaIndex, "error", err) + // Leave the miss counter in place so the next tick retries + // the removal rather than starting the streak over. + continue + } + hm.missesMu.Lock() + delete(hm.misses, key) + hm.missesMu.Unlock() } } } diff --git a/core/services/nodes/health_mock_test.go b/core/services/nodes/health_mock_test.go index f4a70626154b..55056581e2c9 100644 --- a/core/services/nodes/health_mock_test.go +++ b/core/services/nodes/health_mock_test.go @@ -321,6 +321,7 @@ func newTestHealthMonitor(store NodeHealthStore, factory BackendClientFactory, a staleThreshold: staleThreshold, autoOffline: autoOffline, clientFactory: factory, + misses: make(map[modelKey]int), } } diff --git a/core/services/nodes/health_test.go b/core/services/nodes/health_test.go index d0f928db24e0..c78ccfffe0d4 100644 --- a/core/services/nodes/health_test.go +++ b/core/services/nodes/health_test.go @@ -255,7 +255,7 @@ var _ = Describe("HealthMonitor (mock-based)", func() { Expect(calls).NotTo(ContainElement(ContainSubstring("MarkUnhealthy"))) }) - It("removes stale model via per-model health check without affecting node status", func() { + It("removes stale model via per-model health check after consecutive failures", func() { store := newFakeNodeHealthStore() factory := newFakeBackendClientFactory() hm := newTestHealthMonitor(store, factory, true, staleThreshold) @@ -268,6 +268,15 @@ var _ = Describe("HealthMonitor (mock-based)", func() { // Model backend is dead factory.setClient("10.0.0.10:50053", &fakeBackendClient{healthy: false, err: fmt.Errorf("connection refused")}) + // First (perModelMissThreshold-1) probes must NOT remove the row — + // a single failure could be a transient blip. 
+ for i := 0; i < perModelMissThreshold-1; i++ { + hm.doCheckAll(context.Background()) + Expect(store.getCalls()).NotTo(ContainElement(ContainSubstring("RemoveNodeModel")), + "removed too early at miss %d", i+1) + } + + // Threshold-th consecutive miss triggers removal. hm.doCheckAll(context.Background()) // Node should remain healthy — only the specific replica record is removed. @@ -275,5 +284,36 @@ var _ = Describe("HealthMonitor (mock-based)", func() { Expect(store.getCalls()).To(ContainElement("RemoveNodeModel:node-model:piper-model:0")) Expect(store.getCalls()).NotTo(ContainElement(ContainSubstring("MarkUnhealthy"))) }) + + It("preserves model row when an intermittent failure is followed by a success", func() { + store := newFakeNodeHealthStore() + factory := newFakeBackendClientFactory() + hm := newTestHealthMonitor(store, factory, true, staleThreshold) + hm.perModelHealthCheck = true + + node := makeTestNode("node-flap", "flap-worker", "10.0.0.11:50051", StatusHealthy, freshTime()) + store.addNode(node) + store.addNodeModel("node-flap", NodeModel{NodeID: "node-flap", ModelName: "piper-model", Address: "10.0.0.11:50053"}) + + deadClient := &fakeBackendClient{healthy: false, err: fmt.Errorf("connection refused")} + liveClient := &fakeBackendClient{healthy: true} + + // Two failing probes then a recovery — should NOT remove the row, + // and should reset the miss counter so two more failures don't tip + // it over. + factory.setClient("10.0.0.11:50053", deadClient) + hm.doCheckAll(context.Background()) + hm.doCheckAll(context.Background()) + factory.setClient("10.0.0.11:50053", liveClient) + hm.doCheckAll(context.Background()) + + Expect(store.getCalls()).NotTo(ContainElement(ContainSubstring("RemoveNodeModel"))) + + // Counter is reset; two more failures must not be enough to remove. + factory.setClient("10.0.0.11:50053", deadClient) + hm.doCheckAll(context.Background()) + hm.doCheckAll(context.Background()) + Expect(store.getCalls()).NotTo(ContainElement(ContainSubstring("RemoveNodeModel"))) + }) }) })