From 5e17caf41b3e2ed37bc630ff9a6e693f02c378fd Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Wed, 3 Jul 2024 20:12:02 +0800 Subject: [PATCH] [ISSUE #5311] less concurrency --- .../service/impl/UpstreamCheckService.java | 53 +++++++++++++++---- .../admin/transfer/DiscoveryTransfer.java | 14 +++++ .../service/UpstreamCheckServiceTest.java | 22 +++++++- 3 files changed, 77 insertions(+), 12 deletions(-) diff --git a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java index 142cec04bb06..3c4009271be9 100644 --- a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java +++ b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java @@ -21,7 +21,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.collections4.ListUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; import org.apache.shenyu.admin.listener.DataChangedEvent; @@ -33,13 +32,17 @@ import org.apache.shenyu.admin.model.event.selector.SelectorCreatedEvent; import org.apache.shenyu.admin.model.event.selector.SelectorUpdatedEvent; import org.apache.shenyu.admin.model.query.SelectorConditionQuery; +import org.apache.shenyu.admin.service.DiscoveryUpstreamService; import org.apache.shenyu.admin.service.converter.SelectorHandleConverterFactor; import org.apache.shenyu.admin.transfer.ConditionTransfer; +import org.apache.shenyu.admin.transfer.DiscoveryTransfer; import org.apache.shenyu.admin.utils.CommonUpstreamUtils; import org.apache.shenyu.admin.utils.SelectorUtil; import org.apache.shenyu.common.concurrent.ShenyuThreadFactory; import org.apache.shenyu.common.constant.Constants; import org.apache.shenyu.common.dto.ConditionData; +import org.apache.shenyu.common.dto.DiscoverySyncData; +import org.apache.shenyu.common.dto.DiscoveryUpstreamData; import org.apache.shenyu.common.dto.SelectorData; import org.apache.shenyu.common.dto.convert.selector.CommonUpstream; import org.apache.shenyu.common.dto.convert.selector.DivideUpstream; @@ -59,6 +62,7 @@ import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -111,6 +115,8 @@ public class UpstreamCheckService { private final SelectorHandleConverterFactor converterFactor; + private final DiscoveryUpstreamService discoveryUpstreamService; + private ScheduledThreadPoolExecutor executor; private ScheduledFuture scheduledFuture; @@ -134,12 +140,14 @@ public UpstreamCheckService(final SelectorMapper selectorMapper, final PluginMapper pluginMapper, final SelectorConditionMapper selectorConditionMapper, final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig, - final SelectorHandleConverterFactor converterFactor) { + final SelectorHandleConverterFactor converterFactor, + final DiscoveryUpstreamService discoveryUpstreamService) { this.selectorMapper = selectorMapper; this.eventPublisher = eventPublisher; this.pluginMapper = pluginMapper; this.selectorConditionMapper = selectorConditionMapper; this.converterFactor = converterFactor; + this.discoveryUpstreamService = discoveryUpstreamService; Properties props = shenyuRegisterCenterConfig.getProps(); this.checked = Boolean.parseBoolean(props.getProperty(Constants.IS_CHECKED, Constants.DEFAULT_CHECK_VALUE)); this.scheduledThreads = Integer.parseInt(props.getProperty(Constants.ZOMBIE_CHECK_THREADS, Constants.ZOMBIE_CHECK_THREADS_VALUE)); @@ -294,9 +302,7 @@ private void checkZombie0(final ZombieUpstream zombieUpstream) { commonUpstream.setTimestamp(System.currentTimeMillis()); commonUpstream.setStatus(true); LOG.info("UpstreamCacheManager check zombie upstream success the url: {}, host: {} ", commonUpstream.getUpstreamUrl(), commonUpstream.getUpstreamHost()); - List old = ListUtils.unmodifiableList(UPSTREAM_MAP.getOrDefault(selectorId, Collections.emptyList())); this.submit(selectorId, commonUpstream); - updateHandler(selectorId, old, UPSTREAM_MAP.get(selectorId)); } else { LOG.error("check zombie upstream the url={} is fail", commonUpstream.getUpstreamUrl()); if (zombieUpstream.getZombieCheckTimes() > NumberUtils.INTEGER_ZERO) { @@ -367,18 +373,38 @@ private void updateSelectorHandler(final String selectorId, final List conditionDataList = ConditionTransfer.INSTANCE.mapToSelectorDOS( selectorConditionMapper.selectByQuery(new SelectorConditionQuery(selectorDO.getId()))); - SelectorData selectorData = SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList); + SelectorData selectorData = SelectorDO.transFrom(selectorDO, pluginName, conditionDataList); selectorData.setHandle(handler); // publish change event. eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData))); + // publish discovery change event. + List discoveryUpstreamDataList = discoveryUpstreamService.findBySelectorId(selectorId); + discoveryUpstreamDataList.removeIf(u -> { + for (CommonUpstream alive : aliveList) { + if (alive.getUpstreamUrl().equals(u.getUrl())) { + return false; + } + } + return true; + }); + DiscoverySyncData discoverySyncData = new DiscoverySyncData(); + discoverySyncData.setUpstreamDataList(discoveryUpstreamDataList); + discoverySyncData.setPluginName(pluginName); + discoverySyncData.setSelectorId(selectorId); + discoverySyncData.setSelectorName(selectorDO.getName()); + eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.DISCOVER_UPSTREAM, DataEventTypeEnum.UPDATE, Collections.singletonList(discoverySyncData))); } /** @@ -394,12 +420,19 @@ public void fetchUpstreamData() { final List selectorDOList = selectorMapper.findByPluginIds(new ArrayList<>(pluginMap.keySet())); long currentTimeMillis = System.currentTimeMillis(); Optional.ofNullable(selectorDOList).orElseGet(ArrayList::new).stream() - .filter(selectorDO -> Objects.nonNull(selectorDO) && StringUtils.isNotEmpty(selectorDO.getHandle())) + .filter(Objects::nonNull) .forEach(selectorDO -> { String name = pluginMap.get(selectorDO.getPluginId()); - List commonUpstreams = converterFactor.newInstance(name).convertUpstream(selectorDO.getHandle()) - .stream().filter(upstream -> upstream.isStatus() || upstream.getTimestamp() > currentTimeMillis - TimeUnit.SECONDS.toMillis(zombieRemovalTimes)) - .collect(Collectors.toList()); + List commonUpstreams = new LinkedList<>(); + discoveryUpstreamService.findBySelectorId(selectorDO.getId()).stream() + .map(DiscoveryTransfer.INSTANCE::mapToCommonUpstream) + .forEach(commonUpstreams::add); + String handle = selectorDO.getHandle(); + if (StringUtils.isNotEmpty(handle)) { + commonUpstreams.addAll(converterFactor.newInstance(name).convertUpstream(handle) + .stream().filter(upstream -> upstream.isStatus() || upstream.getTimestamp() > currentTimeMillis - TimeUnit.SECONDS.toMillis(zombieRemovalTimes)) + .collect(Collectors.toList())); + } if (CollectionUtils.isNotEmpty(commonUpstreams)) { UPSTREAM_MAP.put(selectorDO.getId(), commonUpstreams); PENDING_SYNC.add(NumberUtils.INTEGER_ZERO); diff --git a/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java b/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java index 4a40cb0df3e0..bd60b695188b 100644 --- a/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java +++ b/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java @@ -33,6 +33,7 @@ import org.apache.shenyu.admin.model.vo.DiscoveryVO; import org.apache.shenyu.common.dto.DiscoveryUpstreamData; import org.apache.shenyu.common.dto.ProxySelectorData; +import org.apache.shenyu.common.dto.convert.selector.CommonUpstream; import org.apache.shenyu.common.utils.GsonUtils; import java.util.Optional; @@ -65,6 +66,19 @@ public DiscoveryUpstreamDO mapToDo(DiscoveryUpstreamData discoveryUpstreamData) .dateUpdated(data.getDateUpdated()) .dateCreated(data.getDateCreated()).build()).orElse(null); } + + /** + * mapToCommonUpstream. + * + * @param discoveryUpstreamData discoveryUpstreamData + * @return CommonUpstream + */ + public CommonUpstream mapToCommonUpstream(DiscoveryUpstreamData discoveryUpstreamData) { + return Optional.ofNullable(discoveryUpstreamData).map(data -> { + String url = data.getUrl(); + return new CommonUpstream(data.getProtocol(), url.split(":")[0], url, false, data.getDateCreated().getTime()); + }).orElse(null); + } /** * mapToVo. diff --git a/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java b/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java index fcd8a64df1fb..01bb6f04afeb 100644 --- a/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java +++ b/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java @@ -29,6 +29,7 @@ import org.apache.shenyu.admin.service.impl.UpstreamCheckService; import org.apache.shenyu.common.concurrent.ShenyuThreadFactory; import org.apache.shenyu.common.constant.Constants; +import org.apache.shenyu.common.dto.DiscoveryUpstreamData; import org.apache.shenyu.common.dto.convert.selector.DivideUpstream; import org.apache.shenyu.common.dto.convert.selector.ZombieUpstream; import org.apache.shenyu.common.enums.PluginEnum; @@ -50,6 +51,7 @@ import org.springframework.context.ApplicationEventPublisher; import org.springframework.test.util.ReflectionTestUtils; +import java.sql.Timestamp; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -103,6 +105,9 @@ public final class UpstreamCheckServiceTest { private SelectorConditionMapper selectorConditionMapper; private SelectorHandleConverterFactor converterFactor; + + @Mock + private DiscoveryUpstreamService discoveryUpstreamService; private final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig = new ShenyuRegisterCenterConfig(); @@ -133,7 +138,8 @@ public void setUp() { Map maps = new HashMap<>(); maps.put(PluginEnum.DIVIDE.getName(), new DivideSelectorHandleConverter()); converterFactor = new SelectorHandleConverterFactor(maps); - upstreamCheckService = new UpstreamCheckService(selectorMapper, eventPublisher, pluginMapper, selectorConditionMapper, shenyuRegisterCenterConfig, converterFactor); + upstreamCheckService = new UpstreamCheckService(selectorMapper, eventPublisher, pluginMapper, selectorConditionMapper, + shenyuRegisterCenterConfig, converterFactor, discoveryUpstreamService); } @Test @@ -242,11 +248,22 @@ public void testFetchUpstreamData() { .name(MOCK_SELECTOR_NAME_OTHER) .handle("[{\"upstreamHost\":\"localhost\",\"protocol\":\"http://\",\"localhost\":\"divide-upstream-60\",\"weight\":60}]") .build(); + DiscoveryUpstreamData discoveryUpstreamData = DiscoveryUpstreamData.builder() + .dateCreated(new Timestamp(System.currentTimeMillis())) + .protocol("http") + .url("127.0.0.1:8080") + .props("{}") + .discoveryHandlerId("1") + .status(0) + .build(); when(pluginMapper.selectByNames(anyList())).thenReturn(Lists.newArrayList(pluginDO)); when(selectorMapper.findByPluginIds(anyList())).thenReturn(Lists.newArrayList(selectorDOWithUrlError, selectorDOWithUrlReachable)); + when(discoveryUpstreamService.findBySelectorId(anyString())).thenReturn(Lists.newArrayList(discoveryUpstreamData)); upstreamCheckService.fetchUpstreamData(); assertTrue(upstreamMap.containsKey(MOCK_SELECTOR_NAME)); + assertEquals(2, upstreamMap.get(MOCK_SELECTOR_NAME).size()); assertTrue(upstreamMap.containsKey(MOCK_SELECTOR_NAME_OTHER)); + assertEquals(2, upstreamMap.get(MOCK_SELECTOR_NAME_OTHER).size()); } @Test @@ -254,7 +271,8 @@ public void testClose() { Properties properties = new Properties(); properties.setProperty(Constants.IS_CHECKED, "true"); shenyuRegisterCenterConfig.setProps(properties); - upstreamCheckService = new UpstreamCheckService(selectorMapper, eventPublisher, pluginMapper, selectorConditionMapper, shenyuRegisterCenterConfig, converterFactor); + upstreamCheckService = new UpstreamCheckService(selectorMapper, eventPublisher, pluginMapper, selectorConditionMapper, + shenyuRegisterCenterConfig, converterFactor, discoveryUpstreamService); upstreamCheckService.close(); }