diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index e867876fb..af18ba566 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -47,6 +47,10 @@ type FlowableActivity struct { OtelManager *otel_metrics.OtelManager } +type StreamCloser interface { + Close(error) +} + func (a *FlowableActivity) CheckConnection( ctx context.Context, config *protos.SetupInput, diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 6aee4a7e0..32b404891 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -387,7 +387,7 @@ func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*proto } // replicateQRepPartition replicates a QRepPartition from the source to the destination. -func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConnectorCore, TPull connectors.QRepPullConnectorCore]( +func replicateQRepPartition[TRead any, TWrite StreamCloser, TSync connectors.QRepSyncConnectorCore, TPull connectors.QRepPullConnectorCore]( ctx context.Context, a *FlowableActivity, config *protos.QRepConfig, @@ -440,6 +440,7 @@ func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn srcConn, err := connectors.GetByNameAs[TPull](ctx, config.Env, a.CatalogPool, config.SourceName) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + stream.Close(err) return fmt.Errorf("failed to get qrep source connector: %w", err) } defer connectors.CloseConnector(ctx, srcConn)