From da373763c426671469f9faf5a6aaac44e1d2a550 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Sun, 5 Jan 2025 17:43:38 +0530 Subject: [PATCH 1/3] [snapshot] close stream when fail to get srcConn in errgroup --- flow/activities/flowable_core.go | 3 ++- flow/model/qrecord_stream.go | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 6aee4a7e0..c8233fe01 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -387,7 +387,7 @@ func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*proto } // replicateQRepPartition replicates a QRepPartition from the source to the destination. -func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConnectorCore, TPull connectors.QRepPullConnectorCore]( +func replicateQRepPartition[TRead any, TWrite model.StreamCloser, TSync connectors.QRepSyncConnectorCore, TPull connectors.QRepPullConnectorCore]( ctx context.Context, a *FlowableActivity, config *protos.QRepConfig, @@ -440,6 +440,7 @@ func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn srcConn, err := connectors.GetByNameAs[TPull](ctx, config.Env, a.CatalogPool, config.SourceName) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + stream.Close(err) return fmt.Errorf("failed to get qrep source connector: %w", err) } defer connectors.CloseConnector(ctx, srcConn) diff --git a/flow/model/qrecord_stream.go b/flow/model/qrecord_stream.go index eeca424b3..f971c9cf8 100644 --- a/flow/model/qrecord_stream.go +++ b/flow/model/qrecord_stream.go @@ -4,6 +4,10 @@ import ( "github.com/PeerDB-io/peer-flow/model/qvalue" ) +type StreamCloser interface { + Close(error) +} + type QRecordStream struct { schemaLatch chan struct{} Records chan []qvalue.QValue From 5c16092927dbb16c96f4bcfe7fcfb0980e99adcb Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Sun, 5 Jan 2025 20:16:01 +0530 Subject: [PATCH 2/3] move interface because weird lint --- flow/activities/flowable.go | 4 ++++ flow/model/qrecord_stream.go | 4 ---- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index e867876fb..af18ba566 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -47,6 +47,10 @@ type FlowableActivity struct { OtelManager *otel_metrics.OtelManager } +type StreamCloser interface { + Close(error) +} + func (a *FlowableActivity) CheckConnection( ctx context.Context, config *protos.SetupInput, diff --git a/flow/model/qrecord_stream.go b/flow/model/qrecord_stream.go index f971c9cf8..eeca424b3 100644 --- a/flow/model/qrecord_stream.go +++ b/flow/model/qrecord_stream.go @@ -4,10 +4,6 @@ import ( "github.com/PeerDB-io/peer-flow/model/qvalue" ) -type StreamCloser interface { - Close(error) -} - type QRecordStream struct { schemaLatch chan struct{} Records chan []qvalue.QValue From f738bf026ddbe36a2bf7e785a3b4357edf5c39ec Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Sun, 5 Jan 2025 20:27:05 +0530 Subject: [PATCH 3/3] try Cmd+S --- flow/activities/flowable_core.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index c8233fe01..32b404891 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -387,7 +387,7 @@ func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*proto } // replicateQRepPartition replicates a QRepPartition from the source to the destination. -func replicateQRepPartition[TRead any, TWrite model.StreamCloser, TSync connectors.QRepSyncConnectorCore, TPull connectors.QRepPullConnectorCore]( +func replicateQRepPartition[TRead any, TWrite StreamCloser, TSync connectors.QRepSyncConnectorCore, TPull connectors.QRepPullConnectorCore]( ctx context.Context, a *FlowableActivity, config *protos.QRepConfig,