Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -67,26 +67,22 @@ public class PipeDataNodeTaskBuilder {
private static final PipeProcessorSubtaskExecutor PROCESSOR_EXECUTOR =
PipeSubtaskExecutorManager.getInstance().getProcessorExecutor();

protected final Map<String, String> systemParameters = new HashMap<>();

public PipeDataNodeTaskBuilder(
final PipeStaticMeta pipeStaticMeta, final int regionId, final PipeTaskMeta pipeTaskMeta) {
this.pipeStaticMeta = pipeStaticMeta;
this.regionId = regionId;
this.pipeTaskMeta = pipeTaskMeta;
generateSystemParameters();
}

public PipeDataNodeTask build() {
// Event flow: source -> processor -> sink

// Analyzes the PipeParameters to identify potential conflicts.
final PipeParameters sourceParameters =
blendUserAndSystemParameters(pipeStaticMeta.getSourceParameters());
blendUserAndSystemParameters(pipeStaticMeta.getSourceParameters(), pipeTaskMeta);
final PipeParameters sinkParameters =
blendUserAndSystemParameters(pipeStaticMeta.getSinkParameters());
checkConflict(sourceParameters, sinkParameters);
injectParameters(sourceParameters, sinkParameters);
blendUserAndSystemParameters(pipeStaticMeta.getSinkParameters(), pipeTaskMeta);
preprocessParameters(sourceParameters, sinkParameters);

// We first build the source and sink, then build the processor.
final PipeTaskSourceStage sourceStage =
Expand Down Expand Up @@ -125,7 +121,7 @@ public PipeDataNodeTask build() {
new PipeTaskProcessorStage(
pipeStaticMeta.getPipeName(),
pipeStaticMeta.getCreationTime(),
blendUserAndSystemParameters(pipeStaticMeta.getProcessorParameters()),
blendUserAndSystemParameters(pipeStaticMeta.getProcessorParameters(), pipeTaskMeta),
regionId,
sourceStage.getEventSupplier(),
sinkStage.getPipeSinkPendingQueue(),
Expand All @@ -143,22 +139,25 @@ public PipeDataNodeTask build() {
pipeStaticMeta.getPipeName(), regionId, sourceStage, processorStage, sinkStage);
}

private void generateSystemParameters() {
public static PipeParameters blendUserAndSystemParameters(
final PipeParameters userParameters, final PipeTaskMeta pipeTaskMeta) {
// Deep copy the user parameters to avoid modification of the original parameters.
// If the original parameters are modified, progress index report will be affected.
final Map<String, String> blendedParameters = new HashMap<>(userParameters.getAttribute());
if (!(pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex)
|| pipeTaskMeta.isNewlyAdded()) {
systemParameters.put(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, Boolean.TRUE.toString());
blendedParameters.put(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, Boolean.TRUE.toString());
}
return new PipeParameters(blendedParameters);
}

private PipeParameters blendUserAndSystemParameters(final PipeParameters userParameters) {
// Deep copy the user parameters to avoid modification of the original parameters.
// If the original parameters are modified, progress index report will be affected.
final Map<String, String> blendedParameters = new HashMap<>(userParameters.getAttribute());
blendedParameters.putAll(systemParameters);
return new PipeParameters(blendedParameters);
/**
 * Runs the two preprocessing steps on an already blended source/sink parameter pair.
 *
 * <p>The steps are executed strictly in this order: {@code checkConflict} first, then {@code
 * injectParameters}. Both helpers receive the very same two parameter objects.
 *
 * @param sourceParameters the parameters of the source stage
 * @param sinkParameters the parameters of the sink stage
 */
public static void preprocessParameters(
final PipeParameters sourceParameters, final PipeParameters sinkParameters) {
// Check first, then inject: the conflict check sees the parameters before injection touches them.
checkConflict(sourceParameters, sinkParameters);
injectParameters(sourceParameters, sinkParameters);
}

private void checkConflict(
private static void checkConflict(
final PipeParameters sourceParameters, final PipeParameters sinkParameters) {
final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair;
final boolean shouldTerminatePipeOnAllHistoricalEventsConsumed;
Expand Down Expand Up @@ -228,7 +227,7 @@ private void checkConflict(
}
}

private void injectParameters(
private static void injectParameters(
final PipeParameters sourceParameters, final PipeParameters sinkParameters) {
final boolean isSourceExternal =
!BuiltinPipePlugin.BUILTIN_SOURCES.contains(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,62 +69,31 @@ public synchronized String register(
final Supplier<? extends PipeSinkSubtaskExecutor> executorSupplier,
final PipeParameters pipeSinkParameters,
final PipeTaskSinkRuntimeEnvironment environment) {
final String connectorName =
PipeSinkConstant.getConnectorOrSinkNameWithDefault(pipeSinkParameters);
final String connectorKey =
connectorName
// Convert the value of `CONNECTOR_KEY` or `SINK_KEY` to lowercase
// for matching in `CONNECTOR_CONSTRUCTORS`
.toLowerCase();
final String connectorKey = getConnectorKey(pipeSinkParameters);
PipeEventCommitManager.getInstance()
.register(
environment.getPipeName(),
environment.getCreationTime(),
environment.getRegionId(),
connectorKey);

final boolean isDataRegionSink =
StorageEngine.getInstance()
.getAllDataRegionIds()
.contains(new DataRegionId(environment.getRegionId()))
|| PipeRuntimeMeta.isSourceExternal(environment.getRegionId());

final int sinkNum;
final boolean isDataRegionSink = isDataRegionSink(environment.getRegionId());
final int sinkNum = calculateSinkSubtaskNum(pipeSinkParameters, isDataRegionSink, connectorKey);
boolean realTimeFirst = false;
boolean serializeByRegion = false;
String attributeSortedString = generateAttributeSortedString(pipeSinkParameters);
final String attributeSortedString =
generateAttributeSortedString(pipeSinkParameters, environment.getRegionId());
final String attributeDisplayString = generateAttributeDisplayString(pipeSinkParameters);
if (isDataRegionSink) {
serializeByRegion = PipeSinkConstant.isSerializeByRegionEnabled(pipeSinkParameters);
sinkNum =
serializeByRegion
? 1
: pipeSinkParameters.getIntOrDefault(
Arrays.asList(
PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(connectorKey)
? 1
: PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
realTimeFirst =
pipeSinkParameters.getBooleanOrDefault(
Arrays.asList(
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
PipeSinkConstant.SINK_REALTIME_FIRST_KEY),
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE);
attributeSortedString =
serializeByRegion
? "data_region_" + environment.getRegionId() + "_" + attributeSortedString
: "data_" + attributeSortedString;
} else {
// Do not allow parallel tasks for schema region connectors
// to avoid the potential disorder of the schema region data transfer
sinkNum = 1;
attributeSortedString = "schema_" + attributeSortedString;
}
final String attributeDisplayStringWithPrefix =
isDataRegionSink
? serializeByRegion
? PipeSinkConstant.isSerializeByRegionEnabled(pipeSinkParameters)
? "data_region_" + environment.getRegionId() + "_" + attributeDisplayString
: "data_" + attributeDisplayString
: "schema_" + attributeDisplayString;
Expand Down Expand Up @@ -285,6 +254,59 @@ public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue(
.getPendingQueue();
}

/**
 * Tells whether {@code attributeSortedString2SubtaskLifeCycleMap} currently holds an entry for
 * the key derived from the given sink parameters and region id.
 *
 * @param pipeSinkParameters the sink parameters the lookup key is generated from
 * @param regionId the region id the lookup key is generated from
 * @return {@code true} if the map contains the generated key, {@code false} otherwise
 */
public synchronized boolean hasRegisteredSubtasks(
    final PipeParameters pipeSinkParameters, final int regionId) {
  // Same key derivation as the public generateAttributeSortedString(parameters, regionId).
  final String lifeCycleKey = generateAttributeSortedString(pipeSinkParameters, regionId);
  return attributeSortedString2SubtaskLifeCycleMap.containsKey(lifeCycleKey);
}

/**
 * Computes the number of sink subtasks for the given sink parameters and region.
 *
 * <p>Resolves the connector key and the data-region flag from the arguments and delegates to the
 * three-argument overload.
 *
 * @param pipeSinkParameters the sink parameters
 * @param regionId the id of the region the sink belongs to
 * @return the number of sink subtasks
 */
public static int calculateSinkSubtaskNum(
    final PipeParameters pipeSinkParameters, final int regionId) {
  final String sinkKey = getConnectorKey(pipeSinkParameters);
  final boolean dataRegionSink = isDataRegionSink(regionId);
  return calculateSinkSubtaskNum(pipeSinkParameters, dataRegionSink, sinkKey);
}

/**
 * Builds the region-aware attribute string for a sink by prefixing the plain sorted attribute
 * string.
 *
 * <ul>
 *   <li>not a data region sink: {@code "schema_" + attributes}
 *   <li>data region sink with serialize-by-region enabled: {@code "data_region_" + regionId + "_"
 *       + attributes}
 *   <li>any other data region sink: {@code "data_" + attributes}
 * </ul>
 *
 * @param pipeSinkParameters the sink parameters
 * @param regionId the id of the region the sink belongs to
 * @return the prefixed attribute string
 */
public static String generateAttributeSortedString(
    final PipeParameters pipeSinkParameters, final int regionId) {
  final String plainAttributes = generateAttributeSortedString(pipeSinkParameters);

  if (!isDataRegionSink(regionId)) {
    return "schema_" + plainAttributes;
  }

  if (PipeSinkConstant.isSerializeByRegionEnabled(pipeSinkParameters)) {
    return "data_region_" + regionId + "_" + plainAttributes;
  }
  return "data_" + plainAttributes;
}

/**
 * Resolves the connector (sink) key for the given sink parameters: the connector/sink name
 * returned by {@code PipeSinkConstant.getConnectorOrSinkNameWithDefault}, converted to lower
 * case.
 *
 * @param pipeSinkParameters the sink parameters to read the connector/sink name from
 * @return the lower-cased connector/sink name
 */
private static String getConnectorKey(final PipeParameters pipeSinkParameters) {
  // Lower-case with a fixed locale: the no-argument toLowerCase() follows the JVM default locale,
  // and e.g. under a Turkish locale "I" becomes a dotless "ı", so an upper-case name would no
  // longer match ASCII lower-case keys.
  return PipeSinkConstant.getConnectorOrSinkNameWithDefault(pipeSinkParameters)
      .toLowerCase(java.util.Locale.ROOT);
}

/**
 * Decides whether the sink of the given region is treated as a data region sink.
 *
 * @param regionId the region id to classify
 * @return {@code true} if the storage engine lists the id among its data region ids, or if {@code
 *     PipeRuntimeMeta.isSourceExternal} accepts it; {@code false} otherwise
 */
private static boolean isDataRegionSink(final int regionId) {
  if (StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(regionId))) {
    return true;
  }
  // Only consulted when the id is not a known data region.
  return PipeRuntimeMeta.isSourceExternal(regionId);
}

/**
 * Computes the number of sink subtasks from already resolved inputs.
 *
 * @param pipeSinkParameters the sink parameters
 * @param isDataRegionSink whether the sink is treated as a data region sink
 * @param connectorKey the lower-cased connector/sink name
 * @return {@code 1} for non data region sinks and for sinks with serialize-by-region enabled;
 *     otherwise the configured parallel task count, whose default is {@code 1} for connectors
 *     listed in {@code SINGLE_THREAD_DEFAULT_SINK} and {@code
 *     CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE} for all others
 */
private static int calculateSinkSubtaskNum(
    final PipeParameters pipeSinkParameters,
    final boolean isDataRegionSink,
    final String connectorKey) {
  // Schema region connectors must not run parallel tasks, to avoid the potential disorder of the
  // schema region data transfer. Serialize-by-region likewise pins the sink to a single subtask.
  if (!isDataRegionSink || PipeSinkConstant.isSerializeByRegionEnabled(pipeSinkParameters)) {
    return 1;
  }

  final int defaultParallelTasks =
      PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(connectorKey)
          ? 1
          : PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE;
  return pipeSinkParameters.getIntOrDefault(
      Arrays.asList(
          PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
          PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
      defaultParallelTasks);
}

private static String generateAttributeSortedString(
final PipeParameters pipeConnectorParameters) {
final TreeMap<String, String> sortedStringSourceMap =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.pipe.agent.task;

import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;

import org.junit.Assert;
import org.junit.Test;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

/**
 * Tests for the memory estimation logic of {@link PipeDataNodeTaskAgent}.
 *
 * <p>The private static estimators {@code calculateSinkBatchMemory} and {@code
 * calculateSendTsFileReadBufferMemory} are reached through reflection.
 */
public class PipeDataNodeTaskAgentTest {

  /**
   * With the memory check enabled, an insert node queue memory of 1 and a total floating memory
   * proportion of 0, {@code calculateMemoryUsage} must throw a {@link PipeException} even for a
   * pipe whose attribute maps and runtime meta are empty. The three touched configuration values
   * are restored afterwards.
   */
  @Test
  public void testCreateMemoryCheckStillRunsWhenNoPipeTasksNeedToBeCreated() throws Exception {
    // Snapshot the global configuration so the finally block can put it back.
    final boolean savedMemoryCheckEnabled =
        CommonDescriptor.getInstance().getConfig().isPipeEnableMemoryChecked();
    final long savedInsertNodeQueueMemory =
        CommonDescriptor.getInstance().getConfig().getPipeInsertNodeQueueMemory();
    final double savedFloatingMemoryProportion =
        CommonDescriptor.getInstance().getConfig().getPipeTotalFloatingMemoryProportion();

    try {
      CommonDescriptor.getInstance().getConfig().setIsPipeEnableMemoryChecked(true);
      CommonDescriptor.getInstance().getConfig().setPipeInsertNodeQueueMemory(1);
      CommonDescriptor.getInstance().getConfig().setPipeTotalFloatingMemoryProportion(0);

      Assert.assertThrows(
          PipeException.class,
          () ->
              PipeDataNodeAgent.task()
                  .calculateMemoryUsage(
                      new PipeMeta(
                          new PipeStaticMeta(
                              "p", 1L, new HashMap<>(), new HashMap<>(), new HashMap<>()),
                          new PipeRuntimeMeta())));
    } finally {
      CommonDescriptor.getInstance()
          .getConfig()
          .setIsPipeEnableMemoryChecked(savedMemoryCheckEnabled);
      CommonDescriptor.getInstance()
          .getConfig()
          .setPipeInsertNodeQueueMemory(savedInsertNodeQueueMemory);
      CommonDescriptor.getInstance()
          .getConfig()
          .setPipeTotalFloatingMemoryProportion(savedFloatingMemoryProportion);
    }
  }

  /**
   * Tablet format with a batch size of 1024, two node urls and an additional ip/port pair: the
   * estimate is expected to be four times the batch size, and exactly the batch size once the
   * leader cache is explicitly disabled.
   */
  @Test
  public void testPlainBatchMemoryIncludesLeaderCacheEndpointShards() throws Exception {
    final Map<String, String> attributes = new HashMap<>();
    attributes.put(
        PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.CONNECTOR_FORMAT_TABLET_VALUE);
    attributes.put(PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY, "1024");
    attributes.put(
        PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, "127.0.0.1:6667, 127.0.0.2:6667");
    attributes.put(PipeSinkConstant.CONNECTOR_IOTDB_IP_KEY, "127.0.0.3");
    attributes.put(PipeSinkConstant.CONNECTOR_IOTDB_PORT_KEY, "6667");

    Assert.assertEquals(4 * 1024L, invokeCalculateSinkBatchMemory(new PipeParameters(attributes)));

    // Same attributes, but with the leader cache switched off.
    attributes.put(PipeSinkConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY, Boolean.FALSE.toString());
    Assert.assertEquals(1024L, invokeCalculateSinkBatchMemory(new PipeParameters(attributes)));
  }

  /**
   * TsFile format with a batch size of 2048 and two node urls: the estimate is expected to be
   * exactly the batch size.
   */
  @Test
  public void testTsFileBatchMemoryIgnoresLeaderCacheEndpointShards() throws Exception {
    final Map<String, String> attributes = new HashMap<>();
    attributes.put(
        PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.CONNECTOR_FORMAT_TS_FILE_VALUE);
    attributes.put(PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY, "2048");
    attributes.put(
        PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, "127.0.0.1:6667,127.0.0.2:6667");

    Assert.assertEquals(2048L, invokeCalculateSinkBatchMemory(new PipeParameters(attributes)));
  }

  /** Tablet format with batch mode disabled: the estimate is expected to be zero. */
  @Test
  public void testPlainBatchMemoryReturnsZeroWhenBatchModeIsDisabled() throws Exception {
    final Map<String, String> attributes = new HashMap<>();
    attributes.put(
        PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.CONNECTOR_FORMAT_TABLET_VALUE);
    attributes.put(
        PipeSinkConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY, Boolean.FALSE.toString());
    attributes.put(PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY, "1024");

    Assert.assertEquals(0L, invokeCalculateSinkBatchMemory(new PipeParameters(attributes)));
  }

  /**
   * With history extraction disabled on the source, a tablet-format sink is expected to need no
   * TsFile read buffer, while a hybrid-format sink is expected to need {@code
   * PipeConfig.getPipeSinkReadFileBufferSize()}.
   */
  @Test
  public void testSendTsFileReadBufferMemoryUsesSinkReadFileBufferSize() throws Exception {
    final Map<String, String> sourceSide = new HashMap<>();
    sourceSide.put(PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY, Boolean.FALSE.toString());

    final Map<String, String> sinkSide = new HashMap<>();
    sinkSide.put(
        PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.CONNECTOR_FORMAT_TABLET_VALUE);
    Assert.assertEquals(
        0L,
        invokeCalculateSendTsFileReadBufferMemory(
            new PipeParameters(sourceSide), new PipeParameters(sinkSide)));

    // Switch the very same sink attributes over to the hybrid format.
    sinkSide.put(
        PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE);
    Assert.assertEquals(
        PipeConfig.getInstance().getPipeSinkReadFileBufferSize(),
        invokeCalculateSendTsFileReadBufferMemory(
            new PipeParameters(sourceSide), new PipeParameters(sinkSide)));
  }

  /** Reflectively calls {@code PipeDataNodeTaskAgent.calculateSinkBatchMemory(PipeParameters)}. */
  private long invokeCalculateSinkBatchMemory(final PipeParameters sinkParameters)
      throws Exception {
    return invokeStaticLong(
        "calculateSinkBatchMemory", new Class<?>[] {PipeParameters.class}, sinkParameters);
  }

  /**
   * Reflectively calls {@code
   * PipeDataNodeTaskAgent.calculateSendTsFileReadBufferMemory(PipeParameters, PipeParameters)}.
   */
  private long invokeCalculateSendTsFileReadBufferMemory(
      final PipeParameters sourceParameters, final PipeParameters sinkParameters) throws Exception {
    return invokeStaticLong(
        "calculateSendTsFileReadBufferMemory",
        new Class<?>[] {PipeParameters.class, PipeParameters.class},
        sourceParameters,
        sinkParameters);
  }

  /**
   * Looks up a declared method of {@link PipeDataNodeTaskAgent}, makes it accessible and invokes
   * it without a receiver (i.e. as a static method), returning its result as a {@code long}.
   */
  private static long invokeStaticLong(
      final String methodName, final Class<?>[] parameterTypes, final Object... arguments)
      throws Exception {
    final Method target = PipeDataNodeTaskAgent.class.getDeclaredMethod(methodName, parameterTypes);
    target.setAccessible(true);
    return (long) target.invoke(null, arguments);
  }
}
Loading