heartbeat info on batch ids / waiting status
serprex committed Dec 30, 2024
1 parent c099f69 commit ac39606
Showing 2 changed files with 107 additions and 93 deletions.
130 changes: 66 additions & 64 deletions flow/activities/flowable.go
@@ -41,13 +41,6 @@ type NormalizeBatchRequest struct {
 	BatchID int64
 }
 
-type CdcState struct {
-	connector connectors.CDCPullConnectorCore
-	syncDone  chan struct{}
-	normalize chan NormalizeBatchRequest
-	errGroup  *errgroup.Group
-}
-
 type FlowableActivity struct {
 	CatalogPool *pgxpool.Pool
 	Alerter     *alerting.Alerter
@@ -249,40 +242,69 @@ func (a *FlowableActivity) CreateNormalizedTable(
 	}, nil
 }
 
-func (a *FlowableActivity) maintainPull(
+func (a *FlowableActivity) SyncFlow(
 	ctx context.Context,
 	config *protos.FlowConnectionConfigs,
-) (CdcState, context.Context, error) {
+	options *protos.SyncFlowOptions,
+) error {
 	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
-	srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, config.Env, a.CatalogPool, config.SourceName)
+	logger := activity.GetLogger(ctx)
+
+	currentSyncFlowNum := atomic.Int32{}
+	totalRecordsSynced := atomic.Int64{}
+	normalizingBatchID := atomic.Int64{}
+	normalizeWaiting := atomic.Bool{}
+	syncingBatchID := atomic.Int64{}
+	syncWaiting := atomic.Bool{}
+
+	shutdown := heartbeatRoutine(ctx, func() string {
+		// Must load Waiting after BatchID to avoid race saying we're waiting on currently processing batch
+		sBatchID := syncingBatchID.Load()
+		nBatchID := normalizingBatchID.Load()
+		var sWaiting, nWaiting string
+		if syncWaiting.Load() {
+			sWaiting = " (W)"
+		}
+		if normalizeWaiting.Load() {
+			nWaiting = " (W)"
+		}
+		return fmt.Sprintf(
+			"currentSyncFlowNum:%d, totalRecordsSynced:%d, syncingBatchID:%d%s, normalizingBatchID:%d%s",
+			currentSyncFlowNum.Load(), totalRecordsSynced.Load(),
+			sBatchID, sWaiting, nBatchID, nWaiting,
+		)
+	})
+	defer shutdown()
+
+	srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnectorCore](ctx, config.Env, a.CatalogPool, config.SourceName)
 	if err != nil {
 		a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
-		return CdcState{}, nil, err
+		return err
 	}
 
 	if err := srcConn.SetupReplConn(ctx); err != nil {
 		a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
 		connectors.CloseConnector(ctx, srcConn)
-		return CdcState{}, nil, err
+		return err
 	}
 
 	normalizeBufferSize, err := peerdbenv.PeerDBNormalizeChannelBufferSize(ctx, config.Env)
 	if err != nil {
 		a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
 		connectors.CloseConnector(ctx, srcConn)
-		return CdcState{}, nil, err
+		return err
 	}
 
 	// syncDone will be closed by SyncFlow,
 	// whereas normalizeDone will be closed by normalizing goroutine
 	// Wait on normalizeDone at end to not interrupt final normalize
 	syncDone := make(chan struct{})
-	normalize := make(chan NormalizeBatchRequest, normalizeBufferSize)
+	normRequests := make(chan NormalizeBatchRequest, normalizeBufferSize)
 
 	group, groupCtx := errgroup.WithContext(ctx)
 	group.Go(func() error {
 		// returning error signals sync to stop, normalize can recover connections without interrupting sync, so never return error
-		a.normalizeLoop(groupCtx, config, syncDone, normalize)
+		a.normalizeLoop(groupCtx, config, syncDone, normRequests, &normalizingBatchID, &normalizeWaiting)
 		return nil
 	})
 	group.Go(func() error {
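The heartbeat message above is assembled from atomics so the callback can read shared counters without locking. Below is a minimal, self-contained sketch of that pattern; heartbeatRoutine here is a simplified stand-in for PeerDB's helper (the real one reports to Temporal via activity.RecordHeartbeat rather than printing), and the interval, names, and main function are illustrative only:

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// heartbeatRoutine (stand-in): calls message() on a fixed interval until the
// returned shutdown function is invoked or ctx is canceled.
func heartbeatRoutine(ctx context.Context, message func() string) func() {
	ctx, cancel := context.WithCancel(ctx)
	go func() {
		ticker := time.NewTicker(200 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				fmt.Println("heartbeat:", message())
			}
		}
	}()
	return cancel
}

func main() {
	var syncingBatchID atomic.Int64
	var syncWaiting atomic.Bool

	shutdown := heartbeatRoutine(context.Background(), func() string {
		// Load BatchID before the Waiting flag, mirroring the ordering
		// comment in the diff: read the other way around, the message could
		// claim we are waiting on a batch that is still being processed.
		id := syncingBatchID.Load()
		var waiting string
		if syncWaiting.Load() {
			waiting = " (W)"
		}
		return fmt.Sprintf("syncingBatchID:%d%s", id, waiting)
	})
	defer shutdown()

	syncingBatchID.Store(7)
	syncWaiting.Store(true)
	time.Sleep(500 * time.Millisecond) // let a couple of heartbeats fire
}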
@@ -294,69 +316,41 @@
 		return nil
 	})
 
-	return CdcState{
-		connector: srcConn,
-		syncDone:  syncDone,
-		normalize: normalize,
-		errGroup:  group,
-	}, groupCtx, nil
-}
-
-func (a *FlowableActivity) SyncFlow(
-	ctx context.Context,
-	config *protos.FlowConnectionConfigs,
-	options *protos.SyncFlowOptions,
-) error {
-	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
-	logger := activity.GetLogger(ctx)
-
-	shutdown := heartbeatRoutine(ctx, func() string {
-		return "syncing batch"
-	})
-	defer shutdown()
-
-	cdcState, groupCtx, err := a.maintainPull(ctx, config)
-	if err != nil {
-		logger.Error("MaintainPull failed", slog.Any("error", err))
-		return err
-	}
-
-	currentSyncFlowNum := int32(0)
-	totalRecordsSynced := int64(0)
-
 	for groupCtx.Err() == nil {
-		currentSyncFlowNum += 1
-		logger.Info("executing sync flow", slog.Int("count", int(currentSyncFlowNum)))
+		logger.Info("executing sync flow", slog.Int64("count", int64(currentSyncFlowNum.Add(1))))
 
-		var numRecordsSynced int64
+		var syncResponse *model.SyncResponse
 		var syncErr error
 		if config.System == protos.TypeSystem_Q {
-			numRecordsSynced, syncErr = a.syncRecords(groupCtx, config, options, cdcState)
+			syncResponse, syncErr = a.syncRecords(groupCtx, config, options, srcConn.(connectors.CDCPullConnector),
+				normRequests, &syncingBatchID, &syncWaiting)
 		} else {
-			numRecordsSynced, syncErr = a.syncPg(groupCtx, config, options, cdcState)
+			syncResponse, syncErr = a.syncPg(groupCtx, config, options, srcConn.(connectors.CDCPullPgConnector),
+				normRequests, &syncingBatchID, &syncWaiting)
 		}
+		syncWaiting.Store(true)
 
 		if syncErr != nil {
 			if groupCtx.Err() != nil {
 				// need to return ctx.Err(), avoid returning syncErr that's wrapped context canceled
 				break
 			}
 			logger.Error("failed to sync records", slog.Any("error", syncErr))
-			close(cdcState.syncDone)
-			return errors.Join(syncErr, cdcState.errGroup.Wait())
+			close(syncDone)
+			return errors.Join(syncErr, group.Wait())
 		} else {
-			totalRecordsSynced += numRecordsSynced
-			logger.Info("synced records",
-				slog.Int64("numRecordsSynced", numRecordsSynced), slog.Int64("totalRecordsSynced", totalRecordsSynced))
+			totalRecordsSynced.Add(syncResponse.NumRecordsSynced)
+			logger.Info("synced records", slog.Int64("numRecordsSynced", syncResponse.NumRecordsSynced),
+				slog.Int64("totalRecordsSynced", totalRecordsSynced.Load()))
 
-			if options.NumberOfSyncs > 0 && currentSyncFlowNum >= options.NumberOfSyncs {
+			if options.NumberOfSyncs > 0 && currentSyncFlowNum.Load() >= options.NumberOfSyncs {
				break
 			}
 		}
 	}
 
-	close(cdcState.syncDone)
-	waitErr := cdcState.errGroup.Wait()
+	close(syncDone)
+	waitErr := group.Wait()
 	if err := ctx.Err(); err != nil {
 		logger.Info("sync canceled", slog.Any("error", err))
 		return err
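Note the shutdown ordering in the rewritten loop: SyncFlow closes syncDone first and only then waits on the errgroup, so the normalize goroutine can drain queued batches instead of being cut off mid-batch. A hedged sketch of that handoff follows; all names are illustrative and the real connector plumbing is omitted:

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

type NormalizeBatchRequest struct{ BatchID int64 }

func main() {
	syncDone := make(chan struct{})
	normRequests := make(chan NormalizeBatchRequest, 4)

	group, _ := errgroup.WithContext(context.Background())
	group.Go(func() error {
		for {
			select {
			case req := <-normRequests:
				fmt.Println("normalizing batch", req.BatchID)
			case <-syncDone:
				// Sync is finished: drain whatever is still queued, then exit,
				// so the final batch is normalized rather than dropped.
				for {
					select {
					case req := <-normRequests:
						fmt.Println("normalizing batch", req.BatchID)
					default:
						return nil
					}
				}
			}
		}
	})

	for id := int64(1); id <= 3; id++ {
		normRequests <- NormalizeBatchRequest{BatchID: id}
	}
	close(syncDone)                      // signal the consumer to finish up
	if err := group.Wait(); err != nil { // only then wait on the group
		fmt.Println("normalize error:", err)
	}
}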
@@ -371,8 +365,11 @@ func (a *FlowableActivity) syncRecords(
 	ctx context.Context,
 	config *protos.FlowConnectionConfigs,
 	options *protos.SyncFlowOptions,
-	cdcState CdcState,
-) (int64, error) {
+	srcConn connectors.CDCPullConnector,
+	normRequests chan<- NormalizeBatchRequest,
+	syncingBatchID *atomic.Int64,
+	syncWaiting *atomic.Bool,
+) (*model.SyncResponse, error) {
 	var adaptStream func(stream *model.CDCStream[model.RecordItems]) (*model.CDCStream[model.RecordItems], error)
 	if config.Script != "" {
 		var onErr context.CancelCauseFunc
@@ -403,7 +400,8 @@ func (a *FlowableActivity) syncRecords(
 			return stream, nil
 		}
 	}
-	return syncCore(ctx, a, config, options, cdcState, adaptStream,
+	return syncCore(ctx, a, config, options, srcConn, normRequests,
+		syncingBatchID, syncWaiting, adaptStream,
 		connectors.CDCPullConnector.PullRecords,
 		connectors.CDCSyncConnector.SyncRecords)
 }
@@ -412,9 +410,13 @@ func (a *FlowableActivity) syncPg(
 	ctx context.Context,
 	config *protos.FlowConnectionConfigs,
 	options *protos.SyncFlowOptions,
-	cdcState CdcState,
-) (int64, error) {
-	return syncCore(ctx, a, config, options, cdcState, nil,
+	srcConn connectors.CDCPullPgConnector,
+	normRequests chan<- NormalizeBatchRequest,
+	syncingBatchID *atomic.Int64,
+	syncWaiting *atomic.Bool,
+) (*model.SyncResponse, error) {
+	return syncCore(ctx, a, config, options, srcConn, normRequests,
+		syncingBatchID, syncWaiting, nil,
 		connectors.CDCPullPgConnector.PullPg,
 		connectors.CDCSyncPgConnector.SyncPg)
 }
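Both call sites hand syncCore the pull and sync steps as Go method expressions: connectors.CDCPullConnector.PullRecords is an ordinary function value whose first argument is the receiver, which lets one core loop drive both the Q and Postgres-native paths. A toy sketch of that technique, using stand-in types rather than PeerDB's real interfaces:

package main

import "fmt"

// PullConnector stands in for PeerDB's connector interfaces; the real
// signatures are much richer.
type PullConnector interface {
	PullRecords(flowName string) int64
}

type pgSource struct{}

func (pgSource) PullRecords(flowName string) int64 { return 42 }

// syncCore receives the pull step as a first-class function, the way the
// diff passes connectors.CDCPullConnector.PullRecords into syncCore.
func syncCore(conn PullConnector, pull func(PullConnector, string) int64) int64 {
	return pull(conn, "my_flow")
}

func main() {
	// PullConnector.PullRecords is a method expression: a plain function
	// whose first parameter is the receiver.
	n := syncCore(pgSource{}, PullConnector.PullRecords)
	fmt.Println("records pulled:", n)
}

The Pg variant works the same way by swapping in the Pg interface and its method expressions.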
