diff --git a/flow/connectors/bigquery/qrep.go b/flow/connectors/bigquery/qrep.go index b554a3e0a4..a6d5c35d7a 100644 --- a/flow/connectors/bigquery/qrep.go +++ b/flow/connectors/bigquery/qrep.go @@ -23,9 +23,9 @@ func (c *BigQueryConnector) SyncQRepRecords( ) (int, error) { // Ensure the destination table is available. destTable := config.DestinationTableIdentifier - srcSchema := stream.Schema() - if srcSchema.Fields == nil { - return 0, stream.Err() + srcSchema, err := stream.Schema() + if err != nil { + return 0, err } tblMetadata, err := c.replayTableSchemaDeltasQRep(ctx, config, partition, srcSchema) @@ -83,8 +83,9 @@ func (c *BigQueryConnector) replayTableSchemaDeltasQRep( } } - err = c.ReplayTableSchemaDeltas(ctx, config.Env, config.FlowJobName, []*protos.TableSchemaDelta{tableSchemaDelta}) - if err != nil { + if err := c.ReplayTableSchemaDeltas( + ctx, config.Env, config.FlowJobName, []*protos.TableSchemaDelta{tableSchemaDelta}, + ); err != nil { return nil, fmt.Errorf("failed to add columns to destination table: %w", err) } dstTableMetadata, err = bqTable.Metadata(ctx) diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go index 9fa3fb5917..030de782f4 100644 --- a/flow/connectors/clickhouse/qrep_avro_sync.go +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -67,9 +67,9 @@ func (s *ClickHouseAvroSyncMethod) SyncRecords( ) (int, error) { dstTableName := s.config.DestinationTableIdentifier - schema := stream.Schema() - if schema.Fields == nil { - return 0, stream.Err() + schema, err := stream.Schema() + if err != nil { + return 0, err } s.logger.Info("sync function called and schema acquired", slog.String("dstTable", dstTableName)) @@ -109,7 +109,12 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords( stagingPath := s.credsProvider.BucketPath startTime := time.Now() - avroSchema, err := s.getAvroSchema(ctx, config.Env, dstTableName, stream.Schema()) + schema, err := stream.Schema() + if err != nil { + return 0, err + } + + avroSchema, err := s.getAvroSchema(ctx, config.Env, dstTableName, schema) if err != nil { return 0, err } diff --git a/flow/connectors/elasticsearch/qrep.go b/flow/connectors/elasticsearch/qrep.go index f3955027be..1206438d5f 100644 --- a/flow/connectors/elasticsearch/qrep.go +++ b/flow/connectors/elasticsearch/qrep.go @@ -42,9 +42,9 @@ func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config * ) (int, error) { startTime := time.Now() - schema := stream.Schema() - if schema.Fields == nil { - return 0, stream.Err() + schema, err := stream.Schema() + if err != nil { + return 0, err } var bulkIndexFatalError error diff --git a/flow/connectors/kafka/qrep.go b/flow/connectors/kafka/qrep.go index 5cd384588d..c6b60a564d 100644 --- a/flow/connectors/kafka/qrep.go +++ b/flow/connectors/kafka/qrep.go @@ -27,9 +27,9 @@ func (c *KafkaConnector) SyncQRepRecords( ) (int, error) { startTime := time.Now() numRecords := atomic.Int64{} - schema := stream.Schema() - if schema.Fields == nil { - return 0, stream.Err() + schema, err := stream.Schema() + if err != nil { + return 0, err } queueCtx, queueErr := context.WithCancelCause(ctx) diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index 1cd2cd5952..05aa492baa 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -32,7 +32,7 @@ type QRepPullSink interface { } type QRepSyncSink interface { - GetColumnNames() []string + GetColumnNames() ([]string, error) CopyInto(context.Context, *PostgresConnector, pgx.Tx, pgx.Identifier) (int64, error) } @@ -550,7 +550,10 @@ func syncQRepRecords( upsertMatchCols[col] = struct{}{} } - columnNames := sink.GetColumnNames() + columnNames, err := sink.GetColumnNames() + if err != nil { + return -1, fmt.Errorf("faild to get column names: %w", err) + } setClauseArray := make([]string, 0, len(upsertMatchColsList)+1) selectStrArray := make([]string, 0, len(columnNames)) for _, col := range columnNames { @@ -578,8 +581,7 @@ func syncQRepRecords( setClause, ) c.logger.Info("Performing upsert operation", slog.String("upsertStmt", upsertStmt), syncLog) - _, err := tx.Exec(ctx, upsertStmt) - if err != nil { + if _, err := tx.Exec(ctx, upsertStmt); err != nil { return -1, fmt.Errorf("failed to perform upsert operation: %w", err) } } diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 4c99748727..c548c9d76f 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -224,7 +224,8 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQuery( args ...interface{}, ) (*model.QRecordBatch, error) { stream := model.NewQRecordStream(1024) - errors := make(chan error, 1) + errors := make(chan struct{}) + var errorsError error qe.logger.Info("Executing and processing query", slog.String("query", query)) // must wait on errors to close before returning to maintain qe.conn exclusion @@ -233,23 +234,28 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQuery( _, err := qe.ExecuteAndProcessQueryStream(ctx, stream, query, args...) if err != nil { qe.logger.Error("[pg_query_executor] failed to execute and process query stream", slog.Any("error", err)) - errors <- err + errorsError = err } }() select { - case err := <-errors: - return nil, err + case <-errors: + return nil, errorsError case <-stream.SchemaChan(): + schema, err := stream.Schema() + if err != nil { + return nil, err + } batch := &model.QRecordBatch{ - Schema: stream.Schema(), + Schema: schema, Records: nil, } for record := range stream.Records { batch.Records = append(batch.Records, record) } - if err := <-errors; err != nil { - return nil, err + <-errors + if errorsError != nil { + return nil, errorsError } if err := stream.Err(); err != nil { return nil, fmt.Errorf("[pg] failed to get record from stream: %w", err) diff --git a/flow/connectors/postgres/sink_pg.go b/flow/connectors/postgres/sink_pg.go index 8d9d06e747..000642a2da 100644 --- a/flow/connectors/postgres/sink_pg.go +++ b/flow/connectors/postgres/sink_pg.go @@ -15,6 +15,7 @@ import ( type PgCopyShared struct { schemaLatch chan struct{} + err error schema []string schemaSet bool } @@ -109,15 +110,20 @@ func (p PgCopyWriter) ExecuteQueryWithTx( func (p PgCopyWriter) Close(err error) { p.PipeWriter.CloseWithError(err) + p.schema.err = err + p.SetSchema(nil) } -func (p PgCopyReader) GetColumnNames() []string { +func (p PgCopyReader) GetColumnNames() ([]string, error) { <-p.schema.schemaLatch - return p.schema.schema + return p.schema.schema, p.schema.err } func (p PgCopyReader) CopyInto(ctx context.Context, c *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error) { - cols := p.GetColumnNames() + cols, err := p.GetColumnNames() + if err != nil { + return 0, err + } quotedCols := make([]string, 0, len(cols)) for _, col := range cols { quotedCols = append(quotedCols, QuoteIdentifier(col)) diff --git a/flow/connectors/postgres/sink_q.go b/flow/connectors/postgres/sink_q.go index 21a39627be..5c5c97e048 100644 --- a/flow/connectors/postgres/sink_q.go +++ b/flow/connectors/postgres/sink_q.go @@ -84,9 +84,17 @@ func (stream RecordStreamSink) ExecuteQueryWithTx( } func (stream RecordStreamSink) CopyInto(ctx context.Context, _ *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error) { - return tx.CopyFrom(ctx, table, stream.GetColumnNames(), model.NewQRecordCopyFromSource(stream.QRecordStream)) + columnNames, err := stream.GetColumnNames() + if err != nil { + return 0, err + } + return tx.CopyFrom(ctx, table, columnNames, model.NewQRecordCopyFromSource(stream.QRecordStream)) } -func (stream RecordStreamSink) GetColumnNames() []string { - return stream.Schema().GetColumnNames() +func (stream RecordStreamSink) GetColumnNames() ([]string, error) { + schema, err := stream.Schema() + if err != nil { + return nil, err + } + return schema.GetColumnNames(), nil } diff --git a/flow/connectors/pubsub/qrep.go b/flow/connectors/pubsub/qrep.go index 2579f773b8..5d84f6c22e 100644 --- a/flow/connectors/pubsub/qrep.go +++ b/flow/connectors/pubsub/qrep.go @@ -25,9 +25,9 @@ func (c *PubSubConnector) SyncQRepRecords( stream *model.QRecordStream, ) (int, error) { startTime := time.Now() - schema := stream.Schema() - if schema.Fields == nil { - return 0, stream.Err() + schema, err := stream.Schema() + if err != nil { + return 0, err } topiccache := topicCache{cache: make(map[string]*pubsub.Topic)} publish := make(chan publishResult, 32) diff --git a/flow/connectors/s3/qrep.go b/flow/connectors/s3/qrep.go index 8f95442f73..41e1c7d05c 100644 --- a/flow/connectors/s3/qrep.go +++ b/flow/connectors/s3/qrep.go @@ -17,9 +17,9 @@ func (c *S3Connector) SyncQRepRecords( partition *protos.QRepPartition, stream *model.QRecordStream, ) (int, error) { - schema := stream.Schema() - if schema.Fields == nil { - return 0, stream.Err() + schema, err := stream.Schema() + if err != nil { + return 0, err } dstTableName := config.DestinationTableIdentifier diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index a4bfc9b777..dcaa28911a 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -44,8 +44,8 @@ func (s *SnowflakeAvroSyncHandler) SyncRecords( tableLog := slog.String("destinationTable", s.config.DestinationTableIdentifier) dstTableName := s.config.DestinationTableIdentifier - schema := stream.Schema() - if schema.Fields == nil { + schema, err := stream.Schema() + if err != nil { return 0, stream.Err() } @@ -98,11 +98,13 @@ func (s *SnowflakeAvroSyncHandler) SyncQRepRecords( startTime := time.Now() dstTableName := config.DestinationTableIdentifier - schema := stream.Schema() + schema, err := stream.Schema() + if err != nil { + return 0, err + } s.logger.Info("sync function called and schema acquired", partitionLog) - err := s.addMissingColumns(ctx, config.Env, schema, dstTableSchema, dstTableName, partition) - if err != nil { + if err := s.addMissingColumns(ctx, config.Env, schema, dstTableSchema, dstTableName, partition); err != nil { return 0, err } diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index 70aba1dae3..6357c46d94 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -129,9 +129,9 @@ func (p *peerDBOCFWriter) createOCFWriter(w io.Writer) (*goavro.OCFWriter, error func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ctx context.Context, env map[string]string, ocfWriter *goavro.OCFWriter) (int64, error) { logger := shared.LoggerFromCtx(ctx) - schema := p.stream.Schema() - if schema.Fields == nil { - return 0, p.stream.Err() + schema, err := p.stream.Schema() + if err != nil { + return 0, err } avroConverter, err := model.NewQRecordAvroConverter( diff --git a/flow/model/qrecord_stream.go b/flow/model/qrecord_stream.go index 1a91bd062c..eeca424b35 100644 --- a/flow/model/qrecord_stream.go +++ b/flow/model/qrecord_stream.go @@ -22,9 +22,9 @@ func NewQRecordStream(buffer int) *QRecordStream { } } -func (s *QRecordStream) Schema() qvalue.QRecordSchema { +func (s *QRecordStream) Schema() (qvalue.QRecordSchema, error) { <-s.schemaLatch - return s.schema + return s.schema, s.Err() } func (s *QRecordStream) SetSchema(schema qvalue.QRecordSchema) { diff --git a/flow/pua/stream_adapter.go b/flow/pua/stream_adapter.go index a03e68c919..6158abd693 100644 --- a/flow/pua/stream_adapter.go +++ b/flow/pua/stream_adapter.go @@ -6,12 +6,14 @@ import ( lua "github.com/yuin/gopher-lua" "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/model/qvalue" ) func AttachToStream(ls *lua.LState, lfn *lua.LFunction, stream *model.QRecordStream) *model.QRecordStream { output := model.NewQRecordStream(0) go func() { - schema := stream.Schema() + var schema qvalue.QRecordSchema + schema, _ = stream.Schema() output.SetSchema(schema) for record := range stream.Records { row := model.NewRecordItems(len(record))