From 46dbb23ee8f2cf40fb9eb9b03b911148774d66d3 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 20 Nov 2024 16:00:32 +0200 Subject: [PATCH] [improve][common] Improve logic for enabling Netty leak detection --- .../allocator/PulsarByteBufAllocator.java | 24 +++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java b/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java index ac12bb2df124d..87c100332e62e 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator; +import java.util.Arrays; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; @@ -44,6 +45,12 @@ public class PulsarByteBufAllocator { public static final String PULSAR_ALLOCATOR_LEAK_DETECTION = "pulsar.allocator.leak_detection"; public static final String PULSAR_ALLOCATOR_OUT_OF_MEMORY_POLICY = "pulsar.allocator.out_of_memory_policy"; + private static final String[] LEAK_DETECTION_PROPERTY_NAMES = { + PULSAR_ALLOCATOR_LEAK_DETECTION, + "io.netty.leakDetection.level", // io.netty.util.ResourceLeakDetector.PROP_LEVEL + "io.netty.leakDetectionLevel", // io.netty.util.ResourceLeakDetector.PROP_LEVEL_OLD + }; + public static final ByteBufAllocator DEFAULT; private static final List> LISTENERS = new CopyOnWriteArrayList<>(); @@ -64,8 +71,7 @@ static ByteBufAllocator createByteBufAllocator() { final OutOfMemoryPolicy outOfMemoryPolicy = OutOfMemoryPolicy.valueOf( System.getProperty(PULSAR_ALLOCATOR_OUT_OF_MEMORY_POLICY, "FallbackToHeap")); - final LeakDetectionPolicy leakDetectionPolicy = LeakDetectionPolicy - .valueOf(System.getProperty(PULSAR_ALLOCATOR_LEAK_DETECTION, "Disabled")); + final LeakDetectionPolicy leakDetectionPolicy = resolveLeakDetectionPolicy(); if (log.isDebugEnabled()) { log.debug("Is Pooled: {} -- Exit on OOM: {}", isPooled, isExitOnOutOfMemory); } @@ -98,4 +104,18 @@ static ByteBufAllocator createByteBufAllocator() { return builder.build(); } + + /** + * Resolve the leak detection policy. The value is resolved from the system properties in + * the order of LEAK_DETECTION_PROPERTY_NAMES. + * @return parsed leak detection policy + */ + private static LeakDetectionPolicy resolveLeakDetectionPolicy() { + String stringValue = Arrays.stream(LEAK_DETECTION_PROPERTY_NAMES) + .map(System::getProperty) + .filter(v -> v != null) + .findFirst() + .orElse(LeakDetectionPolicy.Disabled.name()); + return LeakDetectionPolicy.parseLevel(stringValue); + } }