From 8448efa7fcc6e592b61acae0f69793535f1356f1 Mon Sep 17 00:00:00 2001 From: Benson Wong Date: Fri, 1 Nov 2024 09:42:37 -0700 Subject: [PATCH 1/2] revise health check logic to not error on 5 second timeout --- config.example.yaml | 7 +++++-- proxy/manager.go | 23 +++++++++++++---------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/config.example.yaml b/config.example.yaml index fc738e5..4e51a76 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -35,7 +35,10 @@ models: # until the upstream server is ready for traffic checkEndpoint: none - # don't use this, just for testing if things are broken + # don't use these, just for testing if things are broken "broken": cmd: models/llama-server-osx --port 8999 -m models/doesnotexist.gguf - proxy: http://127.0.0.1:8999 \ No newline at end of file + proxy: http://127.0.0.1:8999 + "broken_timeout": + cmd: models/llama-server-osx --port 8999 -m models/Qwen2.5-1.5B-Instruct-Q4_K_M.gguf + proxy: http://127.0.0.1:9000 \ No newline at end of file diff --git a/proxy/manager.go b/proxy/manager.go index 0107c51..abafcae 100644 --- a/proxy/manager.go +++ b/proxy/manager.go @@ -187,26 +187,28 @@ func (pm *ProxyManager) checkHealthEndpoint() error { if err != nil { return err } + ctx, cancel := context.WithTimeout(req.Context(), 250*time.Millisecond) defer cancel() req = req.WithContext(ctx) resp, err := client.Do(req) - if err != nil { - if strings.Contains(err.Error(), "connection refused") { - - // if TCP dial can't connect any HTTP response after 5 seconds - // exit quickly. - if time.Since(startTime) > 5*time.Second { - return fmt.Errorf("health check endpoint took more than 5 seconds to respond") - } - } + if err != nil { if time.Since(startTime) >= maxDuration { return fmt.Errorf("failed to check health from: %s", healthURL) } - time.Sleep(time.Second) + + // wait a bit longer for TCP connection issues + if strings.Contains(err.Error(), "connection refused") { + fmt.Fprintf(pm.logMonitor, "Connection refused on %s\n", healthURL) + time.Sleep(5 * time.Second) + } else { + time.Sleep(time.Second) + } + continue } + defer resp.Body.Close() if resp.StatusCode == http.StatusOK { return nil @@ -214,6 +216,7 @@ func (pm *ProxyManager) checkHealthEndpoint() error { if time.Since(startTime) >= maxDuration { return fmt.Errorf("failed to check health from: %s", healthURL) } + time.Sleep(time.Second) } } From 34f9fd73408a2637a238ab4b58a7976005b09f91 Mon Sep 17 00:00:00 2001 From: Benson Wong Date: Fri, 1 Nov 2024 14:32:39 -0700 Subject: [PATCH 2/2] Improve timeout and exit handling of child processes. fix #3 and #5 llama-swap only waited a maximum of 5 seconds for an upstream HTTP server to be available. If it took longer than that it will error out the request. Now it will wait up to the configured healthCheckTimeout or the upstream process unexpectedly exits. --- config.example.yaml | 2 +- proxy/manager.go | 42 ++++++++++++++++++++++++++++++++++-------- 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/config.example.yaml b/config.example.yaml index 4e51a76..315951c 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -1,6 +1,6 @@ # Seconds to wait for llama.cpp to be available to serve requests # Default (and minimum): 15 seconds -healthCheckTimeout: 60 +healthCheckTimeout: 15 models: "llama": diff --git a/proxy/manager.go b/proxy/manager.go index abafcae..db5d0e8 100644 --- a/proxy/manager.go +++ b/proxy/manager.go @@ -149,14 +149,28 @@ func (pm *ProxyManager) swapModel(requestedModel string) error { } pm.currentCmd = cmd - if err := pm.checkHealthEndpoint(); err != nil { + // watch for the command to exist + cmdCtx, cancel := context.WithCancelCause(context.Background()) + + // monitor the command's exist status + go func() { + err := cmd.Wait() + if err != nil { + cancel(fmt.Errorf("command [%s] %s", strings.Join(cmd.Args, " "), err.Error())) + } else { + cancel(nil) + } + }() + + // wait for checkHealthEndpoint + if err := pm.checkHealthEndpoint(cmdCtx); err != nil { return err } return nil } -func (pm *ProxyManager) checkHealthEndpoint() error { +func (pm *ProxyManager) checkHealthEndpoint(cmdCtx context.Context) error { if pm.currentConfig.Proxy == "" { return fmt.Errorf("no upstream available to check /health") @@ -179,6 +193,7 @@ func (pm *ProxyManager) checkHealthEndpoint() error { if err != nil { return fmt.Errorf("failed to create health url with with %s and path %s", proxyTo, checkEndpoint) } + client := &http.Client{} startTime := time.Now() @@ -188,24 +203,34 @@ func (pm *ProxyManager) checkHealthEndpoint() error { return err } - ctx, cancel := context.WithTimeout(req.Context(), 250*time.Millisecond) + ctx, cancel := context.WithTimeout(cmdCtx, 250*time.Millisecond) defer cancel() req = req.WithContext(ctx) resp, err := client.Do(req) + ttl := (maxDuration - time.Since(startTime)).Seconds() + if err != nil { - if time.Since(startTime) >= maxDuration { - return fmt.Errorf("failed to check health from: %s", healthURL) + // check if the context was cancelled + select { + case <-ctx.Done(): + return context.Cause(ctx) + default: } // wait a bit longer for TCP connection issues if strings.Contains(err.Error(), "connection refused") { - fmt.Fprintf(pm.logMonitor, "Connection refused on %s\n", healthURL) + fmt.Fprintf(pm.logMonitor, "Connection refused on %s, ttl %.0fs\n", healthURL, ttl) + time.Sleep(5 * time.Second) } else { time.Sleep(time.Second) } + if ttl < 0 { + return fmt.Errorf("failed to check health from: %s", healthURL) + } + continue } @@ -213,7 +238,8 @@ func (pm *ProxyManager) checkHealthEndpoint() error { if resp.StatusCode == http.StatusOK { return nil } - if time.Since(startTime) >= maxDuration { + + if ttl < 0 { return fmt.Errorf("failed to check health from: %s", healthURL) } @@ -239,7 +265,7 @@ func (pm *ProxyManager) proxyChatRequest(w http.ResponseWriter, r *http.Request) } if err := pm.swapModel(model); err != nil { - http.Error(w, fmt.Sprintf("unable to swap to model: %s", err.Error()), http.StatusNotFound) + http.Error(w, fmt.Sprintf("unable to swap to model, %s", err.Error()), http.StatusNotFound) return }