Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Upgrade to pgx v5
Browse files Browse the repository at this point in the history
Version 5 of pgx introduced several breaking changes that affected our
code base. You can read more about them here:

https://github.com/jackc/pgx/blob/29ad306e47c491a0ecc52d502241539aedd636bd/CHANGELOG.md#codec-and-value-split

In summary:

- NULL Representation

> Previously, types had a Status field that could be Undefined, Null, or
> Present. This has been changed to a Valid bool field to harmonize with
> how database/sql represents NULL and to make the zero value useable.

- Codec and Value Split

> Previously, the type system combined decoding and encoding values with
> the value types...
> This concepts have been separated. A Codec only has responsibility for
> encoding and decoding values. Value types are generally defined by
> implementing an interface that a particular Codec understands (e.g.
> PointScanner and PointValuer for the PostgreSQL point type).

- Array Types

> All array types are now handled by ArrayCodec instead of using code
> generation for each new array type...

- Other Changes

> JSON and JSONB types removed. Use []byte or string directly.

There are some changes that don't seem to appear in the CHANGELOG:

- An error is returned when scanning NULL into a variable that can't
  handle nil values. For example, scanning NULL into an int64.

- Custom types must be register into the connection's `typeMap
  *pgtype.Map`, otherwise, an error will be returned when the
  encoding/decoding plan is not found for the custom type OID.

- Requires go 1.18 since it uses generics for some features, like
  arrays.

To deal with the custom types, we added the register logic in the
`afterConnect` method of the pool. Every time a connection is created a
new pgtype.Map is generated with all the default PostgreSQL, meaning
that whenever a connection is created, we need to register our custom
types with it.
  • Loading branch information
alejandrodnm committed Jan 11, 2023
1 parent f795fe5 commit ca19e2f
Show file tree
Hide file tree
Showing 115 changed files with 1,082 additions and 1,112 deletions.
23 changes: 9 additions & 14 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/timescale/promscale

go 1.18
go 1.19

require (
github.com/NYTimes/gziphandler v1.1.1
Expand All @@ -19,12 +19,10 @@ require (
github.com/grafana/regexp v0.0.0-20221005093135-b4c2bcb0a4b6
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/hashicorp/go-hclog v1.3.1
github.com/jackc/pgconn v1.13.0
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa
github.com/jackc/pgproto3/v2 v2.3.1
github.com/jackc/pgtype v1.12.0
github.com/jackc/pgx/v4 v4.17.0
github.com/jackc/pgx/v5 v5.2.0
github.com/jaegertracing/jaeger v1.38.2-0.20221006002917-5bf8a28fe06d
github.com/mitchellh/mapstructure v1.5.0
github.com/oklog/run v1.1.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.61.0
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
Expand All @@ -37,6 +35,7 @@ require (
github.com/prometheus/prometheus v0.39.2-0.20221021121301-51a44e6657c3
github.com/sergi/go-diff v1.2.0
github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546
github.com/spf13/viper v1.13.0
github.com/spyzhov/ajson v0.7.1
github.com/stretchr/testify v1.8.0
github.com/testcontainers/testcontainers-go v0.13.0
Expand All @@ -53,7 +52,7 @@ require (
go.uber.org/atomic v1.10.0
go.uber.org/automaxprocs v1.5.1
go.uber.org/goleak v1.2.0
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e
golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8
golang.org/x/time v0.0.0-20220920022843-2ce7c2934d45
google.golang.org/grpc v1.49.0
Expand Down Expand Up @@ -117,11 +116,9 @@ require (
github.com/hashicorp/memberlist v0.3.1 // indirect
github.com/hashicorp/yamux v0.0.0-20190923154419-df201c70410d // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/puddle v1.2.1 // indirect
github.com/jackc/puddle/v2 v2.1.2 // indirect
github.com/jessevdk/go-flags v1.5.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
Expand All @@ -137,7 +134,6 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/miekg/dns v1.1.50 // indirect
github.com/mitchellh/go-testing-interface v1.0.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/sys/mount v0.2.0 // indirect
github.com/moby/sys/mountinfo v0.5.0 // indirect
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect
Expand Down Expand Up @@ -170,7 +166,6 @@ require (
github.com/spf13/cobra v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.13.0 // indirect
github.com/subosito/gotenv v1.4.1 // indirect
github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
Expand All @@ -182,13 +177,13 @@ require (
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.23.0 // indirect
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/net v0.0.0-20220926192436-02166a98028e // indirect
golang.org/x/oauth2 v0.0.0-20220909003341-f21342109be1 // indirect
golang.org/x/sync v0.0.0-20220907140024-f12130a52804 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/text v0.3.8 // indirect
golang.org/x/tools v0.1.12 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220920201722-2b89144ce006 // indirect
Expand Down
103 changes: 10 additions & 93 deletions go.sum

Large diffs are not rendered by default.

17 changes: 10 additions & 7 deletions pkg/dataset/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"fmt"
"time"

"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v5"
"github.com/timescale/promscale/pkg/log"
"gopkg.in/yaml.v2"
)
Expand Down Expand Up @@ -72,12 +72,15 @@ func (c *Config) Apply(conn *pgx.Conn) error {
log.Info("msg", fmt.Sprintf("Setting trace dataset default retention period to %s", c.Traces.RetentionPeriod))

queries := map[string]interface{}{
setDefaultMetricChunkIntervalSQL: time.Duration(c.Metrics.ChunkInterval),
setDefaultMetricCompressionSQL: c.Metrics.Compression,
setDefaultMetricHAReleaseRefreshSQL: time.Duration(c.Metrics.HALeaseRefresh),
setDefaultMetricHAReleaseTimeoutSQL: time.Duration(c.Metrics.HALeaseTimeout),
setDefaultMetricRetentionPeriodSQL: time.Duration(c.Metrics.RetentionPeriod),
setDefaultTraceRetentionPeriodSQL: time.Duration(c.Traces.RetentionPeriod),
setDefaultMetricChunkIntervalSQL: time.Duration(c.Metrics.ChunkInterval),
setDefaultMetricCompressionSQL: c.Metrics.Compression,
setDefaultMetricRetentionPeriodSQL: time.Duration(c.Metrics.RetentionPeriod),
setDefaultTraceRetentionPeriodSQL: time.Duration(c.Traces.RetentionPeriod),
// These need to be sent as string because the SQL does `$1::text` making
// PGX require a string, []byte or a TextValuer.
// https://github.com/jackc/pgx/blob/74f9b9f0a483f95513c621364f2c3912181ee360/pgtype/text.go#L92-L106
setDefaultMetricHAReleaseRefreshSQL: time.Duration(c.Metrics.HALeaseRefresh).String(),
setDefaultMetricHAReleaseTimeoutSQL: time.Duration(c.Metrics.HALeaseTimeout).String(),
}

for sql, param := range queries {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ha/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"fmt"
"time"

"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/timescale/promscale/pkg/pgxconn"
)

Expand Down
4 changes: 2 additions & 2 deletions pkg/internal/testhelpers/containers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (

constants "github.com/timescale/promscale/pkg/tests"

"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)

var (
Expand Down
36 changes: 26 additions & 10 deletions pkg/internal/testhelpers/postgres_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ import (
"time"

"github.com/blang/semver/v4"
"github.com/timescale/promscale/pkg/pgmodel/model"

"github.com/docker/go-connections/nat"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
_ "github.com/jackc/pgx/v4/stdlib"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
)
Expand Down Expand Up @@ -174,14 +176,24 @@ func MakePromUserPromAdmin(t testing.TB, dbName string) {
func PgxPoolWithRole(t testing.TB, dbName string, role string) *pgxpool.Pool {
user := getRoleUser(role)
setupRole(t, dbName, role)
pool, err := pgxpool.Connect(context.Background(), PgConnectURLUser(dbName, user))
assert.NoError(t, err)
pool, err := PgxPoolWithRegisteredTypes(PgConnectURLUser(dbName, user))
require.NoError(t, err)
return pool
}

func PgxPoolWithRegisteredTypes(connectURL string) (*pgxpool.Pool, error) {
config, err := pgxpool.ParseConfig(connectURL)
if err != nil {
return nil, err
}
config.AfterConnect = model.RegisterCustomPgTypes
return pgxpool.NewWithConfig(context.Background(), config)
}

// WithDB establishes a database for testing and calls the callback
func WithDB(t testing.TB, DBName string, superuser SuperuserStatus, deferNode2Setup bool, extensionState TestOptions, f func(db *pgxpool.Pool, t testing.TB, connectString string)) {
db, err := DbSetup(DBName, superuser, deferNode2Setup, extensionState)
defer model.UnRegisterCustomPgTypes(db.Config().ConnConfig.Config)
if err != nil {
t.Fatal(err)
return
Expand All @@ -199,11 +211,14 @@ func GetReadOnlyConnection(t testing.TB, DBName string) *pgxpool.Pool {
assert.NoError(t, err)

pgConfig.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
_, err := conn.Exec(context.Background(), "SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY")
return err
_, err := conn.Exec(ctx, "SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY")
if err != nil {
return err
}
return model.RegisterCustomPgTypes(ctx, conn)
}

dbPool, err := pgxpool.ConnectConfig(context.Background(), pgConfig)
dbPool, err := pgxpool.NewWithConfig(context.Background(), pgConfig)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -252,6 +267,7 @@ func DbSetup(DBName string, superuser SuperuserStatus, deferNode2Setup bool, ext
if err != nil {
return nil, err
}
model.UnRegisterCustomPgTypes(ourDb.Config().Config)

if extensionState.UsesMultinode() {
// Multinode requires the administrator to set up data nodes, so in
Expand Down Expand Up @@ -287,7 +303,7 @@ func DbSetup(DBName string, superuser SuperuserStatus, deferNode2Setup bool, ext
return nil, err
}

dbPool, err := pgxpool.Connect(context.Background(), PgConnectURL(DBName, superuser))
dbPool, err := pgxpool.New(context.Background(), PgConnectURL(DBName, superuser))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/jaeger/store/find_trace_ids.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"context"
"fmt"

"github.com/jackc/pgtype"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/timescale/promscale/pkg/pgxconn"
Expand Down
29 changes: 12 additions & 17 deletions pkg/jaeger/store/get_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ package store

import (
"context"
"errors"
"fmt"

"github.com/jackc/pgtype"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/timescale/promscale/pkg/pgxconn"
)
Expand All @@ -34,8 +35,8 @@ WHERE

func getOperations(ctx context.Context, conn pgxconn.PgxConn, query spanstore.OperationQueryParameters) ([]spanstore.Operation, error) {
var (
pgOperationNames, pgSpanKinds pgtype.TextArray
operationsResp []spanstore.Operation
operationNames, spanKinds []string
operationsResp []spanstore.Operation
)

args := []interface{}{query.ServiceName}
Expand All @@ -51,19 +52,10 @@ func getOperations(ctx context.Context, conn pgxconn.PgxConn, query spanstore.Op

sqlQuery := fmt.Sprintf(getOperationsSQLFormat, kindQual)

if err := conn.QueryRow(ctx, sqlQuery, args...).Scan(&pgOperationNames, &pgSpanKinds); err != nil {
if err := conn.QueryRow(ctx, sqlQuery, args...).Scan(&operationNames, &spanKinds); err != nil {
return operationsResp, fmt.Errorf("fetching operations: %w", err)
}

operationNames, err := textArraytoStringArr(pgOperationNames)
if err != nil {
return operationsResp, fmt.Errorf("operation names: text-array-to-string-array: %w", err)
}
spanKinds, err := textArraytoStringArr(pgSpanKinds)
if err != nil {
return operationsResp, fmt.Errorf("span kinds: text-array-to-string-array: %w", err)
}

if len(operationNames) != len(spanKinds) {
return operationsResp, fmt.Errorf("entries not same in operation-name and span-kind")
}
Expand All @@ -76,10 +68,13 @@ func getOperations(ctx context.Context, conn pgxconn.PgxConn, query spanstore.Op
return operationsResp, nil
}

func textArraytoStringArr(s pgtype.TextArray) ([]string, error) {
var d []string
if err := s.AssignTo(&d); err != nil {
return []string{}, fmt.Errorf("assign to: %w", err)
func textArraytoStringArr(s pgtype.FlatArray[pgtype.Text]) ([]string, error) {
d := make([]string, len(s))
for i, v := range s {
if !v.Valid {
return nil, errors.New("can't assign NULL to string")
}
d[i] = v.String
}
return d, nil
}
14 changes: 5 additions & 9 deletions pkg/jaeger/store/get_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,27 @@ import (
"context"
"fmt"

"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/pkg/errors"
"github.com/timescale/promscale/pkg/pgxconn"
)

const getServicesSQL = `
SELECT
array_agg(value#>>'{}' ORDER BY value)
array_agg(value#>>'{}' ORDER BY value)
FROM
_ps_trace.tag
WHERE
key='service.name' and value IS NOT NULL`

func getServices(ctx context.Context, conn pgxconn.PgxConn) ([]string, error) {
var pgServices pgtype.TextArray
var pgServices pgtype.FlatArray[pgtype.Text]
if err := conn.QueryRow(ctx, getServicesSQL).Scan(&pgServices); err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return []string{}, nil
}
return nil, fmt.Errorf("fetching services: %w", err)
}
s, err := textArraytoStringArr(pgServices)
if err != nil {
return nil, fmt.Errorf("services: converting text-array-to-string-arr: %w", err)
}
return s, nil
return textArraytoStringArr(pgServices)
}
7 changes: 2 additions & 5 deletions pkg/jaeger/store/trace_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"strings"
"time"

"github.com/jackc/pgtype"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
)
Expand Down Expand Up @@ -192,10 +192,7 @@ func getUUIDFromTraceID(traceID model.TraceID) (pgtype.UUID, error) {
return uuid, fmt.Errorf("marshaling TraceID: %w", err)
}

if err := uuid.Set(buf); err != nil {
return uuid, fmt.Errorf("setting TraceID: %w", err)
}
return uuid, nil
return pgtype.UUID{Bytes: buf, Valid: true}, nil
}

func (b *Builder) getTraceQuery(traceID model.TraceID) (string, []interface{}, error) {
Expand Down
Loading

0 comments on commit ca19e2f

Please sign in to comment.