Skip to content

Commit

Permalink
refactor: always send response if it is requested by primary
Browse files Browse the repository at this point in the history
  • Loading branch information
iamKunalGupta committed Jan 9, 2025
1 parent a0d0ae2 commit 6415919
Showing 1 changed file with 5 additions and 4 deletions.
9 changes: 5 additions & 4 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,14 +386,12 @@ func PullCdcRecords[Items model.Items](
lastEmptyBatchPkmSentTime := time.Now()
for {
if pkmRequiresResponse {
if cdcRecordsStorage.IsEmpty() && int64(clientXLogPos) > req.ConsumedOffset.Load() &&
time.Since(lastEmptyBatchPkmSentTime) >= 1*time.Minute {
if cdcRecordsStorage.IsEmpty() && int64(clientXLogPos) > req.ConsumedOffset.Load() {
metadata := connmetadata.NewPostgresMetadataFromCatalog(logger, p.catalogPool)
if err := metadata.SetLastOffset(ctx, req.FlowJobName, int64(clientXLogPos)); err != nil {
return err
}
req.ConsumedOffset.Store(int64(clientXLogPos))
lastEmptyBatchPkmSentTime = time.Now()
}

if err := sendStandbyAfterReplLock("pkm-response"); err != nil {
Expand Down Expand Up @@ -503,7 +501,10 @@ func PullCdcRecords[Items model.Items](
if pkm.ServerWALEnd > clientXLogPos {
clientXLogPos = pkm.ServerWALEnd
}
pkmRequiresResponse = true
if pkm.ReplyRequested || time.Since(lastEmptyBatchPkmSentTime) >= 1*time.Minute {
pkmRequiresResponse = true
lastEmptyBatchPkmSentTime = time.Now()
}

case pglogrepl.XLogDataByteID:
xld, err := pglogrepl.ParseXLogData(msg.Data[1:])
Expand Down

0 comments on commit 6415919

Please sign in to comment.