From a9846666449fb24bbd627e982edd62c3bcfcd49e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B1=9F=E6=9D=91?= Date: Mon, 24 Apr 2023 16:31:18 +0800 Subject: [PATCH] 1. decoupling multi datacenter data change merger delay times config 2. push_delay bucket 30->10 3. update version --- VERSION | 2 +- client/all/pom.xml | 2 +- client/api/pom.xml | 2 +- client/impl/pom.xml | 2 +- client/log/pom.xml | 2 +- client/pom.xml | 2 +- core/pom.xml | 2 +- pom.xml | 2 +- server/common/model/pom.xml | 2 +- .../model/constants/ValueConstants.java | 6 + .../metaserver/DataChangeMergeConfig.java | 35 +++++ server/common/pom.xml | 2 +- server/common/util/pom.xml | 2 +- server/distribution/all/pom.xml | 2 +- server/distribution/pom.xml | 2 +- server/pom.xml | 2 +- server/remoting/api/pom.xml | 2 +- server/remoting/bolt/pom.xml | 2 +- server/remoting/http/pom.xml | 2 +- server/remoting/pom.xml | 2 +- server/server/data/pom.xml | 2 +- .../data/change/DataChangeEventCenter.java | 121 ++++++++++++++---- .../FetchDataChangeMergerConfigService.java | 115 +++++++++++++++++ .../change/DataChangeEventCenterTest.java | 22 ++-- ...etchDataChangeMergerConfigServiceTest.java | 59 +++++++++ server/server/integration/pom.xml | 2 +- server/server/meta/pom.xml | 2 +- server/server/pom.xml | 2 +- server/server/session/pom.xml | 2 +- .../server/session/push/PushMetrics.java | 2 +- server/server/shared/pom.xml | 2 +- server/store/api/pom.xml | 2 +- server/store/jdbc/pom.xml | 2 +- server/store/jraft/pom.xml | 2 +- server/store/pom.xml | 2 +- test/pom.xml | 2 +- 36 files changed, 354 insertions(+), 64 deletions(-) create mode 100644 server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/DataChangeMergeConfig.java create mode 100644 server/server/data/src/main/java/com/alipay/sofa/registry/server/data/providedata/FetchDataChangeMergerConfigService.java create mode 100644 server/server/data/src/test/java/com/alipay/sofa/registry/server/data/providedata/FetchDataChangeMergerConfigServiceTest.java diff --git a/VERSION b/VERSION index 8904eeccd..798e38995 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -6.3.0-SNAPSHOT +6.3.0 diff --git a/client/all/pom.xml b/client/all/pom.xml index 048f9f11c..897ff39ab 100644 --- a/client/all/pom.xml +++ b/client/all/pom.xml @@ -6,7 +6,7 @@ com.alipay.sofa registry-client-all - 6.3.0-SNAPSHOT + 6.3.0 ${project.groupId}:${project.artifactId} http://github.com/alipay/sofa-registry diff --git a/client/api/pom.xml b/client/api/pom.xml index fbeb72e2e..a5a556d02 100644 --- a/client/api/pom.xml +++ b/client/api/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-client-parent - 6.3.0-SNAPSHOT + 6.3.0 ../pom.xml 4.0.0 diff --git a/client/impl/pom.xml b/client/impl/pom.xml index 3f918818b..7a168ff9c 100644 --- a/client/impl/pom.xml +++ b/client/impl/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-client-parent - 6.3.0-SNAPSHOT + 6.3.0 ../pom.xml 4.0.0 diff --git a/client/log/pom.xml b/client/log/pom.xml index 48affafe5..2b966b6aa 100644 --- a/client/log/pom.xml +++ b/client/log/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-client-parent - 6.3.0-SNAPSHOT + 6.3.0 ../pom.xml 4.0.0 diff --git a/client/pom.xml b/client/pom.xml index 618645e92..a6723e508 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -7,7 +7,7 @@ com.alipay.sofa registry-parent - 6.3.0-SNAPSHOT + 6.3.0 ../pom.xml diff --git a/core/pom.xml b/core/pom.xml index 4e2094633..b248c32f9 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-parent - 6.3.0-SNAPSHOT + 6.3.0 ../pom.xml 4.0.0 diff --git a/pom.xml b/pom.xml index e6c4d8ca7..e0d89772e 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ 4.0.0 com.alipay.sofa registry-parent - 6.3.0-SNAPSHOT + 6.3.0 pom diff --git a/server/common/model/pom.xml b/server/common/model/pom.xml index caef8c84e..6af9687bc 100644 --- a/server/common/model/pom.xml +++ b/server/common/model/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-common - 6.3.0-SNAPSHOT + 6.3.0 ../pom.xml 4.0.0 diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/constants/ValueConstants.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/constants/ValueConstants.java index fbfe5ff3d..b090a9b68 100644 --- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/constants/ValueConstants.java +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/constants/ValueConstants.java @@ -157,6 +157,12 @@ public class ValueConstants { SESSION_PROVIDE_DATA_INSTANCE_ID, SESSION_PROVIDE_DATA_GROUP); + public static final String DATA_MERGER_TASK_DELAY_CONFIG_DATA_ID = + DataInfo.toDataInfoId( + "data_merger_task.delay.config", + SESSION_PROVIDE_DATA_INSTANCE_ID, + SESSION_PROVIDE_DATA_GROUP); + public static final String DISABLE_DATA_ID_CASE_SENSITIVE_SWITCH = "disable.dataId.case.sensitive"; diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/DataChangeMergeConfig.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/DataChangeMergeConfig.java new file mode 100644 index 000000000..e86c29786 --- /dev/null +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/DataChangeMergeConfig.java @@ -0,0 +1,35 @@ +package com.alipay.sofa.registry.common.model.metaserver; + +/** + * @author jiangcun.hlc@antfin.com + * @since 2023/4/24 + */ +public class DataChangeMergeConfig { + + private int dataChangeMergeDelay; + private int multiDataChangeMergeDelay; + + public int getDataChangeMergeDelay() { + return dataChangeMergeDelay; + } + + public void setDataChangeMergeDelay(int dataChangeMergeDelay) { + this.dataChangeMergeDelay = dataChangeMergeDelay; + } + + public int getMultiDataChangeMergeDelay() { + return multiDataChangeMergeDelay; + } + + public void setMultiDataChangeMergeDelay(int multiDataChangeMergeDelay) { + this.multiDataChangeMergeDelay = multiDataChangeMergeDelay; + } + + @Override + public String toString() { + return "DataChangeMergeConfig{" + + "dataChangeMergeDelay=" + dataChangeMergeDelay + + ", multiDataChangeMergeDelay=" + multiDataChangeMergeDelay + + '}'; + } +} diff --git a/server/common/pom.xml b/server/common/pom.xml index d1e0aefcb..7a33805a6 100644 --- a/server/common/pom.xml +++ b/server/common/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server-parent - 6.3.0-SNAPSHOT + 6.3.0 ../pom.xml 4.0.0 diff --git a/server/common/util/pom.xml b/server/common/util/pom.xml index b5f2269cc..1f9f9b2b9 100644 --- a/server/common/util/pom.xml +++ b/server/common/util/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-common - 6.3.0-SNAPSHOT + 6.3.0 ../pom.xml 4.0.0 diff --git a/server/distribution/all/pom.xml b/server/distribution/all/pom.xml index 128196e38..4dedd9be5 100644 --- a/server/distribution/all/pom.xml +++ b/server/distribution/all/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-distribution - 6.3.0-SNAPSHOT + 6.3.0 ../pom.xml 4.0.0 diff --git a/server/distribution/pom.xml b/server/distribution/pom.xml index 43a85967e..060135f83 100644 --- a/server/distribution/pom.xml +++ b/server/distribution/pom.xml @@ -6,7 +6,7 @@ com.alipay.sofa registry-server-parent - 6.3.0-SNAPSHOT + 6.3.0 ../pom.xml 4.0.0 diff --git a/server/pom.xml b/server/pom.xml index 336fd51df..3445e50ad 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -7,7 +7,7 @@ com.alipay.sofa registry-parent - 6.3.0-SNAPSHOT + 6.3.0 ../pom.xml diff --git a/server/remoting/api/pom.xml b/server/remoting/api/pom.xml index 8f00d52c8..5c7b315fc 100644 --- a/server/remoting/api/pom.xml +++ b/server/remoting/api/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-remoting - 6.3.0-SNAPSHOT + 6.3.0 ../pom.xml 4.0.0 diff --git a/server/remoting/bolt/pom.xml b/server/remoting/bolt/pom.xml index dc592d182..b46108ed6 100644 --- a/server/remoting/bolt/pom.xml +++ b/server/remoting/bolt/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-remoting - 6.3.0-SNAPSHOT + 6.3.0 ../pom.xml 4.0.0 diff --git a/server/remoting/http/pom.xml b/server/remoting/http/pom.xml index f495b41be..98a6de9af 100644 --- a/server/remoting/http/pom.xml +++ b/server/remoting/http/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-remoting - 6.3.0-SNAPSHOT + 6.3.0 ../pom.xml 4.0.0 diff --git a/server/remoting/pom.xml b/server/remoting/pom.xml index 88b4d7927..68548e8a5 100644 --- a/server/remoting/pom.xml +++ b/server/remoting/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server-parent - 6.3.0-SNAPSHOT + 6.3.0 ../pom.xml 4.0.0 diff --git a/server/server/data/pom.xml b/server/server/data/pom.xml index 30de68f93..314c89a43 100644 --- a/server/server/data/pom.xml +++ b/server/server/data/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server - 6.3.0-SNAPSHOT + 6.3.0 ../pom.xml 4.0.0 diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/DataChangeEventCenter.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/DataChangeEventCenter.java index 153d352c7..92988329e 100644 --- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/DataChangeEventCenter.java +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/DataChangeEventCenter.java @@ -23,6 +23,7 @@ import com.alipay.sofa.registry.common.model.Tuple; import com.alipay.sofa.registry.common.model.dataserver.Datum; import com.alipay.sofa.registry.common.model.dataserver.DatumVersion; +import com.alipay.sofa.registry.common.model.metaserver.DataChangeMergeConfig; import com.alipay.sofa.registry.common.model.sessionserver.DataChangeRequest; import com.alipay.sofa.registry.common.model.sessionserver.DataPushRequest; import com.alipay.sofa.registry.common.model.store.Publisher; @@ -51,6 +52,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.springframework.beans.factory.annotation.Autowired; /** @@ -71,17 +73,22 @@ public class DataChangeEventCenter { @Autowired private DefaultCommonConfig defaultCommonConfig; private final Map dataCenter2Changes = Maps.newConcurrentMap(); + // redundancy store dataCenter2Changes to notify remote dateCenter + private final Map multiDataCenter2Changes = Maps.newConcurrentMap(); private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final LinkedList retryNotifiers = Lists.newLinkedList(); + private final LinkedList multiRetryNotifiers = Lists.newLinkedList(); private final Map> dataCenter2TempChanges = Maps.newConcurrentMap(); private final ReadWriteLock tempLock = new ReentrantReadWriteLock(); private final TempChangeMerger tempChangeMerger = new TempChangeMerger(); private final ChangeMerger changeMerger = new ChangeMerger(); + private final MultiChangeMerger multiChangeMerger = new MultiChangeMerger(); private KeyedThreadPoolExecutor notifyExecutor; private KeyedThreadPoolExecutor notifyTempExecutor; + private DataChangeMergeConfig dataChangeMergeConfig; public void init() { this.notifyExecutor = @@ -96,6 +103,7 @@ public void init() { dataServerConfig.getNotifyTempExecutorQueueSize()); ConcurrentUtils.createDaemonThread("changeMerger", changeMerger).start(); + ConcurrentUtils.createDaemonThread("multiChangeMerger", multiChangeMerger).start(); ConcurrentUtils.createDaemonThread("tempChangeMerger", tempChangeMerger).start(); LOGGER.info( "start DataChange NotifyIntervalMs={}, NotifyTempIntervalMs={}", @@ -103,6 +111,10 @@ public void init() { dataServerConfig.getNotifyTempDataIntervalMillis()); } + public void setDataChangeConfig(DataChangeMergeConfig dataChangeMergeConfig) { + this.dataChangeMergeConfig = dataChangeMergeConfig; + } + public void onTempPubChange(Publisher publisher, String dataCenter) { Map changes = dataCenter2TempChanges.computeIfAbsent(dataCenter, k -> Maps.newConcurrentMap()); @@ -123,9 +135,12 @@ public void onChange( } DataChangeMerger changes = dataCenter2Changes.computeIfAbsent(dataCenter, k -> new DataChangeMerger()); + DataChangeMerger multiChanges = + multiDataCenter2Changes.computeIfAbsent(dataCenter, k -> new DataChangeMerger()); lock.readLock().lock(); try { changes.addChanges(dataInfoIds, dataChangeType); + multiChanges.addChanges(dataInfoIds, dataChangeType); } finally { lock.readLock().unlock(); } @@ -173,6 +188,7 @@ final class ChangeNotifier implements Runnable { final String dataCenter; final Map dataInfoIds; final TraceTimes times; + final NodeType nodeType; volatile int retryCount; @@ -181,13 +197,14 @@ private ChangeNotifier( int notifyPort, String dataCenter, Map dataInfoIds, - TraceTimes parentTimes) { + TraceTimes parentTimes, NodeType nodeType) { this.dataCenter = dataCenter; this.channel = channel; this.notifyPort = notifyPort; this.dataInfoIds = dataInfoIds; this.times = parentTimes.copy(); this.times.setDatumNotifyCreate(System.currentTimeMillis()); + this.nodeType = nodeType; } @Override @@ -247,26 +264,49 @@ boolean commitRetry(ChangeNotifier retry) { final int maxSize = dataServerConfig.getNotifyRetryQueueSize(); final long expireTimestamp = System.currentTimeMillis() + dataServerConfig.getNotifyRetryBackoffMillis(); - synchronized (retryNotifiers) { - if (retryNotifiers.size() >= maxSize) { - // remove first - retryNotifiers.removeFirst(); + if(NodeType.DATA == retry.nodeType){ + synchronized (multiRetryNotifiers) { + if (multiRetryNotifiers.size() >= maxSize) { + // remove first + multiRetryNotifiers.removeFirst(); + } + multiRetryNotifiers.add(new ChangeNotifierRetry(retry, expireTimestamp)); + } + }else{ + synchronized (retryNotifiers) { + if (retryNotifiers.size() >= maxSize) { + // remove first + retryNotifiers.removeFirst(); + } + retryNotifiers.add(new ChangeNotifierRetry(retry, expireTimestamp)); } - retryNotifiers.add(new ChangeNotifierRetry(retry, expireTimestamp)); } return true; } - List getExpires() { + List getExpires(NodeType nodeType) { final List expires = Lists.newLinkedList(); final long now = System.currentTimeMillis(); - synchronized (retryNotifiers) { - final Iterator it = retryNotifiers.iterator(); - while (it.hasNext()) { - ChangeNotifierRetry retry = it.next(); - if (retry.expireTimestamp <= now) { - expires.add(retry.notifier); - it.remove(); + if(NodeType.DATA == nodeType){ + synchronized (multiRetryNotifiers) { + final Iterator it = multiRetryNotifiers.iterator(); + while (it.hasNext()) { + ChangeNotifierRetry retry = it.next(); + if (retry.expireTimestamp <= now) { + expires.add(retry.notifier); + it.remove(); + } + } + } + }else{ + synchronized (retryNotifiers) { + final Iterator it = retryNotifiers.iterator(); + while (it.hasNext()) { + ChangeNotifierRetry retry = it.next(); + if (retry.expireTimestamp <= now) { + expires.add(retry.notifier); + it.remove(); + } } } } @@ -414,7 +454,7 @@ boolean handleChanges( try { notifyExecutor.execute( channel.getRemoteAddress(), - new ChangeNotifier(channel, notifyPort, dataCenter, changes, event.getTraceTimes())); + new ChangeNotifier(channel, notifyPort, dataCenter, changes, event.getTraceTimes(), nodeType)); CHANGE_COMMIT_COUNTER.inc(); } catch (FastRejectedExecutionException e) { CHANGE_SKIP_COUNTER.inc(); @@ -428,8 +468,8 @@ boolean handleChanges( return true; } - void handleExpire() { - final List retries = getExpires(); + void handleExpire(NodeType nodeType) { + final List retries = getExpires(nodeType); // commit retry for (ChangeNotifier retry : retries) { try { @@ -450,11 +490,15 @@ void handleExpire() { } } - List transferChangeEvent(int maxItems) { + List transferChangeEvent(int maxItems, boolean isMulti) { final List events = Lists.newArrayList(); lock.writeLock().lock(); try { - for (Map.Entry change : dataCenter2Changes.entrySet()) { + Set> dateChangeSet = dataCenter2Changes.entrySet(); + if (isMulti) { + dateChangeSet = multiDataCenter2Changes.entrySet(); + } + for (Map.Entry change : dateChangeSet) { final String dataCenter = change.getKey(); DataChangeMerger merger = change.getValue(); TraceTimes traceTimes = merger.createTraceTime(); @@ -478,18 +522,44 @@ public void runUnthrowable() { try { // first clean the event final int maxItems = dataServerConfig.getNotifyMaxItems(); - final List events = transferChangeEvent(maxItems); + final List events = transferChangeEvent(maxItems, false); // notify local session handleChanges(events, NodeType.SESSION, dataServerConfig.getNotifyPort(), true); + handleExpire(NodeType.SESSION); + } catch (Throwable e) { + LOGGER.error("failed to merge change", e); + } + } + + @Override + public void waitingUnthrowable() { + int notifyIntervalMillis = dataServerConfig.getNotifyIntervalMillis(); + if(null != dataChangeMergeConfig){ + notifyIntervalMillis = dataChangeMergeConfig.getDataChangeMergeDelay(); + } + ConcurrentUtils.sleepUninterruptibly( + notifyIntervalMillis, TimeUnit.MILLISECONDS); + } + } + + private final class MultiChangeMerger extends LoopRunnable { + + @Override + public void runUnthrowable() { + try { + // first clean the event + final int maxItems = dataServerConfig.getNotifyMaxItems(); + final List events = transferChangeEvent(maxItems, true); + // notify remote data handleChanges( events, NodeType.DATA, multiClusterDataServerConfig.getSyncRemoteSlotLeaderPort(), false); - handleExpire(); + handleExpire(NodeType.DATA); } catch (Throwable e) { LOGGER.error("failed to merge change", e); } @@ -497,8 +567,13 @@ public void runUnthrowable() { @Override public void waitingUnthrowable() { + //multi dataCenter data change merger notify interval 100 millis + int notifyIntervalMillis = 100; + if(null != dataChangeMergeConfig){ + notifyIntervalMillis = dataChangeMergeConfig.getMultiDataChangeMergeDelay(); + } ConcurrentUtils.sleepUninterruptibly( - dataServerConfig.getNotifyIntervalMillis(), TimeUnit.MILLISECONDS); + notifyIntervalMillis, TimeUnit.MILLISECONDS); } } @@ -532,7 +607,7 @@ void setExchange(Exchange boltExchange) { @VisibleForTesting ChangeNotifier newChangeNotifier( Channel channel, int notifyPort, String dataCenter, Map dataInfoIds) { - return new ChangeNotifier(channel, notifyPort, dataCenter, dataInfoIds, new TraceTimes()); + return new ChangeNotifier(channel, notifyPort, dataCenter, dataInfoIds, new TraceTimes(), NodeType.SESSION); } @VisibleForTesting diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/providedata/FetchDataChangeMergerConfigService.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/providedata/FetchDataChangeMergerConfigService.java new file mode 100644 index 000000000..f90095df9 --- /dev/null +++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/providedata/FetchDataChangeMergerConfigService.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.data.providedata; + +import com.alipay.sofa.registry.common.model.constants.ValueConstants; +import com.alipay.sofa.registry.common.model.metaserver.DataChangeMergeConfig; +import com.alipay.sofa.registry.common.model.metaserver.ProvideData; +import com.alipay.sofa.registry.log.Logger; +import com.alipay.sofa.registry.log.LoggerFactory; +import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig; +import com.alipay.sofa.registry.server.data.change.DataChangeEventCenter; +import com.alipay.sofa.registry.server.shared.providedata.AbstractFetchSystemPropertyService; +import com.alipay.sofa.registry.server.shared.providedata.SystemDataStorage; +import com.alipay.sofa.registry.util.JsonUtils; +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * @author jiangcun.hlc@antfin.com + * @since 2023/2/21 + */ +public class FetchDataChangeMergerConfigService + extends AbstractFetchSystemPropertyService { + + private static final Logger LOGGER = + LoggerFactory.getLogger(FetchDataChangeMergerConfigService.class); + + @Autowired + private DataServerConfig dataServerConfig; + + @Autowired + protected DataChangeEventCenter dataChangeEventCenter; + + + public FetchDataChangeMergerConfigService() { + super( + ValueConstants.DATA_MERGER_TASK_DELAY_CONFIG_DATA_ID, + new SwitchStorage(INIT_VERSION, new DataChangeMergeConfig())); + } + + @Override + protected int getSystemPropertyIntervalMillis() { + return dataServerConfig.getSystemPropertyIntervalMillis(); + } + + @Override + protected boolean doProcess(SwitchStorage expect, ProvideData data) { + final String configString = ProvideData.toString(data); + + DataChangeMergeConfig dataChangeMergeConfig = null; + if (StringUtils.isBlank(configString)) { + return true; + } + try { + dataChangeMergeConfig = JsonUtils.read(configString, DataChangeMergeConfig.class); + } catch (Throwable e) { + LOGGER.error("Decode DataChangeMergeConfig failed", e); + return false; + } + if (dataChangeMergeConfig == null) { + LOGGER.error( + "Fetch DataChangeMergeConfig invalid, value={}", dataChangeMergeConfig); + return false; + } + + SwitchStorage update = new SwitchStorage(data.getVersion(), dataChangeMergeConfig); + if (!compareAndSet(expect, update)) { + return false; + } + + dataChangeEventCenter.setDataChangeConfig(dataChangeMergeConfig); + + LOGGER.info( + "Fetch DataChangeMergeConfig success, prev={}, current={}", + expect.dataChangeMergeConfig, + dataChangeMergeConfig); + return true; + } + + protected static class SwitchStorage extends SystemDataStorage { + protected final DataChangeMergeConfig dataChangeMergeConfig; + + public SwitchStorage(long version, DataChangeMergeConfig dataChangeMergeConfig) { + super(version); + this.dataChangeMergeConfig = dataChangeMergeConfig; + } + } + @VisibleForTesting + public FetchDataChangeMergerConfigService setDataServerConfig(DataServerConfig dataServerConfig) { + this.dataServerConfig = dataServerConfig; + return this; + } + @VisibleForTesting + public FetchDataChangeMergerConfigService setDataChangeEventCenter(DataChangeEventCenter dataChangeEventCenter) { + this.dataChangeEventCenter = dataChangeEventCenter; + return this; + } + + +} diff --git a/server/server/data/src/test/java/com/alipay/sofa/registry/server/data/change/DataChangeEventCenterTest.java b/server/server/data/src/test/java/com/alipay/sofa/registry/server/data/change/DataChangeEventCenterTest.java index 6a4cc9687..a0a86c98a 100644 --- a/server/server/data/src/test/java/com/alipay/sofa/registry/server/data/change/DataChangeEventCenterTest.java +++ b/server/server/data/src/test/java/com/alipay/sofa/registry/server/data/change/DataChangeEventCenterTest.java @@ -90,7 +90,7 @@ public void testHandleChangeNotInit() { Assert.assertFalse( center.handleChanges( - center.transferChangeEvent(dataServerConfig.getNotifyMaxItems()), + center.transferChangeEvent(dataServerConfig.getNotifyMaxItems(), false), NodeType.SESSION, dataServerConfig.getNotifyPort(), true)); @@ -104,7 +104,7 @@ public void testHandleChangeNotInit() { center.onChange(changes1, DataChangeType.PUT, DC); Assert.assertFalse( center.handleChanges( - center.transferChangeEvent(dataServerConfig.getNotifyMaxItems()), + center.transferChangeEvent(dataServerConfig.getNotifyMaxItems(), false), NodeType.SESSION, dataServerConfig.getNotifyPort(), true)); @@ -126,7 +126,7 @@ public void testHandleChangeNotInit() { channelsMap.put("localhost", Lists.newArrayList(channel)); Assert.assertTrue( center.handleChanges( - center.transferChangeEvent(dataServerConfig.getNotifyMaxItems()), + center.transferChangeEvent(dataServerConfig.getNotifyMaxItems(), false), NodeType.SESSION, dataServerConfig.getNotifyPort(), true)); @@ -137,7 +137,7 @@ public void testHandleChangeNotInit() { // npe Assert.assertTrue( center.handleChanges( - center.transferChangeEvent(dataServerConfig.getNotifyMaxItems()), + center.transferChangeEvent(dataServerConfig.getNotifyMaxItems(), false), NodeType.SESSION, dataServerConfig.getNotifyPort(), true)); @@ -148,7 +148,7 @@ public void testHandleChangeNotInit() { double pre = ChangeMetrics.CHANGE_SKIP_COUNTER.get(); Assert.assertTrue( center.handleChanges( - center.transferChangeEvent(dataServerConfig.getNotifyMaxItems()), + center.transferChangeEvent(dataServerConfig.getNotifyMaxItems(), false), NodeType.SESSION, dataServerConfig.getNotifyPort(), true)); @@ -219,14 +219,14 @@ public void testTempNotify() { @Test public void testHandleExpire_npe() { initHandleExpire(); - center.handleExpire(); + center.handleExpire(NodeType.SESSION); } @Test public void testHandleExpire_reject() { initHandleExpire(); center.setNotifyExecutor(TestBaseUtils.rejectExecutor()); - center.handleExpire(); + center.handleExpire(NodeType.SESSION); } private void initHandleExpire() { @@ -250,14 +250,14 @@ private void initHandleExpire() { DC, Collections.singletonMap(String.valueOf(i + 100), new DatumVersion(200)))); } - List expires = center.getExpires(); + List expires = center.getExpires(NodeType.SESSION); Assert.assertTrue(expires.isEmpty()); // is full, make expire now dataServerConfig.setNotifyRetryBackoffMillis(0); for (DataChangeEventCenter.ChangeNotifier n : list) { center.commitRetry(n); } - expires = center.getExpires(); + expires = center.getExpires(NodeType.SESSION); Assert.assertEquals(expires.size(), list.size()); Assert.assertArrayEquals(expires.toArray(), list.toArray()); for (DataChangeEventCenter.ChangeNotifier n : list) { @@ -305,13 +305,13 @@ public void testTransfer() { setCenter(); List changes1 = Lists.newArrayList("1", "2", "3"); center.onChange(changes1, DataChangeType.PUT, DC); - List events = center.transferChangeEvent(3); + List events = center.transferChangeEvent(3, false); Assert.assertNotNull(events.get(0).toString()); Assert.assertEquals(1, events.size()); assertEvent(events, changes1); center.onChange(changes1, DataChangeType.PUT, DC); - events = center.transferChangeEvent(2); + events = center.transferChangeEvent(2, false); Assert.assertEquals(2, events.size()); assertEvent(events, changes1); } diff --git a/server/server/data/src/test/java/com/alipay/sofa/registry/server/data/providedata/FetchDataChangeMergerConfigServiceTest.java b/server/server/data/src/test/java/com/alipay/sofa/registry/server/data/providedata/FetchDataChangeMergerConfigServiceTest.java new file mode 100644 index 000000000..cac8cafba --- /dev/null +++ b/server/server/data/src/test/java/com/alipay/sofa/registry/server/data/providedata/FetchDataChangeMergerConfigServiceTest.java @@ -0,0 +1,59 @@ +package com.alipay.sofa.registry.server.data.providedata; + +import com.alipay.sofa.registry.common.model.ServerDataBox; +import com.alipay.sofa.registry.common.model.constants.ValueConstants; +import com.alipay.sofa.registry.common.model.metaserver.ProvideData; +import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig; +import com.alipay.sofa.registry.server.data.change.DataChangeEventCenter; +import junit.framework.TestCase; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static org.mockito.Mockito.mock; + +/** + * @author jiangcun.hlc@antfin.com + * @since 2023/4/24 + */ +public class FetchDataChangeMergerConfigServiceTest { + + FetchDataChangeMergerConfigService fetchDataChangeMergerConfigService; + + @Before + public void beforeTest() { + fetchDataChangeMergerConfigService = new FetchDataChangeMergerConfigService(); + fetchDataChangeMergerConfigService + .setDataChangeEventCenter(mock(DataChangeEventCenter.class)) + .setDataServerConfig(mock(DataServerConfig.class)); + } + + @Test + public void testDoProcess() { + Assert.assertTrue(fetchDataChangeMergerConfigService.getSystemPropertyIntervalMillis() == 0); + Assert.assertTrue( + fetchDataChangeMergerConfigService.doProcess( + fetchDataChangeMergerConfigService.getStorage().get(), + new ProvideData( + new ServerDataBox( + ""), + ValueConstants.DATA_MERGER_TASK_DELAY_CONFIG_DATA_ID, + 0L))); + Assert.assertFalse( + fetchDataChangeMergerConfigService.doProcess( + new FetchDataChangeMergerConfigService.SwitchStorage(0L, null), + new ProvideData( + new ServerDataBox( + "{\"changeDebouncingMillis\":1000,\"changeDebouncingMaxMillis\":3000,\"changeTaskWaitingMillis\":100,\"pushTaskWaitingMillis\":0,\"pushTaskDebouncingMillis\":500,\"regWorkWaitingMillis\":200,\"zoneSet\":[\"ALL_ZONE\"]}"), + ValueConstants.CHANGE_PUSH_TASK_DELAY_CONFIG_DATA_ID, + 0L))); + Assert.assertTrue( + fetchDataChangeMergerConfigService.doProcess( + fetchDataChangeMergerConfigService.getStorage().get(), + new ProvideData( + new ServerDataBox( + "{\"dataChangeMergeDelay\":1000,\"multiDataChangeMergeDelay\":200}"), + ValueConstants.CHANGE_PUSH_TASK_DELAY_CONFIG_DATA_ID, + 2L))); + } +} \ No newline at end of file diff --git a/server/server/integration/pom.xml b/server/server/integration/pom.xml index ebd309986..712715597 100644 --- a/server/server/integration/pom.xml +++ b/server/server/integration/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server - 6.3.0-SNAPSHOT + 6.3.0 ../pom.xml 4.0.0 diff --git a/server/server/meta/pom.xml b/server/server/meta/pom.xml index 9e74f7e32..291865692 100644 --- a/server/server/meta/pom.xml +++ b/server/server/meta/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server - 6.3.0-SNAPSHOT + 6.3.0 ../pom.xml 4.0.0 diff --git a/server/server/pom.xml b/server/server/pom.xml index dcbb33ff7..f96c54223 100644 --- a/server/server/pom.xml +++ b/server/server/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server-parent - 6.3.0-SNAPSHOT + 6.3.0 ../pom.xml 4.0.0 diff --git a/server/server/session/pom.xml b/server/server/session/pom.xml index 67e04c9e9..c48f7ff04 100644 --- a/server/server/session/pom.xml +++ b/server/server/session/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server - 6.3.0-SNAPSHOT + 6.3.0 ../pom.xml 4.0.0 diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushMetrics.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushMetrics.java index fb33a0ffc..525391f87 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushMetrics.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushMetrics.java @@ -119,7 +119,7 @@ static final class Push { .register(); private static final Histogram PUSH_DELAY_HISTOGRAM = Histogram.build() - .linearBuckets(0, 1000, 30) + .linearBuckets(0, 1000, 10) .namespace("session") .subsystem("push") .name("push_delay") diff --git a/server/server/shared/pom.xml b/server/server/shared/pom.xml index 030c0d940..1614e89b8 100644 --- a/server/server/shared/pom.xml +++ b/server/server/shared/pom.xml @@ -5,7 +5,7 @@ registry-server com.alipay.sofa - 6.3.0-SNAPSHOT + 6.3.0 4.0.0 diff --git a/server/store/api/pom.xml b/server/store/api/pom.xml index b380a464d..eb6060816 100644 --- a/server/store/api/pom.xml +++ b/server/store/api/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-store - 6.3.0-SNAPSHOT + 6.3.0 ../pom.xml 4.0.0 diff --git a/server/store/jdbc/pom.xml b/server/store/jdbc/pom.xml index e49aaeb71..b55a72d6b 100644 --- a/server/store/jdbc/pom.xml +++ b/server/store/jdbc/pom.xml @@ -6,7 +6,7 @@ com.alipay.sofa registry-store - 6.3.0-SNAPSHOT + 6.3.0 ../pom.xml diff --git a/server/store/jraft/pom.xml b/server/store/jraft/pom.xml index 57254ebe2..bc5d63988 100644 --- a/server/store/jraft/pom.xml +++ b/server/store/jraft/pom.xml @@ -6,7 +6,7 @@ com.alipay.sofa registry-store - 6.3.0-SNAPSHOT + 6.3.0 ../pom.xml diff --git a/server/store/pom.xml b/server/store/pom.xml index b0487b597..d93818f6f 100644 --- a/server/store/pom.xml +++ b/server/store/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server-parent - 6.3.0-SNAPSHOT + 6.3.0 ../pom.xml 4.0.0 diff --git a/test/pom.xml b/test/pom.xml index 70eb2b3de..cb72d0ec4 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-parent - 6.3.0-SNAPSHOT + 6.3.0 ../pom.xml 4.0.0