From bc560037f1170f0e59712f2379a17bf301ef4486 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 27 Dec 2024 14:34:15 +0000 Subject: [PATCH] QRecordSchema: close schemaLatch in Close goroutines were leaking waiting on schemaLatch, if provider hit error then schemaLatch could be left indefinitely open --- flow/connectors/bigquery/qrep.go | 3 +++ flow/connectors/clickhouse/qrep_avro_sync.go | 3 +++ flow/connectors/elasticsearch/qrep.go | 3 +++ flow/connectors/kafka/qrep.go | 3 +++ flow/connectors/postgres/qrep_query_executor.go | 1 - flow/connectors/pubsub/qrep.go | 5 ++++- flow/connectors/s3/qrep.go | 3 +++ flow/connectors/snowflake/qrep_avro_sync.go | 3 +++ flow/connectors/utils/avro/avro_writer.go | 3 +++ flow/model/qrecord_stream.go | 3 +++ flow/model/qvalue/qschema.go | 5 +---- 11 files changed, 29 insertions(+), 6 deletions(-) diff --git a/flow/connectors/bigquery/qrep.go b/flow/connectors/bigquery/qrep.go index b184cc62a9..b554a3e0a4 100644 --- a/flow/connectors/bigquery/qrep.go +++ b/flow/connectors/bigquery/qrep.go @@ -24,6 +24,9 @@ func (c *BigQueryConnector) SyncQRepRecords( // Ensure the destination table is available. destTable := config.DestinationTableIdentifier srcSchema := stream.Schema() + if srcSchema.Fields == nil { + return 0, stream.Err() + } tblMetadata, err := c.replayTableSchemaDeltasQRep(ctx, config, partition, srcSchema) if err != nil { diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go index 61450dd55c..9fa3fb5917 100644 --- a/flow/connectors/clickhouse/qrep_avro_sync.go +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -68,6 +68,9 @@ func (s *ClickHouseAvroSyncMethod) SyncRecords( dstTableName := s.config.DestinationTableIdentifier schema := stream.Schema() + if schema.Fields == nil { + return 0, stream.Err() + } s.logger.Info("sync function called and schema acquired", slog.String("dstTable", dstTableName)) diff --git a/flow/connectors/elasticsearch/qrep.go b/flow/connectors/elasticsearch/qrep.go index 142c6de363..f3955027be 100644 --- a/flow/connectors/elasticsearch/qrep.go +++ b/flow/connectors/elasticsearch/qrep.go @@ -43,6 +43,9 @@ func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config * startTime := time.Now() schema := stream.Schema() + if schema.Fields == nil { + return 0, stream.Err() + } var bulkIndexFatalError error var bulkIndexErrors []error diff --git a/flow/connectors/kafka/qrep.go b/flow/connectors/kafka/qrep.go index fb65d91d91..5cd384588d 100644 --- a/flow/connectors/kafka/qrep.go +++ b/flow/connectors/kafka/qrep.go @@ -28,6 +28,9 @@ func (c *KafkaConnector) SyncQRepRecords( startTime := time.Now() numRecords := atomic.Int64{} schema := stream.Schema() + if schema.Fields == nil { + return 0, stream.Err() + } queueCtx, queueErr := context.WithCancelCause(ctx) pool, err := c.createPool(queueCtx, config.Env, config.Script, config.FlowJobName, nil, queueErr) diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 339c54a633..4c99748727 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -194,7 +194,6 @@ func (qe *QRepQueryExecutor) processFetchedRows( slog.Any("error", err), slog.String("query", query)) return 0, fmt.Errorf("[pg_query_executor] failed to execute query in tx: %w", err) } - defer rows.Close() fieldDescriptions := rows.FieldDescriptions() diff --git a/flow/connectors/pubsub/qrep.go b/flow/connectors/pubsub/qrep.go index 297d78409a..2579f773b8 100644 --- a/flow/connectors/pubsub/qrep.go +++ b/flow/connectors/pubsub/qrep.go @@ -25,11 +25,14 @@ func (c *PubSubConnector) SyncQRepRecords( stream *model.QRecordStream, ) (int, error) { startTime := time.Now() - numRecords := atomic.Int64{} schema := stream.Schema() + if schema.Fields == nil { + return 0, stream.Err() + } topiccache := topicCache{cache: make(map[string]*pubsub.Topic)} publish := make(chan publishResult, 32) waitChan := make(chan struct{}) + numRecords := atomic.Int64{} queueCtx, queueErr := context.WithCancelCause(ctx) pool, err := c.createPool(queueCtx, config.Env, config.Script, config.FlowJobName, &topiccache, publish, queueErr) diff --git a/flow/connectors/s3/qrep.go b/flow/connectors/s3/qrep.go index 968c956aab..8f95442f73 100644 --- a/flow/connectors/s3/qrep.go +++ b/flow/connectors/s3/qrep.go @@ -18,6 +18,9 @@ func (c *S3Connector) SyncQRepRecords( stream *model.QRecordStream, ) (int, error) { schema := stream.Schema() + if schema.Fields == nil { + return 0, stream.Err() + } dstTableName := config.DestinationTableIdentifier avroSchema, err := getAvroSchema(ctx, config.Env, dstTableName, schema) diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index 728d393e62..a4bfc9b777 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -45,6 +45,9 @@ func (s *SnowflakeAvroSyncHandler) SyncRecords( dstTableName := s.config.DestinationTableIdentifier schema := stream.Schema() + if schema.Fields == nil { + return 0, stream.Err() + } s.logger.Info("sync function called and schema acquired", tableLog) diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index 75bc9f4358..70aba1dae3 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -130,6 +130,9 @@ func (p *peerDBOCFWriter) createOCFWriter(w io.Writer) (*goavro.OCFWriter, error func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ctx context.Context, env map[string]string, ocfWriter *goavro.OCFWriter) (int64, error) { logger := shared.LoggerFromCtx(ctx) schema := p.stream.Schema() + if schema.Fields == nil { + return 0, p.stream.Err() + } avroConverter, err := model.NewQRecordAvroConverter( ctx, diff --git a/flow/model/qrecord_stream.go b/flow/model/qrecord_stream.go index 054d6a42b1..1a91bd062c 100644 --- a/flow/model/qrecord_stream.go +++ b/flow/model/qrecord_stream.go @@ -55,4 +55,7 @@ func (s *QRecordStream) Close(err error) { s.err = err close(s.Records) } + if !s.schemaSet { + s.SetSchema(qvalue.QRecordSchema{}) + } } diff --git a/flow/model/qvalue/qschema.go b/flow/model/qvalue/qschema.go index a6632fdf5f..9e778e4f85 100644 --- a/flow/model/qvalue/qschema.go +++ b/flow/model/qvalue/qschema.go @@ -34,10 +34,7 @@ func (q QRecordSchema) EqualNames(other QRecordSchema) bool { } for i, field := range q.Fields { - // ignore the case of the field name convert to lower case - f1 := strings.ToLower(field.Name) - f2 := strings.ToLower(other.Fields[i].Name) - if f1 != f2 { + if !strings.EqualFold(field.Name, other.Fields[i].Name) { return false } }