diff --git a/VERSION b/VERSION
index 8904eeccd..798e38995 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-6.3.0-SNAPSHOT
+6.3.0
diff --git a/client/all/pom.xml b/client/all/pom.xml
index 048f9f11c..897ff39ab 100644
--- a/client/all/pom.xml
+++ b/client/all/pom.xml
@@ -6,7 +6,7 @@
com.alipay.sofa
registry-client-all
- 6.3.0-SNAPSHOT
+ 6.3.0
${project.groupId}:${project.artifactId}
http://github.com/alipay/sofa-registry
diff --git a/client/api/pom.xml b/client/api/pom.xml
index fbeb72e2e..a5a556d02 100644
--- a/client/api/pom.xml
+++ b/client/api/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-client-parent
- 6.3.0-SNAPSHOT
+ 6.3.0
../pom.xml
4.0.0
diff --git a/client/impl/pom.xml b/client/impl/pom.xml
index 3f918818b..7a168ff9c 100644
--- a/client/impl/pom.xml
+++ b/client/impl/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-client-parent
- 6.3.0-SNAPSHOT
+ 6.3.0
../pom.xml
4.0.0
diff --git a/client/log/pom.xml b/client/log/pom.xml
index 48affafe5..2b966b6aa 100644
--- a/client/log/pom.xml
+++ b/client/log/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-client-parent
- 6.3.0-SNAPSHOT
+ 6.3.0
../pom.xml
4.0.0
diff --git a/client/pom.xml b/client/pom.xml
index 618645e92..a6723e508 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -7,7 +7,7 @@
com.alipay.sofa
registry-parent
- 6.3.0-SNAPSHOT
+ 6.3.0
../pom.xml
diff --git a/core/pom.xml b/core/pom.xml
index 4e2094633..b248c32f9 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-parent
- 6.3.0-SNAPSHOT
+ 6.3.0
../pom.xml
4.0.0
diff --git a/pom.xml b/pom.xml
index e6c4d8ca7..e0d89772e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
4.0.0
com.alipay.sofa
registry-parent
- 6.3.0-SNAPSHOT
+ 6.3.0
pom
diff --git a/server/common/model/pom.xml b/server/common/model/pom.xml
index caef8c84e..6af9687bc 100644
--- a/server/common/model/pom.xml
+++ b/server/common/model/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-common
- 6.3.0-SNAPSHOT
+ 6.3.0
../pom.xml
4.0.0
diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/constants/ValueConstants.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/constants/ValueConstants.java
index fbfe5ff3d..b090a9b68 100644
--- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/constants/ValueConstants.java
+++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/constants/ValueConstants.java
@@ -157,6 +157,12 @@ public class ValueConstants {
SESSION_PROVIDE_DATA_INSTANCE_ID,
SESSION_PROVIDE_DATA_GROUP);
+ public static final String DATA_MERGER_TASK_DELAY_CONFIG_DATA_ID =
+ DataInfo.toDataInfoId(
+ "data_merger_task.delay.config",
+ SESSION_PROVIDE_DATA_INSTANCE_ID,
+ SESSION_PROVIDE_DATA_GROUP);
+
public static final String DISABLE_DATA_ID_CASE_SENSITIVE_SWITCH =
"disable.dataId.case.sensitive";
diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/DataChangeMergeConfig.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/DataChangeMergeConfig.java
new file mode 100644
index 000000000..e86c29786
--- /dev/null
+++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/metaserver/DataChangeMergeConfig.java
@@ -0,0 +1,35 @@
+package com.alipay.sofa.registry.common.model.metaserver;
+
+/**
+ * @author jiangcun.hlc@antfin.com
+ * @since 2023/4/24
+ */
+public class DataChangeMergeConfig {
+
+ private int dataChangeMergeDelay;
+ private int multiDataChangeMergeDelay;
+
+ public int getDataChangeMergeDelay() {
+ return dataChangeMergeDelay;
+ }
+
+ public void setDataChangeMergeDelay(int dataChangeMergeDelay) {
+ this.dataChangeMergeDelay = dataChangeMergeDelay;
+ }
+
+ public int getMultiDataChangeMergeDelay() {
+ return multiDataChangeMergeDelay;
+ }
+
+ public void setMultiDataChangeMergeDelay(int multiDataChangeMergeDelay) {
+ this.multiDataChangeMergeDelay = multiDataChangeMergeDelay;
+ }
+
+ @Override
+ public String toString() {
+ return "DataChangeMergeConfig{" +
+ "dataChangeMergeDelay=" + dataChangeMergeDelay +
+ ", multiDataChangeMergeDelay=" + multiDataChangeMergeDelay +
+ '}';
+ }
+}
diff --git a/server/common/pom.xml b/server/common/pom.xml
index d1e0aefcb..7a33805a6 100644
--- a/server/common/pom.xml
+++ b/server/common/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-server-parent
- 6.3.0-SNAPSHOT
+ 6.3.0
../pom.xml
4.0.0
diff --git a/server/common/util/pom.xml b/server/common/util/pom.xml
index b5f2269cc..1f9f9b2b9 100644
--- a/server/common/util/pom.xml
+++ b/server/common/util/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-common
- 6.3.0-SNAPSHOT
+ 6.3.0
../pom.xml
4.0.0
diff --git a/server/distribution/all/pom.xml b/server/distribution/all/pom.xml
index 128196e38..4dedd9be5 100644
--- a/server/distribution/all/pom.xml
+++ b/server/distribution/all/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-distribution
- 6.3.0-SNAPSHOT
+ 6.3.0
../pom.xml
4.0.0
diff --git a/server/distribution/pom.xml b/server/distribution/pom.xml
index 43a85967e..060135f83 100644
--- a/server/distribution/pom.xml
+++ b/server/distribution/pom.xml
@@ -6,7 +6,7 @@
com.alipay.sofa
registry-server-parent
- 6.3.0-SNAPSHOT
+ 6.3.0
../pom.xml
4.0.0
diff --git a/server/pom.xml b/server/pom.xml
index 336fd51df..3445e50ad 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -7,7 +7,7 @@
com.alipay.sofa
registry-parent
- 6.3.0-SNAPSHOT
+ 6.3.0
../pom.xml
diff --git a/server/remoting/api/pom.xml b/server/remoting/api/pom.xml
index 8f00d52c8..5c7b315fc 100644
--- a/server/remoting/api/pom.xml
+++ b/server/remoting/api/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-remoting
- 6.3.0-SNAPSHOT
+ 6.3.0
../pom.xml
4.0.0
diff --git a/server/remoting/bolt/pom.xml b/server/remoting/bolt/pom.xml
index dc592d182..b46108ed6 100644
--- a/server/remoting/bolt/pom.xml
+++ b/server/remoting/bolt/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-remoting
- 6.3.0-SNAPSHOT
+ 6.3.0
../pom.xml
4.0.0
diff --git a/server/remoting/http/pom.xml b/server/remoting/http/pom.xml
index f495b41be..98a6de9af 100644
--- a/server/remoting/http/pom.xml
+++ b/server/remoting/http/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-remoting
- 6.3.0-SNAPSHOT
+ 6.3.0
../pom.xml
4.0.0
diff --git a/server/remoting/pom.xml b/server/remoting/pom.xml
index 88b4d7927..68548e8a5 100644
--- a/server/remoting/pom.xml
+++ b/server/remoting/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-server-parent
- 6.3.0-SNAPSHOT
+ 6.3.0
../pom.xml
4.0.0
diff --git a/server/server/data/pom.xml b/server/server/data/pom.xml
index 30de68f93..314c89a43 100644
--- a/server/server/data/pom.xml
+++ b/server/server/data/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-server
- 6.3.0-SNAPSHOT
+ 6.3.0
../pom.xml
4.0.0
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/DataChangeEventCenter.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/DataChangeEventCenter.java
index 153d352c7..92988329e 100644
--- a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/DataChangeEventCenter.java
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/change/DataChangeEventCenter.java
@@ -23,6 +23,7 @@
import com.alipay.sofa.registry.common.model.Tuple;
import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.common.model.dataserver.DatumVersion;
+import com.alipay.sofa.registry.common.model.metaserver.DataChangeMergeConfig;
import com.alipay.sofa.registry.common.model.sessionserver.DataChangeRequest;
import com.alipay.sofa.registry.common.model.sessionserver.DataPushRequest;
import com.alipay.sofa.registry.common.model.store.Publisher;
@@ -51,6 +52,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.springframework.beans.factory.annotation.Autowired;
/**
@@ -71,17 +73,22 @@ public class DataChangeEventCenter {
@Autowired private DefaultCommonConfig defaultCommonConfig;
private final Map dataCenter2Changes = Maps.newConcurrentMap();
+ // redundancy store dataCenter2Changes to notify remote dateCenter
+ private final Map multiDataCenter2Changes = Maps.newConcurrentMap();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final LinkedList retryNotifiers = Lists.newLinkedList();
+ private final LinkedList multiRetryNotifiers = Lists.newLinkedList();
private final Map> dataCenter2TempChanges = Maps.newConcurrentMap();
private final ReadWriteLock tempLock = new ReentrantReadWriteLock();
private final TempChangeMerger tempChangeMerger = new TempChangeMerger();
private final ChangeMerger changeMerger = new ChangeMerger();
+ private final MultiChangeMerger multiChangeMerger = new MultiChangeMerger();
private KeyedThreadPoolExecutor notifyExecutor;
private KeyedThreadPoolExecutor notifyTempExecutor;
+ private DataChangeMergeConfig dataChangeMergeConfig;
public void init() {
this.notifyExecutor =
@@ -96,6 +103,7 @@ public void init() {
dataServerConfig.getNotifyTempExecutorQueueSize());
ConcurrentUtils.createDaemonThread("changeMerger", changeMerger).start();
+ ConcurrentUtils.createDaemonThread("multiChangeMerger", multiChangeMerger).start();
ConcurrentUtils.createDaemonThread("tempChangeMerger", tempChangeMerger).start();
LOGGER.info(
"start DataChange NotifyIntervalMs={}, NotifyTempIntervalMs={}",
@@ -103,6 +111,10 @@ public void init() {
dataServerConfig.getNotifyTempDataIntervalMillis());
}
+ public void setDataChangeConfig(DataChangeMergeConfig dataChangeMergeConfig) {
+ this.dataChangeMergeConfig = dataChangeMergeConfig;
+ }
+
public void onTempPubChange(Publisher publisher, String dataCenter) {
Map changes =
dataCenter2TempChanges.computeIfAbsent(dataCenter, k -> Maps.newConcurrentMap());
@@ -123,9 +135,12 @@ public void onChange(
}
DataChangeMerger changes =
dataCenter2Changes.computeIfAbsent(dataCenter, k -> new DataChangeMerger());
+ DataChangeMerger multiChanges =
+ multiDataCenter2Changes.computeIfAbsent(dataCenter, k -> new DataChangeMerger());
lock.readLock().lock();
try {
changes.addChanges(dataInfoIds, dataChangeType);
+ multiChanges.addChanges(dataInfoIds, dataChangeType);
} finally {
lock.readLock().unlock();
}
@@ -173,6 +188,7 @@ final class ChangeNotifier implements Runnable {
final String dataCenter;
final Map dataInfoIds;
final TraceTimes times;
+ final NodeType nodeType;
volatile int retryCount;
@@ -181,13 +197,14 @@ private ChangeNotifier(
int notifyPort,
String dataCenter,
Map dataInfoIds,
- TraceTimes parentTimes) {
+ TraceTimes parentTimes, NodeType nodeType) {
this.dataCenter = dataCenter;
this.channel = channel;
this.notifyPort = notifyPort;
this.dataInfoIds = dataInfoIds;
this.times = parentTimes.copy();
this.times.setDatumNotifyCreate(System.currentTimeMillis());
+ this.nodeType = nodeType;
}
@Override
@@ -247,26 +264,49 @@ boolean commitRetry(ChangeNotifier retry) {
final int maxSize = dataServerConfig.getNotifyRetryQueueSize();
final long expireTimestamp =
System.currentTimeMillis() + dataServerConfig.getNotifyRetryBackoffMillis();
- synchronized (retryNotifiers) {
- if (retryNotifiers.size() >= maxSize) {
- // remove first
- retryNotifiers.removeFirst();
+ if(NodeType.DATA == retry.nodeType){
+ synchronized (multiRetryNotifiers) {
+ if (multiRetryNotifiers.size() >= maxSize) {
+ // remove first
+ multiRetryNotifiers.removeFirst();
+ }
+ multiRetryNotifiers.add(new ChangeNotifierRetry(retry, expireTimestamp));
+ }
+ }else{
+ synchronized (retryNotifiers) {
+ if (retryNotifiers.size() >= maxSize) {
+ // remove first
+ retryNotifiers.removeFirst();
+ }
+ retryNotifiers.add(new ChangeNotifierRetry(retry, expireTimestamp));
}
- retryNotifiers.add(new ChangeNotifierRetry(retry, expireTimestamp));
}
return true;
}
- List getExpires() {
+ List getExpires(NodeType nodeType) {
final List expires = Lists.newLinkedList();
final long now = System.currentTimeMillis();
- synchronized (retryNotifiers) {
- final Iterator it = retryNotifiers.iterator();
- while (it.hasNext()) {
- ChangeNotifierRetry retry = it.next();
- if (retry.expireTimestamp <= now) {
- expires.add(retry.notifier);
- it.remove();
+ if(NodeType.DATA == nodeType){
+ synchronized (multiRetryNotifiers) {
+ final Iterator it = multiRetryNotifiers.iterator();
+ while (it.hasNext()) {
+ ChangeNotifierRetry retry = it.next();
+ if (retry.expireTimestamp <= now) {
+ expires.add(retry.notifier);
+ it.remove();
+ }
+ }
+ }
+ }else{
+ synchronized (retryNotifiers) {
+ final Iterator it = retryNotifiers.iterator();
+ while (it.hasNext()) {
+ ChangeNotifierRetry retry = it.next();
+ if (retry.expireTimestamp <= now) {
+ expires.add(retry.notifier);
+ it.remove();
+ }
}
}
}
@@ -414,7 +454,7 @@ boolean handleChanges(
try {
notifyExecutor.execute(
channel.getRemoteAddress(),
- new ChangeNotifier(channel, notifyPort, dataCenter, changes, event.getTraceTimes()));
+ new ChangeNotifier(channel, notifyPort, dataCenter, changes, event.getTraceTimes(), nodeType));
CHANGE_COMMIT_COUNTER.inc();
} catch (FastRejectedExecutionException e) {
CHANGE_SKIP_COUNTER.inc();
@@ -428,8 +468,8 @@ boolean handleChanges(
return true;
}
- void handleExpire() {
- final List retries = getExpires();
+ void handleExpire(NodeType nodeType) {
+ final List retries = getExpires(nodeType);
// commit retry
for (ChangeNotifier retry : retries) {
try {
@@ -450,11 +490,15 @@ void handleExpire() {
}
}
- List transferChangeEvent(int maxItems) {
+ List transferChangeEvent(int maxItems, boolean isMulti) {
final List events = Lists.newArrayList();
lock.writeLock().lock();
try {
- for (Map.Entry change : dataCenter2Changes.entrySet()) {
+ Set> dateChangeSet = dataCenter2Changes.entrySet();
+ if (isMulti) {
+ dateChangeSet = multiDataCenter2Changes.entrySet();
+ }
+ for (Map.Entry change : dateChangeSet) {
final String dataCenter = change.getKey();
DataChangeMerger merger = change.getValue();
TraceTimes traceTimes = merger.createTraceTime();
@@ -478,18 +522,44 @@ public void runUnthrowable() {
try {
// first clean the event
final int maxItems = dataServerConfig.getNotifyMaxItems();
- final List events = transferChangeEvent(maxItems);
+ final List events = transferChangeEvent(maxItems, false);
// notify local session
handleChanges(events, NodeType.SESSION, dataServerConfig.getNotifyPort(), true);
+ handleExpire(NodeType.SESSION);
+ } catch (Throwable e) {
+ LOGGER.error("failed to merge change", e);
+ }
+ }
+
+ @Override
+ public void waitingUnthrowable() {
+ int notifyIntervalMillis = dataServerConfig.getNotifyIntervalMillis();
+ if(null != dataChangeMergeConfig){
+ notifyIntervalMillis = dataChangeMergeConfig.getDataChangeMergeDelay();
+ }
+ ConcurrentUtils.sleepUninterruptibly(
+ notifyIntervalMillis, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ private final class MultiChangeMerger extends LoopRunnable {
+
+ @Override
+ public void runUnthrowable() {
+ try {
+ // first clean the event
+ final int maxItems = dataServerConfig.getNotifyMaxItems();
+ final List events = transferChangeEvent(maxItems, true);
+
// notify remote data
handleChanges(
events,
NodeType.DATA,
multiClusterDataServerConfig.getSyncRemoteSlotLeaderPort(),
false);
- handleExpire();
+ handleExpire(NodeType.DATA);
} catch (Throwable e) {
LOGGER.error("failed to merge change", e);
}
@@ -497,8 +567,13 @@ public void runUnthrowable() {
@Override
public void waitingUnthrowable() {
+ //multi dataCenter data change merger notify interval 100 millis
+ int notifyIntervalMillis = 100;
+ if(null != dataChangeMergeConfig){
+ notifyIntervalMillis = dataChangeMergeConfig.getMultiDataChangeMergeDelay();
+ }
ConcurrentUtils.sleepUninterruptibly(
- dataServerConfig.getNotifyIntervalMillis(), TimeUnit.MILLISECONDS);
+ notifyIntervalMillis, TimeUnit.MILLISECONDS);
}
}
@@ -532,7 +607,7 @@ void setExchange(Exchange boltExchange) {
@VisibleForTesting
ChangeNotifier newChangeNotifier(
Channel channel, int notifyPort, String dataCenter, Map dataInfoIds) {
- return new ChangeNotifier(channel, notifyPort, dataCenter, dataInfoIds, new TraceTimes());
+ return new ChangeNotifier(channel, notifyPort, dataCenter, dataInfoIds, new TraceTimes(), NodeType.SESSION);
}
@VisibleForTesting
diff --git a/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/providedata/FetchDataChangeMergerConfigService.java b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/providedata/FetchDataChangeMergerConfigService.java
new file mode 100644
index 000000000..f90095df9
--- /dev/null
+++ b/server/server/data/src/main/java/com/alipay/sofa/registry/server/data/providedata/FetchDataChangeMergerConfigService.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.registry.server.data.providedata;
+
+import com.alipay.sofa.registry.common.model.constants.ValueConstants;
+import com.alipay.sofa.registry.common.model.metaserver.DataChangeMergeConfig;
+import com.alipay.sofa.registry.common.model.metaserver.ProvideData;
+import com.alipay.sofa.registry.log.Logger;
+import com.alipay.sofa.registry.log.LoggerFactory;
+import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
+import com.alipay.sofa.registry.server.data.change.DataChangeEventCenter;
+import com.alipay.sofa.registry.server.shared.providedata.AbstractFetchSystemPropertyService;
+import com.alipay.sofa.registry.server.shared.providedata.SystemDataStorage;
+import com.alipay.sofa.registry.util.JsonUtils;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * @author jiangcun.hlc@antfin.com
+ * @since 2023/2/21
+ */
+public class FetchDataChangeMergerConfigService
+ extends AbstractFetchSystemPropertyService {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(FetchDataChangeMergerConfigService.class);
+
+ @Autowired
+ private DataServerConfig dataServerConfig;
+
+ @Autowired
+ protected DataChangeEventCenter dataChangeEventCenter;
+
+
+ public FetchDataChangeMergerConfigService() {
+ super(
+ ValueConstants.DATA_MERGER_TASK_DELAY_CONFIG_DATA_ID,
+ new SwitchStorage(INIT_VERSION, new DataChangeMergeConfig()));
+ }
+
+ @Override
+ protected int getSystemPropertyIntervalMillis() {
+ return dataServerConfig.getSystemPropertyIntervalMillis();
+ }
+
+ @Override
+ protected boolean doProcess(SwitchStorage expect, ProvideData data) {
+ final String configString = ProvideData.toString(data);
+
+ DataChangeMergeConfig dataChangeMergeConfig = null;
+ if (StringUtils.isBlank(configString)) {
+ return true;
+ }
+ try {
+ dataChangeMergeConfig = JsonUtils.read(configString, DataChangeMergeConfig.class);
+ } catch (Throwable e) {
+ LOGGER.error("Decode DataChangeMergeConfig failed", e);
+ return false;
+ }
+ if (dataChangeMergeConfig == null) {
+ LOGGER.error(
+ "Fetch DataChangeMergeConfig invalid, value={}", dataChangeMergeConfig);
+ return false;
+ }
+
+ SwitchStorage update = new SwitchStorage(data.getVersion(), dataChangeMergeConfig);
+ if (!compareAndSet(expect, update)) {
+ return false;
+ }
+
+ dataChangeEventCenter.setDataChangeConfig(dataChangeMergeConfig);
+
+ LOGGER.info(
+ "Fetch DataChangeMergeConfig success, prev={}, current={}",
+ expect.dataChangeMergeConfig,
+ dataChangeMergeConfig);
+ return true;
+ }
+
+ protected static class SwitchStorage extends SystemDataStorage {
+ protected final DataChangeMergeConfig dataChangeMergeConfig;
+
+ public SwitchStorage(long version, DataChangeMergeConfig dataChangeMergeConfig) {
+ super(version);
+ this.dataChangeMergeConfig = dataChangeMergeConfig;
+ }
+ }
+ @VisibleForTesting
+ public FetchDataChangeMergerConfigService setDataServerConfig(DataServerConfig dataServerConfig) {
+ this.dataServerConfig = dataServerConfig;
+ return this;
+ }
+ @VisibleForTesting
+ public FetchDataChangeMergerConfigService setDataChangeEventCenter(DataChangeEventCenter dataChangeEventCenter) {
+ this.dataChangeEventCenter = dataChangeEventCenter;
+ return this;
+ }
+
+
+}
diff --git a/server/server/data/src/test/java/com/alipay/sofa/registry/server/data/change/DataChangeEventCenterTest.java b/server/server/data/src/test/java/com/alipay/sofa/registry/server/data/change/DataChangeEventCenterTest.java
index 6a4cc9687..a0a86c98a 100644
--- a/server/server/data/src/test/java/com/alipay/sofa/registry/server/data/change/DataChangeEventCenterTest.java
+++ b/server/server/data/src/test/java/com/alipay/sofa/registry/server/data/change/DataChangeEventCenterTest.java
@@ -90,7 +90,7 @@ public void testHandleChangeNotInit() {
Assert.assertFalse(
center.handleChanges(
- center.transferChangeEvent(dataServerConfig.getNotifyMaxItems()),
+ center.transferChangeEvent(dataServerConfig.getNotifyMaxItems(), false),
NodeType.SESSION,
dataServerConfig.getNotifyPort(),
true));
@@ -104,7 +104,7 @@ public void testHandleChangeNotInit() {
center.onChange(changes1, DataChangeType.PUT, DC);
Assert.assertFalse(
center.handleChanges(
- center.transferChangeEvent(dataServerConfig.getNotifyMaxItems()),
+ center.transferChangeEvent(dataServerConfig.getNotifyMaxItems(), false),
NodeType.SESSION,
dataServerConfig.getNotifyPort(),
true));
@@ -126,7 +126,7 @@ public void testHandleChangeNotInit() {
channelsMap.put("localhost", Lists.newArrayList(channel));
Assert.assertTrue(
center.handleChanges(
- center.transferChangeEvent(dataServerConfig.getNotifyMaxItems()),
+ center.transferChangeEvent(dataServerConfig.getNotifyMaxItems(), false),
NodeType.SESSION,
dataServerConfig.getNotifyPort(),
true));
@@ -137,7 +137,7 @@ public void testHandleChangeNotInit() {
// npe
Assert.assertTrue(
center.handleChanges(
- center.transferChangeEvent(dataServerConfig.getNotifyMaxItems()),
+ center.transferChangeEvent(dataServerConfig.getNotifyMaxItems(), false),
NodeType.SESSION,
dataServerConfig.getNotifyPort(),
true));
@@ -148,7 +148,7 @@ public void testHandleChangeNotInit() {
double pre = ChangeMetrics.CHANGE_SKIP_COUNTER.get();
Assert.assertTrue(
center.handleChanges(
- center.transferChangeEvent(dataServerConfig.getNotifyMaxItems()),
+ center.transferChangeEvent(dataServerConfig.getNotifyMaxItems(), false),
NodeType.SESSION,
dataServerConfig.getNotifyPort(),
true));
@@ -219,14 +219,14 @@ public void testTempNotify() {
@Test
public void testHandleExpire_npe() {
initHandleExpire();
- center.handleExpire();
+ center.handleExpire(NodeType.SESSION);
}
@Test
public void testHandleExpire_reject() {
initHandleExpire();
center.setNotifyExecutor(TestBaseUtils.rejectExecutor());
- center.handleExpire();
+ center.handleExpire(NodeType.SESSION);
}
private void initHandleExpire() {
@@ -250,14 +250,14 @@ private void initHandleExpire() {
DC,
Collections.singletonMap(String.valueOf(i + 100), new DatumVersion(200))));
}
- List expires = center.getExpires();
+ List expires = center.getExpires(NodeType.SESSION);
Assert.assertTrue(expires.isEmpty());
// is full, make expire now
dataServerConfig.setNotifyRetryBackoffMillis(0);
for (DataChangeEventCenter.ChangeNotifier n : list) {
center.commitRetry(n);
}
- expires = center.getExpires();
+ expires = center.getExpires(NodeType.SESSION);
Assert.assertEquals(expires.size(), list.size());
Assert.assertArrayEquals(expires.toArray(), list.toArray());
for (DataChangeEventCenter.ChangeNotifier n : list) {
@@ -305,13 +305,13 @@ public void testTransfer() {
setCenter();
List changes1 = Lists.newArrayList("1", "2", "3");
center.onChange(changes1, DataChangeType.PUT, DC);
- List events = center.transferChangeEvent(3);
+ List events = center.transferChangeEvent(3, false);
Assert.assertNotNull(events.get(0).toString());
Assert.assertEquals(1, events.size());
assertEvent(events, changes1);
center.onChange(changes1, DataChangeType.PUT, DC);
- events = center.transferChangeEvent(2);
+ events = center.transferChangeEvent(2, false);
Assert.assertEquals(2, events.size());
assertEvent(events, changes1);
}
diff --git a/server/server/data/src/test/java/com/alipay/sofa/registry/server/data/providedata/FetchDataChangeMergerConfigServiceTest.java b/server/server/data/src/test/java/com/alipay/sofa/registry/server/data/providedata/FetchDataChangeMergerConfigServiceTest.java
new file mode 100644
index 000000000..cac8cafba
--- /dev/null
+++ b/server/server/data/src/test/java/com/alipay/sofa/registry/server/data/providedata/FetchDataChangeMergerConfigServiceTest.java
@@ -0,0 +1,59 @@
+package com.alipay.sofa.registry.server.data.providedata;
+
+import com.alipay.sofa.registry.common.model.ServerDataBox;
+import com.alipay.sofa.registry.common.model.constants.ValueConstants;
+import com.alipay.sofa.registry.common.model.metaserver.ProvideData;
+import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
+import com.alipay.sofa.registry.server.data.change.DataChangeEventCenter;
+import junit.framework.TestCase;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * @author jiangcun.hlc@antfin.com
+ * @since 2023/4/24
+ */
+public class FetchDataChangeMergerConfigServiceTest {
+
+ FetchDataChangeMergerConfigService fetchDataChangeMergerConfigService;
+
+ @Before
+ public void beforeTest() {
+ fetchDataChangeMergerConfigService = new FetchDataChangeMergerConfigService();
+ fetchDataChangeMergerConfigService
+ .setDataChangeEventCenter(mock(DataChangeEventCenter.class))
+ .setDataServerConfig(mock(DataServerConfig.class));
+ }
+
+ @Test
+ public void testDoProcess() {
+ Assert.assertTrue(fetchDataChangeMergerConfigService.getSystemPropertyIntervalMillis() == 0);
+ Assert.assertTrue(
+ fetchDataChangeMergerConfigService.doProcess(
+ fetchDataChangeMergerConfigService.getStorage().get(),
+ new ProvideData(
+ new ServerDataBox(
+ ""),
+ ValueConstants.DATA_MERGER_TASK_DELAY_CONFIG_DATA_ID,
+ 0L)));
+ Assert.assertFalse(
+ fetchDataChangeMergerConfigService.doProcess(
+ new FetchDataChangeMergerConfigService.SwitchStorage(0L, null),
+ new ProvideData(
+ new ServerDataBox(
+ "{\"changeDebouncingMillis\":1000,\"changeDebouncingMaxMillis\":3000,\"changeTaskWaitingMillis\":100,\"pushTaskWaitingMillis\":0,\"pushTaskDebouncingMillis\":500,\"regWorkWaitingMillis\":200,\"zoneSet\":[\"ALL_ZONE\"]}"),
+ ValueConstants.CHANGE_PUSH_TASK_DELAY_CONFIG_DATA_ID,
+ 0L)));
+ Assert.assertTrue(
+ fetchDataChangeMergerConfigService.doProcess(
+ fetchDataChangeMergerConfigService.getStorage().get(),
+ new ProvideData(
+ new ServerDataBox(
+ "{\"dataChangeMergeDelay\":1000,\"multiDataChangeMergeDelay\":200}"),
+ ValueConstants.CHANGE_PUSH_TASK_DELAY_CONFIG_DATA_ID,
+ 2L)));
+ }
+}
\ No newline at end of file
diff --git a/server/server/integration/pom.xml b/server/server/integration/pom.xml
index ebd309986..712715597 100644
--- a/server/server/integration/pom.xml
+++ b/server/server/integration/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-server
- 6.3.0-SNAPSHOT
+ 6.3.0
../pom.xml
4.0.0
diff --git a/server/server/meta/pom.xml b/server/server/meta/pom.xml
index 9e74f7e32..291865692 100644
--- a/server/server/meta/pom.xml
+++ b/server/server/meta/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-server
- 6.3.0-SNAPSHOT
+ 6.3.0
../pom.xml
4.0.0
diff --git a/server/server/pom.xml b/server/server/pom.xml
index dcbb33ff7..f96c54223 100644
--- a/server/server/pom.xml
+++ b/server/server/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-server-parent
- 6.3.0-SNAPSHOT
+ 6.3.0
../pom.xml
4.0.0
diff --git a/server/server/session/pom.xml b/server/server/session/pom.xml
index 67e04c9e9..c48f7ff04 100644
--- a/server/server/session/pom.xml
+++ b/server/server/session/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-server
- 6.3.0-SNAPSHOT
+ 6.3.0
../pom.xml
4.0.0
diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushMetrics.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushMetrics.java
index fb33a0ffc..525391f87 100644
--- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushMetrics.java
+++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushMetrics.java
@@ -119,7 +119,7 @@ static final class Push {
.register();
private static final Histogram PUSH_DELAY_HISTOGRAM =
Histogram.build()
- .linearBuckets(0, 1000, 30)
+ .linearBuckets(0, 1000, 10)
.namespace("session")
.subsystem("push")
.name("push_delay")
diff --git a/server/server/shared/pom.xml b/server/server/shared/pom.xml
index 030c0d940..1614e89b8 100644
--- a/server/server/shared/pom.xml
+++ b/server/server/shared/pom.xml
@@ -5,7 +5,7 @@
registry-server
com.alipay.sofa
- 6.3.0-SNAPSHOT
+ 6.3.0
4.0.0
diff --git a/server/store/api/pom.xml b/server/store/api/pom.xml
index b380a464d..eb6060816 100644
--- a/server/store/api/pom.xml
+++ b/server/store/api/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-store
- 6.3.0-SNAPSHOT
+ 6.3.0
../pom.xml
4.0.0
diff --git a/server/store/jdbc/pom.xml b/server/store/jdbc/pom.xml
index e49aaeb71..b55a72d6b 100644
--- a/server/store/jdbc/pom.xml
+++ b/server/store/jdbc/pom.xml
@@ -6,7 +6,7 @@
com.alipay.sofa
registry-store
- 6.3.0-SNAPSHOT
+ 6.3.0
../pom.xml
diff --git a/server/store/jraft/pom.xml b/server/store/jraft/pom.xml
index 57254ebe2..bc5d63988 100644
--- a/server/store/jraft/pom.xml
+++ b/server/store/jraft/pom.xml
@@ -6,7 +6,7 @@
com.alipay.sofa
registry-store
- 6.3.0-SNAPSHOT
+ 6.3.0
../pom.xml
diff --git a/server/store/pom.xml b/server/store/pom.xml
index b0487b597..d93818f6f 100644
--- a/server/store/pom.xml
+++ b/server/store/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-server-parent
- 6.3.0-SNAPSHOT
+ 6.3.0
../pom.xml
4.0.0
diff --git a/test/pom.xml b/test/pom.xml
index 70eb2b3de..cb72d0ec4 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -5,7 +5,7 @@
com.alipay.sofa
registry-parent
- 6.3.0-SNAPSHOT
+ 6.3.0
../pom.xml
4.0.0