Skip to content

Commit

Permalink
[snapshot] close stream when fail to get srcConn in errgroup (#2409)
Browse files Browse the repository at this point in the history
if pull connector fails to initialize due to some reason and sync is
already stuck on `chan receive`, then partClone workflow will hang
indefinitely. fixed by closing stream explicitly before pull side
errgroup goroutine returns.
  • Loading branch information
heavycrystal authored Jan 5, 2025
1 parent a4d6525 commit bb88bd1
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 1 deletion.
4 changes: 4 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ type FlowableActivity struct {
OtelManager *otel_metrics.OtelManager
}

type StreamCloser interface {
Close(error)
}

func (a *FlowableActivity) CheckConnection(
ctx context.Context,
config *protos.SetupInput,
Expand Down
3 changes: 2 additions & 1 deletion flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*proto
}

// replicateQRepPartition replicates a QRepPartition from the source to the destination.
func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConnectorCore, TPull connectors.QRepPullConnectorCore](
func replicateQRepPartition[TRead any, TWrite StreamCloser, TSync connectors.QRepSyncConnectorCore, TPull connectors.QRepPullConnectorCore](
ctx context.Context,
a *FlowableActivity,
config *protos.QRepConfig,
Expand Down Expand Up @@ -440,6 +440,7 @@ func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
srcConn, err := connectors.GetByNameAs[TPull](ctx, config.Env, a.CatalogPool, config.SourceName)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
stream.Close(err)
return fmt.Errorf("failed to get qrep source connector: %w", err)
}
defer connectors.CloseConnector(ctx, srcConn)
Expand Down

0 comments on commit bb88bd1

Please sign in to comment.