Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Introuducing a new setting allowing to configure custom polling
Browse files Browse the repository at this point in the history
intervals for individual db metric queries. The actual polling still
happens on `evalInterval`, but this setting can by used to make heavier
queries run less often.
  • Loading branch information
sumerman committed Dec 14, 2022
1 parent 94e68c9 commit 92ad453
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 42 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ We use the following categories for changes:
- Reduced the verbosity of the logs emitted by the vacuum engine [#1715]
- The vacuum engine now throttles the number of workers used based on the oldest txid from
the chunks needing freezing [#1761]
- In order to reduce the overall load on the system, some internal database
metrics won't be collected as often as they used to. None of the affected
metrics is expected to change faster than its new collection interval [#1793]

### Fixed

Expand Down
8 changes: 7 additions & 1 deletion pkg/pgmodel/metrics/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,17 @@ func (e *metricsEngineImpl) Run() error {
func (e *metricsEngineImpl) Update() error {
batch := e.conn.NewBatch()
batchMetrics := []metricQueryWrap{}
for _, m := range e.metrics {
for i, m := range e.metrics {
if m.isHealthCheck {
healthCheck(e.conn, m)
continue
}
now := time.Now()
if m.customPollConfig.enabled && now.Sub(m.customPollConfig.lastUpdate) < m.customPollConfig.interval {
continue
} else if m.customPollConfig.enabled {
e.metrics[i].customPollConfig.lastUpdate = now
}
batch.Queue(m.query)
batchMetrics = append(batchMetrics, m)
}
Expand Down
49 changes: 49 additions & 0 deletions pkg/pgmodel/metrics/database/database_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package database

import (
"context"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/timescale/promscale/pkg/tests/testsupport"
"github.com/timescale/promscale/pkg/util"
)

func TestCustomPollConfig(t *testing.T) {
engine := &metricsEngineImpl{
conn: testsupport.MockPgxConn{},
ctx: context.Background(),
metrics: []metricQueryWrap{
{
metrics: counters(
prometheus.CounterOpts{
Namespace: util.PromNamespace,
Subsystem: "sql_database",
Name: "test",
Help: "test",
},
),
customPollConfig: updateAtMostEvery(1 * time.Second),
query: "SELECT 1",
},
},
}

testStart := time.Now()
testMetricPollConfig := &engine.metrics[0].customPollConfig

if err := engine.Update(); err != nil {
t.Fail()
}
require.False(t, testMetricPollConfig.lastUpdate.After(testStart))

require.Eventually(t, func() bool {
if err := engine.Update(); err != nil {
t.Fail()
}
return testMetricPollConfig.lastUpdate.After(testStart)
}, 5*time.Second, 1*time.Second)

}
24 changes: 24 additions & 0 deletions pkg/pgmodel/metrics/database/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package database
import (
"fmt"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/timescale/promscale/pkg/util"
Expand Down Expand Up @@ -39,13 +40,31 @@ func init() {
prometheus.MustRegister(dbHealthErrors, upMetric, dbNetworkLatency)
}

type metricQueryPollConfig struct {
enabled bool
interval time.Duration
lastUpdate time.Time
}

func updateAtMostEvery(interval time.Duration) metricQueryPollConfig {
return metricQueryPollConfig{
enabled: true,
interval: interval,
lastUpdate: time.Now(),
}
}

type metricQueryWrap struct {
// Multiple metrics could be retrieved via single query
// In that case they should appear in the same order as
// corresponding the columns in the query's result.
metrics []prometheus.Collector
query string
isHealthCheck bool // if set only metrics[0] is used
// Allows to configure custom polling intervals for individual queries.
// The actual polling still happens on `evalInterval`, but this setting
// can by used to make heavier queries run less often.
customPollConfig metricQueryPollConfig
}

func gauges(opts ...prometheus.GaugeOpts) []prometheus.Collector {
Expand Down Expand Up @@ -112,6 +131,7 @@ var metrics = []metricQueryWrap{
Help: "The number of metrics chunks soon to be removed by maintenance jobs.",
},
),
customPollConfig: updateAtMostEvery(6 * time.Minute),
query: `WITH conf AS MATERIALIZED (SELECT _prom_catalog.get_default_retention_period() AS def_retention)
SELECT count(*)::BIGINT
FROM _timescaledb_catalog.dimension_slice ds
Expand All @@ -136,6 +156,7 @@ var metrics = []metricQueryWrap{
Help: "The number of metrics chunks not-compressed due to a set delay.",
},
),
customPollConfig: updateAtMostEvery(9 * time.Minute),
query: `WITH chunk_candidates AS MATERIALIZED (
SELECT chcons.dimension_slice_id, h.table_name, h.schema_name
FROM _timescaledb_catalog.chunk_constraint chcons
Expand Down Expand Up @@ -163,6 +184,7 @@ var metrics = []metricQueryWrap{
Help: "The number of traces chunks soon to be removed by maintenance jobs.",
},
),
customPollConfig: updateAtMostEvery(6 * time.Minute),
query: `WITH conf AS MATERIALIZED (SELECT coalesce(ps_trace.get_trace_retention_period(), interval '0 day') AS def_retention)
SELECT count(*)::BIGINT
FROM _timescaledb_catalog.dimension_slice ds
Expand All @@ -181,6 +203,7 @@ var metrics = []metricQueryWrap{
Help: "The number of traces chunks soon to be compressed by maintenance jobs.",
},
),
customPollConfig: updateAtMostEvery(9 * time.Minute),
query: `WITH chunk_candidates AS MATERIALIZED (
SELECT chcons.dimension_slice_id
FROM _timescaledb_catalog.chunk_constraint chcons
Expand Down Expand Up @@ -488,6 +511,7 @@ var metrics = []metricQueryWrap{
Help: "The total number of completed traces compression jobs.",
},
)...),
customPollConfig: updateAtMostEvery(6 * time.Minute),
query: `WITH maintenance_jobs_stats AS (
SELECT
coalesce(config ->> 'signal', 'traces') AS signal_type,
Expand Down
37 changes: 2 additions & 35 deletions pkg/telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,11 @@
package telemetry

import (
"context"
"testing"

"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

"github.com/timescale/promscale/pkg/pgxconn"
"github.com/timescale/promscale/pkg/tests/testsupport"
)

func TestRegisterMetric(t *testing.T) {
Expand All @@ -41,35 +35,8 @@ func TestRegisterMetric(t *testing.T) {

func TestEngineStop(t *testing.T) {
engine := &engineImpl{
conn: mockPgxConn{},
conn: testsupport.MockPgxConn{},
}
engine.Start()
engine.Stop()
}

type mockRow struct{}

func (mockRow) Scan(dest ...interface{}) error { return nil }

type mockPgxConn struct{}

func (mockPgxConn) Close() {}
func (mockPgxConn) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) {
return pgconn.CommandTag{}, nil
}
func (mockPgxConn) Query(ctx context.Context, sql string, args ...interface{}) (pgxconn.PgxRows, error) {
return nil, nil
}
func (mockPgxConn) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row {
return mockRow{}
}
func (mockPgxConn) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) {
return 0, nil
}
func (mockPgxConn) CopyFromRows(rows [][]interface{}) pgx.CopyFromSource { return nil }
func (mockPgxConn) NewBatch() pgxconn.PgxBatch { return nil }
func (mockPgxConn) SendBatch(ctx context.Context, b pgxconn.PgxBatch) (pgx.BatchResults, error) {
return nil, nil
}
func (mockPgxConn) Acquire(ctx context.Context) (*pgxpool.Conn, error) { return nil, nil }
func (mockPgxConn) BeginTx(ctx context.Context) (pgx.Tx, error) { return nil, nil }
14 changes: 8 additions & 6 deletions pkg/tests/end_to_end_tests/database_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,16 @@ func TestDatabaseMetrics(t *testing.T) {
chunksCount = getMetricValue(t, "chunks_count")
require.Equal(t, float64(3), chunksCount)
chunksCompressedCount = getMetricValue(t, "chunks_compressed_count")
// we expect these metrics to be delayed by customPollConf
require.Equal(t, float64(0), chunksCompressedCount)
chunksMUncompressedCount = getMetricValue(t, "chunks_metrics_uncompressed_count")
require.Equal(t, float64(0), chunksMUncompressedCount)
chunksMExpiredCount = getMetricValue(t, "chunks_metrics_expired_count")
require.Equal(t, float64(0), chunksMExpiredCount)
chunksTUncompressedCount = getMetricValue(t, "chunks_traces_uncompressed_count")
require.Equal(t, float64(3), chunksTUncompressedCount)
require.Equal(t, float64(0), chunksTUncompressedCount)
chunksTExpiredCount = getMetricValue(t, "chunks_traces_expired_count")
require.Equal(t, float64(3), chunksTExpiredCount)
require.Equal(t, float64(0), chunksTExpiredCount)
})
}

Expand Down Expand Up @@ -121,9 +122,10 @@ func TestDatabaseMetricsAfterCompression(t *testing.T) {
chunksCompressedCount := getMetricValue(t, "chunks_compressed_count")
require.Equal(t, float64(0), chunksCompressedCount)
chunksMUncompressedCount := getMetricValue(t, "chunks_metrics_uncompressed_count")
require.Equal(t, float64(2), chunksMUncompressedCount)
// we expect these metrics to be delayed by customPollConf
require.Equal(t, float64(0), chunksMUncompressedCount)
chunksMExpiredCount := getMetricValue(t, "chunks_metrics_expired_count")
require.Equal(t, float64(2), chunksMExpiredCount)
require.Equal(t, float64(0), chunksMExpiredCount)
chunksTUncompressedCount := getMetricValue(t, "chunks_traces_uncompressed_count")
require.Equal(t, float64(0), chunksTUncompressedCount)
chunksTExpiredCount := getMetricValue(t, "chunks_traces_expired_count")
Expand All @@ -139,9 +141,9 @@ func TestDatabaseMetricsAfterCompression(t *testing.T) {
chunksCompressedCount = getMetricValue(t, "chunks_compressed_count")
require.Equal(t, float64(1), chunksCompressedCount)
chunksMUncompressedCount = getMetricValue(t, "chunks_metrics_uncompressed_count")
require.Equal(t, float64(1), chunksMUncompressedCount)
require.Equal(t, float64(0), chunksMUncompressedCount)
chunksMExpiredCount = getMetricValue(t, "chunks_metrics_expired_count")
require.Equal(t, float64(2), chunksMExpiredCount)
require.Equal(t, float64(0), chunksMExpiredCount)
chunksTUncompressedCount = getMetricValue(t, "chunks_traces_uncompressed_count")
require.Equal(t, float64(0), chunksTUncompressedCount)
chunksTExpiredCount = getMetricValue(t, "chunks_traces_expired_count")
Expand Down
62 changes: 62 additions & 0 deletions pkg/tests/testsupport/mock_pgx_conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package testsupport

import (
"context"

"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"

"github.com/timescale/promscale/pkg/pgxconn"
)

type MockRow struct{}

func (MockRow) Scan(dest ...interface{}) error { return nil }

type MockBatchResults struct{}

func (MockBatchResults) Exec() (pgconn.CommandTag, error) {
return nil, nil
}

func (MockBatchResults) Query() (pgx.Rows, error) {
return nil, nil
}
func (MockBatchResults) QueryRow() pgx.Row {
return MockRow{}
}
func (MockBatchResults) QueryFunc(scans []interface{}, f func(pgx.QueryFuncRow) error) (pgconn.CommandTag, error) {
return nil, nil
}
func (MockBatchResults) Close() error { return nil }

type MockBatch struct{}

func (MockBatch) Queue(query string, arguments ...interface{}) {}
func (MockBatch) Len() int {
return 0
}

type MockPgxConn struct{}

func (MockPgxConn) Close() {}
func (MockPgxConn) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) {
return pgconn.CommandTag{}, nil
}
func (MockPgxConn) Query(ctx context.Context, sql string, args ...interface{}) (pgxconn.PgxRows, error) {
return nil, nil
}
func (MockPgxConn) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row {
return MockRow{}
}
func (MockPgxConn) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) {
return 0, nil
}
func (MockPgxConn) CopyFromRows(rows [][]interface{}) pgx.CopyFromSource { return nil }
func (MockPgxConn) NewBatch() pgxconn.PgxBatch { return MockBatch{} }
func (MockPgxConn) SendBatch(ctx context.Context, b pgxconn.PgxBatch) (pgx.BatchResults, error) {
return MockBatchResults{}, nil
}
func (MockPgxConn) Acquire(ctx context.Context) (*pgxpool.Conn, error) { return nil, nil }
func (MockPgxConn) BeginTx(ctx context.Context) (pgx.Tx, error) { return nil, nil }

0 comments on commit 92ad453

Please sign in to comment.