Skip to content

Commit

Permalink
Simplify Updating ingress IP address; Do only if new IP (#625)
Browse files Browse the repository at this point in the history
  • Loading branch information
draychev authored and akshaysngupta committed Oct 24, 2019
1 parent 8b64060 commit da1d383
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 114 deletions.
51 changes: 38 additions & 13 deletions pkg/azure/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type azClient struct {
subscriptionID SubscriptionID
resourceGroupName ResourceGroup
appGwName ResourceName
memoizedIPs map[string]n.PublicIPAddress

ctx context.Context
}
Expand All @@ -55,30 +56,44 @@ func NewAzClient(subscriptionID SubscriptionID, resourceGroupName ResourceGroup,
subnetsClient: n.NewSubnetsClient(string(subscriptionID)),
groupsClient: r.NewGroupsClient(string(subscriptionID)),
deploymentsClient: r.NewDeploymentsClient(string(subscriptionID)),
subscriptionID: subscriptionID,
resourceGroupName: resourceGroupName,
appGwName: appGwName,

subscriptionID: subscriptionID,
resourceGroupName: resourceGroupName,
appGwName: appGwName,
memoizedIPs: make(map[string]n.PublicIPAddress),

ctx: context.Background(),
authorizer: authorizer,
}

az.appGatewaysClient.AddToUserAgent(userAgent)
if err := az.appGatewaysClient.AddToUserAgent(userAgent); err != nil {
glog.Error("Error adding User Agent to App Gateway client: ", userAgent)
}
az.appGatewaysClient.Authorizer = az.authorizer

az.publicIPsClient.AddToUserAgent(userAgent)
if err := az.publicIPsClient.AddToUserAgent(userAgent); err != nil {
glog.Error("Error adding User Agent to Public IP client: ", userAgent)
}
az.publicIPsClient.Authorizer = az.authorizer
az.virtualNetworksClient.AddToUserAgent(userAgent)

if err := az.virtualNetworksClient.AddToUserAgent(userAgent); err != nil {
glog.Error("Error adding User Agent to Virtual Networks client: ", userAgent)
}
az.virtualNetworksClient.Authorizer = az.authorizer
az.subnetsClient.AddToUserAgent(userAgent)

if err := az.subnetsClient.AddToUserAgent(userAgent); err != nil {
glog.Error("Error adding User Agent to Subnets client: ", userAgent)
}
az.subnetsClient.Authorizer = az.authorizer
az.groupsClient.AddToUserAgent(userAgent)
az.groupsClient.Authorizer = az.authorizer

az.publicIPsClient.AddToUserAgent(userAgent)
az.publicIPsClient.Authorizer = az.authorizer
if err := az.groupsClient.AddToUserAgent(userAgent); err != nil {
glog.Error("Error adding User Agent to Groups client: ", userAgent)
}
az.groupsClient.Authorizer = az.authorizer

az.deploymentsClient.AddToUserAgent(userAgent)
if err := az.deploymentsClient.AddToUserAgent(userAgent); err != nil {
glog.Error("Error adding User Agent to Deployments client: ", userAgent)
}
az.deploymentsClient.Authorizer = az.authorizer

return az
Expand All @@ -100,8 +115,18 @@ func (az *azClient) UpdateGateway(appGwObj *n.ApplicationGateway) (err error) {
}

func (az *azClient) GetPublicIP(resourceID string) (n.PublicIPAddress, error) {
if ip, ok := az.memoizedIPs[resourceID]; ok {
return ip, nil
}

_, resourceGroupName, publicIPName := ParseResourceID(resourceID)
return az.publicIPsClient.Get(az.ctx, string(resourceGroupName), string(publicIPName), "")

ip, err := az.publicIPsClient.Get(az.ctx, string(resourceGroupName), string(publicIPName), "")
if err != nil {
return n.PublicIPAddress{}, err
}
az.memoizedIPs[resourceID] = ip
return ip, nil
}

// DeployGateway is a method that deploy the appgw and related resources
Expand Down
79 changes: 48 additions & 31 deletions pkg/controller/mutate_aks.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,64 +13,81 @@ import (

"github.com/Azure/application-gateway-kubernetes-ingress/pkg/annotations"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/appgw"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/azure"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/events"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/k8scontext"
)

type ipResource string
type ipAddress string

// MutateAKS applies changes to Kubernetes resources.
func (c AppGwIngressController) MutateAKS(event events.Event) error {
func (c AppGwIngressController) MutateAKS() error {
appGw, cbCtx, err := c.getAppGw()
if err != nil {
return err
}

ips := getIPsFromAppGateway(appGw, c.azClient)

// update all relevant ingresses with IP address obtained from existing App Gateway configuration
cbCtx.IngressList = c.PruneIngress(appGw, cbCtx)
for _, ingress := range cbCtx.IngressList {
c.updateIngressStatus(appGw, cbCtx, ingress, ips)
}
return nil
}

func (c AppGwIngressController) updateIngressStatus(appGw *n.ApplicationGateway, cbCtx *appgw.ConfigBuilderContext, event events.Event) {
ingress, ok := event.Value.(*v1beta1.Ingress)
if !ok {
return
}
func (c AppGwIngressController) updateIngressStatus(appGw *n.ApplicationGateway, cbCtx *appgw.ConfigBuilderContext, ingress *v1beta1.Ingress, ips map[ipResource]ipAddress) {

// check if this ingress is for AGIC or not, it might have been updated
if !k8scontext.IsIngressApplicationGateway(ingress) || !cbCtx.InIngressList(ingress) {
if err := c.k8sContext.UpdateIngressStatus(*ingress, ""); err != nil {
c.recorder.Event(ingress, v1.EventTypeWarning, events.ReasonUnableToUpdateIngressStatus, err.Error())
}
// determine what ipAddress to attach
usePrivateIP, _ := annotations.UsePrivateIP(ingress)
usePrivateIP = usePrivateIP || cbCtx.EnvVariables.UsePrivateIP == "true"

ipConf := appgw.LookupIPConfigurationByType(appGw.FrontendIPConfigurations, usePrivateIP)
if ipConf == nil {
glog.V(9).Info("[mutate_aks] No IP config for App Gwy: ", appGw.Name)
return
}

// determine what ip to attach
usePrivateIP, _ := annotations.UsePrivateIP(ingress)
usePrivateIP = usePrivateIP || cbCtx.EnvVariables.UsePrivateIP == "true"
if ipConf := appgw.LookupIPConfigurationByType(appGw.FrontendIPConfigurations, usePrivateIP); ipConf != nil {
if ipAddress, ok := c.ipAddressMap[*ipConf.ID]; ok {
if err := c.k8sContext.UpdateIngressStatus(*ingress, ipAddress); err != nil {
c.recorder.Event(ingress, v1.EventTypeWarning, events.ReasonUnableToUpdateIngressStatus, err.Error())
}
glog.V(5).Infof("[mutate_aks] Resolving IP for ID (%s)", *ipConf.ID)
if newIP, found := ips[ipResource(*ipConf.ID)]; found {
if err := c.k8sContext.UpdateIngressStatus(*ingress, k8scontext.IPAddress(newIP)); err != nil {
c.recorder.Event(ingress, v1.EventTypeWarning, events.ReasonUnableToUpdateIngressStatus, err.Error())
glog.Errorf("[mutate_aks] Error updating ingress %s/%s IP to %+v", ingress.Namespace, ingress.Name, newIP)
return
}
glog.V(5).Infof("[mutate_aks] Updated Ingress %s/%s IP to %+v", ingress.Namespace, ingress.Name, newIP)
}
}

func (c AppGwIngressController) updateIPAddressMap(appGw *n.ApplicationGateway) {
func getIPsFromAppGateway(appGw *n.ApplicationGateway, azClient azure.AzClient) map[ipResource]ipAddress {
ips := make(map[ipResource]ipAddress)
for _, ipConf := range *appGw.FrontendIPConfigurations {
if _, ok := c.ipAddressMap[*ipConf.ID]; ok {
return
ipID := ipResource(*ipConf.ID)
if _, ok := ips[ipID]; ok {
continue
}

if ipConf.PrivateIPAddress != nil {
c.ipAddressMap[*ipConf.ID] = k8scontext.IPAddress(*ipConf.PrivateIPAddress)
} else if ipAddress := c.getPublicIPAddress(*ipConf.PublicIPAddress.ID); ipAddress != nil {
c.ipAddressMap[*ipConf.ID] = *ipAddress
ips[ipID] = ipAddress(*ipConf.PrivateIPAddress)
} else if ipAddress := getPublicIPAddress(*ipConf.PublicIPAddress.ID, azClient); ipAddress != nil {
ips[ipID] = *ipAddress
}
}
glog.V(5).Infof("[mutate_aks] Found IPs: %+v", ips)
return ips
}

// getPublicIPAddress gets the ip address associated to public ip on Azure
func (c AppGwIngressController) getPublicIPAddress(publicIPID string) *k8scontext.IPAddress {
// get public ip
publicIP, err := c.azClient.GetPublicIP(publicIPID)
// getPublicIPAddress gets the ipAddress address associated to public ipAddress on Azure
func getPublicIPAddress(publicIPID string, azClient azure.AzClient) *ipAddress {
// get public ipAddress
publicIP, err := azClient.GetPublicIP(publicIPID)
if err != nil {
glog.Errorf("Unable to get Public IP Address %s. Error %s", publicIPID, err)
glog.Errorf("[mutate_aks] Unable to get Public IP Address %s. Error %s", publicIPID, err)
return nil
}

ipAddress := k8scontext.IPAddress(*publicIP.IPAddress)
ipAddress := ipAddress(*publicIP.IPAddress)
return &ipAddress
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/appgw"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/crd_client/agic_crd_client/clientset/versioned/fake"
istio_fake "github.com/Azure/application-gateway-kubernetes-ingress/pkg/crd_client/istio_crd_client/clientset/versioned/fake"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/events"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/k8scontext"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/metricstore"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/tests"
Expand All @@ -43,7 +42,9 @@ var _ = Describe("process function tests", func() {
},
}
publicIP := k8scontext.IPAddress("xxxx")
privateIP := k8scontext.IPAddress("xxxx")
privateIP := k8scontext.IPAddress("yyyy")
var ips map[ipResource]ipAddress

BeforeEach(func() {
stopChannel = make(chan struct{})

Expand Down Expand Up @@ -82,16 +83,17 @@ var _ = Describe("process function tests", func() {
DefaultAddressPoolID: to.StringPtr("xx"),
DefaultHTTPSettingsID: to.StringPtr("yy"),
}

ips = map[ipResource]ipAddress{"PublicIP": "xxxx", "PrivateIP": "yyyy"}
})

AfterEach(func() {
close(stopChannel)
})

Context("test updateIngressStatus", func() {
It("ensure that updateIngressStatus adds ip to ingress", func() {
controller.updateIngressStatus(&appGw, cbCtx, events.Event{
Type: events.Update,
Value: ingress,
})
It("ensure that updateIngressStatus adds ipAddress to ingress", func() {
controller.updateIngressStatus(&appGw, cbCtx, ingress, ips)
updatedIngress, _ := k8sClient.ExtensionsV1beta1().Ingresses(ingress.Namespace).Get(ingress.Name, metav1.GetOptions{})
Expect(updatedIngress.Status.LoadBalancer.Ingress).Should(ContainElement(v1.LoadBalancerIngress{
Hostname: "",
Expand All @@ -100,28 +102,12 @@ var _ = Describe("process function tests", func() {
Expect(len(updatedIngress.Status.LoadBalancer.Ingress)).To(Equal(1))
})

It("ensure that updateIngressStatus removes ip to ingress not for AGIC", func() {
ingress.Annotations[annotations.IngressClassKey] = "otheric"
updatedIngress, _ := k8sClient.ExtensionsV1beta1().Ingresses(ingress.Namespace).Update(ingress)
controller.k8sContext.UpdateIngressStatus(*ingress, k8scontext.IPAddress(publicIP))
controller.updateIngressStatus(&appGw, cbCtx, events.Event{
Type: events.Update,
Value: ingress,
})
updatedIngress, _ = k8sClient.ExtensionsV1beta1().Ingresses(ingress.Namespace).Get(ingress.Name, metav1.GetOptions{})
Expect(annotations.IsApplicationGatewayIngress(updatedIngress)).To(BeFalse())
Expect(len(updatedIngress.Status.LoadBalancer.Ingress)).To(Equal(0))
})

It("ensure that updateIngressStatus adds private ip when annotation is present", func() {
It("ensure that updateIngressStatus adds private ipAddress when annotation is present", func() {
ingress.Annotations[annotations.UsePrivateIPKey] = "true"
updatedIngress, _ := k8sClient.ExtensionsV1beta1().Ingresses(ingress.Namespace).Update(ingress)
Expect(annotations.UsePrivateIP(updatedIngress)).To(BeTrue())

controller.updateIngressStatus(&appGw, cbCtx, events.Event{
Type: events.Update,
Value: ingress,
})
controller.updateIngressStatus(&appGw, cbCtx, ingress, ips)

updatedIngress, _ = k8sClient.ExtensionsV1beta1().Ingresses(ingress.Namespace).Get(ingress.Name, metav1.GetOptions{})
Expect(updatedIngress.Status.LoadBalancer.Ingress).Should(ContainElement(v1.LoadBalancerIngress{
Expand Down
43 changes: 22 additions & 21 deletions pkg/controller/mutate_app_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ type realClock struct{}

func (realClock) Now() time.Time { return time.Now() }

// MutateAppGateway applies App Gateway config.
func (c AppGwIngressController) MutateAppGateway(event events.Event) error {
func (c AppGwIngressController) getAppGw() (*n.ApplicationGateway, *appgw.ConfigBuilderContext, error) {
// Get current application gateway config
appGw, err := c.azClient.GetGateway()
c.metricStore.IncArmAPICallCounter()
Expand All @@ -36,14 +35,9 @@ func (c AppGwIngressController) MutateAppGateway(event events.Event) error {
if c.agicPod != nil {
c.recorder.Event(c.agicPod, v1.EventTypeWarning, events.ReasonUnableToFetchAppGw, errorLine)
}
return ErrFetchingAppGatewayConfig
return nil, nil, ErrFetchingAppGatewayConfig
}

c.updateIPAddressMap(&appGw)

existingConfigJSON, _ := dumpSanitizedJSON(&appGw, false, to.StringPtr("-- Existing App Gwy Config --"))
glog.V(5).Info("Existing App Gateway config: ", string(existingConfigJSON))

cbCtx := &appgw.ConfigBuilderContext{
ServiceList: c.k8sContext.ListServices(),
IngressList: c.k8sContext.ListHTTPIngresses(),
Expand All @@ -53,6 +47,19 @@ func (c AppGwIngressController) MutateAppGateway(event events.Event) error {
DefaultHTTPSettingsID: to.StringPtr(c.appGwIdentifier.HTTPSettingsID(appgw.DefaultBackendHTTPSettingsName)),
}

return &appGw, cbCtx, nil
}

// MutateAppGateway applies App Gateway config.
func (c AppGwIngressController) MutateAppGateway() error {
appGw, cbCtx, err := c.getAppGw()
if err != nil {
return err
}

existingConfigJSON, _ := dumpSanitizedJSON(appGw, false, to.StringPtr("-- Existing App Gwy Config --"))
glog.V(5).Info("Existing App Gateway config: ", string(existingConfigJSON))

if cbCtx.EnvVariables.EnableBrownfieldDeployment {
prohibitedTargets := c.k8sContext.ListAzureProhibitedTargets()
if len(prohibitedTargets) > 0 {
Expand Down Expand Up @@ -81,7 +88,7 @@ func (c AppGwIngressController) MutateAppGateway(event events.Event) error {
}
}

cbCtx.IngressList = c.PruneIngress(&appGw, cbCtx)
cbCtx.IngressList = c.PruneIngress(appGw, cbCtx)

if cbCtx.EnvVariables.EnableIstioIntegration {
var gatewaysInfo []string
Expand All @@ -102,7 +109,7 @@ func (c AppGwIngressController) MutateAppGateway(event events.Event) error {
}

// Create a configbuilder based on current appgw config
configBuilder := appgw.NewConfigBuilder(c.k8sContext, &c.appGwIdentifier, &appGw, c.recorder, realClock{})
configBuilder := appgw.NewConfigBuilder(c.k8sContext, &c.appGwIdentifier, appGw, c.recorder, realClock{})

// Run validations on the Kubernetes resources which can suggest misconfiguration.
if err = configBuilder.PreBuildValidate(cbCtx); err != nil {
Expand Down Expand Up @@ -133,10 +140,7 @@ func (c AppGwIngressController) MutateAppGateway(event events.Event) error {
}
}

if c.configIsSame(&appGw) {
// update ingresses with appgw gateway ip address
c.updateIngressStatus(generatedAppGw, cbCtx, event)

if c.configIsSame(appGw) {
glog.V(3).Info("cache: Config has NOT changed! No need to connect to ARM.")
return nil
}
Expand All @@ -150,12 +154,12 @@ func (c AppGwIngressController) MutateAppGateway(event events.Event) error {
if err != nil {
// Reset cache
c.configCache = nil
configJSON, _ := dumpSanitizedJSON(&appGw, cbCtx.EnvVariables.EnableSaveConfigToFile, nil)
configJSON, _ := dumpSanitizedJSON(appGw, cbCtx.EnvVariables.EnableSaveConfigToFile, nil)
glogIt := glog.Errorf
if cbCtx.EnvVariables.EnablePanicOnPutError {
glogIt = glog.Fatalf
}
errorLine := fmt.Sprintf("Failed applying App Gwy configuration: %s -- %s", err, string(configJSON))
errorLine := fmt.Sprintf("Failed applying App Gwy configuration:\n%s\n\nerror: %s", string(configJSON), err)
glogIt(errorLine)
if c.agicPod != nil {
c.recorder.Event(c.agicPod, v1.EventTypeWarning, events.ReasonFailedApplyingAppGwConfig, errorLine)
Expand All @@ -164,7 +168,7 @@ func (c AppGwIngressController) MutateAppGateway(event events.Event) error {
return err
}
// Wait until deployment finshes and save the error message
configJSON, _ := dumpSanitizedJSON(&appGw, cbCtx.EnvVariables.EnableSaveConfigToFile, nil)
configJSON, _ := dumpSanitizedJSON(appGw, cbCtx.EnvVariables.EnableSaveConfigToFile, nil)
glog.V(5).Info(string(configJSON))

// We keep this at log level 1 to show some heartbeat in the logs. Without this it is way too quiet.
Expand All @@ -186,10 +190,7 @@ func (c AppGwIngressController) MutateAppGateway(event events.Event) error {
}

glog.V(3).Info("cache: Updated with latest applied config.")
c.updateCache(&appGw)

// update ingresses with appgw gateway ip address
c.updateIngressStatus(generatedAppGw, cbCtx, event)
c.updateCache(appGw)

c.metricStore.IncArmAPIUpdateCallSuccessCounter()

Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/prune_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ var _ = Describe("prune function tests", func() {
}
appGw := fixtures.GetAppGateway()

It("removes the ingress using private ip and keeps others", func() {
It("removes the ingress using private ipAddress and keeps others", func() {
Expect(len(cbCtx.IngressList)).To(Equal(2))
prunedIngresses := pruneNoPrivateIP(controller, &appGw, cbCtx, cbCtx.IngressList)
Expect(len(prunedIngresses)).To(Equal(1))
Expect(prunedIngresses[0].Annotations[annotations.UsePrivateIPKey]).To(Equal(""))
})

It("keeps the ingress using private ip when public ip is present", func() {
It("keeps the ingress using private ipAddress when public ipAddress is present", func() {
appGw.FrontendIPConfigurations = &[]n.ApplicationGatewayFrontendIPConfiguration{
fixtures.GetPublicIPConfiguration(),
fixtures.GetPrivateIPConfiguration(),
Expand Down
Loading

0 comments on commit da1d383

Please sign in to comment.