diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 856f66fe6..878aad598 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -41,13 +41,6 @@ type NormalizeBatchRequest struct {
 	BatchID int64
 }
 
-type CdcState struct {
-	connector connectors.CDCPullConnectorCore
-	syncDone  chan struct{}
-	normalize chan NormalizeBatchRequest
-	errGroup  *errgroup.Group
-}
-
 type FlowableActivity struct {
 	CatalogPool *pgxpool.Pool
 	Alerter     *alerting.Alerter
@@ -249,40 +242,69 @@ func (a *FlowableActivity) CreateNormalizedTable(
 	}, nil
 }
 
-func (a *FlowableActivity) maintainPull(
+func (a *FlowableActivity) SyncFlow(
 	ctx context.Context,
 	config *protos.FlowConnectionConfigs,
-) (CdcState, context.Context, error) {
+	options *protos.SyncFlowOptions,
+) error {
 	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
-	srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, config.Env, a.CatalogPool, config.SourceName)
+	logger := activity.GetLogger(ctx)
+
+	currentSyncFlowNum := atomic.Int32{}
+	totalRecordsSynced := atomic.Int64{}
+	normalizingBatchID := atomic.Int64{}
+	normalizeWaiting := atomic.Bool{}
+	syncingBatchID := atomic.Int64{}
+	syncWaiting := atomic.Bool{}
+
+	shutdown := heartbeatRoutine(ctx, func() string {
+		// Must load Waiting after BatchID to avoid race saying we're waiting on currently processing batch
+		sBatchID := syncingBatchID.Load()
+		nBatchID := normalizingBatchID.Load()
+		var sWaiting, nWaiting string
+		if syncWaiting.Load() {
+			sWaiting = " (W)"
+		}
+		if normalizeWaiting.Load() {
+			nWaiting = " (W)"
+		}
+		return fmt.Sprintf(
+			"currentSyncFlowNum:%d, totalRecordsSynced:%d, syncingBatchID:%d%s, normalizingBatchID:%d%s",
+			currentSyncFlowNum.Load(), totalRecordsSynced.Load(),
+			sBatchID, sWaiting, nBatchID, nWaiting,
+		)
+	})
+	defer shutdown()
+
+	srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnectorCore](ctx, config.Env, a.CatalogPool, config.SourceName)
 	if err != nil {
 		a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
-		return CdcState{}, nil, err
+		return err
 	}
 
 	if err := srcConn.SetupReplConn(ctx); err != nil {
 		a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
 		connectors.CloseConnector(ctx, srcConn)
-		return CdcState{}, nil, err
+		return err
 	}
 
 	normalizeBufferSize, err := peerdbenv.PeerDBNormalizeChannelBufferSize(ctx, config.Env)
 	if err != nil {
 		a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
 		connectors.CloseConnector(ctx, srcConn)
-		return CdcState{}, nil, err
+		return err
 	}
 
 	// syncDone will be closed by SyncFlow,
 	// whereas normalizeDone will be closed by normalizing goroutine
 	// Wait on normalizeDone at end to not interrupt final normalize
 	syncDone := make(chan struct{})
-	normalize := make(chan NormalizeBatchRequest, normalizeBufferSize)
+	normRequests := make(chan NormalizeBatchRequest, normalizeBufferSize)
 
 	group, groupCtx := errgroup.WithContext(ctx)
 	group.Go(func() error {
 		// returning error signals sync to stop, normalize can recover connections without interrupting sync, so never return error
-		a.normalizeLoop(groupCtx, config, syncDone, normalize)
+		a.normalizeLoop(groupCtx, config, syncDone, normRequests, &normalizingBatchID, &normalizeWaiting)
 		return nil
 	})
 	group.Go(func() error {
@@ -294,47 +316,19 @@ func (a *FlowableActivity) maintainPull(
 		return nil
 	})
 
-	return CdcState{
-		connector: srcConn,
-		syncDone:  syncDone,
-		normalize: normalize,
-		errGroup:  group,
-	}, groupCtx, nil
-}
-
-func (a *FlowableActivity) SyncFlow(
-	ctx context.Context,
-	config *protos.FlowConnectionConfigs,
-	options *protos.SyncFlowOptions,
-) error {
-	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
-	logger := activity.GetLogger(ctx)
-
-	shutdown := heartbeatRoutine(ctx, func() string {
-		return "syncing batch"
-	})
-	defer shutdown()
-
-	cdcState, groupCtx, err := a.maintainPull(ctx, config)
-	if err != nil {
-		logger.Error("MaintainPull failed", slog.Any("error", err))
-		return err
-	}
-
-	currentSyncFlowNum := int32(0)
-	totalRecordsSynced := int64(0)
-
 	for groupCtx.Err() == nil {
-		currentSyncFlowNum += 1
-		logger.Info("executing sync flow", slog.Int("count", int(currentSyncFlowNum)))
+		logger.Info("executing sync flow", slog.Int64("count", int64(currentSyncFlowNum.Add(1))))
 
-		var numRecordsSynced int64
+		var syncResponse *model.SyncResponse
 		var syncErr error
 		if config.System == protos.TypeSystem_Q {
-			numRecordsSynced, syncErr = a.syncRecords(groupCtx, config, options, cdcState)
+			syncResponse, syncErr = a.syncRecords(groupCtx, config, options, srcConn.(connectors.CDCPullConnector),
+				normRequests, &syncingBatchID, &syncWaiting)
 		} else {
-			numRecordsSynced, syncErr = a.syncPg(groupCtx, config, options, cdcState)
+			syncResponse, syncErr = a.syncPg(groupCtx, config, options, srcConn.(connectors.CDCPullPgConnector),
+				normRequests, &syncingBatchID, &syncWaiting)
 		}
+		syncWaiting.Store(true)
 
 		if syncErr != nil {
 			if groupCtx.Err() != nil {
@@ -342,21 +336,21 @@ func (a *FlowableActivity) SyncFlow(
 				break
 			}
 			logger.Error("failed to sync records", slog.Any("error", syncErr))
-			close(cdcState.syncDone)
-			return errors.Join(syncErr, cdcState.errGroup.Wait())
+			close(syncDone)
+			return errors.Join(syncErr, group.Wait())
 		} else {
-			totalRecordsSynced += numRecordsSynced
-			logger.Info("synced records",
-				slog.Int64("numRecordsSynced", numRecordsSynced), slog.Int64("totalRecordsSynced", totalRecordsSynced))
+			totalRecordsSynced.Add(syncResponse.NumRecordsSynced)
+			logger.Info("synced records", slog.Int64("numRecordsSynced", syncResponse.NumRecordsSynced),
+				slog.Int64("totalRecordsSynced", totalRecordsSynced.Load()))
 
-			if options.NumberOfSyncs > 0 && currentSyncFlowNum >= options.NumberOfSyncs {
+			if options.NumberOfSyncs > 0 && currentSyncFlowNum.Load() >= options.NumberOfSyncs {
 				break
 			}
 		}
 	}
 
-	close(cdcState.syncDone)
-	waitErr := cdcState.errGroup.Wait()
+	close(syncDone)
+	waitErr := group.Wait()
 	if err := ctx.Err(); err != nil {
 		logger.Info("sync canceled", slog.Any("error", err))
 		return err
@@ -371,8 +365,11 @@ func (a *FlowableActivity) syncRecords(
 	ctx context.Context,
 	config *protos.FlowConnectionConfigs,
 	options *protos.SyncFlowOptions,
-	cdcState CdcState,
-) (int64, error) {
+	srcConn connectors.CDCPullConnector,
+	normRequests chan<- NormalizeBatchRequest,
+	syncingBatchID *atomic.Int64,
+	syncWaiting *atomic.Bool,
+) (*model.SyncResponse, error) {
 	var adaptStream func(stream *model.CDCStream[model.RecordItems]) (*model.CDCStream[model.RecordItems], error)
 	if config.Script != "" {
 		var onErr context.CancelCauseFunc
@@ -403,7 +400,8 @@ func (a *FlowableActivity) syncRecords(
 			return stream, nil
 		}
 	}
-	return syncCore(ctx, a, config, options, cdcState, adaptStream,
+	return syncCore(ctx, a, config, options, srcConn, normRequests,
+		syncingBatchID, syncWaiting, adaptStream,
 		connectors.CDCPullConnector.PullRecords,
 		connectors.CDCSyncConnector.SyncRecords)
 }
@@ -412,9 +410,13 @@ func (a *FlowableActivity) syncPg(
 	ctx context.Context,
 	config *protos.FlowConnectionConfigs,
 	options *protos.SyncFlowOptions,
-	cdcState CdcState,
-) (int64, error) {
-	return syncCore(ctx, a, config, options, cdcState, nil,
+	srcConn connectors.CDCPullPgConnector,
+	normRequests chan<- NormalizeBatchRequest,
+	syncingBatchID *atomic.Int64,
+	syncWaiting *atomic.Bool,
+) (*model.SyncResponse, error) {
+	return syncCore(ctx, a, config, options, srcConn, normRequests,
+		syncingBatchID, syncWaiting, nil,
 		connectors.CDCPullPgConnector.PullPg,
 		connectors.CDCSyncPgConnector.SyncPg)
 }
diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go
index 8b6e0c7f7..b4ecd39d7 100644
--- a/flow/activities/flowable_core.go
+++ b/flow/activities/flowable_core.go
@@ -104,11 +104,14 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 	a *FlowableActivity,
 	config *protos.FlowConnectionConfigs,
 	options *protos.SyncFlowOptions,
-	cdcState CdcState,
+	srcConn TPull,
+	normRequests chan<- NormalizeBatchRequest,
+	syncingBatchID *atomic.Int64,
+	syncWaiting *atomic.Bool,
 	adaptStream func(*model.CDCStream[Items]) (*model.CDCStream[Items], error),
 	pull func(TPull, context.Context, *pgxpool.Pool, *otel_metrics.OtelManager, *model.PullRecordsRequest[Items]) error,
 	sync func(TSync, context.Context, *model.SyncRecordsRequest[Items]) (*model.SyncResponse, error),
-) (int64, error) {
+) (*model.SyncResponse, error) {
 	flowName := config.FlowJobName
 	ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
 	logger := activity.GetLogger(ctx)
@@ -118,10 +121,8 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 		tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
 	}
 
-	srcConn := cdcState.connector.(TPull)
-	normChan := cdcState.normalize
 	if err := srcConn.ConnectionActive(ctx); err != nil {
-		return 0, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil)
+		return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil)
 	}
 
 	batchSize := options.BatchSize
@@ -140,7 +141,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 	}()
 	if err != nil {
 		a.Alerter.LogFlowError(ctx, flowName, err)
-		return 0, err
+		return nil, err
 	}
 
 	logger.Info("pulling records...", slog.Int64("LastOffset", lastOffset))
@@ -149,23 +150,24 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 
 	channelBufferSize, err := peerdbenv.PeerDBCDCChannelBufferSize(ctx, config.Env)
 	if err != nil {
-		return 0, fmt.Errorf("failed to get CDC channel buffer size: %w", err)
+		return nil, fmt.Errorf("failed to get CDC channel buffer size: %w", err)
 	}
 	recordBatchPull := model.NewCDCStream[Items](channelBufferSize)
 	recordBatchSync := recordBatchPull
 	if adaptStream != nil {
 		var err error
 		if recordBatchSync, err = adaptStream(recordBatchPull); err != nil {
-			return 0, err
+			return nil, err
 		}
 	}
 
 	tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, flowName)
 	if err != nil {
-		return 0, err
+		return nil, err
 	}
 
 	startTime := time.Now()
+	syncWaiting.Store(false)
 	errGroup, errCtx := errgroup.WithContext(ctx)
 	errGroup.Go(func() error {
 		return pull(srcConn, errCtx, a.CatalogPool, a.OtelManager, &model.PullRecordsRequest[Items]{
@@ -198,24 +200,25 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 				a.Alerter.LogFlowError(ctx, flowName, err)
 			}
 			if temporal.IsApplicationError(err) {
-				return 0, err
+				return nil, err
 			} else {
-				return 0, fmt.Errorf("failed in pull records when: %w", err)
+				return nil, fmt.Errorf("failed in pull records when: %w", err)
 			}
 		}
 		logger.Info("no records to push")
+		syncWaiting.Store(true)
 
 		dstConn, err := connectors.GetByNameAs[TSync](ctx, config.Env, a.CatalogPool, config.DestinationName)
 		if err != nil {
-			return 0, fmt.Errorf("failed to recreate destination connector: %w", err)
+			return nil, fmt.Errorf("failed to recreate destination connector: %w", err)
 		}
 		defer connectors.CloseConnector(ctx, dstConn)
 
 		if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, recordBatchSync.SchemaDeltas); err != nil {
-			return 0, fmt.Errorf("failed to sync schema: %w", err)
+			return nil, fmt.Errorf("failed to sync schema: %w", err)
 		}
 
-		return -1, a.applySchemaDeltas(ctx, config, options, recordBatchSync.SchemaDeltas)
+		return nil, a.applySchemaDeltas(ctx, config, options, recordBatchSync.SchemaDeltas)
 	}
 
 	var syncStartTime time.Time
@@ -232,6 +235,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 			return err
 		}
 		syncBatchID += 1
+		syncingBatchID.Store(syncBatchID)
 		logger.Info("begin pulling records for batch", slog.Int64("SyncBatchID", syncBatchID))
 
 		if err := monitoring.AddCDCBatchForFlow(errCtx, a.CatalogPool, flowName, monitoring.CDCBatchInfo{
@@ -271,11 +275,12 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 			a.Alerter.LogFlowError(ctx, flowName, err)
 		}
 		if temporal.IsApplicationError(err) {
-			return 0, err
+			return nil, err
 		} else {
-			return 0, fmt.Errorf("failed to pull records: %w", err)
+			return nil, fmt.Errorf("failed to pull records: %w", err)
 		}
 	}
+	syncWaiting.Store(true)
 
 	syncDuration := time.Since(syncStartTime)
 	lastCheckpoint := recordBatchSync.GetLastCheckpoint()
@@ -285,18 +290,18 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 		ctx, a.CatalogPool, flowName, res.CurrentSyncBatchID, uint32(res.NumRecordsSynced), lastCheckpoint,
 	); err != nil {
 		a.Alerter.LogFlowError(ctx, flowName, err)
-		return 0, err
+		return nil, err
 	}
 
 	if err := monitoring.UpdateLatestLSNAtTargetForCDCFlow(ctx, a.CatalogPool, flowName, lastCheckpoint); err != nil {
 		a.Alerter.LogFlowError(ctx, flowName, err)
-		return 0, err
+		return nil, err
 	}
 
 	if res.TableNameRowsMapping != nil {
 		if err := monitoring.AddCDCBatchTablesForFlow(
 			ctx, a.CatalogPool, flowName, res.CurrentSyncBatchID, res.TableNameRowsMapping,
 		); err != nil {
-			return 0, err
+			return nil, err
 		}
 	}
@@ -315,33 +320,33 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 	}
 
 	if err := a.applySchemaDeltas(ctx, config, options, res.TableSchemaDeltas); err != nil {
-		return 0, err
+		return nil, err
 	}
 
 	if recordBatchSync.NeedsNormalize() {
 		parallel, err := peerdbenv.PeerDBEnableParallelSyncNormalize(ctx, config.Env)
 		if err != nil {
-			return 0, err
+			return nil, err
 		}
 		var done chan struct{}
 		if !parallel {
 			done = make(chan struct{})
 		}
 		select {
-		case normChan <- NormalizeBatchRequest{BatchID: res.CurrentSyncBatchID, Done: done}:
+		case normRequests <- NormalizeBatchRequest{BatchID: res.CurrentSyncBatchID, Done: done}:
 		case <-ctx.Done():
-			return 0, nil
+			return res, nil
 		}
 		if done != nil {
 			select {
 			case <-done:
 			case <-ctx.Done():
-				return 0, nil
+				return res, nil
 			}
 		}
 	}
 
-	return res.NumRecordsSynced, nil
+	return res, nil
 }
 
 func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) {
@@ -596,7 +601,7 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
 }
 
 func (a *FlowableActivity) maintainReplConn(
-	ctx context.Context, flowName string, srcConn connectors.CDCPullConnector, syncDone <-chan struct{},
+	ctx context.Context, flowName string, srcConn connectors.CDCPullConnectorCore, syncDone <-chan struct{},
 ) error {
 	ticker := time.NewTicker(15 * time.Second)
 	defer ticker.Stop()
@@ -621,21 +626,26 @@ func (a *FlowableActivity) normalizeLoop(
 	ctx context.Context,
 	config *protos.FlowConnectionConfigs,
 	syncDone <-chan struct{},
-	normalize <-chan NormalizeBatchRequest,
+	normalizeRequests <-chan NormalizeBatchRequest,
+	normalizingBatchID *atomic.Int64,
+	normalizeWaiting *atomic.Bool,
 ) {
 	logger := activity.GetLogger(ctx)
 
 	for {
+		normalizeWaiting.Store(true)
 		select {
-		case req := <-normalize:
+		case req := <-normalizeRequests:
+			normalizeWaiting.Store(false)
 		retryLoop:
 			for {
+				normalizingBatchID.Store(req.BatchID)
 				if err := a.startNormalize(ctx, config, req.BatchID); err != nil {
 					a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
 					for {
 						// update req to latest normalize request & retry
 						select {
-						case req = <-normalize:
+						case req = <-normalizeRequests:
 						case <-syncDone:
 							logger.Info("[normalize-loop] syncDone closed before retry")
 							return
@@ -662,9 +672,11 @@ func (a *FlowableActivity) normalizeLoop(
 				break
 			}
 		case <-syncDone:
+			normalizeWaiting.Store(false)
 			logger.Info("[normalize-loop] syncDone closed")
 			return
 		case <-ctx.Done():
+			normalizeWaiting.Store(false)
 			logger.Info("[normalize-loop] context closed")
 			return
 		}