diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index ca4223434a..856f66fe69 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -310,6 +310,11 @@ func (a *FlowableActivity) SyncFlow( ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) logger := activity.GetLogger(ctx) + shutdown := heartbeatRoutine(ctx, func() string { + return "syncing batch" + }) + defer shutdown() + cdcState, groupCtx, err := a.maintainPull(ctx, config) if err != nil { logger.Error("MaintainPull failed", slog.Any("error", err)) @@ -326,9 +331,9 @@ func (a *FlowableActivity) SyncFlow( var numRecordsSynced int64 var syncErr error if config.System == protos.TypeSystem_Q { - numRecordsSynced, syncErr = a.SyncRecords(groupCtx, config, options, cdcState) + numRecordsSynced, syncErr = a.syncRecords(groupCtx, config, options, cdcState) } else { - numRecordsSynced, syncErr = a.SyncPg(groupCtx, config, options, cdcState) + numRecordsSynced, syncErr = a.syncPg(groupCtx, config, options, cdcState) } if syncErr != nil { @@ -362,7 +367,7 @@ func (a *FlowableActivity) SyncFlow( return nil } -func (a *FlowableActivity) SyncRecords( +func (a *FlowableActivity) syncRecords( ctx context.Context, config *protos.FlowConnectionConfigs, options *protos.SyncFlowOptions, @@ -403,7 +408,7 @@ func (a *FlowableActivity) SyncRecords( connectors.CDCSyncConnector.SyncRecords) } -func (a *FlowableActivity) SyncPg( +func (a *FlowableActivity) syncPg( ctx context.Context, config *protos.FlowConnectionConfigs, options *protos.SyncFlowOptions, @@ -414,7 +419,7 @@ func (a *FlowableActivity) SyncPg( connectors.CDCSyncPgConnector.SyncPg) } -func (a *FlowableActivity) StartNormalize( +func (a *FlowableActivity) startNormalize( ctx context.Context, config *protos.FlowConnectionConfigs, batchID int64, @@ -435,11 +440,6 @@ func (a *FlowableActivity) StartNormalize( } defer connectors.CloseConnector(ctx, dstConn) - shutdown := heartbeatRoutine(ctx, func() string { - return "normalizing records from batch for job" - }) - defer shutdown() - tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, config.FlowJobName) if err != nil { return fmt.Errorf("failed to get table name schema mapping: %w", err) diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 267f0e0f80..6efda7b434 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -112,10 +112,6 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon flowName := config.FlowJobName ctx = context.WithValue(ctx, shared.FlowNameKey, flowName) logger := activity.GetLogger(ctx) - shutdown := heartbeatRoutine(ctx, func() string { - return "transferring records for job" - }) - defer shutdown() tblNameMapping := make(map[string]model.NameAndExclude, len(options.TableMappings)) for _, v := range options.TableMappings { @@ -636,7 +632,7 @@ func (a *FlowableActivity) normalizeLoop( case req := <-normalize: retryLoop: for { - if err := a.StartNormalize(ctx, config, req.BatchID); err != nil { + if err := a.startNormalize(ctx, config, req.BatchID); err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) for { // update req to latest normalize request & retry