Skip to content

Commit

Permalink
Limit the rate at which stats tasks are generated
Browse files Browse the repository at this point in the history
Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 committed Feb 7, 2025
1 parent 74890da commit 38b9145
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 2 deletions.
4 changes: 4 additions & 0 deletions internal/datacoord/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ func (jm *statsJobManager) triggerSortStatsTask() {
segments := jm.mt.SelectSegments(jm.ctx, SegmentFilterFunc(func(seg *SegmentInfo) bool {
return isFlush(seg) && seg.GetLevel() != datapb.SegmentLevel_L0 && !seg.GetIsSorted() && !seg.GetIsImporting()
}))

for _, segment := range segments {
if !segment.GetIsInvisible() && jm.scheduler.GetTaskCount() > Params.DataCoordCfg.StatsTaskTriggerCount.GetAsInt() {
continue

Check warning on line 105 in internal/datacoord/job_manager.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/job_manager.go#L105

Added line #L105 was not covered by tests
}
jm.createSortStatsTaskForSegment(segment)
}
}
Expand Down
6 changes: 6 additions & 0 deletions internal/datacoord/task_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,12 @@ func (s *taskScheduler) enqueue(task Task) {
}
}

// GetTaskCount returns the number of entries currently stored in s.tasks.
// The scheduler's read lock is held while the length is taken and is
// released before the value is returned to the caller.
func (s *taskScheduler) GetTaskCount() int {
	s.RLock()
	count := len(s.tasks)
	s.RUnlock()
	return count
}

func (s *taskScheduler) AbortTask(taskID int64) {
log.Info("task scheduler receive abort task request", zap.Int64("taskID", taskID))
s.RLock()
Expand Down
15 changes: 13 additions & 2 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -3450,8 +3450,9 @@ type dataCoordConfig struct {
MixCompactionSlotUsage ParamItem `refreshable:"true"`
L0DeleteCompactionSlotUsage ParamItem `refreshable:"true"`

EnableStatsTask ParamItem `refreshable:"true"`
TaskCheckInterval ParamItem `refreshable:"true"`
EnableStatsTask ParamItem `refreshable:"true"`
TaskCheckInterval ParamItem `refreshable:"true"`
StatsTaskTriggerCount ParamItem `refreshable:"true"`
}

func (p *dataCoordConfig) init(base *BaseTable) {
Expand Down Expand Up @@ -4335,6 +4336,16 @@ During compaction, the size of segment # of rows is able to exceed segment max #
Export: false,
}
p.TaskCheckInterval.Init(base.mgr)

p.StatsTaskTriggerCount = ParamItem{
Key: "dataCoord.statsTaskTriggerCount",
Version: "2.5.5",
Doc: "stats task count per trigger",
DefaultValue: "100",
PanicIfEmpty: false,
Export: false,
}
p.StatsTaskTriggerCount.Init(base.mgr)
}

// /////////////////////////////////////////////////////////////////////////////
Expand Down
7 changes: 7 additions & 0 deletions pkg/util/paramtable/component_param_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,13 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 4, Params.L0DeleteCompactionSlotUsage.GetAsInt())
params.Save("datacoord.scheduler.taskSlowThreshold", "1000")
assert.Equal(t, 1000*time.Second, Params.TaskSlowThreshold.GetAsDuration(time.Second))

params.Save("datacoord.statsTask.enable", "true")
assert.True(t, Params.EnableStatsTask.GetAsBool())
params.Save("datacoord.taskCheckInterval", "500")
assert.Equal(t, 500*time.Second, Params.TaskCheckInterval.GetAsDuration(time.Second))
params.Save("datacoord.statsTaskTriggerCount", "3")
assert.Equal(t, 3, Params.StatsTaskTriggerCount.GetAsInt())
})

t.Run("test dataNodeConfig", func(t *testing.T) {
Expand Down

0 comments on commit 38b9145

Please sign in to comment.