Skip to content

Commit

Permalink
Decouple mutation of App Gateway and AKS settings (#623)
Browse files Browse the repository at this point in the history
  • Loading branch information
draychev authored and akshaysngupta committed Oct 23, 2019
1 parent de9d025 commit 8b64060
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 74 deletions.
2 changes: 1 addition & 1 deletion pkg/appgw/frontend_listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (c *appGwConfigBuilder) getListeners(cbCtx *ConfigBuilderContext) (*[]n.App
sort.Sort(sorter.ByListenerName(listeners))
sort.Sort(sorter.ByFrontendPortName(ports))

// Since getListeners() would be called multiple times within the life cycle of a Process(Event)
// Since getListeners() would be called multiple times within the life cycle of a MutateAppGateway(Event)
// we cache the results of this function in what would be final place to store the Listeners.
c.mem.listeners = &listeners
c.mem.ports = &ports
Expand Down
2 changes: 1 addition & 1 deletion pkg/appgw/frontend_listeners_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

// appgw_suite_test.go launches these Ginkgo tests

var _ = Describe("Process ingress rules and parse frontend listener configs", func() {
var _ = Describe("MutateAppGateway ingress rules and parse frontend listener configs", func() {

var envVariables environment.EnvVariables
var listenerID80 listenerIdentifier
Expand Down
2 changes: 1 addition & 1 deletion pkg/appgw/ingress_rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

// appgw_suite_test.go launches these Ginkgo tests

var _ = Describe("Process ingress rules, listeners, and ports", func() {
var _ = Describe("MutateAppGateway ingress rules, listeners, and ports", func() {
port80 := Port(80)
port443 := Port(443)

Expand Down
76 changes: 76 additions & 0 deletions pkg/controller/mutate_aks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// -------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
// --------------------------------------------------------------------------------------------

package controller

import (
n "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network"
"github.com/golang/glog"
v1 "k8s.io/api/core/v1"
"k8s.io/api/extensions/v1beta1"

"github.com/Azure/application-gateway-kubernetes-ingress/pkg/annotations"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/appgw"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/events"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/k8scontext"
)

// MutateAKS applies changes to Kubernetes resources.
func (c AppGwIngressController) MutateAKS(event events.Event) error {
return nil
}

func (c AppGwIngressController) updateIngressStatus(appGw *n.ApplicationGateway, cbCtx *appgw.ConfigBuilderContext, event events.Event) {
ingress, ok := event.Value.(*v1beta1.Ingress)
if !ok {
return
}

// check if this ingress is for AGIC or not, it might have been updated
if !k8scontext.IsIngressApplicationGateway(ingress) || !cbCtx.InIngressList(ingress) {
if err := c.k8sContext.UpdateIngressStatus(*ingress, ""); err != nil {
c.recorder.Event(ingress, v1.EventTypeWarning, events.ReasonUnableToUpdateIngressStatus, err.Error())
}
return
}

// determine what ip to attach
usePrivateIP, _ := annotations.UsePrivateIP(ingress)
usePrivateIP = usePrivateIP || cbCtx.EnvVariables.UsePrivateIP == "true"
if ipConf := appgw.LookupIPConfigurationByType(appGw.FrontendIPConfigurations, usePrivateIP); ipConf != nil {
if ipAddress, ok := c.ipAddressMap[*ipConf.ID]; ok {
if err := c.k8sContext.UpdateIngressStatus(*ingress, ipAddress); err != nil {
c.recorder.Event(ingress, v1.EventTypeWarning, events.ReasonUnableToUpdateIngressStatus, err.Error())
}
}
}
}

func (c AppGwIngressController) updateIPAddressMap(appGw *n.ApplicationGateway) {
for _, ipConf := range *appGw.FrontendIPConfigurations {
if _, ok := c.ipAddressMap[*ipConf.ID]; ok {
return
}

if ipConf.PrivateIPAddress != nil {
c.ipAddressMap[*ipConf.ID] = k8scontext.IPAddress(*ipConf.PrivateIPAddress)
} else if ipAddress := c.getPublicIPAddress(*ipConf.PublicIPAddress.ID); ipAddress != nil {
c.ipAddressMap[*ipConf.ID] = *ipAddress
}
}
}

// getPublicIPAddress gets the ip address associated to public ip on Azure
func (c AppGwIngressController) getPublicIPAddress(publicIPID string) *k8scontext.IPAddress {
// get public ip
publicIP, err := c.azClient.GetPublicIP(publicIPID)
if err != nil {
glog.Errorf("Unable to get Public IP Address %s. Error %s", publicIPID, err)
return nil
}

ipAddress := k8scontext.IPAddress(*publicIP.IPAddress)
return &ipAddress
}
73 changes: 7 additions & 66 deletions pkg/controller/process.go → pkg/controller/mutate_app_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,22 @@ import (
"strings"
"time"

n "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network"
"github.com/Azure/go-autorest/autorest/to"
"github.com/golang/glog"
v1 "k8s.io/api/core/v1"
"k8s.io/api/extensions/v1beta1"

"github.com/Azure/application-gateway-kubernetes-ingress/pkg/annotations"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/appgw"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/brownfield"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/environment"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/events"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/k8scontext"
n "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network"
"github.com/Azure/go-autorest/autorest/to"
"github.com/golang/glog"
v1 "k8s.io/api/core/v1"
)

type realClock struct{}

func (realClock) Now() time.Time { return time.Now() }
func (realClock) After(d time.Duration) <-chan time.Time { return time.After(d) }
func (realClock) Now() time.Time { return time.Now() }

// Process is the callback function that will be executed for every event
// in the EventQueue.
func (c AppGwIngressController) Process(event events.Event) error {
// MutateAppGateway applies App Gateway config.
func (c AppGwIngressController) MutateAppGateway(event events.Event) error {
// Get current application gateway config
appGw, err := c.azClient.GetGateway()
c.metricStore.IncArmAPICallCounter()
Expand Down Expand Up @@ -201,56 +195,3 @@ func (c AppGwIngressController) Process(event events.Event) error {

return nil
}

func (c AppGwIngressController) updateIngressStatus(appGw *n.ApplicationGateway, cbCtx *appgw.ConfigBuilderContext, event events.Event) {
ingress, ok := event.Value.(*v1beta1.Ingress)
if !ok {
return
}

// check if this ingress is for AGIC or not, it might have been updated
if !k8scontext.IsIngressApplicationGateway(ingress) || !cbCtx.InIngressList(ingress) {
if err := c.k8sContext.UpdateIngressStatus(*ingress, ""); err != nil {
c.recorder.Event(ingress, v1.EventTypeWarning, events.ReasonUnableToUpdateIngressStatus, err.Error())
}
return
}

// determine what ip to attach
usePrivateIP, _ := annotations.UsePrivateIP(ingress)
usePrivateIP = usePrivateIP || cbCtx.EnvVariables.UsePrivateIP == "true"
if ipConf := appgw.LookupIPConfigurationByType(appGw.FrontendIPConfigurations, usePrivateIP); ipConf != nil {
if ipAddress, ok := c.ipAddressMap[*ipConf.ID]; ok {
if err := c.k8sContext.UpdateIngressStatus(*ingress, ipAddress); err != nil {
c.recorder.Event(ingress, v1.EventTypeWarning, events.ReasonUnableToUpdateIngressStatus, err.Error())
}
}
}
}

func (c AppGwIngressController) updateIPAddressMap(appGw *n.ApplicationGateway) {
for _, ipConf := range *appGw.FrontendIPConfigurations {
if _, ok := c.ipAddressMap[*ipConf.ID]; ok {
return
}

if ipConf.PrivateIPAddress != nil {
c.ipAddressMap[*ipConf.ID] = k8scontext.IPAddress(*ipConf.PrivateIPAddress)
} else if ipAddress := c.getPublicIPAddress(*ipConf.PublicIPAddress.ID); ipAddress != nil {
c.ipAddressMap[*ipConf.ID] = *ipAddress
}
}
}

// getPublicIPAddress gets the ip address associated to public ip on Azure
func (c AppGwIngressController) getPublicIPAddress(publicIPID string) *k8scontext.IPAddress {
// get public ip
publicIP, err := c.azClient.GetPublicIP(publicIPID)
if err != nil {
glog.Errorf("Unable to get Public IP Address %s. Error %s", publicIPID, err)
return nil
}

ipAddress := k8scontext.IPAddress(*publicIP.IPAddress)
return &ipAddress
}
File renamed without changes.
9 changes: 7 additions & 2 deletions pkg/worker/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@ type FakeProcessor struct {
processFunc func(events.Event) error
}

// Process will call the callback provided
func (fp FakeProcessor) Process(event events.Event) error {
// MutateAppGateway will call the callback provided
func (fp FakeProcessor) MutateAppGateway(event events.Event) error {
return fp.processFunc(event)
}

// MutateAKS will call the callback provided
func (fp FakeProcessor) MutateAKS(event events.Event) error {
return fp.processFunc(event)
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/worker/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ import (

// EventProcessor provides a mechanism to act on events in the internal queue.
type EventProcessor interface {
Process(events.Event) error
MutateAppGateway(events.Event) error
MutateAKS(events.Event) error
ShouldProcess(events.Event) (bool, *string)
}

// Worker listens on the eventChannel and runs the EventProcessor.Process
// Worker listens to the eventChannel and runs the EventProcessor.MutateAppGateway and MutateAKS
// for each event.
type Worker struct {
EventProcessor
Expand Down
6 changes: 5 additions & 1 deletion pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,13 @@ func (w *Worker) Run(work chan events.Event, stopChannel chan struct{}) {
continue
}

if err := w.MutateAKS(event); err != nil {

}

lastEvent := drainChan(work, event)

if err := w.Process(lastEvent); err != nil {
if err := w.MutateAppGateway(lastEvent); err != nil {
glog.Error("Processing event failed:", err)
time.Sleep(sleepOnErrorSeconds * time.Second)
}
Expand Down

0 comments on commit 8b64060

Please sign in to comment.