diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index ca4223434a..e867876fb1 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -41,13 +41,6 @@ type NormalizeBatchRequest struct { BatchID int64 } -type CdcState struct { - connector connectors.CDCPullConnectorCore - syncDone chan struct{} - normalize chan NormalizeBatchRequest - errGroup *errgroup.Group -} - type FlowableActivity struct { CatalogPool *pgxpool.Pool Alerter *alerting.Alerter @@ -249,40 +242,67 @@ func (a *FlowableActivity) CreateNormalizedTable( }, nil } -func (a *FlowableActivity) maintainPull( +func (a *FlowableActivity) SyncFlow( ctx context.Context, config *protos.FlowConnectionConfigs, -) (CdcState, context.Context, error) { + options *protos.SyncFlowOptions, +) error { ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) - srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, config.Env, a.CatalogPool, config.SourceName) + logger := activity.GetLogger(ctx) + + var currentSyncFlowNum atomic.Int32 + var totalRecordsSynced atomic.Int64 + var normalizingBatchID atomic.Int64 + var normalizeWaiting atomic.Bool + var syncingBatchID atomic.Int64 + var syncState atomic.Pointer[string] + syncState.Store(shared.Ptr("setup")) + + shutdown := heartbeatRoutine(ctx, func() string { + // Must load Waiting after BatchID to avoid race saying we're waiting on currently processing batch + sBatchID := syncingBatchID.Load() + nBatchID := normalizingBatchID.Load() + var nWaiting string + if normalizeWaiting.Load() { + nWaiting = " (W)" + } + return fmt.Sprintf( + "currentSyncFlowNum:%d, totalRecordsSynced:%d, syncingBatchID:%d (%s), normalizingBatchID:%d%s", + currentSyncFlowNum.Load(), totalRecordsSynced.Load(), + sBatchID, *syncState.Load(), nBatchID, nWaiting, + ) + }) + defer shutdown() + + srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnectorCore](ctx, config.Env, a.CatalogPool, config.SourceName) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) - return CdcState{}, nil, err + return err } if err := srcConn.SetupReplConn(ctx); err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) connectors.CloseConnector(ctx, srcConn) - return CdcState{}, nil, err + return err } normalizeBufferSize, err := peerdbenv.PeerDBNormalizeChannelBufferSize(ctx, config.Env) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) connectors.CloseConnector(ctx, srcConn) - return CdcState{}, nil, err + return err } // syncDone will be closed by SyncFlow, // whereas normalizeDone will be closed by normalizing goroutine // Wait on normalizeDone at end to not interrupt final normalize syncDone := make(chan struct{}) - normalize := make(chan NormalizeBatchRequest, normalizeBufferSize) + normRequests := make(chan NormalizeBatchRequest, normalizeBufferSize) group, groupCtx := errgroup.WithContext(ctx) group.Go(func() error { // returning error signals sync to stop, normalize can recover connections without interrupting sync, so never return error - a.normalizeLoop(groupCtx, config, syncDone, normalize) + a.normalizeLoop(groupCtx, logger, config, syncDone, normRequests, &normalizingBatchID, &normalizeWaiting) return nil }) group.Go(func() error { @@ -294,41 +314,17 @@ func (a *FlowableActivity) maintainPull( return nil }) - return CdcState{ - connector: srcConn, - syncDone: syncDone, - normalize: normalize, - errGroup: group, - }, groupCtx, nil -} - -func (a *FlowableActivity) SyncFlow( - ctx context.Context, - config *protos.FlowConnectionConfigs, - options *protos.SyncFlowOptions, -) error { - ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) - logger := activity.GetLogger(ctx) - - cdcState, groupCtx, err := a.maintainPull(ctx, config) - if err != nil { - logger.Error("MaintainPull failed", slog.Any("error", err)) - return err - } - - currentSyncFlowNum := int32(0) - totalRecordsSynced := int64(0) - for groupCtx.Err() == nil { - currentSyncFlowNum += 1 - logger.Info("executing sync flow", slog.Int("count", int(currentSyncFlowNum))) + logger.Info("executing sync flow", slog.Int64("count", int64(currentSyncFlowNum.Add(1)))) - var numRecordsSynced int64 + var syncResponse *model.SyncResponse var syncErr error if config.System == protos.TypeSystem_Q { - numRecordsSynced, syncErr = a.SyncRecords(groupCtx, config, options, cdcState) + syncResponse, syncErr = a.syncRecords(groupCtx, config, options, srcConn.(connectors.CDCPullConnector), + normRequests, &syncingBatchID, &syncState) } else { - numRecordsSynced, syncErr = a.SyncPg(groupCtx, config, options, cdcState) + syncResponse, syncErr = a.syncPg(groupCtx, config, options, srcConn.(connectors.CDCPullPgConnector), + normRequests, &syncingBatchID, &syncState) } if syncErr != nil { @@ -337,21 +333,23 @@ func (a *FlowableActivity) SyncFlow( break } logger.Error("failed to sync records", slog.Any("error", syncErr)) - close(cdcState.syncDone) - return errors.Join(syncErr, cdcState.errGroup.Wait()) + syncState.Store(shared.Ptr("cleanup")) + close(syncDone) + return errors.Join(syncErr, group.Wait()) } else { - totalRecordsSynced += numRecordsSynced - logger.Info("synced records", - slog.Int64("numRecordsSynced", numRecordsSynced), slog.Int64("totalRecordsSynced", totalRecordsSynced)) + totalRecordsSynced.Add(syncResponse.NumRecordsSynced) + logger.Info("synced records", slog.Int64("numRecordsSynced", syncResponse.NumRecordsSynced), + slog.Int64("totalRecordsSynced", totalRecordsSynced.Load())) - if options.NumberOfSyncs > 0 && currentSyncFlowNum >= options.NumberOfSyncs { + if options.NumberOfSyncs > 0 && currentSyncFlowNum.Load() >= options.NumberOfSyncs { break } } } - close(cdcState.syncDone) - waitErr := cdcState.errGroup.Wait() + syncState.Store(shared.Ptr("cleanup")) + close(syncDone) + waitErr := group.Wait() if err := ctx.Err(); err != nil { logger.Info("sync canceled", slog.Any("error", err)) return err @@ -362,12 +360,15 @@ func (a *FlowableActivity) SyncFlow( return nil } -func (a *FlowableActivity) SyncRecords( +func (a *FlowableActivity) syncRecords( ctx context.Context, config *protos.FlowConnectionConfigs, options *protos.SyncFlowOptions, - cdcState CdcState, -) (int64, error) { + srcConn connectors.CDCPullConnector, + normRequests chan<- NormalizeBatchRequest, + syncingBatchID *atomic.Int64, + syncWaiting *atomic.Pointer[string], +) (*model.SyncResponse, error) { var adaptStream func(stream *model.CDCStream[model.RecordItems]) (*model.CDCStream[model.RecordItems], error) if config.Script != "" { var onErr context.CancelCauseFunc @@ -398,23 +399,28 @@ func (a *FlowableActivity) SyncRecords( return stream, nil } } - return syncCore(ctx, a, config, options, cdcState, adaptStream, + return syncCore(ctx, a, config, options, srcConn, normRequests, + syncingBatchID, syncWaiting, adaptStream, connectors.CDCPullConnector.PullRecords, connectors.CDCSyncConnector.SyncRecords) } -func (a *FlowableActivity) SyncPg( +func (a *FlowableActivity) syncPg( ctx context.Context, config *protos.FlowConnectionConfigs, options *protos.SyncFlowOptions, - cdcState CdcState, -) (int64, error) { - return syncCore(ctx, a, config, options, cdcState, nil, + srcConn connectors.CDCPullPgConnector, + normRequests chan<- NormalizeBatchRequest, + syncingBatchID *atomic.Int64, + syncWaiting *atomic.Pointer[string], +) (*model.SyncResponse, error) { + return syncCore(ctx, a, config, options, srcConn, normRequests, + syncingBatchID, syncWaiting, nil, connectors.CDCPullPgConnector.PullPg, connectors.CDCSyncPgConnector.SyncPg) } -func (a *FlowableActivity) StartNormalize( +func (a *FlowableActivity) startNormalize( ctx context.Context, config *protos.FlowConnectionConfigs, batchID int64, @@ -435,11 +441,6 @@ func (a *FlowableActivity) StartNormalize( } defer connectors.CloseConnector(ctx, dstConn) - shutdown := heartbeatRoutine(ctx, func() string { - return "normalizing records from batch for job" - }) - defer shutdown() - tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, config.FlowJobName) if err != nil { return fmt.Errorf("failed to get table name schema mapping: %w", err) diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 267f0e0f80..6aee4a7e07 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -104,28 +104,25 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon a *FlowableActivity, config *protos.FlowConnectionConfigs, options *protos.SyncFlowOptions, - cdcState CdcState, + srcConn TPull, + normRequests chan<- NormalizeBatchRequest, + syncingBatchID *atomic.Int64, + syncState *atomic.Pointer[string], adaptStream func(*model.CDCStream[Items]) (*model.CDCStream[Items], error), pull func(TPull, context.Context, *pgxpool.Pool, *otel_metrics.OtelManager, *model.PullRecordsRequest[Items]) error, sync func(TSync, context.Context, *model.SyncRecordsRequest[Items]) (*model.SyncResponse, error), -) (int64, error) { +) (*model.SyncResponse, error) { flowName := config.FlowJobName ctx = context.WithValue(ctx, shared.FlowNameKey, flowName) logger := activity.GetLogger(ctx) - shutdown := heartbeatRoutine(ctx, func() string { - return "transferring records for job" - }) - defer shutdown() tblNameMapping := make(map[string]model.NameAndExclude, len(options.TableMappings)) for _, v := range options.TableMappings { tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude) } - srcConn := cdcState.connector.(TPull) - normChan := cdcState.normalize if err := srcConn.ConnectionActive(ctx); err != nil { - return 0, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil) + return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil) } batchSize := options.BatchSize @@ -144,7 +141,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon }() if err != nil { a.Alerter.LogFlowError(ctx, flowName, err) - return 0, err + return nil, err } logger.Info("pulling records...", slog.Int64("LastOffset", lastOffset)) @@ -153,23 +150,24 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon channelBufferSize, err := peerdbenv.PeerDBCDCChannelBufferSize(ctx, config.Env) if err != nil { - return 0, fmt.Errorf("failed to get CDC channel buffer size: %w", err) + return nil, fmt.Errorf("failed to get CDC channel buffer size: %w", err) } recordBatchPull := model.NewCDCStream[Items](channelBufferSize) recordBatchSync := recordBatchPull if adaptStream != nil { var err error if recordBatchSync, err = adaptStream(recordBatchPull); err != nil { - return 0, err + return nil, err } } tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, flowName) if err != nil { - return 0, err + return nil, err } startTime := time.Now() + syncState.Store(shared.Ptr("syncing")) errGroup, errCtx := errgroup.WithContext(ctx) errGroup.Go(func() error { return pull(srcConn, errCtx, a.CatalogPool, a.OtelManager, &model.PullRecordsRequest[Items]{ @@ -202,24 +200,25 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon a.Alerter.LogFlowError(ctx, flowName, err) } if temporal.IsApplicationError(err) { - return 0, err + return nil, err } else { - return 0, fmt.Errorf("failed in pull records when: %w", err) + return nil, fmt.Errorf("failed in pull records when: %w", err) } } logger.Info("no records to push") dstConn, err := connectors.GetByNameAs[TSync](ctx, config.Env, a.CatalogPool, config.DestinationName) if err != nil { - return 0, fmt.Errorf("failed to recreate destination connector: %w", err) + return nil, fmt.Errorf("failed to recreate destination connector: %w", err) } defer connectors.CloseConnector(ctx, dstConn) + syncState.Store(shared.Ptr("updating schema")) if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, recordBatchSync.SchemaDeltas); err != nil { - return 0, fmt.Errorf("failed to sync schema: %w", err) + return nil, fmt.Errorf("failed to sync schema: %w", err) } - return -1, a.applySchemaDeltas(ctx, config, options, recordBatchSync.SchemaDeltas) + return nil, a.applySchemaDeltas(ctx, config, options, recordBatchSync.SchemaDeltas) } var syncStartTime time.Time @@ -236,6 +235,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon return err } syncBatchID += 1 + syncingBatchID.Store(syncBatchID) logger.Info("begin pulling records for batch", slog.Int64("SyncBatchID", syncBatchID)) if err := monitoring.AddCDCBatchForFlow(errCtx, a.CatalogPool, flowName, monitoring.CDCBatchInfo{ @@ -275,11 +275,12 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon a.Alerter.LogFlowError(ctx, flowName, err) } if temporal.IsApplicationError(err) { - return 0, err + return nil, err } else { - return 0, fmt.Errorf("failed to pull records: %w", err) + return nil, fmt.Errorf("failed to pull records: %w", err) } } + syncState.Store(shared.Ptr("bookkeeping")) syncDuration := time.Since(syncStartTime) lastCheckpoint := recordBatchSync.GetLastCheckpoint() @@ -289,24 +290,23 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon ctx, a.CatalogPool, flowName, res.CurrentSyncBatchID, uint32(res.NumRecordsSynced), lastCheckpoint, ); err != nil { a.Alerter.LogFlowError(ctx, flowName, err) - return 0, err + return nil, err } if err := monitoring.UpdateLatestLSNAtTargetForCDCFlow(ctx, a.CatalogPool, flowName, lastCheckpoint); err != nil { a.Alerter.LogFlowError(ctx, flowName, err) - return 0, err + return nil, err } if res.TableNameRowsMapping != nil { if err := monitoring.AddCDCBatchTablesForFlow( ctx, a.CatalogPool, flowName, res.CurrentSyncBatchID, res.TableNameRowsMapping, ); err != nil { - return 0, err + return nil, err } } pushedRecordsWithCount := fmt.Sprintf("pushed %d records for batch %d in %v", res.NumRecordsSynced, res.CurrentSyncBatchID, syncDuration.Truncate(time.Second)) - activity.RecordHeartbeat(ctx, pushedRecordsWithCount) a.Alerter.LogFlowInfo(ctx, flowName, pushedRecordsWithCount) if a.OtelManager != nil { @@ -319,34 +319,36 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon } } + syncState.Store(shared.Ptr("updating schema")) if err := a.applySchemaDeltas(ctx, config, options, res.TableSchemaDeltas); err != nil { - return 0, err + return nil, err } if recordBatchSync.NeedsNormalize() { parallel, err := peerdbenv.PeerDBEnableParallelSyncNormalize(ctx, config.Env) if err != nil { - return 0, err + return nil, err } var done chan struct{} if !parallel { done = make(chan struct{}) } + syncState.Store(shared.Ptr("normalizing")) select { - case normChan <- NormalizeBatchRequest{BatchID: res.CurrentSyncBatchID, Done: done}: + case normRequests <- NormalizeBatchRequest{BatchID: res.CurrentSyncBatchID, Done: done}: case <-ctx.Done(): - return 0, nil + return res, nil } if done != nil { select { case <-done: case <-ctx.Done(): - return 0, nil + return res, nil } } } - return res.NumRecordsSynced, nil + return res, nil } func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) { @@ -601,7 +603,7 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn } func (a *FlowableActivity) maintainReplConn( - ctx context.Context, flowName string, srcConn connectors.CDCPullConnector, syncDone <-chan struct{}, + ctx context.Context, flowName string, srcConn connectors.CDCPullConnectorCore, syncDone <-chan struct{}, ) error { ticker := time.NewTicker(15 * time.Second) defer ticker.Stop() @@ -609,7 +611,6 @@ func (a *FlowableActivity) maintainReplConn( for { select { case <-ticker.C: - activity.RecordHeartbeat(ctx, "keep session alive") if err := srcConn.ReplPing(ctx); err != nil { a.Alerter.LogFlowError(ctx, flowName, err) return fmt.Errorf("connection to source down: %w", err) @@ -625,23 +626,29 @@ func (a *FlowableActivity) maintainReplConn( // Suitable to be run as goroutine func (a *FlowableActivity) normalizeLoop( ctx context.Context, + logger log.Logger, config *protos.FlowConnectionConfigs, syncDone <-chan struct{}, - normalize <-chan NormalizeBatchRequest, + normalizeRequests <-chan NormalizeBatchRequest, + normalizingBatchID *atomic.Int64, + normalizeWaiting *atomic.Bool, ) { - logger := activity.GetLogger(ctx) + defer normalizeWaiting.Store(false) for { + normalizeWaiting.Store(true) select { - case req := <-normalize: + case req := <-normalizeRequests: + normalizeWaiting.Store(false) retryLoop: for { - if err := a.StartNormalize(ctx, config, req.BatchID); err != nil { + normalizingBatchID.Store(req.BatchID) + if err := a.startNormalize(ctx, config, req.BatchID); err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) for { // update req to latest normalize request & retry select { - case req = <-normalize: + case req = <-normalizeRequests: case <-syncDone: logger.Info("[normalize-loop] syncDone closed before retry") return diff --git a/flow/shared/constants.go b/flow/shared/constants.go index 955ecfc4b5..b9d40a3243 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -34,3 +34,7 @@ const ( ) const FetchAndChannelSize = 256 * 1024 + +func Ptr[T any](x T) *T { + return &x +}