Skip to content

Commit

Permalink
Avoid retry when deletes non exists visibility message (#6260)
Browse files Browse the repository at this point in the history
* Check if GenericBulkableRequest is delete request to avoid retry during migration
  • Loading branch information
neil-xie authored Sep 4, 2024
1 parent 30d403f commit f6e4360
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 1 deletion.
34 changes: 33 additions & 1 deletion service/worker/indexer/esProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,28 @@ func (p *ESProcessorImpl) bulkAfterAction(id int64, requests []bulk.GenericBulka
tag.WorkflowID(wid),
tag.WorkflowRunID(rid),
tag.WorkflowDomainID(domainID))

// check if it is a delete request and status code
// 404 means the document does not exist
// 409 means means the document's version does not match (or if the document has been updated or deleted by another process)
// this can happen during the data migration, the doc was deleted in the old index but not exists in the new index
if err.Status == 409 || err.Status == 404 {
status := err.Status
req, err := request.Source()
if err == nil {
if p.isDeleteRequest(req) {
p.logger.Info("Delete request encountered a version conflict. Acknowledging to prevent retry.",
tag.ESResponseStatus(status), tag.ESRequest(request.String()),
tag.WorkflowID(wid),
tag.WorkflowRunID(rid),
tag.WorkflowDomainID(domainID))
p.ackKafkaMsg(key)
}
} else {
p.logger.Error("Get request source err.", tag.Error(err), tag.ESRequest(request.String()))
p.scope.IncCounter(metrics.ESProcessorCorruptedData)
}
}
p.nackKafkaMsg(key)
} else {
p.logger.Error("ES request failed", tag.ESRequest(request.String()))
Expand Down Expand Up @@ -224,7 +246,7 @@ func (p *ESProcessorImpl) retrieveKafkaKey(request bulk.GenericBulkableRequest)
}

var key string
if len(req) == 2 { // index or update requests
if !p.isDeleteRequest(req) { // index or update requests
var body map[string]interface{}
if err := json.Unmarshal([]byte(req[1]), &body); err != nil {
p.logger.Error("Unmarshal index request body err.", tag.Error(err))
Expand Down Expand Up @@ -287,6 +309,16 @@ func (p *ESProcessorImpl) hashFn(key interface{}) uint32 {
return uint32(common.WorkflowIDToHistoryShard(id, numOfShards))
}

func (p *ESProcessorImpl) isDeleteRequest(request []string) bool {
// The Source() method typically returns a slice of strings, where each string represents a part of the bulk request in JSON format.
// For delete operations, the Source() method typically returns only one part
// The metadata that specifies the delete action, including _index and _id.
// "{\"delete\":{\"_index\":\"my-index\",\"_id\":\"1\"}}"
// For index/update operations, the Source() method typically returns two parts
// reference: https://www.elastic.co/guide/en/elasticsearch/reference/6.8/docs-bulk.html
return len(request) == 1
}

// 409 - Version Conflict
// 404 - Not Found
func isResponseSuccess(status int) bool {
Expand Down
85 changes: 85 additions & 0 deletions service/worker/indexer/esProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,44 @@ func (s *esProcessorSuite) TestBulkAfterAction_Error() {
s.esProcessor.bulkAfterAction(0, requests, response, &bulk.GenericError{Details: fmt.Errorf("some error")})
}

func (s *esProcessorSuite) TestBulkAfterAction_Error_Nack() {
version := int64(3)
testKey := "testKey"
request := &mocks2.GenericBulkableRequest{}
request.On("String").Return("")
request.On("Source").Return([]string{`{"delete":{"_index":"test-index","_id":"testKey"}}`}, nil)
requests := []bulk.GenericBulkableRequest{request}

mFailed := map[string]*bulk.GenericBulkResponseItem{
"delete": {
Index: testIndex,
Type: testType,
ID: testID,
Version: version,
Status: 409,
},
}
response := &bulk.GenericBulkResponse{
Took: 3,
Errors: true,
Items: []map[string]*bulk.GenericBulkResponseItem{mFailed},
}

wid := "test-workflowID"
rid := "test-runID"
domainID := "test-domainID"
payload := s.getEncodedMsg(wid, rid, domainID)

mockKafkaMsg := &msgMocks.Message{}
mapVal := newKafkaMessageWithMetrics(mockKafkaMsg, &testStopWatch)
s.esProcessor.mapToKafkaMsg.Put(testKey, mapVal)
mockKafkaMsg.On("Nack").Return(nil).Once()
mockKafkaMsg.On("Ack").Return(nil).Once() // Expect Ack to be called
mockKafkaMsg.On("Value").Return(payload).Once()
s.mockScope.On("IncCounter", metrics.ESProcessorFailures).Once()
s.esProcessor.bulkAfterAction(0, requests, response, &bulk.GenericError{Status: 404, Details: fmt.Errorf("some error")})
}

func (s *esProcessorSuite) TestAckKafkaMsg() {
key := "test-key"
// no msg in map, nothing called
Expand Down Expand Up @@ -402,3 +440,50 @@ func (s *esProcessorSuite) TestIsErrorRetriable() {
s.Equal(test.expected, isResponseRetriable(test.input.Status))
}
}

func (s *esProcessorSuite) TestIsDeleteRequest() {
tests := []struct {
request bulk.GenericBulkableRequest
bIsDelete bool
}{
{
request: bulk.NewBulkIndexRequest().
ID("request.ID").
Index("request.Index").
Version(int64(0)).
VersionType("request.VersionType").Doc("request.Doc"),
bIsDelete: false,
},
{
request: bulk.NewBulkDeleteRequest().
ID("request.ID").
Index("request.Index"),
bIsDelete: true,
},
}
for _, test := range tests {
req, _ := test.request.Source()
s.Equal(test.bIsDelete, s.esProcessor.isDeleteRequest(req))
}
}

func (s *esProcessorSuite) TestIsDeleteRequest_Error() {
request := &MockBulkableRequest{}
s.mockScope.On("IncCounter", mock.AnythingOfType("int")).Return()
req, err := request.Source()
s.False(s.esProcessor.isDeleteRequest(req))
s.Error(err)
}

// MockBulkableRequest is a mock implementation of the GenericBulkableRequest interface
type MockBulkableRequest struct{}

// String returns a mock string
func (m *MockBulkableRequest) String() string {
return "mock request"
}

// Source returns an error to simulate a failure
func (m *MockBulkableRequest) Source() ([]string, error) {
return nil, fmt.Errorf("simulated source error")
}

0 comments on commit f6e4360

Please sign in to comment.