From d6b1d51311f219efba06bb317bf5a9301b2ac13d Mon Sep 17 00:00:00 2001 From: Harkishen-Singh Date: Wed, 12 Oct 2022 19:58:16 +0530 Subject: [PATCH] Add telemetry for `metrics_ingestion_path` & `json_endpoint_used` Signed-off-by: Harkishen-Singh --- CHANGELOG.md | 5 +++ pkg/api/parser/parser.go | 24 +++++++++++++ pkg/rules/rules.go | 45 ++++-------------------- pkg/runner/runner.go | 6 ++-- pkg/telemetry/registry.go | 27 +++++++++++++++ pkg/telemetry/telemetry.go | 71 +++++--------------------------------- 6 files changed, 75 insertions(+), 103 deletions(-) create mode 100644 pkg/telemetry/registry.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 376092ebca..6120e284b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,11 @@ We use the following categories for changes: - `Fixed` for any bug fixes. - `Security` in case of vulnerabilities. +## Unreleased + +### Added +- Telemetry for `promscale_metrics_ingestion_path` and `promscale_metrics_ingestion_json_endpoint_used` [#1695] + ## [0.15.0] - 2022-10-11 ### Added diff --git a/pkg/api/parser/parser.go b/pkg/api/parser/parser.go index 2067910090..fe99c8c7ad 100644 --- a/pkg/api/parser/parser.go +++ b/pkg/api/parser/parser.go @@ -9,6 +9,7 @@ import ( "github.com/timescale/promscale/pkg/api/parser/protobuf" "github.com/timescale/promscale/pkg/api/parser/text" "github.com/timescale/promscale/pkg/prompb" + "github.com/timescale/promscale/pkg/telemetry" ) type formatParser func(*http.Request, *prompb.WriteRequest) error @@ -62,6 +63,8 @@ func (d DefaultParser) ParseRequest(r *http.Request, req *prompb.WriteRequest) e return fmt.Errorf("parser error: %w", err) } + updateTelemetry(mediaType) + if len(req.Timeseries) == 0 { return nil } @@ -81,3 +84,24 @@ func (d DefaultParser) ParseRequest(r *http.Request, req *prompb.WriteRequest) e return nil } + +func InitTelemetry() { + telemetry.Registry.Update("metrics_ingestion_path", "no_ingestion") + telemetry.Registry.Update("metrics_ingestion_json_endpoint_used", "0") +} + +func updateTelemetry(parser string) { + switch parser { + case "application/x-protobuf": + telemetry.Registry.Update("metrics_ingestion_path", "protobuf") + case "application/json": + telemetry.Registry.Update("metrics_ingestion_path", "json") + telemetry.Registry.Update("metrics_ingestion_json_endpoint_used", "1") + case "text/plain": + telemetry.Registry.Update("metrics_ingestion_path", "text_plain") + case "application/openmetrics-text": + telemetry.Registry.Update("metrics_ingestion_path", "text_open_metrics") + default: + telemetry.Registry.Update("metrics_ingestion_path", "none") + } +} diff --git a/pkg/rules/rules.go b/pkg/rules/rules.go index 02e59d06fb..89eba507bb 100644 --- a/pkg/rules/rules.go +++ b/pkg/rules/rules.go @@ -24,34 +24,8 @@ import ( "github.com/timescale/promscale/pkg/pgclient" "github.com/timescale/promscale/pkg/rules/adapters" "github.com/timescale/promscale/pkg/telemetry" - "github.com/timescale/promscale/pkg/util" ) -var ( - // These metrics are used to track telemetry by registering - // in telemetryEngine.RegisterDynamicMetadata() - rulesEnabled = prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: util.PromNamespace, - Subsystem: "rules", - Name: "enabled", - Help: "Promscale rules is enabled or not.", - }, - ) - alertingEnabled = prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: util.PromNamespace, - Subsystem: "alerting", - Name: "enabled", - Help: "Promscale alerting is enabled or not.", - }, - ) -) - -func init() { - prometheus.MustRegister(rulesEnabled, alertingEnabled) -} - type Manager struct { ctx context.Context rulesManager *prom_rules.Manager @@ -98,14 +72,9 @@ func NewManager(ctx context.Context, r prometheus.Registerer, client *pgclient.C return manager, manager.getReloader(cfg), nil } -func RegisterForTelemetry(t telemetry.Engine) error { - if err := t.RegisterDynamicMetadata("rules_enabled", rulesEnabled); err != nil { - return fmt.Errorf("register dynamic 'promscale_rules_enabled' metric for telemetry: %w", err) - } - if err := t.RegisterDynamicMetadata("alerting_enabled", alertingEnabled); err != nil { - return fmt.Errorf("register dynamic 'promscale_alerting_enabled' metric for telemetry: %w", err) - } - return nil +func InitTelemetry() { + telemetry.Registry.Update("rules_enabled", "0") + telemetry.Registry.Update("alerting_enabled", "0") } func (m *Manager) getReloader(cfg *Config) func() error { @@ -124,17 +93,17 @@ func (m *Manager) getReloader(cfg *Config) func() error { func (m *Manager) updateTelemetry(cfg *Config) { if cfg.ContainsRules() { - rulesEnabled.Set(1) + telemetry.Registry.Update("rules_enabled", "1") if cfg.ContainsAlertingConfig() { - alertingEnabled.Set(1) + telemetry.Registry.Update("alerting_enabled", "1") } else { log.Debug("msg", "Alerting configuration not present in the given Prometheus configuration file. Alerting will not be initialized") - alertingEnabled.Set(0) + telemetry.Registry.Update("alerting_enabled", "0") } return } log.Debug("msg", "Rules files not found. Rules and alerting configuration will not be initialized") - rulesEnabled.Set(0) + telemetry.Registry.Update("rules_enabled", "0") } func (m *Manager) WithPostRulesProcess(f prom_rules.RuleGroupPostProcessFunc) { diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index 05a3a26378..c425729e24 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -35,6 +35,7 @@ import ( "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/timescale/promscale/pkg/api" + "github.com/timescale/promscale/pkg/api/parser" jaegerStore "github.com/timescale/promscale/pkg/jaeger/store" "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/pgclient" @@ -368,8 +369,7 @@ func initTelemetryEngine(client *pgclient.Client) (telemetry.Engine, error) { if err := trace.RegisterTelemetryMetrics(t); err != nil { log.Error("msg", "error registering metrics for Jaeger-ingest telemetry", "err", err.Error()) } - if err := rules.RegisterForTelemetry(t); err != nil { - log.Error("msg", "error registering metrics for rules telemetry", "err", err.Error()) - } + parser.InitTelemetry() + rules.InitTelemetry() return t, nil } diff --git a/pkg/telemetry/registry.go b/pkg/telemetry/registry.go new file mode 100644 index 0000000000..d38fe6b35d --- /dev/null +++ b/pkg/telemetry/registry.go @@ -0,0 +1,27 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + +package telemetry + +import "sync" + +type reg struct { + r sync.Map +} + +func (r *reg) Update(telemetryName, value string) { + r.r.Store(telemetryName, value) +} + +func (r *reg) metadata() (m Metadata) { + m = Metadata{} + r.r.Range(func(telemetryName, value interface{}) bool { + m[telemetryName.(string)] = value.(string) + return true + }) + return m +} + +// Registry is a telemetry holder that is mutable and can be filled from anywhere in Promscale. +var Registry reg diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index 972d3da6c1..297a373c24 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -30,12 +30,8 @@ var ErrInvalidMetric = fmt.Errorf("metric not a counter or gauge") // the _timescaledb_catalog.metadata table. type Engine interface { // RegisterMetric registers a Prometheus metric with a column name. This metric is - // monitored every telemetrySync and updated in the telemetry table. + // monitored every telemetrySync and updated in the promscale_instance_information table. RegisterMetric(columnName string, gaugeOrCounterMetric ...prometheus.Metric) error - - // RegisterDynamicMetadata is a Prometheus metric that changes regularly. This is monitored - // every telemetrySync and updated in the telemetry table. - RegisterDynamicMetadata(columnName string, gauge prometheus.Metric) error Start() Stop() } @@ -50,9 +46,7 @@ type engineImpl struct { promqlEngine *promql.Engine promqlQueryable promql.Queryable - metrics sync.Map - dynamicMetadata sync.Map } func NewEngine(conn pgxconn.PgxConn, uuid [16]byte, promqlQueryable promql.Queryable) (Engine, error) { @@ -120,56 +114,19 @@ func (t *engineImpl) writeToTimescaleMetadataTable(m Metadata) { return } // Try to update via Promscale extension. - if err := t.syncWithMetadataTable(metadataUpdateWithExtension, m); err != nil { + if err := syncWithMetadataTable(t.conn, metadataUpdateWithExtension, m); err != nil { // Promscale extension not installed. Try to attempt to write directly as a rare attempt // in case we fix the _timescaledb_catalog.metadata permissions in the future. - _ = t.syncWithMetadataTable(metadataUpdateNoExtension, m) - } -} - -func (t *engineImpl) RegisterDynamicMetadata(telemetryName string, gauge prometheus.Metric) error { - if !isGauge(gauge) { - return ErrInvalidMetric + _ = syncWithMetadataTable(t.conn, metadataUpdateNoExtension, m) } - t.dynamicMetadata.Store(telemetryName, gauge) - return nil } -func (t *engineImpl) syncDynamicMetadata() error { - var ( - err error - val float64 - metadata = Metadata{} - ) - t.dynamicMetadata.Range(func(key, value interface{}) bool { - columnName := key.(string) - metric := value.(prometheus.Metric) - val, err = util.ExtractMetricValue(metric) - if err != nil { - err = fmt.Errorf("extracting metric value of stat '%s': %w", columnName, err) - return false - } - var state string - switch val { - case 0: - state = "false" - case 1: - state = "true" - default: - err = fmt.Errorf("invalid state value '%f' for stat '%s'", val, columnName) - } - metadata[columnName] = state - return true - }) - if err != nil { - return err - } - t.writeToTimescaleMetadataTable(metadata) - return nil +func (t *engineImpl) syncRegistry() { + t.writeToTimescaleMetadataTable(Registry.metadata()) } -func (t *engineImpl) syncWithMetadataTable(queryFormat string, m Metadata) error { - batch := t.conn.NewBatch() +func syncWithMetadataTable(conn pgxconn.PgxConn, queryFormat string, m Metadata) error { + batch := conn.NewBatch() for key, metadata := range m { safe := pgutf8str.Text{} if err := safe.Set(metadata); err != nil { @@ -179,7 +136,7 @@ func (t *engineImpl) syncWithMetadataTable(queryFormat string, m Metadata) error batch.Queue(query, key, safe, true) } - results, err := t.conn.SendBatch(context.Background(), batch) + results, err := conn.SendBatch(context.Background(), batch) if err != nil { return fmt.Errorf("error sending batch: %w", err) } @@ -209,9 +166,7 @@ func (t *engineImpl) Sync() error { if err := t.syncWithInfoTable(); err != nil { return fmt.Errorf("sync info table: %w", err) } - if err := t.syncDynamicMetadata(); err != nil { - return fmt.Errorf("sync dynamic metadata: %w", err) - } + t.syncRegistry() t.housekeeping() return nil } @@ -300,14 +255,6 @@ func isCounterOrGauge(metric prometheus.Metric) bool { } } -func isGauge(metric prometheus.Metric) bool { - switch metric.(type) { - case prometheus.Gauge: - return true - } - return false -} - // syncInfoTable stats with promscale_instance_information table. func (t *engineImpl) syncInfoTable(stats map[string]float64) error { pgUUID := new(pgtype.UUID)