From d365a8b2228ff5393189d25e0ac43bd6ded2163b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Philip=20Dub=C3=A9?=
Date: Fri, 3 Jan 2025 17:32:07 +0000
Subject: [PATCH] QRecordSchema: close schemaLatch in Close (#2398)

Goroutines were leaking while waiting on schemaLatch: if the provider hit
an error, schemaLatch could be left open indefinitely.
---
 flow/connectors/bigquery/qrep.go             | 10 ++--
 flow/connectors/clickhouse/qrep_avro_sync.go | 12 ++++-
 flow/connectors/elasticsearch/qrep.go        |  5 +-
 flow/connectors/kafka/qrep.go                |  5 +-
 flow/connectors/postgres/qrep.go             | 10 ++--
 .../postgres/qrep_query_executor.go          | 48 ++++++++++++-------
 flow/connectors/postgres/sink_pg.go          | 27 +++++------
 flow/connectors/postgres/sink_q.go           | 29 +++++------
 flow/connectors/pubsub/qrep.go               |  7 ++-
 flow/connectors/s3/qrep.go                   |  5 +-
 flow/connectors/snowflake/qrep_avro_sync.go  | 13 +++--
 flow/connectors/utils/avro/avro_writer.go    |  5 +-
 flow/model/qrecord_stream.go                 |  7 ++-
 flow/model/qvalue/qschema.go                 |  5 +-
 flow/pua/stream_adapter.go                   |  6 ++-
 15 files changed, 122 insertions(+), 72 deletions(-)

diff --git a/flow/connectors/bigquery/qrep.go b/flow/connectors/bigquery/qrep.go
index b184cc62a9..a6d5c35d7a 100644
--- a/flow/connectors/bigquery/qrep.go
+++ b/flow/connectors/bigquery/qrep.go
@@ -23,7 +23,10 @@ func (c *BigQueryConnector) SyncQRepRecords(
 ) (int, error) {
 	// Ensure the destination table is available.
 	destTable := config.DestinationTableIdentifier
-	srcSchema := stream.Schema()
+	srcSchema, err := stream.Schema()
+	if err != nil {
+		return 0, err
+	}

 	tblMetadata, err := c.replayTableSchemaDeltasQRep(ctx, config, partition, srcSchema)
 	if err != nil {
@@ -80,8 +83,9 @@ func (c *BigQueryConnector) replayTableSchemaDeltasQRep(
 		}
 	}

-	err = c.ReplayTableSchemaDeltas(ctx, config.Env, config.FlowJobName, []*protos.TableSchemaDelta{tableSchemaDelta})
-	if err != nil {
+	if err := c.ReplayTableSchemaDeltas(
+		ctx, config.Env, config.FlowJobName, []*protos.TableSchemaDelta{tableSchemaDelta},
+	); err != nil {
 		return nil, fmt.Errorf("failed to add columns to destination table: %w", err)
 	}
 	dstTableMetadata, err = bqTable.Metadata(ctx)
diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go
index 61450dd55c..030de782f4 100644
--- a/flow/connectors/clickhouse/qrep_avro_sync.go
+++ b/flow/connectors/clickhouse/qrep_avro_sync.go
@@ -67,7 +67,10 @@ func (s *ClickHouseAvroSyncMethod) SyncRecords(
 ) (int, error) {
 	dstTableName := s.config.DestinationTableIdentifier

-	schema := stream.Schema()
+	schema, err := stream.Schema()
+	if err != nil {
+		return 0, err
+	}

 	s.logger.Info("sync function called and schema acquired", slog.String("dstTable", dstTableName))

@@ -106,7 +109,12 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords(
 	stagingPath := s.credsProvider.BucketPath
 	startTime := time.Now()

-	avroSchema, err := s.getAvroSchema(ctx, config.Env, dstTableName, stream.Schema())
+	schema, err := stream.Schema()
+	if err != nil {
+		return 0, err
+	}
+
+	avroSchema, err := s.getAvroSchema(ctx, config.Env, dstTableName, schema)
 	if err != nil {
 		return 0, err
 	}
diff --git a/flow/connectors/elasticsearch/qrep.go b/flow/connectors/elasticsearch/qrep.go
index 142c6de363..1206438d5f 100644
--- a/flow/connectors/elasticsearch/qrep.go
+++ b/flow/connectors/elasticsearch/qrep.go
@@ -42,7 +42,10 @@ func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config *
 ) (int, error) {
 	startTime := time.Now()

-	schema := stream.Schema()
+	schema, err := stream.Schema()
+	if err != nil {
+		return 0, err
+	}

 	var bulkIndexFatalError error
 	var bulkIndexErrors []error
diff --git a/flow/connectors/kafka/qrep.go b/flow/connectors/kafka/qrep.go
index fb65d91d91..c6b60a564d 100644
--- a/flow/connectors/kafka/qrep.go
+++ b/flow/connectors/kafka/qrep.go
@@ -27,7 +27,10 @@ func (c *KafkaConnector) SyncQRepRecords(
 ) (int, error) {
 	startTime := time.Now()
 	numRecords := atomic.Int64{}
-	schema := stream.Schema()
+	schema, err := stream.Schema()
+	if err != nil {
+		return 0, err
+	}

 	queueCtx, queueErr := context.WithCancelCause(ctx)
 	pool, err := c.createPool(queueCtx, config.Env, config.Script, config.FlowJobName, nil, queueErr)
diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go
index 1cd2cd5952..05aa492baa 100644
--- a/flow/connectors/postgres/qrep.go
+++ b/flow/connectors/postgres/qrep.go
@@ -32,7 +32,7 @@ type QRepPullSink interface {
 }

 type QRepSyncSink interface {
-	GetColumnNames() []string
+	GetColumnNames() ([]string, error)
 	CopyInto(context.Context, *PostgresConnector, pgx.Tx, pgx.Identifier) (int64, error)
 }

@@ -550,7 +550,10 @@ func syncQRepRecords(
 		upsertMatchCols[col] = struct{}{}
 	}

-	columnNames := sink.GetColumnNames()
+	columnNames, err := sink.GetColumnNames()
+	if err != nil {
+		return -1, fmt.Errorf("failed to get column names: %w", err)
+	}
 	setClauseArray := make([]string, 0, len(upsertMatchColsList)+1)
 	selectStrArray := make([]string, 0, len(columnNames))
 	for _, col := range columnNames {
@@ -578,8 +581,7 @@ func syncQRepRecords(
 			setClause,
 		)
 		c.logger.Info("Performing upsert operation", slog.String("upsertStmt", upsertStmt), syncLog)
-		_, err := tx.Exec(ctx, upsertStmt)
-		if err != nil {
+		if _, err := tx.Exec(ctx, upsertStmt); err != nil {
 			return -1, fmt.Errorf("failed to perform upsert operation: %w", err)
 		}
 	}
diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go
index 339c54a633..b8c14ee2e7 100644
--- a/flow/connectors/postgres/qrep_query_executor.go
+++ b/flow/connectors/postgres/qrep_query_executor.go
@@ -161,9 +161,7 @@ func (qe *QRepQueryExecutor) processRowsStream(
 		record, err := qe.mapRowToQRecord(rows, fieldDescriptions)
 		if err != nil {
 			qe.logger.Error("[pg_query_executor] failed to map row to QRecord", slog.Any("error", err))
-			err := fmt.Errorf("failed to map row to QRecord: %w", err)
-			stream.Close(err)
-			return 0, err
+			return 0, fmt.Errorf("failed to map row to QRecord: %w", err)
 		}

 		stream.Records <- record
@@ -189,12 +187,10 @@ func (qe *QRepQueryExecutor) processFetchedRows(
 ) (int, error) {
 	rows, err := qe.executeQueryInTx(ctx, tx, cursorName, fetchSize)
 	if err != nil {
-		stream.Close(err)
 		qe.logger.Error("[pg_query_executor] failed to execute query in tx",
 			slog.Any("error", err), slog.String("query", query))
 		return 0, fmt.Errorf("[pg_query_executor] failed to execute query in tx: %w", err)
 	}
-	defer rows.Close()

 	fieldDescriptions := rows.FieldDescriptions()

@@ -210,7 +206,6 @@ func (qe *QRepQueryExecutor) processFetchedRows(
 	}

 	if err := rows.Err(); err != nil {
-		stream.Close(err)
 		qe.logger.Error("[pg_query_executor] row iteration failed",
 			slog.String("query", query), slog.Any("error", err))
 		return 0, fmt.Errorf("[pg_query_executor] row iteration failed '%s': %w", query, err)
@@ -225,7 +220,8 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQuery(
 	args ...interface{},
 ) (*model.QRecordBatch, error) {
 	stream := model.NewQRecordStream(1024)
-	errors := make(chan error, 1)
+	errors := make(chan struct{})
+	var errorsError error
 	qe.logger.Info("Executing and processing query",
slog.String("query", query)) // must wait on errors to close before returning to maintain qe.conn exclusion @@ -234,23 +230,28 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQuery( _, err := qe.ExecuteAndProcessQueryStream(ctx, stream, query, args...) if err != nil { qe.logger.Error("[pg_query_executor] failed to execute and process query stream", slog.Any("error", err)) - errors <- err + errorsError = err } }() select { - case err := <-errors: - return nil, err + case <-errors: + return nil, errorsError case <-stream.SchemaChan(): + schema, err := stream.Schema() + if err != nil { + return nil, err + } batch := &model.QRecordBatch{ - Schema: stream.Schema(), + Schema: schema, Records: nil, } for record := range stream.Records { batch.Records = append(batch.Records, record) } - if err := <-errors; err != nil { - return nil, err + <-errors + if errorsError != nil { + return nil, errorsError } if err := stream.Err(); err != nil { return nil, fmt.Errorf("[pg] failed to get record from stream: %w", err) @@ -288,10 +289,16 @@ func (qe *QRepQueryExecutor) ExecuteQueryIntoSink( }) if err != nil { qe.logger.Error("[pg_query_executor] failed to begin transaction", slog.Any("error", err)) - return 0, fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err) + err := fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err) + sink.Close(err) + return 0, err } - return sink.ExecuteQueryWithTx(ctx, qe, tx, query, args...) + totalRecords, err := sink.ExecuteQueryWithTx(ctx, qe, tx, query, args...) + if err != nil { + sink.Close(err) + } + return totalRecords, err } func (qe *QRepQueryExecutor) ExecuteQueryIntoSinkGettingCurrentSnapshotXmin( @@ -310,16 +317,21 @@ func (qe *QRepQueryExecutor) ExecuteQueryIntoSinkGettingCurrentSnapshotXmin( }) if err != nil { qe.logger.Error("[pg_query_executor] failed to begin transaction", slog.Any("error", err)) - return 0, currentSnapshotXmin.Int64, fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err) + err := fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err) + sink.Close(err) + return 0, currentSnapshotXmin.Int64, err } - err = tx.QueryRow(ctx, "select txid_snapshot_xmin(txid_current_snapshot())").Scan(¤tSnapshotXmin) - if err != nil { + if err := tx.QueryRow(ctx, "select txid_snapshot_xmin(txid_current_snapshot())").Scan(¤tSnapshotXmin); err != nil { qe.logger.Error("[pg_query_executor] failed to get current snapshot xmin", slog.Any("error", err)) + sink.Close(err) return 0, currentSnapshotXmin.Int64, err } totalRecordsFetched, err := sink.ExecuteQueryWithTx(ctx, qe, tx, query, args...) 
+ if err != nil { + sink.Close(err) + } return totalRecordsFetched, currentSnapshotXmin.Int64, err } diff --git a/flow/connectors/postgres/sink_pg.go b/flow/connectors/postgres/sink_pg.go index 8d9d06e747..cf62840925 100644 --- a/flow/connectors/postgres/sink_pg.go +++ b/flow/connectors/postgres/sink_pg.go @@ -15,6 +15,7 @@ import ( type PgCopyShared struct { schemaLatch chan struct{} + err error schema []string schemaSet bool } @@ -54,13 +55,10 @@ func (p PgCopyWriter) ExecuteQueryWithTx( defer shared.RollbackTx(tx, qe.logger) if qe.snapshot != "" { - _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot)) - if err != nil { + if _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot)); err != nil { qe.logger.Error("[pg_query_executor] failed to set snapshot", slog.Any("error", err), slog.String("query", query)) - err := fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err) - p.Close(err) - return 0, err + return 0, fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err) } } @@ -88,17 +86,13 @@ func (p PgCopyWriter) ExecuteQueryWithTx( if err != nil { qe.logger.Info("[pg_query_executor] failed to copy", slog.String("copyQuery", copyQuery), slog.Any("error", err)) - err = fmt.Errorf("[pg_query_executor] failed to copy: %w", err) - p.Close(err) - return 0, err + return 0, fmt.Errorf("[pg_query_executor] failed to copy: %w", err) } qe.logger.Info("Committing transaction") if err := tx.Commit(ctx); err != nil { qe.logger.Error("[pg_query_executor] failed to commit transaction", slog.Any("error", err)) - err = fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err) - p.Close(err) - return 0, err + return 0, fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err) } totalRecordsFetched := ct.RowsAffected() @@ -109,15 +103,20 @@ func (p PgCopyWriter) ExecuteQueryWithTx( func (p PgCopyWriter) Close(err error) { p.PipeWriter.CloseWithError(err) + p.schema.err = err + p.SetSchema(nil) } -func (p PgCopyReader) GetColumnNames() []string { +func (p PgCopyReader) GetColumnNames() ([]string, error) { <-p.schema.schemaLatch - return p.schema.schema + return p.schema.schema, p.schema.err } func (p PgCopyReader) CopyInto(ctx context.Context, c *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error) { - cols := p.GetColumnNames() + cols, err := p.GetColumnNames() + if err != nil { + return 0, err + } quotedCols := make([]string, 0, len(cols)) for _, col := range cols { quotedCols = append(quotedCols, QuoteIdentifier(col)) diff --git a/flow/connectors/postgres/sink_q.go b/flow/connectors/postgres/sink_q.go index 21a39627be..648280e220 100644 --- a/flow/connectors/postgres/sink_q.go +++ b/flow/connectors/postgres/sink_q.go @@ -26,13 +26,10 @@ func (stream RecordStreamSink) ExecuteQueryWithTx( defer shared.RollbackTx(tx, qe.logger) if qe.snapshot != "" { - _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot)) - if err != nil { + if _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot)); err != nil { qe.logger.Error("[pg_query_executor] failed to set snapshot", slog.Any("error", err), slog.String("query", query)) - err := fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err) - stream.Close(err) - return 0, err + return 0, fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err) } } @@ -47,9 +44,7 @@ func (stream RecordStreamSink) ExecuteQueryWithTx( if _, err := tx.Exec(ctx, cursorQuery, args...); err != nil { 
qe.logger.Info("[pg_query_executor] failed to declare cursor", slog.String("cursorQuery", cursorQuery), slog.Any("error", err)) - err = fmt.Errorf("[pg_query_executor] failed to declare cursor: %w", err) - stream.Close(err) - return 0, err + return 0, fmt.Errorf("[pg_query_executor] failed to declare cursor: %w", err) } qe.logger.Info(fmt.Sprintf("[pg_query_executor] declared cursor '%s' for query '%s'", cursorName, query)) @@ -73,9 +68,7 @@ func (stream RecordStreamSink) ExecuteQueryWithTx( qe.logger.Info("Committing transaction") if err := tx.Commit(ctx); err != nil { qe.logger.Error("[pg_query_executor] failed to commit transaction", slog.Any("error", err)) - err = fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err) - stream.Close(err) - return 0, err + return 0, fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err) } qe.logger.Info(fmt.Sprintf("[pg_query_executor] committed transaction for query '%s', rows = %d", @@ -84,9 +77,17 @@ func (stream RecordStreamSink) ExecuteQueryWithTx( } func (stream RecordStreamSink) CopyInto(ctx context.Context, _ *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error) { - return tx.CopyFrom(ctx, table, stream.GetColumnNames(), model.NewQRecordCopyFromSource(stream.QRecordStream)) + columnNames, err := stream.GetColumnNames() + if err != nil { + return 0, err + } + return tx.CopyFrom(ctx, table, columnNames, model.NewQRecordCopyFromSource(stream.QRecordStream)) } -func (stream RecordStreamSink) GetColumnNames() []string { - return stream.Schema().GetColumnNames() +func (stream RecordStreamSink) GetColumnNames() ([]string, error) { + schema, err := stream.Schema() + if err != nil { + return nil, err + } + return schema.GetColumnNames(), nil } diff --git a/flow/connectors/pubsub/qrep.go b/flow/connectors/pubsub/qrep.go index 297d78409a..5d84f6c22e 100644 --- a/flow/connectors/pubsub/qrep.go +++ b/flow/connectors/pubsub/qrep.go @@ -25,11 +25,14 @@ func (c *PubSubConnector) SyncQRepRecords( stream *model.QRecordStream, ) (int, error) { startTime := time.Now() - numRecords := atomic.Int64{} - schema := stream.Schema() + schema, err := stream.Schema() + if err != nil { + return 0, err + } topiccache := topicCache{cache: make(map[string]*pubsub.Topic)} publish := make(chan publishResult, 32) waitChan := make(chan struct{}) + numRecords := atomic.Int64{} queueCtx, queueErr := context.WithCancelCause(ctx) pool, err := c.createPool(queueCtx, config.Env, config.Script, config.FlowJobName, &topiccache, publish, queueErr) diff --git a/flow/connectors/s3/qrep.go b/flow/connectors/s3/qrep.go index 968c956aab..41e1c7d05c 100644 --- a/flow/connectors/s3/qrep.go +++ b/flow/connectors/s3/qrep.go @@ -17,7 +17,10 @@ func (c *S3Connector) SyncQRepRecords( partition *protos.QRepPartition, stream *model.QRecordStream, ) (int, error) { - schema := stream.Schema() + schema, err := stream.Schema() + if err != nil { + return 0, err + } dstTableName := config.DestinationTableIdentifier avroSchema, err := getAvroSchema(ctx, config.Env, dstTableName, schema) diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index 728d393e62..dcaa28911a 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -44,7 +44,10 @@ func (s *SnowflakeAvroSyncHandler) SyncRecords( tableLog := slog.String("destinationTable", s.config.DestinationTableIdentifier) dstTableName := s.config.DestinationTableIdentifier - schema := stream.Schema() + schema, err := 
stream.Schema() + if err != nil { + return 0, stream.Err() + } s.logger.Info("sync function called and schema acquired", tableLog) @@ -95,11 +98,13 @@ func (s *SnowflakeAvroSyncHandler) SyncQRepRecords( startTime := time.Now() dstTableName := config.DestinationTableIdentifier - schema := stream.Schema() + schema, err := stream.Schema() + if err != nil { + return 0, err + } s.logger.Info("sync function called and schema acquired", partitionLog) - err := s.addMissingColumns(ctx, config.Env, schema, dstTableSchema, dstTableName, partition) - if err != nil { + if err := s.addMissingColumns(ctx, config.Env, schema, dstTableSchema, dstTableName, partition); err != nil { return 0, err } diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index 75bc9f4358..6357c46d94 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -129,7 +129,10 @@ func (p *peerDBOCFWriter) createOCFWriter(w io.Writer) (*goavro.OCFWriter, error func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ctx context.Context, env map[string]string, ocfWriter *goavro.OCFWriter) (int64, error) { logger := shared.LoggerFromCtx(ctx) - schema := p.stream.Schema() + schema, err := p.stream.Schema() + if err != nil { + return 0, err + } avroConverter, err := model.NewQRecordAvroConverter( ctx, diff --git a/flow/model/qrecord_stream.go b/flow/model/qrecord_stream.go index 054d6a42b1..eeca424b35 100644 --- a/flow/model/qrecord_stream.go +++ b/flow/model/qrecord_stream.go @@ -22,9 +22,9 @@ func NewQRecordStream(buffer int) *QRecordStream { } } -func (s *QRecordStream) Schema() qvalue.QRecordSchema { +func (s *QRecordStream) Schema() (qvalue.QRecordSchema, error) { <-s.schemaLatch - return s.schema + return s.schema, s.Err() } func (s *QRecordStream) SetSchema(schema qvalue.QRecordSchema) { @@ -55,4 +55,7 @@ func (s *QRecordStream) Close(err error) { s.err = err close(s.Records) } + if !s.schemaSet { + s.SetSchema(qvalue.QRecordSchema{}) + } } diff --git a/flow/model/qvalue/qschema.go b/flow/model/qvalue/qschema.go index a6632fdf5f..9e778e4f85 100644 --- a/flow/model/qvalue/qschema.go +++ b/flow/model/qvalue/qschema.go @@ -34,10 +34,7 @@ func (q QRecordSchema) EqualNames(other QRecordSchema) bool { } for i, field := range q.Fields { - // ignore the case of the field name convert to lower case - f1 := strings.ToLower(field.Name) - f2 := strings.ToLower(other.Fields[i].Name) - if f1 != f2 { + if !strings.EqualFold(field.Name, other.Fields[i].Name) { return false } } diff --git a/flow/pua/stream_adapter.go b/flow/pua/stream_adapter.go index a03e68c919..9f6f3f8d3e 100644 --- a/flow/pua/stream_adapter.go +++ b/flow/pua/stream_adapter.go @@ -11,7 +11,11 @@ import ( func AttachToStream(ls *lua.LState, lfn *lua.LFunction, stream *model.QRecordStream) *model.QRecordStream { output := model.NewQRecordStream(0) go func() { - schema := stream.Schema() + schema, err := stream.Schema() + if err != nil { + output.Close(err) + return + } output.SetSchema(schema) for record := range stream.Records { row := model.NewRecordItems(len(record))
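
Reviewer note: to make the leak concrete, here is a minimal, runnable
sketch of the pattern this patch fixes. The types below are simplified
stand-ins, not the actual model package (Stream, Schema, and NewStream
are illustrative names): before this patch, a provider that failed
before producing a schema never closed schemaLatch, so every consumer
blocked in Schema() leaked.

	package main

	import (
		"fmt"
		"time"
	)

	type Schema struct{ Columns []string }

	type Stream struct {
		schemaLatch chan struct{}
		schemaSet   bool
		schema      Schema
		closed      bool
		err         error
		Records     chan int
	}

	func NewStream() *Stream {
		return &Stream{schemaLatch: make(chan struct{}), Records: make(chan int, 8)}
	}

	// Schema blocks until the latch is released; after this patch it also
	// surfaces the error recorded by Close, which is why every call site
	// changes from `schema := stream.Schema()` to `schema, err := stream.Schema()`.
	func (s *Stream) Schema() (Schema, error) {
		<-s.schemaLatch
		return s.schema, s.err
	}

	func (s *Stream) SetSchema(schema Schema) {
		if !s.schemaSet {
			s.schema = schema
			s.schemaSet = true
			close(s.schemaLatch)
		}
	}

	func (s *Stream) Close(err error) {
		if !s.closed {
			s.closed = true
			s.err = err
			close(s.Records)
		}
		// The fix: release the latch even when no schema was ever set, so
		// consumers blocked in Schema() observe the error and return.
		if !s.schemaSet {
			s.SetSchema(Schema{})
		}
	}

	func main() {
		stream := NewStream()

		// Consumer blocks on Schema(), like the connectors' SyncQRepRecords.
		done := make(chan struct{})
		go func() {
			defer close(done)
			if _, err := stream.Schema(); err != nil {
				fmt.Println("consumer saw error:", err)
			}
		}()

		// Provider hits an error before any schema is set.
		stream.Close(fmt.Errorf("provider failed"))

		select {
		case <-done:
			fmt.Println("consumer exited; no goroutine leak")
		case <-time.After(time.Second):
			fmt.Println("consumer still blocked (this was the leak)")
		}
	}

Without the latch release in Close, the select above hits the timeout
branch; with it, the consumer unblocks immediately and sees the
provider's error. Returning the zero-value schema alongside the stored
error keeps all Schema() callers on a single error-handling path.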