Skip to content

Commit

Permalink
QRecordSchema: close schemaLatch in Close
Browse files Browse the repository at this point in the history
goroutines were leaking while waiting on schemaLatch:
if the provider hit an error, schemaLatch could be left open indefinitely, blocking waiters forever
  • Loading branch information
serprex committed Dec 27, 2024
1 parent 1305dec commit bc56003
Show file tree
Hide file tree
Showing 11 changed files with 29 additions and 6 deletions.
3 changes: 3 additions & 0 deletions flow/connectors/bigquery/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ func (c *BigQueryConnector) SyncQRepRecords(
// Ensure the destination table is available.
destTable := config.DestinationTableIdentifier
srcSchema := stream.Schema()
if srcSchema.Fields == nil {
return 0, stream.Err()
}

tblMetadata, err := c.replayTableSchemaDeltasQRep(ctx, config, partition, srcSchema)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions flow/connectors/clickhouse/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ func (s *ClickHouseAvroSyncMethod) SyncRecords(
dstTableName := s.config.DestinationTableIdentifier

schema := stream.Schema()
if schema.Fields == nil {
return 0, stream.Err()
}
s.logger.Info("sync function called and schema acquired",
slog.String("dstTable", dstTableName))

Expand Down
3 changes: 3 additions & 0 deletions flow/connectors/elasticsearch/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config *
startTime := time.Now()

schema := stream.Schema()
if schema.Fields == nil {
return 0, stream.Err()
}

var bulkIndexFatalError error
var bulkIndexErrors []error
Expand Down
3 changes: 3 additions & 0 deletions flow/connectors/kafka/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ func (c *KafkaConnector) SyncQRepRecords(
startTime := time.Now()
numRecords := atomic.Int64{}
schema := stream.Schema()
if schema.Fields == nil {
return 0, stream.Err()
}

queueCtx, queueErr := context.WithCancelCause(ctx)
pool, err := c.createPool(queueCtx, config.Env, config.Script, config.FlowJobName, nil, queueErr)
Expand Down
1 change: 0 additions & 1 deletion flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ func (qe *QRepQueryExecutor) processFetchedRows(
slog.Any("error", err), slog.String("query", query))
return 0, fmt.Errorf("[pg_query_executor] failed to execute query in tx: %w", err)
}

defer rows.Close()

fieldDescriptions := rows.FieldDescriptions()
Expand Down
5 changes: 4 additions & 1 deletion flow/connectors/pubsub/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ func (c *PubSubConnector) SyncQRepRecords(
stream *model.QRecordStream,
) (int, error) {
startTime := time.Now()
numRecords := atomic.Int64{}
schema := stream.Schema()
if schema.Fields == nil {
return 0, stream.Err()
}
topiccache := topicCache{cache: make(map[string]*pubsub.Topic)}
publish := make(chan publishResult, 32)
waitChan := make(chan struct{})
numRecords := atomic.Int64{}

queueCtx, queueErr := context.WithCancelCause(ctx)
pool, err := c.createPool(queueCtx, config.Env, config.Script, config.FlowJobName, &topiccache, publish, queueErr)
Expand Down
3 changes: 3 additions & 0 deletions flow/connectors/s3/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ func (c *S3Connector) SyncQRepRecords(
stream *model.QRecordStream,
) (int, error) {
schema := stream.Schema()
if schema.Fields == nil {
return 0, stream.Err()
}

dstTableName := config.DestinationTableIdentifier
avroSchema, err := getAvroSchema(ctx, config.Env, dstTableName, schema)
Expand Down
3 changes: 3 additions & 0 deletions flow/connectors/snowflake/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ func (s *SnowflakeAvroSyncHandler) SyncRecords(
dstTableName := s.config.DestinationTableIdentifier

schema := stream.Schema()
if schema.Fields == nil {
return 0, stream.Err()
}

s.logger.Info("sync function called and schema acquired", tableLog)

Expand Down
3 changes: 3 additions & 0 deletions flow/connectors/utils/avro/avro_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ func (p *peerDBOCFWriter) createOCFWriter(w io.Writer) (*goavro.OCFWriter, error
func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ctx context.Context, env map[string]string, ocfWriter *goavro.OCFWriter) (int64, error) {
logger := shared.LoggerFromCtx(ctx)
schema := p.stream.Schema()
if schema.Fields == nil {
return 0, p.stream.Err()
}

avroConverter, err := model.NewQRecordAvroConverter(
ctx,
Expand Down
3 changes: 3 additions & 0 deletions flow/model/qrecord_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,7 @@ func (s *QRecordStream) Close(err error) {
s.err = err
close(s.Records)
}
if !s.schemaSet {
s.SetSchema(qvalue.QRecordSchema{})
}
}
5 changes: 1 addition & 4 deletions flow/model/qvalue/qschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@ func (q QRecordSchema) EqualNames(other QRecordSchema) bool {
}

for i, field := range q.Fields {
// ignore the case of the field name convert to lower case
f1 := strings.ToLower(field.Name)
f2 := strings.ToLower(other.Fields[i].Name)
if f1 != f2 {
if !strings.EqualFold(field.Name, other.Fields[i].Name) {
return false
}
}
Expand Down

0 comments on commit bc56003

Please sign in to comment.