diff --git a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java index fdef9ef6e609..305f18fa026d 100644 --- a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java +++ b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java @@ -33,13 +33,17 @@ import org.apache.shenyu.admin.model.event.selector.SelectorCreatedEvent; import org.apache.shenyu.admin.model.event.selector.SelectorUpdatedEvent; import org.apache.shenyu.admin.model.query.SelectorConditionQuery; +import org.apache.shenyu.admin.service.DiscoveryUpstreamService; import org.apache.shenyu.admin.service.converter.SelectorHandleConverterFactor; import org.apache.shenyu.admin.transfer.ConditionTransfer; +import org.apache.shenyu.admin.transfer.DiscoveryTransfer; import org.apache.shenyu.admin.utils.CommonUpstreamUtils; import org.apache.shenyu.admin.utils.SelectorUtil; import org.apache.shenyu.common.concurrent.ShenyuThreadFactory; import org.apache.shenyu.common.constant.Constants; import org.apache.shenyu.common.dto.ConditionData; +import org.apache.shenyu.common.dto.DiscoverySyncData; +import org.apache.shenyu.common.dto.DiscoveryUpstreamData; import org.apache.shenyu.common.dto.SelectorData; import org.apache.shenyu.common.dto.convert.selector.CommonUpstream; import org.apache.shenyu.common.dto.convert.selector.DivideUpstream; @@ -59,6 +63,7 @@ import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -111,6 +116,8 @@ public class UpstreamCheckService { private final SelectorHandleConverterFactor converterFactor; + private final DiscoveryUpstreamService discoveryUpstreamService; + private ScheduledThreadPoolExecutor executor; private ScheduledFuture scheduledFuture; @@ -134,12 +141,14 @@ public UpstreamCheckService(final SelectorMapper selectorMapper, final PluginMapper pluginMapper, final SelectorConditionMapper selectorConditionMapper, final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig, - final SelectorHandleConverterFactor converterFactor) { + final SelectorHandleConverterFactor converterFactor, + final DiscoveryUpstreamService discoveryUpstreamService) { this.selectorMapper = selectorMapper; this.eventPublisher = eventPublisher; this.pluginMapper = pluginMapper; this.selectorConditionMapper = selectorConditionMapper; this.converterFactor = converterFactor; + this.discoveryUpstreamService = discoveryUpstreamService; Properties props = shenyuRegisterCenterConfig.getProps(); this.checked = Boolean.parseBoolean(props.getProperty(Constants.IS_CHECKED, Constants.DEFAULT_CHECK_VALUE)); this.scheduledThreads = Integer.parseInt(props.getProperty(Constants.ZOMBIE_CHECK_THREADS, Constants.ZOMBIE_CHECK_THREADS_VALUE)); @@ -199,6 +208,15 @@ public void submit(final String selectorId, final CommonUpstream commonUpstream) return; } + Optional.ofNullable(submitJust(selectorId, commonUpstream)) + .ifPresent(upstreams -> executor.execute(() -> updateHandler(selectorId, upstreams, upstreams))); + } + + private List submitJust(final String selectorId, final CommonUpstream commonUpstream) { + if (!REGISTER_TYPE_HTTP.equalsIgnoreCase(registerType) || !checked) { + return null; + } + List upstreams = MapUtils.computeIfAbsent(UPSTREAM_MAP, selectorId, k -> new CopyOnWriteArrayList<>()); if (commonUpstream.isStatus()) { Optional exists = upstreams.stream().filter(item -> StringUtils.isNotBlank(item.getUpstreamUrl()) @@ -213,7 +231,7 @@ public void submit(final String selectorId, final CommonUpstream commonUpstream) upstreams.removeIf(item -> item.equals(commonUpstream)); PENDING_SYNC.add(NumberUtils.INTEGER_ZERO); } - executor.execute(() -> updateHandler(selectorId, upstreams, upstreams)); + return upstreams; } /** @@ -297,7 +315,8 @@ private void checkZombie0(final ZombieUpstream zombieUpstream) { commonUpstream.setStatus(true); LOG.info("UpstreamCacheManager check zombie upstream success the url: {}, host: {} ", commonUpstream.getUpstreamUrl(), commonUpstream.getUpstreamHost()); List old = ListUtils.unmodifiableList(UPSTREAM_MAP.getOrDefault(selectorId, Collections.emptyList())); - this.submit(selectorId, commonUpstream); + // fix https://github.com/apache/shenyu/issues/5311 + this.submitJust(selectorId, commonUpstream); updateHandler(selectorId, old, UPSTREAM_MAP.get(selectorId)); } else { LOG.error("check zombie upstream the url={} is fail", commonUpstream.getUpstreamUrl()); @@ -369,18 +388,38 @@ private void updateSelectorHandler(final String selectorId, final List conditionDataList = ConditionTransfer.INSTANCE.mapToSelectorDOS( selectorConditionMapper.selectByQuery(new SelectorConditionQuery(selectorDO.getId()))); - SelectorData selectorData = SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList); + SelectorData selectorData = SelectorDO.transFrom(selectorDO, pluginName, conditionDataList); selectorData.setHandle(handler); // publish change event. eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData))); + // publish discovery change event. + List discoveryUpstreamDataList = discoveryUpstreamService.findBySelectorId(selectorId); + discoveryUpstreamDataList.removeIf(u -> { + for (CommonUpstream alive : aliveList) { + if (alive.getUpstreamUrl().equals(u.getUrl())) { + return false; + } + } + return true; + }); + DiscoverySyncData discoverySyncData = new DiscoverySyncData(); + discoverySyncData.setUpstreamDataList(discoveryUpstreamDataList); + discoverySyncData.setPluginName(pluginName); + discoverySyncData.setSelectorId(selectorId); + discoverySyncData.setSelectorName(selectorDO.getName()); + eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.DISCOVER_UPSTREAM, DataEventTypeEnum.UPDATE, Collections.singletonList(discoverySyncData))); } /** @@ -396,12 +435,19 @@ public void fetchUpstreamData() { final List selectorDOList = selectorMapper.findByPluginIds(new ArrayList<>(pluginMap.keySet())); long currentTimeMillis = System.currentTimeMillis(); Optional.ofNullable(selectorDOList).orElseGet(ArrayList::new).stream() - .filter(selectorDO -> Objects.nonNull(selectorDO) && StringUtils.isNotEmpty(selectorDO.getHandle())) + .filter(Objects::nonNull) .forEach(selectorDO -> { String name = pluginMap.get(selectorDO.getPluginId()); - List commonUpstreams = converterFactor.newInstance(name).convertUpstream(selectorDO.getHandle()) - .stream().filter(upstream -> upstream.isStatus() || upstream.getTimestamp() > currentTimeMillis - TimeUnit.SECONDS.toMillis(zombieRemovalTimes)) - .collect(Collectors.toList()); + List commonUpstreams = new LinkedList<>(); + discoveryUpstreamService.findBySelectorId(selectorDO.getId()).stream() + .map(DiscoveryTransfer.INSTANCE::mapToCommonUpstream) + .forEach(commonUpstreams::add); + String handle = selectorDO.getHandle(); + if (StringUtils.isNotEmpty(handle)) { + commonUpstreams.addAll(converterFactor.newInstance(name).convertUpstream(handle) + .stream().filter(upstream -> upstream.isStatus() || upstream.getTimestamp() > currentTimeMillis - TimeUnit.SECONDS.toMillis(zombieRemovalTimes)) + .collect(Collectors.toList())); + } if (CollectionUtils.isNotEmpty(commonUpstreams)) { UPSTREAM_MAP.put(selectorDO.getId(), commonUpstreams); PENDING_SYNC.add(NumberUtils.INTEGER_ZERO); @@ -439,6 +485,7 @@ public void onSelectorUpdated(final SelectorUpdatedEvent event) { /** * get the zombie removal time value. + * * @return zombie removal time value */ public static int getZombieRemovalTimes() { diff --git a/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java b/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java index 4a40cb0df3e0..bd60b695188b 100644 --- a/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java +++ b/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java @@ -33,6 +33,7 @@ import org.apache.shenyu.admin.model.vo.DiscoveryVO; import org.apache.shenyu.common.dto.DiscoveryUpstreamData; import org.apache.shenyu.common.dto.ProxySelectorData; +import org.apache.shenyu.common.dto.convert.selector.CommonUpstream; import org.apache.shenyu.common.utils.GsonUtils; import java.util.Optional; @@ -65,6 +66,19 @@ public DiscoveryUpstreamDO mapToDo(DiscoveryUpstreamData discoveryUpstreamData) .dateUpdated(data.getDateUpdated()) .dateCreated(data.getDateCreated()).build()).orElse(null); } + + /** + * mapToCommonUpstream. + * + * @param discoveryUpstreamData discoveryUpstreamData + * @return CommonUpstream + */ + public CommonUpstream mapToCommonUpstream(DiscoveryUpstreamData discoveryUpstreamData) { + return Optional.ofNullable(discoveryUpstreamData).map(data -> { + String url = data.getUrl(); + return new CommonUpstream(data.getProtocol(), url.split(":")[0], url, false, data.getDateCreated().getTime()); + }).orElse(null); + } /** * mapToVo. diff --git a/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java b/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java index fcd8a64df1fb..01bb6f04afeb 100644 --- a/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java +++ b/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java @@ -29,6 +29,7 @@ import org.apache.shenyu.admin.service.impl.UpstreamCheckService; import org.apache.shenyu.common.concurrent.ShenyuThreadFactory; import org.apache.shenyu.common.constant.Constants; +import org.apache.shenyu.common.dto.DiscoveryUpstreamData; import org.apache.shenyu.common.dto.convert.selector.DivideUpstream; import org.apache.shenyu.common.dto.convert.selector.ZombieUpstream; import org.apache.shenyu.common.enums.PluginEnum; @@ -50,6 +51,7 @@ import org.springframework.context.ApplicationEventPublisher; import org.springframework.test.util.ReflectionTestUtils; +import java.sql.Timestamp; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -103,6 +105,9 @@ public final class UpstreamCheckServiceTest { private SelectorConditionMapper selectorConditionMapper; private SelectorHandleConverterFactor converterFactor; + + @Mock + private DiscoveryUpstreamService discoveryUpstreamService; private final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig = new ShenyuRegisterCenterConfig(); @@ -133,7 +138,8 @@ public void setUp() { Map maps = new HashMap<>(); maps.put(PluginEnum.DIVIDE.getName(), new DivideSelectorHandleConverter()); converterFactor = new SelectorHandleConverterFactor(maps); - upstreamCheckService = new UpstreamCheckService(selectorMapper, eventPublisher, pluginMapper, selectorConditionMapper, shenyuRegisterCenterConfig, converterFactor); + upstreamCheckService = new UpstreamCheckService(selectorMapper, eventPublisher, pluginMapper, selectorConditionMapper, + shenyuRegisterCenterConfig, converterFactor, discoveryUpstreamService); } @Test @@ -242,11 +248,22 @@ public void testFetchUpstreamData() { .name(MOCK_SELECTOR_NAME_OTHER) .handle("[{\"upstreamHost\":\"localhost\",\"protocol\":\"http://\",\"localhost\":\"divide-upstream-60\",\"weight\":60}]") .build(); + DiscoveryUpstreamData discoveryUpstreamData = DiscoveryUpstreamData.builder() + .dateCreated(new Timestamp(System.currentTimeMillis())) + .protocol("http") + .url("127.0.0.1:8080") + .props("{}") + .discoveryHandlerId("1") + .status(0) + .build(); when(pluginMapper.selectByNames(anyList())).thenReturn(Lists.newArrayList(pluginDO)); when(selectorMapper.findByPluginIds(anyList())).thenReturn(Lists.newArrayList(selectorDOWithUrlError, selectorDOWithUrlReachable)); + when(discoveryUpstreamService.findBySelectorId(anyString())).thenReturn(Lists.newArrayList(discoveryUpstreamData)); upstreamCheckService.fetchUpstreamData(); assertTrue(upstreamMap.containsKey(MOCK_SELECTOR_NAME)); + assertEquals(2, upstreamMap.get(MOCK_SELECTOR_NAME).size()); assertTrue(upstreamMap.containsKey(MOCK_SELECTOR_NAME_OTHER)); + assertEquals(2, upstreamMap.get(MOCK_SELECTOR_NAME_OTHER).size()); } @Test @@ -254,7 +271,8 @@ public void testClose() { Properties properties = new Properties(); properties.setProperty(Constants.IS_CHECKED, "true"); shenyuRegisterCenterConfig.setProps(properties); - upstreamCheckService = new UpstreamCheckService(selectorMapper, eventPublisher, pluginMapper, selectorConditionMapper, shenyuRegisterCenterConfig, converterFactor); + upstreamCheckService = new UpstreamCheckService(selectorMapper, eventPublisher, pluginMapper, selectorConditionMapper, + shenyuRegisterCenterConfig, converterFactor, discoveryUpstreamService); upstreamCheckService.close(); } diff --git a/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-rpc/shenyu-plugin-grpc/src/main/java/org/apache/shenyu/plugin/grpc/handler/GrpcDiscoveryUpstreamDataHandler.java b/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-rpc/shenyu-plugin-grpc/src/main/java/org/apache/shenyu/plugin/grpc/handler/GrpcDiscoveryUpstreamDataHandler.java index 109fa86239ca..f9e7645d1ac2 100644 --- a/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-rpc/shenyu-plugin-grpc/src/main/java/org/apache/shenyu/plugin/grpc/handler/GrpcDiscoveryUpstreamDataHandler.java +++ b/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-rpc/shenyu-plugin-grpc/src/main/java/org/apache/shenyu/plugin/grpc/handler/GrpcDiscoveryUpstreamDataHandler.java @@ -23,6 +23,7 @@ import org.apache.shenyu.common.enums.PluginEnum; import org.apache.shenyu.plugin.base.handler.DiscoveryUpstreamDataHandler; import org.apache.shenyu.plugin.grpc.cache.ApplicationConfigCache; +import org.apache.shenyu.plugin.grpc.cache.GrpcClientCache; import org.springframework.util.ObjectUtils; import java.sql.Timestamp; @@ -42,7 +43,9 @@ public void handlerDiscoveryUpstreamData(final DiscoverySyncData discoverySyncDa if (Objects.isNull(discoverySyncData) || Objects.isNull(discoverySyncData.getSelectorId())) { return; } - ApplicationConfigCache.getInstance().handlerUpstream(discoverySyncData.getSelectorId(), convertUpstreamList(discoverySyncData.getUpstreamDataList())); + final String selectorId = discoverySyncData.getSelectorId(); + ApplicationConfigCache.getInstance().handlerUpstream(selectorId, convertUpstreamList(discoverySyncData.getUpstreamDataList())); + GrpcClientCache.initGrpcClient(selectorId); } private List convertUpstreamList(final List upstreamList) {