Skip to content

Commit

Permalink
fix: space pkm response to max once per minute (#2422)
Browse files Browse the repository at this point in the history
  • Loading branch information
iamKunalGupta authored Jan 10, 2025
1 parent 48930ad commit b0e3a05
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
17 changes: 15 additions & 2 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/PeerDB-io/peerdb/flow/model"
"github.com/PeerDB-io/peerdb/flow/model/qvalue"
"github.com/PeerDB-io/peerdb/flow/otel_metrics"
"github.com/PeerDB-io/peerdb/flow/peerdbenv"
"github.com/PeerDB-io/peerdb/flow/shared"
)

Expand Down Expand Up @@ -383,7 +384,12 @@ func PullCdcRecords[Items model.Items](

pkmRequiresResponse := false
waitingForCommit := false

pkmEmptyBatchThrottleEnabled, err := peerdbenv.PeerDBPKMEmptyBatchThrottleEnabled(ctx, req.Env)
if err != nil {
logger.Error("failed to get PeerDBPKMEmptyBatchThrottleEnabled", slog.Any("error", err))
// No need to fail here, just continue without throttling
}
lastEmptyBatchPkmSentTime := time.Now()
for {
if pkmRequiresResponse {
if cdcRecordsStorage.IsEmpty() && int64(clientXLogPos) > req.ConsumedOffset.Load() {
Expand All @@ -392,6 +398,7 @@ func PullCdcRecords[Items model.Items](
return err
}
req.ConsumedOffset.Store(int64(clientXLogPos))
lastEmptyBatchPkmSentTime = time.Now()
}

if err := sendStandbyAfterReplLock("pkm-response"); err != nil {
Expand Down Expand Up @@ -501,7 +508,13 @@ func PullCdcRecords[Items model.Items](
if pkm.ServerWALEnd > clientXLogPos {
clientXLogPos = pkm.ServerWALEnd
}
pkmRequiresResponse = true
if pkmEmptyBatchThrottleEnabled {
if pkm.ReplyRequested || time.Since(lastEmptyBatchPkmSentTime) >= 1*time.Minute {
pkmRequiresResponse = true
}
} else {
pkmRequiresResponse = true
}

case pglogrepl.XLogDataByteID:
xld, err := pglogrepl.ParseXLogData(msg.Data[1:])
Expand Down
12 changes: 12 additions & 0 deletions flow/peerdbenv/dynamicconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_ALL,
},
{
Name: "PEERDB_PKM_EMPTY_BATCH_THROTTLE_ENABLED",
Description: "Throttle sending KeepAlive response to 1 minute (or if reply is requested) when no records are processed",
DefaultValue: "false",
ValueType: protos.DynconfValueType_BOOL,
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
TargetForSetting: protos.DynconfTarget_ALL,
},
}

var DynamicIndex = func() map[string]int {
Expand Down Expand Up @@ -497,3 +505,7 @@ func PeerDBMaintenanceModeEnabled(ctx context.Context, env map[string]string) (b
func UpdatePeerDBMaintenanceModeEnabled(ctx context.Context, pool *pgxpool.Pool, enabled bool) error {
return UpdateDynamicSetting(ctx, pool, "PEERDB_MAINTENANCE_MODE_ENABLED", ptr.String(strconv.FormatBool(enabled)))
}

func PeerDBPKMEmptyBatchThrottleEnabled(ctx context.Context, env map[string]string) (bool, error) {
return dynamicConfBool(ctx, env, "PEERDB_PKM_EMPTY_BATCH_THROTTLE_ENABLED")
}

0 comments on commit b0e3a05

Please sign in to comment.