diff --git a/enqueue_test.go b/enqueue_test.go index 3450a8e..11b06c1 100644 --- a/enqueue_test.go +++ b/enqueue_test.go @@ -1,19 +1,21 @@ package qg import ( + "context" "testing" "time" ) func TestEnqueueOnlyType(t *testing.T) { - c := openTestClient(t) - defer truncateAndClose(c) + ctx := context.Background() + c := openTestClient(ctx, t) + defer truncateAndClose(ctx, c) - if err := c.Enqueue(&Job{Type: "MyJob"}); err != nil { + if err := c.Enqueue(ctx, &Job{Type: "MyJob"}); err != nil { t.Fatal(err) } - j, err := findOneJob(c.pool) + j, err := findOneJob(ctx, c.pool) if err != nil { t.Fatal(err) } @@ -46,15 +48,16 @@ func TestEnqueueOnlyType(t *testing.T) { } func TestEnqueueWithPriority(t *testing.T) { - c := openTestClient(t) - defer truncateAndClose(c) + ctx := context.Background() + c := openTestClient(ctx, t) + defer truncateAndClose(ctx, c) want := int16(99) - if err := c.Enqueue(&Job{Type: "MyJob", Priority: want}); err != nil { + if err := c.Enqueue(ctx, &Job{Type: "MyJob", Priority: want}); err != nil { t.Fatal(err) } - j, err := findOneJob(c.pool) + j, err := findOneJob(ctx, c.pool) if err != nil { t.Fatal(err) } @@ -65,15 +68,16 @@ func TestEnqueueWithPriority(t *testing.T) { } func TestEnqueueWithRunAt(t *testing.T) { - c := openTestClient(t) - defer truncateAndClose(c) + ctx := context.Background() + c := openTestClient(ctx, t) + defer truncateAndClose(ctx, c) want := time.Now().Add(2 * time.Minute) - if err := c.Enqueue(&Job{Type: "MyJob", RunAt: want}); err != nil { + if err := c.Enqueue(ctx, &Job{Type: "MyJob", RunAt: want}); err != nil { t.Fatal(err) } - j, err := findOneJob(c.pool) + j, err := findOneJob(ctx, c.pool) if err != nil { t.Fatal(err) } @@ -86,15 +90,16 @@ func TestEnqueueWithRunAt(t *testing.T) { } func TestEnqueueWithArgs(t *testing.T) { - c := openTestClient(t) - defer truncateAndClose(c) + ctx := context.Background() + c := openTestClient(ctx, t) + defer truncateAndClose(ctx, c) want := `{"arg1":0, "arg2":"a string"}` - if err := c.Enqueue(&Job{Type: "MyJob", Args: []byte(want)}); err != nil { + if err := c.Enqueue(ctx, &Job{Type: "MyJob", Args: []byte(want)}); err != nil { t.Fatal(err) } - j, err := findOneJob(c.pool) + j, err := findOneJob(ctx, c.pool) if err != nil { t.Fatal(err) } @@ -105,15 +110,16 @@ func TestEnqueueWithArgs(t *testing.T) { } func TestEnqueueWithQueue(t *testing.T) { - c := openTestClient(t) - defer truncateAndClose(c) + ctx := context.Background() + c := openTestClient(ctx, t) + defer truncateAndClose(ctx, c) want := "special-work-queue" - if err := c.Enqueue(&Job{Type: "MyJob", Queue: want}); err != nil { + if err := c.Enqueue(ctx, &Job{Type: "MyJob", Queue: want}); err != nil { t.Fatal(err) } - j, err := findOneJob(c.pool) + j, err := findOneJob(ctx, c.pool) if err != nil { t.Fatal(err) } @@ -124,29 +130,31 @@ func TestEnqueueWithQueue(t *testing.T) { } func TestEnqueueWithEmptyType(t *testing.T) { - c := openTestClient(t) - defer truncateAndClose(c) + ctx := context.Background() + c := openTestClient(ctx, t) + defer truncateAndClose(ctx, c) - if err := c.Enqueue(&Job{Type: ""}); err != ErrMissingType { + if err := c.Enqueue(ctx, &Job{Type: ""}); err != ErrMissingType { t.Fatalf("want ErrMissingType, got %v", err) } } func TestEnqueueInTx(t *testing.T) { - c := openTestClient(t) - defer truncateAndClose(c) + ctx := context.Background() + c := openTestClient(ctx, t) + defer truncateAndClose(ctx, c) - tx, err := c.pool.Begin() + tx, err := c.pool.Begin(ctx) if err != nil { t.Fatal(err) } - defer tx.Rollback() //nolint:errcheck + defer tx.Rollback(ctx) //nolint:errcheck - if err = c.EnqueueInTx(&Job{Type: "MyJob"}, tx); err != nil { + if err = c.EnqueueInTx(ctx, &Job{Type: "MyJob"}, tx); err != nil { t.Fatal(err) } - j, err := findOneJob(tx) + j, err := findOneJob(ctx, tx) if err != nil { t.Fatal(err) } @@ -154,11 +162,11 @@ func TestEnqueueInTx(t *testing.T) { t.Fatal("want job, got none") } - if err = tx.Rollback(); err != nil { + if err = tx.Rollback(ctx); err != nil { t.Fatal(err) } - j, err = findOneJob(c.pool) + j, err = findOneJob(ctx, c.pool) if err != nil { t.Fatal(err) } diff --git a/go.mod b/go.mod index 8f71f57..481ff0f 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,9 @@ module github.com/kanmu/qg go 1.16 require ( - github.com/hashicorp/go-version v1.3.0 // indirect - github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect - github.com/jackc/pgx v3.0.1-0.20170728201109-511b90478f17+incompatible + github.com/jackc/pgconn v1.14.3 + github.com/jackc/pgx/v4 v4.18.3 github.com/lib/pq v1.10.3 // indirect - github.com/pkg/errors v0.8.0 // indirect github.com/shopspring/decimal v1.3.1 // indirect gopkg.in/guregu/null.v3 v3.0.2-0.20160228005316-41961cea0328 ) diff --git a/go.sum b/go.sum index 69a98fe..0901b28 100644 --- a/go.sum +++ b/go.sum @@ -1,14 +1,217 @@ -github.com/hashicorp/go-version v1.3.0 h1:McDWVJIU/y+u1BRV06dPaLfLCaT7fUTJLp5r04x7iNw= -github.com/hashicorp/go-version v1.3.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= -github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 h1:vr3AYkKovP8uR8AvSGGUK1IDqRa5lAAvEkZG1LKaCRc= -github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80sQsxDoMokWK1W5TQtxBFNpzWTD84ibQ= -github.com/jackc/pgx v3.0.1-0.20170728201109-511b90478f17+incompatible h1:gODIIrHcjpWFN5yw777R1Ju3aOB07lhy0vxutBGDgFg= -github.com/jackc/pgx v3.0.1-0.20170728201109-511b90478f17+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs= +github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= +github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= +github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= +github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw= +github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0= +github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo= +github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= +github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= +github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= +github.com/jackc/pgconn v0.0.0-20190420214824-7e0022ef6ba3/go.mod h1:jkELnwuX+w9qN5YIfX0fl88Ehu4XC3keFuOJJk9pcnA= +github.com/jackc/pgconn v0.0.0-20190824142844-760dd75542eb/go.mod h1:lLjNuW/+OfW9/pnVKPazfWOgNfH2aPem8YQ7ilXGvJE= +github.com/jackc/pgconn v0.0.0-20190831204454-2fabfa3c18b7/go.mod h1:ZJKsE/KZfsUgOEh9hBm+xYTstcNHg7UPMVJqRfQxq4s= +github.com/jackc/pgconn v1.8.0/go.mod h1:1C2Pb36bGIP9QHGBYCjnyhqu7Rv3sGshaQUvmfGIB/o= +github.com/jackc/pgconn v1.9.0/go.mod h1:YctiPyvzfU11JFxoXokUOOKQXQmDMoJL9vJzHH8/2JY= +github.com/jackc/pgconn v1.9.1-0.20210724152538-d89c8390a530/go.mod h1:4z2w8XhRbP1hYxkpTuBjTS3ne3J48K83+u0zoyvg2pI= +github.com/jackc/pgconn v1.14.3 h1:bVoTr12EGANZz66nZPkMInAV/KHD2TxH9npjXXgiB3w= +github.com/jackc/pgconn v1.14.3/go.mod h1:RZbme4uasqzybK2RK5c65VsHxoyaml09lx3tXOcO/VM= +github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= +github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= +github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2/go.mod h1:fGZlG77KXmcq05nJLRkk0+p82V8B8Dw8KN2/V9c/OAE= +github.com/jackc/pgmock v0.0.0-20201204152224-4fe30f7445fd/go.mod h1:hrBW0Enj2AZTNpt/7Y5rr2xe/9Mn757Wtb2xeBzPv2c= +github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65 h1:DadwsjnMwFjfWc9y5Wi/+Zz7xoE5ALHsRQlOctkOiHc= +github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65/go.mod h1:5R2h2EEX+qri8jOWMbJCtaPWkrrNc7OHwsp2TCqp7ak= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgproto3 v1.1.0 h1:FYYE4yRw+AgI8wXIinMlNjBbp/UitDJwfj5LqqewP1A= +github.com/jackc/pgproto3 v1.1.0/go.mod h1:eR5FA3leWg7p9aeAqi37XOTgTIbkABlvcPB3E5rlc78= +github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA= +github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod h1:uH0AWtUmuShn0bcesswc4aBTWGvw0cAxIJp+6OB//Wg= +github.com/jackc/pgproto3/v2 v2.0.0-rc3/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM= +github.com/jackc/pgproto3/v2 v2.0.0-rc3.0.20190831210041-4c03ce451f29/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM= +github.com/jackc/pgproto3/v2 v2.0.6/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= +github.com/jackc/pgproto3/v2 v2.1.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= +github.com/jackc/pgproto3/v2 v2.3.3 h1:1HLSx5H+tXR9pW3in3zaztoEwQYRC9SQaYUHjTSUOag= +github.com/jackc/pgproto3/v2 v2.3.3/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= +github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgtype v0.0.0-20190421001408-4ed0de4755e0/go.mod h1:hdSHsc1V01CGwFsrv11mJRHWJ6aifDLfdV3aVjFF0zg= +github.com/jackc/pgtype v0.0.0-20190824184912-ab885b375b90/go.mod h1:KcahbBH1nCMSo2DXpzsoWOAfFkdEtEJpPbVLq8eE+mc= +github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrUS8lot6TQqcg7mtthZ9T0EoIBFiJcmcyw= +github.com/jackc/pgtype v1.8.1-0.20210724151600-32e20a603178/go.mod h1:C516IlIV9NKqfsMCXTdChteoXmwgUceqaLfjg2e3NlM= +github.com/jackc/pgtype v1.14.0 h1:y+xUdabmyMkJLyApYuPj38mW+aAIqCe5uuBB51rH3Vw= +github.com/jackc/pgtype v1.14.0/go.mod h1:LUMuVrfsFfdKGLw+AFFVv6KtHOFMwRgDDzBt76IqCA4= +github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08CZQyj1PVQBHy9XOp5p8/SHH6a0psbY9Y= +github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM= +github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc= +github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs= +github.com/jackc/pgx/v4 v4.18.3 h1:dE2/TrEsGX3RBprb3qryqSV9Y60iZN1C6i8IrmW9/BA= +github.com/jackc/pgx/v4 v4.18.3/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw= +github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle v1.3.0 h1:eHK/5clGOatcjX3oWGBO/MpxpbHzSwud5EWTSCI+MX0= +github.com/jackc/puddle v1.3.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.3 h1:v9QZf2Sn6AmjXtQeFpdoq/eaNtYP6IN+7lcrygsIAtg= github.com/lib/pq v1.10.3/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= -github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= +github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= +github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= +github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc= +github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= +github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= +github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= +go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= +go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= +go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= +go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190411191339-88737f569e3a/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= +golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= +golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= +golang.org/x/crypto v0.20.0 h1:jmAMJJZXr5KiCw05dfYK9QnqaqKLYXijU23lsEdcQqg= +golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZPQ= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= +golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190425163242-31fd60d6bfdc/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/tools v0.0.0-20190823170909-c4a336ef6a2f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/guregu/null.v3 v3.0.2-0.20160228005316-41961cea0328 h1:hU8cvgSqUQ5u4yLmjxkBal15GhqW77yRklOkbEOlJ38= gopkg.in/guregu/null.v3 v3.0.2-0.20160228005316-41961cea0328/go.mod h1:E4tX2Qe3h7QdL+uZ3a0vqvYwKQsRSQKM5V4YltdgH9Y= +gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= diff --git a/que.go b/que.go index 3e33e65..31c866e 100644 --- a/que.go +++ b/que.go @@ -10,8 +10,9 @@ import ( null "gopkg.in/guregu/null.v3" - "github.com/jackc/pgx" - "github.com/jackc/pgx/stdlib" + "github.com/jackc/pgconn" + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" ) // Job is a single unit of work for Que to perform. @@ -50,32 +51,29 @@ type Job struct { mu sync.Mutex deleted bool c *Client - conn *pgx.Conn + conn *pgxpool.Conn tx Txer } -// Queryer is interface for query type Queryer interface { - Exec(string, ...interface{}) (sql.Result, error) - ExecContext(context.Context, string, ...interface{}) (sql.Result, error) - PrepareContext(context.Context, string) (*sql.Stmt, error) - Query(string, ...interface{}) (*sql.Rows, error) - QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error) - QueryRow(string, ...interface{}) *sql.Row - QueryRowContext(context.Context, string, ...interface{}) *sql.Row + Begin(ctx context.Context) (pgx.Tx, error) + BeginFunc(ctx context.Context, f func(pgx.Tx) error) (err error) + Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) + Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) + QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row + QueryFunc(ctx context.Context, sql string, args []interface{}, scans []interface{}, f func(pgx.QueryFuncRow) error) (pgconn.CommandTag, error) } // Txer is interface for tx type Txer interface { Queryer - Commit() error - Rollback() error + Commit(ctx context.Context) error + Rollback(ctx context.Context) error } // Conner is interface for conn type Conner interface { Queryer - Begin() (*sql.Tx, error) Close() error } @@ -91,7 +89,7 @@ type JobStats struct { } // Conn returns transaction -func (j *Job) Conn() *pgx.Conn { +func (j *Job) Conn() *pgxpool.Conn { j.mu.Lock() defer j.mu.Unlock() @@ -110,7 +108,7 @@ func (j *Job) Tx() Txer { // // You must also later call Done() to return this job's database connection to // the pool. -func (j *Job) Delete() error { +func (j *Job) Delete(ctx context.Context) error { j.mu.Lock() defer j.mu.Unlock() @@ -118,7 +116,7 @@ func (j *Job) Delete() error { return nil } - _, err := j.conn.Exec("que_destroy_job", j.Queue, j.Priority, j.RunAt, j.ID) + _, err := j.conn.Exec(ctx, "que_destroy_job", j.Queue, j.Priority, j.RunAt, j.ID) if err != nil { return err } @@ -129,7 +127,7 @@ func (j *Job) Delete() error { // Done releases the Postgres advisory lock on the job and returns the database // connection to the pool. -func (j *Job) Done() { +func (j *Job) Done(ctx context.Context) { j.mu.Lock() defer j.mu.Unlock() @@ -141,12 +139,11 @@ func (j *Job) Done() { var ok bool // Swallow this error because we don't want an unlock failure to cause work to // stop. - if err := j.conn.QueryRow("que_unlock_job", j.ID).Scan(&ok); err != nil { + if err := j.conn.QueryRow(ctx, "que_unlock_job", j.ID).Scan(&ok); err != nil { log.Printf("failed to unlock job job_id=%d job_type=%s", j.ID, j.Type) } - stdlib.ReleaseConn(j.c.pool, j.conn) //nolint:errcheck - // j.pool.Release(j.conn) + j.conn.Release() j.c.dischargeJob(j) j.c = nil j.conn = nil @@ -158,11 +155,11 @@ func (j *Job) Done() { // // You must also later call Done() to return this job's database connection to // the pool. -func (j *Job) Error(msg string) error { +func (j *Job) Error(ctx context.Context, msg string) error { errorCount := j.ErrorCount + 1 delay := intPow(int(errorCount), 4) + 3 // TODO: configurable delay - _, err := j.conn.Exec("que_set_error", errorCount, delay, msg, j.Queue, j.Priority, j.RunAt, j.ID) + _, err := j.conn.Exec(ctx, "que_set_error", errorCount, delay, msg, j.Queue, j.Priority, j.RunAt, j.ID) if err != nil { return err } @@ -172,50 +169,33 @@ func (j *Job) Error(msg string) error { // Client is a Que client that can add jobs to the queue and remove jobs from // the queue. type Client struct { - pool *sql.DB + pool *pgxpool.Pool mu sync.Mutex - stmtJobStats *sql.Stmt jobsManaged map[int64]*Job // TODO: add a way to specify default queueing options } -// NewClient2 creates a new Client that uses the pgx pool. -func NewClient2(pool *sql.DB) (*Client, error) { - stmtJobStats, err := pool.Prepare(sqlJobStats) - if err != nil { - return nil, err - } +// NewClient creates a new Client that uses the pgx pool. +func NewClient(pool *pgxpool.Pool) (*Client, error) { return &Client{ pool: pool, - stmtJobStats: stmtJobStats, jobsManaged: map[int64]*Job{}, }, nil } -// NewClient creates a new Client that uses the pgx pool. Returns nil if the initialization fails. -func NewClient(pool *sql.DB) *Client { - c, err := NewClient2(pool) - if err != nil { - return nil - } - return c -} - // Close disposes all the resources associated to the client -func (c *Client) Close() { +func (c *Client) Close(ctx context.Context) { c.mu.Lock() defer c.mu.Unlock() if c.pool == nil { return } - c.stmtJobStats.Close() for _, j := range c.jobsManaged { - j.Done() + j.Done(ctx) } c.pool = nil c.jobsManaged = nil - c.stmtJobStats = nil } // ErrMissingType is returned when you attempt to enqueue a job with no Type @@ -223,8 +203,8 @@ func (c *Client) Close() { var ErrMissingType = errors.New("job type must be specified") // Enqueue adds a job to the queue. -func (c *Client) Enqueue(j *Job) error { - return execEnqueue(j, c.pool) +func (c *Client) Enqueue(ctx context.Context, j *Job) error { + return execEnqueue(ctx, j, c.pool) } // EnqueueInTx adds a job to the queue within the scope of the transaction tx. @@ -233,11 +213,11 @@ func (c *Client) Enqueue(j *Job) error { // // It is the caller's responsibility to Commit or Rollback the transaction after // this function is called. -func (c *Client) EnqueueInTx(j *Job, tx *sql.Tx) error { - return execEnqueue(j, tx) +func (c *Client) EnqueueInTx(ctx context.Context, j *Job, tx pgx.Tx) error { + return execEnqueue(ctx, j, tx) } -func execEnqueue(j *Job, q Queryer) error { +func execEnqueue(ctx context.Context, j *Job, q Queryer) error { if j.Type == "" { return ErrMissingType } @@ -257,9 +237,8 @@ func execEnqueue(j *Job, q Queryer) error { Time: j.RunAt, Valid: !j.RunAt.IsZero(), } - // args := bytea(j.Args) - _, err := q.Exec("que_insert_job", queue, priority, runAt, j.Type, j.Args) + _, err := q.Exec(ctx, "que_insert_job", queue, priority, runAt, j.Type, j.Args) return err } @@ -275,28 +254,6 @@ func (c *Client) dischargeJob(j *Job) { delete(c.jobsManaged, j.ID) } -// type bytea []byte -// -// func (b bytea) Encode(w *pgx.WriteBuf, oid pgx.Oid) error { -// if len(b) == 0 { -// w.WriteInt32(-1) -// return nil -// } -// w.WriteInt32(int32(len(b))) -// w.WriteBytes(b) -// return nil -// } - -// func (b bytea) FormatCode() int16 { -// return pgx.TextFormatCode -// } - -// type queryable interface { -// Exec(sql string, arguments ...interface{}) (commandTag pgx.CommandTag, err error) -// Query(sql string, args ...interface{}) (*pgx.Rows, error) -// QueryRow(sql string, args ...interface{}) *pgx.Row -// } - // Maximum number of loop iterations in LockJob before giving up. This is to // avoid looping forever in case something is wrong. const maxLockJobAttempts = 10 @@ -321,8 +278,8 @@ var ErrAgain = errors.New("maximum number of LockJob attempts reached") // // After the Job has been worked, you must call either Done() or Error() on it // in order to return the database connection to the pool and remove the lock. -func (c *Client) LockJob(queue string) (*Job, error) { - conn, err := stdlib.AcquireConn(c.pool) +func (c *Client) LockJob(ctx context.Context, queue string) (*Job, error) { + conn, err := c.pool.Acquire(ctx) if err != nil { return nil, err } @@ -330,7 +287,7 @@ func (c *Client) LockJob(queue string) (*Job, error) { j := Job{c: c, conn: conn} for i := 0; i < maxLockJobAttempts; i++ { - err = conn.QueryRow("que_lock_job", queue).Scan( + err = conn.QueryRow(ctx, "que_lock_job", queue).Scan( &j.Queue, &j.Priority, &j.RunAt, @@ -340,9 +297,8 @@ func (c *Client) LockJob(queue string) (*Job, error) { &j.ErrorCount, ) if err != nil { - // stdConn.Close() - stdlib.ReleaseConn(c.pool, conn) //nolint:errcheck - // c.pool.Release(conn) + conn.Release() + if err == pgx.ErrNoRows { return nil, nil } @@ -362,7 +318,7 @@ func (c *Client) LockJob(queue string) (*Job, error) { // I'm not sure how to reliably commit a transaction that deletes // the job in a separate thread between lock_job and check_job. var ok bool - err = conn.QueryRow("que_check_job", j.Queue, j.Priority, j.RunAt, j.ID).Scan(&ok) + err = conn.QueryRow(ctx, "que_check_job", j.Queue, j.Priority, j.RunAt, j.ID).Scan(&ok) if err == nil { c.manageJob(&j) return &j, nil @@ -373,16 +329,12 @@ func (c *Client) LockJob(queue string) (*Job, error) { // eventually causing the server to run out of locks. // // Also swallow the possible error, exactly like in Done. - _ = conn.QueryRow("que_unlock_job", j.ID).Scan(&ok) + _ = conn.QueryRow(ctx, "que_unlock_job", j.ID).Scan(&ok) continue } else { - // stdConn.Close() - // c.pool.Release(conn) - stdlib.ReleaseConn(c.pool, conn) //nolint:errcheck return nil, err } } - stdlib.ReleaseConn(c.pool, conn) //nolint:errcheck return nil, ErrAgain } @@ -395,10 +347,10 @@ var preparedStatements = map[string]string{ "que_unlock_job": sqlUnlockJob, } -// PrepareStatements prepar statements -func PrepareStatements(conn *pgx.Conn) error { +// PrepareStatements prepare statements +func PrepareStatements(ctx context.Context, conn *pgx.Conn) error { for name, sql := range preparedStatements { - if _, err := conn.Prepare(name, sql); err != nil { + if _, err := conn.Prepare(ctx, name, sql); err != nil { return err } } @@ -406,8 +358,8 @@ func PrepareStatements(conn *pgx.Conn) error { } // Stats retrieves the stats of all the queues -func (c *Client) Stats() (results []JobStats, err error) { - rows, err := c.stmtJobStats.Query() +func (c *Client) Stats(ctx context.Context) (results []JobStats, err error) { + rows, err := c.pool.Query(ctx, sqlJobStats) if err != nil { return } diff --git a/que_test.go b/que_test.go index 9014fbc..1eee9dd 100644 --- a/que_test.go +++ b/que_test.go @@ -1,81 +1,72 @@ package qg import ( - "database/sql" + "context" + "errors" "testing" "time" - "github.com/jackc/pgx" - "github.com/jackc/pgx/stdlib" + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" ) -var testConnConfig = pgx.ConnConfig{ - Host: "localhost", - Database: "qgtest", - User: "qgtest", - // LogLevel: pgx.LogLevelDebug, - // Logger: log15.New("testlogger", "test/qg"), +func newConn(ctx context.Context) *pgx.Conn { + conn, err := pgx.Connect(ctx, "postgres://qgtest@localhost/qgtest") + if err != nil { + panic(err) + } + return conn } const maxConn = 5 -func openTestClientMaxConns(t testing.TB, maxConnections int) *Client { - // connPoolConfig := pgx.ConnPoolConfig{ - // ConnConfig: testConnConfig, - // MaxConnections: maxConnections, - // AfterConnect: PrepareStatements, - // } - // pool, err := pgx.NewConnPool(connPoolConfig) - // if err != nil { - // t.Fatal(err) - // } - driverConfig := stdlib.DriverConfig{ - ConnConfig: pgx.ConnConfig{ - Host: "localhost", - Database: "qgtest", - User: "qgtest", - }, - AfterConnect: PrepareStatements, - } - stdlib.RegisterDriverConfig(&driverConfig) - db, err := sql.Open("pgx", driverConfig.ConnectionString("")) +func openTestClientMaxConns(ctx context.Context, t testing.TB, maxConnections int) *Client { + config, err := pgxpool.ParseConfig("postgres://qgtest@localhost/qgtest") if err != nil { t.Fatal(err) } - // using stdlib, it's difficult to open max conn from the begining - // if we want to open connections till its limit, need to use go routine to - // concurrently open connections - db.SetMaxOpenConns(maxConnections) - db.SetMaxIdleConns(maxConnections) + + config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error { + return PrepareStatements(ctx, conn) + } + config.MaxConns = int32(maxConnections) + config.MinConns = int32(maxConnections) + // make lifetime sufficiently long - db.SetConnMaxLifetime(time.Duration(5 * time.Minute)) - c, err := NewClient2(db) + config.MaxConnLifetime = 5 * time.Minute + + pool, err := pgxpool.ConnectConfig(ctx, config) + if err != nil { + t.Fatal(err) + } + + c, err := NewClient(pool) if err != nil { t.Fatal(err) } return c } -func openTestClient(t testing.TB) *Client { - return openTestClientMaxConns(t, maxConn) +func openTestClient(ctx context.Context, t testing.TB) *Client { + return openTestClientMaxConns(ctx, t, maxConn) } -func truncateAndClose(c *Client) { +func truncateAndClose(ctx context.Context, c *Client) { pool := c.pool - c.Close() - if _, err := pool.Exec("TRUNCATE TABLE que_jobs"); err != nil { + c.Close(ctx) + if _, err := pool.Exec(ctx, "TRUNCATE TABLE que_jobs"); err != nil { panic(err) } pool.Close() } -func findOneJob(q Queryer) (*Job, error) { +func findOneJob(ctx context.Context, q Queryer) (*Job, error) { findSQL := ` SELECT priority, run_at, job_id, job_class, args, error_count, last_error, queue FROM que_jobs LIMIT 1` j := &Job{} - err := q.QueryRow(findSQL).Scan( + err := q.QueryRow(ctx, findSQL).Scan( &j.Priority, &j.RunAt, &j.ID, @@ -85,7 +76,8 @@ func findOneJob(q Queryer) (*Job, error) { &j.LastError, &j.Queue, ) - if err == sql.ErrNoRows { + + if errors.Is(err, pgx.ErrNoRows) { return nil, nil } if err != nil { diff --git a/stats_test.go b/stats_test.go index 9de0df5..426bab1 100644 --- a/stats_test.go +++ b/stats_test.go @@ -1,18 +1,20 @@ package qg import ( + "context" "testing" ) func TestStats(t *testing.T) { - c := openTestClient(t) - defer truncateAndClose(c) + ctx := context.Background() + c := openTestClient(ctx, t) + defer truncateAndClose(ctx, c) - if err := c.Enqueue(&Job{Queue: "Q1", Type: "MyJob"}); err != nil { + if err := c.Enqueue(ctx, &Job{Queue: "Q1", Type: "MyJob"}); err != nil { t.Fatal(err) } - stats, err := c.Stats() + stats, err := c.Stats(ctx) if err != nil { t.Fatal(err) } @@ -49,11 +51,11 @@ func TestStats(t *testing.T) { t.Errorf("stats[0].OldestRunAt.IsZero() != false (got %v)", stats[0].OldestRunAt.IsZero()) } - if err := c.Enqueue(&Job{Queue: "Q1", Type: "MyJob"}); err != nil { + if err := c.Enqueue(ctx, &Job{Queue: "Q1", Type: "MyJob"}); err != nil { t.Fatal(err) } - stats, err = c.Stats() + stats, err = c.Stats(ctx) if err != nil { t.Fatal(err) } @@ -90,11 +92,11 @@ func TestStats(t *testing.T) { t.Errorf("stats[0].OldestRunAt.IsZero() != false (got %v)", stats[0].OldestRunAt.IsZero()) } - if err := c.Enqueue(&Job{Queue: "Q2", Type: "MyJob"}); err != nil { + if err := c.Enqueue(ctx, &Job{Queue: "Q2", Type: "MyJob"}); err != nil { t.Fatal(err) } - stats, err = c.Stats() + stats, err = c.Stats(ctx) if err != nil { t.Fatal(err) } @@ -159,11 +161,11 @@ func TestStats(t *testing.T) { t.Errorf("stats[1].OldestRunAt.IsZero() != false (got %v)", stats[1].OldestRunAt.IsZero()) } - if err := c.Enqueue(&Job{Queue: "Q1", Type: "AnotherJob"}); err != nil { + if err := c.Enqueue(ctx, &Job{Queue: "Q1", Type: "AnotherJob"}); err != nil { t.Fatal(err) } - stats, err = c.Stats() + stats, err = c.Stats(ctx) if err != nil { t.Fatal(err) } @@ -257,16 +259,16 @@ func TestStats(t *testing.T) { } func() { - j, err := c.LockJob("Q1") + j, err := c.LockJob(ctx, "Q1") if err != nil { t.Fatal(err) } if j == nil { t.Fatal(err) } - defer j.Done() + defer j.Done(ctx) - stats, err = c.Stats() + stats, err = c.Stats(ctx) if err != nil { t.Fatal(err) } @@ -361,17 +363,17 @@ func TestStats(t *testing.T) { }() func() { - j, err := c.LockJob("Q1") + j, err := c.LockJob(ctx, "Q1") if err != nil { t.Fatal(err) } if j == nil { t.Fatal(err) } - defer j.Done() - j.Delete() //nolint:errcheck + defer j.Done(ctx) + j.Delete(ctx) //nolint:errcheck - stats, err = c.Stats() + stats, err = c.Stats(ctx) if err != nil { t.Fatal(err) } @@ -466,17 +468,17 @@ func TestStats(t *testing.T) { }() func() { - j, err := c.LockJob("Q1") + j, err := c.LockJob(ctx, "Q1") if err != nil { t.Fatal(err) } if j == nil { t.Fatal(err) } - j.Error("???") //nolint:errcheck - j.Done() + j.Error(ctx, "???") //nolint:errcheck + j.Done(ctx) - stats, err = c.Stats() + stats, err = c.Stats(ctx) if err != nil { t.Fatal(err) } diff --git a/testing.go b/testing.go deleted file mode 100644 index ab7bae7..0000000 --- a/testing.go +++ /dev/null @@ -1,15 +0,0 @@ -package qg - -import "github.com/jackc/pgx" - -// // TestInjectJobConn injects *pgx.Conn to Job -func TestInjectJobConn(j *Job, conn *pgx.Conn) *Job { - j.conn = conn - return j -} - -// TestInjectJobTx injects tx to Job -func TestInjectJobTx(j *Job, tx Txer) *Job { - j.tx = tx - return j -} diff --git a/work_test.go b/work_test.go index 586a55b..f0a6bc8 100644 --- a/work_test.go +++ b/work_test.go @@ -1,24 +1,23 @@ package qg import ( + "context" "fmt" "sync" "testing" "time" - - "github.com/jackc/pgx" - "github.com/jackc/pgx/stdlib" ) func TestLockJob(t *testing.T) { - c := openTestClient(t) - defer truncateAndClose(c) + ctx := context.Background() + c := openTestClient(ctx, t) + defer truncateAndClose(ctx, c) - if err := c.Enqueue(&Job{Type: "MyJob"}); err != nil { + if err := c.Enqueue(ctx, &Job{Type: "MyJob"}); err != nil { t.Fatal(err) } - j, err := c.LockJob("") + j, err := c.LockJob(ctx, "") if err != nil { t.Fatal(err) } @@ -29,7 +28,7 @@ func TestLockJob(t *testing.T) { if j.c == nil { t.Fatal("want non-nil c on locked Job") } - defer j.Done() + defer j.Done(ctx) // check values of returned Job if j.ID == 0 { @@ -60,7 +59,7 @@ func TestLockJob(t *testing.T) { // check for advisory lock var count int64 query := "SELECT count(*) FROM pg_locks WHERE locktype=$1 AND objid=$2::bigint" - if err = j.c.pool.QueryRow(query, "advisory", j.ID).Scan(&count); err != nil { + if err = j.c.pool.QueryRow(ctx, query, "advisory", j.ID).Scan(&count); err != nil { t.Fatal(err) } if count != 1 { @@ -69,49 +68,51 @@ func TestLockJob(t *testing.T) { // make sure conn was checked out of pool openedConn := 2 // one is for locking job, the other is for counting pg_locks - stat := c.pool.Stats() - available := stat.OpenConnections - if want := openedConn; available != want { + stat := c.pool.Stat() + available := stat.AcquireCount() + if want := openedConn; int(available) != want { t.Errorf("want available=%d, got %d", want, available) } - if err = j.Delete(); err != nil { + if err = j.Delete(ctx); err != nil { t.Fatal(err) } } func TestLockJobAlreadyLocked(t *testing.T) { - c := openTestClient(t) - defer truncateAndClose(c) + ctx := context.Background() + c := openTestClient(ctx, t) + defer truncateAndClose(ctx, c) - if err := c.Enqueue(&Job{Type: "MyJob"}); err != nil { + if err := c.Enqueue(ctx, &Job{Type: "MyJob"}); err != nil { t.Fatal(err) } - j, err := c.LockJob("") + j, err := c.LockJob(ctx, "") if err != nil { t.Fatal(err) } if j == nil { t.Fatal("wanted job, got none") } - defer j.Done() + defer j.Done(ctx) - j2, err := c.LockJob("") + j2, err := c.LockJob(ctx, "") if err != nil { t.Fatal(err) } if j2 != nil { - defer j2.Done() + defer j2.Done(ctx) t.Fatalf("wanted no job, got %+v", j2) } } func TestLockJobNoJob(t *testing.T) { - c := openTestClient(t) - defer truncateAndClose(c) + ctx := context.Background() + c := openTestClient(ctx, t) + defer truncateAndClose(ctx, c) - j, err := c.LockJob("") + j, err := c.LockJob(ctx, "") if err != nil { t.Fatal(err) } @@ -121,53 +122,55 @@ func TestLockJobNoJob(t *testing.T) { } func TestLockJobCustomQueue(t *testing.T) { - c := openTestClient(t) - defer truncateAndClose(c) + ctx := context.Background() + c := openTestClient(ctx, t) + defer truncateAndClose(ctx, c) - if err := c.Enqueue(&Job{Type: "MyJob", Queue: "extra_priority"}); err != nil { + if err := c.Enqueue(ctx, &Job{Type: "MyJob", Queue: "extra_priority"}); err != nil { t.Fatal(err) } - j, err := c.LockJob("") + j, err := c.LockJob(ctx, "") if err != nil { t.Fatal(err) } if j != nil { - j.Done() + j.Done(ctx) t.Errorf("expected no job to be found with empty queue name, got %+v", j) } - j, err = c.LockJob("extra_priority") + j, err = c.LockJob(ctx, "extra_priority") if err != nil { t.Fatal(err) } - defer j.Done() + defer j.Done(ctx) if j == nil { t.Fatal("wanted job, got none") } - if err = j.Delete(); err != nil { + if err = j.Delete(ctx); err != nil { t.Fatal(err) } } func TestJobConn(t *testing.T) { - c := openTestClient(t) - defer truncateAndClose(c) + ctx := context.Background() + c := openTestClient(ctx, t) + defer truncateAndClose(ctx, c) - if err := c.Enqueue(&Job{Type: "MyJob"}); err != nil { + if err := c.Enqueue(ctx, &Job{Type: "MyJob"}); err != nil { t.Fatal(err) } - j, err := c.LockJob("") + j, err := c.LockJob(ctx, "") if err != nil { t.Fatal(err) } if j == nil { t.Fatal("wanted job, got none") } - defer j.Done() + defer j.Done(ctx) if conn := j.Conn(); conn != j.conn { t.Errorf("want %+v, got %+v", j.conn, conn) @@ -175,21 +178,22 @@ func TestJobConn(t *testing.T) { } func TestJobConnRace(t *testing.T) { - c := openTestClient(t) - defer truncateAndClose(c) + ctx := context.Background() + c := openTestClient(ctx, t) + defer truncateAndClose(ctx, c) - if err := c.Enqueue(&Job{Type: "MyJob"}); err != nil { + if err := c.Enqueue(ctx, &Job{Type: "MyJob"}); err != nil { t.Fatal(err) } - j, err := c.LockJob("") + j, err := c.LockJob(ctx, "") if err != nil { t.Fatal(err) } if j == nil { t.Fatal("wanted job, got none") } - defer j.Done() + defer j.Done(ctx) var wg sync.WaitGroup wg.Add(2) @@ -201,7 +205,7 @@ func TestJobConnRace(t *testing.T) { wg.Done() }() go func() { - j.Done() + j.Done(ctx) wg.Done() }() wg.Wait() @@ -209,37 +213,30 @@ func TestJobConnRace(t *testing.T) { // Test the race condition in LockJob func TestLockJobAdvisoryRace(t *testing.T) { - c := openTestClientMaxConns(t, 4) - defer truncateAndClose(c) + ctx := context.Background() + c := openTestClientMaxConns(ctx, t, 4) + defer truncateAndClose(ctx, c) // *pgx.ConnPool doesn't support pools of only one connection. Make sure // the other one is busy so we know which backend will be used by LockJob // below. - unusedConn, err := stdlib.AcquireConn(c.pool) + conn, err := c.pool.Acquire(ctx) if err != nil { t.Fatal(err) } - defer stdlib.ReleaseConn(c.pool, unusedConn) //nolint:errcheck + defer conn.Release() // We use two jobs: the first one is concurrently deleted, and the second // one is returned by LockJob after recovering from the race condition. for i := 0; i < 2; i++ { - if err := c.Enqueue(&Job{Type: "MyJob"}); err != nil { + if err := c.Enqueue(ctx, &Job{Type: "MyJob"}); err != nil { t.Fatal(err) } } - // helper functions - newConn := func() *pgx.Conn { - conn, err := pgx.Connect(testConnConfig) - if err != nil { - panic(err) - } - return conn - } - getBackendID := func(conn *pgx.Conn) int32 { + getBackendID := func(q Queryer) int32 { var backendID int32 - err := conn.QueryRow(` + err := q.QueryRow(ctx, ` SELECT pg_backend_pid() `).Scan(&backendID) if err != nil { @@ -248,11 +245,11 @@ func TestLockJobAdvisoryRace(t *testing.T) { return backendID } waitUntilBackendIsWaiting := func(backendID int32, name string) { - conn := newConn() + conn := newConn(ctx) i := 0 for { var waiting bool - err := conn.QueryRow(`SELECT wait_event is not null from pg_stat_activity where pid=$1`, backendID).Scan(&waiting) + err := conn.QueryRow(ctx, `SELECT wait_event is not null from pg_stat_activity where pid=$1`, backendID).Scan(&waiting) if err != nil { panic(err) } @@ -288,14 +285,14 @@ func TestLockJobAdvisoryRace(t *testing.T) { secondAccessExclusiveBackendIDChan := make(chan int32) go func() { - conn := newConn() - defer conn.Close() + conn := newConn(ctx) + defer conn.Close(ctx) - tx, err := conn.Begin() + tx, err := conn.Begin(ctx) if err != nil { panic(err) } - _, err = tx.Exec(`LOCK TABLE que_jobs IN ACCESS EXCLUSIVE MODE`) + _, err = tx.Exec(ctx, `LOCK TABLE que_jobs IN ACCESS EXCLUSIVE MODE`) if err != nil { panic(err) } @@ -308,31 +305,31 @@ func TestLockJobAdvisoryRace(t *testing.T) { backendID = <-secondAccessExclusiveBackendIDChan waitUntilBackendIsWaiting(backendID, "second access exclusive lock") - err = tx.Rollback() + err = tx.Rollback(ctx) if err != nil { panic(err) } }() go func() { - conn := newConn() - defer conn.Close() + conn := newConn(ctx) + defer conn.Close(ctx) // synchronization point secondAccessExclusiveBackendIDChan <- getBackendID(conn) - tx, err := conn.Begin() + tx, err := conn.Begin(ctx) if err != nil { panic(err) } - _, err = tx.Exec(`LOCK TABLE que_jobs IN ACCESS EXCLUSIVE MODE`) + _, err = tx.Exec(ctx, `LOCK TABLE que_jobs IN ACCESS EXCLUSIVE MODE`) if err != nil { panic(err) } // Fake a concurrent transaction grabbing the job var jid int64 - err = tx.QueryRow(` + err = tx.QueryRow(ctx, ` DELETE FROM que_jobs WHERE job_id = (SELECT min(job_id) @@ -345,27 +342,27 @@ func TestLockJobAdvisoryRace(t *testing.T) { deletedJobIDChan <- jid - err = tx.Commit() + err = tx.Commit(ctx) if err != nil { panic(err) } }() - conn, err := stdlib.AcquireConn(c.pool) + conn, err = c.pool.Acquire(ctx) if err != nil { - panic(err) + t.Fatal(err) } ourBackendID := getBackendID(conn) - stdlib.ReleaseConn(c.pool, conn) //nolint:errcheck + conn.Release() // synchronization point lockJobBackendIDChan <- ourBackendID - job, err := c.LockJob("") + job, err := c.LockJob(ctx, "") if err != nil { panic(err) } - defer job.Done() + defer job.Done(ctx) deletedJobID := <-deletedJobIDChan @@ -378,28 +375,29 @@ func TestLockJobAdvisoryRace(t *testing.T) { } func TestJobDelete(t *testing.T) { - c := openTestClient(t) - defer truncateAndClose(c) + ctx := context.Background() + c := openTestClient(ctx, t) + defer truncateAndClose(ctx, c) - if err := c.Enqueue(&Job{Type: "MyJob"}); err != nil { + if err := c.Enqueue(ctx, &Job{Type: "MyJob"}); err != nil { t.Fatal(err) } - j, err := c.LockJob("") + j, err := c.LockJob(ctx, "") if err != nil { t.Fatal(err) } if j == nil { t.Fatal("wanted job, got none") } - defer j.Done() + defer j.Done(ctx) - if err = j.Delete(); err != nil { + if err = j.Delete(ctx); err != nil { t.Fatal(err) } // make sure job was deleted - j2, err := findOneJob(c.pool) + j2, err := findOneJob(ctx, c.pool) if err != nil { t.Fatal(err) } @@ -409,14 +407,15 @@ func TestJobDelete(t *testing.T) { } func TestJobDone(t *testing.T) { - c := openTestClient(t) - defer truncateAndClose(c) + ctx := context.Background() + c := openTestClient(ctx, t) + defer truncateAndClose(ctx, c) - if err := c.Enqueue(&Job{Type: "MyJob"}); err != nil { + if err := c.Enqueue(ctx, &Job{Type: "MyJob"}); err != nil { t.Fatal(err) } - j, err := c.LockJob("") + j, err := c.LockJob(ctx, "") if err != nil { t.Fatal(err) } @@ -424,7 +423,7 @@ func TestJobDone(t *testing.T) { t.Fatal("wanted job, got none") } - j.Done() + j.Done(ctx) // make sure conn and pool were cleared if j.conn != nil { @@ -437,7 +436,7 @@ func TestJobDone(t *testing.T) { // make sure lock was released var count int64 query := "SELECT count(*) FROM pg_locks WHERE locktype=$1 AND objid=$2::bigint" - if err = c.pool.QueryRow(query, "advisory", j.ID).Scan(&count); err != nil { + if err = c.pool.QueryRow(ctx, query, "advisory", j.ID).Scan(&count); err != nil { t.Fatal(err) } if count != 0 { @@ -445,23 +444,24 @@ func TestJobDone(t *testing.T) { } // make sure conn was returned to pool - openedConn := 1 - stat := c.pool.Stats() - available := stat.OpenConnections - if openedConn != available { - t.Errorf("want available=total, got available=%d total=%d", available, openedConn) + opendConn := 1 + stat := c.pool.Stat() + available := stat.AcquireCount() + if opendConn != int(available) { + t.Errorf("want available=total, got available=%d total=%d", available, opendConn) } } func TestJobDoneMultiple(t *testing.T) { - c := openTestClient(t) - defer truncateAndClose(c) + ctx := context.Background() + c := openTestClient(ctx, t) + defer truncateAndClose(ctx, c) - if err := c.Enqueue(&Job{Type: "MyJob"}); err != nil { + if err := c.Enqueue(ctx, &Job{Type: "MyJob"}); err != nil { t.Fatal(err) } - j, err := c.LockJob("") + j, err := c.LockJob(ctx, "") if err != nil { t.Fatal(err) } @@ -469,20 +469,21 @@ func TestJobDoneMultiple(t *testing.T) { t.Fatal("wanted job, got none") } - j.Done() + j.Done(ctx) // try calling Done() again - j.Done() + j.Done(ctx) } func TestJobDeleteFromTx(t *testing.T) { - c := openTestClient(t) - defer truncateAndClose(c) + ctx := context.Background() + c := openTestClient(ctx, t) + defer truncateAndClose(ctx, c) - if err := c.Enqueue(&Job{Type: "MyJob"}); err != nil { + if err := c.Enqueue(ctx, &Job{Type: "MyJob"}); err != nil { t.Fatal(err) } - j, err := c.LockJob("") + j, err := c.LockJob(ctx, "") if err != nil { t.Fatal(err) } @@ -497,25 +498,25 @@ func TestJobDeleteFromTx(t *testing.T) { } // start a transaction - tx, err := conn.Begin() + tx, err := conn.Begin(ctx) if err != nil { t.Fatal(err) } // delete the job - if err = j.Delete(); err != nil { + if err = j.Delete(ctx); err != nil { t.Fatal(err) } - if err = tx.Commit(); err != nil { + if err = tx.Commit(ctx); err != nil { t.Fatal(err) } // mark as done - j.Done() + j.Done(ctx) // make sure the job is gone - j2, err := findOneJob(c.pool) + j2, err := findOneJob(ctx, c.pool) if err != nil { t.Fatal(err) } @@ -526,14 +527,15 @@ func TestJobDeleteFromTx(t *testing.T) { } func TestJobDeleteFromTxRollback(t *testing.T) { - c := openTestClient(t) - defer truncateAndClose(c) + ctx := context.Background() + c := openTestClient(ctx, t) + defer truncateAndClose(ctx, c) - if err := c.Enqueue(&Job{Type: "MyJob"}); err != nil { + if err := c.Enqueue(ctx, &Job{Type: "MyJob"}); err != nil { t.Fatal(err) } - j1, err := c.LockJob("") + j1, err := c.LockJob(ctx, "") if err != nil { t.Fatal(err) } @@ -548,25 +550,25 @@ func TestJobDeleteFromTxRollback(t *testing.T) { } // start a transaction - tx, err := conn.Begin() + tx, err := conn.Begin(ctx) if err != nil { t.Fatal(err) } // delete the job - if err = j1.Delete(); err != nil { + if err = j1.Delete(ctx); err != nil { t.Fatal(err) } - if err = tx.Rollback(); err != nil { + if err = tx.Rollback(ctx); err != nil { t.Fatal(err) } // mark as done - j1.Done() + j1.Done(ctx) // make sure the job still exists and matches j1 - j2, err := findOneJob(c.pool) + j2, err := findOneJob(ctx, c.pool) if err != nil { t.Fatal(err) } @@ -579,37 +581,38 @@ func TestJobDeleteFromTxRollback(t *testing.T) { } func TestJobError(t *testing.T) { - c := openTestClient(t) - defer truncateAndClose(c) + ctx := context.Background() + c := openTestClient(ctx, t) + defer truncateAndClose(ctx, c) - if err := c.Enqueue(&Job{Type: "MyJob"}); err != nil { + if err := c.Enqueue(ctx, &Job{Type: "MyJob"}); err != nil { t.Fatal(err) } - j, err := c.LockJob("") + j, err := c.LockJob(ctx, "") if err != nil { t.Fatal(err) } if j == nil { t.Fatal("wanted job, got none") } - defer j.Done() + defer j.Done(ctx) msg := "world\nended" - if err = j.Error(msg); err != nil { + if err = j.Error(ctx, msg); err != nil { t.Fatal(err) } - j.Done() + j.Done(ctx) // make sure job was not deleted - j2, err := findOneJob(c.pool) + j2, err := findOneJob(ctx, c.pool) if err != nil { t.Fatal(err) } if j2 == nil { t.Fatal("job was not found") } - defer j2.Done() + defer j2.Done(ctx) if !j2.LastError.Valid || j2.LastError.String != msg { t.Errorf("want LastError=%q, got %q", msg, j2.LastError.String) @@ -621,7 +624,7 @@ func TestJobError(t *testing.T) { // make sure lock was released var count int64 query := "SELECT count(*) FROM pg_locks WHERE locktype=$1 AND objid=$2::bigint" - if err = c.pool.QueryRow(query, "advisory", j.ID).Scan(&count); err != nil { + if err = c.pool.QueryRow(ctx, query, "advisory", j.ID).Scan(&count); err != nil { t.Fatal(err) } if count != 0 { @@ -630,9 +633,9 @@ func TestJobError(t *testing.T) { // make sure conn was returned to pool openedConn := 1 - stat := c.pool.Stats() - available := stat.OpenConnections - if openedConn != available { + stat := c.pool.Stat() + available := stat.AcquireCount() + if openedConn != int(available) { t.Errorf("want available=total, got available=%d total=%d", available, openedConn) } } diff --git a/worker.go b/worker.go index 782fc1a..1a39713 100644 --- a/worker.go +++ b/worker.go @@ -2,6 +2,7 @@ package qg import ( "bytes" + "context" "fmt" "log" "os" @@ -69,11 +70,11 @@ func NewWorker(c *Client, m WorkMap) *Worker { // Work pulls jobs off the Worker's Queue at its Interval. This function only // returns after Shutdown() is called, so it should be run in its own goroutine. -func (w *Worker) Work() { +func (w *Worker) Work(ctx context.Context) { defer log.Println("worker done") for { // Try to work a job - if w.WorkOne() { + if w.WorkOne(ctx) { // Since we just did work, non-blocking check whether we should exit select { case <-w.ch: @@ -94,8 +95,8 @@ func (w *Worker) Work() { } // WorkOne work on job -func (w *Worker) WorkOne() (didWork bool) { - j, err := w.c.LockJob(w.Queue) +func (w *Worker) WorkOne(ctx context.Context) (didWork bool) { + j, err := w.c.LockJob(ctx, w.Queue) if err != nil { log.Printf("attempting to lock job: %v", err) return @@ -103,14 +104,14 @@ func (w *Worker) WorkOne() (didWork bool) { if j == nil { return // no job was available } - j.tx, err = j.c.pool.Begin() + j.tx, err = j.c.pool.Begin(ctx) if err != nil { log.Printf("failed to create transaction: %v", err) return } - defer j.tx.Rollback() //nolint:errcheck - defer j.Done() - defer recoverPanic(j) + defer j.tx.Rollback(ctx) //nolint:errcheck + defer j.Done(ctx) + defer recoverPanic(ctx, j) didWork = true @@ -118,21 +119,21 @@ func (w *Worker) WorkOne() (didWork bool) { if !ok { msg := fmt.Sprintf("unknown job type: %q", j.Type) log.Println(msg) - if err = j.Error(msg); err != nil { + if err = j.Error(ctx, msg); err != nil { log.Printf("attempting to save error on job %d: %v", j.ID, err) } return } if err = wf(j); err != nil { - j.Error(err.Error()) //nolint:errcheck + j.Error(ctx, err.Error()) //nolint:errcheck return } - if err = j.Delete(); err != nil { + if err = j.Delete(ctx); err != nil { log.Printf("attempting to delete job %d: %v", j.ID, err) } - j.tx.Commit() //nolint:errcheck + j.tx.Commit(ctx) //nolint:errcheck log.Printf("event=job_worked job_id=%d job_type=%s", j.ID, j.Type) return } @@ -157,8 +158,8 @@ func (w *Worker) Shutdown() { // recoverPanic tries to handle panics in job execution. // A stacktrace is stored into Job last_error. -func recoverPanic(j *Job) { - j.tx.Rollback() //nolint:errcheck +func recoverPanic(ctx context.Context, j *Job) { + j.tx.Rollback(ctx) //nolint:errcheck if r := recover(); r != nil { // record an error on the job with panic message and stacktrace stackBuf := make([]byte, 1024) @@ -170,7 +171,7 @@ func recoverPanic(j *Job) { fmt.Fprintln(buf, "[...]") stacktrace := buf.String() log.Printf("event=panic job_id=%d job_type=%s\n%s", j.ID, j.Type, stacktrace) - if err := j.Error(stacktrace); err != nil { + if err := j.Error(ctx, stacktrace); err != nil { log.Printf("attempting to save error on job %d: %v", j.ID, err) } } @@ -200,7 +201,7 @@ func NewWorkerPool(c *Client, wm WorkMap, count int) *WorkerPool { } // Start starts all of the Workers in the WorkerPool. -func (w *WorkerPool) Start() { +func (w *WorkerPool) Start(ctx context.Context) { w.mu.Lock() defer w.mu.Unlock() @@ -208,7 +209,7 @@ func (w *WorkerPool) Start() { w.workers[i] = NewWorker(w.c, w.WorkMap) w.workers[i].Interval = w.Interval w.workers[i].Queue = w.Queue - go w.workers[i].Work() + go w.workers[i].Work(ctx) } } diff --git a/worker_test.go b/worker_test.go index 234a78c..e52535b 100644 --- a/worker_test.go +++ b/worker_test.go @@ -1,6 +1,7 @@ package qg import ( + "context" "fmt" "io" "log" @@ -14,8 +15,9 @@ func init() { } func TestWorkerWorkOne(t *testing.T) { - c := openTestClient(t) - defer truncateAndClose(c) + ctx := context.Background() + c := openTestClient(ctx, t) + defer truncateAndClose(ctx, c) success := false wm := WorkMap{ @@ -26,16 +28,16 @@ func TestWorkerWorkOne(t *testing.T) { } w := NewWorker(c, wm) - didWork := w.WorkOne() + didWork := w.WorkOne(ctx) if didWork { t.Errorf("want didWork=false when no job was queued") } - if err := c.Enqueue(&Job{Type: "MyJob"}); err != nil { + if err := c.Enqueue(ctx, &Job{Type: "MyJob"}); err != nil { t.Fatal(err) } - didWork = w.WorkOne() + didWork = w.WorkOne(ctx) if !didWork { t.Errorf("want didWork=true") } @@ -45,13 +47,14 @@ func TestWorkerWorkOne(t *testing.T) { } func TestWorkerShutdown(t *testing.T) { - c := openTestClient(t) - defer truncateAndClose(c) + ctx := context.Background() + c := openTestClient(ctx, t) + defer truncateAndClose(ctx, c) w := NewWorker(c, WorkMap{}) finished := false go func() { - w.Work() + w.Work(ctx) finished = true }() w.Shutdown() @@ -64,24 +67,25 @@ func TestWorkerShutdown(t *testing.T) { } func BenchmarkWorker(b *testing.B) { - c := openTestClient(b) + ctx := context.Background() + c := openTestClient(ctx, b) log.SetOutput(io.Discard) defer func() { log.SetOutput(os.Stdout) }() - defer truncateAndClose(c) + defer truncateAndClose(ctx, c) w := NewWorker(c, WorkMap{"Nil": nilWorker}) for i := 0; i < b.N; i++ { - if err := c.Enqueue(&Job{Type: "Nil"}); err != nil { + if err := c.Enqueue(ctx, &Job{Type: "Nil"}); err != nil { log.Fatal(err) } } b.ResetTimer() for i := 0; i < b.N; i++ { - w.WorkOne() + w.WorkOne(ctx) } } @@ -90,8 +94,9 @@ func nilWorker(j *Job) error { } func TestWorkerWorkReturnsError(t *testing.T) { - c := openTestClient(t) - defer truncateAndClose(c) + ctx := context.Background() + c := openTestClient(ctx, t) + defer truncateAndClose(ctx, c) called := 0 wm := WorkMap{ @@ -102,16 +107,16 @@ func TestWorkerWorkReturnsError(t *testing.T) { } w := NewWorker(c, wm) - didWork := w.WorkOne() + didWork := w.WorkOne(ctx) if didWork { t.Errorf("want didWork=false when no job was queued") } - if err := c.Enqueue(&Job{Type: "MyJob"}); err != nil { + if err := c.Enqueue(ctx, &Job{Type: "MyJob"}); err != nil { t.Fatal(err) } - didWork = w.WorkOne() + didWork = w.WorkOne(ctx) if !didWork { t.Errorf("want didWork=true") } @@ -119,13 +124,13 @@ func TestWorkerWorkReturnsError(t *testing.T) { t.Errorf("want called=1 was: %d", called) } - tx, err := c.pool.Begin() + tx, err := c.pool.Begin(ctx) if err != nil { t.Fatal(err) } - defer tx.Rollback() //nolint:errcheck + defer tx.Rollback(ctx) //nolint:errcheck - j, err := findOneJob(tx) + j, err := findOneJob(ctx, tx) if err != nil { t.Fatal(err) } @@ -141,8 +146,9 @@ func TestWorkerWorkReturnsError(t *testing.T) { } func TestWorkerWorkRescuesPanic(t *testing.T) { - c := openTestClient(t) - defer truncateAndClose(c) + ctx := context.Background() + c := openTestClient(ctx, t) + defer truncateAndClose(ctx, c) called := 0 wm := WorkMap{ @@ -153,22 +159,22 @@ func TestWorkerWorkRescuesPanic(t *testing.T) { } w := NewWorker(c, wm) - if err := c.Enqueue(&Job{Type: "MyJob"}); err != nil { + if err := c.Enqueue(ctx, &Job{Type: "MyJob"}); err != nil { t.Fatal(err) } - w.WorkOne() + w.WorkOne(ctx) if called != 1 { t.Errorf("want called=1 was: %d", called) } - tx, err := c.pool.Begin() + tx, err := c.pool.Begin(ctx) if err != nil { t.Fatal(err) } - defer tx.Rollback() //nolint:errcheck + defer tx.Rollback(ctx) //nolint:errcheck - j, err := findOneJob(tx) + j, err := findOneJob(ctx, tx) if err != nil { t.Fatal(err) } @@ -191,8 +197,9 @@ func TestWorkerWorkRescuesPanic(t *testing.T) { } func TestWorkerWorkOneTypeNotInMap(t *testing.T) { - c := openTestClient(t) - defer truncateAndClose(c) + ctx := context.Background() + c := openTestClient(ctx, t) + defer truncateAndClose(ctx, c) currentConns := 2 availConns := 2 @@ -201,16 +208,16 @@ func TestWorkerWorkOneTypeNotInMap(t *testing.T) { wm := WorkMap{} w := NewWorker(c, wm) - didWork := w.WorkOne() + didWork := w.WorkOne(ctx) if didWork { t.Errorf("want didWork=false when no job was queued") } - if err := c.Enqueue(&Job{Type: "MyJob"}); err != nil { + if err := c.Enqueue(ctx, &Job{Type: "MyJob"}); err != nil { t.Fatal(err) } - didWork = w.WorkOne() + didWork = w.WorkOne(ctx) if !didWork { t.Errorf("want didWork=true") } @@ -218,20 +225,21 @@ func TestWorkerWorkOneTypeNotInMap(t *testing.T) { t.Errorf("want success=false") } - if currentConns != c.pool.Stats().OpenConnections { - t.Errorf("want currentConns euqual: before=%d after=%d", currentConns, c.pool.Stats().OpenConnections) + acquireCount := c.pool.Stat().AcquireCount() + if currentConns != int(acquireCount) { + t.Errorf("want currentConns euqual: before=%d after=%d", currentConns, acquireCount) } - if availConns != c.pool.Stats().OpenConnections { - t.Errorf("want availConns euqual: before=%d after=%d", availConns, c.pool.Stats().OpenConnections) + if availConns != int(acquireCount) { + t.Errorf("want availConns euqual: before=%d after=%d", availConns, acquireCount) } - tx, err := c.pool.Begin() + tx, err := c.pool.Begin(ctx) if err != nil { t.Fatal(err) } - defer tx.Rollback() //nolint:errcheck + defer tx.Rollback(ctx) //nolint:errcheck - j, err := findOneJob(tx) + j, err := findOneJob(ctx, tx) if err != nil { t.Fatal(err) }