Skip to content

Commit

Permalink
allow neighbour app to send custom metrics without policy
Browse files Browse the repository at this point in the history
  • Loading branch information
asalan316 committed Sep 27, 2024
1 parent fcfb8af commit f63ba3c
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 56 deletions.
102 changes: 57 additions & 45 deletions src/acceptance/app/custom_metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
. "github.com/onsi/gomega"
)

var _ = FDescribe("AutoScaler custom metrics policy", func() {
var _ = Describe("AutoScaler custom metrics policy", func() {
var (
policy string
err error
Expand Down Expand Up @@ -65,60 +65,72 @@ var _ = FDescribe("AutoScaler custom metrics policy", func() {

})
})

Context("when neighbour app send custom metrics for app B via mtls", func() {
FDescribe("Custom metrics policy with neighbour app", func() {
JustBeforeEach(func() {
neighbourAppName = CreateTestApp(cfg, "go-neighbour-app", 1)
neighbourAppGUID, err = GetAppGuid(cfg, neighbourAppName)
Expect(err).NotTo(HaveOccurred())
policy = GenerateBinding("bound_app", 1, 2, "test_metric", 500)
BindServiceToAppWithPolicy(cfg, neighbourAppName, instanceName, policy)
StartApp(neighbourAppName, cfg.CfPushTimeoutDuration())
})
It("should scale out and scale in app B", Label(acceptance.LabelSmokeTests), func() {
By(fmt.Sprintf("Scale out %s to 2 instance", appToScaleName))
scaleOut := sendMetricToAutoscaler(cfg, appToScaleGUID, neighbourAppName, 550, true)
Eventually(scaleOut).
WithTimeout(5 * time.Minute).
WithPolling(15 * time.Second).
Should(Equal(2))

By(fmt.Sprintf("Scale in %s to 1 instance", appToScaleName))
scaleIn := sendMetricToAutoscaler(cfg, appToScaleGUID, neighbourAppName, 100, true)
Eventually(scaleIn).
WithTimeout(5 * time.Minute).
WithPolling(15 * time.Second).
Should(Equal(1))

})
})

Context("when neighbour app send metrics if metrics strategy is not set i.e same_app", func() {
JustBeforeEach(func() {
neighbourAppName = CreateTestApp(cfg, "go-neighbour-app", 1)
neighbourAppGUID, err = GetAppGuid(cfg, neighbourAppName)
err := BindServiceToAppWithPolicy(cfg, neighbourAppName, instanceName, policy)
Expect(err).NotTo(HaveOccurred())
policy = GenerateBinding("", 1, 2, "test_metric", 500)
BindServiceToAppWithPolicy(cfg, neighbourAppName, instanceName, policy)
StartApp(neighbourAppName, cfg.CfPushTimeoutDuration())
})
It("should scale out and scale the neighbour app", Label(acceptance.LabelSmokeTests), func() {
By(fmt.Sprintf("Scale out %s to 2 instance", neighbourAppName))
scaleOut := sendMetricToAutoscaler(cfg, neighbourAppGUID, neighbourAppName, 550, true)
Eventually(scaleOut).
WithTimeout(5 * time.Minute).
WithPolling(15 * time.Second).
Should(Equal(2))

By(fmt.Sprintf("Scale in %s to 1 instance", neighbourAppName))
scaleIn := sendMetricToAutoscaler(cfg, neighbourAppGUID, neighbourAppName, 100, true)
Eventually(scaleIn).
WithTimeout(5 * time.Minute).
WithPolling(15 * time.Second).
Should(Equal(1))
Context("neighbour app send custom metrics for app B via mtls", func() {
BeforeEach(func() {
policy = GenerateBindingsWithScalingPolicy("bound_app", 1, 2, "test_metric", 500, 100)
})
It("should scale out and scale in app B", Label(acceptance.LabelSmokeTests), func() {
By(fmt.Sprintf("Scale out %s to 2 instance", appToScaleName))
scaleOut := sendMetricToAutoscaler(cfg, appToScaleGUID, neighbourAppName, 550, true)
Eventually(scaleOut).
WithTimeout(5 * time.Minute).
WithPolling(15 * time.Second).
Should(Equal(2))

By(fmt.Sprintf("Scale in %s to 1 instance", appToScaleName))
scaleIn := sendMetricToAutoscaler(cfg, appToScaleGUID, neighbourAppName, 100, true)
Eventually(scaleIn).
WithTimeout(5 * time.Minute).
WithPolling(15 * time.Second).
Should(Equal(1))

})
})
Context("neighbour app send metrics if metrics strategy is not set i.e same_app", func() {
BeforeEach(func() {
policy = GenerateBindingsWithScalingPolicy("", 1, 2, "test_metric", 100, 550)
})
When("policy is attached with neighbour app", func() {
It("should scale out and scale the neighbour app", func() {
By(fmt.Sprintf("Scale out %s to 2 instance", neighbourAppName))
scaleOut := sendMetricToAutoscaler(cfg, neighbourAppGUID, neighbourAppName, 550, true)
Eventually(scaleOut).
WithTimeout(5 * time.Minute).
WithPolling(15 * time.Second).
Should(Equal(2))

By(fmt.Sprintf("Scale in %s to 1 instance", neighbourAppName))
scaleIn := sendMetricToAutoscaler(cfg, neighbourAppGUID, neighbourAppName, 90, true)
Eventually(scaleIn).
WithTimeout(5 * time.Minute).
WithPolling(15 * time.Second).
Should(Equal(1))

})
})
When("no policy is attached with neighbour app", func() {
BeforeEach(func() {
policy = ""
})
It("should not scale neighbour app", func() {
sendMetricToAutoscaler(cfg, neighbourAppGUID, neighbourAppName, 550, true)
Expect(RunningInstances(neighbourAppGUID, 5*time.Second)).To(Equal(1))

})
})

})
})

})

func sendMetricToAutoscaler(config *config.Config, appToScaleGUID string, neighbourAppName string, metricThreshold int, mtls bool) func() (int, error) {
Expand Down
18 changes: 11 additions & 7 deletions src/acceptance/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,12 @@ func ServicePlansUrl(cfg *config.Config, spaceGuid string) string {
return url.String()
}

func GenerateBinding(allowFrom string, instanceMin, instanceMax int, metricName string, threshold int64) string {
func GenerateBindingsWithScalingPolicy(allowFrom string, instanceMin, instanceMax int, metricName string, scaleInThreshold, scaleOutThreshold int64) string {
bindingConfig := BindingConfig{
Configuration: Configuration{CustomMetrics: CustomMetricsConfig{
MetricSubmissionStrategy: MetricsSubmissionStrategy{AllowFrom: allowFrom},
}},
ScalingPolicy: buildScalingPolicy(instanceMin, instanceMax, metricName, threshold),
ScalingPolicy: buildScaleOutScaleInPolicy(instanceMin, instanceMax, metricName, scaleInThreshold, scaleOutThreshold),
}
marshalledBinding, err := MarshalWithoutHTMLEscape(bindingConfig)
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -284,6 +284,14 @@ func GenerateDynamicScaleInPolicy(instanceMin, instanceMax int, metricName strin
}

func GenerateDynamicScaleOutAndInPolicy(instanceMin, instanceMax int, metricName string, scaleInWhenBelowThreshold int64, scaleOutWhenGreaterOrEqualThreshold int64) string {
policy := buildScaleOutScaleInPolicy(instanceMin, instanceMax, metricName, scaleInWhenBelowThreshold, scaleOutWhenGreaterOrEqualThreshold)
marshaled, err := MarshalWithoutHTMLEscape(policy)
Expect(err).NotTo(HaveOccurred())

return string(marshaled)
}

func buildScaleOutScaleInPolicy(instanceMin int, instanceMax int, metricName string, scaleInWhenBelowThreshold int64, scaleOutWhenGreaterOrEqualThreshold int64) ScalingPolicy {
scalingOutRule := ScalingRule{
MetricType: metricName,
BreachDurationSeconds: TestBreachDurationSeconds,
Expand All @@ -305,11 +313,7 @@ func GenerateDynamicScaleOutAndInPolicy(instanceMin, instanceMax int, metricName
InstanceMax: instanceMax,
ScalingRules: []*ScalingRule{&scalingOutRule, &scalingInRule},
}

marshaled, err := MarshalWithoutHTMLEscape(policy)
Expect(err).NotTo(HaveOccurred())

return string(marshaled)
return policy
}

// GenerateDynamicScaleInPolicyBetween creates a scaling policy that scales down from 2 instances to 1, if the metric value is in a range of [upper, lower].
Expand Down
30 changes: 28 additions & 2 deletions src/autoscaler/metricsforwarder/server/custom_metrics_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,17 @@ var (
type CustomMetricsHandler struct {
metricForwarder forwarder.MetricForwarder
policyDB db.PolicyDB
bindingDB db.BindingDB
allowedMetricCache cache.Cache
cacheTTL time.Duration
logger lager.Logger
}

func NewCustomMetricsHandler(logger lager.Logger, metricForwarder forwarder.MetricForwarder, policyDB db.PolicyDB, allowedMetricCache cache.Cache) *CustomMetricsHandler {
func NewCustomMetricsHandler(logger lager.Logger, metricForwarder forwarder.MetricForwarder, policyDB db.PolicyDB, bindingDB db.BindingDB, allowedMetricCache cache.Cache) *CustomMetricsHandler {
return &CustomMetricsHandler{
metricForwarder: metricForwarder,
policyDB: policyDB,
bindingDB: bindingDB,
allowedMetricCache: allowedMetricCache,
logger: logger,
}
Expand Down Expand Up @@ -138,7 +140,17 @@ func (mh *CustomMetricsHandler) validateCustomMetricTypes(appGUID string, metric
// AllowedMetrics found in cache
allowedMetricTypeSet = res.(map[string]struct{})
} else {
// AllowedMetrics not found in cache, find AllowedMetrics from Database
// allow app with strategy as bound_app to submit metrics without policy
isAppWithBoundStrategy, err := mh.isAppWithBoundStrategy(appGUID)
if err != nil {
mh.logger.Error("error-finding-app-submission-strategy", err, lager.Data{"appId": appGUID})
return err
}
if isAppWithBoundStrategy {
mh.logger.Info("app-with-bound-strategy-found", lager.Data{"appId": appGUID})
return nil
}

scalingPolicy, err := mh.policyDB.GetAppPolicy(context.TODO(), appGUID)
if err != nil {
mh.logger.Error("error-getting-policy", err, lager.Data{"appId": appGUID})
Expand Down Expand Up @@ -172,6 +184,20 @@ func (mh *CustomMetricsHandler) validateCustomMetricTypes(appGUID string, metric
return nil
}

func (mh *CustomMetricsHandler) isAppWithBoundStrategy(appGUID string) (bool, error) {
// allow app with submission_strategy as bound_app to submit custom metrics even without policy
submissionStrategy, err := mh.bindingDB.GetCustomMetricStrategyByAppId(context.TODO(), appGUID)
if err != nil {
mh.logger.Error("error-getting-custom-metrics-strategy", err, lager.Data{"appId": appGUID})
return false, err
}
if submissionStrategy == "bound_app" {
mh.logger.Info("bounded-metrics-submission-strategy", lager.Data{"appId": appGUID, "submission_strategy": submissionStrategy})
return true, nil
}
return false, nil
}

func (mh *CustomMetricsHandler) getMetrics(appID string, metricsConsumer *models.MetricsConsumer) []*models.CustomMetric {
var metrics []*models.CustomMetric
for _, metric := range metricsConsumer.CustomMetrics {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var _ = Describe("MetricHandler", func() {
allowedMetricTypeSet map[string]struct{}

policyDB *fakes.FakePolicyDB
fakeBindingDB *fakes.FakeBindingDB
metricsforwarder *fakes.FakeMetricForwarder

resp *httptest.ResponseRecorder
Expand All @@ -46,12 +47,13 @@ var _ = Describe("MetricHandler", func() {
BeforeEach(func() {
logger := lager.NewLogger("metrichandler-test")
policyDB = &fakes.FakePolicyDB{}
fakeBindingDB = &fakes.FakeBindingDB{}
metricsforwarder = &fakes.FakeMetricForwarder{}
allowedMetricCache = *cache.New(10*time.Minute, -1)
allowedMetricTypeSet = make(map[string]struct{})
vars = make(map[string]string)
resp = httptest.NewRecorder()
handler = NewCustomMetricsHandler(logger, metricsforwarder, policyDB, allowedMetricCache)
handler = NewCustomMetricsHandler(logger, metricsforwarder, policyDB, fakeBindingDB, allowedMetricCache)
allowedMetricCache.Flush()
})

Expand Down Expand Up @@ -295,6 +297,24 @@ var _ = Describe("MetricHandler", func() {

})
})
When("neighbour app is bound to same autoscaler instance without policy", func() {
BeforeEach(func() {
fakeBindingDB.GetCustomMetricStrategyByAppIdReturns("bound_app", nil)
customMetrics := []*models.CustomMetric{
{
Name: "queuelength", Value: 12, Unit: "unit", InstanceIndex: 1, AppGUID: "an-app-id",
},
}
body, err = json.Marshal(models.MetricsConsumer{InstanceIndex: 0, CustomMetrics: customMetrics})
Expect(err).NotTo(HaveOccurred())
})

It("should returns status code 200", func() {
Expect(resp.Code).To(Equal(http.StatusOK))
Expect(fakeBindingDB.GetCustomMetricStrategyByAppIdCallCount()).To(Equal(1))

})
})
})
})

Expand Down
2 changes: 1 addition & 1 deletion src/autoscaler/metricsforwarder/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func NewServer(logger lager.Logger, conf *config.Config, policyDb db.PolicyDB, b
return nil, fmt.Errorf("failed to create metric forwarder: %w", err)
}

mh := NewCustomMetricsHandler(logger, *metricForwarder, policyDb, allowedMetricCache)
mh := NewCustomMetricsHandler(logger, *metricForwarder, policyDb, bindingDB, allowedMetricCache)
authenticator, err := auth.New(logger, credentials, bindingDB)
if err != nil {
return nil, fmt.Errorf("failed to create auth middleware: %w", err)
Expand Down

0 comments on commit f63ba3c

Please sign in to comment.