diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 6a575f37f..37cff0bb2 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -26,6 +26,7 @@ import ( "github.com/PeerDB-io/peerdb/flow/model" "github.com/PeerDB-io/peerdb/flow/model/qvalue" "github.com/PeerDB-io/peerdb/flow/otel_metrics" + "github.com/PeerDB-io/peerdb/flow/peerdbenv" "github.com/PeerDB-io/peerdb/flow/shared" ) @@ -383,7 +384,12 @@ func PullCdcRecords[Items model.Items]( pkmRequiresResponse := false waitingForCommit := false - + pkmEmptyBatchThrottleEnabled, err := peerdbenv.PeerDBPKMEmptyBatchThrottleEnabled(ctx, req.Env) + if err != nil { + logger.Error("failed to get PeerDBPKMEmptyBatchThrottleEnabled", slog.Any("error", err)) + // No need to fail here, just continue without throttling + } + lastEmptyBatchPkmSentTime := time.Now() for { if pkmRequiresResponse { if cdcRecordsStorage.IsEmpty() && int64(clientXLogPos) > req.ConsumedOffset.Load() { @@ -392,6 +398,7 @@ func PullCdcRecords[Items model.Items]( return err } req.ConsumedOffset.Store(int64(clientXLogPos)) + lastEmptyBatchPkmSentTime = time.Now() } if err := sendStandbyAfterReplLock("pkm-response"); err != nil { @@ -501,7 +508,13 @@ func PullCdcRecords[Items model.Items]( if pkm.ServerWALEnd > clientXLogPos { clientXLogPos = pkm.ServerWALEnd } - pkmRequiresResponse = true + if pkmEmptyBatchThrottleEnabled { + if pkm.ReplyRequested || time.Since(lastEmptyBatchPkmSentTime) >= 1*time.Minute { + pkmRequiresResponse = true + } + } else { + pkmRequiresResponse = true + } case pglogrepl.XLogDataByteID: xld, err := pglogrepl.ParseXLogData(msg.Data[1:]) diff --git a/flow/peerdbenv/dynamicconf.go b/flow/peerdbenv/dynamicconf.go index 24151d7fe..d11fd6a3e 100644 --- a/flow/peerdbenv/dynamicconf.go +++ b/flow/peerdbenv/dynamicconf.go @@ -237,6 +237,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{ ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_ALL, }, + { + Name: "PEERDB_PKM_EMPTY_BATCH_THROTTLE_ENABLED", + Description: "Throttle sending KeepAlive response to 1 minute (or if reply is requested) when no records are processed", + DefaultValue: "false", + ValueType: protos.DynconfValueType_BOOL, + ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME, + TargetForSetting: protos.DynconfTarget_ALL, + }, } var DynamicIndex = func() map[string]int { @@ -497,3 +505,7 @@ func PeerDBMaintenanceModeEnabled(ctx context.Context, env map[string]string) (b func UpdatePeerDBMaintenanceModeEnabled(ctx context.Context, pool *pgxpool.Pool, enabled bool) error { return UpdateDynamicSetting(ctx, pool, "PEERDB_MAINTENANCE_MODE_ENABLED", ptr.String(strconv.FormatBool(enabled))) } + +func PeerDBPKMEmptyBatchThrottleEnabled(ctx context.Context, env map[string]string) (bool, error) { + return dynamicConfBool(ctx, env, "PEERDB_PKM_EMPTY_BATCH_THROTTLE_ENABLED") +}