Flow worker utilization probe (#16532)
* flow: refactor pipeline refs to keep worker flows separate

* health: add worker_utilization probe

pipeline is:
  - RED "completely blocked" when last_5_minutes >= 99.999
  - YELLOW "nearly blocked" when last_5_minutes > 95
    - and includes "recovering" info when last_1_minute < 80
  - YELLOW "completely blocked" when last_1_minute >= 99.999
  - YELLOW "nearly blocked" when last_1_minute > 95

* tests: improve coverage of PipelineIndicator probes

* Apply suggestions from code review
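
The thresholds listed above amount to a simple classification. Here is an illustrative Ruby sketch of that logic (not the actual implementation, which lives in Logstash's Java health-report code; the method name and return keys are hypothetical):

```ruby
# Illustrative only: restates the thresholds from the commit message above.
# The real probe is implemented in Java; this method name and return shape
# are hypothetical. Inputs are worker_utilization percentages (0-100).
def worker_utilization_diagnosis(last_1_minute:, last_5_minutes:)
  if last_5_minutes >= 99.999
    { status: :red,    symptom: "completely blocked for the last 5 minutes" }
  elsif last_5_minutes > 95
    { status: :yellow, symptom: "nearly blocked for the last 5 minutes",
      recovering: last_1_minute < 80 } # recovery info included here, per the commit message
  elsif last_1_minute >= 99.999
    { status: :yellow, symptom: "completely blocked for the last minute" }
  elsif last_1_minute > 95
    { status: :yellow, symptom: "nearly blocked for the last minute" }
  else
    { status: :green,  symptom: "worker utilization within normal bounds" }
  end
end

worker_utilization_diagnosis(last_1_minute: 62.5, last_5_minutes: 97.0)
# => {:status=>:yellow, :symptom=>"nearly blocked for the last 5 minutes", :recovering=>true}
```
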
yaauie authored Oct 11, 2024
1 parent 0657696 commit a931b2c
Showing 7 changed files with 441 additions and 26 deletions.
44 changes: 44 additions & 0 deletions docs/static/troubleshoot/health-pipeline-flow-worker-utilization.asciidoc
@@ -0,0 +1,44 @@
[[health-report-pipeline-flow-worker-utilization]]
=== Health Report Pipeline Flow: Worker Utilization

The Pipeline indicator has a `flow:worker_utilization` probe that can produce one of several diagnoses about blockages in the pipeline.
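
For example, you can retrieve the probe's current diagnosis from the health report API. The following is a minimal sketch that assumes the default API port `9600`; the exact response layout may differ between versions.

[source,ruby]
----
require "net/http"
require "json"

# Query the Logstash health report API (default port 9600; adjust as needed).
report = JSON.parse(Net::HTTP.get(URI("http://localhost:9600/_health_report")))

# Walk the per-pipeline indicators and print each status and diagnosis.
(report.dig("indicators", "pipelines", "indicators") || {}).each do |pipeline_id, indicator|
  puts "#{pipeline_id}: #{indicator["status"]} - #{indicator["symptom"]}"
  Array(indicator["diagnosis"]).each do |diagnosis|
    puts "  cause:  #{diagnosis["cause"]}"
    puts "  action: #{diagnosis["action"]}"
  end
end
----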

A pipeline is considered "blocked" when its workers are fully utilized, because if they are consistently spending 100% of their time processing events, they are unable to pick up new events from the queue.
This can cause back-pressure to cascade to upstream services, which can result in data loss or duplicate processing depending on upstream configuration.

The issue typically stems from one or more causes:

* a downstream resource being blocked,
* a plugin consuming more resources than expected, and/or
* insufficient resources being allocated to the pipeline.

To address the issue, observe the <<plugin-flow-rates>> from the <<node-stats-api>>, and identify which plugins have the highest `worker_utilization`.
This tells you which plugins are consuming the largest share of the pipeline's worker resources (see the example below).

* If the offending plugin connects to a downstream service or another pipeline that is exerting back-pressure, the issue needs to be addressed in the downstream service or pipeline.
* If the offending plugin connects to a downstream service with high network latency, throughput for the pipeline may be improved by <<tuning-logstash-settings, allocating more worker resources to the pipeline>>.
* If the offending plugin is a computation-heavy filter such as `grok` or `kv`, its configuration may need to be tuned to eliminate wasted computation.
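
For example, the following minimal sketch (assuming the default API port `9600` and the documented layout of the node stats response) lists each plugin's recent `worker_utilization`, highest first:

[source,ruby]
----
require "net/http"
require "json"

# Fetch per-pipeline plugin stats from the node stats API (default port 9600).
stats = JSON.parse(Net::HTTP.get(URI("http://localhost:9600/_node/stats/pipelines")))

rows = stats.fetch("pipelines", {}).flat_map do |pipeline_id, pipeline|
  %w[filters outputs].flat_map do |section|
    Array(pipeline.dig("plugins", section)).filter_map do |plugin|
      utilization = plugin.dig("flow", "worker_utilization", "last_1_minute")
      [pipeline_id, section, plugin["id"], utilization] if utilization
    end
  end
end

# Highest worker_utilization first: these plugins dominate the pipeline's workers.
rows.sort_by { |row| -row.last }.each do |pipeline_id, section, id, utilization|
  puts format("%-15s %-8s %-40s %7.2f%%", pipeline_id, section, id, utilization)
end
----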

[[health-report-pipeline-flow-worker-utilization-diagnosis-blocked-5m]]
==== [[blocked-5m]]Blocked Pipeline (5 minutes)

A pipeline that has been completely blocked for five minutes or more represents a critical blockage to the flow of events through your pipeline that needs to be addressed immediately to avoid or limit data loss.
See above for troubleshooting steps.

[[health-report-pipeline-flow-worker-utilization-diagnosis-nearly-blocked-5m]]
==== [[nearly-blocked-5m]]Nearly Blocked Pipeline (5 minutes)

A pipeline that has been nearly blocked for five minutes or more may be creating intermittent blockage to the flow of events through your pipeline, which can result in the risk of data loss.
See above for troubleshooting steps.

[[health-report-pipeline-flow-worker-utilization-diagnosis-blocked-1m]]
==== [[blocked-1m]]Blocked Pipeline (1 minute)

A pipeline that has been completely blocked for one minute or more represents a high-risk or upcoming blockage to the flow of events through your pipeline that likely needs to be addressed soon to avoid or limit data loss.
See above for troubleshooting steps.

[[health-report-pipeline-flow-worker-utilization-diagnosis-nearly-blocked-1m]]
==== [[nearly-blocked-1m]]Nearly Blocked Pipeline (1 minute)

A pipeline that has been nearly blocked for one minute or more may be creating intermittent blockage to the flow of events through your pipeline, which can result in the risk of data loss.
See above for troubleshooting steps.
1 change: 1 addition & 0 deletions docs/static/troubleshoot/troubleshooting.asciidoc
@@ -29,3 +29,4 @@ include::ts-plugins-general.asciidoc[]
include::ts-plugins.asciidoc[]
include::ts-other-issues.asciidoc[]
include::health-pipeline-status.asciidoc[]
include::health-pipeline-flow-worker-utilization.asciidoc[]
20 changes: 10 additions & 10 deletions logstash-core/lib/logstash/agent.rb
@@ -168,17 +168,17 @@ def pipeline_details(pipeline_id)
return PipelineIndicator::Details.new(PipelineIndicator::Status::UNKNOWN)
end

status = pipeline_state.synchronize do |sync_state|
case
when sync_state.loading? then PipelineIndicator::Status::LOADING
when sync_state.crashed? then PipelineIndicator::Status::TERMINATED
when sync_state.running? then PipelineIndicator::Status::RUNNING
when sync_state.finished? then PipelineIndicator::Status::FINISHED
else PipelineIndicator::Status::UNKNOWN
end
pipeline_state.synchronize do |sync_state|
status = case
when sync_state.loading? then PipelineIndicator::Status::LOADING
when sync_state.crashed? then PipelineIndicator::Status::TERMINATED
when sync_state.running? then PipelineIndicator::Status::RUNNING
when sync_state.finished? then PipelineIndicator::Status::FINISHED
else PipelineIndicator::Status::UNKNOWN
end

PipelineIndicator::Details.new(status, sync_state.pipeline&.to_java.collectWorkerUtilizationFlowObservation)
end

return PipelineIndicator::Details.new(status)
end

def auto_reload?
logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java
@@ -35,12 +35,16 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.google.common.annotations.VisibleForTesting;
@@ -82,6 +86,7 @@
import org.logstash.execution.queue.QueueWriter;
import org.logstash.ext.JRubyAbstractQueueWriteClientExt;
import org.logstash.ext.JRubyWrappedWriteClientExt;
import org.logstash.health.PipelineIndicator;
import org.logstash.instrument.metrics.AbstractMetricExt;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.FlowMetric;
@@ -161,7 +166,7 @@ public class AbstractPipelineExt extends RubyBasicObject {

private QueueReadClientBase filterQueueClient;

private ArrayList<FlowMetric> flowMetrics = new ArrayList<>();
private transient final ScopedFlowMetrics scopedFlowMetrics = new ScopedFlowMetrics();
private @SuppressWarnings("rawtypes") RubyArray inputs;
private @SuppressWarnings("rawtypes") RubyArray filters;
private @SuppressWarnings("rawtypes") RubyArray outputs;
@@ -563,34 +568,34 @@ public final IRubyObject initializeFlowMetrics(final ThreadContext context) {

final LongCounter eventsInCounter = initOrGetCounterMetric(context, eventsNamespace, IN_KEY);
final FlowMetric inputThroughput = createFlowMetric(INPUT_THROUGHPUT_KEY, eventsInCounter, uptimeInPreciseSeconds);
this.flowMetrics.add(inputThroughput);
this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.WORKER, inputThroughput);
storeMetric(context, flowNamespace, inputThroughput);

final LongCounter eventsFilteredCounter = initOrGetCounterMetric(context, eventsNamespace, FILTERED_KEY);
final FlowMetric filterThroughput = createFlowMetric(FILTER_THROUGHPUT_KEY, eventsFilteredCounter, uptimeInPreciseSeconds);
this.flowMetrics.add(filterThroughput);
this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.WORKER, filterThroughput);
storeMetric(context, flowNamespace, filterThroughput);

final LongCounter eventsOutCounter = initOrGetCounterMetric(context, eventsNamespace, OUT_KEY);
final FlowMetric outputThroughput = createFlowMetric(OUTPUT_THROUGHPUT_KEY, eventsOutCounter, uptimeInPreciseSeconds);
this.flowMetrics.add(outputThroughput);
this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.WORKER, outputThroughput);
storeMetric(context, flowNamespace, outputThroughput);

final TimerMetric queuePushWaitInMillis = initOrGetTimerMetric(context, eventsNamespace, PUSH_DURATION_KEY);
final FlowMetric backpressureFlow = createFlowMetric(QUEUE_BACKPRESSURE_KEY, queuePushWaitInMillis, uptimeInPreciseMillis);
this.flowMetrics.add(backpressureFlow);
this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.WORKER, backpressureFlow);
storeMetric(context, flowNamespace, backpressureFlow);

final TimerMetric durationInMillis = initOrGetTimerMetric(context, eventsNamespace, DURATION_IN_MILLIS_KEY);
final FlowMetric concurrencyFlow = createFlowMetric(WORKER_CONCURRENCY_KEY, durationInMillis, uptimeInPreciseMillis);
this.flowMetrics.add(concurrencyFlow);
this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.WORKER, concurrencyFlow);
storeMetric(context, flowNamespace, concurrencyFlow);

final int workerCount = getSetting(context, SettingKeyDefinitions.PIPELINE_WORKERS).convertToInteger().getIntValue();
final UpScaledMetric percentScaledDurationInMillis = new UpScaledMetric(durationInMillis, 100);
final UpScaledMetric availableWorkerTimeInMillis = new UpScaledMetric(uptimeInPreciseMillis, workerCount);
final FlowMetric utilizationFlow = createFlowMetric(WORKER_UTILIZATION_KEY, percentScaledDurationInMillis, availableWorkerTimeInMillis);
this.flowMetrics.add(utilizationFlow);
this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.WORKER, utilizationFlow);
storeMetric(context, flowNamespace, utilizationFlow);

initializePqFlowMetrics(context, flowNamespace, uptimeMetric);
@@ -600,10 +605,22 @@ public final IRubyObject initializeFlowMetrics(final ThreadContext context) {

@JRubyMethod(name = "collect_flow_metrics")
public final IRubyObject collectFlowMetrics(final ThreadContext context) {
this.flowMetrics.forEach(FlowMetric::capture);
this.scopedFlowMetrics.captureAll();
return context.nil;
}

// short-term limits the scope of what is included in the flow observations
public final PipelineIndicator.FlowObservation collectWorkerUtilizationFlowObservation() {
return this.collectFlowObservation(WORKER_UTILIZATION_KEY.asJavaString()::equals);
}

public final PipelineIndicator.FlowObservation collectFlowObservation(final Predicate<String> filter) {
Map<String, Map<String, Double>> collect = this.scopedFlowMetrics.getFlowMetrics(ScopedFlowMetrics.Scope.WORKER).stream()
.filter(fm -> filter.test(fm.getName()))
.collect(Collectors.toUnmodifiableMap(FlowMetric::getName, FlowMetric::getValue));
return new PipelineIndicator.FlowObservation(collect);
}

private static FlowMetric createFlowMetric(final RubySymbol name,
final Metric<? extends Number> numeratorMetric,
final Metric<? extends Number> denominatorMetric) {
@@ -671,12 +688,13 @@ private void initializePqFlowMetrics(final ThreadContext context, final RubySymb

final Supplier<NumberGauge> eventsGaugeMetricSupplier = () -> initOrGetNumberGaugeMetric(context, queueNamespace, EVENTS_KEY).orElse(null);
final FlowMetric growthEventsFlow = createFlowMetric(QUEUE_PERSISTED_GROWTH_EVENTS_KEY, eventsGaugeMetricSupplier, () -> uptimeInPreciseSeconds);
this.flowMetrics.add(growthEventsFlow);
this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.WORKER, growthEventsFlow);
storeMetric(context, flowNamespace, growthEventsFlow);

final Supplier<NumberGauge> queueSizeInBytesMetricSupplier = () -> initOrGetNumberGaugeMetric(context, queueCapacityNamespace, QUEUE_SIZE_IN_BYTES_KEY).orElse(null);
final FlowMetric growthBytesFlow = createFlowMetric(QUEUE_PERSISTED_GROWTH_BYTES_KEY, queueSizeInBytesMetricSupplier, () -> uptimeInPreciseSeconds);
this.flowMetrics.add(growthBytesFlow);
this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.WORKER, growthBytesFlow);

storeMetric(context, flowNamespace, growthBytesFlow);
}
}
@@ -705,7 +723,7 @@ private void initializePluginThroughputFlowMetric(final ThreadContext context, f
final LongCounter eventsOut = initOrGetCounterMetric(context, eventsNamespace, OUT_KEY);

final FlowMetric throughputFlow = createFlowMetric(PLUGIN_THROUGHPUT_KEY, eventsOut, uptimeInPreciseSeconds);
this.flowMetrics.add(throughputFlow);
this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.PLUGIN, throughputFlow);

final RubySymbol[] flowNamespace = buildNamespace(PLUGINS_KEY, INPUTS_KEY, RubyUtil.RUBY.newString(id).intern(), FLOW_KEY);
storeMetric(context, flowNamespace, throughputFlow);
@@ -718,12 +736,12 @@ private void initializePluginWorkerFlowMetrics(final ThreadContext context, fina
final TimerMetric durationInMillis = initOrGetTimerMetric(context, eventsNamespace, DURATION_IN_MILLIS_KEY);
final LongCounter counterEvents = initOrGetCounterMetric(context, eventsNamespace, IN_KEY);
final FlowMetric workerCostPerEvent = createFlowMetric(WORKER_MILLIS_PER_EVENT_KEY, durationInMillis, counterEvents);
this.flowMetrics.add(workerCostPerEvent);
this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.PLUGIN, workerCostPerEvent);

final UpScaledMetric percentScaledDurationInMillis = new UpScaledMetric(durationInMillis, 100);
final UpScaledMetric availableWorkerTimeInMillis = new UpScaledMetric(uptimeInPreciseMillis, workerCount);
final FlowMetric workerUtilization = createFlowMetric(WORKER_UTILIZATION_KEY, percentScaledDurationInMillis, availableWorkerTimeInMillis);
this.flowMetrics.add(workerUtilization);
this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.PLUGIN, workerUtilization);

final RubySymbol[] flowNamespace = buildNamespace(PLUGINS_KEY, key, RubyUtil.RUBY.newString(id).intern(), FLOW_KEY);
storeMetric(context, flowNamespace, workerCostPerEvent);
@@ -884,4 +902,33 @@ public IRubyObject isShutdownRequested(final ThreadContext context) {
public final RubyString getLastErrorEvaluationReceived(final ThreadContext context) {
return RubyString.newString(context.runtime, lastErrorEvaluationReceived);
}

private static class ScopedFlowMetrics {
enum Scope {
WORKER,
PLUGIN
}
private final Map<Scope, List<FlowMetric>> flowsByScope = new ConcurrentHashMap<>();

void register(final Scope scope, final FlowMetric metric) {
flowsByScope.compute(scope, (s, scopedFlows) -> {
if (scopedFlows == null) {
return List.of(metric);
} else {
final ArrayList<FlowMetric> mutable = new ArrayList<>(scopedFlows.size() + 1);
mutable.addAll(scopedFlows);
mutable.add(metric);
return List.copyOf(mutable);
}
});
}

void captureAll() {
flowsByScope.values().stream().flatMap(List::stream).forEach(FlowMetric::capture);
}

List<FlowMetric> getFlowMetrics(final Scope scope) {
return flowsByScope.getOrDefault(scope, List.of());
}
}
}