Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node ip support #2

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 92 additions & 89 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ const (
serverEnvKey = "EXT_SERVER_HOST"
portsEnvKey = "EXT_SERVER_PORTS"
egressIPsEnvKey = "EGRESS_IPS"
hostSubnetEnvKey = "HOST_SUBNET"
delayBetweenRequestEnvKey = "DELAY_BETWEEN_REQ_SEC"
delayBetweenStartRequestEnvKey = "DELAY_BETWEEN_START_REQ_SEC"
reqTimeoutEnvKey = "REQ_TIMEOUT_SEC"
reqStartTimeoutEnvKey = "REQ_START_TIMEOUT_SEC"
envKeyErrMsg = "define env key %q"
defaultDelayBetweenReqSec = 1
defaultRequestTimeoutSec = 1
Expand All @@ -35,26 +34,46 @@ const (
func main() {
wg := &sync.WaitGroup{}
stop := registerSignalHandler()
extHost, extPorts, egressIPsStr, delayBetweenStartReq, startTimeout, delayBetweenReq, timeout := processEnvVars()
egressIPs := buildEIPMap(egressIPsStr)
startupNonEIPTick, eipStartUpLatency, eipRecoveryLatency, eipTick, nonEIPTick, failure := buildAndRegisterMetrics(delayBetweenReq, delayBetweenStartReq)
extHost, extPorts, egressIPsStr, hostSubnetStr, delayBetweenReq, timeout := processEnvVars()
egressIPs := make(map[string]struct{})
if egressIPsStr != "" {
egressIPs = buildEIPMap(egressIPsStr)
}
startupNonEIPTick, eipStartUpLatency, eipRecoveryLatency, eipTick, nonEIPTick, failure := buildAndRegisterMetrics(delayBetweenReq)
wg.Add(2)
startMetricsServer(stop, wg)
// begin requests until Egress IP found
wg.Add(1)
go checkDurationForEIPAtStartup(stop, wg, egressIPs, extHost, extPorts, startupNonEIPTick, eipStartUpLatency, failure, delayBetweenStartReq, startTimeout)
wg.Add(1)
go checkEIPAndNonEIPUntilStop(stop, wg, egressIPs, extHost, extPorts, eipRecoveryLatency, eipTick, nonEIPTick, failure, delayBetweenReq, timeout)
go checkEIPAndNonEIPUntilStop(stop, wg, egressIPs, hostSubnetStr, extHost, extPorts, eipStartUpLatency, eipRecoveryLatency, startupNonEIPTick, eipTick, nonEIPTick, failure, delayBetweenReq, timeout)
wg.Wait()
}

func checkEIPAndNonEIPUntilStop(stop <-chan struct{}, wg *sync.WaitGroup, egressIPs map[string]struct{}, extHost, extPorts string,
eipRecoveryLatency *prometheus.Gauge, eipTick, nonEIPTick *prometheus.Gauge, failure *prometheus.Gauge, delayBetweenReq, timeout int) {
func validateHostAddress(ipAddr, subnet string) bool {
ip := net.ParseIP(ipAddr)
if ip == nil {
log.Printf("checkEIPAndNonEIPUntilStop: Error: IP Address is nil")
return false
}

// Parse the subnet
_, ipNet, err := net.ParseCIDR(subnet)
if err != nil {
log.Printf("checkEIPAndNonEIPUntilStop: Error: Failed to parse subnet: %v", err)
return false
}

// Check if the IP address is within the subnet
return ipNet.Contains(ip)
}

func checkEIPAndNonEIPUntilStop(stop <-chan struct{}, wg *sync.WaitGroup, egressIPs map[string]struct{}, hostSubnetStr string, extHost, extPorts string,
eipStartUpLatency, eipRecoveryLatency *prometheus.Gauge, startupNonEIPTick, eipTick, nonEIPTick *prometheus.Gauge, failure *prometheus.Gauge, delayBetweenReq, timeout int) {
log.Print("## checkEIPAndNonEIPUntilStop: Polling source IP and increment metric counts for when Egress IP or another IP seen as source IP")
defer wg.Done()
var done bool
start := time.Now()
var eipCheckFailed bool
var startupLatencySet bool
client := getHTTPClient(timeout)

for !done {
Expand All @@ -66,100 +85,96 @@ func checkEIPAndNonEIPUntilStop(stop <-chan struct{}, wg *sync.WaitGroup, egress
url := buildDstURL(extHost, extPorts)
res, err := client.Get(url)
if err != nil {
if eipCheckFailed == false {
eipCheckFailed = true
start = time.Now()
log.Printf("checkEIPAndNonEIPUntilStop: Error: Failed to talk to %q: %v, Starting recovery timer %v", url, err, start)
}
(*failure).Inc()
log.Printf("checkEIPAndNonEIPUntilStop: Error: Failed to talk to %q: %v", url, err)
continue
}
log.Printf("checkEIPAndNonEIPUntilStop: Reply with HTTP code %s", res.Status)
//log.Printf("checkEIPAndNonEIPUntilStop: Reply with HTTP code %s", res.Status)
if res.StatusCode != http.StatusOK {
if eipCheckFailed == false {
eipCheckFailed = true
start = time.Now()
log.Printf("checkEIPAndNonEIPUntilStop: Error: http res.StatusCode %v, Starting recovery timer %v", res.StatusCode, start)
}
(*failure).Inc()
log.Printf("checkEIPAndNonEIPUntilStop: Error: http res.StatusCode %v", res.StatusCode)
continue
}
resBody, err := ioutil.ReadAll(res.Body)
if err != nil {
if eipCheckFailed == false {
eipCheckFailed = true
start = time.Now()
log.Printf("checkEIPAndNonEIPUntilStop: Error: Could not read response body: %s, Starting recovery timer %v", err, start)
}
(*failure).Inc()
log.Printf("checkEIPAndNonEIPUntilStop: Error: Could not read response body: %s\n", err)
}
resBodyStr := string(resBody)
if !isIP(resBodyStr) {
panic(fmt.Sprintf("response was not an IP address: %q", resBodyStr))
}
if _, ok := egressIPs[resBodyStr]; ok {
(*eipTick).Inc()
if eipCheckFailed == true {
eipCheckFailed = false
(*eipRecoveryLatency).Set(time.Now().Sub(start).Seconds())
start = time.Now()
}
} else {
(*nonEIPTick).Inc()
if eipCheckFailed == false {
eipCheckFailed = true
start = time.Now()
log.Printf("checkEIPAndNonEIPUntilStop: Error: response was not an IP address: %q , Starting recovery timer %v", resBodyStr, start)
}
}
}
if delayBetweenReq != 0 {
time.Sleep(time.Duration(delayBetweenReq) * time.Second)
}
}
log.Print("## checkEIPAndNonEIPUntilStop: Finished polling source IP")
}

func checkDurationForEIPAtStartup(stop <-chan struct{}, wg *sync.WaitGroup, egressIPs map[string]struct{}, extHost, extPorts string,
startupNonEIPTick *prometheus.Gauge, eipStartUpLatency *prometheus.Gauge, failure *prometheus.Gauge, delayBetweenReq, timeout int) {
log.Print("## checkDurationForEIPAtStartup: Polling until Egress IP seen as source IP")
defer wg.Done()
start := time.Now()
var done bool
client := getHTTPClient(timeout)

for !done {
select {
case <-stop:
done = true
default:
log.Printf("checkDurationForEIPAtStartup: Attempting connection to detect Egress IP at startup")
targetURL := buildDstURL(extHost, extPorts)
res, err := client.Get(targetURL)
if err != nil {
log.Printf("checkDurationForEIPAtStartup: Error: Failed to talk to %q: %v", targetURL, err)
continue
}
log.Printf("checkDurationForEIPAtStartup: Reply with HTTP code %s", res.Status)
if res.StatusCode != http.StatusOK {
log.Printf("checkEIPAndNonEIPUntilStop: Error: response was not an IP address: %q", resBodyStr)
(*failure).Inc()
continue
}
resBody, err := ioutil.ReadAll(res.Body)
if err != nil {
log.Printf("checkDurationForEIPAtStartup: Error: Could not read response body: %s\n", err)
}
resBodyStr := string(resBody)
if !isIP(resBodyStr) {
panic(fmt.Sprintf("response was not an IP address: %q", resBodyStr))
valid := false
// validate hostip or eip
if len(egressIPs) == 0 {
valid = validateHostAddress(resBodyStr, hostSubnetStr)
} else {
if _, ok := egressIPs[resBodyStr]; ok {
valid = true
}
}
if _, ok := egressIPs[resBodyStr]; ok {
(*eipStartUpLatency).Set(time.Now().Sub(start).Seconds())
done = true
if valid {
(*eipTick).Inc()
if startupLatencySet == false {
(*eipStartUpLatency).Set(time.Now().Sub(start).Seconds())
startupLatencySet = true
} else {
if eipCheckFailed == true {
eipCheckFailed = false
(*eipRecoveryLatency).Set(time.Now().Sub(start).Seconds())
log.Printf("checkEIPAndNonEIPUntilStop: Reporting recovery latency: %v seconds", time.Now().Sub(start).Seconds())
start = time.Now()
}
}
} else {
(*startupNonEIPTick).Inc()
if startupLatencySet == false {
(*startupNonEIPTick).Inc()
} else {
if eipCheckFailed == false {
eipCheckFailed = true
start = time.Now()
log.Printf("checkEIPAndNonEIPUntilStop: Error: EIP not found in response %q, Starting recovery timer %v", resBodyStr, start)
}
(*nonEIPTick).Inc()
log.Printf("checkEIPAndNonEIPUntilStop: Error: EIP not found in response: %q", resBodyStr)
(*failure).Inc()
}
}
}
if delayBetweenReq != 0 {
time.Sleep(time.Duration(delayBetweenReq) * time.Second)
}
}
log.Print("checkDurationForEIPAtStartup: Egress IP seen or stop requested")
log.Print("## checkEIPAndNonEIPUntilStop: Finished polling source IP")
}

func isIP(s string) bool {
return net.ParseIP(s) != nil
}

func buildDstURL(host, portRange string) string {
return fmt.Sprintf("http://%s:%s", host, portRange)
// Parse the port range string
ports := strings.Split(portRange, ":")
startPort, _ := strconv.Atoi(ports[0])
Expand Down Expand Up @@ -194,7 +209,7 @@ func buildEIPMap(egressIPsStr string) map[string]struct{} {
return egressIPMap
}

func processEnvVars() (string, string, string, int, int, int, int) {
func processEnvVars() (string, string, string, string, int, int) {
var err error
extHost := os.Getenv(serverEnvKey)
if extHost == "" {
Expand All @@ -204,9 +219,13 @@ func processEnvVars() (string, string, string, int, int, int, int) {
if extPorts == "" {
panic(fmt.Sprintf(envKeyErrMsg, portsEnvKey))
}
hostSubnetStr := ""
egressIPsStr := os.Getenv(egressIPsEnvKey)
if egressIPsStr == "" {
panic(fmt.Sprintf(envKeyErrMsg, egressIPsEnvKey))
hostSubnetStr = os.Getenv(hostSubnetEnvKey)
if hostSubnetStr == "" {
panic(fmt.Sprintf(envKeyErrMsg, egressIPsEnvKey))
}
}

delayBetweenReq := defaultDelayBetweenReqSec
Expand All @@ -217,14 +236,6 @@ func processEnvVars() (string, string, string, int, int, int, int) {
panic(fmt.Sprintf("failed to parse delay between requests: %v", err))
}
}
delayBetweenStartReq := defaultDelayBetweenReqSec
delayBetweenStartRequestStr := os.Getenv(delayBetweenStartRequestEnvKey)
if delayBetweenStartRequestStr != "" {
delayBetweenStartReq, err = strconv.Atoi(delayBetweenStartRequestStr)
if err != nil {
panic(fmt.Sprintf("failed to parse delay between start requests: %v", err))
}
}
requestTimeout := defaultRequestTimeoutSec
reqTimeoutStr := os.Getenv(reqTimeoutEnvKey)
if reqTimeoutStr != "" {
Expand All @@ -233,15 +244,7 @@ func processEnvVars() (string, string, string, int, int, int, int) {
panic(fmt.Sprintf("failed to parse request timeout %q: %v", reqTimeoutStr, err))
}
}
requestStartTimeout := defaultRequestTimeoutSec
reqStartTimeoutStr := os.Getenv(reqStartTimeoutEnvKey)
if reqStartTimeoutStr != "" {
requestStartTimeout, err = strconv.Atoi(reqStartTimeoutStr)
if err != nil {
panic(fmt.Sprintf("failed to parse request timeout %q: %v", reqStartTimeoutStr, err))
}
}
return extHost, extPorts, egressIPsStr, delayBetweenStartReq, requestStartTimeout, delayBetweenReq, requestTimeout
return extHost, extPorts, egressIPsStr, hostSubnetStr, delayBetweenReq, requestTimeout
}

func registerSignalHandler() chan struct{} {
Expand Down Expand Up @@ -277,24 +280,24 @@ func startMetricsServer(stop <-chan struct{}, wg *sync.WaitGroup) {
}()
}

func buildAndRegisterMetrics(delayBetweenReq, delayBetweenStartReq int) (*prometheus.Gauge, *prometheus.Gauge, *prometheus.Gauge, *prometheus.Gauge, *prometheus.Gauge, *prometheus.Gauge) {
func buildAndRegisterMetrics(delayBetweenReq int) (*prometheus.Gauge, *prometheus.Gauge, *prometheus.Gauge, *prometheus.Gauge, *prometheus.Gauge, *prometheus.Gauge) {
var startupNonEIPTick = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "scale",
Name: "startup_non_eip_total",
Help: fmt.Sprintf("during startup, increments every time EgressIP not seen as source IP - increments every %d seconds if seen", delayBetweenStartReq),
Help: fmt.Sprintf("during startup, increments every time EgressIP not seen as source IP - increments every %d seconds if seen", delayBetweenReq),
})

var eipStartUpLatency = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "scale",
Name: "eip_startup_latency_total",
Help: fmt.Sprintf("time it takes in seconds for a connection to have a source IP of EgressIP at startup"+
" with polling interval of %d seconds", delayBetweenStartReq),
" with polling interval of %d seconds", delayBetweenReq),
})
var eipRecoveryLatency = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "scale",
Name: "eip_recovery_latency",
Help: fmt.Sprintf("time it takes in seconds for an Egress IP connection to recover from failure"+
" with polling interval of %d seconds", delayBetweenStartReq),
" with polling interval of %d seconds", delayBetweenReq),
})

var eipTick = prometheus.NewGauge(prometheus.GaugeOpts{
Expand Down