Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Add telemetry for metrics_ingestion_path & json_endpoint_used
Browse files Browse the repository at this point in the history
Signed-off-by: Harkishen-Singh <[email protected]>
  • Loading branch information
Harkishen-Singh committed Oct 13, 2022
1 parent c80f69c commit 6eaf1fe
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 93 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ We use the following categories for changes:
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## Unreleased

### Added
- Telemetry for `promscale_metrics_ingestion_path` and `promscale_metrics_ingestion_json_endpoint_used` [#1695]

## [0.15.0] - 2022-10-11

### Added
Expand Down
24 changes: 24 additions & 0 deletions pkg/api/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/timescale/promscale/pkg/api/parser/protobuf"
"github.com/timescale/promscale/pkg/api/parser/text"
"github.com/timescale/promscale/pkg/prompb"
"github.com/timescale/promscale/pkg/telemetry"
)

type formatParser func(*http.Request, *prompb.WriteRequest) error
Expand Down Expand Up @@ -62,6 +63,8 @@ func (d DefaultParser) ParseRequest(r *http.Request, req *prompb.WriteRequest) e
return fmt.Errorf("parser error: %w", err)
}

updateTelemetry(mediaType)

if len(req.Timeseries) == 0 {
return nil
}
Expand All @@ -81,3 +84,24 @@ func (d DefaultParser) ParseRequest(r *http.Request, req *prompb.WriteRequest) e

return nil
}

func InitTelemetry() {
telemetry.Registry.Update("metrics_ingestion_path", "no_ingestion")
telemetry.Registry.Update("metrics_ingestion_json_endpoint_used", "0")
}

func updateTelemetry(parser string) {
switch parser {
case "application/x-protobuf":
telemetry.Registry.Update("metrics_ingestion_path", "protobuf")
case "application/json":
telemetry.Registry.Update("metrics_ingestion_path", "json")
telemetry.Registry.Update("metrics_ingestion_json_endpoint_used", "1")
case "text/plain":
telemetry.Registry.Update("metrics_ingestion_path", "text_plain")
case "application/openmetrics-text":
telemetry.Registry.Update("metrics_ingestion_path", "text_open_metrics")
default:
telemetry.Registry.Update("metrics_ingestion_path", "none")
}
}
45 changes: 7 additions & 38 deletions pkg/rules/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,8 @@ import (
"github.com/timescale/promscale/pkg/pgclient"
"github.com/timescale/promscale/pkg/rules/adapters"
"github.com/timescale/promscale/pkg/telemetry"
"github.com/timescale/promscale/pkg/util"
)

var (
// These metrics are used to track telemetry by registering
// in telemetryEngine.RegisterDynamicMetadata()
rulesEnabled = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: util.PromNamespace,
Subsystem: "rules",
Name: "enabled",
Help: "Promscale rules is enabled or not.",
},
)
alertingEnabled = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: util.PromNamespace,
Subsystem: "alerting",
Name: "enabled",
Help: "Promscale alerting is enabled or not.",
},
)
)

func init() {
prometheus.MustRegister(rulesEnabled, alertingEnabled)
}

type Manager struct {
ctx context.Context
rulesManager *prom_rules.Manager
Expand Down Expand Up @@ -98,14 +72,9 @@ func NewManager(ctx context.Context, r prometheus.Registerer, client *pgclient.C
return manager, manager.getReloader(cfg), nil
}

func RegisterForTelemetry(t telemetry.Engine) error {
if err := t.RegisterDynamicMetadata("rules_enabled", rulesEnabled); err != nil {
return fmt.Errorf("register dynamic 'promscale_rules_enabled' metric for telemetry: %w", err)
}
if err := t.RegisterDynamicMetadata("alerting_enabled", alertingEnabled); err != nil {
return fmt.Errorf("register dynamic 'promscale_alerting_enabled' metric for telemetry: %w", err)
}
return nil
func InitTelemetry() {
telemetry.Registry.Update("rules_enabled", "0")
telemetry.Registry.Update("alerting_enabled", "0")
}

func (m *Manager) getReloader(cfg *Config) func() error {
Expand All @@ -124,17 +93,17 @@ func (m *Manager) getReloader(cfg *Config) func() error {

func (m *Manager) updateTelemetry(cfg *Config) {
if cfg.ContainsRules() {
rulesEnabled.Set(1)
telemetry.Registry.Update("rules_enabled", "1")
if cfg.ContainsAlertingConfig() {
alertingEnabled.Set(1)
telemetry.Registry.Update("alerting_enabled", "1")
} else {
log.Debug("msg", "Alerting configuration not present in the given Prometheus configuration file. Alerting will not be initialized")
alertingEnabled.Set(0)
telemetry.Registry.Update("alerting_enabled", "0")
}
return
}
log.Debug("msg", "Rules files not found. Rules and alerting configuration will not be initialized")
rulesEnabled.Set(0)
telemetry.Registry.Update("rules_enabled", "0")
}

func (m *Manager) WithPostRulesProcess(f prom_rules.RuleGroupPostProcessFunc) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/thanos-io/thanos/pkg/store/storepb"

"github.com/timescale/promscale/pkg/api"
"github.com/timescale/promscale/pkg/api/parser"
jaegerStore "github.com/timescale/promscale/pkg/jaeger/store"
"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/pgclient"
Expand Down Expand Up @@ -368,8 +369,7 @@ func initTelemetryEngine(client *pgclient.Client) (telemetry.Engine, error) {
if err := trace.RegisterTelemetryMetrics(t); err != nil {
log.Error("msg", "error registering metrics for Jaeger-ingest telemetry", "err", err.Error())
}
if err := rules.RegisterForTelemetry(t); err != nil {
log.Error("msg", "error registering metrics for rules telemetry", "err", err.Error())
}
parser.InitTelemetry()
rules.InitTelemetry()
return t, nil
}
27 changes: 27 additions & 0 deletions pkg/telemetry/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// This file and its contents are licensed under the Apache License 2.0.
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.

package telemetry

import "sync"

type reg struct {
r sync.Map
}

func (r *reg) Update(telemetryName, value string) {
r.r.Store(telemetryName, value)
}

func (r *reg) metadata() (m Metadata) {
m = Metadata{}
r.r.Range(func(telemetryName, value interface{}) bool {
m[telemetryName.(string)] = value.(string)
return true
})
return m
}

// Registry is a telemetry holder that is mutable and can be filled from anywhere in Promscale.
var Registry reg
61 changes: 9 additions & 52 deletions pkg/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,8 @@ var ErrInvalidMetric = fmt.Errorf("metric not a counter or gauge")
// the _timescaledb_catalog.metadata table.
type Engine interface {
// RegisterMetric registers a Prometheus metric with a column name. This metric is
// monitored every telemetrySync and updated in the telemetry table.
// monitored every telemetrySync and updated in the promscale_instance_information table.
RegisterMetric(columnName string, gaugeOrCounterMetric ...prometheus.Metric) error

// RegisterDynamicMetadata is a Prometheus metric that changes regularly. This is monitored
// every telemetrySync and updated in the telemetry table.
RegisterDynamicMetadata(columnName string, gauge prometheus.Metric) error
Start()
Stop()
}
Expand Down Expand Up @@ -120,56 +116,19 @@ func (t *engineImpl) writeToTimescaleMetadataTable(m Metadata) {
return
}
// Try to update via Promscale extension.
if err := t.syncWithMetadataTable(metadataUpdateWithExtension, m); err != nil {
if err := syncWithMetadataTable(t.conn, metadataUpdateWithExtension, m); err != nil {
// Promscale extension not installed. Try to attempt to write directly as a rare attempt
// in case we fix the _timescaledb_catalog.metadata permissions in the future.
_ = t.syncWithMetadataTable(metadataUpdateNoExtension, m)
}
}

func (t *engineImpl) RegisterDynamicMetadata(telemetryName string, gauge prometheus.Metric) error {
if !isGauge(gauge) {
return ErrInvalidMetric
_ = syncWithMetadataTable(t.conn, metadataUpdateNoExtension, m)
}
t.dynamicMetadata.Store(telemetryName, gauge)
return nil
}

func (t *engineImpl) syncDynamicMetadata() error {
var (
err error
val float64
metadata = Metadata{}
)
t.dynamicMetadata.Range(func(key, value interface{}) bool {
columnName := key.(string)
metric := value.(prometheus.Metric)
val, err = util.ExtractMetricValue(metric)
if err != nil {
err = fmt.Errorf("extracting metric value of stat '%s': %w", columnName, err)
return false
}
var state string
switch val {
case 0:
state = "false"
case 1:
state = "true"
default:
err = fmt.Errorf("invalid state value '%f' for stat '%s'", val, columnName)
}
metadata[columnName] = state
return true
})
if err != nil {
return err
}
t.writeToTimescaleMetadataTable(metadata)
return nil
func (t *engineImpl) syncRegistry() {
t.writeToTimescaleMetadataTable(Registry.metadata())
}

func (t *engineImpl) syncWithMetadataTable(queryFormat string, m Metadata) error {
batch := t.conn.NewBatch()
func syncWithMetadataTable(conn pgxconn.PgxConn, queryFormat string, m Metadata) error {
batch := conn.NewBatch()
for key, metadata := range m {
safe := pgutf8str.Text{}
if err := safe.Set(metadata); err != nil {
Expand All @@ -179,7 +138,7 @@ func (t *engineImpl) syncWithMetadataTable(queryFormat string, m Metadata) error
batch.Queue(query, key, safe, true)
}

results, err := t.conn.SendBatch(context.Background(), batch)
results, err := conn.SendBatch(context.Background(), batch)
if err != nil {
return fmt.Errorf("error sending batch: %w", err)
}
Expand Down Expand Up @@ -209,9 +168,7 @@ func (t *engineImpl) Sync() error {
if err := t.syncWithInfoTable(); err != nil {
return fmt.Errorf("sync info table: %w", err)
}
if err := t.syncDynamicMetadata(); err != nil {
return fmt.Errorf("sync dynamic metadata: %w", err)
}
t.syncRegistry()
t.housekeeping()
return nil
}
Expand Down

0 comments on commit 6eaf1fe

Please sign in to comment.