Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: intermitent error on multithread #156

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,21 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.function.BiConsumer;

public class ScopedContext implements AutoCloseable {
static final ThreadLocal<Context> context = ThreadLocal.withInitial(() ->
new Context(0L, Collections.emptyMap())
);
static final Map<Thread, Context> threadContext = Collections.synchronizedMap(new WeakHashMap<>());

final Context previous;
final Context current;
final Thread originalThread;
final Ref<Map<Ref<String>, Ref<String>>> currentRef;
boolean closed = false;

public ScopedContext(LabelsSet labels) {
previous = context.get();
originalThread = Thread.currentThread();
previous = getContext();
Map<Ref<String>, Ref<String>> nextContext = new HashMap<>(
previous.labels.size() + labels.args.length / 2
);
Expand Down Expand Up @@ -58,7 +60,7 @@ public ScopedContext(LabelsSet labels) {

AsyncProfiler.getInstance().setContextId(currentRef.id);
current = new Context(currentRef.id, nextContext);
context.set(current);
setContext(current);
}


Expand All @@ -69,7 +71,7 @@ public void close() {
}
closed = true;
currentRef.refCount.decrementAndGet();
context.set(previous);
setContext(previous);
AsyncProfiler.getInstance().setContextId(previous.id);
}

Expand All @@ -79,6 +81,18 @@ public void forEach(BiConsumer<String, String> consumer) {
}
}

private Context getContext() {
return threadContext.computeIfAbsent(this.originalThread, k -> new Context(0L, Collections.emptyMap()));
}

private void setContext(Context context) {
threadContext.put(this.originalThread, context);
}

static Context currentContext() {
return threadContext.get(Thread.currentThread());
}

static class Context {
public final Long id;
public final Map<Ref<String>, Ref<String>> labels;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void testOneLabelSet() {
assertEquals(1, v1.refCount.get());
}
}
assertEquals(0, ScopedContext.context.get().id);
assertEquals(0, ScopedContext.currentContext().id);
assertEquals(0, ctxRef.refCount.get());
assertEquals(1, k1.refCount.get());
assertEquals(1, v1.refCount.get());
Expand Down Expand Up @@ -195,7 +195,7 @@ void testLabelsSetMerge() {

}
Pyroscope.LabelsWrapper.dump();
assertEquals(0, ScopedContext.context.get().id);
assertEquals(0, ScopedContext.currentContext().id);
assertEquals(0, RefCounted.strings.valueToRef.size());
assertEquals(0, RefCounted.contexts.valueToRef.size());
}
Expand Down Expand Up @@ -226,7 +226,7 @@ void stressTest() throws InterruptedException {
e.shutdown();
e.awaitTermination(100, TimeUnit.SECONDS);
Snapshot res = Pyroscope.LabelsWrapper.dump();
assertEquals(0, ScopedContext.context.get().id);
assertEquals(0, ScopedContext.currentContext().id);
assertEquals(0, RefCounted.strings.valueToRef.size());
assertEquals(0, RefCounted.contexts.valueToRef.size());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.pyroscope.labels;

import io.pyroscope.labels.io.pyroscope.PyroscopeAsyncProfiler;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.lang.ref.WeakReference;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class ScopedContextTest {

static {
PyroscopeAsyncProfiler.getAsyncProfiler();
}


/**
* This test demonstrates the issue reported <a href="https://github.com/grafana/pyroscope-java/issues/70">here</a>
*/
@Test
void closeContextOnOtherThread() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
ScopedContext context = new ScopedContext(new LabelsSet("k1", "v1"));
executorService.submit(context::close).get();
Pyroscope.LabelsWrapper.dump();
// Shouldn't error
new ScopedContext(new LabelsSet("k1", "v1"));
}

@Test
void assertThreadNotLeaking() throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = new CompletableFuture<>();
AtomicReference<WeakReference<Thread>> threadRef = new AtomicReference<>();
new Thread(() -> {
threadRef.set(new WeakReference<>(Thread.currentThread()));
ScopedContext context = new ScopedContext(new LabelsSet("k1", "v1"));
context.close();
future.complete(null);
}).start();
future.get();
Pyroscope.LabelsWrapper.dump();

// Force GC to clean the thread
System.gc();

Assertions.assertNull(threadRef.get().get(), "The thread should be garbage collected");

// In WeakHashMap we need to interact with the map so it can clean the stale references
ScopedContext.threadContext.forEach((thread, context) -> {});
Assertions.assertTrue(ScopedContext.threadContext.isEmpty(), "The map should be empty, as the thread doesn't exists anymore");
}
}