Skip to content

Commit ec5588f

Browse files
authored
Do not submit corrupted history tasks to scheduler (#6999)
1 parent 15ec192 commit ec5588f

File tree

6 files changed

+19
-7
lines changed

6 files changed

+19
-7
lines changed

common/metrics/defs.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2407,6 +2407,7 @@ const (
24072407
ProcessingQueueSelectedDomainSplitCounter
24082408
ProcessingQueueRandomSplitCounter
24092409
ProcessingQueueThrottledCounter
2410+
CorruptedHistoryTaskCounter
24102411

24112412
QueueValidatorLostTaskCounter
24122413
QueueValidatorDropTaskCounter
@@ -3157,6 +3158,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
31573158
ProcessingQueueSelectedDomainSplitCounter: {metricName: "processing_queue_selected_domain_split_counter", metricType: Counter},
31583159
ProcessingQueueRandomSplitCounter: {metricName: "processing_queue_random_split_counter", metricType: Counter},
31593160
ProcessingQueueThrottledCounter: {metricName: "processing_queue_throttled_counter", metricType: Counter},
3161+
CorruptedHistoryTaskCounter: {metricName: "corrupted_history_task_counter", metricType: Counter},
31603162
QueueValidatorLostTaskCounter: {metricName: "queue_validator_lost_task_counter", metricType: Counter},
31613163
QueueValidatorDropTaskCounter: {metricName: "queue_validator_drop_task_counter", metricType: Counter},
31623164
QueueValidatorInvalidLoadCounter: {metricName: "queue_validator_invalid_load_counter", metricType: Counter},

service/history/queue/processing_queue.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
"github.com/uber/cadence/common/log"
2929
"github.com/uber/cadence/common/log/tag"
3030
"github.com/uber/cadence/common/metrics"
31-
"github.com/uber/cadence/common/persistence"
3231
t "github.com/uber/cadence/common/task"
3332
"github.com/uber/cadence/service/history/task"
3433
)
@@ -192,11 +191,6 @@ func (q *processingQueueImpl) AddTasks(tasks map[task.Key]task.Task, newReadLeve
192191
continue
193192
}
194193

195-
if persistence.IsTaskCorrupted(task) {
196-
q.logger.Error("Processing queue encountered a corrupted task", tag.Dynamic("task", task))
197-
continue
198-
}
199-
200194
if !taskBelongsToProcessQueue(q.state, key, task) {
201195
errMsg := "Processing queue encountered a task doesn't belong to its scope"
202196
q.logger.Error(errMsg, tag.Error(

service/history/queue/timer_queue_processor_base.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,12 @@ func (t *timerQueueProcessorBase) processQueueCollections(levels map[int]struct{
305305
continue
306306
}
307307

308+
if persistence.IsTaskCorrupted(taskInfo) {
309+
t.logger.Error("Processing queue encountered a corrupted task", tag.Dynamic("task", taskInfo))
310+
t.metricsScope.IncCounter(metrics.CorruptedHistoryTaskCounter)
311+
continue
312+
}
313+
308314
task := t.taskInitializer(taskInfo)
309315
tasks[newTimerTaskKey(taskInfo.GetVisibilityTimestamp(), taskInfo.GetTaskID())] = task
310316
submitted, err := t.submitTask(task)

service/history/queue/transfer_queue_processor_base.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,12 @@ func (t *transferQueueProcessorBase) processQueueCollections() {
428428
continue
429429
}
430430

431+
if persistence.IsTaskCorrupted(taskInfo) {
432+
t.logger.Error("Processing queue encountered a corrupted task", tag.Dynamic("task", taskInfo))
433+
t.metricsScope.IncCounter(metrics.CorruptedHistoryTaskCounter)
434+
continue
435+
}
436+
431437
task := t.taskInitializer(taskInfo)
432438
tasks[newTransferTaskKey(taskInfo.GetTaskID())] = task
433439
submitted, err := t.submitTask(task)

service/history/queue/transfer_queue_processor_base_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,9 @@ func (s *transferQueueProcessorBaseSuite) TestProcessQueueCollections_WithNextPa
354354
taskInfos := []persistence.Task{
355355
&persistence.DecisionTask{
356356
WorkflowIdentifier: persistence.WorkflowIdentifier{
357-
DomainID: "testDomain1",
357+
DomainID: "testDomain1",
358+
WorkflowID: "testWorkflowID",
359+
RunID: "testRunID",
358360
},
359361
TaskData: persistence.TaskData{
360362
TaskID: 500,

service/history/queuev2/virtual_queue.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,8 @@ func (q *virtualQueueImpl) loadAndSubmitTasks() {
243243
for _, task := range tasks {
244244
if persistence.IsTaskCorrupted(task) {
245245
q.logger.Error("Virtual queue encountered a corrupted task", tag.Dynamic("task", task))
246+
q.metricsScope.IncCounter(metrics.CorruptedHistoryTaskCounter)
247+
task.Ack()
246248
continue
247249
}
248250

0 commit comments

Comments
 (0)