From 8b64060721e6550c7fc1ecc0d3491917aca53741 Mon Sep 17 00:00:00 2001 From: Delyan Raychev Date: Wed, 23 Oct 2019 15:01:14 -0700 Subject: [PATCH] Decouple mutation of App Gateway and AKS settings (#623) --- pkg/appgw/frontend_listeners.go | 2 +- pkg/appgw/frontend_listeners_test.go | 2 +- pkg/appgw/ingress_rules_test.go | 2 +- pkg/controller/mutate_aks.go | 76 +++++++++++++++++++ .../{process.go => mutate_app_gateway.go} | 73 ++---------------- ...ess_test.go => mutate_app_gateway_test.go} | 0 pkg/worker/fake.go | 9 ++- pkg/worker/types.go | 5 +- pkg/worker/worker.go | 6 +- 9 files changed, 101 insertions(+), 74 deletions(-) create mode 100644 pkg/controller/mutate_aks.go rename pkg/controller/{process.go => mutate_app_gateway.go} (75%) rename pkg/controller/{process_test.go => mutate_app_gateway_test.go} (100%) diff --git a/pkg/appgw/frontend_listeners.go b/pkg/appgw/frontend_listeners.go index 819b294fa..38b3e7824 100644 --- a/pkg/appgw/frontend_listeners.go +++ b/pkg/appgw/frontend_listeners.go @@ -101,7 +101,7 @@ func (c *appGwConfigBuilder) getListeners(cbCtx *ConfigBuilderContext) (*[]n.App sort.Sort(sorter.ByListenerName(listeners)) sort.Sort(sorter.ByFrontendPortName(ports)) - // Since getListeners() would be called multiple times within the life cycle of a Process(Event) + // Since getListeners() would be called multiple times within the life cycle of a MutateAppGateway(Event) // we cache the results of this function in what would be final place to store the Listeners. c.mem.listeners = &listeners c.mem.ports = &ports diff --git a/pkg/appgw/frontend_listeners_test.go b/pkg/appgw/frontend_listeners_test.go index 5d4ed29c6..7f2520ce3 100644 --- a/pkg/appgw/frontend_listeners_test.go +++ b/pkg/appgw/frontend_listeners_test.go @@ -22,7 +22,7 @@ import ( // appgw_suite_test.go launches these Ginkgo tests -var _ = Describe("Process ingress rules and parse frontend listener configs", func() { +var _ = Describe("MutateAppGateway ingress rules and parse frontend listener configs", func() { var envVariables environment.EnvVariables var listenerID80 listenerIdentifier diff --git a/pkg/appgw/ingress_rules_test.go b/pkg/appgw/ingress_rules_test.go index d2c20df4f..5a147be3d 100644 --- a/pkg/appgw/ingress_rules_test.go +++ b/pkg/appgw/ingress_rules_test.go @@ -12,7 +12,7 @@ import ( // appgw_suite_test.go launches these Ginkgo tests -var _ = Describe("Process ingress rules, listeners, and ports", func() { +var _ = Describe("MutateAppGateway ingress rules, listeners, and ports", func() { port80 := Port(80) port443 := Port(443) diff --git a/pkg/controller/mutate_aks.go b/pkg/controller/mutate_aks.go new file mode 100644 index 000000000..36c82facf --- /dev/null +++ b/pkg/controller/mutate_aks.go @@ -0,0 +1,76 @@ +// ------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. +// -------------------------------------------------------------------------------------------- + +package controller + +import ( + n "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network" + "github.com/golang/glog" + v1 "k8s.io/api/core/v1" + "k8s.io/api/extensions/v1beta1" + + "github.com/Azure/application-gateway-kubernetes-ingress/pkg/annotations" + "github.com/Azure/application-gateway-kubernetes-ingress/pkg/appgw" + "github.com/Azure/application-gateway-kubernetes-ingress/pkg/events" + "github.com/Azure/application-gateway-kubernetes-ingress/pkg/k8scontext" +) + +// MutateAKS applies changes to Kubernetes resources. +func (c AppGwIngressController) MutateAKS(event events.Event) error { + return nil +} + +func (c AppGwIngressController) updateIngressStatus(appGw *n.ApplicationGateway, cbCtx *appgw.ConfigBuilderContext, event events.Event) { + ingress, ok := event.Value.(*v1beta1.Ingress) + if !ok { + return + } + + // check if this ingress is for AGIC or not, it might have been updated + if !k8scontext.IsIngressApplicationGateway(ingress) || !cbCtx.InIngressList(ingress) { + if err := c.k8sContext.UpdateIngressStatus(*ingress, ""); err != nil { + c.recorder.Event(ingress, v1.EventTypeWarning, events.ReasonUnableToUpdateIngressStatus, err.Error()) + } + return + } + + // determine what ip to attach + usePrivateIP, _ := annotations.UsePrivateIP(ingress) + usePrivateIP = usePrivateIP || cbCtx.EnvVariables.UsePrivateIP == "true" + if ipConf := appgw.LookupIPConfigurationByType(appGw.FrontendIPConfigurations, usePrivateIP); ipConf != nil { + if ipAddress, ok := c.ipAddressMap[*ipConf.ID]; ok { + if err := c.k8sContext.UpdateIngressStatus(*ingress, ipAddress); err != nil { + c.recorder.Event(ingress, v1.EventTypeWarning, events.ReasonUnableToUpdateIngressStatus, err.Error()) + } + } + } +} + +func (c AppGwIngressController) updateIPAddressMap(appGw *n.ApplicationGateway) { + for _, ipConf := range *appGw.FrontendIPConfigurations { + if _, ok := c.ipAddressMap[*ipConf.ID]; ok { + return + } + + if ipConf.PrivateIPAddress != nil { + c.ipAddressMap[*ipConf.ID] = k8scontext.IPAddress(*ipConf.PrivateIPAddress) + } else if ipAddress := c.getPublicIPAddress(*ipConf.PublicIPAddress.ID); ipAddress != nil { + c.ipAddressMap[*ipConf.ID] = *ipAddress + } + } +} + +// getPublicIPAddress gets the ip address associated to public ip on Azure +func (c AppGwIngressController) getPublicIPAddress(publicIPID string) *k8scontext.IPAddress { + // get public ip + publicIP, err := c.azClient.GetPublicIP(publicIPID) + if err != nil { + glog.Errorf("Unable to get Public IP Address %s. Error %s", publicIPID, err) + return nil + } + + ipAddress := k8scontext.IPAddress(*publicIP.IPAddress) + return &ipAddress +} diff --git a/pkg/controller/process.go b/pkg/controller/mutate_app_gateway.go similarity index 75% rename from pkg/controller/process.go rename to pkg/controller/mutate_app_gateway.go index 1b23e476c..f7f2ef575 100644 --- a/pkg/controller/process.go +++ b/pkg/controller/mutate_app_gateway.go @@ -11,28 +11,22 @@ import ( "strings" "time" - n "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network" - "github.com/Azure/go-autorest/autorest/to" - "github.com/golang/glog" - v1 "k8s.io/api/core/v1" - "k8s.io/api/extensions/v1beta1" - - "github.com/Azure/application-gateway-kubernetes-ingress/pkg/annotations" "github.com/Azure/application-gateway-kubernetes-ingress/pkg/appgw" "github.com/Azure/application-gateway-kubernetes-ingress/pkg/brownfield" "github.com/Azure/application-gateway-kubernetes-ingress/pkg/environment" "github.com/Azure/application-gateway-kubernetes-ingress/pkg/events" - "github.com/Azure/application-gateway-kubernetes-ingress/pkg/k8scontext" + n "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network" + "github.com/Azure/go-autorest/autorest/to" + "github.com/golang/glog" + v1 "k8s.io/api/core/v1" ) type realClock struct{} -func (realClock) Now() time.Time { return time.Now() } -func (realClock) After(d time.Duration) <-chan time.Time { return time.After(d) } +func (realClock) Now() time.Time { return time.Now() } -// Process is the callback function that will be executed for every event -// in the EventQueue. -func (c AppGwIngressController) Process(event events.Event) error { +// MutateAppGateway applies App Gateway config. +func (c AppGwIngressController) MutateAppGateway(event events.Event) error { // Get current application gateway config appGw, err := c.azClient.GetGateway() c.metricStore.IncArmAPICallCounter() @@ -201,56 +195,3 @@ func (c AppGwIngressController) Process(event events.Event) error { return nil } - -func (c AppGwIngressController) updateIngressStatus(appGw *n.ApplicationGateway, cbCtx *appgw.ConfigBuilderContext, event events.Event) { - ingress, ok := event.Value.(*v1beta1.Ingress) - if !ok { - return - } - - // check if this ingress is for AGIC or not, it might have been updated - if !k8scontext.IsIngressApplicationGateway(ingress) || !cbCtx.InIngressList(ingress) { - if err := c.k8sContext.UpdateIngressStatus(*ingress, ""); err != nil { - c.recorder.Event(ingress, v1.EventTypeWarning, events.ReasonUnableToUpdateIngressStatus, err.Error()) - } - return - } - - // determine what ip to attach - usePrivateIP, _ := annotations.UsePrivateIP(ingress) - usePrivateIP = usePrivateIP || cbCtx.EnvVariables.UsePrivateIP == "true" - if ipConf := appgw.LookupIPConfigurationByType(appGw.FrontendIPConfigurations, usePrivateIP); ipConf != nil { - if ipAddress, ok := c.ipAddressMap[*ipConf.ID]; ok { - if err := c.k8sContext.UpdateIngressStatus(*ingress, ipAddress); err != nil { - c.recorder.Event(ingress, v1.EventTypeWarning, events.ReasonUnableToUpdateIngressStatus, err.Error()) - } - } - } -} - -func (c AppGwIngressController) updateIPAddressMap(appGw *n.ApplicationGateway) { - for _, ipConf := range *appGw.FrontendIPConfigurations { - if _, ok := c.ipAddressMap[*ipConf.ID]; ok { - return - } - - if ipConf.PrivateIPAddress != nil { - c.ipAddressMap[*ipConf.ID] = k8scontext.IPAddress(*ipConf.PrivateIPAddress) - } else if ipAddress := c.getPublicIPAddress(*ipConf.PublicIPAddress.ID); ipAddress != nil { - c.ipAddressMap[*ipConf.ID] = *ipAddress - } - } -} - -// getPublicIPAddress gets the ip address associated to public ip on Azure -func (c AppGwIngressController) getPublicIPAddress(publicIPID string) *k8scontext.IPAddress { - // get public ip - publicIP, err := c.azClient.GetPublicIP(publicIPID) - if err != nil { - glog.Errorf("Unable to get Public IP Address %s. Error %s", publicIPID, err) - return nil - } - - ipAddress := k8scontext.IPAddress(*publicIP.IPAddress) - return &ipAddress -} diff --git a/pkg/controller/process_test.go b/pkg/controller/mutate_app_gateway_test.go similarity index 100% rename from pkg/controller/process_test.go rename to pkg/controller/mutate_app_gateway_test.go diff --git a/pkg/worker/fake.go b/pkg/worker/fake.go index 7328d2d80..fe45bce73 100644 --- a/pkg/worker/fake.go +++ b/pkg/worker/fake.go @@ -14,8 +14,13 @@ type FakeProcessor struct { processFunc func(events.Event) error } -// Process will call the callback provided -func (fp FakeProcessor) Process(event events.Event) error { +// MutateAppGateway will call the callback provided +func (fp FakeProcessor) MutateAppGateway(event events.Event) error { + return fp.processFunc(event) +} + +// MutateAKS will call the callback provided +func (fp FakeProcessor) MutateAKS(event events.Event) error { return fp.processFunc(event) } diff --git a/pkg/worker/types.go b/pkg/worker/types.go index 0da8787f0..0315a5ca9 100644 --- a/pkg/worker/types.go +++ b/pkg/worker/types.go @@ -11,11 +11,12 @@ import ( // EventProcessor provides a mechanism to act on events in the internal queue. type EventProcessor interface { - Process(events.Event) error + MutateAppGateway(events.Event) error + MutateAKS(events.Event) error ShouldProcess(events.Event) (bool, *string) } -// Worker listens on the eventChannel and runs the EventProcessor.Process +// Worker listens to the eventChannel and runs the EventProcessor.MutateAppGateway and MutateAKS // for each event. type Worker struct { EventProcessor diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 7c24dd12f..d9782aa5b 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -43,9 +43,13 @@ func (w *Worker) Run(work chan events.Event, stopChannel chan struct{}) { continue } + if err := w.MutateAKS(event); err != nil { + + } + lastEvent := drainChan(work, event) - if err := w.Process(lastEvent); err != nil { + if err := w.MutateAppGateway(lastEvent); err != nil { glog.Error("Processing event failed:", err) time.Sleep(sleepOnErrorSeconds * time.Second) }