Skip to content

Commit 67d0b0e

Browse files
yuppie-flugianm
authored andcommitted
Add taskType dimension to task metrics (apache#5664)
1 parent a95ec92 commit 67d0b0e

File tree

8 files changed

+85
-57
lines changed

8 files changed

+85
-57
lines changed

docs/content/operations/metrics.md

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -105,22 +105,23 @@ These metrics are only available if the RealtimeMetricsMonitor is included in th
105105

106106
|Metric|Description|Dimensions|Normal Value|
107107
|------|-----------|----------|------------|
108-
|`ingest/events/thrownAway`|Number of events rejected because they are outside the windowPeriod.|dataSource.|0|
109-
|`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|dataSource.|0|
110-
|`ingest/events/duplicate`|Number of events rejected because the events are duplicated.|dataSource.|0|
111-
|`ingest/events/processed`|Number of events successfully processed per emission period.|dataSource.|Equal to your # of events per emission period.|
112-
|`ingest/rows/output`|Number of Druid rows persisted.|dataSource.|Your # of events with rollup.|
113-
|`ingest/persists/count`|Number of times persist occurred.|dataSource.|Depends on configuration.|
114-
|`ingest/persists/time`|Milliseconds spent doing intermediate persist.|dataSource.|Depends on configuration. Generally a few minutes at most.|
115-
|`ingest/persists/cpu`|Cpu time in Nanoseconds spent on doing intermediate persist.|dataSource.|Depends on configuration. Generally a few minutes at most.|
116-
|`ingest/persists/backPressure`|Milliseconds spent creating persist tasks and blocking waiting for them to finish.|dataSource.|0 or very low|
117-
|`ingest/persists/failed`|Number of persists that failed.|dataSource.|0|
118-
|`ingest/handoff/failed`|Number of handoffs that failed.|dataSource.|0|
119-
|`ingest/merge/time`|Milliseconds spent merging intermediate segments|dataSource.|Depends on configuration. Generally a few minutes at most.|
120-
|`ingest/merge/cpu`|Cpu time in Nanoseconds spent on merging intermediate segments.|dataSource.|Depends on configuration. Generally a few minutes at most.|
121-
|`ingest/handoff/count`|Number of handoffs that happened.|dataSource.|Varies. Generally greater than 0 once every segment granular period if cluster operating normally|
122-
|`ingest/sink/count`|Number of sinks not handoffed.|dataSource.|1~3|
123-
|`ingest/events/messageGap`|Time gap between the data time in event and current system time.|dataSource.|Greater than 0, depends on the time carried in event |
108+
|`ingest/events/thrownAway`|Number of events rejected because they are outside the windowPeriod.|dataSource, taskId, taskType.|0|
109+
|`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|dataSource, taskId, taskType.|0|
110+
|`ingest/events/duplicate`|Number of events rejected because the events are duplicated.|dataSource, taskId, taskType.|0|
111+
|`ingest/events/processed`|Number of events successfully processed per emission period.|dataSource, taskId, taskType.|Equal to your # of events per
112+
emission period.|
113+
|`ingest/rows/output`|Number of Druid rows persisted.|dataSource, taskId, taskType.|Your # of events with rollup.|
114+
|`ingest/persists/count`|Number of times persist occurred.|dataSource, taskId, taskType.|Depends on configuration.|
115+
|`ingest/persists/time`|Milliseconds spent doing intermediate persist.|dataSource, taskId, taskType.|Depends on configuration. Generally a few minutes at most.|
116+
|`ingest/persists/cpu`|Cpu time in Nanoseconds spent on doing intermediate persist.|dataSource, taskId, taskType.|Depends on configuration. Generally a few minutes at most.|
117+
|`ingest/persists/backPressure`|Milliseconds spent creating persist tasks and blocking waiting for them to finish.|dataSource, taskId, taskType.|0 or very low|
118+
|`ingest/persists/failed`|Number of persists that failed.|dataSource, taskId, taskType.|0|
119+
|`ingest/handoff/failed`|Number of handoffs that failed.|dataSource, taskId, taskType.|0|
120+
|`ingest/merge/time`|Milliseconds spent merging intermediate segments|dataSource, taskId, taskType.|Depends on configuration. Generally a few minutes at most.|
121+
|`ingest/merge/cpu`|Cpu time in Nanoseconds spent on merging intermediate segments.|dataSource, taskId, taskType.|Depends on configuration. Generally a few minutes at most.|
122+
|`ingest/handoff/count`|Number of handoffs that happened.|dataSource, taskId, taskType.|Varies. Generally greater than 0 once every segment granular period if cluster operating normally|
123+
|`ingest/sink/count`|Number of sinks not handoffed.|dataSource, taskId, taskType.|1~3|
124+
|`ingest/events/messageGap`|Time gap between the data time in event and current system time.|dataSource, taskId, taskType.|Greater than 0, depends on the time carried in event |
124125
|`ingest/kafka/lag`|Applicable for Kafka Indexing Service. Total lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, should not be a very high number |
125126

126127

@@ -201,8 +202,10 @@ The following metric is only available if the EventReceiverFirehoseMonitor modul
201202

202203
|Metric|Description|Dimensions|Normal Value|
203204
|------|-----------|----------|------------|
204-
|`ingest/events/buffered`|Number of events queued in the EventReceiverFirehose's buffer|serviceName, dataSource, taskId, bufferCapacity.|Equal to current # of events in the buffer queue.|
205-
|`ingest/bytes/received`|Number of bytes received by the EventReceiverFirehose.|serviceName, dataSource, taskId.|Varies.|
205+
|`ingest/events/buffered`|Number of events queued in the EventReceiverFirehose's buffer|serviceName, dataSource, taskId, taskType, bufferCapacity
206+
.|Equal
207+
to current # of events in the buffer queue.|
208+
|`ingest/bytes/received`|Number of bytes received by the EventReceiverFirehose.|serviceName, dataSource, taskId, taskType.|Varies.|
206209

207210
## Sys
208211

extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
5656
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
5757
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
58+
import io.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
5859
import io.druid.indexing.common.TaskReport;
5960
import io.druid.indexing.common.TaskStatus;
6061
import io.druid.indexing.common.TaskToolbox;
@@ -77,7 +78,6 @@
7778
import io.druid.java.util.common.guava.Sequence;
7879
import io.druid.java.util.common.parsers.ParseException;
7980
import io.druid.java.util.emitter.EmittingLogger;
80-
import io.druid.query.DruidMetrics;
8181
import io.druid.query.NoopQueryRunner;
8282
import io.druid.query.Query;
8383
import io.druid.query.QueryPlus;
@@ -87,7 +87,6 @@
8787
import io.druid.segment.realtime.FireDepartment;
8888
import io.druid.segment.realtime.FireDepartmentMetrics;
8989
import io.druid.segment.realtime.FireDepartmentMetricsTaskMetricsGetter;
90-
import io.druid.segment.realtime.RealtimeMetricsMonitor;
9190
import io.druid.segment.realtime.appenderator.Appenderator;
9291
import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
9392
import io.druid.segment.realtime.appenderator.Appenderators;
@@ -513,12 +512,7 @@ private TaskStatus runInternal(final TaskToolbox toolbox) throws Exception
513512
);
514513
fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
515514
metricsGetter = new FireDepartmentMetricsTaskMetricsGetter(fireDepartmentMetrics);
516-
toolbox.getMonitorScheduler().addMonitor(
517-
new RealtimeMetricsMonitor(
518-
ImmutableList.of(fireDepartmentForMetrics),
519-
ImmutableMap.of(DruidMetrics.TASK_ID, new String[]{getId()})
520-
)
521-
);
515+
toolbox.getMonitorScheduler().addMonitor(TaskRealtimeMetricsMonitorBuilder.build(this, fireDepartmentForMetrics));
522516

523517
LookupNodeService lookupNodeService = getContextValue(RealtimeIndexTask.CTX_KEY_LOOKUP_TIER) == null ?
524518
toolbox.getLookupNodeService() :
@@ -957,12 +951,7 @@ private TaskStatus runInternalLegacy(final TaskToolbox toolbox) throws Exception
957951
);
958952
fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
959953
metricsGetter = new FireDepartmentMetricsTaskMetricsGetter(fireDepartmentMetrics);
960-
toolbox.getMonitorScheduler().addMonitor(
961-
new RealtimeMetricsMonitor(
962-
ImmutableList.of(fireDepartmentForMetrics),
963-
ImmutableMap.of(DruidMetrics.TASK_ID, new String[]{getId()})
964-
)
965-
);
954+
toolbox.getMonitorScheduler().addMonitor(TaskRealtimeMetricsMonitorBuilder.build(this, fireDepartmentForMetrics));
966955

967956
LookupNodeService lookupNodeService = getContextValue(RealtimeIndexTask.CTX_KEY_LOOKUP_TIER) == null ?
968957
toolbox.getLookupNodeService() :
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. Metamarkets licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package io.druid.indexing.common;
21+
22+
import com.google.common.collect.ImmutableList;
23+
import com.google.common.collect.ImmutableMap;
24+
import io.druid.indexing.common.task.Task;
25+
import io.druid.query.DruidMetrics;
26+
import io.druid.segment.realtime.FireDepartment;
27+
import io.druid.segment.realtime.RealtimeMetricsMonitor;
28+
29+
public class TaskRealtimeMetricsMonitorBuilder
30+
{
31+
private TaskRealtimeMetricsMonitorBuilder() {}
32+
33+
public static RealtimeMetricsMonitor build(Task task, FireDepartment fireDepartment)
34+
{
35+
return new RealtimeMetricsMonitor(
36+
ImmutableList.of(fireDepartment),
37+
ImmutableMap.of(
38+
DruidMetrics.TASK_ID, new String[]{task.getId()},
39+
DruidMetrics.TASK_TYPE, new String[]{task.getType()}
40+
)
41+
);
42+
}
43+
}

indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import com.google.common.base.Optional;
2727
import com.google.common.base.Supplier;
2828
import com.google.common.base.Throwables;
29-
import com.google.common.collect.ImmutableList;
3029
import com.google.common.collect.ImmutableMap;
3130
import com.google.common.collect.Maps;
3231
import com.google.common.util.concurrent.Futures;
@@ -44,6 +43,7 @@
4443
import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
4544
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
4645
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
46+
import io.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
4747
import io.druid.indexing.common.TaskReport;
4848
import io.druid.indexing.common.TaskStatus;
4949
import io.druid.indexing.common.TaskToolbox;
@@ -58,7 +58,6 @@
5858
import io.druid.java.util.common.guava.CloseQuietly;
5959
import io.druid.java.util.common.parsers.ParseException;
6060
import io.druid.java.util.emitter.EmittingLogger;
61-
import io.druid.query.DruidMetrics;
6261
import io.druid.query.NoopQueryRunner;
6362
import io.druid.query.Query;
6463
import io.druid.query.QueryRunner;
@@ -249,12 +248,7 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
249248
dataSchema, new RealtimeIOConfig(null, null, null), null
250249
);
251250

252-
final RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor(
253-
ImmutableList.of(fireDepartmentForMetrics),
254-
ImmutableMap.of(
255-
DruidMetrics.TASK_ID, new String[]{getId()}
256-
)
257-
);
251+
final RealtimeMetricsMonitor metricsMonitor = TaskRealtimeMetricsMonitorBuilder.build(this, fireDepartmentForMetrics);
258252

259253
this.metrics = fireDepartmentForMetrics.getMetrics();
260254
metricsGetter = new FireDepartmentMetricsTaskMetricsGetter(metrics);

indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import com.google.common.base.Preconditions;
3131
import com.google.common.base.Throwables;
3232
import com.google.common.collect.ImmutableList;
33-
import com.google.common.collect.ImmutableMap;
3433
import com.google.common.collect.Iterables;
3534
import com.google.common.collect.Maps;
3635
import com.google.common.hash.HashFunction;
@@ -48,6 +47,7 @@
4847
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
4948
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
5049
import io.druid.indexing.common.TaskLock;
50+
import io.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
5151
import io.druid.indexing.common.TaskReport;
5252
import io.druid.indexing.common.TaskStatus;
5353
import io.druid.indexing.common.TaskToolbox;
@@ -62,7 +62,6 @@
6262
import io.druid.java.util.common.guava.Comparators;
6363
import io.druid.java.util.common.logger.Logger;
6464
import io.druid.java.util.common.parsers.ParseException;
65-
import io.druid.query.DruidMetrics;
6665
import io.druid.segment.IndexSpec;
6766
import io.druid.segment.indexing.DataSchema;
6867
import io.druid.segment.indexing.IOConfig;
@@ -843,12 +842,8 @@ dataSchema, new RealtimeIOConfig(null, null, null), null
843842
buildSegmentsMetricsGetter = new FireDepartmentMetricsTaskMetricsGetter(buildSegmentsFireDepartmentMetrics);
844843

845844
if (toolbox.getMonitorScheduler() != null) {
846-
toolbox.getMonitorScheduler().addMonitor(
847-
new RealtimeMetricsMonitor(
848-
ImmutableList.of(fireDepartmentForMetrics),
849-
ImmutableMap.of(DruidMetrics.TASK_ID, new String[]{getId()})
850-
)
851-
);
845+
final RealtimeMetricsMonitor metricsMonitor = TaskRealtimeMetricsMonitorBuilder.build(this, fireDepartmentForMetrics);
846+
toolbox.getMonitorScheduler().addMonitor(metricsMonitor);
852847
}
853848

854849
final IndexIOConfig ioConfig = ingestionSchema.getIOConfig();

indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import io.druid.discovery.LookupNodeService;
3636
import io.druid.indexing.common.TaskLock;
3737
import io.druid.indexing.common.TaskLockType;
38+
import io.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
3839
import io.druid.indexing.common.TaskStatus;
3940
import io.druid.indexing.common.TaskToolbox;
4041
import io.druid.indexing.common.actions.LockAcquireAction;
@@ -44,7 +45,6 @@
4445
import io.druid.java.util.common.StringUtils;
4546
import io.druid.java.util.common.guava.CloseQuietly;
4647
import io.druid.java.util.emitter.EmittingLogger;
47-
import io.druid.query.DruidMetrics;
4848
import io.druid.query.FinalizeResultsQueryRunner;
4949
import io.druid.query.Query;
5050
import io.druid.query.QueryRunner;
@@ -320,12 +320,8 @@ public String getVersion(final Interval interval)
320320
tuningConfig
321321
);
322322
this.metrics = fireDepartment.getMetrics();
323-
final RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor(
324-
ImmutableList.of(fireDepartment),
325-
ImmutableMap.of(
326-
DruidMetrics.TASK_ID, new String[]{getId()}
327-
)
328-
);
323+
final RealtimeMetricsMonitor metricsMonitor = TaskRealtimeMetricsMonitorBuilder.build(this, fireDepartment);
324+
329325
this.queryRunnerFactoryConglomerate = toolbox.getQueryRunnerFactoryConglomerate();
330326

331327
// NOTE: This pusher selects path based purely on global configuration and the DataSegment, which means

indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ public TaskStatus call()
358358
}
359359
}
360360

361-
// Add dataSource and taskId for metrics or logging
361+
// Add dataSource, taskId and taskType for metrics or logging
362362
command.add(
363363
StringUtils.format(
364364
"-D%s%s=%s",
@@ -375,6 +375,14 @@ public TaskStatus call()
375375
task.getId()
376376
)
377377
);
378+
command.add(
379+
StringUtils.format(
380+
"-D%s%s=%s",
381+
MonitorsConfig.METRIC_DIMENSION_PREFIX,
382+
DruidMetrics.TASK_TYPE,
383+
task.getType()
384+
)
385+
);
378386

379387
command.add(StringUtils.format("-Ddruid.host=%s", childHost));
380388
command.add(StringUtils.format("-Ddruid.port=%d", childPort));

server/src/main/java/io/druid/server/metrics/EventReceiverFirehoseMonitor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public EventReceiverFirehoseMonitor(
4848
this.register = eventReceiverFirehoseRegister;
4949
this.dimensions = MonitorsConfig.extractDimensions(
5050
props,
51-
Lists.newArrayList(DruidMetrics.DATASOURCE, DruidMetrics.TASK_ID)
51+
Lists.newArrayList(DruidMetrics.DATASOURCE, DruidMetrics.TASK_ID, DruidMetrics.TASK_TYPE)
5252
);
5353
}
5454

0 commit comments

Comments
 (0)