Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support auto transform SOFATracer Span in SOFA ThreadPool #191

Merged
merged 2 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.alipay.sofa.common</groupId>
<artifactId>sofa-common-tools</artifactId>
<version>1.3.12</version>
<version>1.3.13</version>
<packaging>jar</packaging>

<name>${project.groupId}:${project.artifactId}</name>
Expand All @@ -29,6 +29,7 @@
<spring.boot.version>1.5.16.RELEASE</spring.boot.version>
<junit.version>4.13.1</junit.version>
<guava.version>27.0-jre</guava.version>
<sofa.tracer.version>3.1.3</sofa.tracer.version>
</properties>

<dependencyManagement>
Expand All @@ -49,6 +50,12 @@
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>tracer-core</artifactId>
<version>${sofa.tracer.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import com.alipay.sofa.common.thread.space.SpaceNamedThreadFactory;
import com.alipay.sofa.common.utils.StringUtil;

import java.util.concurrent.Callable;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
Expand All @@ -32,11 +34,12 @@
* @version SofaScheduledThreadPoolExecutor.java, v 0.1 2020年11月09日 2:19 下午 huzijie Exp $
*/
public class SofaScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
private static final String SIMPLE_CLASS_NAME = SofaScheduledThreadPoolExecutor.class
.getSimpleName();
private static final AtomicInteger POOL_COUNTER = new AtomicInteger(0);
private static final String SIMPLE_CLASS_NAME = SofaScheduledThreadPoolExecutor.class
.getSimpleName();
private static final AtomicInteger POOL_COUNTER = new AtomicInteger(0);
private final ThreadPoolConfig config;
private final ThreadPoolStatistics statistics;
private boolean sofaTracerTransmit = false;

/**
* Basic constructor
Expand Down Expand Up @@ -193,4 +196,43 @@ public ThreadPoolConfig getConfig() {
public ThreadPoolStatistics getStatistics() {
return statistics;
}

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
if (sofaTracerTransmit) {
command = SofaTracerCommandFactory.ofRunnable(command);
}
return super.schedule(command, delay, unit);
}

public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
if (sofaTracerTransmit) {
callable = SofaTracerCommandFactory.ofCallable(callable);
}
return super.schedule(callable, delay, unit);
}

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period,
TimeUnit unit) {
if (sofaTracerTransmit) {
command = SofaTracerCommandFactory.ofRunnable(command);
}
return super.scheduleAtFixedRate(command, initialDelay, period, unit);
}

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
long delay, TimeUnit unit) {
if (sofaTracerTransmit) {
command = SofaTracerCommandFactory.ofRunnable(command);
}
return super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}

public void setSofaTracerTransmit(boolean sofaTracerTransmit) {
this.sofaTracerTransmit = sofaTracerTransmit;
}

public boolean isSofaTracerTransmit() {
return sofaTracerTransmit;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@
* Created on 2020/3/16
*/
public class SofaThreadPoolExecutor extends ThreadPoolExecutor {
private static final String SIMPLE_CLASS_NAME = SofaThreadPoolExecutor.class
.getSimpleName();
private static final AtomicInteger POOL_COUNTER = new AtomicInteger(0);
private static final String SIMPLE_CLASS_NAME = SofaThreadPoolExecutor.class
.getSimpleName();
private static final AtomicInteger POOL_COUNTER = new AtomicInteger(0);
private final ThreadPoolConfig config;
private final ThreadPoolStatistics statistics;
private boolean sofaTracerTransmit = false;

/**
* Basic constructor
Expand Down Expand Up @@ -142,7 +143,8 @@ public SofaThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAl

@Override
public void execute(Runnable command) {
ExecutingRunnable runner = new ExecutingRunnable(command);
ExecutingRunnable runner = sofaTracerTransmit ? SofaTracerCommandFactory
.ofExecutingRunnable(command) : new ExecutingRunnable(command);
runner.setEnqueueTime(System.currentTimeMillis());
super.execute(runner);
}
Expand Down Expand Up @@ -221,6 +223,14 @@ private String createName() {
return SIMPLE_CLASS_NAME + String.format("%08x", POOL_COUNTER.getAndIncrement());
}

public void setSofaTracerTransmit(boolean sofaTracerTransmit) {
this.sofaTracerTransmit = sofaTracerTransmit;
}

public boolean isSofaTracerTransmit() {
return sofaTracerTransmit;
}

@Deprecated
public String getThreadPoolName() {
return this.config.getThreadPoolName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@

protected long period;

protected boolean sofaTracerTransmit;

@Override
protected ExecutorService initializeExecutor(ThreadFactory threadFactory,
RejectedExecutionHandler rejectedExecutionHandler) {
Expand Down Expand Up @@ -73,6 +75,7 @@
rejectedExecutionHandler, threadPoolName, spaceName, taskTimeout, period,
TimeUnit.MILLISECONDS);
}
executor.setSofaTracerTransmit(sofaTracerTransmit);

Boolean allowCoreThreadTimeOut = ClassUtil.getField("allowCoreThreadTimeOut", this);
if (allowCoreThreadTimeOut) {
Expand Down Expand Up @@ -144,4 +147,12 @@
}
return sofaThreadPoolExecutor.getConfig().getTimeUnit();
}

public boolean isSofaTracerTransmit() {
return sofaTracerTransmit;

Check warning on line 152 in src/main/java/com/alipay/sofa/common/thread/SofaThreadPoolTaskExecutor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/com/alipay/sofa/common/thread/SofaThreadPoolTaskExecutor.java#L152

Added line #L152 was not covered by tests
}

public void setSofaTracerTransmit(boolean sofaTracerTransmit) {
this.sofaTracerTransmit = sofaTracerTransmit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public class SofaThreadPoolTaskScheduler extends ThreadPoolTaskScheduler {

protected long period;

protected boolean sofaTracerTransmit;

@Override
protected ExecutorService initializeExecutor(ThreadFactory threadFactory,
RejectedExecutionHandler rejectedExecutionHandler) {
Expand All @@ -54,6 +56,7 @@ protected ExecutorService initializeExecutor(ThreadFactory threadFactory,
SofaScheduledThreadPoolExecutor executor = new SofaScheduledThreadPoolExecutor(
getPoolSize(), threadFactory, rejectedExecutionHandler, threadPoolName, spaceName,
taskTimeout, period, TimeUnit.MILLISECONDS);
executor.setSofaTracerTransmit(sofaTracerTransmit);

Boolean removeOnCancelPolicy = ClassUtil.getField("removeOnCancelPolicy", this);
if (removeOnCancelPolicy) {
Expand Down Expand Up @@ -126,4 +129,8 @@ public TimeUnit getTimeUnit() {
}
return sofaScheduledThreadPoolExecutor.getConfig().getTimeUnit();
}

public void setSofaTracerTransmit(boolean sofaTracerTransmit) {
this.sofaTracerTransmit = sofaTracerTransmit;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.common.thread;

import com.alipay.common.tracer.core.async.SofaTracerCallable;
import com.alipay.common.tracer.core.async.SofaTracerRunnable;
import com.alipay.sofa.common.utils.ClassUtil;

import java.util.concurrent.Callable;

/**
* Factory to create SOFA-Tracer work command.
* @author huzijie
* @version SofaTracerCommandFactory.java, v 0.1 2023年09月26日 2:53 PM huzijie Exp $
*/
public class SofaTracerCommandFactory {

Check warning on line 30 in src/main/java/com/alipay/sofa/common/thread/SofaTracerCommandFactory.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/com/alipay/sofa/common/thread/SofaTracerCommandFactory.java#L30

Added line #L30 was not covered by tests

private static final String SOFA_TRACER_RUNNABLE_CLASS_NAME = "com.alipay.common.tracer.core.async.SofaTracerRunnable";
private static final boolean SOFA_TRACER_CLASS_PRESENT = ClassUtil
.isPresent(
SOFA_TRACER_RUNNABLE_CLASS_NAME,
SofaTracerCommandFactory.class
.getClassLoader());

static ExecutingRunnable ofExecutingRunnable(Runnable runnable) {
if (!SOFA_TRACER_CLASS_PRESENT) {
return new ExecutingRunnable(runnable);

Check warning on line 41 in src/main/java/com/alipay/sofa/common/thread/SofaTracerCommandFactory.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/com/alipay/sofa/common/thread/SofaTracerCommandFactory.java#L41

Added line #L41 was not covered by tests
}
return new SofaTracerCommandFactory.SofaTracerExecutingRunnable(runnable);
}

static Runnable ofRunnable(Runnable runnable) {
if (!SOFA_TRACER_CLASS_PRESENT) {
return runnable;

Check warning on line 48 in src/main/java/com/alipay/sofa/common/thread/SofaTracerCommandFactory.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/com/alipay/sofa/common/thread/SofaTracerCommandFactory.java#L48

Added line #L48 was not covered by tests
}
if (runnable instanceof SofaTracerRunnable) {
return runnable;
}
return new SofaTracerRunnable(runnable);
}

static <V> Callable<V> ofCallable(Callable<V> callable) {
if (!SOFA_TRACER_CLASS_PRESENT) {
return callable;

Check warning on line 58 in src/main/java/com/alipay/sofa/common/thread/SofaTracerCommandFactory.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/com/alipay/sofa/common/thread/SofaTracerCommandFactory.java#L58

Added line #L58 was not covered by tests
}
if (callable instanceof SofaTracerCallable) {
return callable;
}
return new SofaTracerCallable<>(callable);
}

/**
* The wrapper to the {@link ExecutingRunnable} to transmit SofaTracerSpan.
* @author huzijie
* @version SofaTracerExecutingRunnable.java, v 0.1 2023年09月26日 11:45 AM huzijie Exp $
*/
public static class SofaTracerExecutingRunnable extends ExecutingRunnable {

private final SofaTracerRunnable sofaTracerRunnable;

public SofaTracerExecutingRunnable(Runnable originRunnable) {
super(originRunnable);
if (originRunnable instanceof SofaTracerRunnable) {
this.sofaTracerRunnable = (SofaTracerRunnable) originRunnable;

Check warning on line 78 in src/main/java/com/alipay/sofa/common/thread/SofaTracerCommandFactory.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/com/alipay/sofa/common/thread/SofaTracerCommandFactory.java#L78

Added line #L78 was not covered by tests
} else {
this.sofaTracerRunnable = new SofaTracerRunnable(originRunnable);
}
}

@Override
public void run() {
sofaTracerRunnable.run();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author huzijie
Expand Down Expand Up @@ -135,4 +137,81 @@ public void testLoggingBurst() throws Exception {
Assert.assertEquals(numThreads, aberrantListAppender.list.size());
Assert.assertTrue(isLastInfoMatch("Thread pool with name '\\S+' unregistered"));
}

@Test
public void testNoTracerTransmit() throws InterruptedException {
AtomicInteger success = new AtomicInteger(0);
CountDownLatch countDownLatch = new CountDownLatch(1);
threadPool.schedule(() -> {
try {
assertTraceSpanNotExist();
success.incrementAndGet();
} finally {
countDownLatch.countDown();
}
}, 10, TimeUnit.MILLISECONDS);
countDownLatch.await();
Assert.assertEquals(success.get(), 1);
}

@Test
public void testEnableTracerTransmit() throws InterruptedException {
threadPool.setSofaTracerTransmit(true);

AtomicInteger fail = new AtomicInteger(0);
CountDownLatch countDownLatch = new CountDownLatch(1);
threadPool.schedule(() -> {
try {
assertTraceSpanExist();
} catch (Throwable t) {
fail.incrementAndGet();
} finally {
countDownLatch.countDown();
}
}, 10, TimeUnit.MILLISECONDS);
Assert.assertTrue(countDownLatch.await(20, TimeUnit.MILLISECONDS));
Assert.assertEquals(fail.get(), 0);

fail.set(0);
threadPool.schedule(() -> {
try {
return assertTraceSpanExist();
} catch (Throwable t) {
fail.incrementAndGet();
return null;
} finally {
countDownLatch.countDown();
}
}, 10, TimeUnit.MILLISECONDS);
Assert.assertTrue(countDownLatch.await(20, TimeUnit.MILLISECONDS));
Assert.assertEquals(fail.get(), 0);

fail.set(0);
CountDownLatch fixRateCountDownLatch = new CountDownLatch(2);
threadPool.scheduleAtFixedRate(() -> {
try {
assertTraceSpanExist();
} catch (Throwable t) {
fail.incrementAndGet();
} finally {
fixRateCountDownLatch.countDown();
}
}, 10, 10, TimeUnit.MILLISECONDS);
Assert.assertTrue(fixRateCountDownLatch.await(30, TimeUnit.MILLISECONDS));
Assert.assertEquals(fail.get(), 0);

fail.set(0);
CountDownLatch fixDelayCountDownLatch = new CountDownLatch(2);
threadPool.scheduleWithFixedDelay(() -> {
try {
assertTraceSpanExist();
} catch (Throwable t) {
fail.incrementAndGet();
} finally {
fixDelayCountDownLatch.countDown();
}
}, 10, 10, TimeUnit.MILLISECONDS);
Assert.assertTrue(fixDelayCountDownLatch.await(30, TimeUnit.MILLISECONDS));
Assert.assertEquals(fail.get(), 0);
}
}
Loading
Loading