From 0589b4594b86eca16d014916c575f6d01cfc11e3 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 30 Jun 2026 16:25:10 +0800 Subject: [PATCH 1/2] Pipe: add ConfigNode memory control for log reducer --- .../confignode/conf/ConfigNodeDescriptor.java | 9 ++ .../conf/ConfigNodeMemoryConfig.java | 96 +++++++++++++++++++ .../runtime/PipeConfigNodeRuntimeAgent.java | 20 +++- .../PipeConfigNodeResourceManager.java | 14 +++ .../conf/ConfigNodeMemoryConfigTest.java | 61 ++++++++++++ .../runtime/PipeDataNodeRuntimeAgent.java | 20 ++++ .../conf/iotdb-system.properties.template | 5 + .../log/PipePeriodicalLogReducer.java | 26 ++++- .../log/PipePeriodicalLogReducerTest.java | 65 +++++++++++++ 9 files changed, 312 insertions(+), 4 deletions(-) create mode 100644 iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeMemoryConfig.java create mode 100644 iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/conf/ConfigNodeMemoryConfigTest.java create mode 100644 iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/resource/log/PipePeriodicalLogReducerTest.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index 43996574723a2..0220543d113f6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -60,6 +60,8 @@ public class ConfigNodeDescriptor { private final ConfigNodeConfig conf = new ConfigNodeConfig(); + private final ConfigNodeMemoryConfig memoryConfig = new ConfigNodeMemoryConfig(); + static { URL systemConfigUrl = getPropsUrl(CommonConfig.SYSTEM_CONFIG_NAME); URL configNodeUrl = getPropsUrl(CommonConfig.OLD_CONFIG_NODE_CONFIG_NAME); @@ -83,6 +85,10 @@ public ConfigNodeConfig getConf() { return conf; } + public ConfigNodeMemoryConfig getMemoryConfig() { + return memoryConfig; + } + /** * Get props url location. * @@ -148,11 +154,14 @@ private void loadProps() { LOGGER.warn( ConfigNodeMessages.COULDN_T_LOAD_THE_CONFIGURATION_FROM_ANY_OF_THE_KNOWN, CommonConfig.SYSTEM_CONFIG_NAME); + memoryConfig.init(trimProperties); } } private void loadProperties(TrimProperties properties) throws BadNodeUrlException, IOException { ConfigurationFileUtils.updateAppliedProperties(properties, false); + memoryConfig.init(properties); + conf.setClusterName(properties.getProperty(IoTDBConstant.CLUSTER_NAME, conf.getClusterName())); conf.setInternalAddress( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeMemoryConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeMemoryConfig.java new file mode 100644 index 0000000000000..d4e4fe20929fd --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeMemoryConfig.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.conf; + +import org.apache.iotdb.commons.conf.TrimProperties; +import org.apache.iotdb.commons.memory.MemoryConfig; +import org.apache.iotdb.commons.memory.MemoryManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConfigNodeMemoryConfig { + public static final String PIPE_MEMORY_MANAGER_NAME = "Pipe"; + + private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNodeMemoryConfig.class); + + /** The memory manager of on heap. */ + private MemoryManager onHeapMemoryManager; + + /** Memory manager for the pipe. */ + private MemoryManager pipeMemoryManager; + + public void init(final TrimProperties properties) { + String memoryAllocateProportion = properties.getProperty("confignode_memory_proportion", null); + if (memoryAllocateProportion == null) { + memoryAllocateProportion = properties.getProperty("config_node_memory_proportion", null); + if (memoryAllocateProportion != null) { + LOGGER.warn( + "The parameter config_node_memory_proportion is deprecated, " + + "please use confignode_memory_proportion instead."); + } + } + + final long maxMemoryAvailable = Runtime.getRuntime().maxMemory(); + long pipeMemorySize = maxMemoryAvailable / 10; + long freeMemorySize = maxMemoryAvailable - pipeMemorySize; + + if (memoryAllocateProportion != null) { + final String[] proportions = memoryAllocateProportion.split(":"); + if (proportions.length >= 2) { + int proportionSum = 0; + for (final String proportion : proportions) { + proportionSum += Integer.parseInt(proportion.trim()); + } + + if (proportionSum != 0) { + pipeMemorySize = + maxMemoryAvailable * Integer.parseInt(proportions[0].trim()) / proportionSum; + freeMemorySize = maxMemoryAvailable - pipeMemorySize; + } + } else { + LOGGER.warn( + "The parameter confignode_memory_proportion should be in the form of Pipe:Free, " + + "but got {}. Use default value 1:9.", + memoryAllocateProportion); + } + } + + onHeapMemoryManager = + MemoryConfig.global().getOrCreateMemoryManager("ConfigNodeOnHeap", maxMemoryAvailable); + pipeMemoryManager = + onHeapMemoryManager.getOrCreateMemoryManager(PIPE_MEMORY_MANAGER_NAME, pipeMemorySize); + // Keep the rest of ConfigNode heap unconnected for now. The memory framework currently only + // serves PipePeriodicalLogReducer on ConfigNode. + + LOGGER.info( + "initial ConfigNode allocateMemoryForPipe = {}", + pipeMemoryManager.getTotalMemorySizeInBytes()); + LOGGER.info("initial ConfigNode freeMemory = {}", freeMemorySize); + } + + public MemoryManager getOnHeapMemoryManager() { + return onHeapMemoryManager; + } + + public MemoryManager getPipeMemoryManager() { + return pipeMemoryManager; + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java index 4726b50d899c3..48fc6d98e14e1 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException; +import org.apache.iotdb.commons.memory.IMemoryBlock; import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor; import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalPhantomReferenceCleaner; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; @@ -33,6 +34,7 @@ import org.apache.iotdb.confignode.i18n.ManagerMessages; import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent; import org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeCopiedFileDirStartupCleaner; +import org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager; import org.apache.iotdb.confignode.manager.pipe.source.ConfigRegionListeningQueue; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; @@ -58,7 +60,7 @@ public class PipeConfigNodeRuntimeAgent implements IService { @Override public synchronized void start() { PipeConfig.getInstance().printAllConfigs(); - PipeLogger.setLogger(PipePeriodicalLogReducer::log); + initPipePeriodicalLogReducer(); // PipeTasks will not be started here and will be started by "HandleLeaderChange" // procedure when the consensus layer notify leader ready @@ -95,6 +97,22 @@ public synchronized void stop() { LOGGER.info(ManagerMessages.PIPERUNTIMECONFIGNODEAGENT_STOPPED); } + private void initPipePeriodicalLogReducer() { + final IMemoryBlock pipeLogReducerMemoryBlock = PipeConfigNodeResourceManager.logReducerMemory(); + PipePeriodicalLogReducer.setMemoryResizeFunction( + targetSizeInBytes -> { + final long nonNegativeTargetSizeInBytes = Math.max(0, targetSizeInBytes); + final long oldSizeInBytes = pipeLogReducerMemoryBlock.getUsedMemoryInBytes(); + if (oldSizeInBytes < nonNegativeTargetSizeInBytes) { + pipeLogReducerMemoryBlock.allocate(nonNegativeTargetSizeInBytes - oldSizeInBytes); + } else if (oldSizeInBytes > nonNegativeTargetSizeInBytes) { + pipeLogReducerMemoryBlock.release(oldSizeInBytes - nonNegativeTargetSizeInBytes); + } + return pipeLogReducerMemoryBlock.getUsedMemoryInBytes(); + }); + PipeLogger.setLogger(PipePeriodicalLogReducer::log); + } + public boolean isShutdown() { return isShutdown.get(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/PipeConfigNodeResourceManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/PipeConfigNodeResourceManager.java index 33b68f63821aa..53e3160b95239 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/PipeConfigNodeResourceManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/PipeConfigNodeResourceManager.java @@ -19,15 +19,19 @@ package org.apache.iotdb.confignode.manager.pipe.resource; +import org.apache.iotdb.commons.memory.IMemoryBlock; +import org.apache.iotdb.commons.memory.MemoryBlockType; import org.apache.iotdb.commons.pipe.resource.log.PipeLogManager; import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager; import org.apache.iotdb.commons.pipe.resource.snapshot.PipeSnapshotResourceManager; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.manager.pipe.resource.ref.PipeConfigNodePhantomReferenceManager; import org.apache.iotdb.confignode.manager.pipe.resource.snapshot.PipeConfigNodeSnapshotResourceManager; public class PipeConfigNodeResourceManager { private final PipeSnapshotResourceManager pipeSnapshotResourceManager; + private final IMemoryBlock pipeLogReducerMemoryBlock; private final PipeLogManager pipeLogManager; private final PipePhantomReferenceManager pipePhantomReferenceManager; @@ -36,6 +40,11 @@ public static PipeSnapshotResourceManager snapshot() { .pipeSnapshotResourceManager; } + public static IMemoryBlock logReducerMemory() { + return PipeConfigNodeResourceManager.PipeResourceManagerHolder.INSTANCE + .pipeLogReducerMemoryBlock; + } + public static PipeLogManager log() { return PipeConfigNodeResourceManager.PipeResourceManagerHolder.INSTANCE.pipeLogManager; } @@ -48,6 +57,11 @@ public static PipePhantomReferenceManager ref() { private PipeConfigNodeResourceManager() { pipeSnapshotResourceManager = new PipeConfigNodeSnapshotResourceManager(); + pipeLogReducerMemoryBlock = + ConfigNodeDescriptor.getInstance() + .getMemoryConfig() + .getPipeMemoryManager() + .exactAllocate("PipePeriodicalLogReducer", MemoryBlockType.DYNAMIC); pipeLogManager = new PipeLogManager(); pipePhantomReferenceManager = new PipeConfigNodePhantomReferenceManager(); } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/conf/ConfigNodeMemoryConfigTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/conf/ConfigNodeMemoryConfigTest.java new file mode 100644 index 0000000000000..43fd1dc1b6fe8 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/conf/ConfigNodeMemoryConfigTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.conf; + +import org.apache.iotdb.commons.conf.TrimProperties; +import org.apache.iotdb.commons.memory.MemoryConfig; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class ConfigNodeMemoryConfigTest { + + private static final String ON_HEAP_MEMORY_MANAGER_NAME = "ConfigNodeOnHeap"; + + @Before + public void setUp() { + MemoryConfig.global().releaseChildMemoryManager(ON_HEAP_MEMORY_MANAGER_NAME); + } + + @After + public void tearDown() { + MemoryConfig.global().releaseChildMemoryManager(ON_HEAP_MEMORY_MANAGER_NAME); + } + + @Test + public void testConfigNodeMemoryFrameworkOnlyCreatesPipeMemoryManager() { + final TrimProperties properties = new TrimProperties(); + properties.setProperty("confignode_memory_proportion", "1:3"); + + final ConfigNodeMemoryConfig memoryConfig = new ConfigNodeMemoryConfig(); + memoryConfig.init(properties); + + Assert.assertEquals( + Runtime.getRuntime().maxMemory() / 4, + memoryConfig.getPipeMemoryManager().getTotalMemorySizeInBytes()); + Assert.assertNotNull( + memoryConfig + .getOnHeapMemoryManager() + .getMemoryManager(ConfigNodeMemoryConfig.PIPE_MEMORY_MANAGER_NAME)); + Assert.assertNull(memoryConfig.getOnHeapMemoryManager().getMemoryManager("Free")); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java index 80c7f16a0b735..ed7f07d6ade69 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java @@ -43,6 +43,8 @@ import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.resource.PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; @@ -76,6 +78,8 @@ public class PipeDataNodeRuntimeAgent implements IService { private final PipePeriodicalPhantomReferenceCleaner pipePeriodicalPhantomReferenceCleaner = new PipePeriodicalPhantomReferenceCleaner(); + private PipeMemoryBlock pipeLogReducerMemoryBlock; + //////////////////////////// System Service Interface //////////////////////////// public synchronized void preparePipeResources( @@ -91,6 +95,22 @@ public synchronized void preparePipeResources( IoTDBTreePattern.setDevicePathGetter(PipeDataNodeRuntimeAgent::getPath); IoTDBTreePattern.setMeasurementPathGetter(PipeDataNodeRuntimeAgent::getPath); + initPipePeriodicalLogReducer(); + } + + private void initPipePeriodicalLogReducer() { + if (pipeLogReducerMemoryBlock == null) { + pipeLogReducerMemoryBlock = + PipeDataNodeResourceManager.memory() + .tryAllocate(PipeConfig.getInstance().getPipeLoggerCacheMaxSizeInBytes()); + } + + PipePeriodicalLogReducer.setMemoryResizeFunction( + targetSizeInBytes -> { + PipeDataNodeResourceManager.memory() + .resize(pipeLogReducerMemoryBlock, Math.max(0, targetSizeInBytes), false); + return pipeLogReducerMemoryBlock.getMemoryUsageInBytes(); + }); PipeLogger.setLogger(PipePeriodicalLogReducer::log); } diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index f176c0e1761bb..473aa7dc75994 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -800,6 +800,11 @@ partition_table_recover_max_read_megabytes_per_second=10 # effectiveMode: restart datanode_memory_proportion=3:3:1:1:1:1 +# ConfigNode Memory Allocation Ratio: Pipe and Free Memory. +# The parameter form is a:b, where a and b are integers. Currently, only PipePeriodicalLogReducer is connected to Pipe memory on ConfigNode. +# effectiveMode: restart +confignode_memory_proportion=1:9 + # Schema Memory Allocation Ratio: SchemaRegion, SchemaCache, and PartitionCache. # The parameter form is a:b:c, where a, b and c are integers. for example: 1:1:1 , 6:2:1 # effectiveMode: restart diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipePeriodicalLogReducer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipePeriodicalLogReducer.java index 30074f90ebf9b..8063b6e55daf3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipePeriodicalLogReducer.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipePeriodicalLogReducer.java @@ -24,11 +24,20 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import org.apache.tsfile.utils.RamUsageEstimator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.LongUnaryOperator; public class PipePeriodicalLogReducer { + private static final Logger LOGGER = LoggerFactory.getLogger(PipePeriodicalLogReducer.class); + + private static final LongUnaryOperator DEFAULT_MEMORY_RESIZE_FUNCTION = + sizeInBytes -> sizeInBytes; + + private static volatile LongUnaryOperator memoryResizeFunction = DEFAULT_MEMORY_RESIZE_FUNCTION; protected static final Cache LOGGER_CACHE = Caffeine.newBuilder() @@ -54,11 +63,22 @@ public static boolean log( return false; } - public static void update() { - update(PipeConfig.getInstance().getPipeLoggerCacheMaxSizeInBytes()); + public static synchronized void setMemoryResizeFunction( + final LongUnaryOperator memoryResizeFunction) { + PipePeriodicalLogReducer.memoryResizeFunction = + memoryResizeFunction == null ? DEFAULT_MEMORY_RESIZE_FUNCTION : memoryResizeFunction; + update(); + } + + public static synchronized void update() { + final long maxWeight = + memoryResizeFunction.applyAsLong( + PipeConfig.getInstance().getPipeLoggerCacheMaxSizeInBytes()); + LOGGER.info("PipePeriodicalLogReducer is allocated to {} bytes.", maxWeight); + update(maxWeight); } - public static void update(final long maxWeight) { + public static synchronized void update(final long maxWeight) { LOGGER_CACHE .policy() .expireAfterWrite() diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/resource/log/PipePeriodicalLogReducerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/resource/log/PipePeriodicalLogReducerTest.java new file mode 100644 index 0000000000000..d390cbb9fddb7 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/resource/log/PipePeriodicalLogReducerTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.resource.log; + +import org.apache.iotdb.commons.pipe.config.PipeConfig; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +public class PipePeriodicalLogReducerTest { + + @After + public void tearDown() { + PipePeriodicalLogReducer.setMemoryResizeFunction(null); + } + + @Test + public void testLogReducesDuplicateMessages() { + final AtomicInteger logCount = new AtomicInteger(0); + final String message = "PipePeriodicalLogReducerTest-" + System.nanoTime(); + + Assert.assertTrue(PipePeriodicalLogReducer.log(log -> logCount.incrementAndGet(), message)); + Assert.assertFalse(PipePeriodicalLogReducer.log(log -> logCount.incrementAndGet(), message)); + Assert.assertEquals(1, logCount.get()); + } + + @Test + public void testUpdateUsesMemoryResizeFunction() { + final AtomicLong requestedSizeInBytes = new AtomicLong(-1); + final long allocatedSizeInBytes = 1024; + + PipePeriodicalLogReducer.setMemoryResizeFunction( + sizeInBytes -> { + requestedSizeInBytes.set(sizeInBytes); + return allocatedSizeInBytes; + }); + + Assert.assertEquals( + PipeConfig.getInstance().getPipeLoggerCacheMaxSizeInBytes(), requestedSizeInBytes.get()); + Assert.assertEquals( + allocatedSizeInBytes, + PipePeriodicalLogReducer.LOGGER_CACHE.policy().eviction().get().getMaximum()); + } +} From e5e4e1d245c940a16047f7eca5f3d3ba4ce0ab25 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 1 Jul 2026 11:20:45 +0800 Subject: [PATCH 2/2] Address ConfigNode memory config review --- .../confignode/i18n/ConfigNodeMessages.java | 7 +++++++ .../confignode/i18n/ConfigNodeMessages.java | 7 +++++++ .../conf/ConfigNodeMemoryConfig.java | 19 ++++++------------- 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java index cea25afa3c9a4..8b1027f5e3797 100644 --- a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java +++ b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java @@ -79,6 +79,13 @@ public final class ConfigNodeMessages { public static final String CONFIGNODE_EXITING = "ConfigNode exiting..."; public static final String CONFIGNODE_NEED_REDIRECT_TO_RETRY = "ConfigNode need redirect to {}, retry {} ..."; + public static final String CONFIGNODE_MEMORY_PROPORTION_SHOULD_BE_IN_THE_FORM_OF_PIPE_FREE = + "The parameter confignode_memory_proportion should be in the form of Pipe:Free, " + + "but got {}. Use default value 1:9."; + public static final String INITIAL_CONFIGNODE_ALLOCATE_MEMORY_FOR_PIPE = + "initial ConfigNode allocateMemoryForPipe = {}"; + public static final String INITIAL_CONFIGNODE_FREE_MEMORY = + "initial ConfigNode freeMemory = {}"; public static final String CONFIGNODE_PORT_CHECK_SUCCESSFUL = "configNode port check successful."; public static final String CONFIGNODE_RPC_SERVICE_FINISHED_TO_REMOVE_AINODE_RESULT = "ConfigNode RPC Service finished to remove AINode, result: {}"; diff --git a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java index 48ab732199ecc..8763ea7eb5353 100644 --- a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java +++ b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java @@ -77,6 +77,13 @@ public final class ConfigNodeMessages { public static final String CONFIGNODE_EXITING = "ConfigNode 正在退出..."; public static final String CONFIGNODE_NEED_REDIRECT_TO_RETRY = "ConfigNode need redirect to {}, retry {} ..."; + public static final String CONFIGNODE_MEMORY_PROPORTION_SHOULD_BE_IN_THE_FORM_OF_PIPE_FREE = + "参数 confignode_memory_proportion 应为 Pipe:Free 格式," + + "但当前值为 {}。将使用默认值 1:9。"; + public static final String INITIAL_CONFIGNODE_ALLOCATE_MEMORY_FOR_PIPE = + "初始化 ConfigNode allocateMemoryForPipe = {}"; + public static final String INITIAL_CONFIGNODE_FREE_MEMORY = + "初始化 ConfigNode freeMemory = {}"; public static final String CONFIGNODE_PORT_CHECK_SUCCESSFUL = "ConfigNode 端口检查成功。"; public static final String CONFIGNODE_RPC_SERVICE_FINISHED_TO_REMOVE_AINODE_RESULT = "ConfigNode RPC Service finished to remove AINode, result: {}"; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeMemoryConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeMemoryConfig.java index d4e4fe20929fd..dea5a414784e8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeMemoryConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeMemoryConfig.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.conf.TrimProperties; import org.apache.iotdb.commons.memory.MemoryConfig; import org.apache.iotdb.commons.memory.MemoryManager; +import org.apache.iotdb.confignode.i18n.ConfigNodeMessages; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,15 +39,8 @@ public class ConfigNodeMemoryConfig { private MemoryManager pipeMemoryManager; public void init(final TrimProperties properties) { - String memoryAllocateProportion = properties.getProperty("confignode_memory_proportion", null); - if (memoryAllocateProportion == null) { - memoryAllocateProportion = properties.getProperty("config_node_memory_proportion", null); - if (memoryAllocateProportion != null) { - LOGGER.warn( - "The parameter config_node_memory_proportion is deprecated, " - + "please use confignode_memory_proportion instead."); - } - } + final String memoryAllocateProportion = + properties.getProperty("confignode_memory_proportion", null); final long maxMemoryAvailable = Runtime.getRuntime().maxMemory(); long pipeMemorySize = maxMemoryAvailable / 10; @@ -67,8 +61,7 @@ public void init(final TrimProperties properties) { } } else { LOGGER.warn( - "The parameter confignode_memory_proportion should be in the form of Pipe:Free, " - + "but got {}. Use default value 1:9.", + ConfigNodeMessages.CONFIGNODE_MEMORY_PROPORTION_SHOULD_BE_IN_THE_FORM_OF_PIPE_FREE, memoryAllocateProportion); } } @@ -81,9 +74,9 @@ public void init(final TrimProperties properties) { // serves PipePeriodicalLogReducer on ConfigNode. LOGGER.info( - "initial ConfigNode allocateMemoryForPipe = {}", + ConfigNodeMessages.INITIAL_CONFIGNODE_ALLOCATE_MEMORY_FOR_PIPE, pipeMemoryManager.getTotalMemorySizeInBytes()); - LOGGER.info("initial ConfigNode freeMemory = {}", freeMemorySize); + LOGGER.info(ConfigNodeMessages.INITIAL_CONFIGNODE_FREE_MEMORY, freeMemorySize); } public MemoryManager getOnHeapMemoryManager() {