diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index be03a41ed..f93dce74c 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -386,8 +386,7 @@ func PullCdcRecords[Items model.Items]( lastEmptyBatchPkmSentTime := time.Now() for { if pkmRequiresResponse { - if cdcRecordsStorage.IsEmpty() && int64(clientXLogPos) > req.ConsumedOffset.Load() && - time.Since(lastEmptyBatchPkmSentTime) >= 1*time.Minute { + if cdcRecordsStorage.IsEmpty() && int64(clientXLogPos) > req.ConsumedOffset.Load() { metadata := connmetadata.NewPostgresMetadataFromCatalog(logger, p.catalogPool) if err := metadata.SetLastOffset(ctx, req.FlowJobName, int64(clientXLogPos)); err != nil { return err @@ -503,7 +502,9 @@ func PullCdcRecords[Items model.Items]( if pkm.ServerWALEnd > clientXLogPos { clientXLogPos = pkm.ServerWALEnd } - pkmRequiresResponse = true + if pkm.ReplyRequested || time.Since(lastEmptyBatchPkmSentTime) >= 1*time.Minute { + pkmRequiresResponse = true + } case pglogrepl.XLogDataByteID: xld, err := pglogrepl.ParseXLogData(msg.Data[1:])