From 502541b07024d39136454a6f91160343dc350812 Mon Sep 17 00:00:00 2001 From: venkataanil Date: Thu, 2 May 2024 11:19:48 +0530 Subject: [PATCH] node ip support 1. The client can validate the node IP, which is helpful for non-EIP testing. 2. Merged the checkDurationForEIPAtStartup and checkEIPAndNonEIPUntilStop functionality into one function. It measures both startup and failover latency. 3. Previously the latency timer started only when the HTTP status or the response did not match. Now it also starts when "client.Get" returns any error. --- main.go | 181 ++++++++++++++++++++++++++++---------------------------- 1 file changed, 92 insertions(+), 89 deletions(-) diff --git a/main.go b/main.go index 998ab73..33ed8dc 100644 --- a/main.go +++ b/main.go @@ -23,10 +23,9 @@ const ( serverEnvKey = "EXT_SERVER_HOST" portsEnvKey = "EXT_SERVER_PORTS" egressIPsEnvKey = "EGRESS_IPS" + hostSubnetEnvKey = "HOST_SUBNET" delayBetweenRequestEnvKey = "DELAY_BETWEEN_REQ_SEC" - delayBetweenStartRequestEnvKey = "DELAY_BETWEEN_START_REQ_SEC" reqTimeoutEnvKey = "REQ_TIMEOUT_SEC" - reqStartTimeoutEnvKey = "REQ_START_TIMEOUT_SEC" envKeyErrMsg = "define env key %q" defaultDelayBetweenReqSec = 1 defaultRequestTimeoutSec = 1 @@ -35,26 +34,46 @@ const ( func main() { wg := &sync.WaitGroup{} stop := registerSignalHandler() - extHost, extPorts, egressIPsStr, delayBetweenStartReq, startTimeout, delayBetweenReq, timeout := processEnvVars() - egressIPs := buildEIPMap(egressIPsStr) - startupNonEIPTick, eipStartUpLatency, eipRecoveryLatency, eipTick, nonEIPTick, failure := buildAndRegisterMetrics(delayBetweenReq, delayBetweenStartReq) + extHost, extPorts, egressIPsStr, hostSubnetStr, delayBetweenReq, timeout := processEnvVars() + egressIPs := make(map[string]struct{}) + if egressIPsStr != "" { + egressIPs = buildEIPMap(egressIPsStr) + } + startupNonEIPTick, eipStartUpLatency, eipRecoveryLatency, eipTick, nonEIPTick, failure := buildAndRegisterMetrics(delayBetweenReq) wg.Add(2) startMetricsServer(stop, wg) // begin requests until Egress IP found 
wg.Add(1) - go checkDurationForEIPAtStartup(stop, wg, egressIPs, extHost, extPorts, startupNonEIPTick, eipStartUpLatency, failure, delayBetweenStartReq, startTimeout) - wg.Add(1) - go checkEIPAndNonEIPUntilStop(stop, wg, egressIPs, extHost, extPorts, eipRecoveryLatency, eipTick, nonEIPTick, failure, delayBetweenReq, timeout) + go checkEIPAndNonEIPUntilStop(stop, wg, egressIPs, hostSubnetStr, extHost, extPorts, eipStartUpLatency, eipRecoveryLatency, startupNonEIPTick, eipTick, nonEIPTick, failure, delayBetweenReq, timeout) wg.Wait() } -func checkEIPAndNonEIPUntilStop(stop <-chan struct{}, wg *sync.WaitGroup, egressIPs map[string]struct{}, extHost, extPorts string, - eipRecoveryLatency *prometheus.Gauge, eipTick, nonEIPTick *prometheus.Gauge, failure *prometheus.Gauge, delayBetweenReq, timeout int) { +func validateHostAddress(ipAddr, subnet string) bool { + ip := net.ParseIP(ipAddr) + if ip == nil { + log.Printf("checkEIPAndNonEIPUntilStop: Error: IP Address is nil") + return false + } + + // Parse the subnet + _, ipNet, err := net.ParseCIDR(subnet) + if err != nil { + log.Printf("checkEIPAndNonEIPUntilStop: Error: Failed to parse subnet: %v", err) + return false + } + + // Check if the IP address is within the subnet + return ipNet.Contains(ip) +} + +func checkEIPAndNonEIPUntilStop(stop <-chan struct{}, wg *sync.WaitGroup, egressIPs map[string]struct{}, hostSubnetStr string, extHost, extPorts string, + eipStartUpLatency, eipRecoveryLatency *prometheus.Gauge, startupNonEIPTick, eipTick, nonEIPTick *prometheus.Gauge, failure *prometheus.Gauge, delayBetweenReq, timeout int) { log.Print("## checkEIPAndNonEIPUntilStop: Polling source IP and increment metric counts for when Egress IP or another IP seen as source IP") defer wg.Done() var done bool start := time.Now() var eipCheckFailed bool + var startupLatencySet bool client := getHTTPClient(timeout) for !done { @@ -66,93 +85,88 @@ func checkEIPAndNonEIPUntilStop(stop <-chan struct{}, wg *sync.WaitGroup, egress url := 
buildDstURL(extHost, extPorts) res, err := client.Get(url) if err != nil { + if eipCheckFailed == false { + eipCheckFailed = true + start = time.Now() + log.Printf("checkEIPAndNonEIPUntilStop: Error: Failed to talk to %q: %v, Starting recovery timer %v", url, err, start) + } + (*failure).Inc() log.Printf("checkEIPAndNonEIPUntilStop: Error: Failed to talk to %q: %v", url, err) continue } - log.Printf("checkEIPAndNonEIPUntilStop: Reply with HTTP code %s", res.Status) + //log.Printf("checkEIPAndNonEIPUntilStop: Reply with HTTP code %s", res.Status) if res.StatusCode != http.StatusOK { if eipCheckFailed == false { eipCheckFailed = true start = time.Now() + log.Printf("checkEIPAndNonEIPUntilStop: Error: http res.StatusCode %v, Starting recovery timer %v", res.StatusCode, start) } (*failure).Inc() + log.Printf("checkEIPAndNonEIPUntilStop: Error: http res.StatusCode %v", res.StatusCode) continue } resBody, err := ioutil.ReadAll(res.Body) if err != nil { + if eipCheckFailed == false { + eipCheckFailed = true + start = time.Now() + log.Printf("checkEIPAndNonEIPUntilStop: Error: Could not read response body: %s, Starting recovery timer %v", err, start) + } + (*failure).Inc() log.Printf("checkEIPAndNonEIPUntilStop: Error: Could not read response body: %s\n", err) } resBodyStr := string(resBody) if !isIP(resBodyStr) { - panic(fmt.Sprintf("response was not an IP address: %q", resBodyStr)) - } - if _, ok := egressIPs[resBodyStr]; ok { - (*eipTick).Inc() - if eipCheckFailed == true { - eipCheckFailed = false - (*eipRecoveryLatency).Set(time.Now().Sub(start).Seconds()) - start = time.Now() - } - } else { - (*nonEIPTick).Inc() if eipCheckFailed == false { eipCheckFailed = true start = time.Now() + log.Printf("checkEIPAndNonEIPUntilStop: Error: response was not an IP address: %q , Starting recovery timer %v", resBodyStr, start) } - } - } - if delayBetweenReq != 0 { - time.Sleep(time.Duration(delayBetweenReq) * time.Second) - } - } - log.Print("## checkEIPAndNonEIPUntilStop: Finished 
polling source IP") -} - -func checkDurationForEIPAtStartup(stop <-chan struct{}, wg *sync.WaitGroup, egressIPs map[string]struct{}, extHost, extPorts string, - startupNonEIPTick *prometheus.Gauge, eipStartUpLatency *prometheus.Gauge, failure *prometheus.Gauge, delayBetweenReq, timeout int) { - log.Print("## checkDurationForEIPAtStartup: Polling until Egress IP seen as source IP") - defer wg.Done() - start := time.Now() - var done bool - client := getHTTPClient(timeout) - - for !done { - select { - case <-stop: - done = true - default: - log.Printf("checkDurationForEIPAtStartup: Attempting connection to detect Egress IP at startup") - targetURL := buildDstURL(extHost, extPorts) - res, err := client.Get(targetURL) - if err != nil { - log.Printf("checkDurationForEIPAtStartup: Error: Failed to talk to %q: %v", targetURL, err) - continue - } - log.Printf("checkDurationForEIPAtStartup: Reply with HTTP code %s", res.Status) - if res.StatusCode != http.StatusOK { + log.Printf("checkEIPAndNonEIPUntilStop: Error: response was not an IP address: %q", resBodyStr) (*failure).Inc() - continue - } - resBody, err := ioutil.ReadAll(res.Body) - if err != nil { - log.Printf("checkDurationForEIPAtStartup: Error: Could not read response body: %s\n", err) } - resBodyStr := string(resBody) - if !isIP(resBodyStr) { - panic(fmt.Sprintf("response was not an IP address: %q", resBodyStr)) + valid := false + // validate hostip or eip + if len(egressIPs) == 0 { + valid = validateHostAddress(resBodyStr, hostSubnetStr) + } else { + if _, ok := egressIPs[resBodyStr]; ok { + valid = true + } } - if _, ok := egressIPs[resBodyStr]; ok { - (*eipStartUpLatency).Set(time.Now().Sub(start).Seconds()) - done = true + if valid { + (*eipTick).Inc() + if startupLatencySet == false { + (*eipStartUpLatency).Set(time.Now().Sub(start).Seconds()) + startupLatencySet = true + } else { + if eipCheckFailed == true { + eipCheckFailed = false + (*eipRecoveryLatency).Set(time.Now().Sub(start).Seconds()) + 
log.Printf("checkEIPAndNonEIPUntilStop: Reporting recovery latency: %v seconds", time.Now().Sub(start).Seconds()) + start = time.Now() + } + } } else { - (*startupNonEIPTick).Inc() + if startupLatencySet == false { + (*startupNonEIPTick).Inc() + } else { + if eipCheckFailed == false { + eipCheckFailed = true + start = time.Now() + log.Printf("checkEIPAndNonEIPUntilStop: Error: EIP not found in response %q, Starting recovery timer %v", resBodyStr, start) + } + (*nonEIPTick).Inc() + log.Printf("checkEIPAndNonEIPUntilStop: Error: EIP not found in response: %q", resBodyStr) + (*failure).Inc() + } } } if delayBetweenReq != 0 { time.Sleep(time.Duration(delayBetweenReq) * time.Second) } } - log.Print("checkDurationForEIPAtStartup: Egress IP seen or stop requested") + log.Print("## checkEIPAndNonEIPUntilStop: Finished polling source IP") } func isIP(s string) bool { @@ -160,6 +174,7 @@ func isIP(s string) bool { } func buildDstURL(host, portRange string) string { + return fmt.Sprintf("http://%s:%s", host, portRange) // Parse the port range string ports := strings.Split(portRange, ":") startPort, _ := strconv.Atoi(ports[0]) @@ -194,7 +209,7 @@ func buildEIPMap(egressIPsStr string) map[string]struct{} { return egressIPMap } -func processEnvVars() (string, string, string, int, int, int, int) { +func processEnvVars() (string, string, string, string, int, int) { var err error extHost := os.Getenv(serverEnvKey) if extHost == "" { @@ -204,9 +219,13 @@ func processEnvVars() (string, string, string, int, int, int, int) { if extPorts == "" { panic(fmt.Sprintf(envKeyErrMsg, portsEnvKey)) } + hostSubnetStr := "" egressIPsStr := os.Getenv(egressIPsEnvKey) if egressIPsStr == "" { - panic(fmt.Sprintf(envKeyErrMsg, egressIPsEnvKey)) + hostSubnetStr = os.Getenv(hostSubnetEnvKey) + if hostSubnetStr == "" { + panic(fmt.Sprintf(envKeyErrMsg, egressIPsEnvKey)) + } } delayBetweenReq := defaultDelayBetweenReqSec @@ -217,14 +236,6 @@ func processEnvVars() (string, string, string, int, int, int, 
int) { panic(fmt.Sprintf("failed to parse delay between requests: %v", err)) } } - delayBetweenStartReq := defaultDelayBetweenReqSec - delayBetweenStartRequestStr := os.Getenv(delayBetweenStartRequestEnvKey) - if delayBetweenStartRequestStr != "" { - delayBetweenStartReq, err = strconv.Atoi(delayBetweenStartRequestStr) - if err != nil { - panic(fmt.Sprintf("failed to parse delay between start requests: %v", err)) - } - } requestTimeout := defaultRequestTimeoutSec reqTimeoutStr := os.Getenv(reqTimeoutEnvKey) if reqTimeoutStr != "" { @@ -233,15 +244,7 @@ func processEnvVars() (string, string, string, int, int, int, int) { panic(fmt.Sprintf("failed to parse request timeout %q: %v", reqTimeoutStr, err)) } } - requestStartTimeout := defaultRequestTimeoutSec - reqStartTimeoutStr := os.Getenv(reqStartTimeoutEnvKey) - if reqStartTimeoutStr != "" { - requestStartTimeout, err = strconv.Atoi(reqStartTimeoutStr) - if err != nil { - panic(fmt.Sprintf("failed to parse request timeout %q: %v", reqStartTimeoutStr, err)) - } - } - return extHost, extPorts, egressIPsStr, delayBetweenStartReq, requestStartTimeout, delayBetweenReq, requestTimeout + return extHost, extPorts, egressIPsStr, hostSubnetStr, delayBetweenReq, requestTimeout } func registerSignalHandler() chan struct{} { @@ -277,24 +280,24 @@ func startMetricsServer(stop <-chan struct{}, wg *sync.WaitGroup) { }() } -func buildAndRegisterMetrics(delayBetweenReq, delayBetweenStartReq int) (*prometheus.Gauge, *prometheus.Gauge, *prometheus.Gauge, *prometheus.Gauge, *prometheus.Gauge, *prometheus.Gauge) { +func buildAndRegisterMetrics(delayBetweenReq int) (*prometheus.Gauge, *prometheus.Gauge, *prometheus.Gauge, *prometheus.Gauge, *prometheus.Gauge, *prometheus.Gauge) { var startupNonEIPTick = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "scale", Name: "startup_non_eip_total", - Help: fmt.Sprintf("during startup, increments every time EgressIP not seen as source IP - increments every %d seconds if seen", 
delayBetweenStartReq), + Help: fmt.Sprintf("during startup, increments every time EgressIP not seen as source IP - increments every %d seconds if seen", delayBetweenReq), }) var eipStartUpLatency = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "scale", Name: "eip_startup_latency_total", Help: fmt.Sprintf("time it takes in seconds for a connection to have a source IP of EgressIP at startup"+ - " with polling interval of %d seconds", delayBetweenStartReq), + " with polling interval of %d seconds", delayBetweenReq), }) var eipRecoveryLatency = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "scale", Name: "eip_recovery_latency", Help: fmt.Sprintf("time it takes in seconds for an Egress IP connection to recover from failure"+ - " with polling interval of %d seconds", delayBetweenStartReq), + " with polling interval of %d seconds", delayBetweenReq), }) var eipTick = prometheus.NewGauge(prometheus.GaugeOpts{