Skip to content

Commit

Permalink
SyncFlow: top level heartbeat
Browse files (browse the repository at this point in the history)
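Was seeing heartbeat timeouts, so a single heartbeat routine is now started at the top of SyncFlow instead of separately inside syncCore and StartNormalize; the explicit activity.RecordHeartbeat calls in syncCore and maintainReplConn are removed.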
  • Loading branch information
serprex committed Dec 30, 2024
1 parent 4cd7c60 commit c099f69
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 17 deletions.
20 changes: 10 additions & 10 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,11 @@ func (a *FlowableActivity) SyncFlow(
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := activity.GetLogger(ctx)

shutdown := heartbeatRoutine(ctx, func() string {
return "syncing batch"
})
defer shutdown()

cdcState, groupCtx, err := a.maintainPull(ctx, config)
if err != nil {
logger.Error("MaintainPull failed", slog.Any("error", err))
Expand All @@ -326,9 +331,9 @@ func (a *FlowableActivity) SyncFlow(
var numRecordsSynced int64
var syncErr error
if config.System == protos.TypeSystem_Q {
numRecordsSynced, syncErr = a.SyncRecords(groupCtx, config, options, cdcState)
numRecordsSynced, syncErr = a.syncRecords(groupCtx, config, options, cdcState)
} else {
numRecordsSynced, syncErr = a.SyncPg(groupCtx, config, options, cdcState)
numRecordsSynced, syncErr = a.syncPg(groupCtx, config, options, cdcState)
}

if syncErr != nil {
Expand Down Expand Up @@ -362,7 +367,7 @@ func (a *FlowableActivity) SyncFlow(
return nil
}

func (a *FlowableActivity) SyncRecords(
func (a *FlowableActivity) syncRecords(
ctx context.Context,
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
Expand Down Expand Up @@ -403,7 +408,7 @@ func (a *FlowableActivity) SyncRecords(
connectors.CDCSyncConnector.SyncRecords)
}

func (a *FlowableActivity) SyncPg(
func (a *FlowableActivity) syncPg(
ctx context.Context,
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
Expand All @@ -414,7 +419,7 @@ func (a *FlowableActivity) SyncPg(
connectors.CDCSyncPgConnector.SyncPg)
}

func (a *FlowableActivity) StartNormalize(
func (a *FlowableActivity) startNormalize(
ctx context.Context,
config *protos.FlowConnectionConfigs,
batchID int64,
Expand All @@ -435,11 +440,6 @@ func (a *FlowableActivity) StartNormalize(
}
defer connectors.CloseConnector(ctx, dstConn)

shutdown := heartbeatRoutine(ctx, func() string {
return "normalizing records from batch for job"
})
defer shutdown()

tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, config.FlowJobName)
if err != nil {
return fmt.Errorf("failed to get table name schema mapping: %w", err)
Expand Down
8 changes: 1 addition & 7 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,6 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
flowName := config.FlowJobName
ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
logger := activity.GetLogger(ctx)
shutdown := heartbeatRoutine(ctx, func() string {
return "transferring records for job"
})
defer shutdown()

tblNameMapping := make(map[string]model.NameAndExclude, len(options.TableMappings))
for _, v := range options.TableMappings {
Expand Down Expand Up @@ -306,7 +302,6 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon

pushedRecordsWithCount := fmt.Sprintf("pushed %d records for batch %d in %v",
res.NumRecordsSynced, res.CurrentSyncBatchID, syncDuration.Truncate(time.Second))
activity.RecordHeartbeat(ctx, pushedRecordsWithCount)
a.Alerter.LogFlowInfo(ctx, flowName, pushedRecordsWithCount)

if a.OtelManager != nil {
Expand Down Expand Up @@ -609,7 +604,6 @@ func (a *FlowableActivity) maintainReplConn(
for {
select {
case <-ticker.C:
activity.RecordHeartbeat(ctx, "keep session alive")
if err := srcConn.ReplPing(ctx); err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return fmt.Errorf("connection to source down: %w", err)
Expand All @@ -636,7 +630,7 @@ func (a *FlowableActivity) normalizeLoop(
case req := <-normalize:
retryLoop:
for {
if err := a.StartNormalize(ctx, config, req.BatchID); err != nil {
if err := a.startNormalize(ctx, config, req.BatchID); err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
for {
// update req to latest normalize request & retry
Expand Down

0 comments on commit c099f69

Please sign in to comment.