Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix metrics #1354

Merged
merged 5 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions checks/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,6 @@ func transform(ctx *context.Context, in *pkg.CheckResult) ([]*pkg.CheckResult, e
}
return results, nil
} else if len(transformed) == 1 && t.Name == "" {
if ctx.IsTrace() {
ctx.Tracef("merging %v into %v", t, in)
}
in.Metrics = append(in.Metrics, t.Metrics...)
if t.Start != nil {
in.Start = *t.Start
Expand Down
6 changes: 6 additions & 0 deletions checks/runchecks.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (

var checksCache = cache.New(5*time.Minute, 5*time.Minute)

// DisabledChecks lists check names that getDisabledChecks adds to its result
// set, in addition to the "check.disabled."-prefixed names it collects from
// the queried rows.
var DisabledChecks []string

func getDisabledChecks(ctx *context.Context) (map[string]struct{}, error) {
if val, ok := checksCache.Get("disabledChecks"); ok {
return val.(map[string]struct{}), nil
Expand All @@ -41,6 +43,10 @@ func getDisabledChecks(ctx *context.Context) (map[string]struct{}, error) {
result[strings.TrimPrefix(name, "check.disabled.")] = struct{}{}
}

for _, check := range DisabledChecks {
result[check] = struct{}{}
}

if rows.Err() != nil {
return nil, rows.Err()
}
Expand Down
27 changes: 9 additions & 18 deletions cmd/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"os"
"strings"
"time"

"github.com/flanksource/canary-checker/pkg/cache"
Expand Down Expand Up @@ -31,7 +30,6 @@ import (

var webhookPort int
var enableLeaderElection bool
var operatorNamespace string
var operatorExecutor bool
var disablePostgrest bool
var Operator = &cobra.Command{
Expand All @@ -42,7 +40,7 @@ var Operator = &cobra.Command{

func init() {
ServerFlags(Operator.Flags())
Operator.Flags().StringVarP(&operatorNamespace, "namespace", "n", "", "Watch only specified namespaces, otherwise watch all")
Operator.Flags().StringVarP(&runner.WatchNamespace, "namespace", "n", "", "Watch only specified namespaces, otherwise watch all")
Operator.Flags().BoolVar(&operatorExecutor, "executor", true, "If false, only serve the UI and sync the configs")
Operator.Flags().IntVar(&webhookPort, "webhookPort", 8082, "Port for webhooks ")
Operator.Flags().BoolVar(&enableLeaderElection, "enable-leader-election", false, "Enabling this will ensure there is only one active controller manager")
Expand Down Expand Up @@ -88,7 +86,7 @@ func run(cmd *cobra.Command, args []string) {
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
LeaderElection: enableLeaderElection,
LeaderElectionNamespace: operatorNamespace,
LeaderElectionNamespace: runner.WatchNamespace,
Metrics: ctrlMetrics.Options{
BindAddress: ":0",
},
Expand All @@ -106,23 +104,16 @@ func run(cmd *cobra.Command, args []string) {
}
loggr.Sugar().Infof("Using runner name: %s", runner.RunnerName)

includeNamespaces := []string{}
if operatorNamespace != "" {
includeNamespaces = strings.Split(operatorNamespace, ",")
canaryJobs.CanaryNamespaces = includeNamespaces
}
runner.RunnerLabels = labels.LoadFromFile("/etc/podinfo/labels")

canaryReconciler := &controllers.CanaryReconciler{
IncludeCheck: includeCheck,
IncludeNamespaces: includeNamespaces,
Client: mgr.GetClient(),
LogPass: logPass,
LogFail: logFail,
Log: ctrl.Log.WithName("controllers").WithName("canary"),
Scheme: mgr.GetScheme(),
RunnerName: runner.RunnerName,
CanaryCache: gocache.New(7*24*time.Hour, 1*time.Hour),
Client: mgr.GetClient(),
LogPass: logPass,
LogFail: logFail,
Log: ctrl.Log.WithName("controllers").WithName("canary"),
Scheme: mgr.GetScheme(),
RunnerName: runner.RunnerName,
CanaryCache: gocache.New(7*24*time.Hour, 1*time.Hour),
}

systemReconciler := &controllers.TopologyReconciler{
Expand Down
7 changes: 5 additions & 2 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var Root = &cobra.Command{

var httpPort = 8080
var publicEndpoint = "http://localhost:8080"
var includeCheck, prometheusURL string
var prometheusURL string
var pushServers, pullServers []string
var sharedLibrary []string
var exposeEnv bool
Expand All @@ -49,7 +49,9 @@ func ServerFlags(flags *pflag.FlagSet) {
_ = flags.MarkDeprecated("dev", "")

flags.StringVar(&publicEndpoint, "public-endpoint", publicEndpoint, "Host on which the health dashboard is exposed. Could be used for generting-links, redirects etc.")
flags.StringVar(&includeCheck, "include-check", "", "Run matching canaries - useful for debugging")
flags.StringSliceVar(&runner.IncludeCanaries, "include-check", []string{}, "Run matching canaries - useful for debugging")
flags.StringSliceVar(&runner.IncludeTypes, "include-type", []string{}, "Check type to disable")
flags.StringSliceVar(&runner.IncludeNamespaces, "include-namespace", []string{}, "Check type to disable")
flags.IntVar(&cache.DefaultCacheCount, "maxStatusCheckCount", 5, "Maximum number of past checks in the in memory cache")
flags.StringSliceVar(&pushServers, "push-servers", []string{}, "push check results to multiple canary servers")
flags.StringSliceVar(&pullServers, "pull-servers", []string{}, "push check results to multiple canary servers")
Expand Down Expand Up @@ -82,6 +84,7 @@ func init() {

Root.PersistentFlags().StringVar(&db.ConnectionString, "db", "DB_URL", "Connection string for the postgres database")
Root.PersistentFlags().BoolVar(&db.RunMigrations, "db-migrations", false, "Run database migrations")
Root.PersistentFlags().BoolVar(&db.DBMetrics, "db-metrics", false, "Expose db metrics")
Root.PersistentFlags().BoolVar(&logFail, "log-fail", true, "Log every failing check")
Root.PersistentFlags().BoolVar(&logPass, "log-pass", false, "Log every passing check")
Root.PersistentFlags().StringArrayVar(&sharedLibrary, "shared-library", []string{}, "Add javascript files to be shared by all javascript templates")
Expand Down
14 changes: 14 additions & 0 deletions pkg/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pkg

import (
"fmt"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -525,6 +526,19 @@ type Metric struct {
Value float64 `json:"value,omitempty"`
}

// ID builds an identifier for the metric: its name followed by "-" and the
// metric's sorted label names, themselves joined with "-".
func (m Metric) ID() string {
	labelPart := strings.Join(m.LabelNames(), "-")
	return fmt.Sprintf("%s-%s", m.Name, labelPart)
}

// LabelNames returns the keys of m.Labels in ascending order.
// It returns nil when the metric carries no labels.
func (m Metric) LabelNames() []string {
	if len(m.Labels) == 0 {
		return nil
	}

	keys := make([]string, 0, len(m.Labels))
	for label := range m.Labels {
		keys = append(keys, label)
	}
	// Map iteration order is random, so sort for a stable result.
	sort.Strings(keys)
	return keys
}

func (m Metric) String() string {
labels := ""
if len(m.Labels) > 0 {
Expand Down
30 changes: 9 additions & 21 deletions pkg/controllers/canary_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
v1 "github.com/flanksource/canary-checker/api/v1"
"github.com/flanksource/canary-checker/pkg"
canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary"
"github.com/flanksource/canary-checker/pkg/runner"
"github.com/flanksource/kommons"
"github.com/go-logr/logr"
jsontime "github.com/liamylian/jsontime/v2/v2"
Expand All @@ -46,9 +47,7 @@ var json = jsontime.ConfigWithCustomTimeFormat

// CanaryReconciler reconciles a Canary object
type CanaryReconciler struct {
IncludeCheck string
IncludeNamespaces []string
LogPass, LogFail bool
LogPass, LogFail bool
client.Client
Kubernetes kubernetes.Interface
Kommons *kommons.Client
Expand All @@ -68,21 +67,19 @@ const FinalizerName = "canary.canaries.flanksource.com"
// +kubebuilder:rbac:groups="",resources=pods/exec,verbs=*
// +kubebuilder:rbac:groups="",resources=pods/logs,verbs=*
func (r *CanaryReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (ctrl.Result, error) {
if len(r.IncludeNamespaces) > 0 && !r.includeNamespace(req.Namespace) {
r.Log.V(2).Info("namespace not included, skipping")
return ctrl.Result{}, nil
}
if r.IncludeCheck != "" && r.IncludeCheck != req.Name {
r.Log.V(2).Info("check not included, skipping")
return ctrl.Result{}, nil
}

logger := r.Log.WithValues("canary", req.NamespacedName)
canary := &v1.Canary{}
err := r.Get(ctx, req.NamespacedName, canary)
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
} else if err != nil {
return ctrl.Result{RequeueAfter: 10 * time.Minute}, corev1.ErrUnexpectedEndOfGroupGenerated
}

if runner.IsCanaryIgnored(&canary.ObjectMeta) {
return ctrl.Result{}, nil
}

canary.SetRunnerName(r.RunnerName)
// Add finalizer first if not exist to avoid the race condition between init and delete
if !controllerutil.ContainsFinalizer(canary, FinalizerName) {
Expand Down Expand Up @@ -219,12 +216,3 @@ func (r *CanaryReconciler) Report() {
}
}
}

// includeNamespace reports whether namespace exactly matches one of the
// entries in r.IncludeNamespaces.
func (r *CanaryReconciler) includeNamespace(namespace string) bool {
	for i := range r.IncludeNamespaces {
		if r.IncludeNamespaces[i] == namespace {
			return true
		}
	}
	return false
}
8 changes: 4 additions & 4 deletions pkg/db/canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"gorm.io/gorm/clause"
)

func GetAllCanariesForSync(namespaces ...string) ([]pkg.Canary, error) {
func GetAllCanariesForSync(namespace string) ([]pkg.Canary, error) {
query := `
SELECT json_agg(
jsonb_set_lax(to_jsonb(canaries),'{checks}', (
Expand All @@ -44,9 +44,9 @@ func GetAllCanariesForSync(namespaces ...string) ([]pkg.Canary, error) {

args := make(pgx.NamedArgs)

if namespaces != nil {
query += " AND namespace = ANY(@namespaces)"
args["namespaces"] = namespaces
if namespace != "" {
query += " AND namespace = @namespace"
args["namespace"] = namespace
}

rows, err := Pool.Query(context.Background(), query, args)
Expand Down
25 changes: 14 additions & 11 deletions pkg/db/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var Gorm *gorm.DB
var ConnectionString string
var DefaultExpiryDays int
var RunMigrations bool
// DBMetrics controls whether Init registers the gorm prometheus plugin
// (with the Postgres metrics collector). It is bound to the --db-metrics flag.
var DBMetrics bool
var PostgresServer *embeddedpostgres.EmbeddedPostgres
var HTTPEndpoint = "http://localhost:8080/db"

Expand Down Expand Up @@ -101,17 +102,19 @@ func Init() error {
return err
}

go func() {
if err := Gorm.Use(prometheus.New(prometheus.Config{
DBName: Pool.Config().ConnConfig.Database,
StartServer: false,
MetricsCollector: []prometheus.MetricsCollector{
&prometheus.Postgres{},
},
})); err != nil {
logger.Warnf("Failed to register prometheus metrics: %v", err)
}
}()
if DBMetrics {
go func() {
if err := Gorm.Use(prometheus.New(prometheus.Config{
DBName: Pool.Config().ConnConfig.Database,
StartServer: false,
MetricsCollector: []prometheus.MetricsCollector{
&prometheus.Postgres{},
},
})); err != nil {
logger.Warnf("Failed to register prometheus metrics: %v", err)
}
}()
}

if RunMigrations {
opts := &migrate.MigrateOptions{IgnoreFiles: []string{"007_events.sql", "012_changelog.sql"}}
Expand Down
17 changes: 12 additions & 5 deletions pkg/jobs/canary/canary_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/flanksource/canary-checker/pkg/db"
"github.com/flanksource/canary-checker/pkg/metrics"
"github.com/flanksource/canary-checker/pkg/push"
"github.com/flanksource/canary-checker/pkg/runner"
"github.com/flanksource/canary-checker/pkg/utils"
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/models"
Expand All @@ -30,10 +31,6 @@ var DataFile string
var Executor bool
var LogPass, LogFail bool

// CanaryNamespaces is a list of namespaces whose canary specs should be synced.
// If empty, all namespaces will be synced
var CanaryNamespaces []string

var Kommons *kommons.Client
var Kubernetes kubernetes.Interface
var FuncScheduler = cron.New()
Expand Down Expand Up @@ -69,6 +66,9 @@ var minimumTimeBetweenCanaryRuns = 10 * time.Second
var canaryLastRuntimes = sync.Map{}

func (job CanaryJob) Run() {
if runner.IsCanaryIgnored(&job.Canary.ObjectMeta) {
return
}
canaryID := job.DBCanary.ID.String()
val, _ := concurrentJobLocks.LoadOrStore(canaryID, &sync.Mutex{})
lock, ok := val.(*sync.Mutex)
Expand Down Expand Up @@ -279,6 +279,9 @@ func ScanCanaryConfigs() {
}

for _, canary := range configs {
if runner.IsCanaryIgnored(&canary.ObjectMeta) {
continue
}
_, err := db.PersistCanary(canary, path.Base(configfile))
if err != nil {
logger.Errorf("could not persist %s: %v", canary.Name, err)
Expand All @@ -303,6 +306,10 @@ func SyncCanaryJob(dbCanary pkg.Canary) error {
return nil
}

if runner.IsCanaryIgnored(&canary.ObjectMeta) {
return nil
}

if Kommons == nil {
var err error
Kommons, Kubernetes, err = pkg.NewKommonsClient()
Expand Down Expand Up @@ -350,7 +357,7 @@ func SyncCanaryJob(dbCanary pkg.Canary) error {
func SyncCanaryJobs() {
logger.Debugf("Syncing canary jobs")

canaries, err := db.GetAllCanariesForSync(CanaryNamespaces...)
canaries, err := db.GetAllCanariesForSync(runner.WatchNamespace)
if err != nil {
logger.Errorf("Failed to get canaries: %v", err)

Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/canary/canary_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var _ = ginkgo.Describe("Test Sync Canary Job", ginkgo.Ordered, func() {
err = db.Gorm.Create(canaryM).Error
Expect(err).To(BeNil())

response, err := db.GetAllCanariesForSync()
response, err := db.GetAllCanariesForSync("")
Expect(err).To(BeNil())
Expect(len(response)).To(Equal(1))
})
Expand Down
Loading
Loading