From 5e17caf41b3e2ed37bc630ff9a6e693f02c378fd Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Wed, 3 Jul 2024 20:12:02 +0800 Subject: [PATCH 1/4] [ISSUE #5311] less concurrency --- .../service/impl/UpstreamCheckService.java | 53 +++++++++++++++---- .../admin/transfer/DiscoveryTransfer.java | 14 +++++ .../service/UpstreamCheckServiceTest.java | 22 +++++++- 3 files changed, 77 insertions(+), 12 deletions(-) diff --git a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java index 142cec04bb06..3c4009271be9 100644 --- a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java +++ b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java @@ -21,7 +21,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.collections4.ListUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; import org.apache.shenyu.admin.listener.DataChangedEvent; @@ -33,13 +32,17 @@ import org.apache.shenyu.admin.model.event.selector.SelectorCreatedEvent; import org.apache.shenyu.admin.model.event.selector.SelectorUpdatedEvent; import org.apache.shenyu.admin.model.query.SelectorConditionQuery; +import org.apache.shenyu.admin.service.DiscoveryUpstreamService; import org.apache.shenyu.admin.service.converter.SelectorHandleConverterFactor; import org.apache.shenyu.admin.transfer.ConditionTransfer; +import org.apache.shenyu.admin.transfer.DiscoveryTransfer; import org.apache.shenyu.admin.utils.CommonUpstreamUtils; import org.apache.shenyu.admin.utils.SelectorUtil; import org.apache.shenyu.common.concurrent.ShenyuThreadFactory; import org.apache.shenyu.common.constant.Constants; import org.apache.shenyu.common.dto.ConditionData; +import org.apache.shenyu.common.dto.DiscoverySyncData; +import org.apache.shenyu.common.dto.DiscoveryUpstreamData; import org.apache.shenyu.common.dto.SelectorData; import org.apache.shenyu.common.dto.convert.selector.CommonUpstream; import org.apache.shenyu.common.dto.convert.selector.DivideUpstream; @@ -59,6 +62,7 @@ import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -111,6 +115,8 @@ public class UpstreamCheckService { private final SelectorHandleConverterFactor converterFactor; + private final DiscoveryUpstreamService discoveryUpstreamService; + private ScheduledThreadPoolExecutor executor; private ScheduledFuture scheduledFuture; @@ -134,12 +140,14 @@ public UpstreamCheckService(final SelectorMapper selectorMapper, final PluginMapper pluginMapper, final SelectorConditionMapper selectorConditionMapper, final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig, - final SelectorHandleConverterFactor converterFactor) { + final SelectorHandleConverterFactor converterFactor, + final DiscoveryUpstreamService discoveryUpstreamService) { this.selectorMapper = selectorMapper; this.eventPublisher = eventPublisher; this.pluginMapper = pluginMapper; this.selectorConditionMapper = selectorConditionMapper; this.converterFactor = converterFactor; + this.discoveryUpstreamService = discoveryUpstreamService; Properties props = shenyuRegisterCenterConfig.getProps(); this.checked = Boolean.parseBoolean(props.getProperty(Constants.IS_CHECKED, Constants.DEFAULT_CHECK_VALUE)); this.scheduledThreads = Integer.parseInt(props.getProperty(Constants.ZOMBIE_CHECK_THREADS, Constants.ZOMBIE_CHECK_THREADS_VALUE)); @@ -294,9 +302,7 @@ private void checkZombie0(final ZombieUpstream zombieUpstream) { commonUpstream.setTimestamp(System.currentTimeMillis()); commonUpstream.setStatus(true); LOG.info("UpstreamCacheManager check zombie upstream success the url: {}, host: {} ", commonUpstream.getUpstreamUrl(), commonUpstream.getUpstreamHost()); - List old = ListUtils.unmodifiableList(UPSTREAM_MAP.getOrDefault(selectorId, Collections.emptyList())); this.submit(selectorId, commonUpstream); - updateHandler(selectorId, old, UPSTREAM_MAP.get(selectorId)); } else { LOG.error("check zombie upstream the url={} is fail", commonUpstream.getUpstreamUrl()); if (zombieUpstream.getZombieCheckTimes() > NumberUtils.INTEGER_ZERO) { @@ -367,18 +373,38 @@ private void updateSelectorHandler(final String selectorId, final List conditionDataList = ConditionTransfer.INSTANCE.mapToSelectorDOS( selectorConditionMapper.selectByQuery(new SelectorConditionQuery(selectorDO.getId()))); - SelectorData selectorData = SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList); + SelectorData selectorData = SelectorDO.transFrom(selectorDO, pluginName, conditionDataList); selectorData.setHandle(handler); // publish change event. eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData))); + // publish discovery change event. + List discoveryUpstreamDataList = discoveryUpstreamService.findBySelectorId(selectorId); + discoveryUpstreamDataList.removeIf(u -> { + for (CommonUpstream alive : aliveList) { + if (alive.getUpstreamUrl().equals(u.getUrl())) { + return false; + } + } + return true; + }); + DiscoverySyncData discoverySyncData = new DiscoverySyncData(); + discoverySyncData.setUpstreamDataList(discoveryUpstreamDataList); + discoverySyncData.setPluginName(pluginName); + discoverySyncData.setSelectorId(selectorId); + discoverySyncData.setSelectorName(selectorDO.getName()); + eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.DISCOVER_UPSTREAM, DataEventTypeEnum.UPDATE, Collections.singletonList(discoverySyncData))); } /** @@ -394,12 +420,19 @@ public void fetchUpstreamData() { final List selectorDOList = selectorMapper.findByPluginIds(new ArrayList<>(pluginMap.keySet())); long currentTimeMillis = System.currentTimeMillis(); Optional.ofNullable(selectorDOList).orElseGet(ArrayList::new).stream() - .filter(selectorDO -> Objects.nonNull(selectorDO) && StringUtils.isNotEmpty(selectorDO.getHandle())) + .filter(Objects::nonNull) .forEach(selectorDO -> { String name = pluginMap.get(selectorDO.getPluginId()); - List commonUpstreams = converterFactor.newInstance(name).convertUpstream(selectorDO.getHandle()) - .stream().filter(upstream -> upstream.isStatus() || upstream.getTimestamp() > currentTimeMillis - TimeUnit.SECONDS.toMillis(zombieRemovalTimes)) - .collect(Collectors.toList()); + List commonUpstreams = new LinkedList<>(); + discoveryUpstreamService.findBySelectorId(selectorDO.getId()).stream() + .map(DiscoveryTransfer.INSTANCE::mapToCommonUpstream) + .forEach(commonUpstreams::add); + String handle = selectorDO.getHandle(); + if (StringUtils.isNotEmpty(handle)) { + commonUpstreams.addAll(converterFactor.newInstance(name).convertUpstream(handle) + .stream().filter(upstream -> upstream.isStatus() || upstream.getTimestamp() > currentTimeMillis - TimeUnit.SECONDS.toMillis(zombieRemovalTimes)) + .collect(Collectors.toList())); + } if (CollectionUtils.isNotEmpty(commonUpstreams)) { UPSTREAM_MAP.put(selectorDO.getId(), commonUpstreams); PENDING_SYNC.add(NumberUtils.INTEGER_ZERO); diff --git a/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java b/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java index 4a40cb0df3e0..bd60b695188b 100644 --- a/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java +++ b/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java @@ -33,6 +33,7 @@ import org.apache.shenyu.admin.model.vo.DiscoveryVO; import org.apache.shenyu.common.dto.DiscoveryUpstreamData; import org.apache.shenyu.common.dto.ProxySelectorData; +import org.apache.shenyu.common.dto.convert.selector.CommonUpstream; import org.apache.shenyu.common.utils.GsonUtils; import java.util.Optional; @@ -65,6 +66,19 @@ public DiscoveryUpstreamDO mapToDo(DiscoveryUpstreamData discoveryUpstreamData) .dateUpdated(data.getDateUpdated()) .dateCreated(data.getDateCreated()).build()).orElse(null); } + + /** + * mapToCommonUpstream. + * + * @param discoveryUpstreamData discoveryUpstreamData + * @return CommonUpstream + */ + public CommonUpstream mapToCommonUpstream(DiscoveryUpstreamData discoveryUpstreamData) { + return Optional.ofNullable(discoveryUpstreamData).map(data -> { + String url = data.getUrl(); + return new CommonUpstream(data.getProtocol(), url.split(":")[0], url, false, data.getDateCreated().getTime()); + }).orElse(null); + } /** * mapToVo. diff --git a/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java b/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java index fcd8a64df1fb..01bb6f04afeb 100644 --- a/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java +++ b/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java @@ -29,6 +29,7 @@ import org.apache.shenyu.admin.service.impl.UpstreamCheckService; import org.apache.shenyu.common.concurrent.ShenyuThreadFactory; import org.apache.shenyu.common.constant.Constants; +import org.apache.shenyu.common.dto.DiscoveryUpstreamData; import org.apache.shenyu.common.dto.convert.selector.DivideUpstream; import org.apache.shenyu.common.dto.convert.selector.ZombieUpstream; import org.apache.shenyu.common.enums.PluginEnum; @@ -50,6 +51,7 @@ import org.springframework.context.ApplicationEventPublisher; import org.springframework.test.util.ReflectionTestUtils; +import java.sql.Timestamp; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -103,6 +105,9 @@ public final class UpstreamCheckServiceTest { private SelectorConditionMapper selectorConditionMapper; private SelectorHandleConverterFactor converterFactor; + + @Mock + private DiscoveryUpstreamService discoveryUpstreamService; private final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig = new ShenyuRegisterCenterConfig(); @@ -133,7 +138,8 @@ public void setUp() { Map maps = new HashMap<>(); maps.put(PluginEnum.DIVIDE.getName(), new DivideSelectorHandleConverter()); converterFactor = new SelectorHandleConverterFactor(maps); - upstreamCheckService = new UpstreamCheckService(selectorMapper, eventPublisher, pluginMapper, selectorConditionMapper, shenyuRegisterCenterConfig, converterFactor); + upstreamCheckService = new UpstreamCheckService(selectorMapper, eventPublisher, pluginMapper, selectorConditionMapper, + shenyuRegisterCenterConfig, converterFactor, discoveryUpstreamService); } @Test @@ -242,11 +248,22 @@ public void testFetchUpstreamData() { .name(MOCK_SELECTOR_NAME_OTHER) .handle("[{\"upstreamHost\":\"localhost\",\"protocol\":\"http://\",\"localhost\":\"divide-upstream-60\",\"weight\":60}]") .build(); + DiscoveryUpstreamData discoveryUpstreamData = DiscoveryUpstreamData.builder() + .dateCreated(new Timestamp(System.currentTimeMillis())) + .protocol("http") + .url("127.0.0.1:8080") + .props("{}") + .discoveryHandlerId("1") + .status(0) + .build(); when(pluginMapper.selectByNames(anyList())).thenReturn(Lists.newArrayList(pluginDO)); when(selectorMapper.findByPluginIds(anyList())).thenReturn(Lists.newArrayList(selectorDOWithUrlError, selectorDOWithUrlReachable)); + when(discoveryUpstreamService.findBySelectorId(anyString())).thenReturn(Lists.newArrayList(discoveryUpstreamData)); upstreamCheckService.fetchUpstreamData(); assertTrue(upstreamMap.containsKey(MOCK_SELECTOR_NAME)); + assertEquals(2, upstreamMap.get(MOCK_SELECTOR_NAME).size()); assertTrue(upstreamMap.containsKey(MOCK_SELECTOR_NAME_OTHER)); + assertEquals(2, upstreamMap.get(MOCK_SELECTOR_NAME_OTHER).size()); } @Test @@ -254,7 +271,8 @@ public void testClose() { Properties properties = new Properties(); properties.setProperty(Constants.IS_CHECKED, "true"); shenyuRegisterCenterConfig.setProps(properties); - upstreamCheckService = new UpstreamCheckService(selectorMapper, eventPublisher, pluginMapper, selectorConditionMapper, shenyuRegisterCenterConfig, converterFactor); + upstreamCheckService = new UpstreamCheckService(selectorMapper, eventPublisher, pluginMapper, selectorConditionMapper, + shenyuRegisterCenterConfig, converterFactor, discoveryUpstreamService); upstreamCheckService.close(); } From 25f6af6b6253171d5776ccde6434bbecc117667b Mon Sep 17 00:00:00 2001 From: hailang Date: Thu, 4 Jul 2024 09:59:49 +0800 Subject: [PATCH 2/4] fix grpc CI --- .../grpc/handler/GrpcDiscoveryUpstreamDataHandler.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-rpc/shenyu-plugin-grpc/src/main/java/org/apache/shenyu/plugin/grpc/handler/GrpcDiscoveryUpstreamDataHandler.java b/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-rpc/shenyu-plugin-grpc/src/main/java/org/apache/shenyu/plugin/grpc/handler/GrpcDiscoveryUpstreamDataHandler.java index 109fa86239ca..f9e7645d1ac2 100644 --- a/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-rpc/shenyu-plugin-grpc/src/main/java/org/apache/shenyu/plugin/grpc/handler/GrpcDiscoveryUpstreamDataHandler.java +++ b/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-rpc/shenyu-plugin-grpc/src/main/java/org/apache/shenyu/plugin/grpc/handler/GrpcDiscoveryUpstreamDataHandler.java @@ -23,6 +23,7 @@ import org.apache.shenyu.common.enums.PluginEnum; import org.apache.shenyu.plugin.base.handler.DiscoveryUpstreamDataHandler; import org.apache.shenyu.plugin.grpc.cache.ApplicationConfigCache; +import org.apache.shenyu.plugin.grpc.cache.GrpcClientCache; import org.springframework.util.ObjectUtils; import java.sql.Timestamp; @@ -42,7 +43,9 @@ public void handlerDiscoveryUpstreamData(final DiscoverySyncData discoverySyncDa if (Objects.isNull(discoverySyncData) || Objects.isNull(discoverySyncData.getSelectorId())) { return; } - ApplicationConfigCache.getInstance().handlerUpstream(discoverySyncData.getSelectorId(), convertUpstreamList(discoverySyncData.getUpstreamDataList())); + final String selectorId = discoverySyncData.getSelectorId(); + ApplicationConfigCache.getInstance().handlerUpstream(selectorId, convertUpstreamList(discoverySyncData.getUpstreamDataList())); + GrpcClientCache.initGrpcClient(selectorId); } private List convertUpstreamList(final List upstreamList) { From f49f0cebdc2ec690c66896ae7acee6e823a3ff13 Mon Sep 17 00:00:00 2001 From: hailang Date: Thu, 4 Jul 2024 15:06:03 +0800 Subject: [PATCH 3/4] try fix grpc e2e CI --- .../apache/shenyu/admin/service/impl/UpstreamCheckService.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java index 3c4009271be9..0ce89f1583a7 100644 --- a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java +++ b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java @@ -21,6 +21,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.collections4.ListUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; import org.apache.shenyu.admin.listener.DataChangedEvent; @@ -302,7 +303,9 @@ private void checkZombie0(final ZombieUpstream zombieUpstream) { commonUpstream.setTimestamp(System.currentTimeMillis()); commonUpstream.setStatus(true); LOG.info("UpstreamCacheManager check zombie upstream success the url: {}, host: {} ", commonUpstream.getUpstreamUrl(), commonUpstream.getUpstreamHost()); + List old = ListUtils.unmodifiableList(UPSTREAM_MAP.getOrDefault(selectorId, Collections.emptyList())); this.submit(selectorId, commonUpstream); + updateHandler(selectorId, old, UPSTREAM_MAP.get(selectorId)); } else { LOG.error("check zombie upstream the url={} is fail", commonUpstream.getUpstreamUrl()); if (zombieUpstream.getZombieCheckTimes() > NumberUtils.INTEGER_ZERO) { From 0ff5f4f12cfb44ccb075dabd616874fbae2e6e1c Mon Sep 17 00:00:00 2001 From: hailang Date: Thu, 4 Jul 2024 16:33:37 +0800 Subject: [PATCH 4/4] rollback --- .../admin/service/impl/UpstreamCheckService.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java index 0ce89f1583a7..b8fe4aa20ca9 100644 --- a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java +++ b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java @@ -206,6 +206,15 @@ public void submit(final String selectorId, final CommonUpstream commonUpstream) return; } + Optional.ofNullable(submitJust(selectorId, commonUpstream)) + .ifPresent(upstreams -> executor.execute(() -> updateHandler(selectorId, upstreams, upstreams))); + } + + private List submitJust(final String selectorId, final CommonUpstream commonUpstream) { + if (!REGISTER_TYPE_HTTP.equalsIgnoreCase(registerType) || !checked) { + return null; + } + List upstreams = MapUtils.computeIfAbsent(UPSTREAM_MAP, selectorId, k -> new CopyOnWriteArrayList<>()); if (commonUpstream.isStatus()) { Optional exists = upstreams.stream().filter(item -> StringUtils.isNotBlank(item.getUpstreamUrl()) @@ -220,7 +229,7 @@ public void submit(final String selectorId, final CommonUpstream commonUpstream) upstreams.removeIf(item -> item.equals(commonUpstream)); PENDING_SYNC.add(NumberUtils.INTEGER_ZERO); } - executor.execute(() -> updateHandler(selectorId, upstreams, upstreams)); + return upstreams; } /** @@ -304,7 +313,8 @@ private void checkZombie0(final ZombieUpstream zombieUpstream) { commonUpstream.setStatus(true); LOG.info("UpstreamCacheManager check zombie upstream success the url: {}, host: {} ", commonUpstream.getUpstreamUrl(), commonUpstream.getUpstreamHost()); List old = ListUtils.unmodifiableList(UPSTREAM_MAP.getOrDefault(selectorId, Collections.emptyList())); - this.submit(selectorId, commonUpstream); + // fix https://github.com/apache/shenyu/issues/5311 + this.submitJust(selectorId, commonUpstream); updateHandler(selectorId, old, UPSTREAM_MAP.get(selectorId)); } else { LOG.error("check zombie upstream the url={} is fail", commonUpstream.getUpstreamUrl()); @@ -473,6 +483,7 @@ public void onSelectorUpdated(final SelectorUpdatedEvent event) { /** * get the zombie removal time value. + * * @return zombie removal time value */ public static int getZombieRemovalTimes() {