diff --git a/common/persistence/cassandra/matching_task_store_queue.go b/common/persistence/cassandra/matching_task_store_queue.go index f5dc8238f0..d88fbdfad4 100644 --- a/common/persistence/cassandra/matching_task_store_queue.go +++ b/common/persistence/cassandra/matching_task_store_queue.go @@ -76,6 +76,20 @@ const ( `AND task_id = ? ` + `IF range_id = ?` + // templateCheckRangeIDQuery is a lightweight CAS query that only verifies the range_id + // fencing token without writing the task_queue metadata blob. Used by CreateTasks to + // reduce Paxos/Raft proposal payload size on the hot path. Metadata is flushed + // periodically by SyncState instead. + templateCheckRangeIDQuery = `UPDATE tasks_v2 SET ` + + `range_id = ? ` + + `WHERE namespace_id = ? ` + + `AND task_queue_name = ? ` + + `AND task_queue_type = ? ` + + `AND type = ? ` + + `AND pass = 0 ` + + `AND task_id = ? ` + + `IF range_id = ?` + templateUpdateTaskQueueQueryWithTTLPart1 = `INSERT INTO tasks_v2 ` + `(namespace_id, task_queue_name, task_queue_type, type, pass, task_id) ` + `VALUES (?, ?, ?, ?, 0, ?) USING TTL ?` diff --git a/common/persistence/cassandra/matching_task_store_v1.go b/common/persistence/cassandra/matching_task_store_v1.go index aa9816b6c7..03599e9ab8 100644 --- a/common/persistence/cassandra/matching_task_store_v1.go +++ b/common/persistence/cassandra/matching_task_store_v1.go @@ -90,11 +90,10 @@ func (d *matchingTaskStoreV1) CreateTasks( } } - // The following query is used to ensure that range_id didn't change - batch.Query(switchTasksTable(templateUpdateTaskQueueQuery, matchingTaskVersion1), + // Verify range_id fencing token without writing the task_queue metadata blob. + // Metadata is flushed periodically by SyncState. + batch.Query(switchTasksTable(templateCheckRangeIDQuery, matchingTaskVersion1), request.RangeID, - request.TaskQueueInfo.Data, - request.TaskQueueInfo.EncodingType.String(), namespaceID, taskQueue, taskQueueType, @@ -116,7 +115,7 @@ func (d *matchingTaskStoreV1) CreateTasks( } } - return &p.CreateTasksResponse{UpdatedMetadata: true}, nil + return &p.CreateTasksResponse{UpdatedMetadata: false}, nil } // GetTasks get a task diff --git a/common/persistence/cassandra/matching_task_store_v2.go b/common/persistence/cassandra/matching_task_store_v2.go index 4533742688..4239da092f 100644 --- a/common/persistence/cassandra/matching_task_store_v2.go +++ b/common/persistence/cassandra/matching_task_store_v2.go @@ -84,11 +84,10 @@ func (d *matchingTaskStoreV2) CreateTasks( task.Task.EncodingType.String()) } - // The following query is used to ensure that range_id didn't change - batch.Query(switchTasksTable(templateUpdateTaskQueueQuery, matchingTaskVersion2), + // Verify range_id fencing token without writing the task_queue metadata blob. + // Metadata is flushed periodically by SyncState. + batch.Query(switchTasksTable(templateCheckRangeIDQuery, matchingTaskVersion2), request.RangeID, - request.TaskQueueInfo.Data, - request.TaskQueueInfo.EncodingType.String(), namespaceID, taskQueue, taskQueueType, @@ -110,7 +109,7 @@ func (d *matchingTaskStoreV2) CreateTasks( } } - return &p.CreateTasksResponse{UpdatedMetadata: true}, nil + return &p.CreateTasksResponse{UpdatedMetadata: false}, nil } // GetTasks get a task diff --git a/common/persistence/task_manager.go b/common/persistence/task_manager.go index fc5f8e4f0a..b8e6b3e555 100644 --- a/common/persistence/task_manager.go +++ b/common/persistence/task_manager.go @@ -7,7 +7,6 @@ import ( "go.temporal.io/api/serviceerror" persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common/persistence/serialization" - "go.temporal.io/server/common/primitives/timestamp" ) // Subqueue zero corresponds to "the queue" before migrating metadata to subqueues. @@ -162,13 +161,6 @@ func (m *taskManagerImpl) CreateTasks( ctx context.Context, request *CreateTasksRequest, ) (*CreateTasksResponse, error) { - taskQueueInfo := request.TaskQueueInfo.Data - taskQueueInfo.LastUpdateTime = timestamp.TimeNowPtrUtc() - taskQueueInfoBlob, err := m.serializer.TaskQueueInfoToBlob(taskQueueInfo) - if err != nil { - return nil, err - } - tasks := make([]*InternalCreateTask, len(request.Tasks)) for i, task := range request.Tasks { taskBlob, err := m.serializer.TaskInfoToBlob(task) @@ -190,7 +182,7 @@ func (m *taskManagerImpl) CreateTasks( TaskQueue: request.TaskQueueInfo.Data.GetName(), TaskType: request.TaskQueueInfo.Data.GetTaskType(), RangeID: request.TaskQueueInfo.RangeID, - TaskQueueInfo: taskQueueInfoBlob, + TaskQueueInfo: nil, Tasks: tasks, } return m.taskStore.CreateTasks(ctx, internalRequest) diff --git a/common/persistence/tests/task_queue_task.go b/common/persistence/tests/task_queue_task.go index 937801898d..ba81aa88a1 100644 --- a/common/persistence/tests/task_queue_task.go +++ b/common/persistence/tests/task_queue_task.go @@ -233,6 +233,45 @@ func (s *TaskQueueTaskSuite) TestCreateDelete_Multiple() { s.Nil(resp.NextPageToken) } +func (s *TaskQueueTaskSuite) TestCreateTasks_DoesNotUpdateMetadata() { + rangeID := rand.Int63() + taskQueue := s.createTaskQueue(rangeID) + + // Read back the metadata as it was stored at creation time. + getResp, err := s.taskManager.GetTaskQueue(s.ctx, &p.GetTaskQueueRequest{ + NamespaceID: s.namespaceID, + TaskQueue: s.taskQueueName, + TaskType: s.taskQueueType, + }) + s.NoError(err) + originalAckLevel := getResp.TaskQueueInfo.AckLevel + + // CreateTasks with a modified AckLevel in the metadata blob. + modifiedQueue := *taskQueue + modifiedQueue.AckLevel = originalAckLevel + 999 + + taskID := rand.Int63() + task := s.randomTask(taskID) + resp, err := s.taskManager.CreateTasks(s.ctx, &p.CreateTasksRequest{ + TaskQueueInfo: &p.PersistedTaskQueueInfo{ + RangeID: rangeID, + Data: &modifiedQueue, + }, + Tasks: []*persistencespb.AllocatedTaskInfo{task}, + }) + s.NoError(err) + s.False(resp.UpdatedMetadata) + + // Verify the metadata blob in the DB still has the original AckLevel. + getResp2, err := s.taskManager.GetTaskQueue(s.ctx, &p.GetTaskQueueRequest{ + NamespaceID: s.namespaceID, + TaskQueue: s.taskQueueName, + TaskType: s.taskQueueType, + }) + s.NoError(err) + s.Equal(originalAckLevel, getResp2.TaskQueueInfo.AckLevel) +} + func (s *TaskQueueTaskSuite) createTaskQueue( rangeID int64, ) *persistencespb.TaskQueueInfo {