Skip to content

Commit 4d94977

Browse files
committed
Fix non-deterministic error caused by upsert search attributes (#828)
1 parent eb10667 commit 4d94977

File tree

3 files changed

+120
-23
lines changed

3 files changed

+120
-23
lines changed

internal/internal_task_handlers.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1278,16 +1278,19 @@ func isDecisionMatchEvent(d *s.Decision, e *s.HistoryEvent, strictMode bool) boo
12781278
}
12791279
eventAttributes := e.UpsertWorkflowSearchAttributesEventAttributes
12801280
decisionAttributes := d.UpsertWorkflowSearchAttributesDecisionAttributes
1281-
if eventAttributes.SearchAttributes != decisionAttributes.SearchAttributes {
1282-
return false
1283-
}
1284-
return true
1285-
1281+
return isSearchAttributesMatched(eventAttributes.SearchAttributes, decisionAttributes.SearchAttributes)
12861282
}
12871283

12881284
return false
12891285
}
12901286

1287+
func isSearchAttributesMatched(attrFromEvent, attrFromDecision *s.SearchAttributes) bool {
1288+
if attrFromEvent != nil && attrFromDecision != nil {
1289+
return reflect.DeepEqual(attrFromEvent.IndexedFields, attrFromDecision.IndexedFields)
1290+
}
1291+
return attrFromEvent == nil && attrFromDecision == nil
1292+
}
1293+
12911294
// return true if the check fails:
12921295
// domain is not empty in decision
12931296
// and domain is not replayDomain

internal/internal_task_handlers_test.go

Lines changed: 111 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -992,27 +992,121 @@ func Test_NonDeterministicCheck(t *testing.T) {
992992

993993
func Test_IsDecisionMatchEvent_UpsertWorkflowSearchAttributes(t *testing.T) {
994994
diType := s.DecisionTypeUpsertWorkflowSearchAttributes
995-
searchAttr := &s.SearchAttributes{}
996-
decision := &s.Decision{
997-
DecisionType: &diType,
998-
UpsertWorkflowSearchAttributesDecisionAttributes: &s.UpsertWorkflowSearchAttributesDecisionAttributes{
999-
SearchAttributes: searchAttr,
995+
eType := s.EventTypeUpsertWorkflowSearchAttributes
996+
997+
testCases := []struct {
998+
name string
999+
decision *s.Decision
1000+
event *s.HistoryEvent
1001+
expected bool
1002+
}{
1003+
{
1004+
name: "event type not match",
1005+
decision: &s.Decision{
1006+
DecisionType: &diType,
1007+
UpsertWorkflowSearchAttributesDecisionAttributes: &s.UpsertWorkflowSearchAttributesDecisionAttributes{
1008+
SearchAttributes: &s.SearchAttributes{},
1009+
},
1010+
},
1011+
event: &s.HistoryEvent{},
1012+
expected: false,
1013+
},
1014+
{
1015+
name: "attributes not match",
1016+
decision: &s.Decision{
1017+
DecisionType: &diType,
1018+
UpsertWorkflowSearchAttributesDecisionAttributes: &s.UpsertWorkflowSearchAttributesDecisionAttributes{
1019+
SearchAttributes: &s.SearchAttributes{},
1020+
},
1021+
},
1022+
event: &s.HistoryEvent{
1023+
EventType: &eType,
1024+
UpsertWorkflowSearchAttributesEventAttributes: &s.UpsertWorkflowSearchAttributesEventAttributes{},
1025+
},
1026+
expected: false,
1027+
},
1028+
{
1029+
name: "attributes match",
1030+
decision: &s.Decision{
1031+
DecisionType: &diType,
1032+
UpsertWorkflowSearchAttributesDecisionAttributes: &s.UpsertWorkflowSearchAttributesDecisionAttributes{
1033+
SearchAttributes: &s.SearchAttributes{},
1034+
},
1035+
},
1036+
event: &s.HistoryEvent{
1037+
EventType: &eType,
1038+
UpsertWorkflowSearchAttributesEventAttributes: &s.UpsertWorkflowSearchAttributesEventAttributes{
1039+
SearchAttributes: &s.SearchAttributes{},
1040+
},
1041+
},
1042+
expected: true,
10001043
},
10011044
}
1002-
historyEvent := &s.HistoryEvent{}
1003-
ok := isDecisionMatchEvent(decision, historyEvent, false)
10041045

1005-
eType := s.EventTypeUpsertWorkflowSearchAttributes
1006-
historyEvent = &s.HistoryEvent{
1007-
EventType: &eType,
1008-
UpsertWorkflowSearchAttributesEventAttributes: &s.UpsertWorkflowSearchAttributesEventAttributes{},
1046+
for _, testCase := range testCases {
1047+
t.Run(testCase.name, func(t *testing.T) {
1048+
require.Equal(t, testCase.expected, isDecisionMatchEvent(testCase.decision, testCase.event, false))
1049+
})
1050+
}
1051+
}
1052+
1053+
func Test_IsSearchAttributesMatched(t *testing.T) {
1054+
testCases := []struct {
1055+
name string
1056+
lhs *s.SearchAttributes
1057+
rhs *s.SearchAttributes
1058+
expected bool
1059+
}{
1060+
{
1061+
name: "both nil",
1062+
lhs: nil,
1063+
rhs: nil,
1064+
expected: true,
1065+
},
1066+
{
1067+
name: "left nil",
1068+
lhs: nil,
1069+
rhs: &s.SearchAttributes{},
1070+
expected: false,
1071+
},
1072+
{
1073+
name: "right nil",
1074+
lhs: &s.SearchAttributes{},
1075+
rhs: nil,
1076+
expected: false,
1077+
},
1078+
{
1079+
name: "not match",
1080+
lhs: &s.SearchAttributes{
1081+
IndexedFields: map[string][]byte{
1082+
"key1": []byte("1"),
1083+
"key2": []byte("abc"),
1084+
},
1085+
},
1086+
rhs: &s.SearchAttributes{},
1087+
expected: false,
1088+
},
1089+
{
1090+
name: "match",
1091+
lhs: &s.SearchAttributes{
1092+
IndexedFields: map[string][]byte{
1093+
"key1": []byte("1"),
1094+
"key2": []byte("abc"),
1095+
},
1096+
},
1097+
rhs: &s.SearchAttributes{
1098+
IndexedFields: map[string][]byte{
1099+
"key2": []byte("abc"),
1100+
"key1": []byte("1"),
1101+
},
1102+
},
1103+
expected: true,
1104+
},
10091105
}
1010-
ok = isDecisionMatchEvent(decision, historyEvent, false)
1011-
require.False(t, ok)
10121106

1013-
historyEvent.UpsertWorkflowSearchAttributesEventAttributes = &s.UpsertWorkflowSearchAttributesEventAttributes{
1014-
SearchAttributes: searchAttr,
1107+
for _, testCase := range testCases {
1108+
t.Run(testCase.name, func(t *testing.T) {
1109+
require.Equal(t, testCase.expected, isSearchAttributesMatched(testCase.lhs, testCase.rhs))
1110+
})
10151111
}
1016-
ok = isDecisionMatchEvent(decision, historyEvent, false)
1017-
require.True(t, ok)
10181112
}

test/integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func (ts *IntegrationTestSuite) SetupSuite() {
7373
func (ts *IntegrationTestSuite) TearDownSuite() {
7474
// sleep for a while to allow the pollers to shutdown
7575
// then assert that there are no lingering go routines
76-
time.Sleep(11 * time.Second)
76+
time.Sleep(20 * time.Second)
7777
// https://github.com/uber-go/cadence-client/issues/739
7878
goleak.VerifyNoLeaks(ts.T(), goleak.IgnoreTopFunction("go.uber.org/cadence/internal.(*coroutineState).initialYield"))
7979
}

0 commit comments

Comments
 (0)