diff --git a/pgconn/pgconn.go b/pgconn/pgconn.go index 803b41d1f..d5a67beaf 100644 --- a/pgconn/pgconn.go +++ b/pgconn/pgconn.go @@ -2053,6 +2053,13 @@ func (p *Pipeline) Flush() error { // Sync establishes a synchronization point and flushes the queued requests. func (p *Pipeline) Sync() error { + if p.closed { + if p.err != nil { + return p.err + } + return errors.New("pipeline closed") + } + p.conn.frontend.SendSync(&pgproto3.Sync{}) err := p.Flush() if err != nil { @@ -2069,10 +2076,21 @@ func (p *Pipeline) Sync() error { // *PipelineSync. If an ErrorResponse is received from the server, results will be nil and err will be a *PgError. If no // results are available, results and err will both be nil. func (p *Pipeline) GetResults() (results any, err error) { + if p.closed { + if p.err != nil { + return nil, p.err + } + return nil, errors.New("pipeline closed") + } + if p.expectedReadyForQueryCount == 0 { return nil, nil } + return p.getResults() +} + +func (p *Pipeline) getResults() (results any, err error) { for { msg, err := p.conn.receiveMessage() if err != nil { @@ -2159,6 +2177,7 @@ func (p *Pipeline) Close() error { if p.closed { return p.err } + p.closed = true if p.pendingSync { @@ -2171,7 +2190,7 @@ func (p *Pipeline) Close() error { } for p.expectedReadyForQueryCount > 0 { - _, err := p.GetResults() + _, err := p.getResults() if err != nil { p.err = err var pgErr *PgError