Support large large objects.
Fixes #1865.
mitar committed Jan 14, 2024
1 parent bdfebe0 commit f3c601a
Showing 2 changed files with 79 additions and 35 deletions.
73 changes: 55 additions & 18 deletions large_objects.go
@@ -6,6 +6,11 @@ import (
"io"
)

// The PostgreSQL wire protocol has a limit of 1 GB - 1 per message. See definition of
// PQ_LARGE_MESSAGE_LIMIT in the PostgreSQL source code. To allow for the other data
// in the message, maxLargeObjectMessageLength should be no larger than 1 GB - 1 KB.
var maxLargeObjectMessageLength = 1024*1024*1024 - 1024
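// 1024*1024*1024 - 1024 = 1,073,740,800 bytes (1 GiB - 1 KiB).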

// LargeObjects is a structure used to access the large objects API. It is only valid within the transaction where it
// was created.
//
@@ -68,32 +73,64 @@ type LargeObject struct {

// Write writes p to the large object and returns the number of bytes written and an error if not all of p was written.
func (o *LargeObject) Write(p []byte) (int, error) {
var n int
err := o.tx.QueryRow(o.ctx, "select lowrite($1, $2)", o.fd, p).Scan(&n)
if err != nil {
return n, err
}

if n < 0 {
return 0, errors.New("failed to write to large object")
nTotal := 0
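// Issue lowrite calls in chunks of at most maxLargeObjectMessageLength bytes each.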
for {
expected := len(p) - nTotal
if expected == 0 {
break
} else if expected > maxLargeObjectMessageLength {
expected = maxLargeObjectMessageLength
}

var n int
err := o.tx.QueryRow(o.ctx, "select lowrite($1, $2)", o.fd, p[nTotal:nTotal+expected]).Scan(&n)
if err != nil {
return nTotal, err
}

if n < 0 {
return nTotal, errors.New("failed to write to large object")
}

nTotal += n

if n < expected {
return nTotal, errors.New("short write to large object")
} else if n > expected {
return nTotal, errors.New("invalid write to large object")
}
}

return n, nil
return nTotal, nil
}

// Read reads up to len(p) bytes into p returning the number of bytes read.
func (o *LargeObject) Read(p []byte) (int, error) {
var res []byte
err := o.tx.QueryRow(o.ctx, "select loread($1, $2)", o.fd, len(p)).Scan(&res)
copy(p, res)
if err != nil {
return len(res), err
nTotal := 0
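// Issue loread calls in chunks of at most maxLargeObjectMessageLength bytes each.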
for {
expected := len(p) - nTotal
if expected == 0 {
break
} else if expected > maxLargeObjectMessageLength {
expected = maxLargeObjectMessageLength
}

var res []byte
err := o.tx.QueryRow(o.ctx, "select loread($1, $2)", o.fd, expected).Scan(&res)
copy(p[nTotal:], res)
nTotal += len(res)
if err != nil {
return nTotal, err
}

if len(res) < expected {
return nTotal, io.EOF
} else if len(res) > expected {
return nTotal, errors.New("invalid read of large object")
}
}

if len(res) < len(p) {
err = io.EOF
}
return len(res), err
return nTotal, nil
}

// Seek moves the current location pointer to the new location specified by offset.
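For callers, the chunking above is transparent: a single Write or Read larger than maxLargeObjectMessageLength is simply split into multiple lowrite/loread round trips. A minimal usage sketch (assuming a database reachable via the PGX_TEST_DATABASE connection string; the 16 MB payload size is illustrative):

package main

import (
	"bytes"
	"context"
	"io"
	"os"

	"github.com/jackc/pgx/v5"
)

func main() {
	ctx := context.Background()
	conn, err := pgx.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
	if err != nil {
		panic(err)
	}
	defer conn.Close(ctx)

	// Large objects are only valid within a transaction.
	tx, err := conn.Begin(ctx)
	if err != nil {
		panic(err)
	}
	defer tx.Rollback(ctx)

	lo := tx.LargeObjects()
	id, err := lo.Create(ctx, 0)
	if err != nil {
		panic(err)
	}
	obj, err := lo.Open(ctx, id, pgx.LargeObjectModeRead|pgx.LargeObjectModeWrite)
	if err != nil {
		panic(err)
	}

	// One Write call; internally it may become several lowrite messages.
	data := bytes.Repeat([]byte("x"), 16*1024*1024)
	if _, err := obj.Write(data); err != nil {
		panic(err)
	}

	// Read it back; Read is likewise split into loread messages.
	if _, err := obj.Seek(0, io.SeekStart); err != nil {
		panic(err)
	}
	got, err := io.ReadAll(obj)
	if err != nil {
		panic(err)
	}
	_ = got

	if err := tx.Commit(ctx); err != nil {
		panic(err)
	}
}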
41 changes: 24 additions & 17 deletions large_objects_test.go
@@ -1,4 +1,4 @@
package pgx_test
package pgx

import (
"context"
@@ -7,23 +7,24 @@ import (
"testing"
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxtest"
)

func TestLargeObjects(t *testing.T) {
t.Parallel()

// We use a very short limit to test chunking logic.
maxLargeObjectMessageLength = 2

ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()

conn, err := pgx.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
conn, err := Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
if err != nil {
t.Fatal(err)
}

pgxtest.SkipCockroachDB(t, conn, "Server does not support large objects")
// pgxtest.SkipCockroachDB(t, conn, "Server does not support large objects")

tx, err := conn.Begin(ctx)
if err != nil {
@@ -36,22 +37,25 @@ func TestLargeObjects(t *testing.T) {
func TestLargeObjectsSimpleProtocol(t *testing.T) {
t.Parallel()

// We use a very short limit to test chunking logic.
maxLargeObjectMessageLength = 2

ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()

config, err := pgx.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
config, err := ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
if err != nil {
t.Fatal(err)
}

config.DefaultQueryExecMode = pgx.QueryExecModeSimpleProtocol
config.DefaultQueryExecMode = QueryExecModeSimpleProtocol

conn, err := pgx.ConnectConfig(ctx, config)
conn, err := ConnectConfig(ctx, config)
if err != nil {
t.Fatal(err)
}

pgxtest.SkipCockroachDB(t, conn, "Server does not support large objects")
// pgxtest.SkipCockroachDB(t, conn, "Server does not support large objects")

tx, err := conn.Begin(ctx)
if err != nil {
@@ -61,15 +65,15 @@ func TestLargeObjectsSimpleProtocol(t *testing.T) {
testLargeObjects(t, ctx, tx)
}

func testLargeObjects(t *testing.T, ctx context.Context, tx pgx.Tx) {
func testLargeObjects(t *testing.T, ctx context.Context, tx Tx) {
lo := tx.LargeObjects()

id, err := lo.Create(ctx, 0)
if err != nil {
t.Fatal(err)
}

obj, err := lo.Open(ctx, id, pgx.LargeObjectModeRead|pgx.LargeObjectModeWrite)
obj, err := lo.Open(ctx, id, LargeObjectModeRead|LargeObjectModeWrite)
if err != nil {
t.Fatal(err)
}
@@ -153,7 +157,7 @@ func testLargeObjects(t *testing.T, ctx context.Context, tx pgx.Tx) {
t.Fatal(err)
}

_, err = lo.Open(ctx, id, pgx.LargeObjectModeRead)
_, err = lo.Open(ctx, id, LargeObjectModeRead)
if e, ok := err.(*pgconn.PgError); !ok || e.Code != "42704" {
t.Errorf("Expected undefined_object error (42704), got %#v", err)
}
@@ -162,15 +166,18 @@ func testLargeObjects(t *testing.T, ctx context.Context, tx pgx.Tx) {
func TestLargeObjectsMultipleTransactions(t *testing.T) {
t.Parallel()

// We use a very short limit to test chunking logic.
maxLargeObjectMessageLength = 2

ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()

conn, err := pgx.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
conn, err := Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
if err != nil {
t.Fatal(err)
}

pgxtest.SkipCockroachDB(t, conn, "Server does not support large objects")
// pgxtest.SkipCockroachDB(t, conn, "Server does not support large objects")

tx, err := conn.Begin(ctx)
if err != nil {
@@ -184,7 +191,7 @@ func TestLargeObjectsMultipleTransactions(t *testing.T) {
t.Fatal(err)
}

obj, err := lo.Open(ctx, id, pgx.LargeObjectModeWrite)
obj, err := lo.Open(ctx, id, LargeObjectModeWrite)
if err != nil {
t.Fatal(err)
}
@@ -220,7 +227,7 @@ func TestLargeObjectsMultipleTransactions(t *testing.T) {
lo2 := tx2.LargeObjects()

// Reopen the large object in the new transaction
obj2, err := lo2.Open(ctx, id, pgx.LargeObjectModeRead|pgx.LargeObjectModeWrite)
obj2, err := lo2.Open(ctx, id, LargeObjectModeRead|LargeObjectModeWrite)
if err != nil {
t.Fatal(err)
}
@@ -296,7 +303,7 @@ func TestLargeObjectsMultipleTransactions(t *testing.T) {
t.Fatal(err)
}

_, err = lo2.Open(ctx, id, pgx.LargeObjectModeRead)
_, err = lo2.Open(ctx, id, LargeObjectModeRead)
if e, ok := err.(*pgconn.PgError); !ok || e.Code != "42704" {
t.Errorf("Expected undefined_object error (42704), got %#v", err)
}
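The tests shrink maxLargeObjectMessageLength to 2 so that a few bytes of test data already drive the loops through several iterations. A standalone sketch of the chunk arithmetic both loops share (chunkSizes is a hypothetical helper for illustration, not part of the commit):

package main

import "fmt"

// chunkSizes returns the per-message sizes that a chunked write or read
// of n bytes would use under the given message-length limit.
func chunkSizes(n, limit int) []int {
	var sizes []int
	for remaining := n; remaining > 0; {
		c := remaining
		if c > limit {
			c = limit
		}
		sizes = append(sizes, c)
		remaining -= c
	}
	return sizes
}

func main() {
	// With the test limit of 2, a 5-byte payload becomes three messages.
	fmt.Println(chunkSizes(5, 2)) // [2 2 1]
}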
