QRecordSchema: close schemaLatch in Close (#2398)
Goroutines were leaking while waiting on schemaLatch: if a provider hit an
error, schemaLatch could be left open indefinitely, blocking readers forever.
serprex authored Jan 3, 2025
1 parent ff1f11d commit d365a8b
Showing 15 changed files with 122 additions and 72 deletions.
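
Before the per-file diffs, here is a minimal, self-contained sketch of the latch pattern the commit fixes. It is an illustration only: RecordStream is a hypothetical stand-in for model.QRecordStream, with names modeled on identifiers visible in the diffs below (schemaLatch, SetSchema, Schema, Close). The point is the shape of the fix: the latch must also be released on the error path, and Schema() must surface the recorded error, which is why every call site below gains an `if err != nil` branch.

```go
package main

import (
    "errors"
    "fmt"
    "sync"
)

type Schema struct{ Columns []string }

// RecordStream is a hypothetical stand-in for model.QRecordStream.
type RecordStream struct {
    schemaLatch chan struct{} // closed once a schema or an error is known
    once        sync.Once
    schema      Schema
    err         error
}

func NewRecordStream() *RecordStream {
    return &RecordStream{schemaLatch: make(chan struct{})}
}

// SetSchema releases the latch on the success path.
func (s *RecordStream) SetSchema(schema Schema) {
    s.once.Do(func() {
        s.schema = schema
        close(s.schemaLatch)
    })
}

// Close is the shape of this commit's fix: a provider that fails before
// ever producing a schema records the error and releases the latch, so
// goroutines blocked in Schema() wake up instead of leaking.
func (s *RecordStream) Close(err error) {
    s.once.Do(func() {
        s.err = err
        close(s.schemaLatch)
    })
}

// Schema blocks on the latch, then reports the recorded error, if any.
func (s *RecordStream) Schema() (Schema, error) {
    <-s.schemaLatch
    return s.schema, s.err
}

func main() {
    s := NewRecordStream()
    go s.Close(errors.New("provider failed")) // error path now unblocks readers
    if _, err := s.Schema(); err != nil {
        fmt.Println(err) // prints "provider failed"
    }
}
```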
10 changes: 7 additions & 3 deletions flow/connectors/bigquery/qrep.go
@@ -23,7 +23,10 @@ func (c *BigQueryConnector) SyncQRepRecords(
 ) (int, error) {
     // Ensure the destination table is available.
     destTable := config.DestinationTableIdentifier
-    srcSchema := stream.Schema()
+    srcSchema, err := stream.Schema()
+    if err != nil {
+        return 0, err
+    }
 
     tblMetadata, err := c.replayTableSchemaDeltasQRep(ctx, config, partition, srcSchema)
     if err != nil {
@@ -80,8 +83,9 @@ func (c *BigQueryConnector) replayTableSchemaDeltasQRep(
         }
     }
 
-    err = c.ReplayTableSchemaDeltas(ctx, config.Env, config.FlowJobName, []*protos.TableSchemaDelta{tableSchemaDelta})
-    if err != nil {
+    if err := c.ReplayTableSchemaDeltas(
+        ctx, config.Env, config.FlowJobName, []*protos.TableSchemaDelta{tableSchemaDelta},
+    ); err != nil {
         return nil, fmt.Errorf("failed to add columns to destination table: %w", err)
     }
     dstTableMetadata, err = bqTable.Metadata(ctx)
12 changes: 10 additions & 2 deletions flow/connectors/clickhouse/qrep_avro_sync.go
@@ -67,7 +67,10 @@ func (s *ClickHouseAvroSyncMethod) SyncRecords(
 ) (int, error) {
     dstTableName := s.config.DestinationTableIdentifier
 
-    schema := stream.Schema()
+    schema, err := stream.Schema()
+    if err != nil {
+        return 0, err
+    }
     s.logger.Info("sync function called and schema acquired",
         slog.String("dstTable", dstTableName))
 
@@ -106,7 +109,12 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords(
     stagingPath := s.credsProvider.BucketPath
     startTime := time.Now()
 
-    avroSchema, err := s.getAvroSchema(ctx, config.Env, dstTableName, stream.Schema())
+    schema, err := stream.Schema()
+    if err != nil {
+        return 0, err
+    }
+
+    avroSchema, err := s.getAvroSchema(ctx, config.Env, dstTableName, schema)
     if err != nil {
         return 0, err
     }
5 changes: 4 additions & 1 deletion flow/connectors/elasticsearch/qrep.go
@@ -42,7 +42,10 @@ func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config *
 ) (int, error) {
     startTime := time.Now()
 
-    schema := stream.Schema()
+    schema, err := stream.Schema()
+    if err != nil {
+        return 0, err
+    }
 
     var bulkIndexFatalError error
     var bulkIndexErrors []error
5 changes: 4 additions & 1 deletion flow/connectors/kafka/qrep.go
@@ -27,7 +27,10 @@ func (c *KafkaConnector) SyncQRepRecords(
 ) (int, error) {
     startTime := time.Now()
     numRecords := atomic.Int64{}
-    schema := stream.Schema()
+    schema, err := stream.Schema()
+    if err != nil {
+        return 0, err
+    }
 
     queueCtx, queueErr := context.WithCancelCause(ctx)
     pool, err := c.createPool(queueCtx, config.Env, config.Script, config.FlowJobName, nil, queueErr)
10 changes: 6 additions & 4 deletions flow/connectors/postgres/qrep.go
@@ -32,7 +32,7 @@ type QRepPullSink interface {
 }
 
 type QRepSyncSink interface {
-    GetColumnNames() []string
+    GetColumnNames() ([]string, error)
     CopyInto(context.Context, *PostgresConnector, pgx.Tx, pgx.Identifier) (int64, error)
 }
 
@@ -550,7 +550,10 @@ func syncQRepRecords(
         upsertMatchCols[col] = struct{}{}
     }
 
-    columnNames := sink.GetColumnNames()
+    columnNames, err := sink.GetColumnNames()
+    if err != nil {
+        return -1, fmt.Errorf("failed to get column names: %w", err)
+    }
     setClauseArray := make([]string, 0, len(upsertMatchColsList)+1)
     selectStrArray := make([]string, 0, len(columnNames))
     for _, col := range columnNames {
@@ -578,8 +581,7 @@
         setClause,
     )
     c.logger.Info("Performing upsert operation", slog.String("upsertStmt", upsertStmt), syncLog)
-    _, err := tx.Exec(ctx, upsertStmt)
-    if err != nil {
+    if _, err := tx.Exec(ctx, upsertStmt); err != nil {
         return -1, fmt.Errorf("failed to perform upsert operation: %w", err)
     }
 }
48 changes: 30 additions & 18 deletions flow/connectors/postgres/qrep_query_executor.go
@@ -161,9 +161,7 @@ func (qe *QRepQueryExecutor) processRowsStream(
         record, err := qe.mapRowToQRecord(rows, fieldDescriptions)
         if err != nil {
             qe.logger.Error("[pg_query_executor] failed to map row to QRecord", slog.Any("error", err))
-            err := fmt.Errorf("failed to map row to QRecord: %w", err)
-            stream.Close(err)
-            return 0, err
+            return 0, fmt.Errorf("failed to map row to QRecord: %w", err)
         }
 
         stream.Records <- record
@@ -189,12 +187,10 @@ func (qe *QRepQueryExecutor) processFetchedRows(
 ) (int, error) {
     rows, err := qe.executeQueryInTx(ctx, tx, cursorName, fetchSize)
     if err != nil {
-        stream.Close(err)
         qe.logger.Error("[pg_query_executor] failed to execute query in tx",
             slog.Any("error", err), slog.String("query", query))
         return 0, fmt.Errorf("[pg_query_executor] failed to execute query in tx: %w", err)
     }
-
     defer rows.Close()
 
     fieldDescriptions := rows.FieldDescriptions()
@@ -210,7 +206,6 @@
     }
 
     if err := rows.Err(); err != nil {
-        stream.Close(err)
         qe.logger.Error("[pg_query_executor] row iteration failed",
             slog.String("query", query), slog.Any("error", err))
         return 0, fmt.Errorf("[pg_query_executor] row iteration failed '%s': %w", query, err)
@@ -225,7 +220,8 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQuery(
     args ...interface{},
 ) (*model.QRecordBatch, error) {
     stream := model.NewQRecordStream(1024)
-    errors := make(chan error, 1)
+    errors := make(chan struct{})
+    var errorsError error
     qe.logger.Info("Executing and processing query", slog.String("query", query))
 
     // must wait on errors to close before returning to maintain qe.conn exclusion
@@ -234,23 +230,28 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQuery(
         _, err := qe.ExecuteAndProcessQueryStream(ctx, stream, query, args...)
         if err != nil {
             qe.logger.Error("[pg_query_executor] failed to execute and process query stream", slog.Any("error", err))
-            errors <- err
+            errorsError = err
         }
     }()
 
     select {
-    case err := <-errors:
-        return nil, err
+    case <-errors:
+        return nil, errorsError
     case <-stream.SchemaChan():
+        schema, err := stream.Schema()
+        if err != nil {
+            return nil, err
+        }
         batch := &model.QRecordBatch{
-            Schema:  stream.Schema(),
+            Schema:  schema,
             Records: nil,
         }
         for record := range stream.Records {
            batch.Records = append(batch.Records, record)
         }
-        if err := <-errors; err != nil {
-            return nil, err
+        <-errors
+        if errorsError != nil {
+            return nil, errorsError
         }
         if err := stream.Err(); err != nil {
             return nil, fmt.Errorf("[pg] failed to get record from stream: %w", err)
@@ -288,10 +289,16 @@ func (qe *QRepQueryExecutor) ExecuteQueryIntoSink(
     })
     if err != nil {
         qe.logger.Error("[pg_query_executor] failed to begin transaction", slog.Any("error", err))
-        return 0, fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err)
+        err := fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err)
+        sink.Close(err)
+        return 0, err
     }
 
-    return sink.ExecuteQueryWithTx(ctx, qe, tx, query, args...)
+    totalRecords, err := sink.ExecuteQueryWithTx(ctx, qe, tx, query, args...)
+    if err != nil {
+        sink.Close(err)
+    }
+    return totalRecords, err
 }
 
 func (qe *QRepQueryExecutor) ExecuteQueryIntoSinkGettingCurrentSnapshotXmin(
@@ -310,16 +317,21 @@ func (qe *QRepQueryExecutor) ExecuteQueryIntoSinkGettingCurrentSnapshotXmin(
     })
     if err != nil {
         qe.logger.Error("[pg_query_executor] failed to begin transaction", slog.Any("error", err))
-        return 0, currentSnapshotXmin.Int64, fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err)
+        err := fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err)
+        sink.Close(err)
+        return 0, currentSnapshotXmin.Int64, err
     }
 
-    err = tx.QueryRow(ctx, "select txid_snapshot_xmin(txid_current_snapshot())").Scan(&currentSnapshotXmin)
-    if err != nil {
+    if err := tx.QueryRow(ctx, "select txid_snapshot_xmin(txid_current_snapshot())").Scan(&currentSnapshotXmin); err != nil {
         qe.logger.Error("[pg_query_executor] failed to get current snapshot xmin", slog.Any("error", err))
+        sink.Close(err)
         return 0, currentSnapshotXmin.Int64, err
     }
 
     totalRecordsFetched, err := sink.ExecuteQueryWithTx(ctx, qe, tx, query, args...)
+    if err != nil {
+        sink.Close(err)
+    }
     return totalRecordsFetched, currentSnapshotXmin.Int64, err
 }
 
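The ExecuteAndProcessQuery hunks above also swap a buffered error channel for a close-only signal (chan struct{}) plus a shared error variable: a closed channel is readable in every select branch, success or failure, so the caller can always drain it before reusing qe.conn. A hedged sketch of that signaling pattern, with the hypothetical doWork standing in for the stream processing:

```go
package main

import (
    "errors"
    "fmt"
)

// doWork stands in for ExecuteAndProcessQueryStream.
func doWork() error { return errors.New("boom") }

func run() error {
    done := make(chan struct{}) // closed when the worker exits, on every path
    var workErr error           // written before close(done), read after <-done

    go func() {
        defer close(done) // deferred close also fires on error paths
        workErr = doWork()
    }()

    <-done // a receive from a closed channel never blocks
    return workErr
}

func main() { fmt.Println(run()) } // prints "boom"
```

Closing the channel establishes a happens-before edge, so the unsynchronized write to workErr in the goroutine is safe to read after <-done.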
27 changes: 13 additions & 14 deletions flow/connectors/postgres/sink_pg.go
@@ -15,6 +15,7 @@ import (
 
 type PgCopyShared struct {
     schemaLatch chan struct{}
+    err         error
     schema      []string
     schemaSet   bool
 }
@@ -54,13 +55,10 @@ func (p PgCopyWriter) ExecuteQueryWithTx(
     defer shared.RollbackTx(tx, qe.logger)
 
     if qe.snapshot != "" {
-        _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot))
-        if err != nil {
+        if _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot)); err != nil {
             qe.logger.Error("[pg_query_executor] failed to set snapshot",
                 slog.Any("error", err), slog.String("query", query))
-            err := fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err)
-            p.Close(err)
-            return 0, err
+            return 0, fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err)
         }
     }
 
@@ -88,17 +86,13 @@ func (p PgCopyWriter) ExecuteQueryWithTx(
     if err != nil {
         qe.logger.Info("[pg_query_executor] failed to copy",
             slog.String("copyQuery", copyQuery), slog.Any("error", err))
-        err = fmt.Errorf("[pg_query_executor] failed to copy: %w", err)
-        p.Close(err)
-        return 0, err
+        return 0, fmt.Errorf("[pg_query_executor] failed to copy: %w", err)
     }
 
     qe.logger.Info("Committing transaction")
     if err := tx.Commit(ctx); err != nil {
         qe.logger.Error("[pg_query_executor] failed to commit transaction", slog.Any("error", err))
-        err = fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err)
-        p.Close(err)
-        return 0, err
+        return 0, fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err)
     }
 
     totalRecordsFetched := ct.RowsAffected()
@@ -109,15 +103,20 @@
 
 func (p PgCopyWriter) Close(err error) {
     p.PipeWriter.CloseWithError(err)
+    p.schema.err = err
+    p.SetSchema(nil)
 }
 
-func (p PgCopyReader) GetColumnNames() []string {
+func (p PgCopyReader) GetColumnNames() ([]string, error) {
     <-p.schema.schemaLatch
-    return p.schema.schema
+    return p.schema.schema, p.schema.err
 }
 
 func (p PgCopyReader) CopyInto(ctx context.Context, c *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error) {
-    cols := p.GetColumnNames()
+    cols, err := p.GetColumnNames()
+    if err != nil {
+        return 0, err
+    }
     quotedCols := make([]string, 0, len(cols))
     for _, col := range cols {
         quotedCols = append(quotedCols, QuoteIdentifier(col))
29 changes: 15 additions & 14 deletions flow/connectors/postgres/sink_q.go
@@ -26,13 +26,10 @@ func (stream RecordStreamSink) ExecuteQueryWithTx(
     defer shared.RollbackTx(tx, qe.logger)
 
     if qe.snapshot != "" {
-        _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot))
-        if err != nil {
+        if _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot)); err != nil {
             qe.logger.Error("[pg_query_executor] failed to set snapshot",
                 slog.Any("error", err), slog.String("query", query))
-            err := fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err)
-            stream.Close(err)
-            return 0, err
+            return 0, fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err)
         }
     }
 
@@ -47,9 +44,7 @@
     if _, err := tx.Exec(ctx, cursorQuery, args...); err != nil {
         qe.logger.Info("[pg_query_executor] failed to declare cursor",
             slog.String("cursorQuery", cursorQuery), slog.Any("error", err))
-        err = fmt.Errorf("[pg_query_executor] failed to declare cursor: %w", err)
-        stream.Close(err)
-        return 0, err
+        return 0, fmt.Errorf("[pg_query_executor] failed to declare cursor: %w", err)
     }
 
     qe.logger.Info(fmt.Sprintf("[pg_query_executor] declared cursor '%s' for query '%s'", cursorName, query))
@@ -73,9 +68,7 @@
     qe.logger.Info("Committing transaction")
     if err := tx.Commit(ctx); err != nil {
         qe.logger.Error("[pg_query_executor] failed to commit transaction", slog.Any("error", err))
-        err = fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err)
-        stream.Close(err)
-        return 0, err
+        return 0, fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err)
     }
 
     qe.logger.Info(fmt.Sprintf("[pg_query_executor] committed transaction for query '%s', rows = %d",
@@ -84,9 +77,17 @@ func (stream RecordStreamSink) ExecuteQueryWithTx(
 }
 
 func (stream RecordStreamSink) CopyInto(ctx context.Context, _ *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error) {
-    return tx.CopyFrom(ctx, table, stream.GetColumnNames(), model.NewQRecordCopyFromSource(stream.QRecordStream))
+    columnNames, err := stream.GetColumnNames()
+    if err != nil {
+        return 0, err
+    }
+    return tx.CopyFrom(ctx, table, columnNames, model.NewQRecordCopyFromSource(stream.QRecordStream))
 }
 
-func (stream RecordStreamSink) GetColumnNames() []string {
-    return stream.Schema().GetColumnNames()
+func (stream RecordStreamSink) GetColumnNames() ([]string, error) {
+    schema, err := stream.Schema()
+    if err != nil {
+        return nil, err
+    }
+    return schema.GetColumnNames(), nil
 }
7 changes: 5 additions & 2 deletions flow/connectors/pubsub/qrep.go
@@ -25,11 +25,14 @@ func (c *PubSubConnector) SyncQRepRecords(
     stream *model.QRecordStream,
 ) (int, error) {
     startTime := time.Now()
-    numRecords := atomic.Int64{}
-    schema := stream.Schema()
+    schema, err := stream.Schema()
+    if err != nil {
+        return 0, err
+    }
     topiccache := topicCache{cache: make(map[string]*pubsub.Topic)}
     publish := make(chan publishResult, 32)
     waitChan := make(chan struct{})
+    numRecords := atomic.Int64{}
 
     queueCtx, queueErr := context.WithCancelCause(ctx)
     pool, err := c.createPool(queueCtx, config.Env, config.Script, config.FlowJobName, &topiccache, publish, queueErr)
5 changes: 4 additions & 1 deletion flow/connectors/s3/qrep.go
@@ -17,7 +17,10 @@ func (c *S3Connector) SyncQRepRecords(
     partition *protos.QRepPartition,
     stream *model.QRecordStream,
 ) (int, error) {
-    schema := stream.Schema()
+    schema, err := stream.Schema()
+    if err != nil {
+        return 0, err
+    }
 
     dstTableName := config.DestinationTableIdentifier
     avroSchema, err := getAvroSchema(ctx, config.Env, dstTableName, schema)
(Diffs for the remaining 5 changed files were not loaded.)
