Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,14 @@
"segment/nuked/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count", "help": "Size in bytes of segments deleted via the Kill Task." },
"task/success/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of successful tasks per emission period."},
"task/failed/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of failed tasks per emission period."},
"task/running/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of current running tasks."},
"task/running/count" : { "dimensions" : ["dataSource", "category"], "type" : "count", "help": "Number of current running tasks."},
"task/pending/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of current pending tasks."},
"task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of current waiting tasks."},
"taskSlot/total/count" : { "dimensions" : ["category"], "type" : "gauge", "help": "Number of total task slots per emission period."},
"taskSlot/idle/count" : { "dimensions" : ["category"], "type" : "gauge", "help": "Number of idle task slots per emission period."},
"taskSlot/used/count" : { "dimensions" : ["category"], "type" : "gauge", "help": "Number of busy task slots per emission period."},
"taskSlot/lazy/count" : { "dimensions" : ["category"], "type" : "gauge", "help": "Number of total task slots in lazy marked Middle Managers and Indexers per emission period."},
"taskSlot/blacklisted/count" : { "dimensions" : ["category"], "type" : "gauge", "help": "Number of total task slots in blacklisted Middle Managers and Indexers per emission period."},
"supervisor/count" : { "dimensions" : ["supervisorId", "type", "state", "detailedState"], "type" : "gauge", "help": "Count of active supervisors. Each supervisor emits 1, tagged with its state. Available only if the SupervisorStatsMonitor module is included."},

"segment/assigned/count" : { "dimensions" : ["tier"], "type" : "count", "help": "Number of segments assigned to be loaded in the cluster."},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.druid.server.metrics.TaskSlotCountStatsProvider;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -183,6 +184,17 @@ public Map<RowKey, Long> getRunningTaskCount()
Optional<TaskQueue> taskQueue = getTaskQueue();
if (taskQueue.isPresent()) {
return taskQueue.get().getRunningTaskCount();
} else {
return Collections.emptyMap();
}
}

@Override
public Map<String, Map<RowKey, Long>> getRunningTaskCountByCategory()
{
Optional<TaskQueue> taskQueue = getTaskQueue();
if (taskQueue.isPresent()) {
return taskQueue.get().getRunningTaskCountByCategory();
} else {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,26 @@ public Map<RowKey, Long> getRunningTaskCount()
));
}

/**
 * Counts the running tasks reported by the task runner, grouped first by category and then by
 * the {@link RowKey} that {@link #getCurrentTaskDatasources()} associates with each task ID.
 * Tasks whose ID is missing from that mapping are counted under {@link RowKey#empty()}.
 *
 * @return a map from category to per-{@link RowKey} running task counts, or {@code null} when
 *         the task runner reports no running tasks by category
 */
public Map<String, Map<RowKey, Long>> getRunningTaskCountByCategory()
{
  final Map<String, Collection<? extends TaskRunnerWorkItem>> runningByCategory =
      taskRunner.getRunningTasksByCategory();
  if (runningByCategory.isEmpty()) {
    // An empty grouping is surfaced to callers as null rather than as an empty map.
    return null;
  }

  final Map<String, RowKey> rowKeysByTaskId = getCurrentTaskDatasources();
  final Map<String, Map<RowKey, Long>> countsByCategory = new HashMap<>();
  for (final Map.Entry<String, Collection<? extends TaskRunnerWorkItem>> entry : runningByCategory.entrySet()) {
    final Map<RowKey, Long> counts = new HashMap<>();
    for (final TaskRunnerWorkItem workItem : entry.getValue()) {
      final String taskId = workItem.getTaskId();
      final RowKey rowKey = rowKeysByTaskId.getOrDefault(taskId, RowKey.empty());
      // Add one to the running total for this row key.
      counts.merge(rowKey, 1L, Long::sum);
    }
    countsByCategory.put(entry.getKey(), counts);
  }
  return countsByCategory;
}

public Map<RowKey, Long> getPendingTaskCount()
{
final Map<String, RowKey> taskDatasources = getCurrentTaskDatasources();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -111,6 +112,10 @@ default RunnerTaskState getRunnerTaskState(String taskId)
return null;
}

/**
 * Returns the running work items of this runner keyed by category.
 *
 * This default implementation reports nothing: it always returns an immutable empty map.
 * Runners that can attribute running tasks to a category are expected to override it.
 *
 * @return an empty map in this default implementation
 */
default Map<String, Collection<? extends TaskRunnerWorkItem>> getRunningTasksByCategory()
{
  return Collections.emptyMap();
}
default TaskLocation getTaskLocation(String taskId)
{
return TaskLocation.unknown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1701,6 +1701,19 @@ public int getTotalCapacity()
return getWorkers().stream().mapToInt(workerInfo -> workerInfo.getWorker().getCapacity()).sum();
}

/**
 * Groups the work items that are in the {@code RUNNING} state and have a worker assigned by
 * the category of that worker. Items in any other state, or without a worker, are left out.
 *
 * @return a newly created map from worker category to the running work items on workers of
 *         that category; empty when no item qualifies
 */
@Override
@SuppressWarnings("GuardedBy") // Read on tasks is safe
public Map<String, Collection<? extends TaskRunnerWorkItem>> getRunningTasksByCategory()
{
  final Map<String, List<HttpRemoteTaskRunnerWorkItem>> itemsByCategory = new HashMap<>();
  for (final HttpRemoteTaskRunnerWorkItem workItem : tasks.values()) {
    if (workItem.getState() != HttpRemoteTaskRunnerWorkItem.State.RUNNING) {
      continue;
    }
    if (workItem.getWorker() == null) {
      continue;
    }
    // NOTE(review): getWorker() is read a second time here without holding a lock; confirm the
    // worker of a RUNNING item cannot be reset to null between the check above and this call.
    final String category = workItem.getWorker().getCategory();
    itemsByCategory.computeIfAbsent(category, unused -> new ArrayList<>()).add(workItem);
  }
  // The copy widens the value type from List<HttpRemoteTaskRunnerWorkItem> to the declared
  // Collection<? extends TaskRunnerWorkItem>.
  return new HashMap<>(itemsByCategory);
}


/**
* Retrieves the maximum capacity of the task runner when autoscaling is enabled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,17 @@ public interface TaskCountStatsProvider
@Deprecated
Map<RowKey, Long> getWaitingTaskCount();

/**
 * Returns the number of currently running tasks, grouped first by worker category and then by
 * datasource and task type.
 *
 * The outer map is keyed by the worker category string. Each inner map has the same structure
 * as the result of {@link #getRunningTaskCount()}.
 *
 * @return the per-category running task counts, or {@code null} if the underlying
 *         implementation does not support per-category grouping (e.g., when not using a
 *         worker-pool-based task runner). This default implementation always returns
 *         {@code null}.
 */
default Map<String, Map<RowKey, Long>> getRunningTaskCountByCategory()
{
  return null;
}

/**
* Collects all task level stats. This method deprecates the other task stats
* methods such as {@link #getPendingTaskCount()}, {@link #getWaitingTaskCount()}
Expand Down
Loading