Skip to content

Commit

Permalink
SyncFlow: top level heartbeat
Browse files Browse the repository at this point in the history
was seeing heartbeat timeouts
  • Loading branch information
serprex committed Dec 29, 2024
1 parent 1305dec commit 5970177
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 15 deletions.
20 changes: 10 additions & 10 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,11 @@ func (a *FlowableActivity) SyncFlow(
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := activity.GetLogger(ctx)

shutdown := heartbeatRoutine(ctx, func() string {
return "syncing batch"
})
defer shutdown()

cdcState, groupCtx, err := a.maintainPull(ctx, config)
if err != nil {
logger.Error("MaintainPull failed", slog.Any("error", err))
Expand All @@ -326,9 +331,9 @@ func (a *FlowableActivity) SyncFlow(
var numRecordsSynced int64
var syncErr error
if config.System == protos.TypeSystem_Q {
numRecordsSynced, syncErr = a.SyncRecords(groupCtx, config, options, cdcState)
numRecordsSynced, syncErr = a.syncRecords(groupCtx, config, options, cdcState)
} else {
numRecordsSynced, syncErr = a.SyncPg(groupCtx, config, options, cdcState)
numRecordsSynced, syncErr = a.syncPg(groupCtx, config, options, cdcState)
}

if syncErr != nil {
Expand Down Expand Up @@ -362,7 +367,7 @@ func (a *FlowableActivity) SyncFlow(
return nil
}

func (a *FlowableActivity) SyncRecords(
func (a *FlowableActivity) syncRecords(
ctx context.Context,
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
Expand Down Expand Up @@ -403,7 +408,7 @@ func (a *FlowableActivity) SyncRecords(
connectors.CDCSyncConnector.SyncRecords)
}

func (a *FlowableActivity) SyncPg(
func (a *FlowableActivity) syncPg(
ctx context.Context,
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
Expand All @@ -414,7 +419,7 @@ func (a *FlowableActivity) SyncPg(
connectors.CDCSyncPgConnector.SyncPg)
}

func (a *FlowableActivity) StartNormalize(
func (a *FlowableActivity) startNormalize(
ctx context.Context,
config *protos.FlowConnectionConfigs,
batchID int64,
Expand All @@ -435,11 +440,6 @@ func (a *FlowableActivity) StartNormalize(
}
defer connectors.CloseConnector(ctx, dstConn)

shutdown := heartbeatRoutine(ctx, func() string {
return "normalizing records from batch for job"
})
defer shutdown()

tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, config.FlowJobName)
if err != nil {
return fmt.Errorf("failed to get table name schema mapping: %w", err)
Expand Down
6 changes: 1 addition & 5 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,6 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
flowName := config.FlowJobName
ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
logger := activity.GetLogger(ctx)
shutdown := heartbeatRoutine(ctx, func() string {
return "transferring records for job"
})
defer shutdown()

tblNameMapping := make(map[string]model.NameAndExclude, len(options.TableMappings))
for _, v := range options.TableMappings {
Expand Down Expand Up @@ -636,7 +632,7 @@ func (a *FlowableActivity) normalizeLoop(
case req := <-normalize:
retryLoop:
for {
if err := a.StartNormalize(ctx, config, req.BatchID); err != nil {
if err := a.startNormalize(ctx, config, req.BatchID); err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
for {
// update req to latest normalize request & retry
Expand Down

0 comments on commit 5970177

Please sign in to comment.