diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index be03a41ed..3edbb165e 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -386,14 +386,12 @@ func PullCdcRecords[Items model.Items]( lastEmptyBatchPkmSentTime := time.Now() for { if pkmRequiresResponse { - if cdcRecordsStorage.IsEmpty() && int64(clientXLogPos) > req.ConsumedOffset.Load() && - time.Since(lastEmptyBatchPkmSentTime) >= 1*time.Minute { + if cdcRecordsStorage.IsEmpty() && int64(clientXLogPos) > req.ConsumedOffset.Load() { metadata := connmetadata.NewPostgresMetadataFromCatalog(logger, p.catalogPool) if err := metadata.SetLastOffset(ctx, req.FlowJobName, int64(clientXLogPos)); err != nil { return err } req.ConsumedOffset.Store(int64(clientXLogPos)) - lastEmptyBatchPkmSentTime = time.Now() } if err := sendStandbyAfterReplLock("pkm-response"); err != nil { @@ -503,7 +501,10 @@ func PullCdcRecords[Items model.Items]( if pkm.ServerWALEnd > clientXLogPos { clientXLogPos = pkm.ServerWALEnd } - pkmRequiresResponse = true + if pkm.ReplyRequested || time.Since(lastEmptyBatchPkmSentTime) >= 1*time.Minute { + pkmRequiresResponse = true + lastEmptyBatchPkmSentTime = time.Now() + } case pglogrepl.XLogDataByteID: xld, err := pglogrepl.ParseXLogData(msg.Data[1:])