Skip to content

Commit

Permalink
Update pgx to v4
Browse files Browse the repository at this point in the history
  • Loading branch information
michiomochi committed Jul 10, 2024
1 parent 2999bea commit 31d9771
Show file tree
Hide file tree
Showing 10 changed files with 403 additions and 183 deletions.
22 changes: 15 additions & 7 deletions enqueue_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package qg

import (
"context"
"testing"
"time"
)

func TestEnqueueOnlyType(t *testing.T) {
ctx := context.Background()
c := openTestClient(t)
defer truncateAndClose(c)
defer truncateAndClose(ctx, c)

if err := c.Enqueue(&Job{Type: "MyJob"}); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -46,8 +48,9 @@ func TestEnqueueOnlyType(t *testing.T) {
}

func TestEnqueueWithPriority(t *testing.T) {
ctx := context.Background()
c := openTestClient(t)
defer truncateAndClose(c)
defer truncateAndClose(ctx, c)

want := int16(99)
if err := c.Enqueue(&Job{Type: "MyJob", Priority: want}); err != nil {
Expand All @@ -65,8 +68,9 @@ func TestEnqueueWithPriority(t *testing.T) {
}

func TestEnqueueWithRunAt(t *testing.T) {
ctx := context.Background()
c := openTestClient(t)
defer truncateAndClose(c)
defer truncateAndClose(ctx, c)

want := time.Now().Add(2 * time.Minute)
if err := c.Enqueue(&Job{Type: "MyJob", RunAt: want}); err != nil {
Expand All @@ -86,8 +90,9 @@ func TestEnqueueWithRunAt(t *testing.T) {
}

func TestEnqueueWithArgs(t *testing.T) {
ctx := context.Background()
c := openTestClient(t)
defer truncateAndClose(c)
defer truncateAndClose(ctx, c)

want := `{"arg1":0, "arg2":"a string"}`
if err := c.Enqueue(&Job{Type: "MyJob", Args: []byte(want)}); err != nil {
Expand All @@ -105,8 +110,9 @@ func TestEnqueueWithArgs(t *testing.T) {
}

func TestEnqueueWithQueue(t *testing.T) {
ctx := context.Background()
c := openTestClient(t)
defer truncateAndClose(c)
defer truncateAndClose(ctx, c)

want := "special-work-queue"
if err := c.Enqueue(&Job{Type: "MyJob", Queue: want}); err != nil {
Expand All @@ -124,17 +130,19 @@ func TestEnqueueWithQueue(t *testing.T) {
}

func TestEnqueueWithEmptyType(t *testing.T) {
ctx := context.Background()
c := openTestClient(t)
defer truncateAndClose(c)
defer truncateAndClose(ctx, c)

if err := c.Enqueue(&Job{Type: ""}); err != ErrMissingType {
t.Fatalf("want ErrMissingType, got %v", err)
}
}

func TestEnqueueInTx(t *testing.T) {
ctx := context.Background()
c := openTestClient(t)
defer truncateAndClose(c)
defer truncateAndClose(ctx, c)

tx, err := c.pool.Begin()
if err != nil {
Expand Down
6 changes: 2 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ module github.com/kanmu/qg
go 1.16

require (
github.com/hashicorp/go-version v1.3.0 // indirect
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect
github.com/jackc/pgx v3.0.1-0.20170728201109-511b90478f17+incompatible
github.com/jackc/pgconn v1.14.3
github.com/jackc/pgx/v4 v4.18.3
github.com/lib/pq v1.10.3 // indirect
github.com/pkg/errors v0.8.0 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
gopkg.in/guregu/null.v3 v3.0.2-0.20160228005316-41961cea0328
)
219 changes: 211 additions & 8 deletions go.sum

Large diffs are not rendered by default.

35 changes: 18 additions & 17 deletions que.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import (

null "gopkg.in/guregu/null.v3"

"github.com/jackc/pgx"
"github.com/jackc/pgx/stdlib"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/stdlib"
)

// Job is a single unit of work for Que to perform.
Expand Down Expand Up @@ -110,15 +111,15 @@ func (j *Job) Tx() Txer {
//
// You must also later call Done() to return this job's database connection to
// the pool.
func (j *Job) Delete() error {
func (j *Job) Delete(ctx context.Context) error {
j.mu.Lock()
defer j.mu.Unlock()

if j.deleted {
return nil
}

_, err := j.conn.Exec("que_destroy_job", j.Queue, j.Priority, j.RunAt, j.ID)
_, err := j.conn.Exec(ctx, "que_destroy_job", j.Queue, j.Priority, j.RunAt, j.ID)
if err != nil {
return err
}
Expand All @@ -129,7 +130,7 @@ func (j *Job) Delete() error {

// Done releases the Postgres advisory lock on the job and returns the database
// connection to the pool.
func (j *Job) Done() {
func (j *Job) Done(ctx context.Context) {
j.mu.Lock()
defer j.mu.Unlock()

Expand All @@ -141,7 +142,7 @@ func (j *Job) Done() {
var ok bool
// Swallow this error because we don't want an unlock failure to cause work to
// stop.
if err := j.conn.QueryRow("que_unlock_job", j.ID).Scan(&ok); err != nil {
if err := j.conn.QueryRow(ctx, "que_unlock_job", j.ID).Scan(&ok); err != nil {
log.Printf("failed to unlock job job_id=%d job_type=%s", j.ID, j.Type)
}

Expand All @@ -158,11 +159,11 @@ func (j *Job) Done() {
//
// You must also later call Done() to return this job's database connection to
// the pool.
func (j *Job) Error(msg string) error {
func (j *Job) Error(ctx context.Context, msg string) error {
errorCount := j.ErrorCount + 1
delay := intPow(int(errorCount), 4) + 3 // TODO: configurable delay

_, err := j.conn.Exec("que_set_error", errorCount, delay, msg, j.Queue, j.Priority, j.RunAt, j.ID)
_, err := j.conn.Exec(ctx, "que_set_error", errorCount, delay, msg, j.Queue, j.Priority, j.RunAt, j.ID)
if err != nil {
return err
}
Expand Down Expand Up @@ -203,15 +204,15 @@ func NewClient(pool *sql.DB) *Client {
}

// Close disposes all the resources associated to the client
func (c *Client) Close() {
func (c *Client) Close(ctx context.Context) {
c.mu.Lock()
defer c.mu.Unlock()
if c.pool == nil {
return
}
c.stmtJobStats.Close()
for _, j := range c.jobsManaged {
j.Done()
j.Done(ctx)
}
c.pool = nil
c.jobsManaged = nil
Expand Down Expand Up @@ -321,7 +322,7 @@ var ErrAgain = errors.New("maximum number of LockJob attempts reached")
//
// After the Job has been worked, you must call either Done() or Error() on it
// in order to return the database connection to the pool and remove the lock.
func (c *Client) LockJob(queue string) (*Job, error) {
func (c *Client) LockJob(ctx context.Context, queue string) (*Job, error) {
conn, err := stdlib.AcquireConn(c.pool)
if err != nil {
return nil, err
Expand All @@ -330,7 +331,7 @@ func (c *Client) LockJob(queue string) (*Job, error) {
j := Job{c: c, conn: conn}

for i := 0; i < maxLockJobAttempts; i++ {
err = conn.QueryRow("que_lock_job", queue).Scan(
err = conn.QueryRow(ctx, "que_lock_job", queue).Scan(
&j.Queue,
&j.Priority,
&j.RunAt,
Expand Down Expand Up @@ -362,7 +363,7 @@ func (c *Client) LockJob(queue string) (*Job, error) {
// I'm not sure how to reliably commit a transaction that deletes
// the job in a separate thread between lock_job and check_job.
var ok bool
err = conn.QueryRow("que_check_job", j.Queue, j.Priority, j.RunAt, j.ID).Scan(&ok)
err = conn.QueryRow(ctx, "que_check_job", j.Queue, j.Priority, j.RunAt, j.ID).Scan(&ok)
if err == nil {
c.manageJob(&j)
return &j, nil
Expand All @@ -373,7 +374,7 @@ func (c *Client) LockJob(queue string) (*Job, error) {
// eventually causing the server to run out of locks.
//
// Also swallow the possible error, exactly like in Done.
_ = conn.QueryRow("que_unlock_job", j.ID).Scan(&ok)
_ = conn.QueryRow(ctx, "que_unlock_job", j.ID).Scan(&ok)
continue
} else {
// stdConn.Close()
Expand All @@ -395,10 +396,10 @@ var preparedStatements = map[string]string{
"que_unlock_job": sqlUnlockJob,
}

// PrepareStatements prepar statements
func PrepareStatements(conn *pgx.Conn) error {
// PrepareStatements prepare statements
func PrepareStatements(ctx context.Context, conn *pgconn.PgConn) error {
for name, sql := range preparedStatements {
if _, err := conn.Prepare(name, sql); err != nil {
if _, err := conn.Prepare(ctx, name, sql, nil); err != nil {
return err
}
}
Expand Down
52 changes: 23 additions & 29 deletions que_test.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,41 @@
package qg

import (
"context"
"database/sql"
"testing"
"time"

"github.com/jackc/pgx"
"github.com/jackc/pgx/stdlib"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/stdlib"
)

var testConnConfig = pgx.ConnConfig{
Host: "localhost",
Database: "qgtest",
User: "qgtest",
// LogLevel: pgx.LogLevelDebug,
// Logger: log15.New("testlogger", "test/qg"),
func newConn(ctx context.Context) *pgx.Conn {
conn, err := pgx.Connect(ctx, "postgres://qgtest@localhost/qgtest")
if err != nil {
panic(err)
}
return conn
}

const maxConn = 5

func openTestClientMaxConns(t testing.TB, maxConnections int) *Client {
// connPoolConfig := pgx.ConnPoolConfig{
// ConnConfig: testConnConfig,
// MaxConnections: maxConnections,
// AfterConnect: PrepareStatements,
// }
// pool, err := pgx.NewConnPool(connPoolConfig)
// if err != nil {
// t.Fatal(err)
// }
driverConfig := stdlib.DriverConfig{
ConnConfig: pgx.ConnConfig{
Host: "localhost",
Database: "qgtest",
User: "qgtest",
},
AfterConnect: PrepareStatements,
}
stdlib.RegisterDriverConfig(&driverConfig)
db, err := sql.Open("pgx", driverConfig.ConnectionString(""))
config, err := pgx.ParseConfig("postgres://qgtest@localhost/qgtest")
if err != nil {
t.Fatal(err)
}

afterConnect := func(ctx context.Context, conn *pgconn.PgConn) error {
return PrepareStatements(ctx, conn)
}

config.AfterConnect = afterConnect

stdlib.RegisterConnConfig(config)
db := stdlib.OpenDB(*config)

// using stdlib, it's difficult to open max conn from the begining
// if we want to open connections till its limit, need to use go routine to
// concurrently open connections
Expand All @@ -60,9 +54,9 @@ func openTestClient(t testing.TB) *Client {
return openTestClientMaxConns(t, maxConn)
}

func truncateAndClose(c *Client) {
func truncateAndClose(ctx context.Context, c *Client) {
pool := c.pool
c.Close()
c.Close(ctx)
if _, err := pool.Exec("TRUNCATE TABLE que_jobs"); err != nil {
panic(err)
}
Expand Down
20 changes: 11 additions & 9 deletions stats_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package qg

import (
"context"
"testing"
)

func TestStats(t *testing.T) {
ctx := context.Background()
c := openTestClient(t)
defer truncateAndClose(c)
defer truncateAndClose(ctx, c)

if err := c.Enqueue(&Job{Queue: "Q1", Type: "MyJob"}); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -257,14 +259,14 @@ func TestStats(t *testing.T) {
}

func() {
j, err := c.LockJob("Q1")
j, err := c.LockJob(ctx, "Q1")
if err != nil {
t.Fatal(err)
}
if j == nil {
t.Fatal(err)
}
defer j.Done()
defer j.Done(ctx)

stats, err = c.Stats()
if err != nil {
Expand Down Expand Up @@ -361,15 +363,15 @@ func TestStats(t *testing.T) {
}()

func() {
j, err := c.LockJob("Q1")
j, err := c.LockJob(ctx, "Q1")
if err != nil {
t.Fatal(err)
}
if j == nil {
t.Fatal(err)
}
defer j.Done()
j.Delete() //nolint:errcheck
defer j.Done(ctx)
j.Delete(ctx) //nolint:errcheck

stats, err = c.Stats()
if err != nil {
Expand Down Expand Up @@ -466,15 +468,15 @@ func TestStats(t *testing.T) {
}()

func() {
j, err := c.LockJob("Q1")
j, err := c.LockJob(ctx, "Q1")
if err != nil {
t.Fatal(err)
}
if j == nil {
t.Fatal(err)
}
j.Error("???") //nolint:errcheck
j.Done()
j.Error(ctx, "???") //nolint:errcheck
j.Done(ctx)

stats, err = c.Stats()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion testing.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package qg

import "github.com/jackc/pgx"
import "github.com/jackc/pgx/v4"

// // TestInjectJobConn injects *pgx.Conn to Job
func TestInjectJobConn(j *Job, conn *pgx.Conn) *Job {
Expand Down
Loading

0 comments on commit 31d9771

Please sign in to comment.