Skip to content

Commit f88829b

Browse files
committed
Accept log_destination with instrumentation enabled
Existing deployments may already be using one of these structured formats. Postgres can log in multiple formats simultaneously, allowing a custom solution to run alongside OpenTelemetry collection. Regardless of the number of formats are emitted by Postgres, the collector is now configured to scrape the preferred format for a particular version of Postgres. When blank, Postgres will emit *no* logs after startup. This does not introduce any new validation in that regard. Issue: PGO-2423
1 parent 2e046c7 commit f88829b

File tree

4 files changed

+128
-16
lines changed

4 files changed

+128
-16
lines changed

internal/collector/postgres.go

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,19 @@ func PostgreSQLParameters(ctx context.Context,
5454
// https://www.postgresql.org/docs/current/runtime-config-logging.html
5555
outParameters.Mandatory.Add("logging_collector", "on")
5656

57-
// PostgreSQL v8.3 adds support for CSV logging, and
58-
// PostgreSQL v15 adds support for JSON logging.
59-
// The latter is preferred because newlines are escaped as "\n", U+005C + U+006E.
60-
if version >= 15 {
61-
outParameters.Mandatory.Add("log_destination", "jsonlog")
62-
} else {
57+
// Enable structured logging. This setting is combined with any specified on the cluster.
58+
//
59+
// The JSON format of PostgreSQL v15 delimits messages with newline U+000A
60+
// and escapes newlines in message content as "\n", U+005C + U+006E.
61+
// JSON keys take up space on disk, but newline-delimited is easy to parse.
62+
outParameters.Mandatory.Add("log_destination", "jsonlog")
63+
64+
// The only structured format prior to PostgreSQL v15 is the CSV format, added in PostgreSQL v8.3.
65+
// This format does *not* escape newlines, so the Collector must search for the beginning of each message.
66+
// Forcing the UTC timezone ensures a consistent beginning to each message.
67+
if version < 15 {
6368
outParameters.Mandatory.Add("log_destination", "csvlog")
69+
outParameters.Mandatory.Add("log_timezone", "UTC")
6470
}
6571

6672
// Log in a timezone the OpenTelemetry Collector understands.
@@ -211,7 +217,7 @@ func EnablePostgresLogging(
211217

212218
// https://github.com/open-telemetry/semantic-conventions/blob/v1.29.0/docs/database#readme
213219
{"action": "insert", "key": "db.system", "value": "postgresql"},
214-
{"action": "insert", "key": "db.version", "value": fmt.Sprint(inCluster.Spec.PostgresVersion)},
220+
{"action": "insert", "key": "db.version", "value": fmt.Sprint(version)},
215221
},
216222
}
217223

@@ -227,13 +233,15 @@ func EnablePostgresLogging(
227233
exporters = slices.Clone(spec.Exporters)
228234
}
229235

236+
// JSON logs are preferable since PostgreSQL v15. These are enabled in [PostgreSQLParameters].
237+
receivers := []ComponentID{"filelog/postgres_jsonlog"}
238+
if version < 15 {
239+
receivers = []ComponentID{"filelog/postgres_csvlog"}
240+
}
241+
230242
outConfig.Pipelines["logs/postgres"] = Pipeline{
231243
Extensions: []ComponentID{"file_storage/postgres_logs"},
232-
// TODO(logs): Choose only one receiver, maybe?
233-
Receivers: []ComponentID{
234-
"filelog/postgres_csvlog",
235-
"filelog/postgres_jsonlog",
236-
},
244+
Receivers: receivers,
237245
Processors: []ComponentID{
238246
"resource/postgres",
239247
"transform/postgres_logs",

internal/collector/postgres_test.go

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,6 @@ service:
276276
- batch/logs
277277
- groupbyattrs/compact
278278
receivers:
279-
- filelog/postgres_csvlog
280279
- filelog/postgres_jsonlog
281280
`)
282281
})
@@ -542,7 +541,6 @@ service:
542541
- batch/logs
543542
- groupbyattrs/compact
544543
receivers:
545-
- filelog/postgres_csvlog
546544
- filelog/postgres_jsonlog
547545
`)
548546
})
@@ -678,6 +676,75 @@ service:
678676
- sqlquery/5s
679677
- sqlquery/300s
680678
`)
679+
})
680+
}
681+
682+
func TestPostgresParameters(t *testing.T) {
683+
t.Run("NoInstrumentation", func(t *testing.T) {
684+
cluster := new(v1beta1.PostgresCluster)
685+
cluster.Spec.PostgresVersion = 99
686+
687+
before := postgres.NewParameters()
688+
params := postgres.NewParameters()
689+
PostgreSQLParameters(t.Context(), cluster, &params)
690+
691+
assert.DeepEqual(t, before, params)
692+
})
693+
694+
t.Run("Specified", func(t *testing.T) {
695+
cluster := new(v1beta1.PostgresCluster)
696+
cluster.Spec.PostgresVersion = 99
697+
require.UnmarshalInto(t, &cluster.Spec.Instrumentation, `{}`)
698+
699+
// Feature disabled
700+
{
701+
before := postgres.NewParameters()
702+
params := postgres.NewParameters()
703+
PostgreSQLParameters(t.Context(), cluster, &params)
704+
705+
assert.DeepEqual(t, before, params)
706+
}
707+
708+
// Feature enabled
709+
gate := feature.NewGate()
710+
assert.NilError(t, gate.SetFromMap(map[string]bool{
711+
feature.OpenTelemetryLogs: true,
712+
}))
713+
ctx := feature.NewContext(t.Context(), gate)
714+
715+
params := postgres.NewParameters()
716+
PostgreSQLParameters(ctx, cluster, &params)
717+
718+
assert.Equal(t, params.Mandatory.Value("log_destination"), "jsonlog")
719+
assert.Assert(t, params.Mandatory.Value("log_filename") != "")
720+
assert.Assert(t, params.Mandatory.Value("log_rotation_age") != "")
721+
assert.Assert(t, params.Mandatory.Value("log_rotation_size") != "")
722+
assert.Equal(t, params.Mandatory.Value("log_timezone"), "UTC")
723+
assert.Equal(t, params.Mandatory.Value("log_truncate_on_rotation"), "on")
724+
assert.Equal(t, params.Mandatory.Value("logging_collector"), "on")
725+
})
726+
727+
t.Run("OldPostgres", func(t *testing.T) {
728+
cluster := new(v1beta1.PostgresCluster)
729+
cluster.Spec.PostgresVersion = 10
730+
require.UnmarshalInto(t, &cluster.Spec.Instrumentation, `{}`)
731+
732+
// Feature enabled
733+
gate := feature.NewGate()
734+
assert.NilError(t, gate.SetFromMap(map[string]bool{
735+
feature.OpenTelemetryLogs: true,
736+
}))
737+
ctx := feature.NewContext(t.Context(), gate)
738+
739+
params := postgres.NewParameters()
740+
PostgreSQLParameters(ctx, cluster, &params)
681741

742+
assert.Equal(t, params.Mandatory.Value("log_destination"), "csvlog")
743+
assert.Assert(t, params.Mandatory.Value("log_filename") != "")
744+
assert.Assert(t, params.Mandatory.Value("log_rotation_age") != "")
745+
assert.Assert(t, params.Mandatory.Value("log_rotation_size") != "")
746+
assert.Equal(t, params.Mandatory.Value("log_timezone"), "UTC")
747+
assert.Equal(t, params.Mandatory.Value("log_truncate_on_rotation"), "on")
748+
assert.Equal(t, params.Mandatory.Value("logging_collector"), "on")
682749
})
683750
}

internal/controller/postgrescluster/postgres.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,15 +153,19 @@ func (r *Reconciler) generatePostgresParameters(
153153

154154
// Overwrite the above with mandatory values.
155155
if builtin.Mandatory != nil {
156-
// This parameter is a comma-separated list. Rather than overwrite the
156+
// These parameters are comma-separated lists. Rather than overwrite the
157157
// user-defined value, we want to combine it with the mandatory one.
158+
destination := result.Value("log_destination")
158159
preload := result.Value("shared_preload_libraries")
159160

160161
for k, v := range builtin.Mandatory.AsMap() {
161162
// Load mandatory libraries ahead of user-defined libraries.
162163
if k == "shared_preload_libraries" && len(v) > 0 && len(preload) > 0 {
163164
v = v + "," + preload
164165
}
166+
if k == "log_destination" && len(v) > 0 && len(destination) > 0 {
167+
v = v + "," + destination
168+
}
165169

166170
result.Add(k, v)
167171
}

internal/controller/postgrescluster/postgres_test.go

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ func TestGeneratePostgresHBAs(t *testing.T) {
162162
}
163163

164164
func TestGeneratePostgresParameters(t *testing.T) {
165-
ctx := context.Background()
165+
ctx := t.Context()
166166
reconciler := &Reconciler{}
167167

168168
builtin := reconciler.generatePostgresParameters(ctx, v1beta1.NewPostgresCluster(), false)
@@ -244,6 +244,39 @@ func TestGeneratePostgresParameters(t *testing.T) {
244244
assert.Equal(t, result.Value("jit"), "on") // Config
245245
})
246246

247+
t.Run("log_destination", func(t *testing.T) {
248+
t.Run("passthrough without instrumentation", func(t *testing.T) {
249+
cluster := v1beta1.NewPostgresCluster()
250+
require.UnmarshalInto(t, &cluster.Spec.Config, `{
251+
parameters: {
252+
log_destination: stderr
253+
},
254+
}`)
255+
256+
result := reconciler.generatePostgresParameters(ctx, cluster, false)
257+
assert.Equal(t, result.Value("log_destination"), "stderr")
258+
})
259+
260+
t.Run("combine with intrumentation", func(t *testing.T) {
261+
gate := feature.NewGate()
262+
assert.NilError(t, gate.SetFromMap(map[string]bool{
263+
feature.OpenTelemetryLogs: true,
264+
}))
265+
ctx := feature.NewContext(t.Context(), gate)
266+
267+
cluster := v1beta1.NewPostgresCluster()
268+
require.UnmarshalInto(t, &cluster.Spec.Instrumentation, `{}`)
269+
require.UnmarshalInto(t, &cluster.Spec.Config, `{
270+
parameters: {
271+
log_destination: stderr
272+
},
273+
}`)
274+
275+
result := reconciler.generatePostgresParameters(ctx, cluster, false)
276+
assert.Equal(t, result.Value("log_destination"), "csvlog,stderr")
277+
})
278+
})
279+
247280
t.Run("shared_preload_libraries", func(t *testing.T) {
248281
t.Run("NumericIncluded", func(t *testing.T) {
249282
cluster := v1beta1.NewPostgresCluster()

0 commit comments

Comments
 (0)