Skip to content

Commit

Permalink
[ISSUE #5311] less concurrency (#5587)
Browse files Browse the repository at this point in the history
* [ISSUE #5311] less concurrency

* fix grpc CI

* try fix grpc e2e CI

* rollback

---------

Co-authored-by: moremind <[email protected]>
  • Loading branch information
loongs-zhang and moremind committed Jul 6, 2024
1 parent 51cc917 commit 3a69b0b
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,17 @@
import org.apache.shenyu.admin.model.event.selector.SelectorCreatedEvent;
import org.apache.shenyu.admin.model.event.selector.SelectorUpdatedEvent;
import org.apache.shenyu.admin.model.query.SelectorConditionQuery;
import org.apache.shenyu.admin.service.DiscoveryUpstreamService;
import org.apache.shenyu.admin.service.converter.SelectorHandleConverterFactor;
import org.apache.shenyu.admin.transfer.ConditionTransfer;
import org.apache.shenyu.admin.transfer.DiscoveryTransfer;
import org.apache.shenyu.admin.utils.CommonUpstreamUtils;
import org.apache.shenyu.admin.utils.SelectorUtil;
import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
import org.apache.shenyu.common.constant.Constants;
import org.apache.shenyu.common.dto.ConditionData;
import org.apache.shenyu.common.dto.DiscoverySyncData;
import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
import org.apache.shenyu.common.dto.SelectorData;
import org.apache.shenyu.common.dto.convert.selector.CommonUpstream;
import org.apache.shenyu.common.dto.convert.selector.DivideUpstream;
Expand All @@ -59,6 +63,7 @@
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -111,6 +116,8 @@ public class UpstreamCheckService {

private final SelectorHandleConverterFactor converterFactor;

private final DiscoveryUpstreamService discoveryUpstreamService;

private ScheduledThreadPoolExecutor executor;

private ScheduledFuture<?> scheduledFuture;
Expand All @@ -134,12 +141,14 @@ public UpstreamCheckService(final SelectorMapper selectorMapper,
final PluginMapper pluginMapper,
final SelectorConditionMapper selectorConditionMapper,
final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig,
final SelectorHandleConverterFactor converterFactor) {
final SelectorHandleConverterFactor converterFactor,
final DiscoveryUpstreamService discoveryUpstreamService) {
this.selectorMapper = selectorMapper;
this.eventPublisher = eventPublisher;
this.pluginMapper = pluginMapper;
this.selectorConditionMapper = selectorConditionMapper;
this.converterFactor = converterFactor;
this.discoveryUpstreamService = discoveryUpstreamService;
Properties props = shenyuRegisterCenterConfig.getProps();
this.checked = Boolean.parseBoolean(props.getProperty(Constants.IS_CHECKED, Constants.DEFAULT_CHECK_VALUE));
this.scheduledThreads = Integer.parseInt(props.getProperty(Constants.ZOMBIE_CHECK_THREADS, Constants.ZOMBIE_CHECK_THREADS_VALUE));
Expand Down Expand Up @@ -199,6 +208,15 @@ public void submit(final String selectorId, final CommonUpstream commonUpstream)
return;
}

Optional.ofNullable(submitJust(selectorId, commonUpstream))
.ifPresent(upstreams -> executor.execute(() -> updateHandler(selectorId, upstreams, upstreams)));
}

private List<CommonUpstream> submitJust(final String selectorId, final CommonUpstream commonUpstream) {
if (!REGISTER_TYPE_HTTP.equalsIgnoreCase(registerType) || !checked) {
return null;
}

List<CommonUpstream> upstreams = MapUtils.computeIfAbsent(UPSTREAM_MAP, selectorId, k -> new CopyOnWriteArrayList<>());
if (commonUpstream.isStatus()) {
Optional<CommonUpstream> exists = upstreams.stream().filter(item -> StringUtils.isNotBlank(item.getUpstreamUrl())
Expand All @@ -213,7 +231,7 @@ public void submit(final String selectorId, final CommonUpstream commonUpstream)
upstreams.removeIf(item -> item.equals(commonUpstream));
PENDING_SYNC.add(NumberUtils.INTEGER_ZERO);
}
executor.execute(() -> updateHandler(selectorId, upstreams, upstreams));
return upstreams;
}

/**
Expand Down Expand Up @@ -297,7 +315,8 @@ private void checkZombie0(final ZombieUpstream zombieUpstream) {
commonUpstream.setStatus(true);
LOG.info("UpstreamCacheManager check zombie upstream success the url: {}, host: {} ", commonUpstream.getUpstreamUrl(), commonUpstream.getUpstreamHost());
List<CommonUpstream> old = ListUtils.unmodifiableList(UPSTREAM_MAP.getOrDefault(selectorId, Collections.emptyList()));
this.submit(selectorId, commonUpstream);
// fix https://github.com/apache/shenyu/issues/5311
this.submitJust(selectorId, commonUpstream);
updateHandler(selectorId, old, UPSTREAM_MAP.get(selectorId));
} else {
LOG.error("check zombie upstream the url={} is fail", commonUpstream.getUpstreamUrl());
Expand Down Expand Up @@ -369,18 +388,38 @@ private void updateSelectorHandler(final String selectorId, final List<CommonUps
}

PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
String handler = converterFactor.newInstance(pluginDO.getName()).handler(selectorDO.getHandle(), aliveList);
if (Objects.isNull(pluginDO)) {
return;
}
String pluginName = pluginDO.getName();
String handler = converterFactor.newInstance(pluginName).handler(selectorDO.getHandle(), aliveList);
selectorDO.setHandle(handler);
selectorMapper.updateSelective(selectorDO);

List<ConditionData> conditionDataList = ConditionTransfer.INSTANCE.mapToSelectorDOS(
selectorConditionMapper.selectByQuery(new SelectorConditionQuery(selectorDO.getId())));
SelectorData selectorData = SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList);
SelectorData selectorData = SelectorDO.transFrom(selectorDO, pluginName, conditionDataList);
selectorData.setHandle(handler);

// publish change event.
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));

// publish discovery change event.
List<DiscoveryUpstreamData> discoveryUpstreamDataList = discoveryUpstreamService.findBySelectorId(selectorId);
discoveryUpstreamDataList.removeIf(u -> {
for (CommonUpstream alive : aliveList) {
if (alive.getUpstreamUrl().equals(u.getUrl())) {
return false;
}
}
return true;
});
DiscoverySyncData discoverySyncData = new DiscoverySyncData();
discoverySyncData.setUpstreamDataList(discoveryUpstreamDataList);
discoverySyncData.setPluginName(pluginName);
discoverySyncData.setSelectorId(selectorId);
discoverySyncData.setSelectorName(selectorDO.getName());
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.DISCOVER_UPSTREAM, DataEventTypeEnum.UPDATE, Collections.singletonList(discoverySyncData)));
}

/**
Expand All @@ -396,12 +435,19 @@ public void fetchUpstreamData() {
final List<SelectorDO> selectorDOList = selectorMapper.findByPluginIds(new ArrayList<>(pluginMap.keySet()));
long currentTimeMillis = System.currentTimeMillis();
Optional.ofNullable(selectorDOList).orElseGet(ArrayList::new).stream()
.filter(selectorDO -> Objects.nonNull(selectorDO) && StringUtils.isNotEmpty(selectorDO.getHandle()))
.filter(Objects::nonNull)
.forEach(selectorDO -> {
String name = pluginMap.get(selectorDO.getPluginId());
List<CommonUpstream> commonUpstreams = converterFactor.newInstance(name).convertUpstream(selectorDO.getHandle())
.stream().filter(upstream -> upstream.isStatus() || upstream.getTimestamp() > currentTimeMillis - TimeUnit.SECONDS.toMillis(zombieRemovalTimes))
.collect(Collectors.toList());
List<CommonUpstream> commonUpstreams = new LinkedList<>();
discoveryUpstreamService.findBySelectorId(selectorDO.getId()).stream()
.map(DiscoveryTransfer.INSTANCE::mapToCommonUpstream)
.forEach(commonUpstreams::add);
String handle = selectorDO.getHandle();
if (StringUtils.isNotEmpty(handle)) {
commonUpstreams.addAll(converterFactor.newInstance(name).convertUpstream(handle)
.stream().filter(upstream -> upstream.isStatus() || upstream.getTimestamp() > currentTimeMillis - TimeUnit.SECONDS.toMillis(zombieRemovalTimes))
.collect(Collectors.toList()));
}
if (CollectionUtils.isNotEmpty(commonUpstreams)) {
UPSTREAM_MAP.put(selectorDO.getId(), commonUpstreams);
PENDING_SYNC.add(NumberUtils.INTEGER_ZERO);
Expand Down Expand Up @@ -439,6 +485,7 @@ public void onSelectorUpdated(final SelectorUpdatedEvent event) {

/**
* get the zombie removal time value.
*
* @return zombie removal time value
*/
public static int getZombieRemovalTimes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.shenyu.admin.model.vo.DiscoveryVO;
import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
import org.apache.shenyu.common.dto.ProxySelectorData;
import org.apache.shenyu.common.dto.convert.selector.CommonUpstream;
import org.apache.shenyu.common.utils.GsonUtils;

import java.util.Optional;
Expand Down Expand Up @@ -65,6 +66,19 @@ public DiscoveryUpstreamDO mapToDo(DiscoveryUpstreamData discoveryUpstreamData)
.dateUpdated(data.getDateUpdated())
.dateCreated(data.getDateCreated()).build()).orElse(null);
}

/**
* mapToCommonUpstream.
*
* @param discoveryUpstreamData discoveryUpstreamData
* @return CommonUpstream
*/
public CommonUpstream mapToCommonUpstream(DiscoveryUpstreamData discoveryUpstreamData) {
return Optional.ofNullable(discoveryUpstreamData).map(data -> {
String url = data.getUrl();
return new CommonUpstream(data.getProtocol(), url.split(":")[0], url, false, data.getDateCreated().getTime());
}).orElse(null);
}

/**
* mapToVo.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.shenyu.admin.service.impl.UpstreamCheckService;
import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
import org.apache.shenyu.common.constant.Constants;
import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
import org.apache.shenyu.common.dto.convert.selector.DivideUpstream;
import org.apache.shenyu.common.dto.convert.selector.ZombieUpstream;
import org.apache.shenyu.common.enums.PluginEnum;
Expand All @@ -50,6 +51,7 @@
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.test.util.ReflectionTestUtils;

import java.sql.Timestamp;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -103,6 +105,9 @@ public final class UpstreamCheckServiceTest {
private SelectorConditionMapper selectorConditionMapper;

private SelectorHandleConverterFactor converterFactor;

@Mock
private DiscoveryUpstreamService discoveryUpstreamService;

private final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig = new ShenyuRegisterCenterConfig();

Expand Down Expand Up @@ -133,7 +138,8 @@ public void setUp() {
Map<String, SelectorHandleConverter> maps = new HashMap<>();
maps.put(PluginEnum.DIVIDE.getName(), new DivideSelectorHandleConverter());
converterFactor = new SelectorHandleConverterFactor(maps);
upstreamCheckService = new UpstreamCheckService(selectorMapper, eventPublisher, pluginMapper, selectorConditionMapper, shenyuRegisterCenterConfig, converterFactor);
upstreamCheckService = new UpstreamCheckService(selectorMapper, eventPublisher, pluginMapper, selectorConditionMapper,
shenyuRegisterCenterConfig, converterFactor, discoveryUpstreamService);
}

@Test
Expand Down Expand Up @@ -242,19 +248,31 @@ public void testFetchUpstreamData() {
.name(MOCK_SELECTOR_NAME_OTHER)
.handle("[{\"upstreamHost\":\"localhost\",\"protocol\":\"http://\",\"localhost\":\"divide-upstream-60\",\"weight\":60}]")
.build();
DiscoveryUpstreamData discoveryUpstreamData = DiscoveryUpstreamData.builder()
.dateCreated(new Timestamp(System.currentTimeMillis()))
.protocol("http")
.url("127.0.0.1:8080")
.props("{}")
.discoveryHandlerId("1")
.status(0)
.build();
when(pluginMapper.selectByNames(anyList())).thenReturn(Lists.newArrayList(pluginDO));
when(selectorMapper.findByPluginIds(anyList())).thenReturn(Lists.newArrayList(selectorDOWithUrlError, selectorDOWithUrlReachable));
when(discoveryUpstreamService.findBySelectorId(anyString())).thenReturn(Lists.newArrayList(discoveryUpstreamData));
upstreamCheckService.fetchUpstreamData();
assertTrue(upstreamMap.containsKey(MOCK_SELECTOR_NAME));
assertEquals(2, upstreamMap.get(MOCK_SELECTOR_NAME).size());
assertTrue(upstreamMap.containsKey(MOCK_SELECTOR_NAME_OTHER));
assertEquals(2, upstreamMap.get(MOCK_SELECTOR_NAME_OTHER).size());
}

@Test
public void testClose() {
Properties properties = new Properties();
properties.setProperty(Constants.IS_CHECKED, "true");
shenyuRegisterCenterConfig.setProps(properties);
upstreamCheckService = new UpstreamCheckService(selectorMapper, eventPublisher, pluginMapper, selectorConditionMapper, shenyuRegisterCenterConfig, converterFactor);
upstreamCheckService = new UpstreamCheckService(selectorMapper, eventPublisher, pluginMapper, selectorConditionMapper,
shenyuRegisterCenterConfig, converterFactor, discoveryUpstreamService);
upstreamCheckService.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.shenyu.common.enums.PluginEnum;
import org.apache.shenyu.plugin.base.handler.DiscoveryUpstreamDataHandler;
import org.apache.shenyu.plugin.grpc.cache.ApplicationConfigCache;
import org.apache.shenyu.plugin.grpc.cache.GrpcClientCache;
import org.springframework.util.ObjectUtils;

import java.sql.Timestamp;
Expand All @@ -42,7 +43,9 @@ public void handlerDiscoveryUpstreamData(final DiscoverySyncData discoverySyncDa
if (Objects.isNull(discoverySyncData) || Objects.isNull(discoverySyncData.getSelectorId())) {
return;
}
ApplicationConfigCache.getInstance().handlerUpstream(discoverySyncData.getSelectorId(), convertUpstreamList(discoverySyncData.getUpstreamDataList()));
final String selectorId = discoverySyncData.getSelectorId();
ApplicationConfigCache.getInstance().handlerUpstream(selectorId, convertUpstreamList(discoverySyncData.getUpstreamDataList()));
GrpcClientCache.initGrpcClient(selectorId);
}

private List<GrpcUpstream> convertUpstreamList(final List<DiscoveryUpstreamData> upstreamList) {
Expand Down

0 comments on commit 3a69b0b

Please sign in to comment.