Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SyncFlow: top level heartbeat #2401

Merged
merged 4 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 68 additions & 67 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,6 @@ type NormalizeBatchRequest struct {
BatchID int64
}

type CdcState struct {
connector connectors.CDCPullConnectorCore
syncDone chan struct{}
normalize chan NormalizeBatchRequest
errGroup *errgroup.Group
}

type FlowableActivity struct {
CatalogPool *pgxpool.Pool
Alerter *alerting.Alerter
Expand Down Expand Up @@ -249,40 +242,67 @@ func (a *FlowableActivity) CreateNormalizedTable(
}, nil
}

func (a *FlowableActivity) maintainPull(
func (a *FlowableActivity) SyncFlow(
ctx context.Context,
config *protos.FlowConnectionConfigs,
) (CdcState, context.Context, error) {
options *protos.SyncFlowOptions,
) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, config.Env, a.CatalogPool, config.SourceName)
logger := activity.GetLogger(ctx)

var currentSyncFlowNum atomic.Int32
var totalRecordsSynced atomic.Int64
var normalizingBatchID atomic.Int64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could probably store some of these in substruct of FlowableActivity and keep calls clean

var normalizeWaiting atomic.Bool
var syncingBatchID atomic.Int64
var syncState atomic.Pointer[string]
syncState.Store(shared.Ptr("setup"))

shutdown := heartbeatRoutine(ctx, func() string {
// Must load Waiting after BatchID to avoid race saying we're waiting on currently processing batch
sBatchID := syncingBatchID.Load()
nBatchID := normalizingBatchID.Load()
var nWaiting string
if normalizeWaiting.Load() {
nWaiting = " (W)"
}
return fmt.Sprintf(
"currentSyncFlowNum:%d, totalRecordsSynced:%d, syncingBatchID:%d (%s), normalizingBatchID:%d%s",
currentSyncFlowNum.Load(), totalRecordsSynced.Load(),
sBatchID, *syncState.Load(), nBatchID, nWaiting,
)
})
defer shutdown()

srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnectorCore](ctx, config.Env, a.CatalogPool, config.SourceName)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return CdcState{}, nil, err
return err
}

if err := srcConn.SetupReplConn(ctx); err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
connectors.CloseConnector(ctx, srcConn)
return CdcState{}, nil, err
return err
}

normalizeBufferSize, err := peerdbenv.PeerDBNormalizeChannelBufferSize(ctx, config.Env)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
connectors.CloseConnector(ctx, srcConn)
return CdcState{}, nil, err
return err
}

// syncDone will be closed by SyncFlow,
// whereas normalizeDone will be closed by normalizing goroutine
// Wait on normalizeDone at end to not interrupt final normalize
syncDone := make(chan struct{})
normalize := make(chan NormalizeBatchRequest, normalizeBufferSize)
normRequests := make(chan NormalizeBatchRequest, normalizeBufferSize)

group, groupCtx := errgroup.WithContext(ctx)
group.Go(func() error {
// returning error signals sync to stop, normalize can recover connections without interrupting sync, so never return error
a.normalizeLoop(groupCtx, config, syncDone, normalize)
a.normalizeLoop(groupCtx, logger, config, syncDone, normRequests, &normalizingBatchID, &normalizeWaiting)
return nil
})
group.Go(func() error {
Expand All @@ -294,41 +314,17 @@ func (a *FlowableActivity) maintainPull(
return nil
})

return CdcState{
connector: srcConn,
syncDone: syncDone,
normalize: normalize,
errGroup: group,
}, groupCtx, nil
}

func (a *FlowableActivity) SyncFlow(
ctx context.Context,
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := activity.GetLogger(ctx)

cdcState, groupCtx, err := a.maintainPull(ctx, config)
if err != nil {
logger.Error("MaintainPull failed", slog.Any("error", err))
return err
}

currentSyncFlowNum := int32(0)
totalRecordsSynced := int64(0)

for groupCtx.Err() == nil {
currentSyncFlowNum += 1
logger.Info("executing sync flow", slog.Int("count", int(currentSyncFlowNum)))
logger.Info("executing sync flow", slog.Int64("count", int64(currentSyncFlowNum.Add(1))))

var numRecordsSynced int64
var syncResponse *model.SyncResponse
var syncErr error
if config.System == protos.TypeSystem_Q {
numRecordsSynced, syncErr = a.SyncRecords(groupCtx, config, options, cdcState)
syncResponse, syncErr = a.syncRecords(groupCtx, config, options, srcConn.(connectors.CDCPullConnector),
normRequests, &syncingBatchID, &syncState)
} else {
numRecordsSynced, syncErr = a.SyncPg(groupCtx, config, options, cdcState)
syncResponse, syncErr = a.syncPg(groupCtx, config, options, srcConn.(connectors.CDCPullPgConnector),
normRequests, &syncingBatchID, &syncState)
}

if syncErr != nil {
Expand All @@ -337,21 +333,23 @@ func (a *FlowableActivity) SyncFlow(
break
}
logger.Error("failed to sync records", slog.Any("error", syncErr))
close(cdcState.syncDone)
return errors.Join(syncErr, cdcState.errGroup.Wait())
syncState.Store(shared.Ptr("cleanup"))
close(syncDone)
return errors.Join(syncErr, group.Wait())
} else {
totalRecordsSynced += numRecordsSynced
logger.Info("synced records",
slog.Int64("numRecordsSynced", numRecordsSynced), slog.Int64("totalRecordsSynced", totalRecordsSynced))
totalRecordsSynced.Add(syncResponse.NumRecordsSynced)
logger.Info("synced records", slog.Int64("numRecordsSynced", syncResponse.NumRecordsSynced),
slog.Int64("totalRecordsSynced", totalRecordsSynced.Load()))

if options.NumberOfSyncs > 0 && currentSyncFlowNum >= options.NumberOfSyncs {
if options.NumberOfSyncs > 0 && currentSyncFlowNum.Load() >= options.NumberOfSyncs {
break
}
}
}

close(cdcState.syncDone)
waitErr := cdcState.errGroup.Wait()
syncState.Store(shared.Ptr("cleanup"))
close(syncDone)
waitErr := group.Wait()
if err := ctx.Err(); err != nil {
logger.Info("sync canceled", slog.Any("error", err))
return err
Expand All @@ -362,12 +360,15 @@ func (a *FlowableActivity) SyncFlow(
return nil
}

func (a *FlowableActivity) SyncRecords(
func (a *FlowableActivity) syncRecords(
ctx context.Context,
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
cdcState CdcState,
) (int64, error) {
srcConn connectors.CDCPullConnector,
normRequests chan<- NormalizeBatchRequest,
syncingBatchID *atomic.Int64,
syncWaiting *atomic.Pointer[string],
) (*model.SyncResponse, error) {
var adaptStream func(stream *model.CDCStream[model.RecordItems]) (*model.CDCStream[model.RecordItems], error)
if config.Script != "" {
var onErr context.CancelCauseFunc
Expand Down Expand Up @@ -398,23 +399,28 @@ func (a *FlowableActivity) SyncRecords(
return stream, nil
}
}
return syncCore(ctx, a, config, options, cdcState, adaptStream,
return syncCore(ctx, a, config, options, srcConn, normRequests,
syncingBatchID, syncWaiting, adaptStream,
connectors.CDCPullConnector.PullRecords,
connectors.CDCSyncConnector.SyncRecords)
}

func (a *FlowableActivity) SyncPg(
func (a *FlowableActivity) syncPg(
ctx context.Context,
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
cdcState CdcState,
) (int64, error) {
return syncCore(ctx, a, config, options, cdcState, nil,
srcConn connectors.CDCPullPgConnector,
normRequests chan<- NormalizeBatchRequest,
syncingBatchID *atomic.Int64,
syncWaiting *atomic.Pointer[string],
) (*model.SyncResponse, error) {
return syncCore(ctx, a, config, options, srcConn, normRequests,
syncingBatchID, syncWaiting, nil,
connectors.CDCPullPgConnector.PullPg,
connectors.CDCSyncPgConnector.SyncPg)
}

func (a *FlowableActivity) StartNormalize(
func (a *FlowableActivity) startNormalize(
ctx context.Context,
config *protos.FlowConnectionConfigs,
batchID int64,
Expand All @@ -435,11 +441,6 @@ func (a *FlowableActivity) StartNormalize(
}
defer connectors.CloseConnector(ctx, dstConn)

shutdown := heartbeatRoutine(ctx, func() string {
return "normalizing records from batch for job"
})
defer shutdown()

tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, config.FlowJobName)
if err != nil {
return fmt.Errorf("failed to get table name schema mapping: %w", err)
Expand Down
Loading
Loading