From bc560037f1170f0e59712f2379a17bf301ef4486 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Philip=20Dub=C3=A9?=
Date: Fri, 27 Dec 2024 14:34:15 +0000
Subject: [PATCH 1/4] QRecordSchema: close schemaLatch in Close

Goroutines were leaking while waiting on schemaLatch: if a provider hit
an error, the latch could be left open indefinitely and its waiters
were never released.
---
 flow/connectors/bigquery/qrep.go                | 3 +++
 flow/connectors/clickhouse/qrep_avro_sync.go    | 3 +++
 flow/connectors/elasticsearch/qrep.go           | 3 +++
 flow/connectors/kafka/qrep.go                   | 3 +++
 flow/connectors/postgres/qrep_query_executor.go | 1 -
 flow/connectors/pubsub/qrep.go                  | 5 ++++-
 flow/connectors/s3/qrep.go                      | 3 +++
 flow/connectors/snowflake/qrep_avro_sync.go     | 3 +++
 flow/connectors/utils/avro/avro_writer.go       | 3 +++
 flow/model/qrecord_stream.go                    | 3 +++
 flow/model/qvalue/qschema.go                    | 5 +----
 11 files changed, 29 insertions(+), 6 deletions(-)

diff --git a/flow/connectors/bigquery/qrep.go b/flow/connectors/bigquery/qrep.go
index b184cc62a9..b554a3e0a4 100644
--- a/flow/connectors/bigquery/qrep.go
+++ b/flow/connectors/bigquery/qrep.go
@@ -24,6 +24,9 @@ func (c *BigQueryConnector) SyncQRepRecords(
 	// Ensure the destination table is available.
 	destTable := config.DestinationTableIdentifier
 	srcSchema := stream.Schema()
+	if srcSchema.Fields == nil {
+		return 0, stream.Err()
+	}
 
 	tblMetadata, err := c.replayTableSchemaDeltasQRep(ctx, config, partition, srcSchema)
 	if err != nil {
diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go
index 61450dd55c..9fa3fb5917 100644
--- a/flow/connectors/clickhouse/qrep_avro_sync.go
+++ b/flow/connectors/clickhouse/qrep_avro_sync.go
@@ -68,6 +68,9 @@ func (s *ClickHouseAvroSyncMethod) SyncRecords(
 	dstTableName := s.config.DestinationTableIdentifier
 
 	schema := stream.Schema()
+	if schema.Fields == nil {
+		return 0, stream.Err()
+	}
 
 	s.logger.Info("sync function called and schema acquired", slog.String("dstTable", dstTableName))
 
diff --git a/flow/connectors/elasticsearch/qrep.go b/flow/connectors/elasticsearch/qrep.go
index 142c6de363..f3955027be 100644
--- a/flow/connectors/elasticsearch/qrep.go
+++ b/flow/connectors/elasticsearch/qrep.go
@@ -43,6 +43,9 @@ func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config *
 	startTime := time.Now()
 
 	schema := stream.Schema()
+	if schema.Fields == nil {
+		return 0, stream.Err()
+	}
 
 	var bulkIndexFatalError error
 	var bulkIndexErrors []error
diff --git a/flow/connectors/kafka/qrep.go b/flow/connectors/kafka/qrep.go
index fb65d91d91..5cd384588d 100644
--- a/flow/connectors/kafka/qrep.go
+++ b/flow/connectors/kafka/qrep.go
@@ -28,6 +28,9 @@ func (c *KafkaConnector) SyncQRepRecords(
 	startTime := time.Now()
 	numRecords := atomic.Int64{}
 	schema := stream.Schema()
+	if schema.Fields == nil {
+		return 0, stream.Err()
+	}
 
 	queueCtx, queueErr := context.WithCancelCause(ctx)
 	pool, err := c.createPool(queueCtx, config.Env, config.Script, config.FlowJobName, nil, queueErr)
diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go
index 339c54a633..4c99748727 100644
--- a/flow/connectors/postgres/qrep_query_executor.go
+++ b/flow/connectors/postgres/qrep_query_executor.go
@@ -194,7 +194,6 @@ func (qe *QRepQueryExecutor) processFetchedRows(
 			slog.Any("error", err), slog.String("query", query))
 		return 0, fmt.Errorf("[pg_query_executor] failed to execute query in tx: %w", err)
 	}
-	defer rows.Close()
 
 	fieldDescriptions := rows.FieldDescriptions()
 
diff --git a/flow/connectors/pubsub/qrep.go b/flow/connectors/pubsub/qrep.go
index 297d78409a..2579f773b8 100644
--- a/flow/connectors/pubsub/qrep.go
+++ b/flow/connectors/pubsub/qrep.go
@@ -25,11 +25,14 @@ func (c *PubSubConnector) SyncQRepRecords(
 	stream *model.QRecordStream,
 ) (int, error) {
 	startTime := time.Now()
-	numRecords := atomic.Int64{}
 	schema := stream.Schema()
+	if schema.Fields == nil {
+		return 0, stream.Err()
+	}
 	topiccache := topicCache{cache: make(map[string]*pubsub.Topic)}
 	publish := make(chan publishResult, 32)
 	waitChan := make(chan struct{})
+	numRecords := atomic.Int64{}
 
 	queueCtx, queueErr := context.WithCancelCause(ctx)
 	pool, err := c.createPool(queueCtx, config.Env, config.Script, config.FlowJobName, &topiccache, publish, queueErr)
diff --git a/flow/connectors/s3/qrep.go b/flow/connectors/s3/qrep.go
index 968c956aab..8f95442f73 100644
--- a/flow/connectors/s3/qrep.go
+++ b/flow/connectors/s3/qrep.go
@@ -18,6 +18,9 @@ func (c *S3Connector) SyncQRepRecords(
 	stream *model.QRecordStream,
 ) (int, error) {
 	schema := stream.Schema()
+	if schema.Fields == nil {
+		return 0, stream.Err()
+	}
 	dstTableName := config.DestinationTableIdentifier
 
 	avroSchema, err := getAvroSchema(ctx, config.Env, dstTableName, schema)
diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go
index 728d393e62..a4bfc9b777 100644
--- a/flow/connectors/snowflake/qrep_avro_sync.go
+++ b/flow/connectors/snowflake/qrep_avro_sync.go
@@ -45,6 +45,9 @@ func (s *SnowflakeAvroSyncHandler) SyncRecords(
 	dstTableName := s.config.DestinationTableIdentifier
 
 	schema := stream.Schema()
+	if schema.Fields == nil {
+		return 0, stream.Err()
+	}
 
 	s.logger.Info("sync function called and schema acquired", tableLog)
 
diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go
index 75bc9f4358..70aba1dae3 100644
--- a/flow/connectors/utils/avro/avro_writer.go
+++ b/flow/connectors/utils/avro/avro_writer.go
@@ -130,6 +130,9 @@ func (p *peerDBOCFWriter) createOCFWriter(w io.Writer) (*goavro.OCFWriter, error
 func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ctx context.Context, env map[string]string, ocfWriter *goavro.OCFWriter) (int64, error) {
 	logger := shared.LoggerFromCtx(ctx)
 	schema := p.stream.Schema()
+	if schema.Fields == nil {
+		return 0, p.stream.Err()
+	}
 
 	avroConverter, err := model.NewQRecordAvroConverter(
 		ctx,
diff --git a/flow/model/qrecord_stream.go b/flow/model/qrecord_stream.go
index 054d6a42b1..1a91bd062c 100644
--- a/flow/model/qrecord_stream.go
+++ b/flow/model/qrecord_stream.go
@@ -55,4 +55,7 @@ func (s *QRecordStream) Close(err error) {
 		s.err = err
 		close(s.Records)
 	}
+	if !s.schemaSet {
+		s.SetSchema(qvalue.QRecordSchema{})
+	}
 }
diff --git a/flow/model/qvalue/qschema.go b/flow/model/qvalue/qschema.go
index a6632fdf5f..9e778e4f85 100644
--- a/flow/model/qvalue/qschema.go
+++ b/flow/model/qvalue/qschema.go
@@ -34,10 +34,7 @@ func (q QRecordSchema) EqualNames(other QRecordSchema) bool {
 	}
 
 	for i, field := range q.Fields {
-		// ignore the case of the field name convert to lower case
-		f1 := strings.ToLower(field.Name)
-		f2 := strings.ToLower(other.Fields[i].Name)
-		if f1 != f2 {
+		if !strings.EqualFold(field.Name, other.Fields[i].Name) {
 			return false
 		}
 	}
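The leak this patch fixes, reduced to a self-contained sketch (the type and names below are illustrative stand-ins, not the actual PeerDB types, and the sketch folds in the Schema() error return that patch 2 introduces): a latch that only the success path closes strands every waiter once the producer errors out first, so Close must release it too.

    package main

    import (
    	"errors"
    	"fmt"
    )

    // stream mimics model.QRecordStream: consumers block on schemaLatch
    // until the producer publishes a schema.
    type stream struct {
    	schemaLatch chan struct{}
    	schemaSet   bool
    	schema      []string
    	err         error
    }

    func newStream() *stream {
    	return &stream{schemaLatch: make(chan struct{})}
    }

    func (s *stream) SetSchema(schema []string) {
    	if !s.schemaSet {
    		s.schema = schema
    		s.schemaSet = true
    		close(s.schemaLatch) // release all Schema() waiters
    	}
    }

    // Close releases the latch even when the producer never reached
    // SetSchema; before this patch, Schema() callers hung here forever.
    func (s *stream) Close(err error) {
    	if s.err == nil {
    		s.err = err
    	}
    	if !s.schemaSet {
    		s.SetSchema(nil)
    	}
    }

    func (s *stream) Schema() ([]string, error) {
    	<-s.schemaLatch
    	return s.schema, s.err
    }

    func main() {
    	s := newStream()
    	go s.Close(errors.New("provider failed before sending schema"))
    	if _, err := s.Schema(); err != nil {
    		fmt.Println("waiter released with error:", err) // instead of leaking
    	}
    }

The qschema.go hunk above also swaps the two strings.ToLower calls for strings.EqualFold, which does the case-insensitive comparison without allocating.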
From 9674a515144ea0a14023c54dba2b418ba5bab633 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Philip=20Dub=C3=A9?=
Date: Fri, 27 Dec 2024 16:20:46 +0000
Subject: [PATCH 2/4] Schema: return error rather than caller having to check
 Fields == nil

---
 flow/connectors/bigquery/qrep.go                | 11 +++++-----
 flow/connectors/clickhouse/qrep_avro_sync.go    | 13 ++++++++----
 flow/connectors/elasticsearch/qrep.go           |  6 +++---
 flow/connectors/kafka/qrep.go                   |  6 +++---
 flow/connectors/postgres/qrep.go                | 10 ++++++----
 .../postgres/qrep_query_executor.go             | 20 ++++++++++++-------
 flow/connectors/postgres/sink_pg.go             | 12 ++++++++---
 flow/connectors/postgres/sink_q.go              | 14 ++++++++++---
 flow/connectors/pubsub/qrep.go                  |  6 +++---
 flow/connectors/s3/qrep.go                      |  6 +++---
 flow/connectors/snowflake/qrep_avro_sync.go     | 12 ++++++-----
 flow/connectors/utils/avro/avro_writer.go       |  6 +++---
 flow/model/qrecord_stream.go                    |  4 ++--
 flow/pua/stream_adapter.go                      |  6 +++++-
 14 files changed, 83 insertions(+), 49 deletions(-)

diff --git a/flow/connectors/bigquery/qrep.go b/flow/connectors/bigquery/qrep.go
index b554a3e0a4..a6d5c35d7a 100644
--- a/flow/connectors/bigquery/qrep.go
+++ b/flow/connectors/bigquery/qrep.go
@@ -23,9 +23,9 @@ func (c *BigQueryConnector) SyncQRepRecords(
 ) (int, error) {
 	// Ensure the destination table is available.
 	destTable := config.DestinationTableIdentifier
-	srcSchema := stream.Schema()
-	if srcSchema.Fields == nil {
-		return 0, stream.Err()
+	srcSchema, err := stream.Schema()
+	if err != nil {
+		return 0, err
 	}
 
 	tblMetadata, err := c.replayTableSchemaDeltasQRep(ctx, config, partition, srcSchema)
@@ -83,8 +83,9 @@ func (c *BigQueryConnector) replayTableSchemaDeltasQRep(
 		}
 	}
 
-	err = c.ReplayTableSchemaDeltas(ctx, config.Env, config.FlowJobName, []*protos.TableSchemaDelta{tableSchemaDelta})
-	if err != nil {
+	if err := c.ReplayTableSchemaDeltas(
+		ctx, config.Env, config.FlowJobName, []*protos.TableSchemaDelta{tableSchemaDelta},
+	); err != nil {
 		return nil, fmt.Errorf("failed to add columns to destination table: %w", err)
 	}
 	dstTableMetadata, err = bqTable.Metadata(ctx)
diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go
index 9fa3fb5917..030de782f4 100644
--- a/flow/connectors/clickhouse/qrep_avro_sync.go
+++ b/flow/connectors/clickhouse/qrep_avro_sync.go
@@ -67,9 +67,9 @@ func (s *ClickHouseAvroSyncMethod) SyncRecords(
 ) (int, error) {
 	dstTableName := s.config.DestinationTableIdentifier
 
-	schema := stream.Schema()
-	if schema.Fields == nil {
-		return 0, stream.Err()
+	schema, err := stream.Schema()
+	if err != nil {
+		return 0, err
 	}
 
 	s.logger.Info("sync function called and schema acquired", slog.String("dstTable", dstTableName))
@@ -109,7 +109,12 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords(
 	stagingPath := s.credsProvider.BucketPath
 	startTime := time.Now()
 
-	avroSchema, err := s.getAvroSchema(ctx, config.Env, dstTableName, stream.Schema())
+	schema, err := stream.Schema()
+	if err != nil {
+		return 0, err
+	}
+
+	avroSchema, err := s.getAvroSchema(ctx, config.Env, dstTableName, schema)
 	if err != nil {
 		return 0, err
 	}
diff --git a/flow/connectors/elasticsearch/qrep.go b/flow/connectors/elasticsearch/qrep.go
index f3955027be..1206438d5f 100644
--- a/flow/connectors/elasticsearch/qrep.go
+++ b/flow/connectors/elasticsearch/qrep.go
@@ -42,9 +42,9 @@ func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config *
 ) (int, error) {
 	startTime := time.Now()
 
-	schema := stream.Schema()
-	if schema.Fields == nil {
-		return 0, stream.Err()
+	schema, err := stream.Schema()
+	if err != nil {
+		return 0, err
 	}
 
 	var bulkIndexFatalError error
diff --git a/flow/connectors/kafka/qrep.go b/flow/connectors/kafka/qrep.go
index 5cd384588d..c6b60a564d 100644
--- a/flow/connectors/kafka/qrep.go
+++ b/flow/connectors/kafka/qrep.go
@@ -27,9 +27,9 @@ func (c *KafkaConnector) SyncQRepRecords(
 ) (int, error) {
 	startTime := time.Now()
 	numRecords := atomic.Int64{}
-	schema := stream.Schema()
-	if schema.Fields == nil {
-		return 0, stream.Err()
+	schema, err := stream.Schema()
+	if err != nil {
+		return 0, err
 	}
 
 	queueCtx, queueErr := context.WithCancelCause(ctx)
diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go
index 1cd2cd5952..05aa492baa 100644
--- a/flow/connectors/postgres/qrep.go
+++ b/flow/connectors/postgres/qrep.go
@@ -32,7 +32,7 @@ type QRepPullSink interface {
 }
 
 type QRepSyncSink interface {
-	GetColumnNames() []string
+	GetColumnNames() ([]string, error)
 	CopyInto(context.Context, *PostgresConnector, pgx.Tx, pgx.Identifier) (int64, error)
 }
 
@@ -550,7 +550,10 @@ func syncQRepRecords(
 		upsertMatchCols[col] = struct{}{}
 	}
 
-	columnNames := sink.GetColumnNames()
+	columnNames, err := sink.GetColumnNames()
+	if err != nil {
+		return -1, fmt.Errorf("failed to get column names: %w", err)
+	}
 	setClauseArray := make([]string, 0, len(upsertMatchColsList)+1)
 	selectStrArray := make([]string, 0, len(columnNames))
 	for _, col := range columnNames {
@@ -578,8 +581,7 @@ func syncQRepRecords(
 			setClause,
 		)
 		c.logger.Info("Performing upsert operation", slog.String("upsertStmt", upsertStmt), syncLog)
-		_, err := tx.Exec(ctx, upsertStmt)
-		if err != nil {
+		if _, err := tx.Exec(ctx, upsertStmt); err != nil {
 			return -1, fmt.Errorf("failed to perform upsert operation: %w", err)
 		}
 	}
diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go
index 4c99748727..c548c9d76f 100644
--- a/flow/connectors/postgres/qrep_query_executor.go
+++ b/flow/connectors/postgres/qrep_query_executor.go
@@ -224,7 +224,8 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQuery(
 	args ...interface{},
 ) (*model.QRecordBatch, error) {
 	stream := model.NewQRecordStream(1024)
-	errors := make(chan error, 1)
+	errors := make(chan struct{})
+	var errorsError error
 	qe.logger.Info("Executing and processing query", slog.String("query", query))
 
 	// must wait on errors to close before returning to maintain qe.conn exclusion
 	go func() {
 		defer close(errors)
 		_, err := qe.ExecuteAndProcessQueryStream(ctx, stream, query, args...)
 		if err != nil {
 			qe.logger.Error("[pg_query_executor] failed to execute and process query stream", slog.Any("error", err))
-			errors <- err
+			errorsError = err
 		}
 	}()
 
 	select {
-	case err := <-errors:
-		return nil, err
+	case <-errors:
+		return nil, errorsError
 	case <-stream.SchemaChan():
+		schema, err := stream.Schema()
+		if err != nil {
+			return nil, err
+		}
 		batch := &model.QRecordBatch{
-			Schema:  stream.Schema(),
+			Schema:  schema,
 			Records: nil,
 		}
 		for record := range stream.Records {
 			batch.Records = append(batch.Records, record)
 		}
-		if err := <-errors; err != nil {
-			return nil, err
+		<-errors
+		if errorsError != nil {
+			return nil, errorsError
 		}
 		if err := stream.Err(); err != nil {
 			return nil, fmt.Errorf("[pg] failed to get record from stream: %w", err)
 		}
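The rework above replaces the buffered errors channel with a latch-style chan struct{} plus a captured error variable because the result is now consumed in two places: once in the select, and once more after draining stream.Records. A value sent on a channel can be received only once, whereas a closed channel can be observed any number of times. A minimal sketch of the pattern (illustrative names, not the PeerDB code):

    package main

    import (
    	"errors"
    	"fmt"
    )

    func run() error { return errors.New("query failed") }

    func main() {
    	done := make(chan struct{})
    	var runErr error // written before close(done), read only after <-done

    	go func() {
    		defer close(done)
    		runErr = run()
    	}()

    	<-done // first observation, e.g. inside a select
    	<-done // re-checking is fine: a closed channel never blocks
    	if runErr != nil {
    		fmt.Println("worker error:", runErr)
    	}
    }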
diff --git a/flow/connectors/postgres/sink_pg.go b/flow/connectors/postgres/sink_pg.go
index 8d9d06e747..000642a2da 100644
--- a/flow/connectors/postgres/sink_pg.go
+++ b/flow/connectors/postgres/sink_pg.go
@@ -15,6 +15,7 @@ import (
 
 type PgCopyShared struct {
 	schemaLatch chan struct{}
+	err         error
 	schema      []string
 	schemaSet   bool
 }
@@ -109,15 +110,20 @@ func (p PgCopyWriter) ExecuteQueryWithTx(
 
 func (p PgCopyWriter) Close(err error) {
 	p.PipeWriter.CloseWithError(err)
+	p.schema.err = err
+	p.SetSchema(nil)
 }
 
-func (p PgCopyReader) GetColumnNames() []string {
+func (p PgCopyReader) GetColumnNames() ([]string, error) {
 	<-p.schema.schemaLatch
-	return p.schema.schema
+	return p.schema.schema, p.schema.err
 }
 
 func (p PgCopyReader) CopyInto(ctx context.Context, c *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error) {
-	cols := p.GetColumnNames()
+	cols, err := p.GetColumnNames()
+	if err != nil {
+		return 0, err
+	}
 	quotedCols := make([]string, 0, len(cols))
 	for _, col := range cols {
 		quotedCols = append(quotedCols, QuoteIdentifier(col))
diff --git a/flow/connectors/postgres/sink_q.go b/flow/connectors/postgres/sink_q.go
index 21a39627be..5c5c97e048 100644
--- a/flow/connectors/postgres/sink_q.go
+++ b/flow/connectors/postgres/sink_q.go
@@ -84,9 +84,17 @@ func (stream RecordStreamSink) ExecuteQueryWithTx(
 }
 
 func (stream RecordStreamSink) CopyInto(ctx context.Context, _ *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error) {
-	return tx.CopyFrom(ctx, table, stream.GetColumnNames(), model.NewQRecordCopyFromSource(stream.QRecordStream))
+	columnNames, err := stream.GetColumnNames()
+	if err != nil {
+		return 0, err
+	}
+	return tx.CopyFrom(ctx, table, columnNames, model.NewQRecordCopyFromSource(stream.QRecordStream))
 }
 
-func (stream RecordStreamSink) GetColumnNames() []string {
-	return stream.Schema().GetColumnNames()
+func (stream RecordStreamSink) GetColumnNames() ([]string, error) {
+	schema, err := stream.Schema()
+	if err != nil {
+		return nil, err
+	}
+	return schema.GetColumnNames(), nil
 }
diff --git a/flow/connectors/pubsub/qrep.go b/flow/connectors/pubsub/qrep.go
index 2579f773b8..5d84f6c22e 100644
--- a/flow/connectors/pubsub/qrep.go
+++ b/flow/connectors/pubsub/qrep.go
@@ -25,9 +25,9 @@ func (c *PubSubConnector) SyncQRepRecords(
 	stream *model.QRecordStream,
 ) (int, error) {
 	startTime := time.Now()
-	schema := stream.Schema()
-	if schema.Fields == nil {
-		return 0, stream.Err()
+	schema, err := stream.Schema()
+	if err != nil {
+		return 0, err
 	}
 	topiccache := topicCache{cache: make(map[string]*pubsub.Topic)}
 	publish := make(chan publishResult, 32)
diff --git a/flow/connectors/s3/qrep.go b/flow/connectors/s3/qrep.go
index 8f95442f73..41e1c7d05c 100644
--- a/flow/connectors/s3/qrep.go
+++ b/flow/connectors/s3/qrep.go
@@ -17,9 +17,9 @@ func (c *S3Connector) SyncQRepRecords(
 	partition *protos.QRepPartition,
 	stream *model.QRecordStream,
 ) (int, error) {
-	schema := stream.Schema()
-	if schema.Fields == nil {
-		return 0, stream.Err()
+	schema, err := stream.Schema()
+	if err != nil {
+		return 0, err
 	}
 	dstTableName := config.DestinationTableIdentifier
 
diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go
index a4bfc9b777..dcaa28911a 100644
--- a/flow/connectors/snowflake/qrep_avro_sync.go
+++ b/flow/connectors/snowflake/qrep_avro_sync.go
@@ -44,8 +44,8 @@ func (s *SnowflakeAvroSyncHandler) SyncRecords(
 	tableLog := slog.String("destinationTable", s.config.DestinationTableIdentifier)
 	dstTableName := s.config.DestinationTableIdentifier
 
-	schema := stream.Schema()
-	if schema.Fields == nil {
+	schema, err := stream.Schema()
+	if err != nil {
 		return 0, stream.Err()
 	}
 
@@ -98,11 +98,13 @@ func (s *SnowflakeAvroSyncHandler) SyncQRepRecords(
 	startTime := time.Now()
 	dstTableName := config.DestinationTableIdentifier
 
-	schema := stream.Schema()
+	schema, err := stream.Schema()
+	if err != nil {
+		return 0, err
+	}
 
 	s.logger.Info("sync function called and schema acquired", partitionLog)
 
-	err := s.addMissingColumns(ctx, config.Env, schema, dstTableSchema, dstTableName, partition)
-	if err != nil {
+	if err := s.addMissingColumns(ctx, config.Env, schema, dstTableSchema, dstTableName, partition); err != nil {
 		return 0, err
 	}
 
diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go
index 70aba1dae3..6357c46d94 100644
--- a/flow/connectors/utils/avro/avro_writer.go
+++ b/flow/connectors/utils/avro/avro_writer.go
@@ -129,9 +129,9 @@ func (p *peerDBOCFWriter) createOCFWriter(w io.Writer) (*goavro.OCFWriter, error
 func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ctx context.Context, env map[string]string, ocfWriter *goavro.OCFWriter) (int64, error) {
 	logger := shared.LoggerFromCtx(ctx)
 
-	schema := p.stream.Schema()
-	if schema.Fields == nil {
-		return 0, p.stream.Err()
+	schema, err := p.stream.Schema()
+	if err != nil {
+		return 0, err
 	}
 
 	avroConverter, err := model.NewQRecordAvroConverter(
diff --git a/flow/model/qrecord_stream.go b/flow/model/qrecord_stream.go
index 1a91bd062c..eeca424b35 100644
--- a/flow/model/qrecord_stream.go
+++ b/flow/model/qrecord_stream.go
@@ -22,9 +22,9 @@ func NewQRecordStream(buffer int) *QRecordStream {
 	}
 }
 
-func (s *QRecordStream) Schema() qvalue.QRecordSchema {
+func (s *QRecordStream) Schema() (qvalue.QRecordSchema, error) {
 	<-s.schemaLatch
-	return s.schema
+	return s.schema, s.Err()
 }
 
 func (s *QRecordStream) SetSchema(schema qvalue.QRecordSchema) {
diff --git a/flow/pua/stream_adapter.go b/flow/pua/stream_adapter.go
index a03e68c919..9f6f3f8d3e 100644
--- a/flow/pua/stream_adapter.go
+++ b/flow/pua/stream_adapter.go
@@ -11,7 +11,11 @@ import (
 func AttachToStream(ls *lua.LState, lfn *lua.LFunction, stream *model.QRecordStream) *model.QRecordStream {
 	output := model.NewQRecordStream(0)
 	go func() {
-		schema := stream.Schema()
+		schema, err := stream.Schema()
+		if err != nil {
+			output.Close(err)
+			return
+		}
 		output.SetSchema(schema)
 		for record := range stream.Records {
 			row := model.NewRecordItems(len(record))
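After this patch, every consumer follows the same shape: take the schema (or its error) from the latch, drain the records, then ask the stream whether it ended cleanly. A compact stand-in for that end-to-end contract (illustrative types rather than the real model.QRecordStream; the mutex is only there to keep the sketch race-free):

    package main

    import (
    	"errors"
    	"fmt"
    	"sync"
    )

    type record []string

    type recordStream struct {
    	records   chan record
    	latch     chan struct{}
    	schema    []string
    	schemaSet bool
    	mu        sync.Mutex
    	err       error
    }

    func newRecordStream() *recordStream {
    	return &recordStream{records: make(chan record, 4), latch: make(chan struct{})}
    }

    func (s *recordStream) setSchema(cols []string) {
    	if !s.schemaSet {
    		s.schema = cols
    		s.schemaSet = true
    		close(s.latch)
    	}
    }

    func (s *recordStream) Err() error {
    	s.mu.Lock()
    	defer s.mu.Unlock()
    	return s.err
    }

    // close records the error, ends the record channel, and opens the
    // latch so no Schema() caller is ever left behind.
    func (s *recordStream) close(err error) {
    	s.mu.Lock()
    	s.err = err
    	s.mu.Unlock()
    	close(s.records)
    	s.setSchema(nil)
    }

    // Schema blocks until the producer commits to a schema or fails.
    func (s *recordStream) Schema() ([]string, error) {
    	<-s.latch
    	return s.schema, s.Err()
    }

    // consume mirrors the connector-side shape used throughout this patch:
    // check Schema's error up front, drain records, then check Err for a
    // failure that happened mid-stream.
    func consume(s *recordStream) (int, error) {
    	if _, err := s.Schema(); err != nil {
    		return 0, err
    	}
    	n := 0
    	for range s.records {
    		n++
    	}
    	return n, s.Err()
    }

    func main() {
    	s := newRecordStream()
    	go func() {
    		s.setSchema([]string{"id", "name"})
    		s.records <- record{"1", "alice"}
    		s.close(errors.New("connection dropped mid-stream"))
    	}()
    	fmt.Println(consume(s))
    }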
From 815b60e114156d02b92d8cf3373f8036a9bbb527 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Philip=20Dub=C3=A9?=
Date: Fri, 27 Dec 2024 17:02:12 +0000
Subject: [PATCH 3/4] fix up stream closing

---
 flow/connectors/postgres/qrep_query_executor.go | 5 ++---
 flow/connectors/postgres/sink_q.go              | 3 +--
 2 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go
index c548c9d76f..7c2cf728fa 100644
--- a/flow/connectors/postgres/qrep_query_executor.go
+++ b/flow/connectors/postgres/qrep_query_executor.go
@@ -161,9 +161,7 @@ func (qe *QRepQueryExecutor) processRowsStream(
 		record, err := qe.mapRowToQRecord(rows, fieldDescriptions)
 		if err != nil {
 			qe.logger.Error("[pg_query_executor] failed to map row to QRecord", slog.Any("error", err))
-			err := fmt.Errorf("failed to map row to QRecord: %w", err)
-			stream.Close(err)
-			return 0, err
+			return 0, fmt.Errorf("failed to map row to QRecord: %w", err)
 		}
 
 		stream.Records <- record
@@ -204,6 +202,7 @@ func (qe *QRepQueryExecutor) processFetchedRows(
 
 	numRows, err := qe.processRowsStream(ctx, cursorName, stream, rows, fieldDescriptions)
 	if err != nil {
+		stream.Close(err)
 		qe.logger.Error("[pg_query_executor] failed to process rows", slog.Any("error", err))
 		return 0, fmt.Errorf("failed to process rows: %w", err)
 	}
diff --git a/flow/connectors/postgres/sink_q.go b/flow/connectors/postgres/sink_q.go
index 5c5c97e048..c4204b63c4 100644
--- a/flow/connectors/postgres/sink_q.go
+++ b/flow/connectors/postgres/sink_q.go
@@ -26,8 +26,7 @@ func (stream RecordStreamSink) ExecuteQueryWithTx(
 	defer shared.RollbackTx(tx, qe.logger)
 
 	if qe.snapshot != "" {
-		_, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot))
-		if err != nil {
+		if _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot)); err != nil {
 			qe.logger.Error("[pg_query_executor] failed to set snapshot",
 				slog.Any("error", err), slog.String("query", query))
 			err := fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err)
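The final patch below moves Close out of the sinks' ExecuteQueryWithTx implementations and into the two ExecuteQueryIntoSink entry points, so a single owner decides when the sink is closed, including for failures the callee never sees, such as a failed begin-transaction or the snapshot-xmin query. A small sketch of the ownership shape (hypothetical names, not the PeerDB API):

    package main

    import (
    	"errors"
    	"fmt"
    )

    type sink struct{ closedWith error }

    func (s *sink) Close(err error) { s.closedWith = err }

    // executeQueryWithTx no longer closes the sink on its own error
    // paths; it simply reports what went wrong.
    func executeQueryWithTx(s *sink) (int, error) {
    	return 0, errors.New("failed to declare cursor")
    }

    // executeQueryIntoSink owns the sink's lifetime: every failure
    // funnels through one Close call site, so no path can leak the
    // sink or close it twice.
    func executeQueryIntoSink(s *sink) (int, error) {
    	n, err := executeQueryWithTx(s)
    	if err != nil {
    		s.Close(err)
    	}
    	return n, err
    }

    func main() {
    	s := &sink{}
    	if _, err := executeQueryIntoSink(s); err != nil {
    		fmt.Println("sink closed with:", s.closedWith)
    	}
    }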
From 6b217d3e9188c2d49fe8da4e5dc48fdbece793c2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Philip=20Dub=C3=A9?=
Date: Fri, 27 Dec 2024 17:25:48 +0000
Subject: [PATCH 4/4] lift stream Closing out of ExecuteQueryWithTx

---
 .../postgres/qrep_query_executor.go             | 24 ++++++++++++-------
 flow/connectors/postgres/sink_pg.go             | 15 ++++--------
 flow/connectors/postgres/sink_q.go              | 12 +++-------
 3 files changed, 23 insertions(+), 28 deletions(-)

diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go
index 7c2cf728fa..b8c14ee2e7 100644
--- a/flow/connectors/postgres/qrep_query_executor.go
+++ b/flow/connectors/postgres/qrep_query_executor.go
@@ -187,7 +187,6 @@ func (qe *QRepQueryExecutor) processFetchedRows(
 ) (int, error) {
 	rows, err := qe.executeQueryInTx(ctx, tx, cursorName, fetchSize)
 	if err != nil {
-		stream.Close(err)
 		qe.logger.Error("[pg_query_executor] failed to execute query in tx",
 			slog.Any("error", err), slog.String("query", query))
 		return 0, fmt.Errorf("[pg_query_executor] failed to execute query in tx: %w", err)
@@ -202,13 +201,11 @@ func (qe *QRepQueryExecutor) processFetchedRows(
 
 	numRows, err := qe.processRowsStream(ctx, cursorName, stream, rows, fieldDescriptions)
 	if err != nil {
-		stream.Close(err)
 		qe.logger.Error("[pg_query_executor] failed to process rows", slog.Any("error", err))
 		return 0, fmt.Errorf("failed to process rows: %w", err)
 	}
 
 	if err := rows.Err(); err != nil {
-		stream.Close(err)
 		qe.logger.Error("[pg_query_executor] row iteration failed",
 			slog.String("query", query), slog.Any("error", err))
 		return 0, fmt.Errorf("[pg_query_executor] row iteration failed '%s': %w", query, err)
 	}
@@ -292,10 +289,16 @@ func (qe *QRepQueryExecutor) ExecuteQueryIntoSink(
 	})
 	if err != nil {
 		qe.logger.Error("[pg_query_executor] failed to begin transaction", slog.Any("error", err))
-		return 0, fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err)
+		err := fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err)
+		sink.Close(err)
+		return 0, err
 	}
 
-	return sink.ExecuteQueryWithTx(ctx, qe, tx, query, args...)
+	totalRecords, err := sink.ExecuteQueryWithTx(ctx, qe, tx, query, args...)
+	if err != nil {
+		sink.Close(err)
+	}
+	return totalRecords, err
 }
 
 func (qe *QRepQueryExecutor) ExecuteQueryIntoSinkGettingCurrentSnapshotXmin(
@@ -314,16 +317,21 @@ func (qe *QRepQueryExecutor) ExecuteQueryIntoSinkGettingCurrentSnapshotXmin(
 	})
 	if err != nil {
 		qe.logger.Error("[pg_query_executor] failed to begin transaction", slog.Any("error", err))
-		return 0, currentSnapshotXmin.Int64, fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err)
+		err := fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err)
+		sink.Close(err)
+		return 0, currentSnapshotXmin.Int64, err
 	}
 
-	err = tx.QueryRow(ctx, "select txid_snapshot_xmin(txid_current_snapshot())").Scan(&currentSnapshotXmin)
-	if err != nil {
+	if err := tx.QueryRow(ctx, "select txid_snapshot_xmin(txid_current_snapshot())").Scan(&currentSnapshotXmin); err != nil {
 		qe.logger.Error("[pg_query_executor] failed to get current snapshot xmin", slog.Any("error", err))
+		sink.Close(err)
 		return 0, currentSnapshotXmin.Int64, err
 	}
 
 	totalRecordsFetched, err := sink.ExecuteQueryWithTx(ctx, qe, tx, query, args...)
+	if err != nil {
+		sink.Close(err)
+	}
 	return totalRecordsFetched, currentSnapshotXmin.Int64, err
 }
diff --git a/flow/connectors/postgres/sink_pg.go b/flow/connectors/postgres/sink_pg.go
index 000642a2da..cf62840925 100644
--- a/flow/connectors/postgres/sink_pg.go
+++ b/flow/connectors/postgres/sink_pg.go
@@ -55,13 +55,10 @@ func (p PgCopyWriter) ExecuteQueryWithTx(
 	defer shared.RollbackTx(tx, qe.logger)
 
 	if qe.snapshot != "" {
-		_, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot))
-		if err != nil {
+		if _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot)); err != nil {
 			qe.logger.Error("[pg_query_executor] failed to set snapshot",
 				slog.Any("error", err), slog.String("query", query))
-			err := fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err)
-			p.Close(err)
-			return 0, err
+			return 0, fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err)
 		}
 	}
 
@@ -89,17 +86,13 @@ func (p PgCopyWriter) ExecuteQueryWithTx(
 	if err != nil {
 		qe.logger.Info("[pg_query_executor] failed to copy",
 			slog.String("copyQuery", copyQuery), slog.Any("error", err))
-		err = fmt.Errorf("[pg_query_executor] failed to copy: %w", err)
-		p.Close(err)
-		return 0, err
+		return 0, fmt.Errorf("[pg_query_executor] failed to copy: %w", err)
 	}
 
 	qe.logger.Info("Committing transaction")
 	if err := tx.Commit(ctx); err != nil {
 		qe.logger.Error("[pg_query_executor] failed to commit transaction", slog.Any("error", err))
-		err = fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err)
-		p.Close(err)
-		return 0, err
+		return 0, fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err)
 	}
 
 	totalRecordsFetched := ct.RowsAffected()
diff --git a/flow/connectors/postgres/sink_q.go b/flow/connectors/postgres/sink_q.go
index c4204b63c4..648280e220 100644
--- a/flow/connectors/postgres/sink_q.go
+++ b/flow/connectors/postgres/sink_q.go
@@ -29,9 +29,7 @@ func (stream RecordStreamSink) ExecuteQueryWithTx(
 		if _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot)); err != nil {
"SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot)); err != nil { qe.logger.Error("[pg_query_executor] failed to set snapshot", slog.Any("error", err), slog.String("query", query)) - err := fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err) - stream.Close(err) - return 0, err + return 0, fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err) } } @@ -46,9 +44,7 @@ func (stream RecordStreamSink) ExecuteQueryWithTx( if _, err := tx.Exec(ctx, cursorQuery, args...); err != nil { qe.logger.Info("[pg_query_executor] failed to declare cursor", slog.String("cursorQuery", cursorQuery), slog.Any("error", err)) - err = fmt.Errorf("[pg_query_executor] failed to declare cursor: %w", err) - stream.Close(err) - return 0, err + return 0, fmt.Errorf("[pg_query_executor] failed to declare cursor: %w", err) } qe.logger.Info(fmt.Sprintf("[pg_query_executor] declared cursor '%s' for query '%s'", cursorName, query)) @@ -72,9 +68,7 @@ func (stream RecordStreamSink) ExecuteQueryWithTx( qe.logger.Info("Committing transaction") if err := tx.Commit(ctx); err != nil { qe.logger.Error("[pg_query_executor] failed to commit transaction", slog.Any("error", err)) - err = fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err) - stream.Close(err) - return 0, err + return 0, fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err) } qe.logger.Info(fmt.Sprintf("[pg_query_executor] committed transaction for query '%s', rows = %d",