From 7755201d8f3b4a0356f92511425f982086475e56 Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Fri, 10 Jan 2025 04:58:42 +0530 Subject: [PATCH] refactor: always send response if it is requested by primary --- flow/connectors/postgres/cdc.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index be03a41ed..f93dce74c 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -386,8 +386,7 @@ func PullCdcRecords[Items model.Items]( lastEmptyBatchPkmSentTime := time.Now() for { if pkmRequiresResponse { - if cdcRecordsStorage.IsEmpty() && int64(clientXLogPos) > req.ConsumedOffset.Load() && - time.Since(lastEmptyBatchPkmSentTime) >= 1*time.Minute { + if cdcRecordsStorage.IsEmpty() && int64(clientXLogPos) > req.ConsumedOffset.Load() { metadata := connmetadata.NewPostgresMetadataFromCatalog(logger, p.catalogPool) if err := metadata.SetLastOffset(ctx, req.FlowJobName, int64(clientXLogPos)); err != nil { return err @@ -503,7 +502,9 @@ func PullCdcRecords[Items model.Items]( if pkm.ServerWALEnd > clientXLogPos { clientXLogPos = pkm.ServerWALEnd } - pkmRequiresResponse = true + if pkm.ReplyRequested || time.Since(lastEmptyBatchPkmSentTime) >= 1*time.Minute { + pkmRequiresResponse = true + } case pglogrepl.XLogDataByteID: xld, err := pglogrepl.ParseXLogData(msg.Data[1:])