From 96e0673bb8464ba7a1bc874b2c2c9c914a7f3d86 Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Fri, 10 Jan 2025 04:17:13 +0530 Subject: [PATCH 1/5] fix: space pkm response to max once per minute --- flow/connectors/postgres/cdc.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 6a575f37f..7d3e43c27 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -386,7 +386,8 @@ func PullCdcRecords[Items model.Items]( for { if pkmRequiresResponse { - if cdcRecordsStorage.IsEmpty() && int64(clientXLogPos) > req.ConsumedOffset.Load() { + if cdcRecordsStorage.IsEmpty() && int64(clientXLogPos) > req.ConsumedOffset.Load() && + time.Since(standByLastLogged) >= 1*time.Minute { metadata := connmetadata.NewPostgresMetadataFromCatalog(logger, p.catalogPool) if err := metadata.SetLastOffset(ctx, req.FlowJobName, int64(clientXLogPos)); err != nil { return err From a0d0ae2c2626d21eefd097742a9437e599ae384d Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Fri, 10 Jan 2025 04:33:14 +0530 Subject: [PATCH 2/5] fix: suggestion --- flow/connectors/postgres/cdc.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 7d3e43c27..be03a41ed 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -383,16 +383,17 @@ func PullCdcRecords[Items model.Items]( pkmRequiresResponse := false waitingForCommit := false - + lastEmptyBatchPkmSentTime := time.Now() for { if pkmRequiresResponse { if cdcRecordsStorage.IsEmpty() && int64(clientXLogPos) > req.ConsumedOffset.Load() && - time.Since(standByLastLogged) >= 1*time.Minute { + time.Since(lastEmptyBatchPkmSentTime) >= 1*time.Minute { metadata := connmetadata.NewPostgresMetadataFromCatalog(logger, p.catalogPool) if err 
:= metadata.SetLastOffset(ctx, req.FlowJobName, int64(clientXLogPos)); err != nil { return err } req.ConsumedOffset.Store(int64(clientXLogPos)) + lastEmptyBatchPkmSentTime = time.Now() } if err := sendStandbyAfterReplLock("pkm-response"); err != nil { From 7755201d8f3b4a0356f92511425f982086475e56 Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Fri, 10 Jan 2025 04:58:42 +0530 Subject: [PATCH 3/5] refactor: always send response if it is requested by primary --- flow/connectors/postgres/cdc.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index be03a41ed..f93dce74c 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -386,8 +386,7 @@ func PullCdcRecords[Items model.Items]( lastEmptyBatchPkmSentTime := time.Now() for { if pkmRequiresResponse { - if cdcRecordsStorage.IsEmpty() && int64(clientXLogPos) > req.ConsumedOffset.Load() && - time.Since(lastEmptyBatchPkmSentTime) >= 1*time.Minute { + if cdcRecordsStorage.IsEmpty() && int64(clientXLogPos) > req.ConsumedOffset.Load() { metadata := connmetadata.NewPostgresMetadataFromCatalog(logger, p.catalogPool) if err := metadata.SetLastOffset(ctx, req.FlowJobName, int64(clientXLogPos)); err != nil { return err @@ -503,7 +502,9 @@ func PullCdcRecords[Items model.Items]( if pkm.ServerWALEnd > clientXLogPos { clientXLogPos = pkm.ServerWALEnd } - pkmRequiresResponse = true + if pkm.ReplyRequested || time.Since(lastEmptyBatchPkmSentTime) >= 1*time.Minute { + pkmRequiresResponse = true + } case pglogrepl.XLogDataByteID: xld, err := pglogrepl.ParseXLogData(msg.Data[1:]) From f155242911694216141774054ee489c542dea89a Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Fri, 10 Jan 2025 05:38:20 +0530 Subject: [PATCH 4/5] refactor: push behind feature flag --- flow/connectors/postgres/cdc.go | 12 +++++++++++- 
flow/peerdbenv/dynamicconf.go | 12 ++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index f93dce74c..23d271648 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -26,6 +26,7 @@ import ( "github.com/PeerDB-io/peerdb/flow/model" "github.com/PeerDB-io/peerdb/flow/model/qvalue" "github.com/PeerDB-io/peerdb/flow/otel_metrics" + "github.com/PeerDB-io/peerdb/flow/peerdbenv" "github.com/PeerDB-io/peerdb/flow/shared" ) @@ -383,6 +384,11 @@ func PullCdcRecords[Items model.Items]( pkmRequiresResponse := false waitingForCommit := false + pkmEmptyBatchThrottleEnabled, err := peerdbenv.PeerDBPKMEmptyBatchThrottleEnabled(ctx, nil) + if err != nil { + logger.Error("failed to get PeerDBPKMEmptyBatchThrottleEnabled", slog.Any("error", err)) + // No need to fail here, just continue without throttling + } lastEmptyBatchPkmSentTime := time.Now() for { if pkmRequiresResponse { @@ -502,7 +508,11 @@ func PullCdcRecords[Items model.Items]( if pkm.ServerWALEnd > clientXLogPos { clientXLogPos = pkm.ServerWALEnd } - if pkm.ReplyRequested || time.Since(lastEmptyBatchPkmSentTime) >= 1*time.Minute { + if pkmEmptyBatchThrottleEnabled { + if pkm.ReplyRequested || time.Since(lastEmptyBatchPkmSentTime) >= 1*time.Minute { + pkmRequiresResponse = true + } + } else { pkmRequiresResponse = true } diff --git a/flow/peerdbenv/dynamicconf.go b/flow/peerdbenv/dynamicconf.go index 24151d7fe..d11fd6a3e 100644 --- a/flow/peerdbenv/dynamicconf.go +++ b/flow/peerdbenv/dynamicconf.go @@ -237,6 +237,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{ ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_ALL, }, + { + Name: "PEERDB_PKM_EMPTY_BATCH_THROTTLE_ENABLED", + Description: "Throttle sending KeepAlive response to 1 minute (or if reply is requested) when no records are processed", + DefaultValue: "false", + ValueType: 
protos.DynconfValueType_BOOL, + ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME, + TargetForSetting: protos.DynconfTarget_ALL, + }, } var DynamicIndex = func() map[string]int { @@ -497,3 +505,7 @@ func PeerDBMaintenanceModeEnabled(ctx context.Context, env map[string]string) (b func UpdatePeerDBMaintenanceModeEnabled(ctx context.Context, pool *pgxpool.Pool, enabled bool) error { return UpdateDynamicSetting(ctx, pool, "PEERDB_MAINTENANCE_MODE_ENABLED", ptr.String(strconv.FormatBool(enabled))) } + +func PeerDBPKMEmptyBatchThrottleEnabled(ctx context.Context, env map[string]string) (bool, error) { + return dynamicConfBool(ctx, env, "PEERDB_PKM_EMPTY_BATCH_THROTTLE_ENABLED") +} From c01fea4dc956a48f79fb9e821ae98a2e808ab7a6 Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Fri, 10 Jan 2025 05:44:06 +0530 Subject: [PATCH 5/5] fix: passing env to dynamic config --- flow/connectors/postgres/cdc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 23d271648..37cff0bb2 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -384,7 +384,7 @@ func PullCdcRecords[Items model.Items]( pkmRequiresResponse := false waitingForCommit := false - pkmEmptyBatchThrottleEnabled, err := peerdbenv.PeerDBPKMEmptyBatchThrottleEnabled(ctx, nil) + pkmEmptyBatchThrottleEnabled, err := peerdbenv.PeerDBPKMEmptyBatchThrottleEnabled(ctx, req.Env) if err != nil { logger.Error("failed to get PeerDBPKMEmptyBatchThrottleEnabled", slog.Any("error", err)) // No need to fail here, just continue without throttling