From 66acc324b0ae8e8e18bcd829cd27908cba4103bf Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 29 Jun 2026 10:23:16 +0800 Subject: [PATCH 1/2] Avoid login lock for pipe auth retry --- .../protocol/legacy/IoTDBLegacyPipeSink.java | 7 +- .../task/subtask/PipeAbstractSinkSubtask.java | 9 +- .../task/subtask/PipeReportableSubtask.java | 40 ++++++++- .../sink/client/IoTDBSyncClientManager.java | 6 +- .../pipe/task/PipeSleepIntervalTest.java | 86 +++++++++++++++---- 5 files changed, 127 insertions(+), 21 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java index c0a9eb6a79d65..2454484168817 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java @@ -278,8 +278,11 @@ private void openClientSession() throws TException { if (openSessionResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { final String errorMsg = String.format( - "Failed to login to receiver %s:%s for legacy pipe transfer because %s", - ipAddress, port, openSessionResp.getStatus().getMessage()); + "Failed to login to receiver %s:%s for legacy pipe transfer because code: %d, message: %s", + ipAddress, + port, + openSessionResp.getStatus().getCode(), + openSessionResp.getStatus().getMessage()); LOGGER.warn(errorMsg); throw new PipeRuntimeCriticalException(errorMsg); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java index 9dbf5af2d0ce2..610fc12ff4e2c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java @@ -177,7 +177,7 @@ private boolean onPipeConnectionException(final Throwable throwable) { MAX_RETRY_TIMES, e); try { - sleepIfNoHighPriorityTask(retry * PipeConfig.getInstance().getPipeSinkRetryIntervalMs()); + sleepIfNoHighPriorityTask(getHandshakeRetrySleepInterval(e, retry)); } catch (final InterruptedException interruptedException) { LOGGER.info( PipeMessages.INTERRUPTED_WHILE_SLEEPING_RETRY_HANDSHAKE, interruptedException); @@ -218,6 +218,13 @@ private boolean onPipeConnectionException(final Throwable throwable) { return false; } + private long getHandshakeRetrySleepInterval(final Throwable throwable, final int retry) { + final long defaultInterval = retry * PipeConfig.getInstance().getPipeSinkRetryIntervalMs(); + return isAuthenticationFailure(throwable) + ? Math.max(defaultInterval, AUTHENTICATION_FAILURE_RETRY_INTERVAL_MS) + : defaultInterval; + } + /** * Submit a {@link PipeSubtask} to the executor to keep it running. Note that the function will be * called when connector starts or the subTask finishes the last round, Thus the {@link diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java index 52bc3bd61f811..51693d944f673 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java @@ -27,15 +27,33 @@ import org.apache.iotdb.commons.i18n.PipeMessages; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; +import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; public abstract class PipeReportableSubtask extends PipeSubtask { private static final Logger LOGGER = LoggerFactory.getLogger(PipeReportableSubtask.class); + private static final long DEFAULT_LOGIN_LOCK_WINDOW_MS = TimeUnit.MINUTES.toMillis(10); + private static final int DEFAULT_LOGIN_LOCK_FAILED_ATTEMPTS = 5; + private static final int AUTHENTICATION_FAILURE_IMMEDIATE_ATTEMPTS = 2; + protected static final long AUTHENTICATION_FAILURE_RETRY_INTERVAL_MS = + DEFAULT_LOGIN_LOCK_WINDOW_MS + / (DEFAULT_LOGIN_LOCK_FAILED_ATTEMPTS - AUTHENTICATION_FAILURE_IMMEDIATE_ATTEMPTS) + + TimeUnit.SECONDS.toMillis(1); + private static final Pattern AUTHENTICATION_FAILURE_STATUS_CODE_PATTERN = + Pattern.compile( + String.format( + "(?i)(?:\\b(?:code|status code)\\s*[:=]\\s*(?:%d|%d)\\b|\\b(?:%d|%d):|\\b(?:WRONG_LOGIN_PASSWORD|USER_LOGIN_LOCKED)\\b)", + TSStatusCode.WRONG_LOGIN_PASSWORD.getStatusCode(), + TSStatusCode.USER_LOGIN_LOCKED.getStatusCode(), + TSStatusCode.WRONG_LOGIN_PASSWORD.getStatusCode(), + TSStatusCode.USER_LOGIN_LOCKED.getStatusCode())); // To ensure that high-priority tasks can obtain object locks first, a counter is now used to save // the number of high-priority tasks. protected final AtomicLong highPriorityLockTaskCount = new AtomicLong(0); @@ -65,7 +83,7 @@ public synchronized void onFailure(final Throwable throwable) { // is dropped or the process is running normally. } - private long getSleepIntervalBasedOnThrowable(final Throwable throwable) { + protected long getSleepIntervalBasedOnThrowable(final Throwable throwable) { long sleepInterval = Math.min(1000L * retryCount.get(), 10000); // if receiver is read-only/internal-error/write-reject, connector will retry with // power-increasing interval @@ -76,9 +94,24 @@ private long getSleepIntervalBasedOnThrowable(final Throwable throwable) { sleepInterval = 1000L * retryCount.get() * retryCount.get(); } } + if (isAuthenticationFailure(throwable)) { + sleepInterval = Math.max(sleepInterval, AUTHENTICATION_FAILURE_RETRY_INTERVAL_MS); + } return sleepInterval; } + protected static boolean isAuthenticationFailure(final Throwable throwable) { + Throwable current = throwable; + while (current != null) { + final String message = current.getMessage(); + if (message != null && AUTHENTICATION_FAILURE_STATUS_CODE_PATTERN.matcher(message).find()) { + return true; + } + current = current.getCause(); + } + return false; + } + private void onReportEventFailure(final Throwable throwable) { final int maxRetryTimes = throwable instanceof PipeRuntimeSinkRetryTimesConfigurableException @@ -199,10 +232,13 @@ protected void preScheduleLowPriorityTask(int maxRetries) { } protected void sleepIfNoHighPriorityTask(long sleepMillis) throws InterruptedException { + if (sleepMillis <= 0) { + return; + } synchronized (highPriorityLockTaskCount) { // The wait operation will release the highPriorityLockTaskCount lock, so there will be // no deadlock. - if (highPriorityLockTaskCount.get() > 0) { + if (highPriorityLockTaskCount.get() == 0) { highPriorityLockTaskCount.wait(sleepMillis); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java index ff62dbf477b7a..c6ad14aac4dc0 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java @@ -270,7 +270,11 @@ public void sendHandshakeReq(final Pair clientAndStatu client.getIpAddress(), client.getPort(), resp.getStatus()); - endPoint2HandshakeErrorMessage.put(client.getEndPoint(), resp.getStatus().getMessage()); + endPoint2HandshakeErrorMessage.put( + client.getEndPoint(), + String.format( + "code: %d, message: %s", + resp.getStatus().getCode(), resp.getStatus().getMessage())); } else { clientAndStatus.setRight(true); client.setTimeout(CONNECTION_TIMEOUT_MS.get()); diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeSleepIntervalTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeSleepIntervalTest.java index 07ae50e992e6b..a9b72391b2b75 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeSleepIntervalTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeSleepIntervalTest.java @@ -25,13 +25,48 @@ import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.pipe.api.exception.PipeConnectionException; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.util.concurrent.TimeUnit; + public class PipeSleepIntervalTest { + private static class TestSinkSubtask extends PipeAbstractSinkSubtask { + + TestSinkSubtask() { + super(null, 0, null); + } + + @Override + protected String getRootCause(Throwable throwable) { + return null; + } + + @Override + protected void report(EnrichedEvent event, PipeRuntimeException exception) {} + + @Override + protected boolean executeOnce() { + return false; + } + + long getSleepInterval(final Throwable throwable) { + return getSleepIntervalBasedOnThrowable(throwable); + } + + boolean isAuthenticationFailureException(final Throwable throwable) { + return isAuthenticationFailure(throwable); + } + + void sleepWithoutHighPriorityTask(final long sleepMillis) throws InterruptedException { + sleepIfNoHighPriorityTask(sleepMillis); + } + } + private long oldPipeSinkSubtaskSleepIntervalInitMs; private long oldPipeSinkSubtaskSleepIntervalMaxMs; @@ -53,21 +88,7 @@ public void tearDown() throws Exception { @Test public void test() { - try (final PipeAbstractSinkSubtask subtask = - new PipeAbstractSinkSubtask(null, 0, null) { - @Override - protected String getRootCause(Throwable throwable) { - return null; - } - - @Override - protected void report(EnrichedEvent event, PipeRuntimeException exception) {} - - @Override - protected boolean executeOnce() { - return false; - } - }) { + try (final TestSinkSubtask subtask = new TestSinkSubtask()) { long startTime = System.currentTimeMillis(); subtask.sleep4NonReportException(); Assert.assertTrue( @@ -80,4 +101,39 @@ protected boolean executeOnce() { >= PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs()); } } + + @Test + public void testAuthenticationFailureRetryInterval() { + try (final TestSinkSubtask subtask = new TestSinkSubtask()) { + Assert.assertTrue( + subtask.isAuthenticationFailureException( + new PipeConnectionException( + "Handshake error with receiver 127.0.0.1:6667, code: 801, message: Authentication failed."))); + Assert.assertTrue( + subtask.isAuthenticationFailureException( + new PipeConnectionException("801: Failed to check password for pipe a2b."))); + Assert.assertTrue( + subtask.isAuthenticationFailureException( + new PipeConnectionException("status code: 822, message: Account is blocked."))); + Assert.assertFalse( + subtask.isAuthenticationFailureException( + new PipeConnectionException("Network error 801 bytes sent."))); + + final long authenticationFailureRetryInterval = + subtask.getSleepInterval(new PipeConnectionException("code: 801")); + Assert.assertTrue(authenticationFailureRetryInterval > TimeUnit.MINUTES.toMillis(3)); + Assert.assertTrue(authenticationFailureRetryInterval * 3 > TimeUnit.MINUTES.toMillis(10)); + Assert.assertTrue( + subtask.getSleepInterval(new PipeConnectionException("network error")) <= 10000); + } + } + + @Test + public void testSleepIfNoHighPriorityTaskWaits() throws Exception { + try (final TestSinkSubtask subtask = new TestSinkSubtask()) { + final long startTime = System.currentTimeMillis(); + subtask.sleepWithoutHighPriorityTask(20L); + Assert.assertTrue(System.currentTimeMillis() - startTime >= 15L); + } + } } From 471eb0ae13b7aa09b5c046aa70cfed6006da57db Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 1 Jul 2026 10:54:05 +0800 Subject: [PATCH 2/2] Fix legacy pipe login message i18n --- .../i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java | 2 ++ .../i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java | 2 ++ .../iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java | 2 +- 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java index 050de467f2687..4119b98b726be 100644 --- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java +++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java @@ -825,6 +825,8 @@ public final class DataNodePipeMessages { + "PipeRawTabletInsertionEvent. Ignore {}."; public static final String IOTDBDATAREGIONAIRGAPCONNECTOR_ONLY_SUPPORT_PIPETSFILEINSERTIONEVENT_IGNORE = "IoTDBDataRegionAirGapConnector only support PipeTsFileInsertionEvent. Ignore {}."; + public static final String FAILED_TO_LOGIN_TO_RECEIVER_FOR_LEGACY_PIPE_TRANSFER = + "Failed to login to receiver %s:%s for legacy pipe transfer because code: %d, message: %s"; public static final String IOTDBLEGACYPIPECONNECTOR_DOES_NOT_SUPPORT_TRANSFERRING_GENERIC_EVENT = "IoTDBLegacyPipeConnector does not support transferring generic event: {}."; public static final String IOTDBLEGACYPIPECONNECTOR_ONLY_SUPPORT_PIPEINSERTNODEINSERTIONEVENT_AND_PIPETABLE = diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java index 147d977d57269..e14f0b3eb2eb6 100644 --- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java +++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java @@ -791,6 +791,8 @@ public final class DataNodePipeMessages { + "PipeRawTabletInsertionEvent. Ignore {}."; public static final String IOTDBDATAREGIONAIRGAPCONNECTOR_ONLY_SUPPORT_PIPETSFILEINSERTIONEVENT_IGNORE = "IoTDBDataRegionAirGapConnector only support PipeTsFileInsertionEvent. Ignore {}."; + public static final String FAILED_TO_LOGIN_TO_RECEIVER_FOR_LEGACY_PIPE_TRANSFER = + "登录 receiver %s:%s for legacy pipe transfer 失败,原因:code: %d, message: %s"; public static final String IOTDBLEGACYPIPECONNECTOR_DOES_NOT_SUPPORT_TRANSFERRING_GENERIC_EVENT = "IoTDBLegacyPipeConnector 不支持 transferring generic event: {}."; public static final String IOTDBLEGACYPIPECONNECTOR_ONLY_SUPPORT_PIPEINSERTNODEINSERTIONEVENT_AND_PIPETABLE = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java index 2454484168817..b937331957f03 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java @@ -278,7 +278,7 @@ private void openClientSession() throws TException { if (openSessionResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { final String errorMsg = String.format( - "Failed to login to receiver %s:%s for legacy pipe transfer because code: %d, message: %s", + DataNodePipeMessages.FAILED_TO_LOGIN_TO_RECEIVER_FOR_LEGACY_PIPE_TRANSFER, ipAddress, port, openSessionResp.getStatus().getCode(),