Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #483] Virtual thread compatible #508

Closed
wants to merge 45 commits into from

Conversation

EachannChan
Copy link
Contributor

Virtual thread compatible

EachannChan and others added 30 commits October 14, 2024 18:33
# Conflicts:
#	adapter/adapter-common/src/main/java/org/dromara/dynamictp/adapter/common/DtpAdapterListener.java
#	core/src/main/java/org/dromara/dynamictp/core/monitor/collector/MicroMeterCollector.java
#	core/src/main/java/org/dromara/dynamictp/core/monitor/collector/jmx/JMXCollector.java
#	example/example-nacos-cloud/src/main/resources/dynamic-tp-nacos-cloud-demo-dtp-dev.yml
#	extension/extension-limiter-redis/src/main/java/org/dromara/dynamictp/extension/limiter/redis/ratelimiter/NotifyRedisRateLimiterFilter.java
#	spring/src/main/java/org/dromara/dynamictp/spring/DtpPostProcessor.java
#	spring/src/main/java/org/dromara/dynamictp/spring/annotation/DtpBeanDefinitionRegistrar.java
#	test/test-core/src/test/resources/postprocessor-dtp-dev.yml
@@ -109,13 +109,13 @@ public Map<String, ExecutorWrapper> getExecutorWrappers() {
* @return thead pools stats
*/
@Override
public List<ThreadPoolStats> getMultiPoolStats() {
public List<ExecutorStats> getMultiPoolStats() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

改完全点,方法、变量名

if (!Objects.equals(currentRejectHandlerType, props.getRejectedHandlerType())) {
val rejectHandler = RejectHandlerGetter.buildRejectedHandler(props.getRejectedHandlerType());
executorWrapper.setRejectHandler(rejectHandler);
if (!executorWrapper.isVirtualThreadExecutor()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个判断去掉吧,判断太多可读性会变差

poolStats.setDynamic(executor instanceof DtpExecutor);
ExecutorStats executorStats = convertCommon(executor);
executorStats.setPoolName(wrapper.getThreadPoolName());
executorStats.setPoolAliasName(wrapper.getThreadPoolAliasName());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

新加的两个字段没赋值?

Copy link
Contributor Author

@EachannChan EachannChan Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我还是用的poolname那两个打通的逻辑,想着到时一起改可能会好些。要是只改虚拟线程这边的话我怕会出现问题

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我还是用的poolname那两个打通的逻辑,想着到时一起改可能会好些。要是只改虚拟线程这边的话我怕会出现问题

字段赋值不影响啊,就是监控指标多了两个字段,可用可不用

Copy link
Contributor Author

@EachannChan EachannChan Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我还是用的poolname那两个打通的逻辑,想着到时一起改可能会好些。要是只改虚拟线程这边的话我怕会出现问题

字段赋值不影响啊,就是监控指标多了两个字段,可用可不用

好,那我把那两个加上去

poolStats.setTp99(performanceSnapshot.getTp99());
poolStats.setTp999(performanceSnapshot.getTp999());
return poolStats;
if (!wrapper.isVirtualThreadExecutor()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为啥加这个判断,虚拟线程也会有超时计数


Iterable<Tag> tags = getTags(executorStats);

if (!executorStats.isVirtualExecutor()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

判断不用加,不存在的用空值

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public static final String APP_NAME_TAG = "app.name";

private static final Map<String, ThreadPoolStats> GAUGE_CACHE = new ConcurrentHashMap<>();
private static final Map<String, org.dromara.dynamictp.common.entity.Metrics> GAUGE_CACHE = new ConcurrentHashMap<>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

去掉包路径

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

它这个冲突了,有个micrometer的这个类
image

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

现在我换了一个类

@Override
public <T> Future<T> submit(Runnable runnable, T t) {
runnable = getEnhancedTask(runnable);
EnhancedRunnable.of(runnable, this);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

返回值

Copy link
Contributor Author

@EachannChan EachannChan Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

返回值

这个该咋改呢,我就返回callable可以吗
image

pom.xml Outdated
<maven.compiler.target>17</maven.compiler.target>

<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

用17,前后兼容

propertyValues.put(QUEUE_TIMEOUT, props.getQueueTimeout());

if (!ExecutorType.getClass(props.getExecutorType()).equals(VirtualThreadExecutorProxy.class)) {
propertyValues.put(ALLOW_CORE_THREAD_TIMEOUT, props.isAllowCoreThreadTimeOut());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

判断不用加

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个目前虚拟线程的proxy没有这些属性,不加的话会报错

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

采用提前返回的形式吧,看着舒服点代码

propertyValues.put(PRE_START_ALL_CORE_THREADS, props.isPreStartAllCoreThreads());
propertyValues.put(REJECT_HANDLER_TYPE, props.getRejectedHandlerType());
propertyValues.put(REJECT_ENHANCED, props.isRejectEnhanced());
propertyValues.put(RUN_TIMEOUT, props.getRunTimeout());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这些字段虚拟线程也会用到,超时相关是在AwareManager扩展中用到

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这些字段虚拟线程也会用到,超时相关是在AwareManager扩展中用到

这些字段虚拟线程的proxy目前没有这些属性,如果暴露的话会无法生成实例。要是之后有需要的话再进行添加重构吧

/**
* 执行器名字
*/
private String executorName;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JMXCollector没有用到此名字

private String executorName;

/**
* 执行器别名
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这两个名字字段如果标注@deprecated的话需要兼容,不然直接改名就好?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这两个名字字段如果标注@deprecated的话需要兼容,不然直接改名就好?

行,我看看要是没有其他地方用到这两个字段我就去掉算了

@@ -196,7 +196,7 @@ private static void refresh(ExecutorWrapper executorWrapper, DtpExecutorProps pr
TpMainFields oldFields = ExecutorConverter.toMainFields(executorWrapper);
doRefresh(executorWrapper, props);
TpMainFields newFields = ExecutorConverter.toMainFields(executorWrapper);
if (oldFields.equals(newFields)) {
if (oldFields.equals(newFields) && !executorWrapper.isVirtualThreadExecutor()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里使用&&有问题

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里使用&&有问题

那我是直接删掉还是?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里使用&&有问题

那我是直接删掉还是?

感觉删掉较好

@@ -121,6 +125,9 @@ private Object registerAndReturnCommon(Object bean, String beanName) {
} else {
BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanName);
if (!(beanDefinition instanceof AnnotatedBeanDefinition)) {
if (beanDefinition.getBeanClassName().equals("org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy")) {
return doRegisterAndReturnCommon(bean, beanName);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

使用VirtualThreadExecutorProxy.class.getName(),后面维护也简单

@@ -121,6 +125,9 @@ private Object registerAndReturnCommon(Object bean, String beanName) {
} else {
BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanName);
if (!(beanDefinition instanceof AnnotatedBeanDefinition)) {
if (beanDefinition.getBeanClassName().equals("org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy")) {
return doRegisterAndReturnCommon(bean, beanName);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里doRegisterAndReturnCommon调用类似原来的代码最后再调用?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里doRegisterAndReturnCommon调用类似原来的代码最后再调用?

这个我是想着因为现在配置中心里没有需要动态配置的参数所以就直接以common的登记了。想着到时新增参数后再做重构

DtpRegistry.registerExecutor(new ExecutorWrapper(poolName, proxy), REGISTER_SOURCE);
return bean;
}
Executor proxy;
if (bean instanceof ScheduledThreadPoolExecutor) {
proxy = newScheduledTpProxy(poolName, (ScheduledThreadPoolExecutor) bean);
} else if (bean.getClass().getName().equals("java.util.concurrent.ThreadPerTaskExecutor")) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里一样使用getClass().name(),使用JreEnum做下版本限制

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里一样使用getClass().name(),使用JreEnum做下版本限制

这个地方改不了,ThreadPerTaskExecutor这个类JDK没有暴露出来,所以只能用这种方式来写。这也是虚拟线程的proxy有些不太一样的原因

try {
args = buildConstructorArgs(executorTypeClass, e);
} catch (UnsupportedOperationException exception) {
return;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里加下异常日志

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里加下异常日志

加了,在调用的方法里面
image

} else if (clazz.equals(VirtualThreadExecutorProxy.class)) {
int jdkVersion = JreEnum.currentVersion().ordinal();
if (jdkVersion < JDK_VERSION_21_OFFSET) {
log.warn("DynamicTp virtual thread executor {} register warn: update your JDK version or don't use virtual thread executor!", props.getThreadPoolName());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里日志可以去掉,抛异常带上信息,外层捕获的时候打印

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

那我把这个日志移到外层去吧

BlockingQueue<Runnable> taskQueue;
if (clazz.equals(EagerDtpExecutor.class)) {
taskQueue = new TaskQueue(props.getQueueCapacity());
} else if (clazz.equals(PriorityDtpExecutor.class)) {
taskQueue = new PriorityBlockingQueue<>(props.getQueueCapacity(), PriorityDtpExecutor.getRunnableComparator());
} else if (clazz.equals(VirtualThreadExecutorProxy.class)) {
int jdkVersion = JreEnum.currentVersion().ordinal();
if (jdkVersion < JDK_VERSION_21_OFFSET) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

版本判断加个方法到JreEnum里面,以后也会用到

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw new UnsupportedOperationException();
}
return new Object[]{
Executors.newVirtualThreadPerTaskExecutor()
Copy link
Collaborator

@KamToHung KamToHung Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ThreadFactory factory = Thread.ofVirtual().name("xx").factory();
Executors.newThreadPerTaskExecutor(factory);

这里使用factory设置下thread name

@@ -125,7 +125,7 @@ private Object registerAndReturnCommon(Object bean, String beanName) {
} else {
BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanName);
if (!(beanDefinition instanceof AnnotatedBeanDefinition)) {
if (beanDefinition.getBeanClassName().equals("org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy")) {
if (VirtualThreadExecutorProxy.class.getName().equals("org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy")) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里是"org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy"换成VirtualThreadExecutorProxy.class.getName()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里是"org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy"换成VirtualThreadExecutorProxy.class.getName()

是指反过来equal对不

BlockingQueue<Runnable> taskQueue;
if (clazz.equals(EagerDtpExecutor.class)) {
taskQueue = new TaskQueue(props.getQueueCapacity());
} else if (clazz.equals(PriorityDtpExecutor.class)) {
taskQueue = new PriorityBlockingQueue<>(props.getQueueCapacity(), PriorityDtpExecutor.getRunnableComparator());
} else if (clazz.equals(VirtualThreadExecutorProxy.class)) {
int jreVersion = JreEnum.currentIntVersion();
if (jreVersion < JRE_VERSION_21) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我在springboot3分支加了判断版本大小方法,merge过来后可以使用

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我在springboot3分支加了判断版本大小方法,merge过来后可以使用

@@ -71,6 +75,10 @@
@Slf4j
public class DtpBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware {

private static final Integer JRE_VERSION_21 = 21;

private static final String VIRTUAL_THREAD_EXECUTOR_TYPE = "virtual";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

放在common.constant下比较好

@EachannChan
Copy link
Contributor Author

EachannChan commented Nov 22, 2024

GitHub的那个检查报错是不是该解决一下,好像是maven的编译版本低了,我去把它版本调高可以吗,或者有什么其他的解决方案

@EachannChan
Copy link
Contributor Author

GitHub的那个检查报错是不是该解决一下,好像是maven的编译版本低了,我去把它版本调高可以吗

@KamToHung
Copy link
Collaborator

GitHub的那个检查报错是不是该解决一下,好像是maven的编译版本低了,我去把它版本调高可以吗

不是maven版本问题吧,JDK版本

@EachannChan
Copy link
Contributor Author

GitHub的那个检查报错是不是该解决一下,好像是maven的编译版本低了,我去把它版本调高可以吗

不是maven版本问题吧,JDK版本
是的,maven的jdk编译版本吧应该。

@@ -121,6 +125,9 @@ private Object registerAndReturnCommon(Object bean, String beanName) {
} else {
BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanName);
if (!(beanDefinition instanceof AnnotatedBeanDefinition)) {
if ("org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy".equals(VirtualThreadExecutorProxy.class.getName())) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里判断应该改为:beanDefinition.getBeanClassName().equals(VirtualThreadExecutorProxy.class.getName())

/**
* jre
*/
private static final Integer JRE_VERSION_21 = 21;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果没用到可以删除

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

}
gauge(GAUGE_CACHE.get(threadPoolStats.getPoolName()));
gauge((ExecutorStats) GAUGE_CACHE.get(executorStats.getExecutorName()));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不用强转了

private static ThreadPoolStats convertCommon(ExecutorAdapter<?> executor) {
ThreadPoolStats poolStats = new ThreadPoolStats();
private static ExecutorStats convertCommon(ExecutorAdapter<?> executor) {
ExecutorStats poolStats = new ExecutorStats();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

变量名也改了

@@ -88,13 +90,15 @@ public Object postProcessBeforeInitialization(Object bean, String beanName) thro

@Override
public Object postProcessAfterInitialization(@NonNull Object bean, @NonNull String beanName) throws BeansException {
if (!(bean instanceof ThreadPoolExecutor) && !(bean instanceof ThreadPoolTaskExecutor)) {
if (!(bean instanceof ThreadPoolExecutor) && !(bean instanceof ThreadPoolTaskExecutor) &&
!(bean.getClass().getName().equals("java.util.concurrent.ThreadPerTaskExecutor")) &&
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

java.util.concurrent.ThreadPerTaskExecutor 提取成常量吧

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Map<String, Object> propertyValues = buildPropertyValues(e);
Object[] args = buildConstructorArgs(executorTypeClass, e);
Map<String, Object> propertyValues;
propertyValues = buildPropertyValues(e);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为啥不合一行?

try {
args = buildConstructorArgs(executorTypeClass, e);
} catch (UnsupportedOperationException exception) {
log.warn("DynamicTp virtual thread executor {} register warn: update your JDK version or don't use virtual thread executor!", e.getThreadPoolName());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

日志描述有问题,不一定是虚拟线程的异常

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个是我自己抛的异常,应该没有其他的情况了吧
image

BlockingQueue<Runnable> taskQueue;
if (clazz.equals(EagerDtpExecutor.class)) {
taskQueue = new TaskQueue(props.getQueueCapacity());
} else if (clazz.equals(PriorityDtpExecutor.class)) {
taskQueue = new PriorityBlockingQueue<>(props.getQueueCapacity(), PriorityDtpExecutor.getRunnableComparator());
} else if (clazz.equals(VirtualThreadExecutorProxy.class)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

虚拟线程判断可以提前判断、返回

propertyValues.put(QUEUE_TIMEOUT, props.getQueueTimeout());

if (!ExecutorType.getClass(props.getExecutorType()).equals(VirtualThreadExecutorProxy.class)) {
propertyValues.put(ALLOW_CORE_THREAD_TIMEOUT, props.isAllowCoreThreadTimeOut());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

采用提前返回的形式吧,看着舒服点代码

@EachannChan EachannChan closed this Dec 2, 2024
@EachannChan EachannChan reopened this Dec 2, 2024
@EachannChan EachannChan closed this Dec 2, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants