Skip to content

Commit

Permalink
: Sets eventgenerator healthenpoint on server port
Browse files Browse the repository at this point in the history
  • Loading branch information
bonzofenix committed Jun 17, 2024
1 parent 8e9794b commit cbaa084
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ var _ = SynchronizedBeforeSuite(func() []byte {
})

var _ = SynchronizedAfterSuite(func() {
_ = os.Remove(configFile.Name())
if configFile != nil {
err := os.Remove(configFile.Name())
Expect(err).NotTo(HaveOccurred())
}
}, func() {
gexec.CleanupBuildArtifacts()
})
Expand Down Expand Up @@ -238,7 +241,7 @@ func initConfig() {
testCertDir := testhelpers.TestCertFolder()

egPort = 7000 + GinkgoParallelProcess()
healthport = 8000 + GinkgoParallelProcess()
healthport = egPort
dbUrl := testhelpers.GetDbUrl()
conf = config.Config{
Logging: helpers.LoggingConfig{
Expand Down Expand Up @@ -317,6 +320,7 @@ func initConfig() {
},
}
configFile = writeConfig(&conf)

}

func writeConfig(c *config.Config) *os.File {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package main_test

import (
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"time"

"code.cloudfoundry.org/app-autoscaler/src/autoscaler/eventgenerator/config"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/testhelpers"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
Expand All @@ -17,13 +22,52 @@ import (
. "github.com/onsi/gomega/gexec"
)

func newHttpsclient(certDir string) *http.Client {
clientKey := filepath.Join(certDir, "eventgenerator.key")
clientCrt := filepath.Join(certDir, "eventgenerator.crt")
autoscalerCa := filepath.Join(certDir, "autoscaler-ca.crt")

cert, err := tls.LoadX509KeyPair(clientCrt, clientKey)
if err != nil {
panic(err)
}

// Load the CA certificate
caCert, err := ioutil.ReadFile(autoscalerCa)
if err != nil {
panic(err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)

// Create a TLS configuration using the client key, client certificate, and CA certificate
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}

// Create an HTTP client with the custom TLS configuration
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
}
return client
}

var _ = Describe("Eventgenerator", func() {
var (
runner *EventGeneratorRunner
runner *EventGeneratorRunner
testCertDir string
httpsClient *http.Client
serverURL string
)

BeforeEach(func() {
runner = NewEventGeneratorRunner()
testCertDir = testhelpers.TestCertFolder()
httpsClient = newHttpsclient(testCertDir)
serverURL = fmt.Sprintf("https://127.0.0.1:%d", conf.Server.Port)
})

AfterEach(func() {
Expand Down Expand Up @@ -129,7 +173,7 @@ var _ = Describe("Eventgenerator", func() {
})

It("returns with a 200", func() {
rsp, err := httpClient.Get(fmt.Sprintf("https://127.0.0.1:%d/v1/apps/an-app-id/aggregated_metric_histories/a-metric-type", egPort))
rsp, err := httpClient.Get(fmt.Sprintf("%s/v1/apps/an-app-id/aggregated_metric_histories/a-metric-type", serverURL))
Expect(err).NotTo(HaveOccurred())
Expect(rsp.StatusCode).To(Equal(http.StatusOK))
rsp.Body.Close()
Expand All @@ -152,10 +196,13 @@ var _ = Describe("Eventgenerator", func() {

Context("when a request to query health comes", func() {
It("returns with a 200", func() {
rsp, err := healthHttpClient.Get(fmt.Sprintf("http://127.0.0.1:%d/health", healthport))
rsp, err := httpsClient.Get(fmt.Sprintf("%s/health", serverURL))
Expect(err).NotTo(HaveOccurred())
Expect(rsp.StatusCode).To(Equal(http.StatusOK))
raw, _ := io.ReadAll(rsp.Body)

raw, err := io.ReadAll(rsp.Body)
Expect(err).NotTo(HaveOccurred())

healthData := string(raw)
Expect(healthData).To(ContainSubstring("autoscaler_eventgenerator_concurrent_http_request"))
Expect(healthData).To(ContainSubstring("autoscaler_eventgenerator_policyDB"))
Expand All @@ -175,26 +222,25 @@ var _ = Describe("Eventgenerator", func() {
Context("when username and password are incorrect for basic authentication during health check", func() {
It("should return 401", func() {

req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d/health", healthport), nil)
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/health", serverURL), nil)
Expect(err).NotTo(HaveOccurred())

req.SetBasicAuth("wrongusername", "wrongpassword")

rsp, err := healthHttpClient.Do(req)
rsp, err := httpsClient.Do(req)
Expect(err).ToNot(HaveOccurred())
Expect(rsp.StatusCode).To(Equal(http.StatusUnauthorized))
})
})

Context("when username and password are correct for basic authentication during health check", func() {
It("should return 200", func() {

req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d/health", healthport), nil)
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/health", serverURL), nil)
Expect(err).NotTo(HaveOccurred())

req.SetBasicAuth(conf.Health.HealthCheckUsername, conf.Health.HealthCheckPassword)

rsp, err := healthHttpClient.Do(req)
rsp, err := httpsClient.Do(req)
Expect(err).ToNot(HaveOccurred())
Expect(rsp.StatusCode).To(Equal(http.StatusOK))
})
Expand All @@ -208,27 +254,35 @@ var _ = Describe("Eventgenerator", func() {
Context("when username and password are incorrect for basic authentication during health check", func() {
It("should return 401", func() {

req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d/health", healthport), nil)
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/health", serverURL), nil)
Expect(err).NotTo(HaveOccurred())

req.SetBasicAuth("wrongusername", "wrongpassword")

rsp, err := healthHttpClient.Do(req)
rsp, err := httpsClient.Do(req)
Expect(err).ToNot(HaveOccurred())
Expect(rsp.StatusCode).To(Equal(http.StatusUnauthorized))
})
})

Context("when username and password are correct for basic authentication during health check", func() {
It("should return 200", func() {
// Load the client key and certificate
//
// Load your custom certificate file

req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d/health", healthport), nil)
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/health", serverURL), nil)
Expect(err).NotTo(HaveOccurred())

req.SetBasicAuth(conf.Health.HealthCheckUsername, conf.Health.HealthCheckPassword)

rsp, err := healthHttpClient.Do(req)
rsp, err := httpsClient.Do(req)
Expect(err).ToNot(HaveOccurred())

body, err := io.ReadAll(rsp.Body)
Expect(err).NotTo(HaveOccurred())
Expect(string(body)).To(ContainSubstring("autoscaler_eventgenerator_concurrent_http_request"))

Expect(rsp.StatusCode).To(Equal(http.StatusOK))
})
})
Expand Down
21 changes: 5 additions & 16 deletions src/autoscaler/eventgenerator/cmd/eventgenerator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"io"
"time"

"code.cloudfoundry.org/app-autoscaler/src/autoscaler/db/sqldb"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/eventgenerator/aggregator"
Expand All @@ -21,7 +20,6 @@ import (

"code.cloudfoundry.org/clock"
"code.cloudfoundry.org/lager/v3"
"github.com/prometheus/client_golang/prometheus"
"github.com/tedsuo/ifrit"
"github.com/tedsuo/ifrit/grouper"
"github.com/tedsuo/ifrit/sigmon"
Expand Down Expand Up @@ -57,12 +55,6 @@ func main() {
defer func() { _ = policyDb.Close() }()

httpStatusCollector := healthendpoint.NewHTTPStatusCollector("autoscaler", "eventgenerator")
promRegistry := prometheus.NewRegistry()
healthendpoint.RegisterCollectors(promRegistry, []prometheus.Collector{
healthendpoint.NewDatabaseStatusCollector("autoscaler", "eventgenerator", "appMetricDB", appMetricDB),
healthendpoint.NewDatabaseStatusCollector("autoscaler", "eventgenerator", "policyDB", policyDb),
httpStatusCollector,
}, true, logger.Session("eventgenerator-prometheus"))

appManager := aggregator.NewAppManager(logger, egClock, conf.Aggregator.PolicyPollerInterval, len(conf.Server.NodeAddrs), conf.Server.NodeIndex, conf.Aggregator.MetricCacheSizePerApp, policyDb, appMetricDB)

Expand Down Expand Up @@ -103,20 +95,16 @@ func main() {

eventGenerator := ifrit.RunFunc(runFunc(appManager, evaluators, evaluationManager, metricPollers, anAggregator))

httpServer, err := server.NewServer(logger.Session("http_server"), conf, appManager.QueryAppMetrics, httpStatusCollector)
httpServer, err := server.NewServer(logger.Session("http_server"), conf, appMetricDB, policyDb, appManager.QueryAppMetrics, httpStatusCollector)

if err != nil {
logger.Error("failed to create http server", err)
os.Exit(1)
}
healthServer, err := healthendpoint.NewServerWithBasicAuth(conf.Health, []healthendpoint.Checker{}, logger.Session("health-server"), promRegistry, time.Now)
if err != nil {
logger.Error("failed to create health server", err)
os.Exit(1)
}

members := grouper.Members{
{"eventGenerator", eventGenerator},
{"http_server", httpServer},
{"health_server", healthServer},
}
monitor := ifrit.Invoke(sigmon.New(grouper.NewOrdered(os.Interrupt, members)))

Expand Down Expand Up @@ -162,7 +150,8 @@ func loadConfig(path string) (*config.Config, error) {
}

configFileBytes, err := io.ReadAll(configFile)
_ = configFile.Close()
defer func() { _ = configFile.Close() }()

if err != nil {
return nil, fmt.Errorf("failed to read data from config file %q: %w", path, err)
}
Expand Down
47 changes: 44 additions & 3 deletions src/autoscaler/eventgenerator/server/server.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package server

import (
"fmt"
"net/http"
"time"

"code.cloudfoundry.org/app-autoscaler/src/autoscaler/db"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/eventgenerator/aggregator"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers"
"go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux"
Expand All @@ -13,6 +16,7 @@ import (

"code.cloudfoundry.org/lager/v3"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/tedsuo/ifrit"
)

Expand All @@ -23,18 +27,55 @@ func (vh VarsFunc) ServeHTTP(w http.ResponseWriter, r *http.Request) {
vh(w, r, vars)
}

func NewServer(logger lager.Logger, conf *config.Config, queryAppMetric aggregator.QueryAppMetricsFunc, httpStatusCollector healthendpoint.HTTPStatusCollector) (ifrit.Runner, error) {
func NewServer(logger lager.Logger, conf *config.Config, appMetricDB db.AppMetricDB, policyDb db.PolicyDB, queryAppMetric aggregator.QueryAppMetricsFunc, httpStatusCollector healthendpoint.HTTPStatusCollector) (ifrit.Runner, error) {
eh := NewEventGenHandler(logger, queryAppMetric)
httpStatusCollectMiddleware := healthendpoint.NewHTTPStatusCollectMiddleware(httpStatusCollector)
r := routes.EventGeneratorRoutes()
r.Use(otelmux.Middleware("eventgenerator"))
r.Use(httpStatusCollectMiddleware.Collect)
r.Get(routes.GetAggregatedMetricHistoriesRouteName).Handler(VarsFunc(eh.GetAggregatedMetricHistories))

httpServerConfig := helpers.ServerConfig{
healthRouter, err := createHealthRouter(appMetricDB, policyDb, logger, conf, httpStatusCollector)
if err != nil {
return nil, fmt.Errorf("failed to create health router: %w", err)
}

mainRouter := setupMainRouter(r, healthRouter)
return helpers.NewHTTPServer(logger, serverConfigFrom(conf), mainRouter)
}

func serverConfigFrom(conf *config.Config) helpers.ServerConfig {
return helpers.ServerConfig{
Port: conf.Server.Port,
TLS: conf.Server.TLS,
}
}

func createHealthRouter(appMetricDB db.AppMetricDB, policyDb db.PolicyDB, logger lager.Logger, conf *config.Config, httpStatusCollector healthendpoint.HTTPStatusCollector) (*mux.Router, error) {
checkers := []healthendpoint.Checker{}
gatherer := CreatePrometheusRegistry(appMetricDB, policyDb, httpStatusCollector, logger)
healthRouter, err := healthendpoint.NewHealthRouter(conf.Health, checkers, logger.Session("health-server"), gatherer, time.Now)
if err != nil {
return nil, fmt.Errorf("failed to create health router: %w", err)
}

return healthRouter, nil
}

func CreatePrometheusRegistry(appMetricDB db.AppMetricDB, policyDb db.PolicyDB, httpStatusCollector healthendpoint.HTTPStatusCollector, logger lager.Logger) *prometheus.Registry {
promRegistry := prometheus.NewRegistry()
healthendpoint.RegisterCollectors(promRegistry, []prometheus.Collector{
healthendpoint.NewDatabaseStatusCollector("autoscaler", "eventgenerator", "appMetricDB", appMetricDB),
healthendpoint.NewDatabaseStatusCollector("autoscaler", "eventgenerator", "policyDB", policyDb),
httpStatusCollector,
}, true, logger.Session("eventgenerator-prometheus"))
return promRegistry
}

return helpers.NewHTTPServer(logger, httpServerConfig, r)
func setupMainRouter(egRouter, healthRouter *mux.Router) *mux.Router {
mainRouter := mux.NewRouter()
mainRouter.PathPrefix("/v1").Handler(egRouter)
mainRouter.PathPrefix("/health").Handler(healthRouter)
mainRouter.PathPrefix("/").Handler(healthRouter)
return mainRouter
}
7 changes: 6 additions & 1 deletion src/autoscaler/eventgenerator/server/server_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
var (
serverProcess ifrit.Process
serverUrl *url.URL
policyDB *fakes.FakePolicyDB

appMetricDB *fakes.FakeAppMetricDB
)

func TestServer(t *testing.T) {
Expand All @@ -43,7 +46,9 @@ var _ = BeforeSuite(func() {
}

httpStatusCollector := &fakes.FakeHTTPStatusCollector{}
httpServer, err := server.NewServer(lager.NewLogger("test"), conf, queryAppMetrics, httpStatusCollector)
policyDB = &fakes.FakePolicyDB{}
appMetricDB = &fakes.FakeAppMetricDB{}
httpServer, err := server.NewServer(lager.NewLogger("test"), conf, appMetricDB, policyDB, queryAppMetrics, httpStatusCollector)
Expect(err).NotTo(HaveOccurred())

serverUrl, err = url.Parse("http://127.0.0.1:" + strconv.Itoa(port))
Expand Down
Loading

0 comments on commit cbaa084

Please sign in to comment.