diff --git a/src/autoscaler/scalingengine/cmd/scalingengine/main.go b/src/autoscaler/scalingengine/cmd/scalingengine/main.go index 86a66994c7..88502a0970 100644 --- a/src/autoscaler/scalingengine/cmd/scalingengine/main.go +++ b/src/autoscaler/scalingengine/cmd/scalingengine/main.go @@ -4,11 +4,9 @@ import ( "flag" "fmt" "os" - "time" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/cf" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/db/sqldb" - "code.cloudfoundry.org/app-autoscaler/src/autoscaler/healthendpoint" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/scalingengine" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/scalingengine/config" @@ -16,7 +14,6 @@ import ( "code.cloudfoundry.org/app-autoscaler/src/autoscaler/scalingengine/server" "code.cloudfoundry.org/clock" "code.cloudfoundry.org/lager/v3" - "github.com/prometheus/client_golang/prometheus" "github.com/tedsuo/ifrit" "github.com/tedsuo/ifrit/grouper" "github.com/tedsuo/ifrit/sigmon" @@ -79,33 +76,17 @@ func main() { } defer func() { _ = schedulerDB.Close() }() - httpStatusCollector := healthendpoint.NewHTTPStatusCollector("autoscaler", "scalingengine") - promRegistry := prometheus.NewRegistry() - healthendpoint.RegisterCollectors(promRegistry, []prometheus.Collector{ - healthendpoint.NewDatabaseStatusCollector("autoscaler", "scalingengine", "policyDB", policyDb), - healthendpoint.NewDatabaseStatusCollector("autoscaler", "scalingengine", "scalingengineDB", scalingEngineDB), - healthendpoint.NewDatabaseStatusCollector("autoscaler", "scalingengine", "schedulerDB", schedulerDB), - httpStatusCollector, - }, true, logger.Session("scalingengine-prometheus")) - scalingEngine := scalingengine.NewScalingEngine(logger, cfClient, policyDb, scalingEngineDB, eClock, conf.DefaultCoolDownSecs, conf.LockSize) synchronizer := schedule.NewActiveScheduleSychronizer(logger, schedulerDB, scalingEngineDB, scalingEngine) - httpServer, err := server.NewServer(logger.Session("http-server"), conf, scalingEngineDB, scalingEngine, synchronizer, httpStatusCollector) + httpServer, err := server.NewServer(logger.Session("http-server"), conf, policyDb, scalingEngineDB, schedulerDB, scalingEngine, synchronizer) if err != nil { logger.Error("failed to create http server", err) os.Exit(1) } - healthServer, err := healthendpoint.NewServerWithBasicAuth(conf.Health, []healthendpoint.Checker{}, logger.Session("health-server"), promRegistry, time.Now) - if err != nil { - logger.Error("failed to create health server", err) - os.Exit(1) - } - members := grouper.Members{ {"http_server", httpServer}, - {"health_server", healthServer}, } monitor := ifrit.Invoke(sigmon.New(grouper.NewOrdered(os.Interrupt, members))) diff --git a/src/autoscaler/scalingengine/cmd/scalingengine/scalingengine_suite_test.go b/src/autoscaler/scalingengine/cmd/scalingengine/scalingengine_suite_test.go index a6c4517df3..f7400f33c0 100644 --- a/src/autoscaler/scalingengine/cmd/scalingengine/scalingengine_suite_test.go +++ b/src/autoscaler/scalingengine/cmd/scalingengine/scalingengine_suite_test.go @@ -138,8 +138,7 @@ var _ = SynchronizedBeforeSuite( _, err = testDB.Exec(testDB.Rebind("INSERT INTO policy_json(app_id, policy_json, guid) values(?, ?, ?)"), appId, policy, "1234") FailOnError("insert failed", err) - httpClient = NewEventGeneratorClient() - healthHttpClient = &http.Client{} + httpClient = NewScalingEngineClient() }) func verifyCertExistence(testCertDir string) { diff --git a/src/autoscaler/scalingengine/cmd/scalingengine/scalingengine_test.go b/src/autoscaler/scalingengine/cmd/scalingengine/scalingengine_test.go index 9c19c16ecc..1d54597530 100644 --- a/src/autoscaler/scalingengine/cmd/scalingengine/scalingengine_test.go +++ b/src/autoscaler/scalingengine/cmd/scalingengine/scalingengine_test.go @@ -23,11 +23,13 @@ import ( var _ = Describe("Main", func() { var ( - runner *ScalingEngineRunner + runner *ScalingEngineRunner + serverURL string ) BeforeEach(func() { runner = NewScalingEngineRunner() + serverURL = fmt.Sprintf("https://127.0.0.1:%d", conf.Server.Port) }) JustBeforeEach(func() { @@ -49,10 +51,6 @@ var _ = Describe("Main", func() { It("http server starts directly", func() { Eventually(runner.Session.Buffer, 2*time.Second).Should(gbytes.Say("scalingengine.http-server.new-http-server")) }) - - It("health server starts directly", func() { - Eventually(runner.Session.Buffer, 2*time.Second).Should(gbytes.Say("scalingengine.health-server.new-http-server")) - }) }) Context("when starting multiple scaling engine instances", func() { @@ -160,7 +158,7 @@ var _ = Describe("Main", func() { body, err := json.Marshal(models.Trigger{Adjustment: "+1"}) Expect(err).NotTo(HaveOccurred()) - rsp, err := httpClient.Post(fmt.Sprintf("https://127.0.0.1:%d/v1/apps/%s/scale", port, appId), + rsp, err := httpClient.Post(fmt.Sprintf("%s/v1/apps/%s/scale", serverURL, appId), "application/json", bytes.NewReader(body)) Expect(err).NotTo(HaveOccurred()) Expect(rsp.StatusCode).To(Equal(http.StatusOK)) @@ -170,7 +168,7 @@ var _ = Describe("Main", func() { Context("when a request to retrieve scaling history comes", func() { It("returns with a 200", func() { - req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("https://127.0.0.1:%d/v1/apps/%s/scaling_histories", port, appId), nil) + req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/v1/apps/%s/scaling_histories", serverURL, appId), nil) Expect(err).NotTo(HaveOccurred()) req.Header.Set("Authorization", "Bearer none") rsp, err := httpClient.Do(req) @@ -182,7 +180,7 @@ var _ = Describe("Main", func() { It("handles the start and end of a schedule", func() { By("start of a schedule") - url := fmt.Sprintf("https://127.0.0.1:%d/v1/apps/%s/active_schedules/111111", port, appId) + url := fmt.Sprintf("%s/v1/apps/%s/active_schedules/111111", serverURL, appId) bodyReader := bytes.NewReader([]byte(`{"instance_min_count":1, "instance_max_count":5, "initial_min_instance_count":3}`)) req, err := http.NewRequest(http.MethodPut, url, bodyReader) @@ -205,7 +203,6 @@ var _ = Describe("Main", func() { }) Describe("when Health server is ready to serve RESTful API", func() { - BeforeEach(func() { basicAuthConfig := conf basicAuthConfig.Health.HealthCheckUsername = "" @@ -219,7 +216,7 @@ var _ = Describe("Main", func() { Context("when a request to query health comes", func() { It("returns with a 200", func() { - rsp, err := healthHttpClient.Get(fmt.Sprintf("http://127.0.0.1:%d", healthport)) + rsp, err := httpClient.Get(fmt.Sprintf("%s", serverURL)) Expect(err).NotTo(HaveOccurred()) Expect(rsp.StatusCode).To(Equal(http.StatusOK)) raw, _ := io.ReadAll(rsp.Body) @@ -243,13 +240,12 @@ var _ = Describe("Main", func() { Context("when username and password are incorrect for basic authentication during health check", func() { It("should return 401", func() { - - req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d/health", healthport), nil) + req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/health", serverURL), nil) Expect(err).NotTo(HaveOccurred()) req.SetBasicAuth("wrongusername", "wrongpassword") - rsp, err := healthHttpClient.Do(req) + rsp, err := httpClient.Do(req) Expect(err).ToNot(HaveOccurred()) Expect(rsp.StatusCode).To(Equal(http.StatusUnauthorized)) }) @@ -258,12 +254,12 @@ var _ = Describe("Main", func() { Context("when username and password are correct for basic authentication during health check", func() { It("should return 200", func() { - req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d/health", healthport), nil) + req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/health", serverURL), nil) Expect(err).NotTo(HaveOccurred()) req.SetBasicAuth(conf.Health.HealthCheckUsername, conf.Health.HealthCheckPassword) - rsp, err := healthHttpClient.Do(req) + rsp, err := httpClient.Do(req) Expect(err).ToNot(HaveOccurred()) Expect(rsp.StatusCode).To(Equal(http.StatusOK)) }) @@ -278,12 +274,12 @@ var _ = Describe("Main", func() { Context("when username and password are incorrect for basic authentication during health check", func() { It("should return 401", func() { - req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d/health", healthport), nil) + req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/health", serverURL), nil) Expect(err).NotTo(HaveOccurred()) req.SetBasicAuth("wrongusername", "wrongpassword") - rsp, err := healthHttpClient.Do(req) + rsp, err := httpClient.Do(req) Expect(err).ToNot(HaveOccurred()) Expect(rsp.StatusCode).To(Equal(http.StatusUnauthorized)) }) @@ -292,12 +288,12 @@ var _ = Describe("Main", func() { Context("when username and password are correct for basic authentication during health check", func() { It("should return 200", func() { - req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d/health", healthport), nil) + req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/health", serverURL), nil) Expect(err).NotTo(HaveOccurred()) req.SetBasicAuth(conf.Health.HealthCheckUsername, conf.Health.HealthCheckPassword) - rsp, err := healthHttpClient.Do(req) + rsp, err := httpClient.Do(req) Expect(err).ToNot(HaveOccurred()) Expect(rsp.StatusCode).To(Equal(http.StatusOK)) }) diff --git a/src/autoscaler/scalingengine/schedule/sync.go b/src/autoscaler/scalingengine/schedule/sync.go index ddaec015c6..47f7268baa 100644 --- a/src/autoscaler/scalingengine/schedule/sync.go +++ b/src/autoscaler/scalingengine/schedule/sync.go @@ -10,6 +10,7 @@ import ( "code.cloudfoundry.org/app-autoscaler/src/autoscaler/scalingengine" ) +// TODO: fix the typo in the interface name, it should be ActiveScheduleSynchronizer type ActiveScheduleSychronizer interface { Sync() } diff --git a/src/autoscaler/scalingengine/server/server.go b/src/autoscaler/scalingengine/server/server.go index b9133179d1..d77478dd05 100644 --- a/src/autoscaler/scalingengine/server/server.go +++ b/src/autoscaler/scalingengine/server/server.go @@ -1,6 +1,8 @@ package server import ( + "time" + "code.cloudfoundry.org/app-autoscaler/src/autoscaler/db" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/healthendpoint" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers" @@ -12,6 +14,7 @@ import ( "code.cloudfoundry.org/lager/v3" "github.com/gorilla/mux" + "github.com/prometheus/client_golang/prometheus" "github.com/tedsuo/ifrit" "go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux" @@ -26,9 +29,39 @@ func (vh VarsFunc) ServeHTTP(w http.ResponseWriter, r *http.Request) { vh(w, r, vars) } -func NewServer(logger lager.Logger, conf *config.Config, scalingEngineDB db.ScalingEngineDB, scalingEngine scalingengine.ScalingEngine, synchronizer schedule.ActiveScheduleSychronizer, httpStatusCollector healthendpoint.HTTPStatusCollector) (ifrit.Runner, error) { +func createPrometheusRegistry(policyDB db.PolicyDB, scalingEngineDB db.ScalingEngineDB, schedulerDB db.SchedulerDB, httpStatusCollector healthendpoint.HTTPStatusCollector, logger lager.Logger) *prometheus.Registry { + promRegistry := prometheus.NewRegistry() + //validate that db are not nil + + if policyDB == nil || scalingEngineDB == nil || schedulerDB == nil { + logger.Error("failed-to-create-prometheus-registry", fmt.Errorf("db is nil: have policyDB: %t, have scalingEngineDB: %t, have schedulerDB: %t", policyDB != nil, scalingEngineDB != nil, schedulerDB != nil)) + return promRegistry + } + + healthendpoint.RegisterCollectors(promRegistry, []prometheus.Collector{ + healthendpoint.NewDatabaseStatusCollector("autoscaler", "scalingengine", "policyDB", policyDB), + healthendpoint.NewDatabaseStatusCollector("autoscaler", "scalingengine", "scalingengineDB", scalingEngineDB), + healthendpoint.NewDatabaseStatusCollector("autoscaler", "scalingengine", "schedulerDB", schedulerDB), + httpStatusCollector, + }, true, logger.Session("scalingengine-prometheus")) + return promRegistry +} + +func createHealthRouter(logger lager.Logger, conf *config.Config, policyDB db.PolicyDB, scalingEngineDB db.ScalingEngineDB, schedulerDB db.SchedulerDB, httpStatusCollector healthendpoint.HTTPStatusCollector) (*mux.Router, error) { + checkers := []healthendpoint.Checker{} + gatherer := createPrometheusRegistry(policyDB, scalingEngineDB, schedulerDB, httpStatusCollector, logger) + healthRouter, err := healthendpoint.NewHealthRouter(conf.Health, checkers, logger.Session("health-server"), gatherer, time.Now) + if err != nil { + return nil, fmt.Errorf("failed to create health router: %w", err) + } + return healthRouter, nil +} + +func NewServer(logger lager.Logger, conf *config.Config, policyDB db.PolicyDB, scalingEngineDB db.ScalingEngineDB, schedulerDB db.SchedulerDB, scalingEngine scalingengine.ScalingEngine, synchronizer schedule.ActiveScheduleSychronizer) (ifrit.Runner, error) { handler := NewScalingHandler(logger, scalingEngineDB, scalingEngine) syncHandler := NewSyncHandler(logger, synchronizer) + httpStatusCollector := healthendpoint.NewHTTPStatusCollector("autoscaler", "scalingengine") + httpStatusCollectMiddleware := healthendpoint.NewHTTPStatusCollectMiddleware(httpStatusCollector) r := routes.ScalingEngineRoutes() r.Use(otelmux.Middleware("scalingengine")) @@ -48,7 +81,22 @@ func NewServer(logger lager.Logger, conf *config.Config, scalingEngineDB db.Scal r.Get(routes.SyncActiveSchedulesRouteName).Handler(VarsFunc(syncHandler.Sync)) - return helpers.NewHTTPServer(logger, conf.Server, r) + healthRouter, err := createHealthRouter(logger, conf, policyDB, scalingEngineDB, schedulerDB, httpStatusCollector) + if err != nil { + return nil, fmt.Errorf("failed to create health router: %w", err) + } + + mainRouter := setupMainRouter(r, healthRouter) + + return helpers.NewHTTPServer(logger, conf.Server, mainRouter) +} + +func setupMainRouter(r *mux.Router, healthRouter *mux.Router) *mux.Router { + mainRouter := mux.NewRouter() + mainRouter.PathPrefix("/v1").Handler(r) + mainRouter.PathPrefix("/health").Handler(healthRouter) + mainRouter.PathPrefix("/").Handler(healthRouter) + return mainRouter } func newScalingHistoryHandler(logger lager.Logger, scalingEngineDB db.ScalingEngineDB) (http.Handler, error) { diff --git a/src/autoscaler/scalingengine/server/server_test.go b/src/autoscaler/scalingengine/server/server_test.go index d3fbff3192..767311cb08 100644 --- a/src/autoscaler/scalingengine/server/server_test.go +++ b/src/autoscaler/scalingengine/server/server_test.go @@ -21,11 +21,10 @@ import ( ) var ( - server ifrit.Process - serverUrl string - scalingEngineDB *fakes.FakeScalingEngineDB - sychronizer *fakes.FakeActiveScheduleSychronizer - httpStatusCollector *fakes.FakeHTTPStatusCollector + server ifrit.Process + serverUrl string + scalingEngineDB *fakes.FakeScalingEngineDB + sychronizer *fakes.FakeActiveScheduleSychronizer ) var _ = SynchronizedBeforeSuite(func() []byte { @@ -39,10 +38,11 @@ var _ = SynchronizedBeforeSuite(func() []byte { } scalingEngineDB = &fakes.FakeScalingEngineDB{} scalingEngine := &fakes.FakeScalingEngine{} + policyDb := &fakes.FakePolicyDB{} + schedulerDB := &fakes.FakeSchedulerDB{} sychronizer = &fakes.FakeActiveScheduleSychronizer{} - httpStatusCollector = &fakes.FakeHTTPStatusCollector{} - httpServer, err := NewServer(lager.NewLogger("test"), conf, scalingEngineDB, scalingEngine, sychronizer, httpStatusCollector) + httpServer, err := NewServer(lager.NewLogger("test"), conf, policyDb, scalingEngineDB, schedulerDB, scalingEngine, sychronizer) Expect(err).NotTo(HaveOccurred()) server = ginkgomon_v2.Invoke(httpServer) serverUrl = fmt.Sprintf("http://127.0.0.1:%d", conf.Server.Port) diff --git a/src/autoscaler/testhelpers/clients.go b/src/autoscaler/testhelpers/clients.go index eecf376d1b..0c7c338e6e 100644 --- a/src/autoscaler/testhelpers/clients.go +++ b/src/autoscaler/testhelpers/clients.go @@ -31,6 +31,10 @@ func NewSchedulerClient() *http.Client { return CreateClientFor("scheduler") } +func NewScalingEngineClient() *http.Client { + return CreateClientFor("scalingengine") +} + func CreateClientFor(name string) *http.Client { certFolder := TestCertFolder() return CreateClient(filepath.Join(certFolder, name+".crt"),