Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions common/persistence/cassandra/matching_task_store_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,20 @@ const (
`AND task_id = ? ` +
`IF range_id = ?`

// templateCheckRangeIDQuery is a lightweight CAS query that only verifies the range_id
// fencing token without writing the task_queue metadata blob. Used by CreateTasks to
// reduce Paxos/Raft proposal payload size on the hot path. Metadata is flushed
// periodically by SyncState instead.
templateCheckRangeIDQuery = `UPDATE tasks_v2 SET ` +
`range_id = ? ` +
`WHERE namespace_id = ? ` +
`AND task_queue_name = ? ` +
`AND task_queue_type = ? ` +
`AND type = ? ` +
`AND pass = 0 ` +
`AND task_id = ? ` +
`IF range_id = ?`

templateUpdateTaskQueueQueryWithTTLPart1 = `INSERT INTO tasks_v2 ` +
`(namespace_id, task_queue_name, task_queue_type, type, pass, task_id) ` +
`VALUES (?, ?, ?, ?, 0, ?) USING TTL ?`
Expand Down
9 changes: 4 additions & 5 deletions common/persistence/cassandra/matching_task_store_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,10 @@ func (d *matchingTaskStoreV1) CreateTasks(
}
}

// The following query is used to ensure that range_id didn't change
batch.Query(switchTasksTable(templateUpdateTaskQueueQuery, matchingTaskVersion1),
// Verify range_id fencing token without writing the task_queue metadata blob.
// Metadata is flushed periodically by SyncState.
batch.Query(switchTasksTable(templateCheckRangeIDQuery, matchingTaskVersion1),
request.RangeID,
request.TaskQueueInfo.Data,
request.TaskQueueInfo.EncodingType.String(),
namespaceID,
taskQueue,
taskQueueType,
Expand All @@ -116,7 +115,7 @@ func (d *matchingTaskStoreV1) CreateTasks(
}
}

return &p.CreateTasksResponse{UpdatedMetadata: true}, nil
return &p.CreateTasksResponse{UpdatedMetadata: false}, nil
}

// GetTasks get a task
Expand Down
9 changes: 4 additions & 5 deletions common/persistence/cassandra/matching_task_store_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,10 @@ func (d *matchingTaskStoreV2) CreateTasks(
task.Task.EncodingType.String())
}

// The following query is used to ensure that range_id didn't change
batch.Query(switchTasksTable(templateUpdateTaskQueueQuery, matchingTaskVersion2),
// Verify range_id fencing token without writing the task_queue metadata blob.
// Metadata is flushed periodically by SyncState.
batch.Query(switchTasksTable(templateCheckRangeIDQuery, matchingTaskVersion2),
request.RangeID,
request.TaskQueueInfo.Data,
request.TaskQueueInfo.EncodingType.String(),
namespaceID,
taskQueue,
taskQueueType,
Expand All @@ -110,7 +109,7 @@ func (d *matchingTaskStoreV2) CreateTasks(
}
}

return &p.CreateTasksResponse{UpdatedMetadata: true}, nil
return &p.CreateTasksResponse{UpdatedMetadata: false}, nil
}

// GetTasks get a task
Expand Down
10 changes: 1 addition & 9 deletions common/persistence/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"go.temporal.io/api/serviceerror"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/primitives/timestamp"
)

// Subqueue zero corresponds to "the queue" before migrating metadata to subqueues.
Expand Down Expand Up @@ -162,13 +161,6 @@ func (m *taskManagerImpl) CreateTasks(
ctx context.Context,
request *CreateTasksRequest,
) (*CreateTasksResponse, error) {
taskQueueInfo := request.TaskQueueInfo.Data
taskQueueInfo.LastUpdateTime = timestamp.TimeNowPtrUtc()
taskQueueInfoBlob, err := m.serializer.TaskQueueInfoToBlob(taskQueueInfo)
if err != nil {
return nil, err
}

tasks := make([]*InternalCreateTask, len(request.Tasks))
for i, task := range request.Tasks {
taskBlob, err := m.serializer.TaskInfoToBlob(task)
Expand All @@ -190,7 +182,7 @@ func (m *taskManagerImpl) CreateTasks(
TaskQueue: request.TaskQueueInfo.Data.GetName(),
TaskType: request.TaskQueueInfo.Data.GetTaskType(),
RangeID: request.TaskQueueInfo.RangeID,
TaskQueueInfo: taskQueueInfoBlob,
TaskQueueInfo: nil,
Tasks: tasks,
}
return m.taskStore.CreateTasks(ctx, internalRequest)
Expand Down
39 changes: 39 additions & 0 deletions common/persistence/tests/task_queue_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,45 @@ func (s *TaskQueueTaskSuite) TestCreateDelete_Multiple() {
s.Nil(resp.NextPageToken)
}

func (s *TaskQueueTaskSuite) TestCreateTasks_DoesNotUpdateMetadata() {
rangeID := rand.Int63()
taskQueue := s.createTaskQueue(rangeID)

// Read back the metadata as it was stored at creation time.
getResp, err := s.taskManager.GetTaskQueue(s.ctx, &p.GetTaskQueueRequest{
NamespaceID: s.namespaceID,
TaskQueue: s.taskQueueName,
TaskType: s.taskQueueType,
})
s.NoError(err)
originalAckLevel := getResp.TaskQueueInfo.AckLevel

// CreateTasks with a modified AckLevel in the metadata blob.
modifiedQueue := *taskQueue
modifiedQueue.AckLevel = originalAckLevel + 999

taskID := rand.Int63()
task := s.randomTask(taskID)
resp, err := s.taskManager.CreateTasks(s.ctx, &p.CreateTasksRequest{
TaskQueueInfo: &p.PersistedTaskQueueInfo{
RangeID: rangeID,
Data: &modifiedQueue,
},
Tasks: []*persistencespb.AllocatedTaskInfo{task},
})
s.NoError(err)
s.False(resp.UpdatedMetadata)

// Verify the metadata blob in the DB still has the original AckLevel.
getResp2, err := s.taskManager.GetTaskQueue(s.ctx, &p.GetTaskQueueRequest{
NamespaceID: s.namespaceID,
TaskQueue: s.taskQueueName,
TaskType: s.taskQueueType,
})
s.NoError(err)
s.Equal(originalAckLevel, getResp2.TaskQueueInfo.AckLevel)
}

func (s *TaskQueueTaskSuite) createTaskQueue(
rangeID int64,
) *persistencespb.TaskQueueInfo {
Expand Down
Loading