From 64159196fa72a71dce40388170182c973737f77d Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Fri, 10 Jan 2025 04:58:42 +0530 Subject: [PATCH] refactor: always send response if it is requested by primary --- flow/connectors/postgres/cdc.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index be03a41ed..3edbb165e 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -386,14 +386,12 @@ func PullCdcRecords[Items model.Items]( lastEmptyBatchPkmSentTime := time.Now() for { if pkmRequiresResponse { - if cdcRecordsStorage.IsEmpty() && int64(clientXLogPos) > req.ConsumedOffset.Load() && - time.Since(lastEmptyBatchPkmSentTime) >= 1*time.Minute { + if cdcRecordsStorage.IsEmpty() && int64(clientXLogPos) > req.ConsumedOffset.Load() { metadata := connmetadata.NewPostgresMetadataFromCatalog(logger, p.catalogPool) if err := metadata.SetLastOffset(ctx, req.FlowJobName, int64(clientXLogPos)); err != nil { return err } req.ConsumedOffset.Store(int64(clientXLogPos)) - lastEmptyBatchPkmSentTime = time.Now() } if err := sendStandbyAfterReplLock("pkm-response"); err != nil { @@ -503,7 +501,10 @@ func PullCdcRecords[Items model.Items]( if pkm.ServerWALEnd > clientXLogPos { clientXLogPos = pkm.ServerWALEnd } - pkmRequiresResponse = true + if pkm.ReplyRequested || time.Since(lastEmptyBatchPkmSentTime) >= 1*time.Minute { + pkmRequiresResponse = true + lastEmptyBatchPkmSentTime = time.Now() + } case pglogrepl.XLogDataByteID: xld, err := pglogrepl.ParseXLogData(msg.Data[1:])