QRecordSchema: close schemaLatch in Close #2398

Merged · 5 commits · Jan 3, 2025
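The change driving every hunk below lives in `model.QRecordStream` and is not shown in this diff: `Close` now records its error and releases the schema latch, and `Schema()` returns that error alongside the schema, so a consumer can no longer block forever on a stream whose producer failed before publishing a schema. A minimal sketch of the pattern, with assumed field and method names rather than the real flow/model implementation:

// Sketch only: assumed shape of model.QRecordStream after this PR;
// the real type in flow/model differs in detail.
package model

import "sync"

type QRecordSchema struct{ ColumnNames []string }

type QRecordStream struct {
    schemaLatch chan struct{} // closed once a schema or an error is available
    schema      QRecordSchema
    err         error
    once        sync.Once
    Records     chan []any
}

func NewQRecordStream(buffer int) *QRecordStream {
    return &QRecordStream{
        schemaLatch: make(chan struct{}),
        Records:     make(chan []any, buffer),
    }
}

// SetSchema publishes the schema and releases the latch.
func (s *QRecordStream) SetSchema(schema QRecordSchema) {
    s.schema = schema
    s.once.Do(func() { close(s.schemaLatch) })
}

// Close records err and releases the latch, so a consumer blocked in
// Schema() wakes up and sees the failure instead of hanging.
func (s *QRecordStream) Close(err error) {
    s.err = err
    s.once.Do(func() { close(s.schemaLatch) })
}

// Schema blocks until the latch is released, then returns the schema or
// the error recorded by Close.
func (s *QRecordStream) Schema() (QRecordSchema, error) {
    <-s.schemaLatch
    return s.schema, s.err
}

// SchemaChan exposes the latch for use in select statements (as
// ExecuteAndProcessQuery does below).
func (s *QRecordStream) SchemaChan() <-chan struct{} { return s.schemaLatch }

The connector changes below are the mechanical fallout: `stream.Schema()` grew an error return, and every call site now propagates it.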
10 changes: 7 additions & 3 deletions flow/connectors/bigquery/qrep.go
@@ -23,7 +23,10 @@ func (c *BigQueryConnector) SyncQRepRecords(
) (int, error) {
    // Ensure the destination table is available.
    destTable := config.DestinationTableIdentifier
-   srcSchema := stream.Schema()
+   srcSchema, err := stream.Schema()
+   if err != nil {
+       return 0, err
+   }

    tblMetadata, err := c.replayTableSchemaDeltasQRep(ctx, config, partition, srcSchema)
    if err != nil {
@@ -80,8 +83,9 @@ func (c *BigQueryConnector) replayTableSchemaDeltasQRep(
        }
    }

-   err = c.ReplayTableSchemaDeltas(ctx, config.Env, config.FlowJobName, []*protos.TableSchemaDelta{tableSchemaDelta})
-   if err != nil {
+   if err := c.ReplayTableSchemaDeltas(
+       ctx, config.Env, config.FlowJobName, []*protos.TableSchemaDelta{tableSchemaDelta},
+   ); err != nil {
        return nil, fmt.Errorf("failed to add columns to destination table: %w", err)
    }
    dstTableMetadata, err = bqTable.Metadata(ctx)
12 changes: 10 additions & 2 deletions flow/connectors/clickhouse/qrep_avro_sync.go
@@ -67,7 +67,10 @@ func (s *ClickHouseAvroSyncMethod) SyncRecords(
) (int, error) {
    dstTableName := s.config.DestinationTableIdentifier

-   schema := stream.Schema()
+   schema, err := stream.Schema()
+   if err != nil {
+       return 0, err
+   }
    s.logger.Info("sync function called and schema acquired",
        slog.String("dstTable", dstTableName))

@@ -106,7 +109,12 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords(
    stagingPath := s.credsProvider.BucketPath
    startTime := time.Now()

-   avroSchema, err := s.getAvroSchema(ctx, config.Env, dstTableName, stream.Schema())
+   schema, err := stream.Schema()
+   if err != nil {
+       return 0, err
+   }
+
+   avroSchema, err := s.getAvroSchema(ctx, config.Env, dstTableName, schema)
    if err != nil {
        return 0, err
    }
5 changes: 4 additions & 1 deletion flow/connectors/elasticsearch/qrep.go
@@ -42,7 +42,10 @@ func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config *
) (int, error) {
    startTime := time.Now()

-   schema := stream.Schema()
+   schema, err := stream.Schema()
+   if err != nil {
+       return 0, err
+   }

    var bulkIndexFatalError error
    var bulkIndexErrors []error
5 changes: 4 additions & 1 deletion flow/connectors/kafka/qrep.go
@@ -27,7 +27,10 @@ func (c *KafkaConnector) SyncQRepRecords(
) (int, error) {
    startTime := time.Now()
    numRecords := atomic.Int64{}
-   schema := stream.Schema()
+   schema, err := stream.Schema()
+   if err != nil {
+       return 0, err
+   }

    queueCtx, queueErr := context.WithCancelCause(ctx)
    pool, err := c.createPool(queueCtx, config.Env, config.Script, config.FlowJobName, nil, queueErr)
10 changes: 6 additions & 4 deletions flow/connectors/postgres/qrep.go
@@ -32,7 +32,7 @@ type QRepPullSink interface {
}

type QRepSyncSink interface {
-   GetColumnNames() []string
+   GetColumnNames() ([]string, error)
    CopyInto(context.Context, *PostgresConnector, pgx.Tx, pgx.Identifier) (int64, error)
}
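Both sinks updated later in this diff (`PgCopyReader` in sink_pg.go and `RecordStreamSink` in sink_q.go) must implement the new signature; a hypothetical compile-time assertion like the one below, not part of this PR, would turn a missed update into a build error:

// Hypothetical compile-time interface checks; not present in this PR.
var (
    _ QRepSyncSink = PgCopyReader{}
    _ QRepSyncSink = RecordStreamSink{}
)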

@@ -550,7 +550,10 @@ func syncQRepRecords(
        upsertMatchCols[col] = struct{}{}
    }

-   columnNames := sink.GetColumnNames()
+   columnNames, err := sink.GetColumnNames()
+   if err != nil {
+       return -1, fmt.Errorf("failed to get column names: %w", err)
+   }
    setClauseArray := make([]string, 0, len(upsertMatchColsList)+1)
    selectStrArray := make([]string, 0, len(columnNames))
    for _, col := range columnNames {
@@ -578,8 +581,7 @@ func syncQRepRecords(
            setClause,
        )
        c.logger.Info("Performing upsert operation", slog.String("upsertStmt", upsertStmt), syncLog)
-       _, err := tx.Exec(ctx, upsertStmt)
-       if err != nil {
+       if _, err := tx.Exec(ctx, upsertStmt); err != nil {
            return -1, fmt.Errorf("failed to perform upsert operation: %w", err)
        }
    }
48 changes: 30 additions & 18 deletions flow/connectors/postgres/qrep_query_executor.go
@@ -161,9 +161,7 @@ func (qe *QRepQueryExecutor) processRowsStream(
        record, err := qe.mapRowToQRecord(rows, fieldDescriptions)
        if err != nil {
            qe.logger.Error("[pg_query_executor] failed to map row to QRecord", slog.Any("error", err))
-           err := fmt.Errorf("failed to map row to QRecord: %w", err)
-           stream.Close(err)
-           return 0, err
+           return 0, fmt.Errorf("failed to map row to QRecord: %w", err)
        }

        stream.Records <- record
@@ -189,12 +187,10 @@ func (qe *QRepQueryExecutor) processFetchedRows(
) (int, error) {
    rows, err := qe.executeQueryInTx(ctx, tx, cursorName, fetchSize)
    if err != nil {
-       stream.Close(err)
        qe.logger.Error("[pg_query_executor] failed to execute query in tx",
            slog.Any("error", err), slog.String("query", query))
        return 0, fmt.Errorf("[pg_query_executor] failed to execute query in tx: %w", err)
    }
-
    defer rows.Close()

    fieldDescriptions := rows.FieldDescriptions()
@@ -210,7 +206,6 @@
    }

    if err := rows.Err(); err != nil {
-       stream.Close(err)
        qe.logger.Error("[pg_query_executor] row iteration failed",
            slog.String("query", query), slog.Any("error", err))
        return 0, fmt.Errorf("[pg_query_executor] row iteration failed '%s': %w", query, err)
@@ -225,7 +220,8 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQuery(
    args ...interface{},
) (*model.QRecordBatch, error) {
    stream := model.NewQRecordStream(1024)
-   errors := make(chan error, 1)
+   errors := make(chan struct{})
+   var errorsError error
    qe.logger.Info("Executing and processing query", slog.String("query", query))

    // must wait on errors to close before returning to maintain qe.conn exclusion
@@ -234,23 +230,28 @@
        _, err := qe.ExecuteAndProcessQueryStream(ctx, stream, query, args...)
        if err != nil {
            qe.logger.Error("[pg_query_executor] failed to execute and process query stream", slog.Any("error", err))
-           errors <- err
+           errorsError = err
        }
    }()

    select {
-   case err := <-errors:
-       return nil, err
+   case <-errors:
+       return nil, errorsError
    case <-stream.SchemaChan():
+       schema, err := stream.Schema()
+       if err != nil {
+           return nil, err
+       }
        batch := &model.QRecordBatch{
-           Schema:  stream.Schema(),
+           Schema:  schema,
            Records: nil,
        }
        for record := range stream.Records {
            batch.Records = append(batch.Records, record)
        }
-       if err := <-errors; err != nil {
-           return nil, err
+       <-errors
+       if errorsError != nil {
+           return nil, errorsError
        }
        if err := stream.Err(); err != nil {
            return nil, fmt.Errorf("[pg] failed to get record from stream: %w", err)
@@ -288,10 +289,16 @@ func (qe *QRepQueryExecutor) ExecuteQueryIntoSink(
    })
    if err != nil {
        qe.logger.Error("[pg_query_executor] failed to begin transaction", slog.Any("error", err))
-       return 0, fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err)
+       err := fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err)
+       sink.Close(err)
+       return 0, err
    }

-   return sink.ExecuteQueryWithTx(ctx, qe, tx, query, args...)
+   totalRecords, err := sink.ExecuteQueryWithTx(ctx, qe, tx, query, args...)
+   if err != nil {
+       sink.Close(err)
+   }
+   return totalRecords, err
}

func (qe *QRepQueryExecutor) ExecuteQueryIntoSinkGettingCurrentSnapshotXmin(
@@ -310,16 +317,21 @@ func (qe *QRepQueryExecutor) ExecuteQueryIntoSinkGettingCurrentSnapshotXmin(
    })
    if err != nil {
        qe.logger.Error("[pg_query_executor] failed to begin transaction", slog.Any("error", err))
-       return 0, currentSnapshotXmin.Int64, fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err)
+       err := fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err)
+       sink.Close(err)
+       return 0, currentSnapshotXmin.Int64, err
    }

-   err = tx.QueryRow(ctx, "select txid_snapshot_xmin(txid_current_snapshot())").Scan(&currentSnapshotXmin)
-   if err != nil {
+   if err := tx.QueryRow(ctx, "select txid_snapshot_xmin(txid_current_snapshot())").Scan(&currentSnapshotXmin); err != nil {
        qe.logger.Error("[pg_query_executor] failed to get current snapshot xmin", slog.Any("error", err))
+       sink.Close(err)
        return 0, currentSnapshotXmin.Int64, err
    }

    totalRecordsFetched, err := sink.ExecuteQueryWithTx(ctx, qe, tx, query, args...)
+   if err != nil {
+       sink.Close(err)
+   }
    return totalRecordsFetched, currentSnapshotXmin.Int64, err
}

27 changes: 13 additions & 14 deletions flow/connectors/postgres/sink_pg.go
@@ -15,6 +15,7 @@ import (

type PgCopyShared struct {
    schemaLatch chan struct{}
+   err         error
    schema      []string
    schemaSet   bool
}
@@ -54,13 +55,10 @@ func (p PgCopyWriter) ExecuteQueryWithTx(
    defer shared.RollbackTx(tx, qe.logger)

    if qe.snapshot != "" {
-       _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot))
-       if err != nil {
+       if _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot)); err != nil {
            qe.logger.Error("[pg_query_executor] failed to set snapshot",
                slog.Any("error", err), slog.String("query", query))
-           err := fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err)
-           p.Close(err)
-           return 0, err
+           return 0, fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err)
        }
    }

@@ -88,17 +86,13 @@ func (p PgCopyWriter) ExecuteQueryWithTx(
    if err != nil {
        qe.logger.Info("[pg_query_executor] failed to copy",
            slog.String("copyQuery", copyQuery), slog.Any("error", err))
-       err = fmt.Errorf("[pg_query_executor] failed to copy: %w", err)
-       p.Close(err)
-       return 0, err
+       return 0, fmt.Errorf("[pg_query_executor] failed to copy: %w", err)
    }

    qe.logger.Info("Committing transaction")
    if err := tx.Commit(ctx); err != nil {
        qe.logger.Error("[pg_query_executor] failed to commit transaction", slog.Any("error", err))
-       err = fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err)
-       p.Close(err)
-       return 0, err
+       return 0, fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err)
    }

    totalRecordsFetched := ct.RowsAffected()
@@ -109,15 +103,20 @@

func (p PgCopyWriter) Close(err error) {
    p.PipeWriter.CloseWithError(err)
+   p.schema.err = err
    p.SetSchema(nil)
}

-func (p PgCopyReader) GetColumnNames() []string {
+func (p PgCopyReader) GetColumnNames() ([]string, error) {
    <-p.schema.schemaLatch
-   return p.schema.schema
+   return p.schema.schema, p.schema.err
}

func (p PgCopyReader) CopyInto(ctx context.Context, c *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error) {
-   cols := p.GetColumnNames()
+   cols, err := p.GetColumnNames()
+   if err != nil {
+       return 0, err
+   }
    quotedCols := make([]string, 0, len(cols))
    for _, col := range cols {
        quotedCols = append(quotedCols, QuoteIdentifier(col))
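With `err` stored on `PgCopyShared` and `Close` recording it before `SetSchema(nil)` releases the latch, a reader blocked in `GetColumnNames` now observes the failure instead of deadlocking. A self-contained sketch of that behavior, using simplified stand-in types rather than the real connector:

package main

import (
    "errors"
    "fmt"
)

// copyShared mimics PgCopyShared: a latch plus the error recorded by Close.
type copyShared struct {
    schemaLatch chan struct{}
    schema      []string
    err         error
}

// close records the error, then releases the latch (as Close -> SetSchema does).
func (s *copyShared) close(err error) {
    s.err = err
    close(s.schemaLatch)
}

func (s *copyShared) getColumnNames() ([]string, error) {
    <-s.schemaLatch
    return s.schema, s.err
}

func main() {
    s := &copyShared{schemaLatch: make(chan struct{})}
    // The writer fails before ever publishing a schema.
    go s.close(errors.New("copy failed before schema was set"))
    // Before this PR the reader would block here forever; now it gets the error.
    cols, err := s.getColumnNames()
    fmt.Println(cols, err) // [] copy failed before schema was set
}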
29 changes: 15 additions & 14 deletions flow/connectors/postgres/sink_q.go
@@ -26,13 +26,10 @@ func (stream RecordStreamSink) ExecuteQueryWithTx(
    defer shared.RollbackTx(tx, qe.logger)

    if qe.snapshot != "" {
-       _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot))
-       if err != nil {
+       if _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot)); err != nil {
            qe.logger.Error("[pg_query_executor] failed to set snapshot",
                slog.Any("error", err), slog.String("query", query))
-           err := fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err)
-           stream.Close(err)
-           return 0, err
+           return 0, fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err)
        }
    }

@@ -47,9 +44,7 @@
    if _, err := tx.Exec(ctx, cursorQuery, args...); err != nil {
        qe.logger.Info("[pg_query_executor] failed to declare cursor",
            slog.String("cursorQuery", cursorQuery), slog.Any("error", err))
-       err = fmt.Errorf("[pg_query_executor] failed to declare cursor: %w", err)
-       stream.Close(err)
-       return 0, err
+       return 0, fmt.Errorf("[pg_query_executor] failed to declare cursor: %w", err)
    }

    qe.logger.Info(fmt.Sprintf("[pg_query_executor] declared cursor '%s' for query '%s'", cursorName, query))
@@ -73,9 +68,7 @@ func (stream RecordStreamSink) ExecuteQueryWithTx(
    qe.logger.Info("Committing transaction")
    if err := tx.Commit(ctx); err != nil {
        qe.logger.Error("[pg_query_executor] failed to commit transaction", slog.Any("error", err))
-       err = fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err)
-       stream.Close(err)
-       return 0, err
+       return 0, fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err)
    }

    qe.logger.Info(fmt.Sprintf("[pg_query_executor] committed transaction for query '%s', rows = %d",
@@ -84,9 +77,17 @@
        totalRecordsFetched))
    return totalRecordsFetched, nil
}

func (stream RecordStreamSink) CopyInto(ctx context.Context, _ *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error) {
-   return tx.CopyFrom(ctx, table, stream.GetColumnNames(), model.NewQRecordCopyFromSource(stream.QRecordStream))
+   columnNames, err := stream.GetColumnNames()
+   if err != nil {
+       return 0, err
+   }
+   return tx.CopyFrom(ctx, table, columnNames, model.NewQRecordCopyFromSource(stream.QRecordStream))
}

-func (stream RecordStreamSink) GetColumnNames() []string {
-   return stream.Schema().GetColumnNames()
+func (stream RecordStreamSink) GetColumnNames() ([]string, error) {
+   schema, err := stream.Schema()
+   if err != nil {
+       return nil, err
+   }
+   return schema.GetColumnNames(), nil
}
7 changes: 5 additions & 2 deletions flow/connectors/pubsub/qrep.go
@@ -25,11 +25,14 @@ func (c *PubSubConnector) SyncQRepRecords(
    stream *model.QRecordStream,
) (int, error) {
    startTime := time.Now()
-   numRecords := atomic.Int64{}
-   schema := stream.Schema()
+   schema, err := stream.Schema()
+   if err != nil {
+       return 0, err
+   }
    topiccache := topicCache{cache: make(map[string]*pubsub.Topic)}
    publish := make(chan publishResult, 32)
    waitChan := make(chan struct{})
+   numRecords := atomic.Int64{}

    queueCtx, queueErr := context.WithCancelCause(ctx)
    pool, err := c.createPool(queueCtx, config.Env, config.Script, config.FlowJobName, &topiccache, publish, queueErr)
5 changes: 4 additions & 1 deletion flow/connectors/s3/qrep.go
@@ -17,7 +17,10 @@ func (c *S3Connector) SyncQRepRecords(
    partition *protos.QRepPartition,
    stream *model.QRecordStream,
) (int, error) {
-   schema := stream.Schema()
+   schema, err := stream.Schema()
+   if err != nil {
+       return 0, err
+   }

    dstTableName := config.DestinationTableIdentifier
    avroSchema, err := getAvroSchema(ctx, config.Env, dstTableName, schema)