From c58017705b0f1fa431bfba68d56f50578f36f66f Mon Sep 17 00:00:00 2001 From: Orlando Krause Date: Wed, 17 Jul 2024 10:34:54 -0300 Subject: [PATCH] fix: intermitent error on multithread (#70) --- .../io/pyroscope/labels/ScopedContext.java | 26 +++++++++++++---- .../java/io/pyroscope/labels/LabelsTest.java | 6 ++-- .../pyroscope/labels/ScopedContextTest.java | 28 +++++++++++++++++++ 3 files changed, 51 insertions(+), 9 deletions(-) create mode 100644 async-profiler-context/src/test/java/io/pyroscope/labels/ScopedContextTest.java diff --git a/async-profiler-context/src/main/java/io/pyroscope/labels/ScopedContext.java b/async-profiler-context/src/main/java/io/pyroscope/labels/ScopedContext.java index a2b9213..a780eb8 100644 --- a/async-profiler-context/src/main/java/io/pyroscope/labels/ScopedContext.java +++ b/async-profiler-context/src/main/java/io/pyroscope/labels/ScopedContext.java @@ -5,19 +5,21 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; public class ScopedContext implements AutoCloseable { - static final ThreadLocal context = ThreadLocal.withInitial(() -> - new Context(0L, Collections.emptyMap()) - ); + static final Map threadContext = new ConcurrentHashMap<>(); final Context previous; final Context current; + final Thread originalThread; final Ref, Ref>> currentRef; boolean closed = false; + public ScopedContext(LabelsSet labels) { - previous = context.get(); + originalThread = Thread.currentThread(); + previous = getContext(); Map, Ref> nextContext = new HashMap<>( previous.labels.size() + labels.args.length / 2 ); @@ -58,7 +60,7 @@ public ScopedContext(LabelsSet labels) { AsyncProfiler.getInstance().setContextId(currentRef.id); current = new Context(currentRef.id, nextContext); - context.set(current); + setContext(current); } @@ -69,7 +71,7 @@ public void close() { } closed = true; currentRef.refCount.decrementAndGet(); - context.set(previous); + setContext(previous); AsyncProfiler.getInstance().setContextId(previous.id); } @@ -79,6 +81,18 @@ public void forEach(BiConsumer consumer) { } } + private Context getContext() { + return threadContext.computeIfAbsent(this.originalThread, k -> new Context(0L, Collections.emptyMap())); + } + + private void setContext(Context context) { + threadContext.put(this.originalThread, context); + } + + static Context currentContext() { + return threadContext.get(Thread.currentThread()); + } + static class Context { public final Long id; public final Map, Ref> labels; diff --git a/async-profiler-context/src/test/java/io/pyroscope/labels/LabelsTest.java b/async-profiler-context/src/test/java/io/pyroscope/labels/LabelsTest.java index 46516e5..e88338c 100644 --- a/async-profiler-context/src/test/java/io/pyroscope/labels/LabelsTest.java +++ b/async-profiler-context/src/test/java/io/pyroscope/labels/LabelsTest.java @@ -68,7 +68,7 @@ void testOneLabelSet() { assertEquals(1, v1.refCount.get()); } } - assertEquals(0, ScopedContext.context.get().id); + assertEquals(0, ScopedContext.currentContext().id); assertEquals(0, ctxRef.refCount.get()); assertEquals(1, k1.refCount.get()); assertEquals(1, v1.refCount.get()); @@ -195,7 +195,7 @@ void testLabelsSetMerge() { } Pyroscope.LabelsWrapper.dump(); - assertEquals(0, ScopedContext.context.get().id); + assertEquals(0, ScopedContext.currentContext().id); assertEquals(0, RefCounted.strings.valueToRef.size()); assertEquals(0, RefCounted.contexts.valueToRef.size()); } @@ -226,7 +226,7 @@ void stressTest() throws InterruptedException { e.shutdown(); e.awaitTermination(100, TimeUnit.SECONDS); Snapshot res = Pyroscope.LabelsWrapper.dump(); - assertEquals(0, ScopedContext.context.get().id); + assertEquals(0, ScopedContext.currentContext().id); assertEquals(0, RefCounted.strings.valueToRef.size()); assertEquals(0, RefCounted.contexts.valueToRef.size()); } diff --git a/async-profiler-context/src/test/java/io/pyroscope/labels/ScopedContextTest.java b/async-profiler-context/src/test/java/io/pyroscope/labels/ScopedContextTest.java new file mode 100644 index 0000000..45e8729 --- /dev/null +++ b/async-profiler-context/src/test/java/io/pyroscope/labels/ScopedContextTest.java @@ -0,0 +1,28 @@ +package io.pyroscope.labels; + +import io.pyroscope.labels.io.pyroscope.PyroscopeAsyncProfiler; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class ScopedContextTest { + + static { + PyroscopeAsyncProfiler.getAsyncProfiler(); + } + + + /** + * This test demonstrates the issue reported here + */ + @Test + void closeContextOnOtherThread() throws ExecutionException, InterruptedException { + ExecutorService executorService = Executors.newFixedThreadPool(1); + ScopedContext context = new ScopedContext(new LabelsSet("k1", "v1")); + executorService.submit(context::close).get(); + Pyroscope.LabelsWrapper.dump(); + new ScopedContext(new LabelsSet("k1", "v1")); + } +}